In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Flatten, Dense, Dropout, Lambda, BatchNormalization
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import backend as K
from tensorflow.keras import layers
import pandas as pd
import os
from PIL import Image
import random
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import csv
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.utils import load_img, img_to_array
from keras_vggface import utils



In [None]:
# NOTE(review): `global` statements at the top level of a cell have no effect;
# module-level names are already global. `left` and `right` only come into
# existence when they are assigned in a later cell.
global left
global right

In [None]:
# Image directories used throughout the notebook. Files that share a name in
# both folders are treated as a matching pair by `load_celeb_dataset`; the
# batch generator reads `image1` from `right` and `image2` from `left`.
left, right = "/mnt/data/left_new/left", "/mnt/data/right_new/right"


In [None]:
#enabling gpu
# Ask TensorFlow which GPUs it can see; the list is empty on a CPU-only machine.
physical_devices = tf.config.list_physical_devices('GPU')
# Guard the "first device" lookup so a CPU-only machine prints None instead of
# raising IndexError before the emptiness check is ever reached.
print(physical_devices, physical_devices[0] if physical_devices else None)
# Memory growth has to be configured identically on every visible GPU, so it is
# applied to all of them rather than only to the first one.
for gpu_device in physical_devices:
    try:
        tf.config.experimental.set_memory_growth(gpu_device, True)
    except RuntimeError as err:
        # Raised when the GPU was already initialized (e.g. this cell is re-run
        # after a model was built); report it instead of aborting the notebook.
        print(err)

In [None]:
#Creating positive and negative pairs from dataset

def random_element_except(arr, exclude):
    """Return a random element of ``arr`` that is not equal to ``exclude``.

    Args:
        arr: Iterable of candidate elements.
        exclude: Value that must not be returned.

    Returns:
        One element of ``arr`` chosen with ``random.choice`` among those that
        differ from ``exclude``.

    Raises:
        IndexError: If no element of ``arr`` differs from ``exclude``.
    """
    candidates = []
    for item in arr:
        if item != exclude:
            candidates.append(item)
    return random.choice(candidates)

def load_celeb_dataset(left, right):
    """Build matching and non-matching image-name pairs from two directories.

    For every file name present in both ``right`` and ``left`` two pairs are
    emitted: ``[name, name]`` with label 1 (same name on both sides) and
    ``[name, other]`` with label 0, where ``other`` is a different file name
    drawn uniformly at random from ``left``. The first element of a pair is
    the name to look up in ``right``, the second the name to look up in
    ``left``.

    Args:
        left: Path of the directory supplying the second image of each pair.
        right: Path of the directory supplying the first image of each pair.

    Returns:
        Tuple ``(pairs, labels)``: ``pairs`` is a string array of shape
        ``(n_pairs, 2)`` and ``labels`` a float32 array of shape ``(n_pairs,)``.
        Both are empty (``(0, 2)`` / ``(0,)``) when no name is shared.

    Raises:
        ValueError: If a shared name exists but ``left`` holds no other file
            to use as the non-matching partner.
    """
    # Sort the listings so the output order does not depend on the arbitrary
    # order returned by os.listdir.
    left_images = sorted(os.listdir(left))
    right_images = sorted(os.listdir(right))

    # Name -> position lookup gives O(1) membership tests and lets the
    # negative partner be drawn by index instead of rebuilding a filtered
    # list for every image.
    left_position = {name: pos for pos, name in enumerate(left_images)}
    n_left = len(left_images)

    labels = []
    pairs = []
    for ele in right_images:
        pos = left_position.get(ele)
        if pos is None:
            continue
        if n_left < 2:
            raise ValueError(
                "cannot build a non-matching pair for %r: %r contains no other image"
                % (ele, left)
            )

        pairs.append([ele, ele])
        labels.append(1)

        # Draw uniformly from the n_left - 1 other positions: pick an index in
        # the shortened range and step over the position of `ele` itself.
        other = random.randrange(n_left - 1)
        if other >= pos:
            other += 1
        pairs.append([ele, left_images[other]])
        labels.append(0)

    # reshape keeps the (n, 2) layout even when no pair was produced, so
    # downstream column indexing such as pairs[:, 0] keeps working.
    return np.array(pairs, dtype=str).reshape(-1, 2), np.array(labels).astype("float32")
# Build the pair/label arrays once from the two image directories; the
# print is only a completion marker for the directory scan.
pairs, labels = load_celeb_dataset(left, right)
print("done")

In [None]:
# Hold out 20% of the pairs as the test set. The split is stratified on the
# labels and uses a fixed random_state so it is repeatable for a given input.
x_train_val, x_test, y_train_val, y_test = train_test_split(
        pairs, labels, test_size=0.2, stratify=labels, random_state=42)

In [None]:
# Split the train/validation pool in half: the first half is used for
# training, the second half for validation.
n = len(x_train_val) // 2

x_train, y_train = x_train_val[:n], y_train_val[:n]
x_val, y_val = x_train_val[n:], y_train_val[n:]

# The combined arrays are not needed once both halves exist.
del x_train_val, y_train_val

In [None]:
# Display the shapes of the three splits as a quick sanity check.
x_train.shape, y_train.shape, x_test.shape, y_test.shape, x_val.shape, y_val.shape

In [None]:
# One table per split: column 0 of each pair becomes `image1`, column 1
# becomes `image2`, and the pair label goes into `label`.
df_train = pd.DataFrame({"image1": x_train[:, 0], "image2": x_train[:, 1], "label": y_train})
df_test = pd.DataFrame({"image1": x_test[:, 0], "image2": x_test[:, 1], "label": y_test})
df_valid = pd.DataFrame({"image1": x_val[:, 0], "image2": x_val[:, 1], "label": y_val})

In [None]:
#storing pair and label in csv

# `to_csv` does not create missing parent directories, so make sure the
# target folder exists before writing (a no-op when it is already there).
os.makedirs("../csv", exist_ok=True)

df_train.to_csv("../csv/training.csv", header=True, index=False)
df_test.to_csv("../csv/testing.csv", header=True, index=False)
df_valid.to_csv("../csv/validation.csv", header=True, index=False)

In [None]:
# Size (in pixels) every image is resized to when it is loaded
image_width, image_height = 224, 224

# Number of image pairs produced per batch
batch_size = 32

# Read the CSV file into a DataFrame

def _load_pair_image(directory, file_name):
    """Load one image from ``directory`` and return it as a preprocessed array."""
    # Keras' `target_size` is (height, width).
    image = load_img(directory + "/" + file_name, target_size=(image_height, image_width))
    image = img_to_array(image)
    return utils.preprocess_input(image, version=1) # or version=2


def data_generator(csv_file):
    """Yield batches of preprocessed image pairs and labels forever.

    The CSV must provide the columns ``image1``, ``image2`` and ``label``.
    ``image1`` names are loaded from the module-level ``right`` directory and
    ``image2`` names from ``left``. Batch size and image size come from the
    module-level ``batch_size``, ``image_height`` and ``image_width``.

    The rows are reshuffled once per pass and then consumed in consecutive,
    non-overlapping batches, so ``len(data) // batch_size`` steps visit each
    row at most once. A trailing partial batch is dropped so every batch has
    exactly ``batch_size`` rows.

    Args:
        csv_file: Path of the CSV file describing the pairs.

    Yields:
        ``([x1, x2], y)`` where ``x1`` and ``x2`` are float32 arrays of shape
        ``(batch_size, image_height, image_width, 3)`` and ``y`` is an array
        of shape ``(batch_size,)`` holding the labels.

    Raises:
        ValueError: If the CSV holds fewer than ``batch_size`` rows, so that
            not even one full batch can be formed.
    """
    data = pd.read_csv(csv_file)

    batches_per_pass = len(data) // batch_size
    if batches_per_pass == 0:
        raise ValueError(
            "%s has %d rows, fewer than batch_size=%d" % (csv_file, len(data), batch_size)
        )

    while True:
        # Shuffle the data at the beginning of each pass over the file
        data = data.sample(frac=1).reset_index(drop=True)

        for batch_index in range(batches_per_pass):
            start = batch_index * batch_size
            batch = data.iloc[start:start + batch_size]

            # Fresh buffers per batch so a batch already handed to the
            # consumer is never overwritten. The loaded images are float32,
            # so float32 storage loses nothing and halves the memory.
            x1 = np.zeros((batch_size, image_height, image_width, 3), dtype="float32")
            x2 = np.zeros((batch_size, image_height, image_width, 3), dtype="float32")
            y = np.zeros(batch_size)

            rows = zip(batch['image1'], batch['image2'], batch['label'])
            for i, (image1_path, image2_path, label) in enumerate(rows):
                x1[i] = _load_pair_image(right, image1_path)
                x2[i] = _load_pair_image(left, image2_path)
                y[i] = label

            # Yield the image pairs and labels
            yield [x1, x2], y




In [None]:
# One endless batch generator per split, each reading the CSV written earlier.
# NOTE(review): `test_generator` is not consumed by any cell visible here.
train_generator = data_generator("../csv/training.csv")
test_generator = data_generator("../csv/testing.csv")
validation_generator = data_generator("../csv/validation.csv")

In [None]:
def euclidean_distance(vects):
    """Return the row-wise Euclidean distance between two batches of vectors.

    Args:
        vects: Sequence of two tensors that can be subtracted element-wise;
            the squared differences are summed over axis 1.

    Returns:
        Tensor of distances with axis 1 kept as a size-1 dimension.
    """
    first, second = vects
    squared_diff = tf.math.square(first - second)
    sum_square = tf.math.reduce_sum(squared_diff, axis=1, keepdims=True)
    # Clamp to the backend epsilon so the square root is never taken of a
    # value below it (presumably to keep the gradient finite at zero distance).
    floor = tf.keras.backend.epsilon()
    return tf.math.sqrt(tf.math.maximum(sum_square, floor))


In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Flatten, Dense, BatchNormalization, Lambda, Dropout
from keras_vggface.vggface import VGGFace

# Load the VGGFace model without the top (fully connected) layers
base_model = VGGFace(model='vgg16', include_top=False, input_shape=(224, 224, 3))

# Freeze the layers in the base model to prevent them from being trained
for layer in base_model.layers:
    layer.trainable = False

# Add additional layers on top of the VGGFace base model:
# two Dense -> Dropout -> BatchNormalization stages followed by a linear
# 256-unit projection that serves as the embedding.
flatten = Flatten()(base_model.output)
dense1 = Dense(512, activation="relu")(flatten)
dense1 = Dropout(0.3)(dense1)
dense1 = BatchNormalization()(dense1)
dense2 = Dense(256, activation="relu")(dense1)
dense2 = Dropout(0.3)(dense2)
dense2 = BatchNormalization()(dense2)
output = Dense(256)(dense2)

# Create the embedding network with the VGGFace base model and additional layers
embedding_network = Model(inputs=base_model.input, outputs=output)

# One input per image of the pair, both with the size the base model expects.
input_1 = Input((224, 224, 3))
input_2 = Input((224, 224, 3))

# The Siamese network shares weights between its two towers (sister
# networks), so the same embedding network instance is applied to both inputs.
tower_1 = embedding_network(input_1)
tower_2 = embedding_network(input_2)
# Reduce the two embeddings to one distance value per pair, then map that
# value to a score in (0, 1) with a single sigmoid unit.
merge_layer = Lambda(euclidean_distance)([tower_1, tower_2])
normal_layer = BatchNormalization()(merge_layer)
# NOTE(review): this Dropout acts on the single distance feature — confirm intended.
normal_layer = Dropout(0.3)(normal_layer)
output_layer = Dense(1, activation="sigmoid")(normal_layer)
siamese = Model(inputs=[input_1, input_2], outputs=output_layer)


In [None]:
def loss(margin=1):
    """Create a contrastive-style loss function bound to ``margin``.

    Args:
        margin: Value the prediction is compared against in the hinge term.

    Returns:
        A function ``contrastive_loss(y_true, y_pred)`` returning the mean of
        ``(1 - y_true) * y_pred**2 + y_true * max(margin - y_pred, 0)**2``.
    """

    def contrastive_loss(y_true, y_pred):
        """Return the mean loss over all elements for the given labels and predictions."""
        # Term that is active where the label is 0: penalizes large predictions.
        zero_label_term = (1 - y_true) * tf.math.square(y_pred)
        # Term that is active where the label is 1: penalizes predictions below the margin.
        shortfall = tf.math.maximum(margin - (y_pred), 0)
        one_label_term = (y_true) * tf.math.square(shortfall)
        return tf.math.reduce_mean(zero_label_term + one_label_term)

    return contrastive_loss

In [None]:
# Adam optimizer with a learning rate of 1e-4.
adam = Adam(learning_rate=1e-4)

# Compile with the custom loss (margin 1) and report accuracy during training,
# then print the layer/parameter overview.
siamese.compile(loss=loss(margin=1), optimizer=adam, metrics=["accuracy"])
siamese.summary()

In [None]:
# Reload the split tables from disk; their lengths are used to derive the
# number of steps per epoch for training and validation.
# The training path previously started with "/", which made it an absolute
# path that did not match the location the file was written to.
df_train = pd.read_csv("../csv/training.csv")
df_test = pd.read_csv("../csv/testing.csv")
df_validation = pd.read_csv("../csv/validation.csv")


In [None]:
# tf.keras.backend.clear_session()
# tf.random.set_seed(101)
# np.random.seed(101)

# Stop training once val_loss has not improved for 10 consecutive epochs.
es = EarlyStopping(monitor='val_loss', mode='min', patience=10)
# Write the model to disk only when val_loss reaches a new minimum.
ck = ModelCheckpoint("../Backend/models/model.h5", monitor='val_loss', verbose=1, save_best_only=True, mode='min')


# The generators never terminate, so the number of batches that make up one
# epoch / one validation run is given explicitly from the table sizes.
history = siamese.fit(
    train_generator,
    steps_per_epoch=len(df_train) // batch_size,
    epochs=100,
    validation_data=validation_generator,
    validation_steps=len(df_validation) // batch_size,
    callbacks=[es, ck]
)