# FancyNet2
FancyNet2 applies the same convolutional network to each of the two input channels separately. Compared with FancyNet1, it has one additional convolutional layer. The feature vectors produced by the convolutional network are then used in two different ways:
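- Each feature vector is fed directly into the digit classifier, which classifies the digit of that channel.
- The two feature vectors are concatenated into a single feature vector of twice the size of the convolutional network's output, which is then fed into a second classifier for the final binary decision.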

The training is based on the weighted sum of three losses:
- Loss from digit classifier of channel 0
- Loss from digit classifier of channel 1
- Loss from final binary classifier

In [None]:
import sys
import matplotlib.pyplot as plt


import torch
from torch.autograd import Variable
from torch import nn
from torch import optim
from torch.nn import functional as F

import dlc_practical_prologue as prologue

In [None]:
def normalize_data(data):
    """Standardize ``data[0]`` and ``data[3]`` in place and return ``data``.

    The mean and standard deviation are computed once, from ``data[0]``
    only, and the same two statistics are applied to both tensors.
    (Presumably ``data[0]`` / ``data[3]`` are the train / test inputs
    returned by ``prologue.generate_pair_sets`` -- confirm against the
    prologue module.)

    Args:
        data: indexable collection whose entries 0 and 3 are tensors
            supporting the in-place ``sub_`` / ``div_`` operations.

    Returns:
        The same ``data`` object that was passed in.
    """
    reference = data[0]
    mean = reference.mean()
    spread = reference.std()

    # Both statistics are taken before any in-place update, so the second
    # tensor is scaled with exactly the same values as the first one.
    for index in (0, 3):
        data[index].sub_(mean).div_(spread)

    return data

# Build the data set with the course prologue helper (the argument 1000 is
# presumably the number of pairs -- confirm in dlc_practical_prologue).
data = prologue.generate_pair_sets(1000)
# Standardize entries 0 and 3 in place using the mean/std of entry 0.
data = normalize_data(data)

In [None]:
class FancyNet2(nn.Module):
    """Two-channel network with a shared feature extractor and three heads.

    Each of the two input channels goes through the same convolutional
    stack (``features``). The resulting 512-dimensional vectors are used
    twice: individually by ``classifierNumber`` (10 outputs per channel)
    and, concatenated, by ``classifierFinal`` (2 outputs).

    With 14x14 single-channel images the convolutional stack yields a
    512x1x1 map (14 -> 12 -> 6 -> 4 -> 2 -> 1), which is what the
    ``view(-1, 512)`` in ``forward`` relies on. (Assumes the inputs are
    14x14 -- TODO confirm against the data set.)
    """

    def __init__(self):
        super().__init__()

        # Shared convolutional feature extractor, applied to one channel
        # at a time.
        conv_stack = [
            nn.Conv2d(1, 32, kernel_size=3),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 64, kernel_size=3),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 512, kernel_size=2),
            nn.ReLU(inplace=True),
        ]
        self.features = nn.Sequential(*conv_stack)

        # Digit head: 512 features -> 10 scores, shared by both channels.
        digit_head = [
            nn.Dropout(),
            nn.Linear(512, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(512, 10),
        ]
        self.classifierNumber = nn.Sequential(*digit_head)

        # Decision head: concatenated features of both channels -> 2 scores.
        decision_head = [
            nn.Dropout(),
            nn.Linear(2 * 512, 1000),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(1000, 2),
        ]
        self.classifierFinal = nn.Sequential(*decision_head)

    def _embed(self, channel):
        """Run one single-channel batch through ``features`` and flatten it."""
        return self.features(channel).view(-1, 512)

    def forward(self, x):
        """Return ``(digit scores ch. 0, digit scores ch. 1, decision scores)``.

        Args:
            x: 4-D input batch; channel 0 and the remaining channel(s) along
                dimension 1 are processed separately.
        """
        # Per-channel feature vectors (channel 0 first, then channel 1).
        feat_first = self._embed(x[:, :1, :, :])
        feat_second = self._embed(x[:, 1:, :, :])

        # Joint representation for the final decision.
        feat_joint = torch.cat((feat_first, feat_second), 1)

        digits_first = self.classifierNumber(feat_first)
        digits_second = self.classifierNumber(feat_second)
        decision = self.classifierFinal(feat_joint)

        return digits_first, digits_second, decision

In [None]:
def train_model(model, train_data, nb_epochs, lr, batch_size=None, loss_weights=(1, 1, 1)):
    """Train ``model`` with SGD on a weighted sum of three cross-entropy losses.

    The model is expected to return three outputs per batch. The first two
    are scored against columns 0 and 1 of ``train_data[2]``, the third
    against ``train_data[1]``. After every epoch the epoch index and the
    sum of the per-batch total losses are printed.

    Args:
        model: module returning ``(out_1, out_2, out_f)`` for an input batch.
        train_data: indexable collection; entry 0 holds the inputs, entry 1
            the targets of the final decision and entry 2 a two-column
            tensor with the per-channel class labels.
        nb_epochs: number of passes over the training data.
        lr: learning rate of the SGD optimizer (momentum is fixed at 0.3).
        batch_size: mini-batch size. When ``None`` the notebook-level global
            ``mini_batch_size`` is used, as in the original code.
        loss_weights: weights ``(w_1, w_2, w_f)`` applied to the two digit
            losses and to the final-decision loss.

    Returns:
        None. The model parameters are updated in place and the model is
        left in training mode.
    """
    if batch_size is None:
        # Fall back to the global defined in the notebook's configuration cell.
        batch_size = mini_batch_size

    train_input, train_target, train_classes = train_data[0], train_data[1], train_data[2]
    nb_samples = train_input.size(0)
    w_1, w_2, w_f = loss_weights

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr, momentum=0.3)

    # Make sure stochastic layers such as Dropout are active, even if the
    # model was switched to evaluation mode before this call.
    model.train()

    for e in range(nb_epochs):
        sum_loss = 0
        for b in range(0, nb_samples, batch_size):
            # Slicing clamps at the end of the tensor, so a final partial
            # batch is handled instead of raising as ``narrow`` would.
            batch = slice(b, b + batch_size)
            out_1, out_2, out_f = model(train_input[batch])

            # Losses of the two digit classifications.
            loss_1 = criterion(out_1, train_classes[batch, 0])
            loss_2 = criterion(out_2, train_classes[batch, 1])

            # Loss of the final classification.
            loss_f = criterion(out_f, train_target[batch])

            # Weighted total loss driving the parameter update.
            loss = w_1 * loss_1 + w_2 * loss_2 + w_f * loss_f
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            sum_loss = sum_loss + loss.item()

        print(e, sum_loss)
        

In [None]:
def compute_nb_errors(model, data, batch_size=None):
    """Count misclassifications of the three model outputs on ``data``.

    The model is switched to evaluation mode (so Dropout is disabled) and
    run without gradient tracking; its previous training/evaluation mode is
    restored before returning.

    Args:
        model: module returning ``(out_1, out_2, out_f)`` for an input batch.
        data: indexable collection; entry 0 holds the inputs, entry 1 the
            targets of the final decision and entry 2 a two-column tensor
            with the per-channel class labels.
        batch_size: mini-batch size. When ``None`` the notebook-level global
            ``mini_batch_size`` is used, as in the original code.

    Returns:
        Tuple of three ints: errors of the first digit prediction, of the
        second digit prediction and of the final decision.
    """
    if batch_size is None:
        # Fall back to the global defined in the notebook's configuration cell.
        batch_size = mini_batch_size

    inputs, targets, classes = data[0], data[1], data[2]
    nb_samples = inputs.size(0)

    nb_num1_errors = 0
    nb_num2_errors = 0
    nb_final_errors = 0

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for b in range(0, nb_samples, batch_size):
                # Slicing clamps at the end, so a partial last batch is fine.
                batch = slice(b, b + batch_size)
                out_1, out_2, out_f = model(inputs[batch])

                pred_num_1 = torch.argmax(out_1, 1)
                pred_num_2 = torch.argmax(out_2, 1)
                predicted_decision = torch.argmax(out_f, 1)

                # Vectorised mismatch counts for the whole batch.
                nb_num1_errors += int((pred_num_1 != classes[batch, 0]).sum().item())
                nb_num2_errors += int((pred_num_2 != classes[batch, 1]).sum().item())
                nb_final_errors += int((predicted_decision != targets[batch]).sum().item())
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    return nb_num1_errors, nb_num2_errors, nb_final_errors

In [None]:
# Hyper-parameters. ``mini_batch_size`` is a module-level global that
# train_model and compute_nb_errors read for their batch size.
mini_batch_size = 100
nb_epochs = 50
lr = 1e-1

# Instantiate the network and train it on the data set built earlier.
fancyNet2 = FancyNet2()
train_model(fancyNet2, data, nb_epochs, lr)

In [None]:
# Error counts (digit 1, digit 2, final decision) on entries 0-2 and on
# entries 3-5 of ``data``.
nb_train_errors = compute_nb_errors(fancyNet2, data[:3])
nb_test_errors = compute_nb_errors(fancyNet2, data[3:6])

# Number of samples in each split, used to turn counts into percentages.
train_size = data[0].size(0)
test_size = data[3].size(0)

# For every error count the report shows: percentage, count, split size.
train_report_args = [
    value
    for count in nb_train_errors
    for value in ((100 * count) / train_size, count, train_size)
]
test_report_args = [
    value
    for count in nb_test_errors
    for value in ((100 * count) / test_size, count, test_size)
]

print('train errors: \n \t num 1 errors : {:0.2f}% {:g}/{:g} \n \
\t num 2 errors : {:0.2f}% {:g}/{:g} \n \
\t final errors : {:0.2f}% {:g}/{:g}'.format(*train_report_args))

print('test errors: \n \t num 1 errors : {:0.2f}% {:g}/{:g} \n \
\t num 2 errors : {:0.2f}% {:g}/{:g} \n \
\t final errors : {:0.2f}% {:g}/{:g}'.format(*test_report_args))

