### Import Dependencies

In [1]:
# import libraries
import os
import uuid
import cv2
import random
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf
from tensorflow.keras.metrics import Precision, Recall
from keras.saving import register_keras_serializable
import requests
import io
from PIL import Image

In [2]:
# gpus
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [3]:
print(len(f'{gpus} GPUS available.'))

148


### Augmentation

In [4]:
def data_aug(img):
    data = []
    for i in range(9):
        img = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1,2))
        img = tf.image.stateless_random_contrast(img, lower=0.6, upper=1, seed=(1,3))
        img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_saturation(img, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))
            
        data.append(img)
    
    return data

In [5]:
for file_name in os.listdir('/kaggle/input/siamese-test/data/positive'):
    img_path = os.path.join('/kaggle/input/siamese-test/data/positive', file_name)
    img = cv2.imread(img_path)
    augmented_images = data_aug(img) 
    
    for image in augmented_images:
        cv2.imwrite(os.path.join('/kaggle/input/siamese-test/data/positive', '{}.jpg'.format(uuid.uuid1())), image.numpy())

### Get images

In [6]:
# list images in the directory
pos = tf.data.Dataset.list_files('/kaggle/input/siamese-test/data/positive' + '/*.jpg').take(300)
anchor = tf.data.Dataset.list_files('/kaggle/input/siamese-test/data/anchor' + '/*.jpg').take(300)
neg = tf.data.Dataset.list_files('/kaggle/input/siamese-test/data/negative' + '/*.jpg').take(300)

#### Scale and resize

In [7]:
# preprocess images
def preprocess(path):
    # read image
    byte_img = tf.io.read_file(path)
    img = tf.io.decode_jpeg(byte_img) # decode
    img = tf.image.resize(img, (100, 100)) # resize the imge to 100x100x3
    img = img / 255.0 # scale image to be between 0 to 1
    return img

#### Labelled dataset

In [8]:
positives = tf.data.Dataset.zip(anchor, pos, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor))))
negatives = tf.data.Dataset.zip(anchor, neg, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor))))
data = positives.concatenate(negatives)

### Build test and train partition

In [9]:
def preprocess_twin(input_img, validation_img, label):
    return (preprocess(input_img), preprocess(validation_img), label)

#### Dataloader pipeline

In [10]:
# map
data = data.map(preprocess_twin)
data = data.cache() # caching images
data = data.shuffle(buffer_size = 1024) # shuffle images

In [11]:
# training partition
train_data = data.take(round(len(data) * .7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [12]:
# test partition
test_data = data.skip(round(len(data) * .7))
test_data = data.take(round(len(data) * .3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

### Model

#### Embedding

In [13]:
def make_embedding():
    # input layer
    input_ = Input(shape = (100, 100, 3), name = 'input-image')
    
    # first block
    conv_1 = Conv2D(filters = 64, kernel_size = (10, 10), activation = 'relu')(input_)
    max_pool_1 = MaxPooling2D(64, (2, 2), padding = 'same')(conv_1)
    
    # second block
    conv_2 = Conv2D(filters = 128, kernel_size = (7, 7), activation = 'relu')(max_pool_1)
    max_pool_2 = MaxPooling2D(64, (2, 2), padding = 'same')(conv_2)
    
    # third block
    conv_3 = Conv2D(filters = 128, kernel_size = (4, 4), activation = 'relu')(max_pool_2)
    max_pool_3 = MaxPooling2D(64, (2, 2), padding = 'same')(conv_3)
    
    # final embedding block
    conv_4 = Conv2D(filters = 256, kernel_size = (4, 4), activation = 'relu')(max_pool_3)
    flat = Flatten()(conv_4)
    dense = Dense(units = 4096, activation = 'sigmoid')(flat)
    
    return Model(inputs = input_, outputs = dense, name = 'embedding')

#### Distance layer

In [14]:
@register_keras_serializable()
class L1Distance(Layer):
    def __init__(self, **kwargs): # init
        super().__init__() # inheritance
        
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding) # similarity calculation

### Siamese model

In [15]:
def siamese_model():
    # INPUTS
    input_img = Input(name = 'input_img', shape = (100, 100, 3)) # anchor image input
    validation_img = Input(name = 'valid_img', shape = (100, 100 , 3)) # validation image
    
    # combine Siamese model components
    siamese_dist_layer = L1Distance()
    siamese_dist_layer.__name = 'distance'
    embedding = make_embedding()
    distances = siamese_dist_layer(embedding(input_img), embedding(validation_img))
    
    # classification layer
    classifier = Dense(1, activation = 'sigmoid')(distances)
    
    return Model(inputs = [input_img, validation_img], outputs = classifier, name = 'SiameseNetwork')

In [16]:
siamese_model = siamese_model()

In [17]:
print(siamese_model.summary())

None


### Training

#### Loss and Optimizer

In [18]:
bce = tf.losses.BinaryCrossentropy() # loss function
optim = tf.keras.optimizers.Adam(learning_rate = 1e-4) # optimizer

#### Checkpoints

In [19]:
if os.path.isdir('/kaggle/working/training_checkpoints') == False:
    os.makedirs('/kaggle/working/training_checkpoints') # create dir if not present

In [20]:
checkpoint_dir = '/kaggle/working/training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(optimizer = optim, siamese_model = siamese_model) # save optimizer and model

#### Train step

In [21]:
@tf.function
def train_step(batch):
    # record operations
    with tf.GradientTape() as tape:
        X = batch[:2] # get anchor and positive/negative images
        y = batch[2] # label
        
        # forward pass
        y_hat = siamese_model(X, training = True)
        # calculate loss
        loss = bce(y, y_hat)
        
    # calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # calculate updated weights and apply to the model
    optim.apply_gradients(zip(grad, siamese_model.trainable_variables))
    
    return loss

#### Train

In [22]:
def train(data, epochs):
    # loop through the epochs
    for epoch in range(1, epochs + 1):
        print(f'\nEpoch: {epoch}/{epochs}')
        progbar = tf.keras.utils.Progbar(len(data))
        # loop through each batch
        for idx, batch in enumerate(data):
            # run train step
            loss = train_step(batch)
            progbar.update(idx + 1) # update progress bar
        # save checkpoint
        if epoch % 10 == 0:
            checkpoint.save(file_prefix = checkpoint_prefix)

In [23]:
epochs = 50

In [24]:
train(train_data, epochs)


Epoch: 1/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 218ms/step

Epoch: 2/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 189ms/step

Epoch: 3/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 189ms/step

Epoch: 4/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 187ms/step

Epoch: 5/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 188ms/step

Epoch: 6/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 189ms/step

Epoch: 7/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 190ms/step

Epoch: 8/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 191ms/step

Epoch: 9/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 192ms/step

Epoch: 10/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 193ms/step

Epoch: 11/50
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 193ms/step

Epoch: 12/50
[1m27/27[0m 

In [25]:
y_preds = []
y_trues = []
test_iter = test_data.as_numpy_iterator()
for i in range(len(test_data)):
    test_input, test_val, y_true = test_iter.next()
    
    y_hat = siamese_model.predict([test_input, test_val])
    
    y_preds.extend(y_hat)
    y_trues.extend(y_true)

I0000 00:00:1729642916.912063    1049 service.cc:145] XLA service 0x795560001be0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1729642916.912105    1049 service.cc:153]   StreamExecutor device (0): Tesla T4, Compute Capability 7.5
I0000 00:00:1729642916.912109    1049 service.cc:153]   StreamExecutor device (1): Tesla T4, Compute Capability 7.5


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step


I0000 00:00:1729642917.999045    1049 device_compiler.h:188] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 742ms/step


In [26]:
# post processing results of predictions
y_pred_labels = [1 if prediction > 0.5 else 0 for prediction in y_preds]
print(f'Prediction:\n{y_pred_labels}\nTrue:\n{y_trues}')

Prediction:
[0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 1, 1, 0]
True:
[0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 1.0, 

In [27]:
# calcualte metrics
recall = Recall()
# recall value
recall.update_state(y_trues, y_preds)

In [28]:
# precision
precision = Precision()

# precision value 
precision.update_state(y_trues, y_preds)

In [29]:
print(f'Recall: {recall.result().numpy()}\nPrecision: {precision.result().numpy()}')

Recall: 1.0
Precision: 1.0


### Save and Load model

In [30]:
siamese_model.save('/kaggle/working/siamese.keras')

In [31]:
# load
model = tf.keras.models.load_model('/kaggle/working/siamese.keras', custom_objects={'L1Dist': L1Distance, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})
model.summary()

##### Check the model after loading

In [32]:
y_preds = []
y_trues = []
test_iter = test_data.as_numpy_iterator()
for i in range(len(test_data)):
    test_input, test_val, y_true = test_iter.next()
    
    y_hat = siamese_model.predict([test_input, test_val])
    
    y_preds.extend(y_hat)
    y_trues.extend(y_true)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step


In [33]:
# calcualte metrics
recall = Recall()
# recall value
recall.update_state(y_trues, y_preds)

In [34]:
# precision
precision = Precision()

# precision value 
precision.update_state(y_trues, y_preds)

In [35]:
y_trues_int = [int(x) for x in y_trues]

In [36]:
y_pred_labels = [1 if x > 0.5 else 0 for x in y_preds]

In [37]:
# accuracy
accuracy = tf.keras.metrics.Accuracy()
# accuracy value
accuracy.update_state(y_trues_int, y_pred_labels)

In [38]:
print(f'Precision: {precision.result().numpy()}\nRecall: {recall.result().numpy()}\nAccuracy: {accuracy.result().numpy() * 100}%')

Precision: 1.0
Recall: 1.0
Accuracy: 100.0%
