In [1]:
# imports
import os
import numpy as np
from matplotlib import pyplot as plt

In [2]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPool2D, Input, Flatten, Dropout, GlobalAveragePooling2D, AveragePooling2D, Activation, BatchNormalization
import tensorflow as tf

In [3]:
# Ask TensorFlow which GPU devices it can see and show the resulting list.
gpus = tf.config.list_physical_devices('GPU')
print(gpus)

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [4]:
# preventing tf from utilizing the full GPU:
# memory growth is switched on for every device listed in `gpus`
# (the loop is a no-op when that list is empty).
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)

In [5]:
# Dataset folders; all three live under the relative 'data' directory.
POS_PATH, NEG_PATH, ANC_PATH = (
    os.path.join('data', folder) for folder in ('positive', 'negative', 'anchor')
)

In [6]:
# selecting files from directories
# The glob pattern is assembled with os.path.join so the path separator is
# correct on every OS; the earlier '\*.png' literal only matched on Windows
# and depended on the invalid escape sequence '\*'.
# Each dataset yields at most 262 file paths.
anchor = tf.data.Dataset.list_files(os.path.join(ANC_PATH, '*.png')).take(262)
positive = tf.data.Dataset.list_files(os.path.join(POS_PATH, '*.png')).take(262)
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, '*.png')).take(262)

In [7]:
# dir_test = anchor.as_numpy_iterator()

In [8]:
# print(dir_test.next())

In [9]:
# preprocessing
def preprocess(file_path):
    """Read a PNG file and return its first channel divided by 255.

    Args:
        file_path: location of the PNG, in a form accepted by ``tf.io.read_file``.

    Returns:
        A rank-2 tensor: channel 0 of the decoded image, scaled by 1/255.
    """
    decoded = tf.io.decode_png(tf.io.read_file(file_path))
    # No resize is applied here, so images keep their on-disk size.
    # NOTE(review): presumably the files are already 128x64 to match the
    # model inputs -- confirm.
    first_channel = decoded[:, :, 0]
    return first_channel / 255

In [10]:
# img = preprocess(r"data\anchor\POAG-000008-2009-02-03-OD.png")

In [11]:
# img.shape

In [12]:
# img2 = img[:,:,0]
# img2.shape

In [13]:
# img.numpy().max()
# plt.imshow(img,cmap="gray")

In [14]:
# creating matching and non-matching pairs
# Every (anchor, positive) pair is labelled 1.0 and every (anchor, negative)
# pair 0.0; the two labelled sets are then joined end to end.
pair_count = len(anchor)
match_labels = tf.data.Dataset.from_tensor_slices(tf.ones(pair_count))
mismatch_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(pair_count))
positives = tf.data.Dataset.zip((anchor, positive, match_labels))
negatives = tf.data.Dataset.zip((anchor, negative, mismatch_labels))
data = positives.concatenate(negatives)

In [15]:
# data

In [16]:
# samples = data.as_numpy_iterator()

In [17]:
# example = samples.next()

In [18]:
# wrapper function for preprocessing 2 images
def preprocess_twin(input_img, validation_img, label):
    """Preprocess both image paths of a pair and pass the label through.

    Args:
        input_img: path of the first image of the pair.
        validation_img: path of the second image of the pair.
        label: pair label, returned unchanged.

    Returns:
        Tuple ``(preprocess(input_img), preprocess(validation_img), label)``.
    """
    first_image = preprocess(input_img)
    second_image = preprocess(validation_img)
    return first_image, second_image, label

In [19]:
# res = preprocess_twin(*example)
# len(res)

In [20]:
# plt.imshow(res[1])

In [21]:
# Decode every pair, cache the decoded tensors, then shuffle ONCE.
# reshuffle_each_iteration=False keeps the shuffled order identical on every
# pass, so the take()/skip() train/test split that follows always selects the
# same elements. With the default (True) each pass is reshuffled and samples
# from the test portion leak into the training portion.
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024, reshuffle_each_iteration=False)

In [22]:
# setting up data for training
# First 70% of the pairs, in batches of 16, with a prefetch buffer of 8.
train_data = (
    data
    .take(round(len(data)*.7))
    .batch(16)
    .prefetch(8)
)

In [23]:
# setting up data for testing
# Skip the first 70% of the pairs, keep at most 30% of the total, then batch
# and prefetch with the same settings as the training split.
test_data = (
    data
    .skip(round(len(data)*.7))
    .take(round(len(data)*.3))
    .batch(16)
    .prefetch(8)
)

In [24]:
# creating the model - VGG16
def make_embedding(input_shape=(128,64,1), embedding_units=1):
    """Build the VGG16-style convolutional network used on each image.

    The network is five blocks of stacked 3x3 'same' ReLU convolutions, each
    followed by a 2x2 max-pool with stride 2, then two 4096-unit ReLU dense
    layers and a sigmoid output layer.

    Args:
        input_shape: shape of one input image (without the batch dimension).
            Defaults to (128, 64, 1).
        embedding_units: width of the final sigmoid layer. Defaults to 1.

    Returns:
        A Keras ``Model`` mapping an image to the output of the final dense layer.
    """
    # (filters, number of stacked convolutions) for each block, in order.
    vgg_blocks = ((64, 2), (128, 2), (256, 3), (512, 3), (512, 3))

    inp = Input(shape=input_shape)

    x = inp
    for filters, depth in vgg_blocks:
        for _ in range(depth):
            x = Conv2D(filters=filters, kernel_size=3, padding='same', activation='relu')(x)
        x = MaxPool2D(pool_size=2, strides=2, padding='same')(x)

    x = Flatten()(x)
    x = Dense(units=4096, activation='relu')(x)
    x = Dense(units=4096, activation='relu')(x)
    # NOTE(review): with the default embedding_units=1 each image is reduced to
    # a single sigmoid value, so any distance computed on this output compares
    # scalars. Confirm this is intended before widening it.
    output = Dense(units=embedding_units, activation='sigmoid')(x)

    return Model(inputs=inp, outputs=output)

In [25]:
# Build the network once; make_siamese_model() reads this module-level
# `embedding` and applies it to both of its inputs.
embedding = make_embedding()

In [26]:
# Print the layer-by-layer summary of the embedding network.
embedding.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 128, 64, 1)]      0         
                                                                 
 conv2d (Conv2D)             (None, 128, 64, 64)       640       
                                                                 
 conv2d_1 (Conv2D)           (None, 128, 64, 64)       36928     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 64, 32, 64)       0         
 )                                                               
                                                                 
 conv2d_2 (Conv2D)           (None, 64, 32, 128)       73856     
                                                                 
 conv2d_3 (Conv2D)           (None, 64, 32, 128)       147584    
                                                             

In [27]:
# distancing/differencing layer
class L1Dist(Layer):
    """Keras layer returning the element-wise absolute difference of two tensors."""

    def __init__(self, **kwargs):
        """Create the layer.

        Args:
            **kwargs: standard Keras ``Layer`` keyword arguments (e.g. ``name``).
        """
        # Forward the keyword arguments to the base class; they used to be
        # accepted but silently discarded, so e.g. name= had no effect.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return ``|input_embedding - validation_embedding|`` element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)

In [28]:
# NOTE(review): this instance is not referenced by any other visible cell
# (make_siamese_model creates its own L1Dist) -- possibly a leftover.
l1 = L1Dist()

In [29]:
# setting up the siamese architecture
def make_siamese_model():
    """Assemble the two-input Siamese classifier.

    Both inputs are passed through the module-level ``embedding`` model, the
    outputs are combined by an ``L1Dist`` layer, and a single sigmoid unit
    produces the final prediction.

    Returns:
        A Keras ``Model`` taking ``[input_image, validation_image]`` and
        returning the sigmoid output.
    """
    input_image = Input(shape=(128,64,1))
    validation_image = Input(shape=(128,64,1))

    # The same `embedding` model is applied to both inputs.
    input_embedding = embedding(input_image)
    validation_embedding = embedding(validation_image)
    distances = L1Dist()(input_embedding, validation_embedding)

    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(inputs=[input_image, validation_image], outputs=classifier)

In [30]:
# Instantiate the Siamese classifier; train_step and the checkpoint/save cells
# use this module-level `siamese_model`.
siamese_model = make_siamese_model()

In [31]:
# Print the layer-by-layer summary of the Siamese model.
siamese_model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 128, 64, 1)  0           []                               
                                ]                                                                 
                                                                                                  
 input_3 (InputLayer)           [(None, 128, 64, 1)  0           []                               
                                ]                                                                 
                                                                                                  
 model (Functional)             (None, 1)            48280257    ['input_2[0][0]',                
                                                                  'input_3[0][0]']          

In [32]:
# setting up optimizer and loss
# RMSprop with a learning rate of 1e-3 drives the weight updates;
# binary cross-entropy scores the sigmoid output against the 0/1 pair labels.
opt = tf.keras.optimizers.RMSprop(learning_rate=1e-3)
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [33]:
# setting up checkpoints
# NOTE(review): the directory name mentions "SGD_1e-2", but the optimizer
# tracked here (`opt`) is RMSprop with learning_rate=1e-3 -- confirm the name.
checkpoint_dir = './vgg-checkpoints/training_checkpoints_SGD_1e-2_200_vgg_custom'
# Prefix handed to checkpoint.save() by the training loop.
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
# The checkpoint tracks both the optimizer and the model.
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [34]:
# custom function for each training step
@tf.function
def train_step(batch):
    """Run one optimisation step on a batch and return the batch loss.

    Args:
        batch: sequence whose first two items are the model inputs and whose
            third item holds the labels.

    Returns:
        The binary cross-entropy loss computed for this batch.
    """
    image_pair, labels = batch[:2], batch[2]

    # Record the forward pass so gradients can be taken afterwards.
    with tf.GradientTape() as tape:
        predictions = siamese_model(image_pair, training=True)
        loss = binary_cross_loss(labels, predictions)

    weights = siamese_model.trainable_variables
    gradients = tape.gradient(loss, weights)
    opt.apply_gradients(zip(gradients, weights))

    return loss

In [35]:
# training and saving checkpoints
def train(data, EPOCHS, checkpoint_every=25):
    """Train for ``EPOCHS`` passes over ``data``, checkpointing periodically.

    Args:
        data: batched dataset; each element is passed to ``train_step``.
            Must support ``len()``.
        EPOCHS: number of passes over ``data``.
        checkpoint_every: save a checkpoint whenever the epoch number is a
            multiple of this value (default 25). A falsy value disables saving.
    """
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        for idx, batch in enumerate(data):
            # Feed the step loss to the progress bar so training can be
            # monitored; it used to be computed and then thrown away.
            loss = train_step(batch)
            progbar.update(idx+1, values=[('loss', float(loss))])

        if checkpoint_every and epoch % checkpoint_every == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

In [36]:
# Number of passes over the training split requested from train().
EPOCHS = 200

In [37]:
# Run the training loop on the training split only.
train(train_data, EPOCHS)


 Epoch 1/200
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)

 Epoch 2/200

 Epoch 3/200

 Epoch 4/200

 Epoch 5/200

 Epoch 6/200

 Epoch 7/200

 Epoch 8/200

 Epoch 9/200

 Epoch 10/200

 Epoch 11/200

 Epoch 12/200

 Epoch 13/200

 Epoch 14/200

 Epoch 15/200

 Epoch 16/200

 Epoch 17/200

 Epoch 18/200

 Epoch 19/200

 Epoch 20/200

 Epoch 21/200

 Epoch 22/200

 Epoch 23/200

 Epoch 24/200

 Epoch 25/200

 Epoch 26/200

 Epoch 27/200

 Epoch 28/200

 Epoch 29/200

 Epoch 30/200

 Epoch 31/200

 Epoch 32/200

 Epoch 33/200

 Epoch 34/200

 Epoch 35/200

 Epoch 36/200

 Epoch 37/200

 Epoch 38/200

 Epoch 39/200

 Epoch 40/200

 Epoch 41/200

 Epoch 42/200

 Epoch 43/200

 Epoch 44/200

 Epoch 45/200

 Epoch 46/200

 Epoch 47/200

 Epoch 48/200

 Epoch 49/200

 Epoch 50/200

 Epoch 51/200

 Epoch 52/200

 Epoch 53/200

 Epoch 54/200

 Epoch 55/200

 Epoch 56/200

 Epoch 57/200


In [38]:
# saving the model
# NOTE(review): the file name mentions "SGD_1e-2", but the optimizer defined in
# this notebook is RMSprop with learning_rate=1e-3 -- confirm the name.
# NOTE(review): the model contains the custom L1Dist layer; reloading this
# file presumably requires supplying that class to the loader -- verify.
siamese_model.save('siamesemodel_vgg_custom_200_SGD_1e-2_binary.h5')



In [None]:
# from tensorflow.keras.metrics import Precision, Recall

In [None]:
# test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [None]:
# y_hat = siamese_model.predict([test_input, test_val])
# y_hat

In [None]:
# [1 if prediction > 0.5 else 0 for prediction in y_hat ]

In [None]:
# y_true

In [None]:
# m = Recall()
# m.update_state(y_true, y_hat)
# m.result().numpy()

In [None]:
# m = Precision()
# m.update_state(y_true, y_hat)
# m.result().numpy()

In [None]:
# plt.figure(figsize=(10,8))

# plt.subplot(1,2,1)
# plt.imshow(test_input[0])

# plt.subplot(1,2,2)
# plt.imshow(test_val[0])

# plt.show()