In [2]:
#importing std dependencies
import cv2
import os
import numpy as np
import matplotlib.pyplot as plt


In [3]:
#importing pytorch dependencies
import torch
import torchvision
from torchmetrics.classification import Accuracy, Precision, Recall

In [4]:
device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")

Using mps device


In [5]:
#creating folders
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')
'''os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)'''

'os.makedirs(POS_PATH)\nos.makedirs(NEG_PATH)\nos.makedirs(ANC_PATH)'

moving lfw images to negative folder

In [8]:
for directory in os.listdir('lfw_funneled'):
    if os.path.isdir(os.path.join('lfw_funneled',directory,file)):
            for file in os.listdir('lfw_funneled'+'/'+directory):
        
                
                EX_PATH = os.path.join('lfw_funneled',directory,file)
                NEW_PATH = os.path.join(NEG_PATH,file)
                os.replace(EX_PATH,NEW_PATH)

collecting anchor and positive images

In [8]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    # cutting frame to 250 by 250px
    frame = frame[0:250, 0:250 ,:]

    cv2.imshow("frame", frame)
    #collecting anchors 
    if cv2.waitKey(1) & 0XFF == ord('a'):
        pass

    #collecting positives
    if cv2.waitKey(1) & 0XFF == ord('p'):
      pass

    if cv2.waitKey(1) & 0XFF == ord('q'):
      break
cap.release()
cv2.destroyAllWindows()

Preprocessing images

In [6]:
from torchvision.io import decode_image,read_file


def preprocess(img_path):
    img_bytes = read_file(img_path)
    image = decode_image(img_bytes)
    image = torchvision.transforms.Resize((100,100))(image)
    image = image / 255.0
    return image

In [7]:
def preprocess_twin(input_img,validation_img,label):
    input_img = preprocess(input_img)
    validation_img = preprocess(validation_img)
    return (input_img,validation_img,label)

Creating labelled dataset


In [8]:
from torch.utils.data import Dataset, DataLoader
import glob

anchor_files = glob.glob(ANC_PATH + '/*.jpg')[:300]
positive_files = glob.glob(POS_PATH + '/*.jpg')[:300]
negative_files = glob.glob(NEG_PATH + '/*.jpg')[:300]


# Dataset that returns (anchor_path, other_path, label)
class PairedFilePathDataset(Dataset):
    def __init__(self, anchor_files, other_files, label,transform = None):
        assert len(anchor_files) == len(other_files), "Anchor and other files must match in length"
        self.anchor_files = anchor_files
        self.other_files = other_files
        self.label = label
        self.transform = transform
        self.cache = {}  # in-memory caching

    def __len__(self):
        return len(self.anchor_files)

    def __getitem__(self, idx):
        if idx in self.cache:
            return self.cache[idx]
        anchor_path = self.anchor_files[idx]
        other_path = self.other_files[idx]
        label_tensor = torch.tensor(self.label, dtype=torch.float32)
         # Apply preprocessing function here
        if self.transform:
            anchor_path, other_path,label_tensor = self.transform(anchor_path, other_path,label_tensor)
        sample = (anchor_path, other_path, label_tensor)
        self.cache[idx] = sample  # cache it in memory
        return sample


# Create positive and negative datasets
positive_dataset = PairedFilePathDataset(anchor_files, positive_files, label=1,transform=preprocess_twin)
negative_dataset = PairedFilePathDataset(anchor_files, negative_files, label=0,transform=preprocess_twin)

# Combine datasets manually (no ConcatDataset nesting)
full_dataset = positive_dataset + negative_dataset  # works because __getitem__ returns tuples


# Custom collate function to return tuple directly
def single_collate_fn(batch):
    # batch is a list of 1 item if batch_size=1
    anchor, other, label = batch[0]
    return anchor, other, label


# DataLoader
paired_dataloader = DataLoader(
    full_dataset,
    batch_size=1,
    shuffle=True,
    collate_fn=single_collate_fn
)

# Example usage
example = next(iter(paired_dataloader))
example


(tensor([[[0.6824, 0.6824, 0.6824,  ..., 0.4235, 0.4235, 0.4157],
          [0.6824, 0.6824, 0.6824,  ..., 0.4275, 0.4275, 0.4275],
          [0.6824, 0.6824, 0.6824,  ..., 0.4275, 0.4353, 0.4314],
          ...,
          [0.2000, 0.1961, 0.2157,  ..., 0.6039, 0.5961, 0.5843],
          [0.1961, 0.2039, 0.2980,  ..., 0.6118, 0.6118, 0.6196],
          [0.2000, 0.2353, 0.3412,  ..., 0.6157, 0.6196, 0.6078]],
 
         [[0.5765, 0.5765, 0.5765,  ..., 0.3608, 0.3608, 0.3569],
          [0.5765, 0.5765, 0.5765,  ..., 0.3608, 0.3647, 0.3686],
          [0.5765, 0.5765, 0.5765,  ..., 0.3686, 0.3725, 0.3686],
          ...,
          [0.2157, 0.2118, 0.2314,  ..., 0.6824, 0.6784, 0.6667],
          [0.2157, 0.2196, 0.3137,  ..., 0.6824, 0.6824, 0.6980],
          [0.2235, 0.2549, 0.3569,  ..., 0.6824, 0.6863, 0.6784]],
 
         [[0.5020, 0.5020, 0.5020,  ..., 0.3137, 0.3137, 0.2980],
          [0.5020, 0.5020, 0.5020,  ..., 0.3176, 0.3176, 0.3059],
          [0.5098, 0.5059, 0.5059,  ...,

Building train and test partitions

In [9]:
len(example)

3

In [None]:
plt.imshow(example[0].permute(1,2,0))

Building a data pipeline

In [11]:
from torch.utils.data import random_split
total_len = len(full_dataset)
train_len = int(0.7 * total_len)
test_len = total_len - train_len
train_dataset, test_dataset = random_split(full_dataset, [train_len, test_len])

train_loader = DataLoader(
    train_dataset,
    batch_size=16,
    shuffle=True      # shuffles the dataset every epoch
)

test_loader = DataLoader(
    test_dataset,
    batch_size=16,
    shuffle=False
)


Building an embedding layer 

In [12]:
import torch.nn as nn
import torch.nn.functional as F

class Embedding(nn.Module):
    def __init__(self):
        super(Embedding, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, 10)
        self.conv2 = nn.Conv2d(64, 128, 7)
        self.conv3 = nn.Conv2d(128, 128, 4)
        self.conv4 = nn.Conv2d(128, 256, 4)

        self.fc1 = nn.Linear(256 * 5 * 5, 4096)

    def forward(self, x):
        x = F.max_pool2d(F.relu(self.conv1(x)), 2)
        x = F.max_pool2d(F.relu(self.conv2(x)), 2)
        x = F.max_pool2d(F.relu(self.conv3(x)), 2)
        x = F.relu(self.conv4(x))
        x = x.view(x.size(0), -1)
        x = torch.sigmoid(self.fc1(x))  
        return x

model = Embedding()
model

Embedding(
  (conv1): Conv2d(3, 64, kernel_size=(10, 10), stride=(1, 1))
  (conv2): Conv2d(64, 128, kernel_size=(7, 7), stride=(1, 1))
  (conv3): Conv2d(128, 128, kernel_size=(4, 4), stride=(1, 1))
  (conv4): Conv2d(128, 256, kernel_size=(4, 4), stride=(1, 1))
  (fc1): Linear(in_features=6400, out_features=4096, bias=True)
)

Creating siamese L1 distance layer

In [13]:
class L1Dist(nn.Module):
    def __init__(self)->None:
        super().__init__()
    
    def forward(self,input_embedding,validation_embedding):
        return torch.abs(input_embedding - validation_embedding)


Creating siamese model

In [14]:
class SiameseNeuralNetwork(nn.Module):
    def __init__(self):
        super(SiameseNeuralNetwork, self).__init__()
        self.embedding = Embedding()
        self.L1Dist = L1Dist()
        self.fc1 = nn.Linear(4096,1)

    def forward(self,input_img,validation_img):
        input_embedding = self.embedding(input_img)
        validation_embedding = self.embedding(validation_img)

        l1_distance = self.L1Dist(input_embedding,validation_embedding)
        output = torch.sigmoid(self.fc1(l1_distance))
        return output

model = SiameseNeuralNetwork()

Creating Train step model

In [15]:
batch_1 = next(iter(test_loader))

In [16]:
batch_1[2]

tensor([0., 1., 1., 1., 1., 0., 0., 0., 1., 0., 1., 1., 1., 1., 1., 0.])

setting up checkpoints

In [23]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')

setting up loss and optimizer 

In [24]:
lossfn = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(),lr = 0.0001)

Training function

In [25]:
def train_one_epoch(batch):
    model.train()
    anchor_img,validation_img,label = batch
    label = label.unsqueeze(1)
    optimizer.zero_grad()
    output = model(anchor_img,validation_img)
    loss = lossfn(output,label)
    print(loss)
    loss.backward()
    optimizer.step()
    return loss

Training loop

In [26]:
def train(data,epochs):
    for epoch in range(1,epochs+1):
        print('\n Epoch {}/{}'.format(epoch, epochs))
        for batch in data:
            loss = train_one_epoch(batch)
            print(f"Epoch {epoch+1} Loss: {loss}")

        if epoch % 10 == 0:
            torch.save(model.state_dict(), f"{checkpoint_prefix}/siamese_epoch_{epoch}.pth")
            print(f"Model saved at epoch {epoch}")

Training Model

In [27]:
epochs = 50
train(train_loader,epochs)


 Epoch 1/50
tensor(0.6941, grad_fn=<BinaryCrossEntropyBackward0>)
Epoch 2 Loss: 0.6941414475440979
tensor(0.6898, grad_fn=<BinaryCrossEntropyBackward0>)
Epoch 2 Loss: 0.6897565126419067
tensor(0.6810, grad_fn=<BinaryCrossEntropyBackward0>)
Epoch 2 Loss: 0.6810295581817627
tensor(0.6646, grad_fn=<BinaryCrossEntropyBackward0>)
Epoch 2 Loss: 0.6645702123641968
tensor(0.6809, grad_fn=<BinaryCrossEntropyBackward0>)
Epoch 2 Loss: 0.6809360980987549

 Epoch 2/50
tensor(0.6460, grad_fn=<BinaryCrossEntropyBackward0>)
Epoch 3 Loss: 0.646033763885498
tensor(0.5600, grad_fn=<BinaryCrossEntropyBackward0>)
Epoch 3 Loss: 0.5599795579910278
tensor(0.5813, grad_fn=<BinaryCrossEntropyBackward0>)
Epoch 3 Loss: 0.5812582969665527
tensor(0.4297, grad_fn=<BinaryCrossEntropyBackward0>)
Epoch 3 Loss: 0.4297126531600952
tensor(0.5201, grad_fn=<BinaryCrossEntropyBackward0>)
Epoch 3 Loss: 0.5201035141944885

 Epoch 3/50
tensor(0.3565, grad_fn=<BinaryCrossEntropyBackward0>)
Epoch 4 Loss: 0.3565317988395691
tenso

Evaluating model and making visualizations

In [27]:
test_input, test_val, test_label = batch_1
y_pred = model(test_input,test_val)
y_pred

tensor([[1.6790e-01],
        [9.9999e-01],
        [1.0000e+00],
        [9.8267e-01],
        [9.9392e-01],
        [9.5789e-01],
        [9.3023e-01],
        [9.9815e-01],
        [1.0000e+00],
        [2.1119e-06],
        [9.1153e-01],
        [9.9770e-01],
        [9.9975e-01],
        [9.9829e-01],
        [9.9974e-01],
        [7.6667e-06]], grad_fn=<SigmoidBackward0>)

In [28]:
y_pred = torch.tensor([1 if prediction > 0.5 else 0 for prediction in y_pred])
y_pred

tensor([0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0])

In [20]:
test_label

tensor([0., 1., 1., 1., 1., 0., 0., 0., 1., 0., 1., 1., 1., 1., 1., 0.])

In [29]:
#calculating metrics
p = Precision(task="binary")
a = Accuracy(task="binary")
r = Recall(task="binary")
precision = p(y_pred,test_label)
accuracy = a(y_pred,test_label)
recall = r(y_pred,test_label)
print("precision:",precision.item())
print("accuracy:",accuracy.item())
print("recall:",recall.item())


precision: 0.7692307829856873
accuracy: 0.8125
recall: 1.0


Visualizing Results

In [None]:
plt.figure(figsize=(8,8))
plt.subplot(1,2,1)
plt.imshow(test_input[5].permute(1,2,0))
plt.subplot(1,2,2)
plt.imshow(test_val[5].permute(1,2,0))
plt.show()


Saving model

In [50]:
torch.save(model, 'siamese_model.pth')

In [23]:
#reloading the model 
model = torch.load('siamese_model.pth',weights_only=False)
model.eval()


SiameseNeuralNetwork(
  (embedding): Embedding(
    (conv1): Conv2d(3, 64, kernel_size=(10, 10), stride=(1, 1))
    (conv2): Conv2d(64, 128, kernel_size=(7, 7), stride=(1, 1))
    (conv3): Conv2d(128, 128, kernel_size=(4, 4), stride=(1, 1))
    (conv4): Conv2d(128, 256, kernel_size=(4, 4), stride=(1, 1))
    (fc1): Linear(in_features=6400, out_features=4096, bias=True)
  )
  (L1Dist): L1Dist()
  (fc1): Linear(in_features=4096, out_features=1, bias=True)
)

In [24]:
#predicting 
prediction = model(test_input,test_val)
prediction

tensor([[1.6790e-01],
        [9.9999e-01],
        [1.0000e+00],
        [9.8267e-01],
        [9.9392e-01],
        [9.5789e-01],
        [9.3023e-01],
        [9.9815e-01],
        [1.0000e+00],
        [2.1119e-06],
        [9.1153e-01],
        [9.9770e-01],
        [9.9975e-01],
        [9.9829e-01],
        [9.9974e-01],
        [7.6667e-06]], grad_fn=<SigmoidBackward0>)

Real time test

In [25]:
def verify(model,detection_threshold,verification_threshold):
    results = []
    for image in os.listdir(os.path.join('application_data','verification_images')):
        if(image.endswith('.jpg')==False):
            continue
        input_img = preprocess(os.path.join('application_data','input_image','input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data','verification_images',image))
        
        input_img = input_img.unsqueeze(0)
        validation_img = validation_img.unsqueeze(0)
        
        output = model(input_img,validation_img)
        results.append(output.item())

        #detection threshold: tells us is above which value we should consider it to be a positive match
        #verification threshold: tells us what percentage of verification images should match to consider it verified
        detection = np.sum(np.array(results) > detection_threshold)
        verification = detection / len(os.listdir(os.path.join('application_data','verification_images')))
        verified = verification > verification_threshold
    return results, verified

In [26]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    # cutting frame to 250 by 250px
    frame = frame[275:275+250, 550:550+250, :]

    cv2.imshow("frame", frame)
    #saving input image
    if cv2.waitKey(1) & 0XFF == ord('v'):
        input_img_path = os.path.join('application_data','input_image','input_image.jpg')
        cv2.imwrite(input_img_path, frame)
        results, verified = verify(model,0.5,0.5)
        print(f"verified: {verified}, results: {results}")

    if cv2.waitKey(1) & 0XFF == ord('q'):
      break
cap.release()
cv2.destroyAllWindows()

verified: True, results: [0.9998984336853027, 0.9997974038124084, 0.07506275177001953, 0.9999966621398926, 0.999798595905304, 0.9998936653137207, 0.9999730587005615, 0.998054027557373, 0.9999892711639404, 0.9999885559082031, 0.9997119307518005, 0.9999001026153564, 0.9998830556869507, 0.14195623993873596, 0.9941732287406921, 0.07823236286640167, 0.993735134601593, 0.999997615814209, 0.9793763160705566, 0.9988034963607788, 0.9994121789932251, 0.9999897480010986, 0.9998098015785217, 0.5837090611457825, 0.7138606309890747, 0.998222291469574, 0.999901533126831, 0.9998866319656372, 0.7465131282806396, 0.9999946355819702, 0.9999963045120239, 0.9994511008262634, 0.995814859867096, 0.9998763799667358, 0.9999855756759644, 0.999996542930603, 0.8863745331764221, 0.13079853355884552, 0.9999043941497803, 0.9958627223968506, 0.22529682517051697, 0.9981738328933716, 0.06748965382575989, 0.995241641998291, 0.9999936819076538, 0.9997188448905945, 0.9766618013381958, 0.9982652068138123, 0.999997019767761