In [None]:
import os
import numpy as np
from PIL import Image

import torch
import torchvision   
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import pandas as pd

# Mount Google Drive inside the Colab VM at /content/gdrive.
# NOTE(review): nothing in the visible part of this script reads from or
# writes to /content/gdrive -- confirm whether the mount is still needed.
from google.colab import drive
drive.mount('/content/gdrive')

"""# **Uploading Dataset from Kaggle and Unzipping the Folders**"""

from google.colab import files
kaggle=files.upload()

!pip install -q kaggle

!mkdir -p ~/.kaggle

!cp kaggle.json ~/.kaggle/

!kaggle datasets download -d cmu11785/20fall-hw2p2

!unzip -q 20fall-hw2p2.zip

"""# **Defining the Neural Network building blocks**
#The building blocks are based upon the base model architecture described on Piazza
"""

class ConvReLU(nn.Sequential):
    """Conv2d -> BatchNorm2d -> ReLU -> Dropout(p=0.3), in that order.

    The convolution carries no bias term (the batch norm that follows has its
    own affine shift). Spatial size is not reduced by pooling in this block.

    Args:
        in_channels: number of channels of the incoming feature map.
        out_channels: number of channels produced by the convolution.
        kernel_size: side length of the square convolution kernel.
        stride: convolution stride.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1):
        # For an odd kernel and stride 1 this padding keeps height/width
        # unchanged.
        same_pad = (kernel_size - 1) // 2
        conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=same_pad,
            bias=False,
        )
        norm = nn.BatchNorm2d(out_channels, affine=True, track_running_stats=True)
        super().__init__(conv, norm, nn.ReLU(inplace=True), nn.Dropout(p=0.3))

class ConvReLU2(nn.Sequential):
    """Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d(2, 2), in that order.

    Same convolutional front end as a bias-free conv + batch norm + ReLU
    stack, but it ends with a 2x2 max-pool of stride 2 instead of dropout,
    so each use of this block halves the spatial resolution.

    Args:
        in_channels: number of channels of the incoming feature map.
        out_channels: number of channels produced by the convolution.
        kernel_size: side length of the square convolution kernel.
        stride: convolution stride.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1):
        # For an odd kernel and stride 1 this padding keeps height/width
        # unchanged ahead of the pooling step.
        same_pad = (kernel_size - 1) // 2
        conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=same_pad,
            bias=False,
        )
        norm = nn.BatchNorm2d(out_channels, affine=True, track_running_stats=True)
        super().__init__(conv, norm, nn.ReLU(inplace=True), nn.MaxPool2d(2, 2))

"""# **Developing the CNN Model**
# Built using the two building blocks. I used the base architecture from Piazza with an additional Conv2d layer. By adding an extra Conv2d layer I made my network deeper, hence improving the network's performance.
"""

class Network(nn.Module):
    """CNN that produces a 256-d embedding and class logits for an image batch.

    The backbone is a fixed stack of nine ConvReLU / ConvReLU2 blocks (six of
    which end in a 2x2 max-pool) followed by one plain 3x3 Conv2d. The feature
    map is then max-pooled over its full spatial extent to a 256-vector, which
    is both returned as the embedding and fed to a bias-free linear classifier.

    Args:
        num_feats: stored as the first entry of ``self.hidden_sizes``; not
            used to size any layer.
        hidden_sizes: list stored in the middle of ``self.hidden_sizes``; not
            used to size any layer.
        num_classes: output width of the classification layer.
        feat_dim: accepted but unused.
    """

    def __init__(self, num_feats, hidden_sizes, num_classes, feat_dim=15):
        super().__init__()

        # Bookkeeping only -- the channel widths below are hard-coded.
        self.hidden_sizes = [num_feats] + hidden_sizes + [num_classes]

        # (block type, in_channels, out_channels), in forward order.
        plan = (
            (ConvReLU, 3, 64),
            (ConvReLU2, 64, 64),
            (ConvReLU2, 64, 64),
            (ConvReLU, 64, 128),
            (ConvReLU2, 128, 128),
            (ConvReLU, 128, 256),
            (ConvReLU2, 256, 256),
            (ConvReLU2, 256, 256),
            (ConvReLU2, 256, 256),
        )
        stages = [
            block(c_in, c_out, kernel_size=3, stride=1)
            for block, c_in, c_out in plan
        ]
        # Extra plain convolution (no norm/activation) closing the backbone.
        stages.append(nn.Conv2d(256, 256, 3, 1, 1))

        self.layers = nn.Sequential(*stages)
        self.linear_label = nn.Linear(256, num_classes, bias=False)

    def forward(self, x, evalMode=False):
        """Run the backbone and classifier.

        Args:
            x: image batch fed to the first convolution (3 input channels).
            evalMode: accepted but unused.

        Returns:
            Tuple ``(embedding, logits)``: the globally max-pooled backbone
            features of shape (batch, channels) and the classifier output
            computed from them.
        """
        feature_map = self.layers(x)

        # Global max-pool: the kernel spans the whole remaining height/width.
        height, width = feature_map.size(2), feature_map.size(3)
        pooled = F.max_pool2d(feature_map, [height, width], stride=1)
        embedding = pooled.reshape(pooled.shape[0], pooled.shape[1])

        logits = self.linear_label(embedding)
        return embedding, logits

def init_weights(m):
    """Xavier-normal initialise the weight of a Conv2d or Linear module.

    Meant to be passed to ``nn.Module.apply``, which calls it once per
    submodule. Modules of any other type are left untouched, and biases are
    never modified.

    Args:
        m: the module to (possibly) re-initialise in place.
    """
    # isinstance (rather than an exact type comparison) so that subclasses of
    # Conv2d / Linear are initialised too.
    if isinstance(m, (nn.Conv2d, nn.Linear)):
        torch.nn.init.xavier_normal_(m.weight)

"""# **Defined the Training and Classification Methods. The code structure is based upon example code given in the 'CNN: Losses, transfer learning' recitation. The recitation occurred on Oct. 2nd**"""

def train(model, data_loader, test_loader, task='Classification'):
    """Train ``model`` for ``numEpochs`` epochs and report metrics per epoch.

    Relies on module-level globals: ``numEpochs``, ``device``, ``optimizer``
    and ``criterion``.

    Args:
        model: network whose forward pass returns a sequence with the class
            logits at index 1.
        data_loader: iterable of ``(feats, labels)`` training batches.
        test_loader: iterable of ``(feats, labels)`` validation batches.
        task: when equal to ``'Classification'``, loss/accuracy on both
            loaders are printed after every epoch; any other value skips the
            end-of-epoch evaluation.
    """
    model.train()

    report_every = 50
    for epoch in range(numEpochs):
        running_loss = 0.0

        # 1-based step counter so the progress line fires on every 50th batch.
        for step, (feats, labels) in enumerate(data_loader, start=1):
            feats = feats.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            logits = model(feats)[1]

            loss = criterion(logits, labels.long())
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            if step % report_every == 0:
                print('Epoch: {}\tBatch: {}\tAvg-Loss: {:.4f}'.format(epoch+1, step, running_loss/report_every))
                running_loss = 0.0

            torch.cuda.empty_cache()
            del feats, labels, loss

        if task != 'Classification':
            # No end-of-epoch evaluation is implemented for other tasks.
            continue

        val_loss, val_acc = test_classify(model, test_loader)
        train_loss, train_acc = test_classify(model, data_loader)
        print('Train Loss: {:.4f}\tTrain Accuracy: {:.4f}\tVal Loss: {:.4f}\tVal Accuracy: {:.4f}'.
              format(train_loss, train_acc, val_loss, val_acc))


def test_classify(model, test_loader):
    """Evaluate classification loss and accuracy of ``model`` on a loader.

    Relies on module-level globals ``device`` and ``criterion``. The model is
    put in eval mode for the pass and switched back to train mode before
    returning.

    Args:
        model: network whose forward pass returns a sequence with the class
            logits at index 1.
        test_loader: iterable of ``(feats, labels)`` batches.

    Returns:
        Tuple ``(mean_loss, accuracy)``: the per-sample-weighted mean of the
        batch losses, and the fraction of correctly predicted labels.

    Raises:
        ZeroDivisionError: if the loader yields no samples.
    """
    model.eval()
    test_loss = []
    accuracy = 0
    total = 0

    # Evaluation needs no gradients; without no_grad every forward pass would
    # build and hold an autograd graph for nothing.
    with torch.no_grad():
        for feats, labels in test_loader:
            feats, labels = feats.to(device), labels.to(device)
            outputs = model(feats)[1]

            # Softmax is monotonic, so the arg-max of the raw logits is the
            # predicted class.
            pred_labels = outputs.argmax(dim=1).view(-1)

            loss = criterion(outputs, labels.long())

            accuracy += torch.sum(torch.eq(pred_labels, labels)).item()
            total += len(labels)
            # Repeat the batch loss once per sample so that the final mean
            # weights a smaller last batch correctly.
            test_loss.extend([loss.item()] * feats.size(0))
            del feats
            del labels

    model.train()
    return np.mean(test_loss), accuracy/total

"""# **Defining the Cosine Similarity Method. This method is used for the face verification objective. The method outputs a similarity score between a pair of image embeddings obtained from the CNN model**"""

def cos_sim(model, final_loader):
    """Score image pairs by the cosine similarity of their embeddings.

    Relies on the module-level global ``device``. The model is put in eval
    mode and left in that mode.

    Args:
        model: network whose forward pass returns a sequence with the
            embedding at index 0.
        final_loader: iterable of ``(data1, data2)`` batches, one image
            batch per side of the pair.

    Returns:
        1-D float64 ``np.ndarray`` with one similarity per pair, in loader
        order. Empty (shape ``(0,)``) if the loader yields nothing.
    """
    model.eval()
    chunks = []

    with torch.no_grad():
        for data1, data2 in final_loader:
            data1 = data1.to(device)
            data2 = data2.to(device)
            img1Embed = model(data1)[0]
            img2Embed = model(data2)[0]

            sims = F.cosine_similarity(img1Embed, img2Embed)
            # Collect per-batch results and join once at the end; growing an
            # array with np.concatenate per batch re-copies every earlier
            # score each time.
            chunks.append(sims.cpu().reshape(-1).numpy())
            del data1
            del data2

    if not chunks:
        return np.array([])
    # float64 keeps the dtype callers received before (the accumulator used
    # to start from an empty float64 array).
    return np.concatenate(chunks).astype(np.float64)

"""# **Loading my Training and Validation Data by utilizing 'Data Loader' and 'Torch Vision'**"""

# Training split: one sub-folder per class, images converted to tensors.
train_dataset = torchvision.datasets.ImageFolder(
    root='classification_data/train_data',
    transform=torchvision.transforms.ToTensor(),
)
train_dataloader = DataLoader(
    train_dataset,
    batch_size=200,
    shuffle=True,
    num_workers=5,
)

# Validation split, loaded with the same settings as the training split.
dev_dataset = torchvision.datasets.ImageFolder(
    root='classification_data/val_data',
    transform=torchvision.transforms.ToTensor(),
)
dev_dataloader = DataLoader(
    dev_dataset,
    batch_size=200,
    shuffle=True,
    num_workers=5,
)

"""# **Defining: hyperparameters, number of epochs, loss function and optimization algorithm**"""

# Global training configuration. train(), test_classify() and cos_sim() read
# numEpochs, device, criterion and optimizer by name at call time, so these
# names must exist at module level before training starts.
numEpochs = 1
# Only recorded in Network.hidden_sizes; it does not size any layer.
num_feats = 10

# SGD step size and L2 penalty, passed to the optimizer below.
learningRate = 0.15
weightDecay = 5e-5

# Only recorded in Network.hidden_sizes; the conv widths are hard-coded there.
hidden_sizes = [64,128, 256, 510]
# One output logit per class folder found in the training set.
num_classes = len(train_dataset.classes)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

network = Network(num_feats, hidden_sizes, num_classes)
# Re-initialise Conv2d / Linear weights with Xavier-normal values.
network.apply(init_weights)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(network.parameters(), lr=learningRate, weight_decay=weightDecay, momentum=0.9)

"""# **Initiating the training of the developed network**"""

# Put the model in training mode, move its parameters to the selected device,
# then run the training loop with the validation loader used for per-epoch
# evaluation.
network.train()
network.to(device)
train(network, train_dataloader, dev_dataloader)

"""# **Developing a Method that extracts a pair of images from the test data set and inputs them into the trained network. The network outputs a face embedding for each image. The cosine similarity method takes the face embeddings as input and gives a similarity score between both images. This method achieves the verification objective.**"""

class ImageDataset(Dataset):
    """Dataset of image pairs listed one pair per line in a text file.

    Each non-blank line holds two whitespace-separated image paths; any
    further columns on the line (for example a label) are ignored when
    loading. Paths containing whitespace are not supported.

    Args:
        file_pairs: path of the text file listing the pairs.
    """

    # NOTE(review): this class attribute merely aliases the Dataset base class
    # and is not read anywhere in this file; kept only so the class namespace
    # is unchanged for external code.
    file_pairs = Dataset

    def __init__(self, file_pairs):
        with open(file_pairs) as files:
            stripped = (line.rstrip() for line in files)
            # Skip blank lines (e.g. a trailing newline at end of file): they
            # would otherwise count as pairs and fail when indexed.
            self.file_list = [line for line in stripped if line]

    def __len__(self):
        """Return the number of listed pairs."""
        return len(self.file_list)

    @staticmethod
    def _load_rgb_tensor(path):
        """Load the image at ``path`` as a 3-channel tensor."""
        # The context manager closes the file promptly. Converting to RGB
        # guarantees the 3 channels the network's first convolution expects,
        # even for grayscale or RGBA files.
        with Image.open(path) as img:
            return torchvision.transforms.ToTensor()(img.convert("RGB"))

    def __getitem__(self, index):
        """Return ``(img1, img2)`` tensors for the pair at ``index``.

        Raises:
            ValueError: if the line holds fewer than two paths.
        """
        i1, i2 = self.file_list[index].split()[:2]
        return self._load_rgb_tensor(i1), self._load_rgb_tensor(i2)

    def getPairs(self):
        """Return the raw pair lines, in file order."""
        return self.file_list

# Pairs to score for the verification task.
sub_data = ImageDataset("verification_pairs_test.txt")

# shuffle=False keeps the scores in the same order as the lines returned by
# getPairs(), which are later written next to them in the results table.
final_loader = DataLoader(sub_data, shuffle=False, batch_size=200, num_workers=1, pin_memory=True)

# Raw pair lines, used as the row identifiers of the results table.
trial = np.array(sub_data.getPairs())

# One cosine-similarity score per pair, from the trained network's embeddings.
test_score = cos_sim(network, final_loader)

"""# **Saving the results of the verification test on a CSV file**"""

# cos_sim() already returns an ndarray, so this conversion only makes a copy.
test_score = np.array(test_score)
# Two-column table: the pair line as "Id", its similarity score as "Category".
df = pd.DataFrame({"Id" : trial, "Category" : test_score})
# Written to the current working directory without the DataFrame index.
df.to_csv(r'./sikandab_HW2P2_4.csv', index=False)

"""# **DISCLAIMER: THE SKELETON OF THE CODE AND VARIOUS FUNCTIONS/CLASSES OF THE CODE ARE BASED UPON EXAMPLE CODE GIVEN IN RECITATION 5 (OCT. 2ND). THE FILE NAME OF THE EXAMPLE CODE IS: 'recitation'**"""