<a href="https://colab.research.google.com/github/supernovaeee/siamese_nn/blob/main/SiameseFineTuned.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# pip install tensorflow opencv-python matplotlib

In [None]:
# pip install -U scikit-learn

In [None]:
pip install tensorflow-addons[tensorflow]

Collecting tensorflow-addons[tensorflow]
  Downloading tensorflow_addons-0.21.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (612 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m612.1/612.1 kB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
Collecting typeguard<3.0.0,>=2.7 (from tensorflow-addons[tensorflow])
  Downloading typeguard-2.13.3-py3-none-any.whl (17 kB)
Installing collected packages: typeguard, tensorflow-addons
Successfully installed tensorflow-addons-0.21.0 typeguard-2.13.3


## 1.2 Import Dependencies

In [None]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt
import glob
import string
import re
import tensorflow_addons as tfa


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 



In [None]:
# Import tensorflow dependencies - functional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf
from sklearn.metrics import accuracy_score
from tensorflow.keras import layers, models, losses, optimizers
from tensorflow.keras.utils import Progbar
from sklearn.model_selection import GridSearchCV
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from tensorflow.keras.layers import BatchNormalization, Dropout, Dense
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import Lambda


In [None]:
# tf.config.list_physical_devices("GPU")

## 1.3 Create Folder Structures

In [None]:
# set up paths
# for Data datasets (LFW + AFD): drive/MyDrive/data/anchor
# for FaceID datasets (LFW): drive/MyDrive/FaceID/data/positive
# The three image folders live side by side under drive/MyDrive/CombinedDataset.
NEG_PATH, POS_PATH, ANC_PATH = (
    os.path.join('drive', 'MyDrive', 'CombinedDataset', subset)
    for subset in ('Negative', 'Positive', 'Anchor')
)

# 2. Collect Positives and Anchors

## 2.1 Untar Labelled Faces in the Wild Dataset

In [None]:
# https://vis-www.cs.umass.edu/lfw/

In [None]:
# # uncompress tar LFW dataset
# !tar -xf lfw.tgz

In [None]:


# # Define the letters to check
# letters_to_check = string.ascii_uppercase[2::2]  # 'C', 'E', 'G', ...

# # Move LFW images to the following repo (data/negative)
# for file in os.listdir('FaceID/data/negative'):
#     for letter in letters_to_check:
#         if file.startswith(letter):
#             EX_PATH = os.path.join('FaceID/data/negative', file)
#             NEW_PATH = os.path.join(ANC_PATH, file)
#             os.renames(EX_PATH, NEW_PATH)
#             break  # Break the loop if a match is found


In [None]:
# os.listdir(POS_PATH)

In [None]:
# len(os.listdir(POS_PATH))

In [None]:


# # Move images from anchor to positive if their filenames end with even numbers
# for file in os.listdir(POS_PATH):
#     if re.match('.*\_[01][012][0123456789][02468]\.jpg$', file):
#         EX_PATH = os.path.join(POS_PATH, file)
#         NEW_PATH = os.path.join(ANC_PATH, file)
#         os.renames(EX_PATH, NEW_PATH)


# 3. Load and Preprocess Images

## 3.1 Get Image Directories

In [None]:
# First, ensure that the image files are correctly stored into anchor, positive, and negative folders
# The anchor and positive images should refer to the same person, while negative is for remaining datasets
# Split a person's images into anchor and positive

In [None]:
# document_files = os.listdir(POS_PATH)
# document_files

In [None]:
# Mount Google Drive at /content/drive. The dataset paths above are relative
# ('drive/MyDrive/...'), so this presumably relies on the working directory
# being /content (the Colab default) — confirm if run elsewhere.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Get all filenames in the anchor directory
document_files = os.listdir(ANC_PATH)

# Derive one identity prefix per person. Filenames look like
# 'FirstName_LastName_0002.jpg' or 'FirstName_0002.jpg': everything before the
# LAST underscore is the person's name, whatever number of name parts it has
# (so 'George_W_Bush_0001.jpg' is kept instead of being skipped).
names = set()
for filename in document_files:
    stem, separator, _image_index = filename.rpartition('_')
    if not separator or not stem:
        continue  # No '<name>_<index>' structure: skip files with unexpected format
    # Keep the trailing underscore; later cells glob on '<prefix>*.jpg'.
    names.add(stem + '_')

# De-duplicated and sorted so the list is identical from run to run.
names = sorted(names)


In [None]:
# Number of distinct identity prefixes found in the anchor folder.
len(names)

2935

In [None]:
# Visit every identity exactly once, in random order. Sampling len(names)
# (rather than a hard-coded count) keeps this valid when the dataset changes.
selected_names = random.sample(names, len(names))

# File-path lists for the (anchor, positive) pairs; index k of both lists
# refers to the same person.
anchor_list = []
positive_list = []

for name in selected_names:
    # `name` ends with '_'. Requiring a digit right after it keeps a short
    # prefix such as 'John_' from also matching 'John_Smith_0001.jpg', which
    # would pair two different people as a "positive" example.
    pattern = glob.escape(name) + '[0-9]*.jpg'
    anchor_files = sorted(glob.glob(os.path.join(ANC_PATH, pattern)))
    positive_files = sorted(glob.glob(os.path.join(POS_PATH, pattern)))

    # A pair needs at least one image on each side; skipping here also avoids
    # a modulo-by-zero below.
    if not anchor_files or not positive_files:
        continue

    # Cycle through the shorter list so every image of the longer list is used
    # once: the modulo wraps the index back to zero when it reaches the end.
    pair_count = max(len(anchor_files), len(positive_files))
    for k in range(pair_count):
        anchor_list.append(anchor_files[k % len(anchor_files)])
        positive_list.append(positive_files[k % len(positive_files)])

if not anchor_list:
    raise ValueError('No anchor/positive pairs found in ' + ANC_PATH + ' and ' + POS_PATH)

# Turn the anchor and positive path lists into tf.data datasets of file paths
anchor = tf.data.Dataset.from_tensor_slices(anchor_list)
positive = tf.data.Dataset.from_tensor_slices(positive_list)

# Pick as many negatives as there are anchors. The selection is drawn once in
# Python so the dataset yields the same files in the same order on every pass
# (Dataset.list_files reshuffles the file names each time it is iterated).
negative_files = glob.glob(NEG_PATH + '/*.jpg')
if not negative_files:
    raise ValueError('No negative images found in ' + NEG_PATH)
random.shuffle(negative_files)
negative = tf.data.Dataset.from_tensor_slices(negative_files[:len(anchor_list)])



In [None]:
# Number of (anchor, positive) pairs built above.
len(anchor)

2937

In [None]:
# # Print the elements of the paired datasets -- FOR CHECKING
# num_elements_to_show = 30  # Specify the number of elements to show
# anchor_samples = anchor.take(num_elements_to_show)
# positive_samples = positive.take(num_elements_to_show)

# for a, p in tf.data.Dataset.zip((anchor_samples, positive_samples)):
#     print("Anchor:", a)
#     print("Positive:", p)

In [None]:
# # Print the elements of the paired datasets -- FOR CHECKING
# num_elements_to_show = 30  # Specify the number of elements to show
# anchor_samples = anchor.take(num_elements_to_show)
# negative_samples = negative.take(num_elements_to_show)

# for a, p in tf.data.Dataset.zip((anchor_samples, negative_samples)):
#     print("Anchor:", a)
#     print("Negative:", p)

In [None]:
# # Take a look at the anchor dataset -- FOR CHECKING
# dir_test = anchor.as_numpy_iterator()

In [None]:
# print(dir_test.next()) -- FOR CHECKING

## 3.2 Preprocessing - Scale and Resize

In [None]:
def preprocess(file_path):
    """Load a JPEG file and turn it into a 100x100x3 float image in [0, 1].

    Args:
        file_path: Path (Python string or scalar string tensor) of a JPEG file.

    Returns:
        A float32 tensor of shape (100, 100, 3) with values scaled to [0, 1].
    """
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Decode as RGB: channels=3 converts grayscale JPEGs to three channels and
    # gives the tensor a static channel dimension, which the network input
    # (100, 100, 3) requires.
    img = tf.io.decode_jpeg(byte_img, channels=3)

    # Resize to the network's input resolution
    img = tf.image.resize(img, (100,100))
    # Scale pixel values from [0, 255] to [0, 1]
    img = img / 255.0

    return img

In [None]:
def process(file_path):
    """Read a JPEG file and return the decoded image, without resizing or scaling."""
    return tf.io.decode_jpeg(tf.io.read_file(file_path))

In [None]:
# img = preprocess('drive/MyDrive/FaceID/data/anchor/Mick_Jagger_0002.jpg') -- FOR CHECKING

In [None]:
# x = process('drive/MyDrive/FaceID/data/anchor/Mick_Jagger_0002.jpg') -- FOR CHECKING
# # Display the image using Matplotlib
# plt.imshow(x)
# plt.axis('off')  # Remove axis ticks
# plt.show()

In [None]:
# # Convert the TensorFlow tensor to a NumPy array for visualization -- FOR CHECKING
# img_np = img.numpy()

# # Display the image using Matplotlib
# plt.imshow(img_np)
# plt.axis('off')  # Remove axis ticks
# plt.show()


In [None]:
# img.numpy().max() -- FOR CHECKING

## 3.3 Create Labelled Dataset

In [None]:
# (anchor, positive) => 1,1,1,1,1
# (anchor, negative) => 0,0,0,0,0

In [None]:
# len(negative)

In [None]:
# def preprocess(file_path):

#     # Read in image from file path
#     byte_img = tf.io.read_file(file_path)
#     # Load in the image
#     img = tf.io.decode_jpeg(byte_img)

#     # Preprocessing steps - resizing the image to be 100x100x3
#     img = tf.image.resize(img, (100,100))
#     # Scale image to be between 0 and 1
#     img = img / 255.0

#     # Return image
#     return img

In [None]:
# # def process(file_path):

# #     # Read in image from file path
# #     byte_img = tf.io.read_file(file_path)
# #     # Load in the image
# #     img = tf.io.decode_jpeg(byte_img)

# #     return img

# def preprocess_twin(input_img, validation_img, label):
#     return(preprocess(input_img), preprocess(validation_img), label)

In [None]:
# # Load and preprocess the anchor, positive, and negative images
# anchor_images = [preprocess(path) for path in anchor]
# positive_images = [preprocess(path) for path in positive]
# negative_images = [preprocess(path) for path in negative]

# # Convert the image lists to tensors
# anchor = tf.data.Dataset.from_tensor_slices(anchor_images)
# positive = tf.data.Dataset.from_tensor_slices(positive_images)
# negative = tf.data.Dataset.from_tensor_slices(negative_images)

# # Combine the datasets
# positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(positive)))))
# negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(negative)))))
# data = positives.concatenate(negatives)

# # Continue with the rest of the code for creating the Siamese network and training



In [None]:
# One label per pair: 1.0 for (anchor, positive), 0.0 for (anchor, negative).
same_person_labels = tf.data.Dataset.from_tensor_slices(tf.ones(len(positive)))
different_person_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(len(negative)))

positives = tf.data.Dataset.zip((anchor, positive, same_person_labels))
negatives = tf.data.Dataset.zip((anchor, negative, different_person_labels))

# All positive pairs first, followed by all negative pairs.
data = positives.concatenate(negatives)

In [None]:
# positives

In [None]:
# num_elements_to_show = 5  # Specify the number of elements to show
# positive_samples = positives.take(num_elements_to_show)

# for data_point in positive_samples:
#     print(data_point)


In [None]:
# samples = data.as_numpy_iterator()

In [None]:
# examples = samples.next()

## 3.4 Build Train and Test Partition

In [None]:
def preprocess_twin(input_img, validation_img, label):
    """Preprocess both image paths of a pair; the label passes through unchanged."""
    first_image = preprocess(input_img)
    second_image = preprocess(validation_img)
    return (first_image, second_image, label)

In [None]:
# res = preprocess_twin(*examples)

In [None]:
# plt.imshow(res[1])

In [None]:
# Build dataloader pipeline
# Shuffle the (cheap) file-path triples BEFORE decoding, with a buffer covering
# the whole dataset: `data` is all positives followed by all negatives, so a
# smaller buffer would leave the front of the stream almost entirely positive.
# reshuffle_each_iteration=False fixes the order once; otherwise each pass
# reshuffles and the take()/skip() split below would select different, mutually
# overlapping train/validation/test elements on each pass.
# If per-epoch shuffling of the training split is wanted, shuffle that split
# separately after it has been carved out.
data = data.shuffle(buffer_size=len(data), reshuffle_each_iteration=False)
data = data.map(preprocess_twin, num_parallel_calls=tf.data.AUTOTUNE)
data = data.cache()

In [None]:
# Total number of labelled pairs (positive + negative).
len(data)

5874

In [None]:
# REPLACED - NOT USED
# # Training partition
# train_data = data.take(round(len(data)*.7))
# # train_data = data.take(4)
# print(len(train_data))
# train_data = train_data.batch(16)
# print(len(train_data))
# train_data = train_data.prefetch(8)
# print(len(train_data))

In [None]:
# REPLACED - NOT USED
# # Testing partition
# test_data = data.skip(round(len(data)*.7))
# # test_data = data.skip(4)
# # test_data = test_data.take(round(len(data)*.3)) # use the dataset that has been skipped the amount of what was taken for train_data
# print(len(test_data))
# test_data = test_data.batch(16)
# print(len(test_data))
# test_data = test_data.prefetch(8)
# print(len(test_data))

In [None]:
# 70% training / 20% validation / remainder (about 10%) testing
n_samples = len(data)
train_size = round(n_samples * 0.7)
val_size = round(n_samples * 0.2)
test_size = n_samples - train_size - val_size

# Training takes the head of the stream; validation and test share the tail.
train_data = data.take(train_size)
after_train = data.skip(train_size)
val_data = after_train.take(val_size)
test_data = after_train.skip(val_size)

In [None]:
# Sizes of the train, validation and test partitions, in that order
for partition in (train_data, val_data, test_data):
    print(len(partition))

4112
1175
587


In [None]:
# train_data.element_spec

In [None]:
batch_size = 16
# Batch and prefetch every partition the same way.
# (Pass drop_remainder=True to batch() to drop a final, smaller batch.)
train_data, val_data, test_data = (
    partition.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    for partition in (train_data, val_data, test_data)
)

In [None]:
# Number of batches (not samples) in the test partition.
len(test_data)

37

In [None]:
                                                                                                                                      # # Convert tf.data.Dataset to NumPy arrays
# train_data_np = np.array(list(train_data.as_numpy_iterator()))
# val_data_np = np.array(list(val_data.as_numpy_iterator()))

In [None]:
# train_samples = train_data.as_numpy_iterator()

In [None]:
# examples = train_samples.next()

In [None]:
# plt.imshow(examples[0])

In [None]:
# test_data_samples = test_data.as_numpy_iterator()
# examples = train_samples.next()


In [None]:
# plt.imshow(examples[1])

# 4. Model Engineering

In [None]:
def l1_reg(weight_matrix):
  """L1 weight penalty usable as a Keras ``kernel_regularizer`` callable.

  Keras calls a regularizer function with the layer's weight tensor and expects
  the scalar penalty back, so the penalty is computed here instead of returning
  a regularizer object.

  Args:
    weight_matrix: Weight tensor of the layer being regularized.

  Returns:
    Scalar tensor ``0.01 * sum(|w|)``.
  """
  return 0.01 * tf.reduce_sum(tf.abs(weight_matrix))

In [None]:
def l2_reg(weight_matrix):
  """L2 weight penalty usable as a Keras ``kernel_regularizer`` callable.

  Keras calls a regularizer function with the layer's weight tensor and expects
  the scalar penalty back, so the penalty is computed here instead of returning
  a regularizer object. Keras collects the result in ``model.losses``; a custom
  training loop has to add those terms to its objective for them to take effect.

  Args:
    weight_matrix: Weight tensor of the layer being regularized.

  Returns:
    Scalar tensor ``0.01 * sum(w ** 2)``.
  """
  return 0.01 * tf.reduce_sum(tf.square(weight_matrix))

In [None]:
def make_embedding():
    """Build the CNN that maps a 100x100x3 image to a 1024-d embedding.

    Returns:
        A Keras ``Model`` named 'embedding' (two conv blocks followed by a
        sigmoid-activated Dense(1024) layer).
    """
    image_in = Input(shape=(100,100,3), name='input_image')

    # Block 1: conv -> max-pool -> batch-norm -> dropout
    x = Conv2D(32, (3,3), activation='relu', kernel_regularizer=l2_reg)(image_in)
    x = MaxPooling2D(pool_size=(2,2))(x)
    x = BatchNormalization()(x)
    x = Dropout(0.5)(x)

    # Block 2: conv -> batch-norm -> dropout (no pooling)
    x = Conv2D(64, (3,3), activation='relu', kernel_regularizer=l2_reg)(x)
    x = BatchNormalization()(x)
    x = Dropout(0.5)(x)

    # Embedding head: flatten the feature maps and project to 1024 values
    x = Flatten()(x)
    embedding_vector = Dense(1024, activation='sigmoid', kernel_regularizer=l2_reg)(x)

    return Model(inputs=[image_in], outputs=[embedding_vector], name='embedding')

In [None]:
def contrastive_loss(y_true, y_pred):
    """Contrastive loss for a pair model whose output is a similarity score.

    The network ends in a sigmoid unit and the rest of this notebook reads its
    output as "same person" when it is above 0.5, so the loss pushes
    same-person pairs (label 1) up to the margin and different-person pairs
    (label 0) down to 0.

    Args:
        y_true: Pair labels, 1 = same person, 0 = different persons. Must hold
            exactly one label per prediction; ``(batch,)`` and ``(batch, 1)``
            are both accepted.
        y_pred: Model scores, typically of shape ``(batch, 1)``.

    Returns:
        Scalar tensor: the mean loss over the batch.
    """
    margin = 1.0
    y_pred = tf.convert_to_tensor(y_pred)
    # Labels come out of the data pipeline as (batch,) while Dense(1) produces
    # (batch, 1). Multiplying those directly broadcasts to (batch, batch) and
    # mixes every label with every prediction, so align the shapes first.
    y_true = tf.reshape(tf.cast(y_true, y_pred.dtype), tf.shape(y_pred))

    square_pred = tf.square(y_pred)
    margin_square = tf.square(tf.maximum(margin - y_pred, 0.0))
    # label 1 -> penalise scores below the margin; label 0 -> penalise scores above 0
    return tf.reduce_mean(y_true * margin_square + (1.0 - y_true) * square_pred)

In [None]:
# def contrastive_loss(left_feature, right_feature, label, margin):
#     #   Compute the contrastive loss as in


#     # L = 0.5 * Y * D^2 + 0.5 * (Y-1) * {max(0, margin - D)}^2

#     # **Parameters**
#     #  left_feature: First element of the pair ->
#     #  right_feature: Second element of the pair
#     #  label: Label of the pair (0 or 1)
#     #  margin: Contrastive margin

#     # **Returns**
#     #  Return the loss operation
#     # Calculate loss for similar pairs (y_true == 0)

#     positive_loss = (1 - y_true) * tf.square(distances)

#     # Calculate loss for dissimilar pairs (y_true == 1)
#     negative_loss = y_true * tf.square(tf.maximum(margin - distances, 0))

#     # Combine positive and negative losses
#     loss = positive_loss + negative_loss

#     # Calculate the mean loss over the batch
#     mean_loss = tf.reduce_mean(loss)

#     return mean_loss


In [None]:
# def compute_euclidean_distance(x, y):
#     """
#     Computes the euclidean distance between two tensorflow variables
#     """

#     d = tf.reduce_sum(tf.square(tf.sub(x, y)),1)
#     return d

In [None]:
# def compute_contrastive_loss(left_feature, right_feature, label, margin):

#     """
#     Compute the contrastive loss as in


#     L = 0.5 * Y * D^2 + 0.5 * (Y-1) * {max(0, margin - D)}^2

#     **Parameters**
#      left_feature: First element of the pair
#      right_feature: Second element of the pair
#      label: Label of the pair (0 or 1)
#      margin: Contrastive margin

#     **Returns**
#      Return the loss operation

#     """

#     label = tf.to_float(label)
#     one = tf.constant(1.0)

#     d = compute_euclidean_distance(left_feature, right_feature)
#     d_sqrt = tf.sqrt(compute_euclidean_distance(left_feature, right_feature))
#     first_part = tf.mul(one-label, d)# (Y-1)*(d)

#     max_part = tf.square(tf.maximum(margin-d_sqrt, 0))
#     second_part = tf.mul(label, max_part)  # (Y) * max(margin - d, 0)

#     loss = 0.5 * tf.reduce_mean(first_part + second_part)

#     return loss

In [None]:
# def triplet_loss(y_true, y_pred, alpha=0.2):
#     anchor, positive, negative = y_pred[0], y_pred[1], y_pred[2]

#     # Compute squared distances
#     pos_dist = tf.reduce_sum(tf.square(anchor - positive), axis=-1)
#     neg_dist = tf.reduce_sum(tf.square(anchor - negative), axis=-1)

#     # Compute triplet loss
#     basic_loss = pos_dist - neg_dist + alpha
#     loss = tf.reduce_mean(tf.maximum(basic_loss, 0.0), axis=0)

#     return loss

In [None]:
# Single shared embedding network; make_siamese_model() applies this same
# instance to both inputs, so both branches share weights.
embedding = make_embedding()

In [None]:
# Layer-by-layer overview of the embedding network.
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_image (InputLayer)    [(None, 100, 100, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 98, 98, 32)        896       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 49, 49, 32)       0         
 )                                                               
                                                                 
 batch_normalization (BatchN  (None, 49, 49, 32)       128       
 ormalization)                                                   
                                                                 
 dropout (Dropout)           (None, 49, 49, 32)        0         
                                                                 
 conv2d_1 (Conv2D)           (None, 47, 47, 64)        18

In [None]:
# Siamese L1 distance layer -- could be swapped for a Euclidean distance
class L1Dist(Layer):
    """Element-wise absolute difference between two embeddings."""

    def __init__(self, **kwargs):
        # Forward name/dtype/trainable to the base Layer. Dropping them would
        # ignore a requested layer name, including the one stored in a saved
        # model's config when the model is reloaded.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return ``|input_embedding - validation_embedding|`` element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)

In [None]:
# NOTE(review): this instance is not referenced by any later visible cell;
# make_siamese_model() creates its own L1Dist layer.
l1 = L1Dist()

In [None]:
def make_siamese_model():
    """Assemble the two-input Siamese network around the global ``embedding``.

    Returns:
        A Keras ``Model`` named 'SiameseNetwork' taking
        ``[input_img, validation_img]`` and producing one sigmoid score per pair.
    """
    # The two images of a pair
    anchor_input = Input(name='input_img', shape=(100,100,3))
    candidate_input = Input(name='validation_img', shape=(100,100,3))

    # Distance layer, explicitly named 'distance'
    distance_layer = L1Dist()
    distance_layer._name = 'distance'

    # Both inputs go through the same embedding model (shared weights)
    anchor_embedding = embedding(anchor_input)
    candidate_embedding = embedding(candidate_input)
    distances = distance_layer(anchor_embedding, candidate_embedding)

    # Single sigmoid unit on top of the element-wise distances
    score = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[anchor_input, candidate_input], outputs=score, name='SiameseNetwork')

In [None]:
# Build the Siamese model used by every later cell.
siamese_model = make_siamese_model()
# NOTE(review): training below runs through the custom train_step with its own
# `opt`, so the loss/optimizer configured here only matter for Keras' built-in
# fit/evaluate paths (and presumably for what gets stored by model.save — confirm).
siamese_model.compile(loss = contrastive_loss, optimizer=tf.keras.optimizers.Adam(1e-4))

In [None]:
# Layer-by-layer overview of the full Siamese network.
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 1024)         144789824   ['input_img[0][0]',              
                                                                  'validation_img[0][

In [None]:
# # Load pre-trained MobileNetV2 model without top (classification) layers
# base_model = MobileNetV2(include_top=False, weights='imagenet', input_shape=(100, 100, 3))

# # Freeze the pre-trained layers
# for layer in base_model.layers:
#     layer.trainable = False

# # Siamese network architecture on top of the pre-trained model
# input_a = Input(shape=(100, 100, 3))
# input_b = Input(shape=(100, 100, 3))

# embedding_a = base_model(input_a)
# embedding_b = base_model(input_b)

In [None]:
# # Create the Siamese network model
# siamese_model = make_siamese_model()

# # Compile and train the Siamese network
# siamese_model.compile(optimizer=tf.keras.optimizers.Adam(1e-4), loss=contrastive_loss)

# 5. Training

## 5.1 Setup Loss and Optimizer

In [None]:
# NOTE(review): not referenced by any later visible cell — train_step computes
# its loss with contrastive_loss instead.
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [None]:
# Adam optimizer (learning rate 1e-4) used by train_step to apply gradients.
opt = tf.keras.optimizers.Adam(1e-4)

## 5.2 Build Train Step Function

In [None]:
@tf.function
def train_step(batch):
    """Run one gradient-descent update of ``siamese_model`` on a single batch.

    Args:
        batch: Tuple ``(input_images, validation_images, labels)``.

    Returns:
        The contrastive loss computed on this batch before the update.
    """
    # First two elements are the image pair, the third is the label
    image_pair, labels = batch[:2], batch[2]

    # Record the forward pass so gradients can be derived from it
    with tf.GradientTape() as tape:
        scores = siamese_model(image_pair, training=True)
        loss = contrastive_loss(labels, scores)

    # Back-propagate and let the optimizer update the weights
    weights = siamese_model.trainable_variables
    gradients = tape.gradient(loss, weights)
    opt.apply_gradients(zip(gradients, weights))

    return loss

## 5.3 Build Training Loop

In [None]:
# # Training function with validation
# def train_with_validation(train_data, val_data, EPOCHS):
#     train_losses = []  # To store training losses
#     val_losses = []    # To store validation losses

#     for epoch in range(1, EPOCHS+1):
#         print(f'\nEpoch {epoch}/{EPOCHS}')
#         progbar = tf.keras.utils.Progbar(len(train_data))
#         batches = 0  # To track the number of batches processed during training

#         # Training loop
#         for idx, batch in enumerate(train_data):
#             loss = train_step(batch).numpy()
#             train_losses.append(loss)
#             progbar.update(idx + 1)
#             batches += 1

#         # Validation loop
#         val_loss = 0.0
#         for idx, batch in enumerate(val_data):
#             val_loss += train_step(batch).numpy()
#         val_loss /= len(val_data)
#         val_losses.extend([val_loss] * batches)  # Extend the list to match train_losses

#         # Print training and validation losses
#         print(f'\nTraining Loss: {train_losses[-1]}, Validation Loss: {val_losses[-1]}')

#     # Plot the training and validation losses
#     plt.plot(range(1, len(train_losses)+1), train_losses, label='Training Loss')
#     plt.plot(range(1, len(val_losses)+1), val_losses, label='Validation Loss')
#     plt.xlabel('Training Steps')
#     plt.ylabel('Loss')
#     plt.legend()
#     plt.show()


In [None]:
def train_with_validation(train_data, val_data, EPOCHS):
    """Train the global ``siamese_model`` and plot training vs. validation loss.

    Args:
        train_data: Batched dataset of ``(input_images, validation_images, labels)``
            used for the gradient updates.
        val_data: Batched dataset of the same structure, used for evaluation only.
        EPOCHS: Number of passes over ``train_data``.

    Returns:
        None. Prints the mean training and validation loss of each epoch and
        shows a loss plot at the end.
    """
    train_losses = []  # One entry per training step
    val_losses = []    # Epoch validation loss, repeated once per training step

    for epoch in range(1, EPOCHS+1):
        print(f'\nEpoch {epoch}/{EPOCHS}')
        progbar = tf.keras.utils.Progbar(len(train_data))

        # Training loop
        epoch_losses = []
        for idx, batch in enumerate(train_data):
            epoch_losses.append(train_step(batch).numpy())
            progbar.update(idx + 1)
        train_losses.extend(epoch_losses)

        # Validation loop: forward pass only. train_step must not be used here,
        # because it applies a gradient update and would train the model on the
        # validation data.
        val_loss_batchwise = [
            contrastive_loss(batch[2], siamese_model(batch[:2], training=False)).numpy()
            for batch in val_data
        ]
        val_loss = np.mean(val_loss_batchwise) if val_loss_batchwise else float('nan')
        # Repeat the value so the validation curve lines up with the per-step
        # training curve in the plot below.
        val_losses.extend([val_loss] * len(epoch_losses))

        # Report the epoch mean on both sides so the two numbers are comparable
        # (a single last-batch training loss is very noisy).
        train_loss = np.mean(epoch_losses) if epoch_losses else float('nan')
        print(f'\nTraining Loss: {train_loss}, Validation Loss: {val_loss}')

    # Plot the training and validation losses
    plt.plot(range(1, len(train_losses)+1), train_losses, label='Training Loss')
    plt.plot(range(1, len(val_losses)+1), val_losses, label='Validation Loss')
    plt.xlabel('Training Steps')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()


## 5.4 Train the model

In [None]:
# Number of full passes over the training partition.
EPOCHS = 50

In [None]:
# Train for EPOCHS epochs; prints per-epoch losses and shows the loss plot.
train_with_validation(train_data, val_data, EPOCHS)


Epoch 1/50

Training Loss: 0.16150307655334473, Validation Loss: 0.07736042141914368

Epoch 2/50

Training Loss: 0.1604180932044983, Validation Loss: 0.07295187562704086

Epoch 3/50

Training Loss: 0.20453064143657684, Validation Loss: 0.0780431479215622

Epoch 4/50

Training Loss: 0.07063174247741699, Validation Loss: 0.06898830831050873

Epoch 5/50

Training Loss: 0.07241958379745483, Validation Loss: 0.06774327158927917

Epoch 6/50

Training Loss: 0.12934933602809906, Validation Loss: 0.06872374564409256

Epoch 7/50

Training Loss: 0.20970626175403595, Validation Loss: 0.07593211531639099

Epoch 8/50

Training Loss: 0.06829541176557541, Validation Loss: 0.0748375803232193

Epoch 9/50

Training Loss: 0.07441934198141098, Validation Loss: 0.07636623084545135

Epoch 10/50

Training Loss: 0.06843581795692444, Validation Loss: 0.07668355107307434

Epoch 11/50

Training Loss: 0.1666671633720398, Validation Loss: 0.07656723260879517

Epoch 12/50

Training Loss: 0.031001422554254532, Valid

In [None]:
 # # Call the training function with early stopping
# train_with_early_stopping(train_data, val_data, EPOCHS=5, patience=2)

# 6. Evaluate Model

## 6.1 Import Metrics

In [None]:
# import metric calculations
from tensorflow.keras.metrics import Precision, Recall

## 6.2 Make Predictions

In [None]:
# Materialise the whole test partition as NumPy arrays

# Pull every batch once, then split the batches into their three components
test_batches = list(test_data.as_numpy_iterator())
test_input_list = [batch[0] for batch in test_batches]
test_val_list = [batch[1] for batch in test_batches]
y_true_list = [batch[2] for batch in test_batches]

# Stitch the per-batch arrays together along the sample axis
test_input = np.concatenate(test_input_list)
test_val = np.concatenate(test_val_list)
y_true = np.concatenate(y_true_list)

In [None]:
# Score every test pair; y_hat holds one sigmoid output per pair.
y_hat = siamese_model.predict([test_input, test_val])

In [None]:
# Convert scores to binary predictions: 1 where the score exceeds 0.5, else 0
y_pred = np.where(y_hat > 0.5, 1, 0)
# Predictions and ground truth side by side, for manual inspection
y = np.column_stack((y_pred, y_true))
# np.set_printoptions(threshold=np.inf)
# print(y)

## 6.3 Calculate Metrics

In [None]:
# Recall (Sensitivity) = TP / (TP + FN)
# Share of the truly positive samples that were predicted positive

# Creating a metric object
m = Recall()

# Accumulate the test labels and predictions into the metric
m.update_state(y_true, y_pred)

# Return Recall result
m.result().numpy()

In [None]:
# Precision = TP / (TP + FP)
# Share of the samples predicted positive that are truly positive
# Creating a metric object (reuses the name `m` from the recall cell)
m = Precision()

# Accumulate the test labels and predictions into the metric
m.update_state(y_true, y_pred)

# Return Precision result
m.result().numpy()

In [None]:
# Accuracy = (TP + TN) / All
from sklearn.metrics import accuracy_score

# Fraction of test pairs whose predicted label equals the true label
accuracy = accuracy_score(y_true, y_pred)

print("Accuracy:", accuracy)


In [None]:
# Confusion Matrix
from sklearn.metrics import confusion_matrix

# labels=[0, 1] forces a 2x2 matrix even when the labels or the predictions
# contain only one class; without it ravel() yields a single value and the
# four-way unpacking below raises.
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
tn, fp, fn, tp = cm.ravel()
print("True Negatives: ", tn, "False Positives: ", fp, "\nFalse Negatives: ", fn, "True Positives: ", tp)


## Evaluate on validation data

In [None]:
# Per-batch accumulators for the validation partition
# (first image, second image, label), filled by the next cell.
val_input_list = []
val_val_list = []
y_val_true_list = []


In [None]:
# Walk the validation partition once and keep every batch as NumPy arrays.
# NOTE: re-running this cell appends the batches again (the lists are not reset here).
for val_input, val_val, y_val_true in val_data.as_numpy_iterator():
    val_input_list.append(val_input)
    val_val_list.append(val_val)
    y_val_true_list.append(y_val_true)


In [None]:
# Join the per-batch arrays along the sample axis.
val_input = np.concatenate(val_input_list)
val_val = np.concatenate(val_val_list)
y_val_true = np.concatenate(y_val_true_list)


In [None]:
# Score every validation pair; one sigmoid output per pair.
y_val_hat = siamese_model.predict([val_input, val_val])


In [None]:
# Binary predictions: 1 where the score exceeds 0.5, else 0.
y_val_pred = np.where(y_val_hat > 0.5, 1, 0)


In [None]:
from tensorflow.keras.metrics import Recall, Precision
from sklearn.metrics import accuracy_score, confusion_matrix

# Binary predictions from the validation scores: 1 where the score exceeds 0.5
y_val_pred = np.where(y_val_hat > 0.5, 1, 0)

# Recall = TP / (TP + FN)
m_recall = Recall()
m_recall.update_state(y_val_true, y_val_pred)
recall = m_recall.result().numpy()
print("Recall:", recall)

# Precision = TP / (TP + FP)
m_precision = Precision()
m_precision.update_state(y_val_true, y_val_pred)
precision = m_precision.result().numpy()
print("Precision:", precision)

# Accuracy = (TP + TN) / All
accuracy = accuracy_score(y_val_true, y_val_pred)
print("Accuracy:", accuracy)

# Confusion Matrix
# labels=[0, 1] forces a 2x2 matrix even when the labels or the predictions
# contain only one class; without it the four-way unpacking below raises.
cm = confusion_matrix(y_val_true, y_val_pred, labels=[0, 1])
tn, fp, fn, tp = cm.ravel()
print("True Negatives: ", tn)
print("False Positives: ", fp)
print("False Negatives: ", fn)
print("True Positives: ", tp)


## 6.4 Visualize Results

In [None]:
# Set plot size
plt.figure(figsize=(18,8))

# Pick a random test pair. The upper bound follows the actual number of test
# samples, so the index can never run past the end of the arrays.
index = random.randint(0, len(test_input) - 1)
print("Index: ", index)

# First image of the pair
plt.subplot(1,2,1)
plt.imshow(test_input[index])

# Second image of the pair
plt.subplot(1,2,2)
plt.imshow(test_val[index])

# Renders cleanly
plt.show()

# Thresholded prediction for this pair
if (y_pred[index] == 1):
    print("Same Person")
else:
    print("Different Persons")

# Raw model score for this pair
print(y_hat[index])

# 7. Save Model

In [None]:
# Save the full model to an HDF5 file on Drive; the verification cell below
# reloads it from this same path with tf.keras.models.load_model.
siamese_model.save('drive/MyDrive/h5/siamesemodelFT_contrastive_full_L2.h5')

<h1> 8. Real Time Test </h1>


<h2> 8.1 Verification Function </h2>

In [None]:
import os
import numpy as np
import tensorflow as tf
def preprocess(file_path):
    """Load a JPEG file and turn it into a 100x100x3 float image in [0, 1].

    Args:
        file_path: Path (Python string or scalar string tensor) of a JPEG file.

    Returns:
        A float32 tensor of shape (100, 100, 3) with values scaled to [0, 1].
    """
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Decode as RGB: channels=3 converts grayscale JPEGs to three channels, so
    # the result always matches the model's (100, 100, 3) input.
    img = tf.io.decode_jpeg(byte_img, channels=3)

    # Resize to the network's input resolution
    img = tf.image.resize(img, (100,100))
    # Scale pixel values from [0, 255] to [0, 1]
    img = img / 255.0

    return img

def plot_results_distribution(results):
    """Show a 20-bin histogram of all prediction scores in ``results``.

    Args:
        results: Scores in any array-like layout; they are flattened to one
            dimension before plotting.
    """
    # Collapse whatever nesting the scores have into a flat vector
    scores = np.array(results).flatten()

    # Histogram of the scores
    plt.hist(scores, bins=20, color='blue', alpha=0.7)
    plt.title('Distribution of Prediction Scores')
    plt.xlabel('Prediction Scores')
    plt.ylabel('Frequency')
    plt.show()

def verify(model, detection_threshold, verification_threshold):
    """Score every input image against every verification image.

    Args:
        model: Pair model; ``model.predict([a, b])`` must return one score per pair.
        detection_threshold: A pair counts as a detection when its score is
            strictly above this value.
        verification_threshold: The run is verified when the fraction of
            detections among all pairs is strictly above this value.

    Returns:
        ``(results, verified)``: the list of per-pair prediction arrays (in
        input-major order) and the verification decision. With no image pairs
        at all the result is ``([], False)``.
    """
    input_images_folder = os.path.join('drive', 'MyDrive', 'FaceID', 'application_data', 'input_images')
    verification_images_folder = os.path.join('drive', 'MyDrive', 'FaceID', 'application_data', 'verification_images')

    # Sorted so the order of `results` is the same on every run
    input_images = [os.path.join(input_images_folder, image) for image in sorted(os.listdir(input_images_folder))]
    verification_images = [os.path.join(verification_images_folder, image) for image in sorted(os.listdir(verification_images_folder))]

    total_verification_pairs = len(input_images) * len(verification_images)
    if total_verification_pairs == 0:
        # Nothing to compare: the statistics and the ratio below are undefined.
        print("No. of total verification pairs: ", total_verification_pairs)
        return [], False

    # Decode each verification image once instead of once per input image
    verification_batches = [np.expand_dims(preprocess(path), axis=0) for path in verification_images]

    results = []
    for input_img_path in input_images:
        input_batch = np.expand_dims(preprocess(input_img_path), axis=0)
        for validation_batch in verification_batches:
            # verbose=0: no progress bar for each single-pair prediction
            result = model.predict([input_batch, validation_batch], verbose=0)
            results.append(result)

    # Build the score array once and reuse it for every statistic
    scores = np.array(results)

    max_score = np.max(scores)
    print("Max. prediction = ", max_score)
    min_score = np.min(scores)
    print("Min. prediction = ", min_score)

    median = np.median(scores)
    mean = np.mean(scores)

    print("Median:", median)
    print("Mean:", mean)

    detection = np.sum(scores > detection_threshold)
    print("No. of results above threshold: ", detection)

    # Scores equal to the threshold are counted here
    undetection = np.sum(scores <= detection_threshold)
    print("No. of results below threshold: ", undetection)

    print("No. of total verification pairs: ", total_verification_pairs)
    verification = detection / total_verification_pairs
    print("Verification", verification)
    verified = verification > verification_threshold
    return results, verified

# Example usage
# A pair counts as a detection when its score is above this value
detection_threshold = 0.5
# The run is verified when more than this fraction of pairs are detections
verification_threshold = 0.8
# Reload the saved model. custom_objects maps the names stored in the file back
# to the custom loss, layer and regularizer defined earlier in this notebook.
model = tf.keras.models.load_model('drive/MyDrive/h5/siamesemodelFT_contrastive_full_L2.h5', custom_objects={'contrastive_loss': contrastive_loss, 'L1Dist':L1Dist, 'l2_reg': l2_reg})

results, verified = verify(model, detection_threshold, verification_threshold)
# NOTE: this prints every per-pair prediction array
print("Results:", results)
print("Verified:", verified)

# Plot the distribution of prediction scores
plot_results_distribution(results)
