<a href="https://colab.research.google.com/github/mohamedshouaib/iti/blob/main/Computer_Vision/Day01/siamesenetwork_faceverification4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!git clone https://github.com/mohamedshouaib/iti.git
!cd iti/Computer_Vision/Day01/Siamese

Cloning into 'iti'...
remote: Enumerating objects: 668, done.[K
remote: Counting objects: 100% (453/453), done.[K
remote: Compressing objects: 100% (444/444), done.[K
remote: Total 668 (delta 86), reused 0 (delta 0), pack-reused 215 (from 1)[K
Receiving objects: 100% (668/668), 27.23 MiB | 21.28 MiB/s, done.
Resolving deltas: 100% (158/158), done.


In [2]:
import os
import cv2
import csv
import time
import random
import argparse
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
import torchvision.transforms as transforms
from datetime import datetime
from pytz import timezone
from tqdm import tqdm

In [3]:
# Hyper Parameters
BATCH_SIZE = 32
IMAGE_SIZE = (150, 150)

In [4]:
# Dataset Loading Functions (Your existing code)
def load_dataset(base_path="iti/Computer_Vision/Day01/Siamese"):
    data = {'train': {}, 'test': {}}

    for person in os.listdir(base_path):
        person_path = os.path.join(base_path, person)
        if not os.path.isdir(person_path):
            continue

        for split in ['Train', 'Test']:
            split_path = os.path.join(person_path, split)
            if not os.path.exists(split_path):
                print(f"Missing {split} folder for {person}")
                continue

            csv_files = [f for f in os.listdir(split_path) if f.endswith('.csv')]
            if not csv_files:
                print(f"No CSV found in {split_path}")
                continue

            csv_path = os.path.join(split_path, csv_files[0])

            genuine = []
            forged = []

            with open(csv_path, 'r') as f:
                try:
                    reader = csv.DictReader(f)
                    row = next(reader)

                    img_col = None
                    label_col = None

                    for col in row.keys():
                        col_lower = col.lower()
                        if 'image' in col_lower or 'name' in col_lower:
                            img_col = col
                        elif 'label' in col_lower or 'class' in col_lower:
                            label_col = col

                    if not img_col or not label_col:
                        raise ValueError("Couldn't detect required columns")

                    f.seek(0)
                    next(reader)

                    for row in reader:
                        img_name = row[img_col].strip()
                        img_path = os.path.join(split_path, img_name)

                        if not os.path.exists(img_path):
                            print(f"Missing image: {img_path}")
                            continue

                        label = row[label_col].strip().lower()
                        if label == 'real' or label == 'genuine':
                            genuine.append(img_path)
                        elif label == 'forged' or label == 'fake':
                            forged.append(img_path)

                except Exception as e:
                    print(f"Error reading {csv_path}: {str(e)}")
                    continue

            if genuine or forged:
                data[split.lower()][person] = {
                    'genuine': genuine,
                    'forged': forged
                }

    return data['train'], data['test']

In [5]:
def generate_triplets(data_dict, split='train'):
    triplets = []
    persons = list(data_dict[split].keys())

    for person in persons:
        genuine = data_dict[split][person]['genuine']
        forged = data_dict[split][person]['forged']

        for i in range(len(genuine)):
            for j in range(i+1, len(genuine)):
                anchor = genuine[i]
                positive = genuine[j]

                for neg in forged:
                    triplets.append((anchor, positive, neg))

    return triplets

In [6]:
# Custom Dataset Class
class SignatureTripletDataset(Dataset):
    def __init__(self, triplets, transform=None):
        self.triplets = triplets
        self.transform = transform or transforms.Compose([
            transforms.Resize(IMAGE_SIZE),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5], std=[0.5])
        ])

    def __len__(self):
        return len(self.triplets)

    def __getitem__(self, idx):
        anchor_path, positive_path, negative_path = self.triplets[idx]

        anchor = Image.open(anchor_path).convert('L')
        positive = Image.open(positive_path).convert('L')
        negative = Image.open(negative_path).convert('L')

        if self.transform:
            anchor = self.transform(anchor)
            positive = self.transform(positive)
            negative = self.transform(negative)

        return anchor, positive, negative

In [7]:
# Model Architecture
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()

        self.cnn = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2),
            nn.Dropout(0.3),  # Added dropout

            nn.Conv2d(64, 128, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(2),
            nn.Dropout(0.3),  # Added dropout

            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(256),
            nn.MaxPool2d(2),
            nn.Dropout(0.3),  # Added dropout

            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(512),
            nn.Dropout(0.5),  # Higher dropout before final layers

            nn.Flatten(),
            nn.Linear(512*18*18, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),  # Dropout in fully connected layer
            nn.Linear(1024, 256)  # Smaller embedding size
        )

    def forward_once(self, x):
        return self.cnn(x)

    def forward(self, input1, input2, input3):
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)
        output3 = self.forward_once(input3)
        return output1, output2, output3

In [8]:
# Triplet Loss
class TripletLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(TripletLoss, self).__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        pos_dist = F.pairwise_distance(anchor, positive, 2)
        neg_dist = F.pairwise_distance(anchor, negative, 2)
        losses = F.relu(pos_dist - neg_dist + self.margin)
        return losses.mean()

In [9]:
def train_model(train_loader, test_loader, args):
    model = SiameseNetwork()
    if args.cuda:
        model = model.cuda()

    # Initialize Triplet Loss criterion with margin from args
    criterion = TripletLoss(margin=args.margin)

    # Optimizer with L2 regularization
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-4)

    # Learning rate scheduler
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', patience=2, factor=0.5, verbose=True
    )

    # Early stopping setup
    best_val_loss = float('inf')
    patience = 3
    patience_counter = 0

    train_losses = []
    val_losses = []

    for epoch in range(args.epochs):
        # Training phase
        model.train()
        epoch_train_loss = 0.0

        with tqdm(train_loader, unit="batch", desc=f"Epoch {epoch+1}/{args.epochs} [Train]") as tepoch:
            for anchor, positive, negative in tepoch:
                if args.cuda:
                    anchor, positive, negative = anchor.cuda(), positive.cuda(), negative.cuda()

                optimizer.zero_grad()
                anchor_out, pos_out, neg_out = model(anchor, positive, negative)
                loss = criterion(anchor_out, pos_out, neg_out)

                loss.backward()
                # Gradient clipping
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()

                epoch_train_loss += loss.item()
                tepoch.set_postfix(loss=loss.item())

        avg_train_loss = epoch_train_loss / len(train_loader)
        train_losses.append(avg_train_loss)

        # Validation phase
        model.eval()
        epoch_val_loss = 0.0

        with torch.no_grad():
            with tqdm(test_loader, unit="batch", desc=f"Epoch {epoch+1}/{args.epochs} [Val]") as vepoch:
                for anchor, positive, negative in vepoch:
                    if args.cuda:
                        anchor, positive, negative = anchor.cuda(), positive.cuda(), negative.cuda()

                    anchor_out, pos_out, neg_out = model(anchor, positive, negative)
                    val_loss = criterion(anchor_out, pos_out, neg_out)
                    epoch_val_loss += val_loss.item()
                    vepoch.set_postfix(loss=val_loss.item())

        avg_val_loss = epoch_val_loss / len(test_loader)
        val_losses.append(avg_val_loss)

        # Update learning rate based on validation loss
        scheduler.step(avg_val_loss)

        # Early stopping check
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), 'best_model.pth')
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"\nEarly stopping triggered at epoch {epoch+1}")
                break

        # Print epoch summary
        print(f"\nEpoch {epoch+1} Summary:")
        print(f"Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")
        print(f"Learning Rate: {optimizer.param_groups[0]['lr']:.2e}")
        print(f"Best Val Loss: {best_val_loss:.4f}")

      # Plot training history
      plt.figure(figsize=(10, 5))
      plt.plot(train_losses, label='Training Loss')
      plt.plot(val_losses, label='Validation Loss')
      plt.xlabel('Epochs')
      plt.ylabel('Loss')
      plt.title('Training and Validation Loss')
      plt.legend()
      plt.savefig('training_history.png')
      plt.close()

    return model

In [10]:
class Args:
    epochs = 5
    margin = 1.0
    cuda = torch.cuda.is_available()

args = Args()


In [11]:
# Load data
print("Loading dataset...")
train_data, test_data = load_dataset()
print("Generating triplets...")
train_triplets = generate_triplets({'train': train_data, 'test': test_data}, 'train')
test_triplets = generate_triplets({'train': train_data, 'test': test_data}, 'test')

print(f"Training triplets: {len(train_triplets)}")
print(f"Testing triplets: {len(test_triplets)}")

# Create datasets
print("Creating datasets...")
train_dataset = SignatureTripletDataset(train_triplets)
test_dataset = SignatureTripletDataset(test_triplets)

# Create dataloaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Train model
print("Starting training...")
model = train_model(train_loader, test_loader, args)

# Save final model
torch.save(model.state_dict(), 'final_model.pth')
print("Training complete. Models saved as 'best_model.pth' and 'final_model.pth'")

Loading dataset...
Generating triplets...
Training triplets: 19000
Testing triplets: 120
Creating datasets...
Starting training...


Epoch 1/5 [Train]: 100%|██████████| 594/594 [06:42<00:00,  1.48batch/s, loss=0]
Epoch 1/5 [Val]: 100%|██████████| 4/4 [00:01<00:00,  2.68batch/s, loss=0]



Epoch 1 Summary:
Train Loss: 0.0337 | Val Loss: 2.0138
Learning Rate: 1.00e-04
Best Val Loss: 2.0138


Epoch 2/5 [Train]: 100%|██████████| 594/594 [06:36<00:00,  1.50batch/s, loss=0]
Epoch 2/5 [Val]: 100%|██████████| 4/4 [00:01<00:00,  2.54batch/s, loss=0.204]



Epoch 2 Summary:
Train Loss: 0.0074 | Val Loss: 0.8951
Learning Rate: 1.00e-04
Best Val Loss: 0.8951


Epoch 3/5 [Train]: 100%|██████████| 594/594 [06:35<00:00,  1.50batch/s, loss=0]
Epoch 3/5 [Val]: 100%|██████████| 4/4 [00:01<00:00,  2.70batch/s, loss=0]



Epoch 3 Summary:
Train Loss: 0.0044 | Val Loss: 2.4283
Learning Rate: 1.00e-04
Best Val Loss: 0.8951


Epoch 4/5 [Train]: 100%|██████████| 594/594 [06:35<00:00,  1.50batch/s, loss=0]
Epoch 4/5 [Val]: 100%|██████████| 4/4 [00:01<00:00,  2.64batch/s, loss=0]



Epoch 4 Summary:
Train Loss: 0.0076 | Val Loss: 1.7476
Learning Rate: 1.00e-04
Best Val Loss: 0.8951


Epoch 5/5 [Train]: 100%|██████████| 594/594 [06:35<00:00,  1.50batch/s, loss=0]
Epoch 5/5 [Val]: 100%|██████████| 4/4 [00:01<00:00,  2.67batch/s, loss=0]



Early stopping triggered at epoch 5
Training complete. Models saved as 'best_model.pth' and 'final_model.pth'


In [12]:
def load_triplet_images(triplet, img_size=(150, 150)):
    def load_image(path):
        img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            raise ValueError(f"Failed to load image: {path}")
        img = cv2.resize(img, img_size)
        return img.astype(np.float32) / 255.0  # Normalize to [0,1]

    anchor = load_image(triplet[0])
    positive = load_image(triplet[1])
    negative = load_image(triplet[2])

    return anchor, positive, negative

In [13]:
def evaluate_model(model, test_triplets):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for triplet in test_triplets:
            a, p, n = load_triplet_images(triplet)
            a = torch.tensor(a).unsqueeze(0).unsqueeze(0).to(device)
            p = torch.tensor(p).unsqueeze(0).unsqueeze(0).to(device)
            n = torch.tensor(n).unsqueeze(0).unsqueeze(0).to(device)

            out_a, out_p = model.forward_once(a), model.forward_once(p)
            out_n = model.forward_once(n)

            d_ap = F.pairwise_distance(out_a, out_p)
            d_an = F.pairwise_distance(out_a, out_n)

            if d_ap.item() < d_an.item():
                correct += 1
            total += 1

    print(f"Accuracy on test set: {correct / total:.2f}")


In [14]:
# Evaluate model on the test triplets
evaluate_model(model, test_triplets)

NameError: name 'device' is not defined

In [None]:
torch.save(model.state_dict(), 'siamese_signature_model.pth')

# To load later
# model.load_state_dict(torch.load('siamese_signature_model.pth'))
# model.eval()

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def test_signature_pair(img_path1, img_path2, model, threshold=1.5):
    model.eval()
    img1, img2 = load_triplet_images((img_path1, img_path2, img_path2))[:2]  # ignore negative

    img1 = torch.tensor(img1).unsqueeze(0).unsqueeze(0).to(device)
    img2 = torch.tensor(img2).unsqueeze(0).unsqueeze(0).to(device)

    with torch.no_grad():
        out1 = model.forward_once(img1)
        out2 = model.forward_once(img2)

    distance = F.pairwise_distance(out1, out2).item()
    print(f"Distance: {distance:.4f}")
    return distance < threshold  # True if similar

In [None]:
# Example real vs. real (should return True if genuine match)
img_path1 = 'iti/Computer_Vision/Day01/Siamese/personA/Test/personA_29.png'
img_path2 = 'iti/Computer_Vision/Day01/Siamese/personA/Test/personA_13.png'

# Example real vs. forged (should return False if forged)
img_path3 = 'iti/Computer_Vision/Day01/Siamese/personA/Test/personA_29.png'
img_path4 = 'iti/Computer_Vision/Day01/Siamese/personA/Test/personA_33.png'

# Test genuine match
is_match = test_signature_pair(img_path1, img_path2, model)
print("Match (real vs. real):", is_match)

# Test forged case
is_forged = test_signature_pair(img_path3, img_path4, model)
print("Match (real vs. forged):", is_forged)

In [None]:
##################