# NEURAL NETWORKS AND DEEP LEARNING

---
A.A. 2021/22 (6 CFU) - Dr. Alberto Testolin, Dr. Umberto Michieli
---


# Homework 1 - Supervised Deep Learning

### Author: Michele Guadagnini - Mt.1230663

In [None]:
# Total running time on Google Colab with GPU backend: < 30 min

# Classification Task

In [None]:
### ADDITIONAL LIBRARIES THAT NEED INSTALLATION (uncomment if needed)

#!pip install torchinfo
#!pip install optuna
#!pip install pytorch-lightning
#!pip install plotly

In [None]:
# PyTorch imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset
from torch.utils.data import random_split
import torchvision
from torchvision import transforms

# python imports
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
import copy
import logging
import datetime

# additional libraries
from torchinfo import summary
import optuna
import pytorch_lightning as pl


In [None]:
# reduce verbosity 
logging.getLogger("optuna").setLevel(logging.ERROR)
logging.getLogger("pytorch_lightning").setLevel(logging.WARNING)

In [None]:
# setting the device
if torch.cuda.is_available():
    print('GPU available')
    device = torch.device("cuda")
    USE_GPU = True
else:
    print('GPU not available')
    device = torch.device("cpu")
    print("Available CPU cores:", os.cpu_count())
    USE_GPU = False

In [None]:
from pytorch_lightning.utilities.seed import seed_everything

# seed to set random states
magic_num = 23 

### set random state to have reproducible results
seed_everything(seed=magic_num) 
### the function above internally calls the following:
#    random.seed(seed)
#    np.random.seed(seed)
#    torch.manual_seed(seed)
#    torch.cuda.manual_seed_all(seed)

## Guidelines

* The goal is to train a neural network that maps an input image (from FashionMNIST) to one of ten classes (multi-class classification problem with mutually exclusive classes).
* Define a proper loss (e.g. [torch.nn.CrossEntropyLoss](https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html#torch.nn.CrossEntropyLoss))
* Also here, consider creating a validation set from your training data, or use a k-fold cross-validation strategy.
* Pay attention to the shape, data type and output values range. If needed, modify them according to your implementation (read carefully the documentation of the layers that you use, e.g. [torch.nn.Conv2d](https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html)).
* Explore different optimizers, activation functions, network architectures. Analyze the effect of different regularization methods, such as dropout layers, random transformations (image rotation, scaling, add noise...) or L2 regularization (weight decay).
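
As a small illustration of the suggested loss, the sketch below computes the cross-entropy of a single sample by hand, as `-log(softmax(logits)[target])`. It is written in plain Python only to make the arithmetic explicit; the function name `cross_entropy` and the example logits are illustrative assumptions, not part of the homework code. `torch.nn.CrossEntropyLoss` expects raw logits and applies the log-softmax internally, which is why the network's output layer needs no softmax.

```python
import math

def cross_entropy(logits, target):
    """Cross-entropy for one sample: -log(softmax(logits)[target])."""
    # subtract the max logit for numerical stability (the result is unchanged)
    m = max(logits)
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_sum_exp - logits[target]

# ten equal logits correspond to a uniform prediction over the ten classes
uniform_loss = cross_entropy([0.0] * 10, target=3)

# a confident and correct prediction gives a loss close to zero
confident_loss = cross_entropy([10.0] + [0.0] * 9, target=0)
```

With a uniform prediction the loss equals `log(10)`, about 2.30, which is a useful reference value for an untrained ten-class classifier.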

# SOLUTION

## Outline of the notebook

1. **Dataset download and organization**: the FashionMNIST dataset is downloaded with the utility provided in the `torchvision` package. A custom datamodule class for the *PyTorch Lightning* framework is defined, together with some random transformations applied to data samples to improve the generalization of the trained model.
1. **Building candidate models**: a Convolutional Neural Network (CNN) model is created. Its hyper-parameters are configurable and are tuned in the following section.
1. **Selection of best model & hyper-parameters set**: architectures and hyper-parameter sets are coarsely sampled from a defined range of values and evaluated using the `optuna` package.
1. **Training the selected model**: the best model is trained again using the full dataset and the random transformations defined in section 1. A checkpointing callback is also used to store on disk the model with the best validation loss. 
1. **Evaluation of final performance**: the model is tested over the test dataset, computing loss, accuracy and confusion matrix.
1. **Analysis of the network**: the trained model is investigated by visualizing the learned filters in the convolutional layers and the feature maps produced in the forward pass of a test image. Also, some synthetic images are produced by maximizing the mean activation of a particular filter or output neuron.

## Dataset download and organization

Download the dataset:

In [None]:
# folder to contain dataset
DATA_DIR_NAME = 'classifier_data'

# check if download is needed
to_download = not os.path.isdir(DATA_DIR_NAME) 

train_dataset = torchvision.datasets.FashionMNIST(DATA_DIR_NAME, 
                                                  train=True , 
                                                  download=to_download,
                                                 )

label_names = ['T-shirt/top','Trouser','Pullover','Dress','Coat','Sandal','Shirt','Sneaker','Bag','Ankle boot']

In [None]:
len(train_dataset)

How to get an image and the corresponding label:

In [None]:
sample_index = 6
image = train_dataset[sample_index][0]
label = train_dataset[sample_index][1]

fig = plt.figure(figsize=(4,4))
plt.imshow(image, cmap='Greys')
print(f"SAMPLE AT INDEX {sample_index}")
print(f"LABEL: {label} ({label_names[label]})")

The output of the dataset is a PIL Image, a python object specifically developed to manage and process images. PyTorch supports this format, and there are useful transforms available natively in the framework: https://pytorch.org/docs/stable/torchvision/transforms.html

Since we will make use of `PyTorch-Lightning`, we define here the datamodule object used to create a separate validation set and the dataloaders. We also introduce the parameters:
* `Nsamples`, to optionally reduce the size of the dataset;
* `transform`, to optionally pass a customized transformation to use on training data;
* `valid_frac`, fraction of the train dataset to use for validation;
* `random_state`, seed number for the random split generator.

We define also the class `DatasetFromSubset` to create dataset objects from the subsets of data returned by the splitter function.
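
The split lengths passed to `random_split` must sum exactly to the dataset size. The sketch below shows one way to guarantee this: round only the validation part and take the remainder as the training part. The helper name `split_lengths` is a hypothetical one introduced for illustration.

```python
def split_lengths(n_samples, valid_frac):
    """Return (n_train, n_val) such that n_train + n_val == n_samples."""
    n_val = round(n_samples * valid_frac)
    n_train = n_samples - n_val  # the remainder, so the two always sum up
    return n_train, n_val

# full dataset with 1/6 held out for validation
full = split_lengths(60000, 1 / 6)

# reduced dataset used during the hyper-parameter search
reduced = split_lengths(8192, 1 / 8)

# rounding both parts independently can lose a sample:
# Python rounds 2.5 to 2 (round-half-to-even), so 2 + 2 != 5
odd = split_lengths(5, 0.5)
naive_total = round(5 * (1 - 0.5)) + round(5 * 0.5)
```

For the dataset sizes used in this notebook both approaches agree; the difference only shows up for sizes and fractions that do not divide evenly.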

In [None]:
class DatasetFromSubset(Dataset):
    def __init__(self, subset, transform=None):
        self.subset = subset
        self.transform = transform
        
    def __getitem__(self, index):
        x, y = self.subset[index]
        if self.transform is not None:
            x = self.transform(x)
        return x, y
        
    def __len__(self):
        return len(self.subset)


class FashionMNISTDataModule(pl.LightningDataModule):
    def __init__(self, data_dir, batch_size, Nsamples=None, valid_frac=None, 
                 random_state=42, transform=None ):
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.Nsamples = Nsamples # number of samples to load from the train dataset
        self.frac = valid_frac   # fraction of samples to use as validation set
        self.seed = random_state
        
        if transform is None:
            self.transform = transforms.ToTensor()
        else:
            self.transform = transform
        
    def prepare_data(self):
        ### train dataset
        self.full = torchvision.datasets.FashionMNIST(
            self.data_dir, train=True, download=False,
        )

    def setup(self, stage=None):
        # restricting to Nsamples (sampled without replacement to avoid duplicated samples)
        if self.Nsamples is not None:
            self.full = Subset(self.full, np.random.choice(len(self.full), size=self.Nsamples, replace=False))
            
        # split into train and validation (the two lengths always sum to the dataset size)
        split_val = round( len(self.full)*self.frac )
        split_tr  = len(self.full) - split_val
        
        train, val = random_split(
            self.full,
            [split_tr, split_val], 
            generator=torch.Generator().manual_seed(self.seed),
        )
        
        # add transforms
        self.mnist_train = DatasetFromSubset(subset=train, transform=self.transform)
        self.mnist_val   = DatasetFromSubset(subset=val  , transform=transforms.ToTensor())    

    def train_dataloader(self):
        return DataLoader(
            self.mnist_train, batch_size=self.batch_size, shuffle=True, pin_memory=True
        )

    def val_dataloader(self):
        batch_size = min(self.mnist_val.__len__(), 1024)
        return DataLoader(
            self.mnist_val, batch_size=batch_size, shuffle=False, pin_memory=True
        )


Here we create a set of custom random transformations to test as regularization:
* `AddGaussianNoise`: adds Gaussian noise with given mean and std with a certain probability;
* `AddOcclusion`: adds a rectangular occlusion to the image with a certain probability.

We will also use **random horizontal and vertical flipping** through the functions provided in the `torchvision` package.

Other transformations could also have been applied, such as blurring, random rotation or random cropping.
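
The noise transform perturbs the pixel values and then rescales them back into the range [0, 1] with a min-max normalization. The sketch below reproduces that arithmetic on a plain list of pixel values, without PyTorch. The function name `add_noise_and_rescale` and the guard against a constant image are assumptions added for illustration.

```python
import random

def add_noise_and_rescale(pixels, mean=0.0, std=0.1, rng=None):
    """Add Gaussian noise to each pixel, then min-max rescale to [0, 1]."""
    rng = rng or random.Random()
    noisy = [p + rng.gauss(mean, std) for p in pixels]

    # shift so that the minimum becomes 0
    lowest = min(noisy)
    shifted = [p - lowest for p in noisy]

    # divide by the maximum so that it becomes 1
    highest = max(shifted)
    if highest == 0:  # constant image: nothing to rescale (assumed guard)
        return shifted
    return [p / highest for p in shifted]

pixels = [0.0, 0.2, 0.5, 0.9, 1.0]
rescaled = add_noise_and_rescale(pixels, rng=random.Random(23))
```

Note that min-max rescaling also stretches the overall contrast of the image; clamping the values to [0, 1] would be an alternative design choice.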

In [None]:
# transform that adds Gaussian noise with given mean and std with a certain probability.
class AddGaussianNoise():
    def __init__(self, mean=0., std=1., prob=0.5):
        """
        mean : mean of the gaussian noise
        std  : std of the gaussian noise
        prob : occurring probability of the transformation
        """
        self.mean = mean
        self.std  = std
        self.prob = prob
        
    def __call__(self, tensor):
        if torch.rand(1) < self.prob:
            # generating and adding noise
            tensor += torch.randn(tensor.size())*self.std + self.mean
            
            # returning pixels values in range [0,1]
            min_val = torch.min(tensor)
            tensor -= min_val
            max_val = torch.max(tensor)
            tensor /= max_val
            return tensor
        else:
            return tensor
    
# transform that adds a rectangular occlusion to the image with a certain probability
class AddOcclusion():
    def __init__(self, max_area=0.5, prob=0.5):
        """
        max_area : maximum fraction of image area that is allowed to be covered by occlusion
        prob     : occurring probability of the transformation
        """
        self.max_area = max_area
        self.prob     = prob
        
    def __call__(self, tensor):
        if torch.rand(1) < self.prob:
            # taking random box vertices
            xs = np.rint( np.random.rand(2)*tensor.size()[1] )
            ys = np.rint( np.random.rand(2)*tensor.size()[2] ) 
            
            # ordering the vertices
            xs = np.sort(xs.astype(int))
            ys = np.sort(ys.astype(int))
            
            # checking if occluded area is greater than max_area
            max_area_pxs = tensor.size()[1]*tensor.size()[2] * self.max_area
            if (xs[1]-xs[0])*(ys[1]-ys[0]) > max_area_pxs:
                xs[1] = xs[1]//2
                ys[1] = ys[1]//2
            
            tensor[:,xs[0]:xs[1],ys[0]:ys[1]] = 0.
            return tensor
        else:
            return tensor


In the following cell we check that the transformations defined above work properly and plot an example.

In [None]:
to_tensor = transforms.ToTensor()
img = to_tensor(train_dataset[0][0])

# add gaussian noise
noiser = AddGaussianNoise(0., 0.1, prob=1.)
img_noisy = noiser(img.detach().clone())

# add occlusion
occluder = AddOcclusion(max_area=0.5, prob=1.)
img_occluded = occluder(img.detach().clone())

# add vertical flipping
flipper = transforms.RandomVerticalFlip(p=1.)
img_flipped = flipper(img.detach().clone())

print("Original: ", img.size(), ", ", img.dtype)
print("Noisy   : ", img_noisy.size(), ", ", img_noisy.dtype)
print("Occluded: ", img_occluded.size(), ", ", img_occluded.dtype)
print("Flipped : ", img_flipped.size(), ", ", img_flipped.dtype)

# plot 
fig, axs = plt.subplots(1,4,figsize=(16,4))
axs[0].imshow(img.squeeze(), cmap="Greys")
axs[0].set_title("Original")
axs[1].imshow(img_noisy.squeeze(), cmap="Greys")
axs[1].set_title("Noisy")
axs[2].imshow(img_occluded.squeeze(), cmap="Greys")
axs[2].set_title("Occluded")
axs[3].imshow(img_flipped.squeeze(), cmap="Greys")
axs[3].set_title("Flipped")

plt.show()

## Building Candidate Models

Here we create the model that will be tested on our classification task. It is called `ConsecutiveConv` and is composed of a variable number of consecutive convolutional layers. All convolutional parameters (feature maps, kernel sizes, strides, padding) are passed to the `__init__` function through the `params` dict. The activation function is fixed to `ReLU`.  <br>
The model is completed by a fully-connected part composed of 2 hidden layers and an output layer, whose only hyper-parameters are the number of neurons in each layer. <br>
Batch normalization can be enabled after each convolutional and hidden linear layer by means of the boolean flag `batch_norm` passed at model initialization. <br>

Hyper-parameters will be tuned by running a study with the `optuna` framework. <br>
The search for a good architecture is of course not exhaustive: other options (dropout or pooling layers, for example) could have been tested at the cost of more time. Still, the search space should be sufficient to solve the task.

In [None]:
def output_shape(dim, kernel_size=1, stride=1, pad=0, dilation=1):
    """
    Utility function that computes the output of a convolutional or pooling layer for a given input.
        Shape of the image and kernel are assumed to be square.
        Also, stride, pad and dilation are assumed to be symmetric.
    """
    out_dim = int( ((dim + (2*pad) - (dilation*(kernel_size-1)) -1 )/ stride) +1 )
    return out_dim
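
As a worked example of the formula above, the sketch below traces the spatial size of a 28x28 image through a stack of convolutional layers and derives the number of inputs of the first linear layer. The names `conv_output_size` and `trace_shapes`, as well as the layer settings (kernel sizes 5, 3, 3, stride 2, no padding, 8 feature maps), are illustrative assumptions.

```python
def conv_output_size(dim, kernel_size=1, stride=1, pad=0, dilation=1):
    """Output size of a square convolution along one spatial dimension."""
    return (dim + 2 * pad - dilation * (kernel_size - 1) - 1) // stride + 1

def trace_shapes(input_size, kernels, strides, paddings):
    """Return the spatial size after each layer, starting from the input."""
    sizes = [input_size]
    for k, s, p in zip(kernels, strides, paddings):
        sizes.append(conv_output_size(sizes[-1], k, s, p))
    return sizes

# three layers: kernel sizes 5, 3, 3, all with stride 2 and no padding
shapes = trace_shapes(28, [5, 3, 3], [2, 2, 2], [0, 0, 0])

# with 8 feature maps in the last layer, the flattened vector has
# 8 * side * side entries
flat_features = 8 * shapes[-1] ** 2
```

Here the image shrinks from 28 to 12, then 5, then 2 pixels per side, so the flatten layer produces 32 features.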
    

In [None]:
# consecutive convolutional layers without pooling
class ConsecutiveConv(nn.Module):
    
    def __init__(self, input_size, params=None):
        '''
        Convolutional hyper-parameters
            input_size    : int representing image size in pixels (assumed to be square)
            n_conv        : int representing number of convolutional layers
            conv_features : list containing number of feature maps in each conv. layer
            kernel_s      : kernel size of conv. layers (list)
            stride_s      : stride of conv. layers (list)
            padding_s     : paddings of conv. layers (list)
        
        Fully-connected hyper-parameters
            Nl1 & Nl2 : number of units in 1st & 2nd linear layers
            No        : number of output units

        batch_norm : boolean flag to activate batch normalization on both conv. and linear layers
        '''
        super().__init__()
        
        if params is not None:
            self.hp = params
        else:
            self.hp = {
                "n_conv"        : 3,
                "conv_features" : [8,8,8],
                "kernel_s"      : [5,3,3],
                "stride_s"      : [2,2,2],
                "padding_s"     : [0,0,0],
                "Nl1"           : 16,
                "Nl2"           : 16,
                "No"            : 10,
                "batch_norm"    : False,
            }
        
#       ### convolutional layers
        conv_list = []       
        channels = self.hp["conv_features"].copy()
        channels.append(1)  # to be used as number of input channels in the 1st layer
        for it in range(self.hp["n_conv"]):
            conv_list.append(nn.Conv2d(in_channels=channels[it-1], 
                                       out_channels=channels[it], 
                                       kernel_size=self.hp["kernel_s"][it], 
                                       stride=self.hp["stride_s"][it], 
                                       padding=self.hp["padding_s"][it],
                                      )
                            )      
            if self.hp["batch_norm"]:
                conv_list.append(nn.BatchNorm2d(self.hp["conv_features"][it]))
            conv_list.append(nn.ReLU(inplace=True))
         
        self.conv = nn.Sequential(*conv_list)
        
        # Ni is the number of units needed after the flatten layer
        out_dim = input_size
        for it in range(self.hp["n_conv"]):
            out_dim = output_shape(out_dim, self.hp["kernel_s"][it], 
                                   self.hp["stride_s"][it], self.hp["padding_s"][it]
                                  )
        Ni = self.hp["conv_features"][self.hp["n_conv"]-1]*out_dim*out_dim
        
        # flatten layer
        self.flatten = nn.Flatten(start_dim=1)
        
#       ### fully-connected layers
        fc_list = []
        Nunits = [Ni, self.hp["Nl1"], self.hp["Nl2"], self.hp["No"]]
        
        for it in range(len(Nunits)-1):            
            fc_list.append(nn.Linear(in_features=Nunits[it], out_features=Nunits[it+1]))
            if it != (len(Nunits)-2):
                if self.hp["batch_norm"]:
                    fc_list.append(nn.BatchNorm1d(Nunits[it+1]))
                fc_list.append(nn.ReLU(inplace=True))
                
        self.fc = nn.Sequential(*fc_list)
        
            
    def forward(self, x, additional_out=False):
        
        # convolutional part
        x = self.conv(x)
        
        x = self.flatten(x)
           
        # fully-connected part
        x = self.fc(x)
        
        return x

Below we create a `pytorch-lightning` module that wraps the model defined above so it can be trained inside this framework.

In [None]:
# pytorch-lightning module 
class Lit_CNN(pl.LightningModule): 
    def __init__(self, input_size, arch, params=None, optimizer=None, learning_rate=0.001, L2_penalty=0.):
        '''
        input_size    : size of the image in pixels (assumed square)
        arch          : model class (torch.nn.Module)
        params        : model hyper-parameters
        optimizer     : optimizer object (if None, optim.Adam is used)
        learning_rate : initial learning rate
        L2_penalty    : coefficient of L2 regularization
        '''    
        super().__init__()
        
        self.network = arch(input_size, params)
        
        if optimizer is not None:
            self.optim = optimizer
        else:
            self.optim = optim.Adam
            
        self.lr = learning_rate
        self.L2 = L2_penalty

        self.min_val_loss = 10000.0

    def forward(self, x, additional_out=False):
        return self.network(x)

    def training_step(self, batch, batch_idx=None):
        data, target = batch
        output = self(data)  
        
        train_loss = nn.functional.cross_entropy(output, target)
        self.log("train_loss", train_loss.item(), on_step=False, on_epoch=True)
        return train_loss

    def validation_step(self, batch, batch_idx=None):
        data, target = batch
        output = self(data)
        
        val_loss = nn.functional.cross_entropy(output, target)
        self.log("val_loss", val_loss.item(), on_step=False, on_epoch=True, prog_bar=True)

        # store minimum reached validation loss
        if self.min_val_loss > val_loss.item():
            self.min_val_loss = val_loss.item()
            self.log("min_val_loss", self.min_val_loss, on_step=False, on_epoch=True)
        return
    
    def configure_optimizers(self):
        return self.optim(self.network.parameters(), self.lr, weight_decay=self.L2)


## Best model selection

To select a proper model for this task we use the `optuna` package to run a search over the hyper-parameter space, which is defined in the cells below. <br>
The `objective` function takes care of sampling the hyper-parameters, then it defines the model, the trainer and the datamodule and finally it runs the training. This function will be called by the optuna `study` object. <br>
The hyper-parameters are sampled randomly only in the first few trials; after that, optuna uses by default a [TPESampler](https://optuna.readthedocs.io/en/stable/reference/generated/optuna.samplers.TPESampler.html), which exploits the information obtained from previous trials to choose the next hyper-parameter configuration with a technique called *Tree-structured Parzen Estimator* (TPE). <br>
Since optimizing all the hyper-parameters would have required too much time, we fixed the ones we expect to be less relevant to values we suppose to be good and optimized only the remaining ones. In the end we search over the architecture configurations defined below, the number of feature maps of each convolutional layer, the learning rate and the L2 penalty.
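
The learning rate and the L2 penalty span several orders of magnitude, so they are sampled on a logarithmic scale (`log=True` in `trial.suggest_float`). The sketch below illustrates what log-uniform sampling means using only the standard library. It is meant as an intuition for the random startup trials, not as a description of optuna's internals; the function name `sample_log_uniform` is an assumption.

```python
import math
import random

def sample_log_uniform(low, high, rng):
    """Draw a value whose logarithm is uniformly distributed in [log(low), log(high)]."""
    return math.exp(rng.uniform(math.log(low), math.log(high)))

rng = random.Random(23)
learning_rates = [sample_log_uniform(1e-4, 1e-2, rng) for _ in range(1000)]

# on a log scale 1e-3 is the midpoint of [1e-4, 1e-2], so roughly half
# of the samples should fall below it
below_1e3 = sum(lr < 1e-3 for lr in learning_rates) / len(learning_rates)
```

With plain uniform sampling on the same range, only about 9% of the values would fall below 1e-3, so the small learning rates would be explored much less.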

In [None]:
# defining list of possible values for hyper-parameters

n_feature_maps = [32,48,64,96,128]

optim_list = [optim.Adam, optim.Adadelta, optim.SGD]

batch_sizes = [256,512,1024]        

# architecture configs
configs = [{"n_conv"   : 2,    # shape through network: 28  -> 12  -> 8
            "kernel_s" : [5,5],
            "stride_s" : [2,1],
            "padding_s": [0,0],
           },
           {"n_conv"   : 2,    # shape through network: 28  -> 8  -> 8
            "kernel_s" : [7,3],
            "stride_s" : [3,1],
            "padding_s": [0,1],
           },
           {"n_conv"   : 2,    # shape through network: 28  -> 13  -> 6
            "kernel_s" : [3,3],
            "stride_s" : [2,2],
            "padding_s": [0,0],
           },
           {"n_conv"   : 3,      # shape through network: 28  -> 13  -> 7  -> 7
            "kernel_s" : [5,3,3],
            "stride_s" : [2,2,1],
            "padding_s": [1,1,1],
           },
           {"n_conv"   : 3,      # shape through network: 28  -> 13  -> 8  -> 8  
            "kernel_s" : [7,3,3],
            "stride_s" : [2,2,1],
            "padding_s": [2,2,1], 
           },
           {"n_conv"   : 3,      # shape through network: 28  -> 6  -> 6  -> 6 
            "kernel_s" : [9,5,3],
            "stride_s" : [4,1,1],
            "padding_s": [2,2,1], 
           },           
           {"n_conv"   : 3,      # shape through network: 28  -> 13  -> 7  -> 7  
            "kernel_s" : [5,5,3],
            "stride_s" : [2,2,1],
            "padding_s": [1,2,1], 
           },  
           {"n_conv"   : 3,      # shape through network: 28  -> 10  -> 10  -> 10  
            "kernel_s" : [3,3,3],
            "stride_s" : [3,1,1],
            "padding_s": [1,1,1], 
           },           
           {"n_conv"   : 4,        # shape through network: 28  -> 13  -> 7  -> 5  -> 5
            "kernel_s" : [7,5,5,3],
            "stride_s" : [2,2,1,1],
            "padding_s": [2,2,1,1], 
           },           
           {"n_conv"   : 4,        # shape through network: 28  -> 13  -> 7  -> 5  -> 5 
            "kernel_s" : [5,3,3,3],
            "stride_s" : [2,2,1,1],
            "padding_s": [1,1,0,1], 
           },
           {"n_conv"   : 4,        # shape through network: 28  -> 8  -> 8  -> 8  -> 8 
            "kernel_s" : [9,3,3,3],
            "stride_s" : [3,1,1,1],
            "padding_s": [2,1,1,1], 
           },
           {"n_conv"   : 4,        # shape through network: 28  -> 13  -> 8  -> 6  -> 6  
            "kernel_s" : [7,3,3,3],
            "stride_s" : [2,2,1,1],
            "padding_s": [2,2,0,1], 
           },
          ]

In [None]:
from optuna.integration import PyTorchLightningPruningCallback
from pytorch_lightning.callbacks import EarlyStopping

# optuna objective
def objective(trial):
    
    print(f"Trial [{trial.number}] started at:", datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
    
    ### define model architecture and hyper-parameters
    # select model architecture
    arch = ConsecutiveConv

    conf = configs[trial.suggest_int("config_id", 0, len(configs)-1)]
    params = {"n_conv"        : conf["n_conv"],
              "conv_features" : [trial.suggest_categorical(f"conv_features{i}", n_feature_maps) for i in range(conf["n_conv"])],
              "kernel_s"      : conf["kernel_s"],
              "stride_s"      : conf["stride_s"],
              "padding_s"     : conf["padding_s"],
              "Nl1"           : 1024, #trial.suggest_int("Nl1", 256, 1024, step=16),
              "Nl2"           : 320,  #trial.suggest_int("Nl2", 64 , 320 , step=8 ),
              "No"            : 10,
              "batch_norm"    : True, #fixed to True since in all experiments we saw that 
                                      # it always accelerates the convergence
             }
    
    # others
    optimizer     = optim_list[0]   #optim_list[trial.suggest_int("optim_id", 0, len(optim_list)-1)]
    learning_rate = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
    L2_penalty    = trial.suggest_float("L2_penalty", 1e-5, 1e-1, log=True)
    batch_size    = batch_sizes[0]  #trial.suggest_categorical("batch_size", batch_sizes)

    model = Lit_CNN(input_size = 28,
                    arch = arch,
                    params = params,
                    optimizer = optimizer,
                    learning_rate = learning_rate,
                    L2_penalty    = L2_penalty,
                   )

    ### define datamodule    
    Nsamples = 8192  # while optimizing hyper-parameters we reduce to a fraction of the dataset to save time
    datamodule = FashionMNISTDataModule(data_dir   = DATA_DIR_NAME, 
                                        batch_size = batch_size,
                                        Nsamples   = Nsamples,
                                        valid_frac = 1./8.,
                                        random_state = magic_num,
                                       )    
    ### define trainer
    early_stop = EarlyStopping(monitor="val_loss", 
                               min_delta = 0.001, 
                               patience  = 5,
                               verbose   = False, 
                               check_on_train_epoch_end=False, # check early_stop at end of validation
                              )
    pl_pruning = PyTorchLightningPruningCallback(trial, monitor="val_loss")

    epochs = 30
    trainer = pl.Trainer(logger     = False,
                         max_epochs = epochs,
                         gpus       = 1 if USE_GPU else None, #trainer will take care of moving model and datamodule to GPU
                         callbacks  = [pl_pruning, early_stop],
                         enable_checkpointing = False,
                         enable_model_summary = False,
                         deterministic = True,  #use deterministic algorithms to ensure reproducibility
                        )
    trainer.fit(model, datamodule=datamodule)
    
    # storing hyper-parameters as user attribute of trial object for convenience
    hypers = {"arch"         : arch,
              "params"       : params,
              "optimizer"    : optimizer,
              "learning_rate": learning_rate,
              "L2_penalty"   : L2_penalty,
              "batch_size"   : batch_size,
             }
    trial.set_user_attr("hypers", hypers)
    
    final_valid_loss = trainer.callback_metrics["val_loss"].item()
    
    # getting minimum reached validation loss
    try:
        min_valid_loss = trainer.callback_metrics["min_val_loss"].item()
    except KeyError:  # the metric was never logged
        print("INFO: No 'min_val_loss' value logged")
        min_valid_loss = final_valid_loss

    print(f"Trial [{trial.number}] ended at:", datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")) 
    print(f"    Valid. loss: {final_valid_loss}; min valid. loss: {min_valid_loss}\n")
    return min_valid_loss


Below we define and run the study. We also set up a **pruner**, to stop unpromising trials and save time, and an **early stopping** callback monitoring the validation loss. <br> The pruner is of type [`MedianPruner`](https://optuna.readthedocs.io/en/stable/reference/generated/optuna.pruners.MedianPruner.html#optuna.pruners.MedianPruner), which uses the *median stopping rule*: it prunes the current trial if the trial’s best intermediate result is worse than the median of the intermediate results of previous trials at the same step.
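
The median stopping rule can be sketched in a few lines of plain Python. This is a simplified illustration under stated assumptions and not optuna's implementation: it ignores the startup trials, the warm-up steps and the interval between checks. The function name `should_prune` and the loss values are made up for the example.

```python
from statistics import median

def should_prune(current_values, previous_trials, step):
    """Median stopping rule for a metric that is minimized.

    current_values  : intermediate losses of the running trial
    previous_trials : list of intermediate-loss lists of earlier trials
    step            : index of the step at which pruning is evaluated
    """
    # intermediate results of the earlier trials at the same step
    at_step = [t[step] for t in previous_trials if len(t) > step]
    if not at_step:
        return False  # nothing to compare with

    # best (lowest) value reached so far by the running trial
    best_so_far = min(current_values[: step + 1])
    return best_so_far > median(at_step)

previous = [[0.9, 0.6, 0.5], [1.0, 0.7, 0.4], [0.8, 0.65, 0.55]]

prune_bad = should_prune([1.2, 0.9, 0.8], previous, step=2)
prune_good = should_prune([0.7, 0.5, 0.45], previous, step=2)
```

In this example the median of the earlier trials at step 2 is 0.5, so the first trial (best value 0.8) is pruned and the second one (best value 0.45) continues.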

In [None]:
# set random state
seed_everything(seed=magic_num) 

### MedianPruner
pruner = optuna.pruners.MedianPruner(n_startup_trials=10, # trials to complete before starting to prune
                                     n_warmup_steps=20,   # steps to take before evaluating pruning
                                     interval_steps=10,   # steps between trial pruning checks
                                    )

# Make the default sampler behave in a deterministic way
sampler = optuna.samplers.TPESampler(seed=magic_num,
                                     n_startup_trials=10, # use random sampling at beginning
                                     )

### create study
study = optuna.create_study(study_name = "Lit_CNN_tuning", 
                            direction  = "minimize",
                            pruner     = pruner,
                            sampler    = sampler,
                           )

### run optimization
Ntrials = 30   # number of trials to run
MaxTime = None # no time limit
logging.captureWarnings(True)
study.optimize(objective, 
               n_trials       = Ntrials, 
               timeout        = MaxTime, # timeout in seconds
               gc_after_trial = True,    # run garbage collection 
              )
logging.captureWarnings(False)

In [None]:
print("Number of trials: {}".format(len(study.trials)))

trial = study.best_trial
print("Best trial: #{}".format(trial.number))
print("  Value: {}".format(trial.value))
print("  Optimized params: ")
for key, value in trial.params.items():
    print("     {}: {}".format(key, value))
    if key == "config_id":  # print conv2d configs if present
        for kk, vv in configs[value].items():
            print("         {}: {}".format(kk, vv))

In [None]:
import optuna.visualization as optunaplt

optunaplt.plot_optimization_history(study)

In [None]:
# print dataframe with study results
study_df = study.trials_dataframe()

study_df.drop(columns="user_attrs_hypers", inplace=True)

study_df = study_df.sort_values(by="value")

study_df.head(30)

## Training the model

Now we have a promising model that we will train on the whole train dataset. Below we report a summary of the model.

In [None]:
print("Summary of the best model resulting from the optuna search:")

hyper_pars = study.best_trial.user_attrs["hypers"]

model = Lit_CNN(input_size    = 28,
                arch          = hyper_pars["arch"],
                params        = hyper_pars["params"],
                optimizer     = hyper_pars["optimizer"],
                learning_rate = hyper_pars["learning_rate"],
                L2_penalty    = hyper_pars["L2_penalty"],
               )

dummy_batch_shape = [hyper_pars["batch_size"], 1, 28, 28]
summary(model, dummy_batch_shape, 
        col_width = 18, 
        col_names = ("input_size","output_size","num_params","mult_adds",),
        depth     = 3, 
        row_settings = ("depth","var_names",))

Then we define objects and functions we will use for training. <br>
We created a callback `LossesTracker` to store the losses (train and validation) in lists during training. The class also has a function to plot them once training has ended. <br>
We also created the function `RunTraining` that takes care of initializing everything needed for training: model, datamodule, trainer and callbacks (an early stopping callback and a checkpointing one).
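
The logic of an early stopping callback with `min_delta` and `patience` can be sketched as follows. This is a simplified stand-alone illustration and not the PyTorch Lightning implementation; the function name `early_stop_epoch` and the loss values are assumptions made for the example.

```python
def early_stop_epoch(val_losses, min_delta=0.0001, patience=6):
    """Return the epoch index at which training would stop, or None.

    A validation loss counts as an improvement only if it is lower than
    the best value seen so far by more than min_delta.
    """
    best = float("inf")
    wait = 0  # consecutive checks without improvement
    for epoch, loss in enumerate(val_losses):
        if loss < best - min_delta:
            best = loss
            wait = 0
        else:
            wait += 1
            if wait >= patience:
                return epoch
    return None

losses = [0.60, 0.45, 0.40, 0.41, 0.40, 0.42, 0.43, 0.41, 0.44, 0.39]

stopped_at = early_stop_epoch(losses, min_delta=0.0001, patience=6)
never_stopped = early_stop_epoch(losses, min_delta=0.0001, patience=20)
```

In this example the best loss (0.40) is reached at epoch 2 and the following six checks bring no improvement, so training stops at epoch 8 and never sees the better value at epoch 9. This is why the checkpoint with the best validation loss, rather than the last model, is the one to keep.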

In [None]:
from pytorch_lightning.callbacks import Callback

class LossesTracker(Callback):

    def __init__(self, ep_frac): 
        self.ep_frac = ep_frac
        self.train = []
        self.valid = []
        self.tr_epochs = 0
        self.val_epochs = 0
    
    def on_train_epoch_end(self, trainer, module):
        self.train.append(trainer.logged_metrics["train_loss"])
        self.tr_epochs += 1
        return
    
    def on_validation_epoch_end(self, trainer, module):       
        self.valid.append(trainer.logged_metrics["val_loss"])
        self.val_epochs += 1
        return
    
    def plot_train_history(self, figsize=(8,5), ylog=True, save=None):
        # plot of the losses
        fig = plt.figure(figsize=figsize)

        plt.title("Losses history")
        plt.xlabel("Epoch")
        plt.ylabel("Loss value")
        if ylog:
            plt.yscale("log")

        epochs = np.arange(self.tr_epochs)
        val_epochs = np.arange(self.val_epochs)*self.ep_frac
        plt.plot(epochs, self.train, "-o", label="train loss")
        plt.plot(val_epochs, self.valid, "-o", label="valid. loss")

        # plot horizontal line at minimum validation loss
        min_valid = min(self.valid)
        plt.plot([0, self.tr_epochs-1], [min_valid, min_valid], color="red", 
                 ls="--", label="Minimum valid. loss: %.3f "%min_valid,
                )

        plt.legend()
        plt.tight_layout()
        plt.grid()
        plt.show()
        
        if save is not None:
            fig.savefig("images/"+save+".pdf", bbox_inches='tight')
        return


In [None]:
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint

def RunTraining(hypers, chkpt_prefix, transform=None, epochs=100, Nsamples=None, valid_frac=1./6.):
    
    # check validation every ep_frac (fraction of epoch)
    ep_frac = 1.
    
    ### callbacks
    losses_tracker = LossesTracker(ep_frac)
    early_stop_callback = EarlyStopping(monitor="val_loss", 
                                        min_delta=0.0001, 
                                        patience=6*(1./ep_frac), 
                                        verbose=False, 
                                        check_on_train_epoch_end=False, # check early_stop at end of validation
                                       )
    checkpoint = ModelCheckpoint(dirpath  = "FashionMNIST_checkpoint", 
                                 filename = chkpt_prefix+"_{epoch}_{val_loss:.2f}", 
                                 monitor  = "val_loss",
                                )
    
    ### define model architecture and hyper-parameters
    model = Lit_CNN(input_size    = 28,
                    arch          = hypers["arch"],
                    params        = hypers["params"],
                    optimizer     = hypers["optimizer"],
                    learning_rate = hypers["learning_rate"],
                    L2_penalty    = hypers["L2_penalty"],
                   )
    
    ### define datamodule
    datamodule = FashionMNISTDataModule(data_dir   = DATA_DIR_NAME, 
                                        batch_size = hypers["batch_size"],
                                        Nsamples   = Nsamples,
                                        valid_frac = valid_frac,
                                        random_state = magic_num,
                                        transform  = transform,
                                       )
    ### define trainer
    trainer = pl.Trainer(logger     = False,
                         max_epochs = epochs,
                         gpus       = 1 if USE_GPU else None,
                         callbacks  = [early_stop_callback, losses_tracker, checkpoint],
                         val_check_interval   = ep_frac,
                         enable_model_summary = False,
                         num_sanity_val_steps = 0,     # disable validation sanity check before training
                        )
    
    # measure running time
    fit_begin = time.time() 
    
    trainer.fit(model, datamodule=datamodule) # run the training

    fit_time = time.time() - fit_begin
    print("Fit time:", str(datetime.timedelta(seconds=fit_time)))
    
    return model, trainer, losses_tracker, checkpoint   
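
The early-stopping rule configured above can be illustrated in plain Python. The sketch below is a simplified model of the behaviour, not PyTorch Lightning's implementation: the helper `early_stop_index` and the loss values are made up for illustration. Training stops once the validation loss has failed to improve on the best value by more than `min_delta` for `patience` consecutive checks.

```python
def early_stop_index(val_losses, min_delta=1e-4, patience=6):
    """Return the index of the check at which training would stop, or None.

    Hypothetical helper for illustration only (not a Lightning API).
    """
    best = float("inf")
    wait = 0
    for i, loss in enumerate(val_losses):
        if loss < best - min_delta:
            # improvement large enough: store new best and reset the counter
            best = loss
            wait = 0
        else:
            # no sufficient improvement at this check
            wait += 1
            if wait >= patience:
                return i
    return None


# made-up validation losses: improvement stalls after the third check
history = [0.50, 0.40, 0.35, 0.36, 0.35, 0.37, 0.36, 0.35, 0.36, 0.30]
stop_at = early_stop_index(history, min_delta=1e-4, patience=6)
print("training would stop at check:", stop_at)
```

In this toy history the late drop to `0.30` is never reached, which is the trade-off that `patience` controls: a larger value tolerates longer plateaus at the cost of more training time.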
    

Now we are ready to run the training after resetting the random state.

In [None]:
# set random state
seed_everything(seed=magic_num) 

model, trainer, losses, checkpoint = RunTraining(hyper_pars, 
                                                 chkpt_prefix="model", 
                                                 Nsamples = None, # use full dataset
                                                 epochs   = 100,
                                                 valid_frac=1./6.,
                                                )

In [None]:
losses.plot_train_history(ylog=False, save="losses")

## Performance evaluation

To evaluate performance on the test set we built two functions: `TestPerformance`, which runs the model in evaluation mode over the test dataset and computes the accuracy averaged over batches, and `PlotConfusionMatrix`, which plots the confusion matrix of the predictions.

In [None]:
import torchmetrics
import matplotlib

def TestPerformance(model, dataloader, device=torch.device("cpu")):
    accuracy  = torchmetrics.Accuracy()
    
    # test
    preds = []
    labels = []
    losses = []
    accuracies = []

    model.eval()
    with torch.no_grad():
        for batch, target in dataloader:  # iterate over the dataloader passed as argument
            batch = batch.to(device)
            output = model(batch)

            output = output.cpu()
            # test loss
            loss = nn.functional.cross_entropy(output, target)
            losses.append(loss.item())  # store a Python float, not a tensor

            # accuracy        
            pred = output.argmax(dim=1, keepdim=True)
            test_acc  = accuracy(pred.view_as(target), target)
            accuracies.append(test_acc.item())  # store a Python float, not a tensor

            # predictions and labels to return
            preds.append(pred)
            labels.append(target)
            
    final_test_loss = np.mean(losses)
    final_test_acc  = np.mean(accuracies)
    print("FINAL TEST LOSS VALUE: {}".format(final_test_loss   ))
    print("FINAL TEST ACCURACY  : {}".format(final_test_acc    ))
    
    return torch.cat(preds), torch.cat(labels), final_test_acc
    
    
def PlotConfusionMatrix(guesses, true_labels, num_classes=10, save=None):
    # building confusion matrix
    confusion_matrix = torchmetrics.ConfusionMatrix(num_classes)
    mat = confusion_matrix(guesses, true_labels).numpy()

    # plotting    
    fig, ax = plt.subplots(figsize=(10,8))
    im = ax.imshow(mat)

    # Show all ticks and label them with the respective names
    ax.set_xticks(np.arange(num_classes))
    ax.set_xticklabels(label_names)
    
    ax.set_yticks(np.arange(num_classes))
    ax.set_yticklabels(label_names)

    # Rotate the tick labels and set their alignment.
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
             rotation_mode="anchor")

    # Loop over data dimensions and create text annotations.
    formatter = matplotlib.ticker.StrMethodFormatter("{x:d}")
    
    for i in range(num_classes):
        for j in range(num_classes):
            datum = mat[i, j].astype(int)
            color = "white" if (datum < np.amax(mat)*0.5) else "black"
            text = ax.text(j, i, formatter(datum), fontsize=12,
                           ha="center", va="center", color=color)

    ax.set_title("Confusion Matrix", fontsize=14)
    ax.set_xlabel("Predicted labels")
    ax.set_ylabel("True labels")
    fig.tight_layout()
    plt.show()
            
    if save is not None:
        fig.savefig("images/"+save+".pdf", bbox_inches='tight')
    return
    

In the cells below we load the test dataset in memory, create the dataloader object, restore the model from the checkpoint with the lowest validation loss, and then use the functions defined above to test the model.

In [None]:
# loading test dataset and creating dataloader
test_dataset = torchvision.datasets.FashionMNIST(DATA_DIR_NAME, 
                                                 train=False, 
                                                 download=to_download, 
                                                 transform = transforms.ToTensor(),
                                                )
test_loader = DataLoader(test_dataset, batch_size=1024, shuffle=False, pin_memory=True)

In [None]:
# load best model from checkpoint
model = Lit_CNN.load_from_checkpoint(checkpoint.best_model_path, 
                                     input_size    = 28,
                                     arch          = hyper_pars["arch"],
                                     params        = hyper_pars["params"],
                                     optimizer     = hyper_pars["optimizer"],
                                     learning_rate = hyper_pars["learning_rate"],
                                     L2_penalty    = hyper_pars["L2_penalty"],
                                    )
model.to(device)
# evaluate performance
preds, labels, model_acc = TestPerformance(model, test_loader, device)
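
`TestPerformance` averages the per-batch accuracies. With 10,000 test images and a batch size of 1024 the last batch holds only 784 images, so this unweighted mean can differ slightly from the fraction of correctly classified samples. The sketch below illustrates the effect in plain Python; the per-batch accuracy figures are made up and `batch_sizes` is a hypothetical helper.

```python
def batch_sizes(n_samples, batch_size):
    """Sizes of the batches produced when the last, smaller batch is kept."""
    full, rest = divmod(n_samples, batch_size)
    return [batch_size] * full + ([rest] if rest else [])


sizes = batch_sizes(10_000, 1024)  # test-set size and batch size used above

# made-up numbers of correct predictions: the last batch is classified worse
correct = [round(0.92 * s) for s in sizes[:-1]] + [round(0.80 * sizes[-1])]

# unweighted mean of per-batch accuracies (what averaging over batches gives)
mean_of_batch_acc = sum(c / s for c, s in zip(correct, sizes)) / len(sizes)

# fraction of correctly classified samples over the whole set
overall_acc = sum(correct) / sum(sizes)

print("batches:", len(sizes), "last batch size:", sizes[-1])
print("mean of per-batch accuracies:", mean_of_batch_acc)
print("overall accuracy            :", overall_acc)
```

The two values coincide whenever all batches have the same size; otherwise the overall accuracy can be recovered by weighting each batch by its number of samples.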

In [None]:
# plot confusion matrix
PlotConfusionMatrix(preds, labels, save="confmatrix")
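
As a reminder of what the plotted matrix contains, here is a minimal pure-Python sketch of a confusion matrix. It assumes the convention used for the axis labels above (rows are true labels, columns are predicted labels); the function name and the toy labels are illustrative and not part of the notebook.

```python
def confusion_matrix(true_labels, predicted, num_classes):
    """Count how often each true class (row) is predicted as each class (column)."""
    mat = [[0] * num_classes for _ in range(num_classes)]
    for t, p in zip(true_labels, predicted):
        mat[t][p] += 1
    return mat


# toy example with 3 classes
true_labels = [0, 0, 1, 1, 2, 2, 2]
predicted   = [0, 1, 1, 1, 2, 0, 2]

mat = confusion_matrix(true_labels, predicted, num_classes=3)
for row in mat:
    print(row)

# the diagonal holds the correct predictions, so accuracy = trace / total
accuracy = sum(mat[k][k] for k in range(3)) / len(true_labels)
print("accuracy:", accuracy)
```

Off-diagonal entries show which pairs of classes are confused with each other, information that a single accuracy value hides.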

### Small change: retraining with added random transformations

Here we repeat the training above, adding some random transformations to the images during training. This should act as a regularizer and improve the model's generalization capabilities.

In [None]:
# define transformations composition to serve as regularization
## the probability that at least one transform is applied is about 0.185 (1 - 0.95**4)
TrainTransform = transforms.Compose([
    transforms.ToTensor(),
    AddGaussianNoise(mean=0., std=0.1, prob=0.05),
    AddOcclusion(max_area=0.4, prob=0.05),
    transforms.RandomVerticalFlip(p=0.05),
    transforms.RandomHorizontalFlip(p=0.05),
])

# set random state
seed_everything(seed=magic_num) 

model_tr, trainer_tr, losses_tr, checkpoint_tr = RunTraining(hyper_pars, 
                                                             chkpt_prefix="model_tr",
                                                             transform=TrainTransform,
                                                             Nsamples = None, # use full dataset
                                                             epochs   = 100,
                                                             valid_frac=1./6.,
                                                            )
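
The value of about 0.185 quoted in the comment above can be checked with a few lines of arithmetic, assuming that the four random transforms are applied independently of each other, each with probability 0.05.

```python
# probabilities of the four random transforms in TrainTransform
probs = [0.05, 0.05, 0.05, 0.05]

# probability that none of the transforms is applied (independence assumed)
p_none = 1.0
for p in probs:
    p_none *= (1.0 - p)

# probability that at least one transform is applied
p_at_least_one = 1.0 - p_none
print(f"P(at least one transform) = {p_at_least_one:.4f}")
```

Most images therefore reach the network unchanged, which keeps the augmentation mild.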

In [None]:
losses_tr.plot_train_history(save="losses-tr")

In [None]:
#load from checkpoint 
model_tr = Lit_CNN.load_from_checkpoint(checkpoint_tr.best_model_path,
                                        input_size    = 28,
                                        arch          = hyper_pars["arch"],
                                        params        = hyper_pars["params"],
                                        optimizer     = hyper_pars["optimizer"],
                                        learning_rate = hyper_pars["learning_rate"],
                                        L2_penalty    = hyper_pars["L2_penalty"],
                                       )                                       
model_tr.to(device)
# evaluate performance
preds_tr, labels_tr, model_tr_acc = TestPerformance(model_tr, test_loader, device)

In [None]:
# plot confusion matrix
PlotConfusionMatrix(preds_tr, labels_tr, save="confmatrix-tr")

## Analysis of the network

### Visualization of filter kernels

First we try to visualize the learned filters of the convolutional layers. We just need to retrieve the weights of convolutional layers and plot them. <br> 
For the first layer we have just one input channel, so we can plot them all; for subsequent layers we must select one input channel and plot only filters belonging to it.

In [None]:
model.network

In [None]:
from mpl_toolkits.axes_grid1 import ImageGrid

def ShowFiltersGrid(conv_layer, title="", figsize=(12,6), channel_id=0, save=None):
    
    # retrieve kernels from layer
    kernels = conv_layer.weight.detach().clone()

    if kernels.size(1) != 1:  # if there is more than 1 input channel, select channel_id
        kernels = kernels[:, channel_id].unsqueeze(dim=1)
        title = title + f" - channel #{channel_id}"
    else:
        channel_id = 0 # ignore channel_id
    
    print(title)
    print(f"  kernels shape: ({kernels.size(2)},{kernels.size(3)})" + 
          f" ; # of filters: {kernels.size(0)}" 
         )
    
    # normalize to range [0,1] for better visualization
    kmin = torch.min(kernels).item()
    kmax = torch.max(kernels).item()
    kernels = (kernels - kmin)/(kmax - kmin)
    
    # create grid of filters images
    cols = 16
    rows = kernels.size(0) // cols    
    
    # modify figsize height
    figsize = ( figsize[0], figsize[0]*rows/(cols-1) ) 
    
    fig = plt.figure(figsize=figsize)
    grid = ImageGrid(fig  = fig, 
                     rect = 111,  # as in subplot(111)
                     nrows_ncols = (rows, cols), 
                     axes_pad    = 0.05,  # pad between axes in inch.
                    )
    fig.suptitle(title, fontsize=14)
    
    for ax, im in zip(grid, kernels):       
        ax.imshow(im.squeeze().cpu(), cmap="gray")
        ax.axis("off")
    
    plt.show()
    
    if save is not None:
        fig.savefig("images/"+save+".pdf", bbox_inches='tight')
    return


In [None]:
# retrieve all convolutional layers inside network
ConvLayers = [module for module in model.network.modules() if isinstance(module, nn.Conv2d)]
print(f"Model has {len(ConvLayers)} convolutional layers")

In [None]:
# plot filters 
for it, layer in enumerate(ConvLayers):
    ShowFiltersGrid(conv_layer = layer,
                    title = f"Filters of Convolutional layer #{it+1}",
                    channel_id = np.random.randint(layer.in_channels),  # random channel id
                    save = f"kernels-layer{it+1}",
                   )

### Visualization of feature maps

Here instead we select a sample from the test dataset and pass it to the model. By means of a forward hook registered after each convolutional layer we can collect the feature maps that are forwarded through the network and plot them.

In [None]:
def ShowFeatureMapsGrid(maps, title="", figsize=(16,6), save=None):
    
    print(title) 
    maps = maps.squeeze(dim=0) # remove first dim: from [1,C,H,W] to [C,H,W]    
    print(f"  feature maps shape: ({maps.size(1)},{maps.size(2)})" + 
          f" ; # of feature maps: {maps.size(0)}" 
         )
    
    # normalize to range [0,1] for better visualization
    fmin = torch.min(maps).item()
    fmax = torch.max(maps).item()
    maps = (maps - fmin)/(fmax - fmin)
    
    # create grid of feature-map images
    cols = 16
    rows = maps.size(0) // cols    
    
    figsize = ( figsize[0], figsize[0]*rows/cols )  # modify figsize height  
    fig = plt.figure(figsize=figsize)
    grid = ImageGrid(fig  = fig, 
                     rect = 111,  # as in subplot(111)
                     nrows_ncols = (rows, cols), 
                     axes_pad    = 0.05,  # pad between axes in inch.
                    )    
    fig.suptitle(title, fontsize=14)
    
    for ax, im in zip(grid, maps):
        ax.imshow(im.cpu(), cmap="gray")
        ax.axis("off")
    
    plt.show()
    
    if save is not None:
        fig.savefig("images/"+save+".pdf", bbox_inches='tight')
    return


In [None]:
# select image from test dataset

img_id = 9

img, true_label = test_dataset[img_id][0], test_dataset[img_id][1]

fig = plt.figure(figsize=(4,4))
plt.imshow(img.squeeze(), cmap="Greys")
plt.show()

In [None]:
class OutputFeatures():
    def __init__(self, network):
        self.net = network
        
        self.outputs = []
        self.hook_handles = []
        
        # Register a hook on the module that follows each conv. layer (assumed to be its activation)
        layers = list(self.net.modules())
        for it in range(len(layers)):
            if isinstance(layers[it], nn.Conv2d):
                self.hook_handles.append(layers[it+1].register_forward_hook(self._get_output_features))
                
        print(f"Registered {len(self.hook_handles)} hooks on network.")
        
    def _get_output_features(self, layer, input, output):
        self.outputs.append(output)

    def close(self):
        for handle in self.hook_handles:
            handle.remove()
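
The hook mechanism used by `OutputFeatures` can be seen in isolation on a toy network. The sketch below is illustrative only: the two-layer `toy_net`, the function `save_output` and the random input are assumptions and not part of the notebook's model.

```python
import torch
import torch.nn as nn

# toy network: two conv layers, each followed by a ReLU activation
toy_net = nn.Sequential(
    nn.Conv2d(1, 4, kernel_size=3, padding=1),
    nn.ReLU(),
    nn.Conv2d(4, 8, kernel_size=3, padding=1),
    nn.ReLU(),
)

captured = []

def save_output(module, inputs, output):
    # called by PyTorch right after the forward pass of the hooked module
    captured.append(output.detach())

# register a forward hook on every activation and keep the handles
handles = [m.register_forward_hook(save_output)
           for m in toy_net if isinstance(m, nn.ReLU)]

toy_net.eval()
with torch.no_grad():
    toy_net(torch.rand(1, 1, 28, 28))  # one fake 28x28 grayscale image

# remove the hooks so that later forward passes are not recorded
for handle in handles:
    handle.remove()

shapes = [tuple(t.shape) for t in captured]
print(shapes)
```

Keeping the handles and removing them afterwards matters: a hook left in place would keep appending outputs on every later forward pass.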

In [None]:
# register hooks in convolutional layers
hooks = OutputFeatures(model.network)

model.eval()
with torch.no_grad():
    # pass an image to the network
    pred_label = model(img.unsqueeze(dim=0).to(device)).argmax(dim=1, keepdim=True).item()
    
# remove hooks
hooks.close()

feature_maps = hooks.outputs.copy()
    
print(f"True label     : {true_label} ({label_names[true_label]})")
print(f"Predicted label: {pred_label} ({label_names[pred_label]})")

In [None]:
# plot feature maps for each conv. layer
for it, fmaps in enumerate(feature_maps):   
    ShowFeatureMapsGrid(maps  = fmaps,
                        title = f"Feature maps of Convolutional layer #{it+1}",
                        save  = f"featuremaps-layer{it+1}",
                       )

### Optimize an image by maximizing activation

To further investigate what the network has learned we create the class `MaximizeActivation`. It takes the model as input and, through its method `optimize_sample`, generates a synthetic image optimized to maximally activate a particular convolutional filter (when `layer_id` is passed to the method) or a particular output neuron (when `class_id` is provided). <br>
To control the quality of the generated images some parameters should be adjusted, such as the learning rate and the L2 penalty, and some techniques can be applied during optimization, such as Gaussian blurring and clipping.

In [None]:
model.network

In [None]:
class MaximizeActivation():
    def __init__(self, model, img_size=[28, 28, 1], device=None, mean=[0.], std=[1.], 
                 verbose=False,
                ):
        
        self.model  = model
        self.shape  = img_size
        self.verbose= verbose

        if device is None:
            self.device = torch.device("cpu")
        else:
            self.device = device
        
        # store mean and std for normalization
        self.mean = mean
        self.std  = std
        
        # compute reverse mean and std for inverting normalization
        self.reverse_mean = list( -np.asarray(mean)/np.asarray(std) )
        self.reverse_std  = list( 1./np.asarray(std) )
        
        # hook handle
        self.hook_handle = []
        
### hooks management
    def register_conv_hook(self, layer_id):
        # register hook on conv layer
        if isinstance(self.model.conv[layer_id], nn.Conv2d):
            self.hook_handle.append( self.model.conv[layer_id].register_forward_hook(self._hook_f) )
        else:
            raise ValueError(f"Layer {layer_id} is NOT a 'Conv2d' instance.")
        if self.verbose:
            print(f"Registered hook on conv. layer [{layer_id}].")
        
    def register_fc_hook(self):
        # register hook on last linear layer
        self.hook_handle.append( self.model.fc[-1].register_forward_hook(self._hook_f) )
        if self.verbose:
            print(f"Registered hook on last linear layer output.")
        
    def _hook_f(self, layer, input, output):
        self.activ = output.clone()
        self.activ.requires_grad_(True)
        
    def remove_hooks(self):
        if len(self.hook_handle) > 0:
            [hh.remove() for hh in self.hook_handle]
            
        self.hook_handle = []
        
### loss function       
    def loss_f(self, class_id, layer_id, channel_id):
              
        if class_id is not None:
            loss = -1.* self.activ.squeeze()[class_id]
        else:
            loss = -1.* (self.activ[0, channel_id]).mean()
            
        return loss
        
### optimization 
    def optimize_sample(self, niter=100, input_img=None, lr=0.001, L2_c=1e-6,
                        class_id=None, layer_id=None, channel_id=None, 
                        blurrer=None, clipping=False, optimizer=torch.optim.Adam,
                       ): 
        if input_img is None:
            # generate noise image
            noise     = np.uint8( np.random.uniform(50, 80, (self.shape)) )
            # transform into torch float tensor (dividing by 255)
            input_img = transforms.functional.to_tensor(noise)
            
        if len(input_img.size()) < 4:
            input_img = input_img.unsqueeze(dim=0)
        
        # store initial image
        init_img = input_img.detach().clone()

        # normalize with mean and std provided to __init__
        input_img = transforms.functional.normalize(input_img, self.mean, self.std)
        
        # activate gradient tracking for image
        input_img = input_img.detach().clone().to(self.device).requires_grad_(True)
        
        # setup optimizer
        optim = optimizer([input_img], lr=lr, weight_decay=L2_c)
        
        # setup hooks
        if class_id is not None:
            self.register_fc_hook()
            print(f"Maximizing class {class_id} activation ...")
        elif layer_id is not None:
            self.register_conv_hook(layer_id)
            
            if channel_id is None:
                # sample a random channel index
                channel_id = np.random.randint(self.model.conv[layer_id].out_channels)
            print(f"Maximizing mean activation of conv. layer: {layer_id}, channel: {channel_id} ...")
        else:
            raise RuntimeError("Both 'class_id' and 'layer_id' are None. Provide one of them.")
        
        # run optimization
        self.model.eval()
        self.loss_history = []
        for it in range(niter):
            optim.zero_grad()
            
            if blurrer is not None:
                # blur (and clip) before passing img to model (should help to get a more regular sample)
                if clipping:
                    output = self.model(blurrer(input_img).clamp(0., 1.))
                else:
                    output = self.model(blurrer(input_img))
            else:
                output = self.model(input_img)
            
            # compute loss
            loss = self.loss_f(class_id, layer_id, channel_id)
            self.loss_history.append(loss.item())
            
            # optimize image
            loss.backward()
            optim.step()         
            
        # delete hooks
        self.remove_hooks()
        
        # invert normalization
        input_img = transforms.functional.normalize(input_img, self.reverse_mean, self.reverse_std)
        
        out_img = input_img.detach().clone()
        
        return out_img, init_img, [class_id, layer_id, channel_id]
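
The `reverse_mean` and `reverse_std` computed in `__init__` rely on a small identity: normalization maps `x` to `y = (x - mean)/std`, and applying the same operation to `y` with mean `-mean/std` and standard deviation `1/std` gives `(y + mean/std) * std = x`. The plain-Python check below uses the ImageNet statistics from the appendix; the helper `normalize` and the pixel values are illustrative.

```python
mean = [0.485, 0.456, 0.406]
std  = [0.229, 0.224, 0.225]

def normalize(values, mean, std):
    """Channel-wise (v - mean) / std, as done by the normalization transform."""
    return [(v - m) / s for v, m, s in zip(values, mean, std)]

# parameters that invert the normalization, as in MaximizeActivation.__init__
reverse_mean = [-m / s for m, s in zip(mean, std)]
reverse_std  = [1.0 / s for s in std]

pixel      = [0.2, 0.5, 0.9]                          # one made-up RGB pixel
normalized = normalize(pixel, mean, std)
restored   = normalize(normalized, reverse_mean, reverse_std)

print("normalized:", normalized)
print("restored  :", restored)
```

With the default `mean=[0.]` and `std=[1.]` used for FashionMNIST both steps reduce to the identity.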


In [None]:
# some utils functions
from PIL import Image

    
def plot_loss_history(losses):

    niter = len(losses)
    fig = plt.figure(figsize=(6,4))
    plt.title("Loss history")
    plt.xlabel("Iteration")
    plt.ylabel("Loss value")
    plt.plot(range(niter), losses)
    plt.grid()
    plt.show()
    return

def tensor_to_image(tensor, clip=False, brightness=0.):
    
    # brightness adjust
    out = tensor.add(brightness)        

    if clip: # clip to the [0,1] range, then rescale so that the maximum is 1
        out = out.clamp(min=0., max=1.)   
        out = out / torch.max(out)

    if len(out.size()) == 4:
        out = out.squeeze(dim=0)

    out = out.detach().numpy()        
    out = np.uint8( np.round(out * 255) )
    return out.transpose(1,2,0).squeeze()

def save_image_png(image, title="image.png"):
    
    pil_img = Image.fromarray(image)
    pil_img.save(title)
    return

In [None]:
inspector = MaximizeActivation(model.network, 
                               device = model.device,
                              )

### 1) maximize activation of some filters on convolutional layers

In [None]:
# get ids of conv layers
conv_layers_id = [it-1 for it, module in enumerate(model.network.conv.modules()) if isinstance(module, nn.Conv2d)]
print("Indices of conv. layers: ", conv_layers_id)

In [None]:
layers_dict = {}
Niters = [20,20,20,20]  #iterations for each layer (deeper layers require more iterations)

Nfilters = 8  # number of channel_id for each layer
for k, idx in enumerate(conv_layers_id):
    output = []
    settings = []
    for ff in range(Nfilters):
        generated, initial, info = inspector.optimize_sample(niter    = Niters[k],
                                                             layer_id = idx,
                                                             channel_id = None, # channel_id is randomly chosen
                                                             lr   = 0.2,
                                                             L2_c = 1e-6,
                                                             blurrer = None,
                                                             optimizer = torch.optim.Adam,
                                                            )
        output.append(generated)
        settings.append(info) 
    # store images and info in dict
    layers_dict.update({ f"{idx}" : {"out": output, "info": settings} })       
    

In [None]:
# plot generated images
tmp = 1
for key, layer in layers_dict.items():
    
    h_size = 16
    v_size = h_size/Nfilters +0.5
    fig, axs = plt.subplots(1, Nfilters, figsize=(h_size-2, v_size))
    fig.suptitle("Convolutional Layer id: "+str(tmp), fontsize=14)
    
    for jdx, out in enumerate(layer["out"]):
        img_to_plot = tensor_to_image(out.detach().clone().cpu(), clip=True, brightness=0.1)
        axs[jdx].imshow(img_to_plot, cmap="Greys")
        axs[jdx].set_title(f"channel: {layer['info'][jdx][2]}")
        axs[jdx].axis('off')
 
    #plt.tight_layout()
    plt.show()
    
    fig.savefig("images/patterns"+str(tmp)+".pdf", bbox_inches='tight')
    
    tmp += 1

As expected, maximizing the activation of convolutional filters generates patterns that are very simple for the first layer and gradually increase in complexity for deeper layers. This effect is more visible in the images reported in the appendix, obtained from a pre-trained AlexNet.

### 2) maximize activation of a neuron in the last linear layer

In [None]:
blurrer = transforms.GaussianBlur(kernel_size=(5,5), sigma=2.)

classes = [0,1,2,3,4,5,6,7,8,9]  # target class_id list

output = []
for k, idx in enumerate(classes):
    generated, initial, info = inspector.optimize_sample(niter    = 50,
                                                         class_id = idx,
                                                         input_img = None, # generate random noise image
                                                         lr   = 0.05,
                                                         L2_c = 0.99,
                                                         blurrer = blurrer,
                                                         clipping = True,
                                                         optimizer = torch.optim.Adam,
                                                        )
    output.append(generated)
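
To see why the L2 penalty constrains the generated image, consider a toy case that can be solved by hand. The sketch below makes several simplifying assumptions: a single linear unit with activation `w . x` stands in for the network, plain gradient descent is swapped in for Adam, and all numbers are made up. Without a penalty the activation could grow without bound; with the penalty `(lam/2)*||x||^2` the optimum sits at `x = w/lam`, so a larger coefficient keeps the pixel values smaller.

```python
w   = [1.0, -2.0, 0.5]   # weights of a toy linear unit, activation a(x) = w . x
lam = 0.5                # L2 coefficient (plays the role of L2_c)
lr  = 0.1                # learning rate
x   = [0.0, 0.0, 0.0]    # starting "image"

for _ in range(200):
    # gradient of the loss  -w.x + (lam/2)*||x||^2  with respect to x
    grad = [-wi + lam * xi for wi, xi in zip(w, x)]
    # plain gradient-descent step
    x = [xi - lr * gi for xi, gi in zip(x, grad)]

expected = [wi / lam for wi in w]  # analytical optimum of the penalized loss
print("optimized x :", x)
print("w / lam     :", expected)
```

For a real network the optimum has no closed form, but the same balance between activation and penalty applies.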

In [None]:
# plot generated images
h_size = 16
v_size = 4*h_size/len(classes)
fig, axs = plt.subplots(2, len(classes)//2, figsize=(h_size, v_size))
fig.suptitle("Optimized images", fontsize=14)

# applying some transform to generated images for better visualization
images_to_plot = [tensor_to_image(oo.detach().clone().cpu(), clip=True) for oo in output]
    
for k, idx in enumerate(classes):
    axs[k//5][k%5].imshow(images_to_plot[k], cmap="Greys")
    axs[k//5][k%5].set_title(f"class: {idx} ({label_names[idx]})")
    
plt.show()

fig.savefig("images/generated.pdf", bbox_inches='tight')

In [None]:
# retrieving predictions of network over generated images
for img in images_to_plot:
    
    to_tensor = transforms.ToTensor()
    img = to_tensor(img).unsqueeze(dim=0)
    
    optim_out = model.network( img.to(device) )
    optim_pred_id = torch.argmax(optim_out, dim=1).item()

    print(f"Optimized sample inferred class: {optim_pred_id} ({label_names[optim_pred_id]})")
    print("    with softmax activation:", torch.max(optim_out.softmax(dim=1)).item() )
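
The "softmax activation" printed above is the largest softmax probability, which is often read as the confidence of the network in its prediction. A minimal pure-Python sketch of the computation follows; the logits are made up for illustration.

```python
import math

def softmax(logits):
    """Numerically stable softmax: subtract the maximum before exponentiating."""
    m = max(logits)
    exps = [math.exp(v - m) for v in logits]
    total = sum(exps)
    return [e / total for e in exps]

logits = [2.0, 1.0, 0.1]   # made-up network outputs for 3 classes
probs  = softmax(logits)

# predicted class = index of the largest probability (same as argmax of logits)
pred = max(range(len(probs)), key=probs.__getitem__)

print("probabilities  :", probs)
print("predicted class:", pred, "with confidence", probs[pred])
```

Since softmax only compares logits with each other, a high value says that one class dominates the others, not that the input resembles a real sample of that class.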

The images generated and plotted above show some of the features of the class they belong to; however, they are far from being good-quality samples, even though the network is highly confident about which label to assign.

## Conclusions

In this work a Convolutional Neural Network (CNN) was implemented to classify images from the [FashionMNIST](https://github.com/zalandoresearch/fashion-mnist) dataset. The hyper-parameters were tuned using the `optuna` package. The search was not exhaustive: many other convolutional configurations could have been defined and many more hyper-parameter sets explored. However, the resulting model solves the task, reaching a final test accuracy of `0.906` without random transformations applied to the inputs. Adding random transformations as augmentation increased the accuracy to `0.911`. It would be interesting to see whether other transformations, such as random rotation or cropping, could increase the accuracy further. <br>

Regarding the learned convolutional filters, it is hard to assign them a clear meaning, and the feature maps plotted for an input sample also seem noisy, suggesting that the filters are not well trained. A possible reason is that the fully-connected layers have too much capacity and overfit the output feature maps too quickly. Reducing the number of neurons or removing batch normalization from the fully-connected classifier might improve the learned filters. <br>

Finally, the images generated by maximizing the activation of a given filter seem to reproduce some basic patterns that become more complex in deeper layers. However, some clipping and rescaling were applied to obtain the plotted visualizations. <br>
The images generated by maximizing the activation of an output neuron are not very similar to the dataset images of the same class. Moreover, strong constraints had to be applied to the optimization to obtain them (very high L2 penalty, Gaussian blurring, clipping). <br> 
Improving the quality of learned filters could improve the quality of generated images, although this is not a generative model.


# Appendix

## Test the maximization of filter activations on a pretrained AlexNet

In order to ensure that the class `MaximizeActivation` created above is working properly we test it on a pretrained network from the `torchvision` package. We select the simplest model (`AlexNet`) to minimize download and running times.

In [None]:
# set random state
seed_everything(seed=magic_num) 

# load pretrained alexnet (only its convolutional part is used below)
alexnet = torchvision.models.alexnet(pretrained=True)

# since class 'MaximizeActivation' expects a network with a convolutional submodule called 'conv',
## we adopted this workaround
alexnet.conv = alexnet.features

In [None]:
alexnet.conv

In [None]:
alexnet.to(device)
alexnet.eval()
inspector = MaximizeActivation(alexnet,
                               device = device, 
                               img_size = [224,224,3], # ImageNet size, mean and std
                               mean     = [0.485, 0.456, 0.406], 
                               std      = [0.229, 0.224, 0.225],
                              )

In [None]:
# get ids of conv layers
conv_layers_id = [it-1 for it, module in enumerate(alexnet.conv.modules()) if isinstance(module, nn.Conv2d)]
print("Indices of conv. layers: ", conv_layers_id)

In [None]:
layers_dict = {}
Niters = [5,8,16,20,30]  #iterations for each layer (deeper layers require more iterations)

blurrer = transforms.GaussianBlur(kernel_size=(3,3), sigma=0.8)

Nfilters = 5  # number of different channel_ids for each layer
for k, idx in enumerate(conv_layers_id):
    output = []
    settings = []
    for ff in range(Nfilters):
        generated, initial, info = inspector.optimize_sample(niter    = Niters[k],
                                                             layer_id = idx,
                                                             channel_id = None, # channel_id is randomly chosen
                                                             lr   = 0.1,
                                                             L2_c = 1e-6,
                                                             blurrer = blurrer,
                                                             optimizer = torch.optim.Adam,
                                                            )
        output.append(generated)
        settings.append(info) 
    # store images and info in dict
    layers_dict.update({ f"{idx}" : {"out": output, "info": settings} })        
    

In [None]:
# plot generated images
for key, layer in layers_dict.items():
    
    h_size = 16
    v_size = h_size/Nfilters
    fig, axs = plt.subplots(1, Nfilters, figsize=(h_size-2, v_size))
    fig.suptitle("Convolutional Layer id: "+key, fontsize=14)
    
    for jdx, out in enumerate(layer["out"]):
        img_to_plot = tensor_to_image(out.detach().clone().cpu(), clip=True, brightness=0.3)
        axs[jdx].imshow(img_to_plot)
        axs[jdx].set_title(f"channel: {layer['info'][jdx][2]}")
        axs[jdx].axis('off')
 
    #plt.tight_layout()
    plt.show()
    
    fig.savefig("images/alexnet"+str(key)+".pdf", bbox_inches='tight')