In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import random
from PIL import Image
import PIL.ImageOps

import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset, random_split
import torchvision.utils
import torch
from torch.autograd import Variable
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
from sklearn.metrics import roc_auc_score

# Hyperparameters and paths
# Root directories handed to datasets.ImageFolder; the values here are
# placeholders and must be replaced with real paths before running.
train_dataset_path = " "
val_dataset_path = " "
batch_size = 8  # number of image pairs per DataLoader batch
learning_rate = 0.0005  # learning rate passed to the Adam optimizer
num_epochs = 150  # number of iterations of the outer training loop
margin = 2.0  # margin passed to ContrastiveLoss

# Plotting data
def show_plot(iteration, train_loss, val_loss):
    """Draw the training and validation loss curves on one figure and show it.

    Args:
        iteration: x-axis values shared by both curves.
        train_loss: y-values drawn under the 'Training Loss' legend entry.
        val_loss: y-values drawn under the 'Validation Loss' legend entry.
    """
    curves = (
        (train_loss, 'Training Loss'),
        (val_loss, 'Validation Loss'),
    )
    for series, legend_name in curves:
        plt.plot(iteration, series, label=legend_name)

    plt.xlabel('Iterations')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

def show_test_plot(iteration, test_data, label):
    """Draw a single metric curve and show the figure.

    Args:
        iteration: x-axis values.
        test_data: y-values of the metric.
        label: metric name; used as the y-axis label and, prefixed with
            'Test ', as the legend entry.
    """
    legend_name = 'Test ' + label
    plt.plot(iteration, test_data, label=legend_name)
    plt.xlabel('Iterations')
    plt.ylabel(label)
    plt.legend()
    plt.show()

class SiameseNetworkDataset(Dataset):
    """Dataset that yields random image pairs for Siamese training.

    Each item is ``(img0, img1, label)``. Both images are loaded as grayscale
    and ``label`` is a float32 tensor of shape ``(1,)`` holding ``1.0`` when
    the two images come from different classes and ``0.0`` when they come
    from the same class.

    Args:
        imageFolderDataset: object exposing ``imgs``, a sequence of
            ``(path, class_index)`` tuples (e.g.
            ``torchvision.datasets.ImageFolder``).
        transform: optional callable applied to each grayscale PIL image.
    """

    def __init__(self, imageFolderDataset, transform=None):
        self.imageFolderDataset = imageFolderDataset
        self.transform = transform

        # Group the samples by class once, so that a same-class partner can
        # be drawn directly and the number of classes is known up front.
        self._imgs_by_class = {}
        for img_tuple in imageFolderDataset.imgs:
            self._imgs_by_class.setdefault(img_tuple[1], []).append(img_tuple)

    @staticmethod
    def _load_grayscale(path):
        """Open ``path`` and return a single-channel ("L") copy of the image."""
        # convert() returns a new image, so it stays usable after the
        # underlying file is closed by the context manager.
        with Image.open(path) as img:
            return img.convert("L")

    def __getitem__(self, index):
        """Return one randomly drawn pair.

        ``index`` is accepted to satisfy the Dataset protocol but is not
        used: both images of the pair are sampled at random.

        Raises:
            ValueError: if a different-class pair is requested but the
                dataset contains fewer than two classes.
        """
        all_imgs = self.imageFolderDataset.imgs
        img0_tuple = random.choice(all_imgs)

        # We need approximately 50% of the pairs to be from the same class
        should_get_same_class = random.randint(0, 1)
        if should_get_same_class:
            # Uniform draw among the images of the same class (may return
            # the very same image as img0).
            img1_tuple = random.choice(self._imgs_by_class[img0_tuple[1]])
        else:
            if len(self._imgs_by_class) < 2:
                # Without this check the search below would never terminate.
                raise ValueError(
                    "Cannot sample a different-class pair: the dataset "
                    "contains fewer than two classes."
                )
            while True:
                # Look until a different class image is found
                img1_tuple = random.choice(all_imgs)
                if img0_tuple[1] != img1_tuple[1]:
                    break

        img0 = self._load_grayscale(img0_tuple[0])
        img1 = self._load_grayscale(img1_tuple[0])

        if self.transform is not None:
            img0 = self.transform(img0)
            img1 = self.transform(img1)

        # 1.0 -> the two images belong to different classes, 0.0 -> same class
        is_different = int(img1_tuple[1] != img0_tuple[1])
        label = torch.from_numpy(np.array([is_different], dtype=np.float32))
        return img0, img1, label

    def __len__(self):
        """Return the number of images in the wrapped folder dataset."""
        return len(self.imageFolderDataset.imgs)

# Preprocessing applied to every image: resize to 100x100, then convert to a tensor
transformation = transforms.Compose([
    transforms.Resize((100, 100)),
    transforms.ToTensor(),
])

# Index the image folders of the training and validation splits
train_folder_dataset = datasets.ImageFolder(root=train_dataset_path)
val_folder_dataset = datasets.ImageFolder(root=val_dataset_path)

# Wrap each folder dataset so that it yields image pairs
train_siamese_dataset = SiameseNetworkDataset(
    imageFolderDataset=train_folder_dataset,
    transform=transformation,
)
val_siamese_dataset = SiameseNetworkDataset(
    imageFolderDataset=val_folder_dataset,
    transform=transformation,
)

# Batch the training and validation pair datasets
train_dataloader = DataLoader(
    train_siamese_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=0,
)
val_dataloader = DataLoader(
    val_siamese_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=0,
)

# Create the Siamese Neural Network
class SiameseNetwork(nn.Module):
    """Siamese network: one shared branch embeds each of two images into 2-D.

    The branch is a three-stage convolutional extractor followed by three
    fully connected layers. The first linear layer expects 384 flattened
    features, which is what the convolutional stack produces for a
    1-channel 100x100 input (384 channels at 1x1 spatial size).
    """

    def __init__(self):
        super().__init__()

        # Convolutional feature extractor
        conv_layers = [
            nn.Conv2d(1, 96, kernel_size=11, stride=4),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, stride=2),
            nn.Conv2d(96, 256, kernel_size=5, stride=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, stride=2),
            nn.Conv2d(256, 384, kernel_size=3, stride=1),
            nn.ReLU(inplace=True),
        ]
        self.cnn1 = nn.Sequential(*conv_layers)

        # Fully connected head mapping the flattened features to the embedding
        fc_layers = [
            nn.Linear(384, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 2),
        ]
        self.fc1 = nn.Sequential(*fc_layers)

    def forward_once(self, x):
        """Embed one batch of images through the shared branch."""
        features = self.cnn1(x)
        # Collapse everything but the batch dimension before the linear layers
        flattened = features.view(features.size(0), -1)
        return self.fc1(flattened)

    def forward(self, input1, input2):
        """Embed both inputs with the same weights and return the two embeddings."""
        return self.forward_once(input1), self.forward_once(input2)

# Define the Contrastive Loss Function
class ContrastiveLoss(torch.nn.Module):
    """Contrastive loss over pairs of embeddings.

    For each pair with Euclidean distance ``d`` and label ``y`` the loss is
    ``(1 - y) * d**2 + y * clamp(margin - d, min=0)**2``, averaged over the
    batch. A label of 0 therefore pulls the pair together and a label of 1
    pushes it apart until the distance reaches ``margin``.

    Args:
        margin: distance beyond which label-1 pairs stop contributing.
    """

    def __init__(self, margin=2.0):
        super().__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        """Return the mean contrastive loss as a scalar tensor.

        Args:
            output1: embeddings of the first images.
            output2: embeddings of the second images, same shape as output1.
            label: per-pair labels (0 or 1), either shaped like the
                keepdim distances, e.g. ``(B, 1)``, or with the trailing
                dimension omitted, e.g. ``(B,)``.
        """
        # keepdim=True keeps a trailing dimension of size 1 on the distances
        euclidean_distance = F.pairwise_distance(output1, output2, keepdim=True)

        # A label missing the trailing dimension, e.g. (B,) against (B, 1)
        # distances, would broadcast to a (B, B) matrix and pair every
        # distance with every label. Align it with the distances instead.
        if label.dim() == euclidean_distance.dim() - 1:
            label = label.unsqueeze(-1)

        same_class_term = (1 - label) * torch.pow(euclidean_distance, 2)
        margin_gap = torch.clamp(self.margin - euclidean_distance, min=0.0)
        different_class_term = label * torch.pow(margin_gap, 2)

        return torch.mean(same_class_term + different_class_term)

# Run on the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = SiameseNetwork().to(device)
criterion = ContrastiveLoss(margin=margin)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Per-epoch history used for the plots at the end
counter = []
train_loss_history = []
val_loss_history = []
auc_roc_score_history = []
iteration_number = 0

# Iterate through the epochs
for epoch in range(num_epochs):

    train_loss_epoch = 0
    val_loss_epoch = 0

    # Training phase
    model.train()
    for img0, img1, label in train_dataloader:
        # Send the images and labels to the selected device
        img0, img1, label = img0.to(device), img1.to(device), label.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        # Pass the two images through the network and obtain two embeddings
        output1, output2 = model(img0, img1)

        # Pass the embeddings and the label into the loss function
        loss_contrastive = criterion(output1, output2, label)

        # Backpropagate and update the weights
        loss_contrastive.backward()
        optimizer.step()

        train_loss_epoch += loss_contrastive.item()

    # Validation phase: the loss and the AUC-ROC inputs are collected in a
    # single pass. The pair dataset draws fresh random pairs on every pass,
    # so one pass keeps both metrics on the same pairs (and halves the work).
    model.eval()
    predictions = []
    true_labels = []
    with torch.no_grad():
        for img0, img1, label in val_dataloader:
            # Send the images and labels to the selected device
            img0, img1, label = img0.to(device), img1.to(device), label.to(device)

            # Pass the two images through the network and obtain two embeddings
            output1, output2 = model(img0, img1)

            # Pass the embeddings and the label into the loss function
            loss_contrastive = criterion(output1, output2, label)
            val_loss_epoch += loss_contrastive.item()

            # AUC-ROC needs a continuous score per pair, not a thresholded
            # decision. Label 1 means "different class", so the raw distance
            # is already oriented correctly (larger -> more likely label 1).
            euclidean_distance = F.pairwise_distance(output1, output2)
            predictions.extend(euclidean_distance.cpu().tolist())
            true_labels.extend(label.view(-1).cpu().tolist())

    # Average losses
    train_loss_epoch /= len(train_dataloader)
    val_loss_epoch /= len(val_dataloader)

    # AUC-ROC is undefined unless both labels occur among the sampled pairs;
    # record NaN for such an epoch instead of aborting the whole run.
    if len(set(true_labels)) < 2:
        auc_roc = float("nan")
    else:
        auc_roc = roc_auc_score(true_labels, predictions)

    print(f"Epoch number {epoch}\n Train loss {train_loss_epoch}\n Validation loss {val_loss_epoch}\n AUC-ROC {auc_roc}\n")

    counter.append(iteration_number)
    train_loss_history.append(train_loss_epoch)
    val_loss_history.append(val_loss_epoch)
    auc_roc_score_history.append(auc_roc)
    iteration_number += 1

show_plot(counter, train_loss_history, val_loss_history)
show_test_plot(counter, auc_roc_score_history, 'AUC-ROC')
