In [1]:
import numpy as np
import time
import os
import numpy.random as rng

from keras.models import Sequential
from keras.optimizers import Adam
from keras.layers import Conv2D, Input
from keras.models import Model
from keras.layers.pooling import MaxPooling2D
from keras.layers.core import Lambda, Flatten, Dense
from keras.regularizers import l2
from keras import backend as K
from matplotlib.pyplot import imread
from sklearn.utils import shuffle

%matplotlib inline

Using TensorFlow backend.
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [2]:
# Locations of the Omniglot training ("background") and validation
# ("evaluation") image folders, plus the folder used for saved weights.
# Raw strings keep every Windows backslash literal instead of relying on
# unrecognised escape sequences, which Python warns about.
train_folder = r"D:\Dataset\One_Shot_learning\omniglot-master\ExtractedData\images_background"
val_folder = r"D:\Dataset\One_Shot_learning\omniglot-master\ExtractedData\images_evaluation"
save_path = r"D:\MLworkspace\OneShotLearning\omniglot_1\data"
model_path = r"D:\MLworkspace\OneShotLearning\omniglot_1\data"

Because of how the dataset folders are organised, we have to walk through every folder
and copy the character images into X. Each character is assigned a running integer label,
which is stored in y once per sample image, and a dictionary records the range of labels
that belongs to each alphabet.
* The dataset is structured as follows: Folder -> Alphabets -> Characters -> 20 sample images of each character


In [3]:
def loading_data(path,n = 0):
    """Load a ``path/<alphabet>/<character>/<image files>`` tree into arrays.

    Parameters
    ----------
    path : str
        Root folder that contains one sub-folder per alphabet.
    n : int, optional
        First integer label to hand out (default 0).

    Returns
    -------
    X : numpy.ndarray
        One row per successfully loaded character; each row is the stack of
        that character's images. Row ``i`` belongs to label ``n + i``.
    y : numpy.ndarray
        Column vector holding one label per loaded image.
    language_dict : dict
        Maps each alphabet name to ``[first_label, last_label]``.
        ``last_label`` stays ``None`` if no character of that alphabet
        could be loaded.

    Raises
    ------
    ValueError
        From the final stacking step when nothing could be loaded or when
        characters have differing numbers/shapes of images.
    """
    X = []
    y = []
    language_dict = {}
    curr_y = n
    # sorted() makes the label assignment independent of the arbitrary order
    # in which the operating system lists directory entries.
    for alphabet in sorted(os.listdir(path)):
        print(alphabet)
        language_dict[alphabet] = [curr_y, None]
        alphabet_path = os.path.join(path, alphabet)
        for letter in sorted(os.listdir(alphabet_path)):
            letter_path = os.path.join(alphabet_path, letter)
            category_images = [
                imread(os.path.join(letter_path, filename))
                for filename in sorted(os.listdir(letter_path))
            ]
            try:
                stacked = np.stack(category_images)
            except ValueError as e:
                # Empty folder or images of differing shapes: report it and
                # skip the character entirely. No label is consumed, so the
                # rows of X stay aligned with the labels in y.
                print(e)
                print("error - category_images:", category_images)
                continue
            X.append(stacked)
            y.extend([curr_y] * len(category_images))
            language_dict[alphabet][1] = curr_y
            curr_y += 1
    y = np.vstack(y)
    X = np.stack(X)
    return X,y,language_dict

In [4]:
# Load the training and validation image sets. The third value returned by
# loading_data maps each alphabet name to the range of labels it covers.
Xtrain,ytrain,ctrain=loading_data(train_folder)
Xval,yval,cval=loading_data(val_folder)

Alphabet_of_the_Magi
Anglo-Saxon_Futhorc
Arcadian
Armenian
Asomtavruli_(Georgian)
Balinese
Bengali
Blackfoot_(Canadian_Aboriginal_Syllabics)
Braille
Burmese_(Myanmar)
Cyrillic
Early_Aramaic
Futurama
Grantha
Greek
Gujarati
Hebrew
Inuktitut_(Canadian_Aboriginal_Syllabics)
Japanese_(hiragana)
Japanese_(katakana)
Korean
Latin
Malay_(Jawi_-_Arabic)
Mkhedruli_(Georgian)
N_Ko
Ojibwe_(Canadian_Aboriginal_Syllabics)
Sanskrit
Syriac_(Estrangelo)
Tagalog
Tifinagh
Angelic
Atemayar_Qelisayer
Atlantean
Aurek-Besh
Avesta
Ge_ez
Glagolitic
Gurmukhi
Kannada
Keble
Malayalam
Manipuri
Mongolian
Old_Church_Slavonic_(Cyrillic)
Oriya
Sylheti
Syriac_(Serto)
Tengwar
Tibetan
ULOG


There are 30 alphabets available for training and 20 alphabets available for validation.

In [5]:
def initialize_weights(shape, dtype=None):
    """Draw initial kernel weights from a normal distribution (mean 0, std 0.01).

    Parameters
    ----------
    shape : int or tuple of ints
        Shape of the array to generate.
    dtype : optional
        Requested dtype of the result. Keras documents that custom
        initializers are called with both ``shape`` and ``dtype``; when it is
        omitted the NumPy default (float64) is returned, as before.

    Returns
    -------
    numpy.ndarray
        Array of the requested shape.
    """
    weights = np.random.normal(loc = 0.0, scale = 1e-2, size = shape)
    if dtype is not None:
        # Framework dtype objects may expose their NumPy equivalent through
        # ``as_numpy_dtype``; strings and NumPy dtypes are used as they are.
        weights = weights.astype(getattr(dtype, "as_numpy_dtype", dtype))
    return weights

In [6]:
def initialize_bias(shape, dtype=None):
    """Draw initial bias values from a normal distribution (mean 0.5, std 0.01).

    Parameters
    ----------
    shape : int or tuple of ints
        Shape of the array to generate.
    dtype : optional
        Requested dtype of the result. Keras documents that custom
        initializers are called with both ``shape`` and ``dtype``; when it is
        omitted the NumPy default (float64) is returned, as before.

    Returns
    -------
    numpy.ndarray
        Array of the requested shape.
    """
    bias = np.random.normal(loc = 0.5, scale = 1e-2, size = shape)
    if dtype is not None:
        # Framework dtype objects may expose their NumPy equivalent through
        # ``as_numpy_dtype``; strings and NumPy dtypes are used as they are.
        bias = bias.astype(getattr(dtype, "as_numpy_dtype", dtype))
    return bias

* To define a model for one-shot learning we use a siamese network: two inputs that are processed by the same (weight-sharing) stack of CNN layers
* One input is the reference image and the other input is the image being compared against it
* The two outputs are combined and passed to a final sigmoid layer, which scores how similar the two images are
* Here I am using Keras' functional API to create the model, since it has to work with multiple inputs and a single output
* Because Keras doesn't provide a layer that computes the difference between two outputs, we use a custom Lambda layer to compute their absolute difference

In [7]:
def siamese_model(input_shape):
    """Build the two-input siamese network.

    Both inputs go through one shared convolutional encoder. The element-wise
    absolute difference of the two encodings is fed to a single sigmoid unit.

    Parameters
    ----------
    input_shape : tuple
        Shape of one input image, e.g. ``(105, 105, 1)``.

    Returns
    -------
    Model
        Uncompiled Keras model taking ``[left_input, right_input]`` and
        producing one value per pair.
    """
    left_input = Input(input_shape)
    right_input = Input(input_shape)

    # Shared encoder: four conv layers (the first three followed by max
    # pooling) and a 4096-unit sigmoid dense layer.
    encoder = Sequential([
        Conv2D(64, (10,10), activation='relu', input_shape=input_shape,
               kernel_initializer=initialize_weights,
               kernel_regularizer=l2(2e-4)),
        MaxPooling2D(),
        Conv2D(128, (7,7), activation='relu',
               kernel_initializer=initialize_weights,
               bias_initializer=initialize_bias,
               kernel_regularizer=l2(2e-4)),
        MaxPooling2D(),
        Conv2D(128, (4,4), activation='relu',
               kernel_initializer=initialize_weights,
               bias_initializer=initialize_bias,
               kernel_regularizer=l2(2e-4)),
        MaxPooling2D(),
        Conv2D(256, (4,4), activation='relu',
               kernel_initializer=initialize_weights,
               bias_initializer=initialize_bias,
               kernel_regularizer=l2(2e-4)),
        Flatten(),
        Dense(4096, activation='sigmoid',
              kernel_regularizer=l2(1e-3),
              kernel_initializer=initialize_weights,
              bias_initializer=initialize_bias),
    ])

    # The same encoder instance is applied to both inputs, so weights are shared.
    encoded_left = encoder(left_input)
    encoded_right = encoder(right_input)

    abs_difference = Lambda(lambda pair: K.abs(pair[0] - pair[1]))(
        [encoded_left, encoded_right])

    score = Dense(1, activation='sigmoid',
                  bias_initializer=initialize_bias)(abs_difference)
    return Model(inputs=[left_input, right_input], outputs=score)


In [8]:
# Build the network for inputs of shape (105, 105, 1) and print its layers.
model = siamese_model((105, 105, 1))
model.summary()





__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (None, 105, 105, 1)  0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            (None, 105, 105, 1)  0                                            
__________________________________________________________________________________________________
sequential_1 (Sequential)       (None, 4096)         38947648    input_1[0][0]                    
                                                                 input_2[0][0]                    
__________________________________________________________________________________________________
lambda_1 (Lambda)               (None, 4096)         0           sequential_1[1][0]               
      

In [9]:
# Binary cross-entropy is used because the network ends in a single sigmoid
# unit and every pair is trained against a 0/1 target.
optimizer = Adam(lr = 0.00006)
model.compile(loss="binary_crossentropy",optimizer=optimizer)



Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


* Here we create a batch of data consisting of PAIRS of images (the two inputs of our model) and TARGET values (the similarity label of each pair)
* The size of the batch is given by the caller (`batch_size`, set to 64 in the hyper-parameter cell below)
* Each batch is split into two halves:
    * in the 1st half, each pair shows two different, randomly selected characters
    * in the 2nd half, each pair shows two randomly selected samples of the same character
* Hence the TARGET value is 0 for the 1st half and 1 for the 2nd half

In [10]:
# We will use this block only for Training our model
def get_batch(batch_size,s="train"):
    """Create one batch of image pairs with same/different targets.

    Parameters
    ----------
    batch_size : int
        Number of pairs. Must not exceed the number of classes, because the
        first image of every pair comes from a distinct class.
    s : str, optional
        ``'train'`` draws from ``Xtrain``; any other value draws from ``Xval``.

    Returns
    -------
    pairs : list of two numpy.ndarray
        Each of shape ``(batch_size, rows, cols, 1)``.
    targets : numpy.ndarray
        Shape ``(batch_size,)``. The first ``batch_size // 2`` entries are 0
        (images from different classes), the rest are 1 (same class).
    """
    X = Xtrain if s == 'train' else Xval
    # X is indexed as (class, example, row, column). Keeping rows/cols in
    # that order for both the buffers and the reshapes lets non-square
    # images work too.
    n_classes, n_examples, rows, cols = X.shape

    # One distinct class per pair, used for the first image of the pair.
    categories = rng.choice(n_classes,size=(batch_size,),replace=False)

    pairs = [np.zeros((batch_size, rows, cols, 1)) for _ in range(2)]
    targets = np.zeros((batch_size,))

    half = batch_size // 2
    targets[half:] = 1
    for i in range(batch_size):
        category = categories[i]
        idx_1 = rng.randint(0, n_examples)
        pairs[0][i,:,:,:] = X[category, idx_1].reshape(rows, cols, 1)
        idx_2 = rng.randint(0, n_examples)

        if i >= half:
            # Positive pair: second image comes from the same class.
            category_2 = category
        else:
            # Negative pair: an offset in [1, n_classes - 1] taken modulo
            # n_classes can never land on the original class.
            category_2 = (category + rng.randint(1,n_classes)) % n_classes

        pairs[1][i,:,:,:] = X[category_2,idx_2].reshape(rows, cols, 1)

    return pairs, targets
        

In [11]:
# This returns data in pairs for validation
def test_oneshot_preprocess(N, s="val"):
    if s == 'train':
        X = Xtrain
    else:
        X = Xval
    n_classes, n_examples, w, h = X.shape 
    indices = rng.randint(0, n_examples,size=(N,))
    
    categories = rng.choice(range(n_classes),size=(N,),replace=False)            
    true_category = categories[0]
    ex1, ex2 = rng.choice(n_examples,replace=False,size=(2,))
    test_image = np.asarray([X[true_category,ex1,:,:]]*N).reshape(N, w, h,1)
    support_set = X[categories,indices,:,:]
    support_set[0,:,:] = X[true_category,ex2]
    support_set = support_set.reshape(N, w, h,1)
    targets = np.zeros((N,))
    targets[0] = 1
    targets, test_image, support_set = shuffle(targets, test_image, support_set)
    pairs = [test_image,support_set]

    return pairs, targets

We evaluate the model by running predictions on our validation set
and calculating the model's average accuracy.
* Each trial is a 20-way one-shot classification task, which is an important way of evaluating this kind of model
* Because validation runs over many such trials, the validation accuracy is taken as the average over all of them

In [12]:
def predict_oneshot(model, N, k, s = "val"):
    """Return the percentage of correct answers over ``k`` N-way one-shot trials.

    Parameters
    ----------
    model : object
        Anything with a ``predict(inputs)`` method returning one score per
        candidate.
    N : int
        Number of candidates per trial.
    k : int
        Number of trials to run.
    s : str, optional
        Data split passed through to ``test_oneshot_preprocess``.

    Returns
    -------
    float
        ``100 * correct / k``.
    """
    def trial_is_correct():
        # A trial counts as correct when the highest-scoring candidate is
        # the one marked in the targets.
        trial_inputs, trial_targets = test_oneshot_preprocess(N, s)
        scores = model.predict(trial_inputs)
        return np.argmax(scores) == np.argmax(trial_targets)

    hits = sum(1 for _ in range(k) if trial_is_correct())
    return 100.0 * hits / k

In [13]:
# Predefine few Hyper parameters
n_iter = 1000 # total number of training iterations (one batch each)
n_val = 200 # number of one-shot trials per validation run
evaluate_every = 200 # run validation every 200 iterations
batch_size = 64 # number of image pairs per training batch
N_way = 20 # number of candidates in each one-shot validation trial
best = -1 # best validation accuracy so far; starts below any real accuracy

(Optional) Here I load pre-trained weights, which I previously trained on this dataset and saved, to avoid repeating the full training every time the notebook is run.
You can skip this step and train from the beginning without loading any weights.

In [14]:
# Print the architecture, then restore previously saved weights.
# The path is a raw string so that its backslashes are taken literally
# rather than being parsed as (unrecognised) escape sequences.
model.summary()
model.load_weights(r"D:\MLworkspace\OneShotLearning\omniglot_1\data\pretrained_weights.h5")

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (None, 105, 105, 1)  0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            (None, 105, 105, 1)  0                                            
__________________________________________________________________________________________________
sequential_1 (Sequential)       (None, 4096)         38947648    input_1[0][0]                    
                                                                 input_2[0][0]                    
__________________________________________________________________________________________________
lambda_1 (Lambda)               (None, 4096)         0           sequential_1[1][0]               
          

Because of the pre-trained weights, the model should converge after around 5 to 6 evaluations.

In [15]:
# Train on one random batch of pairs per iteration. Every `evaluate_every`
# iterations: report the training loss, measure one-shot accuracy on the
# validation data, save the weights, and track the best accuracy so far.
t_start = time.time()
for i in range(1, n_iter+1):
    inputs, targets = get_batch(batch_size)
    loss = model.train_on_batch(inputs, targets)

    # Nothing more to do on iterations that are not evaluation points.
    if i % evaluate_every != 0:
        continue

    print("\n Training Loss: {0}".format(loss))
    val_acc = predict_oneshot(model, N_way, n_val)
    model.save_weights(
        os.path.join(model_path, 'weights.{}.h5'.format(i))
    )
    if val_acc >= best:
        print("Current best: {0}, previous best: {1}".format(val_acc, best))
        best = val_acc



 Training Loss: 0.15595930814743042
Current best: 76.5, previous best: -1

 Training Loss: 0.24491146206855774
Current best: 78.5, previous best: 76.5

 Training Loss: 0.13311876356601715
Current best: 84.0, previous best: 78.5

 Training Loss: 0.1227961853146553
Current best: 86.0, previous best: 84.0

 Training Loss: 0.12759816646575928


As you can see, the final accuracy achieved with our weights is around 86% on the validation set,
while the accuracy on the test data is around 88-90% before the model stops converging.

