<a href="https://colab.research.google.com/github/joshi-suraj/Face_Recognition_DL/blob/main/FaceRecognition.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**1. SETUP**




**1.1 Install Dependencies**

The below libraries are already installed in colab these are specified if using any other platform like jupyter.


In [1]:
pip install tensorflow opencv-python matplotlib

**1.2 Import Dependencies**

In [2]:
# Importing Standard Dependencies
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

In [3]:
# Importing Tensorflow Dependencies - Functional API
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten

**1.3 Set GPU Growth**

Setting up GPU growth or dynamic memory allocation for GPU memory is usually required when you're working with TensorFlow (or other deep learning frameworks) on systems that have a physical GPU, like in personal computers or remote servers.

This is not typically necessary when using Google Colab because Colab manages GPU memory allocation automatically. However, if you're working on your local machine or a remote server, you might need to do this to avoid reserving all GPU memory at once, which can lead to inefficient usage.

This code iterates through the available GPUs and enables memory growth for each of them. Memory growth means that TensorFlow will only allocate GPU memory as needed, rather than reserving the entire GPU memory from the start.

In [4]:
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
  tf.config.experimental.set_memory_growth(gpu, True)

**1.4 Create Folders Structures**

In [5]:
Pos_Path = os.path.join('data', 'Positive')
Neg_Path = os.path.join('data', 'Negative')
Anc_Path = os.path.join('data', 'Anchor')

In [None]:
os.makedirs(Pos_Path)
os.makedirs(Neg_Path)
os.makedirs(Anc_Path)

**2. COLLECTING ANCHOR AND POSITIVE IMAGES**

**2.1 Getting Labelled Faces in the Wild Dataset**

In [None]:
!tar -xf lfw.tgz

In [None]:
for directory in os.listdir('lfw'):
  for file in os.listdir(os.path.join('lfw', directory)):
    Ex_Path = os.path.join('lfw',directory,file)
    New_Path = os.path.join(Neg_Path,file)
    os.replace(Ex_Path,New_Path)

In [None]:
for directory in os.listdir('lfw'):
  for file in os.listdir(os.path.join('lfw', directory)):
    print(os.path.join('lfw',directory,file))
    print(os.path.join(Neg_Path,file))


In [8]:
#Import uuid(Universal Unique Identifier) library to generate unique image names
import uuid

In [9]:
#Establish a connection to the webcam
cap = cv2.VideoCapture(4)
while cap.isOpened():
  ret, frame=cap.read()

  #Collect Anchors
  if cv2.waitkey(1) & 0XFF == ord('a'):
    #Create the Unique File Path
    imgname = os.path.join(Anc_Path, '{}.jpg'.format(uuid.uuid1()))
    #Write out Anchor image
    cv2.imwrite(imgname, frame)


  #Collect Positive
  if cv2.waitkey(1) & 0XFF == ord('a'):
    #Create the Unique File Path
    imgname = os.path.join(Pos_Path, '{}.jpg'.format(uuid.uuid1()))
    #Write out Positive image
    cv2.imwrite(imgname, frame)

  #Show images back to the Screen
  cv2.imshow('Suraj Image', frame)

  #Breaking Gracefully
  if cv2.waitkey(1) & 0XFF == ord('q'):
    break

#Releasing Webcam
cap.release()
#close the image show frame
cv2.destroyAllWindows()

**3. LOAD AND PREPROCESS IMAGE**

**3.1 Get Image Directories**

In [None]:
anchor = tf.data.Dataset.list_files(Anc_Path+'\*.jpg').take(300)
positive = tf.data.Dataset.list_files(Pos_Path+'\*.jpg').take(300)
negative = tf.data.Dataset.list_files(Neg_Path+'\*.jpg').take(300)

**3.2 Preprocessing - Scale and Resize**

In [11]:
def preprocess(file_path):
  #Reading in image from file path
  byte_img = tf.io.read_file(file_path)

  #Decoding the jpg image
  img = tf.io.decode_jpeg(byte_img)

  #Resize the image to 100*100 pixel
  img = tf.image.resize(img, (100,100))

  #rescaling or normalizing the image i.e. Scale image to be between 0 and 1
  img = img/255.0

  return img

**3.3 Create Labelled Dataset**

In [None]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
data = positives.concatenate(negatives)

**3.4 Build Train and Test Partition**

In [None]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [None]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=10000)

In [None]:
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [None]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

**4. Model Engineering**

**4.1 Build Embedding Layer**

In [None]:
def make_embedding():
    inp = Input(shape=(100,100,3), name='input_image')

    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)

    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)

    # Third block
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)

    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)


    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
embedding = make_embedding()

In [None]:
embedding.summary()

**4.2 Build Distance Layer**

In [None]:
# Siamese L1 Distance class
class L1Dist(Layer):

    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()

    # Magic happens here - similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

**4.3 Make Siamese Model**

In [None]:
def make_siamese_model():

    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(100,100,3))

    # Validation image in the network
    validation_image = Input(name='validation_img', shape=(100,100,3))

    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))

    # Classification layer
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [None]:
siamese_model = make_siamese_model()

In [None]:
siamese_model.summary()

**5. Training**

**5.1 Setup Loss and Optimizer**

In [None]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [None]:
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001

**5.2 Establish Checkpoints**

In [None]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

**5.3 Build Train Step Function**

In [None]:
@tf.function
def train_step(batch):

    # Record all of our operations
    with tf.GradientTape() as tape:
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]

        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)

    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    # Return loss
    return loss

**5.4 Build Training Loop**

In [None]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall

In [None]:
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # Creating a metric object
        r = Recall()
        p = Precision()

        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            loss = train_step(batch)
            yhat = siamese_model.predict(batch[:2])
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat)
            progbar.update(idx+1)
        print(loss.numpy(), r.result().numpy(), p.result().numpy())

        # Save checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

**5.5 Train the model**

In [None]:
EPOCHS = 50

In [None]:
train(train_data, EPOCHS)

**6. Evaluate Model**

**6.1 Import Metrics**

In [None]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall

**6.2 Make Predictions**

In [None]:
# Get a batch of test data
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [None]:
y_hat = siamese_model.predict([test_input, test_val])

In [None]:
# Post processing the results
[1 if prediction > 0.5 else 0 for prediction in y_hat ]

In [None]:
y_true

**6.3 Calculate Metrics**

In [None]:
# Creating a metric object
m = Recall()

# Calculating the recall value
m.update_state(y_true, y_hat)

# Return Recall Result
m.result().numpy()

In [None]:
# Creating a metric object
m = Precision()

# Calculating the recall value
m.update_state(y_true, y_hat)

# Return Recall Result
m.result().numpy()

**6.4 Viz Results**

In [None]:
# Set plot size
plt.figure(figsize=(10,8))

# Set first subplot
plt.subplot(1,2,1)
plt.imshow(test_input[0])

# Set second subplot
plt.subplot(1,2,2)
plt.imshow(test_val[0])

# Renders cleanly
plt.show()

**7. Save Model**

In [None]:
# Save weights
siamese_model.save('siamesemodelv2.h5')

In [None]:
# Reload model
siamese_model = tf.keras.models.load_model('siamesemodelv2.h5',
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
# Make predictions with reloaded model
siamese_model.predict([test_input, test_val])

In [None]:
# View model summary
siamese_model.summary()

**8. Real Time Test**

**8.1 Verification Function**

In [None]:
def verify(model, detection_threshold, verification_threshold):
    # Build results array
    results = []
    for image in os.listdir(os.path.join('application_data', 'verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'verification_images', image))

        # Make Predictions
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)

    # Detection Threshold: Metric above which a prediciton is considered positive
    detection = np.sum(np.array(results) > detection_threshold)

    # Verification Threshold: Proportion of positive predictions / total positive samples
    verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images')))
    verified = verification > verification_threshold

    return results, verified

**8.2 OpenCV Real Time Verification**

In [None]:
cap = cv2.VideoCapture(4)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[120:120+250,200:200+250, :]

    cv2.imshow('Verification', frame)

    # Verification trigger
    if cv2.waitKey(10) & 0xFF == ord('v'):
        # Save input image to application_data/input_image folder
        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
        # Run verification
        results, verified = verify(siamese_model, 0.9, 0.7)
        print(verified)

    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()