* Refrences for code of Siamese network: https://keras.io/examples/vision/siamese_network/
> Note: The code has been slightly modified before using in this project


In [None]:
import os
import cv2
import time
import random
import numpy as np

import tensorflow as tf
from tensorflow.keras.applications.inception_v3 import preprocess_input
from tensorflow.keras import backend, layers, metrics

from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import Xception
from tensorflow.keras.models import Model, Sequential

from tensorflow.keras.utils import plot_model
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt
tf.__version__, np.__version__

In [None]:
# Setting random seeds to enable consistency while testing.
random.seed(5)
np.random.seed(5)
tf.random.set_seed(5)

ROOT = "/kaggle/input/face-recognition-dataset/Extracted Faces/Extracted Faces"
# function to read an image and resize it
def read_image(index):
    path = os.path.join(ROOT, index[0], index[1])
    image = cv2.imread(path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image=cv2.resize(image, (128,128), interpolation = cv2.INTER_AREA)
    return image

In [None]:
# splitting the data so that we can have a subset of people that have not been seen by the model
def split_dataset(directory, split=0.9):
    folders = os.listdir(directory)
    num_train = int(len(folders)*split)
    
    random.shuffle(folders)
    
    train_list, test_list = {}, {}
    
    # Creating Train-list
    for folder in folders[:num_train]:
        num_files = len(os.listdir(os.path.join(directory, folder)))
        train_list[folder] = num_files
    
    # Creating Test-list
    for folder in folders[num_train:]:
        num_files = len(os.listdir(os.path.join(directory, folder)))
        test_list[folder] = num_files  
    
    return train_list, test_list

train_list, test_list = split_dataset(ROOT, split=0.9)
print("Length of training list:", len(train_list))
print("Length of testing list :", len(test_list))


In [None]:
#function used to make triplets from the data, returns anchor, positive example and negative example
def create_triplets(directory, folder_list, max_files=10):
    triplets = []
    folders = list(folder_list.keys())
    
    for folder in folders:
        path = os.path.join(directory, folder)
        files = list(os.listdir(path))[:max_files]
        num_files = len(files)
        
        for i in range(0,num_files-1):
            for j in range(i+1, num_files):
                anchor = (folder, f"{i}.jpg")
                positive = (folder, f"{j}.jpg")

                neg_folder = folder
                while neg_folder == folder:
                    neg_folder = random.choice(folders)
                neg_file = random.randint(0, folder_list[neg_folder]-1)
                negative = (neg_folder, f"{neg_file}.jpg")

                triplets.append((anchor, positive, negative))
            
    random.shuffle(triplets)
    return triplets

In [None]:
train_triplet = create_triplets(ROOT, train_list)
test_triplet  = create_triplets(ROOT, test_list)

print("Number of training triplets:", len(train_triplet))
print("Number of testing triplets :", len(test_triplet))

print("\nExamples of triplets:")
for i in range(5):
    print(train_triplet[i])

In [None]:
#function used get batch for training or testing the network
def get_batch(triplet_list, batch_size=256, preprocess=True):
    batch_steps = len(triplet_list)//batch_size
    
    for i in range(batch_steps+1):
        anchor   = []
        positive = []
        negative = []
        
        j = i*batch_size
        while j<(i+1)*batch_size and j<len(triplet_list):
            a, p, n = triplet_list[j]
            anchor.append(read_image(a))
            positive.append(read_image(p))
            negative.append(read_image(n))
            j+=1
            
        anchor = np.array(anchor)
        positive = np.array(positive)
        negative = np.array(negative)
        
        if preprocess:
            anchor = preprocess_input(anchor)
            positive = preprocess_input(positive)
            negative = preprocess_input(negative)
        
        yield ([anchor, positive, negative])

In [None]:
num_plots = 6

f, axes = plt.subplots(num_plots, 3, figsize=(15, 20))

for x in get_batch(train_triplet, batch_size=num_plots, preprocess=False):
    a,p,n = x
    for i in range(num_plots):
        axes[i, 0].imshow(a[i])
        axes[i, 1].imshow(p[i])
        axes[i, 2].imshow(n[i])
        i+=1
    break

In [None]:
#This function is used to get the ppre-trained encoder architecture from keras 
#and also we add some more layers on top of it 
def get_encoder(input_shape):
    """ Returns the image encoding model """

    pretrained_model = Xception(
        input_shape=input_shape,
        weights='imagenet',
        include_top=False,
        pooling='avg',
    )
    #we freeze the model weights asit is already trained for faster training
    for i in range(len(pretrained_model.layers)-27):
        pretrained_model.layers[i].trainable = False

    encode_model = Sequential([
        pretrained_model,
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        layers.BatchNormalization(),
        layers.Dense(256, activation="relu"),
        layers.Lambda(lambda x: tf.math.l2_normalize(x, axis=1))
    ], name="Encode_Model")
    return encode_model

In [None]:
#This layer is used to get distance between samples, it is used in loss function
class DistanceLayer(layers.Layer):
    # A layer to compute ‖f(A) - f(P)‖² and ‖f(A) - f(N)‖²
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
        an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)
        return (ap_distance, an_distance)
    
#Here we desgin a siemese network around a encoder
def get_siamese_network(input_shape = (128, 128, 3)):
    encoder = get_encoder(input_shape)
    
    # Input Layers for the images
    anchor_input   = layers.Input(input_shape, name="Anchor_Input")
    positive_input = layers.Input(input_shape, name="Positive_Input")
    negative_input = layers.Input(input_shape, name="Negative_Input")
    
    ## Generate the encodings (feature vectors) for the images
    encoded_a = encoder(anchor_input)
    encoded_p = encoder(positive_input)
    encoded_n = encoder(negative_input)
    
    # A layer to compute ‖f(A) - f(P)‖² and ‖f(A) - f(N)‖²
    distances = DistanceLayer()(
        encoder(anchor_input),
        encoder(positive_input),
        encoder(negative_input)
    )
    
    # Creating the Model
    siamese_network = Model(
        inputs  = [anchor_input, positive_input, negative_input],
        outputs = distances,
        name = "Siamese_Network"
    )
    return siamese_network

siamese_network = get_siamese_network()
siamese_network.summary()

In [None]:
plot_model(siamese_network, show_shapes=True, show_layer_names=True)

In [None]:
#Here we define our siamese model and how it will be trained
#We define what will happen during the forward operation and also how loss function is applied
class SiameseModel(Model):
    # Builds a Siamese model based on a base-model
    def __init__(self, siamese_network, margin=1.0):
        super(SiameseModel, self).__init__()
        
        self.margin = margin
        self.siamese_network = siamese_network
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        return self.siamese_network(inputs)

    def train_step(self, data):
        # GradientTape get the gradients when we compute loss, and uses them to update the weights
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)
            
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.siamese_network.trainable_weights))
        
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        loss = self._compute_loss(data)
        
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        # Get the two distances from the network, then compute the triplet loss
        ap_distance, an_distance = self.siamese_network(data)
        loss = tf.maximum(ap_distance - an_distance + self.margin, 0.0)
        return loss

    @property
    def metrics(self):
        # We need to list our metrics so the reset_states() can be called automatically.
        return [self.loss_tracker]

In [None]:
siamese_model = SiameseModel(siamese_network)

optimizer = Adam(learning_rate=1e-3, epsilon=1e-01)
siamese_model.compile(optimizer=optimizer)

In [None]:
#This function is used while training to get the test scores on test set
def test_on_triplets(batch_size = 256):
    pos_scores, neg_scores = [], []

    for data in get_batch(test_triplet, batch_size=batch_size):
        prediction = siamese_model.predict(data)
        pos_scores += list(prediction[0])
        neg_scores += list(prediction[1])
    
    accuracy = np.sum(np.array(pos_scores) < np.array(neg_scores)) / len(pos_scores)
    ap_mean = np.mean(pos_scores)
    an_mean = np.mean(neg_scores)
    ap_stds = np.std(pos_scores)
    an_stds = np.std(neg_scores)
    
    print(f"Accuracy on test = {accuracy:.5f}")
    return (accuracy, ap_mean, an_mean, ap_stds, an_stds)

In [None]:
#training the model
save_all = False
epochs = 30
batch_size = 128

max_acc = 0
train_loss = []
test_metrics = []

for epoch in range(1, epochs+1):
    t = time.time()
    
    # Training the model on train data
    epoch_loss = []
    for data in get_batch(train_triplet, batch_size=batch_size):
        loss = siamese_model.train_on_batch(data)
        epoch_loss.append(loss)
    epoch_loss = sum(epoch_loss)/len(epoch_loss)
    train_loss.append(epoch_loss)

    print(f"\nEPOCH: {epoch} \t (Epoch done in {int(time.time()-t)} sec)")
    print(f"Loss on train    = {epoch_loss:.5f}")
    
    # Testing the model on test data
    metric = test_on_triplets(batch_size=batch_size)
    test_metrics.append(metric)
    accuracy = metric[0]
    
    # Saving the model weights
    if save_all or accuracy>=max_acc:
        siamese_model.save("siamese_model_final_wof/",save_format='tf') 
        max_acc = accuracy

# Saving the model after all epochs run
siamese_model.save("siamese_model_final_wof/",save_format='tf') 

In [None]:
# import shutil
# shutil.make_archive("/kaggle/working/wwfinal_siamese_model_final_wof_saved", 'zip', "/kaggle/working/siamese_model_final_wof")

In [None]:
#Extracting encoder which will be used to generate embeddings or features
def extract_encoder(model):
    encoder = get_encoder((128, 128, 3))
    i=0
    for e_layer in model.layers[0].layers[3].layers:
        layer_weight = e_layer.get_weights()
        encoder.layers[i].set_weights(layer_weight)
        i+=1
    return encoder

encoder = extract_encoder(siamese_model)
encoder.save("encoder_ourfaces.h5")
encoder.summary()

In [None]:
# encoder=tf.keras.models.load_model('/kaggle/input/siamese-encoder/encoder_siamese.h5')

In [None]:
#Functions to get FAR and FRR values (coded by student)
def get_FAR(face_list1, face_list2, threshold=1.7):
    # Getting the encodings for the passed faces
    tensor1 = encoder.predict(face_list1)
    tensor2 = encoder.predict(face_list2)
    distance = np.sum(np.square(tensor1-tensor2), axis=-1)
    FAR = np.where(distance<=threshold, 1, 0)
    return FAR

def get_FRR(face_list1, face_list2, threshold=1.3):
    # Getting the encodings for the passed faces
    tensor1 = encoder.predict(face_list1)
    tensor2 = encoder.predict(face_list2)
    distance = np.sum(np.square(tensor1-tensor2), axis=-1)
    FRR = np.where(distance>threshold, 1, 0)
    return FRR

In [None]:
#Get FAR and FRR values for a certain threshold value intervals
pos_list = np.array([])
neg_list = np.array([])
FRR = []
FAR= []
thresold_vals=list(np.arange(1, 1.9+0.2, 0.2))


for data in get_batch(test_triplet, batch_size=256):
        a, p, n = data
        for th in thresold_vals:
            pos_list = np.append(pos_list, get_FRR(a, p,threshold=th))
            neg_list = np.append(neg_list, get_FAR(a, n,threshold=th))
            FRR_rate=sum(pos_list)/len(pos_list)
            FAR_rate=sum(neg_list)/len(neg_list)
            FRR.append(FRR_rate)
            FAR.append(FAR_rate)
        break
    
    

In [None]:
#plot the FAR and FRR plots
import matplotlib.pyplot as plt

# Your lists for False Rejection Rate (FRR) and False Acceptance Rate (FAR)

# Your thresholds
thresholds = thresold_vals

# Create the plot
plt.plot(thresholds, FRR, label='FRR')
plt.plot(thresholds, FAR, label='FAR')

# Add labels and legend
plt.xlabel('Threshold')
plt.ylabel('Rate')
plt.legend()
plt.title('FRR vs FAR on test subset')

# Show the plot
plt.show()

# Get embeddings for face to make face encoding Database

In [None]:
def get_folder_names(path):
    return [os.path.join(path, d) for d in os.listdir(path) if os.path.isdir(os.path.join(path, d))]

def get_images(n):
    folder_names = get_folder_names('/kaggle/input/face-recog-test/Biometric_data/')
    images=[]
    for folder in folder_names:
        images_in_folder = os.listdir(folder)
        chosen_images = random.sample(images_in_folder, n)
        images += [os.path.join(folder, image) for image in chosen_images]

    return images


In [None]:
encodings=[]
names=[]
for i in get_images(2):
    ttimages=cv2.imread(i)
    ttimages = cv2.cvtColor(ttimages, cv2.COLOR_BGR2RGB)
    ttimages=cv2.resize(ttimages, (128,128), interpolation = cv2.INTER_AREA)
    ttimages=np.expand_dims(ttimages, axis=0)
    ttimages=preprocess_input(ttimages)
    enc1=encoder(ttimages)
    encodings.append(enc1[0])
    names.append(os.path.basename(os.path.dirname(i)))

In [None]:
# distance = np.sum(np.square(encodings[5]-encodings[4]), axis=-1)
# distance

In [None]:
encodings=np.stack(encodings, axis=0)
names=np.array(names)
np.savez('names_en.npz', names=names)
np.savez('encodings_en.npz', names=encodings)

In [None]:
# loaded_name = np.load('/kaggle/working/names.npz')
# names_array = loaded_name['names']

# loaded_enc = np.load('/kaggle/working/encodings.npz')
# encoding = loaded_enc['names']
# print(encoding)

## Convert model to ONNX format for faster inference

In [None]:
!pip install onnxruntime
!pip install -U tf2onnx

In [None]:
# (coded by student)
import tf2onnx
import onnxruntime as rt

spec = (tf.TensorSpec((None, 128, 128, 3), tf.float32, name="input"),)
output_path = "face-recog_enc" + ".onnx"

model_proto, _ = tf2onnx.convert.from_keras(encoder, input_signature=spec, opset=13, output_path=output_path)
output_names = [n.name for n in model_proto.graph.output]

In [None]:
providers = ['CPUExecutionProvider']
m = rt.InferenceSession('/kaggle/working/face-recog.onnx', providers=providers)
onnx_pred = m.run(output_names, {"input": a})

# print('ONNX Predicted:',onnx_pred)

# make sure ONNX and keras have the same results
# np.testing.assert_allclose(pred, onnx_pred[0], rtol=1e-5)

In [None]:
#preprocess function should be used mandatory for getting enccoding at inference
def c_preprocess_input(x):
    x = x.astype('float32')
    x /= 255.
    x -= 0.5
    x *= 2.
    return x
