In [1]:
# from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from sklearn.datasets import fetch_olivetti_faces
from numpy.random import RandomState
import time
import tensorflow as tf

import numpy as np
import random, math, re

In [2]:
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_integer('batch_size', 40, """Number of images to process in a batch.""")
TOWER_NAME = 'tower'

In [3]:
#read olivetti face dataset
def read_facedata(shuffle=True, batch_size=20):
    numpy_rng = RandomState(123)
    dataset = fetch_olivetti_faces(shuffle=True, random_state=numpy_rng)

    # faces = dataset.data
    images = dataset.images
    label = dataset.target

    np.random.shuffle(images)

    digit_indices = [np.where(label == i)[0] for i in range(40)]

    train_data =[]
    train_label =[]
    # valid_data = []
    # valid_label = []
    test_data = []
    test_label = []
    for d in range(len(digit_indices)):
        # train_data += d[0:6]
        for i in range(10):
            if i <8:
                index = digit_indices[d][i]
                train_data.append(images[index])
                train_label.append(label[index])
            # elif i >= 6 and i <8:
            # valid_data.append(images[digit_indices[d][i]])
            # valid_label.append(label[digit_indices[d][i]])
            else :
                test_data.append(images[digit_indices[d][i]])
                test_label.append(label[digit_indices[d][i]])

    return train_data, train_label, test_data, test_label

#create pairs for siamese network input
def create_pairs(x, digit_indices):
    pairs = []
    labels = []
    n = min([len(digit_indices[d]) for d in range(40)]) - 1
    for d in range(40):
        for i in range(n):
            z1, z2 = digit_indices[d][i], digit_indices[d][i+1]
            pairs += [[x[z1], x[z2]]]
            inc = random.randrange(1, 40)
            dn = (d + inc) % 40
            z1, z2 = digit_indices[d][i], digit_indices[dn][i]
            pairs += [[x[z1], x[z2]]]
            labels += [1, 0]
    return np.array(pairs), np.array(labels)

#get next batch in pair
def next_batch(start,end,inputs,labels):
    input1 = inputs[start:end,0]
    input2 = inputs[start:end,1]
    y= np.reshape(labels[start:end],(len(range(start,end)),1))
    return input1,input2,y

In [4]:
# referred from Tensorflow tutorial https://github.com/tensorflow/models/tree/master/tutorials/image/cifar10
def _activation_summary(x):
    tensor_name = re.sub('%s_[0-9]*/' % TOWER_NAME, '', x.op.name)
    tf.summary.histogram(tensor_name + '/activations', x)
    tf.summary.scalar(tensor_name + '/sparsity', tf.nn.zero_fraction(x))
    
def _variable(name, shape, initializer):
    dtype = tf.float32
    var = tf.get_variable(name, shape, initializer=initializer, dtype=dtype)
    return var

def _variable_with_weight_decay(name, shape, stddev, wd):
    dtype = tf.float32
    var = tf.get_variable(name=name,
      shape=shape,
      initializer=tf.random_normal_initializer(stddev=stddev, dtype=dtype))
    if wd is not None:
        weight_decay = tf.multiply(tf.nn.l2_loss(var), wd, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
    return var

def conv_relu(input, kernel_shape, bias_shape):
    # Create variable named "weights".
    weights = tf.get_variable("weights", kernel_shape,
        initializer=tf.random_normal_initializer(stddev=0.01))
    # Create variable named "biases".
    biases = tf.get_variable("biases", bias_shape,
        initializer=tf.constant_initializer(0.0))
    conv = tf.nn.conv2d(input, weights,
        strides=[1, 1, 1, 1], padding='SAME')
    return tf.nn.relu(tf.nn.bias_add(conv, biases))

def inference(images):
    images = tf.reshape(images, shape=[-1, 64, 64, 1])
    with tf.variable_scope('conv1') as scope:
#         kernel = _variable_with_weight_decay('weights',
#                                          shape=[5, 5, 1, 64],
#                                          stddev=.01,
#                                          wd=0.0)
#         conv = tf.nn.conv2d(images, kernel, [1, 1, 1, 1], padding='SAME')
#         biases = tf.get_variable(name='biases', shape=[64], initializer=tf.constant_initializer(0.1))
#         pre_activation = tf.nn.bias_add(conv, biases)
#         conv1 = tf.nn.relu(pre_activation, name=scope.name)
        conv1 = conv_relu(images, [5, 5, 1, 64], [64])
        _activation_summary(conv1)
    pool1 = tf.nn.max_pool(conv1, ksize=[1, 3, 3, 1], strides=[1, 2, 2, 1],
                         padding='SAME', name='pool1')

    with tf.variable_scope('conv2') as scope:
#         kernel = _variable_with_weight_decay('weights',
#                                              shape=[3, 3, 64, 32],
#                                              stddev=1e-2,
#                                              wd=0.0)
#         conv = tf.nn.conv2d(pool1, kernel, [1, 1, 1, 1], padding='SAME')
#         biases = tf.get_variable(name='biases', shape=[32], initializer=tf.constant_initializer(0.1))
#         pre_activation = tf.nn.bias_add(conv, biases)
#         conv2 = tf.nn.relu(pre_activation, name=scope.name)
        conv2 = conv_relu(pool1, [3, 3, 64, 32], [32])
        _activation_summary(conv2)
    pool2 = tf.nn.max_pool(conv2, ksize=[1, 3, 3, 1],
                         strides=[1, 2, 2, 1], padding='SAME', name='pool2')

    with tf.variable_scope('local2') as scope:
        reshape = tf.reshape(pool2, [FLAGS.batch_size, -1])
        dim = reshape.get_shape()[1].value
        weights = _variable_with_weight_decay('weights', shape=[dim, 384],
                                          stddev=0.01, wd=0.004)
        biases = tf.get_variable(name='biases', shape=[384], initializer=tf.constant_initializer(0.1))
        local2 = tf.add(tf.matmul(reshape, weights), biases, name=scope.name)
        _activation_summary(local2)
    return local2

# getting probability as defined by the paper
def sigmoid_prob(feature1, feature2):
    with tf.variable_scope('softmax_linear') as scope:
        weights = _variable_with_weight_decay('weights', [384, 1], stddev=1/384.0, wd=0.0)
        feature = tf.abs(feature1 - feature2)
        predictionProb = tf.sigmoid(tf.matmul(feature, weights), name=None)
        # print(predictionProb.get_shape())
        # _activation_summary(softmax_linear)
    return predictionProb

In [5]:
#referenced from site https://github.com/jeromeyoon/Tensorflow-siamese/blob/master/main.py
#train and test data
X_train, y_train, X_test, y_test = read_facedata(batch_size=FLAGS.batch_size)

# create training + test positive and negative pairs
digit_indices = []
for j in range(40):
    digit_indices.append([i for i, x in enumerate(y_train) if x == j])
tr_pairs, tr_y = create_pairs(X_train, digit_indices)

digit_indices = []
for j in range(40):
    digit_indices.append([i for i, x in enumerate(y_test) if x == j])
te_pairs, te_y = create_pairs(X_test, digit_indices)

#placeholders for twin network input
images_L = tf.placeholder(tf.float32,shape=([FLAGS.batch_size, 64,64]),name='L')
images_R = tf.placeholder(tf.float32,shape=([FLAGS.batch_size, 64,64]),name='R')
labels = tf.placeholder(tf.float32,shape=([FLAGS.batch_size, 1]),name='gt')

with tf.variable_scope("siamese") as scope:
    model1= inference(images_L)
    scope.reuse_variables()
    model2 = inference(images_R)

#similiarity score
predictionProb  = sigmoid_prob(model1,model2)

# Define loss and optimizer
cross_entropy_mean = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=predictionProb, labels=labels, name='cross_entropy_per_example'), name='cross_entropy') 
tf.add_to_collection('losses', cross_entropy_mean)
loss = tf.add_n(tf.get_collection('losses'), name='total_loss')

starter_learning_rate = 0.1
global_step = tf.Variable(0, name='global_step', trainable=False)
decay_steps = int(100)
lr = tf.train.exponential_decay(starter_learning_rate,global_step, decay_steps, 0.1,staircase=True)
tf.summary.scalar('learning_rate', lr)
momentum = 0.5
optimizer = tf.train.MomentumOptimizer(learning_rate=lr, momentum=momentum, use_nesterov=False).minimize(loss)

In [6]:
#Evaluation metric accuracy
def compute_accuracy(prediction,labels):
    return labels[prediction.ravel() < 0.5].mean()

# Launch the graph
with tf.Session() as sess:
    tf.global_variables_initializer().run()

    # Training cycle
    for epoch in range(100):
        avg_loss = 0.
        avg_accuracy = 0.
        total_batch = int(len(X_train)/FLAGS.batch_size)
        start_time = time.time()
        # Loop over all batches
        for i in range(total_batch):
            s  = i * FLAGS.batch_size
            e = (i+1) * FLAGS.batch_size
            # Fit training using batch data
            input1, input2, y = next_batch(s, e, tr_pairs, tr_y)
            # print(y.get_shape())
            _, loss_value, predict=sess.run([optimizer,loss,predictionProb], feed_dict={images_L:input1,images_R:input2 ,labels:y})
            feature1=model1.eval(feed_dict={images_L:input1})
            feature2=model2.eval(feed_dict={images_R:input2})
            tr_acc = compute_accuracy(predict, y)
#             print(tr_acc)
            avg_loss += loss_value
            avg_accuracy +=tr_acc*100
        #print('epoch %d loss %0.2f' %(epoch,avg_loss/total_batch))
        duration = time.time() - start_time
        print('epoch %d  time: %f loss %0.5f acc %0.3f' %(epoch,duration,avg_loss/(total_batch), avg_accuracy/total_batch))
    total_batch = int(len(X_train)/FLAGS.batch_size)
    te_acc = 0.
    for i in range(total_batch):
        s  = i * FLAGS.batch_size
        e = (i+1) * FLAGS.batch_size
        input1, input2, y = next_batch(s, e, tr_pairs, tr_y)
        predict=predictionProb.eval(feed_dict={images_L:input1,images_R:input2,labels:y})
        tr_acc += compute_accuracy(predict,y)
    print('Accuract training set %0.3f' % (100 * tr_acc/total_batch))

    # Test model
    total_batch = int(len(X_test)/FLAGS.batch_size)
    te_acc = 0.
    for i in range(total_batch):
        s  = i * FLAGS.batch_size
        e = (i+1) * FLAGS.batch_size
        # Fit training using batch data
        input1, input2, y = next_batch(s, e, te_pairs, te_y)
        predict=predictionProb.eval(feed_dict={images_L:input1,images_R:input2,labels:y})
        te_acc += compute_accuracy(predict,y)
    print('Accuract test set %0.3f' % (100 * te_acc/total_batch))

epoch 0  time: 11.333193 loss 1.24735 acc 47.524
epoch 1  time: 11.449757 loss 1.21673 acc 49.640
epoch 2  time: 10.031758 loss 1.18591 acc 50.000
epoch 3  time: 9.821339 loss 1.15587 acc 50.000
Accuract training set 56.250
Accuract test set 50.000
