In [1]:
import warnings
warnings.filterwarnings('ignore')
import os
import random
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.data import AUTOTUNE
from tensorflow.keras.models import Model
from tensorflow.keras.backend import epsilon
from tensorflow.keras.optimizers import Adam
from tensorflow.math import square, maximum, reduce_mean, sqrt, reduce_sum
from tensorflow.keras.layers import Input, Dense, Flatten, Dropout, Conv2D, MaxPooling2D, AveragePooling2D, BatchNormalization, Lambda
from keras.callbacks import EarlyStopping
import keras_tuner as kt

In [2]:
#Parameters' initialization
random_seed = 123

In [3]:
def read_pairs(sample_name, separate = True):
    
    final_df = pd.read_csv(f'./csv/{sample_name}_pairs.csv')

    if separate:
        for col in ['img_1', 'img_2']:
            final_df[col] =  [f'./cropped_images/{i}'for i in final_df[col]]

        imgs = final_df[['img_1', 'img_2']]
        labels = final_df[['label']]

        return np.array(imgs), np.array(labels)
        
    else:
        return final_df

In [4]:
train_imgs, train_labels = read_pairs('train')
valid_imgs, valid_labels = read_pairs('valid')
test_imgs, test_labels = read_pairs('test')

In [6]:
print(len(train_imgs))
print(len(valid_imgs))
print(len(test_imgs))

4050
1250
1250


In [7]:
def tf_img_pipeline(anchor, comparison):
    
    def tf_img_processing(img_path):
        img = tf.io.read_file(img_path)
        img = tf.image.decode_jpeg(img, channels = 3)
        img = tf.image.resize(img, [224,224], method = 'bilinear')
        img = tf.image.convert_image_dtype(img, tf.float32) /  tf.constant(255, dtype = tf.float32)

        return img

    return tf_img_processing(anchor), tf_img_processing(comparison)

def tf_label_pipeline(label):
    return tf.cast(label, tf.float32)

def tf_data_processing_pipeline(images, labels):

    images_tf = tf.data.Dataset.from_tensor_slices((images[:, 0] , images[:, 1])).map(tf_img_pipeline)
    labels_tf = tf.data.Dataset.from_tensor_slices(labels).map(tf_label_pipeline)

    dataset = tf.data.Dataset.zip((images_tf,
                                    labels_tf)).batch(2,
                                                      num_parallel_calls = AUTOTUNE).cache().prefetch(buffer_size = AUTOTUNE)
    
    
    return dataset

In [8]:
train_tf = tf_data_processing_pipeline(train_imgs, train_labels)
valid_tf = tf_data_processing_pipeline(valid_imgs, valid_labels)
test_tf = tf_data_processing_pipeline(test_imgs, test_labels)

In [9]:
print(tf.data.experimental.cardinality(train_tf).numpy())
print(tf.data.experimental.cardinality(valid_tf).numpy())
print(tf.data.experimental.cardinality(test_tf).numpy())

2025
625
625


In [10]:
#Function for calculation an Euclidean distance between the two feature vectors
def euclidean_distance(vectors):

    x, y = vectors
    sum_square = reduce_sum(square(x - y), axis = 1, keepdims = True)

    return sqrt(maximum(sum_square, epsilon()))



#Function for a calculation of a contrastive loss
def contrastive_loss(margin = 1):

    def contrastive__loss(y_true, y_pred):

        square_pred = tf.math.square(y_pred)
        margin_square = tf.math.square(tf.math.maximum(margin - (y_pred), 0))
        return tf.math.reduce_mean(
            (1 - y_true) * square_pred + (y_true) * margin_square
        )

    return contrastive__loss

In [12]:
def model_building(hp):

    #Input layer
    inputs = Input(shape = (224, 224, 3))
    x = inputs

    #Tuning a number of convolutional blocks
    for i in range(hp.Int('conv_blocks', min_value = 2, max_value = 5, default = 3)):
    
        #Tuning the number of convolution's output filters
        filters = hp.Int('filters_' + str(i), min_value = 32,
                        max_value = 1000, step = 32) 

        #Within each block, perform 2 convolutions and batch normalization
        for _ in range(2):

        #Tuning the number of convolution's output filters
            x = Conv2D(filters, kernel_size=(3, 3), padding = 'same',
                 activation = 'relu')(x)
            x = BatchNormalization()(x)

        #Tuning the pooling type in the convolutional block
        if hp.Choice('pooling_' + str(i), ['avg', 'max']) == 'max':
            x = MaxPooling2D()(x)
        else:
            x = AveragePooling2D()(x)
    
    #Tuning the dropout rate in the dropout layer in the convolutional block
        x = Dropout((hp.Float('dropout', 0, 0.5, step = 0.05, default = 0.5)), seed = 123)(x)

    #Flatten the output
    x = Flatten()(x)

    #Tuning the number of units in the dense layer
    x = Dense(hp.Int('Dense units' ,min_value = 50,
                   max_value = 100, step = 10, default = 50),
                  activation='relu')(x)

    #Tuning the dropout rate in the dropout layer - the final feature vector layer
    feature_layer = Dropout((hp.Float('dropout', 0, 0.5, step = 0.05, default = 0.5)), seed = 123)(x)

    #Mapping a embedding model
    embedding_network = Model(inputs, feature_layer)
    
    #Setting an input layer for the image pairs
    input_1 = Input((224, 224, 3))
    input_2 = Input((224, 224, 3))
    tower_1 = embedding_network(input_1)
    tower_2 = embedding_network(input_2)

    #Layers for calculation of the Euclidean distance between the two feature vectors, with further normalization
    merge_layer = Lambda(euclidean_distance)([tower_1, tower_2])
    normal_layer = BatchNormalization()(merge_layer)

    #Final output layer (classification whether the images are of the same label/person)
    output_layer = Dense(1, activation="sigmoid")(normal_layer)

    #Final model mapping
    model = Model(inputs=[input_1, input_2], outputs = output_layer)

    #Model compilation:
        #Tuning the learning rate of the stochastic gradient method in the Adam optimizer.
        #Minimizing a binary cross entropy loss function and maximizing an accuracy.
        #We compute the binary cross entropy for each label separately and then sum them up for the complete loss.
        
    model.compile(optimizer = Adam(hp.Float('learning_rate', min_value = 1e-4,
                                                            max_value = 1e-2,
                                                            sampling = 'log')), 
                    loss = contrastive_loss(margin = 1))

    return model

In [13]:
bayes_opt = kt.tuners.BayesianOptimization(model_building, objective = 'val_loss', max_trials = 5, seed = random_seed)

bayes_opt.search(train_tf,
                 validation_data = valid_tf,
                 epochs = 1,
                 callbacks = [EarlyStopping(patience = 2)])

INFO:tensorflow:Reloading Oracle from existing project .\untitled_project\oracle.json

Search: Running Trial #1

Value             |Best Value So Far |Hyperparameter
2                 |?                 |conv_blocks
960               |?                 |filters_0
avg               |?                 |pooling_0
0.3               |?                 |dropout
64                |?                 |filters_1
avg               |?                 |pooling_1
608               |?                 |filters_2
max               |?                 |pooling_2
50                |?                 |Dense units
0.00066812        |?                 |learning_rate

