# UNSUPERVISED LEARNING TASK
In this notebook I will implement a convolutional autoencoder able to learn and reproduce the distrubution of instances of the dataset, as well explore more advanced techniques to achieve similar goals.

In [None]:
!pip install --quiet optuna

In [None]:
import optuna

optuna.__version__

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision

import pandas as pd
import matplotlib.pyplot as plt
import math as mt
import random
import plotly.express as px
from tqdm import tqdm

from sklearn.model_selection import KFold
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

## Datasets
Creation of the datasets: MNIST, and CIFAR (optional).

In [None]:
### Download the data and create dataset
DataDir_MNIST = 'MNIST'
DataDir_CIFAR = 'CIFAR10'

train_dataset_MN = torchvision.datasets.MNIST(DataDir_MNIST, train=True, download=True)
test_dataset_MN  = torchvision.datasets.MNIST(DataDir_MNIST, train=False, download=True)

#train_dataset_CF = torchvision.datasets.CIFAR10(DataDir_CF, train=True, download=True)
#test_dataset_CF  = torchvision.datasets.CIFAR10(DataDir_CF, train=False, download=True)

dt="MN"

Definition of the transformation to operate on the training and test dataset (in the case of the CIFAR10 dataset, normalizing and cropping transformation will be applied only to the training set).

In [None]:
training_transf = transforms.Compose([transforms.ToTensor()])
test_transf = transforms.Compose([transforms.ToTensor()])

In [None]:
train_dataset_MN.transform = training_transf
test_dataset_MN.transform = test_transf

#train_dataset_CF.transform = training_transf
#test_dataset_CF.transform = test_transf 

# Check if the GPU is available
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f'Selected device: {device}')

Selected device: cuda


## Model

### Encoder
Definition of the convolutional encoder, exploring the application of both pooling and non linear layers to the filtered image.

In [None]:
class Encoder(nn.Module):

  def __init__(self, encoded_dim, proc, nh1, nh2, ch1, ch2, ch3, act_h, act_cnv, p):
    super().__init__()

    # Definition of the convolutional layer for the encoder
    if proc=='pool':
      self.enc_conv=nn.Sequential(
          # 1st
          nn.Conv2d(1, ch1, kernel_size=3, stride=2, padding=1),
          nn.LPPool2d(2, kernel_size=2, stride=1),
          # 2nd
          nn.Conv2d(ch1, ch2, kernel_size=3, stride=2, padding=1),
          nn.LPPool2d(2, kernel_size=2, stride=1),
          # 3rd
          nn.Conv2d(ch2, ch3, kernel_size=3, stride=2, padding=1)
          # no pooling in the final layer
      )
    elif proc=='nlact':
      if act_cnv=='SiL':
        self.enc_conv=nn.Sequential(
            # 1st
            nn.Conv2d(1, ch1, kernel_size=3, stride=2, padding=1),
            nn.SiLU(True),
            # 2nd
            nn.Conv2d(ch1, ch2, kernel_size=3, stride=2, padding=1),
            nn.SiLU(True),
            # 3rd
            nn.Conv2d(ch2, ch3, kernel_size=3, stride=2, padding=0)
            # no non-linearity in the final layer
        )
      elif act_cnv=='Lk':
        self.enc_conv=nn.Sequential(
            # 1st
            nn.Conv2d(1, ch1, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(True),
            # 2nd
            nn.Conv2d(ch1, ch2, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(True),
            # 3rd
            nn.Conv2d(ch2, ch3, kernel_size=3, stride=2, padding=0)
            # no non-linearity in the final layer
        )
      elif act_cnv=='Ge':
        self.enc_conv=nn.Sequential(
            # 1st
            nn.Conv2d(1, ch1, kernel_size=3, stride=2, padding=1),
            nn.GELU(),
            # 2nd
            nn.Conv2d(ch1, ch2, kernel_size=3, stride=2, padding=1),
            nn.GELU(),
            # 3rd
            nn.Conv2d(ch2, ch3, kernel_size=3, stride=2, padding=0)
            # no non-linearity in the final layer
        )
      else:
        print("Invalid input for the type of processing")
    
    self.flatten=nn.Flatten(start_dim=1)

    # Definition of the linear layer
    if act_h=="Lk":
      self.encoder_lin=nn.Sequential(
        # First Linear Layer
        nn.Linear((ch3*3*3), nh1),
        nn.LeakyReLU(True),
        nn.Dropout(p),
        #Second linear
        nn.Linear(nh1, nh2),
        nn.LeakyReLU(True),
        nn.Dropout(p),
        # Output Linear Layer
        nn.Linear(nh2, encoded_dim)  
        )  
    elif act_h=="Ge":
      self.encoder_lin=nn.Sequential(
        # First Linear Layer
        nn.Linear((ch3*3*3), nh1),
        nn.GELU(),
        nn.Dropout(p),
        # Second Linear Layer
        nn.Linear(nh1, nh2),
        nn.GELU(),
        nn.Dropout(p),
        # Output Linear layer
        nn.Linear(nh2, encoded_dim)  
        )  
    elif act_h=="SiL":
      self.encoder_lin=nn.Sequential(
        # First Linear Layer
        nn.Linear((ch3*3*3), nh1),
        nn.SiLU(True),
        nn.Dropout(p),
        # Second Linear Layer
        nn.Linear(nh1, nh2),
        nn.SiLU(True),
        nn.Dropout(p),
        # Third Linear layer
        nn.Linear(nh2, encoded_dim)  
        )  
    elif act_h=="Se":
      self.encoder_lin=nn.Sequential(
        # First Linear Layer
        nn.Linear((ch3*3*3), nh1),
        nn.SELU(True),
        nn.Dropout(p),
        # Second Linear Layer
        nn.Linear(nh1, nh2),
        nn.SELU(True),
        nn.Dropout(p),
        # Third Linear Layer
        nn.Linear(nh2, encoded_dim)  
        )  
    elif act_h=="Si":
      self.encoder_lin=nn.Sequential(
        # First Linear Layer
        nn.Linear((ch3*3*3), nh1),
        nn.Sigmoid(),
        nn.Dropout(p),
        # Second Linear Layer
        nn.Linear(nh1, nh2),
        nn.Sigmoid(),
        nn.Dropout(p),
        # Third Linear Layer
        nn.Linear(nh2, encoded_dim)  
        )  
    
  def forward(self,x):
    #print(x.size())
    x=self.enc_conv(x)
    #print(x.size())
    x=self.flatten(x)
    #print(x.size())
    out=self.encoder_lin(x)
    #print(out.size())
    return out

### Decoder
Definition of the decoder, with the re-application of the same layers in inversed order.

In [None]:
class Decoder(nn.Module):

  def __init__(self, encoded_dim, proc, nh1, nh2, ch1, ch2, ch3, act_h, act_cnv, p):
    super().__init__()

    # Definition of the linear layer
    if act_h=="Lk":
      self.decoder_lin=nn.Sequential(
        # First Linear Layer
        nn.Linear(encoded_dim, nh2),
        nn.LeakyReLU(True),
        nn.Dropout(p),
        #Second linear
        nn.Linear(nh2, nh1),
        nn.LeakyReLU(True),
        nn.Dropout(p),
        # Output Linear Layer
        nn.Linear(nh1, ch3*3*3)  
        )  
    elif act_h=="Ge":
      self.decoder_lin=nn.Sequential(
        # First Linear Layer
        nn.Linear(encoded_dim, nh2),
        nn.GELU(),
        nn.Dropout(p),
        # Second Linear Layer
        nn.Linear(nh2, nh1),
        nn.GELU(),
        nn.Dropout(p),
        # Output Linear layer
        nn.Linear(nh1, ch3*3*3)  
        )  
    elif act_h=="SiL":
      self.decoder_lin=nn.Sequential(
        # First Linear Layer
        nn.Linear(encoded_dim, nh2),
        nn.SiLU(True),
        nn.Dropout(p),
        # Second Linear Layer
        nn.Linear(nh2, nh1),
        nn.SiLU(True),
        nn.Dropout(p),
        # Third Linear layer
        nn.Linear(nh1, ch3*3*3)  
        )  
    elif act_h=="Se":
      self.decoder_lin=nn.Sequential(
        # First Linear Layer
        nn.Linear(encoded_dim , nh2),
        nn.SELU(True),
        nn.Dropout(p),
        # Second Linear Layer
        nn.Linear(nh2, nh1),
        nn.SELU(True),
        nn.Dropout(p),
        # Third Linear Layer
        nn.Linear(nh1, ch3*3*3)  
        )  
    elif act_h=="Si":
      self.decoder_lin=nn.Sequential(
        # First Linear Layer
        nn.Linear(encoded_dim, nh2),
        nn.Sigmoid(),
        nn.Dropout(p),
        # Second Linear Layer
        nn.Linear(nh2, nh1),
        nn.Sigmoid(),
        nn.Dropout(p),
        # Third Linear Layer
        nn.Linear(nh1, ch3*3*3)  
        ) 
    
    self.unflatten=nn.Unflatten(dim=1, unflattened_size=(ch3,3,3))

    if proc=='pool':
      self.dec_conv=nn.Sequential(
          # 1st
          nn.ConvTranspose2d(ch3, ch2, kernel_size=3, stride=2, padding=0, output_padding=0),
          #nn.LPPool2d(2, kernel_size=3, stride=1),
          # 2nd
          nn.ConvTranspose2d(ch2, ch1, kernel_size=3, stride=2, padding=1, output_padding=1),
          #nn.LPPool2d(2, kernel_size=2, stride=1),
          # 3rd
          nn.ConvTranspose2d(ch1, 1, kernel_size=3, stride=2, padding=1, output_padding=1)
          # no pooling in the final layer
      )
    elif proc=='nlact':
      if act_cnv=='SiL':
        self.dec_conv=nn.Sequential(
            # 1st
            nn.ConvTranspose2d(ch3, ch2, kernel_size=3, stride=2, output_padding=0),
            nn.SiLU(True),
            # 2nd
            nn.ConvTranspose2d(ch2, ch1, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.SiLU(True),
            # 3rd
            nn.ConvTranspose2d(ch1, 1, kernel_size=3, stride=2, padding=1, output_padding=1)
            # no non-linearity in the final layer
        )
      elif act_cnv=='Lk':
        self.dec_conv=nn.Sequential(
            # 1st
            nn.ConvTranspose2d(ch3, ch2, kernel_size=3, stride=2, output_padding=0),
            nn.LeakyReLU(True),
            # 2nd
            nn.ConvTranspose2d(ch2, ch1, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.LeakyReLU(True),
            # 3rd
            nn.ConvTranspose2d(ch1, 1, kernel_size=3, stride=2, padding=1, output_padding=1)
            # no non-linearity in the final layer
        )
      elif act_cnv=='Ge':
        self.dec_conv=nn.Sequential(
            # 1st
            nn.ConvTranspose2d(ch3, ch2, kernel_size=3, stride=2, output_padding=0),
            nn.GELU(),
            # 2nd
            nn.ConvTranspose2d(ch2, ch1, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.GELU(),
            # 3rd
            nn.ConvTranspose2d(ch1, 1, kernel_size=3, stride=2, padding=1, output_padding=1)
            # no non-linearity in the final layer
        )
      else:
        print("Invalid input for the type of processing")    

  def forward(self, x):
    x=self.decoder_lin(x)
    #print(x.size())
    x=self.unflatten(x)
    #print(x.size())
    x=self.dec_conv(x)
    #print(x.size())
    return x

### General

In [None]:
torch.manual_seed(2)

nh1=128
nh2=150

ch1=8
ch2=16
ch3=32

if dt=="MN":
  train_dt=train_dataset_MN
  test_dt=test_dataset_MN
elif dt=="CF":
  train_dt=train_dataset_CF
  test_dt=test_dataset_CF
img=test_dt[0][0].unsqueeze(0).to(device)

## Training
Definition of the Encoder training function and the Decoder training function.

In [None]:
def RelEntr(mean, cov, encoded_dim):
  # We are considering the relative entropy between two gaussians
  RE = torch.sum((torch.sum(torch.exp(cov),1)+torch.sum(torch.square(mean),1)-encoded_dim-torch.log(torch.prod(torch.exp(cov),1)))/2)

  return RE

In [None]:
def train_f(encoder, decoder, mean, covTrac, data, loss_fn, optimizer, device, auto_enc, encoded_dim, eph_loss):
  # Definition of the training process on the basis of the two defined classes
  encoder.train()
  decoder.train()
  if auto_enc=="Variational":
    mean.train()
    covTrac.train()
  # The data is supposed to be already available in an iterable format
  # Deliberate neglection of the labels
  for images, _ in data:
    images=images.to(device)
    _=_.to(device)
    # Processing
    enc_data=encoder(images)
    if auto_enc=="Variational":
      mn=mean(enc_data)
      cv=covTrac(enc_data)
      q=torch.distributions.Normal(0,1)
      xi=q.rsample()
      enc_data=mn+xi*torch.sqrt(torch.exp(cv))
    dec_data=decoder(enc_data)
    # Loss (distance between the two datapoints according to a certain metric)
    if auto_enc=="Standard":
      Loss=loss_fn(dec_data,images)
    elif auto_enc=="Denoising":
      Loss=loss_fn(dec_data,_)
    elif auto_enc=="Variational":
      Loss=loss_fn(dec_data, images)+RelEntr(mn, cv, encoded_dim)
      eph_loss.append(Loss.data.detach().cpu().numpy())
    # Backprop
    optimizer.zero_grad()
    Loss.backward()
    optimizer.step()

    print('\t partial train loss (single batch): %f' % (Loss.data))

In [None]:
def test_f(encoder, decoder, mean, covTrac, data, loss_fn, device, auto_enc):
  # Definition of the evaluation process (both for validation and test set)
  encoder.eval()
  decoder.eval()
  with torch.no_grad():
    # Definition of the lists where to store real and reconstructed images
    Im_out=[]
    Im_rec=[]
    Im_lb=[]
    for images, _ in data:
      images=images.to(device)
      # Processing
      enc_data=encoder(images)
      if auto_enc=="Variational":
        mn=mean(enc_data)
        cv=covTrac(enc_data)
        enc_data=mn+cv
      dec_data=decoder(enc_data)
      # creation of lists
      Im_out.append(dec_data.cpu())
      Im_rec.append(images.cpu())
      Im_lb.append(_.cpu())
    # Conversion to tensor of the entire data structure
    Im_out=torch.cat(Im_out)
    Im_rec=torch.cat(Im_rec)
    Im_lb=torch.cat(Im_lb)
    # Computation of the final loss
    Loss=loss_fn(Im_rec, Im_out)
  return Im_out, Im_lb, Loss.data 

### Standard Autoencoder
Definition of the main loop for the training of the encoder and the decoder.

Use of the Optuna library to perform random search on the space of hyperparameters, where the chosen hp are:


*   Type of convolutional processing: pooling ('pool'), non-linear ('nlact') 
*   Type of convolutional activation: SiLU ('SiL'), LeakyReLU ('Lk'), GELU ('Ge')
* Type of activation (linear layers): SiLU ('SiL'), LeakyReLU ('Lk'), GELU ('Ge'), SELU ('Se'), Sigmoid ('Si')
* Learning rate
* Weight decay rate

The number of units (as well as the number of channels) is kept fixed in order not to increase drastically the size of the hyperparameters space to search.



In [None]:
def objective(trial):

  global tr
  global kfld
  global n_trials, auto_enc

  act_h = trial.suggest_categorical('act_h',['Lk','Ge','SiL','Se','Si'])
  act_cnv = trial.suggest_categorical('act_cnv',['SiL','Lk','Ge'])
  if auto_enc=='Variational':
    act_v = trial.suggest_categorical('act_v',['Lk','Re','Se'])

  proc = trial.suggest_categorical('proc',['pool','nlact','nlact','nlact'])
  encoded_dim = trial.suggest_int('encoded_dim',10,30)
  p = trial.suggest_uniform('p',0.01,0.1)

  encoder=Encoder(encoded_dim, proc, nh1, nh2, ch1, ch2, ch3, act_h=act_h, act_cnv=act_cnv, p=p)

  if auto_enc=='Variational':
    mean=StatNet(encoded_dim, act_v)
    covTrac=StatNet(encoded_dim, act_v)

    mean.to(device)
    covTrac.to(device)
  else:
    mean='boh'
    covTrac='boh'

  decoder=Decoder(encoded_dim, proc, nh1, nh2, ch1, ch2, ch3, act_h=act_h, act_cnv=act_cnv, p=p)
  encoder.to(device)
  decoder.to(device)

  lr = trial.suggest_uniform('lr',1e-4,1e-2) #4-2
  weight_dc = trial.suggest_uniform('weight_dc',1e-6,1e-5) #6-5
  rec_loss_vl=[]
  rec_loss_tr=[]

  if auto_enc=='Variational':  
    parameters_to_opt=[{'params': encoder.parameters()},
                       {'params': mean.parameters()},
                       {'params': covTrac.parameters()},
                       {'params': decoder.parameters()}
                      ]
    Model={'proc':proc,
         'act_h':act_h, 
         'act_cnv':act_cnv, 
         'act_v':act_v,
         'lr':lr, 
         'weight_dc':weight_dc, 
         'trial':tr, 
         'kfold':kfld,
         'encoded_dim':encoded_dim,
         'p':p}
  else:
    parameters_to_opt=[{'params': encoder.parameters()},
                      {'params': decoder.parameters()}
                      ]
    Model={'proc':proc,
         'act_h':act_h, 
         'act_cnv':act_cnv, 
         'lr':lr, 
         'weight_dc':weight_dc, 
         'trial':tr, 
         'kfold':kfld,
         'encoded_dim':encoded_dim,
         'p':p}

  optimizer=optim.Adam(parameters_to_opt, lr=lr, weight_decay=weight_dc)
  Models.append(Model)

  for eph in range(n_eph):
    # (use of the training function)
    eph_loss=[]
    train_f(encoder=encoder,
            mean=mean,
            covTrac=covTrac,
            decoder=decoder,
            data=train_dts,
            loss_fn=loss_fn,
            optimizer=optimizer,
            device=device,
            auto_enc=auto_enc,
            encoded_dim=encoded_dim,
            eph_loss=eph_loss) 
    _,_,val_loss=test_f(encoder=encoder,
                        decoder=decoder,
                        mean=mean,
                        covTrac=covTrac,
                        data=val_dts,
                        loss_fn=loss_fn,
                        device=device,
                        auto_enc=auto_enc)
    rec_loss_vl.append(val_loss.cpu().numpy())
    if auto_enc=='Variational':
      rec_loss_tr.append(eph_loss)

    if eph%5==0:
      print("-----------------------")
      print("Epoch: "+str(eph))
      print("Validation Loss: "+str(val_loss.cpu().numpy()))
      print("-----------------------")

    encoder.eval()
    decoder.eval()
    if auto_enc=='Variational':
      Final_Act=nn.Sigmoid()
      with torch.no_grad():
        rec_img=Final_Act(decoder(mean(encoder(img))+covTrac(encoder(img))))
    else:
      with torch.no_grad():
        rec_img=decoder(encoder(img))
    fig, axs = plt.subplots(1, 2, figsize=(12,6))
    axs[0].imshow(img.cpu().squeeze().numpy(), cmap='gist_gray')
    axs[0].set_title('Original Image')
    axs[1].imshow(rec_img.cpu().squeeze().numpy(), cmap='gist_gray')
    axs[1].set_title('Reconstructed Image (epoch - %d)' %(eph))
    plt.tight_layout()
    if eph==(n_eph-1):
      name="Rec_img_"+auto_enc+"_"+str(tr)+'_'+str(kfld)+'.png'
      plt.savefig(name)
    plt.show()
    plt.close()

  Rec_L={'Rec_Loss_vl':rec_loss_vl, 'Rec_Loss_tr':rec_loss_tr, 'trial':tr, 'kfold':kfld}
  Rec_Losses.append(Rec_L)
  
  plt.figure(figsize=(12,8))
  if auto_enc=='Variational':
    plt.semilogy([np.mean(x) for x in rec_loss_tr])
  else:
    plt.semilogy(rec_loss_vl)
  plt.xlabel('Epoch')
  plt.ylabel('Loss')
  plt.grid()
  name_loss="Rec_Loss_"+auto_enc+"_"+str(tr)+'_'+str(kfld)+'.png'
  plt.savefig(name_loss)
  plt.show()  

  # Save the parameters of the encoder and the decoder
  nameEnc='EncoderParameters_'+auto_enc+'_'+str(tr)+'_'+str(kfld)+'.pth'
  nameDec='DecoderParameters_'+auto_enc+'_'+str(tr)+'_'+str(kfld)+'.pth'
  torch.save(encoder.state_dict(),nameEnc)
  torch.save(decoder.state_dict(),nameDec)

  if auto_enc=='Variational':
    nameMean='Mean_'+str(tr)+'_'+str(kfld)+'.pth'
    nameCov='CovTrace_'+str(tr)+'_'+str(kfld)+'.pth'
    torch.save(mean.state_dict(), nameMean)
    torch.save(covTrac.state_dict(), nameCov)

  tr+=1

  return val_loss.data

Functions for the definition of the five best models, based on statistics of the validation loss for each model (mean).

In [None]:
def BestModels():
  global auto_enc

  means=[]
  for rec in Rec_Losses:
    #if auto_enc=='Variational':
      #if rec['Rec_Loss_tr'][-1][-1]=='nan':
      #  mean=1e+15
      #else:
        #mean=np.mean([np.mean(x) for x in rec['Rec_Loss_tr'][-5]])
    #else:
    if rec['Rec_Loss_vl'][-1]=='nan':
      mean=1e+15
    else:
      mean=np.mean(rec['Rec_Loss_vl'][-5:])
    means.append(mean)
  top=np.copy(means)
  top.sort()
  Best_Rec_idx=[]
  for rec in top[:5]:
    Best_Rec_idx.append(means.index(rec))
  
  return Best_Rec_idx

In [None]:
n_eph=10
kf=KFold(n_splits=5)
n_trials=15

kfld=1
Models=[]
Rec_Losses=[]
auto_enc="Standard"

loss_fn=nn.SmoothL1Loss()

for train_s, val in kf.split(train_dt):

  # Creation of the training and validation sets (folded)
  train_dts=DataLoader([(train_dataset_MN[x][0],train_dataset_MN[x][1]) for x in train_s], batch_size=256, shuffle=True, num_workers=0)
  val_dts=DataLoader([(train_dataset_MN[x][0],train_dataset_MN[x][1]) for x in val], batch_size=256, shuffle=False, num_workers=0)

  tr=0

  # Creation of the study
  study = optuna.create_study(direction="minimize")
  study.optimize(objective, n_trials=n_trials)
  
  kfld+=1

Rec_Losses_Standard=np.copy(Rec_Losses)
Models_Standard=np.copy(Models)

Definition of the best models and evaluation of their reconstruction error on the test set.

In [None]:
test_dts=DataLoader([(test_dt[x][0],test_dt[x][1]) for x in range(len(test_dt))],batch_size=256, shuffle=False, num_workers=0)

In [None]:
def TestErrors(idx, hook=False):

  global rank, auto_enc
  global nh1, nh2, ch1, ch2, ch3

  model=Models[idx]
  
  encoder=Encoder(model['encoded_dim'], model['proc'], nh1, nh2, ch1, ch2, ch3, act_h=model['act_h'], act_cnv=model['act_cnv'], p=model['p'])
  decoder=Decoder(model['encoded_dim'], model['proc'], nh1, nh2, ch1, ch2, ch3, act_h=model['act_h'], act_cnv=model['act_cnv'], p=model['p'])
  if auto_enc=='Variational':
    mean=StatNet(model['encoded_dim'], model['act_v'])
    covTrac=StatNet(model['encoded_dim'], model['act_v'])

    nameMean='Mean_'+str(model['trial'])+'_'+str(model['kfold'])+'.pth'
    nameCov='CovTrace_'+str(model['trial'])+'_'+str(model['kfold'])+'.pth'
    mean.load_state_dict(torch.load(nameMean))
    covTrac.load_state_dict(torch.load(nameCov))
    mean.to(device)
    covTrac.to(device)
  else:
    mean='boh'
    covTrac='boh'

  name_enc_par='EncoderParameters_'+auto_enc+'_'+str(model['trial'])+'_'+str(model['kfold'])+'.pth'
  name_dec_par='DecoderParameters_'+auto_enc+'_'+str(model['trial'])+'_'+str(model['kfold'])+'.pth'
  encoder.load_state_dict(torch.load(name_enc_par))
  decoder.load_state_dict(torch.load(name_dec_par))
  encoder.to(device)
  decoder.to(device)
  
  if hook==True:
    hook_handle=encoder.encoder_lin[6].register_forward_hook(hook_feature_space)

  Im_out , Im_lb ,test_error=test_f(encoder=encoder,
                        decoder=decoder,
                        mean=mean,
                        covTrac=covTrac,
                        data=test_dts,
                        loss_fn=loss_fn,
                        device=device,
                        auto_enc=auto_enc
                        )
  
  if hook==False:
    print()
    print('Validation Rank: '+str(rank))
    print('------------------------------')
    print('Test Reconstruction Error: '+str(test_error.cpu().numpy()))
    print('------------------------------')
    print(model)
    print('------------------------------')
  
  rank+=1

  return {'Test_Rec_err':test_error.data,'Model':model}, Im_out, Im_lb

In [None]:
Best_Models_Standard=BestModels()

test_err=[]
rank=1
for idx in Best_Models_Standard:
  tst_err,_,_=TestErrors(idx)
  test_err.append(tst_err)

### Denoising Autoencoder
In this section of the notebook I add noise to the input in order to force the NN to learn the data distribution instead of simply reproducing a pseudo-identity function.

Creation of a function for the introduction of different types of noise in the dataset (Gaussian, Uniform, Exponential, Gamma)

In [None]:
class AddNoise(object):

  def __init__(self, noise):
    self.noise=noise

  def __call__(self, tensor):
    if self.noise=="Norm":
      return tensor + torch.randn(tensor.size())*0.1
    elif self.noise=="Uni":
      return tensor + torch.Tensor(np.random.uniform(-1,1,size=tensor.size()))*0.1
    elif self.noise=="Exp":
      return tensor + torch.Tensor(np.multiply(np.random.randint(-1,1,tensor.size()),np.random.exponential(scale=2.5, size=tensor.size())))*0.1
    elif self.noise=="Gamma":
      return tensor + torch.Tensor(np.multiply(np.random.randint(-1,1,tensor.size()),np.random.gamma(shape=5,scale=3, size=tensor.size())))*0.1
  
  def __repr__(self):
    return self.__class__.__name__+"(Noise={})".format(self.noise)

In [None]:
transform_norm = transforms.Compose([AddNoise("Norm")])
transform_uni = transforms.Compose([AddNoise("Uni")])
transform_exp = transforms.Compose([AddNoise("Exp")])
transform_gam = transforms.Compose([AddNoise("Gamma")])

Definition of the training loop for the denoising autoencoder.

In [None]:
n_eph=10
kf=KFold(n_splits=4)
n_trials=15

kfld=1
Models=[]
Rec_Losses=[]
auto_enc="Denoising"

loss_fn=nn.SmoothL1Loss()

for train_s, val in kf.split(train_dt):

  # Creation of the training and validation sets (folded)
  if kfld==1:
    train_dts=DataLoader([(transform_norm(train_dt[x][0]),train_dt[x][0]) for x in train_s], batch_size=256, shuffle=True, num_workers=0)
  elif kfld==2:
    train_dts=DataLoader([(transform_uni(train_dt[x][0]),train_dt[x][0]) for x in train_s], batch_size=256, shuffle=True, num_workers=0)
  elif kfld==3:
    train_dts=DataLoader([(transform_exp(train_dt[x][0]),train_dt[x][0]) for x in train_s], batch_size=256, shuffle=True, num_workers=0)
  elif kfld==4:
    train_dts=DataLoader([(transform_gam(train_dt[x][0]),train_dt[x][0]) for x in train_s], batch_size=256, shuffle=True, num_workers=0)
  val_dts=DataLoader([(train_dt[x][0],train_dt[x][1]) for x in val], batch_size=256, shuffle=False, num_workers=0)

  tr=0

  # Creation of the study
  study = optuna.create_study(direction="minimize")
  study.optimize(objective, n_trials=n_trials)
  
  kfld+=1

Rec_Losses_Denoising=np.copy(Rec_Losses)
Models_Denoising=np.copy(Models)

In [None]:
Best_Models_Denoising=BestModels()

test_err=[]
rank=1
for idx in Best_Models_Denoising:
  tst_err,_,_=TestErrors(idx)
  test_err.append(tst_err)

### Supervised Fine-Tuning
Application of transfer learning on the correct classification of images in the MNIST dataset - performed only for the two best performing standard autoencoders.

In the first approach, I consider only the encoder, freeze its parameters, and stack a simple classification model on top of it, in order to evaluate if, based on the extracted latent code, the whole model is faster and more accurate than the one built for the first homework. (I will directly use the activation function and hyperparameters that performed better)

In [None]:
class Classifier(nn.Module):

  def __init__(self, Ni, n_or, nh1_cl, nh2_cl, No):
    super().__init__()
    # Activation function
    self.act=nn.SiLU()
    # NN layers
    self.ori=nn.Linear(Ni, n_or)
    self.fc1=nn.Linear(n_or, nh1_cl)
    self.fc2=nn.Linear(nh1_cl, nh2_cl)
    self.out=nn.Linear(nh2_cl, No)

  def forward(self, sample):
    x=self.ori(sample)
    x=self.act(self.fc1(x))
    x=self.act(self.fc2(x))
    out=self.out(x)

    return out

In [None]:
def train_clas(encoder, classifier, data, device, loss_fn_class, optimizer_class):
  # Setting to train for both the encoder and the classifier to perform gradient descent (or analogous optimization procedure)
  encoder.train()
  classifier.train()
  for image, label in data:
    image=image.to(device)
    label=label.to(device)
    # Processing of the image (batch)
    enc_im=encoder(image)
    emp_label=classifier(enc_im)
    # Loss
    Loss=loss_fn_class(emp_label, label)
    optimizer_class.zero_grad()
    Loss.backward()
    optimizer_class.step()
    # Loss for the single batch
    #print('\t Training loss (single batch):', float(loss.data))

In [None]:
def test_clas(encoder, classifier, data, device, loss_fn_class):
  # Setting in evaluation mode both the encoder and the classifier
  encoder.eval()
  classifier.eval()
  
  with torch.no_grad():
    Im_emp_lb=[]
    Im_lb=[]
    for image, label in data:
      image=image.to(device)
      label=label.to(device)
      # Processing of the image
      enc_im=encoder(image)
      emp_label=classifier(enc_im)
      # Storing of new and old information
      Im_emp_lb.append(emp_label.cpu())
      Im_lb.append(label.cpu())
    Im_emp_lb=torch.cat(Im_emp_lb)
    Im_lb=torch.cat(Im_lb)
    Loss=loss_fn_class(Im_emp_lb,Im_lb)

  return Loss.data

In [None]:
auto_enc='Standard'
kf=KFold(n_splits=2)

n_or=784
nh1_cl=128
nh2_cl=128
No=10

n_eph=35
loss_fn_class=nn.CrossEntropyLoss()
Loss=[]

i=0
for train_idx, val_idx in kf.split(train_dt):
  class_train_dt=DataLoader([(train_dt[x][0],train_dt[x][1]) for x in train_idx], batch_size=256, shuffle=True, num_workers=0)
  class_val_dt=DataLoader([(train_dt[x][0],train_dt[x][1]) for x in val_idx], batch_size=256, shuffle=False, num_workers=0)
  for idx in Best_Models_Standard[:3]:
    model=Models_Standard[idx]
    encoder=Encoder(model['encoded_dim'], model['proc'], nh1, nh2, ch1, ch2, ch3, act_h=model['act_h'], act_cnv=model['act_cnv'], p=model['p'])
    name_enc_par='EncoderParameters_'+auto_enc+'_'+str(model['trial'])+'_'+str(model['kfold'])+'.pth'
    encoder.load_state_dict(torch.load(name_enc_par))
    for param_name, param in encoder.named_parameters():
      param.requires_grad=False
    classifier=Classifier(model['encoded_dim'], n_or, nh1_cl, nh2_cl, No)
    encoder.to(device)
    classifier.to(device)

    params_class=[{"params":encoder.parameters()},
                  {"params":classifier.parameters()}]
    optimizer_class=optim.Adam(params_class, lr=1e-4, weight_decay=1e-6)

    # Training
    loss_class=[]
    for eph in range(n_eph):
      train_clas(encoder=encoder,
                 classifier=classifier,
                 data=class_train_dt,
                 device=device,
                 loss_fn_class=loss_fn_class,
                 optimizer_class=optimizer_class
                )
      val_loss=test_clas(encoder=encoder,
                         classifier=classifier,
                         data=class_val_dt,
                         device=device,
                         loss_fn_class=loss_fn_class)
      
      loss_class.append(val_loss.cpu().numpy())

      if eph%5==0:
        print("-----------------------")
        print("Epoch: "+str(eph))
        print("Validation Loss: "+str(val_loss.cpu().numpy()))
        print("-----------------------")

    Loss.append({"model":model,"val_loss":loss_class})

    plt.figure(figsize=(12,8))
    plt.semilogy(loss_class)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.grid()
    name_class="Class_"+str(model["trial"])+'_'+str(model["kfold"])+'_'+str(i)+'.png'
    plt.savefig(name_class)
    plt.show() 

    name_classifier='Classifier_'+str(idx)+'_'+str(i)+'.pth'
    torch.save(classifier.state_dict(), name_classifier)

  i+=1

for idx in Best_Models_Standard[:3]:
  for i in range(2):
    model = Models_Standard[idx]
    encoder=Encoder(model['encoded_dim'], model['proc'], nh1, nh2, ch1, ch2, ch3, act_h=model['act_h'], act_cnv=model['act_cnv'], p=model['p'])
    classifier=Classifier(model['encoded_dim'], n_or, nh1_cl, nh2_cl, No)

    name_enc_par='EncoderParameters_'+auto_enc+'_'+str(model['trial'])+'_'+str(model['kfold'])+'.pth'
    encoder.load_state_dict(torch.load(name_enc_par))
    name_clas_par='Classifier_'+str(idx)+'_'+str(i)+'.pth'
    classifier.load_state_dict(torch.load(name_clas_par))
    encoder.to(device)
    classifier.to(device)

    test_loss=test_clas(encoder=encoder,
                        classifier=classifier,
                        data=test_dts,
                        device=device,
                        loss_fn_class=loss_fn_class)
    
    print('----------------------------')
    print('Fold: '+str(i))
    print('----------------------------')
    print('Test error: '+str(test_loss.cpu().numpy()))
    print('----------------------------')
    print(model)
    print('')

### Latent Space Analysis
The analysis exploits the latent code of the best denoising model (hence the one which should have been better at capturing the original data dsitribution) and applies PCA and t-SNE to it.


In [None]:
def hook_feature_space(module, input, output):
  latent_space.append(output)

In [None]:
latent_space=[]
id = Best_Models_Denoising[0]
auto_enc='Denoising'

_, Im_out, Im_lb = TestErrors(id, hook=True)

latent_space=torch.cat(latent_space)
print(latent_space)

latent_space=latent_space.cpu()
Im_out=Im_out.cpu()
Im_lb=Im_lb.cpu()

In [None]:
pca=PCA(n_components=2)
latent_space_pca=pca.fit_transform(latent_space)

df = pd.DataFrame(latent_space_pca)
df['label'] = Im_lb

px.scatter(df, x=0, y=1, color=df.label.astype(str), hover_data=[df.index], title='PCA - latent space')

In [None]:
tsne = TSNE(n_components=2, metric='euclidean', init='pca', random_state=0)
name = tsne.fit_transform(latent_space)

df_t = pd.DataFrame(name)
df_t['label'] = Im_lb

title='tSNE Euclidean - latent space'
px.scatter(df_t, x=0, y=1, color=df_t.label.astype(str), hover_data=[df_t.index], title=title)

In [None]:
tsne = TSNE(n_components=2, metric='cosine', init='pca', random_state=0)
name = tsne.fit_transform(latent_space)

df_t = pd.DataFrame(name)
df_t['label'] = Im_lb

title='tSNE Cosine - latent space'
px.scatter(df_t, x=0, y=1, color=df_t.label.astype(str), hover_data=[df_t.index], title=title)

I now try to feed to the decoder a vector of the dimension of the encoded space presenting value randomly drawn from specific probability distributions, and see which kinds of images i am able to retrieve.

In [None]:
def image_gen(decoder, encoded_d, mean, covTrac, auto_enc):
  distributions=['beta','exponential','gamma','normal','uniform']
  for distribution in distributions:
    if distribution=='beta':
      random_enc=torch.Tensor([(np.random.randint(-1,1))*(x**(0))*np.random.beta(5,1) for x in range(encoded_d)])
    elif distribution=='exponential':
      random_enc=torch.Tensor([(np.random.randint(-1,1))*(x**(0))*np.random.exponential(scale=2.5) for x in range(encoded_d)])  # value found in many exp law
    elif distribution=='gamma':
      random_enc=torch.Tensor([(np.random.randint(-1,1))*(x**(0))*np.random.gamma(shape=5, scale=3) for x in range(encoded_d)])
    elif distribution=='normal':
      random_enc=torch.Tensor([(x^(0))*np.random.normal() for x in range(encoded_d)])
    elif distribution=='uniform':
      random_enc=torch.Tensor([(x^(0))*np.random.uniform(-1,1) for x in range(encoded_d)])

    random_enc=random_enc.unsqueeze(0).to(device)
    print(random_enc)

    
    with torch.no_grad():
      if auto_enc=='Variational':
        Final_Act=nn.Sigmoid()
        #rec_img_rand=Final_Act(decoder(random_enc))
        rec_img_rand=Final_Act(decoder(mean(random_enc)+covTrac(random_enc)))
      else:
        rec_img_rand=decoder(random_enc)
    plt.figure(figsize=(12,8))
    plt.imshow(rec_img_rand.cpu().squeeze().numpy(), cmap='gist_gray')
    plt.title('Reconstructed Image (distribution - %s)' %(distribution))  
    name="Rec_img_"+distribution+"_"+'.png'
    plt.savefig(name)
    plt.show()

In [None]:
auto_enc='Denoising'
id=Best_Models_Denoising[0]

model=Models_Denoising[id]
mean_d='boh'
covTrac_d='boh'
encoded_d=model['encoded_dim']
decoder_d=Decoder(model['encoded_dim'], model['proc'], nh1, nh2, ch1, ch2, ch3, act_h=model['act_h'], act_cnv=model['act_cnv'], p=model['p'])
name_dec_par='DecoderParameters_'+auto_enc+'_'+str(model['trial'])+'_'+str(model['kfold'])+'.pth'
decoder_d.load_state_dict(torch.load(name_dec_par))
decoder_d.to(device)

image_gen(decoder_d, encoded_d, mean_d, covTrac_d, auto_enc)

### Variational Autoencoder
Finally, I try to implement a variational autoencoder, which should improve the overall performance w.r.t. the one of the standard or denoising autoencoder. However, such improvement might be hard to appreciate due to the 'statistical simplicity' inherent in the MNIST dataset.

In [None]:
class StatNet(nn.Module):
  def __init__(self, ni, act_v):
    super().__init__()

    self.ni=ni
    self.act_v=act_v

    if act_v=='Lk':
      self.mn=nn.Sequential(
                          nn.LeakyReLU(),
                          nn.Linear(ni, ni))
    elif act_v=='Re':
      self.mn=nn.Sequential(
                          nn.ReLU(),
                          nn.Linear(ni, ni))
    elif act_v=='Se':
      self.mn=nn.Sequential(
                          nn.SELU(),
                          nn.Linear(ni, ni))

  def forward(self, sample):
    stat=self.mn(sample)
    return stat

In [None]:
n_eph=10
kf=KFold(n_splits=5)
n_trials=15

kfld=1
Models=[]
Rec_Losses=[]
auto_enc='Variational'

loss_fn=nn.BCEWithLogitsLoss(reduction='sum')  #BCE reduction=sum

for train_s, val in kf.split(train_dt):

  # Creation of the training and validation sets (folded)
  train_dts=DataLoader([(train_dataset_MN[x][0],train_dataset_MN[x][1]) for x in train_s], batch_size=256, shuffle=True, num_workers=0)
  n_batches=len(train_dts)
  val_dts=DataLoader([(train_dataset_MN[x][0],train_dataset_MN[x][1]) for x in val], batch_size=256, shuffle=False, num_workers=0)

  tr=0

  # Creation of the study
  study = optuna.create_study(direction="minimize")
  study.optimize(objective, n_trials=n_trials)
  
  kfld+=1

Rec_Losses_Variational=np.copy(Rec_Losses)
Models_Variational=np.copy(Models)

In [None]:
Best_Models_Variational=BestModels()
print(Best_Models_Variational)

test_err=[]
rank=1
for idx in Best_Models_Variational:
  tst_err,_,_=TestErrors(idx)
  test_err.append(tst_err)

In [None]:
def ImagesVar(decoder, mean, cov, encoded_dim):
  norm_distr=torch.randn(size=(25,encoded_dim), requires_grad=False).to(device)
  Final_Act=nn.Sigmoid()
  decoder.eval()
  mean.eval()
  cov.eval()
  rec_img=Final_Act(decoder(mean(norm_distr)+cov(norm_distr)))

  fig,axs=plt.subplots(5,5, figsize=(8,8))
  cnt=0
  for i in range(5):
    for j in range(5):
      axs[i,j].imshow(rec_img[cnt,:].detach().cpu().squeeze().numpy(), cmap='gist_gray')
      axs[i,j].axis('off')
      cnt+=1
  plt.tight_layout()
  plt.savefig('Samples_Variational')
  plt.show()

In [None]:
auto_enc='Variational'
id=Best_Models_Variational[0]

model=Models_Variational[id]
encoded_v=model['encoded_dim']
decoder_v=Decoder(model['encoded_dim'], model['proc'], nh1, nh2, ch1, ch2, ch3, act_h=model['act_h'], act_cnv=model['act_cnv'], p=model['p'])
mean_v=StatNet(model['encoded_dim'], model['act_v'])
covTrac_v=StatNet(model['encoded_dim'], model['act_v'])

name_dec_par='DecoderParameters_'+auto_enc+'_'+str(model['trial'])+'_'+str(model['kfold'])+'.pth'
nameMean='Mean_'+str(model['trial'])+'_'+str(model['kfold'])+'.pth'
nameCov='CovTrace_'+str(model['trial'])+'_'+str(model['kfold'])+'.pth'

decoder_v.load_state_dict(torch.load(name_dec_par))
mean_v.load_state_dict(torch.load(nameMean))
covTrac_v.load_state_dict(torch.load(nameCov))

decoder_v.to(device)
mean_v.to(device)
covTrac_v.to(device)


image_gen(decoder_v, encoded_v, mean_v, covTrac_v, auto_enc)
ImagesVar(decoder_v, mean_v, covTrac_v, encoded_v)