# PART 1

## Install Dependencies

In [1]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

In [2]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf



## SET GPU Growth

In [3]:
import tensorflow as tf
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [4]:
len(gpus)

0

## Create Folder structures

In [3]:
# Setup Paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [4]:
# Create Directories
# os.makedirs(POS_PATH)
# os.makedirs(NEG_PATH)
# os.makedirs(ANC_PATH)

# Part 2

## Create Dataset
Done locally

In [5]:
# Run only 1 time for extracting the tgz file
# !tar -xf lfw.tgz
# import uuid

In [6]:
# For copying images from name folder of lfw to negative folder of our data
# for directory in os.listdir('lfw'):
#     for file in os.listdir(os.path.join('lfw', directory)):
#         EX_PATH  = os.path.join('lfw', directory, file)
#         NEW_PATH = os.path.join(NEG_PATH, file)
#         os.replace(EX_PATH, NEW_PATH)

In [7]:
# Run only 1 time, for copying the file from tfw folder to our data folder

# for directory in os.listdir('lfw'):
#     for file in os.listdir(os.path.join('lfw', directory)):
#         EX_PATH  = os.path.join('lfw', directory, file)
#         NEW_PATH = os.path.join(NEG_PATH, file)
#         os.replace(EX_PATH, NEW_PATH)

In [8]:
# # Establish a connection to webcam and capture all the images of anchor and postiive

# cap = cv2.VideoCapture(1)
# while cap.isOpened():
#     ret, frame = cap.read()

#     # Cut down frame to 250x250px
#     frame = frame[70:70+250,220:220+250, :]

#     # Collect anchors
#     if cv2.waitKey(1) & 0XFF == ord('a'):
#         # Create the unique file path
#         imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
#         # Write out anchor image
#         cv2.imwrite(imgname, frame)

#     # Collect positives
#     if cv2.waitKey(1) & 0XFF == ord('p'):
#         # Create the unique file path
#         imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
#         # Write out positive image
#         cv2.imwrite(imgname, frame)

#     # Show image back to screen
#     cv2.imshow('Image Collection', frame)

#     # Breaking gracefully
#     if cv2.waitKey(1) & 0XFF == ord('q'):
#         break


# # Release the webcam
# cap.release()
# # Close the image show frame
# cv2.destroyAllWindows()


# Part 3
 Load and Preprocess Image

## Get Image Directories

In [9]:
import tensorflow as tf

In [10]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'/*.jpg').take(300)
positive = tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(300)
negative = tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(300)

In [11]:
dir_test = anchor.as_numpy_iterator()

In [12]:
print(dir_test.next())

b'data\\anchor\\580412e0-f7ce-11ee-9d3b-c8e265d0c50f.jpg'


## Preprocessing - Scale and Resize

In [13]:
def preprocess(file_path):

    # Read in image from file path
    byte_img = tf.io.read_file(file_path)
    # Load in the image
    img = tf.io.decode_jpeg(byte_img)

    # Preprocessing steps - resizing the image to be 100x100x3
    img = tf.image.resize(img, (100,100))
    # Scale image to be between 0 and 1
    img = img / 255.0

    # Return image
    return img

In [14]:
img = preprocess('data\\anchor\\2ba5370f-f7ce-11ee-ab82-c8e265d0c50f.jpg')

In [15]:
print(img.numpy().max())
print(img.numpy().min())

1.0
0.0031862746


In [16]:
# plt.imshow(img)

## Create Labelled Dataset

In [17]:
# (anchor, positive) => 1,1,1,1,1,1
# (anchor, negative) => 0,0,0,0,0,0

In [18]:
# tf.ones(len(anchor))

In [19]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [20]:
data

<_ConcatenateDataset element_spec=(TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>

In [21]:
samples = data.as_numpy_iterator()

In [22]:
samples.next()

(b'data\\anchor\\28ac311b-f7ce-11ee-abe4-c8e265d0c50f.jpg',
 b'data\\positive\\8be044f2-f7ce-11ee-b9f0-c8e265d0c50f.jpg',
 1.0)

In [23]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [24]:
example = samples.next()

In [25]:
res = preprocess_twin(*example)
#  print(len(res)) -> 3 Values

In [26]:
# res[0] -> Anchor image
# res[1] -> Positive OR Negative image
# res[2] -> Label 0 or 1
# plt.imshow(res[0])

In [27]:
res[2]

1.0

In [28]:
# Build dataloader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=1024)

In [29]:
data

<_ShuffleDataset element_spec=(TensorSpec(shape=(100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>

In [30]:
samples = data.as_numpy_iterator()
samples.next()

(array([[[0.6127451 , 0.60490197, 0.55784315],
         [0.61470586, 0.6068627 , 0.5598039 ],
         [0.61470586, 0.6068627 , 0.5598039 ],
         ...,
         [0.6039216 , 0.59607846, 0.54509807],
         [0.60612744, 0.5982843 , 0.5473039 ],
         [0.6117647 , 0.59607846, 0.54901963]],
 
        [[0.6127451 , 0.60490197, 0.55784315],
         [0.61470586, 0.6068627 , 0.5598039 ],
         [0.61470586, 0.6068627 , 0.5598039 ],
         ...,
         [0.6039216 , 0.59607846, 0.54509807],
         [0.6046569 , 0.59681374, 0.54583335],
         [0.6127451 , 0.59705883, 0.55      ]],
 
        [[0.6068627 , 0.60490197, 0.55784315],
         [0.6102941 , 0.6068627 , 0.5598039 ],
         [0.61470586, 0.6068627 , 0.5598039 ],
         ...,
         [0.6017157 , 0.59387255, 0.54289216],
         [0.6129902 , 0.6007353 , 0.55196077],
         [0.6186274 , 0.60294116, 0.55588233]],
 
        ...,
 
        [[0.51666665, 0.5147059 , 0.49803922],
         [0.5129902 , 0.5110294 , 0.49436

In [31]:
len(samples.next())

3

In [32]:
samp = samples.next()

In [33]:
img1 = samp[0]
img2 = samp[1]
res = samp[2]

In [34]:
# plt.imshow(img1)

In [35]:
# plt.imshow(img2)

In [36]:
res

0.0

In [37]:
round(len(data)*.7) # -> train data is 70%

420

## Experimental

In [38]:
train_data = data.take(round(len(data)*.70)) # -> train data is 70%

In [39]:
len(train_data)
it = train_data.as_numpy_iterator()

In [40]:
tn = it.next()
# fig, axs = plt.subplots(1, 2)
# axs[0].imshow(tn[0])
# axs[1].imshow(tn[1])
# plt.show()

In [41]:
tn[2]

0.0

In [42]:
test_data = data.skip(round(len(data)*.7)) # Skip first 70% of data, as they are used in training
test_data = test_data.take(round(len(data)*.3)) # use last 30% of data
len(test_data)

180

In [43]:
test_data = data.skip(round(len(data)*.7)) # Skip first 70% of data, as they are used in training
test_data = test_data.take(round(len(data)*.3)) # use last 30% of data

In [44]:
len(train_data)

420

In [45]:
len(test_data)

180

In [46]:
# # Training partition
# train_data = data.take(round(len(data)*.7)) # -> train data is 70%
# train_data = train_data.batch(16) # We are going to pass train set as a batch of 16 images
# train_data = train_data.prefetch(8) # This start preprocessing next set of images

In [47]:
# train_sample = train_samples.next()

In [48]:
# len(train_sample[0]) # Size of 1 batch is 16, so now our sample is in batch size

In [49]:
# # Testing Partition
# test_data = data.skip(round(len(data)*.7)) # Skip first 70% of data, as they are used in training
# test_data = test_data.take(round(len(data)*.3)) # use last 30% of data
# test_data = test_data.batch(16)
# test_data = test_data.prefetch(8)

# Part 4
Model Engineering

## Build Embedding Layer
Everything is according to the research paper

In [50]:
# This is input layer
inp = Input(shape=(100,100,3), name='input_image')
inp
# (Batch_size, 100 pixels, 100 pixels, 3 channel)
# IN paper value is 105 and not 100, but 100 will work

<KerasTensor shape=(None, 100, 100, 3), dtype=float32, sparse=None, name=input_image>

In [51]:
c1 = Conv2D(64, (10,10), activation='relu')(inp)
c1

<KerasTensor shape=(None, 91, 91, 64), dtype=float32, sparse=False, name=keras_tensor>

In [52]:
inp = Input(shape=(100,100,3), name='input_image')
# First Block
c1 = Conv2D(64, (10,10), activation='relu')(inp)
m1 = MaxPooling2D(64, (2,2), padding='same')(c1)

# Second Block
c2 = Conv2D(128, (7,7), activation='relu')(m1)
m2 = MaxPooling2D(64, (2,2), padding='same')(c2)

# Third Block
c3 = Conv2D(128, (4,4), activation='relu')(m2)
m3 = MaxPooling2D(64, (2,2), padding='same')(c3)

# Final embedding block
c4 = Conv2D(256, (4,4), activation='relu')(m3)
f1 = Flatten()(c4)
d1 = Dense(4096, activation='sigmoid')(f1)

# Full Model as given in paper

In [53]:
mod = Model(inputs = [inp], outputs=[d1], name='embedding')
mod.summary()
# Pura Model

In [54]:
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten, Dropout, GlobalAveragePooling2D
def make_embedding():
    inputs = Input((100, 100, 1))
    x = Conv2D(96, (11, 11), padding="same", activation="relu")(inputs)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.3)(x)

    x = Conv2D(256, (5, 5), padding="same", activation="relu")(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.3)(x)

    x = Conv2D(384, (3, 3), padding="same", activation="relu")(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.3)(x)

    pooledOutput = GlobalAveragePooling2D()(x)
    pooledOutput = Dense(1024)(pooledOutput)
    outputs = Dense(128)(pooledOutput)

    model = Model(inputs, outputs)
    return model

In [55]:
embedding = make_embedding()

In [56]:
embedding.summary()

## Build Distance Layer
L1

In [57]:
# Siamese L1 Distance class
class L1Dist(Layer):

    # Init method - inheritance
    def __init__(self, **kwargs):
        super().__init__()

    # Magic happens here - similarity calculation
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [58]:
l1 = L1Dist()

In [59]:
l1

<L1Dist name=l1_dist, built=False>

## Make Siamese Network

In [60]:
input_image = Input(name='input_img', shape=(100,100,3))
inp_embedding = embedding(input_image)
# embedding function, 1 input image lega, and usko flatten krke 4096 values
# me convert kr dega, as per Siamese network

In [61]:
validation_image = Input(name='validation_img', shape=(100,100,3))
val_embedding = embedding(validation_image)
# Same for validation

In [62]:
siamese_layer = L1Dist()
# This function takes 2 input, see above and gives the distance

In [63]:
distances = siamese_layer(inp_embedding, val_embedding)




In [64]:
classifier = Dense(1, activation='sigmoid')(distances)

In [65]:
classifier

<KerasTensor shape=(None, 1), dtype=float32, sparse=False, name=keras_tensor_48>

In [57]:
siamese_network = Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')
siamese_network.summary()

In [66]:
def make_siamese_model():
  # Handle inputs
  # Anchor Image input in the network
  input_image = Input(name='input_img', shape=(100,100,3))

  # Validation Image in the network
  validation_image = Input(name='validation_img', shape=(100,100,3))

  # Combine siamese distance components
  siamese_layer = L1Dist()
  siamese_layer._name = 'distance'
  distances = siamese_layer(embedding(input_image), embedding(validation_image))

  # Classification Layer (Last Layer) -> Gives 0 or 1
  classifier = Dense(1, activation='sigmoid')(distances)

  return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')


In [67]:
siamese_model = make_siamese_model()

In [68]:
siamese_model.summary() # Same as above of siamese_network.summary() as both are same

# Training

## Setup Loss and Optimizer
optimizer - Use Backpropogation and find the best values


In [63]:
# feature_extractor = siamese_model()
# imgA = Input(shape=(64, 64, 1))
# imgB = Input(shape=(64, 64, 1))
# featA = feature_extractor(imgA)
# featB = feature_extractor(imgB)

In [69]:
binary_cross_loss = tf.losses.BinaryCrossentropy()
# Loss Function is Binary Cross Entropy

In [70]:
opt = tf.keras.optimizers.Adam(1e-4)
# We are using Adam optimzer, with learning rate = 0.0001

## Establish Checkpoints

In [66]:
# checkpoint_dir = os.path.join('drive', 'MyDrive', 'Sem 2', 'IP', 'training_checkpoints')
# checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
# checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)
# # We will save or component as a checkpoint, and use this again and agian

## Build Train Step Function
Applied to a single batch

The Basic Flow for training on one batch is :-
1. Make a prediction
2. Calculate Loss
3. Derive Gradients
4. Calculate new weights and apply

In [71]:
test_batch = train_data.as_numpy_iterator()
batch_1 = test_batch.next()

In [72]:
# We are working on batch of 16
# 1 batch contains 3 things
# 1. Anchor Image
# 2. Validation Image
# 3. Label

In [73]:
X = batch_1[:2]
np.array(X).shape
# Each batch contains of 3 things, anchor, validation and result
# We are slicing the array by taking only 2 elements
# the INPUT image, and VALIDATION image.

(2, 100, 100, 3)

In [74]:
y = batch_1[2]
y
# This are the corresponding the labels

0.0

In [75]:
@tf.function
def train_step(batch):

  # This will find the GRADIENTS of all the functions and store in tape.
  with tf.GradientTape() as tape:
    # Get anchor and positive/negative image
    X = batch[:2]
    # Get Label
    y = batch[2]

    # Forward Pass
    yhat = siamese_model(X, training=True)
    # Calculate Loss
    loss = binary_cross_loss(y, yhat)

  # Calculate Gradients
  grad = tape.gradient(loss, siamese_model.trainable_variables)

  # Calculate updated weights and apply to siamese model
  # This will use ADAM's optimization technique
  opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

  # Return Loss
  return loss


## Build Training Loop
Applied to all the batches

In [76]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall

In [77]:
def train(data, EPOCHS):
    # Loop through epochs
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # Loop through each batch
        for idx, batch in enumerate(data):
            # Run train step here
            loss = train_step(batch)
            progbar.update(idx+1)

        # Save checkpoints
        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)

# # This will go to each batch, and find the loss for each batch
# # and accordingly learn the weights
# def train(data, EPOCHS):
#     # Loop through epochs
#     for epoch in range(1, EPOCHS+1):
#         print('\n Epoch {}/{}'.format(epoch, EPOCHS))
#         progbar = tf.keras.utils.Progbar(len(data))

#         # Creating a metric object
#         r = Recall()
#         p = Precision()

#         # Loop through each batch
#         for idx, batch in enumerate(data):
#             # Run train step here
#             loss = train_step(batch)
#             yhat = siamese_model.predict(batch[:2])
#             r.update_state(batch[2], yhat)
#             p.update_state(batch[2], yhat)
#             progbar.update(idx+1)
#         print(loss.numpy(), r.result().numpy(), p.result().numpy())

#         # Save checkpoints
#         if epoch % 10 == 0:
#             checkpoint.save(file_prefix=checkpoint_prefix)

## Train the model

In [78]:
EPOCHS = 50

In [79]:
# train(train_data, EPOCHS)

## Import Metrics

In [80]:
from tensorflow.keras.metrics import Precision, Recall

## Make Predictions

In [81]:
# Get a batch of test data
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [82]:
print(len(test_input), len(test_val))
print(y_true)

100 100
0.0


In [83]:
y_hat = siamese_model.predict([test_input, test_val])

In [None]:
y_hat

In [84]:
# Post processing the results
[1 if prediction > 0.5 else 0 for prediction in y_hat ]

In [85]:
y_true

0.0