# imports

In [None]:
import sys
sys.path.append('D:/Anaconda/envs/task1/Lib/site-packages')

import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

# Setting GPU consumption growth

In [None]:
# Enable memory growth on every GPU that TensorFlow can see.
gpus = tf.config.experimental.list_physical_devices('GPU')
for device in gpus:
    tf.config.experimental.set_memory_growth(device, True)

# Creating paths for Positive, Negative and Anchor Images

In [None]:
# Folders (under 'data') that hold the positive, negative and anchor images.
POS_PATH, NEG_PATH, ANC_PATH = (
    os.path.join('data', folder) for folder in ('positive', 'negative', 'anchor')
)

In [None]:
# Create the image folders. exist_ok=True lets this cell be re-run without
# raising FileExistsError once the folders are already there.
for image_dir in (POS_PATH, NEG_PATH, ANC_PATH):
    os.makedirs(image_dir, exist_ok=True)

In [None]:
# Shell escape: unpack the lfw.tgz archive with tar.
!tar -xf lfw.tgz

In [None]:
# Flatten the 'lfw' tree: move every file found in each of its
# sub-directories straight into NEG_PATH (same file name, no sub-folder).
for person_dir in os.listdir('lfw'):
    for image_name in os.listdir(os.path.join('lfw', person_dir)):
        source_path = os.path.join('lfw', person_dir, image_name)
        target_path = os.path.join(NEG_PATH, image_name)
        os.replace(source_path, target_path)

In [None]:
#import uuid library for unique image names
import uuid

# video capture to gather positive and anchor Images

* We gather negative images from the 'Labeled Faces in the Wild' (LFW) dataset; for anchor and positive images, make sure to capture images with varying backgrounds and lighting.

In [None]:
# Capture webcam frames, show them, and save crops on key press.
# Keys: 'a' saves an anchor image, 'p' saves a positive image, 'q' quits.
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered; stop rather than slicing a None frame.
            break

        # Keep a 250x250 pixel region of the frame.
        frame = frame[120:370, 200:450, :]
        cv2.imshow('Image Collection', frame)

        # Poll the keyboard once per frame. A key press is reported by only
        # one waitKey call, so polling separately for each key can miss it.
        key = cv2.waitKey(1) & 0xFF

        if key == ord('a'):
            # collect anchors
            imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname, frame)
        elif key == ord('p'):
            # collect positives
            imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname, frame)
        elif key == ord('q'):
            # breaking
            break
finally:
    # Always free the camera and close the preview window, even on error.
    cap.release()
    cv2.destroyAllWindows()

# take 400 photos each for training the neural net

In [None]:
# Take up to 400 JPEG file paths from each image folder.
# os.path.join keeps the glob portable across operating systems; a
# hard-coded backslash separator only works on Windows.
anchor = tf.data.Dataset.list_files(os.path.join(ANC_PATH, '*.jpg')).take(400)
positive = tf.data.Dataset.list_files(os.path.join(POS_PATH, '*.jpg')).take(400)
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, '*.jpg')).take(400)

In [None]:
#image scaling from [0,1]
def preprocess(file_path):
    """Load a JPEG file as a 100x100 image with values divided by 255.

    Args:
        file_path: path of the JPEG file to read.

    Returns:
        The decoded image, resized to 100x100 and scaled by 1/255.
    """
    raw_bytes = tf.io.read_file(file_path)
    decoded = tf.io.decode_jpeg(raw_bytes)
    resized = tf.image.resize(decoded, (100, 100))
    return resized / 255.0

In [None]:
# Build (anchor, other image, label) examples: label 1 for anchor/positive
# pairs and label 0 for anchor/negative pairs, then join both sets.
positive_labels = tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))
positives = tf.data.Dataset.zip((anchor, positive, positive_labels))

negative_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))
negatives = tf.data.Dataset.zip((anchor, negative, negative_labels))

data = positives.concatenate(negatives)

In [None]:
# Iterator that yields the dataset's elements as NumPy values.
samples = data.as_numpy_iterator()

In [None]:
def preprocess_twin(input_img, validation_img, label):
    """Preprocess both image paths of an example; the label passes through.

    Args:
        input_img: path of the first image.
        validation_img: path of the second image.
        label: label of the pair, returned unchanged.

    Returns:
        A tuple (preprocessed input image, preprocessed validation image, label).
    """
    processed_input = preprocess(input_img)
    processed_validation = preprocess(validation_img)
    return processed_input, processed_validation, label

# Dataloader Pipeline
* zip the data into a labelled dataset in which each example contains an anchor image, a positive/negative image, and a 0/1 label
* concatenate the positive and negative examples into one dataset
* preprocess the images inside every example
* shuffle and take 70% as training data
* batch it into 16 per batch

In [None]:
# Preprocess every example, cache the decoded images, then shuffle.
data = data.map(preprocess_twin)
data = data.cache()
# reshuffle_each_iteration=False fixes the shuffled order. Any take()/skip()
# split of this dataset then selects the same examples on every pass;
# with the default (True) the split would change each epoch and held-out
# examples would leak into training. Shuffle the training split itself if
# per-epoch randomness is wanted.
data = data.shuffle(buffer_size=1024, reshuffle_each_iteration=False)

In [None]:
# Training partition: the first 70% of the examples, in batches of 16,
# with up to 8 batches prefetched.
train_data = data.take(round(len(data) * 0.7)).batch(16).prefetch(8)

In [None]:
# Testing partition: skip the first 70% of the examples, take the next 30%,
# then batch by 16 with up to 8 batches prefetched.
test_data = (
    data.skip(round(len(data) * 0.7))
    .take(round(len(data) * 0.3))
    .batch(16)
    .prefetch(8)
)

# embedding layer
* contains the structure of the neural network
* will take a 100x100 input
* pass through a 2D convolution layer -> maxpooling layer three times
* flatten the feature map
* pass through a sigmoid dense layer with 4096 units, each of which is connected to every node of the flatten layer

In [None]:
def make_embedding():
    """Build the convolutional embedding network.

    The network takes a 100x100x3 image, applies three Conv2D -> MaxPooling2D
    blocks followed by a fourth Conv2D, flattens the feature map and maps it
    to a 4096-unit sigmoid Dense layer.

    Returns:
        A Keras Model named 'embedding' from the image input to the
        4096-unit output.
    """
    inp = Input(shape=(100, 100, 3), name="input_image")

    # Pooling arguments are passed by keyword: MaxPooling2D's first positional
    # argument is the pool size (not a filter count), so a 2x2 window with
    # stride 2 must be requested explicitly.

    # first block
    c1 = Conv2D(64, (10, 10), activation='relu')(inp)
    m1 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')(c1)

    # second block
    c2 = Conv2D(128, (7, 7), activation='relu')(m1)
    m2 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')(c2)

    # third block
    c3 = Conv2D(128, (4, 4), activation='relu')(m2)
    m3 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')(c3)

    # final embedding
    c4 = Conv2D(256, (4, 4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=[d1], name='embedding')

# Siamese distance layer
Calculates the distance between the anchor image and the corresponding positive/negative image after both have passed through the embedding.

In [None]:
class L1Dist(Layer):
    """Layer returning the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        # Forward keyword arguments (e.g. name, dtype, trainable) to the base
        # Layer so they are honoured instead of being silently dropped.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return |input_embedding - validation_embedding| element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)

In [None]:
# Build the embedding model once at module level; make_siamese_model() calls it.
embedding = make_embedding()

# the final model

* takes the input and validation image
* passes both through the embedding
* calculates the siamese distance between the two embedding outputs
* feeds this distance into a dense classifier with sigmoid activation

In [None]:
def make_siamese_model():
    """Assemble the two-input Siamese network.

    Both 100x100x3 inputs go through the module-level `embedding` model, the
    two outputs are combined by an L1Dist layer, and a single-unit sigmoid
    Dense layer produces the output.

    Returns:
        A Keras Model named 'SiameseNetwork' taking
        [input_image, validation_image] and returning the classifier output.
    """
    # The two image inputs
    input_image = Input(shape=(100, 100, 3), name='input_img')
    validation_image = Input(shape=(100, 100, 3), name='validation_img')

    # Distance layer applied to the embeddings of both inputs
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    input_embedding = embedding(input_image)
    validation_embedding = embedding(validation_image)
    distances = siamese_layer(input_embedding, validation_embedding)

    # Single sigmoid unit on top of the distance vector
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(
        inputs=[input_image, validation_image],
        outputs=classifier,
        name='SiameseNetwork',
    )


In [None]:
# Instantiate the Siamese network used by the training and checkpoint cells.
siamese_model = make_siamese_model()

# we use cross entropy as the loss function

In [None]:
 #loss function
binary_cross_loss = tf.losses.BinaryCrossentropy()
# Adam optimiser with a learning rate of 1e-4
opt = tf.keras.optimizers.Adam(learning_rate=1e-4)

In [None]:
# Checkpoints go under ./training_checkpoints with the file prefix 'ckpt'.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
# Track both the optimiser and the model in the checkpoint.
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model = siamese_model)

# Defining each training step
* get a batch from the train set
* get anchor and validation images as x
* get label as y
* pass x into the neural net to get classifier value yhat
* calculate loss between y and yhat
* calculate gradients of loss function with respect to all trainable variables inside the model
* apply the gradients to all trainable variables

In [None]:
@tf.function
def train_step(batch):
    """Run one optimisation step on a batch and return its loss.

    Args:
        batch: sequence whose first two items are the image tensors fed to
            the model and whose third item is the label tensor.

    Returns:
        The binary cross-entropy loss computed for this batch.
    """
    # First two entries are the image pair, the third is the label.
    x, y = batch[:2], batch[2]

    with tf.GradientTape() as tape:
        # Forward pass and loss are recorded on the tape.
        yhat = siamese_model(x, training=True)
        loss = binary_cross_loss(y, yhat)

    # Differentiate the loss w.r.t. the trainable variables and apply the update.
    variables = siamese_model.trainable_variables
    grad = tape.gradient(loss, variables)
    opt.apply_gradients(zip(grad, variables))

    return loss

# train the model in batches and save a checkpoint every 10 epochs

In [None]:
def train(data, EPOCHS):
    """Train on `data` for EPOCHS epochs, checkpointing every 10th epoch.

    Args:
        data: iterable, sized collection of batches passed to train_step.
        EPOCHS: number of passes over `data`.
    """
    for epoch in range(1, EPOCHS + 1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # One training step per batch; the bar shows batches completed.
        for step, batch in enumerate(data, start=1):
            train_step(batch)
            progbar.update(step)

        if epoch % 10 != 0:
            continue
        checkpoint.save(file_prefix=checkpoint_prefix)

In [None]:
# Train on the training partition for 50 epochs.
train(train_data, 50)

In [None]:
# Save the trained model to 'siamesemodel.h5' (it is reloaded with load_model below)

siamese_model.save('siamesemodel.h5')

In [None]:
 #reload model
# L1Dist is a custom layer, so it has to be supplied when the file is loaded.
model = tf.keras.models.load_model(
    'siamesemodel.h5',
    custom_objects={
        'L1Dist': L1Dist,
        'BinaryCrossentropy': tf.losses.BinaryCrossentropy,
    },
)

# verification function for few shot face verification after training the model

In [None]:
def verify(frame, model, detection_threshold, verification_threshold):
    """Compare the stored input image against every stored verification image.

    The input image is read from application_data/input_image/input_image.jpg
    and compared with each file in application_data/verification_images.

    Args:
        frame: unused; kept so existing callers keep working (the input image
            is read from disk instead).
        model: object whose predict() scores an [input, validation] image pair.
        detection_threshold: score above which one comparison counts as a match.
        verification_threshold: fraction of matching comparisons above which
            the input counts as verified.

    Returns:
        (results, verified): the list of per-image predictions and whether the
        fraction of matches exceeds verification_threshold. With no
        verification images the result is ([], False).
    """
    verification_dir = os.path.join('application_data', 'verification_images')
    # List the folder once so the match count and the denominator always
    # refer to the same set of files.
    verification_files = os.listdir(verification_dir)
    if not verification_files:
        # Nothing to compare against; avoid a 0/0 division below.
        return [], False

    # The input image is identical for every comparison, so load it once.
    input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))

    results = []
    for image in verification_files:
        validation_img = preprocess(os.path.join(verification_dir, image))
        # expand_dims adds a batch axis to each image; list() splits the pair
        # back into the two model inputs.
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)

    # Number of comparisons scoring above the detection threshold
    detection = np.sum(np.array(results) > detection_threshold)
    # Fraction of verification images that matched
    verification = detection / len(verification_files)
    verified = verification > verification_threshold

    return results, verified

In [None]:
# Live verification with OpenCV.
# Keys: 'v' saves the current crop as the input image and verifies it,
# 'q' quits.
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered; stop rather than slicing a None frame.
            break

        # Keep a 250x250 pixel region of the frame.
        frame = frame[120:370, 200:450, :]

        cv2.imshow('verification', frame)

        # Poll the keyboard once per frame. A key press is reported by only
        # one waitKey call, so polling separately for each key can miss it.
        key = cv2.waitKey(10) & 0xFF

        if key == ord('v'):
            # verification: verify() reads the input image back from this file
            cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
            results, verified = verify(frame, model, 0.6, 0.7)
            print(verified)
        elif key == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even on error.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Display the predictions returned by the most recent verify() call.
list(results) 

In [None]:
# Display the predictions returned by the most recent verify() call.
list(results) 

In [None]:
# Count how many of the predictions exceed 0.8.
np.sum(np.squeeze(results) >0.8)