<a href="https://colab.research.google.com/github/manashpratim/Face-Recognition-/blob/master/HW2P2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [0]:
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1' 

In [0]:
import os
import numpy as np
from PIL import Image
import torchvision.transforms as transforms 
import torchvision 
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from torch import optim
from torchvision import datasets, transforms, models
import sklearn.metrics
import sys
from torch.utils import data
from torch.optim.lr_scheduler import StepLR
import time
from torch.optim.lr_scheduler import ReduceLROnPlateau
cuda = torch.cuda.is_available()
cuda

# **Classification**

In [0]:
!unzip '/content/drive/My Drive/train_data.zip'
!unzip '/content/drive/My Drive/validation_classification.zip'
!unzip '/content/drive/My Drive/test_classification.zip'

In [0]:
class ImageDataset(Dataset):
    def __init__(self, file_list):
        self.file_list = file_list

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, index):
        img = Image.open(self.file_list[index])
        img = torchvision.transforms.ToTensor()(img)
        return img

In [0]:
train_set= torchvision.datasets.ImageFolder(root='/content/train_data/medium', transform=torchvision.transforms.ToTensor())
train_loader= DataLoader(train_set, batch_size=256, shuffle=True, num_workers=8)

val_set= torchvision.datasets.ImageFolder(root='/content/validation_classification/medium', transform=torchvision.transforms.ToTensor())
val_loader= DataLoader(val_set, batch_size=128, shuffle=False, num_workers=8)

test_file = open("/content/drive/My Drive/test_order_classification.txt","r").read().split('\n')
test_files = ['/content/test_classification/medium/'+i for i in test_file ]

# Use either of the two lines below. Comment one of them.
test_set= torchvision.datasets.ImageFolder(root='/content/test_classification/', transform=torchvision.transforms.ToTensor())
test_set = ImageDataset(test_files)
test_loader = DataLoader(test_set, batch_size=1, shuffle=False, num_workers=1, drop_last=False)

# **ResNet 50**

**PyTorch**

In [0]:
class convolution(torch.nn.Module):
  
    def __init__(self,in_channels,out_channels,stride=1):
        super(convolution,self).__init__()
        self.conv1 = torch.nn.Conv2d(in_channels,out_channels, kernel_size=1,stride=1,bias = False)
        self.bn1 = torch.nn.BatchNorm2d(out_channels)

        self.conv2 = torch.nn.Conv2d(out_channels, out_channels, kernel_size=3,stride=stride, padding=1,bias = False)
        self.bn2 = torch.nn.BatchNorm2d(out_channels)
        self.conv3 = torch.nn.Conv2d(out_channels,out_channels*4,kernel_size=1, stride=1, bias = False)
        self.bn3 = torch.nn.BatchNorm2d(out_channels*4)

        self.conv3_sc = torch.nn.Conv2d(in_channels,out_channels*4,kernel_size=1,stride=stride,bias = False)
        self.bn3_sc = torch.nn.BatchNorm2d(out_channels*4)
   
    def forward(self,x):
        res = x
        output = F.relu(self.bn1(self.conv1(x)))
        output = F.relu(self.bn2(self.conv2(output)))
        output = self.bn3(self.conv3(output))

        res = self.bn3_sc(self.conv3_sc(res))
        output = output + res
        output = F.relu(output)
        return output

class identity(torch.nn.Module):
  
    def __init__(self,in_channels,out_channels):
        super(identity,self).__init__()

        self.conv1 = torch.nn.Conv2d(in_channels,out_channels, kernel_size=1,stride=1,bias = False)
        self.bn1 = torch.nn.BatchNorm2d(out_channels)

        self.conv2 = torch.nn.Conv2d(out_channels, out_channels, kernel_size=3,stride=1, padding=1,bias = False)
        self.bn2 = torch.nn.BatchNorm2d(out_channels)

        self.conv3 = torch.nn.Conv2d(out_channels,out_channels*4,kernel_size=1, stride=1, bias = False)
        self.bn3 = torch.nn.BatchNorm2d(out_channels*4)

  
    def forward(self,x):
        res = x
        output = F.relu(self.bn1(self.conv1(x)))
        output = F.relu(self.bn2(self.conv2(output)))
        output = self.bn3(self.conv3(output))

        output = output + res
        output = F.relu(output)
        return output

class Resnet50(torch.nn.Module):
    def __init__(self,classes=2300):
        super(Resnet50,self).__init__()
        self.input_planes = 64
        self.conv1 = torch.nn.Conv2d(3,64,kernel_size=3,stride=1,padding=1,bias = False)
        self.bn1 = torch.nn.BatchNorm2d(64)

        self.layer11 = convolution(64,64,stride=1)
        self.layer12 = identity(256,64)
        self.layer13 = identity(256,64)

        self.layer21 = convolution(256,128,stride=2)
        self.layer22 = identity(512,128)
        self.layer23 = identity(512,128)
        self.layer24 = identity(512,128)

        self.layer31 = convolution(512,256,stride=2)
        self.layer32 = identity(1024,256)
        self.layer33 = identity(1024,256)
        self.layer34 = identity(1024,256)
        self.layer35 = identity(1024,256)
        self.layer36 = identity(1024,256)

        self.layer41 = convolution(1024,512,stride=2)
        self.layer42 = identity(2048,512)
        self.layer43 = identity(2048,512)

        self.avgpool = torch.nn.AdaptiveAvgPool2d((1, 1))
        
        self.fc = torch.nn.Linear(2048,classes,bias = False)
        
    def forward(self,x):

        x = F.relu(self.bn1(self.conv1(x)))

        x = self.layer11(x)
        x = self.layer12(x)
        x = self.layer13(x)
        
        x = self.layer21(x)
        x = self.layer22(x)
        x = self.layer23(x)
        x = self.layer24(x)
        x = self.layer24(x)

        x = self.layer31(x)
        x = self.layer32(x)
        x = self.layer33(x)
        x = self.layer34(x)
        x = self.layer35(x)
        x = self.layer36(x)

        x = self.layer41(x)
        x = self.layer42(x)
        x = self.layer43(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)/torch.norm(self.fc.weight,dim=1)


        return x

In [0]:
def train(model, data_loader, test_loader,numEpochs):
    model.train()

    for epoch in range(numEpochs):
        avg_loss = 0.0
        train_loss = []
        accuracy = 0
        total = 0
        start_time = time.time()
        for batch_num, (feats, labels) in enumerate(data_loader):
            feats, labels = feats.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs,_ = model(feats)

            _, pred_labels = torch.max(F.softmax(outputs, dim=1), 1)
            pred_labels = pred_labels.view(-1)

            loss = criterion(outputs, labels.long())

            accuracy += torch.sum(torch.eq(pred_labels, labels)).item()
            total += len(labels)
            train_loss.extend([loss.item()]*feats.size()[0])
            
            loss.backward()
            optimizer.step()
            
            avg_loss += loss.item()

            if batch_num % 1000 == 999:
                print('Epoch: {}\tBatch: {}\tAvg-Loss: {:.4f}'.format(epoch+1, batch_num+1, avg_loss/1000))
                avg_loss = 0.0    
            
            torch.cuda.empty_cache()
            del feats
            del labels
            del loss
            del outputs
            del pred_labels

        torch.save(model.state_dict(), '/content/drive/My Drive/model2.pt')

        with torch.no_grad():
            model.eval()
            test_loss = []
            test_accuracy = 0
            test_total = 0

            for t_batch_num, (t_feats, t_labels) in enumerate(test_loader):
                t_feats, t_labels = t_feats.to(device), t_labels.to(device)
                t_outputs,_ = model(t_feats)

                _, t_pred_labels = torch.max(F.softmax(t_outputs, dim=1), 1)
                t_pred_labels = t_pred_labels.view(-1)

                t_loss = criterion(t_outputs, t_labels.long())

                test_accuracy += torch.sum(torch.eq(t_pred_labels, t_labels)).item()
                test_total += len(t_labels)
                test_loss.extend([t_loss.item()]*t_feats.size()[0])
                torch.cuda.empty_cache()
                del t_feats
                del t_labels
                del t_loss
                del t_pred_labels
                del t_outputs
        
        model.train()
        end_time = time.time()
            
        if task == 'Classification':

            print('Train Loss: {:.4f}\tTrain Accuracy: {:.4f}\tVal Loss: {:.4f}\tVal Accuracy: {:.4f}\t Time: {:.4f}s'.format(np.mean(train_loss),accuracy/total, np.mean(test_loss),test_accuracy/test_total,end_time - start_time))
        else:
            test_verify(model, test_loader) 


In [0]:
def init_weights(m):
    if type(m) == nn.Conv2d or type(m) == nn.Linear:
        torch.nn.init.xavier_uniform_(m.weight.data)

In [0]:
numEpochs = 12
model = Resnet50()
model.apply(init_weights)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(),lr=0.001)
#scheduler = StepLR(optimizer, step_size=5, gamma=0.1)
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=1, verbose=True)
device = torch.device("cuda")
model.to(device)

In [0]:
def test_classify(model,test_loader):
  pred = []
  with torch.no_grad():
      model.eval()
      for batch_num, feats in enumerate(test_loader):
              feats = feats.to(device)
              
              outputs = model(feats)

              _, pred_labels = torch.max(F.softmax(outputs, dim=1), 1)
              pred_labels = pred_labels.view(-1)
              pred.append(train_set.classes[pred_labels.cpu().numpy().tolist()[0]])

  return pred

In [0]:
pred = test_classify(model,test_loader)

In [0]:
with open('/content/drive/My Drive/mbarman_class.csv', 'w') as w:
    w.write('Id,Category\n')
    for i in range(len(pred)):
            w.write(str(test_file[i])+','+str(pred[i])+'\n')
            if i==0:
              print(str(test_file[i])+','+str(pred[i])+'\n')

# **Verification**

In [0]:
!unzip '/content/drive/My Drive/validation_verification.zip'
!unzip '/content/drive/My Drive/test_verification.zip'

In [0]:
ver_dev_file = open("/content/drive/My Drive/validation_trials_verification.txt","r").read().split('\n')
ver_test_file = open("/content/drive/My Drive/test_trials_verification.txt","r").read().split('\n')

In [0]:
class VerificationDataset(Dataset):
    def __init__(self, file_list1,file_list2):
        self.file_list1 = file_list1
        self.file_list2 = file_list2

    def __len__(self):
        return len(self.file_list1)

    def __getitem__(self, index):
        img1 = Image.open(self.file_list1[index])
        img1 = torchvision.transforms.ToTensor()(img1)
        img2 = Image.open(self.file_list2[index])
        img2 = torchvision.transforms.ToTensor()(img2)
        return img1,img2

In [0]:
ver_col1 = []
ver_col2 = []
ver_labels = []

for i in range(len(ver_dev_file)-1):
     
      p = ver_dev_file[i].split()

      ver_col1.append('/content/test_verification/'+p[0]) 
      ver_col2.append('/content/test_verification/'+p[1]) 
      ver_labels.append(int(p[2])) 
   

In [0]:
ver_set = VerificationDataset(ver_col1,ver_col2)
ver_loader = DataLoader(ver_set, batch_size=1024, shuffle=False, num_workers=8, drop_last=False)

In [0]:
from sklearn.decomposition import KernelPCA
import sklearn

dist=[]
with torch.no_grad():
      
      model.eval()
      for batch_num, (feats1,feats2) in enumerate(ver_loader):
              feats1, feats2 = feats1.to(device), feats2.to(device)
              
              output1 = model(feats1)
              output2 = model(feats2)
     
              embed_train = np.concatenate((output1.cpu(), output2.cpu()), axis=0)
              pca_model = KernelPCA(n_components=100,kernel='cosine',n_jobs=-1)
              pca_model.fit(embed_train)
              embed1 = pca_model.transform(output1.cpu())
              embed2 = pca_model.transform(output2.cpu())
             
              for i in range(embed1.shape[0]):
                  
                  dist2 = sklearn.metrics.pairwise.cosine_similarity(np.expand_dims(embed1[i], axis=0),np.expand_dims(embed2[i], axis=0))
                 
                  dist.append(dist2[0][0])

In [0]:
with open('/content/drive/My Drive/mbarman_ver.csv', 'w') as w:
    w.write('trial,score\n')
    for i in range(len(dist)):
            w.write(str(ver_test_file[i].split()[0])+' '+str(ver_test_file[i].split()[1])+','+str(dist[i])+'\n')
            if i==0:
              print(str(ver_test_file[i].split()[0])+' '+str(ver_test_file[i].split()[1])+','+str(dist[i])+'\n')

#**Keras**


**ResNet50**

In [0]:
def bottleneck(x,f,s):

      shortcut = x
      out = tf.keras.layers.Conv2D(f,  kernel_size = (1, 1), strides = (1,1), use_bias=False)(x)
      out = tf.keras.layers.BatchNormalization()(out)
      out = tf.keras.activations.relu(out)
    
      out = tf.keras.layers.Conv2D(f,  kernel_size = (3, 3), strides = (s,s), padding = 'same', use_bias=False)(out)
      out = tf.keras.layers.BatchNormalization()(out)
      out = tf.keras.activations.relu(out)

      out = tf.keras.layers.Conv2D(f*4,  kernel_size = (1, 1), strides = (1,1), use_bias=False)(out)
      out = tf.keras.layers.BatchNormalization()(out)
      out = tf.keras.activations.relu(out)
     
      shortcut = tf.keras.layers.Conv2D(f*4,  kernel_size = (1, 1), strides = (s,s), use_bias=False)(shortcut)
      shortcut = tf.keras.layers.BatchNormalization()(shortcut)
    
      out = tf.keras.layers.Add()([out, shortcut])
      out = tf.keras.activations.relu(out)
      
      return out

def identity(x,f):

      shortcut = x
      out = tf.keras.layers.Conv2D(f,  kernel_size = (1, 1), strides = (1,1), use_bias=False)(x)
      out = tf.keras.layers.BatchNormalization()(out)
      out = tf.keras.activations.relu(out)

      out = tf.keras.layers.ZeroPadding2D()(out)
      out = tf.keras.layers.Conv2D(f,  kernel_size = (3, 3), strides = (1,1), use_bias=False)(out)
      out = tf.keras.layers.BatchNormalization()(out)
      out = tf.keras.activations.relu(out)

      out = tf.keras.layers.Conv2D(f*4,  kernel_size = (1, 1), strides = (1,1), use_bias=False)(out)
      out = tf.keras.layers.BatchNormalization()(out)
      out = tf.keras.activations.relu(out)

      out = tf.keras.layers.Add()([out, shortcut])
      out = tf.keras.activations.relu(out)
      
      return out

def ResNet50(input_shape = (32, 32, 3), classes = 2300):

      input = tf.keras.Input(shape=input_shape)
      x = tf.keras.layers.Conv2D(64,  use_bias=False, kernel_size = (3, 3), strides = (1, 1), padding = 'same')(input)
      x = tf.keras.layers.BatchNormalization()(x)
      x = tf.keras.activations.relu(x)

      x = bottleneck(x, f = 64, s = 1)
      x = identity(x, f = 64)
      x = identity(x, f = 64)

      x = bottleneck(x, f = 128, s = 2)
      x = identity(x, f = 128)
      x = identity(x, f = 128)
      x = identity(x, f = 128)

     
      x = bottleneck(x, f = 256, s = 2)
      x = identity(x, f = 256)
      x = identity(x, f = 256)
      x = identity(x, f = 256)
      x = identity(x, f = 256)
      x = identity(x, f = 256)


      x = bottleneck(x, f = 512, s = 2)
      x = identity(x, f = 512)
      x = identity(x, f = 512)

      x = tf.keras.layers.Flatten()(x)

      x = tf.keras.layers.Dense(classes, use_bias=False, activation='softmax')(x)
      model = tf.keras.models.Model(inputs = input, outputs = x, name='ResNet50')
      return model

In [0]:
model = ResNet50()