In [1]:
 #Import standard dependencies
 import cv2
 import os
 import random
 import numpy as np
 import matplotlib.pyplot as plt

In [2]:
#import tensorflow dependencies-Functional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf


In [3]:
# Enable memory growth on every GPU TensorFlow can see; the intent is to
# avoid out-of-memory errors during training.
gpus = tf.config.experimental.list_physical_devices('GPU')
for device in gpus:
    tf.config.experimental.set_memory_growth(device, True)

In [4]:
# Image folders on the mounted Google Drive, all under one project directory.
PROJECT_DIR = '/content/drive/MyDrive/face_rec_project'
pos_path = PROJECT_DIR + '/positive'
neg_path = PROJECT_DIR + '/negative'
anc_path = PROJECT_DIR + '/anchor'

**Collect Positives And Anchors**

In [None]:
#Uncompress the Labelled Faces in the Wild (LFW) tar.gz dataset
%cd '/content/drive/MyDrive/face_rec_project'
#!tar -xf lfw.tgz

/content/drive/MyDrive/face_rec_project


In [None]:
#Move the LFW images into the negative-images directory (neg_path)
#for directory in os.listdir('/content/drive/MyDrive/face_rec_project/lfw'):
 # for file in os.listdir(os.path.join('/content/drive/MyDrive/face_rec_project/lfw', directory)):
   # EX_PATH = os.path.join('/content/drive/MyDrive/face_rec_project/lfw',directory,file)
    #NEW_PATH = os.path.join(neg_path, file)
    #os.replace(EX_PATH, NEW_PATH)

**collect positive and anchor classes**

In [None]:
import os
import uuid
from IPython.display import display, Javascript, Image
from google.colab.output import eval_js
from base64 import b64decode
from PIL import Image as PILImage

def take_photo(num_images=1, keyword='capture', folder='positive', quality=0.8, width=None, height=None, crop=None):
    """Capture webcam photos in Colab and save each one as a JPEG file.

    A live video preview is shown in the cell output. A frame is grabbed when
    the user either presses the key named by ``keyword`` or clicks the
    "Capture" button.

    Args:
        num_images: Number of photos to capture, one after another.
        keyword: Value compared against the browser's ``event.key`` (e.g. 'c').
        folder: Directory the images are written to; created if missing.
        quality: JPEG quality forwarded to ``canvas.toDataURL``.
        width, height: If both are set, the saved image is shrunk in place with
            ``PIL.Image.thumbnail((width, height))``.
        crop: Optional ``(left, upper, right, lower)`` box passed to
            ``PIL.Image.crop``.

    Returns:
        List of the saved file paths, in capture order.
    """
    import json  # local import: used only to quote the arguments for the JS call

    os.makedirs(folder, exist_ok=True)

    js = Javascript('''
        async function takePhoto(keyword, quality) {
            const div = document.createElement('div');
            const capture = document.createElement('button');
            capture.textContent = 'Capture';
            div.appendChild(capture);

            const video = document.createElement('video');
            video.style.display = 'block';
            const stream = await navigator.mediaDevices.getUserMedia({ video: true });

            document.body.appendChild(div);
            div.appendChild(video);
            video.srcObject = stream;
            await video.play();

            // Resize the output to fit the video element.
            google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

            // Wait for the keyword key to be pressed or the button to be clicked.
            await new Promise((resolve) => {
                const finish = () => {
                    // Remove the listener so it does not pile up across captures.
                    document.removeEventListener('keydown', onKey);
                    resolve();
                };
                const onKey = (event) => {
                    if (event.key === keyword) {
                        finish();
                    }
                };
                document.addEventListener('keydown', onKey);
                capture.onclick = finish;
            });

            const canvas = document.createElement('canvas');
            const context = canvas.getContext('2d');
            canvas.width = video.videoWidth;
            canvas.height = video.videoHeight;
            context.drawImage(video, 0, 0, canvas.width, canvas.height);
            stream.getVideoTracks()[0].stop();
            div.remove();

            return canvas.toDataURL('image/jpeg', quality);
        }
    ''')

    filenames = []
    for _ in range(num_images):
        # uuid4 gives every capture its own file name.
        filename = os.path.join(folder, str(uuid.uuid4()) + '.jpg')

        display(js)
        # Pass BOTH arguments; json.dumps quotes/escapes them for JavaScript.
        data = eval_js('takePhoto({}, {})'.format(json.dumps(keyword), json.dumps(quality)))
        # The data URL looks like "data:image/jpeg;base64,<payload>".
        binary = b64decode(data.split(',')[1])
        with open(filename, 'wb') as f:
            f.write(binary)

        # Optional post-processing, done with a single open/save so the JPEG
        # is re-encoded at most once.
        if (width and height) or crop:
            img = PILImage.open(filename)
            if width and height:
                img.thumbnail((width, height))
            if crop:
                img = img.crop(crop)
            img.save(filename)

        filenames.append(filename)

    return filenames

# Crop box in pixels: (left, upper, right, lower)
crop_coordinates = (150, 50, 450, 350)

# Capture one image when the 'c' key is pressed. The target folder is given
# explicitly as pos_path so the result does not depend on the current working
# directory set by an earlier %cd cell.
filenames = take_photo(num_images=1, keyword='c', folder=pos_path, crop=crop_coordinates)

# Display the captured images
for filename in filenames:
    display(Image(filename))

# List the saved image paths (also available in the "filenames" list)
print('Saved images:')
for filename in filenames:
    print(filename)


In [5]:
def data_aug(img):
    """Return nine randomly augmented copies of ``img``.

    Every copy is derived from the ORIGINAL image. Previously each iteration
    re-augmented the output of the one before, so contrast loss and JPEG
    re-encoding accumulated from copy to copy. Brightness and contrast also
    used constant seeds; all five operations now draw a fresh seed per copy.

    Args:
        img: Image tensor/array accepted by the ``tf.image.stateless_random_*``
            ops (the notebook passes the result of ``cv2.imread``).

    Returns:
        List of 9 augmented image tensors.
    """
    def _seed():
        # The stateless ops take a 2-element integer seed.
        return (np.random.randint(100), np.random.randint(100))

    data = []
    for _ in range(9):
        aug = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=_seed())
        aug = tf.image.stateless_random_contrast(aug, lower=0.6, upper=1, seed=_seed())
        aug = tf.image.stateless_random_flip_left_right(aug, seed=_seed())
        aug = tf.image.stateless_random_jpeg_quality(aug, min_jpeg_quality=90, max_jpeg_quality=100, seed=_seed())
        aug = tf.image.stateless_random_saturation(aug, lower=0.9, upper=1, seed=_seed())

        data.append(aug)

    return data

In [6]:
import uuid

# Write the augmented copies of every positive image next to the originals.
# NOTE: os.listdir is evaluated once, so files written by this loop are not
# picked up in the same run, but re-running the cell will augment the
# previously generated copies as well.
for file_name in os.listdir(pos_path):
    img_path = os.path.join(pos_path, file_name)
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread returns None for files it cannot decode; skip them
        # instead of crashing inside data_aug.
        continue

    for image in data_aug(img):
        cv2.imwrite(os.path.join(pos_path, '{}.jpg'.format(uuid.uuid1())), image.numpy())

**LOAD AND PREPROCESS IMAGES**

In [7]:
# Build one file-path dataset per image class from the JPEGs in each folder.
jpg_pattern = '/*.jpg'
anchor = tf.data.Dataset.list_files(anc_path + jpg_pattern)
positive = tf.data.Dataset.list_files(pos_path + jpg_pattern)
negative = tf.data.Dataset.list_files(neg_path + jpg_pattern)

**Preprocessing - scale and resize**

In [8]:
def preprocess(file_path):
  """Load a JPEG file and turn it into a model-ready image tensor.

  Args:
    file_path: Path of the JPEG file (string or string tensor).

  Returns:
    Float tensor of shape (105, 105, 3) with values scaled to [0, 1].
  """
  # Read the raw file bytes
  byte_img = tf.io.read_file(file_path)
  # Decode to 3 channels so grayscale JPEGs also match the model's
  # (105, 105, 3) input and the channel dimension is statically known
  img = tf.io.decode_jpeg(byte_img, channels=3)
  # Resize to the network input size
  img = tf.image.resize(img, (105, 105))
  # Scale pixel values to the range 0..1
  img = img / 255.0
  return img

**create labelled dataset**

In [None]:
# (anchor,positive) => 1,1,1,1,1
# (anchor,negative) => 0,0,0,0,0

In [9]:
# One label per anchor file: 1.0 for (anchor, positive) pairs and
# 0.0 for (anchor, negative) pairs.
num_anchor = len(anchor)
positive_labels = tf.data.Dataset.from_tensor_slices(tf.ones(num_anchor))
negative_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(num_anchor))

positives = tf.data.Dataset.zip((anchor, positive, positive_labels))
negatives = tf.data.Dataset.zip((anchor, negative, negative_labels))

# Single dataset holding the positive pairs followed by the negative pairs.
data = positives.concatenate(negatives)

In [10]:
# Iterator that yields the (anchor path, other path, label) elements of `data` as NumPy values
samples = data.as_numpy_iterator()


In [None]:
# Pull one element from the iterator and show it
example = next(samples)
example

**Build Train and Test Partition**

In [12]:
def preprocess_twin(input_img, validation_img, label):
  """Preprocess both image paths of a pair; the label is passed through unchanged."""
  input_tensor = preprocess(input_img)
  validation_tensor = preprocess(validation_img)
  return (input_tensor, validation_tensor, label)

In [13]:
# Run the pair preprocessing on the single example pulled above (sanity check)
res=preprocess_twin(*example)

In [None]:
# Show the first (input) image of the preprocessed pair
plt.imshow(res[0])

In [None]:
# Label of the preprocessed pair
res[2]

In [16]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
# Shuffle ONCE with a fixed order. With the default reshuffle_each_iteration=True
# the order changes on every pass, so take()/skip() below would select different
# elements each epoch and test samples would leak into training.
data = data.shuffle(buffer_size=10000, reshuffle_each_iteration=False)

train_size = round(len(data)*.7)
test_size = round(len(data)*.3)

# Training partition
train_data = data.take(train_size)
# Reshuffle inside the training split only, so batches still differ per epoch
train_data = train_data.shuffle(buffer_size=10000)
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

# Testing partition
test_data = data.skip(train_size)
test_data = test_data.take(test_size)
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

**MODEL ENGINEERING**

In [48]:
#Build Embedding layer

from tensorflow.keras.regularizers import l2

def make_embedding(regularization_rate=0.01):
    """Build the convolutional embedding network shared by both Siamese branches.

    Four Conv2D blocks (the first three followed by 2x2 max pooling) feed a
    4096-unit sigmoid Dense layer. Every Conv2D/Dense kernel carries an L2
    regularizer.

    Args:
        regularization_rate: L2 factor applied to every kernel.

    Returns:
        Keras ``Model`` named 'embedding' mapping a (105, 105, 3) image to a
        4096-dimensional vector.
    """
    inp = Input(shape=(105, 105, 3), name='input_image')

    # NOTE: the pooling layers previously read MaxPooling2D(64, (2, 2), ...),
    # i.e. pool_size=64 with strides=2. Output shapes are identical, but each
    # output took the max over a 64x64 window instead of a 2x2 one.
    c1 = Conv2D(64, (10, 10), activation='relu', kernel_regularizer=l2(regularization_rate))(inp)
    m1 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')(c1)

    c2 = Conv2D(128, (7, 7), activation='relu', kernel_regularizer=l2(regularization_rate))(m1)
    m2 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')(c2)

    c3 = Conv2D(128, (4, 4), activation='relu', kernel_regularizer=l2(regularization_rate))(m2)
    m3 = MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')(c3)

    c4 = Conv2D(256, (4, 4), activation='relu', kernel_regularizer=l2(regularization_rate))(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid', kernel_regularizer=l2(regularization_rate))(f1)

    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
# Build the shared embedding network (used as a global by make_siamese_model) and print its layers
embedding = make_embedding()
embedding.summary()

**Build Distance Layer**

In [50]:
# Siamese L1 distance layer
class L1Dist(Layer):
  """Keras layer returning the element-wise absolute difference of two embeddings."""

  def __init__(self, **kwargs):
    # Forward kwargs (e.g. name, dtype, trainable) to the base Layer. They were
    # previously dropped, so a name passed by the caller or restored from a
    # saved config was silently ignored.
    super().__init__(**kwargs)

  # Similarity calculation
  def call(self, input_embedding, validation_embedding):
    """Return |input_embedding - validation_embedding|, element-wise."""
    return tf.math.abs(input_embedding - validation_embedding)

In [51]:
# Instantiate the distance layer once (not referenced elsewhere in the visible notebook)
l1=L1Dist()

**Make Siamese Model**

In [52]:
def make_siamese_model():
  """Assemble the Siamese network from the global ``embedding`` model.

  Both inputs go through the same ``embedding`` network, their embeddings are
  combined by an ``L1Dist`` layer, and a single sigmoid unit scores the pair.

  Returns:
    Keras ``Model`` named 'SiameseNetwork' with inputs
    [input_image, validation_image] and one sigmoid output.
  """
  image_shape = (105, 105, 3)

  # The two image inputs: the anchor and the image it is compared against
  input_image = Input(shape=image_shape, name='input_image')
  validation_image = Input(shape=image_shape, name='validation_image')

  # Distance layer, renamed to 'distance' after construction
  siamese_layer = L1Dist()
  siamese_layer._name = 'distance'

  # Embed both inputs with the shared network and combine the embeddings
  input_embedding = embedding(input_image)
  validation_embedding = embedding(validation_image)
  distances = siamese_layer(input_embedding, validation_embedding)

  # Classification layer
  classifier = Dense(1, activation='sigmoid')(distances)

  return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [None]:
# Build the Siamese model (the global `model` used by train_step and the test cells) and print its layers
model = make_siamese_model()
model.summary()

**Training the model**

In [54]:
# The classifier head ends in a sigmoid, so the loss receives probabilities and from_logits is left at its default
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [55]:
# Adam optimizer constructed with 1e-4 as its first argument
opt = tf.keras.optimizers.Adam(1e-4)

**Establish Checkpoints**

In [25]:
# Checkpoints are written under checkpoint_dir with the file prefix 'ckpt'.
checkpoint_dir = '/content/drive/MyDrive/face_rec_project/checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
# The checkpoint tracks the `opt` and `model` objects bound at the moment this
# cell runs; rebinding either name afterwards does not update what is saved.
checkpoint = tf.train.Checkpoint(opt=opt, model=model)

**Build Train Step Function**

In [56]:
@tf.function
def train_step(batch):
    """Run one optimisation step on a single batch and return its loss.

    Uses the globals ``model``, ``binary_cross_loss`` and ``opt``. The
    optimizer is intentionally NOT re-created here: the duplicate
    ``opt = Adam(...)`` that used to precede this function rebound ``opt``
    after ``tf.train.Checkpoint`` had captured the earlier instance, so
    checkpoints stored an optimizer that was never used for training.

    Args:
        batch: Tuple ``(input_images, validation_images, labels)``.

    Returns:
        Scalar binary cross-entropy loss for the batch.
    """
    # Record the forward pass so gradients can be computed
    with tf.GradientTape() as tape:
        # Anchor and positive/negative images
        x = batch[:2]
        # Labels
        y = batch[2]

        # Forward pass
        yhat = model(x, training=True)
        # NOTE(review): the L2 kernel regularizers declared in make_embedding
        # are not included in this loss - confirm whether that is intended.
        loss = binary_cross_loss(y, yhat)

    # Calculate gradients
    grad = tape.gradient(loss, model.trainable_variables)

    # Apply the weight updates to the model
    opt.apply_gradients(zip(grad, model.trainable_variables))
    return loss

def train(data, EPOCHS):
  """Train the global ``model`` for ``EPOCHS`` passes over ``data``.

  Saves the global ``checkpoint`` under ``checkpoint_prefix`` every 20 epochs
  and after the final epoch, so the trained state is kept even when EPOCHS is
  not a multiple of 20.

  Args:
    data: Batched dataset of (input_images, validation_images, labels).
    EPOCHS: Number of passes over ``data``.
  """
  # Loop through epochs
  for epoch in range(1, EPOCHS+1):
    print('\n Epoch {}/{}'.format(epoch, EPOCHS))
    progbar = tf.keras.utils.Progbar(len(data))

    # Loop through each batch
    for idx, batch in enumerate(data):
      loss = train_step(batch)
      # Report the batch loss so training progress is visible
      progbar.update(idx+1, [('loss', float(loss))])

    # Save checkpoints
    if epoch % 20 == 0 or epoch == EPOCHS:
      checkpoint.save(file_prefix=checkpoint_prefix)


**build training loops**

In [57]:
# Number of passes over the training data
EPOCHS = 50


In [None]:
# Train on the training partition only
train(train_data , EPOCHS)

**Testing our model**

In [59]:
# import metric calculations
from tensorflow.keras.metrics import Precision, Recall

In [60]:
# Pull the first batch from the test partition and unpack it into
# input images, validation images and labels
test_batch = test_data.as_numpy_iterator().next()
test_input, test_val, y_true = test_batch

In [None]:
# Ground-truth labels of the test batch
y_true

In [None]:
# Make predictions for every (input, validation) pair in the test batch
y_hat = model.predict([test_input,test_val])


In [None]:
# Post-process the results: 1 where the predicted score exceeds 0.5, else 0
(y_hat > 0.5).astype(int).ravel().tolist()

In [64]:
# Create a metric object
m = Recall()
# Accumulate the recall statistics for this single test batch
m.update_state(y_true, y_hat)
# Return the recall result as a NumPy value
m.result().numpy()

1.0

In [None]:
# Show the input image and the validation image of test sample 4 side by side
fig, (ax_input, ax_val) = plt.subplots(1, 2, figsize=(18, 8))
ax_input.imshow(test_input[4])
ax_val.imshow(test_val[4])
plt.show()

**SAVE MODEL**

In [None]:
# Save the model to 'model.h5' (relative path, so it lands in the current working directory)
model.save('model.h5')

In [None]:
# Reload the model; the custom L1Dist layer and the loss class are supplied
# through custom_objects under the names 'L1Dist' and 'BinaryCrossentropy'
model = tf.keras.models.load_model('model.h5',
                                  custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

**REAL TIME TEST**

verification function

In [37]:
def verify(model, detection_threshold, verification_threshold,
           app_data_dir='/content/drive/MyDrive/face_rec_project/application data'):
    """Compare the captured input image against every stored verification image.

    Args:
        model: Siamese model whose ``predict`` takes [input, validation] batches.
        detection_threshold: Score above which a single prediction counts as a
            match.
        verification_threshold: Fraction of verification images that must
            match for the input to be considered verified.
        app_data_dir: Folder containing 'input image/input_image.jpg' and the
            'verification image' directory. The default is the path this
            function previously had hard-coded.

    Returns:
        ``(results, verified)``: the list of per-image prediction arrays and a
        boolean that is True when the match fraction exceeds
        ``verification_threshold``.
    """
    verification_dir = os.path.join(app_data_dir, 'verification image')
    input_path = os.path.join(app_data_dir, 'input image', 'input_image.jpg')

    # List the directory once and reuse it for both the loop and the ratio.
    verification_files = os.listdir(verification_dir)

    results = []
    if not verification_files:
        # Nothing to compare against: avoid the 0/0 division below.
        return results, False

    # The input image is the same for every comparison, so load it only once.
    input_image = preprocess(input_path)

    for image_name in verification_files:
        validation_image = preprocess(os.path.join(verification_dir, image_name))

        # expand_dims adds a batch axis to each image; list() splits the stack
        # back into the two model inputs.
        result = model.predict(list(np.expand_dims([input_image, validation_image], axis=1)))
        results.append(result)

    # Number of predictions above the detection threshold
    detection = np.sum(np.array(results) > detection_threshold)
    # Fraction of verification images that matched
    verification = detection / len(verification_files)
    verified = verification > verification_threshold

    return results, verified

  #Detection Threshold: Score above which a single prediction is considered positive
  #Verification Threshold: Required proportion of positive predictions / total number of verification images


In [None]:
import os
from IPython.display import display, Javascript, Image
from google.colab.output import eval_js
from base64 import b64decode
from PIL import Image as PILImage

def take_photo(keyword='capture', folder='/content/drive/MyDrive/face_rec_project/application data/input image', quality=0.8, width=None, height=None, crop=None):
    """Capture one webcam photo in Colab and save it as ``input_image.jpg``.

    A live video preview is shown in the cell output. The frame is grabbed
    when the user either presses the key named by ``keyword`` or clicks the
    "Capture" button. The file name is fixed, so each call overwrites the
    previous capture in ``folder``.

    Args:
        keyword: Value compared against the browser's ``event.key`` (e.g. 'c').
        folder: Directory the image is written to; created if missing.
        quality: JPEG quality forwarded to ``canvas.toDataURL``.
        width, height: If both are set, the saved image is shrunk in place with
            ``PIL.Image.thumbnail((width, height))``.
        crop: Optional ``(left, upper, right, lower)`` box passed to
            ``PIL.Image.crop``.

    Returns:
        Path of the saved image.
    """
    import json  # local import: used only to quote the arguments for the JS call

    os.makedirs(folder, exist_ok=True)

    # Fixed file name: this is the path the verification step reads.
    filename = os.path.join(folder, 'input_image.jpg')

    js = Javascript('''
        async function takePhoto(keyword, quality) {
            const div = document.createElement('div');
            const capture = document.createElement('button');
            capture.textContent = 'Capture';
            div.appendChild(capture);

            const video = document.createElement('video');
            video.style.display = 'block';
            const stream = await navigator.mediaDevices.getUserMedia({ video: true });

            document.body.appendChild(div);
            div.appendChild(video);
            video.srcObject = stream;
            await video.play();

            // Resize the output to fit the video element.
            google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

            // Wait for the keyword key to be pressed or the button to be clicked.
            await new Promise((resolve) => {
                const finish = () => {
                    // Remove the listener so it does not pile up across captures.
                    document.removeEventListener('keydown', onKey);
                    resolve();
                };
                const onKey = (event) => {
                    if (event.key === keyword) {
                        finish();
                    }
                };
                document.addEventListener('keydown', onKey);
                capture.onclick = finish;
            });

            const canvas = document.createElement('canvas');
            const context = canvas.getContext('2d');
            canvas.width = video.videoWidth;
            canvas.height = video.videoHeight;
            context.drawImage(video, 0, 0, canvas.width, canvas.height);
            stream.getVideoTracks()[0].stop();
            div.remove();

            return canvas.toDataURL('image/jpeg', quality);
        }
    ''')
    display(js)
    # Pass BOTH arguments; json.dumps quotes/escapes them for JavaScript.
    data = eval_js('takePhoto({}, {})'.format(json.dumps(keyword), json.dumps(quality)))
    # The data URL looks like "data:image/jpeg;base64,<payload>".
    binary = b64decode(data.split(',')[1])
    with open(filename, 'wb') as f:
        f.write(binary)

    # Optional post-processing, done with a single open/save so the JPEG is
    # re-encoded at most once.
    if (width and height) or crop:
        img = PILImage.open(filename)
        if width and height:
            img.thumbnail((width, height))
        if crop:
            img = img.crop(crop)
        img.save(filename)

    return filename

# Crop box in pixels: (left, upper, right, lower)
crop_coordinates = (150, 50, 450, 350)

# Take a single photo when the 'c' key is pressed, then show it
filename = take_photo(keyword='c', crop=crop_coordinates)
display(Image(filename))

# Compare the fresh capture against the stored verification images
detection_threshold = 0.7
verification_threshold = 0.7
results, verified = verify(model, detection_threshold, verification_threshold)
print(verified)