## Import Required Packages

In [1]:
import sys
import numpy as np
import os
import shutil
import matplotlib.pyplot as plt
import cv2
%matplotlib inline

import time
from datetime import datetime

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import activations
from tensorflow.keras import preprocessing
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img

from keras.models import Sequential
from keras.optimizers import Adam, Adagrad, RMSprop
from keras.layers import Conv2D, ZeroPadding2D, Activation, Input, concatenate, Dropout
from keras.models import Model

from keras.layers.normalization import BatchNormalization
from keras.layers.pooling import MaxPooling2D
from keras.layers.merge import Concatenate
from keras.layers.core import Lambda, Flatten, Dense
from keras.initializers import glorot_uniform

from keras.engine.topology import Layer
from keras.regularizers import l2
from keras import backend as K

from keras.applications.vgg16 import decode_predictions
from keras.applications.vgg16 import VGG16

%load_ext tensorboard

from sklearn.utils import shuffle

import numpy.random as rng
np.random.seed(1337)

## Create Custom Data Generators to make Data Pairs

In [2]:
SIZE = 105
IMG_SIZE = (SIZE, SIZE)
NUM_CHANNELS = 3
BATCH_SIZE = 16

# Define training augmentations
train_datagen = ImageDataGenerator(
    rescale=1./255,
    width_shift_range=0.1,
    height_shift_range=0.1,
    zoom_range=0.2,
    shear_range=0.2)


# Define testing augmentations
test_datagen = ImageDataGenerator(rescale=1./255)

In [3]:
# # Create example augmented images
# img = load_img('F:/Work/University/Year 5/ACS6420_Advanced Project/Code/Data/Post Installation/Binary/Training/All Cases/Correct/Correct/IMG_1500.jpg')
# x = img_to_array(img)
# x = x.reshape((1,) + x.shape)

# i = 0
# for batch in train_datagen.flow(x, batch_size=1, save_to_dir='F:/Work/University/Year 5/ACS6420_Advanced Project/Code/Data/preview', save_format='jpeg'):
#     i+=1
#     if i > 9:
#         break

In [4]:
def generator(base_dir, gen_type):
    if gen_type == 'train':
        gen1 = train_datagen.flow_from_directory(base_dir+'/Greek', target_size=IMG_SIZE, color_mode='rgb', batch_size=BATCH_SIZE, class_mode='binary', shuffle=True, seed=1)
        gen2 = train_datagen.flow_from_directory(base_dir+'/Both', target_size=IMG_SIZE, color_mode='rgb', batch_size=BATCH_SIZE, class_mode='binary', shuffle=True, seed=2)
    else:
        gen1 = test_datagen.flow_from_directory(base_dir+'/Greek', target_size=IMG_SIZE, color_mode='rgb', batch_size=BATCH_SIZE, class_mode='binary', shuffle=True, seed=3)
        gen2 = test_datagen.flow_from_directory(base_dir+'/Both', target_size=IMG_SIZE, color_mode='rgb', batch_size=BATCH_SIZE, class_mode='binary', shuffle=True, seed=4)
    
    while True:
        input1 = gen1.next()
        input2 = gen2.next()
        yield([input1[0], input2[0]], input1[1]==input2[1])
    
train_dir = 'F:/Work/University/Year 5/ACS6420_Advanced Project/Code/Data/omniglot/train_snn'
val_dir = 'F:/Work/University/Year 5/ACS6420_Advanced Project/Code/Data/omniglot/val_snn/'

## Create Contrastive Loss Function

In [4]:
def contrastive_loss(y, preds, margin=1):
    y = tf.cast(y, preds.dtype)
    
    squaredPreds = K.square(preds)
    squaredMargin = K.square(K.maximum(margin - preds, 0))
    loss = K.mean(y * squaredPreds + (1 - y) * squaredMargin)

    return loss

## Create Siamese Neural Network

In [5]:
def create_model(input_shape):
    # Input layers for each network twin
    input1 = Input(input_shape)
    input2 = Input(input_shape)
    
    # Convolutional Neural Network
#     model = Sequential()
#     model.add(Conv2D(32, (5,5), input_shape=(SIZE, SIZE, NUM_CHANNELS), data_format='channels_last'))
#     model.add(BatchNormalization())
#     model.add(layers.Activation(activations.relu))
# #     model.add(Dropout(0.5))
#     model.add(MaxPooling2D())

#     model.add(Conv2D(64, (3,3)))
#     model.add(BatchNormalization())
#     model.add(layers.Activation(activations.relu))
# #     model.add(Dropout(0.5))
#     model.add(MaxPooling2D())

#     model.add(Conv2D(128, (3,3)))
#     model.add(BatchNormalization())
#     model.add(layers.Activation(activations.relu))

#     model.add(Flatten())
#     model.add(Dense(512))
#     model.add(BatchNormalization())
#     model.add(layers.Activation(activations.relu))
#     model.add(Dropout(0.5))
    
#     model.add(Dense(64))
#     model.add(BatchNormalization())
#     model.add(layers.Activation(activations.relu))
#     model.add(Dropout(0.5))
    

    model = VGG16(include_top=False, input_shape=(SIZE, SIZE, NUM_CHANNELS))

    # Only retrain last block of VGG16 - 3 conv layers
    for layer in model.layers:
#         if not layer.name.startswith('block5'):
        layer.trainable = False
            
    # Add own layers at end for task
    flat1 = Flatten()(model.layers[-1].output)
    dense1 = Dense(1024, activation='relu')(flat1)#, kernel_regularizer=l2(1E-2), bias_regularizer=l2(1E-2))(flat1)
    dropout1 = Dropout(0.5)(dense1)
    
    model = Model(inputs=model.inputs, outputs=dropout1)

    
    # Create features for each input image for comparison
    feature1 = model(input1)
    feature2 = model(input2)
    
    # Custom layer to compute the L1 distance between the features of the two images
    L1_layer = Lambda(lambda tensors:K.abs(tensors[0] - tensors[1]))
    L1_distance = L1_layer([feature1, feature2])
    
    # Calculate similarity score
    similarity = Dense(1, activation='sigmoid')(L1_distance)
    
    # Connect the inputs with the outputs
    siamese_net = Model(inputs=[input1, input2], outputs=similarity)
    
    # return the model
    return siamese_net

In [11]:
model = create_model((IMG_SIZE[0], IMG_SIZE[1], NUM_CHANNELS))
model.summary()

optimizer = Adam(lr = 0.00003)
model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy', 'Precision', 'Recall'])

Model: "functional_7"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_4 (InputLayer)            [(None, 105, 105, 3) 0                                            
__________________________________________________________________________________________________
input_5 (InputLayer)            [(None, 105, 105, 3) 0                                            
__________________________________________________________________________________________________
functional_5 (Functional)       (None, 1024)         19434304    input_4[0][0]                    
                                                                 input_5[0][0]                    
__________________________________________________________________________________________________
lambda_1 (Lambda)               (None, 1024)         0           functional_5[0][0]    

In [12]:
base_logdir = "logs/scalars/"
for f in os.listdir(base_logdir):
    file_path = os.path.join(base_logdir, f)
    
    shutil.rmtree(file_path)

logdir = "logs/scalars/" + datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = keras.callbacks.TensorBoard(log_dir=logdir)

In [13]:
model.fit(generator(train_dir, 'train'), steps_per_epoch=200//BATCH_SIZE, 
          validation_data=generator(val_dir, 'val'), validation_steps=96//BATCH_SIZE,
#           class_weight={0: 2, 1: 1}, 
          epochs=50, callbacks=[tensorboard_callback], verbose=1)

Found 200 images belonging to 1 classes.
Found 200 images belonging to 2 classes.
Epoch 1/50
Found 96 images belonging to 2 classes.
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50


Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<tensorflow.python.keras.callbacks.History at 0x1d270c9f358>

In [14]:
%tensorboard --logdir logs/scalars --host localhost

Reusing TensorBoard on port 6006 (pid 2620), started 0:05:13 ago. (Use '!kill 2620' to kill it.)

## Voting System

In [16]:
val_dir2 = val_dir + '/Both'
im_correct = []
im_incorrect = []

# Load in validation data
for folder in os.listdir(val_dir2):
    if folder == 'Greek':
        folder_path = os.path.join(val_dir2, folder)
        for im in os.listdir(folder_path):
            im_array = cv2.imread(os.path.join(folder_path, im))
            im_array = cv2.resize(im_array, IMG_SIZE)
            im_array = cv2.cvtColor(im_array, cv2.COLOR_BGR2RGB)
            im_array = im_array/255
            im_correct.append(im_array)
    else:
        folder_path = os.path.join(val_dir2, folder)
        for im in os.listdir(folder_path):
            im_array = cv2.imread(os.path.join(folder_path, im))
            im_array = cv2.resize(im_array, IMG_SIZE)
            im_array = cv2.cvtColor(im_array, cv2.COLOR_BGR2RGB)
            im_array = im_array/255
            im_incorrect.append(im_array)
            
im_correct = np.asarray(im_correct)
im_incorrect = np.asarray(im_incorrect)

In [17]:
np.random.seed(0)
n_correct = 20  # Number of correct images to compare against

predict_correct = 0
fp = 0
fn = 0

for im in im_correct:
    Y = []
    idxs = np.random.randint(im_correct.shape[0], size=n_correct)  # Correct images to compare against
    
    for idx in idxs:
        im_ref = im_correct[idx]
        im_ref = im_ref.reshape(1,SIZE,SIZE,NUM_CHANNELS)
        im = im.reshape(1,SIZE,SIZE,NUM_CHANNELS)
        y = np.rint(model.predict([im_ref, im]))
        Y.append(y)
     
    vote_same = np.sum(Y)
    # If more than half the images compared to get voted as same, then set vote to same, otherwise to different
    if vote_same >= n_correct/2:
        Y_vote = 1
        predict_correct += 1
    else:
        Y_vote = 0
        fn += 1
        

for im in im_incorrect:
    Y = []
    idxs = np.random.randint(im_correct.shape[0], size=n_correct)  # Correct images to compare against
    
    for idx in idxs:
        im_ref = im_correct[idx]
        im_ref = im_ref.reshape(1,SIZE,SIZE,NUM_CHANNELS)
        im = im.reshape(1,SIZE,SIZE,NUM_CHANNELS)
        y = np.rint(model.predict([im_ref, im]))
        Y.append(y)
     
    vote_same = np.sum(Y)
    # If more than half the images compared to get voted as same, then set vote to same, otherwise to different
    if vote_same >= n_correct/2:
        Y_vote = 1
        fp += 1
    else:
        Y_vote = 0
        predict_correct += 1

In [19]:
Val_acc = predict_correct/96
print("Validation Accuracy = ", Val_acc)

Val_precision = predict_correct/(predict_correct + fp)
print("Validation Precision = ", Val_precision)

Val_recall = predict_correct/(predict_correct + fn)
print("Validation Recall = ", Val_recall)

Validation Accuracy =  0.8229166666666666
Validation Precision =  0.9294117647058824
Validation Recall =  0.8777777777777778


## N-way validation

In [20]:
def create_task(N, source):
    im_list = []
    target_list = []
    if source == 'train':
        im_dir = 'F:/Work/University/Year 5/ACS6420_Advanced Project/Code/Data/Post Installation/Binary/Training/All Cases/Both'
    else:
        im_dir = 'F:/Work/University/Year 5/ACS6420_Advanced Project/Code/Data/Post Installation/Binary/Validation/All Cases/Both'
    
    # Get single reference image
    reference = rng.choice(os.listdir(im_dir+'/Correct'))
    im_reference = cv2.imread(os.path.join(im_dir+'/Correct', reference))
    im_reference = cv2.resize(im_reference, IMG_SIZE)
    im_reference = cv2.cvtColor(im_reference, cv2.COLOR_BGR2RGB)
    im_reference = im_reference/255
    im_reference = im_reference.reshape(1,SIZE,SIZE,NUM_CHANNELS)
    target_reference = 0
        
    #plt.imshow(im_reference)
    #plt.show()
        
    # Get N test images
    for i in range(N):
        if i == 0:
            test = rng.choice(os.listdir(im_dir+'/Correct'))
            im_test = cv2.imread(os.path.join(im_dir+'/Correct', test))
        else:
            test = rng.choice(os.listdir(im_dir+'/Incorrect'))
            im_test = cv2.imread(os.path.join(im_dir+'/Incorrect', test))

        im_test = cv2.resize(im_test, IMG_SIZE)
        im_test = cv2.cvtColor(im_test, cv2.COLOR_BGR2RGB)
        im_test = im_test/255
        im_list.append(im_test)
        if i == 0:
            target_list.append(0)
        else:
            target_list.append(1)
        
    #plt.imshow(im_list[-1])
    #plt.show()
    
    #im_list, target_list = shuffle(im_list, target_list)
    
    im_array = np.asarray(im_list)
    
    return im_reference, target_reference, im_array, target_list

In [21]:
def n_way_validate(model, N, it, source):
    n_correct = 0
    
    for i in range(it):
        score_list = []
        im_reference, target_reference, im_list, target_list = create_task(N, source)
        
        for j in range(N):
            score = model.predict([im_reference, np.reshape(im_list[j], (1,SIZE,SIZE,NUM_CHANNELS))])
            score_list.append(score)
            
        if np.argmax(score_list) == np.argmin(target_list):
            n_correct += 1
            
    percent_correct = (100*n_correct / it)
    
    return percent_correct, score_list
            

In [22]:
N_way = 10
iterations = 40
val_source = 'train'

result, score_list = n_way_validate(model, N_way, iterations, val_source)
print(result)

27.5


In [23]:
val_source = 'validation'

result, score_list = n_way_validate(model, N_way, iterations, val_source)
print(result)

17.5


In [None]:
score_list