# Settings

In [1]:
import os
import random
import numpy as np
import tensorflow as tf
from sklearn.utils import shuffle
import pickle
import math
import numpy.random as rnd

  from ._conv import register_converters as _register_converters


# Load data

In [2]:
path = '../../omniglot_images/'
data_subsets = ["train", "val", "test"]

# Per-subset containers: the image arrays, the object pickled alongside them,
# and an `info` dict that this cell creates but does not fill.
data, categories, info = {}, {}, {}

for subset in data_subsets:
    subset_file = os.path.join(path, subset + ".pickle")
    print("loading data from {}".format(subset_file))
    with open(subset_file, "rb") as handle:
        # Each pickle holds a 2-tuple: (image array, category object).
        images, subset_categories = pickle.load(handle)
    data[subset] = images
    categories[subset] = subset_categories

loading data from ../../omniglot_images/train.pickle
loading data from ../../omniglot_images/val.pickle
loading data from ../../omniglot_images/test.pickle


In [3]:
def create_train_data(size, s='train'):
    """Build a half-different / half-same set of image pairs.

    The first ``size // 2`` pairs contain images of two different classes
    (target 0). The remaining pairs contain two *distinct* examples of the
    same class (target 1).

    Args:
        size: Number of pairs to generate.
        s: Key into the module-level ``data`` dict selecting the subset to
            sample from.

    Returns:
        ``(pairs, targets)`` where ``pairs`` is a list of two arrays of shape
        ``(size, rows, cols, 1)`` and ``targets`` has shape ``(size, 1)``.

    Raises:
        ValueError: If different-class pairs are requested but the subset
            holds fewer than two classes.
    """
    X = data[s]
    # Axes 2 and 3 are the image axes; keep their order for both the buffer
    # allocation and the per-image reshape so non-square images also work.
    n_classes, n_examples, rows, cols = X.shape

    n_different = size // 2
    if n_different > 0 and n_classes < 2:
        raise ValueError("need at least two classes to build different-class pairs")

    # Two buffers, one per side of the pair, plus the target vector.
    pairs = [np.zeros((size, rows, cols, 1)) for _ in range(2)]
    targets = np.zeros((size, 1))

    for x in range(size):
        # Randomly sample one class and one example of it for the first image.
        category = rnd.randint(0, n_classes)
        idx_1 = rnd.randint(0, n_examples)
        pairs[0][x] = X[category, idx_1].reshape(rows, cols, 1)

        if x >= n_different:
            # Same class: shift the example index by a non-zero offset so the
            # pair never consists of the very same image twice.
            category_2 = category
            if n_examples > 1:
                idx_2 = (idx_1 + rnd.randint(1, n_examples)) % n_examples
            else:
                # Only one example exists; an identical pair is unavoidable.
                idx_2 = idx_1
            targets[x] = 1
        else:
            # Different class: a non-zero class offset (mod n_classes)
            # guarantees the second image belongs to another class.
            category_2 = (category + rnd.randint(1, n_classes)) % n_classes
            idx_2 = rnd.randint(0, n_examples)
            targets[x] = 0
        pairs[1][x] = X[category_2, idx_2].reshape(rows, cols, 1)

    return pairs, targets

In [4]:
# Print the array shape of each loaded subset, separated by blank lines.
shape_reports = [
    ('train shape', 'train'),
    ('val shape', 'val'),
    ('test shape', 'test'),
]
for position, (title, subset) in enumerate(shape_reports):
    if position > 0:
        print()
    print(title)
    print(data[subset].shape)

train shape
(964, 20, 105, 105)

val shape
(321, 20, 105, 105)

test shape
(338, 20, 105, 105)


In [5]:
# Training pairs are sampled from the 'train' subset (the default).
train_set, train_labels = create_train_data(30000)
# Validation pairs must be sampled from the held-out 'val' subset; relying on
# the default would draw them from the training characters as well.
val_set, val_labels = create_train_data(10000, s='val')

In [6]:
# Print the shapes of both pair arrays and the labels, first for the training
# set and then for the validation set.
pair_reports = [
    ('shape first pair train', train_set[0]),
    ('shape second pair train', train_set[1]),
    ('shape of labels train', train_labels),
    ('shape first pair val', val_set[0]),
    ('shape second pair val', val_set[1]),
    ('shape of labels val', val_labels),
]
for position, (title, array) in enumerate(pair_reports):
    if position > 0:
        print()
    if position == 3:
        # One extra blank line separates the train group from the val group.
        print()
    print(title)
    print(array.shape)

shape first pair train
(30000, 105, 105, 1)

shape second pair train
(30000, 105, 105, 1)

shape of labels train
(30000, 1)


shape first pair val
(10000, 105, 105, 1)

shape second pair val
(10000, 105, 105, 1)

shape of labels val
(10000, 1)


In [20]:
# Start from an empty default graph so that re-running the graph-building
# cells below does not collide with variables created by an earlier run.
tf.reset_default_graph()

## Convolutional layer method

In [21]:
def create_conv_layer(input_data, num_input_channels, num_filters, filter_shape, pool_shape, name, pooling):
    """Stride-1 VALID convolution + bias + ReLU, optionally max-pooled.

    Args:
        input_data: Input tensor passed to ``tf.nn.conv2d``.
        num_input_channels: Channel count of ``input_data``.
        num_filters: Number of convolution filters (output channels).
        filter_shape: Two-element sequence with the kernel's spatial size.
        pool_shape: Two-element sequence with the pooling window size.
        name: Prefix for the variables, created as ``name + "_W"`` and
            ``name + "_b"``.
        pooling: Max pooling (stride 2, VALID) is applied only when this is
            exactly ``True``.

    Returns:
        The pooled activation when ``pooling is True``, otherwise the ReLU
        activation.
    """
    kernel_dims = [filter_shape[0], filter_shape[1], num_input_channels, num_filters]
    kernel = tf.get_variable(
        name + "_W",
        initializer=tf.truncated_normal(kernel_dims, stddev=0.03))
    offset = tf.get_variable(
        name + "_b",
        initializer=tf.truncated_normal([num_filters], stddev=0.01))

    convolved = tf.nn.conv2d(input_data, kernel, [1, 1, 1, 1], padding='VALID')
    activated = tf.nn.relu(tf.add(convolved, offset))

    # Guard clause: without pooling the activation is the layer output.
    if pooling is not True:
        return activated

    pool_window = [1, pool_shape[0], pool_shape[1], 1]
    return tf.nn.max_pool(activated, ksize=pool_window,
                          strides=[1, 2, 2, 1], padding='VALID')

## Dense layer

In [22]:
def create_dense_layer(input_data, input_shape, neurons, name):
    """Fully connected layer with a sigmoid activation.

    Args:
        input_data: 2-D input tensor that is multiplied with the weights.
        input_shape: Number of input features (rows of the weight matrix).
        neurons: Number of output units.
        name: Prefix for the variables; the weights are ``name + '_W'``.

    Returns:
        ``sigmoid(input_data @ W + b)``.
    """
    kernel_init = tf.truncated_normal([input_shape, neurons], stddev=0.03)
    kernel = tf.get_variable(name + '_W', initializer=kernel_init)
    # Note: the bias is registered as name + 'b', i.e. without an underscore.
    offset_init = tf.truncated_normal([neurons], stddev=0.01)
    offset = tf.get_variable(name + 'b', initializer=offset_init)

    pre_activation = tf.add(tf.matmul(input_data, kernel), offset)
    return tf.nn.sigmoid(pre_activation)

# Create tensor graph

In [23]:
# Number of passes of the outer training loop.
epochs = 10
# Number of pairs drawn for every optimisation step.
batch_size = 1024
# Learning rate handed to the Adam optimiser.
learning_rate = 0.00006
# Multiplier applied to the L2 weight penalty before it is added to the loss.
beta = 0.1

### Placeholders

Two placeholders, X1 and X2, for the twin input of the siamese network

In [24]:
# One image batch per twin of the network: 105x105 single-channel images,
# with the batch dimension left open.
X1 = tf.placeholder(tf.float32, [None, 105, 105, 1])
X2 = tf.placeholder(tf.float32, [None, 105, 105, 1])

# Pair targets as a column vector, one row per pair.
y = tf.placeholder(tf.float32, [None, 1])

In [25]:
def create_network(data_input):
    """Build one twin: four convolutional layers followed by a dense layer.

    Args:
        data_input: Image batch tensor fed into the first convolution.

    Returns:
        The 4096-unit sigmoid output of the dense layer ``fc5``.
    """
    # (input channels, filters, kernel size, variable prefix, max-pool after?)
    conv_specs = [
        (1, 64, [10, 10], 'layer1', True),
        (64, 128, [7, 7], 'layer2', True),
        (128, 128, [4, 4], 'layer3', True),
        (128, 256, [4, 4], 'layer4', False),
    ]
    features = data_input
    for in_channels, out_channels, kernel, prefix, use_pool in conv_specs:
        features = create_conv_layer(features, in_channels, out_channels,
                                     kernel, [2, 2], prefix, use_pool)

    # The flatten size is hard-coded: the last conv output is treated as
    # 6 x 6 spatial positions with 256 channels.
    flat_size = 6 * 6 * 256
    flattened = tf.reshape(features, [-1, flat_size])
    return create_dense_layer(flattened, flat_size, 4096, 'fc5')

In [26]:
# Build both twins inside one variable scope. After the first twin has created
# its variables, reuse is switched on so the second twin shares them.
with tf.variable_scope("siamese") as scope:
    output1 = create_network(X1)
    scope.reuse_variables()
    output2 = create_network(X2)

# Element-wise absolute difference between the two 4096-d twin outputs.
l1_dis_layer5 = tf.abs(tf.subtract(output1, output2))

# Final layer: maps the distance vector to a single logit per pair.
weights = tf.Variable(tf.truncated_normal([4096, 1], stddev=0.03), name='w6')
bias = tf.Variable(tf.truncated_normal([1], stddev=0.01), name='b6')
# `fully_connected` holds the raw logits; `y_estimate` is their sigmoid.
fully_connected = tf.add(tf.matmul(l1_dis_layer5, weights), bias)
y_estimate = tf.nn.sigmoid(fully_connected)
tf.summary.histogram('predictions', y_estimate)

<tf.Tensor 'predictions:0' shape=() dtype=string>

In [27]:
# Mean sigmoid cross-entropy over the batch, computed from the raw logits
# (`fully_connected`) rather than from the already-squashed `y_estimate`.
cross_entropy = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels = y, logits = fully_connected))

Insert L2 regularization as mentioned in the paper. Select all weight variables and exclude bias variables.

In [28]:
variables = tf.trainable_variables()
# Penalise weights only. Every bias in this graph is a rank-1 tensor while
# every weight has rank >= 2, so selecting by rank is robust; matching the
# letter 'b' in the variable name would silently drop any weight whose name
# or scope happens to contain a 'b'.
regularizer = tf.add_n([tf.nn.l2_loss(v) for v in variables if v.get_shape().ndims > 1])
# Both terms are already scalars, so no further reduction is needed.
loss = tf.add(cross_entropy, tf.multiply(regularizer, beta))
# The loss is a single number per step: log it as a scalar summary.
tf.summary.scalar('loss', loss)
optimiser = tf.train.AdamOptimizer(learning_rate = learning_rate).minimize(loss)

### Predictions

Use `tf.round`, which rounds to the nearest integer; hence the decision threshold is 0.5.

In [29]:
# Round the sigmoid output to 0/1, compare it with the targets and average the
# matches to obtain the pair-classification accuracy.
rounded_estimate = tf.round(y_estimate)
correct_prediction = tf.equal(rounded_estimate, y)
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

In [30]:
# N-way one-shot check: the row with the highest predicted similarity must be
# the row whose target is the largest (argmax taken along the batch axis).
predicted_row = tf.argmax(y_estimate, axis=0)
target_row = tf.argmax(y, axis=0)
N_way_pred = tf.equal(predicted_row, target_row)
answer = tf.cast(N_way_pred, tf.float32)

# Setup training

In [18]:
init_op = tf.global_variables_initializer()
# Create the merged summary op once. Calling tf.summary.merge_all() inside the
# batch loop adds a new op to the graph on every single training step.
merge = tf.summary.merge_all()
train_acc = []
val_acc = []

with tf.Session() as sess:
    # NOTE(review): the log directory name ends with a space - confirm intended.
    train_writer = tf.summary.FileWriter( './logs/1/train ', sess.graph)
    try:
        sess.run(init_op)
        total_batch = len(train_labels) // batch_size
        counter = 0
        for epoch in range(epochs):
            avg_cost = 0
            for i in range(total_batch):
                counter += 1
                # Draw a random batch of pairs (and matching labels).
                batch_x1, batch_x2, batch_y = shuffle(train_set[0],train_set[1], train_labels, n_samples = batch_size)
                summary, _, c = sess.run([merge, optimiser, loss], feed_dict={X1: batch_x1, X2: batch_x2, y: batch_y})
                avg_cost += c/total_batch
                print('batch iteration:', i)
                train_writer.add_summary(summary, counter)
            print("Epoch:", (epoch + 1), "cost =", "{:.3f}".format(avg_cost))
            # Training accuracy is measured on the last batch of the epoch.
            train_acc.append(sess.run(accuracy, feed_dict={X1: batch_x1, X2: batch_x2, y: batch_y}))
            # Accuracy on the full val_set / val_labels pair set.
            val = sess.run(accuracy, feed_dict={X1: val_set[0], X2: val_set[1], y: val_labels})
            print()
            print("Test accuracy:", val)
            print()
            val_acc.append(val)
    finally:
        # Flush and release the event file even if training is interrupted.
        train_writer.close()

batch iteration: 0
batch iteration: 1
batch iteration: 2
batch iteration: 3
batch iteration: 4
batch iteration: 5
batch iteration: 6
batch iteration: 7
batch iteration: 8
batch iteration: 9
batch iteration: 10


KeyboardInterrupt: 

In [31]:
def generate_oneshot_set(N_way, s='val'):
    """Create one N-way one-shot task from the chosen subset.

    A test image is paired with ``N_way`` support images of mutually
    different classes; exactly one support image shares the test image's class
    (using a different example of that class).

    Args:
        N_way: Number of support images / candidate classes.
        s: Key into the module-level ``data`` dict.

    Returns:
        ``(pairs, targets)`` where ``pairs`` is ``[test_image, support_set]``,
        both of shape ``(N_way, w, h, 1)``, and ``targets`` has shape
        ``(N_way,)`` with a single 1 marking the matching support image.
    """
    images = data[s]
    n_classes, n_examples, w, h = images.shape

    # One random example index per support slot.
    example_ids = rnd.randint(0, n_examples, size=(N_way,))
    # Mutually exclusive classes; the first one is the test image's class.
    class_ids = rnd.choice(range(n_classes), size=(N_way,), replace=False)
    true_class = class_ids[0]
    # Two different examples of the true class: one for the test image and
    # one for its match in the support set.
    query_idx, match_idx = rnd.choice(n_examples, replace=False, size=(2,))

    # Repeat the test image N_way times so it lines up with the support set.
    query = images[true_class, query_idx, :, :]
    test_image = np.repeat(query[np.newaxis], N_way, axis=0).reshape(N_way, w, h, 1)

    # Gather the support images, then overwrite slot 0 with the second
    # example of the true class so the match is not the identical image.
    support_set = images[class_ids, example_ids, :, :]
    support_set[0, :, :] = images[true_class, match_idx]
    support_set = support_set.reshape(N_way, w, h, 1)

    targets = np.zeros((N_way,))
    targets[0] = 1

    # Shuffle all three arrays with the same permutation.
    targets, test_image, support_set = shuffle(targets, test_image, support_set)
    return [test_image, support_set], targets

In [33]:
init_op = tf.global_variables_initializer()
# Create the merged summary op once. Calling tf.summary.merge_all() inside the
# batch loop adds a new op to the graph on every single training step.
merge = tf.summary.merge_all()
# Number of one-shot tasks evaluated after every epoch.
k = 320

with tf.Session() as sess:
    # NOTE(review): the log directory name ends with a space - confirm intended.
    train_writer = tf.summary.FileWriter( './logs/1/train ', sess.graph)
    try:
        sess.run(init_op)
        total_batch = len(train_labels) // batch_size
        counter = 0
        for epoch in range(epochs):
            avg_cost = 0
            for i in range(total_batch):
                counter += 1
                # Draw a random batch of pairs (and matching labels).
                batch_x1, batch_x2, batch_y = shuffle(train_set[0],train_set[1], train_labels, n_samples = batch_size)
                summary, _, c = sess.run([merge, optimiser, loss], feed_dict={X1: batch_x1, X2: batch_x2, y: batch_y})
                avg_cost += c/total_batch
                print('batch iteration:', i)
                train_writer.add_summary(summary, counter)
            print("Epoch:", (epoch + 1), "cost =", "{:.3f}".format(avg_cost))
            train_acc = sess.run(accuracy, feed_dict={X1: train_set[0], X2: train_set[1], y: train_labels})

            # 20-way one-shot evaluation over k random tasks.
            n_correct = 0.0
            for j in range(k):
                pairs, labels = generate_oneshot_set(20)
                # `y` is a [None, 1] placeholder while the one-shot targets
                # are 1-D, so reshape them into a column before feeding.
                hit = sess.run(answer, feed_dict={X1: pairs[0], X2: pairs[1], y: np.reshape(labels, (-1, 1))})
                # `answer` has a single element; accumulate it as a number.
                n_correct += float(hit[0])
            val_acc = (100.0*n_correct / k)
            print("train accuracy:", train_acc)
            print("validation accuracy:", val_acc)
    finally:
        # Flush and release the event file even if training is interrupted.
        train_writer.close()

batch iteration: 0
batch iteration: 1


KeyboardInterrupt: 