
# Kraken + ALTO OCR: Colab (Per-Manuscript **Project Folder** on Drive)

**One input:** set `PROJECT_ID` (your manuscript ID).  
This notebook will create and use a **project folder on Drive** so that **everything** for that manuscript stays together:

```
MyDrive/kraken_projects/<PROJECT_ID>/
├── data/           # upload & extracted dataset (ALTO XML + images)
└── models/
    ‚îú‚îÄ‚îÄ rec/        # recognition models (attempt_01.mlmodel, ...)
    ‚îî‚îÄ‚îÄ seg/        # segmentation models (optional)
```

What you get:
- ALTO-first pairing; the uploaded ZIP is extracted into the **project data/** folder.
- **Auto attempt detection**: cross-manuscript warm-start for attempt_01 if available; later attempts resume from the previous model at a lower learning rate.
- **CPU speed boosts** (threads, Pillow-SIMD, tuned batch/workers).
- Lean Drive usage (small pip cache only).
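
"ALTO-first pairing" means the image for a page is taken from the `fileName` element inside the ALTO file before any fallback to matching file stems. The sketch below shows that lookup on an inline sample; `SAMPLE_ALTO`, `image_name_from_alto`, and the file name `page_0001.png` are hypothetical and used only for illustration.

```python
import xml.etree.ElementTree as ET
from typing import Optional

# Hypothetical minimal ALTO document, used only to illustrate the lookup.
SAMPLE_ALTO = """<alto xmlns="http://www.loc.gov/standards/alto/ns-v4#">
  <Description>
    <sourceImageInformation>
      <fileName>page_0001.png</fileName>
    </sourceImageInformation>
  </Description>
  <Layout/>
</alto>"""


def image_name_from_alto(xml_text: str) -> Optional[str]:
    """Return the first non-empty <fileName> value, ignoring XML namespaces."""
    root = ET.fromstring(xml_text)
    for el in root.iter():
        local_name = el.tag.rsplit("}", 1)[-1]  # drop a "{namespace}" prefix if present
        if local_name == "fileName" and el.text and el.text.strip():
            return el.text.strip()
    return None


print(image_name_from_alto(SAMPLE_ALTO))
```

If the element is missing or empty the function returns `None`, which is the case where the notebook falls back to pairing by file stem.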


## 1) Connect Google Drive

In [None]:

from google.colab import drive  # type: ignore
drive.mount('/content/drive')
print("✅ Drive mounted at /content/drive")


## 2) Project Settings (only edit `PROJECT_ID`)

In [None]:

#@title 🔧 Project Settings
from pathlib import Path
import os

PROJECT_ID = "0093"  #@param {type:"string"}

ROOT_IN_DRIVE = "/content/drive/MyDrive"
PROJECTS_ROOT = f"{ROOT_IN_DRIVE}/kraken_projects"
PROJECT_DIR   = f"{PROJECTS_ROOT}/{PROJECT_ID}"
DATA_DIR      = f"{PROJECT_DIR}/data"
MODELS_ROOT   = f"{PROJECT_DIR}/models"
REC_MODELS    = f"{MODELS_ROOT}/rec"
SEG_MODELS    = f"{MODELS_ROOT}/seg"
PIP_CACHE_DIR = f"{ROOT_IN_DRIVE}/.pip-cache"   # small cache only
LISTS_DIR     = f"{PROJECT_DIR}/lists"          # keep lists with project

# Create the full project tree
for p in [PROJECTS_ROOT, PROJECT_DIR, DATA_DIR, MODELS_ROOT, REC_MODELS, SEG_MODELS, PIP_CACHE_DIR, LISTS_DIR]:
    Path(p).mkdir(parents=True, exist_ok=True)

TRAIN_LIST = f"{LISTS_DIR}/train.txt"
VAL_LIST   = f"{LISTS_DIR}/val.txt"

# Auto-detect cores; leave one for the OS where possible (never fewer than 2 threads)
CORES = os.cpu_count() or 2
CPU_THREADS = max(2, CORES - 1)
DEVICE = "cpu"   # set "cuda" if you enable a T4 GPU in Colab

print("✅ Project folders ready")
print("PROJECT_DIR:", PROJECT_DIR)
print("DATA_DIR:", DATA_DIR)
print("REC_MODELS:", REC_MODELS)
print("SEG_MODELS:", SEG_MODELS)
print("LISTS_DIR:", LISTS_DIR)
print("CPU_THREADS:", CPU_THREADS)


## 3) CPU Speed Boost (threads & math libs)

In [None]:

import os
os.environ["OMP_NUM_THREADS"] = str(CPU_THREADS)
os.environ["MKL_NUM_THREADS"] = str(CPU_THREADS)
os.environ["OPENBLAS_NUM_THREADS"] = str(CPU_THREADS)
os.environ["NUMEXPR_NUM_THREADS"] = str(CPU_THREADS)
os.environ["KMP_BLOCKTIME"] = "1"
os.environ["KMP_SETTINGS"] = "0"
os.environ["KMP_AFFINITY"] = "granularity=fine,compact,1,0"

for k in ["OMP_NUM_THREADS","MKL_NUM_THREADS","OPENBLAS_NUM_THREADS","NUMEXPR_NUM_THREADS","KMP_BLOCKTIME","KMP_AFFINITY"]:
    print(k, "=", os.environ.get(k))


## 4) Install Kraken (lean) + Pillow-SIMD (faster image IO)

In [None]:

import os, subprocess, shlex

os.environ["PIP_CACHE_DIR"] = PIP_CACHE_DIR
os.environ["PIP_DISABLE_PIP_VERSION_CHECK"] = "1"
os.environ["PIP_NO_INPUT"] = "1"

def is_importable(pkg: str) -> bool:
    try:
        __import__(pkg)
        return True
    except Exception:
        return False

if is_importable("kraken"):
    import kraken
    print(f"✅ Kraken available (version: {getattr(kraken, '__version__', 'unknown')})")
else:
    print("⏳ Installing Kraken (lean) ...")
    subprocess.run(shlex.split("python -m pip -q install --upgrade pip"), check=True)
    # Single-quoted Python string so the inner double quotes reach shlex intact
    subprocess.run(shlex.split(
        'python -m pip -q install --prefer-binary --upgrade-strategy only-if-needed "kraken[cairo]"'
    ), check=True)
    import importlib
    importlib.invalidate_caches()  # let the import system see the freshly installed package
    import kraken
    print(f"✅ Installed Kraken (version: {getattr(kraken, '__version__', 'unknown')})")

# Faster image decoding: Pillow-SIMD
print("⏳ Switching to Pillow-SIMD for faster image ops...")
subprocess.run(shlex.split("python -m pip -q uninstall -y pillow"), check=True)
subprocess.run(shlex.split(
    "python -m pip -q install --prefer-binary --upgrade-strategy only-if-needed pillow-simd"
), check=True)
print("✅ Pillow-SIMD installed")

def maybe_purge_cache(purge: bool = False):
    if purge:
        print("Purging pip cache on Drive...")
        subprocess.run(shlex.split("python -m pip cache purge"), check=True)


## 5) Upload your ALTO dataset (ZIP → Drive project folder)

In [None]:

from google.colab import files  # type: ignore
import zipfile, os

print("📦 Please select your ZIP (ALTO XML + images)...")
uploaded = files.upload()
if not uploaded:
    raise SystemExit("❌ No file uploaded.")

zip_name = next(iter(uploaded.keys()))
zip_path = f"/content/{zip_name}"

# Extract into the Drive project data folder
with zipfile.ZipFile(zip_path, 'r') as zf:
    zf.extractall(DATA_DIR)

print(f"✅ Extracted into: {DATA_DIR}")
!find "$DATA_DIR" -maxdepth 2 -type f | head -n 20
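
Before extracting onto Drive, it can help to confirm that the archive actually contains ALTO XML and page images. The sketch below counts members by extension without unpacking anything; `summarize_zip` and `IMAGE_SUFFIXES` are hypothetical names, and the suffix set simply mirrors the `IMG_EXTS` used in the next step.

```python
import zipfile
from collections import Counter
from pathlib import PurePosixPath

# Assumed to mirror IMG_EXTS from the list-building step.
IMAGE_SUFFIXES = {".png", ".jpg", ".jpeg", ".tif", ".tiff"}


def summarize_zip(zip_path: str) -> Counter:
    """Count XML, image, and other members of a ZIP archive without extracting it."""
    counts: Counter = Counter()
    with zipfile.ZipFile(zip_path) as zf:
        for info in zf.infolist():
            if info.is_dir():
                continue
            suffix = PurePosixPath(info.filename).suffix.lower()
            if suffix == ".xml":
                counts["xml"] += 1
            elif suffix in IMAGE_SUFFIXES:
                counts["image"] += 1
            else:
                counts["other"] += 1
    return counts
```

In the notebook this could be called as `summarize_zip(zip_path)` right after the upload; a result with zero `xml` or zero `image` entries suggests the ZIP structure needs checking before training.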


## 6) Build train/val lists from ALTO (in project folder)

In [None]:

import os
from pathlib import Path
from typing import List, Tuple, Optional
import xml.etree.ElementTree as ET

IMG_EXTS = {".png", ".jpg", ".jpeg", ".tif", ".tiff"}

def _strip_ns(tag: str) -> str:
    return tag.split('}', 1)[1] if '}' in tag else tag

def alto_image_from_xml(xml_path: Path) -> Optional[str]:
    try:
        tree = ET.parse(xml_path)
        root = tree.getroot()
        for el in root.iter():
            if _strip_ns(el.tag) == "fileName":
                if el.text and el.text.strip():
                    return el.text.strip()
    except Exception:
        pass
    return None

def find_image_candidates(root: Path) -> dict:
    images = {}
    for p in root.rglob("*"):
        if p.is_file() and p.suffix.lower() in IMG_EXTS:
            images.setdefault(p.stem, str(p.resolve()))
    return images

def resolve_image_for_alto(xml_path: Path, data_root: Path, images_by_stem: dict) -> Optional[str]:
    fn = alto_image_from_xml(xml_path)
    if fn:
        candidate = (xml_path.parent / fn)
        if candidate.exists():
            return str(candidate.resolve())
        for p in data_root.rglob(Path(fn).name):
            if p.is_file() and p.suffix.lower() in IMG_EXTS:
                return str(p.resolve())
    stem = xml_path.stem
    return images_by_stem.get(stem)

def find_pairs_alto_first(root: str) -> List[Tuple[str, str]]:
    rootp = Path(root)
    images_by_stem = find_image_candidates(rootp)
    pairs: List[Tuple[str, str]] = []
    for xml in rootp.rglob("*.xml"):
        try:
            with open(xml, "r", encoding="utf-8", errors="ignore") as fh:
                head = fh.read(4096)
                if "<alto" not in head:
                    continue
        except Exception:
            continue
        img_path = resolve_image_for_alto(xml, rootp, images_by_stem)
        if img_path:
            pairs.append((img_path, str(xml.resolve())))
    return pairs

def write_list(pairs: List[Tuple[str, str]], out_path: str):
    with open(out_path, "w", encoding="utf-8") as f:
        for img, xml in pairs:
            f.write(f"{img}\t{xml}\n")

pairs = sorted(set(find_pairs_alto_first(DATA_DIR)))
n = len(pairs)
print(f"Found {n} image+ALTO pairs.")

if n < 2:
    raise SystemExit(f"❌ Not enough samples in {DATA_DIR}. Found {n}. Check your ZIP structure.")

# 90/10 split
cut = max(1, int(n * 0.9))
train_pairs, val_pairs = pairs[:cut], pairs[cut:]
write_list(train_pairs, TRAIN_LIST)
write_list(val_pairs,   VAL_LIST)
print(f"✅ Wrote lists → {TRAIN_LIST} ({len(train_pairs)}), {VAL_LIST} ({len(val_pairs)})")

print("\nSample train lines:")
print("\n".join(open(TRAIN_LIST, encoding="utf-8").read().splitlines()[:5]))
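
Because the pairs are sorted before the 90/10 cut, the validation set is always the last pages in path order, which may not be representative of the whole manuscript. A seeded shuffle is one possible alternative; the sketch below is an assumption about what might suit your data, not part of the notebook's pipeline, and `shuffled_split` is a hypothetical helper.

```python
import random
from typing import List, Sequence, Tuple

Pair = Tuple[str, str]  # (image path, ALTO XML path)


def shuffled_split(
    pairs: Sequence[Pair], val_fraction: float = 0.1, seed: int = 0
) -> Tuple[List[Pair], List[Pair]]:
    """Return (train, val) after a reproducible shuffle; both parts are non-empty."""
    if len(pairs) < 2:
        raise ValueError("need at least two pairs to build a train/val split")
    items = sorted(pairs)                  # fixed starting order, so the seed fully decides the split
    random.Random(seed).shuffle(items)
    n_val = max(1, round(len(items) * val_fraction))
    n_val = min(n_val, len(items) - 1)     # always leave at least one training sample
    return items[n_val:], items[:n_val]
```

Keeping the seed fixed across attempts means every attempt is scored on the same validation pages, so CER values stay comparable.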


## 7) Auto-detect attempt and choose base model (within Drive projects)

In [None]:

import os, re, glob
from pathlib import Path
from typing import Optional

def list_attempt_models(models_dir: str):
    return sorted(Path(models_dir).glob("attempt_*.mlmodel"))

def next_attempt_id(models_dir: str) -> int:
    attempts = list_attempt_models(models_dir)
    if not attempts:
        return 1
    nums = []
    for p in attempts:
        m = re.search(r"attempt_(\d+)\.mlmodel$", p.name)
        if m:
            nums.append(int(m.group(1)))
    return (max(nums) + 1) if nums else 1

# Optional[str] replaces "str or None", which Python evaluates to plain str
def find_previous_attempt_model(models_dir: str, attempt_id: int) -> Optional[str]:
    prev_id = attempt_id - 1
    if prev_id < 1:
        return None
    cand = Path(models_dir) / f"attempt_{prev_id:02d}.mlmodel"
    return str(cand) if cand.exists() else None

def newest_model_from_other_projects(projects_root: str, exclude_project: str) -> Optional[str]:
    # Search all rec models under kraken_projects/*/models/rec/*.mlmodel
    pattern = str(Path(projects_root) / "*" / "models" / "rec" / "*.mlmodel")
    newest = None
    newest_mtime = -1
    for p in glob.glob(pattern):
        if f"/{exclude_project}/" in p or f"\\{exclude_project}\\" in p:
            continue
        try:
            mtime = os.path.getmtime(p)
            if mtime > newest_mtime:
                newest_mtime = mtime
                newest = p
        except Exception:
            pass
    return newest

ATTEMPT_ID = next_attempt_id(REC_MODELS)
OUT_MODEL = str(Path(REC_MODELS) / f"attempt_{ATTEMPT_ID:02d}.mlmodel")

if ATTEMPT_ID == 1:
    BASE_MODEL = newest_model_from_other_projects(PROJECTS_ROOT, PROJECT_ID)
    if BASE_MODEL:
        print(f"ℹ️ Attempt {ATTEMPT_ID:02d}: using cross-manuscript base → {BASE_MODEL}")
    else:
        print(f"ℹ️ Attempt {ATTEMPT_ID:02d}: starting from scratch.")
else:
    BASE_MODEL = find_previous_attempt_model(REC_MODELS, ATTEMPT_ID)
    if BASE_MODEL:
        print(f"ℹ️ Attempt {ATTEMPT_ID:02d}: resuming from previous attempt → {BASE_MODEL}")
    else:
        print(f"ℹ️ Attempt {ATTEMPT_ID:02d}: previous attempt not found; starting from scratch.")

LR_FOR_LATER = 1e-4
AUTO_LR = LR_FOR_LATER if ATTEMPT_ID > 1 else None

# CPU batch heuristic
BATCH_SIZE = min(32, max(8, (os.cpu_count() or 2) * 2))

print(f"ATTEMPT_ID: {ATTEMPT_ID:02d}\nOUT_MODEL: {OUT_MODEL}\nBASE_MODEL: {BASE_MODEL}\nAUTO_LR: {AUTO_LR}\nBATCH_SIZE: {BATCH_SIZE}")


## 8) Train recognition model (in project folder)

In [None]:

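import shlex, subprocess
from pathlib import Path

def alto_manifest(pair_list: str) -> str:
    """Write the ALTO column of a pair list as a one-path-per-line manifest.

    ketos reads list files through -t/--training-files and -e/--evaluation-files
    and expects one ground-truth path per line, not image<TAB>xml pairs.
    """
    out = Path(pair_list).with_suffix(".manifest")
    lines = Path(pair_list).read_text(encoding="utf-8").splitlines()
    xmls = [ln.split("\t")[-1] for ln in lines if ln.strip()]
    out.write_text("\n".join(xmls) + "\n", encoding="utf-8")
    return str(out)

cmd = [
    "ketos","train",
    "-o", OUT_MODEL,
    "--workers", str(int(CPU_THREADS)),
    "--device", DEVICE,
    "--batch-size", str(int(BATCH_SIZE)),
    "-f", "alto",
    "-t", alto_manifest(TRAIN_LIST),
    "-e", alto_manifest(VAL_LIST),
]
if BASE_MODEL:
    cmd += ["--load", BASE_MODEL]
if AUTO_LR is not None:
    cmd += ["--lrate", str(AUTO_LR)]  # ketos spells the learning-rate option -r/--lrate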

print("Running:", " ".join(shlex.quote(x) for x in cmd))
result = subprocess.run(cmd, text=True)

if result.returncode == 0:
    print(f"✅ Training finished. Model at: {OUT_MODEL}")
else:
    raise SystemExit("❌ Training failed. Check logs above.")
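
Depending on the Kraken version, `ketos train -o` may treat its value as a filename prefix and write checkpoints such as `<prefix>_best.mlmodel` rather than a single file at `OUT_MODEL`. If that happens, the attempt detection and the evaluation step will not find `attempt_XX.mlmodel`. The sketch below copies the most plausible checkpoint to the expected name. The checkpoint naming is an assumption to verify against your own training output, and `promote_checkpoint` is a hypothetical helper.

```python
import shutil
from pathlib import Path
from typing import Optional


def promote_checkpoint(out_model: str) -> Optional[str]:
    """Make sure `out_model` exists, copying a matching checkpoint if necessary.

    Assumed naming (verify against your run): "<prefix>_best.mlmodel" or
    "<prefix>_<epoch>.mlmodel", where <prefix> is either the full -o value or
    that value without its ".mlmodel" suffix.
    """
    target = Path(out_model)
    if target.exists():
        return str(target)
    candidates = []
    for prefix in (target.name, target.stem):
        candidates += sorted(target.parent.glob(f"{prefix}_*.mlmodel"))
    if not candidates:
        return None
    best = [c for c in candidates if c.stem.endswith("_best")]
    # Prefer an explicit "best" checkpoint, otherwise fall back to the newest file.
    chosen = best[0] if best else max(candidates, key=lambda c: c.stat().st_mtime)
    shutil.copy2(chosen, target)
    return str(target)
```

Calling `promote_checkpoint(OUT_MODEL)` after a successful run would leave a file at the path the later cells expect; a return value of `None` means no checkpoint matched the assumed naming.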


## 9) Evaluate (CER/WER)

In [None]:

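import shlex, subprocess
from pathlib import Path

# ketos test reads list files via -e/--evaluation-files, one ground-truth path per line,
# so keep only the ALTO column of the image<TAB>xml validation list.
val_manifest = Path(VAL_LIST).with_suffix(".manifest")
val_lines = Path(VAL_LIST).read_text(encoding="utf-8").splitlines()
val_manifest.write_text("\n".join(ln.split("\t")[-1] for ln in val_lines if ln.strip()) + "\n", encoding="utf-8")

cmd = ["ketos", "test", "-f", "alto", "-m", OUT_MODEL, "-e", str(val_manifest)]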
print("Running:", " ".join(shlex.quote(x) for x in cmd))
res = subprocess.run(cmd, text=True)

if res.returncode == 0:
    print("✅ Evaluation completed.")
else:
    raise SystemExit("❌ Evaluation failed.")
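
For reading the report, it helps to recall the textbook definitions: character error rate (CER) is the edit distance between the reference and the recognized text divided by the reference length in characters, and word error rate (WER) is the same ratio over whitespace-separated words. The sketch below implements those definitions directly; it is not Kraken's code, and `ketos test` may apply normalization that this version ignores.

```python
from typing import Sequence


def edit_distance(ref: Sequence, hyp: Sequence) -> int:
    """Levenshtein distance: minimum insertions, deletions, and substitutions."""
    prev = list(range(len(hyp) + 1))
    for i, r in enumerate(ref, 1):
        cur = [i]
        for j, h in enumerate(hyp, 1):
            cur.append(min(
                prev[j] + 1,             # delete r
                cur[j - 1] + 1,          # insert h
                prev[j - 1] + (r != h),  # substitute, or match at no cost
            ))
        prev = cur
    return prev[-1]


def cer(reference: str, hypothesis: str) -> float:
    """Character error rate relative to the reference length."""
    return edit_distance(reference, hypothesis) / max(1, len(reference))


def wer(reference: str, hypothesis: str) -> float:
    """Word error rate over whitespace-separated tokens."""
    ref_words, hyp_words = reference.split(), hypothesis.split()
    return edit_distance(ref_words, hyp_words) / max(1, len(ref_words))
```

For example, `cer("kitten", "sitting")` is three edits over six reference characters, so 0.5.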



## 10) (Optional) Train a **Segmentation** model (same project)

If you have **segmentation ground truth** (e.g., PAGE-XML/POLY) you can train a segmentation model and keep it under:
```
{SEG_MODELS}/attempt_XX.mlmodel
```

> This cell uses a generic command. Adjust file lists/format options to your segmentation data (e.g., `-f page`).


In [None]:

# Example (adjust to your GT format):
# - Prepare SEG_TRAIN_LIST / SEG_VAL_LIST pointing to images + PAGE/POLY files
# - Change "-f page" to your format if needed

from pathlib import Path
import shlex, subprocess

SEG_TRAIN_LIST = f"{LISTS_DIR}/seg_train.txt"
SEG_VAL_LIST   = f"{LISTS_DIR}/seg_val.txt"
SEG_ATTEMPT_ID = 1
SEG_OUT_MODEL  = f"{SEG_MODELS}/attempt_{SEG_ATTEMPT_ID:02d}.mlmodel"

if Path(SEG_TRAIN_LIST).exists() and Path(SEG_VAL_LIST).exists():
    cmd = [
        "ketos","segtrain",
        "-o", SEG_OUT_MODEL,
        "--workers", str(int(CPU_THREADS)),
        "--device", "cpu",              # change to "cuda" if GPU
        "--batch-size", "4",            # tune for RAM
        "-f", "page",                   # or "alto" / "xml"
        # ketos reads list files via -t/-e, one ground-truth XML path per line
        "-t", SEG_TRAIN_LIST,
        "-e", SEG_VAL_LIST,
    ]
    print("Running:", " ".join(shlex.quote(x) for x in cmd))
    res = subprocess.run(cmd, text=True)
    if res.returncode == 0:
        print(f"✅ Segmentation training finished. Model at: {SEG_OUT_MODEL}")
    else:
        print("❌ Segmentation training failed. Check logs above.")
else:
    print("ℹ️ No segmentation lists found. Create:", SEG_TRAIN_LIST, "and", SEG_VAL_LIST, "to enable this step.")



### Notes
- **Everything per manuscript** lives under `MyDrive/kraken_projects/<PROJECT_ID>/`.
- For attempt_01, we try to **warm-start** from the newest recognition model in **other** projects.
- Later attempts automatically **resume** from the previous attempt and use a **lower learning rate** (1e-4).
- Keep Drive small: only a **pip cache** is shared across projects; models stay inside each project folder.
- Enable a **T4 GPU** in Colab and set `DEVICE="cuda"` for much faster training.
