In [None]:
!pip install tensorflow opencv-python matplotlib

In [None]:
import os
import cv2
import uuid
import random
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import GradientTape
from tensorflow.train import Checkpoint
from tensorflow.keras.models import Model
from tensorflow.keras.utils import Progbar
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Precision, Recall
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten

In [None]:
# Enable memory growth on every visible GPU so TensorFlow allocates GPU memory
# as it is needed.
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpus :
  tf.config.experimental.set_memory_growth(device = gpu, enable = True)

In [None]:
# Explicitly switch on eager execution through the TF1 compatibility API.
# NOTE(review): presumably redundant on TensorFlow 2.x, where eager execution
# is the default — confirm before removing.
tf.compat.v1.enable_eager_execution()

In [None]:
# Directories that hold the three image roles used to build training pairs,
# all located under "data".
POSITIVE_PATH, NEGATIVE_PATH, ANCHOR_PATH = (
  os.path.join("data", role) for role in ("positive", "negative", "anchor")
)

In [None]:
# exist_ok keeps this cell re-runnable: without it a second run (or a restarted
# kernel with the folders already on disk) raises FileExistsError.
os.makedirs(POSITIVE_PATH, exist_ok = True)
os.makedirs(NEGATIVE_PATH, exist_ok = True)
os.makedirs(ANCHOR_PATH, exist_ok = True)

In [None]:
# Download the lfw.tgz archive into the current working directory.
!wget http://vis-www.cs.umass.edu/lfw/lfw.tgz

In [None]:
# Unpack the archive; the following cell reads the extracted "lfw" directory.
!tar -xf lfw.tgz

In [None]:
# Flatten the extracted archive: every file inside each sub-directory of "lfw"
# is moved (same file name) into NEGATIVE_PATH.
for directory in os.listdir("lfw") :
  source_dir = os.path.join("lfw", directory)
  for file in os.listdir(source_dir) :
    OLD_PATH = os.path.join(source_dir, file)
    NEW_PATH = os.path.join(NEGATIVE_PATH, file)
    os.replace(OLD_PATH, NEW_PATH)

In [None]:
# Interactive collection of anchor / positive images from the webcam.
#   "a" -> save the current crop to ANCHOR_PATH
#   "p" -> save the current crop to POSITIVE_PATH
#   "q" -> quit
capture = cv2.VideoCapture(0)

try :
  while capture.isOpened() :
    ret, raw_frame = capture.read()
    if not ret :
      # No frame was delivered (camera busy or disconnected); slicing the
      # returned None would raise, so stop the loop cleanly instead.
      break

    # Keep only the 250x250 region of interest.
    frame = raw_frame[100 : 350, 200 : 450, : ]
    cv2.imshow("Image Collection", frame)

    # Poll the keyboard exactly once per frame. Each cv2.waitKey call consumes
    # the pending key event, so polling separately for every key makes most
    # key presses land in the "wrong" check and get lost.
    key = cv2.waitKey(1) & 0XFF

    if key == ord("a") :
      img_name = os.path.join(ANCHOR_PATH, f"{uuid.uuid1()}.jpg")
      cv2.imwrite(img_name, frame)
    elif key == ord("p") :
      img_name = os.path.join(POSITIVE_PATH, f"{uuid.uuid1()}.jpg")
      cv2.imwrite(img_name, frame)
    elif key == ord("q") :
      break
finally :
  # Always free the camera and close the preview window, even if the loop
  # body raised.
  capture.release()
  cv2.destroyAllWindows()

In [None]:
# `frame` is already the 250x250 crop produced in the capture loop; applying the
# [100:350, 200:450] slice a second time would leave only a 150x50 corner.
# OpenCV delivers BGR, so the channel axis is reversed for matplotlib (RGB).
plt.imshow(frame[:, :, ::-1])

In [None]:
# File-path datasets for each image role, capped at 300 entries apiece.
anchor_dir = tf.data.Dataset.list_files(f"{ANCHOR_PATH}/*.jpg").take(300)
positive_dir = tf.data.Dataset.list_files(f"{POSITIVE_PATH}/*.jpg").take(300)
negative_dir = tf.data.Dataset.list_files(f"{NEGATIVE_PATH}/*.jpg").take(300)

In [None]:
def preprocess(file_path) :
  """Load a JPEG file and turn it into a model-ready image tensor.

  Args:
    file_path: path of the JPEG file to read.

  Returns:
    A float tensor of shape (100, 100, 3) with values scaled to [0, 1].
  """
  byte_img = tf.io.read_file(file_path)
  # Force three channels: without it a grayscale JPEG decodes to a single
  # channel and no longer matches the (100, 100, 3) input of the model.
  img = tf.io.decode_jpeg(byte_img, channels = 3)
  img = tf.image.resize(img, (100, 100))
  img = img / 255.0

  return img

In [None]:
# Build (anchor, other, label) triples: label 1.0 for anchor/positive pairs and
# 0.0 for anchor/negative pairs, then chain both sets into one dataset.
pair_count = len(anchor_dir)
positive_labels = tf.data.Dataset.from_tensor_slices(tf.ones(pair_count))
negative_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(pair_count))

positives = tf.data.Dataset.zip((anchor_dir, positive_dir, positive_labels))
negatives = tf.data.Dataset.zip((anchor_dir, negative_dir, negative_labels))
data = positives.concatenate(negatives)

In [None]:
def preprocess_util(input_img, validation_img, label) :
  """Apply `preprocess` to both image paths of a pair and pass the label through.

  Args:
    input_img: path of the first image of the pair.
    validation_img: path of the second image of the pair.
    label: label of the pair, returned unchanged.

  Returns:
    A tuple (preprocessed input image, preprocessed validation image, label).
  """
  input_tensor = preprocess(input_img)
  validation_tensor = preprocess(validation_img)
  return (input_tensor, validation_tensor, label)

In [None]:
# Decode the image pairs, cache the decoded tensors, then shuffle once.
data = data.map(preprocess_util)
data = data.cache()
# The train/test split below is carved out of this dataset with take()/skip().
# With the default reshuffle_each_iteration=True the order changes on every
# pass, so samples would migrate between the two splits (test data leaking into
# training). A fixed seed with reshuffling disabled keeps the split stable.
data = data.shuffle(buffer_size = 1024, seed = 42, reshuffle_each_iteration = False)

In [None]:
# Training split: the first 70% of the samples, in batches of 16, with up to
# 8 batches prefetched.
train_data = (
  data
  .take(round(len(data) * 0.7))
  .batch(16)
  .prefetch(8)
)

In [None]:
 # Test split: skip the first 70% of the samples, keep the following 30%, in batches of 16 with up to 8 batches prefetched.
 test_data = data.skip(round(len(data) * 0.7)).take(round(len(data) * 0.3)).batch(16).prefetch(8)

In [None]:
def create_embeddings_model() :
  """Build the convolutional network that turns an image into an embedding.

  The network stacks four ReLU convolution layers (the first three each
  followed by 2x2 max pooling), flattens the result and projects it through a
  sigmoid dense layer.

  Returns:
    A Keras Model named "embedding_model" mapping a (100, 100, 3) image to a
    4096-dimensional vector.
  """
  input_layer = Input(shape = (100, 100, 3), name = "input_layer")
  convolution_layer_1 = Conv2D(64, (10, 10), activation = "relu", name = "convolution_layer_1")(input_layer)
  # The first positional argument of MaxPooling2D is pool_size, so the former
  # MaxPooling2D(64, (2, 2)) pooled over a 64x64 window with stride 2 rather
  # than the intended 2x2 window. Keyword arguments make the intent explicit;
  # the output shapes are unchanged (stride 2, "same" padding).
  max_pooling_layer_1 = MaxPooling2D(pool_size = (2, 2), strides = (2, 2), padding = "same", name = "max_pooling_layer_1")(convolution_layer_1)
  convolution_layer_2 = Conv2D(128, (8, 8), activation = "relu", name = "convolution_layer_2")(max_pooling_layer_1)
  max_pooling_layer_2 = MaxPooling2D(pool_size = (2, 2), strides = (2, 2), padding = "same", name = "max_pooling_layer_2")(convolution_layer_2)
  convolution_layer_3 = Conv2D(128, (6, 6), activation = "relu", name = "convolution_layer_3")(max_pooling_layer_2)
  max_pooling_layer_3 = MaxPooling2D(pool_size = (2, 2), strides = (2, 2), padding = "same", name = "max_pooling_layer_3")(convolution_layer_3)
  convolution_layer_4 = Conv2D(256, (4, 4), activation = "relu", name = "convolution_layer_4")(max_pooling_layer_3)
  flatten_layer_1 = Flatten()(convolution_layer_4)
  dense_layer_1 = Dense(4096, activation = "sigmoid")(flatten_layer_1)

  return Model(inputs = [input_layer], outputs = [dense_layer_1], name = "embedding_model")

In [None]:
# Single embedding network instance; create_siamese_model applies it to both inputs.
embedding_model = create_embeddings_model()

In [None]:
# Print the layer-by-layer summary of the embedding network.
embedding_model.summary()

In [None]:
class L1Dist(Layer) :
  """Layer computing the element-wise absolute difference of two embeddings."""

  def __init__(self, **kwargs) :
    # Forward the keyword arguments (name, dtype, trainable, ...) to the base
    # Layer. Dropping them silently ignored any name given at construction and
    # the configuration passed back in when a saved model is reloaded.
    super().__init__(**kwargs)

  def call(self, input_embedding, validation_embedding) :
    """Return |input_embedding - validation_embedding|, element by element."""
    return tf.math.abs(input_embedding - validation_embedding)

In [None]:
def create_siamese_model() :
  """Assemble the two-input siamese verification network.

  Both images are passed through the module-level `embedding_model`, the
  embeddings are compared with an `L1Dist` layer and a single sigmoid unit
  turns the distance vector into a score.

  Returns:
    A Keras Model named "siamese_model" taking [input_img, validation_img],
    each of shape (100, 100, 3), and producing one sigmoid output.
  """
  input_image = Input(shape = (100, 100, 3), name = "input_img")
  validation_image = Input(shape = (100, 100, 3), name = "validation_img")

  # The same embedding_model object handles both branches.
  input_embedding = embedding_model(input_image)
  validation_embedding = embedding_model(validation_image)

  distance_layer = L1Dist()
  distance_layer._name = "l1_distance_layer"
  distances = distance_layer(input_embedding, validation_embedding)

  score = Dense(1, activation = "sigmoid")(distances)

  return Model(inputs = [input_image, validation_image], outputs = [score], name = "siamese_model")

In [None]:
# Build the siamese network that is trained, evaluated and saved below.
siamese_model = create_siamese_model()

In [None]:
# Print the layer-by-layer summary of the siamese network.
siamese_model.summary()

In [None]:
# Loss and optimizer used by train_step: binary cross-entropy on the sigmoid
# output, optimised with Adam at a learning rate of 0.003.
loss = BinaryCrossentropy()
optimizer = Adam(learning_rate = 0.003)

In [None]:
# Checkpoint object tracking both the optimizer and the siamese model; train()
# saves it using checkpoint_prefix ("training_checkpoints/ckpt").
checkpoint_dir = "training_checkpoints"
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = Checkpoint(opt = optimizer, model = siamese_model)

In [None]:
@tf.function
def train_step(batch) :
  """Run one optimisation step of `siamese_model` on a single batch.

  Args:
    batch: sequence whose first two items are the model inputs and whose third
      item holds the labels.

  Returns:
    The loss value computed for this batch.
  """
  image_pair, labels = batch[:2], batch[2]

  # Record the forward pass so the loss can be differentiated.
  with GradientTape() as tape :
    predictions = siamese_model(image_pair, training = True)
    batch_loss = loss(labels, predictions)

  weights = siamese_model.trainable_variables
  gradients = tape.gradient(batch_loss, weights)
  optimizer.apply_gradients(zip(gradients, weights))

  return batch_loss

In [None]:
def train(data, epochs):
    """Train `siamese_model` on `data` for `epochs` epochs.

    Every batch is passed to `train_step`; the updated model then scores the
    same batch to accumulate recall and precision. After each epoch the loss of
    the last batch and the epoch recall/precision are printed, and every 10th
    epoch a checkpoint is saved under `checkpoint_prefix`.

    Args:
        data: batched dataset whose elements are
            (input images, validation images, labels).
        epochs: number of passes over `data`.
    """
    for epoch in range(1, epochs + 1):
        print('\nEpoch {}/{}'.format(epoch, epochs))
        progress_bar = Progbar(len(data))

        r = Recall()
        p = Precision()
        # Remains None if `data` yields no batch, which previously surfaced as
        # a NameError in the summary print below.
        step_loss = None

        for idx, batch in enumerate(data):
            step_loss = train_step(batch)
            # Call the model directly instead of `predict`: predict() sets up
            # its own batching loop on every call, which is slow when invoked
            # once per training batch.
            y_hat = siamese_model(batch[:2], training=False)
            r.update_state(batch[2], y_hat)
            p.update_state(batch[2], y_hat)
            progress_bar.update(idx+1)

        if step_loss is None:
            print("No batches in data; nothing to train on.")
            return

        print(f"Loss : {step_loss.numpy()} Recall : {r.result().numpy()} Precision : {p.result().numpy()}")

        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

In [None]:
# Train on the training split for 50 epochs.
train(train_data, 50)

In [None]:
# Pull the first batch of the test split as NumPy arrays.
test_batch_iterator = test_data.as_numpy_iterator()
test_input, test_validation, y = next(test_batch_iterator)

In [None]:
# Raw sigmoid scores of the siamese model for the test batch pairs.
y_hat = siamese_model.predict([test_input, test_validation])

In [None]:
# Threshold the scores at 0.5 to obtain hard 0/1 predictions.
y_hat = [1 if prediction > 0.5 else 0 for prediction in y_hat]

In [None]:
# Display the thresholded predictions.
y_hat

In [None]:
# Precision and recall of the thresholded predictions on the test batch.
precision = Precision()
recall = Recall()
for metric in (precision, recall) :
  metric.update_state(y, y_hat)
precision.result().numpy(), recall.result().numpy()

In [None]:
# Show the first input / validation pair of the test batch side by side.
plt.figure(figsize = (18, 8))
for position, image in enumerate((test_input[0], test_validation[0]), start = 1) :
  plt.subplot(1, 2, position)
  plt.imshow(image)
plt.show()

In [None]:
# Persist the trained siamese model under the path "model".
siamese_model.save("model")

In [None]:
# Reload the saved model; the custom L1Dist layer (and the loss class) are
# supplied through custom_objects so they can be resolved during loading.
model = tf.keras.models.load_model("model", custom_objects = {"L1Dist" : L1Dist, "BinaryCrossentropy" : BinaryCrossentropy})

In [None]:
# Sanity check: run the reloaded model on the same test batch.
model.predict([test_input, test_validation])

In [None]:
# Print the layer-by-layer summary of the reloaded model.
model.summary()

In [None]:
# Archive the saved "model" path recursively into model.zip.
!zip -r model.zip "model"

In [None]:
# os.mkdir raises FileExistsError when the folder is already there, which made
# this cell fail on every re-run. makedirs(..., exist_ok=True) creates the
# parent "application_data" folder as needed and is safe to repeat.
os.makedirs(os.path.join("application_data", "validation_images"), exist_ok = True)
os.makedirs(os.path.join("application_data", "input_image"), exist_ok = True)

In [None]:
# Folders used at verification time, both under "application_data": one for the
# captured input image, one for the stored validation images.
INPUT_PATH, VALIDATION_PATH = (
  os.path.join("application_data", leaf) for leaf in ("input_image", "validation_images")
)

In [None]:
# Interactive collection of validation images from the webcam.
#   "v" -> save the current crop to VALIDATION_PATH
#   "q" -> quit
# NOTE(review): device index 3 differs from the index 0 used for the training
# images; camera indices are machine specific — confirm it is the intended one.
capture = cv2.VideoCapture(3)

try :
  while capture.isOpened() :
    ret, raw_frame = capture.read()
    if not ret :
      # No frame was delivered; slicing the returned None would raise.
      break

    # Keep only the 250x250 region of interest.
    frame = raw_frame[100 : 350, 200 : 450, : ]
    cv2.imshow("Verification Images", frame)

    # Poll the keyboard once per frame: every cv2.waitKey call consumes the
    # pending key event, so separate polls per key lose key presses.
    key = cv2.waitKey(1) & 0XFF

    if key == ord("v") :
      img_name = os.path.join(VALIDATION_PATH, f"{uuid.uuid1()}.jpg")
      cv2.imwrite(img_name, frame)
    elif key == ord("q") :
      break
finally :
  # Always free the camera and close the preview window.
  capture.release()
  cv2.destroyAllWindows()

In [None]:
def verify(model, detection_threshold, verification_threshold) :
  """Check the captured input image against the stored validation images.

  The image INPUT_PATH/img.jpg is paired with every JPEG in VALIDATION_PATH and
  scored by `model`. A pair counts as a detection when its score exceeds
  `detection_threshold`; the input is verified when the fraction of detections
  exceeds `verification_threshold`.

  Args:
    model: model whose `predict` accepts [input batch, validation batch].
    detection_threshold: score above which a single pair counts as a match.
    verification_threshold: fraction of matching pairs required to verify.

  Returns:
    A truthy value when the input image is verified, a falsy one otherwise
    (False when there are no validation images at all).
  """
  # Only JPEG files can be decoded by `preprocess`; stray entries (hidden
  # files, checkpoint folders, ...) would otherwise crash the decode step.
  validation_files = [
    name for name in os.listdir(VALIDATION_PATH)
    if name.lower().endswith((".jpg", ".jpeg"))
  ]
  if not validation_files :
    # Nothing to compare against; also avoids dividing by zero below.
    return False

  # The input image is the same for every pair, so load it once.
  input_img = preprocess(os.path.join(INPUT_PATH, "img.jpg"))

  results = []
  for image in validation_files :
    validation_img = preprocess(os.path.join(VALIDATION_PATH, image))
    # expand_dims(axis = 1) gives each image its own batch dimension; list()
    # splits the stacked pair back into the two model inputs.
    result = model.predict(list(np.expand_dims([input_img, validation_img], axis = 1)))
    results.append(result)

  detection = np.sum(np.array(results) > detection_threshold)
  verification = detection / len(validation_files)
  verified = verification > verification_threshold

  return verified

In [None]:
# Real-time verification from the webcam.
#   "i" -> save the current crop as the input image and run verify()
#   "q" -> quit
# NOTE(review): camera index 3 is machine specific — confirm it is the intended
# device.
capture = cv2.VideoCapture(3)

try :
  while capture.isOpened() :
    ret, raw_frame = capture.read()
    if not ret :
      # No frame was delivered; slicing the returned None would raise.
      break

    # Keep only the 250x250 region of interest.
    frame = raw_frame[100 : 350, 200 : 450, : ]
    cv2.imshow("Input Image", frame)

    # Poll the keyboard once per frame: every cv2.waitKey call consumes the
    # pending key event, so separate polls per key lose key presses.
    key = cv2.waitKey(1) & 0XFF

    if key == ord("i") :
      img_name = os.path.join(INPUT_PATH, "img.jpg")
      cv2.imwrite(img_name, frame)
      verified = verify(model, 0.7, 0.9)
      if verified :
        print("You are verified")
      else :
        print("Sorry, couldn't verify you")
    elif key == ord("q") :
      break
finally :
  # Always free the camera and close the preview window.
  capture.release()
  cv2.destroyAllWindows()