# Install Dependencies

In [None]:
%pip install -r requirements.txt

# Import Dependencies

In [1]:
import warnings
warnings.filterwarnings('ignore')
import cv2
import os
import random
import shutil
import uuid
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Input, Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.metrics import Precision, Recall

2023-11-17 22:43:13.234494: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/sanskar/Desktop/deep_facial_recognition/.conda/lib/python3.8/site-packages/cv2/../../lib64:
2023-11-17 22:43:13.234513: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


# Set GPU Growth

In [4]:
gpus = tf.config.experimental.list_physical_devices('GPU')
print(gpus)
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

[]


# Setup Paths and create folder structures

In [3]:
# setup paths
POS_PATH = os.path.join('data', 'positiveimgs')
NEG_PATH = os.path.join('data', 'negativeimgs')
ANC_PATH = os.path.join('data', 'anchorimgs')

In [6]:
# create folder structures
os.makedirs(POS_PATH, exist_ok=True)
os.makedirs(NEG_PATH, exist_ok=True)
os.makedirs(ANC_PATH, exist_ok=True)

# Collecting Data

In [None]:
# negative images -- from the Labelled faces in the wild dataset (https://shorturl.at/aitN9) (all images are in the 250 x 250 format)
# anchor images -- from the webcam
# positive images -- from the webcam

Collecting Negative images :

In [None]:

#install tar file
!wget http://vis-www.cs.umass.edu/lfw/lfw.tgz

In [None]:
# untar the file
!tar -xvzf lfw.tgz

In [7]:
# move all images to the negative folder
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

Collecting Positive and Anchor images :

In [None]:
cap = cv2.VideoCapture(0)

while cap.isOpened(): # while webcam is open
    success, frame = cap.read()
    
    frame = frame[160:160+250, 200:200+250, :] # crop the frame to 250 x 250 with all color channels
    
    # collect anchors
    if cv2.waitKey(1) & 0xFF == ord('a'): # if 'a' is pressed while it waits for a millisecond
        imgname = os.path.join(ANC_PATH, str(uuid.uuid1()) + '.jpg') # create a unique name for the image # uuid.uuid1 generates a random uuid (universally unique id)
        cv2.imwrite(imgname, frame) # write out anchor image
    
    # collect positives
    if cv2.waitKey(1) & 0xFF == ord('p'): # if 'p' is pressed while it waits for a millisecond
        imgname = os.path.join(POS_PATH, str(uuid.uuid1()) + '.jpg')
        cv2.imwrite(imgname, frame)
    
    cv2.imshow('frame', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'): # if 'q' is pressed while it waits for a millisecond
        break
    
cap.release() # release the webcam
cv2.destroyAllWindows() # close all windows

# Load and Preprocess images

Load the data :

In [4]:
anchor = tf.data.Dataset.list_files(os.path.join(ANC_PATH, '*.jpg')).take(359) # take 359 images from the anchor folder #we choose 359 because we have a minimum of 359 images in the anchor, positive and negative data folders
positive = tf.data.Dataset.list_files(os.path.join(POS_PATH, '*.jpg')).take(359) # take 359 images from the positive folder
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, '*.jpg')).take(359) # take 359 images from the negative folder

2023-11-17 22:44:27.142640: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2023-11-17 22:44:27.143985: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2023-11-17 22:44:27.166754: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2023-11-17 22:44:27.166779: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (Lapy): /proc/driver/nvidia/version does not exist
2023-11-17 22:44:27.167665: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-11-17 22:44:27.1684

Create Labelled dataset :

In [5]:
# (anchor, positive) => 1
# (anchor, negative) => 0

positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor))))) # create a dataset of (anchor, positive, 1) # from_tensor_slices creates a dataset from a tensor
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor))))) # create a dataset of (anchor, negative, 0)
data = positives.concatenate(negatives) # concatenate the two datasets

print(data)

<ConcatenateDataset shapes: ((), (), ()), types: (tf.string, tf.string, tf.float32)>


In [6]:
samples = data.as_numpy_iterator()
print(samples.next())

(b'data/anchorimgs/b14ba46f-8486-11ee-82cd-3b68759fe6e2.jpg', b'data/positiveimgs/3bf18032-8485-11ee-82cd-3b68759fe6e2.jpg', 1.0)


Preprocessing and Data loader pipeline :

In [7]:
def preprocess(file_path):
    byte_img = tf.io.read_file(file_path) # read the image
    img = tf.image.decode_jpeg(byte_img) # decode the image
    img = tf.image.resize(img, (100, 100)) # resize the image to 100x100
    img = img/255.0 # normalize the image
    return img

def preprocess_twin(inputimg, valimg, label):
    return (preprocess(inputimg), preprocess(valimg)), label

data = data.map(preprocess_twin) # map the preprocess_twin function to the dataset # data gets converted from ( a, p, l) to ((a,  p) , l) due to the return statement in the preprocess_twin function
data = data.cache() # cache the dataset
data = data.shuffle(1024) # shuffle the dataset

# Visualizing 5 samples from the data

In [None]:
samples = data.take(5)

fig, axs = plt.subplots(5, 2, figsize=(10, 20))
for i, sample in enumerate(samples):
    axs[i, 0].imshow(sample[0][0])
    axs[i, 0].set_title('Anchor Image')
    axs[i, 1].imshow(sample[0][1])
    axs[i, 1].set_title('Validation Image')
plt.show()


# Create Train and Test partition

Training partition :

In [8]:
train_data = data.take(round(0.7 * len(data))) # take 70% of the data for training
train_data = train_data.batch(16) # batch the data
train_data = train_data.prefetch(8) # prefetch the data for faster processing and avoiding bottlenecks

Testing partition :

In [9]:
test_data = data.skip(round(0.7 * len(data))) # skip 70% of the data for testing
test_data = test_data.take(round(0.3 * len(data))) # take 30% of the data for testing
test_data = test_data.batch(16) # batch the data
test_data = test_data.prefetch(8) # prefetch the data

# Building the Model

Build embedding layer :

In [10]:
def make_embedding():
    
    inp = Input(shape=(100, 100, 3)) # input layer
    
    # Frist Block
    c1 = Conv2D(64, (10, 10), activation='relu')(inp) # convolutional layer
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1) # max pooling layer
    
    # Second Block
    c2 = Conv2D(128, (7, 7), activation='relu')(m1) # convolutional layer
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2) # max pooling layer
    
    # Third Block
    c3 = Conv2D(128, (4, 4), activation='relu')(m2) # convolutional layer
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3) # max pooling layer
    
    # Final Block
    c4 = Conv2D(256, (4, 4), activation='relu')(m3) # convolutional layer
    f1 = Flatten()(c4) # flatten the output
    d1 = Dense(4096, activation='sigmoid')(f1) # dense layer
    
    return Model(inputs=[inp], outputs=[d1], name='embedding') # return the model

In [11]:
embedding = make_embedding() # create the model
embedding.summary() # print the model summary

Model: "embedding"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 100, 100, 3)]     0         
_________________________________________________________________
conv2d (Conv2D)              (None, 91, 91, 64)        19264     
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 46, 46, 64)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 40, 40, 128)       401536    
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 20, 20, 128)       0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 17, 17, 128)       262272    
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 9, 9, 128)         0 

Build Distance layer :

In [12]:
class L1Dist(Layer): 
        
    def __init__(self, **kwargs):
        super().__init__()
    
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding) # return the absolute difference between the two inputs
    

Build Siamese model :

In [13]:
def make_siamese_model():
    
    # handle inputs 
    anchor_img = Input(name="anchor_img", shape=(100, 100, 3))
    validation_img = Input(name="val_img", shape=(100, 100, 3)) 
    
    # combining the distance component and the embedding component
    dist_layer = L1Dist() # create the distance layer
    dist_layer._name = 'distance' # name the layer
    distances = dist_layer(embedding(anchor_img), embedding(validation_img)) # calculate the distance between the two embeddings
    
    #classification component
    classification = Dense(1, activation='sigmoid')(distances) # create the classification layer
    
    return Model(inputs=[anchor_img, validation_img], outputs=[classification], name='siameseNetwork') # return the model

In [14]:
siamese_net = make_siamese_model() # create the model
siamese_net.summary() # print the model summary

Model: "siameseNetwork"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
anchor_img (InputLayer)         [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
val_img (InputLayer)            [(None, 100, 100, 3) 0                                            
__________________________________________________________________________________________________
embedding (Functional)          (None, 4096)         38960448    anchor_img[0][0]                 
                                                                 val_img[0][0]                    
__________________________________________________________________________________________________
distance (L1Dist)               (None, 4096)         0           embedding[0][0]     

# Train the model

Setup loss and optimizers :

In [62]:
binary_cross_loss = tf.keras.losses.BinaryCrossentropy() # create the loss function
opt = tf.keras.optimizers.Adam(1e-4) # create the optimizer # 1e-4 = 0.0001

Create Checkpoints :

In [63]:
checkpoint_dir = "./training_checkpoints"
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(opt=opt, model=siamese_net) # create a checkpoint

Build train step function :

In [106]:
@tf.function
def train_step(batch):
    with tf.GradientTape() as tape: # record all operations done to the inputs
        X = batch[0][:2] # get the anchor and pos/neg images
        y = batch[1] # get the labels
        
        # forward prop 
        yhat = siamese_net(X, training=True) # get the predictions
        loss = binary_cross_loss(y, yhat) # calculate the loss
    
    # back prop
    grads = tape.gradient(loss, siamese_net.trainable_variables) # calculate the gradients
    opt.apply_gradients(zip(grads, siamese_net.trainable_variables)) # apply the gradients
    
    return loss

Build training loop :

In [111]:
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        # Creating a metric object 
        r = Recall()
        p = Precision()
        
        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            loss = train_step(batch)
            yhat = siamese_net.predict(batch[0][:2])
            p.update_state(batch[1], yhat) 
            r.update_state(batch[1], yhat)
            progbar.update(idx+1)
        print("LOSS: " + str(loss.numpy()) + " RECALL: " + str(r.result().numpy()) + " PRECISION: " + str(p.result().numpy()))
        
        # Save checkpoints
        if epoch % 10 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)


Training:

In [112]:
EPOCHS = 50
train(train_data, EPOCHS)


 Epoch 1/50
LOSS: 0.11887312 RECALL: 0.9147287 PRECISION: 0.9874477

 Epoch 2/50
LOSS: 0.056339644 RECALL: 0.97131145 PRECISION: 0.9957983

 Epoch 3/50
LOSS: 0.0002914805 RECALL: 0.9882353 PRECISION: 0.99604744

 Epoch 4/50
LOSS: 0.027512914 RECALL: 0.988 PRECISION: 0.99596775

 Epoch 5/50
LOSS: 0.0020919587 RECALL: 0.9922179 PRECISION: 0.9922179

 Epoch 6/50
LOSS: 0.31449324 RECALL: 0.9919355 PRECISION: 1.0

 Epoch 7/50
LOSS: 0.09957038 RECALL: 0.96428573 PRECISION: 0.99590164

 Epoch 8/50
LOSS: 0.2540321 RECALL: 0.946281 PRECISION: 0.9956522

 Epoch 9/50
LOSS: 0.011796054 RECALL: 0.95752895 PRECISION: 0.992

 Epoch 10/50
LOSS: 0.13321283 RECALL: 0.9922179 PRECISION: 0.9922179

 Epoch 11/50
LOSS: 0.18760954 RECALL: 0.983871 PRECISION: 1.0

 Epoch 12/50
LOSS: 0.01487653 RECALL: 0.99230766 PRECISION: 0.99230766

 Epoch 13/50
LOSS: 0.063454255 RECALL: 0.9919028 PRECISION: 1.0

 Epoch 14/50
LOSS: 9.9472716e-05 RECALL: 1.0 PRECISION: 1.0

 Epoch 15/50
LOSS: 0.0018794456 RECALL: 0.9959514 

# Evaluate Model

Precision and recall values for all th 14 batches in the test data :

In [128]:
precision = Precision()
recall = Recall()
for id, batch in enumerate(test_data):
    test_input = batch[0][0]
    test_val = batch[0][1]
    y_true = batch[1]
    yhat = siamese_net.predict([test_input, test_val])
    precision.update_state(y_true, yhat)
    recall.update_state(y_true, yhat)
    print(f"Test data Batch no {id} => Precision: {precision.result().numpy()} , Recall: {recall.result().numpy()}")

Test data Batch no 0 => Precision: 1.0 , Recall: 1.0
Test data Batch no 1 => Precision: 1.0 , Recall: 1.0
Test data Batch no 2 => Precision: 1.0 , Recall: 1.0
Test data Batch no 3 => Precision: 1.0 , Recall: 1.0
Test data Batch no 4 => Precision: 1.0 , Recall: 1.0
Test data Batch no 5 => Precision: 1.0 , Recall: 1.0
Test data Batch no 6 => Precision: 1.0 , Recall: 1.0
Test data Batch no 7 => Precision: 1.0 , Recall: 1.0
Test data Batch no 8 => Precision: 1.0 , Recall: 1.0
Test data Batch no 9 => Precision: 1.0 , Recall: 1.0
Test data Batch no 10 => Precision: 1.0 , Recall: 1.0
Test data Batch no 11 => Precision: 1.0 , Recall: 1.0
Test data Batch no 12 => Precision: 1.0 , Recall: 1.0
Test data Batch no 13 => Precision: 1.0 , Recall: 1.0


# Save the model

In [15]:
# Save weights
siamese_net.save('siamesemodel.h5')

# Real time test

Reload the model :

In [16]:
model = tf.keras.models.load_model('siamesemodel.h5', custom_objects={'L1Dist': L1Dist}) # load the model # custom_objects is used to load the custom layer



Opencv real time facial recognition :

In [19]:
# return the name and the probability of the input image which matches the anchor image the most
# it assumes that you have a folder called 'recognition_data' in the same directory as this notebook and that folder contains a folder called 'input_image' which contains the input image and a folder called 'verification_imgs' which contains the images of different persons and the imaes are named after the person's name

def recognize():
    pred_name = ''
    max_prob = 0
    for name in os.listdir(os.path.join('recognition_data', 'verification_imgs')):
        img = os.path.join('recognition_data', 'verification_imgs', name)
        img = preprocess(img)
        val_img = os.path.join('recognition_data', 'input_image', 'input.jpg')
        val_img = preprocess(val_img)
        result = model.predict(list(np.expand_dims([img, val_img], axis=1)))
        if result>max_prob:
            max_prob = result
            pred_name = name
    return pred_name, max_prob


In [23]:
cap = cv2.VideoCapture(0)

while cap.isOpened():
    success, frame = cap.read()
    frame = frame[160:160+250, 200:200+250, :]
    
    # recognition trigger 
    if cv2.waitKey(10) & 0xFF == ord('v'):
        cv2.imwrite(os.path.join("recognition_data", "input_image", "input.jpg"), frame) # save the input image
        name, prob = recognize()
        print(str(name)[:-4], prob)
    
    cv2.imshow('Recognize', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    
cap.release()
cv2.destroyAllWindows()

Sanskar. [[0.4971676]]


Do verification :

Create required folders :

In [141]:
os.makedirs(os.path.join("verification_data", "verification_images"), exist_ok=True)
os.makedirs(os.path.join("verification_data", "input_images"), exist_ok=True)

# copy some positive images to the verification folder
for i in range(30):
    imgname = os.path.join(POS_PATH, os.listdir(POS_PATH)[i])
    newname = os.path.join("verification_data", "verification_images", str(uuid.uuid1()) + '.jpg')
    shutil.copy(imgname, newname)


Verification function :

In [137]:
def verify(model, detection_threshold, verification_threshold):
    
    #detection threshold : metric above which a prediction is considered as positive
    #verification threshold : min proportion of (positive predictions))/(total positive samples) required to verify a person
    
    results=[]
    for img in os.listdir(os.path.join("verification_data", "verification_images")):
        input_img = preprocess(os.path.join("verification_data", "input_images", "input_image.jpg"))
        val_img = preprocess(os.path.join("verification_data", "verification_images", img))
        result = model.predict(list(np.expand_dims([input_img, val_img], axis=1))) #list(np.expand_dims([input_img, val_img], axis=1)) will first expand the dimensions of the input and val images from (100, 100, 3) to (1, 100, 100, 3) by adding it into another set of parenthesis and then convert it to a list # we added the extra dimension because we are passing a single input
        results.append(result)
        
    detection = np.sum(np.array(results) > detection_threshold) # calculate the number of positive predictions
    verification = detection/len(os.listdir(os.path.join("verification_data", "verification_images"))) # calculate the proportion of positive predictions
    verified = verification > verification_threshold # check if the proportion of positive predictions is greater than the verification threshold
    
    return results, verified

Opencv real time verification :

In [None]:
cap = cv2.VideoCapture(0)

while cap.isOpened():
    success, frame = cap.read()
    frame = frame[160:160+250, 200:200+250, :]
    
    # verification trigger 
    if cv2.waitKey(10) & 0xFF == ord('v'):
        cv2.imwrite(os.path.join("verification_data", "input_images", "input_image.jpg"), frame) # save the input image
        results, verified = verify(model, 0.5, 0.5) # verify the input image
        print(verified)
    
    cv2.imshow('Verification', frame)
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
    
cap.release()
cv2.destroyAllWindows()