In [None]:
import cv2
import os
import uuid
import random
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow import keras
from keras import layers
from keras.models import Model, Sequential, load_model
from keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten, Dropout
from keras.layers import Input, Convolution2D, ZeroPadding2D, MaxPool2D, Flatten, Activation, Dense, Dropout
from keras.metrics import Precision, Recall
from keras.callbacks import EarlyStopping
from keras.preprocessing.image import ImageDataGenerator


model_path = 'model/siamesemodel_xhlayer_224_dataaugmented_epoch40_299samples_b16.h5'

BATCH_SIZE = 16
EPOCH = 70

anchor_dir = 'train_data/anchor'
positive_dir = 'train_data/positive'
negative_dir = 'train_data_yx/negative_lfw'


# Load negative data into Tensorflow dataset
def load_data(filepath, folder_no):
    directories = os.listdir(filepath)
    all_data = []

    for directory in directories:
        current_path = os.path.join(filepath, directory)
        load_data = tf.data.Dataset.list_files(current_path + '/*.*').take(folder_no)
        dir_iterator = load_data.as_numpy_iterator()
        folder_data = [dir_iterator.next() for _ in range(len(load_data))]
        all_data.extend(folder_data)

        # Generate tensorflow dataset
        load_data = tf.data.Dataset.from_tensor_slices(all_data)

    return load_data


# Image preprocessing
def preprocessing(filepath):
    byte_img = tf.io.read_file(filepath)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img, (224, 224))
    img = img / 255.0
    return img

# Function to preprocess image for twin pairs
def preprocess_twin(input_img, validation_img, label):
    input_img = preprocessing(input_img)
    validation_img = preprocessing(validation_img)
    return (input_img, validation_img, label)


data_augmentation_layer = tf.keras.Sequential([
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.05),
    layers.Lambda(lambda x: tf.image.adjust_brightness(x, tf.random.uniform(shape=(), minval=0.05, maxval=0.15))),
    layers.Rescaling(scale=0.8)
])

# Create a data augmentation layer
def random_augmentation(x):
    augmented_img = tf.cond(
        tf.greater(tf.random.uniform(shape=(), minval=0, maxval=1), 0.5),
        lambda: data_augmentation_layer(x),
        lambda: x
        )

    return augmented_img


# Categorise train dataset with augmentation
def create_train_dataset(data, batch=16, prefetch=8):
    train_data = data.take(round(len(data) * 0.7))
    train_data = train_data.cache()
    train_data = train_data.shuffle(buffer_size=len(train_data))
    train_data = train_data.map(lambda x, y, z: (random_augmentation(x), random_augmentation(y), z))
    train_data = train_data.batch(batch)
    train_data = train_data.prefetch(prefetch)

    return train_data


# Categorise test dataset
def create_test_dataset(data, batch=16, prefetch=8):
    test_data=data.skip(round(len(data) * 0.7))
    test_data=test_data.take(round(len(data)*0.3))
    test_data=test_data.batch(batch)
    test_data=test_data.prefetch(prefetch)

    return test_data


# Display augmented image
def compare_augmented_images(augmented_samples):
    for i, (input_img, validation_img, label) in enumerate(augmented_samples):
        plt.figure(figsize=(15, 5))
        plt.subplot(1, 2, 1)
        plt.imshow(input_img[0])
        plt.title(f'Input Image - Label: {label.numpy()}')
        plt.axis('off')

        plt.subplot(1, 2, 2)
        plt.imshow(validation_img[0])
        plt.title(f'Validation Image - Label: {label.numpy()}')
        plt.axis('off')
        plt.show()


# Caluclate L1 (Manhattan) Distance
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)


# Define CNN for embedding using VGGFace
def embedding_model():
    inp = Input(shape=(224, 224, 3))
    c1 = Conv2D(64, (3, 3), activation='relu')(inp)
    m1 = MaxPool2D((2, 2), padding='same')(c1)
    c2 = Conv2D(128, (3, 3), activation='relu')(m1)
    m2 = MaxPool2D((2, 2), padding='same')(c2)
    c3 = Conv2D(256, (3, 3), activation='relu')(m2)
    m3 = MaxPool2D((2, 2), padding='same')(c3)
    c4 = Conv2D(512, (3, 3), activation='relu')(m3)
    m4 = MaxPool2D((2, 2), padding='same')(c4)
    c5 = Conv2D(512, (3, 3), activation='relu')(m4)
    m5 = MaxPool2D((2, 2), padding='same')(c5)

    f1 = Flatten()(m5)
    d1 = Dense(4096, activation='sigmoid')(f1)
    return Model(inputs=[inp], outputs=[d1], name='embedding')

    return model



# Compare similarity using two pipelines
def siamese_model(embedding):
    # define two input tensors
    input_image = Input(name='input_img', shape=(224, 224, 3))
    validation_image = Input(name='validation_img', shape=(224, 224, 3))

    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))

    # Apply dense layer with single neuron for similarity classifier
    # Sigmoid to compress single input to 0 and 1 as output
    classifier = Dense(1, activation='sigmoid')(distances)

    siamese_model = keras.Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNeuralNetwork')
    return siamese_model


@tf.function
# Execute single training step for Siamese Neural Network
def train_step(batch, siamese_model, optimizer, loss_fn):
    with tf.GradientTape() as tape:
        x = batch[:2]       # Input 1 and 2
        y = batch[2]        # Label
        yhat = siamese_model(x, training=True)
        loss = loss_fn(y, yhat)

    # Compute loss gradient for variables optimization
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    optimizer.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss


# Define EarlyStopping callback
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

# Train the Siamese Neural Network
def train_model(siamese_model, train_data, test_data, optimizer, loss_fn, EPOCHS, early_stopping=None):
    losses = []
    accuracies = []
    val_losses = []
    val_accuracies = []

    callbacks = [early_stopping] if early_stopping else None

    for epoch in range(1, EPOCHS + 1):
        print(f'\nEpoch {epoch}/{EPOCHS}')
        progbar1 = tf.keras.utils.Progbar(len(train_data))
        progbar2 = tf.keras.utils.Progbar(len(test_data))

        epoch_losses = []
        epoch_accuracies = []
        for idx1, batch in enumerate(train_data):
            loss = train_step(batch, siamese_model, optimizer, loss_fn)
            y_true = batch[2].numpy()
            y_pred = siamese_model.predict(batch[:2])
            y_pred = (y_pred > 0.5).astype(int)
            accuracy = np.mean(y_true == y_pred.flatten())

            epoch_losses.append(loss.numpy())
            epoch_accuracies.append(accuracy)

        val_epoch_losses = []
        val_epoch_accuracies = []
        for idx2, val_batch in enumerate(test_data):
            val_loss = loss_fn(siamese_model(val_batch[:2], training=False), val_batch[2])
            y_true_val = val_batch[2].numpy()
            y_pred_val = siamese_model.predict(val_batch[:2])
            y_pred_val = (y_pred_val > 0.5).astype(int)
            val_accuracy = np.mean(y_true_val == y_pred_val.flatten())

            val_epoch_losses.append(val_loss.numpy())
            val_epoch_accuracies.append(val_accuracy)

        avg_loss = np.mean(epoch_losses)
        avg_accuracy = np.mean(epoch_accuracies)
        print(f'\nEpoch {epoch} - Average Training Loss: {avg_loss:.4f} - Average Training Accuracy: {avg_accuracy:.4f}')
        losses.append(avg_loss)
        accuracies.append(avg_accuracy)

        avg_val_loss = np.mean(val_epoch_losses)
        avg_val_accuracy = np.mean(val_epoch_accuracies)
        print(f'Epoch {epoch} - Average Validation Loss: {avg_val_loss:.4f} - Average Validation Accuracy: {avg_val_accuracy:.4f}')
        val_losses.append(avg_val_loss)
        val_accuracies.append(avg_val_accuracy)

        # Check if early stopping condition is met
        if early_stopping and early_stopping.stopped_epoch > 0:
            print(f'Early stopping triggered. Restoring model weights from epoch {early_stopping.stopped_epoch}.')
            break

        progbar1.update(idx1 + 1)
        progbar2.update(idx2 + 1)

    # Plot the loss, accuracy, validation loss, and validation accuracy upon training completion
    plt.figure(figsize=(12, 6))

    plt.subplot(1, 2, 1)
    plt.plot(range(1, len(losses) + 1), losses, label='Training Loss')
    plt.plot(range(1, len(val_losses) + 1), val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(range(1, len(accuracies) + 1), accuracies, label='Training Accuracy')
    plt.plot(range(1, len(val_accuracies) + 1), val_accuracies, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.show()


# Evaluate the trained model
def evaluate_metrics(siamese_model, test_data):
    test_input, test_val, y_true = test_data.as_numpy_iterator().next()
    y_pred = siamese_model.predict([test_input, test_val])

    precision_metric = Precision()
    precision_metric.update_state(y_true, y_pred)
    precision_result = precision_metric.result().numpy()

    recall_metric = Recall()
    recall_metric.update_state(y_true, y_pred)
    recall_result = recall_metric.result().numpy()

    print('\nEvaluation Results:')
    print(f'Precision: {precision_result:.4f}')
    print(f'Recall: {recall_result:.4f}')


# Prepare tensorflow dataset for anchor, positive and negative
# Load datasets
pos_path = '/content/drive/MyDrive/Siamese_Neural_Network_Train/train_data/positive'
anchor_path = '/content/drive/MyDrive/Siamese_Neural_Network_Train/train_data/anchor'
# neg_path = os.path.join('train_data', 'negative')

# Initialize a list to store all data
all_data = []

# Loop over celebrity folders in positive directory
celeb_folders = os.listdir(pos_path)

print(celeb_folders)
for celeb_folder in celeb_folders:
    # print(celeb_folder)

    celeb_path = os.path.join(pos_path, celeb_folder)

    # Take positive samples from the current celebrity folder
    positive_samples = tf.data.Dataset.list_files(celeb_path + '/*.*').take(95)

    # Take anchor samples from the anchor folder (same person)
    anchor_samples = tf.data.Dataset.list_files(os.path.join(anchor_path, celeb_folder) + '/*.*').take(95)

    # Create pairs of anchor, positive, and label (1 for same person, 0 for different persons)
    positive_pairs = tf.data.Dataset.zip((anchor_samples, positive_samples, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor_samples)))))

    all_data.append(positive_pairs)

    # Take negative samples from other celebrity folders in positive_path (different person)
    other_celebs = [folder for folder in celeb_folders if folder != celeb_folder]
    # print(other_celebs)
    for otherpath in other_celebs:  # Repeat for each remaining celebrity folder
        # Randomly select a different celebrity folder for negative samples
        other_celeb_path = os.path.join(pos_path, otherpath)

        # Take positive samples from the other celebrity folder
        other_positive_samples = tf.data.Dataset.list_files(other_celeb_path + '/*.*',shuffle=True).take(5)

        # Use the same set of anchor images for the length of other_positive_samples
        additional_anchor_samples = anchor_samples.take(len(other_positive_samples))

        # Create pairs of anchor, negative, and label (0 for different persons)
        negative_pairs = tf.data.Dataset.zip((additional_anchor_samples, other_positive_samples, tf.data.Dataset.from_tensor_slices(tf.zeros(len(additional_anchor_samples)))))

        # Append positive and negative pairs to the list
        all_data.append(negative_pairs)

# Concatenate all datasets in the list to produce a single dataset
final_dataset = all_data[0]
for dataset in all_data[1:]:
    final_dataset = final_dataset.concatenate(dataset)

data=final_dataset
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=len(data))

train_data = create_train_dataset(data, BATCH_SIZE, 8)
test_data = create_test_dataset(data, BATCH_SIZE, 8)

for item in data.take(10):  # Take the first two indices
    anchor_img, positive_img, label = item

    # Convert images back to numpy array for plotting
    anchor_img = anchor_img.numpy()
    positive_img = positive_img.numpy()

    plt.figure(figsize=(8, 4))

    plt.subplot(1, 2, 1)
    plt.imshow(anchor_img)
    plt.title("Anchor Image")

    plt.subplot(1, 2, 2)
    plt.imshow(positive_img)
    if label == 1:
        plt.title("Positive Image")
    else:
        plt.title("Negative Image")
    plt.show()

print(f"Length of Total Data : {len(data)}")
print(f"Length of Train Data : {len(train_data)}")
print(f"Length of Test Data  : {len(test_data)}")

augmented_samples = train_data.take(20)
compare_augmented_images(augmented_samples)

embedding = embedding_model()
siamese_model = siamese_model(embedding)
binary_cross_loss = tf.keras.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-5)

checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

train_model(siamese_model, train_data, test_data, opt, binary_cross_loss, EPOCH)

evaluate_metrics(siamese_model, test_data)
siamese_model.save(model_path)
