1. Mount Google Drive

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive so the
# project folder under MyDrive can be accessed through ordinary file paths.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


2. Define project paths

In [None]:
import os

# Root of the project on the mounted Drive; every other path hangs off it.
BASE_DIR = '/content/drive/MyDrive/CSAFE_Handwriting'
DATA_DIR = BASE_DIR + '/data/writers'
SPLITS_DIR = BASE_DIR + '/splits'
CKPT_DIR = BASE_DIR + '/checkpoints'
SRC_DIR = BASE_DIR + '/src'

# Report, for each expected folder, whether it is present on the mounted Drive.
for path in (DATA_DIR, SPLITS_DIR, SRC_DIR, CKPT_DIR):
    status = 'correct' if os.path.exists(path) else 'wrong'
    print(path, status)


/content/drive/MyDrive/CSAFE_Handwriting/data/writers correct
/content/drive/MyDrive/CSAFE_Handwriting/splits correct
/content/drive/MyDrive/CSAFE_Handwriting/src correct
/content/drive/MyDrive/CSAFE_Handwriting/checkpoints correct


3. Copy data to fast disk (/content)

Reading many small PNG files directly from Drive is slow, so the dataset is first copied to the local Colab disk under /content.


In [None]:
!mkdir -p /content/data/writers
!mkdir -p /content/splits

In [None]:
!rsync -a --info=progress2 "$DATA_DIR/" "/content/data/writers/"
!rsync -a "$SPLITS_DIR/" "/content/splits/"

  4,147,064,358  92%  597.43kB/s    1:52:58 (xfr#11261, ir-chk=1051/12788)

In [None]:
import os

# Sanity-check the local copy: how many writer folders arrived, and which split files.
writer_entries = os.listdir("/content/data/writers")
print("Writer directories:", len(writer_entries))
split_entries = os.listdir("/content/splits")
print("Split files:", split_entries)

In [None]:
import os

# Drive folder holding the training scripts (same value the path-setup cell assigns).
SRC_DIR = "/content/drive/MyDrive/CSAFE_Handwriting/src"
print("SRC_DIR:", SRC_DIR)

# Print every entry of the folder on its own line.
files = os.listdir(SRC_DIR)
for entry in files:
    print(f" - {entry}")


SRC_DIR: /content/drive/MyDrive/CSAFE_Handwriting/src
 - train_baseline.py


In [None]:
import os, re, json, unicodedata
from pathlib import Path

# Local (fast-disk) locations of the copied dataset and split files.
WRITERS_ROOT = Path("/content/data/writers")
# NOTE(review): this rebinds SPLITS_DIR, which the path-setup cell defined as the
# Drive folder (a str). Cells executed after this one see the local Path instead.
SPLITS_DIR   = Path("/content/splits")

# helper
def norm(s):
    """Return ``s`` NFKC-normalized with every whitespace character removed.

    Leading/trailing whitespace is stripped and all interior whitespace runs
    (spaces, tabs, newlines, ...) are deleted, not merely collapsed.
    """
    folded = unicodedata.normalize("NFKC", s).strip()
    return re.sub(r"\s+", "", folded)

# Gather every sub-directory of the writers root.
dirs = [entry for entry in os.listdir(WRITERS_ROOT) if (WRITERS_ROOT / entry).is_dir()]
print("Total dirs under writers:", len(dirs))

# A folder is "suspect" when its normalized name is not of the form w<4+ digits>
# (e.g., w0001). An empty normalized name fails the pattern as well.
valid_pat = re.compile(r"^w\d{4,}$")
suspect = []
for raw_name in dirs:
    cleaned = norm(raw_name)
    if valid_pat.match(cleaned):
        continue
    suspect.append((raw_name, cleaned, len(raw_name)))

# repr() makes hidden whitespace/newlines in the raw names visible.
print("\nSUSPECT folders (showing up to 20):")
for idx, (raw_name, cleaned, name_len) in enumerate(suspect[:20], 1):
    print(f"{idx:02d}) raw={repr(raw_name)} | normalized={repr(cleaned)} | len={name_len}")

n_valid = len(dirs) - len(suspect)
print("\nCounts -> valid:", n_valid, "| suspect:", len(suspect))

# show split IDs and which won't match disk after normalization
def load_ids(name):
    """Return the decoded JSON content of ``SPLITS_DIR/<name>.json`` (read as UTF-8)."""
    split_file = SPLITS_DIR / f"{name}.json"
    return json.loads(split_file.read_text(encoding="utf-8"))

# Load the three split files and pool their IDs, raw and normalized.
train_ids, val_ids, test_ids = (load_ids(part) for part in ("train", "val", "test"))

all_ids_raw = train_ids + val_ids + test_ids
all_ids_norm = list(map(norm, all_ids_raw))

# Normalized folder names that look like writer IDs vs. normalized IDs from the splits.
dir_set_norm = {cleaned for cleaned in map(norm, dirs) if valid_pat.match(cleaned)}
split_set_norm = set(all_ids_norm)

# IDs referenced by a split but absent on disk, and folders no split refers to.
missing_on_disk = sorted(split_set_norm.difference(dir_set_norm))
extra_on_disk = sorted(dir_set_norm.difference(split_set_norm))

print("\nSplit sizes -> train/val/test:", len(train_ids), len(val_ids), len(test_ids))
print("Unique split IDs (normalized):", len(split_set_norm))
print("Missing on disk (first 20):", missing_on_disk[:20])
print("Extra on disk (first 20):", extra_on_disk[:20])


Total dirs under writers: 475

SUSPECT folders (showing up to 20):

Counts -> valid: 475 | suspect: 0

Split sizes -> train/val/test: 1 1 1
Unique split IDs (normalized): 1
Missing on disk (first 20): ['']
Extra on disk (first 20): ['w0001', 'w0002', 'w0003', 'w0004', 'w0005', 'w0006', 'w0009', 'w0010', 'w0011', 'w0012', 'w0013', 'w0015', 'w0016', 'w0017', 'w0018', 'w0020', 'w0022', 'w0023', 'w0024', 'w0025']


In [None]:
import os, re, json, random, shutil
from pathlib import Path

import filecmp

# Paths
WRITERS_ROOT = Path("/content/data/writers")
LOCAL_SPLITS = Path("/content/splits")
DRIVE_SPLITS = Path("/content/drive/MyDrive/CSAFE_Handwriting/splits")

LOCAL_SPLITS.mkdir(parents=True, exist_ok=True)
DRIVE_SPLITS.mkdir(parents=True, exist_ok=True)


def _backup_split(src):
    """Back up ``src`` next to itself without ever overwriting an earlier backup.

    The first backup is written to ``<name>.bak``; if that path is taken, the first
    free ``<name>.bak.N`` (N = 1, 2, ...) is used instead. Nothing is written when an
    existing backup already has identical content, so re-running the cell does not
    pile up duplicates.

    Returns the path of the new backup, or None if no copy was needed.
    """
    candidate = src.with_name(f"{src.name}.bak")
    counter = 0
    while candidate.exists():
        if filecmp.cmp(src, candidate, shallow=False):
            return None  # this exact content is already preserved
        counter += 1
        candidate = src.with_name(f"{src.name}.bak.{counter}")
    shutil.copy2(src, candidate)
    return candidate


# Backup any existing splits in Drive. Overwriting a single "<name>.bak" on every run
# would replace the original backup with the previous run's generated file.
for name in ["train.json", "val.json", "test.json"]:
    src = DRIVE_SPLITS / name
    if src.exists():
        _backup_split(src)

# Writer IDs are the names of sub-directories matching w<4+ digits>, in sorted order.
valid_pat = re.compile(r"^w\d{4,}$")
writers = sorted(
    entry.name
    for entry in WRITERS_ROOT.iterdir()
    if entry.is_dir() and valid_pat.match(entry.name)
)

print("Detected writers:", len(writers))
assert len(writers) > 3, "Not enough writers found to split."

# Seed the global RNG so the shuffle yields the same order on every run.
random.seed(42)
random.shuffle(writers)

# Target proportions; train and val sizes are truncated to whole writers and the
# test split takes whatever remains.
train_ratio, val_ratio, test_ratio = 0.70, 0.15, 0.15
N = len(writers)
n_train = int(N * train_ratio)
n_val   = int(N * val_ratio)
n_test  = N - n_train - n_val

# Consecutive, non-overlapping slices of the shuffled list.
val_end = n_train + n_val
train_ids, val_ids, test_ids = writers[:n_train], writers[n_train:val_end], writers[val_end:]

print(f"Split sizes -> train={len(train_ids)} val={len(val_ids)} test={len(test_ids)} (total={N})")

# Write each split as indented JSON into both the local and the Drive splits folder.
split_payloads = (
    ("train.json", train_ids),
    ("val.json", val_ids),
    ("test.json", test_ids),
)
for target in [LOCAL_SPLITS, DRIVE_SPLITS]:
    for filename, ids in split_payloads:
        with open(target / filename, "w", encoding="utf-8") as f:
            json.dump(ids, f, indent=2)

# Quick verify
def load_ids(p):
    """Return the decoded JSON content of the file at path ``p`` (read as UTF-8)."""
    return json.loads(Path(p).read_text(encoding="utf-8"))

# Read the files back from both locations and compare the counts by eye.
split_files = ("train.json", "val.json", "test.json")

lt, lv, ls = [load_ids(LOCAL_SPLITS / fname) for fname in split_files]
print("LOCAL check:", len(lt), len(lv), len(ls))

dt, dv, ds = [load_ids(DRIVE_SPLITS / fname) for fname in split_files]
print("DRIVE check:", len(dt), len(dv), len(ds))

print("First few train IDs:", lt[:5])


Detected writers: 475
Split sizes -> train=332 val=71 test=72 (total=475)
LOCAL check: 332 71 72
DRIVE check: 332 71 72
First few train IDs: ['w0352', 'w0289', 'w0064', 'w0077', 'w0518']


In [None]:
# Confirm the Drive source folder is reachable and show what it currently contains.
SRC_DIR = "/content/drive/MyDrive/CSAFE_Handwriting/src"
src_present = os.path.exists(SRC_DIR)
print("SRC_DIR exists:", SRC_DIR, src_present)
src_entries = os.listdir(SRC_DIR)
print("Files currently inside src:", src_entries)


SRC_DIR exists: /content/drive/MyDrive/CSAFE_Handwriting/src True
Files currently inside src: ['train_baseline.py', 'train_resnet.py']


In [None]:
import re

# Script to patch in place on Drive.
# NOTE(review): the run cell further down executes train_resnet_closedset.py, not
# this file — confirm which script actually needs the patch.
fp = "/content/drive/MyDrive/CSAFE_Handwriting/src/train_resnet.py"

with open(fp, "r", encoding="utf-8") as f:
    code = f.read()

# Replace the scheduler line (remove verbose=True).
# Presumably needed because the installed PyTorch rejects or warns about the
# `verbose` argument — TODO confirm against the installed version.
# subn also returns the number of replacements, so a non-matching pattern
# (already patched, or the line is formatted differently) is reported rather than
# being announced as a success.
code, n_patched = re.subn(
    r'scheduler\s*=\s*optim\.lr_scheduler\.ReduceLROnPlateau\(optimizer, mode="max", factor=0.5, patience=2, verbose=True\)',
    'scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="max", factor=0.5, patience=2)',
    code
)

if n_patched:
    # Only rewrite the file when something actually changed.
    with open(fp, "w", encoding="utf-8") as f:
        f.write(code)
    print("✅ Patched successfully.")
else:
    print("⚠️ Scheduler line not found; file left unchanged (already patched or pattern mismatch).")


✅ Patched successfully.


In [None]:
import sys, runpy

# Training entry point on Drive.
# NOTE(review): the src listings earlier in this notebook show only train_baseline.py
# and train_resnet.py — confirm this file exists before running.
SCRIPT = "/content/drive/MyDrive/CSAFE_Handwriting/src/train_resnet_closedset.py"

# Options passed to the script, in command-line order.
cli_options = {
    "--data_root": "/content/data/writers",
    "--ckpt_dir": "/content/drive/MyDrive/CSAFE_Handwriting/checkpoints",
    "--image_size": "224",
    "--batch_size": "32",
    "--epochs": "10",
    "--lr": "1e-3",
    "--weight_decay": "1e-4",
}

# argv[0] is the script name, followed by alternating flag/value tokens.
sys.argv = ["train_resnet_closedset.py"] + [
    token for flag_and_value in cli_options.items() for token in flag_and_value
]

print("Running:", SCRIPT)
print("Args:", " ".join(sys.argv[1:]))
# Execute the file in this kernel process with __name__ set to "__main__".
runpy.run_path(SCRIPT, run_name="__main__")