In [34]:
import matplotlib.pyplot as plt
import numpy as np
import os
import random
import tensorflow as tf
from keras.layers import Dense, Dropout, BatchNormalization, GlobalAveragePooling2D, Flatten
from keras import Model
import keras
import warnings

warnings.filterwarnings("ignore")

In [177]:

def build_embedding_generator(k_layers_to_tune=10):

    base_model = tf.keras.applications.EfficientNetB1(weights="imagenet", 
                                                      input_shape=(100, 100, 3),
                                                      include_top = False)

    for l in base_model.layers[:-k_layers_to_tune]:
        l.trainable = False
    
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Flatten()(x)
    x = Dense(512, activation="relu")(x)
    x = BatchNormalization()(x)
    x = Dropout(0.2)(x)
    x = Dense(256, activation="relu")(x)
    x = BatchNormalization()(x)
    x = Dense(128, activation="linear")(x)
    x = tf.nn.l2_normalize(x, axis=1)
    
    embedding_model = Model(base_model.input, x, name="Embedding")

    return embedding_model


In [194]:
embedding_model = build_embedding_generator(10)

In [195]:
embedding_model.summary()

Model: "Embedding"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_15 (InputLayer)       [(None, 100, 100, 3)]        0         []                            
                                                                                                  
 rescaling_28 (Rescaling)    (None, 100, 100, 3)          0         ['input_15[0][0]']            
                                                                                                  
 normalization_14 (Normaliz  (None, 100, 100, 3)          7         ['rescaling_28[0][0]']        
 ation)                                                                                           
                                                                                                  
 rescaling_29 (Rescaling)    (None, 100, 100, 3)          0         ['normalization_14[0][

### Custom layers & Model

In [180]:
class DistanceLayer(tf.keras.layers.Layer):
    """
    This layer is responsible for computing the distance between the anchor
    embedding and the positive embedding, and the anchor embedding and the
    negative embedding.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):

        anchor_pos_distance = tf.reduce_sum(tf.square(anchor - positive))
        anchor_neg_distance = tf.reduce_sum(tf.square(anchor - negative))

        return (anchor_pos_distance, anchor_neg_distance)


def build_siamesenetwork(embedding_model):

    anchor_input = keras.layers.Input(name="anchor", shape=(100, 100, 3))
    pos_input = keras.layers.Input(name="positive", shape=(100, 100, 3))
    neg_input = keras.layers.Input(name="negative", shape=(100, 100, 3))

    distances = DistanceLayer()(
        embedding_model(anchor_input),
        embedding_model(pos_input),
        embedding_model(neg_input)
    )

    siamese_network = Model(
            inputs=[anchor_input, pos_input, neg_input],
            outputs=distances
    )

    return siamese_network



In [181]:
class SiameseModel(Model):
    """The Siamese Network model with a custom training and testing loops.

    Computes the triplet loss using the three embeddings produced by the
    Siamese Network.

    The triplet loss is defined as:
       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
    """
    def __init__(self, siamese_network, margin=0.5):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = keras.metrics.Mean(name="loss")

    def call(self, inputs):
        return self.siamese_network(inputs)

    

    def train_step(self, data):

        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.siamese_network.trainable_weights))
        
        self.loss_tracker.update_state(loss)

        return {"loss" : self.loss_tracker.result()}

    def _compute_loss(self, data):

        ap_distance, an_distance = self.siamese_network(data)

        loss = ap_distance - an_distance
        #loss = an_distance - ap_distance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss

    def test_step(self, data):
        loss = self._compute_loss(data)
        self.loss_tracker.update_state(loss)

        return {"loss" : self.loss_tracker.result()}

    @property
    def metrics(self):

        return [self.loss_tracker]


In [196]:
siam_net = build_siamesenetwork(embedding_model)

In [197]:
siam_model = SiameseModel(siam_net, margin=0.8)

## UTILS


In [172]:
import pandas as pd
import numpy as np

In [173]:
PATH_TO_IMGS = "../images/"

triplets_df = pd.read_csv("../triplets.csv")




def parse_csv_line(line):
    columns = ['anchor', 'id1', 'pos', 'id2', 'neg', 'id3']
    
    # Decode the CSV line
    record_defaults = [''] * 6  # All fields are strings
    parsed_line = tf.io.decode_csv(line, record_defaults)
    parsed_line = dict(zip(columns, parsed_line))
    return parsed_line


def load_and_preprocess_image(path):
    
    image = tf.io.read_file(path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, [100, 100])
    image = image / 255.0
    return image



def create_triplet_dataset(csv_file_path, batch_size=32):
    dataset = tf.data.TextLineDataset(csv_file_path)
    # Skip the header line
    dataset = dataset.skip(1)
    
    # Parse each line
    dataset = dataset.map(lambda line: parse_csv_line(line))
    # Load and preprocess the images
    def load_images(parsed_line):

        base_path = tf.constant(PATH_TO_IMGS)
        
        anchor_path = tf.strings.join([base_path, parsed_line['anchor']], separator='')
        pos_path = tf.strings.join([base_path, parsed_line['pos']], separator='')
        neg_path = tf.strings.join([base_path, parsed_line['neg']], separator='')

        
        anchor = load_and_preprocess_image(anchor_path)
        pos = load_and_preprocess_image(pos_path)
        neg = load_and_preprocess_image(neg_path)
        return anchor, pos, neg

    
    dataset = dataset.map(load_images)
    dataset = dataset.batch(batch_size)
    return dataset


In [184]:
dataset = create_triplet_dataset("../triplets.csv", batch_size=256)

In [185]:
batch = next(iter(dataset.take(5)))

In [198]:
siam_model.compile(optimizer=keras.optimizers.Adam(0.0001), 
                   )

siam_model.fit(batch, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x7ec301389450>

In [88]:
siam_model

<__main__.SiameseModel at 0x7ec389dbd720>

In [199]:
anch_embds = embedding_model.predict(batch[0])
pos_embds = embedding_model.predict(batch[1])
neg_emds = embedding_model.predict(batch[2])



In [206]:
cossim = keras.metrics.CosineSimilarity()

pos_sim = cossim(anch_embds[1], pos_embds[1])
neg_sim = cossim(anch_embds[1], neg_emds[1])

In [207]:
pos_sim

<tf.Tensor: shape=(), dtype=float32, numpy=0.9864649>

In [208]:
neg_sim

<tf.Tensor: shape=(), dtype=float32, numpy=0.9917867>