In [1]:
# https://github.com/KinWaiCheuk/Triplet-net-keras/blob/master/Triplet%20NN%20Test%20on%20MNIST.ipynb
from keras.layers import Input, Conv2D, Lambda, Dense, Flatten,MaxPooling2D, concatenate
from keras.models import Model, Sequential
from keras.regularizers import l2
from keras import backend as K
from keras.optimizers import SGD,Adam
import numpy as np
import tensorflow as tf
import sys
from keras.backend import int_shape

Using TensorFlow backend.


In [2]:
DATASET_DIR = '../dataset/mnist-triplet-loss/'

In [19]:
def tf_print(op, tensors, message=None):
    def print_message(x):
        sys.stdout.write(message + " %s\n" % x)
        return x

    prints = [tf.py_func(print_message, [tensor], tensor.dtype) for tensor in tensors]
    with tf.control_dependencies(prints):
        op = tf.identity(op)
    return op

# def triplet_loss(y_true, y_pred, alpha = 0.4):
#     """
#     Implementation of the triplet loss function
#     Arguments:
#     y_true -- true labels, required when you define a loss in Keras, you don't need it in this function.
#     y_pred -- python list containing three objects:
#             anchor -- the encodings for the anchor data
#             positive -- the encodings for the positive data (similar to anchor)
#             negative -- the encodings for the negative data (different from anchor)
#     Returns:
#     loss -- real number, value of the loss
#     """
# #     y_pred = tf_print(y_pred, [y_pred], message='y_pred ' + str(int_shape(y_true)))
# #     print('y_pred.shape = ', y_pred)
    
#     total_length = y_pred.shape.as_list()[-1]
    
#     anchor = y_pred[:,0:int(total_length*1/3)]
#     positive = y_pred[:,int(total_length*1/3):int(total_length*2/3)]
#     negative = y_pred[:,int(total_length*2/3):int(total_length*3/3)]

#     # distance between the anchor and the positive
#     pos_dist = K.sum(K.square(anchor-positive),axis=1)

#     # distance between the anchor and the negative
#     neg_dist = K.sum(K.square(anchor-negative),axis=1)

#     # compute loss
#     basic_loss = pos_dist-neg_dist+alpha
#     loss = K.maximum(basic_loss,0.0)
    
#     return loss

def triplet_loss(y_true, y_pred, alpha = 0.2):
    """
    Implementation of the triplet loss as defined by formula (3)
    
    Arguments:
    y_true -- true labels, required when you define a loss in Keras, you don't need it in this function.
    y_pred -- python list containing three objects:
            anchor -- the encodings for the anchor images, of shape (None, 128)
            positive -- the encodings for the positive images, of shape (None, 128)
            negative -- the encodings for the negative images, of shape (None, 128)
    
    Returns:
    loss -- real number, value of the loss
    """
    
    y_pred = tf_print(y_pred, [y_pred], message='y_pred ' + str(int_shape(y_pred)))
    print('y_pred.shape = ', y_pred)
    anchor, positive, negative = y_pred[:, 0], y_pred[:, 1], y_pred[:, 2]
    
    # Step 1: Compute the (encoding) distance between the anchor and the positive, you will need to sum over axis=-1
    pos_dist = tf.reduce_sum(tf.square(tf.subtract(anchor,positive)),axis=-1) 
    # Step 2: Compute the (encoding) distance between the anchor and the negative, you will need to sum over axis=-1
    neg_dist = tf.reduce_sum(tf.square(tf.subtract(anchor,negative)),axis=-1)
    # Step 3: subtract the two previous distances and add alpha.
    basic_loss = pos_dist - neg_dist + alpha
    # Step 4: Take the maximum of basic_loss and 0.0. Sum over the training examples.
    loss = tf.reduce_sum(tf.maximum(basic_loss , 0.0))
    
    return loss

In [24]:
def base_network(in_dims):
    """
    Base network to be shared.
    """
    model = Sequential()
    model.add(Conv2D(5,(7,7),padding='same',input_shape=(in_dims[0],in_dims[1],in_dims[2],),activation='relu',name='conv1'))
    model.add(MaxPooling2D((2,2),(2,2),padding='same',name='pool1'))
    model.add(Conv2D(7,(5,5),padding='same',activation='relu',name='conv2'))
    model.add(MaxPooling2D((2,2),(2,2),padding='same',name='pool2'))
    model.add(Flatten(name='flatten'))
    model.add(Dense(128,name='embeddings'))
    model.add(Lambda(lambda x: tf.expand_dims(x, 1)))
    # model.add(Dense(600))
    
    return model

In [5]:
def load_triplet_dataset():
    X_train = np.load(DATASET_DIR + 'tr_triplets.npy')
    X_test = np.load(DATASET_DIR + 'ts_triplets.npy')
    return X_train, X_test

In [6]:
X_train, X_test = load_triplet_dataset()
input_shape = X_train.shape[1:]
input_shape

(3, 28, 28, 1)

In [7]:
dummy_y_train = np.empty((X_train.shape[0], 1))
dummy_y_test = np.empty((X_test.shape[0], 1))

In [25]:
anchor_input = Input((28,28,1, ), name='anchor_input')
positive_input = Input((28,28,1, ), name='positive_input')
negative_input = Input((28,28,1, ), name='negative_input')

# Shared embedding layer for positive and negative items
shared_network = base_network([28,28,1,])


encoded_anchor = shared_network(anchor_input)
encoded_positive = shared_network(positive_input)
encoded_negative = shared_network(negative_input)


merged_vector = concatenate([encoded_anchor, encoded_positive, encoded_negative], axis=1, name='merged_layer')


adam = Adam(lr=0.0001, beta_1=0.9, beta_2=0.999)
model = Model(inputs=[anchor_input,positive_input, negative_input], outputs=merged_vector)
# model = Model(inputs=[anchor_input,positive_input, negative_input], outputs=[encoded_anchor, encoded_positive, encoded_negative])

In [26]:
model.compile(loss=triplet_loss, optimizer=adam)
model.summary()

y_pred.shape =  Tensor("merged_layer_4/concat:0", shape=(?, 3, 128), dtype=float32)
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
anchor_input (InputLayer)       (None, 28, 28, 1)    0                                            
__________________________________________________________________________________________________
positive_input (InputLayer)     (None, 28, 28, 1)    0                                            
__________________________________________________________________________________________________
negative_input (InputLayer)     (None, 28, 28, 1)    0                                            
__________________________________________________________________________________________________
sequential_5 (Sequential)       (None, 1, 128)       45164       anchor_input[0][0]               
                         

In [31]:
model.fit([X_train[:, 0] ,X_train[:, 1],X_train[:, 2]],
          y=dummy_y_train, 
          validation_data=([X_test[:, 0], X_test[:, 1], X_test[:, 2]], dummy_y_test), 
          epochs=10, 
          batch_size=128)

Train on 54200 samples, validate on 8910 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7f47b6e6bb00>

In [32]:
model.save('trained-weights-model/MNIST-triplet-loss-network-andrew-ng.h5')