# Siamese Networks

<img src="static/siamese.png" width="500">

## Contrastive Loss

Consider a set $\mathcal{X}$ of high-dimensional vectors $\vec{X_i}$. For each $\vec{X_i}$, there exists a set $\mathcal{S}_{\vec{X_i}}$ of vectors that are considered similar to it. These similarity sets can be derived from prior knowledge, such as class labels. The goal is to learn a set of parameters $W$ that maps the vectors onto a low-dimensional manifold.

The mapping from the high-dimensional input space to the low-dimensional output manifold should send similar vectors to nearby points and dissimilar vectors to distant points. To achieve this, we define a contrastive loss function whose minimization yields the desired mapping:

Let $\vec{X_1}, \vec{X_2} \in \mathcal{X}$ be a pair of vectors to be compared, and let $Y$ be a binary label for this pair: $Y = 0$ if the pair is similar and $Y = 1$ if it is dissimilar. Suppose we have learned a mapping $G_W$ from the high-dimensional space of $\mathcal{X}$ to a lower-dimensional space in which the Euclidean distance $D_W$ between points measures the similarity of the original high-dimensional vectors.
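
As a concrete instance of this labeling convention, assume each vector carries a class label (as MNIST digits do), so a pair is similar exactly when the two class labels match. The helper `pair_labels` below is a hypothetical illustration that follows the text's convention; the notebook code further down encodes similarity the other way around (1 for a similar pair).

```python
import numpy as np

def pair_labels(c1, c2):
    """Hypothetical helper: Y = 0.0 if the class labels match (similar), 1.0 otherwise."""
    c1, c2 = np.asarray(c1), np.asarray(c2)
    return (c1 != c2).astype(np.float32)

print(pair_labels([3, 7, 1], [3, 2, 1]))  # [0. 1. 0.]
```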

This distance is parameterized by $W$:
$$
D_W (\vec{X_1}, \vec{X_2}) = \vert\vert G_W(\vec{X_1}) - G_W(\vec{X_2}) \vert\vert_2
$$
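
To make $D_W$ concrete, here is a minimal NumPy sketch that assumes a toy linear mapping $G_W(\vec{X}) = W\vec{X}$; the network later in this notebook uses a three-layer MLP instead. The point it illustrates is that the same $W$ is applied to both inputs before the distance is taken. The names `G` and `D` are illustrative.

```python
import numpy as np

def G(W, x):
    """Toy linear mapping G_W(x) = W x (an assumption; the notebook uses an MLP)."""
    return W @ x

def D(W, x1, x2):
    """Euclidean distance between the two embeddings; both branches use the same W."""
    return np.linalg.norm(G(W, x1) - G(W, x2))

# W projects 3-D inputs onto their first two coordinates.
W = np.array([[1.0, 0.0, 0.0],
              [0.0, 1.0, 0.0]])
x1 = np.array([0.0, 0.0, 5.0])
x2 = np.array([3.0, 4.0, -5.0])
print(D(W, x1, x2))  # 5.0
```

The two inputs differ strongly in their third coordinate, but this particular $W$ discards it; learning $W$ is what decides which input differences the distance should reflect.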


To learn $W$, we minimize the following loss over the training pairs:
$$
\begin{aligned}
\mathcal{L}(W) &= \sum_{i=1}^{P} L\big(W, (Y, \vec{X_1}, \vec{X_2})^i\big) \\
L\big(W, (Y, \vec{X_1}, \vec{X_2})^i\big) &= (1 - Y)\, L_S(D_W^i) + Y\, L_D(D_W^i)
\end{aligned}
$$


Here, $W$, both on its own and as a subscript in $G_W$ and $D_W$, denotes the learned parameters of the mapping of vectors in $\mathcal{X}$. $P$ is the number of available vector pairs, and $(Y, \vec{X_1}, \vec{X_2})^i$ is the $i$-th labeled pair. $L$ is split into two partial loss functions: $L_S$ for similar pairs and $L_D$ for dissimilar pairs.

$L_S$ and $L_D$ must be designed so that minimizing $L$ yields small values of $D_W$ for similar pairs and large values of $D_W$ for dissimilar pairs. They are defined as follows, where $m > 0$ is a margin: a dissimilar pair contributes to the loss only while its distance is smaller than $m$.
$$
\begin{aligned}
L_S &= \frac{1}{2}(D_W)^2 \\
L_D &= \frac{1}{2}\big(\max\{0, m - D_W\}\big)^2
\end{aligned}
$$
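
The margin is what makes $L_D$ useful: a dissimilar pair stops contributing once its distance reaches $m$, whereas $L_S$ keeps growing with distance. A minimal sketch of the two partial losses, evaluated with $m = 5$ (the same value as the `margin` hyperparameter used for training below):

```python
def L_S(d):
    """Similar-pair loss: 0.5 * d^2, which pulls the pair together."""
    return 0.5 * d ** 2

def L_D(d, m):
    """Dissimilar-pair loss: 0.5 * max(0, m - d)^2, zero once d >= m."""
    return 0.5 * max(0.0, m - d) ** 2

m = 5.0
for d in (0.0, 2.0, 5.0, 8.0):
    print(d, L_S(d), L_D(d, m))
# 0.0 0.0 12.5
# 2.0 2.0 4.5
# 5.0 12.5 0.0
# 8.0 32.0 0.0
```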


Substituting these definitions, the per-pair loss becomes:
$$
L(W, (Y, \vec{X_1}, \vec{X_2})) = (1 - Y)\frac{1}{2}(D_W)^2 + Y\frac{1}{2}\big(\max\{0, m - D_W\}\big)^2
$$
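
Differentiating the per-pair loss with respect to $D_W$ (a short derivation added for illustration) shows what each term does during training. Similar pairs are always pulled together with a force proportional to their distance, while dissimilar pairs are pushed apart only while $D_W < m$:

$$
\begin{aligned}
\frac{\partial L}{\partial D_W} &= (1 - Y)\, D_W - Y \max\{0, m - D_W\} \\
\frac{\partial L}{\partial W} &= \frac{\partial L}{\partial D_W}\,\frac{\partial D_W}{\partial W},
\qquad
\frac{\partial D_W}{\partial W} = \frac{\big(G_W(\vec{X_1}) - G_W(\vec{X_2})\big)^\top}{D_W}\left(\frac{\partial G_W(\vec{X_1})}{\partial W} - \frac{\partial G_W(\vec{X_2})}{\partial W}\right)
\end{aligned}
$$

The factor $1/D_W$ is undefined when the two embeddings coincide, which is one reason an implementation may add a small constant inside the square root, as the distance computation in the code below does.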

```python
# This notebook uses the TensorFlow 1.x graph API (tf.placeholder, tf.Session).
import os

import numpy as np
import tensorflow as tf
```

```python
# Start from an empty graph so re-running the notebook does not duplicate variables.
tf.reset_default_graph()
```

```python
class Siamese:
    """Siamese MLP trained with the contrastive loss (TensorFlow 1.x graph API)."""

    def __init__(self, n_input, margin, learning_rate, model_dir, model_name):
        # Two input branches; create_network feeds both through the same weights.
        self.X1 = tf.placeholder(tf.float32, [None, n_input], name="X1")
        self.X2 = tf.placeholder(tf.float32, [None, n_input], name="X2")
        # Pair label: 1.0 for a similar pair, 0.0 for a dissimilar pair.
        self.y_ = tf.placeholder(tf.float32, [None], name="y_")

        self.margin = tf.constant(margin, dtype=tf.float32, name="margin")
        self.learning_rate = learning_rate
        self.model_dir = model_dir
        self.model_name = model_name

        self.out1, self.out2 = self.create_network()
        self.loss = self.contrastive_loss()
        self.optimizer = self.optimizer_init()

        self.saver = tf.train.Saver()
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())

    def dense_layer(self, x, dim, name):
        """Fully connected layer x @ W + b; W uses get_variable's default (Glorot uniform) init."""
        n_features = x.get_shape()[1]

        W = tf.get_variable(
            name='{}_W'.format(name),
            dtype=tf.float32,
            shape=[n_features, dim])

        b = tf.get_variable(
            name='{}_b'.format(name),
            dtype=tf.float32,
            shape=[dim],
            initializer=tf.zeros_initializer())

        return tf.nn.bias_add(tf.matmul(x, W), b)

    def network(self, x):
        """G_W: n_input -> 1024 -> 1024 -> 2; the 2-D output can be plotted directly."""
        a1 = tf.nn.relu(self.dense_layer(x, 1024, "Dense_1"))
        a2 = tf.nn.relu(self.dense_layer(a1, 1024, "Dense_2"))
        return self.dense_layer(a2, 2, "Dense_3")

    def create_network(self):
        with tf.variable_scope("siamese") as scope:
            out1 = self.network(self.X1)
            # Reuse the variables just created, so both branches share the same W.
            scope.reuse_variables()
            out2 = self.network(self.X2)
        return out1, out2

    def euclidean_dist(self, p1, p2):
        """Return (D_W, D_W^2) for each pair of rows."""
        sum_squared_diff = tf.reduce_sum(tf.square(p1 - p2), axis=1, name="euclid_dist_sq")
        # The small constant keeps the gradient of sqrt finite when p1 == p2.
        distance = tf.sqrt(sum_squared_diff + 1e-6, name="euclid_dist")
        return distance, sum_squared_diff

    def contrastive_loss(self):
        # Label convention here: y_ = 1 marks a SIMILAR pair, the reverse of Y in the
        # equations above, so y_ plays the role of (1 - Y):
        #   loss = mean(y_ * 0.5 * D^2 + (1 - y_) * 0.5 * max(0, m - D)^2)
        # Averaging over the batch (instead of summing over pairs) only rescales the gradient.
        with tf.variable_scope("loss"):
            dist, dist_sq = self.euclidean_dist(self.out1, self.out2)

            loss_similar = 0.5 * dist_sq
            pos = tf.multiply(self.y_, loss_similar, name="loss_pos")

            y_dissimilar = tf.subtract(1.0, self.y_)
            loss_dissimilar = 0.5 * tf.pow(tf.maximum(tf.subtract(self.margin, dist), 0), 2)
            neg = tf.multiply(y_dissimilar, loss_dissimilar, name="loss_neg")

            loss = tf.reduce_mean(tf.add(pos, neg), name="loss")
        return loss

    def optimizer_init(self):
        return tf.train.GradientDescentOptimizer(self.learning_rate).minimize(self.loss)

    def train(self, input1, input2, label):
        """Run one gradient step on a batch of pairs and return the batch loss."""
        _, loss = self.sess.run([self.optimizer, self.loss],
                                feed_dict={self.X1: input1,
                                           self.X2: input2,
                                           self.y_: label})
        return loss

    def test_model(self, input):
        """Embed inputs with the first branch (both branches share the same weights)."""
        return self.sess.run(self.out1, feed_dict={self.X1: input})

    def load_model(self, model_name=None):
        if model_name is None:
            model_name = self.model_name
        self.saver.restore(self.sess, os.path.join(self.model_dir, model_name))

    def save_model(self, model_name=None):
        if not os.path.exists(self.model_dir):
            os.makedirs(self.model_dir)
        if model_name is None:
            model_name = self.model_name
        self.saver.save(self.sess, os.path.join(self.model_dir, model_name))
```


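```python
n_input = 28 * 28      # flattened MNIST image size
margin = 5.0           # margin m of the contrastive loss
learning_rate = 0.01
batch_size = 128       # number of pairs per training step
n_epochs = 10
model_dir = "tf_logs/siamese-0.1"
model_name = "siamese-0.1"
```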

```python
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()

# Flatten each 28x28 image to a 784-vector and scale pixel values to [0, 1].
X_train = X_train.astype(np.float32).reshape((-1, n_input)) / 255.
X_test = X_test.astype(np.float32).reshape((-1, n_input)) / 255.
```

```python
siamese = Siamese(n_input=n_input, margin=margin, learning_rate=learning_rate,
                  model_dir=model_dir, model_name=model_name)
```

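```python
def get_batch(X, y, batch_size):
    """Yield (X1, X2, label) batches of random pairs, drawn without replacement.

    label is 1.0 when both images show the same digit (similar) and 0.0 otherwise.
    """
    rnd_idx = np.random.permutation(len(X))
    n_steps = len(X) // (2 * batch_size)  # each step consumes 2 * batch_size images

    for k in range(n_steps):
        # Fixed-size slices guarantee both halves have exactly batch_size elements.
        start = 2 * k * batch_size
        i = rnd_idx[start:start + batch_size]
        j = rnd_idx[start + batch_size:start + 2 * batch_size]
        yield X[i], X[j], (y[i] == y[j]).astype(np.float32)
```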
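
Because `get_batch` pairs images at random and MNIST's ten digit classes are roughly equally frequent, only about 10% of the pairs it yields are similar. The sketch below estimates that fraction and shows one hypothetical alternative, `get_balanced_batch`, which draws half of each batch from same-class pairs. It is an assumption-driven variant, not part of the original notebook; it assumes `y` is a 1-D NumPy array of class labels and keeps the notebook's convention that 1.0 marks a similar pair.

```python
import numpy as np

def similar_fraction(y, n_pairs=100_000, seed=0):
    """Estimate the fraction of uniformly random pairs that share a class label."""
    rng = np.random.default_rng(seed)
    i = rng.integers(0, len(y), n_pairs)
    j = rng.integers(0, len(y), n_pairs)
    return float(np.mean(y[i] == y[j]))

def get_balanced_batch(X, y, batch_size, rng=None):
    """Hypothetical sampler: first half same-class pairs, second half different-class pairs."""
    if rng is None:
        rng = np.random.default_rng()
    classes = np.unique(y)
    by_class = {c: np.flatnonzero(y == c) for c in classes}
    i = rng.integers(0, len(y), batch_size)
    j = np.empty(batch_size, dtype=np.int64)
    for k in range(batch_size):
        c = y[i[k]]
        if k < batch_size // 2:
            # Similar pair (may occasionally pair an image with itself).
            j[k] = rng.choice(by_class[c])
        else:
            other = rng.choice(classes[classes != c])   # pick a different class
            j[k] = rng.choice(by_class[other])          # dissimilar pair
    label = (y[i] == y[j]).astype(np.float32)
    return X[i], X[j], label
```

Balancing changes how often the margin term is active relative to the pull term, so it may call for retuning `margin` or `learning_rate`.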

```python
step = 0
for epoch in range(n_epochs):
    for batch_X1, batch_X2, batch_y in get_batch(X_train, y_train, batch_size):
        step += 1

        loss = siamese.train(input1=batch_X1, input2=batch_X2, label=batch_y)
        if step % 10 == 0:
            print('step {}, loss: {}'.format(step, loss))

    # `loss` is the loss of the epoch's last batch, not an epoch average.
    print('end of epoch {}, loss: {}'.format(epoch, loss))
    # Checkpoint every 5 epochs and after the last epoch.
    if (epoch + 1) % 5 == 0 or epoch == n_epochs - 1:
        siamese.save_model()
```

```
step 10, loss: 6.698212623596191
step 20, loss: 3.2929441928863525
step 30, loss: 3.1118156909942627
step 40, loss: 2.5552587509155273
step 50, loss: 2.4288651943206787
step 60, loss: 1.8632421493530273
step 70, loss: 1.7675292491912842
step 80, loss: 2.0606393814086914
step 90, loss: 2.7103660106658936
step 100, loss: 1.5873215198516846
step 110, loss: 1.111082673072815
step 120, loss: 1.3335705995559692
step 130, loss: 2.2416059970855713
step 140, loss: 1.32935631275177
step 150, loss: 1.529503583908081
step 160, loss: 2.1370935440063477
step 170, loss: 1.1548789739608765
step 180, loss: 1.7969799041748047
step 190, loss: 1.6378898620605469
step 200, loss: 1.8471750020980835
step 210, loss: 1.428123950958252
step 220, loss: 1.8339557647705078
step 230, loss: 1.6519286632537842
end of epoch 0, loss: 1.7872884273529053
step 240, loss: 1.497567057609558
step 250, loss: 1.4996349811553955
step 260, loss: 1.3743512630462646
step 270, loss: 0.7755866050720215
step 280, loss: 1.741788387298
...
step 2230, loss: 1.1453388929367065
step 2240, loss: 0.9828543066978455
step 2250, loss: 0.6444646716117859
step 2260, loss: 0.9774099588394165
step 2270, loss: 1.1849122047424316
step 2280, loss: 0.7715990543365479
step 2290, loss: 0.9089393615722656
step 2300, loss: 0.6883367896080017
step 2310, loss: 0.9107192158699036
step 2320, loss: 0.986473798751831
step 2330, loss: 1.1138514280319214
step 2340, loss: 0.7945080995559692
end of epoch 9, loss: 0.7945080995559692
```


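```python
# Embed the test set. ndarray.tofile writes raw float32 binary (not text) despite the
# .txt extension; the file is read back with np.fromfile further down.
os.makedirs('static', exist_ok=True)
siamese.test_model(input=X_test).tofile('static/embed.txt')
```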

```python
import matplotlib
matplotlib.use('Agg')  # non-interactive backend: figures are written to files, not shown inline
import matplotlib.pyplot as plt
```

```python
def visualize(embed, labels):
    """Scatter-plot 2-D embeddings, one color per digit label, and save the figure."""
    fig = plt.figure(figsize=(8, 8))
    ax = fig.add_subplot(111)

    for label in sorted(set(labels.tolist())):
        mask = labels == label
        ax.scatter(embed[mask, 0], embed[mask, 1], label=label, s=20)
    ax.legend()
    # Save under static/ so the image tag below finds the figure.
    fig.savefig('static/embed.jpeg', format='jpeg', dpi=600, bbox_inches='tight')
    plt.close(fig)
```

```python
# Read back the raw float32 embeddings; each row is one 2-D point.
embed = np.fromfile('static/embed.txt', dtype=np.float32)
embed = embed.reshape([-1, 2])

visualize(embed, y_test)
```

<img src="static/embed.jpeg" width="500">
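
The scatter plot gives a qualitative picture. A rough quantitative check, sketched below under the assumption that `embed` and `y_test` are the arrays from the cells above, is to compare the mean distance between test points of the same digit with the mean distance between points of different digits on a random subsample. If training did what the contrastive loss asks for, the first should be clearly smaller than the second. The helper `mean_pair_distances` is hypothetical.

```python
import numpy as np

def mean_pair_distances(embed, labels, n_samples=2000, seed=0):
    """Return (mean same-label distance, mean different-label distance) on a random subsample."""
    rng = np.random.default_rng(seed)
    idx = rng.choice(len(embed), size=min(n_samples, len(embed)), replace=False)
    e, l = embed[idx], labels[idx]
    # All pairwise Euclidean distances between the subsampled embeddings.
    dists = np.linalg.norm(e[:, None, :] - e[None, :, :], axis=-1)
    same = l[:, None] == l[None, :]
    off_diag = ~np.eye(len(idx), dtype=bool)  # exclude each point's distance to itself
    return dists[same & off_diag].mean(), dists[~same].mean()

# Usage with the arrays from the cells above:
# same_d, diff_d = mean_pair_distances(embed, y_test)
```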