In [3]:
pip install pandas

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.0.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [4]:
# System/perf settings
import multiprocessing as mp
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
# Environment & imports
import os, sys, json, random, math, time
from pathlib import Path
from collections import Counter, defaultdict

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from PIL import Image

# Cap CPU threads to avoid oversubscription
try:
    torch.set_num_threads(max(1, min(os.cpu_count() or 4, 8)))
except Exception:
    pass

print('CPU threads set to:', torch.get_num_threads())


CPU threads set to: 4


In [5]:


try:
    import torchvision
    from torchvision import transforms
except Exception as e:
    print("torchvision not found; attempting to continue with PIL-only transforms")
    torchvision = None
    transforms = None

ASSETS = Path('Assets')
assert ASSETS.exists(), f"Assets folder not found at {ASSETS.resolve()}"

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)


Using device: cpu


In [6]:
# Discover datasets
from typing import List, Tuple

IMG_EXTS = {'.png', '.jpg', '.jpeg', '.bmp', '.gif'}

def list_images_in_dir_of_dirs(root: Path) -> List[Tuple[Path, str]]:
    samples = []
    if not root.exists():
        return samples
    for class_dir in sorted([p for p in root.iterdir() if p.is_dir()]):
        label = class_dir.name
        for p in class_dir.rglob('*'):
            if p.suffix.lower() in IMG_EXTS and p.is_file():
                samples.append((p, label))
    return samples

# 1) augmented_images
aug_dir = ASSETS / 'augmented_images'
aug_samples = list_images_in_dir_of_dirs(aug_dir)
print(f"augmented_images: {len(aug_samples)} samples")

# 2) handwritten-english-characters-and-digits/train and test
hed_root = ASSETS / 'handwritten-english-characters-and-digits'
hed_train = list_images_in_dir_of_dirs(hed_root / 'train')
hed_test = list_images_in_dir_of_dirs(hed_root / 'test')
print(f"HED train: {len(hed_train)} samples, HED test: {len(hed_test)} samples")

# 3) image_labels.csv: filename,label
csv_path = ASSETS / 'image_labels.csv'
if not csv_path.exists():
    alt_csv_path = ASSETS / 'image_label.csv'  # support alternate name
    if alt_csv_path.exists():
        csv_path = alt_csv_path

csv_samples = []
if csv_path.exists():
    df = pd.read_csv(csv_path)
    assert {'filename','label'}.issubset(df.columns)
    # Index all images under Assets for filename lookup
    all_imgs = {p.name: p for p in ASSETS.rglob('*') if p.suffix.lower() in IMG_EXTS}
    missing = 0
    for _, row in df.iterrows():
        fname = str(row['filename'])
        label = str(row['label'])
        if fname in all_imgs:
            csv_samples.append((all_imgs[fname], label))
        else:
            # Try to search by suffix match if duplicates are unlikely
            matches = [p for n,p in all_imgs.items() if n.endswith(fname)]
            if len(matches) == 1:
                csv_samples.append((matches[0], label))
            else:
                missing += 1
    print(f"CSV samples resolved from {csv_path.name}: {len(csv_samples)} (missing: {missing})")
else:
    print("image_labels.csv not found; skipping CSV source")

# Merge all; if duplicates appear, prefer explicit CSV labels > hed > aug
# Use image absolute path as key
merged = {}
for p,l in aug_samples:
    merged[str(p.resolve())] = (p, l)
for p,l in hed_train + hed_test:
    merged[str(p.resolve())] = (p, l)
for p,l in csv_samples:
    merged[str(p.resolve())] = (p, l)

all_samples = list(merged.values())
print(f"Total unique samples: {len(all_samples)}")

# Map labels to indices
labels = sorted(sorted({l for _, l in all_samples}))
label_to_idx = {l:i for i,l in enumerate(labels)}
idx_to_label = {i:l for l,i in label_to_idx.items()}
print(f"Num classes: {len(labels)}")

# Class distribution
counts = Counter([l for _,l in all_samples])
print("Class distribution (top 20):", counts.most_common(20))


augmented_images: 13640 samples
HED train: 2728 samples, HED test: 682 samples
CSV samples resolved from image_labels.csv: 0 (missing: 13640)
Total unique samples: 17050
Num classes: 62
Class distribution (top 20): [('0', 275), ('1', 275), ('2', 275), ('3', 275), ('4', 275), ('5', 275), ('6', 275), ('7', 275), ('8', 275), ('9', 275), ('a', 275), ('A_caps', 275), ('b', 275), ('B_caps', 275), ('c', 275), ('C_caps', 275), ('d', 275), ('D_caps', 275), ('e', 275), ('E_caps', 275)]


In [7]:
# Train/Val/Test split (stratified)
from sklearn.model_selection import train_test_split

random_seed = 42
rng = np.random.RandomState(random_seed)

paths = np.array([str(p) for p,_ in all_samples])
labels_arr = np.array([label_to_idx[l] for _,l in all_samples])

# If HED has an explicit test set, we already included it. We'll still split overall
# into train/val/test=0.8/0.1/0.1 stratified.
X_temp, X_test, y_temp, y_test = train_test_split(
    paths, labels_arr, test_size=0.1, random_state=random_seed, stratify=labels_arr
)
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.1111, random_state=random_seed, stratify=y_temp
)  # 0.8 * 0.1111 ≈ 0.0889 so final ≈ 80/10/10

print(len(X_train), len(X_val), len(X_test))


13640 1705 1705


In [None]:
# (Removed) Baseline transforms and dataset — superseded by v2. Keeping small helper for compatibility.
IMG_SIZE = 64
basic_train_tfms = None
basic_val_tfms = None

class HandwritingDataset(Dataset):
    def __init__(self, paths: np.ndarray, labels: np.ndarray, transform=None):
        self.paths = paths
        self.labels = labels
        self.transform = transform
    def __len__(self):
        return len(self.paths)
    def __getitem__(self, idx):
        p = Path(self.paths[idx])
        y = int(self.labels[idx])
        with Image.open(p) as img:
            img = img.convert('L')
            if self.transform is not None:
                x = self.transform(img)
            else:
                # minimal fallback
                img = img.resize((IMG_SIZE, IMG_SIZE))
                arr = np.array(img, dtype=np.float32)/255.0
                arr = arr[None, ...]
                x = torch.from_numpy(arr)
        return x, y


In [None]:
# (Removed) Baseline dataloaders — use v2 or v2-fast loaders below
'Use cells 17–19 for training/eval loaders.'


(13640, 1705, 1705)

In [None]:
# (Removed) SmallCNN baseline — using pretrained ResNet18 (v2)
'Use cells 11–19 for model, training and evaluation.'


(2206462,
 SmallCNN(
   (features): Sequential(
     (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
     (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
     (2): ReLU(inplace=True)
     (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
     (4): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
     (5): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
     (6): ReLU(inplace=True)
     (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
     (8): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
     (9): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
     (10): ReLU(inplace=True)
     (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
   )
   (classifier): Sequential(
     (0): Dropout(p=0.3, inplace=False)
     (1): Linear(in_features=8192, 

In [None]:
# (Removed) Baseline training utilities — replaced by v2 training below
'Use cells 18–19 (or 15–16) for training and evaluation.'




Epoch 01/15 - loss: 4.1028 - val_acc: 0.0575
Saved new best model to artifacts\handwriting_cnn.pt (val_acc=0.0575)




Epoch 02/15 - loss: 3.6793 - val_acc: 0.1959
Saved new best model to artifacts\handwriting_cnn.pt (val_acc=0.1959)




Epoch 03/15 - loss: 3.0769 - val_acc: 0.3619
Saved new best model to artifacts\handwriting_cnn.pt (val_acc=0.3619)




Epoch 04/15 - loss: 2.7654 - val_acc: 0.4094
Saved new best model to artifacts\handwriting_cnn.pt (val_acc=0.4094)




Epoch 05/15 - loss: 2.5328 - val_acc: 0.4522
Saved new best model to artifacts\handwriting_cnn.pt (val_acc=0.4522)




Epoch 06/15 - loss: 2.3462 - val_acc: 0.5196
Saved new best model to artifacts\handwriting_cnn.pt (val_acc=0.5196)




Epoch 07/15 - loss: 2.2319 - val_acc: 0.5724
Saved new best model to artifacts\handwriting_cnn.pt (val_acc=0.5724)




Epoch 08/15 - loss: 2.1515 - val_acc: 0.5736
Saved new best model to artifacts\handwriting_cnn.pt (val_acc=0.5736)




Epoch 09/15 - loss: 2.0736 - val_acc: 0.6240
Saved new best model to artifacts\handwriting_cnn.pt (val_acc=0.6240)




Epoch 10/15 - loss: 2.0110 - val_acc: 0.6416
Saved new best model to artifacts\handwriting_cnn.pt (val_acc=0.6416)




Epoch 11/15 - loss: 1.9877 - val_acc: 0.5789




Epoch 12/15 - loss: 1.9472 - val_acc: 0.5320




KeyboardInterrupt: 

In [None]:
# (Removed) Baseline evaluation — use v2/v2-fast evaluation cells (16 or 19)
'Use the v2 model: artifacts_v2/handwriting_resnet18_best.pt'


Test accuracy: 0.0416
Classification report:
              precision    recall  f1-score   support

           0       0.00      0.00      0.00        27
           1       0.00      0.00      0.00        28
           2       0.00      0.00      0.00        28
           3       0.00      0.00      0.00        27
           4       0.00      0.00      0.00        27
           5       0.00      0.00      0.00        27
           6       0.00      0.00      0.00        27
           7       0.00      0.00      0.00        28
           8       0.03      1.00      0.05        27
           9       0.00      0.00      0.00        27
      A_caps       0.00      0.00      0.00        28
      B_caps       0.00      0.00      0.00        28
      C_caps       0.00      0.00      0.00        27
      D_caps       0.00      0.00      0.00        27
      E_caps       0.00      0.00      0.00        28
      F_caps       0.00      0.00      0.00        27
      G_caps       0.00      0.00   

  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])


Saved predictions to artifacts\test_predictions.csv


In [None]:
# Improved configuration (v2)
IMG_SIZE_V2 = 128
CHANNELS_V2 = 3  # use 3-channel to leverage pretrained backbones
EPOCHS_V2 = 30
BATCH_SIZE_V2 = 128
WEIGHT_DECAY_V2 = 1e-4
LABEL_SMOOTHING_V2 = 0.1
RANDOM_SEED_V2 = 42
USE_PRETRAINED_V2 = True  # will fallback automatically if not available

rng_v2 = np.random.RandomState(RANDOM_SEED_V2)

# Create artifacts dir for v2
save_dir_v2 = Path('artifacts_v2')
save_dir_v2.mkdir(exist_ok=True)
model_path_v2 = save_dir_v2 / 'handwriting_resnet18_best.pt'
labels_path_v2 = save_dir_v2 / 'labels.json'
with open(labels_path_v2, 'w') as f:
    json.dump(idx_to_label, f, indent=2)


In [None]:
# Model v2: ResNet18 backbone with pretrained fallback
from typing import Optional

USE_IMAGENET_NORM_V2 = False

def build_resnet18_head(num_classes: int, pretrained: bool = True) -> nn.Module:
    global USE_IMAGENET_NORM_V2
    if torchvision is None:
        raise RuntimeError('torchvision not available')
    try:
        try:
            # torchvision >=0.13
            from torchvision.models import resnet18, ResNet18_Weights
            weights = ResNet18_Weights.IMAGENET1K_V1 if pretrained else None
            model = resnet18(weights=weights)
            USE_IMAGENET_NORM_V2 = pretrained
        except Exception:
            # older API
            from torchvision.models import resnet18
            model = resnet18(pretrained=pretrained)
            USE_IMAGENET_NORM_V2 = pretrained
    except Exception as e:
        print('Pretrained resnet18 unavailable, falling back to randomly initialized.')
        from torchvision.models import resnet18
        model = resnet18(pretrained=False)
        USE_IMAGENET_NORM_V2 = False
    # Adjust first layer to accept 3-channel grayscale (we feed 3-channel grayscale, so no change needed)
    in_features = model.fc.in_features
    model.fc = nn.Linear(in_features, num_classes)
    return model

try:
    model_v2 = build_resnet18_head(len(labels), pretrained=USE_PRETRAINED_V2).to(device)
    print('Using ResNet18 backbone for v2. Imagenet norm:', USE_IMAGENET_NORM_V2)
except Exception as e:
    print('Falling back to SmallCNN due to error:', e)
    CHANNELS_V2 = 1
    class DeeperCNN(nn.Module):
        def __init__(self, num_classes: int):
            super().__init__()
            self.features = nn.Sequential(
                nn.Conv2d(1, 64, 3, padding=1), nn.BatchNorm2d(64), nn.ReLU(),
                nn.Conv2d(64, 64, 3, padding=1), nn.BatchNorm2d(64), nn.ReLU(),
                nn.MaxPool2d(2),
                nn.Conv2d(64, 128, 3, padding=1), nn.BatchNorm2d(128), nn.ReLU(),
                nn.Conv2d(128, 128, 3, padding=1), nn.BatchNorm2d(128), nn.ReLU(),
                nn.MaxPool2d(2),
                nn.Conv2d(128, 256, 3, padding=1), nn.BatchNorm2d(256), nn.ReLU(),
                nn.MaxPool2d(2),
            )
            fsize = IMG_SIZE_V2 // 8
            self.classifier = nn.Sequential(
                nn.Dropout(0.4), nn.Linear(256*fsize*fsize, 512), nn.ReLU(),
                nn.Dropout(0.4), nn.Linear(512, num_classes)
            )
        def forward(self, x):
            x = self.features(x)
            x = torch.flatten(x, 1)
            return self.classifier(x)
    model_v2 = DeeperCNN(len(labels)).to(device)
    USE_IMAGENET_NORM_V2 = False

sum(p.numel() for p in model_v2.parameters() if p.requires_grad), model_v2.__class__.__name__


Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to C:\Users\royha/.cache\torch\hub\checkpoints\resnet18-f37072fd.pth


100.0%


Using ResNet18 backbone for v2. Imagenet norm: True


(11208318, 'ResNet')

In [None]:
# Transforms v2 (with optional dataset normalization)
from typing import Tuple

def compute_mean_std(paths_subset: np.ndarray, channels: int, sample_size: int = 1024) -> Tuple[list, list]:
    sample_paths = paths_subset.copy()
    if len(sample_paths) > sample_size:
        sample_paths = rng_v2.choice(sample_paths, size=sample_size, replace=False)
    means = np.zeros(channels, dtype=np.float64)
    sq_means = np.zeros(channels, dtype=np.float64)
    n_pix_total = 0
    for p in sample_paths:
        with Image.open(p) as img:
            img = img.convert('L')  # grayscale base
            img = img.resize((IMG_SIZE_V2, IMG_SIZE_V2))
            arr = np.array(img, dtype=np.float32) / 255.0
            if channels == 1:
                arr_c = arr[None, ...]
            else:
                arr_c = np.stack([arr, arr, arr], axis=0)
            n_pix = arr_c.shape[1] * arr_c.shape[2]
            means += arr_c.reshape(channels, -1).sum(axis=1)
            sq_means += (arr_c.reshape(channels, -1) ** 2).sum(axis=1)
            n_pix_total += n_pix
    means /= n_pix_total
    stds = np.sqrt(sq_means / n_pix_total - means**2)
    return means.tolist(), stds.tolist()

if transforms is None:
    raise RuntimeError('torchvision is required for v2 pipeline transforms')

if USE_IMAGENET_NORM_V2:
    # ImageNet mean/std
    mean_v2 = [0.485, 0.456, 0.406]
    std_v2 = [0.229, 0.224, 0.225]
else:
    mean_v2, std_v2 = compute_mean_std(X_train, CHANNELS_V2, sample_size=1024)

train_tfms_v2 = transforms.Compose([
    transforms.Grayscale(num_output_channels=CHANNELS_V2),
    transforms.Resize(int(IMG_SIZE_V2*1.1)),
    transforms.RandomCrop(IMG_SIZE_V2),
    transforms.RandomRotation(8, fill=0),
    transforms.RandomAffine(degrees=0, shear=8, translate=(0.05,0.05)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.RandomPerspective(distortion_scale=0.2, p=0.3),
    transforms.ToTensor(),
    transforms.Normalize(mean_v2, std_v2),
])

val_tfms_v2 = transforms.Compose([
    transforms.Grayscale(num_output_channels=CHANNELS_V2),
    transforms.Resize((IMG_SIZE_V2, IMG_SIZE_V2)),
    transforms.ToTensor(),
    transforms.Normalize(mean_v2, std_v2),
])


In [None]:
# Dataset & DataLoaders v2 with class balancing
from torch.utils.data import WeightedRandomSampler

class HandwritingDatasetV2(Dataset):
    def __init__(self, paths: np.ndarray, labels: np.ndarray, transform=None):
        self.paths = paths
        self.labels = labels
        self.transform = transform
    def __len__(self):
        return len(self.paths)
    def __getitem__(self, idx):
        p = Path(self.paths[idx])
        y = int(self.labels[idx])
        with Image.open(p) as img:
            img = img.convert('L')
            x = (self.transform or val_tfms_v2)(img)
        return x, y

train_ds_v2 = HandwritingDatasetV2(X_train, y_train, transform=train_tfms_v2)
val_ds_v2   = HandwritingDatasetV2(X_val, y_val, transform=val_tfms_v2)
test_ds_v2  = HandwritingDatasetV2(X_test, y_test, transform=val_tfms_v2)

# Class weights for sampler and loss
class_counts = Counter(y_train.tolist())
num_classes = len(labels)
class_freq = np.array([class_counts.get(i, 0) for i in range(num_classes)], dtype=np.float64)
class_weights = 1.0 / np.maximum(class_freq, 1.0)
class_weights = class_weights / class_weights.sum() * num_classes

# Sample weight per instance
sample_weights = np.array([class_weights[y] for y in y_train], dtype=np.float64)
sampler = WeightedRandomSampler(weights=torch.DoubleTensor(sample_weights), num_samples=len(sample_weights), replacement=True)

num_workers_v2 = 0 if os.name == 'nt' else 2
train_loader_v2 = DataLoader(train_ds_v2, batch_size=BATCH_SIZE_V2, sampler=sampler, num_workers=num_workers_v2, pin_memory=True)
val_loader_v2   = DataLoader(val_ds_v2, batch_size=BATCH_SIZE_V2, shuffle=False, num_workers=num_workers_v2)
test_loader_v2  = DataLoader(test_ds_v2, batch_size=BATCH_SIZE_V2, shuffle=False, num_workers=num_workers_v2)

len(train_ds_v2), len(val_ds_v2), len(test_ds_v2), class_counts.most_common(10)


(13640,
 1705,
 1705,
 [(34, 220),
  (42, 220),
  (52, 220),
  (53, 220),
  (37, 220),
  (47, 220),
  (50, 220),
  (17, 220),
  (55, 220),
  (51, 220)])

: 

In [None]:
# Training loop v2 with AMP, label smoothing, AdamW, OneCycleLR, early stopping on macro F1
from sklearn.metrics import accuracy_score, f1_score
from torch.cuda.amp import autocast, GradScaler

criterion_v2 = nn.CrossEntropyLoss(weight=torch.tensor(class_weights, dtype=torch.float32, device=device),
                                   label_smoothing=LABEL_SMOOTHING_V2)
optimizer_v2 = torch.optim.AdamW(model_v2.parameters(), lr=3e-3, weight_decay=WEIGHT_DECAY_V2)
steps_per_epoch = max(1, math.ceil(len(train_ds_v2) / BATCH_SIZE_V2))
scheduler_v2 = torch.optim.lr_scheduler.OneCycleLR(optimizer_v2, max_lr=3e-3, steps_per_epoch=steps_per_epoch,
                                                   epochs=EPOCHS_V2, pct_start=0.2, div_factor=10.0, final_div_factor=10.0)

scaler = GradScaler(enabled=(device.type=='cuda'))

best_val_f1 = 0.0
patience = 6
pat = 0

history = {'epoch': [], 'train_loss': [], 'val_acc': [], 'val_f1': []}

for epoch in range(1, EPOCHS_V2+1):
    model_v2.train()
    running_loss = 0.0
    b = 0
    for xb, yb in train_loader_v2:
        xb = xb.to(device, non_blocking=True)
        yb = yb.to(device, non_blocking=True)
        optimizer_v2.zero_grad(set_to_none=True)
        with autocast(enabled=(device.type=='cuda')):
            logits = model_v2(xb)
            loss = criterion_v2(logits, yb)
        scaler.scale(loss).backward()
        scaler.step(optimizer_v2)
        scaler.update()
        scheduler_v2.step()
        running_loss += loss.item()
        b += 1
    train_loss = running_loss / max(1, b)

    # Validate
    model_v2.eval()
    ys, ys_pred = [], []
    with torch.no_grad():
        for xb, yb in val_loader_v2:
            xb = xb.to(device)
            yb = yb.to(device)
            with autocast(enabled=(device.type=='cuda')):
                logits = model_v2(xb)
            preds = logits.argmax(dim=1)
            ys.extend(yb.cpu().numpy().tolist())
            ys_pred.extend(preds.cpu().numpy().tolist())
    val_acc = accuracy_score(ys, ys_pred)
    val_f1 = f1_score(ys, ys_pred, average='macro')
    history['epoch'].append(epoch)
    history['train_loss'].append(train_loss)
    history['val_acc'].append(val_acc)
    history['val_f1'].append(val_f1)

    print(f"[v2] Epoch {epoch:02d}/{EPOCHS_V2} - loss: {train_loss:.4f} - val_acc: {val_acc:.4f} - val_macro_f1: {val_f1:.4f}")

    if val_f1 > best_val_f1:
        best_val_f1 = val_f1
        torch.save({'model_state': model_v2.state_dict(),
                    'config': {'img_size': IMG_SIZE_V2, 'num_classes': len(labels), 'channels': CHANNELS_V2,
                               'mean': mean_v2, 'std': std_v2, 'backbone': model_v2.__class__.__name__}},
                   model_path_v2)
        print(f"Saved best v2 model (val_macro_f1={best_val_f1:.4f}) -> {model_path_v2}")
        pat = 0
    else:
        pat += 1
        if pat >= patience:
            print('Early stopping triggered')
            break

json.dump(history, open(save_dir_v2 / 'train_history.json', 'w'), indent=2)
print('Best v2 val macro F1:', best_val_f1)


  scaler = GradScaler(enabled=(device.type=='cuda'))
  with autocast(enabled=(device.type=='cuda')):
  with autocast(enabled=(device.type=='cuda')):


[v2] Epoch 01/30 - loss: 1.8219 - val_acc: 0.8117 - val_macro_f1: 0.8068
Saved best v2 model (val_macro_f1=0.8068) -> artifacts_v2\handwriting_resnet18_best.pt


  with autocast(enabled=(device.type=='cuda')):


In [None]:
# Test evaluation v2
from sklearn.metrics import classification_report, confusion_matrix

if model_path_v2.exists():
    ckpt = torch.load(model_path_v2, map_location=device)
    model_v2.load_state_dict(ckpt['model_state'])

model_v2.eval()
ys, ys_pred = [], []
with torch.no_grad():
    for xb, yb in test_loader_v2:
        xb = xb.to(device)
        yb = yb.to(device)
        logits = model_v2(xb)
        preds = logits.argmax(dim=1)
        ys.extend(yb.cpu().numpy().tolist())
        ys_pred.extend(preds.cpu().numpy().tolist())

test_acc_v2 = accuracy_score(ys, ys_pred)
test_f1_v2 = f1_score(ys, ys_pred, average='macro')
print(f"[v2] Test accuracy: {test_acc_v2:.4f}, macro F1: {test_f1_v2:.4f}")

report_v2 = classification_report(ys, ys_pred, target_names=[idx_to_label[i] for i in range(len(labels))])
print(report_v2)

cm_v2 = confusion_matrix(ys, ys_pred)
print('Confusion matrix shape:', cm_v2.shape)

pd.DataFrame({
    'path': list(X_test),
    'true_label': [idx_to_label[int(i)] for i in ys],
    'pred_label': [idx_to_label[int(i)] for i in ys_pred],
}).to_csv(save_dir_v2 / 'test_predictions_v2.csv', index=False)

print('Saved v2 predictions to', save_dir_v2 / 'test_predictions_v2.csv')


In [None]:
# Fast(er) loaders for CPU, and lighter transforms if no CUDA
from tqdm.auto import tqdm

pin_memory_v2 = (device.type == 'cuda')
num_workers_v2 = 0 if os.name == 'nt' else max(1, min(4, (os.cpu_count() or 2) - 1))

if transforms is None:
    raise RuntimeError('torchvision is required for v2 fast pipeline')

if device.type == 'cuda':
    train_tfms_v2_fast = train_tfms_v2
    val_tfms_v2_fast = val_tfms_v2
else:
    # Lighter CPU transforms
    train_tfms_v2_fast = transforms.Compose([
        transforms.Grayscale(num_output_channels=CHANNELS_V2),
        transforms.Resize(int(IMG_SIZE_V2*1.05)),
        transforms.RandomCrop(IMG_SIZE_V2),
        transforms.RandomRotation(6, fill=0),
        transforms.RandomAffine(degrees=0, shear=6, translate=(0.03,0.03)),
        transforms.ToTensor(),
        transforms.Normalize(mean_v2, std_v2),
    ])
    val_tfms_v2_fast = transforms.Compose([
        transforms.Grayscale(num_output_channels=CHANNELS_V2),
        transforms.Resize((IMG_SIZE_V2, IMG_SIZE_V2)),
        transforms.ToTensor(),
        transforms.Normalize(mean_v2, std_v2),
    ])

train_ds_v2_fast = HandwritingDatasetV2(X_train, y_train, transform=train_tfms_v2_fast)
val_ds_v2_fast   = HandwritingDatasetV2(X_val, y_val, transform=val_tfms_v2_fast)
test_ds_v2_fast  = HandwritingDatasetV2(X_test, y_test, transform=val_tfms_v2_fast)

# Rebuild sampler to match dataset length
class_counts = Counter(y_train.tolist())
num_classes = len(labels)
class_freq = np.array([class_counts.get(i, 0) for i in range(num_classes)], dtype=np.float64)
class_weights = 1.0 / np.maximum(class_freq, 1.0)
class_weights = class_weights / class_weights.sum() * num_classes
sample_weights = np.array([class_weights[y] for y in y_train], dtype=np.float64)
sampler_fast = WeightedRandomSampler(weights=torch.DoubleTensor(sample_weights), num_samples=len(sample_weights), replacement=True)

train_loader_v2_fast = DataLoader(train_ds_v2_fast, batch_size=BATCH_SIZE_V2, sampler=sampler_fast, num_workers=num_workers_v2, pin_memory=pin_memory_v2)
val_loader_v2_fast   = DataLoader(val_ds_v2_fast, batch_size=BATCH_SIZE_V2, shuffle=False, num_workers=num_workers_v2, pin_memory=pin_memory_v2)
test_loader_v2_fast  = DataLoader(test_ds_v2_fast, batch_size=BATCH_SIZE_V2, shuffle=False, num_workers=num_workers_v2, pin_memory=pin_memory_v2)

len(train_ds_v2_fast), len(val_ds_v2_fast), len(test_ds_v2_fast), num_workers_v2, pin_memory_v2


In [None]:
# v2 training (fast) with head-only warmup then partial unfreeze, tqdm, new torch.amp API
from sklearn.metrics import accuracy_score, f1_score

# Update AMP API per warning
use_amp = (device.type == 'cuda')
scaler = torch.amp.GradScaler(device.type) if use_amp else None

# Rebuild optimizer & scheduler
for p in model_v2.parameters():
    p.requires_grad = True

# Head-only warmup: train final layer(s) first
head_params = list(model_v2.fc.parameters()) if hasattr(model_v2, 'fc') else list(model_v2.classifier.parameters())
backbone_params = [p for p in model_v2.parameters() if p not in head_params]
for p in backbone_params:
    p.requires_grad = False

optimizer_v2 = torch.optim.AdamW(filter(lambda p: p.requires_grad, model_v2.parameters()), lr=3e-3, weight_decay=WEIGHT_DECAY_V2)
steps_per_epoch = max(1, math.ceil(len(train_ds_v2_fast) / BATCH_SIZE_V2))
scheduler_v2 = torch.optim.lr_scheduler.OneCycleLR(optimizer_v2, max_lr=3e-3, steps_per_epoch=steps_per_epoch,
                                                   epochs=EPOCHS_V2, pct_start=0.2, div_factor=10.0, final_div_factor=10.0)

criterion_v2 = nn.CrossEntropyLoss(weight=torch.tensor(class_weights, dtype=torch.float32, device=device),
                                   label_smoothing=LABEL_SMOOTHING_V2)

best_val_f1 = 0.0
patience = 5
pat = 0

unfrozen = False

for epoch in range(1, EPOCHS_V2+1):
    model_v2.train()
    running_loss = 0.0
    pbar = tqdm(train_loader_v2_fast, desc=f"[v2-fast] Epoch {epoch}/{EPOCHS_V2}")
    for xb, yb in pbar:
        xb = xb.to(device)
        yb = yb.to(device)
        optimizer_v2.zero_grad(set_to_none=True)
        if use_amp:
            with torch.amp.autocast(device_type=device.type, enabled=True):
                logits = model_v2(xb)
                loss = criterion_v2(logits, yb)
            scaler.scale(loss).backward()
            scaler.step(optimizer_v2)
            scaler.update()
        else:
            logits = model_v2(xb)
            loss = criterion_v2(logits, yb)
            loss.backward()
            optimizer_v2.step()
        scheduler_v2.step()
        running_loss += loss.item()
        pbar.set_postfix({'loss': f"{running_loss / (pbar.n or 1):.4f}"})
    train_loss = running_loss / max(1, len(train_loader_v2_fast))

    # Validate
    model_v2.eval()
    ys, ys_pred = [], []
    with torch.no_grad():
        for xb, yb in val_loader_v2_fast:
            xb = xb.to(device)
            yb = yb.to(device)
            if use_amp:
                with torch.amp.autocast(device_type=device.type, enabled=True):
                    logits = model_v2(xb)
            else:
                logits = model_v2(xb)
            preds = logits.argmax(dim=1)
            ys.extend(yb.cpu().numpy().tolist())
            ys_pred.extend(preds.cpu().numpy().tolist())
    val_acc = accuracy_score(ys, ys_pred)
    val_f1 = f1_score(ys, ys_pred, average='macro')
    print(f"[v2-fast] Epoch {epoch:02d}/{EPOCHS_V2} - loss: {train_loss:.4f} - val_acc: {val_acc:.4f} - val_macro_f1: {val_f1:.4f}")

    # Unfreeze backbone after 3 epochs
    if (not unfrozen) and epoch >= 3:
        for p in backbone_params:
            p.requires_grad = True
        optimizer_v2 = torch.optim.AdamW(model_v2.parameters(), lr=1e-3, weight_decay=WEIGHT_DECAY_V2)
        scheduler_v2 = torch.optim.lr_scheduler.OneCycleLR(optimizer_v2, max_lr=1e-3, steps_per_epoch=steps_per_epoch,
                                                           epochs=max(5, EPOCHS_V2-epoch), pct_start=0.2, div_factor=10.0, final_div_factor=10.0)
        unfrozen = True
        print('Backbone unfrozen and optimizer reset with lower LR')

    if val_f1 > best_val_f1:
        best_val_f1 = val_f1
        torch.save({'model_state': model_v2.state_dict(),
                    'config': {'img_size': IMG_SIZE_V2, 'num_classes': len(labels), 'channels': CHANNELS_V2,
                               'mean': mean_v2, 'std': std_v2, 'backbone': model_v2.__class__.__name__}},
                   model_path_v2)
        print(f"Saved best v2 model (val_macro_f1={best_val_f1:.4f}) -> {model_path_v2}")
        pat = 0
    else:
        pat += 1
        if pat >= patience:
            print('Early stopping triggered')
            break


In [None]:
# v2-fast test evaluation
if model_path_v2.exists():
    ckpt = torch.load(model_path_v2, map_location=device)
    model_v2.load_state_dict(ckpt['model_state'])

model_v2.eval()
ys, ys_pred = [], []
with torch.no_grad():
    for xb, yb in test_loader_v2_fast:
        xb = xb.to(device)
        yb = yb.to(device)
        logits = model_v2(xb)
        preds = logits.argmax(dim=1)
        ys.extend(yb.cpu().numpy().tolist())
        ys_pred.extend(preds.cpu().numpy().tolist())

test_acc_v2 = accuracy_score(ys, ys_pred)
test_f1_v2 = f1_score(ys, ys_pred, average='macro')
print(f"[v2-fast] Test accuracy: {test_acc_v2:.4f}, macro F1: {test_f1_v2:.4f}")

pd.DataFrame({
    'path': list(X_test),
    'true_label': [idx_to_label[int(i)] for i in ys],
    'pred_label': [idx_to_label[int(i)] for i in ys_pred],
}).to_csv(save_dir_v2 / 'test_predictions_v2_fast.csv', index=False)

print('Saved v2-fast predictions to', save_dir_v2 / 'test_predictions_v2_fast.csv')


In [None]:
# Inference helper v2
class InferenceModelV2:
    def __init__(self, model_path: Path, labels_path: Path):
        with open(labels_path, 'r') as f:
            self.idx_to_label = {int(k):v for k,v in json.load(f).items()}
        ckpt = torch.load(model_path, map_location=device)
        cfg = ckpt.get('config', {})
        self.img_size = cfg.get('img_size', IMG_SIZE_V2)
        self.channels = cfg.get('channels', CHANNELS_V2)
        self.mean = cfg.get('mean', [0.5]*self.channels)
        self.std = cfg.get('std', [0.5]*self.channels)
        # Build model
        try:
            self.model = build_resnet18_head(len(self.idx_to_label), pretrained=False).to(device)
        except Exception:
            self.model = model_v2.__class__(len(self.idx_to_label)).to(device)
        self.model.load_state_dict(ckpt['model_state'])
        self.model.eval()
        # Transforms
        self.tfms = transforms.Compose([
            transforms.Grayscale(num_output_channels=self.channels),
            transforms.Resize((self.img_size, self.img_size)),
            transforms.ToTensor(),
            transforms.Normalize(self.mean, self.std),
        ])
    @torch.no_grad()
    def predict(self, image_path: Path):
        with Image.open(image_path) as img:
            img = img.convert('L')
            x = self.tfms(img).unsqueeze(0).to(device)
        logits = self.model(x)
        pred = logits.argmax(dim=1).item()
        return self.idx_to_label[pred]

# Example:
# infer_v2 = InferenceModelV2(model_path_v2, labels_path_v2)
# infer_v2.predict(Path('Assets/...'))


In [None]:
# (Removed) Baseline inference — use InferenceModelV2 with artifacts_v2
'Use InferenceModelV2(model_path_v2, labels_path_v2)'
