In [None]:
!pip install tensorflow tensorflow-gpu opencv-python

# import dependencies

In [None]:
import tensorflow as tf
import cv2
import os
import glob
import matplotlib.pyplot as plt
import numpy as np
import random

In [None]:
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense, Conv2D, Layer , MaxPooling2D, Input, Flatten

# Set gpu growth

In [None]:
gpus=tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
  tf.config.experimental.set_memory_growth(gpu,True)

# Structure Folders

In [None]:
POS_PATH=os.path.join('data','positive')
NEG_PATH=os.path.join('data','negative')
ANC_PATH=os.path.join('data','anchor')

In [None]:
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

# Collect Images (Positives and Anchors)

## Untar labelled faces from the dataset

In [None]:
!tar -xf lfw.tgz -C .

In [None]:
# Specify the directory pattern and filename pattern separately
directory_pattern = 'lfw/*/'

# Use glob to get a list of file paths that match the pattern
file_list = glob.glob(f'{directory_pattern}*.jpg')


In [None]:
print([os.path.basename(file) for file in file_list])

## collect negative images

In [None]:
for file in file_list:
  old_path=file
  new_path = os.path.join(NEG_PATH,os.path.basename(file))
  os.replace(old_path,new_path)

In [None]:
import shutil
shutil.rmtree('lfw')

## Collect Positives and Anchor Classes

In [None]:
import uuid
cap = cv2.VideoCapture(0)
if not cap.isOpened():
  print('error in opening camera')

while True:
  ret, frame = cap.read()
  frame=frame[20:300,200:550]
  # Display the captured image 
  cv2.imshow('captured_image',frame)
  if cv2.waitKey(1) & 0xFF == ord('q'):
    break 
  if cv2.waitKey(1) & 0xFF == ord('a'):
    imgname=os.path.join(ANC_PATH,f'{uuid.uuid1()}.jpg')
    cv2.imwrite(imgname, frame)
    
  if cv2.waitKey(1) & 0xFF == ord('p'):
    imgname=os.path.join(POS_PATH,f'{uuid.uuid1()}.jpg')
    cv2.imwrite(imgname, frame)
    

cv2.destroyAllWindows()
# Release the camera
cap.release()

In [None]:
l=glob.glob('data/anchor/*.jpg')
len(l)

# Preprocess Images

## get image directories

In [None]:
negative= tf.data.Dataset.list_files(f'{NEG_PATH}\*.jpg').take(3000)
positive= tf.data.Dataset.list_files(f'{POS_PATH}\*.jpg').take(3000)
anchor= tf.data.Dataset.list_files(f'{ANC_PATH}\*.jpg').take(3000)


In [None]:
list(negative)

In [None]:
neg_iterator=iter(negative)
next(neg_iterator)

In [None]:
negatives_list = negative.as_numpy_iterator()
sample_img=negatives_list.next()

## Scale and resize

In [None]:
def preprocess(img_path):
    img_bytes=tf.io.read_file(img_path)
    img=tf.image.decode_image(img_bytes, channels=3)
    img.set_shape([105, 105, 3])
    img=tf.image.resize(img,(105,105),method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    
    img= tf.cast(img, tf.float32) / 255.0
    # img= tf.image.convert_image_dtype(img, tf.float32)
    return img

In [None]:
img = preprocess(sample_img)
plt.imshow(img)

## Create labelled dataset

In [None]:
positives = tf.data.Dataset.zip((anchor,positive,tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor,negative,tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)


In [None]:
i=iter(data)
i.next()

In [None]:
i=data.as_numpy_iterator()
i.next()[0]

## Build Train and Test Partition

In [None]:
def process_tuple(input_img,validation_img,label):
    return(preprocess(input_img),preprocess(validation_img),label)


In [None]:
data=data.map(process_tuple)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [None]:
i=data.as_numpy_iterator()
sample=i.next()
fig,ax=plt.subplots(1,2)
ax[0].imshow(sample[0])
ax[1].imshow(sample[1])
plt.show()
print(sample[2])

In [None]:
train_data = data.take(round(len(data) * .7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [None]:
test_data = data.skip(round(len(data) * .7))
test_data = test_data.take(round(len(test_data) * .3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# Model Building

## build embedding part

In [None]:
def make_embeding():
    input = Input(shape=(105,105,3),name='input_image')

    x = Conv2D(64,(10,10),activation='relu')(input)
    x = MaxPooling2D(64,(2,2),padding='same')(x)

    x = Conv2D(128,(7,7),activation='relu')(x)
    x = MaxPooling2D(64,(2,2),padding='same')(x)

    x = Conv2D(128,(4,4),activation='relu')(x)
    x = MaxPooling2D(64,(2,2),padding='same')(x)

    x = Conv2D(256,(4,4),activation='relu')(x)
    x = Flatten() (x)
    x = Dense(4096,activation='sigmoid')(x)
    
    return tf.keras.Model(inputs= [input], outputs= [x], name= 'embedding_blocks' )

In [None]:
embedding = make_embeding()
embedding.summary()

## Build distance layer

In [None]:
class L1Dist(Layer):
    def __init__(self,**kwargs):
        super().__init__()
    
    def call(self, input_embeddings, validation_embeddings):
        return tf.math.abs(input_embeddings - validation_embeddings)

In [None]:
L1 = L1Dist()
L1

## Build Siamese Model

In [None]:
def siames_model():
    anchor_image = Input(shape= (105,105,3), name='input_image')
    validation_image = Input(shape= (105,105,3), name='validation_image')

    distance_layer = L1Dist()
    distance_layer.__name = 'distance_layer'
    distances = distance_layer(embedding(anchor_image),embedding(validation_image))

    classifier = Dense(1, activation = 'sigmoid')(distances)
    
    return tf.keras.Model(inputs = [anchor_image,validation_image], outputs = [classifier], name = 'siamese_network')

In [None]:
siamese_model = siames_model()

# Model Training

## 1. loss function

In [None]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

## 2. Setup optimizer

In [None]:
opt = tf.keras.optimizers.Adam(0.0001)

## Establish Checkpoints

In [None]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir,'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

## 3. Train Step function

In [None]:
@tf.function
def train_step(batch):
    with tf.GradientTape() as tape:
       X = batch[:2]
       y = batch[2]

       y_pred = siamese_model(X,training=True)
       loss = binary_cross_loss(y,y_pred)

       grad = tape.gradient(loss,siamese_model.trainable_variables)

       opt.apply_gradients(zip(grad,siamese_model.trainable_variables))
       
       return loss

## 4. Trainning Loop

In [None]:
def train(data,epochs):
   for epoch in range(epochs,epochs+1):
      print(f'\n EPOCH: {epoch}/{epochs}')
      progbar = tf.keras.utils.Progbar(len(data))

      for idx,batch in enumerate(data):
         train_step(batch)
         progbar.update(idx+1)

      #save checkpoints
      if epoch % 10 == 0:
         checkpoint.save(file_prefix=checkpoint_prefix)

## 5. Train the model

In [None]:
EPOCHS=50
train(train_data,EPOCHS)

# Evaluate Model 

In [None]:
from tensorflow.keras.metrics import Recall, Precision

In [None]:
t=test_data.as_numpy_iterator()
t.next()[0].shape

In [None]:
test_input, test_validation, y_true =test_data.as_numpy_iterator().next()

In [None]:
test_input.shape

In [None]:
y_pred = siamese_model.predict([test_input,test_validation])

In [None]:
y_pred

In [None]:
y_true

In [None]:
p = Precision()
p.update_state(y_true,y_pred)
p.result().numpy()

In [None]:
m = Recall()
m.update_state(y_true,y_pred)
m.result().numpy()

# Save Model

In [None]:
siamese_model.save('siames_model.v0')

In [None]:
#reload model
model=tf.keras.models.load_model('siames_model.v0',custom_objects={'L1Dist':L1Dist,'binary_cross_loss':tf.losses.BinaryCrossentropy})

# Real Time Detection

In [None]:
def verify(model, detection_threshold, verification_threshold):
    results=[]
    for image in os.listdir(os.path.join('application_data','verification_images')):
        input_img = preprocess(os.path.join('application_data','input_image','input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data','input_image',image))
        
        #make predictions
        pred = model.predict(list(np.expand_dims([input_img,validation_img],axis=1)))
        results.append(pred)

    detection = np.sum(np.array(results) > detection_threshold)
    verification = detection / len(os.listdir(os.path.join('application_data','verification_images')))
    verified = verification > verification_threshold

    return results, verified


In [None]:
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print('error while openning camera')

while True:
    ret, frame = cap.read()
    frame=frame[20:300,200:550,:]
    # Display the captured image 
    cv2.imshow('captured_image',frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break 
    if cv2.waitKey(2) & 0xFF == ord('c'):
        path = os.path.join('application_data','input_image','input_image.jpg')
        cv2.imwrite(path, frame)
        results, verified = verify(model,0.5,0.5)  
        print(verified)      
        
    
cv2.destroyAllWindows()
# Release the camera
cap.release()

    