In [1]:
# PyTorch imports
import torch
from torch import nn
from torch import optim
from torch.nn import functional as F
import numpy as np
# DLC Practical prologue
import dlc_practical_prologue as prologue

In [2]:
# Load data
N = 1000
train_input, train_target, train_classes, test_input, test_target, test_classes = prologue.generate_pair_sets(N)

In [3]:
# Data sizes for reference
print('Train input:', train_input.size())
print('Train target:', train_target.size())
print('Train classes:', train_classes.size())
print('... Same for test too')

Train input: torch.Size([1000, 2, 14, 14])
Train target: torch.Size([1000])
Train classes: torch.Size([1000, 2])
... Same for test too


In [4]:
class Baseline_Net(nn.Module):
    def __init__(self,params):
        super().__init__()
        self.use_weight_sharing=params[0]
        self.use_auxiliary_loss=params[1]
        self.conv_out = 64*6*6
        
        # X1
        self.x1_conv1 = nn.Conv2d(1, 32, kernel_size=3,padding=1)
        self.x1_conv2 = nn.Conv2d(32, 64, kernel_size=3,padding=1)
        self.x1_fc1 = nn.Linear(self.conv_out, 64)
        self.x1_fc2 = nn.Linear(64, 10)
        # X2
        if not self.use_weight_sharing:
            self.x2_conv1 = nn.Conv2d(1, 32, kernel_size=3,padding=1)
            self.x2_conv2 = nn.Conv2d(32, 64, kernel_size=3,padding=1)
            self.x2_fc1 = nn.Linear(self.conv_out, 64)
            self.x2_fc2 = nn.Linear(64, 10)
        # Combine
        self.comp_fc1 = nn.Linear(20, 100)
        self.comp_fc2 = nn.Linear(100, 50)
        self.comp_fc3 = nn.Linear(50, 2)
        # Dropout
        self.dropout=nn.Dropout()
        
    
    def forward(self, x):
    
        # X1
        x1 = F.relu(F.max_pool2d(self.x1_conv1(x[:,0:1]), kernel_size=3, stride=1))
        x1 = F.relu(F.max_pool2d(self.x1_conv2(x1), kernel_size=2, stride=2))
        x1 = x1.reshape(x1.size(0), -1)
        x1 = self.dropout(F.relu(self.x1_fc1(x1)))
        x1 = self.x1_fc2(x1)
        
        # X2
        if not self.use_weight_sharing:
            x2 = F.relu(F.max_pool2d(self.x2_conv1(x[:,1:2]), kernel_size=3, stride=1))
            x2 = F.relu(F.max_pool2d(self.x2_conv2(x2), kernel_size=2, stride=2))
            x2 = x2.reshape(x2.size(0), -1)
            x2 = self.dropout(F.relu(self.x2_fc1(x2)))
            x2 = self.x2_fc2(x2)
        else: 
            x2 = F.relu(F.max_pool2d(self.x1_conv1(x[:,1:2]), kernel_size=3, stride=1))
            x2 = F.relu(F.max_pool2d(self.x1_conv2(x2), kernel_size=2, stride=2))
            x2 = x2.reshape(x2.size(0), -1)
            x2 = self.dropout(F.relu(self.x1_fc1(x2)))
            x2 = self.x1_fc2(x2)
            
        # Combine
        x = F.relu(self.comp_fc1(torch.cat((x1, x2), 1)))
        x = F.relu(self.comp_fc2(x))
        x = torch.sigmoid(self.comp_fc3(x))
            
        if self.use_auxiliary_loss:
            return x, x1.softmax(1), x2.softmax(1)
        else:
            return x
    

In [5]:
import itertools

def train_model(model_type, train_input, train_target, train_classes, test_input, test_target, test_classes, mini_batch_size, nb_epochs, nb_iterations):
    
    model=None
    eta = 1e-1
    net_types=[(False,False),(True,False),(False,True),(True,True)]
    alpha=0.1
    for params in net_types:
        print("Training","-"*100)
        print("Using Weight Sharing:",params[0])
        print("Using Auxiliary Loss:",params[1])
        use_auxiliary_loss=params[1]
        
        
        tot_err=0.0
        for k in range(nb_iterations):
            if model_type=="shallow":
                model = Baseline_Net(params)
#             elif model_type=="deep":
#                 model = ConvNet(params)
#             elif model_type=="very_deep":
#                 model = DeepConvNet(params)
            model.to(device)
            
            optimizer= torch.optim.SGD(model.parameters(), lr = eta)
            criterion = nn.CrossEntropyLoss()
            
            for e in range(nb_epochs):

                for b in range(0, train_input.size(0), mini_batch_size):
                    if use_auxiliary_loss:
                        output,op2,op3 = model(train_input.narrow(0, b, mini_batch_size).to(device))
                        loss1 = criterion(output, train_target.narrow(0, b, mini_batch_size).to(device))
                        loss2 = criterion(op2, train_classes[:, 0].narrow(0, b, mini_batch_size).to(device))
                        loss3 = criterion(op3, train_classes[:, 1].narrow(0, b, mini_batch_size).to(device))
                        loss = (1-alpha)*loss1 + alpha*(loss2 + loss3)
                    else:
                        output = model(train_input.narrow(0, b, mini_batch_size).to(device))
                        loss = criterion(output, train_target.narrow(0, b, mini_batch_size).to(device))

                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

            if use_auxiliary_loss:
                (nb_errors,nb_errors2,nb_errors3)=compute_nb_errors(model, test_input, test_target, test_classes)
                print('test error Net {:0.2f}% {:d}/{:d}'.format((100 * nb_errors) / test_input.size(0),
                                                              nb_errors, test_input.size(0)))
                print('test error X1 {:0.2f}% {:d}/{:d}'.format((100 * nb_errors2) / test_classes.size(0),
                                                              nb_errors2, test_target.size(0)))
                print('test error X2 {:0.2f}% {:d}/{:d}'.format((100 * nb_errors3) / test_classes.size(0),
                                                              nb_errors3, test_classes.size(0)))
                tot_err+=nb_errors
                
            else:
                nb_test_errors = compute_nb_errors(model, test_input, test_target, test_classes)
                tot_err+=nb_test_errors
                print('test error Net {:0.2f}% {:d}/{:d}'.format((100 * nb_test_errors) / test_input.size(0),
                                                              nb_test_errors, test_input.size(0)))

        print("Avg. Error: ",(tot_err/(10*nb_iterations)), "%")    
    
def compute_nb_errors(model, test_input, test_target, test_classes):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
    nb_errors = 0
    if model.use_auxiliary_loss:
        o1,o2,o3=model(test_input.to(device))
    else:
        o1=model(test_input.to(device))
    output1 = torch.argmax(o1, dim=1)
    expected1 = test_target.to(device)
    nb_errors = expected1.shape[0]-np.count_nonzero((expected1==output1).cpu().numpy())
    if model.use_auxiliary_loss:
        output2 = torch.argmax(o2, dim=1)
        expected2 = test_classes[:,0].to(device)
        nb_errors2 = expected2.shape[0]-np.count_nonzero((expected2==output2).cpu().numpy())
        output3 = torch.argmax(o3, dim=1)
        expected3 = test_classes[:,1].to(device)
        nb_errors3 = expected3.shape[0]-np.count_nonzero((expected3==output3).cpu().numpy())
        return (nb_errors,nb_errors2,nb_errors3)
    else:
         return nb_errors

In [6]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)
train_model('shallow',train_input, train_target, train_classes, test_input, test_target, test_classes, 100, 50, 10)

Using device: cuda
Training ----------------------------------------------------------------------------------------------------
Using Weight Sharing: False
Using Auxiliary Loss: False
test error Net 15.90% 159/1000
test error Net 16.90% 169/1000
test error Net 17.00% 170/1000
test error Net 15.80% 158/1000
test error Net 14.60% 146/1000
test error Net 14.90% 149/1000
test error Net 18.80% 188/1000
test error Net 15.50% 155/1000
test error Net 15.50% 155/1000
test error Net 19.30% 193/1000
Avg. Error:  16.42 %
Training ----------------------------------------------------------------------------------------------------
Using Weight Sharing: True
Using Auxiliary Loss: False
test error Net 11.60% 116/1000
test error Net 14.00% 140/1000
test error Net 15.70% 157/1000
test error Net 12.50% 125/1000
test error Net 16.80% 168/1000
test error Net 13.50% 135/1000
test error Net 16.20% 162/1000
test error Net 12.90% 129/1000
test error Net 12.90% 129/1000
test error Net 15.20% 152/1000
Avg. Erro

In [7]:
# m1 = train_input.mean()
# s1 = train_input.std()
# m3 = test_input.mean()
# s3 = test_input.std()
# x_train=train_input.sub_(m1).div_(s1)
# x_test=test_input.sub_(m3).div_(s3)


# def execute_norm(model_nm, nb_iterations, nb_epochs=20):
#     model=None
#     nb_epochs = nb_epochs
#     mini_batch_size = 50
#     eta = 1e-1
#     use_gpu=True
#     tot_err=0
#     for k in range(nb_iterations):
#         if(model_nm=="base"):
#             model = Baseline_Net()
#         else:
#             model = WtSharing_Net()
#         optimizer = torch.optim.SGD(model.parameters(), lr = eta)
#         criterion = nn.CrossEntropyLoss()#MSELoss()
#         train_model(model, x_train, train_target, train_classes, mini_batch_size, optimizer, criterion, nb_epochs, use_gpu)
#         nb_test_errors = compute_nb_errors(model, x_test, test_target, test_classes, mini_batch_size,use_gpu)
#         tot_err+=nb_test_errors
#         print('test error Net {:0.2f}% {:d}/{:d}'.format((100 * nb_test_errors) / x_test.size(0),
#                                                       nb_test_errors, x_test.size(0)))
#     print("Avg. Error: ",(tot_err/(10*nb_iterations)), "%")    


In [8]:
# from torchsummary import summary
# print(summary(model_base.to("cuda"),input_size=(2, 14, 14)))
# print(summary(model_ws.to("cuda"),input_size=(2, 14, 14)))

In [9]:
# !pip install torchsummary 