In [1]:
import cv2
import os
import matplotlib.pyplot as plt
import random
import numpy as np

In [2]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf
import uuid

In [3]:
# Set GPU growth
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

### Setting up folder Structure

In [4]:
POS_PATH = os.path.join("data", "positive")
NEG_PATH = os.path.join("data", "negative")
ANC_PATH = os.path.join("data", "anchor")

In [12]:
# Make Directories
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

### Uncompress tar file and move images

In [13]:
!tar -xf lfw.tgz

In [15]:
for directory in os.listdir("lfw"):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join("lfw", directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

### Collect Positive and Anchor class

### Images are collected using image collector file which uses openCV

#### Data Aug

In [5]:
def data_aug(img):
    data = []
    for i in range(9):
        img = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1,2))
        img = tf.image.stateless_random_contrast(img, lower=0.6, upper=1, seed=(1,3))
        # img = tf.image.stateless_random_crop(img, size=(20,20,3), seed=(1,2))
        img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_saturation(img, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))
            
        data.append(img)
    
    return data

In [None]:
for file_name in os.listdir(os.path.join(POS_PATH)):
    img_path = os.path.join(POS_PATH, file_name)
    img = cv2.imread(img_path)
    augmented_images = data_aug(img) 
    
    for image in augmented_images:
        cv2.imwrite(os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

## Load And preprocessing

In [6]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'/*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(300)

In [7]:
def preprocess(file_path):
    
    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Load in the image 
    img = tf.io.decode_jpeg(byte_img)
    
    # Preprocessing steps - resizing the image to be 100x100x3
    img = tf.image.resize(img, (100,100))
    # Scale image to be between 0 and 1 
    img = img / 255.0

    # Return image
    return img

In [8]:
anchor.as_numpy_iterator().next()


b'data/anchor/f20f7cf9-a67b-11ec-a71b-84a938529240.jpg'

In [9]:
preprocess("data/anchor/fbe227ff-a67b-11ec-a54c-84a938529240.jpg")

<tf.Tensor: shape=(100, 100, 3), dtype=float32, numpy=
array([[[0.77156866, 0.7519608 , 0.74019605],
        [0.77156866, 0.7519608 , 0.7392157 ],
        [0.77156866, 0.7519608 , 0.7362745 ],
        ...,
        [0.7497549 , 0.7144608 , 0.68897057],
        [0.7490196 , 0.7137255 , 0.69411767],
        [0.7441176 , 0.7088235 , 0.68921566]],

       [[0.7735294 , 0.75392157, 0.74215686],
        [0.7735294 , 0.75392157, 0.7411765 ],
        [0.7735294 , 0.75392157, 0.7382353 ],
        ...,
        [0.75      , 0.7147059 , 0.6887255 ],
        [0.75      , 0.7147059 , 0.6931372 ],
        [0.74607843, 0.7107843 , 0.68921566]],

       [[0.7764706 , 0.75686276, 0.74509805],
        [0.7764706 , 0.75686276, 0.7441176 ],
        [0.7764706 , 0.75686276, 0.7411765 ],
        ...,
        [0.7529412 , 0.7176471 , 0.6960784 ],
        [0.7519608 , 0.71666664, 0.6906863 ],
        [0.7529412 , 0.7176471 , 0.6901961 ]],

       ...,

       [[0.6625    , 0.65857846, 0.64485294],
        [0.66

## Create labelled dataset

In [10]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [11]:
sample = data.as_numpy_iterator()

In [12]:
example = sample.next()

### Build Train and Test Partition

In [13]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [14]:
res = preprocess_twin(*example)

In [15]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=10000)

In [16]:
# Training partition
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)


In [17]:
# Testing partition
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

### Model Engineering

In [18]:
## Embedding layer

def make_embedding():
    inp = Input(shape=(100,100,3), name='input_image')
    
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [23]:
embedding = make_embedding()

In [24]:
# Siamese L1 Distance class
class L1Dist(Layer):
    
    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()
       
    # Magic happens here - similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [25]:
def make_siamese_model(): 
    
    # Anchor image input in the network
    input_image = Input(name='input_img', shape=(100,100,3))
    
    # Validation image in the network 
    validation_image = Input(name='validation_img', shape=(100,100,3))
    
    # Combine siamese distance components
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    
    # Classification layer 
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [30]:
siamese_model = make_siamese_model()
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

## Training

In [31]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [32]:
opt = tf.keras.optimizers.Adam(1e-4)

In [33]:
# Establish Checkpoints
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [34]:
## Training Step
@tf.function
def train_step(batch):
    
    # Record all of our operations 
    with tf.GradientTape() as tape:     
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
        
    # Return loss
    return loss

In [35]:
## Training Loop

In [36]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall

In [37]:
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        # Creating a metric object 
        r = Recall()
        p = Precision()
        
        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            loss = train_step(batch)
            yhat = siamese_model.predict(batch[:2])
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat) 
            progbar.update(idx+1)
        print(loss.numpy(), r.result().numpy(), p.result().numpy())
        
        # Save checkpoints
        if epoch % 10 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)

In [38]:
EPOCHS = 50

In [39]:
train(train_data, EPOCHS)


 Epoch 1/50
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
0.4726411 0.25870648 1.0

 Epoch 2/50
0.03083205 0.93838865 1.0

 Epoch 3/50
0.00062949123 0.9798995 0.9898477

 Epoch 4/50
0.00029029994 1.0 1.0

 Epoch 5/50
0.000104179584 0.9949495 1.0

 Epoch 6/50
0.010683576 0.9953052 1.0

 Epoch 7/50
1.4841858e-05 1.0 0.9953052

 Epoch 8/50
0.00202232 1.0 0.995283

 Epoch 9/50
0.06516875 1.0 1.0

 Epoch 10/50
0.001586797 0.98578197 0.9904762

 Epoch 11/50
0.005792315 1.0 0.99523807

 Epoch 12/50
0.0002925712 1.0 1.0

 Epoch 13/50
6.2436334e-06 1.0 1.0

 Epoch 14/50
0.0015130199 1.0 1.0

 Epoch 15/50
0.0010976442 1.0 1.0

 Epoch 16/50
7.488403e-05 1.0 1.0

 Epoch 17/50
9.509478e-05 1.0 1.0

 Epoch 18/50
0.00014956217 1.0 1.0

 Epoch 19/50
1.281501e-06 1.0 1.0

 Epoch 20/50
3.914837e-05 1.0 1.0

 Epoch 21/50
3.904119e-06 1.0 1.0

 Epoch 22/50
3.6063397e-05 1.0 1.0

 Epoch 23/50
2.6734

## Predictions

In [43]:
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [44]:
# Make Predictions
predictions = siamese_model.predict([test_input, test_val])
predictions

array([[2.40502285e-08],
       [9.99999881e-01],
       [1.08785486e-07],
       [8.53380868e-07],
       [1.00000000e+00],
       [3.72569048e-07],
       [1.00000000e+00],
       [1.82244584e-08],
       [9.99999881e-01],
       [4.57443130e-05],
       [1.00000000e+00],
       [8.36157952e-08],
       [4.45058674e-08],
       [9.99827981e-01],
       [2.44839021e-06],
       [1.00000000e+00]], dtype=float32)

In [46]:
# Post processing the results
[1 if prediction > 0.5 else 0 for prediction in predictions]

[0, 1, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 1, 0, 1]

### Calculate Matrics

In [48]:
# Creating a metric object 
m = Recall()

# Calculating the recall value 
m.update_state(y_true, predictions)

# Return Recall Result
m.result().numpy()

1.0

In [49]:
# Creating a metric object 
m = Precision()

# Calculating the recall value 
m.update_state(y_true, predictions)

# Return Recall Result
m.result().numpy()

1.0

In [51]:
r = Recall()
p = Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 

print(r.result().numpy(), p.result().numpy())

1.0 1.0


In [None]:
## Visualize Results
# Set plot size 
plt.figure(figsize=(10,8))

# Set first subplot
plt.subplot(1,2,1)
plt.imshow(test_input[0])

# Set second subplot
plt.subplot(1,2,2)
plt.imshow(test_val[0])

# Renders cleanly
plt.show()

### Save Model

In [59]:
# Save weights
siamese_model.save('siamesemodelv2.h5')



In [55]:
L1Dist

__main__.L1Dist

In [65]:
# Reload model 
siamese_model = tf.keras.models.load_model('siamesemodelv2.h5', compile = False,
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [72]:
# METRICS = [
#       tf.keras.metrics.TruePositives(name='tp'),
#       tf.keras.metrics.FalsePositives(name='fp'),
#       tf.keras.metrics.TrueNegatives(name='tn'),
#       tf.keras.metrics.FalseNegatives(name='fn'), 
#       tf.keras.metrics.BinaryAccuracy(name='accuracy'),
#       tf.keras.metrics.Precision(name='precision'),
#       tf.keras.metrics.Recall(name='recall'),
#       tf.keras.metrics.AUC(name='auc'),
# ]

# siamese_model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4),
#               loss = 'binary_crossentropy',
#               metrics = METRICS
#              )

In [73]:
# Make predictions with reloaded model
siamese_model.predict([test_input, test_val])

array([[9.9999511e-01],
       [8.5947534e-07],
       [5.8255893e-07],
       [9.9999738e-01]], dtype=float32)

In [74]:
# View model summary
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

# Real Time Verification

### For real time verification refer the verification.py file in the current directory