In [141]:
import numpy as np
import torch
import torchvision
import matplotlib.pyplot as plt
from time import time
from torch.autograd import Variable
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch import nn, optim


In [150]:

trainset = datasets.MNIST('PATH_TO_STORE_TRAINSET', download=True, train=True, transform=transforms.ToTensor())
testset = datasets.MNIST('PATH_TO_STORE_TESTSET', download=True, train=False, transform=transforms.ToTensor())
trainloader = torch.utils.data.DataLoader(trainset, batch_size=64, shuffle=True)
testloader = torch.utils.data.DataLoader(testset, batch_size=64, shuffle=True)

In [4]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class SiameseModel(nn.Module):
    def __init__(self):
        super(SiameseModel, self).__init__()
        self.fc1 = nn.Linear(28*28, 512)
        self.fc2 = nn.Linear(512, 512)
        self.fc3 = nn.Linear(512, 2)
    def forward(self, x1, x2):
        y1 = self.fc3(F.relu(self.fc2(F.relu(self.fc1(x1)))))
        y2 = self.fc3(F.relu(self.fc2(F.relu(self.fc1(x2)))))
        return y1, y2

In [5]:
def contrastiveLoss(y1, y2, label):
      d = nn.functional.pairwise_distance(y1, y1)
      margin =3.0
      return torch.mean(((label) * torch.pow(d, 2)) + (1 - label) * torch.pow(torch.clamp(margin - d, min=0.0), 2))

In [70]:
def train(model,optimizer,criterion=contrastiveLoss):
  for epc in range(50):
      running_loss=0.0
      train_iter = iter(trainloader)
      while True:
        try:
          x1, label_1 = next(train_iter)
          x2, label_2 = next(train_iter)
          
          label_1=label_1.to(device)
          label_2=label_2.to(device)
          dim1= list(label_1.shape)
          dim2= list(label_2.shape)
          if dim1 != dim2:
              break
          label =torch.eq(label_1,label_2).to(device)
          label = label.type(torch.cuda.FloatTensor)
          x1 = x1.view(x1.size(0), -1)
          x2 = x2.view(x2.size(0), -1)
          x1=x1.to(device)
          x2=x2.to(device)
          y1, y2 = model(x1, x2)
          loss = criterion(x1, x2,label)
          loss = Variable(loss, requires_grad=True)
          loss.backward()
          optimizer.step()
          running_loss+=  loss.item()
        except StopIteration:
          break
      print('Epoch: %d Loss: %.3f' % (epc, running_loss/len(trainloader)))

In [64]:
def test(model,optimizer,criterion=contrastiveLoss):
   test_iter = iter(testloader)
   running_loss=0.0
   while True:
        try:
          x1, label_1 = next(test_iter)
          x2, label_2 = next(test_iter)
          
          label_1.to(device)
          label_2.to(device)
          dim1= list(label_1.shape)
          dim2= list(label_2.shape)
          if dim1 != dim2:
              break
          label =torch.eq(label_1,label_2).to(device)
          label = label.type(torch.cuda.FloatTensor)
          x1 = x1.view(x1.size(0), -1)
          x2 = x2.view(x2.size(0), -1)
          x1=x1.to(device)
          x2=x2.to(device)
          y1, y2 = model(x1, x2)
          loss = criterion(x1, x2,label)
          loss = Variable(loss, requires_grad=True)
          loss.backward()
          running_loss+=loss.item()
          optimizer.step()  
        except StopIteration:
          break
   print('Test Loss: %.3f' % (running_loss/len(testloader)))

In [71]:
# // Contrastive Loss
sim=SiameseModel()
sim.to(device)
optimizer=torch.optim.SGD(sim.parameters(), lr = 0.01)
train(sim,optimizer,contrastiveLoss)
test(sim,optimizer,contrastiveLoss)

Epoch: 0 Loss: 0.139
Epoch: 1 Loss: 0.147
Epoch: 2 Loss: 0.137
Epoch: 3 Loss: 0.137
Epoch: 4 Loss: 0.139
Epoch: 5 Loss: 0.139
Epoch: 6 Loss: 0.144
Epoch: 7 Loss: 0.142
Epoch: 8 Loss: 0.142
Epoch: 9 Loss: 0.137
Epoch: 10 Loss: 0.132
Epoch: 11 Loss: 0.142
Epoch: 12 Loss: 0.137
Epoch: 13 Loss: 0.152
Epoch: 14 Loss: 0.147
Epoch: 15 Loss: 0.129
Epoch: 16 Loss: 0.154
Epoch: 17 Loss: 0.134
Epoch: 18 Loss: 0.144
Epoch: 19 Loss: 0.144
Epoch: 20 Loss: 0.137
Epoch: 21 Loss: 0.142
Epoch: 22 Loss: 0.144
Epoch: 23 Loss: 0.142
Epoch: 24 Loss: 0.144
Epoch: 25 Loss: 0.139
Epoch: 26 Loss: 0.137
Epoch: 27 Loss: 0.127
Epoch: 28 Loss: 0.147
Epoch: 29 Loss: 0.144
Epoch: 30 Loss: 0.144
Epoch: 31 Loss: 0.139
Epoch: 32 Loss: 0.149
Epoch: 33 Loss: 0.144
Epoch: 34 Loss: 0.152
Epoch: 35 Loss: 0.149
Epoch: 36 Loss: 0.149
Epoch: 37 Loss: 0.149
Epoch: 38 Loss: 0.149
Epoch: 39 Loss: 0.149
Epoch: 40 Loss: 0.152
Epoch: 41 Loss: 0.144
Epoch: 42 Loss: 0.139
Epoch: 43 Loss: 0.144
Epoch: 44 Loss: 0.147
Epoch: 45 Loss: 0.14

In [75]:
class TripletMNIST():
    """
    Train: For each sample (anchor) randomly chooses a positive and negative samples
    Test: Creates fixed triplets for testing
    """

    def __init__(self, mnist_dataset):
        self.mnist_dataset = mnist_dataset
        self.train = self.mnist_dataset.train
        self.transform = self.mnist_dataset.transform

        if self.train:
            self.train_labels = self.mnist_dataset.train_labels
            self.train_data = self.mnist_dataset.train_data
            self.labels_set = set(self.train_labels.numpy())
            self.label_to_indices = {label: np.where(self.train_labels.numpy() == label)[0]
                                     for label in self.labels_set}

        else:
            self.test_labels = self.mnist_dataset.test_labels
            self.test_data = self.mnist_dataset.test_data
            # generate fixed triplets for testing
            self.labels_set = set(self.test_labels.numpy())
            self.label_to_indices = {label: np.where(self.test_labels.numpy() == label)[0]
                                     for label in self.labels_set}

            random_state = np.random.RandomState(29)

            triplets = [[i,
                         random_state.choice(self.label_to_indices[self.test_labels[i].item()]),
                         random_state.choice(self.label_to_indices[
                                                 np.random.choice(
                                                     list(self.labels_set - set([self.test_labels[i].item()]))
                                                 )
                                             ])
                         ]
                        for i in range(len(self.test_data))]
            self.test_triplets = triplets

    def __getitem__(self, index):
        if self.train:
            img1, label1 = self.train_data[index], self.train_labels[index].item()
            positive_index = index
            while positive_index == index:
                positive_index = np.random.choice(self.label_to_indices[label1])
            negative_label = np.random.choice(list(self.labels_set - set([label1])))
            negative_index = np.random.choice(self.label_to_indices[negative_label])
            img2 = self.train_data[positive_index]
            img3 = self.train_data[negative_index]
        else:
            img1 = self.test_data[self.test_triplets[index][0]]
            img2 = self.test_data[self.test_triplets[index][1]]
            img3 = self.test_data[self.test_triplets[index][2]]

        img1 = Image.fromarray(img1.numpy(), mode='L')
        img2 = Image.fromarray(img2.numpy(), mode='L')
        img3 = Image.fromarray(img3.numpy(), mode='L')
        if self.transform is not None:
            img1 = self.transform(img1)
            img2 = self.transform(img2)
            img3 = self.transform(img3)
        return (img1, img2, img3), []

    def __len__(self):
        return len(self.mnist_dataset)

In [83]:
trans = transforms.Compose([transforms.ToTensor()])
trainloader = TripletMNIST(torchvision.datasets.MNIST(root="~/torch_datasets", train = True, transform = trans, download=True))
testloader = TripletMNIST(torchvision.datasets.MNIST(root="~/torch_datasets", train = False, transform = trans, download=True))



In [133]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class TripletModel(nn.Module):
    def __init__(self):
        super(TripletModel, self).__init__()
        self.fc1 = nn.Linear(28*28, 300)
        self.fc2 = nn.Linear(300, 300)
        self.fc3 = nn.Linear(300, 100)
        self.fc4 = nn.Linear(100, 2)

    def forward(self, x1, x2, x3):
        y1 =  self.fc4(F.relu(self.fc3( F.relu(self.fc2 (F.relu(self.fc1(x1)))))))
        y2 =  self.fc4(F.relu(self.fc3(F.relu(self.fc2 (F.relu(self.fc1(x2)))))))
        y3 =  self.fc4(F.relu(self.fc3( F.relu(self.fc2 (F.relu(self.fc1(x3)))))))
        return y1, y2, y3

In [147]:
def train(model,optimizer,criterion,epoch):
    for epch in range(epoch):
      running_loss = 0
      for i, data in enumerate(trainloader,0):
        images = data
        imgLst = list(images[0]) 
        optimizer.zero_grad()
        imgLst[0] = imgLst[0].reshape(-1,784)
        imgLst[1] = imgLst[1].reshape(-1,784)
        imgLst[2] = imgLst[2].reshape(-1,784)
        y1,y2,y3 = model(imgLst[0],imgLst[1],imgLst[2])
        loss = criterion( y1,y2,y3)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
      running_loss/= len(trainloader)
      # print('Average  train loss: {:.3f} , epoch: {}'.format(running_loss,epch))





In [119]:
def test(model,criterion):
  with torch.no_grad():
    model.eval()
    running_loss = 0
    for batch_idx, (data, target) in enumerate(testloader):
      imgLst = list(data)
      imgLst[0] = imgLst[0].reshape(-1,784)
      imgLst[1] = imgLst[1].reshape(-1,784)
      imgLst[2] = imgLst[2].reshape(-1,784)
      y1,y2,y3 = model(imgLst[0],imgLst[1],imgLst[2])
      loss = criterion(y1,y2,y3)
      running_loss+=loss.item()
    print('Average test loss: {:.3f} '.format(running_loss/len(testloader)))


In [138]:
# Rmsprop
tp = TripletModel()
criterion = nn.TripletMarginLoss(margin=1.0, p=2)
optimizer = torch.optim.RMSprop(tp.parameters(), lr=0.01)
train(tp,optimizer,criterion,5)
test(tp,criterion)

Average  train loss: 7.702 , epoch: 0
Average  train loss: 1.400 , epoch: 1
Average  train loss: 1.237 , epoch: 2
Average  train loss: 1.003 , epoch: 3
Average  train loss: 1.000 , epoch: 4
Average test loss: 1.000 


In [140]:
# Sgd
tp = TripletModel()
criterion = nn.TripletMarginLoss(margin=1.0, p=2)
optimizer = torch.optim.SGD(tp.parameters(), lr=0.01)
train(tp,optimizer,criterion,20)
test(tp,criterion)

Average  train loss: 1.001 , epoch: 0
Average  train loss: 1.001 , epoch: 1
Average  train loss: 1.000 , epoch: 2
Average  train loss: 1.004 , epoch: 3
Average  train loss: 0.997 , epoch: 4
Average  train loss: 1.004 , epoch: 5
Average  train loss: 1.002 , epoch: 6
Average  train loss: 0.998 , epoch: 7
Average  train loss: 0.992 , epoch: 8
Average  train loss: 1.011 , epoch: 9
Average  train loss: 1.000 , epoch: 10
Average  train loss: 1.003 , epoch: 11
Average  train loss: 0.984 , epoch: 12
Average  train loss: 0.988 , epoch: 13
Average  train loss: 1.010 , epoch: 14
Average  train loss: 1.005 , epoch: 15
Average  train loss: 1.001 , epoch: 16
Average  train loss: 1.002 , epoch: 17
Average  train loss: 1.000 , epoch: 18
Average  train loss: 1.001 , epoch: 19
Average test loss: 1.001 


In [146]:
# ada-delta
tp = TripletModel()
criterion = nn.TripletMarginLoss(margin=1.0, p=2)
optimizer = torch.optim.Adadelta(tp.parameters(), lr=0.001)
train(tp,optimizer,criterion,3)
test(tp,criterion)

Average  train loss: 1.000 , epoch: 0
Average  train loss: 0.929 , epoch: 1
Average  train loss: 0.848 , epoch: 2
Average test loss: 0.972 


In [149]:
p_range=[1,2,3,4,5]
margin_range=[1.0,2.0,3.0,4.0,5.0]
for p in p_range:
  for m in margin_range:
    tp = TripletModel()
    criterion = nn.TripletMarginLoss(margin=m, p=p)
    optimizer = torch.optim.RMSprop(tp.parameters())
    train(tp,optimizer,criterion,5)
    print('For p=',p,'margin=',m)
    test(tp,criterion)
# test/validation loss is minimized for p=3, margin=1.0

For p= 1 margin= 1.0
Average test loss: 1.000 
For p= 1 margin= 2.0
Average test loss: 2.000 
For p= 1 margin= 3.0
Average test loss: 3.000 
For p= 1 margin= 4.0
Average test loss: 4.000 
For p= 1 margin= 5.0
Average test loss: 10.986 
For p= 2 margin= 1.0
Average test loss: 1.000 
For p= 2 margin= 2.0
Average test loss: 1.869 
For p= 2 margin= 3.0
Average test loss: 3.295 
For p= 2 margin= 4.0
Average test loss: 4.000 
For p= 2 margin= 5.0
Average test loss: 20.612 
For p= 3 margin= 1.0
Average test loss: 0.969 
For p= 3 margin= 2.0
Average test loss: 1.972 
For p= 3 margin= 3.0
Average test loss: 5.297 
For p= 3 margin= 4.0
Average test loss: 3.637 
For p= 3 margin= 5.0
Average test loss: 5.139 
For p= 4 margin= 1.0
Average test loss: 1.000 
For p= 4 margin= 2.0
Average test loss: 2.069 
For p= 4 margin= 3.0
Average test loss: 3.262 
For p= 4 margin= 4.0
Average test loss: 3.980 
For p= 4 margin= 5.0
Average test loss: 5.000 
For p= 5 margin= 1.0
Average test loss: 1.000 
For p= 5 ma

In [None]:
# Pro
# 1)Siamese seem best suited for cases where we can have only a few examples per Class
# 2 Learning from Semantic Similarity b/w two comparable things

# Cons
# 1)Doesn't ouput probabilities
# 2 Reuires large training time

