
# PHASE 1: Baseline Metric Selection for Pre-training (K-Fold Comparison)

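This notebook runs K-Fold cross-validation to compare fixed distance metrics (Euclidean, Cosine, Manhattan) across a range of triplet-loss margins, ranks the resulting configurations by mean EER, and optionally trains the final pre-trained feature extractor for meta-training with the best configuration.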

The environment setup and file paths have been updated to clone the repository and use paths relative to the cloned repo and Kaggle input structure.


In [None]:
# === Environment Setup: clone the repository and change working directory ===
# Ensure the latest version of the code is used: any previous checkout is
# deleted first so the clone below always starts from a clean copy.
!rm -rf Deep-Learning-Based-Signature-Forgery-Detection-for-Personal-Identity-Authentication-Update
!git clone https://github.com/trongjhuongwr/Deep-Learning-Based-Signature-Forgery-Detection-for-Personal-Identity-Authentication-Update.git
# Work from the repo root: later cells derive BASE_DIR from the current working
# directory and invoke scripts through repo-relative paths.
%cd Deep-Learning-Based-Signature-Forgery-Detection-for-Personal-Identity-Authentication-Update


In [None]:
# === Imports and repo path setup ===
import os
import sys
import random
import json
import numpy as np
from tqdm.notebook import tqdm

import torch
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms

# Add the current repo root to sys.path so the project imports below resolve.
# The membership check keeps the cell idempotent: re-running it no longer
# appends the same directory to sys.path again.
BASE_DIR = os.path.abspath(os.getcwd())
if BASE_DIR not in sys.path:
    sys.path.append(BASE_DIR)
print(f"Repo base directory: {BASE_DIR}")

# Import from the project (modules expected in the cloned repo)
from utils.helpers import load_config
from models.Triplet_Siamese_Similarity_Network import tSSN
from models.feature_extractor import ResNetFeatureExtractor
from losses.triplet_loss import TripletLoss
from dataloader.tSSN_trainloader import SignaturePretrainDataset

# Only the imports have run at this point; seeding is done in the next cell,
# so the status message must not claim otherwise.
print("Imports completed.")

In [None]:
# === Set global seed for reproducibility ===
SEED = 42

# Seed each random number generator used here: Python, NumPy and PyTorch.
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    _seed_fn(SEED)
del _seed_fn  # keep the notebook namespace unchanged

# CUDA generators are seeded only when a GPU is available.
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# Restrict cuDNN to deterministic algorithms and disable its benchmark-based
# algorithm selection.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

print("Random seed and torch backend configured.")

In [None]:
# === Helper functions and small classes ===
import re
from PIL import Image
from itertools import combinations
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

def _get_user_id_from_filename(filename):
    """Return the numeric user ID embedded in a signature filename.

    Two delimiter conventions are tried in order: digits enclosed in
    underscores (``_<id>_``, CEDAR-style) and digits enclosed in hyphens
    (``-<id>-``, BHSig-style). The first pattern that matches wins.

    Returns:
        The ID as an ``int``, or ``None`` when neither pattern occurs.
    """
    for pattern in (r'_(\d+)_', r'-(\d+)-'):  # CEDAR-style, then BHSig-style
        found = re.search(pattern, filename)
        if found is not None:
            return int(found.group(1))
    return None

from torch.utils.data import Dataset

class SignaturePairDataset(Dataset):
    """Dataset that builds labelled PAIRS for evaluation (genuine vs forged).

    For every requested user that has at least one genuine image, two kinds
    of pairs are created:

    * genuine-genuine: every unordered combination of that user's genuine
      images, label ``1``;
    * genuine-forged: every genuine image combined with every forged image of
      the same user, label ``0``.

    Args:
        org_dir: Directory containing the genuine signature images.
        forg_dir: Directory containing the forged signature images.
        user_ids: Iterable of user IDs to include. Files whose parsed user ID
            is not in this collection are ignored.
        transform: Optional callable applied to each loaded grayscale image.

    Attributes set by ``__init__``:
        pairs: list of ``(path_a, path_b, label)`` tuples.
        user_map: ``{user_id: {'genuine': [paths], 'forged': [paths]}}``.
    """

    _SUPPORTED_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.tif', '.tiff', '.bmp')

    def __init__(self, org_dir, forg_dir, user_ids, transform=None):
        self.transform = transform
        self.pairs = []
        self.user_map = {}
        user_ids_set = set(user_ids)

        # collect genuine images per user
        for user_id, path in self._iter_user_images(org_dir, user_ids_set):
            entry = self.user_map.setdefault(user_id, {'genuine': [], 'forged': []})
            entry['genuine'].append(path)

        # collect forged images per user; users without any genuine image are
        # skipped because no pair could be anchored on them
        for user_id, path in self._iter_user_images(forg_dir, user_ids_set):
            entry = self.user_map.get(user_id)
            if entry is not None:
                entry['forged'].append(path)

        # build pairs: genuine-genuine (label=1) and genuine-forged (label=0)
        for entry in self.user_map.values():
            genuine_list = entry['genuine']
            forged_list = entry['forged']
            self.pairs.extend(
                (first, second, 1) for first, second in combinations(genuine_list, 2)
            )
            self.pairs.extend(
                (gen_path, forg_path, 0)
                for gen_path in genuine_list
                for forg_path in forged_list
            )

        if not self.pairs:
            print(f"Warning: No pairs created for user_ids: {user_ids}.")

    @classmethod
    def _iter_user_images(cls, directory, allowed_ids):
        """Yield ``(user_id, path)`` for supported images of allowed users."""
        # os.listdir returns entries in arbitrary order; sorting makes the
        # pair list (and therefore dataset indexing) reproducible.
        for name in sorted(os.listdir(directory)):
            if not name.lower().endswith(cls._SUPPORTED_EXTENSIONS):
                continue
            user_id = _get_user_id_from_filename(name)
            if user_id in allowed_ids:
                yield user_id, os.path.join(directory, name)

    @staticmethod
    def _load_grayscale(path):
        """Open ``path``, convert it to mode ``'L'`` and close the file."""
        # convert() returns a new, fully loaded image, so the source file can
        # be closed as soon as the conversion is done.
        with Image.open(path) as source:
            return source.convert('L')

    def __len__(self):
        """Return the number of pairs."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return ``(img1, img2, label)`` for pair ``idx``.

        ``label`` is a float32 scalar tensor. If loading or transforming
        either image fails, the error is printed and ``None`` is returned so
        that a collate function can drop the sample.
        """
        img_path1, img_path2, label = self.pairs[idx]
        try:
            img1 = self._load_grayscale(img_path1)
            img2 = self._load_grayscale(img_path2)
            if self.transform:
                img1 = self.transform(img1)
                img2 = self.transform(img2)
            return img1, img2, torch.tensor(label, dtype=torch.float32)
        except Exception as e:
            # Deliberate best-effort: one unreadable file must not abort a
            # whole evaluation run.
            print(f"Error loading image: {e}. Returning None.")
            return None

def collate_fn_skip_none(batch):
    """Collate a batch after dropping samples that are ``None``.

    Returns the default collation of the remaining samples. When every sample
    was ``None``, a 3-tuple of empty tensors is returned instead so callers
    can recognise and skip the batch.
    """
    usable = [sample for sample in batch if sample is not None]
    if usable:
        return torch.utils.data.dataloader.default_collate(usable)
    return torch.empty(0), torch.empty(0), torch.empty(0)

In [None]:
# === EER / FAR / FRR helper ===
import numpy as np
from sklearn.metrics import roc_auc_score, accuracy_score

def calculate_far_frr_eer(true_labels, distances):
    """Estimate the Equal Error Rate (EER) of a distance-based verifier.

    A pair is accepted (predicted genuine) when its distance is strictly
    below the threshold. 500 evenly spaced thresholds spanning the observed
    distance range (padded by 1e-6 on both sides) are swept, and the first
    one where FAR and FRR are closest is selected.

    Args:
        true_labels: Sequence of labels; ``1`` is counted as genuine and
            ``0`` as forged.
        distances: Sequence of distances aligned with ``true_labels``.
            Non-finite entries are discarded together with their labels.

    Returns:
        ``(eer, eer_threshold)`` where ``eer`` is the mean of FAR and FRR at
        the selected threshold. ``(1.0, nan)`` is returned when no finite
        distance exists or fewer than two distinct labels remain.
    """
    labels = np.array(true_labels)
    dists = np.array(distances)

    keep = np.isfinite(dists)
    if not keep.any():
        return 1.0, np.nan
    labels, dists = labels[keep], dists[keep]
    if len(dists) == 0 or len(np.unique(labels)) < 2:
        return 1.0, np.nan

    thresholds = np.linspace(np.min(dists) - 1e-6, np.max(dists) + 1e-6, num=500)

    # Class membership and class sizes do not depend on the threshold.
    is_genuine = labels == 1
    is_forged = labels == 0
    n_genuine = int(np.count_nonzero(is_genuine))
    n_forged = int(np.count_nonzero(is_forged))

    # A rate stays 0.0 when its class is absent (zero denominator).
    far_curve = np.zeros(len(thresholds))
    frr_curve = np.zeros(len(thresholds))
    for i, thresh in enumerate(thresholds):
        accepted = dists < thresh
        if n_forged > 0:
            # forged pairs that were accepted
            far_curve[i] = np.count_nonzero(accepted & is_forged) / n_forged
        if n_genuine > 0:
            # genuine pairs that were rejected
            frr_curve[i] = np.count_nonzero(~accepted & is_genuine) / n_genuine

    best = np.nanargmin(np.abs(far_curve - frr_curve))
    eer = (far_curve[best] + frr_curve[best]) / 2.0
    return eer, thresholds[best]

In [None]:
# === Train epoch and pair-evaluation functions ===
def train_epoch(model, dataloader, loss_fn, optimizer, device):
    """Run one training pass over ``dataloader`` and return the mean loss.

    Each batch is unpacked as ``(anchor, positive, negative)``, moved to
    ``device``, passed through ``model`` and scored by ``loss_fn``; one
    optimizer step is taken per batch. A batch that is a 3-tuple whose first
    tensor is empty is skipped.

    Returns:
        The mean of the per-batch loss values, or ``0.0`` when no batch was
        processed.
    """
    model.train()
    batch_losses = []
    for batch in dataloader:
        is_empty_sentinel = (
            isinstance(batch, tuple) and len(batch) == 3 and batch[0].nelement() == 0
        )
        if is_empty_sentinel:
            continue

        anchor, positive, negative = batch
        anchor = anchor.to(device)
        positive = positive.to(device)
        negative = negative.to(device)

        optimizer.zero_grad()
        anchor_emb, positive_emb, negative_emb = model(anchor, positive, negative)
        batch_loss = loss_fn(anchor_emb, positive_emb, negative_emb)
        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.item())

    if not batch_losses:
        return 0.0
    return sum(batch_losses) / len(batch_losses)

def evaluate_on_pairs(model, dataloader, device, distance_mode='euclidean'):
    """Score ``model.feature_extractor`` on labelled image pairs.

    Both images of every pair are embedded with ``model.feature_extractor``
    and compared with the selected distance. EER, ROC-AUC and accuracy at the
    EER threshold are then computed from all finite distances.

    Args:
        model: Module exposing a ``feature_extractor`` callable.
        dataloader: Iterable yielding ``(img1, img2, label)`` batches. A
            3-tuple whose first tensor is empty is skipped.
        device: Device the images are moved to before embedding.
        distance_mode: ``'euclidean'``, ``'cosine'`` (``1 - cosine
            similarity``) or ``'manhattan'``. Any other value falls back to
            Euclidean distance.

    Returns:
        ``(eer, roc_auc, accuracy)``. The sentinel ``(1.0, 0.0, 0.0)`` is
        returned when nothing could be evaluated: no batches, no finite
        distances, or fewer than two distinct labels. ROC-AUC is undefined in
        those cases and would otherwise raise.
    """
    model.eval()
    all_labels = []
    all_distances = []
    with torch.no_grad():
        for item in dataloader:
            if isinstance(item, tuple) and len(item) == 3 and item[0].nelement() == 0:
                continue
            img1, img2, label = item
            feat1 = model.feature_extractor(img1.to(device))
            feat2 = model.feature_extractor(img2.to(device))
            if distance_mode == 'cosine':
                distances = 1.0 - F.cosine_similarity(feat1, feat2, dim=1)
            elif distance_mode == 'manhattan':
                distances = F.pairwise_distance(feat1, feat2, p=1)
            else:
                # 'euclidean' and, for backward compatibility, unknown modes
                distances = F.pairwise_distance(feat1, feat2, p=2)
            all_distances.extend(distances.cpu().numpy())
            all_labels.extend(label.cpu().numpy())

    if not all_labels or not all_distances:
        return 1.0, 0.0, 0.0

    labels = np.asarray(all_labels)
    dists = np.asarray(all_distances)

    # Drop non-finite distances up front so that all three metrics are
    # computed on the same set of pairs.
    finite = np.isfinite(dists)
    labels, dists = labels[finite], dists[finite]
    if dists.size == 0 or np.unique(labels).size < 2:
        return 1.0, 0.0, 0.0

    eer, eer_threshold = calculate_far_frr_eer(labels, dists)
    # Smaller distance means "more likely genuine", so negate for ROC scoring.
    roc_auc = roc_auc_score(labels, -dists)
    predictions = (dists < eer_threshold).astype(int)
    accuracy = accuracy_score(labels, predictions)
    return eer, roc_auc, accuracy

In [None]:
# === Generate the K-Fold split files with the repo's helper script ===
NUM_SPLITS = 5
BASE_DATA_DIR = '/kaggle/input/cedardataset/signatures'
# NOTE(review): absolute Kaggle path; it only points inside the cloned repo
# when the clone lives under /kaggle/working -- confirm before running elsewhere.
SPLIT_FILES_DIR = '/kaggle/working/Deep-Learning-Based-Signature-Forgery-Detection-for-Personal-Identity-Authentication-Update/scripts/prepare_kfold_splits'

print("Generating K-Fold split files...")
os.makedirs(SPLIT_FILES_DIR, exist_ok=True)

# Relative path: resolves only while the working directory is the repo root.
script_path = 'scripts/prepare_kfold_splits.py'
# SEED must already be defined (seed cell) when this cell runs.
command = f"python {script_path} --base_data_dir {BASE_DATA_DIR} --output_dir {SPLIT_FILES_DIR} --seed {SEED} --num_splits {NUM_SPLITS}"

print(f"Running command: {command}")
# NOTE(review): the script's exit status is not inspected; a failure is only
# noticed indirectly through the file-count check below.
!{command}

# Every entry in the directory is counted, including files left over from
# earlier runs.
# NOTE(review): this check expects NUM_SPLITS files in SPLIT_FILES_DIR, while
# the split-loading cell reads a single JSON at
# BASE_DIR/kfold_splits_cedar_5.json -- confirm the script's actual output
# location and filename.
created_files = os.listdir(SPLIT_FILES_DIR)
print(f"Generated files in {SPLIT_FILES_DIR}: {created_files}")
if len(created_files) != NUM_SPLITS:
    print(f"Warning: Expected {NUM_SPLITS} split files, but found {len(created_files)}.")
else:
    print("K-Fold split files generated successfully.")

In [None]:
# === Load config, define data dirs, and K-Fold split path ===
config = load_config(os.path.join(BASE_DIR, 'configs', 'config_tSSN.yaml'))

# Name of the dataset folder under /kaggle/input; change it if the dataset
# was attached under a different name.
KAGGLE_CEDAR_DATASET_NAME = 'cedardataset'
KAGGLE_BASE = f'/kaggle/input/{KAGGLE_CEDAR_DATASET_NAME}'
DATA_DIR = os.path.join(KAGGLE_BASE, 'signatures')
# Genuine and forged signatures live in sibling sub-folders of DATA_DIR.
ORG_DIR = os.path.join(DATA_DIR, 'full_org')
FORG_DIR = os.path.join(DATA_DIR, 'full_forg')

# The K-Fold split JSON is expected in the repo root (shipped with the repo or
# produced beforehand).
SPLIT_FILE_PATH = os.path.join(BASE_DIR, 'kfold_splits_cedar_5.json')

print('Configuration:')
print('  ORG_DIR =', ORG_DIR)
print('  FORG_DIR =', FORG_DIR)
print('  SPLIT_FILE_PATH =', SPLIT_FILE_PATH)

# Warn (without aborting) as soon as either image directory is missing.
if not all(map(os.path.isdir, (ORG_DIR, FORG_DIR))):
    print('WARNING: Data directories do not exist. Please ensure the Kaggle dataset is added.')

In [None]:
# === Define transform and device ===
input_size = tuple(config['dataset']['input_size'])
transform = transforms.Compose([
    transforms.Resize(input_size),
    # Produce three identical channels directly. This yields the same tensor
    # as Grayscale() followed by ToTensor() and a channel-repeat Lambda, but
    # avoids a lambda inside the transform, which cannot be pickled when
    # DataLoader workers are started with the "spawn" method.
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    # Map pixel values from [0, 1] to [-1, 1] on every channel.
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

In [None]:
# === Load K-Fold splits JSON ===
# Reads the fold definitions from SPLIT_FILE_PATH into ``kfold_splits``.
K_FOLDS = 5
if not os.path.exists(SPLIT_FILE_PATH):
    raise FileNotFoundError(f"Split file not found at {SPLIT_FILE_PATH}. Please generate it first.")
# Decode explicitly as UTF-8 instead of relying on the platform's locale
# default encoding.
with open(SPLIT_FILE_PATH, 'r', encoding='utf-8') as f:
    kfold_splits = json.load(f)
# A fold-count mismatch is reported but does not stop the run.
if len(kfold_splits) != K_FOLDS:
    print(f"Warning: expected {K_FOLDS} folds, found {len(kfold_splits)}")
print(f"Loaded {len(kfold_splits)} fold entries.")

In [None]:
# === Experiment configuration ===
# Number of training epochs per fold comes from the loaded config.
NUM_EPOCHS_PER_FOLD = config['training']['num_epochs']
# Distance modes to compare.
MODES_TO_TEST = ['euclidean', 'cosine', 'manhattan']
# Margins 0.1, 0.2, ..., 1.0.
MARGINS_TO_TEST = [step / 10 for step in range(1, 11)]
# One summary row per (mode, margin) combination is appended here.
results_data = []

In [None]:
# === Run K-Fold experiments ===
# For every (distance mode, margin) combination a fresh model is trained on
# each fold's training users and scored on that fold's validation users; the
# per-fold metrics are then averaged into one row of ``results_data``.
print(f"Running experiments: modes={MODES_TO_TEST} margins={MARGINS_TO_TEST} epochs={NUM_EPOCHS_PER_FOLD}")

for mode in MODES_TO_TEST:
    for margin in MARGINS_TO_TEST:
        fold_metrics = {'eer': [], 'roc_auc': [], 'accuracy': [], 'train_loss': []}
        config_name = f"mode={mode}_margin={margin}"
        print(f"\n--- Experiment: {config_name} ---")
        for fold_data in kfold_splits:
            fold_index = fold_data['fold']
            train_users = fold_data['train_users']
            val_users = fold_data['val_users']
            # Built once per fold so the per-triplet filter below is a hash
            # lookup instead of a list scan.
            train_user_set = set(train_users)
            print(f"  Fold {fold_index}/{len(kfold_splits)}")

            # New model, loss and optimizer for every fold.
            model = tSSN(
                backbone_name=config['model']['backbone'],
                output_dim=config['model']['feature_dim'],
                pretrained=True,
            ).to(device)
            loss_fn = TripletLoss(margin=margin, mode=mode).to(device)
            optimizer = optim.Adam(model.parameters(), lr=config['training']['learning_rate'])

            # prepare train triplet dataset and filter by train_users
            # (the user is identified from the first path of each triplet)
            train_triplet_dataset = SignaturePretrainDataset(org_dir=ORG_DIR, forg_dir=FORG_DIR, transform=transform)
            train_triplet_dataset.triplets = [
                t for t in train_triplet_dataset.triplets
                if _get_user_id_from_filename(os.path.basename(t[0])) in train_user_set
            ]
            print(f"    Train triplets: {len(train_triplet_dataset)} from {len(train_users)} users")
            val_pair_dataset = SignaturePairDataset(org_dir=ORG_DIR, forg_dir=FORG_DIR, user_ids=val_users, transform=transform)
            print(f"    Val pairs: {len(val_pair_dataset)} from {len(val_users)} users")

            train_loader = DataLoader(
                train_triplet_dataset,
                batch_size=config['training']['batch_size'],
                shuffle=True,
                num_workers=2,
                collate_fn=collate_fn_skip_none,
            )
            val_loader = DataLoader(
                val_pair_dataset,
                batch_size=config['training']['batch_size'] * 2,
                shuffle=False,
                num_workers=2,
                collate_fn=collate_fn_skip_none,
            )
            if len(train_loader) == 0 or len(val_loader) == 0:
                print(f"    Skipping fold {fold_index} due to no data")
                continue

            avg_train_loss = 0.0
            for epoch in range(NUM_EPOCHS_PER_FOLD):
                train_loss = train_epoch(model, train_loader, loss_fn, optimizer, device)
                avg_train_loss += train_loss
            if NUM_EPOCHS_PER_FOLD <= 0:
                # Nothing was trained, so no metrics are recorded for this fold.
                continue

            # Validation is run once, after the last training epoch.
            eer, roc_auc, acc = evaluate_on_pairs(model, val_loader, device, distance_mode=mode)
            print(f"    Fold {fold_index} final: Train Loss {train_loss:.4f}, Val EER {eer:.4f}, ROC-AUC {roc_auc:.4f}")
            fold_metrics['eer'].append(eer)
            fold_metrics['roc_auc'].append(roc_auc)
            fold_metrics['accuracy'].append(acc)
            fold_metrics['train_loss'].append(avg_train_loss / NUM_EPOCHS_PER_FOLD)

        # Worst-case defaults are used when every fold was skipped.
        mean_eer = np.mean(fold_metrics['eer']) if fold_metrics['eer'] else 1.0
        mean_roc_auc = np.mean(fold_metrics['roc_auc']) if fold_metrics['roc_auc'] else 0.0
        mean_acc = np.mean(fold_metrics['accuracy']) if fold_metrics['accuracy'] else 0.0
        mean_loss = np.mean(fold_metrics['train_loss']) if fold_metrics['train_loss'] else np.nan
        print(f"  >> {config_name} Mean EER {mean_eer:.4f} ROC_AUC {mean_roc_auc:.4f} Acc {mean_acc:.4f}")
        results_data.append({
            'mode': mode,
            'margin': margin,
            'mean_eer': mean_eer,
            'mean_roc_auc': mean_roc_auc,
            'mean_accuracy': mean_acc,
            'mean_train_loss': mean_loss,
        })
print("\nAll experiments completed.")


In [None]:
# === Analyze results and pick best config ===
import pandas as pd
results_df = pd.DataFrame(results_data)
# Without any rows the sort below fails on the missing 'mean_eer' column and
# there is no best row to pick, so stop with an explicit message instead.
if results_df.empty:
    raise ValueError("No experiment results available; run the K-Fold experiments cell first.")
print('Results summary:')
print(results_df.to_markdown(index=False, floatfmt='.4f'))
# Rank by lowest mean EER; ties are broken by highest mean ROC-AUC.
results_df = results_df.sort_values(by=['mean_eer', 'mean_roc_auc'], ascending=[True, False])
print('\nRanking (best first):')
print(results_df.to_markdown(index=False, floatfmt='.4f'))
best_config = results_df.iloc[0]
print('\nSelected best config:')
print(best_config.to_dict())

In [None]:
# # === Optional: final training on all data with the selected config and save feature extractor ===
# BEST_MODE = best_config['mode']
# BEST_MARGIN = best_config['margin']
# FINAL_EPOCHS = NUM_EPOCHS_PER_FOLD
# SAVE_PATH = os.path.join('/kaggle/working', 'models', 'baseline_best_feature_extractor.pth')
# final_model = tSSN(backbone_name=config['model']['backbone'], output_dim=config['model']['feature_dim'], pretrained=True).to(device)
# final_loss_fn = TripletLoss(margin=BEST_MARGIN, mode=BEST_MODE).to(device)
# final_optimizer = optim.Adam(final_model.parameters(), lr=config['training']['learning_rate'])
# final_train_dataset = SignaturePretrainDataset(org_dir=ORG_DIR, forg_dir=FORG_DIR, transform=transform)
# final_train_loader = DataLoader(final_train_dataset, batch_size=config['training']['batch_size'], shuffle=True, num_workers=2, collate_fn=collate_fn_skip_none)
# print(f"Final training on {len(final_train_dataset)} triplets")
# if len(final_train_loader) > 0:
#     for epoch in tqdm(range(FINAL_EPOCHS), desc='Final training'):
#         loss = train_epoch(final_model, final_train_loader, final_loss_fn, final_optimizer, device)
#         if (epoch+1) % 10 == 0:
#             print(f"Epoch {epoch+1}/{FINAL_EPOCHS} loss {loss:.4f}")
#     os.makedirs(os.path.dirname(SAVE_PATH), exist_ok=True)
#     torch.save(final_model.feature_extractor.state_dict(), SAVE_PATH)
#     print('Saved final feature extractor to', SAVE_PATH)
# else:
#     print('No data for final training.')
