In [None]:
import os
import json
import random
random.seed(42)

import sklearn
import numpy as np
from PIL import Image
from keras import backend as K
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau
from keras.layers import Input, Flatten, Dense, Dropout, Lambda
from keras.models import Model, load_model
from keras.optimizers import RMSprop
from keras.applications.inception_resnet_v2 import preprocess_input, InceptionResNetV2

from utilities import convert_image_to_square_rgb, preprocess_image_inception_keras, contrastive_loss

# Training Data
Define the training data for training the siamese similarity model. Training data will be created from a JSON file labeling items and their images.

In [None]:
# Change these values to the location of your images and label file, which should be mounted to the Docker image
IMAGE_BASEPATH = '/data/images'  # default directory that image filenames are joined onto when loading
LABELED_IMAGES_FILENAME = '/data/labeled_images.json'  # JSON file that is loaded to build the candidate items
BATCH_SIZE = 32  # batch size handed to batch_generator for training, validation and prediction

In [None]:
# Don't change these unless you know what you are doing
IMAGE_SHAPE = (299, 299, 3)  # shape passed to the Keras Input layers (presumably height, width, channels -- confirm)
IMAGE_SIZE = IMAGE_SHAPE[:2]  # first two entries of IMAGE_SHAPE; passed to convert_image_to_square_rgb
EVAL_PERCENT = 0.3  # a fraction in [0, 1], not a percentage; multiplied by the candidate count to get the train/eval split index

In [None]:
# Maps the full image path to its converted RGB matrix so each file is decoded only once.
CACHED_IMAGES = {}

def get_and_cache_image(filename, image_dir=IMAGE_BASEPATH):
    """Loads an image into memory, transforms it to a RGB matrix
    and caches it for fast retrieval

    Args:
        filename (str): Relative filepath from image_dir of image
        image_dir (str): Directory containing all images. When falsy,
            ``filename`` is used exactly as given.

    Returns:
        ndarray: The result of ``convert_image_to_square_rgb`` for the image
            at ``IMAGE_SIZE``. The same cached object is returned on every
            later call for the same path.
    """
    if image_dir:
        filename = os.path.join(image_dir, filename)
    if filename in CACHED_IMAGES:
        return CACHED_IMAGES[filename]

    # Image.open is lazy and keeps the file handle open; the context manager
    # releases it as soon as the pixel data has been converted.
    # NOTE(review): assumes convert_image_to_square_rgb returns data that does
    # not depend on the PIL image staying open (e.g. an ndarray) -- confirm.
    with Image.open(filename) as image:
        rgb_matrix = convert_image_to_square_rgb(image, IMAGE_SIZE)
    CACHED_IMAGES[filename] = rgb_matrix
    return rgb_matrix

In [None]:
def batch_generator(X_pairs, y_labels, batch_size):
    """Endlessly yields batches of preprocessed image pairs for the model to consume

    Every pass walks the pairs in order and yields a batch each time
    ``batch_size`` pairs have been collected, plus one final smaller batch if
    the number of pairs is not a multiple of ``batch_size``. After a pass the
    generator starts over from the first pair.

    Args:
        X_pairs (List[tuple[str, str]]): List of tuples with two image filepaths
        y_labels (List[int]): List of labels for pairs, 1 for similar and 0 for dissimilar
        batch_size (int): Maximum number of pairs per yielded batch

    Yields:
        tuple[[ndarray, ndarray], ndarray]: float32 arrays holding the first
            images, the second images, and the labels of the batch

    Raises:
        ValueError: On first iteration, if there is no labeled pair to draw
            from (the generator would otherwise spin forever without yielding).
    """
    # zip() stops at the shorter input, so the end-of-pass flush must use the
    # same length or a trailing partial batch would be silently dropped.
    total_pairs = min(len(X_pairs), len(y_labels))
    if total_pairs == 0:
        raise ValueError('batch_generator requires at least one labeled image pair')

    while True:
        first_images = []
        second_images = []
        labels = []
        for i, ((img_filename_1, img_filename_2), label) in enumerate(zip(X_pairs, y_labels)):
            img_1 = get_and_cache_image(img_filename_1)
            img_2 = get_and_cache_image(img_filename_2)
            first_images.append(preprocess_image_inception_keras(img_1))
            second_images.append(preprocess_image_inception_keras(img_2))
            labels.append(label)
            if (i + 1) % batch_size == 0 or (i + 1) == total_pairs:
                yield ([np.array(first_images, dtype=np.float32),
                        np.array(second_images, dtype=np.float32)],
                       np.array(labels, dtype=np.float32))
                # Start fresh lists so the yielded batch can be garbage collected.
                first_images = []
                second_images = []
                labels = []

In [None]:
def _sample_negative_item(item_labels, candidates, max_attempts=100):
    """Picks a random candidate with at least one image and no label in ``item_labels``, or None if none exists."""
    def is_negative(candidate):
        return bool(candidate['images']) and not (item_labels & set(candidate['labels']))

    # Rejection sampling is cheap when most candidates qualify; the attempt cap
    # keeps it from spinning forever when few (or none) do.
    for _ in range(max_attempts):
        candidate = random.choice(candidates)
        if is_negative(candidate):
            return candidate

    # Unlucky or impossible: scan everything once to decide for certain.
    eligible = [candidate for candidate in candidates if is_negative(candidate)]
    if not eligible:
        return None
    return random.choice(eligible)


def create_pairs(candidates, shuffle=False):
    """Creates positive pairs and randomly samples negative pairs to train our model

    For every unordered pair of images belonging to the same item, one positive
    pair (label 1) is emitted, immediately followed by one negative pair
    (label 0) that reuses the first image of the positive pair together with a
    random image of an item sharing no label with the current item.

    Items with fewer than two images are skipped. Items for which no negative
    item exists (every other candidate shares a label or has no images) are
    skipped as well, so positives and negatives stay balanced.

    Args:
        candidates (List[dict]): Items with an ``'images'`` list (dicts with a
            ``'filename'`` key) and a ``'labels'`` iterable
        shuffle (bool): Shuffle the positive/negative couples (each positive
            stays directly in front of its negative) with a fixed random state

    Returns:
        tuple[list, List[int]]: Image filename pairs and their labels,
            alternating positive (1) and negative (0)
    """
    pairs_pairs = []
    labels_pairs = []

    for item in candidates:
        images = item['images']
        if len(images) < 2:
            continue
        item_labels = set(item['labels'])
        image_combinations = (
            (image_1, image_2)
            for i, image_1 in enumerate(images)
            for image_2 in images[i + 1:]
        )
        for image_1, image_2 in image_combinations:
            negative_item = _sample_negative_item(item_labels, candidates)
            if negative_item is None:
                # Eligibility only depends on the item, so no later pair of
                # this item can find a negative either.
                break
            negative_image = random.choice(negative_item['images'])
            # Randomize which of the two images acts as the anchor.
            positive_pair = random.sample((image_1['filename'], image_2['filename']), 2)
            negative_pair = (positive_pair[0], negative_image['filename'])
            pairs_pairs.append([positive_pair, negative_pair])
            labels_pairs.append([1, 0])

    if shuffle and pairs_pairs:
        pairs_pairs, labels_pairs = sklearn.utils.shuffle(pairs_pairs, labels_pairs, random_state=42)

    # Flatten [[positive, negative], ...] into [positive, negative, positive, ...].
    pairs = [pair for couple in pairs_pairs for pair in couple]
    labels = [label for couple in labels_pairs for label in couple]
    return (pairs, labels)

In [None]:
# Load the labeled items; only the values of the JSON object are used.
with open(LABELED_IMAGES_FILENAME) as f:
    labeled_images = json.load(f)

candidates = list(labeled_images.values())
random.shuffle(candidates)

# EVAL_PERCENT is the share held out for evaluation, so the split index sits
# at the end of the training portion (previously the two shares were swapped
# and training only received EVAL_PERCENT of the data).
pivot = len(candidates) - int(len(candidates) * EVAL_PERCENT)

train_candidates = candidates[:pivot]
eval_candidates = candidates[pivot:]

In [None]:
# Build the labeled image pairs for each split. Training pairs are created
# first; both calls draw from the shared `random` module state, so the order
# of these two statements affects which negatives are sampled.
X_train, y_train = create_pairs(candidates=train_candidates, shuffle=True)
X_eval, y_eval = create_pairs(candidates=eval_candidates, shuffle=True)

## Model Objectives
The functions the model is trained against or evaluated with. `contrastive_loss` is the loss used for training; it is defined in `utilities` rather than in this notebook so that saved Keras models can be loaded back into memory properly.

In [None]:
def euclidean_distance(vects):
    """Euclidean distance between two batches of vectors, as a Keras tensor.

    Args:
        vects: Pair of tensors to compare

    Returns:
        Tensor holding the square root of the squared differences summed over
        axis 1 (kept as a dimension). The sum is clamped to at least
        ``K.epsilon()`` before the square root is taken.
    """
    left, right = vects
    squared_distance = K.sum(K.square(left - right), axis=1, keepdims=True)
    return K.sqrt(K.maximum(squared_distance, K.epsilon()))

In [None]:
def eucl_dist_output_shape(shapes):
    """Output shape of the distance layer: first dimension of the first input, then 1.

    Args:
        shapes: Pair of input shapes; only the first one is used

    Returns:
        tuple: ``(shapes[0][0], 1)``
    """
    first_shape, _ = shapes
    leading_dim = first_shape[0]
    return (leading_dim, 1)

## Model Definition

In [None]:
def create_base_network(input_shape, freeze_layers_until=None):
    """Get the base network to do the feature extract for the latent embedding

    Builds an ImageNet-pretrained InceptionResNetV2 and exposes the output of
    its global average pooling layer (the layer right before the
    classification layer) as the embedding.

    Args:
        input_shape (tuple): Shape of image tensor input
        freeze_layers_until (str): Optional layer name. Every layer from the
            first one up to and including this one is made non-trainable.

    Returns:
        keras.models.Model
    """
    # Imported locally so this cell does not depend on a change to the import cell.
    from keras.layers import BatchNormalization

    image_input = Input(shape=input_shape)
    inception = InceptionResNetV2(weights='imagenet', input_tensor=image_input)

    if freeze_layers_until:
        layer_names = [layer.name for layer in inception.layers]
        assert freeze_layers_until in layer_names, \
            'Unknown layer name: %s' % freeze_layers_until
        for layer in inception.layers:
            layer.trainable = False
            # The previous check compared the class object to a string and was
            # therefore never true. A momentum of 1.0 makes the moving
            # statistics keep their pretrained values.
            if isinstance(layer, BatchNormalization):
                layer.momentum = 1.0
            if layer.name == freeze_layers_until:
                break

    # Select the embedding layer by name instead of popping the classification
    # layer off `inception.layers`: whether that list is the model's live
    # layer list differs between Keras versions.
    embedding = inception.get_layer('avg_pool').output
    model = Model(inputs=[image_input], outputs=[embedding], name='embedding_model')
    return model

In [None]:
def compute_accuracy(y_true, y_pred):
    """Compute classification accuracy with a fixed threshold on distances.

    A predicted distance below 0.5 counts as "similar" (label 1).

    Args:
        y_true: Ground-truth labels (list or array, any shape)
        y_pred: Predicted distances (list or array, any shape)

    Returns:
        numpy.float64: Fraction of predictions that match the labels

    Raises:
        ValueError: If the number of labels and predictions differs
    """
    # Flatten both sides so a (n, 1) prediction column and a flat label list
    # are compared element by element instead of being broadcast.
    predicted_similar = np.asarray(y_pred).ravel() < 0.5
    expected = np.asarray(y_true).ravel()
    if predicted_similar.shape != expected.shape:
        raise ValueError('Got %d labels but %d predictions'
                         % (expected.size, predicted_similar.size))
    return np.mean(predicted_similar == expected)

In [None]:
def accuracy(y_true, y_pred):
    """Keras metric: classification accuracy with a fixed 0.5 threshold on distances.

    Predicted distances below 0.5 are cast to 1 in the dtype of ``y_true`` (all
    others to 0), and the mean agreement with ``y_true`` is returned.
    """
    predicted_labels = K.cast(y_pred < 0.5, y_true.dtype)
    matches = K.equal(y_true, predicted_labels)
    return K.mean(matches)

In [None]:
# Build the shared embedding network: Inception ResNet V2 pretrained on ImageNet,
# with the layers up to mixed_6a frozen for faster training and less overfitting
base_network = create_base_network(input_shape=IMAGE_SHAPE,
                                   freeze_layers_until='mixed_6a')

# One input tensor per image of a pair
input_a, input_b = Input(shape=IMAGE_SHAPE), Input(shape=IMAGE_SHAPE)

In [None]:
# Feed both image inputs through the single base_network created above
processed_a, processed_b = base_network(input_a), base_network(input_b)

# Distance between the two embeddings, computed inside a Lambda layer
distance = Lambda(
    euclidean_distance,
    output_shape=eucl_dist_output_shape,
)([processed_a, processed_b])

# The full model maps a pair of images to that distance
model = Model(inputs=[input_a, input_b], outputs=distance)

In [None]:
# Callbacks that Keras invokes during training.
# Checkpoints go to ckpt_dir; the file name records the epoch and validation loss.
ckpt_dir = 'checkpoints'
if not os.path.exists(ckpt_dir):
    os.mkdir(ckpt_dir)
ckpt_pattern = os.path.join(ckpt_dir, 'weights.{epoch:03d}-{val_loss:.5f}.hdf5')

callbacks = [
    # Keep a checkpoint only when the validation loss improves
    ModelCheckpoint(filepath=ckpt_pattern, monitor='val_loss', save_best_only=True),
    # Halve the learning rate once the training loss has stalled for 3 epochs
    ReduceLROnPlateau(monitor='loss', factor=0.5, patience=3, verbose=1, min_lr=1e-7),
]

In [None]:
init_learning_rate = 0.0005
rms = RMSprop(lr=init_learning_rate)
model.compile(loss=contrastive_loss, optimizer=rms, metrics=[accuracy])

# batch_generator also yields the final, smaller batch of each pass, so one
# pass over the data is ceil(n / BATCH_SIZE) batches. Using floor division
# here made epochs and validation runs drift out of step with the generators
# (and gave 0 steps for fewer than BATCH_SIZE pairs).
train_steps = (len(X_train) + BATCH_SIZE - 1) // BATCH_SIZE
validation_steps = (len(X_eval) + BATCH_SIZE - 1) // BATCH_SIZE

model.fit_generator(batch_generator(X_train, y_train, BATCH_SIZE),
                    steps_per_epoch=train_steps,
                    epochs=5,
                    callbacks=callbacks,
                    validation_data=batch_generator(X_eval, y_eval, BATCH_SIZE),
                    validation_steps=validation_steps)

In [None]:
# Save the final model
# The file is written into ckpt_dir, the same directory ModelCheckpoint writes to.
model.save(os.path.join(ckpt_dir, 'final_model.hdf5'))

In [None]:
# Predict exactly one full pass over the evaluation pairs. The generator
# yields ceil(n / BATCH_SIZE) batches per pass (the last one may be smaller);
# with floor division the trailing pairs were never predicted and y_pred ended
# up shorter than y_eval, which breaks the element-wise accuracy comparison.
predict_steps = (len(y_eval) + BATCH_SIZE - 1) // BATCH_SIZE
y_pred = model.predict_generator(batch_generator(X_eval, y_eval, BATCH_SIZE),
                                 steps=predict_steps,
                                 verbose=1)
te_acc = compute_accuracy(y_eval, y_pred)
print('* Accuracy on test set: %0.2f%%' % (100 * te_acc))