In [None]:
import os
import cv2
import random
from tqdm import tqdm
from PIL import Image
import matplotlib.pyplot as plt

import numpy as np
import pandas as pd

import torch
import torchvision
from torch import nn
import torch.cuda.amp as amp
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torchvision.transforms as T
import torch.nn.functional as F

from sklearn.model_selection import StratifiedKFold, train_test_split

In [None]:
class cfg:
    """Notebook-wide settings, read as plain class attributes (e.g. ``cfg.batch_size``)."""

    # Seed value for random number generators.
    seed = 29
    # Presumably meant for sklearn-style ``random_state=`` arguments; no use of it
    # is visible in the cells shown here -- TODO confirm.
    random_state = 29

    # Number of training epochs.
    epochs = 5
    # Batch size of the training loader; the validation loader uses twice this value.
    batch_size = 4
    
    # Root image directory handed to split_dataset and Face_dataset; its
    # sub-folders are the groups that image pairs are drawn from.
    image_folder = 'image_data/'

In [None]:
def split_dataset(directory, split=0.9, seed=None):
    """Randomly partition the sub-folders of ``directory`` into train/valid sets.

    Args:
        directory: Path whose immediate sub-directories are split. Plain files
            directly inside ``directory`` are ignored.
        split: Fraction of the sub-folders assigned to the training split; the
            count is truncated with ``int``.
        seed: Optional seed for a private shuffle. When ``None`` (default) the
            global ``random`` module state is used, so ``random.seed(...)``
            before the call still controls the outcome.

    Returns:
        tuple[dict, dict]: ``(train_split, valid_split)``, each mapping a folder
        name to the number of directory entries inside that folder.
    """
    # Keep directories only: a stray file (e.g. ".DS_Store") would make the
    # per-folder os.listdir() below raise NotADirectoryError.
    # Sorting removes the dependence on filesystem listing order, so a fixed
    # seed gives the same split on every machine.
    folders = sorted(
        entry for entry in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, entry))
    )
    num_train = int(len(folders) * split)

    shuffler = random if seed is None else random.Random(seed)
    shuffler.shuffle(folders)

    def _count_entries(folder):
        return len(os.listdir(os.path.join(directory, folder)))

    train_split = {folder: _count_entries(folder) for folder in folders[:num_train]}
    valid_split = {folder: _count_entries(folder) for folder in folders[num_train:]}

    print(f'Train split: {len(train_split)}, Valid split: {len(valid_split)}')
    return train_split, valid_split

# Seed the global `random` module with the configured seed so the folder
# shuffle inside split_dataset (and therefore the train/valid membership)
# is the same on every run of the notebook.
random.seed(cfg.seed)
train, valid = split_dataset(cfg.image_folder)

In [None]:
class Face_dataset(Dataset):
    """Randomly sampled image pairs labelled same-folder (1) or different-folder (0).

    Each item is ``(img1, img2, target)``. An even ``idx`` yields two distinct
    images from one folder with ``target == 1``; an odd ``idx`` yields one image
    from each of two different folders with ``target == 0``. ``idx`` only
    selects the pair type: folders and images are drawn with the global
    ``random`` module, so the same ``idx`` gives a different pair on each call.

    Args:
        img_directory: Root directory containing one sub-folder per key of
            ``folder_split``.
        folder_split: Mapping whose keys are the sub-folder names to sample
            from. Its length is the dataset length; its values are not read.
        transform: Optional callable applied to each RGB ``PIL.Image``.
            Defaults to ``torchvision.transforms.ToTensor()``.

    Raises:
        ValueError: From ``__getitem__`` when the requested pair type cannot be
            formed (no folder with two images, or fewer than two non-empty
            folders).
    """

    # Only files with these extensions are sampled, so stray entries such as
    # "Thumbs.db" are never passed to the image loader.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, img_directory, folder_split, transform=None):
        self.img_directory = img_directory
        self.folder_split = folder_split
        self.transform = transform

        # Index the real file names once instead of listing a directory on
        # every __getitem__ and guessing names like "<n>.jpg".
        self.images = {}
        for folder in folder_split:
            path = os.path.join(img_directory, str(folder))
            self.images[folder] = sorted(
                name for name in os.listdir(path)
                if name.lower().endswith(self.IMAGE_EXTENSIONS)
            )

        # Folders usable for negative pairs (>= 1 image) and positive pairs (>= 2).
        self._nonempty = [f for f, names in self.images.items() if names]
        self._paired = [f for f, names in self.images.items() if len(names) >= 2]

    def _sample_pair(self, idx):
        """Pick ``(path1, path2, label)`` for item ``idx`` without opening any image."""
        if idx % 2 == 0:
            # Positive pair: two different images of one folder. Sampling without
            # replacement replaces the retry loop that never terminated for a
            # folder holding a single image.
            if not self._paired:
                raise ValueError('No folder contains at least two images; '
                                 'cannot build a positive pair.')
            folder1 = folder2 = random.choice(self._paired)
            name1, name2 = random.sample(self.images[folder1], 2)
            label = 1
        else:
            # Negative pair: one random image from each of two distinct folders.
            if len(self._nonempty) < 2:
                raise ValueError('At least two non-empty folders are required '
                                 'to build a negative pair.')
            folder1, folder2 = random.sample(self._nonempty, 2)
            name1 = random.choice(self.images[folder1])
            name2 = random.choice(self.images[folder2])
            label = 0

        path1 = os.path.join(self.img_directory, str(folder1), name1)
        path2 = os.path.join(self.img_directory, str(folder2), name2)
        return path1, path2, label

    def _load_image(self, path):
        """Open ``path``, force three channels, and apply the transform."""
        # The default transform is resolved here rather than in __init__ so that
        # building and indexing the dataset does not require torchvision.
        transform = self.transform if self.transform is not None else T.ToTensor()
        # The context manager closes the file handle; convert('RGB') makes
        # grayscale / RGBA files come out with three channels like the rest.
        with Image.open(path) as img:
            return transform(img.convert('RGB'))

    def __getitem__(self, idx):
        path1, path2, label = self._sample_pair(idx)
        target = torch.tensor(label, dtype=torch.float)
        return self._load_image(path1), self._load_image(path2), target

    def __len__(self):
        return len(self.folder_split)

In [None]:
# Build one pair dataset per folder split, both reading from the same image root.
# NOTE: `train` / `valid` must still be the dicts returned by split_dataset when
# this cell runs. A later cell defines a function that is also named `train`, so
# re-running this cell after that one would pass the function instead of the split.
train_dataset = Face_dataset(cfg.image_folder, train)
valid_dataset = Face_dataset(cfg.image_folder, valid)

In [None]:
# Training batches are reshuffled every epoch. drop_last=True discards a
# trailing incomplete batch: the classifier head contains BatchNorm1d layers,
# which cannot normalise a single-sample batch in training mode, so a dataset
# whose size leaves a remainder of 1 would otherwise crash mid-epoch.
train_loader = DataLoader(train_dataset,
                          batch_size=cfg.batch_size,
                          shuffle=True,
                          num_workers=0,
                          drop_last=True)

# Validation runs without gradients, so a batch twice as large is used; every
# sample is kept and the order is fixed.
valid_loader = DataLoader(valid_dataset,
                          batch_size=cfg.batch_size * 2,
                          shuffle=False,
                          num_workers=0)

In [None]:
# Preview one sampled pair; index 4 is even, which the dataset maps to a
# same-folder pair (target 1).
img1, img2, target = train_dataset[4]

# imshow wants channels last, so move the first axis of each tensor to the end.
img1, img2 = (tensor.permute(1, 2, 0) for tensor in (img1, img2))

fig, axes = plt.subplots(1, 2, figsize=(10, 5))
for ax, picture, title in zip(axes, (img1, img2), ('Image 1', 'Image 2')):
    ax.imshow(picture)
    ax.set_title(title)

print(target)

plt.show()

In [None]:
class SiameseNet(nn.Module):
    """Siamese pair classifier: a shared ResNet-18 followed by an MLP head.

    Both inputs go through the same ResNet-18 (its final ``fc`` layer is kept,
    so each image is represented by that layer's output vector). The two
    vectors are concatenated and passed to ``cls_head``, which ends in a
    Sigmoid and therefore outputs one value in (0, 1) per pair.
    """

    def __init__(self):
        super().__init__()

        # `weights` must be a member of the weights enum. Passing the
        # ResNet18_Weights class itself fails torchvision's validation with a
        # TypeError, so select the DEFAULT pretrained weights explicitly.
        self.resnet = torchvision.models.resnet18(
            weights=torchvision.models.ResNet18_Weights.DEFAULT
        )
        # Width of the backbone output, read from its final linear layer by
        # name instead of relying on module registration order.
        out_fea = self.resnet.fc.out_features

        self.cls_head = nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Linear(out_fea * 2, 512),  # *2: features of both images are concatenated
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(512, 128),
            nn.BatchNorm1d(128),
            # NOTE(review): Sigmoid here (the first stage uses ReLU) is kept
            # as in the original -- confirm it is intentional.
            nn.Sigmoid(),
            nn.Dropout(p=0.5),
            nn.Linear(128, 1),
            nn.Sigmoid()
        )

    def forward_once(self, x):
        """Return the backbone output for one batch of images, flattened per sample."""
        out = self.resnet(x)
        return out.view(out.shape[0], -1)

    def forward(self, input1, input2):
        """Score each (input1[i], input2[i]) pair; returns one column per batch row."""
        # Same backbone (shared weights) for both images.
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)

        # Concatenate along the feature axis, then classify.
        output = torch.cat((output1, output2), 1)
        return self.cls_head(output)

In [None]:
# Seed torch with the notebook-wide seed before the model is built, so the
# randomly initialised classifier head and the dropout masks are repeatable.
torch.manual_seed(cfg.seed)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = SiameseNet().to(device)
# The network already ends in a Sigmoid, so BCELoss receives probabilities.
criterion = nn.BCELoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=0.0001)

In [None]:
def train(model, device, dataloader, optimizer, criterion):
    """Run one optimisation pass over ``dataloader`` and report the mean batch loss.

    Args:
        model: Module called as ``model(img1, img2)``; expected to return one
            score per sample (e.g. shape ``(B, 1)``).
        device: Device the batch tensors are moved to.
        dataloader: Sized iterable of ``(img1, img2, target)`` batches.
        optimizer: Optimizer stepping ``model``'s parameters.
        criterion: Loss called as ``criterion(output, target)``.

    Returns:
        float: Mean of the per-batch losses, or ``0.0`` if there were no batches.
    """
    model.train()
    running_loss = 0.0
    num_batches = len(dataloader)

    for img1, img2, target in tqdm(dataloader, total=num_batches):
        img1, img2, target = img1.to(device), img2.to(device), target.to(device)

        optimizer.zero_grad()
        # Flatten to one score per sample. A bare .squeeze() would also remove
        # the batch axis of a single-sample batch, leaving a 0-d tensor whose
        # shape no longer matches `target`.
        output = model(img1, img2).reshape(-1)
        loss = criterion(output, target)

        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # max(..., 1) avoids ZeroDivisionError for an empty loader.
    train_loss = running_loss / max(num_batches, 1)
    print(f"\nTrain_loss: {train_loss}")
    return train_loss

In [None]:
def evaluate(model, device, dataloader, criterion):
    """Score ``model`` on ``dataloader`` without gradients and print loss/accuracy.

    A prediction counts as correct when ``output > 0.5`` agrees with the target.

    Args:
        model: Module called as ``model(img1, img2)``; expected to return one
            score per sample (e.g. shape ``(B, 1)``).
        device: Device the batch tensors are moved to.
        dataloader: Sized iterable of ``(img1, img2, target)`` batches.
        criterion: Loss called as ``criterion(output, target)``.

    Returns:
        list: One 1-D tensor of scores per batch, in iteration order.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    outputs = []
    num_batches = len(dataloader)

    with torch.no_grad():
        for img1, img2, target in tqdm(dataloader, total=num_batches):
            img1, img2, target = img1.to(device), img2.to(device), target.to(device)
            # Flatten to one score per sample. A bare .squeeze() turns a
            # single-sample batch into a 0-d tensor that no longer matches
            # `target`'s shape.
            output = model(img1, img2).reshape(-1)
            loss = criterion(output, target)

            predictions = (output > 0.5).to(target.dtype)
            correct += (predictions == target).sum().item()
            total += len(target)
            running_loss += loss.item()
            outputs.append(output)

    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    valid_loss = running_loss / max(num_batches, 1)
    accuracy = correct / max(total, 1)

    print(f'\nTest set: Average loss: {valid_loss:.4f}, Accuracy: {accuracy}\n')
    return outputs

In [None]:
# Read the epoch count from the shared config rather than keeping a second
# hard-coded copy that can drift from cfg.epochs.
epochs = cfg.epochs
for epoch in range(1, epochs + 1):
    print(f"Starting epoch: {epoch}")
    train(model, device, train_loader, optimizer, criterion)
    # `outputs` is overwritten each epoch, so it holds only the latest
    # validation scores once the loop finishes.
    outputs = evaluate(model, device, valid_loader, criterion)