[View in Colaboratory](https://colab.research.google.com/github/maxwelljohn/siamese-word2vec/blob/master/siamese.ipynb)

In [0]:
import numpy as np

import datetime
import itertools
import json
import os
import random
import skimage.transform
from keras.utils.data_utils import get_file
from keras.models import Model
from keras.layers import Input, Flatten, Dense, Dropout, Lambda
from keras.optimizers import RMSprop
from keras import backend as K
from scipy.misc import imread

!pip install scikit-optimize
import skopt

In [0]:
def euclidean_distance(vects):
    x, y = vects
    return K.sqrt(K.maximum(K.sum(K.square(x - y), axis=1, keepdims=True), K.epsilon()))


def eucl_dist_output_shape(shapes):
    shape1, shape2 = shapes
    return (shape1[0], 1)


def contrastive_loss(y_true, y_pred):
    '''Contrastive loss from Hadsell-et-al.'06
    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    '''
    margin = 1
    return K.mean(y_true * K.square(y_pred) +
                  (1 - y_true) * K.square(K.maximum(margin - y_pred, 0)))


def similarity(a, b):
    a, b = np.ravel(a), np.ravel(b)
    # Cosine similarity
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))


def create_pairs(x, class_indices, choose_hard_negs=1, max_per_class=float('infinity')):
    '''Positive and negative pair creation.
    Alternates between positive and negative pairs.
    '''
    pairs = []
    labels = []
    for family in class_indices:
        sibling_pairs = np.array(list(itertools.combinations(family, 2)))

        shuffled_indices = np.arange(len(x))
        np.random.shuffle(shuffled_indices)
        next_index = 0
        num_outside_family = len(shuffled_indices) - len(family)
        assert choose_hard_negs < num_outside_family

        if len(sibling_pairs) > max_per_class:
            sibling_pairs = sibling_pairs[np.random.choice(len(sibling_pairs), max_per_class)]

        for sibling_pair in sibling_pairs:
            random.shuffle(sibling_pair)
            anchor, pos = sibling_pair
            pairs.append([x[anchor], x[pos]])
            labels.append(1)

            hardest_neg = None
            closest_similarity = float('-infinity')
            candidates = 0
            while candidates < choose_hard_negs:
                try:
                    random_neg = shuffled_indices[next_index]
                    next_index += 1
                    while random_neg in family:
                        random_neg = shuffled_indices[next_index]
                        next_index += 1
                except IndexError:
                    np.random.shuffle(shuffled_indices)
                    next_index = 0
                    continue
                sim = similarity(x[anchor], x[random_neg])
                if sim > closest_similarity:
                    hardest_neg = random_neg
                    closest_similarity = sim
                candidates += 1
            pairs.append([x[anchor], x[hardest_neg]])
            labels.append(0)
    assert len(pairs) == len(labels)
    return np.array(pairs), np.array(labels)


def create_base_network(input_shape, layer_size=128, dropout_rate=0.1, layer_count=3):
    '''Base network to be shared (eq. to feature extraction).
    '''
    input = Input(shape=input_shape)
    x = Flatten()(input)
    for i in range(layer_count-1):
        x = Dense(layer_size, activation='relu')(x)
        x = Dropout(dropout_rate)(x)
    x = Dense(layer_size, activation='relu')(x)
    return Model(input, x)


def compute_accuracy(y_true, y_pred):
    '''Compute classification accuracy with a fixed threshold on distances.
    '''
    pred = y_pred.ravel() < 0.5
    return np.mean(pred == y_true)


def accuracy(y_true, y_pred):
    '''Compute classification accuracy with a fixed threshold on distances.
    '''
    return K.mean(K.equal(y_true, K.cast(y_pred < 0.5, y_true.dtype)))

In [0]:
zip_path = get_file('omniglot-background.zip', origin='https://github.com/brendenlake/omniglot/raw/master/python/images_background.zip', extract=True)
background_path = os.path.join(os.path.dirname(zip_path), 'images_background')
zip_path = get_file('omniglot-evaluation.zip', origin='https://github.com/brendenlake/omniglot/raw/master/python/images_evaluation.zip', extract=True)
eval_path = os.path.join(os.path.dirname(zip_path), 'images_evaluation')

def load_omniglot_images(path, cutoff=0):
    class_num = 0
    x = []
    y = []
    name_for_class_num = []
    class_num_for_name = {}
    if cutoff == 0:
        alphabets = os.listdir(path)
    if cutoff > 0:
        alphabets = sorted(os.listdir(path))[:cutoff]
    else:
        alphabets = sorted(os.listdir(path))[cutoff:]
    for alphabet in alphabets:
        alphabet_path = os.path.join(path, alphabet)
        for character in os.listdir(alphabet_path):
            name = alphabet + '-' + character
            name_for_class_num.append(name)
            class_num_for_name[name] = class_num
            character_path = os.path.join(alphabet_path, character)
            for img in os.listdir(character_path):
                img_path = os.path.join(character_path, img)
                thumbnail = skimage.transform.resize(imread(img_path), (28, 28))
                x.append(thumbnail)
                y.append(class_num)
            class_num += 1
    x = np.array(x)
    y = np.array(y)
    return x, y, name_for_class_num, class_num_for_name

x_train, y_train, name_for_class_num_train, class_num_for_name_train = load_omniglot_images(background_path)
# There are 20 evaluation alphabets... we'll split them into two groups of 10.
x_dev, y_dev, name_for_class_num_dev, class_num_for_name_dev = load_omniglot_images(eval_path, 10)
x_test, y_test, name_for_class_num_test, class_num_for_name_test = load_omniglot_images(eval_path, -10)
x_train = x_train.astype('float32')
x_dev = x_dev.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_dev /= 255
x_test /= 255
input_shape = x_train.shape[1:]

In [0]:
# create positive and negative pairs
class_indices = [np.where(y_train == i)[0] for i in range(len(name_for_class_num_train))]
tr_pairs, tr_y = create_pairs(x_train, class_indices, 1, 30)

class_indices = [np.where(y_dev == i)[0] for i in range(len(name_for_class_num_dev))]
dev_pairs, dev_y = create_pairs(x_dev, class_indices, 1, 30)

class_indices = [np.where(y_test == i)[0] for i in range(len(name_for_class_num_test))]
te_pairs, te_y = create_pairs(x_test, class_indices, 1, 30)

In [0]:
space = [
    skopt.space.Categorical([64, 128, 256, 512], name='layer_size'),
    skopt.space.Real(0, 0.5, name='dropout_rate'),
    skopt.space.Integer(1, 10, name='layer_count'),
    skopt.space.Categorical(['rms'], name='optimizer'),
    skopt.space.Categorical([128, 256, 512], name='batch_size'),
    skopt.space.Categorical([20], name='epochs'),
]

@skopt.utils.use_named_args(space)
def objective(layer_size, dropout_rate, layer_count, optimizer, batch_size, epochs):
    print(locals())
    start_time = datetime.datetime.now()
    
    # network definition
    base_network = create_base_network(input_shape, layer_size, dropout_rate, layer_count)

    input_a = Input(shape=input_shape)
    input_b = Input(shape=input_shape)

    # because we re-use the same instance `base_network`,
    # the weights of the network
    # will be shared across the two branches
    processed_a = base_network(input_a)
    processed_b = base_network(input_b)

    distance = Lambda(euclidean_distance,
                      output_shape=eucl_dist_output_shape)([processed_a, processed_b])

    model = Model([input_a, input_b], distance)
    
    # train
    if optimizer == 'rms':
        opt = RMSprop()
    else:
        raise ValueError("unknown optimizer")
    model.compile(loss=contrastive_loss, optimizer=opt, metrics=[accuracy])
    model.fit([tr_pairs[:, 0], tr_pairs[:, 1]], tr_y,
        batch_size=batch_size,
        epochs=epochs,
        verbose=0,
        validation_data=([dev_pairs[:, 0], dev_pairs[:, 1]], dev_y))
    
    # compute final accuracy on training and test sets
    y_pred = model.predict([tr_pairs[:, 0], tr_pairs[:, 1]])
    tr_acc = compute_accuracy(tr_y, y_pred)
    y_pred = model.predict([dev_pairs[:, 0], dev_pairs[:, 1]])
    dev_acc = compute_accuracy(dev_y, y_pred)

    print('* Optimization took {:.0f} seconds'.format((datetime.datetime.now() - start_time).total_seconds()))
    print('* Accuracy on training set: %0.2f%%' % (100 * tr_acc))
    print('* Accuracy on dev set: %0.2f%%' % (100 * dev_acc))
    
    return model

In [0]:
model = objective([128, 0.1, 3, 'rms', 128, 20])

In [0]:
y_pred = model.predict([te_pairs[:, 0], te_pairs[:, 1]])
te_acc = compute_accuracy(te_y, y_pred)
print('* Accuracy on test set: %0.2f%%' % (100 * te_acc))