In [None]:
import cv2
import random
import numpy as np
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split

import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Input, Rescaling, Conv2D, \
                                    Dropout, MaxPooling2D,\
                                    Flatten, Dense, ReLU, Lambda

In [None]:
physical_devices = tf.config.experimental.list_physical_devices('GPU')
if len(physical_devices) > 0:
    print("We got a GPU")
    for gpu_instance in physical_devices: 
        tf.config.experimental.set_memory_growth(gpu_instance, True)
    if len(physical_devices) > 1:
        strategy = tf.distribute.MirroredStrategy()
        print('DEVICES AVAILABLE: {}'.format(strategy.num_replicas_in_sync))
else:
    print("Sorry, no GPU for you...")

!nvidia-smi

In [None]:
width = 200
height = 150
channels = 1
input_shape = (height, width, channels)

In [None]:
def load_data():
    X = np.load("/kaggle/input/signatures-numpy-no-split/X.npy")
    print(X.shape)
    y = np.load("/kaggle/input/signatures-numpy-no-split/y.npy")
    print(y.shape)

    X_t, X_test, y_t, y_test = train_test_split(X, y, test_size = 0.3, random_state = None)
    print(X_t.shape)
    print(y_t.shape)
    print(X_test.shape)
    print(y_test.shape)

    del X
    del y

    X_train, X_val, y_train, y_val = train_test_split(X_t, y_t, test_size = 0.2, random_state = None)
    print(X_train.shape)
    print(y_train.shape)
    print(X_val.shape)
    print(y_val.shape)

    del X_t
    del y_t

    return X_train, y_train, X_val, y_val, X_test, y_test

X_train, y_train, X_val, y_val, X_test, y_test = load_data()

## Data visualization

In [None]:
def visualize(pairs, labels, to_show=6, num_col=3, predictions=None, test=False):
    
    num_row = to_show // num_col if to_show // num_col != 0 else 1

    to_show = num_row * num_col

    # Plot the images
    fig, axes = plt.subplots(num_row, num_col, figsize=(5, 5))
    
    for i in range(to_show):
        # If the number of rows is 1, the axes array is one-dimensional
        if num_row == 1:
            ax = axes[i % num_col]
        else:
            ax = axes[i // num_col, i % num_col]
        
        n = random.randint(0, pairs.shape[0])
        print(f"Index to show: {n}")
        ax.imshow(np.concatenate([pairs[n][0], pairs[n][1]], axis=1), cmap="gray")
        ax.set_axis_off()
        if test:
            ax.set_title("True: {} | Pred: {:.5f}".format(labels[n], predictions[n][0]))
        else:
            ax.set_title("Label: {}".format(labels[n]))
    if test:
        plt.tight_layout(rect=(0, 0, 1.9, 1.9), w_pad=0.0)
    else:
        plt.tight_layout(rect=(0, 0, 1.5, 1.5))
    plt.show()


### Visualize train data

In [None]:
visualize(X_train[:-1], y_train[:-1], to_show = 4, num_col = 4)

In [None]:
visualize(X_val[:-1], y_val[:-1], to_show = 4, num_col = 4)

### Visualize test data

In [None]:
visualize(X_test[:-1], y_test[:-1], to_show = 4, num_col = 4)

## Siamese Network

In [None]:
def siamese_network(input_shape):
    input = Input(input_shape)

    x = Rescaling(1/255)(input)

    x = Conv2D(64, (11, 11), activation = "relu")(input)
    x = Lambda(tf.nn.local_response_normalization)(x)
    x = MaxPooling2D(pool_size = (2, 2))(x)

    x = Conv2D(128, (7, 7), activation = "relu")(x)
    x = Lambda(tf.nn.local_response_normalization)(x)
    x = MaxPooling2D(pool_size = (2, 2))(x)
    x = Dropout(0.3)(x)

    x = Conv2D(256, (5, 5), activation = "relu")(x)

    x = Conv2D(512, (3, 3), activation = "relu")(x)
    x = MaxPooling2D(pool_size = (2, 2))(x)
    x = Dropout(0.3)(x)

    x = Dense(1024, activation = "relu")(x)
    x = Dropout(0.5)(x)

    x = Flatten()(x)

    x = Dense(128, activation = "relu")(x)

    model = Model(input, x)

    return model

def euclidean_distance(vecs):
    (imgA, imgB) = vecs
    sum_square = K.sum(K.square(imgA - imgB), axis = 1, keepdims=True)
    return K.sqrt(K.maximum(sum_square, K.epsilon()))

In [None]:
if len(physical_devices) > 1:
    with strategy.scope():
        imageA = Input(input_shape)
        imageB = Input(input_shape)

        siamese = siamese_network(input_shape)

        modelA = siamese(imageA)
        modelB = siamese(imageB)

        distance = Lambda(euclidean_distance, output_shape=(1,))([modelA, modelB])
        
        output = Dense(1, activation = "sigmoid")(distance)

        model = Model(inputs = [imageA, imageB], outputs = output)

        model.compile(loss = "binary_crossentropy", optimizer = "adam", metrics = ["accuracy"])
else:
    imageA = Input(input_shape)
    imageB = Input(input_shape)

    siamese = siamese_network(input_shape)

    modelA = siamese(imageA)
    modelB = siamese(imageB)

    distance = Lambda(euclidean_distance, output_shape=(1,))([modelA, modelB])
    
    output = Dense(1, activation = "sigmoid")(distance)

    model = Model(inputs = [imageA, imageB], outputs = output)

    model.compile(loss = "binary_crossentropy", optimizer = "adam", metrics = ["accuracy"])

model.summary()

In [None]:
batch_size = 64
epochs = 20
early_stopping = EarlyStopping(monitor = "val_loss", patience = 5, verbose = 1)

history = model.fit([X_train[:, 0, :, :], X_train[:, 1, :, :]], y_train,
                    validation_data = ([X_val[:, 0, :, :], X_val[:, 1, :, :]], y_val),
                    batch_size = batch_size,
                    epochs = epochs,
                    callbacks = [early_stopping],
                    verbose = 1)

In [None]:
def plt_metric(history, metric, title, has_valid=True):
    
    plt.plot(history[metric])
    if has_valid:
        plt.plot(history["val_" + metric])
        plt.legend(["train", "validation"], loc="upper left")
    plt.title(title)
    plt.ylabel(metric)
    plt.xlabel("epoch")
    plt.show()


# Plot the accuracy
plt_metric(history=history.history, metric="accuracy", title="Model accuracy")

# Plot the loss
plt_metric(history=history.history, metric="loss", title="Binary Crossentropy Loss")


In [None]:
results = model.evaluate([X_test[:, 0, :, :], X_test[:, 1, :, :]], y_test)
print("test loss, test acc:", results)

In [None]:
predictions = model.predict([X_test[:, 0, :, :], X_test[:, 1, :, :]])
visualize(X_test[:-1], y_test[:-1], to_show=3, predictions=predictions, test=True)

In [None]:
model.save("/kaggle/working/signatures_siamese_64_512_lrn_bcrloss.keras")