In [4]:
%matplotlib inline
import torchvision
import torchvision.datasets as dset
import torchvision.transforms as transforms
from torch.utils.data import DataLoader,Dataset
import matplotlib.pyplot as plt
import torchvision.utils
import numpy as np
import random
from PIL import Image
import torch
from torch.autograd import Variable
import PIL.ImageOps    
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
from numpy import *
from shutil import copyfile

background = False
pretrained = False

# Custom Siamese Dataset class
Each time will give a tuple of anchor image, postive image OR negative image

In [5]:
class SiameseNetworkDataset(Dataset):
    
    def __init__(self,imageFolderDataset,transform=None,should_invert=True):
        self.imageFolderDataset = imageFolderDataset    
        self.transform = transform
        self.should_invert = should_invert
        
    def __getitem__(self,index):
        img0_tuple = random.choice(self.imageFolderDataset.imgs)
        #we need to make sure approx 50% of images are in the same class
        should_get_same_class = random.randint(0,1) 
        if should_get_same_class:
            while True:
                #keep looping till the same class image is found
                img1_tuple = random.choice(self.imageFolderDataset.imgs) 
                if img0_tuple[1]==img1_tuple[1]:
                    break
        else:
            img1_tuple = random.choice(self.imageFolderDataset.imgs)

        img0 = Image.open(img0_tuple[0])
        img1 = Image.open(img1_tuple[0])
        img0 = img0.convert("RGB")
        img1 = img1.convert("RGB")
        
        if self.should_invert:
            img0 = PIL.ImageOps.invert(img0)
            img1 = PIL.ImageOps.invert(img1)

        if self.transform is not None:
            img0 = self.transform(img0)
            img1 = self.transform(img1)
        
        return img0, img1 , torch.from_numpy(np.array([int(img1_tuple[1]!=img0_tuple[1])],dtype=np.float32))
    
    def __len__(self):
        return len(self.imageFolderDataset.imgs)

# Create training and test sets using CASIA dataset

In [6]:
import os
import shutil
if not pretrained:
    training_dir = "/home/shared/CS341/Siamese/Facial-Similarity-with-Siamese-Networks-in-Pytorch/CS341/"
    testing_dir = "/home/shared/CS341/Siamese/Facial-Similarity-with-Siamese-Networks-in-Pytorch/test/"
    for file in os.listdir(training_dir):
        if (random.random() > 0.85):
            shutil.move(os.path.join(training_dir, file), os.path.join(testing_dir, file))
folder_dataset_CASIA = dset.ImageFolder("/home/shared/CS341/Siamese/Facial-Similarity-with-Siamese-Networks-in-Pytorch/CS341/") 

# Visuallize images of one batch

In [9]:
siamese_dataset = SiameseNetworkDataset(imageFolderDataset=folder_dataset_CASIA,
                                                transform=transforms.Compose([transforms.Scale((100,100)),
                                                                      transforms.ToTensor()
                                                                      ])
                                       ,should_invert=False)

vis_dataloader = DataLoader(siamese_dataset,
                        shuffle=True,
                        num_workers=8,
                        batch_size=8)
dataiter = iter(vis_dataloader)


example_batch = next(dataiter)
concatenated = torch.cat((example_batch[0],example_batch[1], example_batch[2]))
imshow(torchvision.utils.make_grid(concatenated))

# Siamese Architecture and Contrastive loss function

In [10]:
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        self.cnn1 = nn.Sequential(
            nn.ReflectionPad2d(1),
            nn.Conv2d(3, 4, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(4),
            
            nn.ReflectionPad2d(1),
            nn.Conv2d(4, 8, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(8),

            nn.ReflectionPad2d(1),
            nn.Conv2d(8, 8, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(8),
        )

        self.fc1 = nn.Sequential(
            nn.Linear(8*100*100, 500),
            nn.ReLU(inplace=True),

            nn.Linear(500, 500),
            nn.ReLU(inplace=True),

            nn.Linear(500, 7))

    def forward_once(self, x):
        output = self.cnn1(x)
        output = output.view(output.size()[0], -1)######？
        output = self.fc1(output)
        return output

    def forward(self, input1, input2):
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)
        return output1, output2

class ContrastiveLoss(torch.nn.Module):
    """
    Contrastive loss function.
    Based on: http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    """

    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_distance = F.pairwise_distance(output1, output2)
        loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_distance, 2) +
                                      (label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))
        return loss_contrastive

# Training time

In [12]:
train_dataloader = DataLoader(siamese_dataset,
                        shuffle=True,
                        num_workers=8,
                        batch_size=32)

net = SiameseNetwork().cuda()
criterion = ContrastiveLoss()
optimizer = optim.Adam(net.parameters(),lr = 0.0005 )

counter = []
loss_history = [] 
iteration_number= 0

for epoch in range(0,50):
    for i, data in enumerate(train_dataloader,0):
        img0, img1 , label = data
        img0, img1 , label = Variable(img0).cuda(), Variable(img1).cuda() , Variable(label).cuda()
        output1,output2 = net(img0,img1)
        optimizer.zero_grad()
        loss_contrastive = criterion(output1,output2,label)
        loss_contrastive.backward()
        optimizer.step()
        if i %10 == 0 :
            torch.save(net.state_dict(),'siamesecolor7_newtest.pkl')
            print('Saved model checkpoints')
            print("Epoch number {}\n Current loss {}\n".format(epoch,loss_contrastive.data[0]))
            iteration_number +=10
            counter.append(iteration_number)
            loss_history.append(loss_contrastive.data[0])
show_plot(counter,loss_history)

# Load pre-trained model based on Reddit training data (Pre-trained Reddit)

In [None]:
import os
if pretrained:
    net = SiameseNetwork().cuda()
    if os.path.exists('pretrained_newtest.pkl'):
        checkpoint = torch.load('pretrained_newtest.pkl')
        net.load_state_dict(checkpoint)
        print("loaded")

# Load CASIA test dataset

In [59]:
folder_dataset = dset.ImageFolder(root=testing_dir)
len(folder_dataset.imgs)

981

# Retrieve encodings of CASIA test images using trained model (only CASIA)

In [53]:
image_tuples = {}
encodings = None
index = 0
nums = {}
for x in range(276): 
    nums[x] = 0
transform=transforms.Compose([transforms.Scale((100,100)),transforms.ToTensor()])
for img in folder_dataset.imgs:
    label = img[1]
    nums[label] += 1
    img = Image.open(img[0])
    img = img.convert("RGB")
    img = transform(img).view(1,3,100,100)
    encoding = net.forward_once(Variable(img).cuda())
    image_tuples[index] = label
    index += 1
    if (encodings is None):
        encodings = list([encoding.data.cpu().numpy()[0]])
    else:
        encodings.append(encoding.data.cpu().numpy()[0])

  "please use transforms.Resize instead.")
  " Skipping tag %s" % (size, len(data), tag))
  " Skipping tag %s" % (size, len(data), tag))


# Extract and add feature vectors of training images to the test set as background (CASIA + background)

In [54]:
background = False
if background:
    folder_dataset = dset.ImageFolder("/home/shared/CS341/Dataprocessing/train")
    for img in folder_dataset.imgs:
        img = Image.open(img[0])
        img = img.convert("RGB")
        img = transform(img).view(1,3,100,100)
        encoding = net.forward_once(Variable(img).cuda())
        if (encodings is None):
            encodings = list([encoding.data.cpu().numpy()[0]])
        else:
            encodings.append(encoding.data.cpu().numpy()[0])

  'to RGBA images')


In [2]:
# encodings = np.array(encodings)
# encodings.shape

# Perform Image ranking for each of the images in CASIA test set

In [60]:
from sklearn.neighbors import NearestNeighbors
import numpy as np
X = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]])
nbrs = NearestNeighbors(n_neighbors=981, algorithm='ball_tree').fit(encodings)
distances, indices = nbrs.kneighbors(encodings[0:981])

In [61]:
result = np.ones((981,981))
for i in range(981):
    for j in range(981):
        try:
            result[i][j] = image_tuples[indices[i][j]]
        except:
            pass

# Calulate top k accuracy
In this case, k = 2

In [95]:
count = 0
k = 2
for i in range(981):
    if(result[i][0] in result[i][1:k+1]):
        count+= 1

In [96]:
count/981

0.8817533129459735

# MRR Calculation

In [None]:
recip = 0
count = 0
for i in range(981):
    for j in range(1, 981):
        if(result[i][j] == result[i][0]):
            recip += 1/j
            count += 1

In [None]:
recip/count