# Homework 2

In [None]:
import matplotlib.pyplot as plt # plotting library
import numpy as np # this module is useful to work with numerical arrays
import pandas as pd # this module is useful to work with tabular data
import random # this module will be used to select random samples from a collection
import os # this module will be used just to create directories in the local filesystem
from tqdm import tqdm # this module is useful to plot progress bars
import plotly.express as px

# Pytorch
import torch
import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader
from torch import nn

# Pytorch lightning wrapper
import pytorch_lightning as pl
from pytorch_lightning import Callback
from pytorch_lightning.callbacks.early_stopping import EarlyStopping

# Hyperparams optimization
import optuna
from optuna.integration import PyTorchLightningPruningCallback


# Dataset

## Define the dataset

For this lab. we will use one of the dataset already included in PyTorch ([https://pytorch.org/docs/stable/torchvision/datasets.html#mnist](https://pytorch.org/docs/stable/torchvision/datasets.html#mnist)). These dataset do not require the definition of a custom `Dataset` class, so we can focus on the network implementation.

The MNIST dataset is a colletion of hand-written digits. The size of the images is $28 \times 28$, and there is a single channel only (black and white images).

In [None]:
### Download the data and create dataset
data_dir = 'dataset'
# With these commands the train and test datasets, respectively, are downloaded 
# automatically and stored in the local "data_dir" directory.
train_dataset = torchvision.datasets.MNIST(data_dir, train=True, download=True)
test_dataset  = torchvision.datasets.MNIST(data_dir, train=False, download=True)

Let's plot some random samples from the dataset. The first element of the sample is the actual image, while the second is the corresponding label.

In [None]:
### Plot some sample
fig, axs = plt.subplots(5, 5, figsize=(8,8))
for ax in axs.flatten():
    # random.choice allows to randomly sample from a list-like object (basically anything that can be accessed with an index, like our dataset)
    img, label = random.choice(train_dataset)
    ax.imshow(np.array(img), cmap='gist_gray')
    ax.set_title('Label: %d' % label)
    ax.set_xticks([])
    ax.set_yticks([])
plt.tight_layout()

## Define the dataset transform

In this example we are using the input images without any modification. As always, the only requirement is to transform the input data to tensors of the proper shape.

In [None]:
# In this case the train_transform and test_transform are the same, but we keep them separate for potential future updates
angles = 45
gaussian_kernel = 5
#rotate = transforms.RandomRotation(angles)
#noise = transforms.GaussianBlur(gaussian_kernel)
#random_transform = transforms.RandomChoice([rotate, noise])

train_transform = transforms.Compose([
#    random_transform, 
    transforms.ToTensor(),
])
test_transform = transforms.Compose([
    transforms.ToTensor(),
])


Since we already defined our datasets, this is an alternative (and recommended) way to add (or modify) a dataset transformation without reinitializing the dataset (very useful when the dataset initialization is slow):

In [None]:
# Set the train transform
train_dataset.transform = train_transform
# Set the test transform
test_dataset.transform = test_transform

## Define the dataloader

The dataloader allows to easily create batch of data, in this case we set a batch size of 256, and we also enable data shuffling for the training dataset.

In [None]:
train_set, val_set = torch.utils.data.random_split(train_dataset, 
                                                         [50000, 10000])
### Define train dataloader
train_dataloader = DataLoader(train_set, batch_size=256, shuffle=True)
### Define validation dataloader
val_dataloader = DataLoader(val_set, batch_size=256, shuffle=False)
### Define test dataloader
test_dataloader = DataLoader(test_dataset, batch_size=256, shuffle=False)


batch_data, batch_labels = next(iter(train_dataloader))
print(f"TRAIN BATCH SHAPE")
print(f"\t Data: {batch_data.shape}")
print(f"\t Labels: {batch_labels.shape}")

batch_data, batch_labels = next(iter(test_dataloader))
print(f"TEST BATCH SHAPE")
print(f"\t Data: {batch_data.shape}")
print(f"\t Labels: {batch_labels.shape}")

# Autoencoder

## Hyperparameters optimization

We use Optuna and pytorch lightning to optimize the hyperparameters

In [None]:
class AutoEncoder_HP(pl.LightningModule):

    def __init__(self, trial):
        super().__init__()
        encoded_space_dim = trial.suggest_int('encoded_space_dim', 2, 20)
        self.trial = trial
        
        # Encoder
        self.encoder = nn.Sequential(
                                        # First convolutional layer
                                        nn.Conv2d(1, 8, kernel_size=3, padding=1, stride=2),
                                        nn.ReLU(),
                                        # Second convolutional layer
                                        nn.Conv2d(8, 16, kernel_size=3, padding=1, stride=2),
                                        nn.ReLU(),
                                        # Third convolutional layer
                                        nn.Conv2d(16, 32, kernel_size=3, padding=0, stride=2),
                                        nn.ReLU(),
                                        # Flatten layer
                                        nn.Flatten(start_dim=1),
                                        # First linear layer
                                        nn.Linear(288, 64),
                                        nn.ReLU(),
                                        # Second linear layer (output layer)
                                        nn.Linear(64, encoded_space_dim)
                                    )
        # Decoder
        self.decoder =  nn.Sequential(
                                        # First linear layer
                                        nn.Linear(encoded_space_dim, 64),
                                        nn.ReLU(True),
                                        # Second linear layer
                                        nn.Linear(64, 3*3*32), # (64, 288)
                                        nn.ReLU(True),
                                        # Unflatten
                                        nn.Unflatten(dim=1, unflattened_size=(32, 3, 3)),
                                        # First transposed convolution
                                        nn.ConvTranspose2d(32, 16, kernel_size=3, output_padding=0, stride=2),
                                        nn.ReLU(True),
                                        # Second transposed convolution
                                        nn.ConvTranspose2d(16, 8, kernel_size=3, output_padding=1, padding=1, stride=2),
                                        nn.ReLU(True),
                                        # Third transposed convolution
                                        nn.ConvTranspose2d(8, 1, kernel_size=3, output_padding=1, padding=1, stride=2),
                                        # To obtain an output in [0,1]
                                        nn.Sigmoid()
                                    )
        self.configure_loss(loss_fn)
        
    def forward(self, x):
        # in lightning, forward defines the prediction/inference actions
        embedding = self.encoder(x)
        return embedding

    def training_step(self, batch, batch_idx):
        # training_step defined the train loop. It is independent of forward
        x, y = batch
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = self.loss_fn(x_hat, x)
        self.log('train_loss', loss)
        return loss
    
    def validation_step(self, batch, batch_idx, loss_name='validation_loss'):
        x, y = batch
        z = self.encoder(x)
        x_hat = self.decoder(z)
        val_loss = self.loss_fn(x_hat, x)
        self.log(loss_name, val_loss, prog_bar=True)
        return val_loss
    
    def test_step(self, batch, batch_idx):
        self.validation_step(batch, batch_idx, loss_name='test_loss')

    def configure_optimizers(self):
        opt = self.trial.suggest_categorical('optimizer', ['SGD', 'Adam'])
        lr = self.trial.suggest_loguniform('learning_rate', 1e-5, 1e-3)
        wd = self.trial.suggest_loguniform('weight_decay', 1e-5, 1e-3)
        if opt=='Adam':
            optimizer = torch.optim.Adam(self.parameters(), lr=lr, 
                                        weight_decay=wd)
        elif opt=='SGD':
            optimizer = torch.optim.SGD(self.parameters(), lr=lr, 
                                        momentum=0.9, weight_decay=wd)
            
        return optimizer
    
    def configure_loss(self, loss_fn):
        self.loss_fn = loss_fn

In [None]:
class MetricsCallback(Callback):
    """PyTorch Lightning metric callback."""

    def __init__(self):
        super().__init__()
        self.metrics = []

    def on_validation_end(self, trainer, pl_module):
        self.metrics.append(trainer.callback_metrics)

PERCENT_TEST_EXAMPLES = 1
def objective(trial):
    # Function to optimize from optuna
    checkpoint_callback = pl.callbacks.ModelCheckpoint(
        os.path.join(MODEL_DIR, "trial_{}".format(trial.number)), monitor="validation_loss"
    )

    metrics_callback = MetricsCallback()
    
    trainer = pl.Trainer(
        logger=False,
        checkpoint_callback=checkpoint_callback,
        max_epochs=50,
        gpus=1 if torch.cuda.is_available() else None,
        callbacks=[metrics_callback, 
                   PyTorchLightningPruningCallback(trial, monitor="validation_loss")],
    )

    model = AutoEncoder_HP(trial)
    trainer.fit(model, train_dataloader, val_dataloader)
    
    return metrics_callback.metrics[-1]["validation_loss"].item()

In [None]:
DIR = os.getcwd()
MODEL_DIR = os.path.join(DIR, "result")

pruner = optuna.pruners.MedianPruner()
study = optuna.create_study(direction="minimize", pruner=pruner)
study.optimize(objective, n_trials=100, timeout=None)#600)

print("Number of finished trials: {}".format(len(study.trials)))

print("Best trial:")
trial = study.best_trial

print("  Value: {}".format(trial.value))

print("  Params: ")
for key, value in trial.params.items():
    print("    {}: {}".format(key, value))

In [None]:
optuna.visualization.plot_optimization_history(study, target_name='Validation loss')

In [None]:
optuna.visualization.plot_intermediate_values(study)

In [None]:
optuna.visualization.plot_parallel_coordinate(study,  target_name='Validation loss')

We now define the autoencoder with the best hyperparameters

In [None]:
class AutoEncoder(pl.LightningModule):

    def __init__(self, encoded_space_dim):
        super().__init__()
        # Encoder
        self.encoder = nn.Sequential(
                                        # First convolutional layer
                                        nn.Conv2d(1, 8, kernel_size=3, padding=1, stride=2),
                                        nn.ReLU(),
                                        # Second convolutional layer
                                        nn.Conv2d(8, 16, kernel_size=3, padding=1, stride=2),
                                        nn.ReLU(),
                                        # Third convolutional layer
                                        nn.Conv2d(16, 32, kernel_size=3, padding=0, stride=2),
                                        nn.ReLU(),
                                        # Flatten layer
                                        nn.Flatten(start_dim=1),
                                        # First linear layer
                                        nn.Linear(288, 64),
                                        nn.ReLU(),
                                        # Second linear layer (output layer)
                                        nn.Linear(64, encoded_space_dim)
                                    )
        # Decoder
        self.decoder =  nn.Sequential(
                                        # First linear layer
                                        nn.Linear(encoded_space_dim, 64),
                                        nn.ReLU(True),
                                        # Second linear layer
                                        nn.Linear(64, 3*3*32), # (64, 288)
                                        nn.ReLU(True),
                                        # Unflatten
                                        nn.Unflatten(dim=1, unflattened_size=(32, 3, 3)),
                                        # First transposed convolution
                                        nn.ConvTranspose2d(32, 16, kernel_size=3, output_padding=0, stride=2),
                                        nn.ReLU(True),
                                        # Second transposed convolution
                                        nn.ConvTranspose2d(16, 8, kernel_size=3, output_padding=1, padding=1, stride=2),
                                        nn.ReLU(True),
                                        # Third transposed convolution
                                        nn.ConvTranspose2d(8, 1, kernel_size=3, output_padding=1, padding=1, stride=2),
                                        # To obtain an output in [0,1]
                                        nn.Sigmoid()
                                    )
    def forward(self, x):
        # in lightning, forward defines the prediction/inference actions
        embedding = self.encoder(x)
        return embedding

    def training_step(self, batch, batch_idx):
        # training_step defined the train loop. It is independent of forward
        x, y = batch
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = self.loss_fn(x_hat, x)
        self.log('train_loss', loss)
        return loss
    
    def validation_step(self, batch, batch_idx, loss_name='validation_loss'):
        x, y = batch
        z = self.encoder(x)
        x_hat = self.decoder(z)
        val_loss = self.loss_fn(x_hat, x)
        self.log(loss_name, val_loss, prog_bar=True)
        return val_loss
    
    def test_step(self, batch, batch_idx):
        self.validation_step(batch, batch_idx, loss_name='test_loss')

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=0.0008157670205790078, weight_decay=1.4786239067036069e-05)
        return optimizer
    
    def configure_loss(self, loss_fn):
        self.loss_fn = loss_fn

In [None]:
### Set the random seed for reproducible results
torch.manual_seed(49)
np.random.seed(49)

### Initialize the two networks
encoded_space_dim = 9
auto_enc = AutoEncoder(encoded_space_dim=encoded_space_dim)

Let's check if all the shapes are correct.

In [None]:
### Some examples
# Take an input image (remember to add the batch dimension)
img, _ = test_dataset[0]
img = img.unsqueeze(0) # Add the batch dimension in the first axis
print('Original image shape:', img.shape)
# Encode the image
img_enc = auto_enc.forward(img)
print('Encoded image shape:', img_enc.shape)
# Decode the image
dec_img = auto_enc.decoder(img_enc)
print('Decoded image shape:', dec_img.shape)

# Training

In [None]:
### Define the loss function
loss_fn = torch.nn.MSELoss()
# Check if the GPU is available
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f'Selected device: {device}')

# Move both the encoder and the decoder to the selected device
auto_enc.to(device)
auto_enc.configure_loss(loss_fn)

In [None]:
trainer = pl.Trainer(gpus=1, max_epochs=100, progress_bar_refresh_rate=20, 
                     callbacks=[EarlyStopping(monitor='validation_loss')])

In [None]:
trainer.fit(auto_enc, train_dataloader, val_dataloader)

## Testing function

In [None]:
trainer.test(auto_enc, test_dataloader)

In [None]:
# Saving the model
trainer.save_checkpoint("best_model.ckpt")

In [None]:
### Plot some sample
fig, axs = plt.subplots(2, 4, figsize=(8,6))
for ax in axs:
    # random.choice allows to randomly sample from a list-like object (basically anything that can be accessed with an index, like our dataset)
    img, label = random.choice(test_dataset)
    img1 = img[0]
    with torch.no_grad():
        encoded_img  = auto_enc.encoder(img.unsqueeze(0).to(device))
        decoded_img  = auto_enc.decoder(encoded_img)
    
    ax[0].imshow(np.array(img1), cmap='gist_gray')
    ax[0].set_title('Original, Label: %d' % label)
    ax[0].set_xticks([])
    ax[0].set_yticks([])

    ax[1].imshow(decoded_img.squeeze().cpu().numpy(), cmap='gist_gray')
    ax[1].set_title('Reconstructed, Label: %d' % label)
    ax[1].set_xticks([])
    ax[1].set_yticks([])
    
    img, label = random.choice(test_dataset)
    img1 = img[0]
    with torch.no_grad():
        encoded_img  = auto_enc.encoder(img.unsqueeze(0).to(device))
        decoded_img  = auto_enc.decoder(encoded_img)
    
    ax[2].imshow(np.array(img1), cmap='gist_gray')
    ax[2].set_title('Original, Label: %d' % label)
    ax[2].set_xticks([])
    ax[2].set_yticks([])

    ax[3].imshow(decoded_img.squeeze().cpu().numpy(), cmap='gist_gray')
    ax[3].set_title('Reconstructed, Label: %d' % label)
    ax[3].set_xticks([])
    ax[3].set_yticks([])
    
    
plt.tight_layout()

# Denoiser
To implement the denoiser we simply add some gaussian noise to the image during the training/evaluation procedures, using the not-noisy image for the computation of the loss

In [None]:
class Denoiser(pl.LightningModule):

    def __init__(self, encoded_space_dim):
        super().__init__()
        # Encoder
        self.encoder = nn.Sequential(
                                        # First convolutional layer
                                        nn.Conv2d(1, 8, kernel_size=3, padding=1, stride=2),
                                        nn.ReLU(),
                                        # Second convolutional layer
                                        nn.Conv2d(8, 16, kernel_size=3, padding=1, stride=2),
                                        nn.ReLU(),
                                        # Third convolutional layer
                                        nn.Conv2d(16, 32, kernel_size=3, padding=0, stride=2),
                                        nn.ReLU(),
                                        # Flatten layer
                                        nn.Flatten(start_dim=1),
                                        # First linear layer
                                        nn.Linear(288, 64),
                                        nn.ReLU(),
                                        # Second linear layer (output layer)
                                        nn.Linear(64, encoded_space_dim)
                                    )
        # Decoder
        self.decoder =  nn.Sequential(
                                        # First linear layer
                                        nn.Linear(encoded_space_dim, 64),
                                        nn.ReLU(True),
                                        # Second linear layer
                                        nn.Linear(64, 3*3*32), # (64, 288)
                                        nn.ReLU(True),
                                        # Unflatten
                                        nn.Unflatten(dim=1, unflattened_size=(32, 3, 3)),
                                        # First transposed convolution
                                        nn.ConvTranspose2d(32, 16, kernel_size=3, output_padding=0, stride=2),
                                        nn.ReLU(True),
                                        # Second transposed convolution
                                        nn.ConvTranspose2d(16, 8, kernel_size=3, output_padding=1, padding=1, stride=2),
                                        nn.ReLU(True),
                                        # Third transposed convolution
                                        nn.ConvTranspose2d(8, 1, kernel_size=3, output_padding=1, padding=1, stride=2),
                                        # To obtain an output in [0,1]
                                        nn.Sigmoid()
                                    )
        

        
    def forward(self, x):
        # in lightning, forward defines the prediction/inference actions
        embedding = self.encoder(x)
        return embedding

    def training_step(self, batch, batch_idx):
        # training_step defined the train loop. It is independent of forward
        x, y = batch
        # Apply noise to x
        mean = torch.randn(1).to(device) * 1
        std = torch.randn(1).to(device) * 0.5 + 0.5
        noisy_x = x + torch.randn(x.size()).to(device) *std + mean
        z = self.encoder(noisy_x)
        x_hat = self.decoder(z)
        loss = self.loss_fn(x_hat, x)
        self.log('train_loss', loss)
        return loss
    
    def validation_step(self, batch, batch_idx, loss_name='validation_loss'):
        x, y = batch
        # Apply noise to x
        mean = torch.randn(1).to(device) * 1
        std = torch.randn(1).to(device) * 0.5 + 0.5
        noisy_x = x + torch.randn(x.size()).to(device) *std + mean
        z = self.encoder(noisy_x)
        x_hat = self.decoder(z)
        val_loss = self.loss_fn(x_hat, x)
        self.log(loss_name, val_loss, prog_bar=True)
        return val_loss
    
    def test_step(self, batch, batch_idx):
        self.validation_step(batch, batch_idx, loss_name='test_loss')

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=0.0008157670205790078, weight_decay=1.4786239067036069e-05)
        return optimizer
    
    def configure_loss(self, loss_fn):
        self.loss_fn = loss_fn

In [None]:
### Set the random seed for reproducible results
torch.manual_seed(49)
np.random.seed(49)

### Initialize the two networks
encoded_space_dim = 9
denoise = Denoiser(encoded_space_dim=encoded_space_dim)

### Define the loss function
loss_fn = torch.nn.MSELoss()
# Check if the GPU is available
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f'Selected device: {device}')

# Move both the encoder and the decoder to the selected device
denoise.to(device)
denoise.configure_loss(loss_fn)

In [None]:
# Training
trainer = pl.Trainer(gpus=1, max_epochs=100, progress_bar_refresh_rate=20, 
                     callbacks=[EarlyStopping(monitor='validation_loss')])
trainer.fit(denoise, train_dataloader, val_dataloader)

In [None]:
trainer.test(denoise, test_dataloader)

In [None]:
### Plot some sample
fig, axs = plt.subplots(3, 6, figsize=(10,6))
for ax in axs:
    # random.choice allows to randomly sample from a list-like object (basically anything that can be accessed with an index, like our dataset)
    img, label = random.choice(test_dataset)
    img1 = img[0]
    ax[0].imshow(np.array(img1), cmap='gist_gray')
    ax[0].set_title('Original, Label: %d' % label)
    ax[0].set_xticks([])
    ax[0].set_yticks([])
    
    img1 = img1 + np.random.normal(0, 1, size=img1.shape)
    ax[1].imshow(np.array(img1), cmap='gist_gray')
    ax[1].set_title('Noisy, Label: %d' % label)
    ax[1].set_xticks([])
    ax[1].set_yticks([])
    
    img = img.unsqueeze(0).to(device)
    denoise.eval()
    with torch.no_grad():
        encoded_img  = denoise.encoder(img)
        decoded_img  = denoise.decoder(encoded_img)
    ax[2].imshow(np.array(decoded_img.cpu()[0][0]), cmap='gist_gray')
    ax[2].set_title('Decoded, Label: %d' % label)
    ax[2].set_xticks([])
    ax[2].set_yticks([])
    
    # random.choice allows to randomly sample from a list-like object (basically anything that can be accessed with an index, like our dataset)
    img, label = random.choice(train_dataset)
    img1 = img[0]
    ax[3].imshow(np.array(img1), cmap='gist_gray')
    ax[3].set_title('Original, Label: %d' % label)
    ax[3].set_xticks([])
    ax[3].set_yticks([])
    
    img1 = img1 + np.random.normal(0, 1, size=img1.shape)
    ax[4].imshow(np.array(img1), cmap='gist_gray')
    ax[4].set_title('Noisy, Label: %d' % label)
    ax[4].set_xticks([])
    ax[4].set_yticks([])
    
    img = img.unsqueeze(0).to(device)
    denoise.eval()
    with torch.no_grad():
        encoded_img  = denoise.encoder(img)
        decoded_img  = denoise.decoder(encoded_img)
    ax[5].imshow(np.array(decoded_img.cpu()[0][0]), cmap='gist_gray')
    ax[5].set_title('Decoded, Label: %d' % label)
    ax[5].set_xticks([])
    ax[5].set_yticks([])
    
    
    
plt.tight_layout()

# Fine tuning
We will apply the methods of transfer learning: we will keep all the network up to the encoded space fixed, and add on top of it a fully-connected layer and a readout layer. We will train only these last two.

In [None]:
class Classifier(pl.LightningModule):

    def __init__(self, encoded_space_dim, pretrained):
        super().__init__()
        # Encoder
        self.encoder = pretrained.encoder
        # Decoder
        self.fine_tune =  nn.Sequential(
                                        # First linear layer
                                        nn.Linear(encoded_space_dim, 64),
                                        nn.ReLU(True),
                                        # Second linear layer
                                        nn.Linear(64, 10),
                                        nn.LogSoftmax()
                                       
                                    )
        self.accuracy = pl.metrics.Accuracy()
        
    def forward(self, x):
        # in lightning, forward defines the prediction/inference actions
        x = self.encoder(x)
        x = self.fine_tune(x)
        return x

    def training_step(self, batch, batch_idx):
        # training_step defined the train loop. It is independent of forward
        x, y = batch
        z = self.forward(x)
        loss = self.loss_fn(z, y)
        self.log('train_loss', loss)
        return loss
    
    def validation_step(self, batch, batch_idx, loss_name='validation_loss'):
        x, y = batch
        z = self.forward(x)
        val_loss = self.loss_fn(z, y)
        self.log(loss_name, val_loss, prog_bar=True)
        return val_loss
    
    def test_step(self, batch, batch_idx):
        x, y = batch
        z = self.forward(x)
        self.log('accuracy', self.accuracy(z, y), prog_bar=True)
        return self.accuracy(z, y)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=0.00013901410327079125, weight_decay=7.274106352015157e-05)
        return optimizer
    
    def configure_loss(self, loss_fn):
        self.loss_fn = loss_fn

In [None]:
pretrained = AutoEncoder.load_from_checkpoint(checkpoint_path="best_model.ckpt", encoded_space_dim=encoded_space_dim)
fine_tuner = Classifier(encoded_space_dim, pretrained)

# Freeze params
for param in fine_tuner.encoder.parameters():
    param.requires_grad = False

neg_log_like = torch.nn.NLLLoss()

fine_tuner.configure_loss(neg_log_like)
fine_tuner.to(device)


In [None]:
# Training
trainer = pl.Trainer(gpus=1, max_epochs=50, progress_bar_refresh_rate=20, 
                     callbacks=[EarlyStopping(monitor='validation_loss')])
trainer.fit(fine_tuner, train_dataloader, val_dataloader)

In [None]:
# Test accuracy
trainer.test(fine_tuner, test_dataloader)

# Variational Autoencoders

We want to encode the samples into probability distribution. For a full explanation see the report. 

In [None]:
class VAE(pl.LightningModule):

    def __init__(self, encoded_space_dim):
        super().__init__()
        # Encoder
        self.g1 = nn.Sequential(
                                        # First convolutional layer
                                        nn.Conv2d(1, 8, kernel_size=3, padding=1, stride=2),
                                        nn.ReLU(),
                                        # Second convolutional layer
                                        nn.Conv2d(8, 16, kernel_size=3, padding=1, stride=2),
                                        nn.ReLU(),
                                        # Third convolutional layer
                                        nn.Conv2d(16, 32, kernel_size=3, padding=0, stride=2),
                                        nn.ReLU(),
                                        # Flatten layer
                                        nn.Flatten(start_dim=1)
                                )
        self.g2 = nn.Sequential(
                                        # First linear layer
                                        nn.Linear(288, 64),
                                        nn.ReLU(),
                                        # Second linear layer (output layer). Average of gaussian distr
                                        nn.Linear(64, encoded_space_dim)
                                        
                                    )
        
        self.h2 = nn.Sequential(
                                        # First linear layer
                                        nn.Linear(288, 64),
                                        nn.ReLU(),
                                        # Second linear layer (output layer). Diagonal of covariance matrix
                                        nn.Linear(64, encoded_space_dim)
                                    )
        # Decoder
        self.decoder =  nn.Sequential(
                                        # First linear layer
                                        nn.Linear(encoded_space_dim, 64),
                                        nn.ReLU(True),
                                        # Second linear layer
                                        nn.Linear(64, 3*3*32), # (64, 288)
                                        nn.ReLU(True),
                                        # Unflatten
                                        nn.Unflatten(dim=1, unflattened_size=(32, 3, 3)),
                                        # First transposed convolution
                                        nn.ConvTranspose2d(32, 16, kernel_size=3, output_padding=0, stride=2),
                                        nn.ReLU(True),
                                        # Second transposed convolution
                                        nn.ConvTranspose2d(16, 8, kernel_size=3, output_padding=1, padding=1, stride=2),
                                        nn.ReLU(True),
                                        # Third transposed convolution
                                        nn.ConvTranspose2d(8, 1, kernel_size=3, output_padding=1, padding=1, stride=2),
                                        # To obtain an output in [0,1]
                                        nn.Sigmoid()
                                    )

        
        
        
    def forward(self, x):
        # in lightning, forward defines the prediction/inference actions
        encode = self.g1(x)
        means = self.g2(encode)
        cov = self.h2(encode)
        sample = cov*torch.randn(cov.size()).to(self.device) + means
        return sample

    def training_step(self, batch, batch_idx):
        # training_step defined the train loop. It is independent of forward
        x, y = batch
        encode = self.g1(x)
        means = self.g2(encode)
        cov = self.h2(encode)
        # Sampling
        z = cov*torch.randn(cov.size()).to(self.device) + means
        
        x_hat = self.decoder(z)
        loss = self.loss_fn(x_hat, x) + self.kl_reg*self._kl_div(means, cov)
        self.log('train_loss', loss)
        return loss
    
    def validation_step(self, batch, batch_idx, loss_name='validation_loss'):
        x, y = batch
        encode = self.g1(x)
        means = self.g2(encode)
        cov = self.h2(encode)
        z = cov*torch.randn(cov.size()).to(self.device) + means
        
        x_hat = self.decoder(z)
        val_loss = self.loss_fn(x_hat, x) + self.kl_reg*self._kl_div(means, cov)
        self.log(loss_name, val_loss, prog_bar=True)
        return val_loss
    
    def test_step(self, batch, batch_idx):
        self.validation_step(batch, batch_idx, loss_name='test_loss')

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=0.0008157670205790078, weight_decay=1.4786239067036069e-05)
        return optimizer
    
    def configure_loss(self, loss_fn, kl_reg):
        self.loss_fn = loss_fn
        self.kl_reg = kl_reg
        
    def _kl_div(self, means, stds):
        kl = 0.5*torch.sum( stds**2+means**2-1-torch.log( stds**2 ) )
        return kl

In [None]:
### Set the random seed for reproducible results
torch.manual_seed(49)
np.random.seed(49)

### Initialize the two networks
encoded_space_dim = 9
vae = VAE(encoded_space_dim=encoded_space_dim)

### Define the loss function
loss_fn = torch.nn.MSELoss()
# Check if the GPU is available
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f'Selected device: {device}')

# Move both the encoder and the decoder to the selected device
vae.to(device)
vae.configure_loss(loss_fn, 1e-5)

In [None]:
# Training
trainer = pl.Trainer(gpus=1, max_epochs=100, progress_bar_refresh_rate=20, 
                     callbacks=[EarlyStopping(monitor='validation_loss')])
trainer.fit(vae, train_dataloader, val_dataloader)

In [None]:
# Test accuracy
trainer.test(vae, test_dataloader)

In [None]:
trainer.save_checkpoint("best_vae.ckpt")

In [None]:
img, label = random.choice(test_dataset)
original = img.squeeze().numpy()
img = img.unsqueeze(0).to(device)
vae.eval()
with torch.no_grad():
    encoded_samples_start  = vae.forward(img)

if encoded_space_dim == 9:
    # Decode sample
    auto_enc.eval()
    with torch.no_grad():
        generated_img  = vae.decoder(encoded_samples_start+1.5*torch.randn(encoded_samples_start.size()).to(device))
        decoded_img  = vae.decoder(encoded_samples_start)
        
    fig, ax = plt.subplots(2, 6, figsize=(16,4))
    
    ax = ax.flatten()
    
    ax[0].set_title(f'Original, Label: {label}')
    ax[0].imshow(original, cmap='gist_gray')
    ax[0].set_xticks([])
    ax[0].set_yticks([])
    
    ax[1].set_title(f'Decoded')
    ax[1].imshow(decoded_img.squeeze().cpu().numpy(), cmap='gist_gray')
    ax[1].set_xticks([])
    ax[1].set_yticks([])
    
    for i in range(2, len(ax)):
        k = 0
        shift = torch.zeros( encoded_samples_start.size() ).to(device)
        shift[0][k] += encoded_samples_start[0][k]/5*i
        with torch.no_grad():
            generated_img  = vae.decoder(encoded_samples_start+shift)
        ax[i].set_title('Shift: %.3f' %(shift[0][k]) )
        ax[i].imshow(generated_img.squeeze().cpu().numpy(), cmap='gist_gray')
        ax[i].set_xticks([])
        ax[i].set_yticks([])
    plt.show()

# Encoded space analysis

In [None]:
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from matplotlib.patches import Ellipse

#### Classical Autencoder

In [None]:
# Load network parameters
auto_enc = AutoEncoder.load_from_checkpoint("best_model.ckpt", encoded_space_dim=encoded_space_dim).to(device)

In [None]:
### Get the encoded representation of the test samples
encoded_samples = []
enc_aut = np.zeros(encoded_space_dim+1)
for sample in tqdm(test_dataset):
    img = sample[0].unsqueeze(0).to(device)
    label = sample[1]
    # Encode image
    auto_enc.eval()
    with torch.no_grad():
        encoded_img  = auto_enc.encoder(img)
    # Append to list
    encoded_img = encoded_img.flatten().cpu().numpy()
    encoded_sample = {f"Enc. Variable {i}": enc for i, enc in enumerate(encoded_img)}
    encoded_sample['label'] = label
    encoded_samples.append(encoded_sample)
    enc_aut = np.vstack( (enc_aut, np.hstack( (encoded_img, label)) ) )

In [None]:
# Convert to a dataframe
encoded_samples = pd.DataFrame(encoded_samples)

# PCA
n_components = 2
pca = PCA(n_components=n_components)
pca.fit(encoded_samples.iloc[:,0:encoded_space_dim])
columns = [ f'Enc. Variable {i}' for i in range(n_components)]
compressed_samples = pd.DataFrame( pca.transform(encoded_samples.iloc[:,0:encoded_space_dim]), columns=columns )

if n_components == 2:
    fig = px.scatter(compressed_samples, x='Enc. Variable 0', y='Enc. Variable 1', 
           color=encoded_samples.label.astype(str), opacity=0.7)
elif n_components == 3:
    fig = px.scatter_3d(compressed_samples,  x='Enc. Variable 0', y='Enc. Variable 1', z='Enc. Variable 2',
                    color=encoded_samples.label.astype(str), opacity=0.7)
fig.show()

In [None]:
# Convert to a dataframe
encoded_samples = pd.DataFrame(encoded_samples)

# PCA
n_components = 2
tsne = TSNE(n_components=n_components)
tsne.fit(encoded_samples.iloc[:, 0:encoded_space_dim])
columns = [ f'Enc. Variable {i}' for i in range(n_components)]
compressed_samples = pd.DataFrame( tsne.fit_transform(encoded_samples.iloc[:,0:encoded_space_dim]), columns=columns )

if n_components == 2:
    fig = px.scatter(compressed_samples, x='Enc. Variable 0', y='Enc. Variable 1', 
           color=encoded_samples.label.astype(str), opacity=0.7)
elif n_components == 3:
    fig = px.scatter_3d(compressed_samples,  x='Enc. Variable 0', y='Enc. Variable 1', z='Enc. Variable 2',
                    color=encoded_samples.label.astype(str), opacity=0.7)
fig.show()

In [None]:
compr = pca.transform(enc_aut[:,:encoded_space_dim]) 
compr = np.hstack( (compr, enc_aut[:, -1].reshape(10001, 1) ) )
pca_aut = np.zeros(4)
for i in range(10):
    means = compr[ compr[:, -1]==i ].mean(axis=0)
    stds = compr[ compr[:, -1]==i ].std(axis=0)
    tot = np.hstack( (means[:2], stds[:2]) )
    pca_aut = np.vstack( (pca_aut, tot) )
    
pca_aut = pca_aut[1:, :]

compr = tsne.fit_transform(enc_aut[:,:encoded_space_dim]) 
compr = np.hstack( (compr, enc_aut[:, -1].reshape(10001, 1) ) )
tsne_aut = np.zeros(4)
for i in range(10):
    means = compr[ compr[:, -1]==i ].mean(axis=0)
    stds = compr[ compr[:, -1]==i ].std(axis=0)
    tot = np.hstack( (means[:2], stds[:2]) )
    tsne_aut = np.vstack( (tsne_aut, tot) )
    
tsne_aut = tsne_aut[1:, :]

In [None]:
# Average difference of the two standard deviation along the two dimension.
# If the distribution is perfectly gaussian then the difference should be 0
avg_diff = [ np.abs(pca_aut[:, 2]- pca_aut[:, 3]).mean() , np.abs(tsne_aut[:, 2]- tsne_aut[:, 3]).mean() ]
print(avg_diff)

In [None]:
colors = ['pink', 'cyan', 'red', 'lime', 'green', 'orange', 'gray', 'black', 'navy', 'purple']

fig, ax = plt.subplots(1, 2, figsize=(15, 6))
for i in range(10):
    ellipse = Ellipse((pca_aut[i, 0], pca_aut[i, 1]), width=pca_aut[i, 2] * 2, height=pca_aut[i, 3] * 2, 
                      alpha=0.5, facecolor = colors[i], label=str(i))
    
    ax[0].add_patch(ellipse)
ax[0].legend()
    
ax[0].scatter(pca_aut[:, 0], pca_aut[:, 1], color='black')
ax[0].set_xlim( (1.5*pca_aut[:, 0].min(), 1.5*pca_aut[:, 0].max()) )
ax[0].set_ylim( (1.5*pca_aut[:, 1].min(), 1.5*pca_aut[:, 1].max()) )
ax[0].set_xlabel('Encoded variable 0')
ax[0].set_ylabel('Encoded variable 1')
ax[0].set_title('PCA', fontsize=16)

for i in range(10):
    ellipse = Ellipse((tsne_aut[i, 0], tsne_aut[i, 1]), width=tsne_aut[i, 2] * 2, height=tsne_aut[i, 3] * 2, 
                      alpha=0.5, facecolor = colors[i], label=str(i))
    
    ax[1].add_patch(ellipse)
ax[1].legend()
    
ax[1].scatter(tsne_aut[:, 0], tsne_aut[:, 1], color='black')
ax[1].set_xlim( (1.5*tsne_aut[:, 0].min(), 1.5*tsne_aut[:, 0].max()) )
ax[1].set_ylim( (1.5*tsne_aut[:, 1].min(), 1.5*tsne_aut[:, 1].max()) )
ax[1].set_xlabel('Encoded variable 0')
ax[1].set_ylabel('Encoded variable 1')
ax[1].set_title('t-SNE', fontsize=16)

plt.show()

In [None]:
# Generation of new samples
### Plot some sample
fig, ax = plt.subplots(1, 3, figsize=(8,6))
# random.choice allows to randomly sample from a list-like object (basically anything that can be accessed with an index, like our dataset)
img, label = random.choice(test_dataset)
img1 = img[0]
with torch.no_grad():
    encoded_img  = auto_enc.encoder(img.unsqueeze(0).to(device))
    decoded_img  = auto_enc.decoder(20*torch.randn_like(encoded_img).to(device) )

ax[0].imshow( decoded_img.squeeze().cpu().numpy(), cmap='gist_gray')
ax[0].set_title('Generated sample' )
ax[0].set_xticks([])
ax[0].set_yticks([])

with torch.no_grad():
    decoded_img  = auto_enc.decoder(20*torch.randn_like(encoded_img).to(device) )

ax[1].imshow(decoded_img.squeeze().cpu().numpy(), cmap='gist_gray')
ax[1].set_title('Generated sample' )
ax[1].set_xticks([])
ax[1].set_yticks([])

img, label = random.choice(test_dataset)
img1 = img[0]
with torch.no_grad():
    decoded_img  = auto_enc.decoder(20*torch.randn_like(encoded_img).to(device) )

ax[2].imshow(decoded_img.squeeze().cpu().numpy(), cmap='gist_gray')
ax[2].set_title('Generated sample' )
ax[2].set_xticks([])
ax[2].set_yticks([])
    
    
plt.tight_layout()

### Variational autoencoder

In [None]:
# Load network parameters
vae = VAE.load_from_checkpoint("best_vae.ckpt", encoded_space_dim=encoded_space_dim).to(device)

In [None]:
enc_vae = np.zeros(encoded_space_dim+1)
### Get the encoded representation of the test samples
encoded_samples = []
vae.to(device)
for sample in tqdm(test_dataset):
    img = sample[0].unsqueeze(0).to(device)
    label = sample[1]
    # Encode image
    vae.eval()
    with torch.no_grad():
        encoded_img  = vae.forward(img)
    # Append to list
    encoded_img = encoded_img.flatten().cpu().numpy()
    encoded_sample = {f"Enc. Variable {i}": enc for i, enc in enumerate(encoded_img)}
    encoded_sample['label'] = label
    encoded_samples.append(encoded_sample)
    enc_vae = np.vstack( (enc_vae, np.hstack((encoded_img, label))  ) )

In [None]:
# Convert to a dataframe
encoded_samples = pd.DataFrame(encoded_samples)
n_components = 2
pca = PCA(n_components=n_components)
pca.fit(encoded_samples.iloc[:, 0:encoded_space_dim])
columns = [ f'Enc. Variable {i}' for i in range(n_components)]
compressed_samples = pd.DataFrame( pca.transform(encoded_samples.iloc[:, 0:encoded_space_dim]), columns=columns )

if n_components == 2:
    fig = px.scatter(compressed_samples, x='Enc. Variable 0', y='Enc. Variable 1', 
           color=encoded_samples.label.astype(str), opacity=0.7)
elif n_components == 3:
    fig = px.scatter_3d(compressed_samples,  x='Enc. Variable 0', y='Enc. Variable 1', z='Enc. Variable 2',
                    color=encoded_samples.label.astype(str), opacity=0.7)
fig.show()

In [None]:
# Convert to a dataframe
encoded_samples = pd.DataFrame(encoded_samples)

# PCA
n_components = 2
tsne = TSNE(n_components=n_components)
tsne.fit(encoded_samples.iloc[:, 0:encoded_space_dim])
columns = [ f'Enc. Variable {i}' for i in range(n_components)]
compressed_samples = pd.DataFrame( tsne.fit_transform(encoded_samples.iloc[:,0:encoded_space_dim]), columns=columns )

if n_components == 2:
    fig = px.scatter(compressed_samples, x='Enc. Variable 0', y='Enc. Variable 1', 
           color=encoded_samples.label.astype(str), opacity=0.7)
elif n_components == 3:
    fig = px.scatter_3d(compressed_samples,  x='Enc. Variable 0', y='Enc. Variable 1', z='Enc. Variable 2',
                    color=encoded_samples.label.astype(str), opacity=0.7)
fig.show()

In [None]:
compr = pca.transform(enc_vae[:,:encoded_space_dim]) 
compr = np.hstack( (compr, enc_vae[:, -1].reshape(10001, 1) ) )
pca_vae = np.zeros(4)
for i in range(10):
    means = compr[ compr[:, -1]==i ].mean(axis=0)
    stds = compr[ compr[:, -1]==i ].std(axis=0)
    tot = np.hstack( (means[:2], stds[:2]) )
    pca_vae = np.vstack( (pca_vae, tot) )
    
pca_vae = pca_vae[1:, :]

compr = tsne.fit_transform(enc_vae[:,:encoded_space_dim]) 
compr = np.hstack( (compr, enc_vae[:, -1].reshape(10001, 1) ) )
tsne_vae = np.zeros(4)
for i in range(10):
    means = compr[ compr[:, -1]==i ].mean(axis=0)
    stds = compr[ compr[:, -1]==i ].std(axis=0)
    tot = np.hstack( (means[:2], stds[:2]) )
    tsne_vae = np.vstack( (tsne_vae, tot) )
    
tsne_vae = tsne_vae[1:, :]

In [None]:
# Average difference of the two standard deviation along the two dimension.
# If the distribution is perfectly gaussian then the difference should be 0
avg_diff = [ np.abs(pca_vae[:, 2]- pca_vae[:, 3]).mean() , np.abs(tsne_vae[:, 2]- tsne_vae[:, 3]).mean() ]
print(avg_diff)

In [None]:
colors = ['pink', 'cyan', 'red', 'lime', 'green', 'orange', 'gray', 'black', 'navy', 'purple']

fig, ax = plt.subplots(1, 2, figsize=(15, 6))
for i in range(10):
    ellipse = Ellipse((pca_vae[i, 0], pca_vae[i, 1]), width=pca_vae[i, 2] * 2, height=pca_vae[i, 3] * 2, 
                      alpha=0.5, facecolor = colors[i], label=str(i))
    
    ax[0].add_patch(ellipse)
ax[0].legend()
    
ax[0].scatter(pca_vae[:, 0], pca_vae[:, 1], color='black')
ax[0].set_xlim( (1.5*pca_vae[:, 0].min(), 1.5*pca_vae[:, 0].max()) )
ax[0].set_ylim( (1.5*pca_vae[:, 1].min(), 1.5*pca_vae[:, 1].max()) )
ax[0].set_xlabel('Encoded variable 0')
ax[0].set_ylabel('Encoded variable 1')
ax[0].set_title('PCA', fontsize=16)

for i in range(10):
    ellipse = Ellipse((tsne_vae[i, 0], tsne_vae[i, 1]), width=tsne_vae[i, 2] * 2, height=tsne_vae[i, 3] * 2, 
                      alpha=0.5, facecolor = colors[i], label=str(i))
    
    ax[1].add_patch(ellipse)
ax[1].legend()
    
ax[1].scatter(tsne_vae[:, 0], tsne_vae[:, 1], color='black')
ax[1].set_xlim( (1.5*tsne_vae[:, 0].min(), 1.5*tsne_vae[:, 0].max()) )
ax[1].set_ylim( (1.5*tsne_vae[:, 1].min(), 1.5*tsne_vae[:, 1].max()) )
ax[1].set_xlabel('Encoded variable 0')
ax[1].set_ylabel('Encoded variable 1')
ax[1].set_title('t-SNE', fontsize=16)

plt.show()

In [None]:
# Generation of new samples
### Plot some sample
fig, ax = plt.subplots(1, 3, figsize=(8,6))
# random.choice allows to randomly sample from a list-like object (basically anything that can be accessed with an index, like our dataset)
img, label = random.choice(test_dataset)
img1 = img[0]
with torch.no_grad():
    encoded_img  = vae.forward(img.unsqueeze(0).to(device))
    decoded_img  = vae.decoder(torch.randn_like(encoded_img).to(device) )

ax[0].imshow( decoded_img.squeeze().cpu().numpy(), cmap='gist_gray')
ax[0].set_title('Generated sample' )
ax[0].set_xticks([])
ax[0].set_yticks([])

with torch.no_grad():
    decoded_img  = vae.decoder(torch.randn_like(encoded_img).to(device) )

ax[1].imshow(decoded_img.squeeze().cpu().numpy(), cmap='gist_gray')
ax[1].set_title('Generated sample' )
ax[1].set_xticks([])
ax[1].set_yticks([])

img, label = random.choice(test_dataset)
img1 = img[0]
with torch.no_grad():
    decoded_img  = vae.decoder(torch.randn_like(encoded_img).to(device) )

ax[2].imshow(decoded_img.squeeze().cpu().numpy(), cmap='gist_gray')
ax[2].set_title('Generated sample' )
ax[2].set_xticks([])
ax[2].set_yticks([])
    
    
plt.tight_layout()