In [None]:
!pip install kaggle
!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
! kaggle datasets download watesoyan/omniglot
! unzip omniglot

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: omniglot/images_evaluation/Mongolian/character16/1374_01.png  
  inflating: omniglot/images_evaluation/Mongolian/character16/1374_02.png  
  inflating: omniglot/images_evaluation/Mongolian/character16/1374_03.png  
  inflating: omniglot/images_evaluation/Mongolian/character16/1374_04.png  
  inflating: omniglot/images_evaluation/Mongolian/character16/1374_05.png  
  inflating: omniglot/images_evaluation/Mongolian/character16/1374_06.png  
  inflating: omniglot/images_evaluation/Mongolian/character16/1374_07.png  
  inflating: omniglot/images_evaluation/Mongolian/character16/1374_08.png  
  inflating: omniglot/images_evaluation/Mongolian/character16/1374_09.png  
  inflating: omniglot/images_evaluation/Mongolian/character16/1374_10.png  
  inflating: omniglot/images_evaluation/Mongolian/character16/1374_11.png  
  inflating: omniglot/images_evaluation/Mongolian/character16/1374_12.png  
  inflating: omniglot/i

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import os
from tqdm import tqdm #for progress bar
import cv2 #for image processing
from numpy import random as rng
from sklearn.utils import shuffle
import pickle
import time

from tensorflow.keras.layers import Input, Lambda, Conv2D, MaxPooling2D, BatchNormalization, Dense, Flatten, Activation, Dropout
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras import backend as K

In [None]:
# Root folder created by unzipping the Kaggle archive (Colab working directory).
OMNIGLOT_ROOT = '/content/omniglot'
# "background" alphabets are used for training/validation, "evaluation" ones for testing.
image_train_dir = f'{OMNIGLOT_ROOT}/images_background'
image_eval_dir = f'{OMNIGLOT_ROOT}/images_evaluation'



def load_data(dir_path):
    """Read an ``alphabet/letter/image`` directory tree into arrays.

    Every letter directory becomes one class; class ids are assigned in
    sorted directory order, starting at 0.

    Args:
        dir_path: root directory containing one sub-directory per alphabet.

    Returns:
        X: array of grayscale images grouped by class (one entry per letter).
        y: flat array holding the class id of every image, in reading order.
        lang_dict: maps alphabet name -> [first_class_id, last_class_id];
            the second entry stays None for an alphabet without letters.
    """
    images_per_class = []
    labels = []
    alphabet_ranges = {}
    next_class = 0
    for alphabet in tqdm(sorted(os.listdir(dir_path))):
        class_range = [next_class, None]
        alphabet_ranges[alphabet] = class_range
        alphabet_dir = os.path.join(dir_path, alphabet)
        for letter in sorted(os.listdir(alphabet_dir)):
            letter_dir = os.path.join(alphabet_dir, letter)
            glyphs = []
            for file_name in sorted(os.listdir(letter_dir)):
                # imread yields a BGR image; collapse it to a single gray channel.
                bgr = cv2.imread(os.path.join(letter_dir, file_name))
                glyphs.append(cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY))
                labels.append(next_class)
            images_per_class.append(glyphs)
            # Keep the range's upper bound at the most recent class id.
            class_range[1] = next_class
            next_class += 1
    X = np.array(images_per_class)
    y = np.array(labels)
    print(X.shape,y.shape)
    return X, y, alphabet_ranges




#splitting for validation

x_train, y_train,train_dict = load_data(image_train_dir)
x_test, y_test, test_dict = load_data(image_eval_dir)

# Hold out the first 20% of the character classes for validation.
VALIDATION_SIZE = int(len(x_train) * 0.2)

# x_* is indexed by class while y_* is flat (one label per image), so the label
# cut-off is the class cut-off times the images per class. Read that count from
# the data instead of hard-coding 20.
examples_per_class = x_train.shape[1]
n_val_labels = VALIDATION_SIZE * examples_per_class

x_val = x_train[:VALIDATION_SIZE]
y_val = y_train[:n_val_labels]

x_train = x_train[VALIDATION_SIZE:]
y_train = y_train[n_val_labels:]
# NOTE: labels keep their original ids, so y_train now starts at VALIDATION_SIZE.

train_items = train_dict.items()
# train_dict is keyed by alphabet and stores [first_class, last_class]; keep only
# the alphabets that contribute classes to the validation split and clip their
# range to it. (Slicing the alphabet list by a class count, as before, simply
# returned every alphabet.)
val_dict = [
    (alphabet, [first, min(last, VALIDATION_SIZE - 1)])
    for alphabet, (first, last) in train_items
    if last is not None and first < VALIDATION_SIZE
]


#x_train = preprocess(x_train)
#x_val = preprocess(x_val)
#x_test = preprocess(x_test)

print(f"Train: {len(x_train)}")
print(f"Validation: {len(x_val)}")
print(f"Test: {len(x_test)}")

100%|██████████| 30/30 [00:01<00:00, 15.89it/s]


(964, 20, 105, 105) (19280,)


100%|██████████| 20/20 [00:01<00:00, 16.89it/s]


(659, 20, 105, 105) (13180,)
Train: 772
Validation: 192
Test: 659


In [None]:
def get_batch(batch_size,dset='train'):
    """Build one batch of image pairs for training the siamese network.

    The first half of the batch holds pairs drawn from two different classes
    (target 0); the second half holds pairs drawn from the same class (target 1).

    Args:
        batch_size: number of pairs to generate.
        dset: 'train' samples from ``x_train``; any other value samples
            from ``x_val``.

    Returns:
        pairs: list of two arrays, each of shape (batch_size, w, h, 1).
        targets: array of shape (batch_size,) with 0 for dissimilar pairs
            and 1 for similar pairs.
    """
    X = x_train if dset == 'train' else x_val
    n_classes, n_examples, w, h = X.shape

    # Prefer distinct anchor classes, but fall back to sampling with replacement
    # when the batch is larger than the number of classes; otherwise
    # np.random.choice raises ValueError (e.g. batch_size=256 on a 192-class split).
    input_1 = np.random.choice(
        n_classes, size=batch_size, replace=batch_size > n_classes
    )

    half = batch_size // 2
    targets = np.zeros((batch_size,))
    # second half of the batch holds the similar pairs
    targets[half:] = 1

    pairs = [np.zeros((batch_size,w,h,1)) for _ in range(2)]
    for i in range(batch_size):
        # random example of the anchor class
        ex_no = np.random.randint(n_examples)
        pairs[0][i,:,:,:] = X[input_1[i],ex_no,:,:].reshape(w,h,1)

        if i >= half:
            # similar pair: the partner comes from the same class
            input_2 = input_1[i]
        else:
            # dissimilar pair: a non-zero offset modulo n_classes always lands
            # on a different class
            input_2 = (input_1[i] + np.random.randint(1,n_classes)) % n_classes

        # random example of the partner class
        ex_no2 = np.random.randint(n_examples)
        pairs[1][i,:,:,:] = X[input_2,ex_no2,:,:].reshape(w,h,1)
    return pairs,targets



def make_one_shot_task(N,dset='val'):
    """Create one N-way one-shot classification task.

    A query image is compared against a support set of N images from N
    distinct classes, exactly one of which belongs to the query's class.

    Args:
        N: number of classes in the support set (must not exceed the number
            of classes in the chosen split).
        dset: 'train' -> ``x_train``, 'val' -> ``x_val``, anything else ->
            ``x_test``.

    Returns:
        [test_image, support_set]: two arrays of shape (N, w, h, 1); the query
            image is repeated N times so row i pairs it with support image i.
        targets: array of shape (N,), 1 at the row whose support image shares
            the query's class and 0 elsewhere.
    """
    if dset == 'train':
        X = x_train
    elif dset == 'val':
        X = x_val
    else:
        X = x_test

    n_classes, n_examples, w, h = X.shape

    # N distinct classes (sampled WITHOUT repetition) ...
    class_no = np.random.choice(n_classes,size=(N,),replace=False)
    # ... and one example index per class (repetition across classes is fine).
    sample_no = np.random.choice(n_examples,size=(N,))

    # The first sampled class is the one the query image is drawn from.
    true_class = class_no[0]

    # Use two different examples of the true class for the query and for its
    # support entry; if they could coincide the task would contain the query
    # image itself and be trivially solvable.
    if n_examples > 1:
        ex1, ex2 = np.random.choice(n_examples, size=2, replace=False)
        sample_no[0] = ex2
    else:
        ex1 = 0

    # Repeat the query N times so it lines up row-by-row with the support set.
    test_image = np.repeat(X[true_class, ex1].reshape(1, w, h, 1), N, axis=0)

    # Fancy indexing picks image sample_no[i] of class class_no[i] for every i.
    support_set = X[class_no,sample_no].reshape(N,w,h,1)

    # Mark the support row that matches the query (used for the accuracy check).
    targets = np.zeros((N,))
    targets[0] = 1

    # Shuffle support images and targets together so the match is not always
    # at row 0. The query rows are all identical and need no shuffling.
    order = np.random.permutation(N)
    support_set = support_set[order]
    targets = targets[order]

    return [test_image,support_set], targets







#to calculate accuracy
def test_one_shot(model,N,k,dset='val'):
    n_correct = 0
    for _ in range(k):
        inputs, outputs = make_one_shot_task(N,dset)
        preds = model.predict(inputs)
        if np.argmax(outputs) == np.argmax(preds):
            n_correct += 1
    return n_correct / k

In [None]:
#from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications.inception_v3 import InceptionV3
import numpy as np

In [None]:
# Load the ImageNet-pretrained InceptionV3 backbone without its classifier head.
# (The vgg_* variable names are historical; the network used here is InceptionV3.)
vgg_model = InceptionV3(include_top=False, weights='imagenet')

# Get the architecture description so it can be rebuilt with a different input shape.
vgg_config = vgg_model.get_config()

# Change the input shape to the desired single-channel 105x105 images.
h, w, c = 105, 105, 1
input_layer_config = vgg_config["layers"][0]["config"]
# The input layer's shape is stored under "batch_input_shape" in tf.keras 2.x
# and under "batch_shape" in Keras 3; update whichever key is present.
shape_key = "batch_shape" if "batch_shape" in input_layer_config else "batch_input_shape"
input_layer_config[shape_key] = (None, h, w, c)

# Create a new model from the updated configuration. It has the same layers
# (and layer names) but freshly initialised weights.
vgg_updated = Model.from_config(vgg_config)

# First conv kernel of the pretrained model: one slice per RGB input channel.
orig_model_conv1_block1_wts = vgg_model.layers[1].get_weights()[0]
for channel in range(3):
    print(orig_model_conv1_block1_wts[:,:,channel,0])
print("---------------top3")
# Same kernel in the rebuilt model: a single input channel with random values.
new_model_conv1_block1_wts = vgg_updated.layers[1].get_weights()[0]
print(new_model_conv1_block1_wts[:,:,0,0])

# The pretrained first conv kernel expects 3 input channels while the rebuilt
# model takes 1, so the kernel is collapsed by averaging over the input channels.
def avg_wts(weights):
  """Average a conv kernel over its input-channel axis (second to last).

  Args:
    weights: kernel array of shape (kh, kw, in_channels, out_channels).

  Returns:
    Array of shape (kh, kw, 1, out_channels) holding the per-filter mean
    across input channels.
  """
  # keepdims retains the averaged axis with length 1, i.e. a one-channel kernel.
  return np.mean(weights, axis=-2, keepdims=True)

# Get the configuration of the rebuilt model and extract its layer names.
# These names are used to copy weights over from the pretrained model.
vgg_updated_config = vgg_updated.get_config()
vgg_updated_layer_names = [layer_config['name'] for layer_config in vgg_updated_config['layers']]

# Name of the first convolutional layer (index 0 is the input layer).
# It is the only layer whose weight shape differs from the pretrained model;
# every other layer receives the pretrained weights unchanged.
first_conv_name = vgg_updated_layer_names[1]

# Copy weights layer by layer. For the first conv layer the RGB kernel is
# replaced by its average over the three input channels.
known_layer_names = set(vgg_updated_layer_names)
for layer in vgg_model.layers:
    if layer.name not in known_layer_names:
        continue
    layer_weights = layer.get_weights()
    if not layer_weights:
        # input / pooling / activation layers carry no weights
        continue
    target_layer = vgg_updated.get_layer(layer.name)
    if layer.name == first_conv_name:
        # Only the kernel (first entry) depends on the number of input channels.
        # InceptionV3's conv layers are created without a bias, so there may be
        # no second entry at all; indexing [1] unconditionally raised IndexError.
        layer_weights = [avg_wts(layer_weights[0])] + layer_weights[1:]
    target_layer.set_weights(layer_weights)
    target_layer.trainable = False   # You can make this trainable if you want.

# Check the first conv kernel of the new model; it should now be the channel
# average of the pretrained kernel printed earlier.
new_model_conv1_block1_wts_updated = vgg_updated.layers[1].get_weights()[0]
print(new_model_conv1_block1_wts_updated[:,:,0,0])


[[-0.45910555 -0.75126916 -0.07530744]
 [-0.30967137 -0.55001074 -0.06132052]
 [ 0.16115063  0.04062082  0.08363139]]
[[0.31637633 0.35581657 0.11293176]
 [0.18523026 0.19094716 0.04152641]
 [0.06393196 0.03313763 0.11437388]]
[[ 0.22171251  0.51100105 -0.07296094]
 [ 0.16024701  0.36926833 -0.02629304]
 [-0.22752108 -0.0536594  -0.19414912]]
---------------top3
[[-0.08517279 -0.08799333  0.1313252 ]
 [ 0.00909777  0.112884    0.10065673]
 [ 0.03882381 -0.06455877 -0.05428698]]
weights value
[]0weights value
[array([[[[-4.59105551e-01, -4.14526574e-02, -3.62577499e-03,
          -9.87672508e-02, -3.37070860e-02,  4.79929037e-02,
           2.32542142e-01,  3.23927671e-01,  5.90160079e-02,
           9.47738215e-02,  4.24996056e-02,  1.26620471e-01,
           1.33215979e-01,  1.22741833e-01, -7.92686343e-02,
           2.08227970e-02, -1.99647829e-01, -3.02687973e-01,
          -2.10659921e-01, -3.52898419e-01, -5.58023095e-01,
           3.20223093e-01,  4.53596050e-03, -3.09266411e-0

IndexError: ignored

In [None]:
# Print the layer-by-layer summary of the rebuilt backbone (vgg_updated).
vgg_updated.summary()

In [None]:
def feature_extractor(input_f):
  """Map an image tensor to a 1024-dimensional embedding.

  The input goes through the ``vgg_updated`` backbone, is flattened, projected
  by a ReLU Dense layer and regularised with dropout.

  Note: every call creates a NEW Dense layer. Callers that need several inputs
  to share the same head weights should wrap this function in a Model once and
  reuse that model.

  Args:
    input_f: Keras tensor of input images.

  Returns:
    Keras tensor holding the embedding of each input image.
  """
  feature = vgg_updated(input_f)
  feature = Flatten()(feature)
  #feature =Dense(512,activation='sigmoid',kernel_regularizer='l2')(feature)

  # `Dense` / `Dropout` are the names imported at the top of the notebook;
  # the previous `layers.` prefix and the variable `x` were never defined.
  feature = Dense(1024, activation='relu')(feature)
  # Dropout rate of 0.2; its output (not the pre-dropout tensor) is returned.
  feature = Dropout(0.2)(feature)

  return feature

In [None]:
def get_siamese(input_shape):
    """Build the siamese verification network.

    Both inputs are embedded by the SAME encoder; the element-wise absolute
    difference of the two embeddings feeds a single sigmoid unit that scores
    whether the two images show the same character.

    Args:
        input_shape: shape of one input image, e.g. (105, 105, 1).

    Returns:
        Keras Model taking [left_image, right_image] and returning a
        similarity score in [0, 1].
    """
    left_input = Input(input_shape)
    right_input = Input(input_shape)

    # Build the encoder exactly once and apply it to both inputs. Calling
    # feature_extractor separately per branch would create separate layers for
    # each side, so the two towers would not share weights.
    encoder_input = Input(input_shape)
    encoder = Model(inputs=encoder_input, outputs=feature_extractor(encoder_input))

    left_cnn = encoder(left_input)
    right_cnn = encoder(right_input)

    # Element-wise L1 distance between the two embeddings.
    L1_Layer = Lambda(lambda tensors: K.abs(tensors[0] - tensors[1]))
    L1_Dist = L1_Layer([left_cnn,right_cnn])
    OP = Dense(1,activation='sigmoid',kernel_regularizer='l2')(L1_Dist)
    siamese_net = Model(inputs=[left_input,right_input],outputs=OP)
    return siamese_net

In [None]:
# --- Training budget ---
num_iterations = 7000   # number of train_on_batch steps
batch_size = 256        # pairs per step

# --- One-shot evaluation: every `evaluateEvery` steps run k N-way tasks ---
evaluateEvery = 100
k, N = 250, 20
margin=1                # not referenced by the cells shown here
n_classes, n_examples, w, h = x_train.shape
#n_classes, n_examples, w, h = trainImages.shape

# Exponentially decaying learning rate:
#   lr(step) = 0.05 * 0.0001 ** (step / 4000)
# NOTE(review): this shrinks the rate by four orders of magnitude every 4000
# steps - confirm that such an aggressive decay is intended.
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    0.05,
    decay_steps=4000,
    decay_rate=0.0001,
)
opt = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

# Build the siamese network for (w, h, 1) images and train it as a binary
# same/different classifier.
model = get_siamese((w, h, 1))
model.compile(optimizer=opt, loss='binary_crossentropy', metrics=['accuracy'])

model.summary()

In [None]:
# Histories, sampled once every `evaluateEvery` iterations.
lossArr, trainAccArr, valAccArr, counter = [], [], [], []
currTime = time.time()
for i in range(num_iterations + 1):
    x, y = get_batch(batch_size)
    # With metrics=['accuracy'], train_on_batch returns [loss, accuracy].
    loss = model.train_on_batch(x, y)
    if i % evaluateEvery != 0:
        continue

    batch_loss, batch_acc = loss[0], loss[1]
    lossArr.append(batch_loss)

    # k N-way one-shot tasks on both the training and the validation classes.
    trainAcc = round(test_one_shot(model, N, k, 'train') * 100, 2)
    valAcc = round(test_one_shot(model, N, k, 'val') * 100, 2)
    trainAccArr.append(trainAcc)
    valAccArr.append(valAcc)

    # Wall-clock time since the previous report (training + evaluation).
    elapsed = round(time.time() - currTime, 1)
    print(f"Iteration {i} ({elapsed!s}s) - Loss: {batch_loss!s} Acc: {round(batch_acc, 2)!s} ", end='')
    print(f"{k} {N}-way train accuracy: {trainAcc!s} %, ", end='')
    print(f"{k} {N}-way val accuracy: {valAcc!s} %")
    currTime = time.time()
    counter.append(i)
plt.plot(counter,lossArr)