In [1]:
# Import standard dependencies
from keras.applications.vgg16 import VGG16
from keras.layers import Dense, Input, GlobalAveragePooling2D, Flatten
from keras.models import Model
import keras
import tensorflow as tf
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt


In [2]:
# Setup paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [3]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(600)
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(600)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(600)

In [4]:
dir_test = anchor.as_numpy_iterator()

In [5]:
print(dir_test.next())

b'data\\anchor\\ba578a91-f3cc-11ee-b754-973b43a59e11.jpg'


In [6]:
def preprocess(file_path):
    
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Load in the image 
    img = tf.io.decode_jpeg(byte_img)
    
    # Preprocessing steps - resizing the image to be 100x100x3
    img = tf.image.resize(img, (100,100))
    # Scale image to be between 0 and 1 
    img = img / 255.0
    
    # Return image
    return img

In [7]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [8]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [9]:
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [10]:
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [11]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

In [12]:
class L1Dist(keras.layers.Layer):
    def __init__(self, **kwargs):
        super(L1Dist, self).__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding[0] - validation_embedding[0])

In [13]:
def make_embedding():
    inp = Input(shape=(100, 100, 3), name='input_image')
    vgg = VGG16(include_top=False,input_shape=(100,100,3))(inp)
    p = GlobalAveragePooling2D()(vgg)
    f = Flatten()(p)
    d = Dense(512, activation='sigmoid')(f)
    

    return Model(inputs=[inp], outputs=[d], name='embedding')

In [14]:
embedding = make_embedding()
input_image = Input(name='input_img', shape=(100, 100, 3))
validation_image = Input(name='validation_img', shape=(100, 100, 3))

inp_embedding = embedding(input_image)
val_embedding = embedding(validation_image)

distance = L1Dist()
distances = distance(inp_embedding, val_embedding)
classifier = Dense(1, activation='sigmoid')(distances)
vgg_network = Model(inputs=[input_image, validation_image], outputs=classifier, name='VGGNetwork')




In [15]:
embedding.summary()

In [16]:
vgg_network.summary()

In [17]:
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001
checkpoint_dir = './vgg_training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=vgg_network)
test_batch = train_data.as_numpy_iterator()
batch_1 = test_batch.next()
X = batch_1[:2]
y = batch_1[2]

In [18]:
@tf.function
def train_step(batch):
    
    # Record all of our operations 
    with tf.GradientTape() as tape:     
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = vgg_network(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, vgg_network.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, vgg_network.trainable_variables))
    
    # Return loss
    return loss

In [19]:
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            train_step(batch)
            progbar.update(idx+1)
        
        # Save checkpoints
        if epoch % 10 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)

In [20]:
EPOCHS = 50

In [21]:
train(train_data, EPOCHS)


 Epoch 1/50
Tensor("binary_crossentropy/truediv:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/truediv:0", shape=(), dtype=float32)
[1m16/17[0m [32m━━━━━━━━━━━━━━━━━━[0m[37m━━[0m [1m1s[0m 2s/stepTensor("binary_crossentropy/truediv:0", shape=(), dtype=float32)
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m37s[0m 2s/step

 Epoch 2/50
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m33s[0m 2s/step

 Epoch 3/50
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 2s/step

 Epoch 4/50
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m31s[0m 2s/step

 Epoch 5/50
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 2s/step

 Epoch 6/50
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 2s/step

 Epoch 7/50
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 2s/step

 Epoch 8/50
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m31s[0m 2s/step

 Epoch 9/50
[1m17/17[0m [32m━━━━━━━━━━━━━━

In [22]:
from keras.metrics import Precision, Recall

test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [23]:
y_hat = vgg_network.predict([test_input, test_val])
y_hat

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 708ms/step


array([[3.7021926e-04],
       [9.9960029e-01],
       [7.0564228e-04],
       [2.3559788e-04],
       [2.0772417e-04],
       [4.2950229e-05],
       [9.9972236e-01],
       [3.1115767e-04],
       [9.9819458e-01],
       [6.9859438e-05],
       [9.9990916e-01],
       [3.7109721e-04],
       [9.9969333e-01],
       [3.8650603e-04],
       [1.7400959e-04],
       [9.9997258e-01]], dtype=float32)

In [30]:
[1 if prediction > 0.5 else 0 for prediction in y_hat ]

[0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 1]

In [24]:
y_true

array([0., 1., 0., 0., 0., 0., 1., 0., 1., 0., 1., 0., 1., 0., 0., 1.],
      dtype=float32)

In [25]:
m = Recall()

# Calculating the recall value 
m.update_state(y_true, y_hat)

# Return Recall Result
m.result().numpy()

1.0

In [26]:
# Creating a metric object 
m = Precision()

# Calculating the recall value 
m.update_state(y_true, y_hat)

# Return Recall Result
m.result().numpy()

1.0

In [27]:
vgg_network.save('vgg_model.keras')

In [28]:
model = tf.keras.models.load_model('vgg_model.keras', custom_objects={'L1Dist':L1Dist})

In [29]:
model.summary()