In [1]:
import tensorflow as tf 
import keras
from keras.applications.inception_resnet_v2 import InceptionResNetV2
import keras.utils as image
from tensorflow.keras.applications.inception_resnet_v2 import preprocess_input, decode_predictions
import numpy as np
from keras.models import Model,Sequential
from tensorflow.keras.layers import *
from tensorflow.keras import optimizers
import pandas as pd 
import cv2
import os 
import matplotlib.pyplot as plt
from tensorflow.keras.metrics import Precision,Recall

Data Pipeline

In [2]:
physical_devices =tf.config.list_physical_devices ('GPU')
tf.config.experimental.set_memory_growth(physical_devices[0],True)

In [3]:
csv_file_path = './data/train.csv'
directory = './data/'
batch_size= 16

In [4]:
df = pd.read_csv(csv_file_path)
image1 = df['frame_A'].values
image2 = df['frame_B'].values
# image_pairs = df[['frame_A','frame_B']].values
labels = df['label'].values

In [5]:
data = tf.data.Dataset.from_tensor_slices((image1,image2,labels))

In [6]:
data

<TensorSliceDataset element_spec=(TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.int64, name=None))>

In [7]:
def read_image(image_path):

   img = tf.io.read_file(image_path)
   img = tf.image.decode_image(img,channels=3,dtype=tf.float32)
   
   return img

In [8]:
def read_pair(image1_path,image2_path,label):
   
   return (read_image(directory+image1_path),read_image(directory+image2_path),label)

In [9]:
# test_pairs,test_label = read_pair(image_pairs[0],labels[0])
# img_test =test_pairs[1]
# a = img_test.numpy()
# print(a.max())
# plt.imshow(a)

In [10]:
data = data.map(read_pair)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [11]:
# samples = data.as_numpy_iterator()
# example = samples.next()
# print(len(example[0][0][0][0]))
# example

In [12]:
# plt.imshow(example[0])

In [13]:
# train test partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(4)
train_data = train_data.prefetch(2)

In [14]:
train_data

<PrefetchDataset element_spec=(TensorSpec(shape=<unknown>, dtype=tf.float32, name=None), TensorSpec(shape=<unknown>, dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.int64, name=None))>

In [103]:
train_samples = train_data.as_numpy_iterator()
train_sample =train_samples.next()
train_sample[2]

In [128]:
test_data = data.skip(round(len(data)*.7))
test_data=test_data.take(round(len(data)*.3))
test_data=test_data.batch(4)
test_data=test_data.prefetch(2)

In [116]:
test_data

<PrefetchDataset element_spec=(TensorSpec(shape=<unknown>, dtype=tf.float32, name=None), TensorSpec(shape=<unknown>, dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.int64, name=None))>

In [117]:
# test_samples = test_data.as_numpy_iterator()
# test_sample =test_samples.next()
# test_sample[2]

Model Engineering

In [18]:
def make_embedding():
   inp = Input(shape = (299,299,3),name = 'input_image')
   base_model = InceptionResNetV2(weights='imagenet', include_top=False,input_tensor=inp)
   l_1 = GlobalAveragePooling2D()(base_model.output)
   return Model(inputs=[inp],outputs=[l_1],name='feature_extractor_cnn')

In [19]:
embedding = make_embedding()
# embedding.summary()
# print(len(embedding.layers))

In [20]:
# making the concat layer 
class L2Concat(Layer):
    def __init__(self,**kwargs):
        super().__init__()

    def call(self,image_1_embedding,image_2_embedding):
         return  tf.concat([image_1_embedding,image_2_embedding],-1)

In [21]:
def make_siamese_model():
    
    input_image_1 = Input(name ='input_img_1',shape = (299,299,3))
    input_image_2 = Input(name ='input_img_2',shape = (299,299,3))

    siamese_layer = L2Concat()
    siamese_layer._name = 'total_features' 
    
    features = siamese_layer(embedding(input_image_1),embedding(input_image_2))

    classifier = Dense(1,activation='sigmoid')(features)

    # classifier = Dense(2,activation='softmax')(features)

    return Model(inputs = [input_image_1,input_image_2],outputs=classifier,name = 'SiameseNetwork')

In [22]:
siamese_model = make_siamese_model()
# siamese_model.summary()

Training

In [23]:
# loss function 
binary_cross_loss =tf.losses.BinaryCrossentropy()
# optimizer 
opt = tf.keras.optimizers.Adam(3e-4)

In [24]:
# establishing  checkpoints 
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir,'ckpt')
checkpoint = tf.train.Checkpoint(opt =opt,siamese_model = siamese_model)

In [25]:
# test_batch=train_data.as_numpy_iterator()
# batch_1=test_batch.next()
# len(batch_1)

In [26]:
# len(batch_1[0])
# X=batch_1[:2]

In [27]:
# np.array(X).shape

In [28]:
# y=batch_1[2]
# y

In [29]:
#Build Train Step function
@tf.function
def train_step(batch):

    with tf.GradientTape() as tape: 

        X = batch[:2]

        y = batch[2]


        yhat = siamese_model(X,training =True)

        loss = binary_cross_loss(y,yhat)

    print(loss)

    grad = tape.gradient(loss,siamese_model.trainable_variables)

    opt.apply_gradients(zip(grad,siamese_model.trainable_variables))   

    return loss


In [30]:
# build training loop 

def train(data,EPOCHS):

    for epoch in range(1,EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch,EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        for idx,batch in enumerate(data):

            train_step(batch)
            progbar.update(idx+1)

        #checkpoints 
        if epoch%10 == 0:
            checkpoint.save(file_prefix = checkpoint_prefix)  

In [31]:
EPOCHS =50

train(train_data,EPOCHS)


 Epoch 1/50
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)

 Epoch 2/50

 Epoch 3/50

 Epoch 4/50

 Epoch 5/50

 Epoch 6/50

 Epoch 7/50

 Epoch 8/50

 Epoch 9/50

 Epoch 10/50

 Epoch 11/50

 Epoch 12/50

 Epoch 13/50

 Epoch 14/50

 Epoch 15/50

 Epoch 16/50

 Epoch 17/50

 Epoch 18/50

 Epoch 19/50

 Epoch 20/50

 Epoch 21/50

 Epoch 22/50

 Epoch 23/50

 Epoch 24/50

 Epoch 25/50

 Epoch 26/50

 Epoch 27/50

 Epoch 28/50

 Epoch 29/50

 Epoch 30/50

 Epoch 31/50

 Epoch 32/50

 Epoch 33/50

 Epoch 34/50

 Epoch 35/50

 Epoch 36/50

 Epoch 37/50

 Epoch 38/50

 Epoch 39/50

 Epoch 40/50

 Epoch 41/50

 Epoch 42/50

 Epoch 43/50

 Epoch 44/50

 Epoch 45/50

 Epoch 46/50

 Epoch 47/50

 Epoch 48/50

 Epoch 49/50

 Epoch 50/50


Evaluation 

In [126]:
test_img1,test_img2,y_true = test_data.as_numpy_iterator().next() 

In [127]:
y_true

array([1, 0, 1, 1], dtype=int64)

In [129]:
y_hat = siamese_model.predict([test_img1,test_img2])
y_hat



array([[9.9990010e-01],
       [2.0170526e-07],
       [9.9999917e-01],
       [9.9980313e-01]], dtype=float32)

In [135]:
y__ = [1 if prediction > 0.5 else 0 for prediction in y_hat]

In [136]:
y__

[1, 0, 1, 1]

In [131]:
r = Recall()

r.update_state(y_true,y_hat)

recall_batch = r.result().numpy()

1.0

In [132]:
p= Precision()

p.update_state(y_true,y_hat)

precision_batch = p.result().numpy()



1.0


In [152]:
precision_values = []
recall_values = []
y_true_total = []
y_hat_total = []


In [153]:
for i in range(0,75):
    
    test_img1,test_img2,y_true = test_data.as_numpy_iterator().next() 
    
    y_hat = siamese_model.predict([test_img1,test_img2])
     
    y_hat_rounded =  [1 if prediction > 0.5 else 0 for prediction in y_hat]
    
    print(y_true)
    print(y_hat)
    print(y_hat_rounded)

    y_true_total.extend(y_true)
    y_hat_total.extend(y_hat_rounded)



    r = Recall()
    r.update_state(y_true,y_hat)
    recall_batch = r.result().numpy()

    p = Precision()
    p.update_state(y_true,y_hat)
    precision_batch = p.result().numpy()
    
    precision_values.append(precision_batch)
    recall_values.append(recall_batch)



[0 1 1 1]
[[8.1720182e-06]
 [9.9989772e-01]
 [9.3562728e-01]
 [9.8659617e-01]]
[0, 1, 1, 1]
[1 0 0 0]
[[9.9996066e-01]
 [1.0403514e-07]
 [2.2459773e-12]
 [5.9494315e-13]]
[1, 0, 0, 0]
[1 1 0 1]
[[9.6593261e-01]
 [9.9995565e-01]
 [1.2604176e-04]
 [9.9359196e-01]]
[1, 1, 0, 1]
[0 0 1 1]
[[1.5165813e-14]
 [2.9322490e-02]
 [9.9940968e-01]
 [9.9999905e-01]]
[0, 0, 1, 1]
[1 1 0 1]
[[9.9993300e-01]
 [9.8416358e-01]
 [1.0814568e-05]
 [9.8874247e-01]]
[1, 1, 0, 1]
[1 1 1 1]
[[0.99958235]
 [0.99966335]
 [0.9980901 ]
 [0.9652585 ]]
[1, 1, 1, 1]
[0 1 1 0]
[[1.8608046e-09]
 [9.9994576e-01]
 [9.9987662e-01]
 [3.4173701e-07]]
[0, 1, 1, 0]
[0 1 1 1]
[[3.9477902e-18]
 [9.9204504e-01]
 [9.9985778e-01]
 [9.9388582e-01]]
[0, 1, 1, 1]
[1 1 0 0]
[[9.9937004e-01]
 [9.9999952e-01]
 [2.4945665e-13]
 [1.2171021e-10]]
[1, 1, 0, 0]
[0 0 1 1]
[[1.1793779e-06]
 [2.5828507e-05]
 [9.9994528e-01]
 [9.9999344e-01]]
[0, 0, 1, 1]
[1 0 0 1]
[[9.9940968e-01]
 [1.8199728e-04]
 [6.5191681e-14]
 [9.9848831e-01]]
[1, 0, 0, 1]


In [154]:
print(len(recall_values))
print(len(precision_values))
print(len(y_true_total))
print(len(y_hat_total))

75
75
300
300


In [155]:
def calc_avg(lst):
    return sum(lst) / len(lst)

In [156]:
def calc_accuracy(true_values,predicted_values):
    res=0
    for i in range(0,len(true_values)):
           if(true_values[i]==predicted_values[i]):
            res+=1

    return ((res/len(true_values))*100)



In [157]:
recall_avg = calc_avg(recall_values)
precision_avg = calc_avg(precision_values)
accuracy = calc_accuracy(y_true_total,y_hat_total)

In [158]:
print('Precision = {}'.format(precision_avg))

Precision = 0.9266666666666666


In [159]:
print('Recall = {}'.format(recall_avg))

Recall = 0.9333333333333333


In [161]:
print('Accuracy = {}%'.format(accuracy))

Accuracy = 99.66666666666667%


In [162]:
siamese_model.save('siamesemodel.h5')



In [165]:
model =tf.keras.models.load_model('siamesemodel.h5',custom_objects={'L2Concat':L2Concat,'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

