# CLDR to OpenFST

> "ChatGPT generated code; only basic part works"

- categories: [cldr, openfst, chatgpt]
- branch: master
- hidden: true
- badges: true

In [16]:
!pip install pynini



In [None]:
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Dict, List, Tuple

import pynini
from pynini import *
from pynini.lib import pynutil
from pynini.lib import utf8


_U_HEX = re.compile(r"\\u([0-9A-Fa-f]{4})")
_U_HEX_LONG = re.compile(r"\\U([0-9A-Fa-f]{6,8})")

def _decode_escapes(s: str) -> str:
    """Decodes CLDR-style escapes like \\u0259 and \\U0001F600."""
    def rpl4(m):
        return chr(int(m.group(1), 16))
    def rpl8(m):
        return chr(int(m.group(1), 16))
    s = _U_HEX.sub(rpl4, s)
    s = _U_HEX_LONG.sub(rpl8, s)
    return s

def _expand_char_class(cls: str) -> List[str]:
    """
    Expand a simple bracket class like [abc] or [a-z].
    This is intentionally minimal; it does not handle nested classes, properties, or set ops.
    """
    out = []
    i = 0
    while i < len(cls):
        if i + 2 < len(cls) and cls[i+1] == "-":
            start = ord(cls[i])
            end = ord(cls[i+2])
            for cp in range(start, end + 1):
                out.append(chr(cp))
            i += 3
        else:
            out.append(cls[i])
            i += 1
    return out

def _tokenize_pattern(pat: str) -> List[str]:
    """
    Tokenize a very small subset of CLDR pattern syntax:
    - Literal characters
    - Character classes [..]
    Returns a list of alternatives (strings) if the pattern is a single char-class;
    otherwise returns [pat] (treated as a literal string).
    """
    pat = pat.strip()
    if len(pat) >= 2 and pat[0] == "[" and pat[-1] == "]":
        inner = _decode_escapes(pat[1:-1])
        alts = _expand_char_class(inner)
        return alts
    return [_decode_escapes(pat)]

def _string_map(pairs: List[Tuple[str, str]]) -> Fst:
    """
    Build a union of literal string transductions, determinize & minimize.
    """
    t = pynini.string_map(pairs)
    t.optimize()
    return t


_RULE_RE = re.compile(r"""
    ^\s*
    (?P<lhs>.+?)
    \s*
    (?P<op><>|>|<)
    \s*
    (?P<rhs>.+?)
    ;
    \s*$
""", re.VERBOSE)

@dataclass
class Rule:
    lhs: str
    rhs: str
    op: str

def parse_cldr_rules_simple(text: str) -> List[Rule]:
    """
    Parse a subset of CLDR rule lines:
      LHS > RHS ;
      LHS < RHS ;
      LHS <> RHS ;
    Strips comments (# ...) and blank lines.
    Ignores directives (:: ... ;)
    """
    rules: List[Rule] = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("::"):
            # ignore directives in this simple converter
            continue
        m = _RULE_RE.match(line)
        if not m:
            # Not supported yet; skip quietly so you can still test quickly.
            continue
        lhs = m.group("lhs").strip()
        rhs = m.group("rhs").strip()
        op = m.group("op")
        rules.append(Rule(lhs=lhs, rhs=rhs, op=op))
    return rules


def build_transducer_from_rules(rules: List[Rule]) -> Fst:
    """
    Build a transducer from simple (context-free) CLDR rules.
    Uses cdrewrite with utf8.VALID_UTF8.closure() so non-matching chars pass through.
    """
    pairs: List[Tuple[str, str]] = []

    for r in rules:
        lhs_alts = _tokenize_pattern(r.lhs)
        rhs_alts = _tokenize_pattern(r.rhs)

        def add_pairs(A: List[str], B: List[str]):
            if len(A) == len(B):
                for x, y in zip(A, B):
                    pairs.append((_decode_escapes(x), _decode_escapes(y)))
            else:
                for x in A:
                    for y in B:
                        pairs.append((_decode_escapes(x), _decode_escapes(y)))

        if r.op == ">":
            add_pairs(lhs_alts, rhs_alts)
        elif r.op == "<":
            add_pairs(rhs_alts, lhs_alts)
        elif r.op == "<>":
            add_pairs(lhs_alts, rhs_alts)
            add_pairs(rhs_alts, lhs_alts)

    sigma_star = utf8.VALID_UTF8_CHAR.closure()

    if not pairs:
        # Identity FST if no rules
        return pynini.cdrewrite(pynini.cross("", ""), "", "", sigma_star)

    # Sequentially compose cdrewrite rules
    t = None
    for src, tgt in pairs:
        rule = pynini.cdrewrite(
            pynini.cross(src, tgt),
            "", "", sigma_star
        )
        t = rule if t is None else (t @ rule)

    t.optimize()
    return t


def prefix_language_token(lang_token: str) -> Fst:
    """
    Create an acceptor for a language prefix token like '<eng>'.
    We allow it as literal at the beginning and then delete it from output:
      '<eng>' x ...  -> ... (so the prefix doesn't appear in the output)
    """
    # Literal token
    token_acceptor = pynini.accep(lang_token)
    # We want to delete it from output: cross(lang_token, "").
    return pynini.cross(lang_token, "")


def build_multilingual_transducer(
    lang_to_rules_text: Dict[str, str],
    token_fmt: str = "<{lang}>",
) -> Fst:
    """
    For each language:
      - parse rules
      - build transducer
      - prepend a required language token (deleted on output)
    Then union all.
    """
    unified = None
    for lang, text in lang_to_rules_text.items():
        rules = parse_cldr_rules_simple(text)
        t = build_transducer_from_rules(rules)
        pref = prefix_language_token(token_fmt.format(lang=lang))
        lang_t = pref + t  # concatenation; prefix must come first
        lang_t.optimize()
        unified = lang_t if unified is None else (unified | lang_t)

    if unified is None:
        # No rules: accept anything and echo it (after removing an imaginary token)
        unified = pynini.transducer("", "")
    unified.optimize()
    # Determinize/minimize for speed
    unified = pynini.determinize(unified).minimize()
    return unified


def save_fst(fst: Fst, path: str) -> None:
    fst.write(path)


def demo():
    cldr_en = r"""
        # English-ish toy rules
        th > θ ;
        sh > ʃ ;
        ch > t͡ʃ ;
        ng > ŋ ;
        [aeiou] > a ;   # silly vowel collapse to 'a'
    """

    cldr_es = r"""
        # Spanish-ish toy rules
        ll > ʎ ;
        ñ > ɲ ;
        qu > k ;
        c > k ;
        z > s ;
        [aeiou] > a ;
    """

    lang_rules = {
        "eng": cldr_en,
        "spa": cldr_es,
    }

    fst = build_multilingual_transducer(lang_rules, token_fmt="<{lang}>")
    save_fst(fst, "multilang.fst")

    test_inputs = [
        "<eng>thing",
        "<eng>mashing",
        "<spa>llama",
        "<spa>quiza",
    ]

    def apply(input_str: str) -> str:
        lattice = pynini.compose(input_str, fst)
        if lattice.start() == pynini.NO_STATE_ID:
            return "<no-path>"
        try:
            return pynini.shortestpath(lattice, 1).string()
        except Exception:
            return "<no-output>"

    for s in test_inputs:
        print(s, "->", apply(s))

    print("Wrote combined FST to multilang.fst")

In [32]:
demo()

<eng>thing -> θiŋ
<eng>mashing -> maʃiŋ
<spa>llama -> ʎama
<spa>quiza -> kasa
Wrote combined FST to multilang.fst


In [95]:
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional
import sys
import io

import pynini
from pynini import Fst
from pynini.lib import utf8

_U_HEX = re.compile(r"\\u([0-9A-Fa-f]{4})")
_U_HEX_LONG = re.compile(r"\\U([0-9A-Fa-f]{6,8})")
_ESCAPED = re.compile(r"\\([][\\/\-^(){}_.*+?|])")

def _decode_escapes(s: str) -> str:
    """Decodes CLDR-style escapes."""
    def rpl4(m): return chr(int(m.group(1), 16))
    def rpl8(m): return chr(int(m.group(1), 16))
    s = _U_HEX.sub(rpl4, s)
    s = _U_HEX_LONG.sub(rpl8, s)
    s = _ESCAPED.sub(lambda m: m.group(1), s)
    return s

def _char_range(a: str, b: str) -> List[str]:
    return [chr(cp) for cp in range(ord(a), ord(b) + 1)]

def _parse_unicode_set(text: str) -> Tuple[bool, List[str]]:
    """Simple parser for CLDR-style character sets like [a-z] or [^abc]."""
    assert text.startswith("[") and text.endswith("]"), "not a set"
    inner = text[1:-1]
    neg = inner.startswith("^")
    if neg: inner = inner[1:]
    items: List[str] = []
    i = 0
    def read_char(ix: int) -> Tuple[str, int]:
        if ix < len(inner) and inner[ix] == "\\":
            m4 = _U_HEX.match(inner, ix)
            if m4: return (chr(int(m4.group(1), 16)), m4.end())
            m8 = _U_HEX_LONG.match(inner, ix)
            if m8: return (chr(int(m8.group(1), 16)), m8.end())
            if ix + 1 < len(inner): return (inner[ix + 1], ix + 2)
            return ("\\", ix + 1)
        return (inner[ix], ix + 1)
    while i < len(inner):
        c1, j = read_char(i)
        if j < len(inner) - 1 and inner[j] == "-" and j + 1 < len(inner):
            c2, k = read_char(j + 1)
            items.extend(_char_range(c1, c2))
            i = k
        else:
            items.append(c1)
            i = j
    return (neg, items)

def _acceptor(s: str) -> Fst:
    return pynini.accep(s, token_type="utf8")


@dataclass
class Rule:
    # CLDR syntax: May contain embedded context markers like '}' or '{'
    raw_lhs: str
    rhs: str
    op: str

    # Pynini syntax: Explicit L and R contexts (parsed from raw_lhs/rhs if markers are found)
    lhs: str = ""   # X (The target string being rewritten)
    left: str = ""  # L (The preceding context)
    right: str = "" # R (The following context)


@dataclass
class Directive:
    kind: str
    payload: Optional[str] = None

@dataclass
class VarAssign:
    name: str
    expr: str

Line = Tuple[str, object]

# CLDR rule format: [CONTEXT] TARGET [CONTEXT] OP RESULT ;
# E.g.: [ei] } c > s; (Rewrite c to s if preceded by [ei])
_CLDR_RULE_RE = re.compile(r"""
    ^\s*
    (?P<raw_lhs>.+?) # Everything before the operator
    \s*
    (?P<op><>|>|<)
    \s*
    (?P<rhs>.+?) # Everything after the operator until the semicolon
    \s*;
    \s*$
""", re.VERBOSE)

_VAR_RE = re.compile(r"""
    ^\s*
    (?P<name>\$[A-Za-z0-9_]+)
    \s*=\s*
    (?P<expr>.+?)
    \s*;?\s*$
""", re.VERBOSE)

def _extract_cldr_context_parts(raw_lhs: str) -> Tuple[str, str, str]:
    """
    Transforms a raw CLDR LHS string (which may contain '}' or '{' markers)
    into the three Pynini rewrite components: L (Left Context), X (LHS Target), R (Right Context).

    CLDR conventions:
    1. L } X: Lookbehind: L is context, X is target. R is empty.
    2. X { R: Lookahead: R is context, X is target. L is empty.
    3. L / X _ R: Full Context (ignored for this version, assumes L}X or X{R).

    Returns: (L_ctx_str, X_target_str, R_ctx_str)
    """
    s = raw_lhs.strip()

    if s.count('}') > 1 or s.count('{') > 1 or (s.count('}') == 1 and s.count('{') == 1):
        raise ValueError(f"Ambiguous context markers in CLDR rule LHS: {raw_lhs}")

    if '}' in s:
        # Case 1: L } X (Lookbehind)
        parts = s.split('}', 1)
        L_ctx = parts[0].strip()
        X_target = parts[1].strip()
        return (L_ctx, X_target, "")

    if '{' in s:
        # Case 2: X { R (Lookahead)
        parts = s.split('{', 1)
        X_target = parts[0].strip()
        R_ctx = parts[1].strip()
        return ("", X_target, R_ctx)

    # Case 3: Simple X > Y rule (no context markers)
    return ("", s, "")


def _parse_line(line: str) -> Optional[Line]:
    s = line.strip()
    if not s or s.startswith("#"):
        return None

    mvar = _VAR_RE.match(s)
    if mvar and mvar.group("expr").strip():
        name = mvar.group("name")
        expr = mvar.group("expr").strip().rstrip(';')
        return ("var", VarAssign(name=name, expr=expr))

    if s.startswith("::"):
        body = s[2:].strip()
        if body.lower() == "null;": return ("dir", Directive(kind="null"))
        if body.lower() == "nfd;": return ("dir", Directive(kind="nfd"))
        if body.lower() == "nfc;": return ("dir", Directive(kind="nfc"))
        if body.startswith("[") and body.endswith(";"):
            payload = body[:-1].strip()
            return ("dir", Directive(kind="filter", payload=payload))
        return ("dir", Directive(kind="unknown", payload=body))

    m = _CLDR_RULE_RE.match(s)
    if m:
        raw_lhs = m.group("raw_lhs").strip()
        rhs_core = m.group("rhs").strip()
        op = m.group("op")

        try:
            # New logic: Parse L, X, R from the raw LHS string
            L_ctx_str, X_target_str, R_ctx_str = _extract_cldr_context_parts(raw_lhs)
        except ValueError as e:
            print(f"Error parsing CLDR rule '{s}': {e}", file=sys.stderr)
            return None

        # Populate the Rule object with the Pynini components
        return ("rule", Rule(
            raw_lhs=raw_lhs,
            rhs=rhs_core,
            op=op,
            lhs=X_target_str,
            left=L_ctx_str,
            right=R_ctx_str
        ))
    return None

def parse_cldr(text: str) -> List[Line]:
    out: List[Line] = []
    for raw in text.splitlines():
        p = _parse_line(raw)
        if p: out.append(p)
    return out

class Env:
    def __init__(self) -> None:
        self.vars: Dict[str, Fst] = {}
        self.sigma: Fst = utf8.VALID_UTF8_CHAR.optimize()
        self.sigma_star: Fst = self.sigma.closure().optimize()
        I_char = pynini.cross(self.sigma, self.sigma)
        self.I_sigma_star: Fst = I_char.closure().optimize()

    def get(self, name: str) -> Fst:
        if name not in self.vars:
            raise KeyError(f"Undefined variable {name}")
        return self.vars[name]

def _compile_atom(expr: str, env: Env) -> Fst:
    expr = expr.strip()
    if expr.startswith("$"):
        return env.get(expr)

    # CLDR Boundary Support: Treat ^ (start) and $ (end) as empty string acceptors
    if expr == "^" or expr == "$":
        return _acceptor("")

    if expr.startswith("[") and expr.endswith("]"):
        neg, items = _parse_unicode_set(expr)
        if not items:
            return env.sigma if neg else pynini.Fst()
        u = pynini.union(*(_acceptor(ch) for ch in items)).optimize()
        if neg:
            return (env.sigma - u).optimize()
        return u

    return _acceptor(_decode_escapes(expr))

def _compile_seq(expr: Optional[str], env: Env) -> Fst:
    """Compiles a sequence of atoms (literals, sets, variables, boundaries) into an Fst acceptor."""
    if not expr: return _acceptor("")
    s = expr
    parts: List[str] = []
    i = 0
    cur = []
    depth = 0

    # Simple tokenizer logic
    while i < len(s):
        ch = s[i]
        is_escape_char = (ch == '\\') and (i + 1 < len(s) and s[i+1].isalpha())

        # Token separation logic
        if ch == "[" and not is_escape_char: depth += 1
        elif ch == "]" and depth > 0 and not is_escape_char: depth -= 1
        elif ch.isspace() and depth == 0:
            if cur: parts.append("".join(cur)); cur = []
            i += 1
            continue

        cur.append(ch)
        i += 1
        if is_escape_char and i < len(s):
            pass

    if cur: parts.append("".join(cur))
    if not parts: return _acceptor("")

    # Composition of FST atoms
    fst = _compile_atom(parts[0], env)
    for p in parts[1:]:
        fst = fst + _compile_atom(p, env)
    return fst.optimize()

def compile_lines(lines: List[Line]) -> Fst:
    env = Env()

    vars_to_compile: List[VarAssign] = []
    for kind, payload in lines:
        if kind == "var":
            vars_to_compile.append(payload) # type: ignore[arg-type]

    for va in vars_to_compile:
        env.vars[va.name] = _compile_seq(va.expr, env).optimize()

    all_rules: List[Rule] = []
    for kind, payload in lines:
        if kind == "rule":
            all_rules.append(payload) # type: ignore[arg-type]

    cascade = env.I_sigma_star

    for r in all_rules:
        # X is the target (r.lhs), Y is the replacement (r.rhs)
        X_target = _compile_seq(r.lhs, env)
        Y_replacement = _compile_seq(r.rhs, env)

        # L and R are the context FSTs (r.left, r.right)
        L_ctx = _compile_seq(r.left, env)
        R_ctx = _compile_seq(r.right, env)

        def add_rule(X: Fst, Y: Fst, L: Fst, R: Fst, raw_rule: str):
            nonlocal cascade

            if X.num_states() == 0:
                print(f"Warning: Empty LHS FST for rule '{raw_rule}'. Skipping.")
                return

            # Create the rewriting transducer (tau = X -> Y)
            try:
                rewrite = pynini.cdrewrite(
                    pynini.cross(X, Y),
                    L, R,
                    env.sigma_star
                ).optimize()
            except pynini.FstOpError as e:
                print(f"Severe Warning: Rewrite failed for rule '{raw_rule}'. Error: {e}", file=sys.stderr)
                return

            composed = cascade @ rewrite
            cascade = composed.optimize()

        # The rule structure requires L, X, R to be correctly assigned before calling add_rule
        # This is handled by _extract_cldr_context_parts
        if r.op == ">":
            add_rule(X_target, Y_replacement, L_ctx, R_ctx, r.raw_lhs + r.op + r.rhs)
        elif r.op == "<":
            # Inverse: Y->X / R_rev _ L_rev
            add_rule(Y_replacement, X_target, R_ctx, L_ctx, r.raw_lhs + r.op + r.rhs)
        elif r.op == "<>":
            add_rule(X_target, Y_replacement, L_ctx, R_ctx, r.raw_lhs + r.op + r.rhs)
            add_rule(Y_replacement, X_target, R_ctx, L_ctx, r.raw_lhs + r.op + r.rhs)

    try:
        cascade = pynini.determinize(cascade).minimize()
    except pynini.FstOpError as e:
        print(f"Warning: Failed to determinize/minimize final language cascade. Error: {e}", file=sys.stderr)
        cascade.optimize()

    return cascade

def prefix_language_token(lang_token: str) -> Fst:
    return pynini.cross(_acceptor(lang_token), _acceptor(""))

def build_multilingual_transducer(lang_to_rules_text: Dict[str, str], token_fmt: str = "<{lang}>") -> Fst:
    unified: Optional[Fst] = None
    for lang, text in lang_to_rules_text.items():
        lines = parse_cldr(text)
        t = compile_lines(lines)
        lt = prefix_language_token(token_fmt.format(lang=lang)) + t
        lt.optimize()
        unified = lt if unified is None else (unified | lt)
    if unified is None:
        unified = pynini.transducer("", "")

    return pynini.determinize(unified).minimize()

def save_fst(fst: Fst, path: str) -> None:
    fst.write(path)

def demo():
    cldr_en = r"""
        $V = [aeiou] ;
        # Simple rewrite (no context)
        th > θ ;
        sh > ʃ ;
        ng > ŋ ;
        ch > t͡ʃ ;
    """

    cldr_es = r"""
        $V = [aeiou] ;
        $E_OR_I = [ei];
        :: [a-zñ] ;

        # 1. CLDR Lookahead (Soft C): Rewrite 'c' to 's' if FOLLOWED by 'e' or 'i'.
        # X { R > Y: Target X=c, R=E_OR_I, Y=s.
        c { $E_OR_I > s;

        # 2. CLDR Lookbehind: Rewrite 's' to 'z' if PRECEDED by a vowel.
        # L } X > Y: L=V, X=s, Y=z.
        $V } s > z;

        # 3. CLDR Lookbehind (Boundary): Rewrite ll to lambda if preceded by start of string
        ^ } ll > ʎ;

        # 4. Simple rewrites
        ñ > ɲ ;
        qu > k ;

        ::Null;
    """

    lang_rules = {"eng": cldr_en, "spa": cldr_es}

    fst = build_multilingual_transducer(lang_rules, token_fmt="<{lang}>")
    save_fst(fst, "multilang.fst")

    tests = [
        "<eng>thing",      # th -> θ
        "<eng>mashing",    # sh -> ʃ, ng -> ŋ

        # SPA tests using CLDR syntax:
        "<spa>cama",     # Rule 1: 'c' followed by 'a' (no match) -> cama. Rule 2: 's' not present. -> cama
        "<spa>cita",     # Rule 1: c { [ei] -> s: sita. Rule 2: 's' preceded by 'i' ($V } s): sita -> siza. Final: siza
        "<spa>ceca",     # Rule 1: c { [ei] -> s: seca. Rule 2: 's' preceded by 'e' ($V } s): seca -> seza. Final: seza
        "<spa>sopa",     # Rule 2: 's' not preceded by vowel (at start) -> sopa
        "<spa>mesa",     # Rule 2: $V } s -> z: meza
        "<spa>acacia",   # 2nd 'c' followed by 'i' (Rule 1): acasia. 1st 's' preceded by 'a' (Rule 2): acazia. Final: acazia
        "<spa>llave",    # Rule 3: ^ } ll -> ʎ: ʎave
    ]

    def apply(s: str) -> str:
        lat = pynini.compose(_acceptor(s), fst)
        if lat.start() == pynini.NO_STATE_ID:
            return "<no-path>"

        raw_output = pynini.shortestpath(lat, 1).string(token_type="utf8")
        if isinstance(raw_output, bytes):
            return raw_output.decode("utf-8")
        return raw_output

    print("--- Transliteration Results ---")
    for t in tests:
        result = apply(t)
        print(f"{t} -> {result}")

    print("Wrote multilang.fst")

if __name__ == "__main__":
    demo()




FstOpError: Operation failed

In [77]:
demo()

<eng>thing -> <no-path>
<eng>mashing -> <no-path>
<spa>llama -> <no-path>
<spa>quiza -> kasa
<spa>cita -> sata
<spa>cuna -> kana
Wrote multilang.fst
