# Create a Siamese Network with Triplet Loss in Keras

# Task 1: Understanding the Approach

In [1]:
%matplotlib notebook

import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import random

from pca_plotter import PCAPlotter

print('TensorFlow version:', tf.__version__)

TensorFlow version: 2.2.0


## Understanding the Approach

This approach is taken from the popular [FaceNet](https://arxiv.org/abs/1503.03832) paper.

We have a CNN model called `EmbeddingModel`:

![CNN](assets/CNN.png)

We use three images for each training example:
1. `person1_image1.jpg` (Anchor Example, represented below in green)
2. `person1_image2.jpg` (Positive Example, in blue)
3. `person2_image1.jpg` (Negative Example, in red).

![Embeddings](assets/embeddings.png)


## Siamese Network

All the three images of an example pass through the model, and we get the three Embeddings: One for the Anchor Example, one for the Positive Example, and one for the Negative Example.

![Siamese Network](assets/siamese.png)

The three instances of the `EmbeddingModel` shown above are not different instances. It's the same, shared model instance - i.e. the parameters are shared, and are updated for all the three paths simultaneously.

# Task 2: Importing the Data

In [2]:
(x_train,y_train), (x_test,y_test) = tf.keras.datasets.mnist.load_data()

In [3]:
print(x_train.shape)

(60000, 28, 28)


In [4]:
# Flatten every image to a 1-D vector and divide the pixel values by 255.
# The sample count is taken from the array itself rather than hard-coded, and
# the ndim guard keeps this cell idempotent: once the arrays are flat, running
# the cell again must not divide by 255 a second time.
if x_train.ndim == 3:
    x_train = x_train.reshape(len(x_train), -1) / 255.
    x_test = x_test.reshape(len(x_test), -1) / 255.
print(x_train.shape)

(60000, 784)


In [5]:
print(x_train[0].shape)

(784,)


# Task 3: Plotting Examples

In [6]:
def plot_triplet(triplet):
    '''
    Show the three images of a triplet side by side in one figure.

    triplet: indexable holding (at least) three arrays, each of which can be
             reshaped to 28x28; only the first three entries are drawn.
    '''
    plt.figure(figsize=(6, 2))
    for position in (1, 2, 3):
        plt.subplot(1, 3, position)
        image = np.reshape(triplet[position - 1], (28, 28))
        plt.imshow(image, cmap='binary')
    plt.show()

In [7]:
# Sanity check of the plotting helper on the first three training images.
plot_triplet([x_train[0], x_train[1], x_train[2]])

<IPython.core.display.Javascript object>

# Task 4: A Batch of Triplets

In [8]:
min(y_train)
a=[1,2,3,4]
a.remove(1)
a

[2, 3, 4]

In [9]:
from collections import defaultdict

class triplets():
    '''
    Groups sample positions by label so that (anchor, positive, negative)
    triplets can be drawn at random.

    loctonum maps each label found in y_train to the list of positions
    (indices into y_train) that carry that label.
    '''
    def __init__(self,y_train):
        '''
        y_train: iterable of hashable labels; position i is recorded under
                 the label y_train[i]
        '''
        self.loctonum = defaultdict(list)
        for key,val in enumerate(y_train):
            self.loctonum[val].append(key)

    def _get_triplet_locs(self,num):
        '''
        num the number we are interested in
        returns 2 instances of num and 1 of a different num
        (as positions: anchor, positive, negative)
        raises ValueError if num is not a known label or has fewer than
        two samples
        '''
        # Check membership before indexing: loctonum is a defaultdict, so
        # indexing it with an unseen label would silently insert an empty
        # class that later random draws could pick.
        if num not in self.loctonum:
            raise ValueError('unknown label: {!r}'.format(num))
        anchor,positive = random.sample(self.loctonum[num],2)
        # Every label except num is a candidate for the negative example.
        keys=[k for k in self.loctonum if k != num]
        negative=random.choice(self.loctonum[random.choice(keys)])
        return anchor,positive,negative

    @staticmethod
    def reshape(a):
        '''
        Reshape a to a 0-d array, i.e. np.reshape(a, ()).
        Only succeeds when a holds exactly one element.
        '''
        return np.reshape(a,())

    def get_triplet_images(self,x_train, num=None):
        '''
        x_train: array indexed by the same positions as the labels given
                 to __init__
        num:     label to use for anchor and positive; a random known label
                 is chosen when None
        returns the flattened (anchor, positive, negative) samples
        '''
        if(num is None):
            num=random.choice(list(self.loctonum.keys()))
        a,p,n=self._get_triplet_locs(num)
        return x_train[a].flatten(),x_train[p].flatten(),x_train[n].flatten()

In [10]:
trips = triplets(y_train)
plot_triplet(trips.get_triplet_images(x_train))  

<IPython.core.display.Javascript object>

In [11]:
#test
a,b,c=trips.get_triplet_images(x_train)
a=np.reshape(a,(28*28,))
a.flatten()
a.shape

(784,)

In [12]:
#coursera code
# def create_batch(batch_size=256):
#     x_anchors = np.zeros((batch_size, 784))
#     x_positives = np.zeros((batch_size, 784))
#     x_negatives = np.zeros((batch_size, 784))
    
#     for i in range(0, batch_size):
#         # We need to find an anchor, a positive example and a negative example
#         random_index = random.randint(0, x_train.shape[0] - 1)
#         x_anchor = x_train[random_index]
#         y = y_train[random_index]
        
#         indices_for_pos = np.squeeze(np.where(y_train == y))
#         indices_for_neg = np.squeeze(np.where(y_train != y))
        
#         x_positive = x_train[indices_for_pos[random.randint(0, len(indices_for_pos) - 1)]]
#         x_negative = x_train[indices_for_neg[random.randint(0, len(indices_for_neg) - 1)]]
        
#         x_anchors[i] = x_anchor
#         x_positives[i] = x_positive
#         x_negatives[i] = x_negative
        
#     return [x_anchors, x_positives, x_negatives]

In [13]:
#my code
def create_batch(batch_size):
    '''
    Build one batch of random triplets from the module-level `trips` sampler
    and `x_train` array.

    batch_size: number of triplets to draw
    returns [anchors, positives, negatives], each an array of shape
            (batch_size, 784); row i of the three arrays forms one triplet
    '''
    batch = [np.zeros((batch_size, 784)) for _ in range(3)]

    for row in range(batch_size):
        sampled = trips.get_triplet_images(x_train)
        # Copy anchor / positive / negative into row `row` of its own array.
        for target, image in zip(batch, sampled):
            target[row] = image

    return batch
    

In [14]:
# triplet = create_batch(100)
# len(triplet[2])
# for i in range(20):
#     plot_triplet((triplet[0][i],triplet[1][i],triplet[2][i]))

# Task 5: Embedding Model

In [15]:
# Length of the embedding vector produced for each input image.
emb_dim = 64

# Two-layer dense network: 784 inputs -> 64 ReLU units -> emb_dim sigmoid units.
# Only the first layer declares input_shape; in a Sequential model each later
# layer takes its input shape from the previous layer's output.
embedding_model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(64, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(emb_dim, activation='sigmoid'),
])

embedding_model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 64)                50240     
_________________________________________________________________
dense_1 (Dense)              (None, 64)                4160      
Total params: 54,400
Trainable params: 54,400
Non-trainable params: 0
_________________________________________________________________


In [16]:
example = np.expand_dims(x_train[0],axis=0)
ex_embedding = embedding_model.predict(example)
print(ex_embedding[0])
type(ex_embedding[0])

[0.5111483  0.52586067 0.5974436  0.52331305 0.5157557  0.40768176
 0.4013723  0.5682705  0.55382574 0.59278065 0.3414685  0.4314249
 0.5969012  0.49017692 0.5494121  0.51137596 0.5192355  0.34831324
 0.39300948 0.3880542  0.3892489  0.5236647  0.6262604  0.3271479
 0.61145353 0.45061976 0.56183225 0.43333516 0.54364085 0.39001575
 0.5361954  0.50048685 0.39881772 0.6662161  0.4895449  0.41390407
 0.5552802  0.30500275 0.49158415 0.6310427  0.41034502 0.53102165
 0.63982105 0.47370726 0.669521   0.66184604 0.54445946 0.53598076
 0.46715885 0.4879839  0.64476305 0.527823   0.3510817  0.5653497
 0.56127363 0.4850961  0.35885632 0.49568486 0.5127719  0.2625663
 0.61391014 0.44705415 0.6195937  0.54756033]


numpy.ndarray

In [17]:
example.shape

(1, 784)

# Task 6: Siamese Network

In [18]:
# Trainable siamese wrapper: three 784-wide inputs all go through the single
# shared `embedding_model`, and the three embeddings are concatenated along
# axis 1 so the loss can slice them apart again.
in_anc, in_pos, in_neg = (tf.keras.layers.Input(shape=(784,)) for _ in range(3))

em_anc, em_pos, em_neg = (
    embedding_model(branch_input) for branch_input in (in_anc, in_pos, in_neg)
)

out = tf.keras.layers.concatenate([em_anc, em_pos, em_neg], axis=1)

net = tf.keras.models.Model(inputs=[in_anc, in_pos, in_neg], outputs=out)

net.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 784)]        0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            [(None, 784)]        0                                            
__________________________________________________________________________________________________
input_3 (InputLayer)            [(None, 784)]        0                                            
__________________________________________________________________________________________________
sequential (Sequential)         (None, 64)           54400       input_1[0][0]                    
                                                                 input_2[0][0]                

# Task 7: Triplet Loss

Triplet loss is a loss function that tries to pull the Embeddings of the Anchor and Positive Examples closer together, and tries to push the Embeddings of the Anchor and Negative Examples away from each other.

Root mean square difference between Anchor and Positive examples in a batch of N images is:
$
\begin{equation}
d_p = \sqrt{\frac{\sum_{i=0}^{N-1}(f(a_i) - f(p_i))^2}{N}}
\end{equation}
$

Root mean square difference between Anchor and Negative examples in a batch of N images is:
$
\begin{equation}
d_n = \sqrt{\frac{\sum_{i=0}^{N-1}(f(a_i) - f(n_i))^2}{N}}
\end{equation}
$

For each example, we want:
$
\begin{equation}
d_p \leq d_n
\end{equation}
$

Therefore,
$
\begin{equation}
d_p - d_n \leq 0
\end{equation}
$

This condition is quite easily satisfied during the training.

We will make it non-trivial by adding a margin (alpha):
$
\begin{equation}
d_p - d_n + \alpha \leq 0
\end{equation}
$

Given the condition above, the Triplet Loss L is defined as:
$
\begin{equation}
L = max(d_p - d_n + \alpha, 0)
\end{equation}
$

In [19]:
def triplet_loss(alpha,emb_dim):
    '''
    Build a Keras-style loss function for concatenated triplet embeddings.

    alpha:   margin added to (positive distance - negative distance)
    emb_dim: width of a single embedding; y_pred is sliced along axis 1 as
             [anchor | positive | negative], the first two emb_dim wide and
             the negative taking all remaining columns
    returns  loss(y_true, y_pred), which yields one value per row of y_pred
    '''
    def loss(y_true,y_pred):
        # y_true is never read; every quantity comes from y_pred.
        anchor = y_pred[:, :emb_dim]
        positive = y_pred[:, emb_dim:2*emb_dim]
        negative = y_pred[:, 2*emb_dim:]
        # Distances are the mean of squared differences across the embedding
        # axis (no square root is taken).
        pos_dist = tf.reduce_mean(tf.square(anchor - positive), axis=1)
        neg_dist = tf.reduce_mean(tf.square(anchor - negative), axis=1)
        return tf.maximum(pos_dist - neg_dist + alpha, 0.)
    return loss

# Task 8: Data Generator

In [20]:
def data_generator(batch_size=256):
    '''
    Endlessly yield (inputs, targets) pairs for training.

    inputs:  the [anchors, positives, negatives] list from create_batch
    targets: an all-zero array of shape (batch_size, 3*emb_dim), used only
             as a placeholder; the triplets themselves carry all the
             information
    '''
    while True:
        inputs = create_batch(batch_size)
        placeholder_targets = np.zeros((batch_size, 3*emb_dim))
        yield inputs, placeholder_targets

In [21]:
a,b=next(data_generator())
len(a)

3

# Task 9: Model Training

In [22]:
# Training hyperparameters.
batch_size = 2048
epochs = 10
alpha = .2  # margin handed to triplet_loss
# Number of whole batches that fit into the training set.
steps_per_epoch = len(x_train) // batch_size

net.compile(optimizer='adam', loss=triplet_loss(alpha=alpha, emb_dim=emb_dim))

# Keep the first 1000 test samples (and their labels) as a validation subset.
x_val, y_val = x_test[:1000], y_test[:1000]



In [23]:
# data_generator already yields complete batches of `batch_size` triplets, so
# fit() is not given a batch_size: Keras documents that batch_size must not be
# specified when the input is a generator.
_ = net.fit(
    data_generator(batch_size),
    epochs=epochs,
    steps_per_epoch=steps_per_epoch,
    verbose=False,
    callbacks=[PCAPlotter(plt, embedding_model, x_val, y_val)],
)

<IPython.core.display.Javascript object>