In [None]:

!python --version

In [None]:
!conda create -n tf_env python=3.8 -y





In [None]:
%pip install tensorflow


In [None]:
!pip install tensorflow==2.4.1 tensorflow-gpu==2.4.1 opencv-python matplotlib

In [None]:

!pip install tensorflow opencv-python matplotlib numpy



In [None]:
'''import tensorflow as tf

# Print TensorFlow version
print("TensorFlow Version:", tf.__version__)

# Check for GPU availability
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print("GPUs Available:", gpus)
else:
    print("No GPUs Available.")'''

In [None]:

import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

In [None]:
# Import tensorflow dependencies - Functional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

In [None]:
# Avoid OOM errors by setting GPU Memory Consumption Growth
'''gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)'''

In [None]:
# Setup paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [None]:
# Make the directories
os.makedirs(POS_PATH,exist_ok=True)
os.makedirs(NEG_PATH,exist_ok=True)
os.makedirs(ANC_PATH,exist_ok=True)

In [None]:
#http://vis-www.cs.umass.edu/lfw/lfw.tgz

In [None]:

# Uncompress Tar GZ Labelled Faces in the Wild Dataset
!tar -xf lfw.tgz

In [None]:
# Move LFW Images to the following repository data/negative
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        
        os.replace(EX_PATH, NEW_PATH)

In [None]:
# Import uuid library to generate unique image names
import uuid

In [None]:
#os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))

In [None]:
# Establish a connection to the webcam
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()

        # Stop when the camera fails to deliver a frame; slicing below would
        # otherwise fail on an invalid frame
        if not ret:
            break

        # Cut down frame to 250x250px
        frame = frame[120:120+250, 200:200+250, :]

        # Show image back to screen
        cv2.imshow('Image Collection', frame)

        # Poll the keyboard once per frame. Polling separately for each key
        # means a key press seen by one waitKey call is not seen by the others.
        key = cv2.waitKey(1) & 0xFF

        if key == ord('a'):
            # Collect anchors: create the unique file path and write the image
            imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname, frame)
        elif key == ord('p'):
            # Collect positives: create the unique file path and write the image
            imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname, frame)
        elif key == ord('q'):
            # Breaking gracefully
            break
finally:
    # Release the webcam and close the preview window even if the loop raised
    cap.release()
    cv2.destroyAllWindows()

## Data Augmentation

In [None]:


# Set the paths for the image directories
ANC_PATH = "data/anchor"  # Directory containing anchor images
POS_PATH = "data/positive"  # Directory containing positive images

# Data augmentation function
def data_aug(img, num_augmentations=9):
    """Create randomly perturbed copies of an image.

    Each copy is produced from the original `img` by applying, in order,
    random brightness, contrast, horizontal flip, JPEG quality and saturation
    changes using TensorFlow's stateless image ops.

    Args:
        img: Image array/tensor accepted by the `tf.image.stateless_random_*` ops.
        num_augmentations: Number of augmented copies to generate (default 9).

    Returns:
        A list with `num_augmentations` augmented image tensors.
    """
    def _random_seed():
        # Stateless ops are fully determined by their seed, so every op needs
        # a fresh seed pair; a constant seed yields the same change every time.
        return (np.random.randint(100), np.random.randint(100))

    data = []
    for _ in range(num_augmentations):
        img_aug = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=_random_seed())
        img_aug = tf.image.stateless_random_contrast(img_aug, lower=0.6, upper=1, seed=_random_seed())
        img_aug = tf.image.stateless_random_flip_left_right(img_aug, seed=_random_seed())
        img_aug = tf.image.stateless_random_jpeg_quality(img_aug, min_jpeg_quality=90, max_jpeg_quality=100, seed=_random_seed())
        img_aug = tf.image.stateless_random_saturation(img_aug, lower=0.9, upper=1, seed=_random_seed())

        data.append(img_aug)

    return data

# Function to augment images in a given directory
def augment_images_in_directory(directory):
    """Augment every `.jpg` image in `directory` and save the results next to it.

    For each JPEG file the image is read with OpenCV, passed through
    `data_aug`, and every augmented copy is written back into the same
    directory under a new UUID-based file name.

    Note: running this again also augments the previously generated copies,
    because they live in the same directory.

    Args:
        directory: Path of the folder whose `.jpg` files are augmented.

    Raises:
        ValueError: If a `.jpg` file cannot be read by `cv2.imread`.
    """
    # os.listdir returns the listing up front, so files written below are not
    # revisited during this pass.
    for file_name in os.listdir(directory):
        if not file_name.endswith('.jpg'):  # Process only jpg files
            continue

        img_path = os.path.join(directory, file_name)
        img = cv2.imread(img_path)
        if img is None:
            raise ValueError(f"Error: Could not read the image at {img_path}")

        # Apply data augmentation
        augmented_images = data_aug(img)

        # Save the augmented images
        for image in augmented_images:
            cv2.imwrite(os.path.join(directory, '{}.jpg'.format(uuid.uuid1())), image.numpy())

            #print(f"Augmented images saved for {file_name} in {directory}")

# Augment images in both directories
augment_images_in_directory(ANC_PATH)
augment_images_in_directory(POS_PATH)

print("Data augmentation completed for all images in both directories.")


In [None]:

'''def data_aug(img):
    data = []
    for i in range(9):
        img = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1,2))
        img = tf.image.stateless_random_contrast(img, lower=0.6, upper=1, seed=(1,3))
        # img = tf.image.stateless_random_crop(img, size=(20,20,3), seed=(1,2))
        img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_saturation(img, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))
            
        data.append(img)
    
    return data'''

In [None]:
#import os
#import uuid

In [None]:
'''img_path = os.path.join(ANC_PATH, 'fdb9723d-89e8-11ef-ad57-e8fb1c369fc0.jpg')
img = cv2.imread(img_path)

augmented_images = data_aug(img)

for image in augmented_images:
    cv2.imwrite(os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())'''

In [None]:
'''for file_name in os.listdir(os.path.join(POS_PATH)):
    img_path = os.path.join(POS_PATH, file_name)
    img = cv2.imread(img_path)
    augmented_images = data_aug(img) 
    
    for image in augmented_images:
        cv2.imwrite(os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())'''

##  3. Load and Preprocess Images

###  3.1 Get Image Directories

In [None]:
anchor=tf.data.Dataset.list_files(ANC_PATH+'/*.jpg').take(300)
positive=tf.data.Dataset.list_files(POS_PATH+'/*.jpg').take(300)
negative=tf.data.Dataset.list_files(NEG_PATH+'/*.jpg').take(300)


In [None]:
dir_test=anchor.as_numpy_iterator()

In [None]:
print(dir_test.next())

### 3.2 Preprocessing - Scale and Resize

In [None]:
def preprocess(file_path):
    """Load a JPEG file and turn it into a model-ready image tensor.

    Args:
        file_path: Path (string or string tensor) of the JPEG image to load.

    Returns:
        A float tensor of shape (100, 100, 3) with values scaled to [0, 1].
    """
    # Read in image in file path
    byte_img = tf.io.read_file(file_path)

    # Decode as 3-channel colour so grayscale JPEGs also match the
    # (100, 100, 3) input expected by the embedding network
    img = tf.io.decode_jpeg(byte_img, channels=3)

    # Resize to 100x100 and scale pixel values from [0, 255] to [0, 1]
    img = tf.image.resize(img, (100, 100))
    img = img / 255.0
    return img

In [None]:
# Preprocess one anchor image taken from the dataset rather than a
# machine-specific file name, so the cell works on any fresh run
sample_path = anchor.as_numpy_iterator().next()
img = preprocess(sample_path)
plt.imshow(img)
img.numpy().min()

In [None]:
'''# Take one element from the dataset and check its shape
for img in anchor_processed.take(1):
    print(img.shape)'''


#### 3.3 Create labelled dataset



In [None]:
# Pair every anchor with a positive (label 1) and with a negative (label 0).
# The datasets are passed to zip as a single tuple, which is accepted by both
# older and newer TensorFlow releases.
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [None]:
samples=data.as_numpy_iterator()

In [None]:
ex=samples.next()

In [None]:
ex

#### 3.4 Build Train and Test Partition


In [None]:
def preprocess_twin(input_image, validation_image, label):
    """Preprocess both image paths of a sample and pass the label through unchanged.

    Returns:
        A tuple (preprocessed input image, preprocessed validation image, label).
    """
    first_img = preprocess(input_image)
    second_img = preprocess(validation_image)
    return (first_img, second_img, label)

In [None]:
res=preprocess_twin(*ex)

In [None]:
len(res)

In [None]:
plt.imshow(res[0])

In [None]:
plt.imshow(res[1])

In [None]:
res[2]

In [None]:
#Build dataloader pipeline
data=data.map(preprocess_twin)
data=data.cache()
# Keep one fixed shuffle order: by default the dataset is reshuffled on every
# iteration, so the take/skip based train/test split would pick different
# samples each time and mix the two partitions.
data=data.shuffle(buffer_size=1024, seed=42, reshuffle_each_iteration=False)
#round(len(data))

In [None]:
# Test whether the shuffling occurred and check the output
'''samples=data.as_numpy_iterator()
samp=samples.next()
len(samp)
plt.imshow(samp[0])
plt.imshow(samp[1])
plt.imshow(samp[2])'''



In [None]:
#Training partition
train_data=data.take(round(len(data)*0.8))
train_data=train_data.batch(16)
train_data=train_data.prefetch(8)


In [None]:
# checking whether the batch_size is 16 or not
'''train_samples=train_data.as_numpy_iterator()
t_samp=train_samples.next()
len(t_samp[0])'''


In [None]:
#Testing partition
# Skip the first 80% (the training slice), then take the following 20% from
# the skipped dataset rather than from the start of `data`.
test_data=data.skip(round(len(data)*0.8))
test_data=test_data.take(round(len(data)*0.2))
test_data=test_data.batch(16)
test_data=test_data.prefetch(8)

In [None]:
# checking whether the batch_size is 16 or not
'''test_samples=test_data.as_numpy_iterator()
t_samp=test_samples.next()
len(t_samp[0])'''

## 4. Model Engineering


### 4.1 Build Embedding Layer

In [None]:
inp = Input(shape=(100,100,3), name='input_image')

In [None]:
# Step-by-step prototype of the embedding network built on `inp`.
# Pooling uses an explicit 2x2 window with stride 2; passing 64 positionally
# set pool_size=64 instead. Output shapes are unchanged with 'same' padding.
c1 = Conv2D(64, (10,10), activation='relu')(inp)
m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)
c2 = Conv2D(128, (7,7), activation='relu')(m1)
m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)
c3 = Conv2D(128, (4,4), activation='relu')(m2)
m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)
c4 = Conv2D(256, (4,4), activation='relu')(m3)
f1 = Flatten()(c4)
d1 = Dense(4096, activation='sigmoid')(f1)
mod = Model(inputs=[inp], outputs=[d1], name='embedding')
mod.summary()

In [None]:
def make_embedding():
    """Build the convolutional embedding network shared by both Siamese branches.

    The network takes a (100, 100, 3) image, applies three Conv2D + max-pooling
    blocks followed by a final Conv2D, flattens the result and projects it to a
    4096-dimensional sigmoid-activated vector.

    Returns:
        A Keras `Model` named 'embedding'.
    """
    inp = Input(shape=(100,100,3), name='input_image')

    # Pooling uses an explicit 2x2 window with stride 2. The first positional
    # argument of MaxPooling2D is pool_size, so passing 64 there pooled over a
    # 64x64 window; with 'same' padding the output shapes stay the same.

    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)

    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)

    # Third block
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)

    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=[d1], name='embedding')


embedding = make_embedding()
embedding.summary()

### 4.2 Build Distance Layer

In [None]:
# Siamese L1 Distance class
class L1Dist(Layer):
    """Keras layer that returns the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        # Forward keyword arguments (e.g. name, dtype) to the base Layer.
        # They were previously accepted but dropped, which also discarded the
        # configuration Keras passes when rebuilding the layer from a saved model.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Compute |input_embedding - validation_embedding| element-wise.

        Args:
            input_embedding: Embedding of the input (anchor) image.
            validation_embedding: Embedding of the validation image.

        Returns:
            A tensor with the same shape as the embeddings.
        """
        # Ensure the inputs are tensors
        input_embedding = tf.convert_to_tensor(input_embedding)
        validation_embedding = tf.convert_to_tensor(validation_embedding)

        return tf.math.abs(input_embedding - validation_embedding)
        #return tf.sqrt(tf.reduce_sum(tf.square(input_embedding - validation_embedding), axis=1))
l1 = L1Dist()


#### 4.3 Make Siamese Model

In [None]:
'''input_image = Input(name='input_img', shape=(100,100,3))
validation_image = Input(name='validation_img', shape=(100,100,3))
inp_embedding = embedding(input_image)
val_embedding = embedding(validation_image)
siamese_layer = l1(inp_embedding,val_embedding)
#print(len(siamese_layer)) 
print(siamese_layer.shape)'''


In [None]:
'''distances=siamese_layer(inp_embedding, val_embedding)
classifier = Dense(1, activation='sigmoid')(distances)
classifier'''

In [None]:
'''siamese_network = Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')
siamese_network.summary()'''

In [None]:
def make_siamese_model():
    """Assemble the Siamese network from the shared `embedding` model.

    Two (100, 100, 3) image inputs are embedded with the same network, their
    L1 distance is computed, and a single sigmoid unit classifies the pair.

    Returns:
        A Keras `Model` named 'SiameseNetwork' with inputs
        [input_img, validation_img] and one sigmoid output.
    """
    # The two image inputs: anchor first, then the image to verify
    input_image = Input(name='input_img', shape=(100,100,3))
    validation_image = Input(name='validation_img', shape=(100,100,3))

    # Distance layer comparing the two embeddings
    distance_layer = L1Dist()
    distance_layer._name = 'distance'

    # Both inputs go through the same embedding network
    anchor_embedding = embedding(input_image)
    candidate_embedding = embedding(validation_image)
    distances = distance_layer(anchor_embedding, candidate_embedding)

    # Single sigmoid unit turns the distance vector into a match score
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(
        inputs=[input_image, validation_image],
        outputs=classifier,
        name='SiameseNetwork',
    )

In [None]:
siamese_model = make_siamese_model()
siamese_model.summary()

## 5. Training

#### 5.1 Setup Loss and Optimizer

In [None]:
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(0.0001) # 0.0001

#### 5.2 Establish Checkpoints

In [None]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

#### 5.3 Build Train Step Function

In [None]:
test_batch = train_data.as_numpy_iterator()
batch_1 = test_batch.next()
X = batch_1
y=batch_1[2]


In [None]:
# The first two entries of a batch are the image pair (model inputs); keep
# them in X so the labels stored in y are not overwritten
X = batch_1[:2]
len(X)

In [None]:
@tf.function
def train_step(batch):
    """Run one optimisation step of the Siamese model on a single batch.

    Args:
        batch: Tuple (input images, validation images, labels) as produced by
            the training dataset.

    Returns:
        The binary cross-entropy loss for this batch.
    """
    # Get anchor and positive/negative image
    X = batch[:2]
    # Get label
    y = batch[2]

    # Record all of our operations
    with tf.GradientTape() as tape:
        # Forward pass
        yhat = siamese_model(X, training=True)

        # The classifier emits one value per sample; flatten it so it lines up
        # with the 1-D label vector without relying on a static batch size
        yhat = tf.reshape(yhat, [-1])

        # Calculate loss
        loss = binary_cross_loss(y, yhat)

    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    # Return loss (a Python print here would only run while tf.function traces)
    return loss

In [None]:
'''@tf.function
def train_step(batch):
    
    # Record all of our operations 
    with tf.GradientTape() as tape:     
        # Get anchor and positive/negative image
        X = batch[:2]
        # Get label
        y = batch[2]
        
        # Forward pass
        yhat = siamese_model(X, training=True)
        # Reshape y to ensure it matches the shape of yhat
        y = tf.reshape(y, (-1, 1))  # Ensure y has shape [batch_size, 1]
        # Calculate loss
        loss = binary_cross_loss(y, yhat)
    #print(loss)
        
    # Calculate gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    # Calculate updated weights and apply to siamese model
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
        
    # Return loss
    return loss'''

#### 5.4 Build Training Loop

In [None]:
def train(data, EPOCHS):
    """Train for `EPOCHS` passes over `data`, checkpointing every 10th epoch.

    Args:
        data: Batched training dataset; each element is passed to `train_step`.
        EPOCHS: Number of epochs to run.
    """
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # One optimisation step per batch, advancing the progress bar each time
        for step, batch in enumerate(data, start=1):
            train_step(batch)
            progbar.update(step)

        # Only every 10th epoch is checkpointed
        if epoch % 10 != 0:
            continue
        checkpoint.save(file_prefix=checkpoint_prefix)

#### 5.5 Train the Model

In [None]:
EPOCHS = 60
train(train_data, EPOCHS)


## 6. Evaluate the Model

#### 6.1 import metric

In [None]:
#import metric calculations
from tensorflow.keras.metrics import Precision,Recall

#### 6.2 Make Predictions

In [None]:
#get A batch of test data
test_input,test_val,y_true=test_data.as_numpy_iterator().next()

In [None]:
#Make predictions
y_hat=siamese_model.predict([test_input,test_val])
y_hat

In [None]:
#post processing the results
y_hat_flat=y_hat.flatten()
bi_predictions=[1 if prediction > 0.8 else 0 for prediction in y_hat_flat]
bi_predictions

In [None]:
y_true

#### 6.3 Calculate Metrics

In [None]:
m=Recall()
m.update_state(y_true,y_hat_flat)
m.result().numpy()

#### 6.4 Viz results

In [None]:
#set plot size
plt.figure(figsize=(10,8))

#set 1st subplot
plt.subplot(1,2,1)
plt.imshow(test_input[2])

#set 2nd subplot
plt.subplot(1,2,2)
plt.imshow(test_val[2])
plt.show()


## 7. Save Model

In [None]:
#save weights
siamese_model.save("siamesemodel.h5")

In [None]:
#Reload the model
model= tf.keras.models.load_model("siamesemodel.h5",
                                custom_objects={"L1Dist":L1Dist})

In [None]:
# Make predictions with reloaded model (`model`, loaded from siamesemodel.h5)
model.predict([test_input, test_val])

In [None]:
# View the summary of the reloaded model
model.summary()

## 8. Real Time Test

#### 8.1 Verification Function

In [None]:
os.listdir(os.path.join('application_data', 'verification_images'))

In [None]:
os.path.join('application_data', 'input_image', 'input_image.jpg')

In [None]:
def verify(model, detection_threshold, verification_threshold):
    """Compare the captured input image against every stored verification image.

    Args:
        model: Model whose `predict` takes [input image batch, validation image
            batch] and returns a similarity score.
        detection_threshold: Score above which a single prediction counts as a
            positive match.
        verification_threshold: Currently unused; kept for interface
            compatibility. Callers can compare the returned proportion against
            it themselves.

    Returns:
        A tuple (results, verification): the list of raw predictions, and the
        proportion of verification images whose score exceeded
        `detection_threshold`. With no verification images the result is
        ([], 0.0).
    """
    verification_dir = os.path.join('application_data', 'verification_images')
    input_path = os.path.join('application_data', 'input_image', 'input_image.jpg')

    # List the gallery once so the loop and the denominator use the same files
    image_names = os.listdir(verification_dir)
    if not image_names:
        # Avoid a 0/0 division when there is nothing to compare against
        return [], 0.0

    # The input image is the same for every comparison, so load it only once
    input_img = preprocess(input_path)

    # Build results array
    results = []
    for image in image_names:
        validation_img = preprocess(os.path.join(verification_dir, image))

        # Make predictions on a batch of one (input, validation) pair
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)

    # Detection Threshold: metric above which a prediction is considered positive
    detection = np.sum(np.array(results) > detection_threshold)

    # Verification: proportion of positive predictions / total verification images
    verification = detection / len(image_names)

    return results, verification

#### 8.2 OpenCV Real Time Verification

In [None]:
# Where the captured frame is stored for verify() to read
input_image_path = os.path.join('application_data', 'input_image', 'input_image.jpg')
os.makedirs(os.path.dirname(input_image_path), exist_ok=True)

# Defaults so the following cells still run if verification is never triggered
results, verification = [], None

cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        # Stop when the camera fails to deliver a frame
        if not ret:
            break
        frame = frame[120:120+250,200:200+250, :]

        cv2.imshow('Verification', frame)

        # Poll the keyboard once per frame so a key press is not lost between
        # separate waitKey calls
        key = cv2.waitKey(10) & 0xFF

        # Verification trigger
        if key == ord('v'):
            # Save input image to application_data/input_image folder
            cv2.imwrite(input_image_path, frame)
            # Run verification
            results, verification = verify(siamese_model, 0.99, 0.7)
            print(verification)
        elif key == ord('q'):
            break
finally:
    # Release the webcam and close the window even if the loop raised
    cap.release()
    cv2.destroyAllWindows()

# Last verification score (None if 'v' was never pressed)
verification

In [None]:
len(results)

In [None]:
np.sum(np.squeeze(results) > 0.9)

In [None]:
results