Based on the paper "Siamese Neural Networks for One-shot Image Recognition" (Koch, Zemel, Salakhutdinov): https://www.cs.cmu.edu/~rsalakhu/papers/oneshot1.pdf

In [2]:
# from google.colab import drive
# drive.mount('/content/drive')

In [2]:
!pip install tensorflow tensorflow-gpu opencv-python matplotlib



In [3]:
import cv2
import os
import random
import numpy as np
import matplotlib.pyplot as plt

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [4]:
# Ask TensorFlow to grow GPU memory on demand (instead of reserving it all
# up front) to avoid out-of-memory errors.
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu_device in gpus:
    tf.config.experimental.set_memory_growth(gpu_device, True)


In [5]:
# Image folders, all under ./data:
#   positive - images paired with an anchor under label 1
#   negative - images paired with an anchor under label 0
#   anchor   - reference images
POS_PATH, NEG_PATH, ANC_PATH = (
    os.path.join('data', folder) for folder in ('positive', 'negative', 'anchor')
)

# One-time setup: create the directories (left disabled once they exist)
# os.makedirs(POS_PATH)
# os.makedirs(NEG_PATH)
# os.makedirs(ANC_PATH)

Negative examples come from the Labeled Faces in the Wild (LFW) dataset: http://vis-www.cs.umass.edu/lfw/

In [5]:
# Uncompress the data and move it to negative folder
# !tar -xf lfw.tgz
# for directory in os.listdir('lfw'):
#   for file_name in os.listdir(os.path.join('lfw', directory)):
#     EX_PATH = os.path.join('lfw', directory, file_name)
#     NEW_PATH = os.path.join(NEG_PATH, file_name)
#     os.replace(EX_PATH, NEW_PATH)


In [15]:
# Import uuid library to generate unique image names
import uuid

# Establish a connection to the webcam
# cap = cv2.VideoCapture(0)

# while cap.isOpened(): 
#     ret, frame = cap.read()
   
#     # Cut down frame to 250x250px
#     frame = frame[120:120+250,150:150+250, :]
    
#     # Collect anchors 
#     if cv2.waitKey(1) & 0XFF == ord('a'):
#         # Create the unique file path 
#         imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
#         # Write out anchor image
#         cv2.imwrite(imgname, frame)
    
#     # Collect positives
#     if cv2.waitKey(1) & 0XFF == ord('p'):
#         # Create the unique file path 
#         imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
#         # Write out positive image
#         cv2.imwrite(imgname, frame)
    
#     # Show image back to screen
#     cv2.imshow('Image Collection', frame)
    
#     # Breaking gracefully
#     if cv2.waitKey(1) & 0XFF == ord('q'):
#         break
        
# # Release the webcam
# cap.release()
# # Close the image show frame
# cv2.destroyAllWindows()

In [6]:
# Preprocess images

# Build the glob patterns with os.path.join so the separator matches the host
# OS (a hard-coded backslash only works on Windows and is also an invalid
# string escape). Each dataset yields up to 300 file paths.
anchor = tf.data.Dataset.list_files(os.path.join(ANC_PATH, '*.jpg')).take(300)
positive = tf.data.Dataset.list_files(os.path.join(POS_PATH, '*.jpg')).take(300)
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, '*.jpg')).take(300)

def preprocess(file_path):
    """Load one JPEG from disk and turn it into a model-ready image tensor.

    Args:
        file_path: Path (string tensor) of the JPEG file to read.

    Returns:
        The image resized to 105x105 with 3 channels, with pixel values
        divided by 255.
    """
    byte_img = tf.io.read_file(file_path)
    # Force 3 output channels so grayscale JPEGs still match the
    # (105, 105, 3) input the network is built for.
    img = tf.io.decode_jpeg(byte_img, channels=3)
    img = tf.image.resize(img, (105, 105))
    # Scale 8-bit pixel values down to the 0-1 range.
    img = img / 255.0
    return img

# Pair every anchor with a positive (label 1.0) and with a negative (label 0.0),
# then chain both sets of (anchor, other, label) triples into one dataset.
num_anchors = len(anchor)
positive_labels = tf.data.Dataset.from_tensor_slices(tf.ones(num_anchors))
negative_labels = tf.data.Dataset.from_tensor_slices(tf.zeros(num_anchors))

positives = tf.data.Dataset.zip((anchor, positive, positive_labels))
negatives = tf.data.Dataset.zip((anchor, negative, negative_labels))
data = positives.concatenate(negatives)

def preprocess_twin(input_img, validation_img, label):
    """Preprocess both images of a pair and pass the label through unchanged.

    Args:
        input_img: File path of the first image of the pair.
        validation_img: File path of the second image of the pair.
        label: Label attached to the pair.

    Returns:
        Tuple (preprocessed input image, preprocessed validation image, label).
    """
    input_tensor = preprocess(input_img)
    validation_tensor = preprocess(validation_img)
    return (input_tensor, validation_tensor, label)
    
# Data pipeline: decode the image pairs, cache the decoded tensors, then shuffle.
data = data.map(preprocess_twin)
data = data.cache()
# Shuffle once and keep that order for every iteration. With the default
# (reshuffle on each iteration) any take()/skip() split of `data` would draw
# from differently ordered passes, letting the same samples land in both the
# training and the test subset.
data = data.shuffle(buffer_size = 1024, reshuffle_each_iteration = False)


In [7]:
# 70/30 split of the pair dataset; both parts use batches of 16 and prefetch 8.
sample_size = round(len(data)* 0.7)

# Training partition: the first 70% of the samples
train_data = data.take(sample_size).batch(16).prefetch(8)

# Test partition: the remainder, capped at 30% of the dataset
test_data = (
    data.skip(sample_size)
    .take(round(len(data)*0.3))
    .batch(16)
    .prefetch(8)
)

In [30]:
def make_embedding():
    """Build the convolutional embedding network shared by both twin inputs.

    Architecture: three Conv2D(ReLU) + 2x2 max-pool blocks (64@10x10,
    128@7x7, 128@4x4), a final Conv2D(256, 4x4, ReLU), Flatten, and a
    Dense(4096, sigmoid) layer.

    Returns:
        A Keras ``Model`` named 'embedding' taking a (105, 105, 3) image and
        producing a 4096-dimensional vector.
    """
    def build_block(image, conv_size, filters):
        """Apply a square ReLU convolution followed by 2x2 max pooling."""
        conv = Conv2D(filters, (conv_size, conv_size), activation = 'relu')(image)
        # MaxPooling2D's first positional argument is pool_size (not a filter
        # count), so pass pool_size and strides by keyword to get a real
        # 2x2 / stride-2 pool. Output shapes are unchanged by this.
        return MaxPooling2D(pool_size = (2, 2), strides = (2, 2), padding = 'same')(conv)

    input_image = Input(shape = (105,105,3), name= 'input_img')

    block_1 = build_block(input_image, 10, 64)
    block_2 = build_block(block_1, 7, 128)
    block_3 = build_block(block_2, 4, 128)

    conv_layer = Conv2D(256, (4,4),activation = 'relu')(block_3)
    flatten_layer = Flatten()(conv_layer)
    dense_layer = Dense(4096, activation ='sigmoid')(flatten_layer)

    return Model(inputs = [input_image], outputs=[dense_layer], name='embedding')
embedding = make_embedding()

In [37]:
# building L1 Distance layer (https://stackoverflow.com/questions/59659494/euclidean-distance-in-keras)
class L1Dist(Layer):
    """Keras layer returning the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        # Forward kwargs (name, dtype, trainable, ...) to the base Layer so that
        # L1Dist(name=...) is honoured and the layer can be rebuilt from its config.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return |input_embedding - validation_embedding| element-wise.

        Args:
            input_embedding: Embedding tensor of the first image.
            validation_embedding: Embedding tensor of the second image.
        """
        return tf.math.abs(input_embedding - validation_embedding)
        


In [39]:

def make_siamese_model():
    """Assemble the two-input Siamese classifier on top of the shared `embedding` model.

    Both inputs go through the same `embedding` network; the absolute
    difference of the two embeddings feeds a single sigmoid unit.

    Returns:
        A Keras ``Model`` named 'SiameseNetwork' with inputs
        [input image, validation image] and one sigmoid output.
    """
    image_shape = (105,105,3)
    anchor_input = Input(shape = image_shape, name = 'input_imgage')
    candidate_input = Input(shape = image_shape, name = 'validation_image')

    # Distance layer, explicitly named so it is easy to spot in the summary
    l1_layer = L1Dist()
    l1_layer._name = 'distance'

    # Same embedding weights are applied to both images
    anchor_embedding = embedding(anchor_input)
    candidate_embedding = embedding(candidate_input)
    abs_difference = l1_layer(anchor_embedding, candidate_embedding)

    # Single sigmoid unit turns the distance vector into a score
    score = Dense(1, activation = 'sigmoid')(abs_difference)

    return Model(
        inputs = [anchor_input, candidate_input],
        outputs = score,
        name = 'SiameseNetwork',
    )

siamese_model = make_siamese_model()

In [41]:
# Print the layer-by-layer summary (output shapes, parameter counts, connections)
# of the assembled Siamese model.
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_imgage (InputLayer)      [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_image (InputLayer)  [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 embedding (Functional)         (None, 4096)         38960448    ['input_imgage[0][0]',           
                                                                  'validation_image[0

In [49]:
# Training setup

# The model's last layer already applies a sigmoid, so its outputs are
# probabilities, not logits: the loss must use from_logits=False (the default).
binary_cross_loss = tf.losses.BinaryCrossentropy(from_logits = False)
opt = tf.keras.optimizers.Adam(1e-4)

# Checkpoints track both the optimizer state and the model weights.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt = opt, siamese_model = siamese_model)

# To resume from the most recent saved checkpoint instead of training from scratch:
# checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))



In [53]:
@tf.function
def train_step(batch):
    """Run one optimisation step of `siamese_model` on a single batch.

    Args:
        batch: Tuple (input images, validation images, labels) as produced by
            the batched training dataset.

    Returns:
        The binary cross-entropy loss for this batch.
    """
    # First two elements are the image pair, the third is the label.
    x = batch[:2]
    y_true = batch[2]

    # Only the forward pass and the loss need to be recorded on the tape.
    with tf.GradientTape() as tape:
        y_pred = siamese_model(x, training = True)
        loss = binary_cross_loss(y_true, y_pred)

    # Back-propagate and apply the update to every trainable weight.
    gradient = tape.gradient(loss, siamese_model.trainable_variables)
    opt.apply_gradients(zip(gradient, siamese_model.trainable_variables))

    return loss

In [None]:
# training loop
def train(data, EPOCHS):
    """Train the Siamese model for a number of epochs, checkpointing every 10th.

    Args:
        data: Batched dataset to iterate over each epoch; every element is
            passed to `train_step`.
        EPOCHS: Number of epochs to run.
    """
    for epoch in range(1, EPOCHS + 1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        # Size the progress bar from the dataset that is actually iterated.
        progbar = tf.keras.utils.Progbar(len(data))

        # Iterate the dataset passed in, not a module-level global.
        for idx, batch in enumerate(data):
            train_step(batch)
            progbar.update(idx+1)

        # Save a checkpoint every 10 epochs.
        if epoch % 10 == 0:
            checkpoint.save(file_prefix= checkpoint_prefix)
            