In [None]:
#Based on the implementation at https://github.com/nicknochnack/FaceRecognition by nicknochnack
#This notebook builds on that code and extends it, most notably by adding pre-trained feature extractors

#Import dependencies
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

#Import tensorflow dependencies
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [None]:
#Initial embedding network. This CNN is very shallow and is unlikely to give good results,
#so pre-trained models are used as feature extractors instead
def make_embedding():
    """Build the baseline CNN that embeds a 224x224 RGB image as a 4096-d vector.

    The network is four convolutions, the first three each followed by
    max pooling, then a flatten and a sigmoid-activated Dense(4096) layer.

    Returns:
        A Keras ``Model`` named 'embedding' with a single input called
        'flood_image' and a single 4096-unit output.
    """
    # Named image_in rather than `input` so the Python builtin is not shadowed
    image_in = Input(shape=(224,224,3), name='flood_image')

    # First convolutional block.
    # The pooling window is 2x2. The previous call passed 64 as the first
    # positional argument, which Keras reads as pool_size (a 64x64 window).
    # Strides and padding are unchanged, so every output shape stays the same.
    c1 = Conv2D(64, (10,10), activation='relu')(image_in)
    m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)

    # Second convolutional block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)

    # Third convolutional block
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)

    # Final convolution, then flatten into the embedding vector
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[image_in], outputs=[d1], name='embedding')

In [None]:
# Instantiate the baseline embedding network and print its layer-by-layer summary
embedding = make_embedding()
embedding.summary()

Model: "embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 flood_image (InputLayer)    [(None, 224, 224, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 215, 215, 64)      19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 108, 108, 64)     0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 102, 102, 128)     401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 51, 51, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 48, 48, 128)       26

In [None]:
from scipy.spatial import distance

# Siamese L1 Distance class
class L1Dist(Layer):
    """Keras layer that returns the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        """Create the layer.

        Args:
            **kwargs: Standard Keras ``Layer`` keyword arguments (for example
                ``name``, ``dtype``, ``trainable``).
        """
        # Forward the keyword arguments to the base class. Previously they were
        # accepted but discarded, so e.g. L1Dist(name='distance') had no effect.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return ``|input_embedding - validation_embedding|`` element-wise.

        Args:
            input_embedding: Embedding tensor of the first image.
            validation_embedding: Embedding tensor of the second image.

        Returns:
            A tensor holding the absolute difference of the two arguments.
        """
        return tf.math.abs(input_embedding - validation_embedding)

In [None]:
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications.vgg19 import VGG19
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dropout

# Full VGG16 with ImageNet weights, including its classification head
model = VGG16(weights='imagenet')

# The last three layers of the network, in order; these are treated as the
# two fully connected layers and the prediction layer
fc1, fc2, predictions = model.layers[-3:]

# Two independent dropout layers (rate 0.3), one after each fully connected layer
dropout1, dropout2 = Dropout(0.3), Dropout(0.3)

# Re-wire the head: fc1 -> dropout -> fc2 -> dropout -> predictions
x1 = dropout1(fc1.output)
x2 = fc2(x1)
x3 = dropout2(x2)
predictors = predictions(x3)

# Model with the re-wired head
model2 = Model(inputs=model.inputs, outputs=predictors)

# The feature extractor stops at the second-to-last layer of the re-wired model
vgg16_feature_extractor = Model(
    inputs=model2.inputs,
    outputs=model2.layers[-2].output,
)
vgg16_feature_extractor.summary()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels.h5
Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 block1_conv1 (Conv2D)       (None, 224, 224, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 224, 224, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 112, 112, 64)      0         
                                                                 
 block2_conv1 (Conv2D)       (None, 112, 112, 128)     73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 112, 112, 128)     147

In [None]:
# Full VGG19 with ImageNet weights, including its classification head
base = VGG19(weights='imagenet')

# Store the fully connected layers.
# These are taken from the FULL network. Previously they were indexed from a
# model already truncated at base.layers[-2], so every name was shifted by one
# layer (`fc1` held the layer before it, and so on) and the resulting extractor
# did not match the VGG16 extractor built with the same recipe.
fc1 = base.layers[-3]
fc2 = base.layers[-2]
predictions = base.layers[-1]

# Create the dropout layers
dropout1 = Dropout(0.3)
dropout2 = Dropout(0.3)

# Reconnect the layers: fc1 -> dropout -> fc2 -> dropout -> predictions
x1 = dropout1(fc1.output)
x2 = fc2(x1)
x3 = dropout2(x2)

predictors = predictions(x3)

# Create a new model with the re-wired head, then cut it at its
# second-to-last layer to obtain the feature extractor
model2 = Model(inputs=base.inputs, outputs=predictors)
vgg19_feature_extractor = Model(inputs=model2.inputs, outputs=model2.layers[-2].output)

vgg19_feature_extractor.summary()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg19/vgg19_weights_tf_dim_ordering_tf_kernels.h5
Model: "model_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 block1_conv1 (Conv2D)       (None, 224, 224, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 224, 224, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 112, 112, 64)      0         
                                                                 
 block2_conv1 (Conv2D)       (None, 112, 112, 128)     73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 112, 112, 128)     147

In [None]:
# Full ResNet101 with ImageNet weights
base = tf.keras.applications.ResNet101(weights="imagenet")

# Second-to-last layer of the network; its output is used as the feature vector.
# NOTE(review): despite the name `fc1`, this is simply base.layers[-2].
fc1 = base.layers[-2]

# Network truncated at that layer
model = Model(inputs=base.inputs, outputs=fc1.output)

# Single dropout layer (rate 0.3) applied on top of the features
dropout1 = Dropout(0.3)
predictors = dropout1(fc1.output)

# Final feature extractor: image in, dropped-out features out
resnet_feature_extractor = Model(inputs=model.inputs, outputs=predictors)

resnet_feature_extractor.summary()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels.h5
Model: "model_6"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_3 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv1_pad (ZeroPadding2D)      (None, 230, 230, 3)  0           ['input_3[0][0]']                
                                                                                                  
 conv1_conv (Conv2D)            (None, 112, 112, 64  9472        ['conv1_pad[0][0]']              
                                )                    

In [None]:
def make_siamese_model(feature_extractor=None, activation='tanh'):
    """Build the two-input Siamese network.

    Both inputs are passed through the same feature extractor. The absolute
    difference of the two embeddings is computed by an ``L1Dist`` layer named
    'distance' and fed to a single-unit Dense classifier.

    Args:
        feature_extractor: Keras model mapping a (224, 224, 3) image to an
            embedding, e.g. ``vgg16_feature_extractor``,
            ``vgg19_feature_extractor`` or ``resnet_feature_extractor``.
            Defaults to ``resnet_feature_extractor``.
        activation: Activation of the final Dense(1) layer, e.g. 'tanh' or
            'sigmoid'. Defaults to 'tanh'.
            NOTE(review): 'tanh' produces values in [-1, 1]; confirm this is
            what the loss used for training expects.

    Returns:
        A Keras ``Model`` named 'SiameseNetwork' taking
        ``[input_img, validation_img]`` and producing one score per pair.
    """
    # Resolve the default lazily so the global is only needed when it is used
    if feature_extractor is None:
        feature_extractor = resnet_feature_extractor

    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(224,224,3))

    # Validation image in the network
    validation_image = Input(name='validation_img', shape=(224,224,3))

    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'

    # The same extractor (shared weights) embeds both images
    distances = siamese_layer(feature_extractor(input_image), feature_extractor(validation_image))

    # Classification layer
    classifier = Dense(1, activation=activation)(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [None]:
'''
# Setup paths
ANC_PATH = '/content/drive/MyDrive/Third_Year_Project/Siamese_Network/Data_Siamese/Anchor'
NEG_PATH = '/content/drive/MyDrive/Third_Year_Project/Siamese_Network/Data_Siamese/Negative'
POS_PATH = '/content/drive/MyDrive/Third_Year_Project/Siamese_Network/Data_Siamese/Positive'

anchor = tf.data.Dataset.list_files(ANC_PATH+'/*.png').take(500)
positive = tf.data.Dataset.list_files(POS_PATH+'/*.png').take(500)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.png').take(500)

positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)
'''

# Folder roles: the anchor and positive folders both hold "flooding = true"
# images, the negative folder holds "flooding = false" images
PATH_TEST_1 = '/content/drive/MyDrive/Third_Year_Project/PyCharm Folder/PreProcessing/Data/ANCHOR'
PATH_TEST_2 = '/content/drive/MyDrive/Third_Year_Project/PyCharm Folder/PreProcessing/Data/NEGATIVE'
PATH_TEST_3 = '/content/drive/MyDrive/Third_Year_Project/PyCharm Folder/PreProcessing/Data/POSITIVE'

# One file-name dataset per folder (a = anchor, b = negative, c = positive),
# listed without shuffling and capped at 500 entries each
a, b, c = [
    tf.data.Dataset.list_files(folder + '/*.png', shuffle=False).take(500)
    for folder in (PATH_TEST_1, PATH_TEST_2, PATH_TEST_3)
]

# Label datasets sized by the number of anchors: 1.0 for anchor/positive
# pairs, 0.0 for anchor/negative pairs
pair_count = len(a)
match_labels = tf.data.Dataset.from_tensor_slices(tf.ones(pair_count))
mismatch_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(pair_count))

# (anchor, other, label) triples; all positive pairs come first, then all negative pairs
positives = tf.data.Dataset.zip((a, c, match_labels))
negatives = tf.data.Dataset.zip((a, b, mismatch_labels))
data = positives.concatenate(negatives)

def preprocess(file_path):
    """Load an image file and return it as a 224x224x3 float tensor scaled to [0, 1].

    Args:
        file_path: Path (string tensor) of the image file to load.

    Returns:
        A float tensor of shape (224, 224, 3) with values in [0, 1].
    """
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)

    # Decode the image and force three colour channels. Without channels=3 a
    # grayscale or RGBA file decodes to 1 or 4 channels, which does not match
    # the (224, 224, 3) input of the networks in this notebook.
    img = tf.io.decode_png(byte_img, channels=3)

    # Preprocessing steps - resizing the image to be 224x224x3
    img = tf.image.resize(img, (224,224))

    # Scale image to be between 0 and 1
    # NOTE(review): the ImageNet-pretrained extractors may expect the matching
    # keras `preprocess_input` rather than plain /255 scaling - confirm.
    img = img / 255.0

    # Return image
    return img

def preprocess_twin(input_img, validation_img, label):
    """Apply ``preprocess`` to both image paths of a pair; pass the label through unchanged.

    Returns:
        A tuple ``(input image tensor, validation image tensor, label)``.
    """
    first_image = preprocess(input_img)
    second_image = preprocess(validation_img)
    return (first_image, second_image, label)

In [None]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()

# Shuffle ONCE, over the whole dataset, with a fixed seed.
# reshuffle_each_iteration=False is essential: with the default (True) the
# order changes on every pass, so take()/skip() below would select a different
# train/test split each epoch and test samples would leak into training.
SPLIT_SEED = 42
data = data.shuffle(
    buffer_size=max(len(data), 1),   # whole dataset, not a hard-coded size
    seed=SPLIT_SEED,
    reshuffle_each_iteration=False,
)

# 80/20 split point, computed once
n_total = len(data)
n_train = round(n_total * .8)

# Training partition. It is reshuffled every epoch, but only among training samples.
train_data = data.take(n_train)
train_data = train_data.shuffle(buffer_size=max(n_train, 1))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

# Testing partition (everything after the training samples)
test_data = data.skip(n_train)
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [None]:
# Build the Siamese network with its default configuration and print its summary
siamese_model = make_siamese_model()
siamese_model.summary()
# Loss object used by train_step.
# NOTE(review): binary cross-entropy is normally fed probabilities in [0, 1];
# confirm that the classifier activation chosen in make_siamese_model is compatible.
binary_cross_loss = tf.losses.BinaryCrossentropy()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 model_6 (Functional)           (None, 2048)         42658176    ['input_img[0][0]',              
                                                                  'validation_img[0][

In [None]:
#Set the optimizer to Adam or Adamax
# Adamax with learning rate 1e-4 is the active choice. This `opt` object is the
# one train_step applies gradients with and the one the checkpoint tracks.
opt = tf.keras.optimizers.Adamax(1e-4)
#opt = tf.keras.optimizers.Adam(1e-3) # 0.001

In [None]:
# Checkpoints are written under ./training_checkpoints with the file prefix 'ckpt'
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
# Track the optimizer and the model together so a checkpoint captures both
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [None]:
# Numpy iterator over the *training* batches (despite the variable name).
# NOTE(review): not referenced by any other cell visible in this notebook.
test_batch = train_data.as_numpy_iterator()

In [None]:
#Function used for training steps
@tf.function
def train_step(batch):
    """Run one optimisation step of ``siamese_model`` on a single batch.

    Args:
        batch: Sequence ``(input images, validation images, labels)``.

    Returns:
        The scalar loss tensor computed by ``binary_cross_loss`` for this batch.
    """
    # First two elements are the image pair, the third is the label
    X = batch[:2]
    y = batch[2]

    # Record the forward pass so gradients can be computed from it
    with tf.GradientTape() as tape:
        yhat = siamese_model(X, training=True)
        loss = binary_cross_loss(y, yhat)

    # The per-step debug print of the loss was removed here; the loss is
    # returned so the caller decides what to report.

    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    # Return loss
    return loss

In [None]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall, Accuracy, BinaryAccuracy

In [None]:
# Per-epoch training history, appended to by train() and read by the plotting cell
metrics_plotting = {}
loss_arr = []
recall = []
precision = []
accuracy = []
def train(data, EPOCHS, threshold=0.5):
    """Train ``siamese_model`` for ``EPOCHS`` epochs and record per-epoch metrics.

    After each epoch, the loss of the last batch and the epoch's recall,
    precision and binary accuracy are printed and appended to the module-level
    lists ``loss_arr``, ``recall``, ``precision`` and ``accuracy``. A checkpoint
    is saved every 5 epochs.

    Args:
        data: Batched dataset yielding ``(input images, validation images, labels)``.
        EPOCHS: Number of passes over ``data``.
        threshold: A pair is predicted as a match (1) when the model output is
            greater than this value. Defaults to 0.5.

    Raises:
        ValueError: If ``data`` contains no batches.
    """
    n_batches = len(data)
    if n_batches == 0:
        # Previously this case surfaced later as a NameError on `loss`
        raise ValueError('train() received a dataset with no batches')

    # Loop through epochs
    for epoch in range(1, EPOCHS+1):

        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(n_batches)

        # Creating metric objects (fresh for every epoch)
        r = Recall()
        p = Precision()
        a = BinaryAccuracy()

        # Loop through each batch
        for idx, batch in enumerate(data):

            # Run train step here
            loss = train_step(batch)

            # Score the same batch in inference mode. The model is called
            # directly; Model.predict is meant for whole datasets and adds
            # overhead when invoked once per batch.
            scores = np.asarray(siamese_model(batch[:2], training=False)).reshape(-1)

            # Decision rule: label 1 means "match", and the cross-entropy loss
            # pushes matching pairs towards 1, so a HIGH score is a match.
            # (The previous rule, -0.5 < score < 0.5 -> 1, was inverted with
            # respect to the training objective.)
            yhat = (scores > threshold).astype('int32')
            temp = (np.asarray(batch[2]).reshape(-1) == 1.0).astype('int32')

            r.update_state(temp, yhat)
            p.update_state(temp, yhat)
            a.update_state(temp, yhat)

            # Show the running mean loss on the progress bar instead of
            # printing every raw prediction
            progbar.update(idx+1, values=[('loss', float(loss))])

        #Print out metrics (the loss is that of the last batch of the epoch)
        print(loss.numpy(), r.result().numpy(), p.result().numpy(), a.result().numpy())

        loss_arr.append(loss.numpy())
        recall.append(r.result().numpy())
        precision.append(p.result().numpy())
        accuracy.append(a.result().numpy())

        # Save checkpoints
        if epoch % 5 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

In [None]:
# Mount Google Drive at /content/drive (Colab only).
# NOTE(review): the dataset paths used earlier in this notebook live under
# /content/drive, so this cell has to be executed before those cells; consider
# moving it to the top of the notebook.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Number of training epochs; passed to train()
EPOCHS = 50

In [None]:
# Run tf.function-decorated code eagerly (as plain Python) instead of as a compiled graph.
# NOTE(review): this disables the benefit of @tf.function on train_step and
# triggers the warning visible in the output below; confirm it is still needed.
tf.config.run_functions_eagerly(True)
# Train on the training partition for EPOCHS epochs
train(train_data, EPOCHS)


 Epoch 1/50
tf.Tensor(8.106174, shape=(), dtype=float32)


  "Even though the `tf.config.experimental_run_functions_eagerly` "


[[0.04549526]
 [0.19382977]
 [0.23367089]
 [0.11302537]
 [0.20738949]
 [0.02622624]
 [0.08157068]
 [0.02420389]
 [0.1573437 ]
 [0.10398606]
 [0.06933904]
 [0.12414758]
 [0.08611031]
 [0.09321554]
 [0.02107719]
 [0.13355069]]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 0, 0]
 1/11 [=>............................] - ETA: 5:03tf.Tensor(6.0916014, shape=(), dtype=float32)
[[ 0.12451026]
 [ 0.07407075]
 [ 0.30055305]
 [ 0.15440173]
 [ 0.16927698]
 [ 0.11487014]
 [ 0.18876818]
 [ 0.08958744]
 [ 0.02571942]
 [ 0.06728908]
 [ 0.01797558]
 [ 0.11632381]
 [ 0.0558189 ]
 [ 0.11309038]
 [ 0.05123478]
 [-0.02289977]]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[1, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0]
 2/11 [====>.........................] - ETA: 9s  tf.Tensor(6.2416573, shape=(), dtype=float32)
[[ 0.02835516]
 [ 0.00969997]
 [ 0.08583714]
 [ 0.01696903]
 [-0.00748495]
 [ 0.07083166]
 [ 0.08270182]
 [ 0.10258839]
 [ 0.1050697 ]
 [ 0.125770

KeyboardInterrupt: ignored

In [None]:
def plot_training_curve(values, title, ylabel, legend_label, filename):
    """Plot one per-epoch training metric, save it as an image and display it.

    The x-axis is derived from ``len(values)`` rather than from the configured
    number of epochs, so the plot also works when training stopped early.

    Args:
        values: Sequence with one metric value per completed epoch.
        title: Figure title.
        ylabel: Label of the y-axis.
        legend_label: Legend entry for the curve.
        filename: Path of the image file to write.
    """
    epochs = list(range(1, len(values) + 1))
    plt.plot(epochs, values)
    plt.title(title)
    plt.ylabel(ylabel)
    plt.xlabel('Epoch')
    plt.legend([legend_label], loc='upper left')
    plt.savefig(filename)
    plt.show()

# Epoch numbers actually recorded (1-based); kept as a module-level name
epochs_arr = list(range(1, len(accuracy) + 1))

# summarize history for accuracy
plot_training_curve(accuracy, 'Training accuracy', 'Accuracy', 'Acc', 'Acc.png')

# summarize history for precision
plot_training_curve(precision, 'Training precision', 'Precision', 'Precision', 'Precision.png')

# summarize history for recall
plot_training_curve(recall, 'Training recall', 'Recall', 'Recall', 'Recall.png')

# summarize history for loss
plot_training_curve(loss_arr, 'Training loss', 'Loss', 'Loss', 'Loss.png')

In [None]:
# Evaluate the model on the test partition: recall, precision, accuracy and
# the four confusion-matrix counts.

r = Recall()
p = Precision()
a = BinaryAccuracy()

# NOTE(review): these four lists are not populated anywhere in this cell
val_accuracy = []
val_loss_arr = []
val_recall = []
val_precision = []

# Confusion-matrix counters, named <predicted>_and_<actual>
true_and_true = 0
true_and_false = 0
false_and_false = 0
false_and_true = 0
total = 0
# The loop variables keep the LAST batch after the loop; it is used for the
# example plot below. (A separate up-front fetch of one batch was removed: its
# result was immediately overwritten by the loop.)
for test_input, test_val, y_true in test_data.as_numpy_iterator():

    yhat = siamese_model.predict([test_input, test_val])

    # Decision rule: label 1 means "match", and the cross-entropy loss pushes
    # matching pairs towards 1, so a score above 0.5 is a match. (The previous
    # rule, -0.5 < score < 0.5 -> 1, was inverted with respect to training.)
    yhat = [1 if prediction > 0.5 else 0 for prediction in yhat]
    temp = [1 if e == 1.0 else 0 for e in y_true]

    for e, i in zip(yhat, temp):
      total += 1
      if e == 1 and i == 1:
        #true positive
        true_and_true += 1
      elif e == 1 and i == 0:
        #false positive
        true_and_false += 1
      elif e == 0 and i == 0:
        #true negative
        false_and_false += 1
      elif e == 0 and i == 1:
        #false negative
        false_and_true += 1

    r.update_state(temp,yhat)
    p.update_state(temp,yhat)
    a.update_state(temp,yhat)

    print(temp)
    print(yhat)

# Printed in the order: true positives, false positives, true negatives, false negatives
print("------------------------------------------------")
print(true_and_true)
print(true_and_false)
print(false_and_false)
print(false_and_true)
print("------------------------------------------------")

print(r.result().numpy(), p.result().numpy(), a.result().numpy())

# Show the first image pair of the last evaluated batch (skipped if the test
# partition was empty and no batch was seen)
if total > 0:
    # Set plot size
    plt.figure(figsize=(10,8))

    # Set first subplot
    plt.subplot(1,2,1)
    plt.imshow(test_input[0])

    # Set second subplot
    plt.subplot(1,2,2)
    plt.imshow(test_val[0])

    # Renders cleanly
    plt.show()

In [None]:
# Save the model to an .h5 file on Drive
_saved_model_path = '/content/drive/MyDrive/Third_Year_Project/Siamese_Network/siamesemodelv2.h5'
siamese_model.save(_saved_model_path)

# Load it back from the same file, supplying the custom layer and the loss
# class so they can be resolved by name
_custom_objects = {'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy}
siamese_model = tf.keras.models.load_model(_saved_model_path, custom_objects=_custom_objects)

# Check the reloaded model by predicting on the most recent test batch
siamese_model.predict([test_input, test_val])

In [None]:
# View model summary (at this point `siamese_model` is the model reloaded from the .h5 file)
siamese_model.summary()

In [None]:
import h5py

filename = "/content/drive/MyDrive/Third_Year_Project/Siamese_Network/siamesemodelv2.h5"

# Open the saved model file read-only and print its h5py representation.
# The context manager closes the handle even if the inspection raises. The
# former `futures_data` alias (a leftover with an unrelated comment) was only
# a second name for the same handle and has been dropped.
with h5py.File(filename, 'r') as h5:
    print(h5)