In [None]:
import torch 

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


In [None]:
dataset = "/kaggle/input/face-recognition-dataset/Extracted Faces/Extracted Faces"

def read_image(index):
    path = os.path.join(dataset, index[0], index[1])
    image = cv2.imread(path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    return image

In [None]:
import os
import random

def split_dataset(path, split=0.9):
    
    folders = os.listdir(path)
    
    num_train = int(len(folders) * split)
    
    random.shuffle(folders)
    
    train_list, test_list = {}, {}
    
    for folder in folders[:num_train]:
        folder_path = os.path.join(path, folder)
        num_files = len(os.listdir(folder_path))
        train_list[folder] = num_files
       
    for folder in folders[num_train:]:
        folder_path = os.path.join(path, folder)
        num_files = len(os.listdir(folder_path))
        test_list[folder] = num_files
        
    
    return train_list, test_list


train_list, test_list = split_dataset(dataset, split=0.90)

In [None]:

def create_triplets(directory, folder_list, max_files=10):
    
    triplets = []
    folders = list(folder_list.keys())
    
    for folder in folders:
        path = os.path.join(directory, folder)
        files = os.listdir(path)
        random.shuffle(files)  
        files = files[:max_files]  
        num_files = len(files)
        
        for i in range(num_files - 1):
            for j in range(i + 1, num_files):
                anchor = (folder, files[i])
                positive = (folder, files[j])

                neg_folder = random.choice([f for f in folders if f != folder])
                neg_file = random.choice(os.listdir(os.path.join(directory, neg_folder)))
                
                negative = (neg_folder, neg_file)
                triplets.append((anchor, positive, negative))
    
    random.shuffle(triplets)  
    return triplets



In [None]:
train_triplet = create_triplets(dataset, train_list)
test_triplet  = create_triplets(dataset, test_list)

In [None]:
def get_batch(triplet_list, batch_size=100, preprocess=True):
   
    batch_steps = len(triplet_list) // batch_size

    for i in range(batch_steps + 1):
        anchor = []
        positive = []
        negative = []
        
        start_index = i * batch_size
        end_index = min((i + 1) * batch_size, len(triplet_list))

        for j in range(start_index, end_index):
            a, p, n = triplet_list[j]
            anchor.append(read_image(a))
            positive.append(read_image(p))
            negative.append(read_image(n))
        
        anchor = np.array(anchor)
        positive = np.array(positive)
        negative = np.array(negative)
        
        if preprocess:
            anchor = preprocess_input(anchor)
            positive = preprocess_input(positive)
            negative = preprocess_input(negative)
        
        yield ([anchor, positive, negative])

In [None]:
def get_encoder(input_shape):
  
    pretrained_model = Xception(
        input_shape=input_shape,
        weights='imagenet',
        include_top=False,
        pooling='avg',
    )
    
    for layer in pretrained_model.layers[:-27]:
        layer.trainable = False

    encode_model = Sequential([
        pretrained_model,
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        layers.BatchNormalization(),
        layers.Dense(256, activation="relu"),
        layers.Lambda(lambda x: tf.math.l2_normalize(x, axis=1))
    ], name="Encode_Model")
    
    return encode_model

In [None]:
class DistanceLayer(layers.Layer):
    # A layer to compute ‖f(A) - f(P)‖² and ‖f(A) - f(N)‖²
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
        an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)
        return (ap_distance, an_distance)
    

def get_siamese_network(input_shape = (128, 128, 3)):
    encoder = get_encoder(input_shape)
    
    # Input Layers for the images
    anchor_input   = layers.Input(input_shape, name="Anchor_Input")
    positive_input = layers.Input(input_shape, name="Positive_Input")
    negative_input = layers.Input(input_shape, name="Negative_Input")
    
    encoded_a = encoder(anchor_input)
    encoded_p = encoder(positive_input)
    encoded_n = encoder(negative_input)
    
    distances = DistanceLayer()(
        encoder(anchor_input),
        encoder(positive_input),
        encoder(negative_input)
    )
    
    siamese_network = Model(
        inputs  = [anchor_input, positive_input, negative_input],
        outputs = distances,
        name = "Siamese_Network"
    )
    return siamese_network

siamese_network = get_siamese_network()
siamese_network.summary()

In [None]:
plot_model(siamese_network, show_shapes=True, show_layer_names=True)

In [None]:
class SiameseModel(Model):
    # Builds a Siamese model based on a base-model
    def __init__(self, siamese_network, margin=4):
        super(SiameseModel, self).__init__()
        
        # Marginea folosită în calculul pierderii triplet
        self.margin = margin
        
        # Rețeaua siameză pe care o vom antrena
        self.siamese_network = siamese_network
        
        # Un metric pentru a urmări pierderea medie în timpul antrenamentului
        self.loss_tracker = metrics.Mean(name="loss")

    # Metoda call definește comportamentul modelului la efectuarea unei treceri înainte
    def call(self, inputs):
        return self.siamese_network(inputs)

    # Metoda train_step definește un singur pas de antrenament
    def train_step(self, data):
        # GradientTape înregistrează toate operațiile pentru a calcula gradientii.
        with tf.GradientTape() as tape:
            # Calculăm pierderea folosind datele de antrenament.
            loss = self._compute_loss(data)
            
        # Calculăm gradientii pierderii față de toate greutățile antrenabile ale rețelei.
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)
        
        # Aplicăm gradientii la greutăți pentru a le actualiza și pentru a reduce pierderea.
        self.optimizer.apply_gradients(zip(gradients, self.siamese_network.trainable_weights))
        
        # Actualizăm starea pierderii pentru a ține evidența pierderii medii.
        self.loss_tracker.update_state(loss)
        
        # Returnăm pierderea medie pentru a monitoriza performanța modelului în timpul antrenamentului.
        return {"loss": self.loss_tracker.result()}

    # Metoda test_step definește un singur pas de testare/validare
    def test_step(self, data):
        # Calculăm pierderea folosind datele de testare.
        loss = self._compute_loss(data)
        
        # Actualizăm starea pierderii pentru a ține evidența pierderii medii.
        self.loss_tracker.update_state(loss)
        
        # Returnăm pierderea medie pentru a monitoriza performanța modelului în timpul testării/validării.
        return {"loss": self.loss_tracker.result()}

    # Metoda _compute_loss definește modul de calcul al pierderii
    def _compute_loss(self, data):
        # Obținem cele două distanțe de la rețeaua siameză: ancoră-pozitiv și ancoră-negativ
        ap_distance, an_distance = self.siamese_network(data)
        
        # Calculăm pierderea triplet folosind distanțele și marginea
        loss = tf.maximum(ap_distance - an_distance + self.margin, 0.0)
        
        # Returnăm pierderea
        return loss

    @property
    def metrics(self):
        # Listăm metricile noastre pentru ca reset_states() să fie apelat automat.
        return [self.loss_tracker]


In [None]:
siamese_model = SiameseModel(siamese_network)
optimizer = Adam(learning_rate=1e-3, epsilon=1e-01)
siamese_model.compile(optimizer=optimizer)

In [None]:
def test_on_triplets(data_triplet, batch_size = 256):
    pos_scores, neg_scores = [], []

    for data in get_batch(data_triplet, batch_size=batch_size):
        prediction = siamese_model.predict(data)
        pos_scores += list(prediction[0])
        neg_scores += list(prediction[1])
    
    accuracy = np.sum(np.array(pos_scores) < np.array(neg_scores)) / len(pos_scores)
    
    return accuracy

In [None]:
save_all = False
epochs = 5
batch_size = 128

max_acc = 0
train_loss = []
val_loss = []
train_acc = []
val_acc = []

with tf.device('/gpu:0'):
    for epoch in range(1, epochs+1):
        t = time.time()

        # Training the model on train data
        epoch_train_loss = []
        for data in get_batch(train_triplet, batch_size=batch_size):
            loss = siamese_model.train_on_batch(data)
            epoch_train_loss.append(loss)
        epoch_train_loss = sum(epoch_train_loss)/len(epoch_train_loss)
        train_loss.append(epoch_train_loss)

        epoch_val_loss = []
        for data in get_batch(test_triplet, batch_size=batch_size):
            loss = siamese_model.train_on_batch(data)
            epoch_val_loss.append(loss)
        epoch_val_loss = sum(epoch_val_loss)/len(epoch_val_loss)
        val_loss.append(epoch_val_loss)

        print(f"\nEPOCH: {epoch} \t (Epoch done in {int(time.time()-t)} sec)")
        print(f"Loss on train : {epoch_train_loss:.5f} | Loss on val : {epoch_val_loss:.5f}")
        
        acc_train = test_on_triplets(train_triplet, batch_size=batch_size)
        train_acc.append(acc_train)
        # Testing the model on test data
        acc_val = test_on_triplets(test_triplet, batch_size=batch_size)
        val_acc.append(acc_val)
        accuracy = acc_val

        # Saving the model weights
        if save_all or accuracy>=max_acc:
            siamese_model.save_weights("siamese_model.h5")
            max_acc = accuracy
            
        print(f"Accuracy on train = {acc_train:.5f} | Accuracy on test = {accuracy:.5f}")

# Saving the model after all epochs run
siamese_model.save_weights("siamese_model-final.h5")

In [None]:
def plot_metrics(loss_train, loss_val, accuracy):
    plt.figure(figsize=(15,5))
    
    # Plotting the loss over epochs
    plt.subplot(121)
    plt.plot(train_loss, 'b', label='Train Loss')
    plt.plot(val_loss, 'r', label='Val Loss')
    plt.title('Loss')
    plt.legend()
    
    # Plotting the accuracy over epochs
    plt.subplot(122)
    plt.plot(train_acc, 'b', label='Train Accuracy')
    plt.plot(val_acc, 'r', label='Val Accuracy')

    plt.title('Accuracy')
    plt.legend()
    
    plt.figure(figsize=(15,5))

val_acc = np.array(val_acc)
plot_metrics(train_loss, val_loss, val_acc)