In [None]:
import torch

from torch import nn
from torch.nn import functional as F
from torch import optim
from torchvision import datasets
from statistics import mean
from statistics import stdev

import dlc_practical_prologue as prologue
import matplotlib.pyplot as plt
import time

import models

%load_ext autoreload
%autoreload 2

# Get the dataset

In [None]:
# Fetch the train/test tensors from the course helper
# (presumably 1000 pairs per split -- confirm against dlc_practical_prologue).
(train_input, train_target, train_classes,
 test_input, test_target, test_classes) = prologue.generate_pair_sets(1000)

# Insert a dimension at index 1 of both target tensors, in place.
train_target.unsqueeze_(1)
test_target.unsqueeze_(1)

# Show the size of every tensor as a quick sanity check.
print('train_input', train_input.size(),
      'train_target', train_target.size(),
      'train_classes', train_classes.size())
print('test_input', test_input.size(),
      'test_target', test_target.size(),
      'test_classes', test_classes.size())

# Models

In [None]:
# Build the first three architectures (constructed left to right):
#   net1 - first model, the simplest one
#   net2 - introduces weight sharing for the convolutional layer
#   net3 - uses the label of the digits as an auxiliary loss
net1, net2, net3 = models.Net1(), models.Net2(), models.Net3()

# Training functions

In [None]:
def compute_nb_errors(model, data_input, data_target,batch_size=50):
    """Count the samples that `model` classifies incorrectly.

    Column 0 of the model output is read as the score of the binary target:
    a score >= 0.5 is predicted as 1, anything below as 0. Any further output
    columns are ignored.

    Args:
        model: callable mapping a batch of inputs to a 2-D tensor whose
            column 0 holds the binary score.
        data_input: input tensor, split along dim 0 into mini-batches.
        data_target: 0/1 targets aligned with `data_input` along dim 0; either
            shape (N, 1) or (N,).
        batch_size: number of samples per forward pass.

    Returns:
        int: number of samples whose thresholded prediction differs from the
        target.
    """
    nb_data_errors = 0

    # Pure evaluation: no autograd graph is needed for the forward passes.
    with torch.no_grad():
        for inputs, targets in zip(data_input.split(batch_size), data_target.split(batch_size)):
            scores = model(inputs).narrow(dim=1, start=0, length=1)
            predictions = torch.ge(scores, 0.5).float()
            # Align shape and dtype so the comparison is strictly element-wise
            # (a (N,) target would otherwise broadcast against (N, 1)).
            expected = targets.reshape(predictions.shape).to(predictions.dtype)
            nb_data_errors += int((predictions != expected).sum().item())

    return nb_data_errors

In [None]:
def train_model(model, train_input, train_target, test_input, test_target,
                train_classes=None, use_auxiliary_losses=False,
                round=0, epochs=25,eta=0.4,batch_size=100):
    """Train `model` with plain SGD on a binary cross-entropy loss.

    Both input sets are standardised with the mean and standard deviation of
    `train_input`. The standardised tensors are local copies, so the caller's
    tensors are left unmodified and repeated calls always start from the same
    data.

    Args:
        model: network called as `model(inputs)`; its output is fed directly
            to `nn.BCELoss` against `targets.float()`, so it is assumed to be
            a probability with the same shape as the targets.
        train_input, train_target: training inputs and 0/1 targets.
        test_input, test_target: held-out inputs and 0/1 targets.
        train_classes: unused here; kept for call compatibility.
        use_auxiliary_losses: unused here; kept for call compatibility.
        round: 0 prints the accuracies after every epoch; a value > 0 prints a
            single summary line tagged with that round number after training.
        epochs: number of passes over the training set.
        eta: SGD learning rate.
        batch_size: mini-batch size used for training.

    Returns:
        float: test accuracy in percent after the last epoch (accuracy of the
        untrained model when `epochs` is 0).
    """
    criterion = nn.BCELoss(reduction='mean')
    optimizer = optim.SGD(model.parameters(), lr=eta)

    # Out-of-place standardisation: never rewrite the caller's tensors.
    mu, std = train_input.mean(), train_input.std()
    train_input = (train_input - mu) / std
    test_input = (test_input - mu) / std

    def evaluate():
        """Return (train accuracy, test accuracy) in percent."""
        test_errors = compute_nb_errors(model, test_input, test_target)
        train_errors = compute_nb_errors(model, train_input, train_target)
        return (100 * (1 - train_errors / train_input.size(0)),
                100 * (1 - test_errors / test_input.size(0)))

    train_accuracy = test_accuracy = None
    for i in range(epochs):
        for inputs, targets in zip(train_input.split(batch_size), train_target.split(batch_size)):
            output = model(inputs)
            loss = criterion(output, targets.float())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        train_accuracy, test_accuracy = evaluate()
        if round == 0:
            print(f"Epoch # {i+1} / train accuracy: {train_accuracy:.2f} / test accuracy: {test_accuracy:.2f}")

    if test_accuracy is None:
        # No epoch ran (epochs <= 0): still return a defined accuracy.
        train_accuracy, test_accuracy = evaluate()

    if round > 0:
        print(f"Round # {round} / train accuracy: {train_accuracy:.2f} / test accuracy: {test_accuracy:.2f}")

    return test_accuracy

In [None]:
def train_model_auxiliary_loss(model, train_input, train_target, test_input, test_target, train_classes,
                round=0, epochs=25,eta=0.4,batch_size=100):
    """Train `model` with SGD on the comparison loss plus two digit-class losses.

    The model output is read column-wise:
      * column 0       -> binary score, trained with `nn.BCELoss`;
      * columns 1..10  -> class scores of the first digit, trained with `nn.NLLLoss`;
      * columns 11..20 -> class scores of the second digit, trained with `nn.NLLLoss`.
    The three losses are summed with equal weight. `nn.NLLLoss` expects
    log-probabilities, so the model is assumed to emit them in columns 1..20
    (confirm in `models`).

    Both input sets are standardised with the mean and standard deviation of
    `train_input`. The standardised tensors are local copies, so the caller's
    tensors are left unmodified.

    Args:
        model: network called as `model(inputs)`, returning at least 21 columns.
        train_input, train_target: training inputs and 0/1 targets.
        test_input, test_target: held-out inputs and 0/1 targets.
        train_classes: per-sample digit labels; column 0 is used for the first
            digit and column 1 for the second.
        round: 0 prints the accuracies after every epoch; a value > 0 prints a
            single summary line tagged with that round number after training.
        epochs: number of passes over the training set.
        eta: SGD learning rate.
        batch_size: mini-batch size used for training.

    Returns:
        float: test accuracy in percent after the last epoch (accuracy of the
        untrained model when `epochs` is 0).
    """
    criterion = nn.BCELoss(reduction='mean')
    auxiliary_criterion = nn.NLLLoss()
    optimizer = optim.SGD(model.parameters(), lr=eta)

    # Out-of-place standardisation: never rewrite the caller's tensors.
    mu, std = train_input.mean(), train_input.std()
    train_input = (train_input - mu) / std
    test_input = (test_input - mu) / std

    def evaluate():
        """Return (train accuracy, test accuracy) in percent."""
        test_errors = compute_nb_errors(model, test_input, test_target)
        train_errors = compute_nb_errors(model, train_input, train_target)
        return (100 * (1 - train_errors / train_input.size(0)),
                100 * (1 - test_errors / test_input.size(0)))

    train_accuracy = test_accuracy = None
    for i in range(epochs):
        for inputs, targets, class_targets in zip(train_input.split(batch_size),
                                                  train_target.split(batch_size),
                                                  train_classes.split(batch_size)):
            output = model(inputs)
            # Prediction of which digit is larger
            l_1 = criterion(output.narrow(dim=1,start=0,length=1), targets.float())
            # Auxiliary losses for predicting the actual digits
            l_2 = auxiliary_criterion(output.narrow(dim=1,start=1,length=10),
                                      class_targets[:,0])
            l_3 = auxiliary_criterion(output.narrow(dim=1,start=11,length=10),
                                      class_targets[:,1])
            loss = l_1 + l_2 + l_3
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        train_accuracy, test_accuracy = evaluate()
        if round == 0:
            print(f"Epoch # {i+1} / train accuracy: {train_accuracy:.2f} / test accuracy: {test_accuracy:.2f}")

    if test_accuracy is None:
        # No epoch ran (epochs <= 0): still return a defined accuracy.
        train_accuracy, test_accuracy = evaluate()

    if round > 0:
        print(f"Round # {round} / train accuracy: {train_accuracy:.2f} / test accuracy: {test_accuracy:.2f}")

    return test_accuracy

# One Round Training

In [None]:
# Single training run of a fresh Net1; `round` keeps its default of 0, so the
# accuracies are printed after every epoch.
net1 = models.Net1()
_ = train_model(
    model=net1,
    train_input=train_input,
    train_target=train_target,
    test_input=test_input,
    test_target=test_target,
    epochs=10,
)

In [None]:
# Single training run of a fresh Net2; `round` keeps its default of 0, so the
# accuracies are printed after every epoch.
net2 = models.Net2()
_ = train_model(
    model=net2,
    train_input=train_input,
    train_target=train_target,
    test_input=test_input,
    test_target=test_target,
    epochs=10,
)

In [None]:
# Single training run of a fresh Net3 with the auxiliary digit-class losses;
# `round` keeps its default of 0, so the accuracies are printed after every epoch.
net3 = models.Net3()
_ = train_model_auxiliary_loss(
    model=net3,
    train_input=train_input,
    train_target=train_target,
    test_input=test_input,
    test_target=test_target,
    train_classes=train_classes,
    epochs=10,
)

In [None]:
# Single training run of a fresh Net4 with the auxiliary digit-class losses.
# The model handed to the trainer must be net4 itself, not net3.
net4 = models.Net4()
_ = train_model_auxiliary_loss(net4, train_input, train_target, test_input, test_target, train_classes,epochs=10)

# Multiple Round Results

In [None]:
nb_round = 100
epochs_per_round = 60


def _run_rounds(label, build_model, train_fn, *extra_args):
    """Train a fresh model `nb_round` times and print summary statistics.

    Args:
        label: identifier printed in the banner line.
        build_model: zero-argument callable returning a new, untrained model.
        train_fn: training function called as
            `train_fn(model, train_input, train_target, test_input,
            test_target, *extra_args, round=..., epochs=...)`; it must return
            the test accuracy of the run.
        *extra_args: extra positional arguments forwarded to `train_fn`
            (e.g. `train_classes` for the auxiliary-loss trainer).

    Returns:
        tuple: (list of per-round test accuracies, total wall-clock seconds
        over all rounds, the model trained in the last round).
    """
    print(f"*** Testing Model {label} ***")
    accuracies = [0] * nb_round
    model = None
    # Start the clock once, before the first round, so the total covers all
    # rounds rather than only the last one.
    t_start = time.perf_counter()
    for i in range(0, nb_round):
        model = build_model()
        accuracies[i] = train_fn(model, train_input, train_target, test_input,
                                 test_target, *extra_args,
                                 round=i+1, epochs=epochs_per_round)
    elapsed = time.perf_counter() - t_start
    print(f"Mean : {mean(accuracies):.2f} / STD : {stdev(accuracies):.2f} / Total Time : {elapsed:.2f} / Mean Time : {elapsed/nb_round:.2f}")
    return accuracies, elapsed, model


# The same notebook-level names as before are (re)bound: test_accuracy_N, t_tot
# (timing of the most recent model) and netN (model of the last round).
test_accuracy_1, t_tot, net1 = _run_rounds(1, models.Net1, train_model)
test_accuracy_2, t_tot, net2 = _run_rounds(2, models.Net2, train_model)
test_accuracy_3, t_tot, net3 = _run_rounds(3, models.Net3, train_model_auxiliary_loss, train_classes)
test_accuracy_4, t_tot, net4 = _run_rounds(4, models.Net4, train_model_auxiliary_loss, train_classes)



In [None]:
# Summary statistics of the Model 4 rounds; `t_tot` is whatever the most
# recent timing assignment left in it.
print(
    f"Mean : {mean(test_accuracy_4):.2f} / STD : {stdev(test_accuracy_4):.2f} / Total Time : {t_tot:.2f} / Mean Time : {t_tot/nb_round:.2f}"
)


# Miscellaneous

In [None]:
# Visual sanity check: show the first 20 test pairs next to the model's
# thresholded prediction.
# No model named `net` exists in this notebook, so net4 (the model trained
# last) is used, and it is called the way the training code calls its models:
# with one tensor holding both images of each pair.
# Standardise with the training statistics. If the inputs were already
# standardised in place by a training call, these statistics are close to
# (0, 1) and this is effectively a no-op.
mu, std = train_input.mean(), train_input.std()
with torch.no_grad():
    res = net4((test_input[0:20] - mu) / std).narrow(dim=1, start=0, length=1)
# Scores above 0.5 become 1, everything else 0.
res = (res > 0.5).float()
for i in range(0, 20):
    fig, (ax_first, ax_second) = plt.subplots(1, 2)
    ax_first.imshow(test_input[i, 0].view(14,14))
    ax_first.set_title('First digit')
    ax_first.set_xticks([])
    ax_first.set_yticks([])
    ax_second.imshow(test_input[i, 1].view(14,14))
    ax_second.set_title('Second digit')
    ax_second.set_xticks([])
    ax_second.set_yticks([])
    plt.show()
    print(res[i])

In [None]:
# Display the `res` tensor left by the previous cell.
res

In [None]:
# Number of trainable parameters per model. No model named `net` exists in
# this notebook, so the four models that are defined are reported instead.
for model_name, candidate in (("net1", net1), ("net2", net2), ("net3", net3), ("net4", net4)):
    print(model_name, sum(p.numel() for p in candidate.parameters() if p.requires_grad))


In [None]:
# Overleaf link (presumably the project report): https://www.overleaf.com/6688321767qwjpzsgnrdqb