# eScriptorium OCR → Canonical TEI Aligner (duplicate `xml:id` tolerant)

This version handles canonicals that contain **duplicate `xml:id`** values by:
- parsing with a safe fallback that doesn't error on duplicates, and
- **auto-disambiguating** repeated IDs by appending a deterministic suffix (e.g., `AV2`, `AV2__2`, `AV2__3`),
- preserving the original `xml:id` in `@data-origid`.

## 1) Setup

In [None]:
try:
    import lxml  # noqa
    LXML_OK = True
except Exception:
    LXML_OK = False
    try:
        import pip; pip.main(['install','lxml']); import lxml  # noqa
        LXML_OK = True
    except Exception:
        LXML_OK = False

import os, re, io, zipfile, itertools, shutil, json, csv, datetime, difflib
from pathlib import Path
from typing import List, Tuple, Optional, Dict
from xml.etree import ElementTree as ET

if LXML_OK:
    from lxml import etree as LET
    XMLLIB='lxml'
else:
    XMLLIB='etree'

print("XML library:", XMLLIB)
WORK=Path('/content/work_align'); WORK.mkdir(parents=True, exist_ok=True)
OUT=Path('/content/out'); OUT.mkdir(parents=True, exist_ok=True)

XML library: lxml


## 2) Upload files

In [3]:
from google.colab import files

print("Upload OCR export (ZIP of ALTO/PAGE or TXT/ZIP-of-TXTs):")
ocr_up = files.upload(); assert ocr_up, "No OCR uploaded."
ocr_name = list(ocr_up.keys())[0]; ocr_path = WORK/ocr_name
open(ocr_path, "wb").write(ocr_up[ocr_name])

print("\nUpload Canonical TEI/XML:")
can_up = files.upload(); assert can_up, "No canonical uploaded."
can_name = list(can_up.keys())[0]; can_path = WORK/can_name
open(can_path,"wb").write(can_up[can_name])

print("\n(Optional) substitution rules CSV:")
try: rules_up = files.upload()
except Exception: rules_up={}
rules_path=None
if rules_up:
    rname=list(rules_up.keys())[0]; rules_path=WORK/rname
    open(rules_path,"wb").write(rules_up[rname]); print("Rules:", rules_path.name)
else:
    print("No rules provided.")

print("\n(Optional) config JSON:")
try: cfg_up = files.upload()
except Exception: cfg_up={}
cfg={"min_block_ratio":0.40,"max_lines_per_block":12,"joiner":" ","lowercase":True,"strip_dots":True}
if cfg_up:
    cname=list(cfg_up.keys())[0]; cfg_path=WORK/cname
    open(cfg_path,"wb").write(cfg_up[cname])
    try:
        import json; cfg.update(json.loads(open(cfg_path,'r',encoding='utf-8').read()) or {})
        print("Loaded config.")
    except Exception as e:
        print("Config parse failed, using defaults.", e)

print("\nOCR:", ocr_path.name); print("Canonical:", can_path.name)

Upload OCR export (ZIP of ALTO/PAGE or TXT/ZIP-of-TXTs):


Saving export_doc6106_mirror_4000_alto_202601161836.zip to export_doc6106_mirror_4000_alto_202601161836.zip

Upload Canonical TEI/XML:


Saving finale FragardVidevdad_ids_synced.xml to finale FragardVidevdad_ids_synced.xml

(Optional) substitution rules CSV:


No rules provided.

(Optional) config JSON:



OCR: export_doc6106_mirror_4000_alto_202601161836.zip
Canonical: finale FragardVidevdad_ids_synced.xml


## 3) Parse OCR

In [5]:
def is_zip(p:Path)->bool:
    try:
        import zipfile
        zipfile.ZipFile(p,'r')
        return True
    except Exception:
        return False

def _is_real_ocr_xml(p: Path) -> bool:
    s = str(p)
    # macOS zip metadata
    if "/__MACOSX/" in s or "\\__MACOSX\\" in s:
        return False
    if p.name.startswith("._"):
        return False
    if p.suffix.lower() != ".xml":
        return False
    try:
        if p.stat().st_size == 0:
            return False
    except Exception:
        return False
    return True

def parse_alto(p:Path)->List[str]:
    tree=(LET.parse(str(p)) if LXML_OK else ET.parse(str(p))); root=tree.getroot()
    lines=[]
    for line in root.findall('.//{*}TextLine'):
        toks=[s.attrib.get('CONTENT') or s.attrib.get('content') for s in line.findall('.//{*}String')]
        toks=[t for t in toks if t]; txt=' '.join(toks).strip()
        if txt: lines.append(txt)
    return lines

def parse_page(p:Path)->List[str]:
    tree=(LET.parse(str(p)) if LXML_OK else ET.parse(str(p))); root=tree.getroot()
    lines=[]
    for reg in root.findall('.//{*}TextRegion'):
        for line in reg.findall('.//{*}TextLine'):
            uni=line.find('.//{*}TextEquiv/{*}Unicode')
            if uni is not None and (uni.text or '').strip():
                lines.append(uni.text.strip())
    if not lines:
        # Some PAGE exports store words at TextLine/Word level
        for line in root.findall('.//{*}TextLine'):
            words=[w.find('.//{*}TextEquiv/{*}Unicode') for w in line.findall('.//{*}Word')]
            words=[(w.text or '').strip() for w in words if w is not None and (w.text or '').strip()]
            if words: lines.append(' '.join(words))
    return lines

def parse_txt(p:Path)->List[str]:
    return [ln.rstrip('\n\r') for ln in open(p,'r',encoding='utf-8',errors='ignore') if ln.strip()]

unz=WORK/'ocr'; unz.mkdir(parents=True, exist_ok=True)
ocr_lines=[]; ocr_pages=[]

if ocr_path.suffix.lower()=='.txt':
    lines=parse_txt(ocr_path); ocr_pages.append(ocr_path.name)
    for ln in lines: ocr_lines.append((ocr_path.name, ln))
else:
    if is_zip(ocr_path):
        import zipfile
        zipfile.ZipFile(ocr_path,'r').extractall(unz)

        # Filter out macOS metadata + empty files
        xmls=sorted([p for p in unz.rglob('*.xml') if _is_real_ocr_xml(p)])
        txts=sorted([p for p in unz.rglob('*.txt') if "/__MACOSX/" not in str(p) and not Path(p).name.startswith("._")])

        if xmls:
            print("OCR XML set:", len(xmls), "files")
            fmt='ALTO' if any('alto' in (x.name.lower()+str(x.parent).lower()) for x in xmls) else 'PAGE'
            print("OCR XML format:", fmt)

            for xp in xmls:
                try:
                    ls = parse_alto(xp) if fmt=='ALTO' else parse_page(xp)
                except Exception as e:
                    print("Skipping XML:", xp.name, "|", type(e).__name__, "-", str(e)[:200])
                    continue

                if ls:
                    ocr_pages.append(xp.name)
                    for ln in ls:
                        ocr_lines.append((xp.name, ln))

        elif txts:
            print("OCR TXT set:", len(txts), "files")
            for tp in txts:
                ls=parse_txt(tp)
                if ls:
                    ocr_pages.append(tp.name)
                    for ln in ls:
                        ocr_lines.append((tp.name, ln))
        else:
            raise SystemExit("OCR ZIP had neither .xml nor .txt")
    else:
        raise SystemExit("Unsupported OCR input.")

print("OCR lines:", len(ocr_lines))


OCR XML set: 8 files
OCR XML format: ALTO
OCR lines: 146


## 4) Parse Canonical TEI/XML (allow duplicate ids)

In [6]:
def q(attr:str)->str: return '{http://www.w3.org/XML/1998/namespace}'+attr

def read_xml_dup_tolerant(p:Path):
    if LXML_OK:
        try:
            # try strict first
            return LET.parse(str(p)).getroot(), 'lxml-strict'
        except Exception as e:
            print("[warn] lxml strict parse failed:", e)
            try:
                # try recover mode
                parser = LET.XMLParser(recover=True, dtd_validation=False, load_dtd=False, no_network=True, huge_tree=True)
                return LET.parse(str(p), parser=parser).getroot(), 'lxml-recover'
            except Exception as e2:
                print("[warn] lxml recover parse failed:", e2)
    # fallback: ElementTree (doesn't enforce ID uniqueness)
    return ET.parse(str(p)).getroot(), 'etree'

def extract_blocks_and_fix_dups(root) -> List[tuple]:
    """
    Return a list of (unique_block_id, block_text, original_block_id).
    - unique_block_id: de-duplicated id (e.g., AV2, AV2__2, AV2__3)
    - block_text: text content of the block
    - original_block_id: the canonical's original xml:id value
    """
    prio = {'ab', 'seg', 'l', 'p'}
    blocks = []
    seen: Dict[str, int] = {}

    def text_of(el):
        return ''.join(el.itertext())

    def get_xml_id(el):
        # Prefer the xml namespace id
        if q('id') in el.attrib:
            return el.attrib[q('id')]
        # Fallbacks in case the doc used plain xml:id or id
        return el.attrib.get('xml:id') or el.attrib.get('id')

    for el in root.iter():
        tag = re.sub(r'^\{.*\}', '', el.tag).lower()
        orig = get_xml_id(el)
        if not orig:
            continue
        # Allocate a unique id without mutating the element
        n = seen.get(orig, 0) + 1
        seen[orig] = n
        unique = orig if n == 1 else f"{orig}__{n}"

        if tag in prio:
            txt = (text_of(el) or '').strip()
            if txt:
                blocks.append((unique, txt, orig))

    if not blocks:
        # Fallback: any xml:id-bearing element
        for el in root.iter():
            orig = get_xml_id(el)
            if not orig:
                continue
            n = seen.get(orig, 0) + 1
            seen[orig] = n
            unique = orig if n == 1 else f"{orig}__{n}"
            txt = (''.join(el.itertext()) if hasattr(el, 'itertext') else (el.text or '')).strip()
            if txt:
                blocks.append((unique, txt, orig))

    return blocks

can_root, parser_mode = read_xml_dup_tolerant(can_path)
canonical_blocks = extract_blocks_and_fix_dups(can_root)
print("Canonical blocks:", len(canonical_blocks), "| parser:", parser_mode)

[warn] lxml strict parse failed: ID VS7.0 already defined, line 1518, column 56 (finale FragardVidevdad_ids_synced.xml, line 1518)
Canonical blocks: 3126 | parser: lxml-recover


## 5) Normalization and alignment

In [7]:
import csv, re, difflib, datetime

PUNCT_RE = re.compile(r"[·\.\,\:\;\!\?\(\)\[\]\{\}“”\"'`´^~=…•►♦◊·•]+")
SUB_RULES = []
if 'rules_path' in globals() and rules_path:
    try:
        with open(rules_path, 'r', encoding='utf-8', errors='ignore') as f:
            rr = csv.reader(f)
            for row in rr:
                if row and not str(row[0]).startswith('#') and len(row) >= 2:
                    SUB_RULES.append((row[0], row[1]))
    except Exception as e:
        print("Failed loading rules:", e)

def normalize(s: str) -> str:
    if cfg.get('lowercase', True):
        s = s.lower()
    if cfg.get('strip_dots', True):
        s = PUNCT_RE.sub(' ', s)
        s = re.sub(r"\s+", " ", s).strip()
    for src, tgt in SUB_RULES:
        try:
            s = re.sub(src, tgt, s)
        except re.error:
            s = s.replace(src, tgt)
    return s

# OCR: [(page_name, raw_line, norm_line)]
norm_ocr = [(pg, raw, normalize(raw)) for (pg, raw) in ocr_lines]

# Canonical blocks now come as triples: (unique_id, block_text, original_id)
norm_blocks = [(uniq, normalize(txt)) for (uniq, txt, orig) in canonical_blocks]

# Optional: map unique -> original canonical id (for emitter metadata)
orig_of = {uniq: orig for (uniq, txt, orig) in canonical_blocks}

def ratio(a: str, b: str) -> float:
    return difflib.SequenceMatcher(None, a, b).ratio()

def assign_lines_to_blocks(norm_ocr, norm_blocks, cfg):
    min_ratio = float(cfg.get('min_block_ratio', 0.40))
    max_k = int(cfg.get('max_lines_per_block', 12))
    joiner = cfg.get('joiner', ' ')
    i = 0
    N = len(norm_ocr)
    assigns = []
    for (bid, btxt) in norm_blocks:
        best_score = -1.0
        best_span = (i, i)
        for k in range(1, max_k + 1):
            j = min(N, i + k)
            if j <= i:
                break
            seg = joiner.join([norm_ocr[t][2] for t in range(i, j)]).strip()
            if not seg:
                continue
            sc = ratio(seg, btxt)
            if sc > best_score:
                best_score = sc
                best_span = (i, j)
            if sc > 0.985:
                break
        sc, (si, sj) = best_score, best_span
        if sc >= min_ratio and sj > si:
            lines = [(norm_ocr[t][0], norm_ocr[t][1], norm_ocr[t][2]) for t in range(si, sj)]
            assigns.append((bid, lines, sc))
            i = sj
        else:
            assigns.append((bid, [], 0.0))
    leftovers = [] if i >= N else [(norm_ocr[t][0], norm_ocr[t][1], norm_ocr[t][2]) for t in range(i, N)]
    return assigns, leftovers

assignments, leftovers = assign_lines_to_blocks(norm_ocr, norm_blocks, cfg)
print("Assigned blocks with lines:", sum(1 for b, l, s in assignments if l))
print("Leftover OCR lines:", len(leftovers))

Assigned blocks with lines: 82
Leftover OCR lines: 0


## 6) Emit TEI and report

In [8]:
def xesc(s:str)->str:
    return (s.replace("&","&amp;").replace("<","&lt;").replace(">","&gt;")
            .replace('"',"&quot;").replace("'","&apos;"))

def write_output(assignments, leftovers, out_name="aligned_tei.xml"):
    now=datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    header=(
        "<teiHeader>\n"
        "  <fileDesc>\n"
        "    <titleStmt><title>OCR aligned to canonical</title></titleStmt>\n"
        f"    <publicationStmt><p>Generated {now} (UTC).</p></publicationStmt>\n"
        "    <sourceDesc><p>eScriptorium OCR + canonical TEI.</p></sourceDesc>\n"
        "  </fileDesc>\n"
        "</teiHeader>"
    )
    body=["<div type=\"work\">"]
    for (bid, blines, sc) in assignments:
        body.append(f'  <div type="block" xml:id="{xesc(bid)}">')
        if blines:
            for idx,(pg,raw,_norm) in enumerate(blines, start=1):
                lid=f"{bid}.l{idx:04d}"
                body.append(f'    <ab xml:id="{xesc(lid)}" facs="{xesc(pg)}"><l>{xesc(raw)}</l></ab>')
        else:
            body.append('    <note type="alignment">no-ocr-assignment</note>')
        body.append("  </div>")
    if leftovers:
        body.append('  <div type="unaligned">')
        for idx,(pg,raw,_n) in enumerate(leftovers, start=1):
            body.append(f'    <ab xml:id="unaligned.l{idx:04d}" facs="{xesc(pg)}"><l>{xesc(raw)}</l></ab>')
        body.append('  </div>')
    body.append("</div>")
    tei = '<?xml version="1.0" encoding="UTF-8"?>\n<TEI xmlns="http://www.tei-c.org/ns/1.0">\n' + header + "\n<text>\n  <body>\n" + "\n".join(body) + "\n  </body>\n</text>\n</TEI>\n"
    out=OUT/out_name
    open(out,"w",encoding="utf-8").write(tei)
    return out

xml_out = write_output(assignments, leftovers, "aligned_tei.xml")
rep = OUT/"alignment_report.csv"
with open(rep,"w",encoding="utf-8",newline="") as f:
    w=csv.writer(f); w.writerow(["block_id","line_index","page_name","ocr_line"])
    for (bid, blines, sc) in assignments:
        for i,(pg,raw,_n) in enumerate(blines, start=1):
            w.writerow([bid,i,pg,raw])

print("Wrote:", OUT/"aligned_tei.xml")
print("Report:", OUT/"alignment_report.csv")

Wrote: /content/out/aligned_tei.xml
Report: /content/out/alignment_report.csv


  now=datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')


## 7) Download outputs

In [9]:
from google.colab import files
files.download('/content/out/aligned_tei.xml')
files.download('/content/out/alignment_report.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>