In [None]:
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Input, Conv2D, Lambda, Dense, Flatten, MaxPooling2D, Dropout, Concatenate, BatchNormalization, concatenate, ReLU, LeakyReLU
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import binary_crossentropy
import numpy as np
import os
import sys
import time
import pickle
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
import imageio

In [None]:
# Mount Google Drive into the Colab runtime at /content/drive/ so that files kept on Drive
# can be read through ordinary file paths.
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [None]:
# Print every compute device TensorFlow reports for this runtime (e.g. to check for a GPU).
# NOTE(review): device_lib is imported from the tensorflow.python namespace, which looks like
# an internal module path - confirm whether a public API should be used instead.
from tensorflow.python.client import device_lib
print(device_lib.list_local_devices())

[name: "/device:CPU:0"
device_type: "CPU"
memory_limit: 268435456
locality {
}
incarnation: 15543541608337009761
xla_global_id: -1
, name: "/device:GPU:0"
device_type: "GPU"
memory_limit: 14465892352
locality {
  bus_id: 1
  links {
  }
}
incarnation: 1407240943411743949
physical_device_desc: "device: 0, name: Tesla T4, pci bus id: 0000:00:04.0, compute capability: 7.5"
xla_global_id: 416903419
]


In [None]:
# Project directory on the mounted Drive; both dataset archives are stored inside it.
project_path = '/content/drive/My Drive/HPML_Project_Siamese_Networks_kpv222_spa9633/'

train_zip_path = f"{project_path}images_background.zip"
validation_zip_path = f"{project_path}images_evaluation.zip"

# Local runtime locations: the two image folders and the directory used for saved files.
train_folder, val_folder, save_path = (
    "/content/images_background/",
    "/content/images_evaluation/",
    "/content/",
)

# Echo the three local paths, one per line.
print(train_folder, val_folder, save_path, sep="\n")

/content/images_background/
/content/images_evaluation/
/content/


In [None]:
from zipfile import ZipFile

# Unpack each archive into the parent directory of the folder the loader will read,
# instead of into whatever the current working directory happens to be.
# Assumes each archive's top-level directory has the same name as its target folder
# (images_background / images_evaluation) - the original cell relied on this as well.
for archive_path, target_folder, label in (
    (train_zip_path, train_folder, "Training"),
    (validation_zip_path, val_folder, "Validation"),
):
    # normpath drops the trailing slash so dirname yields the parent directory.
    extract_root = os.path.dirname(os.path.normpath(target_folder))
    with ZipFile(archive_path, 'r') as z:
        z.extractall(extract_root)
    print(label + " folder zip extraction completed!")

Training folder zip extraction completed!
Validation folder zip extraction completed!


In [None]:
def loadimgs(path, n=0):
    '''
    Load a directory tree of character images into arrays.

    The expected layout is path/<alphabet>/<letter>/<image file>. Every <letter>
    folder is one category. Entries are visited in sorted order so that the label
    given to each category is the same on every run and on every filesystem.

    path => Path of train directory or test directory
    n    => Label given to the first category; later categories count up from it

    Returns (X, y, lang_dict):
      X         => array with one row per loaded category, each row holding that
                   category's stacked images
      y         => column vector with one category label per loaded image
      lang_dict => {alphabet: [first_label, last_label]}, both ends inclusive;
                   last_label stays None if no category of that alphabet was loaded.
                   The labels equal row indices of X only when n == 0.

    A category whose images cannot be stacked (empty folder, differing image
    shapes) is reported and skipped without consuming a label, so rows of X stay
    aligned with the labels.

    Raises ValueError if no category could be loaded, or (from np.stack) if the
    categories do not all contain the same number of images.
    '''
    X = []
    y = []
    lang_dict = {}
    curr_y = n
    # we load every alphabet separately so we can isolate them later
    for alphabet in sorted(os.listdir(path)):
        alphabet_path = os.path.join(path, alphabet)
        if not os.path.isdir(alphabet_path):
            # stray file next to the alphabet folders
            continue
        print("loading alphabet: " + alphabet)
        lang_dict[alphabet] = [curr_y, None]
        # every letter/category has its own row in the array, so load separately
        for letter in sorted(os.listdir(alphabet_path)):
            letter_path = os.path.join(alphabet_path, letter)
            if not os.path.isdir(letter_path):
                continue
            # read all the images in the current category
            category_images = [
                imageio.imread(os.path.join(letter_path, filename))
                for filename in sorted(os.listdir(letter_path))
            ]
            try:
                stacked = np.stack(category_images)
            except ValueError as e:
                # Skip the whole category: registering a label without a matching
                # row in X would shift every later category by one.
                print(e)
                print("error - category_images:", category_images)
                continue
            X.append(stacked)
            y.extend([curr_y] * len(category_images))
            lang_dict[alphabet][1] = curr_y
            curr_y += 1
    if not X:
        raise ValueError("no image categories could be loaded from " + str(path))
    y = np.vstack(y)
    X = np.stack(X)
    return X, y, lang_dict

In [None]:
# Load the images under train_folder: X holds the images grouped by category, y the per-image
# labels, and c maps each alphabet name to its [first, last] category label.
X,y,c = loadimgs(train_folder)

loading alphabet: Alphabet_of_the_Magi
loading alphabet: Bengali
loading alphabet: Arcadian
loading alphabet: Japanese_(katakana)
loading alphabet: N_Ko
loading alphabet: Tagalog
loading alphabet: Sanskrit
loading alphabet: Armenian
loading alphabet: Burmese_(Myanmar)
loading alphabet: Early_Aramaic
loading alphabet: Asomtavruli_(Georgian)
loading alphabet: Anglo-Saxon_Futhorc
loading alphabet: Hebrew
loading alphabet: Ojibwe_(Canadian_Aboriginal_Syllabics)
loading alphabet: Cyrillic
loading alphabet: Korean
loading alphabet: Braille
loading alphabet: Japanese_(hiragana)
loading alphabet: Gujarati
loading alphabet: Balinese
loading alphabet: Latin
loading alphabet: Mkhedruli_(Georgian)
loading alphabet: Futurama
loading alphabet: Inuktitut_(Canadian_Aboriginal_Syllabics)
loading alphabet: Grantha
loading alphabet: Malay_(Jawi_-_Arabic)
loading alphabet: Syriac_(Estrangelo)
loading alphabet: Tifinagh
loading alphabet: Blackfoot_(Canadian_Aboriginal_Syllabics)
loading alphabet: Greek


In [None]:
# Load the images under val_folder in the same form; labels start again at 0 because
# loadimgs is called with its default offset.
Xval,yval,cval = loadimgs(val_folder)

loading alphabet: Kannada
loading alphabet: Aurek-Besh
loading alphabet: Oriya
loading alphabet: Mongolian
loading alphabet: Gurmukhi
loading alphabet: Manipuri
loading alphabet: Keble
loading alphabet: Sylheti
loading alphabet: Tibetan
loading alphabet: Atlantean
loading alphabet: Old_Church_Slavonic_(Cyrillic)
loading alphabet: Syriac_(Serto)
loading alphabet: Tengwar
loading alphabet: Malayalam
loading alphabet: Angelic
loading alphabet: Atemayar_Qelisayer
loading alphabet: ULOG
loading alphabet: Ge_ez
loading alphabet: Avesta
loading alphabet: Glagolitic


In [None]:
# Persist the training images together with their alphabet -> label-range map.
train_pickle_path = os.path.join(save_path, "train.pickle")
with open(train_pickle_path, "wb") as train_file:
    pickle.dump((X, c), train_file)

In [None]:
# Persist the evaluation images together with their alphabet -> label-range map.
val_pickle_path = os.path.join(save_path, "val.pickle")
with open(val_pickle_path, "wb") as val_file:
    pickle.dump((Xval, cval), val_file)

In [None]:
PATH = save_path

# Read back the two pickles stored under PATH.
# NOTE: pickle.load can execute code embedded in the file; only load pickles you trust.
with open(os.path.join(PATH, "train.pickle"), "rb") as train_file:
    X_train, c_train = pickle.load(train_file)

with open(os.path.join(PATH, "val.pickle"), "rb") as val_file:
    X_test, c_test = pickle.load(val_file)

# Report the array shapes and the alphabet names found in each split.
print("X_train shape:", X_train.shape)
print("X_test shape:", X_test.shape)
print()
print("training alphabets")
print(list(c_train))
print("test alphabets:")
print(list(c_test))

X_train shape: (964, 20, 105, 105)
X_test shape: (659, 20, 105, 105)

training alphabets
['Alphabet_of_the_Magi', 'Bengali', 'Arcadian', 'Japanese_(katakana)', 'N_Ko', 'Tagalog', 'Sanskrit', 'Armenian', 'Burmese_(Myanmar)', 'Early_Aramaic', 'Asomtavruli_(Georgian)', 'Anglo-Saxon_Futhorc', 'Hebrew', 'Ojibwe_(Canadian_Aboriginal_Syllabics)', 'Cyrillic', 'Korean', 'Braille', 'Japanese_(hiragana)', 'Gujarati', 'Balinese', 'Latin', 'Mkhedruli_(Georgian)', 'Futurama', 'Inuktitut_(Canadian_Aboriginal_Syllabics)', 'Grantha', 'Malay_(Jawi_-_Arabic)', 'Syriac_(Estrangelo)', 'Tifinagh', 'Blackfoot_(Canadian_Aboriginal_Syllabics)', 'Greek']
test alphabets:
['Kannada', 'Aurek-Besh', 'Oriya', 'Mongolian', 'Gurmukhi', 'Manipuri', 'Keble', 'Sylheti', 'Tibetan', 'Atlantean', 'Old_Church_Slavonic_(Cyrillic)', 'Syriac_(Serto)', 'Tengwar', 'Malayalam', 'Angelic', 'Atemayar_Qelisayer', 'ULOG', 'Ge_ez', 'Avesta', 'Glagolitic']


In [None]:
# Improved siamese model
# Define a convnet that transforms an image into a point in an embedding space.
input_shape = (105, 105, 1)

# The architecture is similar to that in the paper (Koch et al., "Siamese Neural Networks for One-shot Image Recognition"),
# but batch normalization follows every convolution to improve generalization and speed up training.
# No Dropout layer is used in this version.
# The model is named through the public constructor argument (rather than by assigning the
# private `_name` attribute) so that it can be looked up by the name "leg".
convnet = Sequential(name="leg")

# Four Conv -> BatchNorm -> ReLU -> MaxPool stages with 64, 128, 128 and 256 filters.
# Only the first convolution declares the input shape.
for stage, n_filters in enumerate((64, 128, 128, 256)):
    first_layer_kwargs = {"input_shape": input_shape} if stage == 0 else {}
    convnet.add(Conv2D(n_filters, (3, 3), **first_layer_kwargs))
    convnet.add(BatchNormalization())
    convnet.add(ReLU())
    convnet.add(MaxPooling2D((2, 2)))

convnet.add(Flatten())

# Final embedding: 1024 units with a linear (identity) activation.
convnet.add(Dense(1024, activation="linear"))

convnet.summary()

Model: "leg"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_16 (Conv2D)          (None, 103, 103, 64)      640       
                                                                 
 batch_normalization_16 (Bat  (None, 103, 103, 64)     256       
 chNormalization)                                                
                                                                 
 re_lu_16 (ReLU)             (None, 103, 103, 64)      0         
                                                                 
 max_pooling2d_16 (MaxPoolin  (None, 51, 51, 64)       0         
 g2D)                                                            
                                                                 
 conv2d_17 (Conv2D)          (None, 49, 49, 128)       73856     
                                                                 
 batch_normalization_17 (Bat  (None, 49, 49, 128)      512     

In [None]:
# The anchor, positive and negative images arrive stacked along axis 1 of one input tensor;
# they are sliced apart here so that each image can be embedded on its own.
generated = Input(shape=(3, 105, 105, 1), name='input')

anchor = Lambda(lambda x: x[:, 0])(generated)
pos = Lambda(lambda x: x[:, 1])(generated)
neg = Lambda(lambda x: x[:, 2])(generated)

# The same convnet instance is applied to all three slices, in anchor / positive / negative order.
anchor_embedding, pos_embedding, neg_embedding = [
    convnet(image) for image in (anchor, pos, neg)
]

# Join the three embeddings along the last axis; the joined tensor is the output of the
# triplet network.
merged_layer = Concatenate(axis=-1, name='merged_layer')
merged_output = merged_layer([anchor_embedding, pos_embedding, neg_embedding])

triplet_net = Model(inputs=generated, outputs=merged_output)
triplet_net.summary()

Model: "model_100"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input (InputLayer)             [(None, 3, 105, 105  0           []                               
                                , 1)]                                                             
                                                                                                  
 lambda_12 (Lambda)             (None, 105, 105, 1)  0           ['input[0][0]']                  
                                                                                                  
 lambda_13 (Lambda)             (None, 105, 105, 1)  0           ['input[0][0]']                  
                                                                                                  
 lambda_14 (Lambda)             (None, 105, 105, 1)  0           ['input[0][0]']          

In [None]:
# Notice that the ground truth variable is not used for the loss calculation.
# It is present only because a Keras loss is called as loss(y_true, y_pred).
# The network output already carries the anchor, "positive" and "negative" embeddings,
# which is all the supervision this loss needs.
def triplet_loss(ground_truth, network_output, margin=2000.0):
    """Hinge-style triplet loss on squared L2 distances.

    Args:
        ground_truth: Ignored; accepted only to match the (y_true, y_pred) loss signature.
        network_output: Tensor whose axis 1 is the concatenation of the anchor, positive and
            negative embeddings, in that order and with equal widths (it is split into three
            equal parts along axis 1).
        margin: Amount by which the anchor-negative squared distance must exceed the
            anchor-positive squared distance before a triplet stops contributing to the loss.
            Defaults to 2000, the value that used to be hard-coded.

    Returns:
        The mean over the batch of max(d(a, p) - d(a, n) + margin, 0), where d is the squared
        L2 distance.
    """
    anchor, positive, negative = tf.split(network_output, num_or_size_splits=3, axis=1)

    # This is an easy implementation, but also a very inefficient one because it uses offline triplet mining (https://omoindrot.github.io/triplet-loss)
    positive_distance = tf.reduce_sum(tf.square(anchor - positive), 1)
    negative_distance = tf.reduce_sum(tf.square(anchor - negative), 1)

    # Triplets that already satisfy the margin are clipped to zero.
    loss = tf.maximum(positive_distance - negative_distance + margin, 0.0)
    return tf.reduce_mean(loss)

In [None]:
# Notice that the array of zeros yielded next to the triplets only fills the "targets" slot that
# Keras expects; it corresponds to ground_truth in triplet_loss, which ignores it.
# The hard_selection flag controls how the negative image is chosen: with hard_selection == False
# it is a random image of the negative class; with hard_selection == True it is the image of the
# negative class whose embedding is closest to the anchor's ("hard" triplets).
def get_batch(batch_size, X, hard_selection):
    """Endlessly yield (triplets, dummy_targets) batches for training the triplet network.

    Args:
        batch_size: Number of triplets per yielded batch.
        X: Array of shape (n_classes, n_examples, w, h) holding the images grouped by class.
        hard_selection: False for a random negative image, True for the negative-class image
            whose embedding (under the current "leg" sub-model of the global `triplet_net`)
            has the smallest L2 distance to the anchor's embedding.

    Yields:
        A tuple (triplets, targets): triplets has shape (batch_size, 3, w, h, 1) with the
        anchor, positive and negative image at positions 0, 1 and 2; targets is a zero vector
        of length batch_size.

    Raises:
        ValueError: on the first iteration if X has fewer than two classes (no negative class
            exists) or, from numpy, if a class has fewer than two examples.
    """
    n_classes, n_examples, w, h = X.shape
    if n_classes < 2:
        raise ValueError("get_batch needs at least 2 classes to pick a negative, got {}".format(n_classes))

    # Create a subset of the model that basically represents a "leg" of the model
    leg = triplet_net.get_layer("leg")
    subset_model = Model(inputs=leg.get_input_at(0), outputs=leg.get_output_at(0))

    while True:
        triplets = []

        for _ in range(batch_size):
            # Pick one random class for the anchor
            anchor_class = np.random.randint(0, n_classes)

            # Pick two different random pics of this class => idx_A and idx_P
            idx_A, idx_P = np.random.choice(n_examples, size=2, replace=False)

            # Pick another class for the negative: draw from the n_classes - 1 other classes
            # and step over anchor_class, which is uniform over all classes except the anchor's.
            negative_class = np.random.randint(0, n_classes - 1)
            if negative_class >= anchor_class:
                negative_class += 1

            if not hard_selection:
                # Pick a random pic from this negative class => N
                idx_N = np.random.randint(0, n_examples)
            else:
                # Pick the hardest pic from this negative class => N

                # Embedding of the anchor image
                anchor_embedding = subset_model.predict(X[anchor_class][idx_A].reshape(1, w, h, 1))

                # Embeddings of all images in the negative class (shape taken from X, not hard-coded)
                negative_embeddings = subset_model.predict(X[negative_class].reshape(n_examples, w, h, 1))

                # L2 distance between the anchor embedding and every negative embedding
                distances = [np.linalg.norm(anchor_embedding - embedding) for embedding in negative_embeddings]

                # The nearest negative is the "hard" one
                idx_N = int(np.argmin(distances))

            triplets.append([
                X[anchor_class][idx_A].reshape(w, h, 1),
                X[anchor_class][idx_P].reshape(w, h, 1),
                X[negative_class][idx_N].reshape(w, h, 1),
            ])

        yield np.array(triplets), np.zeros(batch_size)

In [None]:
def make_oneshot_task(N, X, c, language=None):
    """Create pairs of (test image, support set image) with ground truth, for testing N-way one-shot learning.

    Args:
        N: Number of candidate classes in the task.
        X: Array of shape (n_classes, n_examples, w, h) with the images grouped by class.
        c: Mapping alphabet name -> [low, high], the first and last class index of that
            alphabet, both inclusive (the form produced by loadimgs).
        language: If given, all N classes are drawn from this alphabet; otherwise they are
            drawn from all classes.

    Returns:
        (pairs, targets). pairs is [test_image, support_set], both of shape (N, w, h, 1):
        test_image repeats one image of the true class N times; support_set holds one image
        per candidate class, the true class being represented by a different example than
        the test image. targets is a length-N vector that is 1 at the row whose support
        image belongs to the true class and 0 elsewhere. Rows are jointly shuffled.

    Raises:
        ValueError: if the chosen language has fewer than N letters (or, from numpy, if N
            exceeds the number of classes or a class has fewer than two examples).
    """
    n_classes, n_examples, w, h = X.shape
    indices = np.random.randint(0, n_examples, size=(N,))
    if language is not None:
        low, high = c[language]
        # high is inclusive, so the alphabet holds high - low + 1 letters.
        n_letters = high - low + 1
        if N > n_letters:
            raise ValueError("This language ({}) has less than {} letters".format(language, N))
        categories = np.random.choice(range(low, high + 1), size=(N,), replace=False)
    else:  # if no language specified just pick a bunch of random letters
        categories = np.random.choice(range(n_classes), size=(N,), replace=False)
    true_category = categories[0]
    ex1, ex2 = np.random.choice(n_examples, replace=False, size=(2,))
    test_image = np.repeat(X[true_category, ex1][np.newaxis], N, axis=0).reshape(N, w, h, 1)
    # Fancy indexing returns a copy, so overwriting row 0 below does not modify X.
    support_set = X[categories, indices, :, :]
    support_set[0, :, :] = X[true_category, ex2]
    support_set = support_set.reshape(N, w, h, 1)
    targets = np.zeros((N,))
    targets[0] = 1
    # Apply one shared random permutation so that the position of the match is not fixed.
    order = np.random.permutation(N)
    targets, test_image, support_set = targets[order], test_image[order], support_set[order]
    pairs = [test_image, support_set]
    return pairs, targets

In [None]:
def test_oneshot(model, X, c, N=20, k=250, language=None, verbose=True):     
    """Test average N-way oneshot learning accuracy of a siamese neural net over k one-shot tasks."""
    n_correct = 0
    
    if verbose:
        print("Evaluating model on {} random {}-way one-shot learning tasks ...".format(k, N))

    for i in range(k):
        # Create a one-shot task 
        inputs, targets = make_oneshot_task(N, X, c, language=language)

        # 1. For a given one-shot task, obtain embeddings for the test image as well as the support set. 
        test_img = model.predict(inputs[0])
        support_set = model.predict(inputs[1])
        # Note that we use the l2 distance to compute the distances
        distances = [np.linalg.norm(x-y) for x,y in zip(test_img, support_set)]
        
        # 2. Pick the image from the support set that is closest (in L2-distance) to the test image as your one-shot prediction.
        if np.argmin(distances) == np.argmax(targets):
            n_correct += 1

    percent_correct = (100.0 * n_correct / k)
    
    if verbose:
        print("Got an average of {}% accuracy for {}-way one-shot learning".format(percent_correct, N))
    return percent_correct

In [None]:
def train(model, X_train, hard_selection=False, batch_size=64, steps_per_epoch=100, epochs=1):
    """Fit `model` on triplet batches generated from X_train.

    A generator is created with get_batch(batch_size, X_train, hard_selection) and handed to
    model.fit together with steps_per_epoch and epochs. Nothing is returned.
    """
    batch_generator = get_batch(batch_size, X_train, hard_selection)
    model.fit(batch_generator, steps_per_epoch=steps_per_epoch, epochs=epochs)

In [None]:
# Random triplet selection
# `learning_rate` is the current name of the Adam argument; the old `lr` spelling is
# deprecated and triggers a warning.
triplet_net.compile(loss=triplet_loss, optimizer=Adam(learning_rate=0.0001))
loops = 20
best_acc_random = 0

# Embedding sub-model used for evaluation. It is built from the "leg" layer of triplet_net
# itself, so it is created once here instead of once per loop.
subset_model = Model(inputs=triplet_net.get_layer("leg").get_input_at(0),
                     outputs=triplet_net.get_layer("leg").get_output_at(0))

t1 = time.time()
for i in range(loops):
    print("=== Training loop {} ===".format(i+1))
    # One epoch of 100 batches with randomly chosen negatives, then a one-shot evaluation.
    train(triplet_net, X_train, hard_selection=False, batch_size=64, steps_per_epoch=100, epochs=1)
    test_acc = test_oneshot(subset_model, X_test, c_test)

    # Keep the checkpoint with the best (or equally good) accuracy seen so far.
    if test_acc >= best_acc_random:
        print("********* New best one-shot accuracy, saving model ********")
        triplet_net.save(os.path.join(".", "triplet_net_with_random_selection.h5"))
        best_acc_random = test_acc

    # Time-to-accuracy: stop as soon as the 80% target is reached.
    if test_acc >= 80:
        t2 = time.time()
        print("The time taken to reach the TTA target of 80% is", (t2-t1)/60, "minutes")
        # Leave the loop with break: sys.exit(0) raises SystemExit inside a notebook cell
        # instead of simply ending the training loop.
        break

=== Training loop 1 ===


  super(Adam, self).__init__(name, **kwargs)


Evaluating model on 250 random 20-way one-shot learning tasks ...
Got an average of 58.0% accuracy for 20-way one-shot learning
********* New best one-shot accuracy, saving model ********
=== Training loop 2 ===
Evaluating model on 250 random 20-way one-shot learning tasks ...
Got an average of 68.0% accuracy for 20-way one-shot learning
********* New best one-shot accuracy, saving model ********
=== Training loop 3 ===
Evaluating model on 250 random 20-way one-shot learning tasks ...
Got an average of 71.2% accuracy for 20-way one-shot learning
********* New best one-shot accuracy, saving model ********
=== Training loop 4 ===
Evaluating model on 250 random 20-way one-shot learning tasks ...
Got an average of 72.4% accuracy for 20-way one-shot learning
********* New best one-shot accuracy, saving model ********
=== Training loop 5 ===
Evaluating model on 250 random 20-way one-shot learning tasks ...
Got an average of 71.6% accuracy for 20-way one-shot learning
=== Training loop 6 ===
