Objetive of the project could be finding a regularisation of the latent space to extract high-level features of the signals obtained from the Sauron Semiconductors placed inside the AGATA.

Things to do:
-Solve errors of Classifier.py.
-Find regularisation strategies from the book ML for Physicist in this unsupervised autoencoder.
-Add .root dataset of Daniele Mengoni to the code.
-Improve the noise reduction by including other techniques to the dataset (e.g. cross-validation).
-Add axis units.


In [1]:
#Loading the libraries and compiling the c++ module

from torch.utils.cpp_extension import load
import torch
import matplotlib.pyplot as plt
import numpy as np
import os
import glob
import matplotlib
import pytorch_model_summary as pms

from autoencoder import *

#Compile the c++ module
pid_dataset = load(name="PIDDataset",
                   sources=["PidDataset.cpp","ReadRawData.cpp","ReadBinaryData.cpp", "mwdlib.cpp"],
                   verbose=True,
                   extra_cflags=['-O3'],
                  )

# TO SHOW INTERACTIVE PLOT
%matplotlib widget

torch.set_float32_matmul_precision('medium')

Using /home/luna/.cache/torch_extensions/py311_cu118 as PyTorch extensions root...
Emitting ninja build file /home/luna/.cache/torch_extensions/py311_cu118/PIDDataset/build.ninja...
Building extension module PIDDataset...
Allowing ninja to set a default number of workers... (overridable by setting the environment variable MAX_JOBS=N)


ninja: no work to do.


Loading extension module PIDDataset...


In [2]:
#Define NN parameters


derivative_order=1
normalized=True
convolutional=True #I can try to put this to False!!!
variational=False #Variational autoencoder.
dropout_p=0.0 #In big NN we can put this parameter to 50% to avoid overfitting, by eliminating neurons. In this case, a smaller NN, it is not that useful.
latent_dim_list = [2]
nLayers_list = [4]
#act_fn = torch.nn.ReLU
act_fn = torch.nn.GELU #Activation function

#Training parameters
batch_size = 1000 #this was working. 
learning_rate = 5e-3 #If it's too big, it may go out of the gradient descent.
#batch_size = 5000 #this only makes things slow
retrain=False #??
version_nr = 0
EarlyStoppingNr = 15
max_epochs = 300
exclude_outliers = False
device = "cpu"

#Dataset
sauronData = False

if sauronData:
    trainDatasetFile = "./DataFusEvSauron/RU_caendig_i1468_0005_0000.caendat"
    datasetEvents = 3_000_000 #Events=Number of total signals.
    samples_number=128
    first_sample=0
    n_channels=64
else:
    trainDatasetFile = "./DataLiFOscar/data_0-7.caendat"
    datasetEvents = 709_755
    samples_number=40
    first_sample=200
    n_channels=16

In [3]:
#Debug cell???

for nLayers in nLayers_list:
   for latent_dim in latent_dim_list:
       autoenctest = Autoencoder(base_channel_size=nLayers, 
                                    latent_dim=latent_dim, 
                                    encoder_class=ConvEncoder, 
                                    decoder_class=ConvDecoder, 
                                    width=derivative_order, 
                                    height=samples_number, 
                                    act_fn=act_fn, 
                                    dropout_p=dropout_p, 
                                    learning_rate=learning_rate)
   
       x1_s = torch.zeros([3, 1, derivative_order, samples_number], dtype=torch.float32)
       x1_c = torch.zeros([3, 64+1], dtype=torch.float32)

       x2_s = torch.zeros([3, latent_dim], dtype=torch.float32)

       print(pms.summary(autoenctest, x1_s, x1_c))
       #print(autoenctest._get_reconstruction_loss([x1_s, x1_c]))
       #print(autoenctest.forward(x1_s, x1_c).shape)

-----------------------------------------------------------------------
      Layer (type)        Output Shape         Param #     Tr. Param #
     ConvEncoder-1              [3, 2]           2,546           2,546
     ConvDecoder-2       [3, 1, 1, 40]           2,317           2,317
Total params: 4,863
Trainable params: 4,863
Non-trainable params: 0
-----------------------------------------------------------------------


In [4]:
#Load the dataset

split_percent = [0.9, 0.05, 0.05] #90% for training, 5% for testing and 5% for validating.
#This was a normal splitting but we could add cross-validation after???
split = [int(i*datasetEvents) for i in split_percent] #Multiply the number of total events to each percentage. Output would be, if datasetEvents=1000, [900, 5, 5].

split[0] += datasetEvents - sum(split) #To ensure the total always matches datasetEvents.

[train_dataset, val_dataset, test_dataset] = torch.utils.data.random_split(pid_dataset.PIDDataset(root=trainDatasetFile, 
                                       mode=pid_dataset.kTrain, 
                                       count=datasetEvents, 
                                       nder=derivative_order, 
                                       nsamples=samples_number, 
                                       firstsample=first_sample,
                                       nchannels=n_channels,
                                       normalized=normalized),
                                       split)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True, pin_memory=True, num_workers=4)
#Batch size: number of signals per batch.
#Shuffle: shuffle data at every epoch.
#Epoch: During an epoch, the model processes each training example once, 
#and the learning process updates the model's parameters based on the loss calculated from the training examples.

val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=2000, shuffle=False, drop_last=False, num_workers=4)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=2000, shuffle=False, drop_last=False, num_workers=4)

def get_train_images(num):
    return [torch.stack([train_dataset[i][0] for i in range(num)], dim=0), torch.stack([train_dataset[i][1] for i in range(num)], dim=0)]
    #Here i is the batch and what is 0? dim=0 is the dimension of the output tensor.
    #What are IDs???

def get_validation_images(num):
    return [torch.stack([val_dataset[i][0] for i in range(num)], dim=0), torch.stack([val_dataset[i][1] for i in range(num)], dim=0)]

def get_test_images(num):
    return [torch.stack([test_dataset[i][0] for i in range(num)], dim=0), torch.stack([test_dataset[i][1] for i in range(num)], dim=0)]

Reading 709755 events
Number of board in the stream 5
Epoch start = 0x657C613D


In [5]:
#Callback functions

CHECKPOINT_PATH = os.environ.get("PATH_CHECKPOINT", "saved_models/autoencoder")

class ImageCallback(lightning.pytorch.callbacks.Callback):
    def __init__(self, input_imgs, every_n_epochs=1, data_type="train"):
        super().__init__()# It's often used to call the parent class's __init__ method to ensure that the parent class is properly initialized when creating an instance of the derived class.
        self.input_imgs = input_imgs  # Images to reconstruct during training
        # Only save uthose images every N epochs (otherwise tensorboard gets quite large)
        self.every_n_epochs = every_n_epochs
        self.data_type = data_type

    def on_train_epoch_end(self, trainer, pl_module):
        if trainer.current_epoch % self.every_n_epochs == 0:
            # Reconstruct images
            input_imgs = self.input_imgs[0].to(pl_module.device)
            input_ids = self.input_imgs[1].to(pl_module.device)
            with torch.no_grad():
                pl_module.eval()
                reconst_imgs = pl_module(input_imgs, input_ids)
                pl_module.train()
            # Plot and add to tensorboard (only makes sense for images)
            #imgs = torch.stack([input_imgs, reconst_imgs], dim=1).flatten(0, 1)
            #grid = torchvision.utils.make_grid(imgs, nrow=2, normalize=True, value_range=(-1, 1))
            #trainer.logger.experiment.add_image("Reconstruction Images %s" % (self.data_type), grid, global_step=trainer.global_step)

            ncols = math.ceil(math.sqrt(input_imgs.shape[0]))
            fig, axes = plt.subplots(ncols, ncols, figsize=(10, 10))
            for i, ax in enumerate(axes.flat):
                if i >= len(input_imgs):
                    break
                ax.plot(input_imgs[i][0][0].cpu().numpy(), "-", color="black")
                ax.plot(reconst_imgs[i][0][0].cpu().numpy(), "--", color="red")
            trainer.logger.experiment.add_figure("Reconstructions Plots %s" % (self.data_type), fig, global_step=trainer.global_step)
            

class LatentSpaceCallback(lightning.pytorch.callbacks.Callback):
    def __init__(self, input_imgs, every_n_epochs=1):
        super().__init__()
        self.input_imgs = input_imgs  # Images to reconstruct during training
        # Only save uthose images every N epochs (otherwise tensorboard gets quite large)
        self.every_n_epochs = every_n_epochs

    def on_train_epoch_end(self, trainer, pl_module):
        if trainer.current_epoch % self.every_n_epochs == 0:
            # Reconstruct images
            input_imgs = self.input_imgs[0].to(pl_module.device)
            input_ids = self.input_imgs[1].to(pl_module.device)
            with torch.no_grad():
                pl_module.eval()
                latent_output = pl_module.encoder(input_imgs, input_ids)
                pl_module.train()

            # Move latent_output to CPU
            latent_output = latent_output.cpu()
            try:
                histo = torch.histogramdd(latent_output, bins=1_000, range=None)
            except RuntimeError:
                print("RuntimeError")
                return
            
            # Plot and add to tensorboard as an image
            #grid = torchvision.utils.make_grid(histo.unsqueeze(0).unsqueeze(0), normalize=True)
            #trainer.logger.experiment.add_image("Latent space representation", grid, global_step=trainer.global_step)

            #Plot as a figure (usually better)
            fig, axes = plt.subplots(1, 1, figsize=(10, 10))
            axes.imshow(histo[0].detach().numpy(), 
                        extent=[histo[1][0][0], 
                                histo[1][0][-1],
                                histo[1][1][0], 
                                histo[1][1][-1]], 
                        aspect='auto', 
                        origin='lower', 
                        norm=matplotlib.colors.LogNorm())
            trainer.logger.experiment.add_figure("Latent space representation", fig, global_step=trainer.global_step)

In [9]:
#Training function definition

def train(latent_dim, base_channel_size, retrain=False):
    # Create a PyTorch Lightning trainer with the generation callback

    netName = "model_ld%i_bc%i_do%i" % (latent_dim, base_channel_size, derivative_order)
    
    callbacks=[
        #lightning.pytorch.callbacks.ModelCheckpoint(save_weights_only=True),
        ImageCallback(get_train_images(36), every_n_epochs=1, data_type="train"),
        ImageCallback(get_validation_images(36), every_n_epochs=1, data_type="val"),
        lightning.pytorch.callbacks.LearningRateMonitor("epoch"),
        lightning.pytorch.callbacks.ModelCheckpoint(
            monitor='val_loss',
            mode='min',
            save_top_k=1,  # Save only the best checkpoint based on validation loss
            save_last=True,  # Save the latest checkpoint
            filename='{epoch}-{val_loss:.2f}',  # Customize the filename with epoch and validation loss
            verbose=False,
        ),
        lightning.pytorch.callbacks.EarlyStopping(
            monitor='val_loss',
            mode='min',
            patience=EarlyStoppingNr,
            verbose=False,
            min_delta=0.00,
        ),
        # Other callbacks...
    ]

    if latent_dim == 2:
        callbacks.append(LatentSpaceCallback(get_train_images(100_000), every_n_epochs=1))

    trainer = lightning.Trainer(
        default_root_dir=os.path.join(CHECKPOINT_PATH, "model_%i_%i" % (latent_dim, base_channel_size)),
        accelerator="auto",
        devices=1,
        callbacks=callbacks,
        max_epochs=max_epochs,
        logger=lightning.pytorch.loggers.tensorboard.TensorBoardLogger("saved_models/autoencoder", 
                                                                       name=netName),
    )
    trainer.logger._log_graph = True  # If True, we plot the computation graph in tensorboard
    trainer.logger._default_hp_metric = None  # Optional logging argument that we don't need

    # Check whether pretrained model exists. If yes, load it and skip training
    #ckpt_files = glob.glob('saved_models/autoencoder/%s/version_0/checkpoints/epoch*.ckpt' % (netName))
    ckpt_files = glob.glob('saved_models/autoencoder/%s/version_%i/checkpoints/epoch*.ckpt' % (netName, version_nr))

    if ckpt_files and not retrain:
        print("Found pretrained model, called %s" % ckpt_files[0])
        if variational:
            model = VarAutoencoder.load_from_checkpoint(ckpt_files[0])
        else:
            model = Autoencoder.load_from_checkpoint(ckpt_files[0])
        #model.to("cuda")
        model.to(device)
    else:
        if convolutional:
            decoder_class = ConvDecoder
            encoder_class = ConvEncoder
        else:
            decoder_class = FcDecoder
            encoder_class = FcEncoder

        if variational:
            model_class = VarAutoencoder
        else:
            model_class = Autoencoder
            
        ids_nr = get_test_images(1)[1].shape[1] - 1 
        model = model_class(base_channel_size=base_channel_size, 
                            latent_dim=latent_dim, 
                            width=derivative_order, 
                            height=samples_number,
                            decoder_class=decoder_class, 
                            encoder_class=encoder_class,
                            dropout_p=dropout_p,
                            act_fn=act_fn,
                            ids_nr=ids_nr,
                            learning_rate=learning_rate)
                        
        print("testing on a single batch...")
        print(model.encoder.forward(get_train_images(100)[0], get_train_images(100)[1]).shape)
        print("inished testing on a single batch...")

        trainer.fit(model, train_loader, val_loader)
        print("Finished training, saving model...")

    # Test best model on validation and test set
    val_result = trainer.test(model, dataloaders=val_loader, verbose=False)
    test_result = trainer.test(model, dataloaders=test_loader, verbose=False)
    result = {"test": test_result, "val": val_result}
    return model, result

In [10]:
#Training loop
colors = ["b", "g", "r", "c", "m", "y", "k", "w", "orange", "purple", "brown", "pink", "olive", "cyan"]
model_list= []
for latent_dim in latent_dim_list:
    for nlayers in nLayers_list:
        model_ld, result_ld = train(latent_dim, nlayers, retrain=retrain)
        model_list.append({"latent_dim": latent_dim, 
                           "nlayers": nlayers,
                           "model": model_ld, 
                           "result": result_ld, 
                           "color": colors.pop(0)})

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


Found pretrained model, called saved_models/autoencoder/model_ld2_bc4_do1/version_0/checkpoints/epoch=85-val_loss=0.00.ckpt


NameError: name 'device' is not defined

In [None]:
 #Compare loss as a function of latent dimensionality

latent_dims = [m["latent_dim"] for m in model_list]
nlayers = [m["nlayers"] for m in model_list]
val_scores = [m["result"]["val"][0]["test_loss"] for m in model_list]
colors = [m["color"] for m in model_list]

fig, axes = plt.subplots(1, 3, figsize=(16, 4))
axes[0].scatter(latent_dims, val_scores, color=colors, marker="*", s=100)
axes[0].set_xlabel("Latent Dimensionality")
axes[0].set_ylabel("Validation Loss")

axes[1].scatter(nlayers, val_scores, color=colors, marker="*", s=100)
axes[1].set_xlabel("Number of Layers")
axes[1].set_ylabel("Validation Loss")

# Scatter plot with latent_dim on x-axis, nlayers on y-axis, and colormap on z-axis (val_scores)
scatter = axes[2].scatter(latent_dims, nlayers, c=val_scores, cmap='viridis', marker="s", s=600)
axes[2].set_xlabel("Latent Dimensionality")
axes[2].set_ylabel("Number of Layers")
axes[2].set_title("Validation Loss")

# Add colorbar
cbar = fig.colorbar(scatter, ax=axes[2])
cbar.set_label("Validation Loss")
plt.show()

NameError: name 'model_list' is not defined

In [None]:
#Define plot functions

def plot(inTensor, axes, color, dim, label, linestyle="--", skip=0):
    for i, ax in enumerate(axes.flat):
        if i >= len(inTensor)+skip:
            break
        if i < skip:
            continue    
        ax.plot(inTensor[i-skip][0][dim].numpy(), linestyle, color=color, label=label)
        ax.legend()

def reconstruct_signals(input_signal, model_l, axes, dim):
    # Reconstruct images
    model = model_l["model"].eval()
    color = model_l["color"]
    label = "ld %s nl %s" % (model_l["latent_dim"], model_l["nlayers"])
    with torch.no_grad():
        reconst_imgs = model(input_signal[0].to(model.device), input_signal[1].to(model.device))    
    reconst_imgs = reconst_imgs.cpu()
    plot(reconst_imgs, axes, color, dim, label)

def generate_signals(input_latents, model_l, axes, dim, skip=0, color="b"):
    # Reconstruct images
    model = model_l["model"].eval()
    label = "e=%.1f, x=%.1f, y=%.1f" % (input_latents[1][0][0].numpy(), input_latents[0][0][0].numpy(), input_latents[0][0][1].numpy())
    with torch.no_grad():
        reconst_imgs = model.decoder(input_latents[0].to(model.device), input_latents[1].to(model.device))    
    reconst_imgs = reconst_imgs.cpu()
    plot(reconst_imgs, axes, color, dim, label, skip=skip)

In [None]:
#Compare the reconstructions with different model hyper-parameters
ncols = 4
nrows = 16
dim = 0

fig, axes = plt.subplots(nrows, ncols, figsize=(15, 40))
input_imgs = get_train_images(ncols*nrows)
#input_imgs = get_validation_images(ncols*nrows)

for m in model_list:
    reconstruct_signals(input_imgs, m, axes, dim)
plot(input_imgs[0], axes, "black", dim=dim, label="input", linestyle="-")

plt.show()

In [None]:
#Visualize the latent space

import ipywidgets as widgets
import ipywidgets as widgets
    
# Create the slider
ene_slider = widgets.FloatSlider(value=0.5, min=0, max=1, step=0.01, description='Energy:')
nch = get_train_images(1)[1].shape[1] - 1
ch_dropdown = widgets.Dropdown(
    options=[(f'Channel {i}', i) for i in range(-1,nch+1)],
    value=0,
    description='Channel:',
)
# Display the slider
display(ch_dropdown)
display(ene_slider)

idx_test = 0

def get_histo(idx_ch):
    with torch.no_grad():
        model_list[idx_test]["model"].eval()
        dataset = get_train_images(500_000)
        # Filter the dataset
        if idx_ch >= 0:
            dataset = (dataset[0][dataset[1][:, idx_ch+1] == 1], 
                       dataset[1][dataset[1][:, idx_ch+1] == 1])
        avg_id = dataset[1].mean(dim=0)

        

        histoOut = torch.histogramdd(   model_list[idx_test]["model"].encoder(dataset[0].to(model_list[idx_test]["model"].device), 
                                                                                dataset[1].to(model_list[idx_test]["model"].device)),
                                        bins=5_000,
                                        range = None,
                                        )
        histoOut[0].detach().numpy()

        return [histoOut, avg_id]


#Create the 2 canvases
fig, axes = plt.subplots(1, 2, figsize=(15, 10))

                
def on_ch_dropdown_change(change):
    value = change['new']
    histo, avg_id = get_histo(value)

    colors=["b", "g", "r", "c", "m", "y", "k", "w", "orange", "purple", "brown", "pink", "olive", "cyan"]

    axes[0].imshow( histo[0], 
                    extent=[histo[1][0][0], 
                            histo[1][0][-1],
                            histo[1][1][0], 
                            histo[1][1][-1]], 
                    aspect='auto', 
                    origin='lower', 
                    norm=matplotlib.colors.LogNorm())
                    
    def on_histo_double_click(event):
        if event.dblclick:
            x = event.xdata
            y = event.ydata
            if x is not None and y is not None:

                #Get coordinates
                last_click = [x, y] 
                print("[%.2f, %.2f]" % (x, y))

                color=colors.pop(0)
                #Annote in the plot
                axes[0].annotate("[%.2f, %.2f]" % (x, y), (x, y), color=color, bbox=dict(facecolor='white', edgecolor='white', alpha=0.5))
                axes[0].plot(x, y, 'o', color=color)

                #Plot the signal
                def on_ene_slider_change(change):
                    value = change['new']
                    double_click_tensor = torch.FloatTensor([last_click])
                    id_tensor = avg_id.unsqueeze(0).repeat(double_click_tensor.size(0), 1)
                    id_tensor[:, 0] = value
                    generate_signals([double_click_tensor, id_tensor], model_list[idx_test], axes, dim=0, skip=1, color=color)

                ene_slider.observe(on_ene_slider_change, names='value')

    fig.canvas.mpl_connect('button_press_event', on_histo_double_click)

ch_dropdown.observe(on_ch_dropdown_change, names='value')            



    

plt.show()

In [None]:
#Finding similar signals in the latent space and then checking the signals

def embed_imgs(model, data_loader):
    # Encode all images in the data_laoder using model, and return both images and encodings
    img_list, embed_list = [], []
    model.eval()
    for imgs, ids in data_loader:
        with torch.no_grad():
            z = model.encoder(imgs.to(model.device), ids.to(model.device))
        img_list.append(imgs)
        embed_list.append(z)
    return (torch.cat(img_list, dim=0), torch.cat(embed_list, dim=0))


train_img_embeds = embed_imgs(model_list[idx_test]["model"], train_loader)
test_img_embeds = embed_imgs(model_list[idx_test]["model"], test_loader)

def find_similar_images(query_img, query_z, key_embeds, ax, K=8, dim=0):
    # Find closest K images. We use the euclidean distance here but other like cosine distance can also be used.
    dist = torch.cdist(query_z[None, :], key_embeds[1], p=2)
    dist = dist.squeeze(dim=0)
    dist, indices = torch.sort(dist)
    # Plot K closest images
    imgs_to_display = torch.cat([query_img[None], key_embeds[0][indices[:K]]], dim=0)
    #grid = torchvision.utils.make_grid(imgs_to_display, nrow=K + 1, normalize=True, value_range=(-1, 1))
    #grid = grid.permute(1, 2, 0)
    #plt.figure(figsize=(12, 3))
    #plt.imshow(grid)
    #plt.axis("off")
    #plt.show()
    colors = ["b", "g", "r", "c", "m", "y", "k", "w", "orange", "purple", "brown", "pink"]
    ax.plot(query_img[0][dim].numpy(), color="k", label="query")
    for i in range(imgs_to_display.size()[0]):
        ax.plot(imgs_to_display[i][0][dim].numpy(), "--", color=colors.pop(0), label="latent_dim %i" % latent_dim)

In [None]:
# Plot the closest images for the first N test images as example
ncols = 6
nrows = 6
fig, axes = plt.subplots(nrows, ncols, figsize=(10, 10))
for i, ax in enumerate(axes.flat):
    find_similar_images(test_img_embeds[0][i], test_img_embeds[1][i], key_embeds=train_img_embeds, K=6, ax=ax, dim=0)

#This does not make sense, i probabily did something wrong