In [1]:
# Imports
import torch
import torchvision
import torch.nn as nn 
import torch.optim as optim 
import torch.nn.functional as F


from torch.utils.data import DataLoader 


from torch.utils.data import Dataset 

import torchvision.transforms as transforms


import glob
import numpy as np
import cv2

import pandas as pd

import os
# https://arxiv.org/pdf/1503.03832.pdf

# Custom Dataset Class

In [2]:
class LFW_Dataset(Dataset):
    
    def __init__(self):
        
        # This is a dictionary with key as class name and values as a list of images path
        self.data = {}
        
        # class list i.e. IDs names
        self.classes = []
        
        # home Directory of dataset 
        self.imgs_path = "dataset/"
        
        # gets all folders name inside the dataset directory
        names_list = glob.glob(self.imgs_path + "*")
        
        #considering each folder as a unique class
        for class_path in names_list:
            #Spliting Class name from Path
            class_name = class_path.split("\\")[-1]
            
            # Only Select those classes with more than 1 image
            # if class have more than one Image 
            if (len(glob.glob(class_path + "\\*.jpg")) > 1 ):
        
                #generating list of classes
                self.classes.append(class_name)
                
                images = []
                for img_path in glob.glob(class_path + "\\*.jpg"):
                    images.append(img_path)
                self.data[class_name] = images
     
    # return total length of data
    def __len__(self):
        return len(self.data)
      
    # This function must return a single datapoint when asked 
    # datapoint is a triplet, so it must return a triplet
    # And i am returning a triplet 
    def __getitem__(self, idx):
        # randomly pick two random classes
        positive_class, negative_class = np.random.choice(self.classes, size=2, replace=False)
        
        # randomly pick two images from positive_class
        anchor_image_path , positive_image_path = np.random.choice(self.data[positive_class], size=2, replace=False)
    
        # randomly pick one image from negative class
        negative_image_path = np.random.choice(self.data[negative_class])
        
        base_path = r'C:\Users\Tayyab\OneDrive\University\MS\Fast\2nd Semester\DL\Project\project'
        
        
        # Anchor Image read #################################################################
        anchor_image = cv2.imread(os.path.join(base_path,anchor_image_path))
        # Transformations on anchor
        anchor_image = torch.from_numpy(anchor_image)
        anchor_image = anchor_image.permute(2, 0, 1)
        
        #positive Image read #################################################################
        positive_image = cv2.imread(os.path.join(base_path,positive_image_path))
        # Transformations on positive
        positive_image = torch.from_numpy(positive_image)
        positive_image = positive_image.permute(2, 0, 1)
        
        # Negative Image read ################################################################
        negative_image = cv2.imread(os.path.join(base_path,negative_image_path))
        # Transformations on negative
        negative_image = torch.from_numpy(negative_image)
        negative_image = negative_image.permute(2, 0, 1)
        
        ######################################################################################
        # create a triplet tuple
        triplet = (anchor_image, positive_image, negative_image)  
        
        #returns immutable tuple
        return triplet

# Hyper parameters

In [3]:
batch_size = 30
epochs = 20
learning_rate = 0.001

# DataLoader

In [4]:
dataset = LFW_Dataset()
dataLoader = DataLoader(dataset, batch_size, shuffle = True)

# Deep Neural Network

In [5]:
# import torch.nn as nn

class ZF_Net(nn.Module):
    def __init__(self):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels = 3   , out_channels = 64 , kernel_size = 7 , stride = 2 ) 
        self.pool1 = nn.MaxPool2d((3, 3), stride=1)
        
        self.conv2 = nn.Conv2d(in_channels = 64 , out_channels = 192  , kernel_size = 3 , stride = 2 ) 
        self.pool2 = nn.MaxPool2d((3, 3), stride=1)
        
        self.conv3 = nn.Conv2d(in_channels = 192  , out_channels = 384 , kernel_size = 3 , stride = 1) 
        self.pool3 = nn.MaxPool2d((3, 3), stride=1)

        
        self.conv4 = nn.Conv2d(in_channels = 384 , out_channels = 256 , kernel_size = 3 , stride = 1 ) 
        self.conv5 = nn.Conv2d(in_channels = 256 , out_channels = 256 , kernel_size = 3 , stride = 1 ) 
        
        self.conv6 = nn.Conv2d(in_channels = 256 , out_channels = 64 , kernel_size = 3 , stride = 1 )  
        self.pool4 = nn.MaxPool2d((3, 3), stride=2)        
        
        self.fc1 = nn.Linear(64* 23*23, 1*32*128) 
        self.fc2 = nn.Linear(1*32*128 ,16*128)  
        self.L2  = nn.Linear(1*16*128 , 1*128)

    def forward(self, x):
        
        out = self.conv1(x)
        out = self.pool1(out)
    
        out = self.conv2(out)
        out = self.pool2(out)
        
        out = self.conv3(out)
        out = self.pool3(out)
    
        out = self.conv4(out)
        out = self.conv5(out)
        out = self.conv6(out)
        
        out = self.pool4(out)
        
        out = torch.flatten(out, 1)
        
        out = self.fc1(out)
        out = F.relu(out)
        
        out = self.fc2(out)
        out = F.relu(out)
        
        out = self.L2(out)

        return out
    
network = ZF_Net()
network

ZF_Net(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2))
  (pool1): MaxPool2d(kernel_size=(3, 3), stride=1, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(64, 192, kernel_size=(3, 3), stride=(2, 2))
  (pool2): MaxPool2d(kernel_size=(3, 3), stride=1, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(192, 384, kernel_size=(3, 3), stride=(1, 1))
  (pool3): MaxPool2d(kernel_size=(3, 3), stride=1, padding=0, dilation=1, ceil_mode=False)
  (conv4): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1))
  (conv5): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1))
  (conv6): Conv2d(256, 64, kernel_size=(3, 3), stride=(1, 1))
  (pool4): MaxPool2d(kernel_size=(3, 3), stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=33856, out_features=4096, bias=True)
  (fc2): Linear(in_features=4096, out_features=2048, bias=True)
  (L2): Linear(in_features=2048, out_features=128, bias=True)
)

# Loss function and optimizer

In [6]:
# Defining Loss function 
reference_loss_function = nn.TripletMarginLoss(margin=2.5)
#Defining Optimizer
optimizer = optim.SGD(network.parameters(),lr = learning_rate)

## Custom Loss Function

In [7]:
class Triplet_Loss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(Triplet_Loss, self).__init__()

    def forward(self, anchor,positive, negative, margin):

        # Euclidean distance
        d_A_P = torch.sqrt(torch.sum(torch.square(anchor-positive))) # torch.cdist(anchor,positive)**2 # torch.sum(((anchor - positive)**2))
        d_A_N = torch.sqrt(torch.sum(torch.square(anchor-negative))) #torch.cdist(anchor,negative)**2 #torch.sum(((anchor- negative)**2))
       
        # Calculating Loss
        loss = torch.maximum((d_A_P - d_A_N) + margin,torch.tensor(0.0))
        
        # Calculating Mean so siamese Network
        loss = torch.mean(loss)

        return loss
    
loss_function = Triplet_Loss()

# Train the network

In [None]:
for epoch in range(0,epochs):  # loop over the dataset multiple times  
    for i, tempData in enumerate(dataLoader,0):
        
        anchor = tempData[0].float()
        positive = tempData[1].float()
        nagative = tempData[2].float()
        
        
        #Step 1: Predition:  Sending All images one by one to network
        
        f_Anchor = network(anchor)
        f_Positive = network(positive)
        f_Negative = network(nagative)
        
        # All three outputs must be 128 x 1
        
        # Step 2 Loss : Sending all 3 matrixes to compute loss
        loss = loss_function(f_Anchor ,f_Positive ,f_Negative,2.5)
        loss_reference = reference_loss_function(f_Anchor ,f_Positive ,f_Negative)
        
        #step 3 Backward Pass
        loss.backward(retain_graph=True)
        loss_reference.backward()
        
        #step 4 parameters Update
        optimizer.step()
        
        #step 5 Reset the gradients to zero for next forward pass
        optimizer.zero_grad()
        
        print("Epoch :", epoch+1, ' Batch: ', i , ' Loss : ', loss.item(), 'Ref Loss : ', loss_reference.item() )
    #Saving model after everry batch
    PATH = './models/DLModel_{}_{}.pth'.format(epoch+1,i)
    torch.save(network.state_dict(), PATH)
print('Finished Training')

Epoch : 1  Batch:  0  Loss :  3.5032100677490234 Ref Loss :  2.610060930252075
Epoch : 1  Batch:  1  Loss :  0.0 Ref Loss :  1.7938430309295654
Epoch : 1  Batch:  2  Loss :  0.7141342163085938 Ref Loss :  2.3509488105773926
Epoch : 1  Batch:  3  Loss :  0.0 Ref Loss :  5.215240955352783
Epoch : 1  Batch:  4  Loss :  0.0 Ref Loss :  2.674360752105713
Epoch : 1  Batch:  5  Loss :  0.0 Ref Loss :  1.8178126811981201
