In [1]:
import os
import numpy as np
np.random.seed(0)
import matplotlib.pyplot as plt
%matplotlib inline
from pylab import *
from keras.models import Sequential
from keras.optimizers import Adam
from keras.layers import Conv2D, ZeroPadding2D, Activation, Input, concatenate, MaxPooling2D, Lambda, Flatten, Dense, Dropout
from keras.models import Model
from keras.datasets import mnist

from keras.layers.normalization import BatchNormalization
from keras.initializers import glorot_uniform,he_uniform

from keras.engine.topology import Layer
from keras.regularizers import l2
from keras import backend as K


Using TensorFlow backend.


In [0]:
nb_classes = 10
img_rows, img_cols = 28, 28
input_shape = (img_rows, img_cols, 1)

def buildDataSet():
    
    (x_train_origin, y_train_origin), (x_test_origin, y_test_origin) = mnist.load_data()

    assert K.image_data_format() == 'channels_last'
    x_train_origin = x_train_origin.reshape(x_train_origin.shape[0], img_rows, img_cols, 1)
    x_test_origin = x_test_origin.reshape(x_test_origin.shape[0], img_rows, img_cols, 1)
    
    dataset_train = []
    dataset_test = []
    
    #Sorting images by classes and normalize values 0=>1
    for n in range(nb_classes):
        images_class_n = np.asarray([row for idx,row in enumerate(x_train_origin) if y_train_origin[idx]==n])
        dataset_train.append(images_class_n/255)
        
        images_class_n = np.asarray([row for idx,row in enumerate(x_test_origin) if y_test_origin[idx]==n])
        dataset_test.append(images_class_n/255)
    nw_dataset_train = [dataset_train[i][:4] for i in range(10)]
    return nw_dataset_train,dataset_test,x_train_origin,y_train_origin,x_test_origin,y_test_origin

In [0]:

nw_dataset_train,dataset_test,x_train_origin,y_train_origin,x_test_origin,y_test_origin = buildDataSet()

In [0]:
def build_network(input_shape, embeddingsize):
    
     # Convolutional Neural Network
    network = Sequential()
    network.add(Conv2D(32, kernel_size=(3, 3),
                 activation='relu',
                 input_shape=input_shape))
    network.add(Conv2D(64, (3, 3), activation='relu'))
    network.add(MaxPooling2D(pool_size=(2, 2)))
    network.add(Dropout(0.25))
    network.add(Flatten())
    network.add(Dense(128, activation='relu'))
    network.add(Dropout(0.5))
    network.add(Dense(embeddingsize, activation=None,
                   kernel_regularizer=l2(1e-3),
                   kernel_initializer='he_uniform'))
   
    #Force the encoding to live on the d-dimentional hypershpere
    network.add(Lambda(lambda x: K.l2_normalize(x,axis=-1)))
    
    return network

class TripletLossLayer(Layer):
    def __init__(self, alpha, **kwargs):
        self.alpha = alpha
        super(TripletLossLayer, self).__init__(**kwargs)
    
    def triplet_loss(self, inputs):
        anchor, positive, negative = inputs
        p_dist = K.sum(K.square(anchor-positive), axis=-1)
        n_dist = K.sum(K.square(anchor-negative), axis=-1)
        return K.sum(K.maximum(p_dist - n_dist + self.alpha, 0), axis=0)
    
    def call(self, inputs):
        loss = self.triplet_loss(inputs)
        self.add_loss(loss)
        return loss

def build_model(input_shape, network, margin=0.2):
    
     # Define the tensors for the three input images
    anchor_input = Input(input_shape, name="anchor_input")
    positive_input = Input(input_shape, name="positive_input")
    negative_input = Input(input_shape, name="negative_input") 
    
    # Generate the encodings (feature vectors) for the three images
    encoded_a = network(anchor_input)
    encoded_p = network(positive_input)
    encoded_n = network(negative_input)
    
    #TripletLoss Layer
    loss_layer = TripletLossLayer(alpha=margin,name='triplet_loss_layer')([encoded_a,encoded_p,encoded_n])
    
    # Connect the inputs with the outputs
    network_train = Model(inputs=[anchor_input,positive_input,negative_input],outputs=loss_layer)
    
    # return the model
    return network_train

In [5]:
network = build_network(input_shape,embeddingsize=10)
network_train = build_model(input_shape,network)
optimizer = Adam(lr = 0.00007)
network_train.compile(loss=None,optimizer=optimizer)

  'be expecting any data to be passed to {0}.'.format(name))


In [0]:
def get_batch_random(batch_size):

    m, w, h,c = nw_dataset_train[0].shape
    
    
    # initialize result
    triplets=[np.zeros((batch_size,h, w,c)) for i in range(3)]
    
    for i in range(batch_size):
        #Pick one random class for anchor
        anchor_class = np.random.randint(0, nb_classes)
        nb_sample_available_for_class_AP = nw_dataset_train[anchor_class].shape[0] # 4
        
        #Pick two different random pics for this class => A and P
        [idx_A,idx_P] = np.random.choice(nb_sample_available_for_class_AP,size=2,replace=False)
        
        #Pick another class for N, different from anchor_class
        negative_class = (anchor_class + np.random.randint(1,nb_classes)) % nb_classes
        nb_sample_available_for_class_N = nw_dataset_train[negative_class].shape[0] # 4
        
        #Pick a random pic for this negative class => N
        idx_N = np.random.randint(0, nb_sample_available_for_class_N)

        triplets[0][i,:,:,:] = nw_dataset_train[anchor_class][idx_A,:,:,:]
        triplets[1][i,:,:,:] = nw_dataset_train[anchor_class][idx_P,:,:,:]
        triplets[2][i,:,:,:] = nw_dataset_train[negative_class][idx_N,:,:,:]

    return triplets


In [0]:
def compute_dist(a,b):
    return np.sum(np.square(a-b))

def get_batch_hard(draw_batch_size,hard_batchs_size,norm_batchs_size,network):
    
    m, w, h,c = nw_dataset_train[0].shape
    
    
    #Step 1 : pick a random batch to study
    studybatch = get_batch_random(draw_batch_size)
    
    #Step 2 : compute the loss with current network : d(A,P)-d(A,N). The alpha parameter here is omited here since we want only to order them
    studybatchloss = np.zeros((draw_batch_size))
    
    #Compute embeddings for anchors, positive and negatives
    A = network.predict(studybatch[0])
    P = network.predict(studybatch[1])
    N = network.predict(studybatch[2])
    
    #Compute d(A,P)-d(A,N)
    studybatchloss = np.sum(np.square(A-P),axis=1) - np.sum(np.square(A-N),axis=1)
    
    #Sort by distance (high distance first) and take the 
    selection = np.argsort(studybatchloss)[::-1][:hard_batchs_size]
    
    #Draw other random samples from the batch
    selection2 = np.random.choice(np.delete(np.arange(draw_batch_size),selection),norm_batchs_size,replace=False)
    
    selection = np.append(selection,selection2)
    
    triplets = [studybatch[0][selection,:,:,:], studybatch[1][selection,:,:,:], studybatch[2][selection,:,:,:]]
    
    return triplets

In [8]:

print("Starting training process!")
print("-------------------------------------")
t_start = time.time()
for i in range(300):
    triplets = get_batch_hard(50,10,10,network)
    loss = network_train.train_on_batch(triplets, None)
    triplets = get_batch_hard(100,8,8,network)
    loss = network_train.train_on_batch(triplets, None)
    triplets = get_batch_hard(200,16,16,network)
    loss = network_train.train_on_batch(triplets, None)
    triplets = get_batch_hard(300,15,15,network)
    loss = network_train.train_on_batch(triplets, None)
    triplets = get_batch_hard(400,16,16,network)
    loss = network_train.train_on_batch(triplets, None)
    print("Time for {0} iterations: {1:.1f} mins, Train Loss: {2}".format(i, (time.time()-t_start)/60.0,loss,3))

Starting training process!
-------------------------------------
Time for 0 iterations: 0.0 mins, Train Loss: 11.441378593444824
Time for 1 iterations: 0.1 mins, Train Loss: 12.6925048828125
Time for 2 iterations: 0.1 mins, Train Loss: 6.295858383178711
Time for 3 iterations: 0.1 mins, Train Loss: 5.711435794830322
Time for 4 iterations: 0.1 mins, Train Loss: 10.109185218811035
Time for 5 iterations: 0.1 mins, Train Loss: 6.8788275718688965
Time for 6 iterations: 0.1 mins, Train Loss: 8.028914451599121
Time for 7 iterations: 0.1 mins, Train Loss: 7.6599016189575195
Time for 8 iterations: 0.1 mins, Train Loss: 5.436262607574463
Time for 9 iterations: 0.1 mins, Train Loss: 6.601184844970703
Time for 10 iterations: 0.1 mins, Train Loss: 9.051185607910156
Time for 11 iterations: 0.1 mins, Train Loss: 5.923454284667969
Time for 12 iterations: 0.1 mins, Train Loss: 7.440918922424316
Time for 13 iterations: 0.1 mins, Train Loss: 6.48535680770874
Time for 14 iterations: 0.1 mins, Train Loss: 5

In [0]:
def get_minimal_distances(network, images, refidx):
    
    predicted_values = []
    N=4
    _, w,h,c = dataset_test[0].shape
    nbimages=images.shape[0]
    
    #generates embedings for given images
    image_embedings = network.predict(images)
    
    #generates embedings for reference images
    ref_images = np.zeros((nb_classes,w,h,c))
    for i in range(nb_classes):
        ref_images[i,:,:,:] = nw_dataset_train[i][refidx,:,:,:]
    ref_embedings = network.predict(ref_images)
            
    for i in range(nbimages):
        
        distances = []
        for ref in range(nb_classes):
            #Compute distance between this images and references
            dist = compute_dist(image_embedings[i,:],ref_embedings[ref,:])
            distances.append(dist)
        predicted_values.append(argmin(distances))
    return predicted_values


In [0]:
def compute_accuracy(network):
  counts = []

  for j in range(100):
    predictions = []
    reals = []
    for index in np.random.randint(0,10000,100):
      reals.append(y_test_origin[index])
      preds = get_minimal_distances(network,np.expand_dims(x_test_origin[index,:,:,:],axis=0), np.random.randint(0,4))
      predictions.append(preds)

    count = 0
    for k in range(100):
      if predictions[k] == reals[k]:
        count=count +1
    counts.append(count)
  return mean(counts)


In [11]:
print("algorithm accuracy: ")
print(compute_accuracy(network))


algorithm accuracy: 
67.93
