In [23]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
import os
import random
from PIL import Image
import copy
from tqdm import tqdm
from itertools import combinations
from collections import defaultdict
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torch.utils.data import WeightedRandomSampler
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
import torch.nn.functional as F

## Loading the dataset.
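Please change `dataset_path` in the cell below so that it points to your copy of the IIITB-FACES dataset (one sub-folder per person).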

In [24]:
# Load every image under `dataset_path` (one sub-folder per class) as a
# 400x400 grayscale array, keyed by the image's full path.
faces = {}
dataset_path = '/kaggle/input/iiitb-faces/IIITB-FACES'
# Sorted so that the class order does not depend on the arbitrary order
# returned by os.listdir (later cells pick a class by position).
sub_folders = sorted(os.listdir(dataset_path))
for sub_folder in sub_folders:
    sub_folder_path = os.path.join(dataset_path, sub_folder)
    if not os.path.isdir(sub_folder_path):
        # A stray file next to the class folders would make os.listdir raise.
        continue
    image_paths = sorted(os.listdir(sub_folder_path))
    for image_path in image_paths:
        image_path_actual = os.path.join(sub_folder_path, image_path)
        image_bgr = cv2.imread(image_path_actual)
        if image_bgr is None:
            # cv2.imread signals an unreadable / non-image file by returning
            # None instead of raising; skip it rather than crash in cvtColor.
            continue
        faces[image_path_actual] = cv2.resize(cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY), (400, 400))

In [26]:
# Group the images by class: class name -> list of PIL images.
dataset_dict = {}
for key in faces.keys():
    # The class is the name of the folder that directly contains the image.
    # Deriving it from the path (instead of a fixed '/'-split index) keeps
    # this correct when `dataset_path` has a different depth or separator.
    class_key = os.path.basename(os.path.dirname(key))
    # setdefault + append avoids rebuilding the key list and re-copying the
    # per-class list on every image.
    dataset_dict.setdefault(class_key, []).append(Image.open(key))

## Performing the train-test image split

In [27]:
# Hold out the last class completely: it becomes the out-of-sample test split,
# while every other class stays in `dataset_dict` for the train/test split.
dataset_keys = list(dataset_dict.keys())
held_out_class = str(dataset_keys[-1])
dataset_dict_test_split = {held_out_class: dataset_dict[held_out_class]}
dataset_dict_new = {class_name: dataset_dict[class_name] for class_name in dataset_keys[:-1]}
# Keep an independent copy of all classes before `dataset_dict` is rebound
# to the reduced (held-out class removed) copy.
dataset_dict_whole = copy.deepcopy(dataset_dict)
dataset_dict = copy.deepcopy(dataset_dict_new)
def split_dataset(dataset_dict, test_ratio=0.2, seed=None):
    """Randomly split every class of ``dataset_dict`` into train and test parts.

    Args:
        dataset_dict: Mapping of class name -> list of samples.
        test_ratio: Fraction of each class placed in the test part; the
            per-class test count is ``int(test_ratio * n)``, clamped to
            ``[0, n]``.
        seed: Optional seed for a private random generator, making the split
            reproducible. With ``None`` the module-level ``random`` state is
            used.

    Returns:
        ``(train_data, test_data)``: two dicts with the same keys as
        ``dataset_dict``. For every class the two lists are disjoint and
        together contain all of that class's samples.
    """
    rng = random if seed is None else random.Random(seed)
    train_data = {}
    test_data = {}

    for class_name, image_paths in dataset_dict.items():
        # Shuffle a copy so the caller's lists are left untouched.
        shuffled = list(image_paths)
        rng.shuffle(shuffled)
        num_samples = len(shuffled)
        num_test_samples = max(0, min(num_samples, int(test_ratio * num_samples)))
        # Split on an explicit index: slicing with ``[:-k]`` / ``[-k:]`` is
        # wrong for k == 0 (it yields an empty train part and puts every
        # sample in the test part).
        split_index = num_samples - num_test_samples
        train_data[class_name] = shuffled[:split_index]
        test_data[class_name] = shuffled[split_index:]

    return train_data, test_data

# Per-class random split of `dataset_dict` using the default test_ratio of 0.2.
train_data_dict, test_data_dict = split_dataset(dataset_dict)

## Writing our custom dataloaders

In [29]:
class SiameseDataset(Dataset):
    """Exhaustive pair dataset built from a ``class name -> items`` mapping.

    Every item of every class is paired with every item of every class
    (both orderings, and an item paired with itself, are included). A pair is
    labelled 1 when both items come from the same class and 0 otherwise.
    """

    def __init__(self, data_dict, transform=None):
        self.data_dict = data_dict
        self.transform = transform
        # Ordered cross product of the class names with themselves.
        self.class_pairs = [
            [first_class, second_class]
            for first_class in self.data_dict.keys()
            for second_class in self.data_dict.keys()
        ]
        self.samples = self.generate_samples()

    def generate_samples(self):
        """Return the list of ``(item1, item2, label)`` tuples for all class pairs."""
        return [
            (left, right, 1 if first_class == second_class else 0)
            for first_class, second_class in self.class_pairs
            for left in self.data_dict[first_class]
            for right in self.data_dict[second_class]
        ]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, index):
        """Return the pair at ``index``, with ``transform`` applied to both items if set."""
        left, right, label = self.samples[index]
        if self.transform is None:
            return left, right, label
        return self.transform(left), self.transform(right), label

    def get_labels(self):
        """Return the label of every sample, in sample order (used for weighted sampling)."""
        return [label for _, _, label in self.samples]

class SiameseDataset_Testing(Dataset):
    """Pair dataset for evaluation: query items against a reference collection.

    Each item in ``data_dict_test`` is paired with each item in
    ``data_dict_whole``. The label is 1 when the two class names are equal
    and 0 otherwise.
    """

    def __init__(self, data_dict_test, data_dict_whole, transform=None):
        self.data_dict_test = data_dict_test
        self.data_dict_whole = data_dict_whole
        self.transform = transform
        # Every (test class, reference class) combination, test class outermost.
        pairs = []
        for query_class in self.data_dict_test.keys():
            pairs.extend([query_class, reference_class] for reference_class in self.data_dict_whole.keys())
        self.class_pairs = pairs
        self.samples = self.generate_samples()

    def generate_samples(self):
        """Build the ``(test_item, reference_item, label)`` tuples for all class pairs."""
        pairs = []
        for query_class, reference_class in self.class_pairs:
            same_class = int(query_class == reference_class)
            reference_items = self.data_dict_whole[reference_class]
            for query_item in self.data_dict_test[query_class]:
                pairs.extend((query_item, reference_item, same_class) for reference_item in reference_items)
        return pairs

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, index):
        """Return the pair at ``index``; both items go through ``transform`` when it is set."""
        query_item, reference_item, label = self.samples[index]
        if self.transform is not None:
            query_item = self.transform(query_item)
            reference_item = self.transform(reference_item)
        return query_item, reference_item, label

    def get_labels(self):
        """Return all sample labels in order (helper for weighted sampling)."""
        return [sample[2] for sample in self.samples]

In [30]:
# Every image is resized to 128x128 and converted to a tensor.
transform = transforms.Compose([transforms.Resize((128, 128)), transforms.ToTensor()])

# --- Training pairs -------------------------------------------------------
train_dataset = SiameseDataset(train_data_dict, transform=transform)

# Count how many pairs carry label 0 and label 1; index in `class_counts` is the label.
train_labels = train_dataset.get_labels()
class_counts = [train_labels.count(0), train_labels.count(1)]
print(class_counts)

# Each sample is weighted by the inverse frequency of its label, so sampling
# with replacement counteracts the label imbalance.
weights = [1.0 / class_counts[pair_label] for pair_label in train_labels]
sampler = WeightedRandomSampler(weights=weights, num_samples=len(train_dataset), replacement=True)
train_loader = DataLoader(train_dataset, batch_size=128, sampler=sampler)

# --- In-sample test pairs: held-back images vs. the known classes ----------
test_dataset = SiameseDataset_Testing(test_data_dict, dataset_dict, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

# --- Out-of-sample test pairs: the held-out class vs. all classes ----------
test_dataset_out_of_sample = SiameseDataset_Testing(dataset_dict_test_split, dataset_dict_whole, transform=transform)
test_loader_out_of_sample = DataLoader(test_dataset_out_of_sample, batch_size=128, shuffle=False)

[452460, 9940]


## Defining our model architecture

In [31]:
class SiameseNetwork(nn.Module):
    """Siamese embedding network with an auxiliary reconstruction decoder.

    Both inputs go through the same convolutional encoder. The encoder output
    feeds (a) a fully connected head producing a 128-dimensional embedding and
    (b) a transposed-convolution decoder ending in a sigmoid.

    The first linear layer expects 43264 features (256 channels x 13 x 13),
    which is what the encoder produces for 3-channel 128x128 inputs.
    """

    def __init__(self):
        super().__init__()

        # Encoder: four conv stages, max-pooling after the first three.
        encoder_layers = [
            nn.Conv2d(3, 32, kernel_size=5),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, kernel_size=5),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(64, 128, kernel_size=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(128, 256, kernel_size=2),
            nn.ReLU(inplace=True),
        ]
        self.conv_layers = nn.Sequential(*encoder_layers)

        # Decoder: upsamples the encoder features back to a 3-channel image in [0, 1].
        decoder_layers = [
            nn.ConvTranspose2d(256, 128, kernel_size=3, stride=2),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=1),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(64, 32, kernel_size=5, stride=2),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(32, 3, kernel_size=4, stride=2),
            nn.Sigmoid(),
        ]
        self.decode = nn.Sequential(*decoder_layers)

        # Embedding head applied to the flattened encoder features.
        head_layers = [
            nn.Linear(43264, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
        ]
        self.fc = nn.Sequential(*head_layers)

        # Registered but not applied anywhere in forward_one / forward.
        self.dropout = nn.Dropout(p=0.2)

    def forward_one(self, x):
        """Encode one batch; return ``(embedding, reconstruction)``."""
        features = self.conv_layers(x)
        reconstruction = self.decode(features)
        # Collapse everything except the batch dimension before the linear head.
        flat_features = features.view(features.size(0), -1)
        embedding = self.fc(flat_features)
        return embedding, reconstruction

    def forward(self, input1, input2):
        """Run both inputs through the shared branch.

        Returns ``(embedding1, embedding2, reconstruction1, reconstruction2)``.
        """
        embedding1, reconstruction1 = self.forward_one(input1)
        embedding2, reconstruction2 = self.forward_one(input2)
        return embedding1, embedding2, reconstruction1, reconstruction2

# Instantiate the network and print its layer structure.
siamese_net = SiameseNetwork()
print(siamese_net)

SiameseNetwork(
  (conv_layers): Sequential(
    (0): Conv2d(3, 32, kernel_size=(5, 5), stride=(1, 1))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(32, 64, kernel_size=(5, 5), stride=(1, 1))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(64, 128, kernel_size=(2, 2), stride=(1, 1))
    (7): ReLU(inplace=True)
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (9): Conv2d(128, 256, kernel_size=(2, 2), stride=(1, 1))
    (10): ReLU(inplace=True)
  )
  (decode): Sequential(
    (0): ConvTranspose2d(256, 128, kernel_size=(3, 3), stride=(2, 2))
    (1): ReLU(inplace=True)
    (2): ConvTranspose2d(128, 64, kernel_size=(4, 4), stride=(1, 1))
    (3): ReLU(inplace=True)
    (4): ConvTranspose2d(64, 32, kernel_size=(5, 5), stride=(2, 2))
    (5): ReLU(inplace=True)
    (6): ConvTranspose2d(32, 3, k

## Training

During evaluation we report both the un-normalised and the normalised classification accuracy.
Un-normalised classification accuracy is the ordinary classification accuracy: number(correct_predictions)/number(predictions)
Normalised classification accuracy is the average of the per-class accuracies, so the majority class cannot dominate the score:
	Normalised classification accuracy = (number(correct_class1_predictions)/number(class1) + number(correct_class2_predictions)/number(class2))/2

In [32]:
class ContrastiveLoss(torch.nn.Module):
    """Contrastive loss on the Euclidean distance between two embeddings.

    With ``label == 1`` (same class) a pair is penalised by its squared
    distance; with ``label == 0`` it is penalised by the squared amount by
    which the distance falls short of ``margin``. The batch mean is returned.
    """

    def __init__(self, margin=0.2):
        super().__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        distance = F.pairwise_distance(output1, output2)
        # Pull term: active for matching pairs.
        same_term = label * distance.pow(2)
        # Push term: active for non-matching pairs that are closer than the margin.
        shortfall = torch.clamp(self.margin - distance, min=0.0)
        different_term = (1 - label) * shortfall.pow(2)
        return torch.mean(same_term + different_term)

# Losses: contrastive loss on the embeddings, MSE on the reconstructions.
criterion = ContrastiveLoss()
criterion_reconstruction = nn.MSELoss()
optimizer = optim.Adam(siamese_net.parameters(), lr=0.00001)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Training loop, followed after every epoch by an in-sample and an
# out-of-sample evaluation.

# Weight applied to each reconstruction (MSE) term when added to the contrastive loss.
RECONSTRUCTION_WEIGHT = 0.00001
# A pair is predicted "same class" when its embedding distance is <= this value.
DISTANCE_THRESHOLD = 0.2


def evaluate_pairs(model, loader, device, threshold=DISTANCE_THRESHOLD):
    """Evaluate ``model`` on a loader of ``(img1, img2, label)`` pair batches.

    Args:
        model: Network returning ``(output1, output2, decoded1, decoded2)``.
        loader: Iterable of pair batches with 0/1 labels.
        device: Device the batches are moved to.
        threshold: Distance at or below which a pair is predicted as label 1.

    Returns:
        ``(accuracy, normalised_accuracy)`` in percent. ``accuracy`` is
        correct / total. ``normalised_accuracy`` is the mean of the per-label
        accuracies over the labels that actually occur; either value is NaN
        when there is nothing to average (instead of raising
        ZeroDivisionError).

    Note:
        Switches ``model`` to eval mode and leaves it there.
    """
    correct = 0
    total = 0
    correct_0 = 0
    correct_1 = 0
    total_0 = 0
    total_1 = 0

    model.eval()
    with torch.no_grad():
        for img1, img2, label in loader:
            img1, img2 = img1.to(device), img2.to(device)
            label = label.to(device).float()
            output1, output2, _, _ = model(img1, img2)
            predicted = (F.pairwise_distance(output1, output2) <= threshold).float()

            hits = predicted == label
            is_label_0 = label == 0.0
            is_label_1 = label == 1.0

            correct += hits.sum().item()
            correct_0 += (hits & is_label_0).sum().item()
            correct_1 += (hits & is_label_1).sum().item()
            total_0 += is_label_0.sum().item()
            total_1 += is_label_1.sum().item()
            total += label.size(0)

    accuracy = (correct / total) * 100.0 if total else float("nan")
    per_label = [hit / count for hit, count in ((correct_0, total_0), (correct_1, total_1)) if count]
    norm_acc = sum(per_label) / len(per_label) * 100 if per_label else float("nan")
    return accuracy, norm_acc


siamese_net.to(device)
num_epochs = 1
for epoch in range(num_epochs):
    siamese_net.train()
    total_loss = 0.0

    progress_bar = tqdm(train_loader)
    for img1, img2, label in progress_bar:
        img1, img2, label = img1.to(device), img2.to(device), label.to(device)
        optimizer.zero_grad()
        output1, output2, decoded1, decoded2 = siamese_net(img1, img2)
        loss_contrastive = criterion(output1, output2, label.float())
        # The decoder is trained to reproduce each input image.
        loss_reconstruction = (RECONSTRUCTION_WEIGHT * criterion_reconstruction(img1, decoded1)
                               + RECONSTRUCTION_WEIGHT * criterion_reconstruction(img2, decoded2))
        loss = loss_contrastive + loss_reconstruction
        loss.backward()
        optimizer.step()
        # Read the scalar once; each .item() call copies the value to the host.
        loss_value = loss.item()
        progress_bar.set_description(f'Loss: {loss_value}')
        total_loss += loss_value

    print(f"Epoch [{epoch + 1}/{num_epochs}] Loss: {total_loss / len(train_loader):.4f}")

    # In-sample evaluation: held-back images of the training classes.
    accuracy, norm_acc = evaluate_pairs(siamese_net, test_loader, device)
    print(f"Test Accuracy: {accuracy:.2f}%")
    print(f'Test Normalised Accuracy: {norm_acc:.2f}%')

    # Out-of-sample evaluation: the class that was held out of training.
    accuracy, norm_acc = evaluate_pairs(siamese_net, test_loader_out_of_sample, device)
    print(f"Out of sample Test Accuracy out of sample: {accuracy:.2f}%")
    print(f'Out of sample Test Normalised Accuracy: {norm_acc:.2f}%')

cuda


Loss: 3.4876770769187715e-06: 100%|██████████| 3613/3613 [47:40<00:00,  1.26it/s]


Epoch [1/1] Loss: 0.0004
Test Accuracy: 97.32%
Test Normalised Accuracy: 98.30%
Out of sample Test Accuracy out of sample: 90.38%
Out of sample Test Normalised Accuracy: 95.09%


## Saving the model

In [33]:
# Persist the model and optimizer state together with the epoch number.
EPOCH = 1
PATH = "model_ckpt.pt"

checkpoint = {
    'epoch': EPOCH,
    'model_state_dict': siamese_net.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
}
torch.save(checkpoint, PATH)