### Importing dependencies

In [1]:
## Import dependencies

import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt


In [2]:
## Import tensorflow dependencies - FUnctional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer,Conv2D,Dense,MaxPooling2D,Input,Flatten
import tensorflow as tf



## Set GPU Growth

##### Avoid `Out of Memory` Error by setting GPU Memory Consumption Growth

In [3]:
gpus = tf.config.experimental.list_physical_devices('GPU') #list all the gpus present
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu,True)

In [4]:
len(gpus)

1

### Create folder structure

In [5]:
# Anchor means the original input image
# Positive means whether the original image and verification image matches
# Negetive means original image and verificationn image do not match

# Positive and negetive comes inside of verification image.

POS_PATH = os.path.join('data','positive')
NEG_PATH = os.path.join('data','negative')
ANC_PATH = os.path.join('data','anchor')

In [6]:
# Make the directories
# os.makedirs(POS_PATH)
# os.makedirs(NEG_PATH)
# os.makedirs(ANC_PATH)

# Collect Positives and Anchors

### Unzip labelled Faces in Wild Dataset

In [7]:
# http://vis-www.cs.umass.edu/lfw/

In [8]:
# Uncompress Tar GZ Labelled Faces in the Wild Dataset

!tar -xf lfw.tgz

In [9]:
# Move LFW Images to the following repository data/negative

for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw',directory)):
        EX_PATH = os.path.join('lfw',directory,file)
        NEW_PATH = os.path.join(NEG_PATH,file)
        os.replace(EX_PATH,NEW_PATH)
        

In [10]:
# for i in range(10):
# #     print("*")
#     for j in range(i+1):
#         print("*",end="")
#     print("")
        

In [11]:
os.listdir('lfw')

['Aaron_Eckhart',
 'Aaron_Guiel',
 'Aaron_Patterson',
 'Aaron_Peirsol',
 'Aaron_Pena',
 'Aaron_Sorkin',
 'Aaron_Tippin',
 'Abbas_Kiarostami',
 'Abba_Eban',
 'Abdel_Aziz_Al-Hakim',
 'Abdel_Madi_Shabneh',
 'Abdel_Nasser_Assidi',
 'Abdoulaye_Wade',
 'Abdulaziz_Kamilov',
 'Abdullah',
 'Abdullah_Ahmad_Badawi',
 'Abdullah_al-Attiyah',
 'Abdullah_Gul',
 'Abdullah_Nasseef',
 'Abdullatif_Sener',
 'Abdul_Majeed_Shobokshi',
 'Abdul_Rahman',
 'Abel_Aguilar',
 'Abel_Pacheco',
 'Abid_Hamid_Mahmud_Al-Tikriti',
 'Abner_Martinez',
 'Abraham_Foxman',
 'Aby_Har-Even',
 'Adam_Ant',
 'Adam_Freier',
 'Adam_Herbert',
 'Adam_Kennedy',
 'Adam_Mair',
 'Adam_Rich',
 'Adam_Sandler',
 'Adam_Scott',
 'Adelina_Avila',
 'Adel_Al-Jubeir',
 'Adisai_Bodharamik',
 'Adolfo_Aguilar_Zinser',
 'Adolfo_Rodriguez_Saa',
 'Adoor_Gopalakarishnan',
 'Adriana_Lima',
 'Adriana_Perez_Navarro',
 'Adrianna_Zuzic',
 'Adrian_Annus',
 'Adrian_Fernandez',
 'Adrian_McPherson',
 'Adrian_Murrell',
 'Adrian_Nastase',
 'Adrien_Brody',
 'Afton_S

### Collect Positive and Anchor Classes

In [12]:
## uuid library to generate unique image names
import uuid #universal uniform identifier

In [13]:
'{}.jpg'.format(uuid.uuid1())

'a6a6f252-46ed-11ec-b891-4074e0844ddc.jpg'

In [None]:
os.path.join(ANC_PATH,'{}.jpg'.format(uuid.uuid1()))

In [None]:
## Establish a connection to the webcam

cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret,frame = cap.read() #capture video frame by frame
    
    #Cut down frame to 250px x 250px
    frame = (frame[220:220+250,250:250+250:])
    
    # Collect anchors 
    if cv2.waitKey(1) & 0xFF == ord('a'): # on pressing 'a' it collects anchor images
        # Create the unique file path
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Write out anchor image
        cv2.imwrite(imgname,frame)
    
    # Collect Positives
    if cv2.waitKey(1) & 0xFF == ord('p'): # on pressing 'p' it collects positive images
        # Create the unique file path
        imgname = os.path.join(POS_PATH,'{}.jpg'.format(uuid.uuid1()))
        # Write out positive image
        cv2.imwrite(imgname,frame)
    
    
    
    
    cv2.imshow('frame',frame) # render image backk to screen
    
    if cv2.waitKey(1) & 0xFF == ord('q'): # the 'q' button is set as the quitting button
        break
  
cap.release()
cv2.destroyAllWindows() 
    
    

In [None]:
plt.imshow(frame)

In [None]:
frame.shape

In [None]:
# plt.imshow(frame[120:120+250,200:200+250:])

# Load and Preprocess Image

### Get image Directories

In [None]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(300) # go and grab all of the different image wthin a specific directory. Only take 300 images 
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(300)

In [None]:
dir_test = anchor.as_numpy_iterator()

In [None]:
dir_test.next() # Continue grabbing the next element

### Preprocessing scale and Resize

In [None]:
def preprocess(file_path):
    
    # Read image from file path 
    byte_img = tf.io.read_file(file_path)
    # Load the image
    img = tf.image.decode_jpeg(byte_img)
    
    # Preprocessing steps - resizing the image to be 100x100x3
    img = tf.image.resize(img,(100,100))
    
    # Scale image to be between 0 and 1
    img = img/255.0
    
    # Return the image
    return img
    

In [None]:
img = preprocess('data\\anchor\\a394e16c-3e1a-11ec-8494-4074e0844ddc.jpg')

In [None]:
img.numpy().max()

In [None]:
img.numpy().min()

In [None]:
plt.imshow(img)

In [None]:
# dataset.map(preprocess)

### Create Labelled Dataset

In [None]:

# (anchor, positive) => 1,1,1,1,1
# (anchor, negative) => 0,0,0,0,0

In [None]:
class_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))

In [None]:
# positives will be containing data having 1 and negatives data having 0
positives =  tf.data.Dataset.zip((anchor,positive,tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor,negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [None]:
samples = data.as_numpy_iterator()

In [None]:
exampple = samples.next() #it contains anchor data positive value and labels

In [None]:
exampple

### Build Train and Test Partition

In [None]:
def preprocess_twin(input_img, validation_img,label):
    """
    input_img is anchor
    validation_img is either positive or negative
    respective label is also returned
        
    """
    return(preprocess(input_img),preprocess(validation_img),label) 


In [None]:
res = preprocess_twin(*exampple)

In [None]:
len(res)

In [None]:
res[0]

In [None]:
plt.imshow(res[1]) # EIther positive or negative

In [None]:
plt.imshow(res[0])# annchoe

In [None]:
res[2] #label

In [None]:
# Build data loader pipeline

data = data.map(preprocess_twin)
data = data.cache() #caching our images
data = data.shuffle(buffer_size=10000) # shuffle the data

In [None]:
data

In [None]:

##### <ShuffleDataset shapes: ((100, 100, None) #anchor image, (100, 100, None) #negative or positive image, ()) #label, types: (tf.float32, tf.float32, tf.float32)>

In [None]:
samples = data.as_numpy_iterator()

In [None]:
samp = samples.next()

In [None]:
plt.imshow(samples.next()[0])

In [None]:
plt.imshow(samples.next()[1])

#### keeping the image constant using samp variable

In [None]:
plt.imshow(samp[0])

In [None]:
plt.imshow(samp[1])

In [None]:
samp[2]

In [None]:
### Training and testing partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8) # starts preparing the data before head


In [None]:
round(len(data)*.7)

In [None]:
train_data

In [None]:
# testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [None]:
test_data

In [None]:
(round(len(data)*.3))

# Model Engineering

### Build Embedding layer

In [None]:
inp = Input(shape=(100,100,3),name='input_image')
inp

In [None]:
c1 = Conv2D(64,(10,10),activation='relu')(inp)
c1

In [None]:
m1 = MaxPooling2D(64, (2,2), padding='same')(c1)

In [None]:
c2 = Conv2D(128, (7,7), activation='relu')(m1)
m2 = MaxPooling2D(64, (2,2), padding='same')(c2)

In [None]:
c3 = Conv2D(128, (4,4), activation='relu')(m2)
m3 = MaxPooling2D(64, (2,2), padding='same')(c3)

In [None]:

c4 = Conv2D(256, (4,4), activation='relu')(m3)
f1 = Flatten()(c4)
d1 = Dense(4096, activation='sigmoid')(f1)

In [None]:

mod = Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
mod.summary()

In [None]:
def make_embedding():
    inp = Input(shape=(100,100,3),name='input_image')
    #first block
    c1 = Conv2D(64,(10,10),activation='relu')(inp) #inp is connecting with first layer
    m1 = MaxPooling2D(64,(2,2),padding='same')(c1)
    #second block 
    c2 = Conv2D(128,(7,7),activation='relu')(m1)
    m2 = MaxPooling2D(64,(2,2),padding='same')(c2)
    
    #Third block
    c3 = Conv2D(128,(4,4),activation='relu')(m2)
    m3 = MaxPooling2D(64,(2,2),padding='same')(c3)
    
    # Final embedding block
    c4 = Conv2D(256,(4,4),activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096,activation='sigmoid')(f1)
    
    
    return Model(inputs=[inp] , outputs=[d1] , name='embedding')


In [None]:
embedding = make_embedding()

In [None]:
embedding.summary()

### Build distance layer

In [None]:
# siamese L1 distance class
class L1Dist(Layer): #layer comes from import layers
    """Takes output of the embedding.
    Subtracts the Anchor from either positive or negative
    so that we can compare both the faces"""
    
    def __init__(self,**kwargs):
        super().__init__()
        
    #similarity calculation
    def call(self,input_embedding,validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)


In [None]:
l1 = L1Dist()

### Make siamese model

In [None]:
input_image = Input(name='input_img',shape=(100,100,3))
validation_image = Input(name='validation_img',shape=(100,100,3))

In [None]:
inp_embedding = embedding(input_image)
val_embedding = embedding(validation_image)

In [None]:
inp_embedding

In [None]:
val_embedding

In [None]:
siamese_layer = L1Dist()
distances = siamese_layer(inp_embedding, val_embedding) # distance b/w i/p embedding and validation embedding

In [None]:
classifier = Dense(1, activation='sigmoid')(distances)
classifier

In [None]:
siamese_network = Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [None]:
siamese_network.summary()

##### Create a funtion for above stuff

In [None]:
def make_siamese_model():
    
    # Anchor image i/p in the n/w
    input_image = Input(name='input_img',shape=(100,100,3)) #this Input module is from tensorflow.keras.layers
    #validation image i/p in the n/w
    validation_image = Input(name='validation_img',shape=(100,100,3))
    
    # combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image),embedding(validation_image)) # distance b/w i/p embedding and validation embedding
    
    # classification layer
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs = [input_image,validation_image], outputs=classifier,name = 'SiameseNetwork')



In [None]:

siamese_model = make_siamese_model()

In [None]:
siamese_model.summary()


# Training

### Setup loss and Optimizers

In [None]:
binary_cross_loss = tf.losses.BinaryCrossentropy()


In [None]:
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001

### Establish checkpoints

In [None]:

if not os.path.exists('training_checkpoints'):
    os.makedirs('training_checkpoints')
    
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir,'ckpt')
checkpoint = tf.train.Checkpoint(opt = opt, siamese_model=siamese_model)


## To load pretrained weights form last checkpoint use
# model.load('path_to_checkpoint')

### Build train step function

In [None]:
test_batch = train_data.as_numpy_iterator()

In [None]:
batch_1 = test_batch.next()

In [None]:
len(batch_1)

In [None]:
len(batch_1[0])

In [None]:
len(batch_1[1])

In [None]:
batch_1[2] #label

In [None]:
X = batch_1[:2]

In [None]:
np.array(X).shape

In [None]:
y = batch_1[2]

In [None]:
y

In [None]:
@tf.function # compiles a function into a callable tensorflow graph

def train_step(batch):
    with tf.GradientTape() as tape:
        #Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        # calculate loss
        loss = binary_cross_loss(y,yhat)
        print(loss)
    
        # calculate gradients
        grad = tape.gradient(loss, siamese_model.trainable_variables)
        
        # Calculate updated weights and apply to siamese model
        opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
        
        # the optimizer here is calculating and propagating the new weights using
        # adam's optimiation algorithm.
        
        return loss
    

### Build training loop

In [None]:
### Import metric calculations
from tensorflow.keras.metrics import Precision,Recall

In [None]:
def train(data, Epochs):
    """Takes data and Epochs as inputs 
    and trains the model"""
    for epoch in range(1,EPOCHS+1):
        print('\n epoch {}/{}'.format(epoch,Epochs))
        progbar = tf.keras.utils.Progbar(len(data))
        
        # Creating a metric object
        r = Recall()
        p = Precision()
        
        # Loop through each batch
        for idx,batch in enumerate(train_data):
            # Run train step here
            loss = train_step(batch)
            yhat = siamese_model.predict(batch[:2])
            r.update_state(batch[2],yhat)
            p.update_state(batch[2],yhat)
            progbar.update(idx+1)
            print(loss.numpy(), r.result().numpy(), p.result().numpy())
            
        # Save checkpoints
        if epoch % 10 ==0:
            checkpoint.save(file_prefix = checkpoint_prefix)

In [None]:
EPOCHS = 50

In [None]:
# train(train_data,EPOCHS)

# EValuate Model

In [None]:
test_input,test_val,y_true = test_data.as_numpy_iterator().next()

In [None]:
test_var = test_data.as_numpy_iterator().next()

In [None]:
len(test_var[0]) #anchor

In [None]:
len(test_var[1]) # positive/negative

In [None]:
test_var[2]

In [None]:
y_true

In [None]:
# Make predictions
y_hat = siamese_model.predict([test_input,test_val])
y_hat

### Make Predictions

In [None]:
# Post processing the result
[1 if prediction > 0.5 else 0 for prediction in y_hat]


In [None]:
y_true

In [None]:
# creating a metric object
m = Recall()
#calculating recall value
m.update_state(y_true,y_hat)
# Return rcall Result
m.result().numpy()

In [None]:
# Creating a metric object 
m = Precision()

# Calculating the recall value 
m.update_state(y_true, y_hat)

# Return Recall Result
m.result().numpy()

### Visualize the result

In [None]:
plt.figure(figsize=(18,8))

# Set first subplot
plt.subplot(1,2,1)
plt.imshow(test_input[3])

#set second subplot
plt.subplot(1,2,2)
plt.imshow(test_val[3])
plt.show()

# Save Model

In [None]:
## Save weights
# siamese_model.save('siamesemodelv2.h5')

In [None]:
# Reload the model
model = tf.keras.models.load_model('siamesemodel.h5',custom_objects={'L1Dist':L1Dist,'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
model.predict([test_input,test_val])

In [None]:
# View model summary

siamese_model.summary()

# Real Time Test

In [None]:
## Create few directories and folders

if not os.path.exists('application_data'):
    os.makedirs('application_data')
    
subfolder_names = ['input_image','verification_images']

if not subfolder_name 
for subfolder_name in subfolder_names:
    os.makedirs(os.path.join('application_data',subfolder_name))

os.makedirs('application_data',exist_ok=True)

In [None]:
# copy random files from one positive folder to verification_image folder

import os
import random
import shutil

source = 'data/positive'
dest = 'application_data/verification_images'

files = os.listdir(source)
no_of_files = 50

path, dirs, files_len = next(os.walk('application_data/verification_images')) #dir is your directory path as string

print("Total number of files in directory are :",len(files_len))

if len(files_len) < 50:
    for file_name in random.sample(files,no_of_files):
        shutil.copy(os.path.join(source,file_name),dest)
else:
    pass


### Verification Function

In [None]:
os.listdir(os.path.join('application_data','verification_images'))

In [None]:
os.path.join('application_data','input_image','input_image.jpg')

In [None]:
for image in os.listdir(os.path.join('application_data','verification_images')):
    validation_img = preprocess(os.path.join('application_data','verification_images',image))
    print(validation_img)

In [None]:
def verify(model, detection_threshold, verification_threshold):
    """
    
    model is what model are we going to use for predictions
    Detection threshold is a metric above which a prediction is considered
    positive.
    Verification threshold is a proportion of a positive predictions / total positive
    samples
    
    Has input_img function that preprocess the image and saves it in application_data/input_image/input_image.jpg
    file path.
    """
    
    # Build results list
    results = []
    for image in os.listdir(os.path.join('application_data','verification_images')):
        input_img = preprocess(os.path.join('application_data','input_image','input_image.jpg'))
        #preprocess function that we created above to scale,resize and noramlize the images
        validation_img = preprocess(os.path.join('application_data','verification_images',image))
        
        result = model.predict(list(np.expand_dims([input_img,validation_img],axis=1)))
        results.append(result)
    
    
    detection = np.sum(np.array(results) > detection_threshold) # it determines how many of our
    # positive prediction are actually surpassing the detection_threshold
    
    verification = detection/len(os.listdir(os.path.join('application_data', 'verification_images')))
    
    verified = verification > verification_threshold
    
    return results, verified


### OpenCV Real time verification

In [None]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret,frame = cap.read()
    frame = (frame[220:220+250,250:250+250:])
    
    cv2.imshow('verification',frame)
    
    # Verification trigger
    if cv2.waitKey(10) & 0xFF == ord('v'):
        # Save input image to application_data/input_image folder
        cv2.imwrite(os.path.join('application_data','input_image','input_image.jpg'),frame)
        
        
        # Run verification
        results,verified = verify(model,0.9,0.7)
        print(verified)
        
    
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
        
cap.release()
cv2.destroyAllWindows()
    

In [None]:
np.squeeze(results) > 0.5

In [None]:
np.sum(np.squeeze(results)>0.5)