# Triplet implementation with MNIST example

### The paper is described in https://arxiv.org/abs/1412.6622

<img src='assets/triplet.png'>

In [1]:
from tensorflow.examples.tutorials.mnist import input_data # MNIST data
import tensorflow as tf
import numpy as np
import os
import sys
print('Python version:',sys.version)
print('TF version:',tf.__version__)

  from ._conv import register_converters as _register_converters


Python version: 3.5.2 (default, Nov 23 2017, 16:37:01) 
[GCC 5.4.0 20160609]
TF version: 1.8.0


In [2]:
# get data
LOGDIR = 'logs/'
# Download the MNIST dataset
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data('/home/vijay/datasets/MNIST_data/mnist.npz')


In [9]:
# a CNN model

# Hyper parameters
DROP_OUT = 0.5
LEARNING_RATE = 3e-4

def network(x, is_train):
    
    # convolutional layer
    net = tf.layers.conv2d(x, 96, kernel_size=(3,3), strides=(1,1), name='conv1', activation=tf.nn.relu)
    
    # maxpool layer
    net = tf.layers.max_pooling2d(net, pool_size=(2, 2), strides=(2, 2), name='pool')
    
    # add regularization
    net = tf.layers.dropout(net, rate=DROP_OUT, training=is_train)
    
    # flatten the layers    
    net = tf.layers.flatten(net)
    
    # fully connected layer
    net = tf.layers.dense(net, 1024, activation=tf.nn.relu, name='fc1')
    
    # add regularization
    net = tf.layers.dropout(net, rate=DROP_OUT, training=is_train)
    
    # fully connected layer
    net = tf.layers.dense(net, 256, activation=tf.nn.relu, name='fc2')   
    
    # Normalize output
    net = tf.nn.l2_normalize(net, axis=1, epsilon=1e-10, name='l2-norm')
    return net

In [10]:

# placeholder for inputs
im_1 = tf.placeholder(tf.float32, [None, 28, 28, 1], name='img1')
im_2 = tf.placeholder(tf.float32, [None, 28, 28, 1], name='img2')
labels = tf.placeholder(tf.float32, [None, 1], name='labels')
is_train = tf.placeholder(tf.bool)
margin = 1.0

#model with shared weights
with tf.variable_scope('Triplet') as scope:
    model1_embed = network(im_1, is_train)
    scope.reuse_variables()
    model2_embed = network(im_2, is_train)
    
d2 = tf.reduce_sum(tf.square(tf.subtract(model1_embed, model2_embed)), 1,  keepdims=True)
distance = tf.sqrt(d2)
C = tf.constant(margin, name='C')
neg = tf.multiply(tf.subtract(1.0, labels) , tf.square(tf.maximum(0., tf.subtract(C, distance))))
pos = tf.multiply(labels, d2)
losses = tf.add(pos, neg)
with tf.name_scope('loss'):
    loss = tf.reduce_mean(losses)
with tf.name_scope('train'):
    optimizer = tf.train.AdamOptimizer(LEARNING_RATE).minimize(loss)

summ = tf.summary.merge_all()



In [11]:
#Training
import time
import random
def compute_accuracy(prediction,labels):
    
    return np.mean(labels.ravel() == 1*(predict.ravel() < 0.5))

def create_pairs(x, class_idx):
    pairs = []
    labels = []
    n_class = len(class_idx)
    n = min([len(class_idx[d]) for d in range(n_class)]) - 1
    for d in range(n_class):
        for i in range(n):
            idx1, idx2 = class_idx[d][i], class_idx[d][i+1]
            pairs += [[x[idx1], x[idx2]]]
            rd = random.randrange(1, n_class)
            dn = (d + rd) % n_class
            idx1, idx2 = class_idx[d][i], class_idx[dn][i]
            pairs += [[x[idx1], x[idx2]]]
            labels += [1, 0]
    return np.array(pairs), np.array(labels)
    
def next_batch(s,e,inputs,labels):
    input1 = inputs[s:e,0]
    input2 = inputs[s:e,1]
    y= np.reshape(labels[s:e],(len(range(s,e)),1))
    return input1,input2,y
    
batch_size = 512
n_epoch = 30
saver = tf.train.Saver()

sess = tf.Session()
sess.run(tf.global_variables_initializer())

#write the summaries
writer = tf.summary.FileWriter(LOGDIR + 'triplet')
writer.add_graph(sess.graph) 


#create training/test positive-negative pairs
n_classes = 10
class_idx = [np.where(y_train == i)[0] for i in range(n_classes)]
train_pairs, train_labels = create_pairs(X_train, class_idx)

class_idx = [np.where(y_test == i)[0] for i in range(n_classes)]
test_pairs, test_labels = create_pairs(X_test, class_idx)

for epoch in range(n_epoch):
    avg_loss = 0.
    avg_acc = 0.
    start_time = time.time()
    train_batch = int(len(y_train)/batch_size)
    test_batch = int(len(y_test)/batch_size)
    
    for step in range(train_batch):

        batch_x1, batch_x2, batch_y =next_batch(step*batch_size, (step+1)*batch_size, train_pairs, train_labels)
        _, loss_b = sess.run([optimizer, loss], feed_dict={
                                            im_1: batch_x1[...,None],
                                            im_2: batch_x2[...,None],
                                            labels: batch_y,
                                            is_train: True})
        avg_loss += loss_b 
        if ((step % 100) == 0) & (step > 0):
            print ('Train step: epoch [%d/%d]: loss: %.3f' % (epoch, n_epoch, avg_loss/(step+1)))
    # Validation
    if ((epoch+1) % 10) == 0:
        acc_avg = 0.
        loss_avg = 0.
        for step in range(test_batch):
            y = np.reshape(test_labels,(test_labels.shape[0],1))
            batch_x1, batch_x2, batch_y =next_batch(step*batch_size, (step+1)*batch_size, test_pairs, test_labels)
            predict, loss_b, summary = sess.run([distance, loss, summ], feed_dict={
                                                    im_1: batch_x1[...,None],
                                                    im_2: batch_x2[...,None],
                                                    labels: batch_y,
                                                    is_train: False})
            writer.add_summary(summary, epoch*test_batch+step)
            acc = compute_accuracy(predict, batch_y)
            acc_avg += acc
            loss_avg += loss_b
        acc_avg /= test_batch
        loss_avg /= test_batch
        duration = time.time() - start_time
        print ('Validation: epoch [%d/%d]: loss: %.3f accuracy: %.3f, duration: %.3f' % (epoch,n_epoch, loss_avg, acc_avg, duration))
print('Training completed ...')
saver.save(sess, os.path.join(LOGDIR, 'triplet.ckpt'))
embed = sess.run(model1_embed, feed_dict= {im_1: X_test[...,None], is_train: False})
sess.close()

Train step: epoch [0/30]: loss: 0.192
Train step: epoch [1/30]: loss: 0.119
Train step: epoch [2/30]: loss: 0.074
Train step: epoch [3/30]: loss: 0.052
Train step: epoch [4/30]: loss: 0.041
Train step: epoch [5/30]: loss: 0.033
Train step: epoch [6/30]: loss: 0.027
Train step: epoch [7/30]: loss: 0.023
Train step: epoch [8/30]: loss: 0.019
Train step: epoch [9/30]: loss: 0.015


TypeError: Fetch argument None has invalid type <class 'NoneType'>

In [None]:
batch_x1.shape

## Plot the embeddings using t-sne
http://stackoverflow.com/questions/38189119/simple-way-to-visualize-a-tensorflow-graph-in-jupyter

In [None]:
from sklearn.manifold import TSNE
display_only = 2000
tsne = TSNE(n_components=2)
reduced_dim = tsne.fit_transform(embed[:display_only])

In [None]:
import matplotlib.pyplot as plt
def plot_scatter(values, cls):
    # Create a color-map with a different color for each class.
    import matplotlib.cm as cm
    cmap = cm.rainbow(np.linspace(0.0, 1.0, n_classes))

    # Get the color for each sample.
    colors = cmap[cls]

    # Extract the x- and y-values.
    x = values[:, 0]
    y = values[:, 1]

    # Plot it.
    plt.scatter(x, y, color=colors)
    plt.show()

In [None]:
plot_scatter(reduced_dim, y_test[:display_only])

In [None]:
from tensorflow.contrib.tensorboard.plugins import projector

def generate_embeddings(embed):
    sess= tf.InteractiveSession()
    writer = tf.summary.FileWriter(LOGDIR + '/projector', sess.graph)
    embedding = tf.Variable(embed[:1024], trainable=False, name='embedding')
    
    saver = tf.train.Saver()
    
    
    sess.run(tf.global_variables_initializer())
       
    config = projector.ProjectorConfig()
    embedding_config = config.embeddings.add()
    embedding_config.tensor_name = embedding.name
    
    with open(LOGDIR +'projector/metadata.tsv', 'w') as f:
        for label in mnist.test.labels[:1024]:
            f.write('{}\n'.format(label))
    embedding_config.metadata_path = LOGDIR +'projector/metadata.tsv'
    embedding_config.sprite.image_path = 'mnist_10k_sprite.png'
    
    embedding_config.sprite.single_image_dim.extend([28, 28])
    projector.visualize_embeddings(writer, config)
    
    saver.save(sess, os.path.join(LOGDIR, "model.ckpt"), 1)
    show_graph(tf.get_default_graph().as_graph_def())
    sess.close()
    

In [None]:
_tra