In [36]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten, Lambda
import tensorflow as tf
from tensorflow.keras import backend as K ,layers

In [2]:
def preprocess(file_path):
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img, (100,100))
    img = img / 255.0
    return img

def preprocess_twin(input_img, validation_img, label):
    return ((preprocess(input_img), preprocess(validation_img)), label)


def make_embedding(): 
    inp = Input(shape=(100,100,3), name='input_image')
    
    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # Third block 
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    
    return Model(inputs=[inp], outputs=d1, name='embedding')

In [3]:
import os
from matplotlib import pyplot as plt


POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')


anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(3000)
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(3000)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(3000)

  anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(3000)
  positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(3000)
  negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(3000)


In [4]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))


data = positives.concatenate(negatives)
data = data.map(lambda x,y,z: preprocess_twin(x,y,z))
data = data.cache()
data = data.shuffle(buffer_size=10000)

# Training partition


In [5]:
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

# Testing partition


In [6]:
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# 4. Model Engineering


In [57]:
def contrastive_loss_with_margin(margin):
    def contrastive_loss(y_true, y_pred):
        square_pred = K.square(y_pred)
        margin_square = K.square(K.maximum(margin - y_pred, 0))
        return (y_true * square_pred + (1 - y_true) * margin_square)
    return contrastive_loss

def euclidean_distance(x, y):
    sum_square = K.sum(K.square(x - y), axis=1, keepdims=True)
    return K.sqrt(K.maximum(sum_square, K.epsilon()))

class SiameseModel(Model):
    def __init__(self, embedding_model, margin=1.0):
        super().__init__()
        self.embedding = embedding_model
        self.margin = margin
        self.loss_fn = contrastive_loss_with_margin(self.margin)
        self.loss_tracker = tf.keras.metrics.Mean(name="loss")
        #self.classifier = tf.keras.Sequential([
        #    layers.Dense(128, activation='relu'),
        #    layers.Dropout(0.3),
        #    layers.Dense(64, activation='relu'),
        #    layers.Dense(1, activation='sigmoid')  
        #])
        
    @tf.function
    def call(self, inputs, training=False):
        x1, x2 = inputs
        embed1 = self.embedding(x1)
        embed2 = self.embedding(x2)
        distance = euclidean_distance(embed1, embed2)
        return distance
        #return self.classifier(distance, training=training)


    @tf.function
    def train_step(self, data):
        (x1, x2), y = data
        with tf.GradientTape() as tape:
            distances = self((x1, x2), training=True)
            loss = self.loss_fn(y, distances)
            #loss = tf.keras.losses.binary_crossentropy(y, distances)

        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    @tf.function
    def test_step(self, data):
        (x1, x2), y = data
        distances = self((x1, x2), training=False)
        loss = self.loss_fn(y, distances)
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    @property
    def metrics(self):
        return [self.loss_tracker]


In [58]:
siamese_model = SiameseModel(embedding_model=make_embedding(), margin=1.0)
siamese_model.compile(optimizer=tf.keras.optimizers.Adam())

In [59]:
siamese_model.fit(train_data, validation_data=test_data, epochs=3)

Epoch 1/3


Expected: ['input_image']
Received: inputs=Tensor(shape=(None, 100, 100, None))


[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 2s/step - loss: 2.1284 - val_loss: 0.5828
Epoch 2/3
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 3s/step - loss: 0.5830 - val_loss: 0.6076
Epoch 3/3
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 3s/step - loss: 0.5639 - val_loss: 0.5996


<keras.src.callbacks.history.History at 0x1e1f135c680>

In [55]:
def verify_and_show(model, img1_path, img2_path, threshold=0.5):
    img1_tensor = preprocess(img1_path)
    img2_tensor = preprocess(img2_path)
    img1_tensor = tf.expand_dims(img1_tensor, axis=0)
    img2_tensor = tf.expand_dims(img2_tensor, axis=0)

    distance = model((img1_tensor, img2_tensor), training=False).numpy()[0][0]

    print(f"\nImage1: {img1_path}")
    print(f"Image2: {img2_path}")
    print(f"Distance = {distance:.4f} | Threshold = {threshold}")

    label = " SAME" if distance < threshold else " DIFFERENT"
    print(f"Prediction: {label}")

    # Plot
    img1 = tf.image.decode_jpeg(tf.io.read_file(img1_path)).numpy()
    img2 = tf.image.decode_jpeg(tf.io.read_file(img2_path)).numpy()

    plt.figure(figsize=(6,3))
    plt.subplot(1,2,1); plt.imshow(img1); plt.title("Anchor"); plt.axis("off")
    plt.subplot(1,2,2); plt.imshow(img2); plt.title("Target"); plt.axis("off")
    plt.suptitle(f"{label} (Distance: {distance:.4f})")
    plt.tight_layout()
    plt.show()

In [None]:
for filename in os.listdir(ANC_PATH):
    if filename.endswith(".jpg"):
        anchor_path = os.path.join(ANC_PATH, filename)
        print(f"\nComparing: {filename}")
        positive_files = [f for f in os.listdir(POS_PATH) if f.endswith('.jpg')]
        positive_path = os.path.join(POS_PATH, positive_files[0])  

        verify_and_show(siamese_model, anchor_path, positive_path)

In [None]:
for filename in os.listdir(ANC_PATH):
    if filename.endswith(".jpg"):
        anchor_path = os.path.join(ANC_PATH, filename)
        print(f"\nComparing: {filename}")
        negative_files = [f for f in os.listdir(NEG_PATH) if f.endswith('.jpg')]
        negative_path = os.path.join(NEG_PATH, negative_files[15])  

        verify_and_show(siamese_model, anchor_path, negative_path)