# ArcFace Evaluation - Kaggle

Notebook danh gia va test model ArcFace da train.

## Noi dung:
1. **Integration Tests**: Model pipeline, Data pipeline, Inference pipeline
2. **Evaluation Metrics**: Top-1/Top-5 Accuracy, Confusion Matrix, ROC Curve/AUC
3. **Performance Tests**: Inference latency, DataLoader throughput
4. **Visualization**: t-SNE, Sample predictions, Error analysis

## Chuan bi:
1. Add dataset `celeba-aligned-balanced`
2. Add dataset checkpoint `arcface-checkpoints` (chua `arcface_best.pth`)
3. Bat GPU: Settings > Accelerator > GPU

In [None]:
# === CONFIGURATION ===
import os
import sys
import time
import json
from datetime import datetime

# Names of the Kaggle input datasets attached to this notebook.
KAGGLE_DATASET_NAME = "celeba-aligned-balanced"
CHECKPOINT_DATASET_NAME = "arcface-checkpoints"

# Working locations: repository checkout, local checkpoint folder, image data.
ROOT = "/kaggle/working/FaceRecognition"
CHECKPOINT_DIR = "/kaggle/working/checkpoints/arcface"
DATA_DIR = f"/kaggle/input/{KAGGLE_DATASET_NAME}"

# Make sure the checkpoint folder exists before anything is copied into it.
os.makedirs(CHECKPOINT_DIR, exist_ok=True)
print(f"ROOT: {ROOT}", f"DATA_DIR: {DATA_DIR}", sep="\n")

In [None]:
# Copy checkpoints from the input dataset
import shutil
import glob

# Copy every *.pth file found (recursively) in the attached checkpoint dataset
# into CHECKPOINT_DIR, skipping files that already exist there.
checkpoint_input = f"/kaggle/input/{CHECKPOINT_DATASET_NAME}"
if not os.path.exists(checkpoint_input):
    print(f"[WARN] Checkpoint dataset not found: {checkpoint_input}")
else:
    pth_files = glob.glob(os.path.join(checkpoint_input, "**/*.pth"), recursive=True)
    for pth in pth_files:
        file_name = os.path.basename(pth)
        dest = os.path.join(CHECKPOINT_DIR, file_name)
        if os.path.exists(dest):
            # Already copied on an earlier run; keep the existing file.
            continue
        shutil.copy(pth, dest)
        print(f"Copied: {os.path.basename(pth)}")
    print(f"Checkpoint files: {os.listdir(CHECKPOINT_DIR)}")

In [None]:
# Clone repository
# Git repository that gets checked out into ROOT.
REPO_URL = "https://github.com/sin0235/FaceRecognition.git"
if os.path.exists(ROOT):
    # A checkout already exists: enter it and pull the latest commits.
    %cd {ROOT}
    !git pull
else:
    # No checkout yet: clone into ROOT, then make it the working directory.
    !git clone {REPO_URL} {ROOT}
    %cd {ROOT}
# Put the checkout at the front of sys.path so later cells can import from it.
if ROOT not in sys.path:
    sys.path.insert(0, ROOT)

In [None]:
# Install the third-party packages this notebook relies on (-q keeps pip output quiet).
!pip install -q opencv-python-headless Pillow scikit-learn tqdm pyyaml matplotlib seaborn

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import confusion_matrix, roc_curve, auc
from sklearn.preprocessing import label_binarize
from sklearn.manifold import TSNE

# Use the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"Device: {device}")

---
## 1. Integration Tests

In [None]:
# Outcome of every integration test, keyed by test name.
test_results = {}


def run_test(name, test_func):
    """Run one integration test, record its outcome and print a PASS/FAIL line.

    Args:
        name: Label used as the key in ``test_results`` and in the printed line.
        test_func: Zero-argument callable. Any ``Exception`` it raises marks
            the test as failed instead of propagating.

    Returns:
        The value returned by ``test_func`` on success, ``None`` on failure.

    Side effects:
        Stores ``{'status': 'PASS', 'time': ...}`` or
        ``{'status': 'FAIL', 'error': ..., 'time': ...}`` in ``test_results``.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # wall-clock adjustments the way time.time() differences can.
    start = time.perf_counter()
    try:
        result = test_func()
    except Exception as e:  # broad on purpose: one failing test must not stop the run
        elapsed = time.perf_counter() - start
        # A bare `assert` (or any message-less exception) stringifies to "",
        # so fall back to the exception type to keep the report informative.
        message = str(e) or type(e).__name__
        test_results[name] = {'status': 'FAIL', 'error': message, 'time': elapsed}
        print(f"[FAIL] {name}: {message}")
        return None

    elapsed = time.perf_counter() - start
    test_results[name] = {'status': 'PASS', 'time': elapsed}
    print(f"[PASS] {name} ({elapsed:.2f}s)")
    return result

In [None]:
# Test: Load Model
def test_load_model():
    """Check that ``arcface_best.pth`` can be loaded into a fresh ArcFaceModel.

    Returns:
        dict with ``num_classes`` (read from the checkpoint, 9343 when the key
        is absent) and ``epoch`` (the checkpoint's stored epoch plus one, with
        a missing epoch treated as 0).
    """
    from models.arcface.arcface_model import ArcFaceModel

    # NOTE(review): weights_only=False unpickles arbitrary objects; only load
    # checkpoints from a trusted source.
    ckpt = torch.load(
        os.path.join(CHECKPOINT_DIR, "arcface_best.pth"),
        map_location='cpu',
        weights_only=False,
    )
    n_classes = ckpt.get('num_classes', 9343)
    net = ArcFaceModel(num_classes=n_classes, pretrained=False)
    net.load_state_dict(ckpt['model_state_dict'])

    trained_epochs = ckpt.get('epoch', 0) + 1
    return {'num_classes': n_classes, 'epoch': trained_epochs}


# Run the check through the shared harness; it yields None when the test failed.
model_info = run_test("Load Model", test_load_model)
if model_info:
    print(f"  Classes: {model_info['num_classes']}, Epochs: {model_info['epoch']}")

In [None]:
# Test: Forward Pass
def test_forward_pass():
    """Check that feature extraction returns a (4, 512) tensor for 4 random inputs.

    Loads ``arcface_best.pth`` into a fresh ArcFaceModel, moves it to
    ``device`` and feeds a random ``(4, 3, 112, 112)`` batch through
    ``extract_features`` with gradients disabled.

    Returns:
        dict with ``shape``: the embedding shape as a list.

    Raises:
        AssertionError: if the embedding shape is not ``(4, 512)``.
    """
    from models.arcface.arcface_model import ArcFaceModel

    ckpt_file = os.path.join(CHECKPOINT_DIR, "arcface_best.pth")
    ckpt = torch.load(ckpt_file, map_location='cpu', weights_only=False)
    n_classes = ckpt.get('num_classes', 9343)

    net = ArcFaceModel(num_classes=n_classes, pretrained=False)
    net.load_state_dict(ckpt['model_state_dict'])
    net.to(device).eval()

    with torch.no_grad():
        batch = torch.randn(4, 3, 112, 112).to(device)
        emb = net.extract_features(batch)
    assert emb.shape == (4, 512)
    return {'shape': list(emb.shape)}


run_test("Forward Pass", test_forward_pass)

In [None]:
# Test Summary: count how many recorded tests ended with status PASS.
statuses = [r['status'] for r in test_results.values()]
passed = statuses.count('PASS')
print(f"\nIntegration Tests: {passed}/{len(test_results)} passed")

---
## 2. Evaluation Metrics

In [None]:
# Load model
from models.arcface.arcface_model import ArcFaceModel
# Rebuild the network from the best checkpoint, move it to `device` and put it in eval mode.
# NOTE(review): weights_only=False unpickles arbitrary objects; only load trusted checkpoints.
checkpoint_path = os.path.join(CHECKPOINT_DIR, "arcface_best.pth")
checkpoint = torch.load(
    checkpoint_path,
    map_location='cpu',
    weights_only=False,
)

# The class count comes from the checkpoint; 9343 is the fallback when the key is missing.
num_classes = checkpoint.get('num_classes', 9343)
model = ArcFaceModel(num_classes=num_classes, pretrained=False)
model.load_state_dict(checkpoint['model_state_dict'])
model.to(device).eval()

best_val_acc = checkpoint.get('best_val_acc', 0)
print(f"Model: {num_classes} classes, Best val acc: {best_val_acc:.2f}%")

In [None]:
# Load test data
from models.arcface.arcface_dataloader import FolderBasedDataset, get_val_transforms
from torch.utils.data import DataLoader

# Evaluate on the "test" split, falling back to "val" when no test folder exists.
split_root = os.path.join(DATA_DIR, "CelebA_Aligned_Balanced")
test_dir = os.path.join(split_root, "test")
if not os.path.exists(test_dir):
    test_dir = os.path.join(split_root, "val")

# Validation transforms are built with size argument 112; order is kept fixed (shuffle=False).
eval_transform = get_val_transforms(112)
eval_dataset = FolderBasedDataset(
    test_dir,
    transform=eval_transform,
    min_images_per_identity=1,
)
eval_loader = DataLoader(
    eval_dataset,
    batch_size=128,
    shuffle=False,
    num_workers=4,
)
print(f"Eval: {len(eval_dataset)} samples, {len(eval_loader)} batches")

In [None]:
# Evaluate: Top-1 / Top-5 Accuracy over the whole eval loader.
all_preds, all_labels, all_probs = [], [], []
with torch.no_grad():
    for images, labels in tqdm(eval_loader):
        images = images.to(device)
        labels = labels.to(device)
        # NOTE(review): the ground-truth labels are passed into the model here;
        # confirm that ArcFaceModel does not alter the logits based on them at eval time.
        logits, _ = model(images, labels)
        probs = torch.softmax(logits, dim=1)
        preds = torch.max(logits, 1).indices
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())
        all_probs.extend(probs.cpu().numpy())

# Stack the per-sample results into arrays (all_probs has one row per sample).
all_preds = np.array(all_preds)
all_labels = np.array(all_labels)
all_probs = np.array(all_probs)

# Top-1: the predicted class equals the label.
top1_acc = (all_preds == all_labels).mean() * 100

# Top-5: the label is among the five highest-probability classes of its row.
top5_preds = np.argsort(all_probs, axis=1)[:, -5:]
top5_hits = (top5_preds == all_labels[:, None]).any(axis=1)
top5_acc = top5_hits.mean() * 100

print(f"\n{'='*40}")
print(f"Top-1 Accuracy: {top1_acc:.2f}%")
print(f"Top-5 Accuracy: {top5_acc:.2f}%")

In [None]:
# Confusion Matrix, restricted to the `sample_classes` most frequent classes.
sample_classes = 20
class_counts = np.bincount(all_labels)
top_classes = np.argsort(class_counts)[-sample_classes:]
mask = np.isin(all_labels, top_classes)
labels_sub, preds_sub = all_labels[mask], all_preds[mask]

# Remap the kept class ids to 0..K-1 so the matrix stays compact.
label_map = {old: new for new, old in enumerate(sorted(set(labels_sub)))}
labels_re = np.array([label_map.get(l, -1) for l in labels_sub])
preds_re = np.array([label_map.get(p, -1) for p in preds_sub])
# Predictions that fall outside the kept classes map to -1 and are dropped.
valid = (labels_re >= 0) & (preds_re >= 0)

# Pass the full list of remapped ids explicitly: without `labels=`, a class
# whose samples were all dropped above would vanish from the matrix and shift
# every following row/column away from its remapped id.
cm = confusion_matrix(
    labels_re[valid],
    preds_re[valid],
    labels=np.arange(len(label_map)),
)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, cmap='Blues')
plt.xlabel('Predicted class (remapped id)')
plt.ylabel('True class (remapped id)')
plt.title(f'Confusion Matrix (Top {sample_classes} Classes)')
plt.savefig('/kaggle/working/confusion_matrix.png', dpi=150)
plt.show()

In [None]:
# ROC Curve / AUC (micro-average over the classes present in the eval labels)
print("Computing ROC Curve...")
classes_for_roc = np.unique(all_labels)

# Subsample to keep the computation fast. A fixed seed makes the reported AUC
# reproducible between runs.
sample_size = min(5000, len(all_labels))
rng = np.random.default_rng(42)
idx = rng.choice(len(all_labels), sample_size, replace=False)
labels_sample = all_labels[idx]
probs_sample = all_probs[idx]

# One-hot encode by direct comparison so there is always exactly one column
# per class. label_binarize collapses to a single column when only two classes
# exist, which would no longer match the probability matrix below.
y_true_bin = (labels_sample[:, None] == classes_for_roc[None, :]).astype(np.uint8)
fpr, tpr, _ = roc_curve(y_true_bin.ravel(), probs_sample[:, classes_for_roc].ravel())
roc_auc = auc(fpr, tpr)

print(f"\nMicro-average AUC: {roc_auc:.4f}")

In [None]:
# Plot ROC Curve together with the diagonal of a random classifier.
roc_fig, roc_ax = plt.subplots(figsize=(8, 6))
roc_ax.plot(fpr, tpr, label=f'ROC (AUC = {roc_auc:.4f})', color='blue', linewidth=2)
roc_ax.plot([0, 1], [0, 1], 'k--', label='Random')
roc_ax.set_xlabel('False Positive Rate')
roc_ax.set_ylabel('True Positive Rate')
roc_ax.set_title('ROC Curve')
roc_ax.legend()
roc_ax.grid(alpha=0.3)
roc_fig.savefig('/kaggle/working/roc_curve.png', dpi=150)
plt.show()

In [None]:
# Threshold Analysis: accuracy and coverage when only predictions whose top
# probability reaches the threshold are kept.
max_probs = all_probs.max(axis=1)
is_correct = (all_preds == all_labels).astype(int)

thresholds = np.arange(0.0, 1.0, 0.05)
accs, covs = [], []
for t in thresholds:
    kept = max_probs >= t
    covs.append(kept.mean() * 100)
    if kept.sum() > 0:
        accs.append(is_correct[kept].mean() * 100)
    else:
        # Nothing passes this threshold; record 0 instead of averaging an empty set.
        accs.append(0)

# Accuracy on the left axis, coverage on a twin right axis.
fig, ax1 = plt.subplots(figsize=(10, 5))
ax2 = ax1.twinx()
ax1.plot(thresholds, accs, 'b-', label='Accuracy')
ax2.plot(thresholds, covs, 'r--', label='Coverage')
ax1.set_ylabel('Accuracy (%)', color='blue')
ax2.set_ylabel('Coverage (%)', color='red')
plt.title('Accuracy vs Coverage at Thresholds')
plt.savefig('/kaggle/working/threshold_analysis.png', dpi=150)
plt.show()

---
## 3. Performance Tests

In [None]:
# Latency Test: time single-image feature extraction over 100 runs.
dummy = torch.randn(1, 3, 112, 112).to(device)
latencies = []

# Gradients are not needed for inference; disabling them avoids autograd
# bookkeeping that would otherwise inflate the measured latency.
with torch.no_grad():
    for _ in range(10):
        model.extract_features(dummy)  # warmup

    for _ in range(100):
        # Build the input BEFORE starting the clock, so that random generation
        # and the host-to-device copy are not counted as inference time.
        sample = torch.randn(1, 3, 112, 112).to(device)
        if device == 'cuda': torch.cuda.synchronize()
        # perf_counter is a monotonic clock meant for measuring intervals.
        start = time.perf_counter()
        model.extract_features(sample)
        # Wait for the queued GPU work to finish before reading the clock.
        if device == 'cuda': torch.cuda.synchronize()
        latencies.append((time.perf_counter() - start) * 1000)

print(f"\nLatency: {np.mean(latencies):.2f} ms (avg), {np.percentile(latencies, 95):.2f} ms (p95)")

In [None]:
# Throughput Test: images per second of feature extraction at several batch sizes.
batch_sizes = [1, 16, 32, 64, 128]
throughputs = []
timed_iters = 20

# Gradients are not needed for inference; disabling them avoids autograd
# bookkeeping that would otherwise lower the measured throughput.
with torch.no_grad():
    for bs in batch_sizes:
        dummy = torch.randn(bs, 3, 112, 112).to(device)
        for _ in range(5): model.extract_features(dummy)  # warmup
        if device == 'cuda': torch.cuda.synchronize()
        # perf_counter is a monotonic clock meant for measuring intervals.
        start = time.perf_counter()
        for _ in range(timed_iters): model.extract_features(dummy)
        # Wait for the queued GPU work to finish before reading the clock.
        if device == 'cuda': torch.cuda.synchronize()
        elapsed = time.perf_counter() - start
        throughputs.append((bs * timed_iters) / elapsed)
        print(f"Batch {bs}: {throughputs[-1]:.1f} img/s")

plt.bar([str(b) for b in batch_sizes], throughputs)
plt.ylabel('Throughput (img/s)')
plt.savefig('/kaggle/working/throughput.png', dpi=150)
plt.show()

---
## 4. t-SNE Visualization

In [None]:
# Extract embeddings for at most `max_samples` eval images.
max_samples = 2000
emb_list, lbl_list = [], []
with torch.no_grad():
    for img, lbl in tqdm(eval_loader):
        # Count the samples actually collected instead of assuming a fixed
        # batch size, so this keeps working if the loader's batch size changes.
        if len(lbl_list) >= max_samples: break
        emb_list.append(model.extract_features(img.to(device)).cpu().numpy())
        lbl_list.extend(lbl.numpy())

embs = np.concatenate(emb_list)[:max_samples]
lbls = np.array(lbl_list)[:max_samples]

# Keep only the 50 smallest class ids present in the collected samples
# (np.unique returns the ids in sorted order).
uniq = np.unique(lbls)[:50]
mask = np.isin(lbls, uniq)
embs, lbls = embs[mask], lbls[mask]

# t-SNE requires perplexity < number of samples; cap it so that a small
# filtered subset does not make fit_transform raise.
perplexity = min(30, max(1, len(embs) - 1))
tsne = TSNE(n_components=2, perplexity=perplexity, random_state=42)
embs_2d = tsne.fit_transform(embs)

plt.figure(figsize=(12, 8))
plt.scatter(embs_2d[:, 0], embs_2d[:, 1], c=lbls, cmap='tab20', s=10, alpha=0.6)
plt.title('t-SNE Embedding')
plt.savefig('/kaggle/working/tsne.png', dpi=150)
plt.show()

---
## 5. Final Report

In [None]:
# Collect the headline numbers once, then reuse them for the JSON file and the printout.
avg_latency = np.mean(latencies)
best_throughput = max(throughputs)
trained_epochs = checkpoint.get('epoch', 0) + 1

# Values are cast to plain int/float so that json can serialise them.
model_section = {'num_classes': int(num_classes), 'epochs': int(trained_epochs)}
metrics_section = {
    'top1_accuracy': float(top1_acc),
    'top5_accuracy': float(top5_acc),
    'auc': float(roc_auc),
}
performance_section = {
    'avg_latency_ms': float(avg_latency),
    'max_throughput': float(best_throughput),
}
report = {
    'timestamp': datetime.now().isoformat(),
    'model': model_section,
    'metrics': metrics_section,
    'performance': performance_section,
}

with open('/kaggle/working/evaluation_report.json', 'w') as f:
    f.write(json.dumps(report, indent=2))

# Human-readable summary of the same numbers.
banner = "="*50
print("\n" + banner)
print("EVALUATION REPORT")
print(banner)
print(f"Top-1 Accuracy: {top1_acc:.2f}%")
print(f"Top-5 Accuracy: {top5_acc:.2f}%")
print(f"AUC Score: {roc_auc:.4f}")
print(f"Avg Latency: {avg_latency:.2f} ms")
print(f"Max Throughput: {best_throughput:.1f} img/s")

In [None]:
# List outputs: the PNG figures and JSON report files found in /kaggle/working.
!ls -la /kaggle/working/*.png /kaggle/working/*.json