# Project 1: Classification, weight sharing, auxiliary losses 


The objective of this project is to test different architectures to compare two digits visible in a
two-channel image. It aims at showing in particular the impact of weight sharing, and of the use of an
auxiliary loss to help the training of the main objective.
It should be implemented with PyTorch only code, in particular without using other external libraries
such as scikit-learn or numpy.

The goal of this project is to implement a deep network such that, given as input a series of 2 ×14×14
tensor, corresponding to pairs of 14 × 14 grayscale images, it predicts for each pair if the first digit is
lesser or equal to the second. The training and test set should be 1, 000 pairs each, and the size of the images allows to run experiments rapidly, even in the VM with a single core and no GPU.
You can generate the data sets to use with the function generate˙pair˙sets(N) defined in the file
dlc˙practical˙prologue.py. This function returns six tensors:

## Set-up: 

In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision.datasets import MNIST
import argparse
import os
import urllib
import torch.nn as nn

import matplotlib.pyplot as plt
import scikitplot as skplt # to generate nice plots
from sklearn.metrics import roc_auc_score # roc auc metric
import seaborn as sns # visualization library
import pandas as pd
from dlc_practical_prologue import *
%load_ext autoreload
%autoreload 2
%matplotlib inline

In [None]:
# Install a pip package in the current Jupyter kernel
import sys
# !{sys.executable} -m nbstripout --install --global
# !{sys.executable} -m pip install scikit-plot
# !{sys.executable} -m pip install seaborn

In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device

In [None]:
from six.moves import urllib
opener = urllib.request.build_opener()
opener.addheaders = [('User-agent', 'Mozilla/5.0')]
urllib.request.install_opener(opener)

In [None]:
if not os.path.exists('../data/'):
    os.makedirs('../data/')

In [None]:
# Run this once to download the MNIST data-set. 
# There is a problem with the server on which it's hosted so only way right now 
# to have it :( 

# !wget www.di.ens.fr/~lelarge/MNIST.tar.gz
# !tar -zxvf MNIST.tar.gz

## Data: 

In [None]:
train_input, train_target, train_classes, test_input, test_target, test_classes = generate_pair_sets(
    1000)

In [None]:
print(f'Training and test input size: {train_input.size(), test_input.size()}')
print(f'Training and test target size: {train_target.size(), test_target.size()}')
print(f'Training and test classes size: {train_classes.size(), test_classes.size()}')

Generate dataset needed for training. For this as we have a special data case we rewrite the `Dataset` class in order to use a `dataloader` later. Remember `target` is 1 if first number is smaller or equal than the second image.  

In [None]:
class Dataset(torch.utils.data.Dataset):
    'Characterizes a dataset for PyTorch'
    def __init__(self, pairs, target, classes):
        'Initialization'
        # target = (0,1)
        self.target = target
        # image pairs (2,14,14)
        self.pairs = pairs
        # cipher classes (2 in [0,9])
        self.classes = classes

    def __len__(self):
        'Denotes the total number of samples'
        return len(self.pairs)

    def __getitem__(self, index):
        'Generates one sample of data'
        # image pairs
        X = self.pairs[index]
        # target:
        y = self.target[index]
        # classes:
        Y = self.classes[index]
        return X, y, Y

Create datasets (training and validation):

In [None]:
training_set = Dataset(train_input, train_target, train_classes)
test_set = Dataset(test_input, test_target, test_classes)

Have a look:

In [None]:
fig, ax = plt.subplots(6, 2, figsize=(5, 18))
for j in range(6):
    im1 = training_set.__getitem__(j)[0][0, :, :]
    im2 = training_set.__getitem__(j)[0][1, :, :]
    target = training_set.__getitem__(j)[1]
    classes = training_set.__getitem__(j)[2]
    ax[j, 0].imshow(im1, cmap='gray')
    ax[j, 1].imshow(im2, cmap='gray')
    ax[j, 1].set_title(f'Cipher: {classes[1]}')
    ax[j, 0].set_title(f'Cipher: {classes[0]}, target: {target}')

## Functions:

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer, print_loss=True):
    size = len(dataloader.dataset)
    train_loss, correct = 0, 0
    aucs = []
    softmax = torch.nn.Softmax(dim=1)
    
    for batch, (X, y, Y) in enumerate(dataloader):
        # Compute prediction and loss:
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation:
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        
        if print_loss:
            if batch % 10 == 0:
                loss, current = loss.item(), batch * len(X)
                print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
            
        # Softmax to get probabilities:
        prob = softmax(pred)
        
        # compute accuracy and roc_auc:
        correct += (prob.argmax(1) == y).type(torch.float).sum().item()
        aucs.append(roc_auc_score(y.detach().numpy(), prob.detach().numpy()[:, 1]))
        
    # return average training loss:
    train_loss /= size
    correct *= 100 / size
    auc = sum(aucs)/len(aucs)
    
    return correct, train_loss, auc


def test_loop(dataloader, model, loss_fn, print_loss=True):
    size = len(dataloader.dataset)
    test_loss, correct = 0, 0
    aucs = []
    softmax = torch.nn.Softmax(dim=1)

    with torch.no_grad():
        for X, y, Y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()

            # Softmax to get probabilities:
            prob = softmax(pred)
            
            # compute accuracy and roc_auc:
            correct += (prob.argmax(1) == y).type(torch.float).sum().item()
            aucs.append(roc_auc_score(y.detach().numpy(), prob.detach().numpy()[:, 1]))
            
    # return average test loss and accuracy:
    test_loss /= size
    correct *= 100 / size
    auc = sum(aucs)/len(aucs)
    
    if print_loss:
        print(
            f"Validation Error: \n Accuracy: {(correct):>0.1f}%, Avg loss: {test_loss:>8f} \n"
        )
        
    return correct, test_loss, auc


def train_eval(model, optimizer, loss_fn, epochs=25, save=False, print_loss=True, print_epoch=True):
    # Data loader for model, change num_workers when on GPU:
    params = {'batch_size': 64, 'shuffle': True, 'num_workers': 0}
    training_generator = torch.utils.data.DataLoader(training_set, **params)
    test_generator = torch.utils.data.DataLoader(test_set, **params)
    train_perf, test_perf = [], []

    for t in range(epochs):
        if print_epoch:
            print(f"Epoch {t+1}\n-------------------------------")
        train_perf.append(train_loop(training_generator, model, loss_fn, optimizer, print_loss=print_loss))
        test_perf.append(test_loop(test_generator, model, loss_fn, print_loss=print_loss))
        
    print("Done!")
    
    if save:
        torch.save({'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'train_perf': train_perf,
                    'test_perf': test_perf
                    }, '../data/{}'.format(type(model).__name__))
        print("Saved!")
    
    return train_perf, test_perf


def plot_performance(train_perf, test_perf):
    def sub_plot(axs_id, train_data, test_data, train_label, test_label, x_label, title):
        axs[axs_id].plot(train_data, label=train_label)
        axs[axs_id].plot(test_data, label=test_label)
        axs[axs_id].set_xlabel(x_label)
        axs[axs_id].set_title(title)
        axs[axs_id].legend()
        
    fig, axs = plt.subplots(1, 3, figsize=(8, 4))
    
    train_accs = list(zip(*train_perf))[0]
    test_accs = list(zip(*test_perf))[0]
    sub_plot(0, train_accs, test_accs, 'train accuracy', 'test accuracy', 'Num epochs', 'Accuracy')
    
    train_losses = list(zip(*train_perf))[1]
    test_losses = list(zip(*test_perf))[1]
    sub_plot(1, train_losses, test_losses, 'train loss', 'test loss', 'Num epochs', 'Loss')
    
    train_aucs = list(zip(*train_perf))[2]
    test_aucs = list(zip(*test_perf))[2]
    sub_plot(2, train_aucs, test_aucs, 'train auc', 'test auc', 'Num epochs', 'ROC AUC')
    
    plt.tight_layout()
    plt.show()


    
def plot_performance_runs(performance, metrics=['Accuracy', 'Loss', 'ROC_AUC'], style=0):
    df = pd.DataFrame(columns=['Run', 'is_Test', 'epoch'] + metrics)
    for i in range(len(performance)):
        for j in range(2):
            df_temp = pd.DataFrame(performance[i][j], columns=['Accuracy', 'Loss', 'ROC_AUC']).reset_index().rename(columns={'index':'epoch'})
            df_temp.insert(0, "is_Test", j)
            df_temp.insert(0, "Run", i)
            df = df.append(df_temp)
            df = df.reset_index().drop(columns=['index'])
    
    sns.set_theme()
    for metric in metrics:
        if style == 0:
            g = sns.lineplot(data=df, x='epoch', y=metric, hue='is_Test')
            g.legend(['Train','Test'])
        if style == 1:
            g = sns.lineplot(data=df, x='epoch', y=metric, hue='is_Test', style='Run')
            g.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.title("{} vs {}".format(metric, "epoch"))
        plt.show()

## Models:

### Model architectures:

#### Siamese network (with weight sharing): 
- Type: Siamese
- Layers: Fully connected
- Shared weights: Yes
- Loss: CE (cross entropy) 
- Activation function: softmax

In [None]:
class SiameseNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.input_size = 1 * 14 * 14
        hidden_sizes = [50, 50]
        # we need an intermediate output, because we are using a siamese network
        intermediate_output_size = 2
        # two digit output, probability of being 1 or 0:
        output_size = 2
        # flatten images to 1D input:
        self.flatten = nn.Flatten()
        # then two hidden layers:
        self.fc_seq = nn.Sequential(nn.Linear(self.input_size, hidden_sizes[0], bias=False),
                                   nn.ReLU(),
                                   nn.Linear(hidden_sizes[0], hidden_sizes[1], bias=False),
                                   nn.ReLU(),
                                   nn.Linear(hidden_sizes[1], intermediate_output_size, bias=False))
        self.fcout = nn.Linear(2*intermediate_output_size, output_size)

        
        
    def forward(self, x):
        x1 = x[:, 0].view(-1, self.input_size)
        x2 = x[:, 1].view(-1, self.input_size)
        output1 = self.fc_seq(x1)
        output2 = self.fc_seq(x2)
        output = torch.cat((output1, output2), 1)
        
        # flatten 2D->1D
        output = self.flatten(output)
        
        # predict probabilities:
        logits = self.fcout(output)
        return logits

#### Siamese network (without weight sharing): 
- Type: Siamese
- Layers: Fully connected
- Shared weights: No
- Loss: CE (cross entropy) 
- Activation function: softmax

In [None]:
class SiameseNoSharingNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.input_size = 1 * 14 * 14
        hidden_sizes = [50, 50]
        # we need an intermediate output, because we are using a siamese network
        intermediate_output_size = 2
        # two digit output, probability of being 1 or 0:
        output_size = 2
        # flatten images to 1D input:
        self.flatten = nn.Flatten()
        # then two hidden layers:
        self.fc_seq2 = nn.Sequential(nn.Linear(self.input_size, hidden_sizes[0], bias=False),
                                   nn.ReLU(),
                                   nn.Linear(hidden_sizes[0], hidden_sizes[1], bias=False),
                                   nn.ReLU(),
                                   nn.Linear(hidden_sizes[1], intermediate_output_size, bias=False))
        self.fc_seq1 = nn.Sequential(nn.Linear(self.input_size, hidden_sizes[0], bias=False),
                                   nn.ReLU(),
                                   nn.Linear(hidden_sizes[0], hidden_sizes[1], bias=False),
                                   nn.ReLU(),
                                   nn.Linear(hidden_sizes[1], intermediate_output_size, bias=False))
        self.fcout = nn.Linear(2*intermediate_output_size, output_size)

        
        
    def forward(self, x):
        x1 = x[:, 0].view(-1, self.input_size)
        x2 = x[:, 1].view(-1, self.input_size)
        output1 = self.fc_seq1(x1)
        output2 = self.fc_seq2(x2)
        output = torch.cat((output1, output2), 1)
        
        # flatten 2D->1D
        output = self.flatten(output)
        
        # predict probabilities:
        logits = self.fcout(output)
        return logits

#### Siamese convolutional network (with weight sharing): 
- Type: Siamese
- Layers: Convolutional and Fully connected (at the end)
- Shared weights: Yes
- Loss: CE (cross entropy) 
- Activation function: softmax

In [None]:
# Basic model with two layers and a two digit output:
class SiameseConvNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.length = 14
        self.input_size = 1 * 14 * 14
        # we need an intermediate output, because we are using a siamese network
        intermediate_output_size = 2
        # two digit output, probability of being 1 or 0:
        output_size = 2
        # flatten images to 1D input:
        self.flatten = nn.Flatten()
        # convolutional layers
        kernel_size = 3
        n_channel = 5
        self.conv_layer = nn.Sequential(nn.Conv2d(1, n_channel, kernel_size),
                                        nn.ReLU(),
                                        nn.Conv2d(n_channel, 2 * n_channel, kernel_size),
                                        nn.ReLU(),
                                        nn.Conv2d(2 * n_channel, 4 * n_channel, kernel_size),
                                        nn.BatchNorm2d(4 * n_channel),
                                        nn.ReLU(),
                                        nn.Dropout2d(p=0.3),
                                        nn.Conv2d(4 * n_channel, 8 * n_channel, kernel_size),
                                        nn.ReLU(),
                                        nn.Dropout2d(p=0.4),
                                        nn.Conv2d(8 * n_channel, 16 * n_channel, kernel_size),
                                        nn.ReLU(),
                                        nn.Dropout2d(p=0.5),
                                        nn.Conv2d(16 * n_channel, 32 * n_channel, kernel_size),
                                        nn.ReLU(),
                                        nn.Dropout2d(p=0.6)
                                        )
    
        def compute_conv2d_size(length):
            return  length - (kernel_size - 1) - 1 + 1
        
        length_out = self.length
        depth = 6
        
        for i in range(depth):
            length_out = compute_conv2d_size(length_out)
        
        concat_size = length_out * length_out * 2**depth * n_channel
#         self.fc = nn.Linear(concat_size, 20)
        self.fcout = nn.Linear(concat_size, output_size)

    def forward(self, x):
        x1 = x[:, 0].view(-1, 1, self.length, self.length)
        x2 = x[:, 1].view(-1, 1, self.length, self.length)
        output1 = self.conv_layer(x1)
        output2 = self.conv_layer(x2)
        output = torch.cat((output1, output2), 1)
        # flatten 2D->1D
        output = self.flatten(output)
#         output = self.fc(output)
        # predict probabilities:
        logits = self.fcout(output)
        return logits

#### Siamese convolutional network (without weight sharing): 
- Type: Siamese
- Layers: Convolutional and Fully connected (at the end)
- Shared weights: No
- Loss: CE (cross entropy) 
- Activation function: softmax

In [None]:
# Basic model with two layers and a two digit output:
class SiameseConvNoSharingNet(nn.Module):
    def __init__(self):
        super().__init__()       
        self.length = 14
        self.input_size = 1 * 14 * 14
        # we need an intermediate output, because we are using a siamese network
        intermediate_output_size = 2
        # two digit output, probability of being 1 or 0:
        output_size = 2
        # flatten images to 1D input:
        self.flatten = nn.Flatten()
        # convolutional layers
        kernel_size = 3
        n_channel = 3
        self.conv_layer1 = nn.Sequential(nn.Conv2d(1, n_channel, kernel_size),
                                        nn.ReLU(),
                                        nn.Conv2d(n_channel, 2 * n_channel, kernel_size),
                                        nn.ReLU(),
                                        nn.Conv2d(2 * n_channel, 4 * n_channel, kernel_size),
                                        nn.ReLU(),
                                        nn.Conv2d(4 * n_channel, 8 * n_channel, kernel_size),
                                        nn.ReLU()
                                        )
        self.conv_layer2 = nn.Sequential(nn.Conv2d(1, n_channel, kernel_size),
                                        nn.ReLU(),
                                        nn.Conv2d(n_channel, 2 * n_channel, kernel_size),
                                        nn.ReLU(),
                                        nn.Conv2d(2 * n_channel, 4 * n_channel, kernel_size),
                                        nn.ReLU(),
                                        nn.Conv2d(4 * n_channel, 8 * n_channel, kernel_size),
                                        nn.ReLU()
                                        )
    
        def compute_conv2d_size(length):
            return  length - (kernel_size - 1) - 1 + 1
        
        length_out = self.length
        depth = 4
        
        for i in range(depth):
            length_out = compute_conv2d_size(length_out)
        
        concat_size = length_out * length_out * 2**depth * n_channel
        self.fcout = nn.Linear(concat_size, output_size)

    def forward(self, x):
        x1 = x[:, 0].view(-1, 1, self.length, self.length)
        x2 = x[:, 1].view(-1, 1, self.length, self.length)
        output1 = self.conv_layer1(x1)
        output2 = self.conv_layer2(x2)
        output = torch.cat((output1, output2), 1)
        # flatten 2D->1D
        output = self.flatten(output)
        # predict probabilities:
        logits = self.fcout(output)
        return logits

## Train and evaluate

In [None]:
# ------------------------------------------------------------------
# Control the randomness
# ------------------------------------------------------------------
torch.manual_seed(0)

# ------------------------------------------------------------------
# Perform n_runs runs
# ------------------------------------------------------------------
perf = []
n_runs = 10

for i in range(n_runs):
### ------------------------------------------------------------------
### Initialize/reset the model and optimizer (i.e. reset the weights)
### ------------------------------------------------------------------
    model = SiameseConvNet().to(device)
    learning_rate = 1e-3
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

### ------------------------------------------------------------------
### Train and evaluate
### ------------------------------------------------------------------
    loss_fn = nn.CrossEntropyLoss()
    perf += [train_eval(model, optimizer, loss_fn=loss_fn, print_loss=False, print_epoch=False)]

# ------------------------------------------------------------------
# Plot performance
# ------------------------------------------------------------------
plot_performance_runs(perf, ['Loss', 'Accuracy', 'ROC_AUC'])

# roc curves
y_probas = nn.functional.softmax(model(test_input), dim=1).detach().numpy()
y_test = test_target
skplt.metrics.plot_roc(y_test, y_probas)
plt.show()

### Predictions on test set:

In [None]:
# Make a few predictions:
# size = len(test_generator.dataset)
# softmax = torch.nn.Softmax(dim=1)
# fig, ax = plt.subplots(6, 2, figsize=(5, 18))

# with torch.no_grad():
#     for batch, (X, y, Y) in enumerate(test_generator):
#         if batch == 0:
#             pred = model(X)
#             prob = softmax(pred)
#             prediction = prob.argmax(1).type(torch.float)
#             for j in range(6):
#                 im1 = X[j][0, :, :]
#                 im2 = X[j][1, :, :]
#                 target = y[j]
#                 classes = Y[j]
#                 pred = prediction[j]
#                 ax[j, 0].imshow(im1, cmap='gray')
#                 ax[j, 1].imshow(im2, cmap='gray')
#                 ax[j, 0].set_title(
#                     f'Cipher: {classes[0]}, target: {target}, pred: {pred}')
#                 ax[j, 1].set_title(f'Cipher: {classes[1]}')