In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Lazily build the full path of every file below the Kaggle input mount,
# then print them one per line in walk order.
input_paths = (
    os.path.join(folder, name)
    for folder, _subdirs, names in os.walk('/kaggle/input')
    for name in names
)
for path in input_paths:
    print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Import Libraries

In [None]:
import os

# Raise TensorFlow's C++ log threshold; set before TensorFlow is imported so it
# is already in place when the library initialises.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

import random
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, optimizers, models, callbacks
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import plot_model

# Seed every random source this notebook relies on (pair sampling uses `random`,
# weight initialisation and shuffling use TensorFlow) so that
# "Restart Kernel -> Run All" reproduces the same pairs and training run.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Load Data

In [None]:
# Load MNIST through Keras and unpack its (images, labels) train and test splits.
train_split, test_split = mnist.load_data()
train_images, train_labels = train_split
test_images, test_labels = test_split

In [None]:
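# NOTE: pixel scaling to [0, 1] is currently disabled, so every later cell works
# on the pixel values exactly as loaded. Uncomment both lines to re-enable it.
#train_images = train_images / 255
#test_images = test_images / 255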

In [None]:
# The first 50,000 training samples are used for fitting; everything after
# them is held out for validation.
X_train, X_valid = train_images[:50000], train_images[50000:]
y_train, y_valid = train_labels[:50000], train_labels[50000:]

# The test split is used unchanged.
X_test, y_test = test_images, test_labels

# Create Pairs

In [None]:
def make_pairs(x, y, rng=None):
    """Build one same-class and one different-class pair for every sample.

    For each sample ``x[i]`` two pairs are emitted, in this order:

    * ``[x[i], x[j]]`` with ``y[j] == y[i]`` and target ``0`` (same class);
    * ``[x[i], x[k]]`` with ``y[k] != y[i]`` and target ``1`` (different class).

    The candidate classes are the labels actually present in ``y`` rather than a
    fixed ``0..9`` range, so a split that lacks some class no longer crashes when
    that class is drawn as the "different" one. The same-class partner may be
    the sample itself.

    Args:
        x: Array-like of samples, indexed along the first axis.
        y: Array-like of hashable class labels, one per sample in ``x``.
        rng: Optional object exposing ``choice(seq)`` (e.g.
            ``random.Random(seed)``) used for all sampling. Defaults to the
            global ``random`` module, so ``random.seed`` controls the result.

    Returns:
        Tuple ``(pairs, labels)``: ``pairs`` has shape
        ``(2 * len(x), 2, *x.shape[1:])`` and the dtype of ``x``; ``labels`` is a
        1-D ``float32`` array alternating ``0, 1``.

    Raises:
        ValueError: If ``x`` and ``y`` differ in length, or if a non-empty ``y``
            holds fewer than two distinct classes (no negative pair exists).
    """
    x = np.asarray(x)
    y = np.asarray(y).ravel()
    if len(x) != len(y):
        raise ValueError(
            f"x and y must have the same length, got {len(x)} and {len(y)}"
        )
    if rng is None:
        rng = random

    sample_labels = y.tolist()
    classes = sorted(set(sample_labels))
    if sample_labels and len(classes) < 2:
        raise ValueError(
            "make_pairs needs at least two distinct classes to build negative pairs"
        )

    # Loop-invariant lookups, built once: sample indices per class (plain lists
    # so any `choice` implementation can handle them) and, for each class, the
    # list of every other class.
    indices_by_class = {c: np.flatnonzero(y == c).tolist() for c in classes}
    other_classes = {c: [d for d in classes if d != c] for c in classes}

    partner_idx = []
    for label in sample_labels:
        # Positive partner first, then the negative class, then its sample.
        partner_idx.append(rng.choice(indices_by_class[label]))
        negative_class = rng.choice(other_classes[label])
        partner_idx.append(rng.choice(indices_by_class[negative_class]))

    # Every sample is the anchor of two consecutive pairs.
    anchor_idx = np.repeat(np.arange(len(x)), 2)
    partner_idx = np.asarray(partner_idx, dtype=np.intp)

    pairs = np.stack([x[anchor_idx], x[partner_idx]], axis=1)
    labels = np.tile(np.array([0, 1], dtype="float32"), len(x))
    return pairs, labels

In [None]:
# Turn each split into (pairs, pair_labels) via make_pairs.
# NOTE: the results are bound to the same names as the inputs, so this cell is
# not idempotent -- re-run the split cell above before running it again,
# otherwise make_pairs would receive pair arrays instead of images/labels.
X_train, y_train = make_pairs(X_train, y_train)
X_valid, y_valid = make_pairs(X_valid, y_valid)
X_test, y_test = make_pairs(X_test, y_test)

In [None]:
# Separate every pair array into its first and second members; these feed the
# model's two inputs.
X_train1, X_train2 = X_train[:, 0], X_train[:, 1]
X_valid1, X_valid2 = X_valid[:, 0], X_valid[:, 1]
X_test1, X_test2 = X_test[:, 0], X_test[:, 1]

# Visualization

In [None]:
def plot_pairs(pairs, labels, rows=4, cols=4):
    """Show the first ``rows * cols`` pairs, each as two images side by side.

    Args:
        pairs: Indexable of pairs; ``pairs[i][0]`` and ``pairs[i][1]`` are the
            two images of pair ``i`` and are concatenated along axis 1.
        labels: Indexable of per-pair targets, shown in each title.
        rows: Number of grid rows (default 4).
        cols: Number of grid columns (default 4).

    If fewer than ``rows * cols`` pairs are supplied, the remaining grid cells
    are left blank instead of raising ``IndexError``.
    """
    # squeeze=False keeps `axes` a 2-D array even for a 1x1 grid, so .ravel() is safe.
    fig, axes = plt.subplots(rows, cols, figsize=(10, 10), squeeze=False)
    n_shown = min(len(pairs), rows * cols)

    for i, ax in enumerate(axes.ravel()):
        ax.axis("off")
        if i >= n_shown:
            continue  # no pair left for this cell
        left, right = pairs[i][0], pairs[i][1]
        ax.imshow(np.concatenate([left, right], axis=1), cmap="gray")
        ax.set_title(f"Distance: {labels[i]}")

    plt.show()

In [None]:
# Preview the first training pairs together with their targets.
plot_pairs(pairs=X_train, labels=y_train)

# Model

In [None]:
def euclidean_distance(vects):
    """Return the Euclidean distance between two batches of vectors.

    Args:
        vects: Indexable holding two tensors; ``vects[0] - vects[1]`` is squared
            and summed over axis 1.

    Returns:
        Tensor of distances, with axis 1 kept as a size-1 dimension.
    """
    first, second = vects[0], vects[1]
    squared_sum = tf.reduce_sum(tf.square(first - second), axis=1, keepdims=True)
    # Clamp from below at Keras' epsilon so the square root never receives 0.
    clamped = tf.maximum(squared_sum, tf.keras.backend.epsilon())
    return tf.sqrt(clamped)

In [None]:
def build_embedding_network(input_shape=(28, 28, 1), embedding_dim=10):
    """Create the convolutional tower that maps one image to an embedding.

    Three Conv2D(2x2, relu) + AveragePooling2D(2x2) stages with 8, 16 and 32
    filters, then Flatten -> BatchNormalization -> Dense(embedding_dim, tanh).

    Args:
        input_shape: Shape of a single input image.
        embedding_dim: Size of the output embedding vector.

    Returns:
        A Keras ``Model`` taking one image batch and returning its embeddings.
    """
    image = layers.Input(input_shape)
    features = image
    for n_filters in (8, 16, 32):
        features = layers.Conv2D(n_filters, kernel_size=(2, 2), activation="relu")(features)
        features = layers.AveragePooling2D((2, 2))(features)
    features = layers.Flatten()(features)
    features = layers.BatchNormalization()(features)
    embedding = layers.Dense(embedding_dim, activation="tanh")(features)
    return models.Model(inputs=image, outputs=embedding, name="embedding_network")


# A single tower is applied to BOTH inputs so the two branches share weights.
# With two independently-weighted towers the images would be embedded into
# unrelated spaces and the learned distance would depend on argument order.
embedding_network = build_embedding_network()

input1 = layers.Input((28, 28, 1))
input2 = layers.Input((28, 28, 1))
dense_1_1 = embedding_network(input1)
dense_2_1 = embedding_network(input2)

# Distance between the two embeddings, normalised and squashed into (0, 1).
merge_layer = layers.Lambda(euclidean_distance)([dense_1_1, dense_2_1])
batchnorm_1 = layers.BatchNormalization()(merge_layer)
output_layer = layers.Dense(1, activation="sigmoid")(batchnorm_1)

model = models.Model(inputs=[input1, input2], outputs=output_layer)

In [None]:
# Print the layer-by-layer summary of the assembled model.
model.summary()

In [None]:
# Render the model graph, annotating every layer with its shapes.
plot_model(model=model, show_shapes=True)

In [None]:
def loss(margin):
    """Create a contrastive loss function with the given margin.

    For a target ``y_true`` and a predicted value ``y_pred`` the per-element
    loss is::

        (1 - y_true) * y_pred**2 + y_true * max(margin - y_pred, 0)**2

    i.e. pairs with target 0 are pushed towards a prediction of 0, and pairs
    with target 1 are pushed to at least ``margin``. The mean over all
    elements is returned.

    Args:
        margin: Value beyond which target-1 pairs no longer contribute.

    Returns:
        A callable ``contrastive_loss(y_true, y_pred)`` returning a scalar tensor.
    """

    def contrastive_loss(y_true, y_pred):
        # Targets may arrive as integers; cast so the arithmetic below does not
        # fail on mixed dtypes.
        y_true = tf.cast(y_true, y_pred.dtype)
        same_term = (1 - y_true) * tf.square(y_pred)
        different_term = y_true * tf.square(tf.maximum(margin - y_pred, 0))
        return tf.reduce_mean(same_term + different_term)

    return contrastive_loss

In [None]:
# Contrastive loss with margin 1, optimised by RMSprop with default settings;
# accuracy is tracked alongside the loss.
contrastive = loss(margin=1)
rmsprop = optimizers.RMSprop()
model.compile(optimizer=rmsprop, loss=contrastive, metrics=["accuracy"])

# Train

In [None]:
# Fit on the training pairs and evaluate on the validation pairs after each
# epoch; the returned History object feeds the curves plotted later.
train_inputs = [X_train1, X_train2]
valid_inputs = [X_valid1, X_valid2]
history = model.fit(
    train_inputs,
    y_train,
    batch_size=32,
    epochs=25,
    validation_data=(valid_inputs, y_valid),
)

# Results

In [None]:
# Training vs. validation accuracy per epoch, on a fixed 0-1 axis.
fig, ax = plt.subplots()
ax.plot(history.history["accuracy"], label="train")
ax.plot(history.history["val_accuracy"], label="valid")
ax.set(xlabel="Epoch", ylabel="Accuracy", title="Accuracy Curve", ylim=(0, 1))
ax.legend()
plt.show()

In [None]:
# Training vs. validation loss per epoch, on a fixed 0-1 axis.
fig, ax = plt.subplots()
ax.plot(history.history["loss"], label="train")
ax.plot(history.history["val_loss"], label="valid")
ax.set(xlabel="Epoch", ylabel="Loss", title="Loss Curve", ylim=(0, 1))
ax.legend()
plt.show()

# Test

In [None]:
# Score the trained model on the held-out test pairs and report both metrics.
test_inputs = [X_test1, X_test2]
test_loss, test_acc = model.evaluate(x=test_inputs, y=y_test)
print(f"Test Loss: {test_loss}\nTest Accuracy: {test_acc}")

In [None]:
# Model output for every test pair.
model_predictions = model.predict(x=[X_test1, X_test2])

In [None]:
def plot_predictions(pairs, labels, preds, rows=4, cols=4):
    """Show the first ``rows * cols`` pairs with their true and predicted values.

    Args:
        pairs: Indexable of pairs; ``pairs[i][0]`` and ``pairs[i][1]`` are the
            two images of pair ``i`` and are concatenated along axis 1.
        labels: Indexable of per-pair targets, shown as "Distance True".
        preds: Indexable of per-pair predictions; ``preds[i][0]`` is shown as
            "Distance Pred" with four decimals.
        rows: Number of grid rows (default 4).
        cols: Number of grid columns (default 4).

    If fewer than ``rows * cols`` pairs are supplied, the remaining grid cells
    are left blank instead of raising ``IndexError``.
    """
    # squeeze=False keeps `axes` a 2-D array even for a 1x1 grid, so .ravel() is safe.
    fig, axes = plt.subplots(rows, cols, figsize=(10, 10), squeeze=False)
    n_shown = min(len(pairs), len(preds), rows * cols)

    for i, ax in enumerate(axes.ravel()):
        ax.axis("off")
        if i >= n_shown:
            continue  # no pair left for this cell
        left, right = pairs[i][0], pairs[i][1]
        ax.imshow(np.concatenate([left, right], axis=1), cmap="gray")
        ax.set_title(f"Distance True: {labels[i]}\n Distance Pred: {preds[i][0]:.4f}")

    plt.show()

In [None]:
# Compare true targets and model outputs on the first test pairs.
plot_predictions(pairs=X_test, labels=y_test, preds=model_predictions)