<a href="https://colab.research.google.com/github/lcontrerasroa/macedonian/blob/main/notebooks/EAF2MAUS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
# @title Fetch both original and _new .eaf files from GitHub
import os
from pathlib import Path
import shutil

REPO_URL = "https://github.com/lcontrerasroa/macedonian.git"
REPO_DIR = Path("/content/macedonian")
EAF_NEW = Path("/content/eaf_new")
EAF_ORIG = Path("/content/eaf_orig")

# reset pour que tout soit limpiecito ;)
for d in [REPO_DIR, EAF_NEW, EAF_ORIG]:
    if d.exists():
        shutil.rmtree(d)

# clone du dépôt complet (profondeur minimale)
os.system(f"git clone --depth 1 {REPO_URL} {REPO_DIR}")

# création des dossiers cibles
EAF_NEW.mkdir(parents=True, exist_ok=True)
EAF_ORIG.mkdir(parents=True, exist_ok=True)

# copie des fichiers depuis le dépôt GitHub
os.system(f"cp {REPO_DIR}/data/eaf_new/*.eaf {EAF_NEW}/ 2>/dev/null || true")
os.system(f"cp {REPO_DIR}/data/eaf_orig/*.eaf {EAF_ORIG}/ 2>/dev/null || true")

# vérification
print("✅ Copied the following:")
os.system("ls -1 /content/eaf_new | head -n 5 && echo '... (eaf_new)'")
os.system("ls -1 /content/eaf_orig | head -n 5 && echo '... (eaf_orig)'")


✅ Copied the following:


0

In [12]:
# @title Inspect the internal tier structure of all EAF files (new + orig)
!pip -q install pympi-ling

from pathlib import Path
import pympi
import re

# === Paths ===
roots = [Path("/content/eaf_new"), Path("/content/eaf_orig")]
for r in roots:
    assert r.exists(), f"The folder {r} doesn't exist. Run the GitHub import first."

# === Function to summarize one EAF ===
def summarize_eaf(path):
    eaf = pympi.Elan.Eaf(str(path))
    tiers = eaf.get_tier_names()
    info = []
    for t in tiers:
        anns = eaf.get_annotation_data_for_tier(t)
        non_empty = [a for a in anns if a[2] and str(a[2]).strip()]
        info.append({
            "tier": t,
            "annotations": len(anns),
            "non_empty": len(non_empty)
        })
    return info

# === Loop over both directories ===
results = []
for base in roots:
    for eaf_file in sorted(base.glob("*.eaf")):
        try:
            tiers = summarize_eaf(eaf_file)
            results.append((f"{base.name}/{eaf_file.name}", tiers))
        except Exception as e:
            results.append((f"{base.name}/{eaf_file.name}", f"ERROR: {e}"))

# === Display ===
for name, tiers in results:
    print(f"\n=== {name} ===")
    if isinstance(tiers, str):
        print("  ", tiers)
        continue
    for t in tiers:
        tname = t["tier"]
        nn = t["non_empty"]
        tot = t["annotations"]
        pct = (nn / tot * 100) if tot else 0
        print(f"  - {tname:30s} {nn:4d}/{tot:<4d} ({pct:5.1f}%) non-empty")



=== eaf_new/Macedonian_decata_so_zlatna_kosa_new.eaf ===
  - ref@SP1                         257/257  (100.0%) non-empty
  - rp@SP1                          109/109  (100.0%) non-empty
  - comm                              2/2    (100.0%) non-empty
  - qt@SP1                           61/61   (100.0%) non-empty
  - ft@SP1                          257/257  (100.0%) non-empty
  - lit@SP1                           0/0    (  0.0%) non-empty
  - tx@SP1                          257/257  (100.0%) non-empty
  - not@SP1                           4/4    (100.0%) non-empty
  - typ@SP1                          67/67   (100.0%) non-empty
  - mot@SP1                        1014/1014 (100.0%) non-empty
  - wps@SP1                         998/1014 ( 98.4%) non-empty
  - mb@SP1                         1319/1319 (100.0%) non-empty
  - ge@SP1                         1319/1319 (100.0%) non-empty
  - ps@SP1                         1319/1319 (100.0%) non-empty
  - par@SP1                          71/71   (

In [13]:
# @title Extract Cyrillic TXT per speaker from tx_cyr@SPx (fallback to tx@SPx) — robust tuples
!pip -q install pympi-ling

import re
from pathlib import Path
import pympi
import shutil

SRC_DIR = Path("/content/eaf_orig")          # ← les .eaf ORIGINAUX
OUT_DIR = Path("/content/out_txt_cyr")       # txt cyrilliques

# Reset propre de la sortie
if OUT_DIR.exists():
    shutil.rmtree(OUT_DIR)
OUT_DIR.mkdir(parents=True, exist_ok=True)

tier_re_tx_cyr = re.compile(r"^tx_cyr@SP(\d+)$", re.I)
tier_re_tx     = re.compile(r"^tx@SP(\d+)$", re.I)

def get_speaker_tiers(eaf):
    names = eaf.get_tier_names()
    cyr = [(t, int(m.group(1))) for t in names if (m:=tier_re_tx_cyr.match(t))]
    lat = [(t, int(m.group(1))) for t in names if (m:=tier_re_tx.match(t))]
    # index par SP: préfère tx_cyr, sinon tx
    speakers = {}
    for t, sp in lat:
        speakers.setdefault(sp, (f"tx@SP{sp}", t))
    for t, sp in cyr:
        speakers[sp] = (f"tx_cyr@SP{sp}", t)
    chosen = []
    for sp, pair in sorted(speakers.items()):
        tag, tname = pair
        chosen.append((tname, sp, tag.startswith("tx_cyr")))
    return chosen

def _normalize_ann_tuple(item):
    """
    Accepte (start, end, value) ou (ann_id, start, end, value).
    Retourne (start, end, value) avec value en str.
    """
    if not isinstance(item, (list, tuple)):
        return None
    if len(item) == 3:
        s, e, v = item
    elif len(item) >= 4:
        s, e, v = item[-3], item[-2], item[-1]
    else:
        return None
    return s, e, "" if v is None else str(v)

def concat_intervals(eaf, tiername):
    raw = eaf.get_annotation_data_for_tier(tiername)
    norm = []
    for it in raw:
        tup = _normalize_ann_tuple(it)
        if tup is None:
            continue
        s, e, v = tup
        v = v.strip()
        if v:
            norm.append((s, e, v))
    # tri temporel quand possible (mais je pense que c'est pas nécessaire parce
    # que l'annotation est déja dans l'ordre temporel)
    def _key(x):
        s, e, _ = x
        try:
            return (int(s), int(e))
        except Exception:
            return (float("inf"), float("inf"))
    norm.sort(key=_key)
    return "".join(v + " " for _,__,v in norm).strip()

count_files = 0
for eaf_path in sorted(SRC_DIR.glob("*.eaf")):
    eaf = pympi.Elan.Eaf(str(eaf_path))
    for tiername, sp, is_cyr in get_speaker_tiers(eaf):
        txt = concat_intervals(eaf, tiername)
        if not txt:
            continue
        stem = eaf_path.stem
        # on n'exporte que le cyrillique si présent; sinon le latin prendra la place cyr (comme fallback)
        out = OUT_DIR / f"{stem}_SP{sp}_cyr.txt"
        out.write_text(txt + "\n", encoding="utf-8")
        count_files += 1
        print(f"Wrote {out.name} ({len(txt)} chars)")
print(f"\n✅ Done. Wrote {count_files} TXT file(s) to {OUT_DIR}")


Wrote Macedonian_decata_so_zlatna_kosa_SP1_cyr.txt (9508 chars)
Wrote Macedonian_duhot_od_grobot_SP1_cyr.txt (6075 chars)
Wrote Macedonian_dva_braka_SP1_cyr.txt (1703 chars)
Wrote Macedonian_dva_braka_SP2_cyr.txt (23 chars)
Wrote Macedonian_itar_pejo_SP1_cyr.txt (1079 chars)
Wrote Macedonian_itar_pejo_SP2_cyr.txt (47 chars)
Wrote Macedonian_kralevic_marko_SP1_cyr.txt (869 chars)
Wrote Macedonian_kralevic_marko_SP2_cyr.txt (28 chars)
Wrote Macedonian_kusa_SP1_cyr.txt (1595 chars)
Wrote Macedonian_kusa_SP2_cyr.txt (56 chars)
Wrote Macedonian_lisicata_i_dedoto_SP1_cyr.txt (5510 chars)
Wrote Macedonian_makata_SP1_cyr.txt (839 chars)
Wrote Macedonian_masha_SP1_cyr.txt (2120 chars)
Wrote Macedonian_masha_SP2_cyr.txt (104 chars)
Wrote Macedonian_mrzlivata_zena_SP1_cyr.txt (6269 chars)
Wrote Macedonian_mrzlivata_zena_SP2_cyr.txt (59 chars)
Wrote Macedonian_najdenko_SP1_cyr.txt (4415 chars)
Wrote Macedonian_narecnicite_SP1_cyr.txt (1299 chars)
Wrote Macedonian_ovcarot_SP1_cyr.txt (1913 chars)
W

In [14]:
# @title Inspect each EAF: tier overview with sample content
!pip -q install pympi-ling pandas

import pandas as pd
from pathlib import Path
import pympi

SRC_DIR = Path("/content/eaf_orig")  # adapte si besoin
OUT_DIR = Path("/content/eaf_tier_summary")
OUT_DIR.mkdir(exist_ok=True)

def sample_value(annotations):
    for a in annotations:
        if isinstance(a, (list, tuple)):
            val = a[-1] if a else ""
        else:
            val = a
        if isinstance(val, str) and val.strip():
            txt = val.strip().replace("\n", " ")
            return (txt[:80] + "…") if len(txt) > 80 else txt
    return ""

def summarize_eaf_tiers(eaf_path):
    eaf = pympi.Elan.Eaf(str(eaf_path))
    data = []
    for tier in eaf.get_tier_names():
        anns = eaf.get_annotation_data_for_tier(tier)
        non_empty = [a for a in anns if (isinstance(a, (list, tuple)) and len(a) > 0 and str(a[-1]).strip())]
        sample = sample_value(non_empty)
        data.append({
            "tier": tier,
            "annotations": len(anns),
            "non_empty": len(non_empty),
            "sample": sample
        })
    df = pd.DataFrame(data)
    df = df.sort_values("tier")
    return df

summaries = {}
for eaf_path in sorted(SRC_DIR.glob("*.eaf")):
    df = summarize_eaf_tiers(eaf_path)
    summaries[eaf_path.name] = df
    out_csv = OUT_DIR / f"{eaf_path.stem}_tiers.csv"
    df.to_csv(out_csv, index=False, encoding="utf-8")
    print(f"✅ {eaf_path.name}: {len(df)} tiers → {out_csv.name}")

print(f"\nAll CSV summaries saved in: {OUT_DIR}")


✅ Macedonian_decata_so_zlatna_kosa.eaf: 15 tiers → Macedonian_decata_so_zlatna_kosa_tiers.csv
✅ Macedonian_duhot_od_grobot.eaf: 15 tiers → Macedonian_duhot_od_grobot_tiers.csv
✅ Macedonian_dva_braka.eaf: 30 tiers → Macedonian_dva_braka_tiers.csv
✅ Macedonian_itar_pejo.eaf: 30 tiers → Macedonian_itar_pejo_tiers.csv
✅ Macedonian_kralevic_marko.eaf: 30 tiers → Macedonian_kralevic_marko_tiers.csv
✅ Macedonian_kusa.eaf: 30 tiers → Macedonian_kusa_tiers.csv
✅ Macedonian_lisicata_i_dedoto.eaf: 15 tiers → Macedonian_lisicata_i_dedoto_tiers.csv
✅ Macedonian_makata.eaf: 15 tiers → Macedonian_makata_tiers.csv
✅ Macedonian_masha.eaf: 30 tiers → Macedonian_masha_tiers.csv
✅ Macedonian_mrzlivata_zena.eaf: 30 tiers → Macedonian_mrzlivata_zena_tiers.csv
✅ Macedonian_najdenko.eaf: 15 tiers → Macedonian_najdenko_tiers.csv
✅ Macedonian_narecnicite.eaf: 15 tiers → Macedonian_narecnicite_tiers.csv
✅ Macedonian_ovcarot.eaf: 15 tiers → Macedonian_ovcarot_tiers.csv
✅ Macedonian_ovenot_i_kozata.eaf: 30 tiers →

In [15]:
# @title Inspect EAF tier hierarchy (works with tuple-structured tiers)
!pip -q install pympi-ling
from pathlib import Path
import pympi

path = Path("/content/eaf_orig/Macedonian_decata_so_zlatna_kosa.eaf")
eaf = pympi.Elan.Eaf(str(path))

print(f"File: {path.name}\n")
print(f"{'Tier':35s} {'Parent':20s} {'Children'}")
print("-" * 80)

# extraction prudente de PARENT_REF
def safe_parent(info):
    if isinstance(info, dict):
        return info.get("PARENT_REF")
    if isinstance(info, (list, tuple)):
        for item in info:
            if isinstance(item, dict) and "PARENT_REF" in item:
                return item["PARENT_REF"]
    return None

# construire map parent → enfants
child_map = {}
parent_map = {}
for t, info in eaf.tiers.items():
    parent = safe_parent(info)
    if parent:
        parent_map[t] = parent
        child_map.setdefault(parent, []).append(t)

# affichage hiérarchique
for t in eaf.get_tier_names():
    parent = parent_map.get(t, "-")
    children = ", ".join(child_map.get(t, [])) or "-"
    print(f"{t:35s} {parent:20s} {children}")


File: Macedonian_decata_so_zlatna_kosa.eaf

Tier                                Parent               Children
--------------------------------------------------------------------------------
ref@SP1                             -                    ft@SP1, lit@SP1, tx@SP1, not@SP1, tx_cyr@SP1
rp@SP1                              -                    typ@SP1
comm                                -                    -
qt@SP1                              -                    -
ft@SP1                              ref@SP1              -
lit@SP1                             ref@SP1              -
tx@SP1                              ref@SP1              mot@SP1
not@SP1                             ref@SP1              -
typ@SP1                             rp@SP1               -
mot@SP1                             tx@SP1               mb@SP1
mb@SP1                              mot@SP1              ge@SP1, ps@SP1
ge@SP1                              mb@SP1               par@SP1
ps@SP1                

In [16]:
# @title Debug tier names in one EAF
from pathlib import Path
import pympi

eaf_path = Path("/content/eaf_orig/Macedonian_decata_so_zlatna_kosa.eaf")
eaf = pympi.Elan.Eaf(str(eaf_path))
print("Tier names in this file:\n")
for t in eaf.get_tier_names():
    anns = eaf.get_annotation_data_for_tier(t)
    non_empty = [a for a in anns if isinstance(a, (list, tuple)) and str(a[-1]).strip()]
    sample = (non_empty[0][-1] if non_empty else "")[:60]
    print(f"{t:30s} | annots={len(anns):4d} | non_empty={len(non_empty):4d} | sample={sample}")


Tier names in this file:

ref@SP1                        | annots= 257 | non_empty= 257 | sample=Macedonian_decata_so_zlatna_kosa.001
rp@SP1                         | annots= 109 | non_empty= 109 | sample=DR Event
comm                           | annots=   2 | non_empty=   2 | sample=insubordinate hortative
qt@SP1                         | annots=  61 | non_empty=  61 | sample=DR Event + Discourse Report
ft@SP1                         | annots= 257 | non_empty= 257 | sample=Macedonian_decata_so_zlatna_kosa.001
lit@SP1                        | annots=   0 | non_empty=   0 | sample=
tx@SP1                         | annots= 257 | non_empty= 257 | sample=Macedonian_decata_so_zlatna_kosa.001
not@SP1                        | annots=   4 | non_empty=   4 | sample=Macedonian_decata_so_zlatna_kosa.038
typ@SP1                        | annots=  67 | non_empty=  67 | sample=Discourse Report
mot@SP1                        | annots=1014 | non_empty=1014 | sample=Macedonian_decata_so_zlatna_kosa.001


In [17]:
# @title Analyze single eaf tier content

from pathlib import Path
import xml.etree.ElementTree as ET

sample = Path("/content/eaf_orig/Macedonian_dva_braka.eaf")
print(f"Inspecting {sample.name}")

tree = ET.parse(sample)
root = tree.getroot()

# findall sans namespace
tiers = root.findall(".//TIER")

print(f"Found {len(tiers)} tiers\n")

for tier in tiers:
    tid = tier.attrib.get("TIER_ID", "")
    ling = tier.attrib.get("LINGUISTIC_TYPE_REF", "")
    vals = [v.text.strip() for v in tier.findall(".//ANNOTATION_VALUE") if v.text and v.text.strip()]
    preview = " | ".join(vals[:3])
    print(f"{tid:25s} ({ling:10s})  →  {len(vals)} vals  |  {preview}")


Inspecting Macedonian_dva_braka.eaf
Found 30 tiers

ref@SP1                   (ref       )  →  72 vals  |  Macedonian_dva_braka.01 | Macedonian_dva_braka.02 | Macedonian_dva_braka.04
rp@SP1                    (rp        )  →  62 vals  |  DR Event | Discourse Report | DR Event
comm                      (ref       )  →  6 vals  |  DR Event + DR + Quot + DR | the actual report doesn't come until later | vaka i vaka = so and so (pronoun for reported speech)
qt@SP1                    (qt        )  →  31 vals  |  DR Event + Discourse Report | Other | DR Event + Discourse Report
ref@SP2                   (ref       )  →  1 vals  |  Macedonian_dva_braka.03
rp@SP2                    (rp        )  →  0 vals  |  
qt@SP2                    (qt        )  →  0 vals  |  
comm@SP2                  (ref       )  →  0 vals  |  
ft@SP1                    (ft        )  →  71 vals  |  Shall I start? | Should I say again I am grandma Mare? | Yеs.
lit@SP1                   (lit       )  →  2 vals  |  When he

In [18]:
# @title Extract only tx@SPx and tx_cyr@SPx text (in order of appearance)
# This will extract each speaker in a separate tier
# which might be problematic for WEBMaus alignment.

from pathlib import Path
import xml.etree.ElementTree as ET
import re
import shutil

SRC_DIR = Path("/content/eaf_orig")  # dossier des fichiers originaux
OUT_DIR = Path("/content/out_txt_cyr")

# ⚠️ Vider le dossier de sortie avant d'écrire les nouveaux fichiers
if OUT_DIR.exists():
    shutil.rmtree(OUT_DIR)
OUT_DIR.mkdir(parents=True, exist_ok=True)

tier_pattern = re.compile(r"^(tx(_cyr)?@SP\d+)$", re.I)

written = 0

for eaf_file in sorted(SRC_DIR.glob("*.eaf")):
    root = ET.parse(eaf_file).getroot()
    for tier in root.findall(".//TIER"):
        tid = tier.attrib.get("TIER_ID", "")
        if not tier_pattern.match(tid):
            continue
        vals = [v.text.strip() for v in tier.findall(".//ANNOTATION_VALUE") if v.text and v.text.strip()]
        if not vals:
            continue
        txt = " ".join(vals)
        suffix = "cyr" if "tx_cyr" in tid.lower() else "lat"
        out = OUT_DIR / f"{eaf_file.stem}_{tid}_{suffix}.txt"
        out.write_text(txt + "\n", encoding="utf-8")
        written += 1
        print(f"Wrote {out.name} ({len(txt)} chars, {len(vals)} segments)")

print(f"\n✅ Done. Wrote {written} files in {OUT_DIR}")


Wrote Macedonian_decata_so_zlatna_kosa_tx@SP1_lat.txt (5189 chars, 257 segments)
Wrote Macedonian_decata_so_zlatna_kosa_tx_cyr@SP1_cyr.txt (5170 chars, 257 segments)
Wrote Macedonian_duhot_od_grobot_tx@SP1_lat.txt (5600 chars, 196 segments)
Wrote Macedonian_duhot_od_grobot_tx_cyr@SP1_cyr.txt (5573 chars, 196 segments)
Wrote Macedonian_dva_braka_tx@SP1_lat.txt (2027 chars, 71 segments)
Wrote Macedonian_dva_braka_tx@SP2_lat.txt (17 chars, 1 segments)
Wrote Macedonian_dva_braka_tx_cyr@SP1_cyr.txt (2026 chars, 71 segments)
Wrote Macedonian_dva_braka_tx_cyr@SP2_cyr.txt (17 chars, 1 segments)
Wrote Macedonian_itar_pejo_tx@SP1_lat.txt (1297 chars, 45 segments)
Wrote Macedonian_itar_pejo_tx@SP2_lat.txt (41 chars, 2 segments)
Wrote Macedonian_itar_pejo_tx_cyr@SP1_cyr.txt (1295 chars, 45 segments)
Wrote Macedonian_itar_pejo_tx_cyr@SP2_cyr.txt (41 chars, 2 segments)
Wrote Macedonian_kralevic_marko_tx@SP1_lat.txt (845 chars, 30 segments)
Wrote Macedonian_kralevic_marko_tx@SP2_lat.txt (23 chars, 1 

In [20]:
# @title Merge speakers into one transcript by time (tx_cyr/tx) — namespace-proof + SP1 priority on overlaps
from pathlib import Path
import xml.etree.ElementTree as ET
import re
import shutil
import math

SRC_DIR = Path("/content/eaf_orig")   # EAF originaux
OUT_DIR = Path("/content/out_txt_merged")

# reset sortie
if OUT_DIR.exists():
    shutil.rmtree(OUT_DIR)
OUT_DIR.mkdir(parents=True, exist_ok=True)

def iter_elems(root, suffix):
    """Yield all elements whose tag ends with given suffix (namespace-agnostic)."""
    for el in root.iter():
        if isinstance(el.tag, str) and el.tag.endswith(suffix):
            yield el

def text_of(el):
    return (el.text or "").strip()

def build_time_maps(root):
    # TIME_ORDER/TIME_SLOT
    ts = {}
    for ts_node in iter_elems(root, "TIME_SLOT"):
        tid = ts_node.attrib.get("TIME_SLOT_ID")
        val = ts_node.attrib.get("TIME_VALUE")
        if tid and val is not None:
            try:
                ts[tid] = int(val)
            except ValueError:
                pass
    # ALIGNABLE_ANNOTATION -> (start_ms, end_ms)
    align = {}
    for aln in iter_elems(root, "ALIGNABLE_ANNOTATION"):
        aid = aln.attrib.get("ANNOTATION_ID")
        r1  = aln.attrib.get("TIME_SLOT_REF1")
        r2  = aln.attrib.get("TIME_SLOT_REF2")
        if aid and r1 in ts and r2 in ts:
            align[aid] = (ts[r1], ts[r2])
    return ts, align

tier_id_pat = re.compile(r"^tx(_cyr)?@SP(\d+)$", re.I)

def collect_segments(root):
    """
    Retourne des dicts: {tier_id, sp, kind, ref_id, start, end, text, order}
    kind: 'cyr' pour tx_cyr, 'lat' pour tx
    """
    segs = []
    order_counter = 0
    for tier in iter_elems(root, "TIER"):
        tid = tier.attrib.get("TIER_ID", "")
        ltr = tier.attrib.get("LINGUISTIC_TYPE_REF", "")
        # Filtre strict sur l'ID (plus fiable que LINGUISTIC_TYPE_REF seul)
        m = tier_id_pat.match(tid)
        if not m and ltr not in ("tx", "tx_cyr"):
            continue

        # speaker
        sp = "SP?"
        if m:
            sp = f"SP{m.group(2)}"
        # type
        kind = "cyr" if ("tx_cyr" in tid.lower() or ltr == "tx_cyr") else "lat"

        # 1) REF_ANNOTATION
        for ref in iter_elems(tier, "REF_ANNOTATION"):
            ref_id = ref.attrib.get("ANNOTATION_REF")
            # valeur
            for val_el in list(ref):
                if isinstance(val_el.tag, str) and val_el.tag.endswith("ANNOTATION_VALUE"):
                    txt = text_of(val_el)
                    if txt:
                        segs.append({
                            "tier_id": tid, "sp": sp, "kind": kind,
                            "ref_id": ref_id, "start": None, "end": None,
                            "text": txt, "order": order_counter
                        })
                        order_counter += 1
                    break

        # 2) fallback: ALIGNABLE_ANNOTATION avec texte inline
        for aln in iter_elems(tier, "ALIGNABLE_ANNOTATION"):
            aid = aln.attrib.get("ANNOTATION_ID")
            for val_el in list(aln):
                if isinstance(val_el.tag, str) and val_el.tag.endswith("ANNOTATION_VALUE"):
                    txt = text_of(val_el)
                    if txt:
                        segs.append({
                            "tier_id": tid, "sp": sp, "kind": kind,
                            "ref_id": aid, "start": None, "end": None,
                            "text": txt, "order": order_counter
                        })
                        order_counter += 1
                    break
    return segs

def attach_times(segs, align_map):
    for s in segs:
        aid = s.get("ref_id")
        if aid in align_map:
            s["start"], s["end"] = align_map[aid]
    return segs

def sort_key(s):
    start = s["start"]
    return (math.inf if start is None else start, s["order"])

def select_with_overlap_priority(segs_sorted):
    """
    Sélectionne les segments fusionnés en privilégiant SP1 en cas de chevauchement temporel.
    Si start/end manquent, on prend le segment (pas de base pour l'exclure).
    """
    selected = []
    for s in segs_sorted:
        s0, s1, sp = s["start"], s["end"], s["sp"]
        if not selected:
            selected.append(s)
            continue
        last = selected[-1]
        l0, l1, lsp = last["start"], last["end"], last["sp"]

        # Si on n'a pas d'horodatage pour l'un des deux, on garde.
        if s0 is None or l1 is None:
            selected.append(s)
            continue

        # Chevauchement: s commence avant la fin du précédent
        if s0 < l1:
            # privilégier SP1
            if sp == "SP1" and lsp != "SP1":
                # Remplace le précédent si overlap
                selected[-1] = s
            elif sp != "SP1" and lsp == "SP1":
                # On jette s
                continue
            else:
                # Même priorité (les deux SP1 ou aucun SP1) → garder l'ordre
                selected.append(s)
        else:
            selected.append(s)
    return selected

written = 0
files_seen = 0

for eaf in sorted(SRC_DIR.glob("*.eaf")):
    files_seen += 1
    try:
        root = ET.parse(eaf).getroot()
    except Exception as ex:
        print(f"❌ Parse error {eaf.name}: {ex}")
        continue

    _, align = build_time_maps(root)
    segs = collect_segments(root)
    if not segs:
        # rien à fusionner
        continue

    segs = attach_times(segs, align)
    segs.sort(key=sort_key)
    segs_sel = select_with_overlap_priority(segs)

    # séparer cyr et lat pour les TXT "WebMAUS-safe"
    cyr = [s for s in segs_sel if s["kind"] == "cyr"]
    lat = [s for s in segs_sel if s["kind"] == "lat"]

    if cyr:
        txt_cyr = " ".join(s["text"] for s in cyr).strip()
        (OUT_DIR / f"{eaf.stem}_merged_cyr.txt").write_text(txt_cyr + "\n", encoding="utf-8")
        written += 1
    if lat:
        txt_lat = " ".join(s["text"] for s in lat).strip()
        (OUT_DIR / f"{eaf.stem}_merged_lat.txt").write_text(txt_lat + "\n", encoding="utf-8")
        written += 1

    # version lisible avec locuteur pour vérifier des overlaps dans la
    # conversation
    pretty = [f"[{s['sp']}] {s['text']}" for s in segs_sel]
    (OUT_DIR / f"{eaf.stem}_merged_with_spk.txt").write_text("\n".join(pretty) + "\n", encoding="utf-8")

    # TSV segments pour audit (los timestamps de cada intervención)
    lines = ["start_ms\tend_ms\tsp\tkind\ttext"]
    for s in segs:
        lines.append(f"{'' if s['start'] is None else s['start']}\t"
                     f"{'' if s['end'] is None else s['end']}\t"
                     f"{s['sp']}\t{s['kind']}\t{s['text'].replace(chr(9),' ')}")
    (OUT_DIR / f"{eaf.stem}_segments.tsv").write_text("\n".join(lines) + "\n", encoding="utf-8")

print(f"✅ Merged outputs written to: {OUT_DIR} — files written: {written}, EAF scanned: {files_seen}")


✅ Merged outputs written to: /content/out_txt_merged — files written: 52, EAF scanned: 26


In [21]:
# @title Convert Cyrillic TXT → Spanish-like proxy (mk→es)
import re, unicodedata
from pathlib import Path

IN_DIR  = Path("/content/out_txt_merged")
OUT_DIR = Path("/content/out_txt_proxy")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# --- cyr → latin de base (macédonien) ---
MK_CYR_TO_LAT = {
    "Ќ":"ḱ","ќ":"ḱ","Ѓ":"ǵ","ѓ":"ǵ","Ж":"ž","ж":"ž","З":"z","з":"z","Ѕ":"dz","ѕ":"dz",
    "Ч":"č","ч":"č","Џ":"dž","џ":"dž","Ш":"š","ш":"š","Ј":"j","ј":"j","Љ":"lj","љ":"lj",
    "Њ":"nj","њ":"nj","А":"a","а":"a","Б":"b","б":"b","В":"v","в":"v","Г":"g","г":"g",
    "Д":"d","д":"d","Е":"e","е":"e","И":"i","и":"i","К":"k","к":"k","Л":"l","л":"l",
    "М":"m","м":"m","Н":"n","н":"n","О":"o","о":"o","П":"p","п":"p","Р":"r","р":"r",
    "С":"s","с":"s","Т":"t","т":"t","У":"u","у":"u","Ф":"f","ф":"f","Х":"h","х":"h"
}
def mk_cyr_to_basic_latin(text:str)->str:
    return "".join(MK_CYR_TO_LAT.get(ch, ch) for ch in text)

# --- règles proxy espagnol (avec z→s et dz→ds pour éviter des θ) ---
VOWELS = "aeiouáéíóúüAEIOUÁÉÍÓÚÜ"
def normalize_ws(t): return re.sub(r"\s+", " ", t).strip()
def strip_punct(t):
    t = re.sub(r"[„“”«»\"“”]", "", t)
    t = re.sub(r"[—–…]", " ", t)
    t = re.sub(r"[.,;:!?()\[\]{}/\\]", " ", t)
    return normalize_ws(t)

def apply_spanish_proxy_rules(text:str)->str:
    t = text
    # palatales
    t = t.replace("ḱ", "ky")
    t = t.replace("ǵ", "y")
    # affriquées et sibilantes
    t = t.replace("dž", "y")     # /dʒ/
    t = t.replace("dz", "ds")    # <- contrainte pour éviter des θ
    t = t.replace("č", "ch")
    t = t.replace("š", "s")
    t = t.replace("ž", "y")
    # lj, nj, j
    t = re.sub(r"\blj", "ll", t); t = t.replace("lj","ll")
    t = re.sub(r"\bnj", "ñ", t);  t = t.replace("nj","ñ")
    t = t.replace("j","y")
    # h (issu de х) → j dans environnements vocaliques
    t = re.sub(r"\b[hH](?=[%s])" % VOWELS, "j", t)
    t = re.sub(r"(?<=[%s])[hH](?=[%s])" % (VOWELS,VOWELS), "j", t)
    # k/g devant e,i → qu/gu
    t = re.sub(r"\bk(?=[eiéí])", "qu", t); t = re.sub(r"(?<=[^a-zA-Z])k(?=[eiéí])", "qu", t)
    t = re.sub(r"\bg(?=[eiéí])", "gu", t); t = re.sub(r"(?<=[^a-zA-Z])g(?=[eiéí])", "gu", t)
    # ailleurs, k → c (optionnel, plus “espagnol”)
    t = re.sub(r"\bk", "c", t);  t = re.sub(r"(?<=[^a-zA-Z])k", "c", t)
    return normalize_ws(t)

def mk_to_es_proxy(text:str)->str:
    t = strip_punct(text)
    t = mk_cyr_to_basic_latin(t)
    t = apply_spanish_proxy_rules(t)
    return t

written = 0
for f in sorted(IN_DIR.glob("*.txt")):
    raw = f.read_text(encoding="utf-8", errors="ignore")
    prox = mk_to_es_proxy(raw)
    out = OUT_DIR / f.name.replace("_cyr.txt", "_proxy_es.txt")
    out.write_text(prox + "\n", encoding="utf-8")
    written += 1
    print(f"Wrote {out.name} ({len(prox)} chars)")
print(f"\n✅ Done. Wrote {written} proxy TXT file(s) to {OUT_DIR}")


Wrote Macedonian_decata_so_zlatna_kosa_merged_proxy_es.txt (5200 chars)
Wrote Macedonian_decata_so_zlatna_kosa_merged_lat.txt (5200 chars)
Wrote Macedonian_decata_so_zlatna_kosa_merged_with_spk.txt (12457 chars)
Wrote Macedonian_duhot_od_grobot_merged_proxy_es.txt (5492 chars)
Wrote Macedonian_duhot_od_grobot_merged_lat.txt (5501 chars)
Wrote Macedonian_duhot_od_grobot_merged_with_spk.txt (12562 chars)
Wrote Macedonian_dva_braka_merged_proxy_es.txt (1979 chars)
Wrote Macedonian_dva_braka_merged_lat.txt (1996 chars)
Wrote Macedonian_dva_braka_merged_with_spk.txt (4548 chars)
Wrote Macedonian_itar_pejo_merged_proxy_es.txt (1305 chars)
Wrote Macedonian_itar_pejo_merged_lat.txt (1304 chars)
Wrote Macedonian_itar_pejo_merged_with_spk.txt (2986 chars)
Wrote Macedonian_kralevic_marko_merged_proxy_es.txt (831 chars)
Wrote Macedonian_kralevic_marko_merged_lat.txt (831 chars)
Wrote Macedonian_kralevic_marko_merged_with_spk.txt (1903 chars)
Wrote Macedonian_kusa_merged_proxy_es.txt (2485 chars)
W

In [24]:
# @title Build Spanish-proxy transcripts (with palatalization: ḱ→kiV, ǵ→guiV)
from pathlib import Path
import re
import shutil

SRC_MERGED = Path("/content/out_txt_merged")   # contient *_merged_lat.txt
OUT_PROXY  = Path("/content/out_proxy_esp")

# reset de sortie
if OUT_PROXY.exists():
    shutil.rmtree(OUT_PROXY)
OUT_PROXY.mkdir(parents=True, exist_ok=True)

# Règles globales (insensibles au contexte local)
# On traite d'abord les plus longs/rares
global_rules = [
    (r"dž", "ds"),
    (r"dz", "ds"),
    (r"č",  "ch"),
    (r"š",  "s"),
    (r"ž",  "y"),
    (r"lj", "ll"),
    (r"nj", "ñ"),
    (r"h",  "j"),   # [x] ≈ jota
    (r"c",  "s"),   # mac. c=ts → s (pas de /θ/)
    (r"j",  "y"),   # mac. j=/j/ → y
]

# Règles contextuelles pour palatalisées:
# ḱ + V → kiV ; ǵ + V → guiV
def pal_k(match):
    nxt = match.group(1)
    return "ki" + nxt

def pal_g(match):
    nxt = match.group(1)
    return "gui" + nxt

# Si ḱ/ǵ en fin de mot ou avant non-voyelle, on dégrade: ḱ→ki ; ǵ→gui
def pal_k_final(_):
    return "ki"

def pal_g_final(_):
    return "gui"

v = "aeiouáéíóúü"  # voyelles étendues, au cas où

def to_proxy_es(text: str) -> str:
    t = text.lower()

    # palatalisées d'abord (contexte sensible)
    t = re.sub(r"ḱ([" + v + "])", pal_k, t)
    t = re.sub(r"ǵ([" + v + "])", pal_g, t)
    t = re.sub(r"ḱ(?![" + v + "])", pal_k_final, t)
    t = re.sub(r"ǵ(?![" + v + "])", pal_g_final, t)

    # règles globales
    for pat, rep in global_rules:
        t = re.sub(pat, rep, t)

    # espaces propres
    t = re.sub(r"\s+", " ", t).strip()
    return t

written = 0
for f in sorted(SRC_MERGED.glob("*_merged_lat.txt")):
    stem = f.name.replace("_merged_lat.txt", "")
    raw = f.read_text(encoding="utf-8")
    prox = to_proxy_es(raw)
    out = OUT_PROXY / f"{stem}.txt"   # WebMAUS demande le même basename que l’audio
    out.write_text(prox + "\n", encoding="utf-8")
    written += 1
    print(f"Wrote {out.name} ({len(prox)} chars)")
print(f"\n✅ Spanish-proxy files written: {written} → {OUT_PROXY}")


Wrote Macedonian_decata_so_zlatna_kosa.txt (5262 chars)
Wrote Macedonian_duhot_od_grobot.txt (5713 chars)
Wrote Macedonian_dva_braka.txt (2078 chars)
Wrote Macedonian_itar_pejo.txt (1362 chars)
Wrote Macedonian_kralevic_marko.txt (858 chars)
Wrote Macedonian_kusa.txt (2610 chars)
Wrote Macedonian_lisicata_i_dedoto.txt (3519 chars)
Wrote Macedonian_makata.txt (944 chars)
Wrote Macedonian_masha.txt (2882 chars)
Wrote Macedonian_mrzlivata_zena.txt (5026 chars)
Wrote Macedonian_najdenko.txt (5299 chars)
Wrote Macedonian_narecnicite.txt (1790 chars)
Wrote Macedonian_ovcarot.txt (1965 chars)
Wrote Macedonian_ovenot_i_kozata.txt (1237 chars)
Wrote Macedonian_pepelaska.txt (2474 chars)
Wrote Macedonian_petle.txt (5855 chars)
Wrote Macedonian_praseto_i_zeladite.txt (3025 chars)
Wrote Macedonian_prdlivata_nevesta.txt (1923 chars)
Wrote Macedonian_prosti.txt (2157 chars)
Wrote Macedonian_role_i_rolejca.txt (5974 chars)
Wrote Macedonian_siljan_strkot.txt (3826 chars)
Wrote Macedonian_svadbata.txt 

In [26]:
# @title Extract referenced media filenames from EAF headers
from pathlib import Path
import xml.etree.ElementTree as ET
import pandas as pd

EAF_DIR = Path("/content/eaf_orig")

rows = []
for eaf_path in sorted(EAF_DIR.glob("*.eaf")):
    try:
        root = ET.parse(eaf_path).getroot()
        # cherche tous les MEDIA_DESCRIPTOR
        for md in root.findall(".//MEDIA_DESCRIPTOR"):
            media_url = md.attrib.get("MEDIA_URL", "")
            rel_media = media_url.split("/")[-1]  # nom de fichier seul
            mime = md.attrib.get("MIME_TYPE", "")
            rows.append({"EAF": eaf_path.name, "Media": rel_media, "Type": mime})
    except Exception as e:
        rows.append({"EAF": eaf_path.name, "Media": f"ERROR: {e}", "Type": ""})

df = pd.DataFrame(rows)
display(df)


Unnamed: 0,EAF,Media,Type
0,Macedonian_decata_so_zlatna_kosa.eaf,predacki.wav,audio/x-wav
1,Macedonian_duhot_od_grobot.eaf,Dduhotodgrobot.wav,audio/x-wav
2,Macedonian_dva_braka.eaf,dva_braka_v1_cut.mp4,video/mp4
3,Macedonian_itar_pejo.eaf,itarpejo.wav,audio/x-wav
4,Macedonian_kralevic_marko.eaf,kralevic_marko_v1.mp4,video/mp4
5,Macedonian_kusa.eaf,bravite_v1.mp4,video/mp4
6,Macedonian_lisicata_i_dedoto.eaf,lisicata_i_dedoto_v1.mp4,video/mp4
7,Macedonian_makata.eaf,makata.wav,audio/x-wav
8,Macedonian_masha.eaf,masha_v1.mp4,video/mp4
9,Macedonian_mrzlivata_zena.eaf,mrzlivata_zena_v1_cut.mp4,video/mp4


In [None]:
# @title WebMAUS Basic batch: audio + proxy txt -> TextGrid
!pip -q install requests lxml

import os, time, pathlib, requests
from lxml import etree

AUDIO_DIR = pathlib.Path("/content/media")           # mets tes .wav ou .mp4 ici
TXT_DIR   = pathlib.Path("/content/out_proxy_esp")   # nos proxies /proxy_esp/*.txt
OUT_DIR   = pathlib.Path("/content/macedonian/data/textgrid")
OUT_DIR.mkdir(parents=True, exist_ok=True)

ENDPOINT  = "https://clarin.phonetik.uni-muenchen.de/BASWebServices/services/runMAUSBasic"
LANGUAGE  = "spa"       # proxy espagnol
OUTFORMAT = "TextGrid"  # format de sortie
OUTSYMBOL = "ipa"       # ou "sampa" si tu préfères

AUDIO_EXTS = {".wav", ".nis", ".nist", ".sph", ".mp4", ".mpeg", ".mpg"}  # formats acceptés par le service

def run_webmaus(signal_path: pathlib.Path, text_path: pathlib.Path):
    files = {
        "SIGNAL": (signal_path.name, open(signal_path, "rb")),
        "TEXT":   (text_path.name,   open(text_path,   "rb")),
    }
    data = {
        "LANGUAGE": LANGUAGE,
        "OUTFORMAT": OUTFORMAT,
        "OUTSYMBOL": OUTSYMBOL,
    }
    try:
        r = requests.post(ENDPOINT, files=files, data=data, timeout=120)
    finally:
        for f in files.values():
            f[1].close()
    r.raise_for_status()
    # parse XML
    root = etree.fromstring(r.content)
    success = (root.findtext(".//success") or "").strip().lower() == "true"
    if not success:
        msg = (root.findtext(".//message") or root.findtext(".//output") or "unknown error").strip()
        raise RuntimeError(f"WebMAUS failed: {msg}")
    url = (root.findtext(".//downloadLink") or "").strip()
    if not url:
        raise RuntimeError("No downloadLink in response.")
    return url

def download_file(url: str, dest: pathlib.Path):
    with requests.get(url, stream=True, timeout=120) as r:
        r.raise_for_status()
        with open(dest, "wb") as f:
            for chunk in r.iter_content(chunk_size=65536):
                if chunk:
                    f.write(chunk)

processed, skipped = 0, 0
for audio in sorted(AUDIO_DIR.iterdir()):
    if audio.suffix.lower() not in AUDIO_EXTS or not audio.is_file():
        continue
    base = audio.stem
    txt = TXT_DIR / f"{base}.txt"
    if not txt.exists():
        print(f"⚠️  Pas de transcript proxy pour {audio.name}, on saute.")
        skipped += 1
        continue
    out_tg = OUT_DIR / f"{base}.TextGrid"
    if out_tg.exists():
        print(f"⏭️  Déjà présent: {out_tg.name}")
        continue
    try:
        print(f"▶️  WebMAUS: {audio.name} + {txt.name}")
        url = run_webmaus(audio, txt)
        download_file(url, out_tg)
        print(f"✅  {out_tg.name}")
        processed += 1
        time.sleep(1.0)  # politesse minimale
    except Exception as e:
        print(f"❌  {base}: {e}")

print(f"\nTerminé. {processed} TextGrid créés, {skipped} fichiers audio sans .txt correspondant.")
