In [40]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, dataloader
from PIL import Image
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tqdm import tqdm
import time
import random
import matplotlib.pyplot as plt
import time
from tensorflow.keras import Input, Sequential, Model
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Lambda, BatchNormalization, Activation, \
    Dropout, Layer
from tensorflow.keras.optimizers import Adam, schedules, SGD
from tensorflow.keras.regularizers import l2
from tensorflow.keras import initializers


In [41]:
from google.colab import drive 
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [42]:
dirName = "/content/drive/MyDrive/DeepLearning2Data/lfw2"
train_path = "/content/drive/MyDrive/pairsDevTrain.txt"
test_path = "/content/drive/MyDrive/pairsDevTrain.txt"

In [43]:
def open_file(path):
  examples_true = []
  examples_false = []
  with open(path) as train_file:
    for line in train_file:
      curr_line = line.split("\t")
      curr_line[-1] = curr_line[-1][:-1]
      if len(curr_line) == 3:
        examples_true.append(curr_line)
      if len(curr_line)==4:
        examples_false.append(curr_line)
  return (examples_false , examples_true)


In [44]:
def get_image_pairs(main_directory, data_set, true_examples=True):
  images = [np.array([None for x in range(2)]) for _ in range(len(data_set))]
  main_dir_str = main_directory+"/"
  for idx, i in tqdm(enumerate(data_set)):

    if true_examples:
      zeros1=(4-len(i[1]))*"0"
      zeros2=(4-len(i[2]))*"0"
      main_string = main_dir_str + f"{i[0]}/{i[0]}_"
      image1 = main_string + zeros1+i[1]+".jpg"
      image2 = main_string + zeros2+i[2]+".jpg"
    else:
      zeros1=(4-len(i[1]))*"0"
      zeros2=(4-len(i[3]))*"0"

      image1 = main_dir_str+i[0]+f"/{i[0]}_"+zeros1+i[1]+".jpg"
      image2 = main_dir_str+i[2]+f"/{i[2]}_"+zeros2+i[3]+".jpg"
    
    images[idx][0] = image1
    images[idx][1] = image2
  
  # print(f"Total time: {end - start}. Time in main functions: {total_main}")
  
  return images

In [45]:
def get_images_from_files(path):
    """
    We open the image in the given path. The path must lead to a .jpg file.
    We then resize it to 105x105 like in the paper (the dataset contains 250x250 images.)

    """
    my_image = tf.io.read_file(path)
    my_image = tf.image.decode_jpeg(my_image, channels=1)
    my_image = tf.image.resize(my_image, (105, 105))
    my_image = tf.cast(my_image/255, tf.float32)
    my_image = tf.image.convert_image_dtype(my_image, tf.float32)
    return my_image

In [46]:
def pair_from_files(paths_tuple):
  return get_images_from_files(paths_tuple[0]), get_images_from_files(paths_tuple[1])

In [47]:
train_false, train_true = open_file(train_path)
test_false, test_true = open_file(test_path)

train_false = get_image_pairs(dirName, train_false, true_examples=False)
test_false = get_image_pairs(dirName, test_false, true_examples=False)
test_true = get_image_pairs(dirName, test_true, true_examples=True)
train_true = get_image_pairs(dirName, train_true, true_examples=True)

1100it [00:00, 223187.62it/s]
1100it [00:00, 287944.48it/s]
1100it [00:00, 300902.26it/s]
1100it [00:00, 209154.29it/s]


In [48]:
# Split into train and validation (maybe here it goes bad?)
validation_size = 0.25
X_train_same, X_val_same, y_train_same, y_val_same = train_test_split(np.array(train_true), np.ones(len(train_true)), test_size=validation_size, random_state=42)
X_train_diff, X_val_diff, y_train_diff, y_val_diff = train_test_split(np.array(train_false), np.zeros(len(train_false)), test_size=validation_size, random_state=42)

In [49]:
# Combine same and diff to create final training sets
X_train, y_train = np.vstack((X_train_same, X_train_diff)), np.hstack((np.array(y_train_same), np.array(y_train_diff)))
X_val, y_val = np.vstack((X_val_same, X_val_diff)), np.hstack(np.array(y_val_same))
X_test, y_test = np.vstack((np.array(test_true), np.array(test_false))), np.hstack((np.ones(len(test_true)), np.zeros(len(test_false))))

In [50]:
# Given images and their labels gets them from file.
def get_dataset(pairs, labels):
  dataset = tf.data.Dataset.from_tensor_slices(pairs)
  labels = tf.data.Dataset.from_tensor_slices(labels)
  dataset = tf.data.Dataset.zip((dataset.map(pair_from_files, num_parallel_calls=tf.data.experimental.AUTOTUNE), labels))
  return dataset.repeat(1).shuffle(buffer_size=1024).batch(16)


In [51]:
total_train = len(train_false) + len(train_true)
total_test = len(test_false) + len(test_true)
total_val = len(X_val_diff) + len(X_val_same)
total = total_train + total_test
print(f"Total images in train {total_train - total_val}")
print(f"Total images in test {total_test}")
print(f"Total images in validation {total_val}")
print(f"The number of similar or different pairs is equal in all sets. So {len(train_false) - len(X_val_same)} in the training set, {len(test_true)} in the test set, and {len(X_val_same)} in the validation set")
print(f"Total number of images: {total}")

Total images in train 1650
Total images in test 2200
Total images in validation 550
The number of similar or different pairs is equal in all sets. So 825 in the training set, 1100 in the test set, and 275 in the validation set
Total number of images: 4400


### Building The model

In [52]:
# From the article:
# Minibatch size: 128
# Allowed diff lr for each layer. Decayed uniformly by 1%

# Momentum starts at 0.5 in every layer, increasing until mu_j of that layer.
# mµ_j ∈ [0, 1],

# CNN Weight initializations: W ~ N(0, 0.01)
# Dense Weight inits: W ~ N(0, 0.2)
# Bias initializations: B ~ N(0.5, 0.01)

# Regularization: L2 on each layer
# Stopped when validation error did not decrease for 20 epochs.

# Adds a pooling layer. Optional batch normalization and dropout for experiments
def add_pooling(layer, batchnorm, dropout):
    
    if (batchnorm):
      layer = BatchNormalization()(layer)
    m1 = MaxPooling2D()(layer)
    if (dropout):
      m1 = Dropout(dropout)(m1)
    
    return m1
    
def conv_layers(batchnorm = False, dropout = False):
    input = Input(shape=(105, 105, 1), name='input_image')

    conv1 = Conv2D(64, (10,10), activation='relu', kernel_regularizer=l2(0.0001),
                kernel_initializer=initializers.RandomNormal(stddev=0.01),
                bias_initializer=initializers.RandomNormal(mean=0.5, stddev=0.01))(input)



    pool1 = add_pooling(conv1, batchnorm=batchnorm, dropout=dropout)

    conv2 = Conv2D(128, (7, 7), activation='relu', kernel_regularizer=l2(0.0001),
              kernel_initializer=initializers.RandomNormal(stddev=0.01),
              bias_initializer=initializers.RandomNormal(mean=0.5, stddev=0.01))(pool1)

    pool2 = add_pooling(conv2, batchnorm=batchnorm, dropout=dropout)

    conv3 = Conv2D(128, (4, 4), activation='relu', kernel_regularizer=l2(0.0001),
              kernel_initializer=initializers.RandomNormal(stddev=0.01),
              bias_initializer=initializers.RandomNormal(mean=0.5, stddev=0.01))(pool2)

    pool3 = add_pooling(conv3, batchnorm=batchnorm, dropout=dropout)

    conv4 = Conv2D(256, (4,4), activation='relu', kernel_regularizer=l2(0.0001),
              kernel_initializer=initializers.RandomNormal(stddev=0.01),
              bias_initializer=initializers.RandomNormal(mean=0.5, stddev=0.01))(pool3)
    
    if (batchnorm):
      conv4 = BatchNormalization()(conv4)

    flatten = Flatten()(conv4)
    dense1 = Dense(2048, activation='sigmoid', kernel_regularizer=l2(0.0001))(flatten)

    return Model(inputs=[input], outputs=[dense1], name="embedding")

In [53]:
# The layer mentioned in the paper to compute the distance.
class L1Layer(Layer):
    
    def __init__(self, **kwargs):
        super().__init__()
       
    
    def call(self, embed1, embed2):
        return tf.math.abs(embed1 - embed2)

In [54]:
# Combine the functions we built in order to make a full model
def make_full_model(batchnorm = False, dropout=False):
    
    # The two inputs to the network
    input_image = Input(name='Image_one', shape=(105, 105, 1))
    validation_image = Input(name='Image_two', shape=(105, 105, 1))
    
    # Creating the Convolutional Layers
    conv_model = conv_layers(batchnorm=batchnorm, dropout=dropout)

    # Layers Getting distance between the two image embeddings
    combination_layer = L1Layer()
    distances = combination_layer(conv_model(input_image), 
                                  conv_model(validation_image))
    
    classifier = Dense(1, activation="sigmoid")(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='full_network')

In [55]:
learning_rate = 0.0001

siamese_net = make_full_model(batchnorm = False, dropout = 0.1)
siamese_net.summary()

optimizer = Adam(learning_rate)

siamese_net.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=['binary_accuracy'])

# early stopping definition
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, mode="min", verbose=1)

Model: "full_network"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 Image_one (InputLayer)         [(None, 105, 105, 1  0           []                               
                                )]                                                                
                                                                                                  
 Image_two (InputLayer)         [(None, 105, 105, 1  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 2048)         20071232    ['Image_one[0][0]',              
                                                                  'Image_two[0][0]']   

In [56]:
train = get_dataset(X_train,y_train)
validation = get_dataset(X_val , y_val)
test = get_dataset(X_test, y_test)

In [57]:
#training of the model
start = time.time() 
model_out = siamese_net.fit(x=train, epochs=100, validation_data= validation, callbacks=[callback])
end = time.time()

# prediction results- an array contains all the predictions probabilities.
print('Predict results')
pred = siamese_net.predict(test)

# training time measurments. 
print('Model convergence time: {} s.'.format(end - start))

# evaluation of the model's performances on the test set
results_test = siamese_net.evaluate(test)
print('\nEvaluation results:')
print(results_test)

# evaluation of the performance on the validation set
results_val = siamese_net.evaluate(validation)
print('\nEvaluation results:')
print(results_val)

Epoch 1/100


NotFoundError: ignored

In [None]:
print(model_out.history.keys())
# summarize history for accuracy
plt.plot(model_out.history['binary_accuracy'])
plt.plot(model_out.history['val_binary_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

# summarize history for loss
plt.plot(model_out.history['loss'])
plt.plot(model_out.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')

plt.legend(['train', 'validation'], loc='upper left')
plt.show()