# === SETUP AND IMPORTS ===

In [1]:
# Ensure the latest version of the code is used by re-cloning the repository
!rm -rf Deep_Learning-Based_Signature_Forgery_Detection_for_Personal_Identity_Authentication
!git clone https://github.com/trongjhuongwr/Deep_Learning-Based_Signature_Forgery_Detection_for_Personal_Identity_Authentication.git
%cd Deep_Learning-Based_Signature_Forgery_Detection_for_Personal_Identity_Authentication

Cloning into 'Deep_Learning-Based_Signature_Forgery_Detection_for_Personal_Identity_Authentication'...
remote: Enumerating objects: 3440, done.[K
remote: Counting objects: 100% (143/143), done.[K
remote: Compressing objects: 100% (107/107), done.[K
remote: Total 3440 (delta 67), reused 89 (delta 36), pack-reused 3297 (from 3)[K
Receiving objects: 100% (3440/3440), 248.45 MiB | 60.26 MiB/s, done.
Resolving deltas: 100% (372/372), done.
/kaggle/working/Deep_Learning-Based_Signature_Forgery_Detection_for_Personal_Identity_Authentication


In [2]:
import pandas as pd
import shutil
import torch
import torch.nn.functional as F
import torchvision.transforms.functional as TF
import numpy as np
import os
import sys
import matplotlib.pyplot as plt
import seaborn as sns
import re
import json
import glob
import random
import torch.nn as nn
from tqdm.notebook import tqdm
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from tqdm.notebook import tqdm
from sklearn.metrics import roc_curve, auc, accuracy_score, confusion_matrix, precision_recall_fscore_support
from torchvision import transforms
import torch.optim as optim
import itertools

sys.path.append(os.path.abspath(os.getcwd()))

from models.feature_extractor import ResNetFeatureExtractor
from models.meta_learner import MetricGenerator
from dataloader.meta_dataloader import SignatureEpisodeDataset
from utils.model_evaluation import compute_metrics, _plot_roc_curve, _plot_score_distribution, _plot_confusion_matrix, _save_example_images

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def seed_everything(seed=42):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    print(f"Random seed set to: {seed}")

SEED = 42
seed_everything(SEED)

print(f"Setup complete. Device: {DEVICE}")

Random seed set to: 42
Setup complete. Device: cuda


# === DATASET LOADER WITH PROTOCOL SPLIT ===

In [3]:
class CedarAdaptationDataset(Dataset):
    """
    Custom Loader for CEDAR that supports Domain Adaptation Protocol.
    
    Splitting Strategy:
    - Adaptation Set (Train): Users with ID <= split_user_id (e.g., 1-10).
      Used to fine-tune the model to the target domain style.
    - Evaluation Set (Test): Users with ID > split_user_id (e.g., 11-55).
      Used to validte the performance on unseen users (User-Independent).
    """
    def __init__(self, root_dir, mode='adaptation', split_user_id=10, n_pairs=500, transform=None):
        self.root_dir = root_dir
        self.mode = mode
        self.split_id = split_user_id
        self.n_pairs = n_pairs
        self.transform = transform
        self.users = {} # {uid: {'gen': [], 'forg': []}}
        self.pairs = []
        
        self._parse_cedar_structure()
        if self.mode == 'test':
            self._generate_exhaustive_pairs()
        else:
            self._generate_balanced_pairs()
        
    def _parse_cedar_structure(self):
        """Scans folder structure and groups files by User ID."""
        print(f" > [{self.mode.upper()}] Scanning CEDAR data...")
        
        # Paths based on your dataset structure
        gen_path = os.path.join(self.root_dir, 'full_org')
        forg_path = os.path.join(self.root_dir, 'full_forg')
        
        # 1. Parse Genuine
        for fpath in glob.glob(os.path.join(gen_path, "original_*.png")):
            # original_10_1.png -> ID=10
            try:
                uid = int(os.path.basename(fpath).split('_')[1])
                self._add_file(uid, fpath, 'gen')
            except: pass

        # 2. Parse Forged
        for fpath in glob.glob(os.path.join(forg_path, "forgeries_*.png")):
            # forgeries_10_1.png -> ID=10
            try:
                uid = int(os.path.basename(fpath).split('_')[1])
                self._add_file(uid, fpath, 'forg')
            except: pass
            
        print(f" > Total Users Found in Split: {len(self.users)}")

    def _add_file(self, uid, fpath, ftype):
        # Filtering Logic based on Mode
        if self.mode == 'adaptation' and uid <= self.split_id:
            if uid not in self.users: self.users[uid] = {'gen': [], 'forg': []}
            self.users[uid][ftype].append(fpath)
        elif self.mode == 'test' and uid > self.split_id:
            if uid not in self.users: self.users[uid] = {'gen': [], 'forg': []}
            self.users[uid][ftype].append(fpath)

    def _generate_balanced_pairs(self):
        """Generates balanced pairs for training/testing."""
        if len(self.users) == 0: return

        # Distribute n_pairs across available users
        pairs_per_user = max(20, self.n_pairs // len(self.users))
        
        for uid, data in self.users.items():
            gens = data['gen']
            forgs = data['forg']
            
            if len(gens) < 2: continue
            
            # 1. Genuine Pairs (Positive)
            for _ in range(pairs_per_user // 2):
                self.pairs.append((random.choice(gens), random.choice(gens), 1.0))
                
            # 2. Forged Pairs (Negative)
            if len(forgs) > 0:
                for _ in range(pairs_per_user // 2):
                    self.pairs.append((random.choice(gens), random.choice(forgs), 0.0))
        
        random.shuffle(self.pairs)
        print(f" > Generated {len(self.pairs)} pairs for {self.mode}.")

    def _generate_exhaustive_pairs(self):
        """For testing purposes: Explore ALL possible pairs (All-to-All)"""
        print(f" > [Exhaustive] Generating ALL possible pairs for {len(self.users)} users...")
        
        for uid, data in self.users.items():
            gens = data['gen']
            forgs = data['forg']
            
            # 1. Genuine Pairs: Combinations of 2 of all genuine signatures (No repetitions, no duplicates)
            gen_pairs = list(itertools.combinations(gens, 2))
            for p1, p2 in gen_pairs:
                self.pairs.append((p1, p2, 1.0))
            
            # 2. Forged Pairs: Descartes' product (Each genuine signature vs. each forged signature)
            for g in gens:
                for f in forgs:
                    self.pairs.append((g, f, 0.0))
                    
        print(f" > [Exhaustive] Total Pairs Generated: {len(self.pairs)} (Full Evaluation)")
    
    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        p1, p2, lbl = self.pairs[idx]
        img1 = Image.open(p1).convert('RGB')
        img2 = Image.open(p2).convert('RGB')
        
        if self.transform:
            img1 = self.transform(img1)
            img2 = self.transform(img2)
            
        return {
            'support_images': img1, 
            'query_images': img2, 
            'query_labels': torch.tensor(lbl, dtype=torch.float32),
            'paths': (p1, p2)
        }

# === ADAPTATION & EVALUATION ENGINES ===

In [4]:
def run_domain_adaptation(pretrained_path, train_loader, val_loader, device, epochs=10):
    """
    Fine-tunes the BHSig model on a small subset of CEDAR (10 users).
    """
    print(f"\n{'='*10} PHASE 1: FEW-SHOT DOMAIN ADAPTATION {'='*10}")
    
    # 1. Load Pre-trained Model
    feature_extractor = ResNetFeatureExtractor(backbone_name='resnet34').to(device)
    metric_generator = MetricGenerator(embedding_dim=1024).to(device)
    
    print(f" > Loading Source Weights: {os.path.basename(pretrained_path)}")
    ckpt = torch.load(pretrained_path, map_location=device, weights_only=False)
    feature_extractor.load_state_dict(ckpt['feature_extractor'])
    metric_generator.load_state_dict(ckpt['metric_generator'])
    
    # 2. Optimizer (Low Learning Rate for Stability)
    # We use a very small LR to gently adapt the weights without catastrophic forgetting
    optimizer = optim.AdamW([
        {'params': feature_extractor.parameters(), 'lr': 1e-5}, # Feature Extractor
        {'params': metric_generator.parameters(), 'lr': 5e-5}   # Relation Network
    ], weight_decay=1e-3)
    
    criterion = nn.BCEWithLogitsLoss()
    
    best_eer = 1.0
    best_state = None
    
    # 3. Adaptation Loop
    for epoch in range(epochs):
        feature_extractor.train()
        metric_generator.train()
        epoch_loss = 0
        
        for batch in tqdm(train_loader, desc=f"Adaptation Epoch {epoch+1}", leave=False):
            s = batch['support_images'].to(device)
            q = batch['query_images'].to(device)
            lbl = batch['query_labels'].to(device).unsqueeze(1)
            
            optimizer.zero_grad()
            
            # Forward
            combined = torch.cat((feature_extractor(s), feature_extractor(q)), dim=1)
            scores = metric_generator(combined)
            loss = criterion(scores, lbl)
            
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
            
        # Validation on Target Domain (Unseen Users)
        val_results = evaluate_model(feature_extractor, metric_generator, val_loader, device, silent=True)
        curr_eer = val_results['eer']
        
        print(f"   Epoch {epoch+1:02d} | Loss: {epoch_loss/len(train_loader):.4f} | Test EER: {curr_eer:.2%}")
        
        if curr_eer < best_eer:
            best_eer = curr_eer
            # Save in-memory best state
            best_state = {
                'fe': feature_extractor.state_dict(),
                'mg': metric_generator.state_dict()
            }
            
    print(f" > Adaptation Complete. Best EER Achieved: {best_eer:.2%}")
    
    # Load best weights for final testing
    feature_extractor.load_state_dict(best_state['fe'])
    metric_generator.load_state_dict(best_state['mg'])
    
    return feature_extractor, metric_generator

def evaluate_model(fe, mg, loader, device, output_dir=None, silent=False):
    """
    Standard evaluation function used for both validation and final testing.
    """
    fe.eval()
    mg.eval()
    all_scores, all_labels = [], []
    hard_negatives, hard_positives = [], []
    
    iter_bar = tqdm(loader, desc="Inference", leave=False) if not silent else loader
    
    with torch.no_grad():
        for batch in iter_bar:
            s = batch['support_images'].to(device)
            q = batch['query_images'].to(device)
            lbl = batch['query_labels'].to(device)
            paths = batch['paths']
            
            combined = torch.cat((fe(s), fe(q)), dim=1)
            probs = torch.sigmoid(mg(combined)).squeeze(1)
            
            all_scores.extend(probs.cpu().numpy())
            all_labels.extend(lbl.cpu().numpy())
            
            # Mine hard examples (only if output_dir is set)
            if output_dir:
                scores_np = probs.cpu().numpy()
                lbl_np = lbl.cpu().numpy()
                for i in range(len(scores_np)):
                    if lbl_np[i] == 1 and scores_np[i] < 0.5:
                        hard_positives.append((scores_np[i], 1, "FN", s[i].cpu(), q[i].cpu()))
                    elif lbl_np[i] == 0 and scores_np[i] > 0.5:
                        hard_negatives.append((scores_np[i], 0, "FP", s[i].cpu(), q[i].cpu()))
                        
    results = compute_metrics(all_labels, all_scores)
    
    if output_dir:
        # Full Reporting
        print(f"\n{'='*10} FINAL TEST RESULTS (CEDAR) {'='*10}")
        print(f"EER      : {results['eer']:.2%}")
        print(f"AUC      : {results['auc']:.4f}")
        print(f"Accuracy : {results['accuracy']:.2%}")
        print("="*40)
        
        _plot_roc_curve(results, output_dir)
        _plot_score_distribution(results, output_dir)
        _plot_confusion_matrix(results, output_dir)
        
        hard_positives.sort(key=lambda x: x[0])
        hard_negatives.sort(key=lambda x: x[0], reverse=True)
        _save_example_images(hard_positives[:5], "FalseNegative", output_dir)
        _save_example_images(hard_negatives[:5], "FalsePositive", output_dir)
        
    return results

# === EXECUTION ===

In [5]:
# Paths
CEDAR_ROOT = '/kaggle/input/cedardataset/signatures'
BHSIG_MODEL_PATH = '/kaggle/input/my-best-models-meta-learning/Deep_Learning-Based_Signature_Forgery_Detection_for_Personal_Identity_Authentication/checkpoints_meta/best_model_fold_2.pth' 
OUTPUT_DIR = '/kaggle/working/cedar_adaptation_results'

os.makedirs(OUTPUT_DIR, exist_ok=True)

# Transforms
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# 1. Prepare Datasets
# Adaptation: The first 10 people, 2000 pairs to learn
adapt_set = CedarAdaptationDataset(CEDAR_ROOT, mode='adaptation', split_user_id=5, n_pairs=2000, transform=test_transform)
adapt_loader = DataLoader(adapt_set, batch_size=16, shuffle=True, num_workers=2)

# Test: 45 people remaining, 2000 pairs to compete
test_set = CedarAdaptationDataset(CEDAR_ROOT, mode='test', split_user_id=5, transform=test_transform)
test_loader = DataLoader(test_set, batch_size=64, shuffle=False, num_workers=2)

# 2. Run Process
if os.path.exists(BHSIG_MODEL_PATH) and len(adapt_set) > 0:
    # Phase 1: Adapt
    final_fe, final_mg = run_domain_adaptation(BHSIG_MODEL_PATH, adapt_loader, test_loader, DEVICE, epochs=15)
    
    # Phase 2: Final Test & Visualize
    evaluate_model(final_fe, final_mg, test_loader, DEVICE, output_dir=OUTPUT_DIR)
    
    # Save Adapted Model
    torch.save({
        'feature_extractor': final_fe.state_dict(),
        'metric_generator': final_mg.state_dict()
    }, os.path.join(OUTPUT_DIR, 'cedar_adapted_model.pth'))
    print(f" > Saved adapted model to {OUTPUT_DIR}")
    
else:
    print("Error: Check paths or dataset structure.")

 > [ADAPTATION] Scanning CEDAR data...
 > Total Users Found in Split: 5
 > Generated 2000 pairs for adaptation.
 > [TEST] Scanning CEDAR data...
 > Total Users Found in Split: 50
 > [Exhaustive] Generating ALL possible pairs for 50 users...
 > [Exhaustive] Total Pairs Generated: 42600 (Full Evaluation)



Downloading: "https://download.pytorch.org/models/resnet34-b627a593.pth" to /root/.cache/torch/hub/checkpoints/resnet34-b627a593.pth
100%|██████████| 83.3M/83.3M [00:00<00:00, 213MB/s]


 > Loading Source Weights: best_model_fold_2.pth


Adaptation Epoch 1:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 01 | Loss: 0.2843 | Test EER: 22.68%


Adaptation Epoch 2:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 02 | Loss: 0.0276 | Test EER: 17.04%


Adaptation Epoch 3:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 03 | Loss: 0.0151 | Test EER: 15.42%


Adaptation Epoch 4:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 04 | Loss: 0.0110 | Test EER: 13.55%


Adaptation Epoch 5:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 05 | Loss: 0.0120 | Test EER: 13.52%


Adaptation Epoch 6:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 06 | Loss: 0.0239 | Test EER: 12.30%


Adaptation Epoch 7:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 07 | Loss: 0.0155 | Test EER: 8.18%


Adaptation Epoch 8:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 08 | Loss: 0.0042 | Test EER: 7.43%


Adaptation Epoch 9:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 09 | Loss: 0.0037 | Test EER: 6.47%


Adaptation Epoch 10:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 10 | Loss: 0.0036 | Test EER: 6.81%


Adaptation Epoch 11:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 11 | Loss: 0.0028 | Test EER: 6.16%


Adaptation Epoch 12:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 12 | Loss: 0.0030 | Test EER: 6.46%


Adaptation Epoch 13:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 13 | Loss: 0.0033 | Test EER: 5.89%


Adaptation Epoch 14:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 14 | Loss: 0.0032 | Test EER: 3.51%


Adaptation Epoch 15:   0%|          | 0/125 [00:00<?, ?it/s]

   Epoch 15 | Loss: 0.0024 | Test EER: 4.34%
 > Adaptation Complete. Best EER Achieved: 3.51%


Inference:   0%|          | 0/666 [00:00<?, ?it/s]


EER      : 4.34%
AUC      : 0.9892
Accuracy : 95.66%
 > Saved ROC Plot to: /kaggle/working/cedar_adaptation_results/roc_curve.png


  with pd.option_context('mode.use_inf_as_na', True):
  with pd.option_context('mode.use_inf_as_na', True):


 > Saved Distribution Plot to: /kaggle/working/cedar_adaptation_results/score_distribution.png
 > Saved Confusion Matrix to: /kaggle/working/cedar_adaptation_results/confusion_matrix.png
 > Saved 5 hard examples for FalseNegative.
 > Saved 5 hard examples for FalsePositive.
 > Saved adapted model to /kaggle/working/cedar_adaptation_results
