In [None]:
import os
import uuid
import random
import numpy as np
import cv2
from matplotlib import pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
from tensorflow.keras.metrics import Precision, Recall

In [None]:
# DIRECTORY PATHS
# Training images live under data/; images used at verification time live
# under application_data/.
POS_PATH, NEG_PATH, ANC_PATH = (
    os.path.join('data', folder) for folder in ('positive', 'negative', 'anchor')
)
INPUT_IMAGE_PATH, VERIFICATION_IMAGES_PATH = (
    os.path.join('application_data', folder)
    for folder in ('input_image', 'verification_images')
)


In [None]:
# FUNCTION TO CREATE DIRECTORIES IF THEY DON'T EXIST
def create_dirs():
    """Make sure every data and application folder used by the notebook exists."""
    required_dirs = (
        POS_PATH,
        NEG_PATH,
        ANC_PATH,
        INPUT_IMAGE_PATH,
        VERIFICATION_IMAGES_PATH,
    )
    for directory in required_dirs:
        # exist_ok=True keeps re-running this cell harmless.
        os.makedirs(directory, exist_ok=True)

create_dirs()

In [None]:
# COLLECT POSITIVE AND ANCHOR IMAGES
def collect_images(cap, ANC_PATH, POS_PATH):
    """Interactively capture anchor and positive image crops from a camera.

    Shows a live preview and reacts to key presses in the preview window:
    'a' saves the current crop into ANC_PATH, 'p' saves it into POS_PATH and
    'q' quits. Files are named with a fresh uuid1 and a .jpg extension.

    Args:
        cap: An opened cv2.VideoCapture. It is always released before this
            function returns, even if an error occurs.
        ANC_PATH: Directory that receives anchor images.
        POS_PATH: Directory that receives positive images.

    Returns:
        None.
    """
    try:
        if not cap.isOpened():
            print("Error: Camera not accessible.")
            return

        print(f"Anchor Path: {ANC_PATH}")
        print(f"Positive Path: {POS_PATH}")

        # Key code -> (label used in the log message, destination directory).
        targets = {
            ord('a'): ('anchor', ANC_PATH),
            ord('p'): ('positive', POS_PATH),
        }

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                print("Error: Failed to capture image.")
                break

            # Keep a 250x250 window starting at row 120, column 200.
            frame = frame[120:120+250, 200:200+250, :]
            cv2.imshow('Image Collection', frame)

            # Mask to the low byte so the comparison with ord() is reliable.
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                print("Quitting.")
                break
            if key in targets:
                label, directory = targets[key]
                imgname = os.path.join(directory, '{}.jpg'.format(uuid.uuid1()))
                print(f"Saving {label} image to {imgname}")
                # imwrite reports failure through its return value instead of
                # raising, e.g. when the target directory does not exist.
                if not cv2.imwrite(imgname, frame):
                    print(f"Error: Failed to write {imgname}")
    finally:
        # Always free the camera and close the preview window, including on
        # the early return above and on unexpected exceptions.
        cap.release()
        cv2.destroyAllWindows()

cap = cv2.VideoCapture(0)
collect_images(cap, ANC_PATH, POS_PATH)

In [None]:
# LOAD AND PRE-PROCESS DATA
def preprocess(file_path):
    """Load a JPEG file and turn it into a model-ready image tensor.

    Args:
        file_path: Path (string or string tensor) of a JPEG image.

    Returns:
        A float tensor of shape (105, 105, 3) with values scaled to [0, 1].
    """
    byte_img = tf.io.read_file(file_path)
    # Force three channels: without this a grayscale JPEG decodes to a single
    # channel and no longer matches the (105, 105, 3) model input.
    img = tf.io.decode_jpeg(byte_img, channels=3)
    img = tf.image.resize(img, (105, 105))
    # Scale 8-bit pixel values into [0, 1].
    return img / 255.0

In [None]:
# PREPARE DATASET
# Up to 400 file paths per class.
anchor = tf.data.Dataset.list_files(ANC_PATH + '/*.jpg').take(400)
positive = tf.data.Dataset.list_files(POS_PATH + '/*.jpg').take(400)
negative = tf.data.Dataset.list_files(NEG_PATH + '/*.jpg').take(400)

# Each element is (anchor path, other path, label): label 1.0 for an
# anchor/positive pair and 0.0 for an anchor/negative pair.
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

def preprocess_twin(input_img, validator_img, label):
    """Load both images of a pair and pass the label through unchanged.

    Args:
        input_img: File path of the first image of the pair.
        validator_img: File path of the second image of the pair.
        label: Pair label, forwarded as-is.

    Returns:
        Tuple of (preprocessed first image, preprocessed second image, label).
    """
    return (preprocess(input_img), preprocess(validator_img), label)

data = data.map(preprocess_twin)
data = data.cache()
# Shuffle once and keep that order on every pass. With the default
# reshuffle_each_iteration=True, a later take()/skip() split would draw a new
# permutation each epoch and move samples between the train and test sets.
# If per-epoch shuffling is wanted, shuffle the training split after the split.
data = data.shuffle(buffer_size=1024, reshuffle_each_iteration=False)

In [None]:
# SPLIT DATA INTO TRAINING AND TESTING SETS
# 70% of the pairs go to training; up to 30% of the remainder go to testing.
n_samples = len(data)
n_train = round(n_samples * 0.7)
n_test = round(n_samples * 0.3)

train_data = data.take(n_train).batch(16).prefetch(8)
test_data = data.skip(n_train).take(n_test).batch(16).prefetch(8)

In [None]:
# EMBEDDING NETWORK
def make_embedding():
    """Build the convolutional network that maps an image to a feature vector.

    Four Conv2D layers with ReLU, with a max-pooling layer after each of the
    first three, followed by Flatten and a 4096-unit sigmoid Dense layer.

    Returns:
        A Keras Model named 'embedding' taking (105, 105, 3) images and
        producing 4096-dimensional vectors.
    """
    def pool(tensor):
        # MaxPooling2D's first positional argument is pool_size, so the former
        # MaxPooling2D(64, (2, 2)) pooled over 64x64 windows with stride 2.
        # Use an explicit 2x2 window instead. With stride 2 and padding='same'
        # the output shapes are unchanged, so weight shapes stay compatible,
        # but the model must be retrained because pooled features differ.
        return MaxPooling2D(pool_size=(2, 2), strides=(2, 2), padding='same')(tensor)

    inp = Input(shape=(105, 105, 3), name='input_image')
    x = Conv2D(64, (10, 10), activation='relu')(inp)
    x = pool(x)
    x = Conv2D(128, (7, 7), activation='relu')(x)
    x = pool(x)
    x = Conv2D(128, (4, 4), activation='relu')(x)
    x = pool(x)
    x = Conv2D(256, (4, 4), activation='relu')(x)
    x = Flatten()(x)
    output = Dense(4096, activation='sigmoid')(x)
    return Model(inputs=inp, outputs=output, name='embedding')

embedding = make_embedding()
embedding.summary()

In [None]:
# L1 DISTANCE LAYER
class L1Dist(Layer):
    """Layer returning the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        # Forward kwargs (name, dtype, trainable, ...) to the base Layer.
        # Dropping them ignored name='distance' and discarded the config
        # values Keras passes back when the layer is rebuilt from a saved model.
        super().__init__(**kwargs)

    def call(self, inputs):
        """Compute |a - b| for a pair of embeddings.

        Args:
            inputs: A two-element sequence (input_embedding,
                validation_embedding).

        Returns:
            Tensor holding the element-wise absolute difference.
        """
        input_embedding, validation_embedding = inputs
        return tf.math.abs(input_embedding - validation_embedding)

In [None]:
# SIAMESE MODEL
def make_siamese_model():
    """Assemble the two-input verification model around the shared embedding.

    Both images go through the same module-level `embedding` network; the
    L1Dist of the two embeddings feeds a single sigmoid unit.

    Returns:
        A Keras Model named 'SiameseNetwork' with inputs
        [input_img, validation_img] and one sigmoid output.
    """
    image_shape = (105, 105, 3)
    input_image = Input(shape=image_shape, name='input_img')
    validation_image = Input(shape=image_shape, name='validation_img')

    # Calling the same embedding model twice shares its weights across branches.
    distance_layer = L1Dist(name='distance')
    input_embedding = embedding(input_image)
    validation_embedding = embedding(validation_image)
    distances = distance_layer([input_embedding, validation_embedding])

    similarity = Dense(1, activation='sigmoid')(distances)
    return Model(
        inputs=[input_image, validation_image],
        outputs=similarity,
        name='SiameseNetwork',
    )

siamese_model = make_siamese_model()
siamese_model.summary()

In [None]:
# COMPILE AND TRAIN THE MODEL
# Where training checkpoints are written.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')

# Loss and optimizer used by the custom training step.
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(learning_rate=1e-4)

# The checkpoint tracks both the optimizer state and the model weights.
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

@tf.function
def train_step(batch):
    """Run one optimization step on a single batch.

    Args:
        batch: Sequence whose first two items are the model inputs and whose
            third item holds the labels.

    Returns:
        The binary cross-entropy loss for this batch.
    """
    features, labels = batch[:2], batch[2]

    # Record the forward pass so gradients can be taken afterwards.
    with tf.GradientTape() as tape:
        predictions = siamese_model(features, training=True)
        loss = binary_cross_loss(labels, predictions)

    weights = siamese_model.trainable_variables
    gradients = tape.gradient(loss, weights)
    opt.apply_gradients(zip(gradients, weights))
    return loss

def train(data, EPOCHS):
    """Train for EPOCHS passes over `data`, checkpointing every tenth epoch.

    Args:
        data: Batched dataset; each batch is handed to train_step.
        EPOCHS: Number of passes over the dataset.
    """
    for epoch in range(1, EPOCHS + 1):
        print(f'\nEpoch {epoch}/{EPOCHS}')
        progbar = tf.keras.utils.Progbar(len(data))

        # Count steps from 1 so the value can go straight into the bar.
        for step, batch in enumerate(data, start=1):
            train_step(batch)
            progbar.update(step)

        is_checkpoint_epoch = epoch % 10 == 0
        if is_checkpoint_epoch:
            checkpoint.save(file_prefix=checkpoint_prefix)

EPOCHS = 50
train(train_data, EPOCHS)

In [None]:
# EVALUATE MODEL
# Keep one batch (inputs, labels and thresholded predictions) in
# test_input / test_val / y_true / y_hat for inspection in later cells.
test_input, test_val, y_true = test_data.as_numpy_iterator().next()
y_hat = siamese_model.predict([test_input, test_val])
y_hat = [1 if prediction > 0.5 else 0 for prediction in y_hat]

# Accumulate the metrics over every test batch; a single batch of at most 16
# pairs is too small to give representative precision/recall figures.
recall = Recall()
precision = Precision()
for batch_input, batch_val, batch_true in test_data.as_numpy_iterator():
    batch_prob = siamese_model.predict([batch_input, batch_val], verbose=0)
    batch_hat = [1 if prediction > 0.5 else 0 for prediction in batch_prob]
    recall.update_state(batch_true, batch_hat)
    precision.update_state(batch_true, batch_hat)

print(f'Recall: {recall.result().numpy()}')
print(f'Precision: {precision.result().numpy()}')

In [None]:
# VISUALIZE MODEL
# Show both images of the second pair in the batch side by side.
plt.figure(figsize=(10, 8))
plt.subplot(1, 2, 1)
plt.imshow(test_input[1])
plt.title('Input image')
plt.subplot(1, 2, 2)
# The right panel shows the paired validation image; it previously repeated
# test_input[1], so the same picture appeared twice.
plt.imshow(test_val[1])
plt.title('Validation image')
plt.show()

In [None]:
# SAVE AND LOAD MODEL
siamese_model.save('siamese_model.keras')

# Custom classes Keras needs to resolve by name while rebuilding the model.
custom_objects = {
    'L1Dist': L1Dist,
    'BinaryCrossentropy': tf.losses.BinaryCrossentropy,
}
model = tf.keras.models.load_model('siamese_model.keras', custom_objects=custom_objects)
model.summary()

In [None]:
# REAL TIME VERIFICATION FUNCTION
def verify(model, detection_threshold, verification_threshold):
    """Compare the captured input image against every stored verification image.

    Args:
        model: Model whose predict() takes [input_batch, validation_batch] and
            returns a similarity score per pair.
        detection_threshold: A prediction above this value counts as a match.
        verification_threshold: Fraction of matching verification images that
            must be exceeded for the input to be considered verified.

    Returns:
        Tuple (results, verified): results is the list of raw predictions, one
        per verification image; verified is True when the fraction of matches
        exceeds verification_threshold. Returns ([], False) when there are no
        JPEG verification images.
    """
    # List the gallery once, and only JPEG files: preprocess() decodes JPEG,
    # so stray entries such as hidden files would otherwise crash the loop.
    image_names = sorted(
        name for name in os.listdir(VERIFICATION_IMAGES_PATH)
        if name.lower().endswith(('.jpg', '.jpeg'))
    )
    if not image_names:
        # Nothing to compare against; avoid dividing by zero below.
        return [], False

    # The input image is the same for every comparison, so load it once.
    input_img = preprocess(os.path.join(INPUT_IMAGE_PATH, 'input_image.jpg'))
    input_batch = np.expand_dims(input_img, axis=0)

    results = []
    for name in image_names:
        validation_img = preprocess(os.path.join(VERIFICATION_IMAGES_PATH, name))
        result = model.predict([input_batch, np.expand_dims(validation_img, axis=0)])
        results.append(result)

    # Number of verification images scored above the detection threshold.
    detection = np.sum(np.array(results) > detection_threshold)
    verification = detection / len(image_names)
    verified = verification > verification_threshold
    return results, verified

In [None]:
# REAL TIME VERIFICATION USING OPEN-CV
# Press 'v' in the preview window to verify the current frame, 'q' to quit.
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # A failed read leaves frame unusable; stop instead of crashing
            # on the crop below.
            print("Error: Failed to capture image.")
            break

        # Keep a 250x250 window starting at row 120, column 200.
        frame = frame[120:120+250, 200:200+250, :]
        cv2.imshow('Verification', frame)

        # Poll the keyboard once per frame: with two waitKey calls, a key
        # press consumed by the first call was never seen by the second.
        key = cv2.waitKey(10) & 0xFF
        if key == ord('v'):
            cv2.imwrite(os.path.join(INPUT_IMAGE_PATH, 'input_image.jpg'), frame)
            results, verified = verify(model, 0.9, 0.9)
            print(verified)
        elif key == ord('q'):
            break
finally:
    # Free the camera and close the window even if verification raises.
    cap.release()
    cv2.destroyAllWindows()