In [26]:
import os
import cv2
import random
import numpy as np
import glob

import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split

from PIL import Image as PILImage
from PIL.ExifTags import TAGS

from IPython.display import display

import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, models
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.utils import Sequence

from keras.layers import Layer

In [27]:
BATCH_SIZE = 32
IMAGE_SIZE = (256, 256)

In [46]:
def imshow(a, size=1.0):
    # Clip and convert the image to uint8
    a = a.clip(0, 255).astype("uint8")
    
    # Resize the image if a size factor is provided
    if size != 1.0:
        new_dim = (int(a.shape[1] * size), int(a.shape[0] * size))
        a = cv2.resize(a, new_dim, interpolation=cv2.INTER_AREA)
    
    
    # Display the image
    display(PILImage.fromarray(a))

In [29]:
def get_label(file_path):
    label = file_path.split("/")[-2]
    label = label.split(".")[-2]
    label = int(label)
    return label

In [37]:
def read_image(file_path):
    img = cv2.imread(file_path)
    img = cv2.resize(img, IMAGE_SIZE)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img

In [31]:
def generate_triplets(file_paths, labels):
    label_to_indices = {}
    for idx, label in enumerate(labels):
        if label not in label_to_indices:
            label_to_indices[label] = []
        label_to_indices[label].append(idx)
    
    triplets = []
    for i in range(len(file_paths)):
        # Select an anchor image and its label
        anchor_idx = i
        anchor_label = labels[anchor_idx]

        # Select a positive image (same label)
        positive_idx = random.choice(label_to_indices[anchor_label])
        while positive_idx == anchor_idx:
            positive_idx = random.choice(label_to_indices[anchor_label])

        # Select a negative image (different label)
        negative_label = random.choice([l for l in label_to_indices.keys() if l != anchor_label])
        negative_idx = random.choice(label_to_indices[negative_label])

        triplets.append((file_paths[anchor_idx], file_paths[positive_idx], file_paths[negative_idx]))
    
    return triplets

In [32]:
class DataGenerator(Sequence):
    def __init__(self, triplets, batch_size, image_size, **kwargs):
        super().__init__(**kwargs)
        self.triplets = triplets
        self.batch_size = batch_size
        self.image_size = image_size
      
    def __len__(self):
        return len(self.triplets) // self.batch_size

    def __getitem__(self, index):
        # Get batch of triplets
        batch_triplets = self.triplets[index * self.batch_size : (index + 1) * self.batch_size]
        
        # Prepare arrays for images
        anchors, positives, negatives = [], [], []
        for anchor_path, positive_path, negative_path in batch_triplets:
            # Load and normalize images
            anchors.append(read_image(anchor_path) / 255.0)
            positives.append(read_image(positive_path) / 255.0)
            negatives.append(read_image(negative_path) / 255.0)

        # Convert lists to arrays and return
        anchors = np.array(anchors)
        positives = np.array(positives)
        negatives = np.array(negatives)
        
     
        
        #print(f"Anchors shape: {anchors.shape}, Positives shape: {positives.shape}, Negatives shape: {negatives.shape}")
        return (anchors, positives, negatives), np.zeros((self.batch_size, 1))
        

In [33]:
data_folder = "Data/00*"
image_files = glob.glob(os.path.join(data_folder, "*.jpg"), recursive=True)

labels = [get_label(file_path) for file_path in image_files]

train_x, val_x, train_y, val_y = train_test_split(image_files, labels, test_size=0.2, random_state=42, stratify=labels)   

In [34]:
train_triplets = generate_triplets(train_x, train_y)
val_triplets = generate_triplets(val_x, val_y)

train_gen = DataGenerator(train_triplets, batch_size=BATCH_SIZE, image_size=(256, 256, 3))
val_gen = DataGenerator(val_triplets, batch_size=BATCH_SIZE, image_size=(256, 256, 3))

In [39]:
class L2Normalization(Layer):
    def call(self, inputs):
        return tf.math.l2_normalize(inputs, axis=1)

In [40]:
def build_resnet_embedding_network(input_shape=(256, 256, 3), embedding_dim=128):
    # Load pre-trained ResNet50 without the top layer
    base_model = ResNet50(weights="imagenet", include_top=False, input_shape=input_shape)
    
    # Freeze the base model's weights (optional, for fine-tuning later)
    base_model.trainable = False
    
    # Add custom layers for embedding extraction
    model = models.Sequential([
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dense(embedding_dim, activation='relu'),  # Embedding dimension
        L2Normalization()  # L2 normalize embeddings
    ])
    
    return model

# Create the embedding network
embedding_model = build_resnet_embedding_network()
embedding_model.summary()

In [41]:
def build_siamese_model_with_resnet(embedding_model, input_shape=(256, 256, 3)):
    # Inputs for anchor, positive, and negative images
    anchor_input = layers.Input(name="anchor", shape=input_shape)
    positive_input = layers.Input(name="positive", shape=input_shape)
    negative_input = layers.Input(name="negative", shape=input_shape)
    

    # Pass each input through the embedding network
    anchor_embedding = embedding_model(anchor_input)
    positive_embedding = embedding_model(positive_input)
    negative_embedding = embedding_model(negative_input)

    embeddings = layers.Lambda(lambda x: tf.concat(x, axis=1))(
        [anchor_embedding, positive_embedding, negative_embedding]
    )
    
    # Combine embeddings into a Siamese model
    siamese_model = models.Model(
        inputs=[anchor_input, positive_input, negative_input],
        outputs=embeddings
    )

    return siamese_model

siamese_model = build_siamese_model_with_resnet(embedding_model)
siamese_model.summary()

In [42]:
def triplet_loss(y_true, y_pred, margin=0.2):
    # Split y_pred into anchor, positive, and negative
    anchor, positive, negative = tf.split(y_pred, num_or_size_splits=3, axis=1)
    
    # # Compute distances
    pos_dist = tf.reduce_sum(tf.square(anchor - positive), axis=1)  # L2 distance
    neg_dist = tf.reduce_sum(tf.square(anchor - negative), axis=1)
    

    # # Compute triplet loss
    loss = tf.maximum(pos_dist - neg_dist + margin, 0.0)
    return tf.reduce_mean(loss)

In [13]:
from tensorflow.keras.callbacks import LambdaCallback
import matplotlib.pyplot as plt

def inspect_outputs(epoch, logs):
    # Fetch the first batch of data from the generator
    inputs, _ = train_gen[0]
    
    # Get model predictions for the batch
    pred = siamese_model.predict(inputs, verbose=0)
    
    anchors = pred[:, :128]
    positives = pred[:, 128:256]
    negatives = pred[:, 256:]
    
    # Print the embeddings for debugging
    print(f"Epoch {epoch + 1}:")
    print(f"Anchor embedding sample: {anchors[0]}")
    print(f"Positive embedding sample: {positives[0]}")
    print(f"Negative embedding sample: {negatives[0]}")

output_inspector = LambdaCallback(on_epoch_end=inspect_outputs)

In [43]:
siamese_model.compile(optimizer='adam', loss=triplet_loss)

# Train the model
siamese_model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=10
)

Epoch 1/10




[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m55s[0m 1s/step - loss: 0.1895 - val_loss: 0.1694
Epoch 2/10
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 1s/step - loss: 0.1667 - val_loss: 0.1569
Epoch 3/10
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m30s[0m 1s/step - loss: 0.1572 - val_loss: 0.1597
Epoch 4/10
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 1s/step - loss: 0.1413 - val_loss: 0.1610
Epoch 5/10
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m28s[0m 983ms/step - loss: 0.1506 - val_loss: 0.1542
Epoch 6/10
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m28s[0m 994ms/step - loss: 0.1395 - val_loss: 0.1606
Epoch 7/10
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m30s[0m 1s/step - loss: 0.1401 - val_loss: 0.1656
Epoch 8/10
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m29s[0m 1s/step - loss: 0.1258 - val_loss: 0.1666
Epoch 9/10
[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[

<keras.src.callbacks.history.History at 0x7f1a4a88f2b0>

In [44]:
siamese_model.save("siamese_resnet_model.keras")

In [45]:
embedding_model.save("embedding_resnet_model.keras")