# Import Dependencies

In [1]:
import cv2
import os
import uuid
import random
import numpy as np
from matplotlib import pyplot as plt

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer,Conv2D,Dense,MaxPooling2D,Input,Flatten
import tensorflow as tf

# Setting GPU Memory Consumption Growth

In [2]:
# Enable on-demand memory growth for every GPU TensorFlow can see.
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(device=gpu, enable=True)

In [3]:
# Show every physical device (of any type) that TensorFlow detected.
tf.config.experimental.list_physical_devices()

[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU')]

# Setting Folder Structure

In [4]:
# Relative directories for the three image roles used to build training pairs.
POS_PATH = os.path.join('data','positive')
NEG_PATH = os.path.join('data','negative')
ANC_PATH = os.path.join('data','anchor')

# Create each directory independently. exist_ok keeps the cell re-runnable and
# repairs a partially created tree (e.g. 'positive' present but 'anchor' missing).
for data_dir in (POS_PATH, NEG_PATH, ANC_PATH):
    os.makedirs(data_dir, exist_ok=True)

# Generating Anchor and Positive Images

In [42]:
# Interactive capture: 'a' saves an anchor image, 'p' saves a positive image, 'q' quits.
cap = cv2.VideoCapture(0)

try:
    while cap.isOpened():
        ret, frame = cap.read()
        # A failed grab yields no frame; stop instead of crashing on the slice below.
        if not ret:
            break

        # Crop a fixed 250x250 window so every saved sample has the same size.
        frame = frame[120:120+250,200:200+250,:]
        cv2.imshow("Image Collection", frame)

        # Poll the keyboard once per frame: every waitKey call consumes the pending
        # key press, so polling separately for each key drops most presses.
        key = cv2.waitKey(1) & 0XFF

        if key == ord('a'):
            # Unique file name so repeated captures never overwrite each other.
            imgname = os.path.join(ANC_PATH,f'{uuid.uuid1()}.jpg')
            cv2.imwrite(imgname,frame)
        elif key == ord('p'):
            imgname = os.path.join(POS_PATH,f'{uuid.uuid1()}.jpg')
            cv2.imwrite(imgname,frame)
        elif key == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even after an error
    # or a kernel interrupt.
    cap.release()
    cv2.destroyAllWindows()

# Image Gathering

In [5]:
# Build the glob patterns with os.path.join so the separator is right on every OS
# (a hard-coded backslash only works on Windows and is an invalid escape sequence).
# At most 300 files are taken from each directory.
anchor = tf.data.Dataset.list_files(os.path.join(ANC_PATH, '*.jpg')).take(300)
positive = tf.data.Dataset.list_files(os.path.join(POS_PATH, '*.jpg')).take(300)
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, '*.jpg')).take(300)

In [6]:
def preprocess(file_path):
    """Load a JPEG file and return it as a 105x105 three-channel image scaled to [0, 1].

    Args:
        file_path: Path of the JPEG file to read.

    Returns:
        A float tensor of shape (105, 105, 3) with values in [0, 1].
    """
    byte_img = tf.io.read_file(file_path)
    # Force three channels so a grayscale JPEG still matches the (105, 105, 3)
    # input the embedding network expects.
    img = tf.io.decode_jpeg(byte_img, channels=3)
    img = tf.image.resize(img, (105, 105))
    # Pixel values are 0..255 after decoding; rescale them to 0..1.
    return img / 255.0

In [7]:
# Label streams: one 1.0 per anchor for matching pairs, one 0.0 per anchor for non-matching pairs.
pair_count = len(anchor)
match_labels = tf.data.Dataset.from_tensor_slices(tf.ones(pair_count))
mismatch_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(pair_count))

# (anchor, positive, 1.0) triples followed by (anchor, negative, 0.0) triples.
positives = tf.data.Dataset.zip((anchor, positive, match_labels))
negatives = tf.data.Dataset.zip((anchor, negative, mismatch_labels))
data = positives.concatenate(negatives)

In [8]:
def preprocess_twin(input_img,validation_img,label):
    """Preprocess both image paths of a pair and pass the label through unchanged.

    Args:
        input_img: Path of the first image of the pair.
        validation_img: Path of the second image of the pair.
        label: Label attached to the pair.

    Returns:
        A tuple (preprocessed first image, preprocessed second image, label).
    """
    first_image = preprocess(input_img)
    second_image = preprocess(validation_img)
    return (first_image, second_image, label)

In [9]:
# Decode and rescale both images of every triple, then cache the decoded result.
data = data.map(preprocess_twin)
data = data.cache()
# Shuffle into one fixed order. The default reshuffles on every iteration, which
# would make the take()/skip() train/test split below overlap between passes and
# leak test pairs into training.
data = data.shuffle(buffer_size = 1024, reshuffle_each_iteration = False)

In [10]:
# Split sizes: the first 70% of the pairs train the model, the next 30% test it.
total_pairs = len(data)
train_count = round(total_pairs*0.7)
test_count = round(total_pairs*0.3)

# Batches of 16, with up to 8 batches prefetched.
train_data = data.take(train_count).batch(16).prefetch(8)

test_data = data.skip(train_count).take(test_count).batch(16).prefetch(8)

# Model Engineering

In [11]:
def make_embedding():
    """Build the convolutional network that maps one image to a feature vector.

    Four convolution blocks (the first three followed by 2x2 max pooling) feed a
    4096-unit sigmoid dense layer.

    Returns:
        A Keras functional model named 'embedding' that takes a (105, 105, 3)
        image and outputs a 4096-element vector.
    """
    inp = Input(shape = (105,105,3), name = 'input_image')

    # MaxPooling2D's first positional argument is the pooling window, not a
    # channel count, so the window and stride are passed by keyword.
    c1 = Conv2D(64, (10,10), activation = 'relu', name = 'Convolution_1')(inp)
    m1 = MaxPooling2D(pool_size = (2,2), strides = (2,2), padding = 'same', name = 'MaxPooling_1')(c1)

    c2 = Conv2D(128, (7,7), activation = 'relu', name = 'Convolution_2')(m1)
    m2 = MaxPooling2D(pool_size = (2,2), strides = (2,2), padding = 'same', name = 'MaxPooling_2')(c2)

    c3 = Conv2D(128, (4,4), activation = 'relu', name = 'Convolution_3')(m2)
    m3 = MaxPooling2D(pool_size = (2,2), strides = (2,2), padding = 'same', name = 'MaxPooling_3')(c3)

    c4 = Conv2D(256, (4,4), activation = 'relu', name = 'Convolution_4')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation = 'sigmoid')(f1)

    # Use the fully qualified class: the bare name `Model` is rebound to the
    # Siamese model instance later in this notebook, which breaks re-runs.
    return tf.keras.Model(inputs = [inp], outputs = [d1], name = 'embedding')

In [12]:
# Single embedding network instance; make_siamese_model() applies it to both inputs.
embedding = make_embedding()

In [13]:
# Print the layer-by-layer summary of the embedding network.
embedding.summary()

Model: "embedding"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_image (InputLayer)     [(None, 105, 105, 3)]     0         
_________________________________________________________________
Convolution_1 (Conv2D)       (None, 96, 96, 64)        19264     
_________________________________________________________________
MaxPooling_1 (MaxPooling2D)  (None, 48, 48, 64)        0         
_________________________________________________________________
Convolution_2 (Conv2D)       (None, 42, 42, 128)       401536    
_________________________________________________________________
MaxPooling_2 (MaxPooling2D)  (None, 21, 21, 128)       0         
_________________________________________________________________
Convolution_3 (Conv2D)       (None, 18, 18, 128)       262272    
_________________________________________________________________
MaxPooling_3 (MaxPooling2D)  (None, 9, 9, 128)         0 

In [14]:
class L1Dist(Layer):
    """Keras layer returning the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        # Forward the standard Layer options (name, dtype, trainable, ...) to the
        # base class so the layer can be named at construction and rebuilt from
        # its saved config.
        super().__init__(**kwargs)

    def call(self,input_embedding,validation_embedding):
        """Return |input_embedding - validation_embedding|, element by element."""
        return tf.math.abs(input_embedding-validation_embedding)

In [15]:
def make_siamese_model():
    """Build the two-input Siamese verification network.

    Both inputs go through the module-level `embedding` model (so they share its
    weights), the two embeddings are combined by an L1Dist layer, and a
    single-unit sigmoid dense layer produces the output score.

    Returns:
        A Keras functional model named 'SiameseNetwork' taking two
        (105, 105, 3) images and returning one sigmoid score per pair.
    """
    input_image = Input(name = 'input_img', shape = (105, 105,3))
    validation_image = Input(name = 'validation_img', shape = (105, 105,3))

    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))

    classifier = Dense(1, activation = 'sigmoid')(distances)

    # Use the fully qualified class: the bare name `Model` is rebound to the
    # returned model instance right after this function is first called, so
    # relying on it makes any re-run of this cell fail.
    return tf.keras.Model(inputs = [input_image, validation_image], outputs = [classifier], name = 'SiameseNetwork')

In [16]:
# Build the Siamese network used by the training cells below.
# NOTE(review): this rebinds `Model`, which the import cell bound to the Keras
# Model class; after this line the bare name refers to the model instance.
Model = make_siamese_model()

In [17]:
# Print the layer-by-layer summary of the Siamese network.
Model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_img (InputLayer)          [(None, 105, 105, 3) 0                                            
__________________________________________________________________________________________________
validation_img (InputLayer)     [(None, 105, 105, 3) 0                                            
__________________________________________________________________________________________________
embedding (Functional)          (None, 4096)         38960448    input_img[0][0]                  
                                                                 validation_img[0][0]             
__________________________________________________________________________________________________
distance (L1Dist)               (None, 4096)         0           embedding[0][0]     

# Training The Model

In [18]:
# Binary cross-entropy between the pair labels and the model's sigmoid output.
binary_cross_loss = tf.losses.BinaryCrossentropy()

# Adam optimizer with a learning rate of 1e-4.
opt = tf.keras.optimizers.Adam(learning_rate = 1e-4)

In [19]:
# Checkpoints are written under ./training_checkpoints with the file prefix 'ckpt'.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir,'ckpt')

# Track both the network and the optimizer so a checkpoint holds the state of each.
checkpoint = tf.train.Checkpoint(siamese_model = Model, opt = opt)

In [20]:
@tf.function
def train_step(batch):
    """Run one optimization step on a batch and return its loss.

    Args:
        batch: Sequence whose first two items are the two image batches and
            whose third item is the label batch.

    Returns:
        The binary cross-entropy loss for this batch.
    """
    X = batch[:2]
    y = batch[2]

    # Record the forward pass so the loss can be differentiated with respect to
    # the model's trainable variables.
    with tf.GradientTape() as tape:
        yhat = Model(X, training = True)
        loss = binary_cross_loss(y, yhat)

    # No Python print here: inside tf.function it runs only while tracing and
    # would show a symbolic tensor, not the loss value.
    grad = tape.gradient(loss, Model.trainable_variables)
    opt.apply_gradients(zip(grad, Model.trainable_variables))

    return loss

In [21]:
def train(data, EPOCHS):
    """Train for EPOCHS passes over `data`, saving a checkpoint every 10th epoch.

    Args:
        data: Batched dataset; each element is passed to train_step().
        EPOCHS: Number of passes over `data`.
    """
    for epoch in range(1, EPOCHS+1):

        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        for idx, batch in enumerate(data):
            # Show the loss in the progress bar so training progress is visible;
            # float() turns the scalar tensor into a plain number for display.
            loss = train_step(batch)
            progbar.update(idx+1, values = [('loss', float(loss))])

        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

In [22]:
# Number of training epochs passed to train().
EPOCHS = 50

In [None]:
# Run the training loop on the training split.
train(train_data, EPOCHS)

# Importing Model

In [23]:
from tensorflow.keras.metrics import Precision, Recall

In [24]:
# Custom classes Keras needs in order to rebuild the saved model.
custom_objects = {'L1Dist': L1Dist,'BinaryCrossentropy':tf.losses.BinaryCrossentropy}

# NOTE(review): no visible cell writes 'siamesemodel.h5' - confirm where this file is produced.
model = tf.keras.models.load_model('siamesemodel.h5', custom_objects = custom_objects)



In [25]:
# Pull one batch from the test split as NumPy arrays: two image batches and the labels.
test_input, test_val, y_true = next(test_data.as_numpy_iterator())

In [26]:
# Score every image pair of the test batch with the loaded model.
yhat = model.predict([test_input, test_val])

In [None]:
# Turn each prediction into a hard label: 1 when the score is above 0.5, else 0.
[1 if prediction > 0.5 else 0 for prediction in yhat]

# Real Time Test

In [36]:
def verify(model, detection_threshold, verification_threshold):
    """Compare the captured input image against every stored verification image.

    Args:
        model: Model whose predict() scores a pair of image batches.
        detection_threshold: A score above this counts as a positive match.
        verification_threshold: Fraction of positive matches that must be
            exceeded for the input to count as verified.

    Returns:
        A tuple (results, verified): `results` is the list of raw predictions,
        one per verification image, and `verified` is the boolean decision.
        With no verification images the result is ([], False).
    """
    verification_dir = os.path.join('application_data','verification_images')
    # List the directory once so the loop and the ratio use the same set of files.
    image_names = os.listdir(verification_dir)

    # Nothing to compare against: report "not verified" instead of computing 0/0.
    if not image_names:
        return [], False

    # The input image is the same for every comparison, so load it once.
    input_image = preprocess(os.path.join('application_data','input_image', 'input_image.jpg'))

    results = []
    for image in image_names:
        validation_img = preprocess(os.path.join(verification_dir, image))

        # expand_dims adds a batch axis of size 1 to each image; list() splits the
        # stacked pair back into the two separate inputs the model takes.
        result = model.predict(list(np.expand_dims([input_image, validation_img], axis = 1)))
        results.append(result)

    detection = np.sum(np.array(results) > detection_threshold)
    verification = detection/len(image_names)
    verified = verification > verification_threshold

    return results, verified
        

In [39]:
# Interactive check: 'v' captures the current crop and verifies it, 'q' quits.
cap = cv2.VideoCapture(0)

# cv2.imwrite does not create missing directories, so make sure the target exists.
input_dir = os.path.join('application_data','input_image')
os.makedirs(input_dir, exist_ok=True)

try:
    while cap.isOpened():
        ret, frame = cap.read()
        # A failed grab yields no frame; stop instead of crashing on the slice below.
        if not ret:
            break

        # Same 250x250 crop as the one used when collecting the training images.
        frame = frame[120:120+250,200:200+250,:]
        cv2.imshow('Verification', frame)

        # Poll the keyboard once per frame: every waitKey call consumes the pending
        # key press, so polling separately for each key drops presses.
        key = cv2.waitKey(10) & 0XFF

        if key == ord('v'):
            cv2.imwrite(os.path.join(input_dir,'input_image.jpg'), frame)
            results, verified = verify(model, 0.4, 0.4)
            print(verified)
        elif key == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even after an error
    # or a kernel interrupt.
    cap.release()
    cv2.destroyAllWindows()

True
