# Siamese Network Architecture 

##### Step1: Lets first import all libraries needed.

In [1]:
# -*- encoding: utf-8 -*-
import argparse
import torch
import torchvision.datasets as dsets
import random
import numpy as np
import time
import matplotlib.pyplot as plt
from torch.autograd import Variable
from torchvision import transforms
import pickle
import torch
import torch.nn as nn
%matplotlib inline


### Data Preprocessing
As we learned in theory part of Siamese Network, that as part of data preprocessing we need to create pairs.
1. 1 pair-> similar; y=0
2. 1 pair-> dissimilar; y=1

Note: We are using contrastive loss function.

#### Step2: To preporcess data, and create iterator for model, first create a Dataset Loader Class.

In [2]:
class Dataset(object):
    '''
    Class Dataset:
    Input: numpy values
    Output: torch variables.
    '''
    def __init__(self, x0, x1, label):
        self.size = label.shape[0] 
        self.x0 = torch.from_numpy(x0)
        self.x1 = torch.from_numpy(x1)
        self.label = torch.from_numpy(label)

    def __getitem__(self, index):
        return (self.x0[index],
                self.x1[index],
                self.label[index])

    def __len__(self):
        return self.size
    

##### Before creating an iterator, lets create pairs, and preprocess images in them.

In [3]:
def create_pairs(data, digit_indices):
    x0_data = []
    x1_data = []
    label = []
    n = min([len(digit_indices[d]) for d in range(11)]) - 1
    for d in range(11): # for MNIST dataset: as we have 10 digits
        for i in range(n):
            z1, z2 = digit_indices[d][i], digit_indices[d][i + 1]
            x0_data.append(data[z1]/255.) # Image Preprocessing Step
            x1_data.append(data[z2]/255.) # Image Preprocessing Step
            label.append(1)
            inc = random.randrange(1, 10)
            dn = (d + inc) % 10
            z1, z2 = digit_indices[d][i], digit_indices[dn][i]
            x0_data.append(data[z1]/255.) # Image Preprocessing Step
            x1_data.append(data[z2]/255.) # Image Preprocessing Step
            label.append(0)

    x0_data = np.array(x0_data, dtype=np.float32) #[:10201]
#     print (x0_data.shape)
    x0_data = x0_data.reshape([-1, 3, 224, 224])
    x1_data = np.array(x1_data, dtype=np.float32) #[:10201]
    x1_data = x1_data.reshape([-1, 3, 224, 224])
    label = np.array(label, dtype=np.int32)
#     print (label.shape)
    return x0_data, x1_data, label

def create_iterator(data, label, batchsize, shuffle=False):
#     print ("max label", max(label))
    digit_indices = [np.where(label == i)[0] for i in range(max(label)+1)]
    x0, x1, label = create_pairs(data, digit_indices)
    ret = Dataset(x0, x1, label)
    return ret


###### create iterator: returns set of given batchsize for training purpose

# Loss Function: Contrastive Loss Function 

#### Step 3: Create Loss Function
As we know contrastive loss function consists of 2 parts:
![contrastive%20Loss.png](Images/contrastiveLoss.png)

1. for similar points: (1-y)*(distance_function)^2
2. for dissimilar points: y*{max(0,(m-distance_function^2)}

Here Distance Function is taken as euclidean distance, also known as root mean square.

In [5]:
def contrastive_loss_function(x0, x1, y, margin=1.0):
    # euclidean distance
    diff = x0 - x1
    dist_sq = torch.sum(torch.pow(diff, 2), 1)
    dist = torch.sqrt(dist_sq)
    mdist = margin - dist
#     print (mdist)
    dist = torch.clamp(mdist, min=0.0)
    loss = y * dist_sq + (1 - y) * torch.pow(dist, 2)
#     print (loss)
    loss = torch.sum(loss) / 2.0 / x0.size()[0]
#     print (loss, x0.size()[0])
#     print ("next")
    return loss

def triplet_loss(positive, negative,anchor, size_average=True, margin=1.0):
    distance_positive = (anchor - positive).pow(2).sum(1)  # .pow(.5)
    distance_negative = (anchor - negative).pow(2).sum(1)  # .pow(.5)
    losses = F.relu(distance_positive - distance_negative + margin)
    return losses.mean() if size_average else losses.sum()


# Siamese Network Architecture

##### Step 4: Creating Siamese Network Architecture
For this first lets create a class called SiameseNetwork with 2 functions:
###### 1. forward_once: In forward_once pass through all layers and returns the output embeddings 
###### 2. forward: In forward, call forward_once 2 times for the Input pair given, and returns the embeddings obtained


As discussed in theory part of Siamese Network, we share parameters of twins, so we don't need to create explicitly both brances, we can just create one.

In [6]:
batchsize=8
import copy
# [N, C, W, H] 
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        self.cnn1 = nn.Sequential(
            nn.Conv2d(3, 20, kernel_size=5, padding=1),
            nn.MaxPool2d(2, stride=2),
            nn.Conv2d(20, 50, kernel_size=5, padding=1),
            nn.MaxPool2d(2, stride=2),
            nn.Conv2d(50, 50, kernel_size=5, padding=1),
            nn.MaxPool2d(2, stride=2))
        self.fc1 = nn.Sequential(
            nn.Linear(34312, 8),
            nn.ReLU(inplace=True),
            nn.Linear(8,10),
            nn.Linear(10, 2))
        global vgg_bottleneck
        vgg_bottleneck = VGG16(weights='imagenet', include_top=False, pooling='avg')
        
    def forward_once(self, x):
#         print (x.shape)
        x_= copy.deepcopy(x)
        x_= x_.data.numpy()
        x_= x_.reshape(len(x_),224,224,3)
        train_vgg_bf = vgg_bottleneck.predict(x_, batch_size=8, verbose=0)
        output = self.cnn1(x)
        output = output.view(output.size(0), -1)
        b = torch.tensor(train_vgg_bf)
        b = b.view(b.size(0), -1)

        output = torch.cat([output, b], dim=1)
#         print (output.shape) #torch.Size([8, 140450])
        output = self.fc1(output)
        return output

    def forward(self, input1, input2):
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)
        return output1, output2


In [7]:
import numpy as np
import os
import shutil
from keras.preprocessing import image
from sklearn.model_selection import train_test_split
from keras.applications.vgg16 import VGG16
from keras.applications import xception
from keras.applications import inception_v3
from keras.applications.vgg16 import preprocess_input
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss, accuracy_score

def extract_features(path):
    directory_lists=os.listdir(path)
    X=[]
    Y=[]
    count=0
    if ('.DS_Store' in directory_lists):
            directory_lists.remove('.DS_Store')
    for d in directory_lists:
        nest=os.listdir(path+"/"+d)
        if ('.DS_Store' in nest):
            nest.remove('.DS_Store')
        for f in nest:
            img = image.load_img(path+"/"+d+"/"+f, target_size=(224, 224))
            img_data = image.img_to_array(img)
            img_data = preprocess_input(img_data)
            img_data = np.expand_dims(img_data, axis=0)
            X.append(img_data)
            Y.append(count)
        count+=1
    X=np.array(X)
    y=np.array(Y)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=42)         
    return X_train, X_test, y_train, y_test 

X_train, X_test, y_train, y_test = extract_features("./Data/train/")


Using TensorFlow backend.


(1310, 1, 224, 224, 3)
(562, 1, 224, 224, 3)
(1310,)
(562,)


##### We created an iterator above, here we will use it to create training and test set iterators. 

In [8]:
train_iter = create_iterator(X_train,y_train,batchsize)
test_iter = create_iterator(X_test,y_test,batchsize)

# call model
model = SiameseNetwork()
# model = torch.load('./siamese.pth')
# model.eval()
learning_rate = 0.001 # learning rate for optimization
momentum = 0.9 # momentum

# Loss and Optimizer
criterion =  contrastive_loss_function
#contrastive_loss_function # we will use contrastive loss function as defined above
# optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=50, verbose=True)
# creating a train loader, and a test loader.
train_loader = torch.utils.data.DataLoader(train_iter,batch_size=batchsize, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_iter,batch_size=2, shuffle=True)


## Let's Train our Model !!!

#### Step 5: Train Model for certain number of epochs. 

In [9]:
train_loss = []
epochs = 500
for epoch in range(epochs):
    total_loss = 0.0
    for batch_idx, (x0, x1, labels) in enumerate(train_loader):
        labels = labels.float()
        x0, x1, labels = Variable(x0), Variable(x1), Variable(labels)
        output1, output2 = model.forward(x0, x1)
        loss = criterion(output1, output2, labels)
        train_loss.append(loss.item())
        total_loss += loss.item()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    scheduler.step(epoch)
    print('Epoch: {} \tLoss: {:.6f}'.format(epoch, total_loss*1.0/batchsize))
    if epoch%50==0:
        torch.save(model, './SiameseModified-epoch-%s.pth' % epoch)


Epoch: 0 	Loss: 0.909582
Epoch: 1 	Loss: 0.694766
Epoch: 2 	Loss: 0.655927
Epoch: 3 	Loss: 0.642793
Epoch: 4 	Loss: 0.612342
Epoch: 5 	Loss: 0.602218
Epoch: 6 	Loss: 0.593924
Epoch: 7 	Loss: 0.565308
Epoch: 8 	Loss: 0.555189
Epoch: 9 	Loss: 0.560129
Epoch: 10 	Loss: 0.538603
Epoch    12: reducing learning rate of group 0 to 1.0000e-04.
Epoch: 11 	Loss: 0.526148
Epoch: 12 	Loss: 0.505046
Epoch: 13 	Loss: 0.492801
Epoch: 14 	Loss: 0.490949
Epoch: 15 	Loss: 0.489370
Epoch: 16 	Loss: 0.487194
Epoch: 17 	Loss: 0.487279
Epoch: 18 	Loss: 0.487545
Epoch: 19 	Loss: 0.486376
Epoch: 20 	Loss: 0.479335
Epoch: 21 	Loss: 0.478816
Epoch    23: reducing learning rate of group 0 to 1.0000e-05.
Epoch: 22 	Loss: 0.476945
Epoch: 23 	Loss: 0.474335
Epoch: 24 	Loss: 0.475412
Epoch: 25 	Loss: 0.474042
Epoch: 26 	Loss: 0.474406
Epoch: 27 	Loss: 0.473888
Epoch: 28 	Loss: 0.476419
Epoch: 29 	Loss: 0.477092
Epoch: 30 	Loss: 0.472870
Epoch: 31 	Loss: 0.474559
Epoch: 32 	Loss: 0.476576
Epoch    34: reducing learni

Epoch: 297 	Loss: 0.471400
Epoch: 298 	Loss: 0.473667
Epoch: 299 	Loss: 0.472555
Epoch: 300 	Loss: 0.473114
Epoch: 301 	Loss: 0.472023
Epoch: 302 	Loss: 0.473164
Epoch: 303 	Loss: 0.472526
Epoch: 304 	Loss: 0.472077
Epoch: 305 	Loss: 0.473397
Epoch: 306 	Loss: 0.473392
Epoch: 307 	Loss: 0.473292
Epoch: 308 	Loss: 0.471774
Epoch: 309 	Loss: 0.472693


KeyboardInterrupt: 

# Let's Visualize our Plots

##### Step5: Lets Create all functions for plotting embeddings and loss function

##### Loss Function Plot

In [None]:
def plot_loss(train_loss,name="train_loss.png"):
    plt.plot(train_loss, label="train loss")
    plt.legend()
plot_loss(train_loss)
def plot_mnist(numpy_all, numpy_labels,name="./embeddings_plot.png"):
        c = ['#ff0000', '#ffff00', '#00ff00', '#00ffff', '#0000ff',
             '#ff00ff', '#990000', '#999900', '#009900', '#009999', '#000fff']
        for i in range(0,11):
            f = numpy_all[np.where(numpy_labels == i)]
#             print (f)
            plt.plot(f[:, 0], f[:, 1], '.', c=c[i])
        plt.legend(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10'])
        plt.savefig(name)

### Plotting test-set Embeddings

In [None]:
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
X_test = X_test/255.
y_test = np.array(y_test, dtype=np.int32)
# print (y_test)
def test_model(model):
        model.eval()
        all_ = []
        all_labels = []
        with torch.no_grad():
            for i in range(0, 562):
                x = Variable(torch.tensor(X_test[i].reshape([-1, 1, 224, 224])))
#                 print ("y_tes is", y_test[i])
                y_t = np.array(y_test[i], dtype=np.int32)
#                 print ("y_t is", y_t)
                y = Variable(torch.tensor(y_t))
#                 print ("labels are", y)
                output = model.forward_once(x)
                all_.extend(output.data.cpu().numpy().tolist())
                all_labels.append(y.data.cpu().numpy().tolist())

        numpy_all = np.array(all_)
        numpy_labels = np.array(all_labels)
#         print (numpy_labels)
        return numpy_all, numpy_labels

def testing_plots(model):
        dict_pickle={}
#         print (test_model(model))
        numpy_all, numpy_labels = test_model(model)
        dict_pickle["numpy_all"]=numpy_all
        dict_pickle["numpy_labels"]=numpy_labels
        clusterer = KMeans(n_clusters=11)
        preds = clusterer.fit_predict(numpy_all)
        centers = clusterer.cluster_centers_
        score = silhouette_score(numpy_all, preds)
        print (score)
        plot_mnist(numpy_all, numpy_labels)

from sklearn.metrics import silhouette_samples, silhouette_score
from pylab import rcParams
rcParams['figure.figsize'] = 5, 5
testing_plots(model)
        