<a href="https://colab.research.google.com/github/toanpt74/COLAB_RD/blob/main/Siamese_MNIST.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive inside the Colab runtime at /content/drive.
# NOTE(review): this only works when the notebook runs on Google Colab.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import tensorflow as tf
import numpy as np
import random
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Layer
from matplotlib import pyplot as plt
import cv2
import os
# Restrict CUDA to the device with index 1.
# NOTE(review): this is set after `import tensorflow`; confirm it takes effect
# before the first GPU op and that the host really has a second GPU - on a
# single-GPU machine this value hides the only GPU.
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
# Training hyper-parameters (consumed by the commented-out `siamese.fit` call).
epochs = 30
batch_size = 16
margin = 1  # Margin for contrastive loss.

# Load the MNIST digits as (images, labels) for train+validation and test.
(x_train_val, y_train_val), (x_test, y_test) = keras.datasets.mnist.load_data()

# Change the data type to a floating point format.
# Pixel values are only cast, not rescaled.
x_train_val = x_train_val.astype("float32")
x_test = x_test.astype("float32")

# Split train_val: the first 30000 samples are used for training and the
# remainder for validation (presumably a 50/50 split - verify dataset size).
x_train, x_val = x_train_val[:30000], x_train_val[30000:]
y_train, y_val = y_train_val[:30000], y_train_val[30000:]
# The combined arrays are no longer needed; drop the names.
del x_train_val, y_train_val

def make_pairs(x, y):
    """Creates a tuple containing image pairs with corresponding label.

    For every sample in `x`, two consecutive pairs are produced: first a
    matching pair (label 1.0) whose second image is drawn at random from the
    same class, then a non-matching pair (label 0.0) whose second image is
    drawn at random from a different class.

    Arguments:
        x: Sequence or array of images, each index corresponds to one image.
        y: Sequence or array of non-negative integer labels, aligned with `x`.

    Returns:
        Tuple containing two numpy arrays as (pairs_of_samples, labels),
        where pairs_of_samples' shape is (2 * len(x), 2, *image_shape) and
        labels is a float32 binary array of shape (2 * len(x),).

    Raises:
        ValueError: If `y` is empty, or if fewer than two classes have
            samples (no non-matching pair could ever be built).
    """
    # Plain lists must be converted first: `list == int` evaluates to a single
    # False, which would leave every class bucket empty.
    y = np.asarray(y)
    if y.size == 0:
        raise ValueError("make_pairs() needs at least one labelled sample")

    # int() avoids wrap-around when the labels are stored in a narrow integer
    # dtype such as uint8.
    num_classes = int(y.max()) + 1
    digit_indices = [np.where(y == i)[0] for i in range(num_classes)]
    if sum(len(indices) > 0 for indices in digit_indices) < 2:
        raise ValueError(
            "make_pairs() needs samples from at least two classes "
            "to build non-matching pairs"
        )

    pairs = []
    labels = []
    for idx1 in range(len(x)):
        x1 = x[idx1]
        label1 = y[idx1]

        # add a matching example
        idx2 = random.choice(digit_indices[label1])
        pairs.append([x1, x[idx2]])
        labels.append(1)

        # add a non-matching example; classes without any sample are rejected
        # as well, otherwise random.choice() would fail on an empty bucket.
        label2 = random.randint(0, num_classes - 1)
        while label2 == label1 or len(digit_indices[label2]) == 0:
            label2 = random.randint(0, num_classes - 1)
        idx2 = random.choice(digit_indices[label2])
        pairs.append([x1, x[idx2]])
        labels.append(0)
    return np.array(pairs), np.array(labels).astype("float32")
# Build (pairs, labels) for each split; every source image yields one
# matching and one non-matching pair, so each result has 2 * len(split) rows.
# make train pairs
pairs_train, labels_train = make_pairs(x_train, y_train)
# make validation pairs
pairs_val, labels_val = make_pairs(x_val, y_val)
# make test pairs
pairs_test, labels_test = make_pairs(x_test, y_test)
# Split every pair array into its two branches: column 0 holds the anchor
# image, column 1 the image it is compared against.
x_train_1 = pairs_train[:, 0]  # x_train_1.shape is (60000, 28, 28)
x_train_2 = pairs_train[:, 1]
x_val_1 = pairs_val[:, 0]  # x_val_1.shape = (60000, 28, 28)
x_val_2 = pairs_val[:, 1]
x_test_1 = pairs_test[:, 0]  # x_test_1.shape = (20000, 28, 28)
x_test_2 = pairs_test[:, 1]

def visualize(pairs, labels, to_show=6, num_col=3, predictions=None, test=False, w=5, h=5):
    """Creates a plot of pairs and labels, and prediction if it's test dataset.

    Each subplot shows the two images of one pair side by side, titled with
    the label (and the prediction when `test` is True).

    Arguments:
        pairs: Numpy Array, of pairs to visualize, having shape
               (Number of pairs, 2, 28, 28).
        labels: Sequence of labels aligned with `pairs`.
        to_show: Int, number of examples to visualize (default is 6)
                `to_show` must be an integral multiple of `num_col`.
                 Otherwise it will be trimmed if it is greater than num_col,
                 and incremented if it is less than num_col.
        num_col: Int, number of images in one row - (default is 3)
        predictions: Numpy Array of predictions with shape (to_show, 1) -
                     (default is None)
                     Must be passed when test=True.
        test: Boolean telling whether the dataset being visualized is
              train dataset or test dataset - (default False).
        w: Figure width passed to matplotlib's `figsize` (default is 5).
        h: Figure height passed to matplotlib's `figsize` (default is 5).

    Returns:
        None.

    Raises:
        ValueError: If `test` is True but `predictions` is None.
    """
    if test and predictions is None:
        raise ValueError("`predictions` must be passed when test=True")

    # Round `to_show` to a full grid: trim it down to a multiple of `num_col`,
    # or bump it up to one full row when it is smaller than `num_col`.
    num_row = to_show // num_col if to_show // num_col != 0 else 1
    to_show = num_row * num_col

    # squeeze=False keeps `axes` two-dimensional for every grid shape;
    # otherwise a 1x1 grid comes back as a bare Axes that cannot be indexed.
    fig, axes = plt.subplots(num_row, num_col, figsize=(w, h), squeeze=False)
    available = len(pairs)
    for i in range(to_show):
        ax = axes[i // num_col, i % num_col]
        ax.set_axis_off()
        if i >= available:
            # Fewer pairs than grid cells: leave the remaining cells blank
            # instead of indexing past the end of `pairs`.
            continue

        ax.imshow(tf.concat([pairs[i][0], pairs[i][1]], axis=1), cmap="gray")
        if test:
            ax.set_title("True: {} | Pred: {:.5f}".format(labels[i], predictions[i][0]))
        else:
            ax.set_title("Label: {}".format(labels[i]))
    plt.show()

########## new custom design layer ###############################################

class L1_dist(Layer):
    """Keras layer computing the distance between two embedding batches.

    NOTE: despite its name the layer returns the Euclidean (L2) distance,
    i.e. sqrt(sum((a - b) ** 2)) along axis 1. The class name is kept because
    the saved model is loaded with the custom-object key 'L1_dist'.
    """

    def __init__(self, **kwargs):
        # Forward the standard Layer options (name, dtype, trainable, ...);
        # dropping them makes `L1_dist(name=...)` a silent no-op and discards
        # the configuration Keras passes when re-creating the layer.
        super().__init__(**kwargs)

    def call(self, input_embed, val_embed):
        """Returns the per-row Euclidean distance, shape (batch, 1).

        Arguments:
            input_embed: Embedding tensor of the first branch.
            val_embed: Embedding tensor of the second branch, same shape.
        """
        sum_square = tf.math.reduce_sum(tf.math.square(input_embed - val_embed), axis=1, keepdims=True)
        # Clamp to epsilon before the square root so that identical
        # embeddings do not produce an undefined gradient at 0.
        return tf.math.sqrt(tf.math.maximum(sum_square, tf.keras.backend.epsilon()))

# Embedding network: maps one 28x28x1 image to a 10-dimensional vector.
# NOTE(review): `input` shadows the Python builtin of the same name.
input = layers.Input((28, 28, 1))
x = tf.keras.layers.BatchNormalization()(input)
x = layers.Conv2D(4, (5, 5), activation="tanh")(x)
x = layers.AveragePooling2D(pool_size=(2, 2))(x)
x = layers.Conv2D(16, (5, 5), activation="tanh")(x)
x = layers.AveragePooling2D(pool_size=(2, 2))(x)
x = layers.Flatten()(x)

x = tf.keras.layers.BatchNormalization()(x)
x = layers.Dense(10, activation="tanh")(x)
embedding_network = keras.Model(input, x)

# One input per image of the pair.
input_1 = layers.Input((28, 28, 1))
input_2 = layers.Input((28, 28, 1))

# A Siamese network shares weights between its tower networks
# (sister networks). To allow this, the same embedding network
# instance is applied to both inputs.
tower_1 = embedding_network(input_1)
tower_2 = embedding_network(input_2)

# Custom distance layer between the two embeddings. The name is assigned
# through the private `_name` attribute rather than a constructor argument.
siamese_layer = L1_dist()
siamese_layer._name = 'distance'
# The custom layer replaces the earlier Lambda-based distance (kept below
# for reference).
distances = siamese_layer(tower_1, tower_2)
# merge_layer = layers.Lambda(euclidean_distance)([tower_1, tower_2])
# Normalise the distance, then squash it to a score in (0, 1).
normal_layer = tf.keras.layers.BatchNormalization()(distances)
output_layer = layers.Dense(1, activation="sigmoid")(normal_layer)
siamese = keras.Model(inputs=[input_1, input_2], outputs=output_layer)


def loss(margin=1):
    """Provides 'contrastive_loss' an enclosing scope with variable 'margin'.

    Arguments:
        margin: Number, defines the baseline for distance for which pairs
                should be classified as dissimilar. - (default is 1).

    Returns:
        'contrastive_loss' function with data ('margin') attached.
    """

    # Contrastive loss = mean( (1-true_value) * square(prediction) +
    #                         true_value * square( max(margin-prediction, 0) ))
    #
    # The inner default is bound to the factory argument. A literal `margin=1`
    # here would shadow the outer value, and since Keras calls the loss with
    # (y_true, y_pred) only, the requested margin would never be used.
    def contrastive_loss(y_true, y_pred, margin=margin):
        """Calculates the contrastive loss.

        Arguments:
            y_true: List of labels, each label is of type float32.
            y_pred: List of predictions of same length as of y_true,
                    each label is of type float32.
            margin: Margin to apply; defaults to the value given to `loss`.

        Returns:
            A tensor containing contrastive loss as floating point value.
        """
        square_pred = tf.math.square(y_pred)
        margin_square = tf.math.square(tf.math.maximum(margin - (y_pred), 0))
        return tf.math.reduce_mean(
            (1 - y_true) * square_pred + (y_true) * margin_square
        )

    return contrastive_loss

# siamese.compile(loss=loss(margin=margin), optimizer="RMSprop", metrics=["accuracy"])
# siamese.summary()
#
# history = siamese.fit(
#     [x_train_1, x_train_2],
#     labels_train,
#     validation_data=([x_val_1, x_val_2], labels_val),
#     batch_size=batch_size,
#     epochs=epochs,
# )

# print("[INFO] saving siamese model...")
# siamese.save('models/best_model_new' + '.h5')
# # plot the training history
# print("[INFO] plotting training history...")

#Load model


def Predict(x_test_1, x_test_2, labels=None, model_path='models/best_model_new.h5', to_show=10):
    """Loads the saved siamese model, scores the given pairs and plots a sample.

    The module-level `L1_dist` layer and the function returned by `loss()`
    are registered as custom objects so that the saved model can be
    deserialised.

    Arguments:
        x_test_1: Array with the first image of every pair.
        x_test_2: Array with the second image of every pair, aligned with
                  `x_test_1`.
        labels: Ground-truth labels aligned with the pairs, used only for the
                plot titles. Defaults to the module-level `labels_test`.
        model_path: Path of the saved model file
                    (default is 'models/best_model_new.h5').
        to_show: Int, number of pairs forwarded to `visualize`
                 (default is 10).

    Returns:
        The array returned by `model.predict` for the given pairs.
    """
    print("[INFO] loading siamese model...")
    model = tf.keras.models.load_model(
        model_path,
        custom_objects={'L1_dist': L1_dist, 'contrastive_loss': loss()},
    )
    predictions = model.predict([x_test_1, x_test_2])

    if labels is None:
        labels = labels_test
    # Plot the pairs that were actually scored instead of the global
    # `pairs_test`; only the slice that can be displayed is stacked so the
    # full arrays are not copied.
    shown_pairs = np.stack([x_test_1[:to_show], x_test_2[:to_show]], axis=1)
    visualize(shown_pairs, labels, to_show=to_show, predictions=predictions, test=True, w=5, h=5)
    return predictions
# Score the test pairs with the saved model and plot a few of them.
Predict(x_test_1, x_test_2)

def Create_Pairs_For_Predict(index, x, y, num_classes=10, num_pairs=10):
    """Creates matching / non-matching pairs that all share one anchor image.

    The anchor is `x[index]`. Every round appends a matching pair (label 1.0,
    partner drawn at random from the anchor's class) followed by a
    non-matching pair (label 0.0, partner drawn at random from another class).

    Arguments:
        index: Int, position of the anchor image in `x`.
        x: Sequence or array of images.
        y: Sequence or array of integer labels, aligned with `x`.
        num_classes: Int, number of label classes; labels are expected in
                     range(num_classes) (default is 10).
        num_pairs: Int, number of rounds, capped at len(x) (default is 10).

    Returns:
        Tuple (pairs, labels) of numpy arrays, where pairs has shape
        (2 * rounds, 2, *image_shape) and labels is a float32 binary array of
        shape (2 * rounds,).

    Raises:
        ValueError: If fewer than two classes have samples.
    """
    # Accept plain lists too: `list == int` is a single False, not a mask.
    y = np.asarray(y)
    digit_indices = [np.where(y == i)[0] for i in range(num_classes)]
    if sum(len(indices) > 0 for indices in digit_indices) < 2:
        raise ValueError(
            "Create_Pairs_For_Predict() needs samples from at least two "
            "classes to build non-matching pairs"
        )

    # The anchor image and its label do not change between rounds.
    img = x[index]
    label1 = y[index]

    pairs_test = []
    labels_test = []
    for _ in range(min(num_pairs, len(x))):
        # add a matching example (drawn from `x`, the data that was passed in)
        idx2 = random.choice(digit_indices[label1])
        pairs_test.append([img, x[idx2]])
        labels_test.append(1)

        # add a non-matching example; classes without samples are skipped
        label2 = random.randint(0, num_classes - 1)
        while label2 == label1 or len(digit_indices[label2]) == 0:
            label2 = random.randint(0, num_classes - 1)
        idx2 = random.choice(digit_indices[label2])
        pairs_test.append([img, x[idx2]])
        labels_test.append(0)
    return np.array(pairs_test), np.array(labels_test).astype("float32")

# pairs_test, labels_test = Create_Pairs_For_Predict(1, x_train, y_train)
# x_test_1 = pairs_test[:, 0]  # x_test_1.shape = (20000, 28, 28)
# x_test_2 = pairs_test[:, 1]
# Predict(x_test_1, x_test_2)
#
# #visualize(pairs_test,labels_test,to_show=12, num_col=3)
#
# # predictions = siamese.predict([x_test_1, x_test_2])
# # visualize(pairs_test, labels_test, to_show=15, predictions=predictions, test=True)
#
# num_classes=10
# x = x_train[1]
# num_classes = 10
# digit_indices = [np.where(y_train == i)[0] for i in range(num_classes)]
# pairs_test=[]
# labels_test=[]
# d=0
# num_classes=10
# for idx1 in range(len(x_train)):
#         # add a matching example
#         x1 = x
#         label1 = y_train[1]
#         idx2 = random.choice(digit_indices[label1])
#         x2 = x_train[idx2]
#         pairs_test += [[x, x2]]
#         labels_test += [1]
#         # add a non-matching example
#         label2 = random.randint(0, num_classes - 1)
#         while label2 == label1:
#             label2 = random.randint(0, num_classes - 1)
#         idx2 = random.choice(digit_indices[label2])
#         x2 = x_train[idx2]
#         pairs_test += [[x, x2]]
#         labels_test += [0]
#         d = d+1
#         if d == 10:
#           break
#
# visualize(pairs_test,labels_test,to_show=10, num_col=3)