In [1]:
!pip install tensorflow tensorflow-gpu matplotlib opencv-python



In [1]:
import numpy as np
import cv2
import os 
import matplotlib.pyplot as plt
import random
import uuid
import random

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Layer, Conv2D, Dense, Flatten, MaxPooling2D
from tensorflow.keras.metrics import Precision, Recall
import tensorflow as tf

c:\users\nogi2\appdata\local\programs\python\python39\lib\site-packages\numpy\.libs\libopenblas.EL2C6PLE4ZYW3ECEVIV3OXXGRN2NRFM2.gfortran-win_amd64.dll
c:\users\nogi2\appdata\local\programs\python\python39\lib\site-packages\numpy\.libs\libopenblas.XWYDX2IKJW2NMTWSFYNGFUWKQU3LYTCZ.gfortran-win_amd64.dll


In [2]:
gpus=tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu,True)

In [3]:
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [5]:
pth=['positive', 'negative', 'anchor']
for p in pth:
    os.makedirs(os.path.join('data',p))

# Data Collection:

In [6]:
## http://vis-www.cs.umass.edu/lfw/
!tar -xf lfw.tgz

In [7]:
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        old_path = os.path.join('lfw', directory, file)
        new_path = os.path.join(NEG_PATH, file)
        os.replace(old_path, new_path)

In [9]:
cap=cv2.VideoCapture(0)
while cap.isOpened():
    ret,frame=cap.read()
    frame=frame[120:120+250,200:200+250, :]
    
    if cv2.waitKey(1) & 0xFF == ord('a'):
        pthanch=os.path.join(ANC_PATH, f'{uuid.uuid1()}.jpg')
        cv2.imwrite(pthanch, frame)
        
    if cv2.waitKey(1) & 0xFF == ord('p'):
        pthpos= os.path.join(POS_PATH, f'{uuid.uuid1()}.jpg')
        cv2.imwrite(pthpos,frame)
    
    cv2.imshow('Webcam', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

# Preprocessing and Dataloader:

In [4]:
def preprocess(pth):
    byte_img=tf.io.read_file(pth)
    jpg_img=tf.io.decode_jpeg(byte_img)
    img=tf.image.resize(jpg_img,(100,100))
    img=img/255.0
    return img

def preprocess_tuple(anch,val,label):
    return (preprocess(anch),preprocess(val),label)

# Data Augmentation:

In [11]:
def data_augment(img):
    data = []
    for i in range(9):
        img = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1,2))
        img = tf.image.stateless_random_contrast(img, lower=0.6, upper=1, seed=(1,3))
        img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_saturation(img, lower=0.9, upper=1, seed=(np.random.randint(100), np.random.randint(100)))
        data.append(img)
    return data

In [12]:
aug_pth = [ANC_PATH, POS_PATH]
for path in aug_pth:
    for file_name in os.listdir(os.path.join(path)):
        img_path=os.path.join(path, file_name)
        img = cv2.imread(img_path)
        augmented_images = data_augment(img)
        
        for aug_image in augmented_images:
            cv2.imwrite(os.path.join(path, f'{uuid.uuid1()}.jpg'), aug_image.numpy())

In [5]:
anchor = tf.data.Dataset.list_files(ANC_PATH + '\*.jpg').take(4000)
positive = tf.data.Dataset.list_files(POS_PATH + '\*.jpg').take(4000)
negative = tf.data.Dataset.list_files(NEG_PATH + '\*.jpg').take(4000)

In [6]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
dataset = positives.concatenate(negatives)

# Train & Test Split:

In [9]:
data = dataset.map(preprocess_tuple)
data = data.cache()
data = data.shuffle(buffer_size=10000)

train_data = data.take(round(len(data)*0.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

taken_data = data.skip(round(len(data)*0.7))
test_data = taken_data.take(round(len(data)*0.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# Siamese Network:
Embedding Layer

In [10]:
def embedding_block():
    inp = Input(shape=(100,100,3), name="input_layer")
    c1=Conv2D(64, (10,10), activation='relu')(inp)
    m1=MaxPooling2D(64, (2,2), padding='same')(c1)
    
    c2=Conv2D(128, (7,7), activation='relu')(m1)
    m2=MaxPooling2D(64, (2,2), padding='same')(c2)
    
    c3=Conv2D(128, (4,4), activation='relu')(m2)
    m3=MaxPooling2D(64, (2,2), padding='same')(c3)
    
    c4=Conv2D(256, (4,4), activation='relu')(m3)
    f1=Flatten()(c4)
    d1=Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=[d1], name='embedding_block')

embedding = embedding_block()

L1 Distance Layer

In [5]:
class l1_distance(Layer):
    def __init__(self, **kwargs):
        super().__init__()
    
    def call(self, anchor_embedding, validation_embedding):
        return tf.math.abs(anchor_embedding - validation_embedding)

In [12]:
def siamese():
    anch = Input(name="anchor", shape=(100,100,3))
    val = Input(name="validation", shape=(100,100,3))
    l1 = l1_distance()
    l1._name = 'distance'
    distance = l1(embedding(anch), embedding(val))
    classifier = Dense(1, activation='sigmoid')(distance)
    return Model(inputs=[anch, val], outputs=classifier, name='Siamese')
                
Siamese_net = siamese()

In [13]:
binary_cross_entropy_loss = tf.losses.BinaryCrossentropy()
opt=tf.keras.optimizers.Adam(lr=1e-4)

  super().__init__(name, **kwargs)


In [14]:
embedding.summary()

Model: "embedding_block"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_layer (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 91, 91, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 46, 46, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 40, 40, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 20, 20, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 17, 17, 128)   

In [15]:
Siamese_net.summary()

Model: "Siamese"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 anchor (InputLayer)            [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation (InputLayer)        [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding_block (Functional)   (None, 4096)         38960448    ['anchor[0][0]',                 
                                                                  'validation[0][0]']       

# Training:

In [16]:
#Checkpoints:
ckpt_dir='./Checkpoints'
checkpoint_prefix = os.path.join(ckpt_dir,'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=Siamese_net)

In [17]:
@tf.function
def train_batch(batch):
    with tf.GradientTape() as tape:
        X=batch[:2]
        Y=batch[2]
        yhat=Siamese_net(X, training=True)
        loss=binary_cross_entropy_loss(Y,yhat)
        #print(loss)
    gradient=tape.gradient(loss, Siamese_net.trainable_variables)
    opt.apply_gradients(zip(gradient, Siamese_net.trainable_variables))
    return loss

In [21]:
def train(train_data,EPOCHS):
    for epoch in range(1,EPOCHS+1):
        print(f"Epoch {epoch} of {EPOCHS}:")
        progbar=tf.keras.utils.Progbar(len(train_data))
        p = Precision()
        r = Recall()
        for id, batch in enumerate(train_data):
            loss=train_batch(batch)
            y_hat=Siamese_net.predict(batch[:2])
            r.update_state(y_true=batch[2],y_pred=y_hat)
            p.update_state(y_true=batch[2],y_pred=y_hat)
            progbar.update(id+1)
        print(loss.numpy())
        if (epoch % 10 == 0):
            checkpoint.save(file_prefix=checkpoint_prefix)

In [22]:
#training 
EPOCHS=50
train(train_data, EPOCHS)

Epoch 1 of 50:


0.009170415
Epoch 51 of 50:


0.008229048


# Evaluation and Testing :

In [24]:
p = Precision()
r = Recall()
# test_anch, test_val, label = test_data.as_numpy_iterator().next()
# y_hat=Siamese_net.predict([test_anch, test_val])
# [1 if pred>0.5 else o for pred in y_hat]

In [25]:
for test_anch, test_val, label in (test_data.as_numpy_iterator()):
    y_hat=Siamese_net.predict([test_anch, test_val])
    p.update_state(y_true=label, y_pred=y_hat)
    r.update_state(y_true=label, y_pred=y_hat)
print(r.result().numpy(), p.result().numpy())

0.99658996 0.99914527


In [26]:
Siamese_net.save('Siamese-Recognizer.h5')



In [27]:
del Siamese_net

In [6]:
Siamese_net = tf.keras.models.load_model('Siamese-Recognizer.h5',
                                         custom_objects={'l1_distance':l1_distance, 'BinaryCrossentropy': tf.losses.BinaryCrossentropy})



# Real Time Verification:

In [7]:
def verify(model, detection_threshold, verification_threshold):
    results = []
    for image in os.listdir(os.path.join('application_data', 'verification_images')):
        inp = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        val = preprocess(os.path.join('application_data', 'verification_images', image))
        
        result = model.predict(list(np.expand_dims([inp,val], axis=1)))
        results.append(result)
    
    detection = np.sum(np.array(results) > detection_threshold)
    verification = detection/len(os.path.join('application_data','verification_images'))
    flag = (verification > verification_threshold)
    return results, flag

In [8]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[120:120+250, 200:200+250, ]
    cv2.imshow('Webcam', frame)
    if cv2.waitKey(1) & 0xFF == ord('v'):
        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
        results, verification_flag = verify(Siamese_net, 0.9, 0.7)
        print('Verified Face') if verification_flag==1 else print('Unverified Face')
        
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

Bruh
Bruh2
Verified Face
Bruh3
Bruh
Bruh2
Unverified Face
Bruh3
Bruh
Bruh2
Unverified Face
Bruh3
Bruh


Bruh2
Unverified Face
Bruh3
Bruh
Bruh2
Verified Face
Bruh3
Bruh
Bruh2
Verified Face
Bruh3
Bruh


Bruh2
Unverified Face
Bruh3
Bruh
Bruh2
Unverified Face
Bruh3
