## Loading Data

In [1]:
import numpy as np
import keras
import os
from scipy.misc import imread

# Directory that contains the 'images_background' and 'images_evaluation'
# folders; the empty string resolves both relative to the working directory.
path = ''
train_path, valid_path = (
    os.path.join(path, folder)
    for folder in ('images_background', 'images_evaluation')
)

def omniglot_loader(path, read_image=None):
    """Load a directory tree of character images into one array.

    The expected layout is ``path/<alphabet>/<character>/<image file>``.
    Every character directory becomes one class; alphabets are flattened, so
    the alphabet a class came from is not kept in the result.

    Args:
        path: Root directory of the split to load.
        read_image: Optional callable mapping a file path to a pixel array.
            Defaults to the ``imread`` imported at the top of the notebook.

    Returns:
        Array whose first axis indexes classes, second axis indexes the
        examples of a class, and whose remaining axes are the image axes.
        Pixel values are transformed as ``1 - pixel / 255``.

    Raises:
        ValueError: From the final ``np.stack`` when no class was loaded or
            when the loaded classes do not all share one shape.
    """
    if read_image is None:
        read_image = imread

    img_array = []

    # sorted() makes the class order independent of the file system's
    # directory listing order, so class indices are reproducible.
    for alphabet in sorted(os.listdir(path)):
        alphabet_path = os.path.join(path, alphabet)

        # Stray files next to the alphabet folders (e.g. .DS_Store) would make
        # the nested os.listdir call fail, so skip anything not a directory.
        if not os.path.isdir(alphabet_path):
            continue

        print("loading alphabet: " + alphabet)

        for letter in sorted(os.listdir(alphabet_path)):
            letter_path = os.path.join(alphabet_path, letter)

            if not os.path.isdir(letter_path):
                continue

            letter_images = []
            for file_name in sorted(os.listdir(letter_path)):
                pixels = read_image(os.path.join(letter_path, file_name))
                # Scale by 255 and invert.
                letter_images.append(1 - pixels / 255.0)

            try:
                img_array.append(np.stack(letter_images))
            except ValueError as e:
                # Best effort: an empty or inconsistently sized character
                # directory is reported and skipped. Report the location
                # rather than dumping every image array to the output.
                print(e)
                print("error - alphabet_images", letter_path)

    return np.stack(img_array)

# Load the 'images_background' split (used below for training) and show its shape.
xTrain = omniglot_loader(train_path)
print(xTrain.shape)

# Load the 'images_evaluation' split (used below for validation) and show its shape.
xValid = omniglot_loader(valid_path)
print(xValid.shape)

Using TensorFlow backend.


loading alphabet: Alphabet_of_the_Magi
loading alphabet: Anglo-Saxon_Futhorc


`imread` is deprecated in SciPy 1.0.0, and will be removed in 1.2.0.
Use ``imageio.imread`` instead.


loading alphabet: Arcadian
loading alphabet: Armenian
loading alphabet: Asomtavruli_(Georgian)
loading alphabet: Balinese
loading alphabet: Bengali
loading alphabet: Blackfoot_(Canadian_Aboriginal_Syllabics)
loading alphabet: Braille
loading alphabet: Burmese_(Myanmar)
loading alphabet: Cyrillic
loading alphabet: Early_Aramaic
loading alphabet: Futurama
loading alphabet: Grantha
loading alphabet: Greek
loading alphabet: Gujarati
loading alphabet: Hebrew
loading alphabet: Inuktitut_(Canadian_Aboriginal_Syllabics)
loading alphabet: Japanese_(hiragana)
loading alphabet: Japanese_(katakana)
loading alphabet: Korean
loading alphabet: Latin
loading alphabet: Malay_(Jawi_-_Arabic)
loading alphabet: Mkhedruli_(Georgian)
loading alphabet: N_Ko
loading alphabet: Ojibwe_(Canadian_Aboriginal_Syllabics)
loading alphabet: Sanskrit
loading alphabet: Syriac_(Estrangelo)
loading alphabet: Tagalog
loading alphabet: Tifinagh
(964, 20, 105, 105)
loading alphabet: Angelic
loading alphabet: Atemayar_Qelisay

## Batch Creation

The following cells create the batches for the training and validation sets. `create_val_batch` takes an argument `N`, the number of support images that the validation image is compared against. The higher the value of N, the more difficult the few-shot task will be.

In [2]:
def create_batch(dataset, batch_size):
    """Sample a training batch of image pairs with same/different labels.

    Args:
        dataset: Array of shape (classes, examples, height, width).
        batch_size: Number of pairs to draw.

    Returns:
        (pairs, targets) where ``pairs`` is a list of two arrays of shape
        (batch_size, height, width, 1) and ``targets`` has shape
        (batch_size,). The first ``batch_size // 2`` targets are 0 and mark
        pairs drawn from the same class; the rest are 1 and mark pairs drawn
        from two different classes.

    Note:
        Building a different-class pair needs at least two classes.
    """
    # Axis 2 is the row (height) axis and axis 3 the column (width) axis.
    # Using the same order for the buffers and for the reshape below keeps
    # non-square images working.
    classes, examples, h, w = dataset.shape

    pairs = [np.zeros((batch_size, h, w, 1)) for _ in range(2)]

    targets = np.zeros((batch_size,))
    targets[batch_size // 2:] = 1

    # Anchor classes are distinct whenever that is possible; only when the
    # batch is larger than the number of classes do we sample with
    # replacement instead of raising.
    categories = np.random.choice(
        classes, size=(batch_size,), replace=batch_size > classes
    )

    for i in range(batch_size):
        category = categories[i]

        idx1 = np.random.randint(0, examples)
        idx2 = np.random.randint(0, examples)

        if targets[i] == 0:
            category_2 = category
        else:
            # A non-zero offset modulo `classes` can never land on `category`.
            category_2 = (category + np.random.randint(1, classes)) % classes

        pairs[0][i, :, :, :] = dataset[category, idx1].reshape(h, w, 1)
        pairs[1][i, :, :, :] = dataset[category_2, idx2].reshape(h, w, 1)

    return pairs, targets

In [3]:
def create_val_batch(dataset, N):
    """Build one N-way one-shot trial from ``dataset``.

    Args:
        dataset: 4-D array indexed as (class, example, image axis, image axis).
        N: Number of distinct classes in the support set.

    Returns:
        (pairs, targets). ``pairs`` is ``[valid_image, support_set]``, both of
        shape (N, dim2, dim3, 1): ``valid_image`` repeats one query image N
        times, and ``support_set`` holds one image per chosen class, with
        index 0 being a different example of the query's class. ``targets``
        is 1 at index 0 and 0 elsewhere. Note that this marks the match with
        1, whereas ``create_batch`` labels same-class pairs 0.
    """
    n_classes, n_examples, dim2, dim3 = dataset.shape

    # The three random draws are made in this fixed order.
    chosen_classes = np.random.choice(n_classes, size=(N,), replace=False)
    support_examples = np.random.randint(0, n_examples, size=(N,))
    query_example, match_example = np.random.choice(
        n_examples, replace=False, size=(2,)
    )
    target_class = chosen_classes[0]

    # The query image, repeated once per support image.
    query = dataset[target_class, query_example, :, :]
    valid_image = np.stack([query] * N).reshape(N, dim2, dim3, 1)

    # Indexing with two index arrays yields a copy, so overwriting slot 0
    # below leaves `dataset` untouched.
    support_set = dataset[chosen_classes, support_examples, :, :]
    support_set[0, :, :] = dataset[target_class, match_example]
    support_set = support_set.reshape(N, dim2, dim3, 1)

    targets = np.zeros((N,))
    targets[0] = 1

    return [valid_image, support_set], targets

The following cell is not necessary but useful to ensure proper memory usage if you are using a GPU.

In [4]:
import tensorflow as tf

# TF1-style session options: allow soft device placement, use the 'BFC'
# allocator, and set the per-process GPU memory fraction to 0.90.
config = tf.ConfigProto(allow_soft_placement=True)
config.gpu_options.allocator_type = 'BFC'
config.gpu_options.per_process_gpu_memory_fraction = 0.90

# A ConfigProto has no effect on its own: it must be attached to a session,
# and Keras must be told to use that session for the models built below.
keras.backend.set_session(tf.Session(config=config))

## Base Model Architecture

The code below defines the convolutional neural network architecture. A shared base network maps each input image to an embedding vector; the element-wise L1 distance between the two embeddings is then fed to a single sigmoid unit that classifies the pair as the same or different.

In [5]:
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dropout, Dense, Input, Lambda
from keras.models import Model
from keras.optimizers import Adam
from keras import backend as K
from keras.regularizers import l2

def siamese_net():
    """Build and compile the binary cross-entropy Siamese network.

    A shared convolutional encoder maps a 105x105x1 image to a 4096-d sigmoid
    embedding. Both inputs go through the same encoder instance, so its
    weights are shared. The element-wise absolute difference of the two
    embeddings feeds a single sigmoid unit.

    Returns:
        A compiled Keras ``Model`` taking ``[input_1, input_2]`` and producing
        one score per pair (loss: binary cross-entropy, optimizer: Adam,
        metric: accuracy).
    """
    # --- shared encoder -----------------------------------------------------
    image_in = Input(shape=(105, 105, 1))

    features = image_in
    # Three conv + max-pool stages, given as (filters, square kernel size).
    for n_filters, kernel in ((64, 10), (128, 7), (128, 4)):
        features = Conv2D(
            n_filters,
            kernel_size=(kernel, kernel),
            activation='relu',
            kernel_regularizer=l2(2e-4),
        )(features)
        features = MaxPooling2D()(features)

    # The last conv stage has no pooling after it.
    features = Conv2D(
        256,
        kernel_size=(4, 4),
        activation='relu',
        kernel_regularizer=l2(2e-4),
    )(features)

    features = Flatten()(features)
    embedding = Dense(4096, activation='sigmoid', kernel_regularizer=l2(2e-4))(features)

    encoder = Model(image_in, embedding)

    # --- twin inputs through the same encoder -------------------------------
    input_1 = Input(shape=(105, 105, 1))
    input_2 = Input(shape=(105, 105, 1))

    embedding_1 = encoder(input_1)
    embedding_2 = encoder(input_2)

    # Element-wise |e1 - e2|.
    abs_difference = Lambda(lambda pair: K.abs(pair[0] - pair[1]))
    l1_distance = abs_difference([embedding_1, embedding_2])

    prediction = Dense(1, activation='sigmoid')(l1_distance)

    pair_model = Model(inputs=[input_1, input_2], outputs=prediction)
    pair_model.compile(
        loss='binary_crossentropy',
        optimizer=Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0),
        metrics=['accuracy'],
    )

    return pair_model

In [6]:
# Build the compiled Siamese model returned by siamese_net(), then print its
# layer summary. The trailing expression displays the parameter count.
model = siamese_net()
model.summary()
model.count_params()

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            (None, 105, 105, 1)  0                                            
__________________________________________________________________________________________________
input_3 (InputLayer)            (None, 105, 105, 1)  0                                            
__________________________________________________________________________________________________
model_1 (Model)                 (None, 4096)         38947648    input_2[0][0]                    
                                                                 input_3[0][0]                    
__________________________________________________________________________________________________
lambda_1 (Lambda)               (None, 4096)         0           model_1[1][0]                    
          

38951745

In [7]:
batch_size = 64          # pairs per training step
TRAIN_BATCH = 12000      # number of training steps
N_WAY = 20               # support-set size of each validation trial
TEST_COUNT = 200         # validation trials per evaluation
validate_batch = 100     # evaluate every this many training steps
best_val_acc = 0.0
file_path = "best_weights.hdf5"

for i in range(TRAIN_BATCH):
    pairs, targets = create_batch(xTrain, batch_size)
    # One label per row as a (batch_size, 1) array, matching the model's
    # (None, 1) output, rather than a nested Python list.
    targets = targets.reshape(-1, 1)
    loss = model.train_on_batch(pairs, targets)

    if i % validate_batch == 0:
        correct_tested = 0
        # Use separate names so validation neither shadows the outer step
        # counter `i` nor overwrites the training batch.
        for _ in range(TEST_COUNT):
            val_pairs, _ = create_val_batch(xValid, N_WAY)
            pred = model.predict_on_batch(val_pairs)

            # create_batch labels same-class pairs 0, so the lowest score is
            # the predicted match.
            best_match = np.argmin(pred)

            # create_val_batch always places the true match at index 0.
            if best_match == 0:
                correct_tested += 1

        val_acc = correct_tested / TEST_COUNT
        ## print(val_acc) COMMENTED OUT - UNCOMMENT IF YOU WANT TO TRACK VAL ACC
        # Keep only the weights with the best validation accuracy so far.
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            model.save_weights(file_path)

print("Best accuracy is " + str(best_val_acc))

Best accuracy is 0.86


## Contrastive Loss 

Performance of the above model is satisfactory, but it can be improved by using a different loss function. The previous model calculated the L1 distance between the two embeddings and was trained with binary cross-entropy; in other words, it learned to classify each pair of images as the same or different. With a contrastive loss function, the network instead learns directly to maximize its ability to differentiate between images.

Some notes on the hyperparameters for the code below. Including epsilon in the Euclidean distance function is likely necessary, as training otherwise often stalled at the beginning. Similarly, the acc function, which replaces the default accuracy function, helps eliminate issues with very small values. Finally, the margin value in the contrastive loss can be tuned to a different value, but avoid making the margin too low.

For the network itself, I had to avoid using too much L2 regularization in the convolutional layers, as the network would fail to train at all if there was too much regularization. If the model fails to start training at the beginning, consider removing the regularization hyperparameters. Additionally, I often encountered exploding gradients if I did not include kernel initialization.

In [8]:
def euclidean_distance(vects):
    """Return the row-wise Euclidean distance between two batches of vectors.

    Args:
        vects: Pair ``(x, y)`` of backend tensors.

    Returns:
        Tensor of distances, reduced over axis 1 with that axis kept as size 1.
    """
    left, right = vects
    squared_sum = K.sum(K.square(left - right), axis=1, keepdims=True)
    # Floor the squared distance at K.epsilon() before taking the square root.
    return K.sqrt(K.maximum(squared_sum, K.epsilon()))

def euclidean_dist_output_shape(shapes):
    """Return the output shape of ``euclidean_distance``.

    Args:
        shapes: Pair of input shapes; only the first one is read.

    Returns:
        ``(batch_dim, 1)`` where ``batch_dim`` is the leading entry of the
        first shape.
    """
    first_shape, _ = shapes
    batch_dim = first_shape[0]
    return (batch_dim, 1)

def contrastive_loss(y_true, y_pred):
    """Contrastive loss with margin 1, computed on inverted labels.

    The labels are flipped first (``1 - y_true``). As a result, a pair labeled
    0 is penalized by ``max(margin - y_pred, 0)^2`` and a pair labeled 1 is
    penalized by ``y_pred^2``.

    Args:
        y_true: Tensor of pair labels.
        y_pred: Tensor of model outputs.

    Returns:
        Scalar tensor: the mean of the per-pair loss.
    """
    margin = 1
    flipped = -1 * y_true + 1
    squared_term = (1 - flipped) * K.square(y_pred)
    hinge_term = flipped * K.square(K.maximum(margin - y_pred, 0.0))
    return K.mean(squared_term + hinge_term)

In [9]:
def acc(y_true, y_pred):
    """Accuracy metric that compares labels with the inverted, rounded output.

    The prediction is rounded, clipped to [0, 1] and subtracted from one
    before being compared with ``y_true`` for equality.

    Args:
        y_true: Tensor of pair labels.
        y_pred: Tensor of model outputs.

    Returns:
        Mean of the element-wise matches over the last axis.
    """
    hard_prediction = K.clip(K.round(y_pred), 0, 1)
    inverted = K.ones_like(y_pred) - hard_prediction
    return K.mean(K.equal(y_true, inverted), axis=-1)

In [10]:
def W_init(shape, name=None, dtype=None):
    """Kernel initializer drawing weights from N(mean=0, std=0.01).

    Args:
        shape: Shape of the weight tensor to create.
        name: Optional name for the backend variable.
        dtype: Optional dtype for the backend variable. It is accepted
            because Keras may invoke initializers as
            ``initializer(shape, dtype=dtype)``; without this keyword such a
            call fails with a TypeError.

    Returns:
        A backend variable holding the sampled values.
    """
    values = np.random.normal(loc=0, scale=1e-2, size=shape)
    return K.variable(values, dtype=dtype, name=name)

def contrastive_net():
    """Build and compile the Siamese network trained with contrastive loss.

    A shared convolutional encoder maps a 105x105x1 image to a 4096-d sigmoid
    embedding. The Euclidean distance between the two embeddings (a Lambda
    layer) feeds a single sigmoid unit. All encoder kernels are initialized
    with ``W_init``; the conv layers are L2-regularized and the embedding
    layer is not.

    Returns:
        A compiled Keras ``Model`` taking ``[input_1, input_2]`` (loss:
        ``contrastive_loss``, optimizer: Adam, metric: ``acc``).
    """
    # --- shared encoder -----------------------------------------------------
    image_in = Input(shape=(105, 105, 1))

    features = image_in
    # Three conv + max-pool stages, given as (filters, square kernel size).
    for n_filters, kernel in ((64, 10), (128, 7), (128, 4)):
        features = Conv2D(
            n_filters,
            kernel_size=(kernel, kernel),
            activation='relu',
            kernel_initializer=W_init,
            kernel_regularizer=l2(2e-4),
        )(features)
        features = MaxPooling2D()(features)

    # The last conv stage has no pooling after it.
    features = Conv2D(
        256,
        kernel_size=(4, 4),
        activation='relu',
        kernel_initializer=W_init,
        kernel_regularizer=l2(2e-4),
    )(features)

    features = Flatten()(features)
    embedding = Dense(4096, kernel_initializer=W_init, activation='sigmoid')(features)

    encoder = Model(image_in, embedding)

    # --- twin inputs through the same encoder -------------------------------
    input_1 = Input(shape=(105, 105, 1))
    input_2 = Input(shape=(105, 105, 1))

    embedding_1 = encoder(input_1)
    embedding_2 = encoder(input_2)

    distance_layer = Lambda(euclidean_distance, output_shape=euclidean_dist_output_shape)
    distance = distance_layer([embedding_1, embedding_2])

    prediction = Dense(1, activation='sigmoid')(distance)

    pair_model = Model(inputs=[input_1, input_2], outputs=prediction)
    pair_model.compile(
        loss=contrastive_loss,
        optimizer=Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0),
        metrics=[acc],
    )

    return pair_model

In [11]:
# Rebind `model` to the compiled contrastive-loss network, then print its
# layer summary. The trailing expression displays the parameter count.
model = contrastive_net()
model.summary()
model.count_params()

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_5 (InputLayer)            (None, 105, 105, 1)  0                                            
__________________________________________________________________________________________________
input_6 (InputLayer)            (None, 105, 105, 1)  0                                            
__________________________________________________________________________________________________
model_3 (Model)                 (None, 4096)         38947648    input_5[0][0]                    
                                                                 input_6[0][0]                    
__________________________________________________________________________________________________
lambda_2 (Lambda)               (None, 1)            0           model_3[1][0]                    
          

38947650

The below code is similar to the previous iteration. However, take note that we are predicting on the output of the intermediate layer in this new model. We need the distance vector and not the final output prediction. 

In [12]:
batch_size = 64          # pairs per training step
TRAIN_BATCH = 12000      # number of training steps
N_WAY = 20               # support-set size; set here so the cell does not rely on an earlier one
TEST_COUNT = 200         # validation trials per evaluation
validate_batch = 100     # evaluate every this many training steps
best_val_acc = 0.0
file_path = "contrastive_best_weights.hdf5"

# Sub-model ending at layer index 3 of `model`. Validation ranks the support
# images by this layer's output (the distance) instead of the final sigmoid.
intermediate_layer = Model(inputs=model.input, outputs=model.get_layer(index=3).output)

for i in range(TRAIN_BATCH):
    pairs, targets = create_batch(xTrain, batch_size)
    # One label per row as a (batch_size, 1) array, matching the model's
    # (None, 1) output, rather than a nested Python list.
    targets = targets.reshape(-1, 1)
    loss = model.train_on_batch(pairs, targets)

    if i % validate_batch == 0:
        correct_tested = 0
        # Use separate names so validation neither shadows the outer step
        # counter `i` nor overwrites the training batch.
        for _ in range(TEST_COUNT):
            val_pairs, _ = create_val_batch(xValid, N_WAY)
            pred = intermediate_layer.predict_on_batch(val_pairs)

            # The smallest distance is the predicted match.
            best_match = np.argmin(pred)

            # create_val_batch always places the true match at index 0.
            if best_match == 0:
                correct_tested += 1

        val_acc = correct_tested / TEST_COUNT
        # A perfect score is treated as a symptom of divergence, not success:
        # np.argmin returns index 0 when the outputs are all NaN (and also
        # when they are all equal), which makes every trial count as correct.
        if val_acc == 1.0:
            print("Gradient Explosion")
            break
        ##print(val_acc) COMMENTED OUT - UNCOMMENT IF YOU WANT TO TRACK VAL ACC
        # Keep only the weights with the best validation accuracy so far.
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            model.save_weights(file_path)

print("Best accuracy is " + str(best_val_acc))

Best accuracy is 0.905
