In [8]:
!git clone https://github.com/mirmirmirr/facial-recognition

Cloning into 'facial-recognition'...
remote: Enumerating objects: 931, done.[K
remote: Counting objects: 100% (931/931), done.[K
remote: Compressing objects: 100% (930/930), done.[K
remote: Total 931 (delta 1), reused 930 (delta 0), pack-reused 0[K
Receiving objects: 100% (931/931), 13.59 MiB | 31.41 MiB/s, done.
Resolving deltas: 100% (1/1), done.


### setup

In [2]:
# dependencies
import cv2
import os
import random
import numpy as mp
from matplotlib import pyplot as plt

In [3]:
# tensorflow dependencies (model compenents and deep learning components)
# using tensorflow funcational api
import tensorflow as tf
from keras.models import Model
from keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten


In [4]:
# avoiding out of memory (OOM) errors by setting GPU Memory Consumption Growth
gpu_list = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpu_list:
    tf.config.experimental.set_memory_growth(gpu, True)

In [5]:
# setup paths
POS_PATH = os.path.join('/content', 'facial-recognition', 'data', 'positive')
NEG_PATH = os.path.join('/content', 'facial-recognition', 'data', 'neg')
ANC_PATH = os.path.join('/content', 'facial-recognition', 'data', 'anchor')

In [6]:
POS_PATH+'\*.jpg'

'/content/facial-recognition/data/positive\\*.jpg'

### collecting postives and anchors

### preprocessing

In [10]:
# getting image directories

## creating data sets from the respective data folders randomly
postive = tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(300)
anchor = tf.data.Dataset.list_files(ANC_PATH+'/*.jpg').take(300)


'''
    accessing the dataset iterator: dir_txt = anchor.as_numpy_iterator()
    going through each file in the dataset: dir_test.next()
'''

'\n    accessing the dataset iterator: dir_txt = anchor.as_numpy_iterator() \n    going through each file in the dataset: dir_test.next()\n'

In [11]:
# image preprocessing - scaling and resizing

'''
ultimate goal/usage of preprocess funtion: dataset.map(preprocess)
    this will allow the program to go through each image in the datasets created from the data folders

returns the numpy equivalent of the image
'''
def preprocess(filePath):
    image = tf.io.read_file(filePath)
    img = tf.io.decode_jpeg(image)

    # preprocessing -resizing to be 100x100x3 (pixels, pixels, channels)
    img = tf.image.resize(img, (100, 100))

    # scaling image to be between 0 and 1
    ## traditonally, alll pixel values are between 0 and 255
    img = img/255.0

    return img

In [12]:
# creating labeled dataset
'''
(anchor, postive) => 1,1,1,1,1      creating a "right" dataset
(anchor, negative) => 0,0,0,0,0     creating a "wrong" dataset

data form:
<_ConcatenateDataset element_spec=(TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>

    shape:  ((), (), ())
    type:   (tf.string, tf.string, tf.float32)
            (anc filepath, pos/neg filepath, verification of 1 or 0 respectively)

'''

pos = tf.data.Dataset.zip((anchor, postive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
neg = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = pos.concatenate(neg)

In [13]:
'''
usage:
ex = data.as_numpy_iterator()
ample = ex.next()
preprocess_twin(*ample)
'''

def preprocess_twin(input, validation, label):
    return (preprocess(input), preprocess(validation), label)

In [14]:
# building data pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [15]:
# training partition
trained = data.take(round(len(data)*.7))
trained = trained.batch(16)
trained = trained.prefetch(8)

In [16]:
# testing partition
tested = data.skip(round(len(data)*.7))     # skipping the first 70% of images so we don't take any form trained sample
tested = tested.take(round(len(data)*.3))   # taking the last 30% of images for the testing sample
tested = tested.batch(16)
tested = tested.prefetch(8)

### model

In [17]:
def embedding():
    input = Input(shape=(100, 100, 3), name='input_image')

    # core block 1
    conv1 = Conv2D(64, (10, 10), activation='relu')(input)
    mp1 = MaxPooling2D(64, (2,2), padding='same')(conv1)

    # core block 2
    conv2 = Conv2D(128, (7, 7), activation='relu')(mp1)
    mp2 = MaxPooling2D(64, (2,2), padding='same')(conv2)

    # core block 3
    conv3 = Conv2D(128, (4, 4), activation='relu')(mp2)
    mp3 = MaxPooling2D(64, (2,2), padding='same')(conv3)

    # core block 4
    conv4 = Conv2D(256, (4, 4), activation='relu')(mp3)
    f1 = Flatten()(conv4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[input], outputs=[d1], name='embedding' )


In [18]:
model = embedding()

In [19]:
model.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 46, 46, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)       26

In [20]:
# custom neural network layer

# Siamese L1 Distance class
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()

    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)


In [21]:
def make_model():

    # inputs
    input = Input(name='input_img', shape=(100, 100, 3))
    validation = Input(name='validation_img', shape=(100, 100, 3))

    # combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(model(input), model(validation))

    # classification layer
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input, validation], outputs=classifier, name='SiameseCat')


In [22]:
siamese_model = make_model()

### training

In [23]:
# setting up loss and optimizer
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4)


In [24]:
# checkpoint callbacks incase there are problems with training
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt = opt, siamese_model=siamese_model)

In [25]:
# build train step function
@tf.function
def train_step(batch):

    with tf.GradientTape() as tape:
        # get anchor and pos/neg image
        X = batch[:2]
        # get label
        y = batch[2]

        # forward pass
        yhat = siamese_model(X, training=True)
        # calculate loss
        loss = binary_cross_loss(y, yhat)

    # calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    # calculate weights and apply to model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss

In [26]:
# training
def train(data, EPOCHS):

    # loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # oop through batches
        for idx, batch in enumerate(data):
            train_step(batch)
            progbar.update(idx+1)

        # save checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix = checkpoint_prefix)

In [27]:
EPOCHS = 50

In [28]:
train(trained, EPOCHS)


 Epoch 1/50

 Epoch 2/50

 Epoch 3/50

 Epoch 4/50

 Epoch 5/50

 Epoch 6/50

 Epoch 7/50

 Epoch 8/50

 Epoch 9/50

 Epoch 10/50

 Epoch 11/50

 Epoch 12/50

 Epoch 13/50

 Epoch 14/50

 Epoch 15/50

 Epoch 16/50

 Epoch 17/50

 Epoch 18/50

 Epoch 19/50

 Epoch 20/50

 Epoch 21/50

 Epoch 22/50

 Epoch 23/50

 Epoch 24/50

 Epoch 25/50

 Epoch 26/50

 Epoch 27/50

 Epoch 28/50

 Epoch 29/50

 Epoch 30/50

 Epoch 31/50

 Epoch 32/50

 Epoch 33/50

 Epoch 34/50

 Epoch 35/50

 Epoch 36/50

 Epoch 37/50

 Epoch 38/50

 Epoch 39/50

 Epoch 40/50

 Epoch 41/50

 Epoch 42/50

 Epoch 43/50

 Epoch 44/50

 Epoch 45/50

 Epoch 46/50

 Epoch 47/50

 Epoch 48/50

 Epoch 49/50

 Epoch 50/50


### evaluate model

In [31]:
# import metric calculations
from keras.metrics import Precision, Recall

In [32]:
# get batch of test data
test_input, test_val, y_true = tested.as_numpy_iterator().next()

In [34]:
# predictions
y_hat = siamese_model.predict([test_input, test_val])
y_hat



array([[9.9999309e-01],
       [9.9999976e-01],
       [9.9999905e-01],
       [9.9999392e-01],
       [9.9998355e-01],
       [9.9998927e-01],
       [5.0777514e-11],
       [9.9996209e-01],
       [9.9980706e-01],
       [9.9999166e-01],
       [1.0000000e+00],
       [9.9999988e-01],
       [1.0000000e+00],
       [4.7239816e-12],
       [1.5443680e-10],
       [8.6299211e-11]], dtype=float32)

In [35]:
# post processing results
"""
result = []
for prediction in y_hat:
  if prediction > 0.5:
    res.append(1)
  else:
    res.append(0)
"""
[1 if prediction > 0.5 else 0 for prediction in y_hat]

[1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0]

In [36]:
y_true

array([1., 1., 1., 1., 1., 1., 0., 1., 1., 1., 1., 1., 1., 0., 0., 0.],
      dtype=float32)

In [37]:
# calculating metrics
m = Recall()
m.update_state(y_true, y_hat)
m.result().numpy()

1.0

In [39]:
m = Precision()
m.update_state(y_true, y_hat)
m.result().numpy()

1.0

### save model

In [40]:
# save weights
siamese_model.save('siamesecat.h5')



In [42]:
# reload model
model = tf.keras.models.load_model('siamesecat.h5',
                                   custom_objects = {'L1Dist': L1Dist, "BinaryCossentropy": tf.losses.BinaryCrossentropy})



In [43]:
model.predict([test_input, test_val])



array([[9.9999309e-01],
       [9.9999976e-01],
       [9.9999905e-01],
       [9.9999392e-01],
       [9.9998355e-01],
       [9.9998927e-01],
       [5.0777514e-11],
       [9.9996209e-01],
       [9.9980706e-01],
       [9.9999166e-01],
       [1.0000000e+00],
       [9.9999988e-01],
       [1.0000000e+00],
       [4.7239816e-12],
       [1.5443680e-10],
       [8.6299211e-11]], dtype=float32)

In [44]:
model.summary()

Model: "SiameseCat"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][0]']