In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# List every file under the Kaggle input directory, one full path per line.
input_paths = (
    os.path.join(folder, name)
    for folder, _, names in os.walk('/kaggle/input')
    for name in names
)
for path in input_paths:
    print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Import Libraries

In [None]:
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

import random
import numpy as np
import pandas as pd

import cv2
import matplotlib.pyplot as plt
from PIL import Image

import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, losses, regularizers, metrics
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications.efficientnet import EfficientNetB7, preprocess_input

from mlxtend.plotting import plot_confusion_matrix
from sklearn.metrics import roc_curve, auc, classification_report, confusion_matrix
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score

# Load Data

In [None]:
# Dataset root. The cells below list its sub-folders and expect each one to
# contain a "Train" and a "Test" directory.
root_dir = "/kaggle/input/persons/CV_Signature_Dataset/"

In [None]:
# Collect the path of every .csv file found in <root_dir>/<folder>/Train.
train_csv = [
    os.path.join(root_dir, person_dir, "Train", entry)
    for person_dir in os.listdir(root_dir)
    for entry in os.listdir(os.path.join(root_dir, person_dir, "Train"))
    if entry.endswith(".csv")
]

In [None]:
# Collect the path of every .csv file found in <root_dir>/<folder>/Test.
test_csv = [
    os.path.join(root_dir, person_dir, "Test", entry)
    for person_dir in os.listdir(root_dir)
    for entry in os.listdir(os.path.join(root_dir, person_dir, "Test"))
    if entry.endswith(".csv")
]

In [None]:
# Build (anchor, positive, negative) image-path triplets for the training split.
#
# Every ordered pair of two *different* genuine ("real") images listed in a CSV
# becomes an (anchor, positive) pair; the negative is a forged image drawn at
# random from the same CSV.  Previously the outer loop variable was never used
# and the positive was drawn at random, so it could be the anchor image itself
# (a zero-distance pair).  This yields N*(N-1) triplets per CSV instead of N*N.
train_triplets = []
for csv_path in train_csv:
    labels_df = pd.read_csv(csv_path)
    real_images = labels_df.loc[labels_df["label"] == "real", "image_name"].tolist()
    forged_images = labels_df.loc[labels_df["label"] == "forged", "image_name"].tolist()

    for anchor_idx, anchor in enumerate(real_images):
        # The first 7 characters of the image name are used as its folder name
        # (convention kept from the original code -- TODO confirm it always
        # matches the folder the CSV was found in).
        image_dir = os.path.join(root_dir, anchor[:7], "Train")

        for positive_idx, positive in enumerate(real_images):
            if positive_idx == anchor_idx:
                # Skip pairing an image with itself.
                continue

            negative = random.choice(forged_images)
            train_triplets.append((
                os.path.join(image_dir, anchor),
                os.path.join(image_dir, positive),
                os.path.join(image_dir, negative),
            ))

In [None]:
# Build (anchor, positive, negative) image-path triplets for the test split.
#
# Every ordered pair of two *different* genuine ("real") images listed in a CSV
# becomes an (anchor, positive) pair; the negative is a forged image drawn at
# random from the same CSV.  Previously the outer loop variable was never used
# and the positive was drawn at random, so it could be the anchor image itself
# (a zero-distance pair).  This yields N*(N-1) triplets per CSV instead of N*N.
test_triplets = []
for csv_path in test_csv:
    labels_df = pd.read_csv(csv_path)
    real_images = labels_df.loc[labels_df["label"] == "real", "image_name"].tolist()
    forged_images = labels_df.loc[labels_df["label"] == "forged", "image_name"].tolist()

    for anchor_idx, anchor in enumerate(real_images):
        # The first 7 characters of the image name are used as its folder name
        # (convention kept from the original code -- TODO confirm it always
        # matches the folder the CSV was found in).
        image_dir = os.path.join(root_dir, anchor[:7], "Test")

        for positive_idx, positive in enumerate(real_images):
            if positive_idx == anchor_idx:
                # Skip pairing an image with itself.
                continue

            negative = random.choice(forged_images)
            test_triplets.append((
                os.path.join(image_dir, anchor),
                os.path.join(image_dir, positive),
                os.path.join(image_dir, negative),
            ))

In [None]:
# Report how many triplets were generated for each split.
print("Train Size:", len(train_triplets))
print("Test Size:", len(test_triplets))

# Visualization

In [None]:
def plot_images(images):
    """Show 8 randomly chosen triplets in a 3x8 grid.

    Each column is one triplet: anchor on the top row, positive in the middle,
    negative at the bottom.

    Args:
        images: Sequence of (anchor_path, positive_path, negative_path) tuples.
    """
    row_titles = ("Anchor", "Positive", "Negative")
    plt.figure(figsize=(20, 6))

    for column in range(8):
        # One random triplet per column (the same triplet may be drawn twice).
        idx = np.random.randint(len(images))

        for row, title in enumerate(row_titles):
            # Subplot numbers run row by row: 1-8, 9-16, 17-24.
            plt.subplot(3, 8, column + 1 + 8 * row)
            plt.imshow(Image.open(images[idx][row]))
            plt.axis("off")
            plt.title(title)

    plt.show()

In [None]:
# Preview a random sample of training triplets.
plot_images(train_triplets)

# Model

In [None]:
def read_image(index):
    """Load one image as a 128x128 RGB array.

    Args:
        index: Image path. It is joined onto ``root_dir``; an absolute path
            (as stored in the triplet lists) is returned unchanged by
            ``os.path.join``.

    Returns:
        The image resized to 128x128 and converted from OpenCV's BGR channel
        order to RGB.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file (missing path or
            unsupported/corrupt image).
    """
    path = os.path.join(root_dir, index)
    image = cv2.imread(path)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cv2.resize.
        raise FileNotFoundError(f"Could not read image: {path}")
    image = cv2.resize(image, (128, 128))
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    return image

In [None]:
def get_batch(triplet_list, batch_size):
    """Yield batches of loaded triplet images.

    The triplets are consumed in order, ``batch_size`` at a time; the final
    batch is smaller when the list length is not a multiple of ``batch_size``.
    No empty batch is ever yielded (the previous ``range(batch_steps + 1)``
    loop produced one whenever the length was an exact multiple of
    ``batch_size``, and for an empty list).

    Args:
        triplet_list: Sequence of (anchor_path, positive_path, negative_path).
        batch_size: Maximum number of triplets per batch; must be positive.

    Yields:
        ``[anchor, positive, negative]`` -- three arrays, each stacking the
        images returned by ``read_image`` for the batch and passed through
        ``preprocess_input``.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    for start in range(0, len(triplet_list), batch_size):
        anchor = []
        positive = []
        negative = []

        for a, p, n in triplet_list[start:start + batch_size]:
            anchor.append(read_image(a))
            positive.append(read_image(p))
            negative.append(read_image(n))

        yield [
            preprocess_input(np.array(anchor)),
            preprocess_input(np.array(positive)),
            preprocess_input(np.array(negative)),
        ]

In [None]:
# ImageNet-pretrained EfficientNetB7 backbone without its classification head,
# using average pooling on the output.
pretrained_model = EfficientNetB7(
    include_top=False,
    weights="imagenet",
    input_shape=(128, 128, 3),
    pooling="avg",
)

# Freeze every backbone layer except the last 25.
for frozen_layer in pretrained_model.layers[:-25]:
    frozen_layer.trainable = False

# Embedding head: three Dense+BatchNorm+Dropout stages (512, 256, 128 units)
# followed by a linear 128-unit projection.
head_layers = [layers.Flatten()]
for units in (512, 256, 128):
    head_layers.append(layers.Dense(units, activation="relu"))
    head_layers.append(layers.BatchNormalization())
    head_layers.append(layers.Dropout(0.3))
head_layers.append(layers.Dense(128))

encode_model = models.Sequential([pretrained_model, *head_layers], name="Embedding")

In [None]:
# Print the layer-by-layer summary of the embedding network.
encode_model.summary()

In [None]:
class DistanceLayer(layers.Layer):
    """Layer that measures how far an anchor embedding is from two others.

    The distance is the sum of squared differences over the last axis
    (squared Euclidean distance, no square root).
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        """Return ``(anchor-positive distance, anchor-negative distance)``."""

        def squared_distance(other):
            return tf.reduce_sum(tf.square(anchor - other), axis=-1)

        return squared_distance(positive), squared_distance(negative)

In [None]:
# Three uint8 image inputs, one per member of the triplet.
anchor_input, positive_input, negative_input = [
    layers.Input(name=branch, shape=(128, 128, 3), dtype=tf.uint8)
    for branch in ("anchor", "positive", "negative")
]

# All three inputs go through the same encode_model instance, so the branches
# share weights; the distance layer then compares the resulting embeddings.
distance_layer = DistanceLayer()
distances = distance_layer(
    *[
        encode_model(preprocess_input(branch_input))
        for branch_input in (anchor_input, positive_input, negative_input)
    ]
)

siamese_net = models.Model(
    inputs=[anchor_input, positive_input, negative_input],
    outputs=distances,
    name="Siamese_Network",
)

In [None]:
# Draw the network graph, including tensor shapes and layer names.
plot_model(siamese_net, show_shapes=True, show_layer_names=True)

In [None]:
class SiameseModel(models.Model):
    """Keras model that trains/evaluates a triplet network with a margin loss.

    The wrapped ``siamese_network`` maps an (anchor, positive, negative) batch
    to ``(ap_distance, an_distance)``.  The loss for each triplet is
    ``max(ap_distance - an_distance + margin, 0)``; "accuracy" is the fraction
    of triplets whose anchor-positive distance is smaller than the
    anchor-negative distance.

    Args:
        siamese_network: Model returning ``(ap_distance, an_distance)``.
        margin: Margin added to the distance gap inside the hinge.
    """

    def __init__(self, siamese_network, margin=1.0):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        # Running means reported by train_step/test_step and exposed via `metrics`.
        self.loss_tracker = metrics.Mean(name="loss")
        self.accuracy_tracker = metrics.Mean(name="accuracy")

    def call(self, inputs):
        """Forward pass: return the wrapped network's two distance tensors."""
        return self.siamese_network(inputs)

    def train_step(self, data):
        """Run one optimisation step and return the running loss/accuracy."""
        with tf.GradientTape() as tape:
            # Single forward pass shared by loss and accuracy.  `training=True`
            # is passed explicitly (the pattern used in the Keras custom
            # train_step guide) so Dropout/BatchNormalization do not depend on
            # the framework's default mode.
            ap_distance, an_distance = self.siamese_network(data, training=True)
            # Per-triplet loss vector; tape.gradient differentiates its sum.
            loss = self._triplet_loss(ap_distance, an_distance)

        trainable_weights = self.siamese_network.trainable_weights
        gradients = tape.gradient(loss, trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, trainable_weights))

        # Accuracy is computed outside the tape: it needs no gradient.
        acc = self._triplet_accuracy(ap_distance, an_distance)
        return self._update_trackers(loss, acc)

    def test_step(self, data):
        """Evaluate one batch (inference mode) and return running loss/accuracy."""
        ap_distance, an_distance = self.siamese_network(data, training=False)
        loss = self._triplet_loss(ap_distance, an_distance)
        acc = self._triplet_accuracy(ap_distance, an_distance)
        return self._update_trackers(loss, acc)

    def _triplet_loss(self, ap_distance, an_distance):
        """Hinge on the distance gap: zero once negatives are `margin` farther."""
        return tf.maximum(ap_distance - an_distance + self.margin, 0.0)

    def _triplet_accuracy(self, ap_distance, an_distance):
        """Fraction of triplets where the positive is closer than the negative."""
        return tf.reduce_mean(tf.cast(ap_distance < an_distance, tf.float32))

    def _update_trackers(self, loss, acc):
        """Fold one batch into the running means and return their current values."""
        self.loss_tracker.update_state(loss)
        self.accuracy_tracker.update_state(acc)
        return {"loss": self.loss_tracker.result(), "accuracy": self.accuracy_tracker.result()}

    def _compute_loss(self, data):
        """Per-triplet loss for `data` (runs its own forward pass; kept for compatibility)."""
        ap_distance, an_distance = self.siamese_network(data)
        return self._triplet_loss(ap_distance, an_distance)

    def _compute_accuracy(self, data):
        """Triplet accuracy for `data` (runs its own forward pass; kept for compatibility)."""
        ap_distance, an_distance = self.siamese_network(data)
        return self._triplet_accuracy(ap_distance, an_distance)

    @property
    def metrics(self):
        # Listing the trackers here lets Keras reset them automatically.
        return [self.loss_tracker, self.accuracy_tracker]

In [None]:
# Wrap the triplet network in the custom training model (margin left at its
# default of 1.0) and compile it with Adam.  No loss is passed to compile():
# SiameseModel computes the triplet loss inside its own train/test steps.
siamese_model = SiameseModel(siamese_net)
siamese_model.compile(optimizer=optimizers.Adam(learning_rate=0.001))

In [None]:
def test_on_triplets(batch_size):
    """Return the fraction of test triplets ranked correctly by the model.

    A triplet counts as correct when the predicted anchor-positive distance is
    strictly smaller than the predicted anchor-negative distance.

    Args:
        batch_size: Number of triplets sent to ``siamese_model.predict`` at once.
    """
    ap_scores = []
    an_scores = []

    for batch in get_batch(test_triplets, batch_size=batch_size):
        prediction = siamese_model.predict(batch, verbose=0)
        ap_scores.extend(prediction[0])
        an_scores.extend(prediction[1])

    correct = np.asarray(ap_scores) < np.asarray(an_scores)
    return correct.sum() / len(ap_scores)

# Train

In [None]:
epochs = 10
batch_size = 128

# Per-epoch history, plotted in the Results section.
train_loss = []
train_accuracy = []
val_loss = []
val_accuracy = []


def _run_epoch(step_fn, triplets, batch_size):
    """Feed every batch of ``triplets`` to ``step_fn`` and average the results.

    Args:
        step_fn: ``siamese_model.train_on_batch`` or ``siamese_model.test_on_batch``;
            its result is indexed as ``[loss, accuracy]``.
        triplets: Sequence of (anchor, positive, negative) path triplets.
        batch_size: Batch size forwarded to ``get_batch``.

    Returns:
        ``(mean_loss, mean_accuracy)`` -- unweighted means over the batches.

    Raises:
        ValueError: If no batch was produced (empty triplet list).
    """
    batch_losses = []
    batch_accuracies = []

    for data in get_batch(triplets, batch_size=batch_size):
        results = step_fn(data)
        batch_losses.append(results[0])
        batch_accuracies.append(results[1])

    if not batch_losses:
        raise ValueError("No batches were produced: the triplet list is empty.")

    return (
        sum(batch_losses) / len(batch_losses),
        sum(batch_accuracies) / len(batch_accuracies),
    )


for epoch in range(epochs):
    # Present the training triplets in a fresh random order each epoch.  The
    # list is built CSV by CSV, so without shuffling consecutive triplets (and
    # therefore whole batches) come from the same CSV in the same order every
    # epoch.  random.sample returns a new list; train_triplets is untouched.
    shuffled_triplets = random.sample(train_triplets, len(train_triplets))

    avg_train_loss, avg_train_accuracy = _run_epoch(
        siamese_model.train_on_batch, shuffled_triplets, batch_size
    )
    train_loss.append(avg_train_loss)
    train_accuracy.append(avg_train_accuracy)

    # Validation runs on the test triplets in their original order.
    avg_val_loss, avg_val_accuracy = _run_epoch(
        siamese_model.test_on_batch, test_triplets, batch_size
    )
    val_loss.append(avg_val_loss)
    val_accuracy.append(avg_val_accuracy)

    print(f"Epoch: {epoch} | Train Loss: {avg_train_loss:.5f} | Validation Loss: {avg_val_loss:.5f} | Train Accuracy: {avg_train_accuracy:.5f} | Test Accuracy: {avg_val_accuracy:.5f}")

# Results

In [None]:
# Training vs. validation loss, one point per epoch.
plt.figure(figsize=(8, 6))
for curve, curve_label in ((train_loss, "Train Loss"), (val_loss, "Validation Loss")):
    plt.plot(curve, label=curve_label)
plt.legend()
plt.show()

In [None]:
# Training vs. validation accuracy, one point per epoch.
plt.figure(figsize=(8, 6))
for curve, curve_label in ((train_accuracy, "Train Accuracy"), (val_accuracy, "Validation Accuracy")):
    plt.plot(curve, label=curve_label)
plt.legend()
plt.show()

# Test

In [None]:
# Per-batch arrays of thresholded predictions: one entry per anchor-positive
# pair in pos_list and per anchor-negative pair in neg_list.
pos_list = []
neg_list = []

# Squared-distance cut-off: pairs at or below it are labelled 0, pairs above it 1.
distance_threshold = 1.3

for anchor, positive, negative in get_batch(test_triplets, batch_size=256):
    anchor_pred = encode_model.predict(anchor, verbose=0)
    pos_pred = encode_model.predict(positive, verbose=0)
    neg_pred = encode_model.predict(negative, verbose=0)

    for other_pred, bucket in ((pos_pred, pos_list), (neg_pred, neg_list)):
        squared_distance = np.square(anchor_pred - other_pred).sum(axis=-1)
        bucket.append(np.where(squared_distance <= distance_threshold, 0, 1))

In [None]:
# pos_list / neg_list hold one prediction array per batch.  Flatten each across
# ALL batches before building the labels: the previous code sized the labels
# from the first batch only, so labels and predictions had different lengths as
# soon as the test set spanned more than one batch.
pos_predictions = np.concatenate(pos_list)
neg_predictions = np.concatenate(neg_list)

# Ground truth: 0 for every anchor-positive pair, 1 for every anchor-negative
# pair, in the same order as pred_labels.
test_labels = np.concatenate([
    np.zeros(len(pos_predictions), dtype=int),
    np.ones(len(neg_predictions), dtype=int),
])
pred_labels = np.concatenate([pos_predictions, neg_predictions])

In [None]:
# Weighted-average precision / F1 / recall, plus plain accuracy, of the
# thresholded predictions against the pair labels.
weighted_avg = {"average": 'weighted'}

model_precision_score = precision_score(test_labels, pred_labels, **weighted_avg)
model_f1_score = f1_score(test_labels, pred_labels, **weighted_avg)
model_recall_score = recall_score(test_labels, pred_labels, **weighted_avg)
model_accuracy_score = accuracy_score(test_labels, pred_labels)

# Reported as percentages with two decimals.
print(f"Precision Score = {model_precision_score * 100:.2f}%")
print(f"F1 Score = {model_f1_score * 100:.2f}%")
print(f"Recall Score = {model_recall_score * 100:.2f}%")
print(f"Accuracy Score = {model_accuracy_score * 100:.2f}%")

In [None]:
# Confusion matrix of true vs. predicted pair labels, drawn with both absolute
# counts and normalised rates.
cm = confusion_matrix(test_labels, pred_labels)
fig, ax = plot_confusion_matrix(
    conf_mat=cm,
    colorbar=True,
    show_absolute=True,
    show_normed=True,
)
plt.show()