# RAG-Enhanced Translation System

This notebook builds on the baseline results from `Baseline.ipynb`.

**Features:**
- **Adaptive Retrieval**: Context-aware glossary and TM retrieval
- **Comprehensive Evaluation**: RAG vs baseline comparison

---

## 1. Setup & Data Loading

In [2]:
import os
import re
import json
import time
import random
import warnings
import hashlib
import threading
from pathlib import Path
from typing import Any, Dict, List, Tuple, Optional

from concurrent.futures import ThreadPoolExecutor, as_completed

import numpy as np
import pandas as pd
from dotenv import load_dotenv
from tqdm import tqdm

# RAG-specific
from sentence_transformers import SentenceTransformer
import chromadb

# OpenAI client
from openai import OpenAI

# Suppress every Python warning for the rest of the session so cell output
# stays readable. NOTE: this also hides deprecation notices from libraries.
warnings.filterwarnings("ignore")
# Do not truncate columns or wrap at a fixed width when DataFrames are shown.
pd.set_option("display.max_columns", None)
pd.set_option("display.width", None)

# Load environment variables (e.g. OPENAI_API_KEY, read later via os.getenv)
# from a local .env file, if one is present.
load_dotenv()
print("RAG-Enhanced Translation System (Clean Rewrite)")
print("=" * 70)

def flatten_json_strings(obj: Any, prefix: str = "") -> List[Tuple[str, str]]:
    """Collect every non-blank string leaf of a JSON-like structure.

    Dict keys are joined with ``.`` and list positions are rendered as
    ``[i]``, so a leaf is addressed like ``menu.items[2].label``. Leaves that
    are not strings, or that contain only whitespace, are ignored.

    Args:
        obj: Parsed JSON value (dict, list, str or any other scalar).
        prefix: Path accumulated so far; empty for the root.

    Returns:
        ``(path, text)`` pairs in depth-first document order.
    """

    def _walk(node: Any, path: str):
        # Depth-first traversal that yields leaves in encounter order.
        if isinstance(node, dict):
            for key, child in node.items():
                yield from _walk(child, f"{path}.{key}" if path else key)
        elif isinstance(node, list):
            for idx, child in enumerate(node):
                yield from _walk(child, f"{path}[{idx}]")
        elif isinstance(node, str) and node.strip():
            yield (path, node)

    return list(_walk(obj, prefix))

SRC_FILE = Path("data/en.json")
if not SRC_FILE.exists():
    raise FileNotFoundError("Missing data/en.json")

# Parse the English source bundle and flatten it into (path, text) segments.
with open(SRC_FILE, "r", encoding="utf-8") as f:
    en_json = json.loads(f.read())

en_segments: List[Tuple[str, str]] = flatten_json_strings(en_json)
print(f"Loaded {len(en_segments)} source segments")


def _sha1(p):
    """Return the hex SHA-1 digest of the file at ``p``, read in 8 KiB chunks."""
    digest = hashlib.sha1()
    with open(p, 'rb') as f:
        while chunk := f.read(8192):
            digest.update(chunk)
    return digest.hexdigest()


# Fingerprint of the source file; None when the file cannot be hashed.
try:
    SOURCE_SHA = _sha1("data/en.json")
except Exception:
    SOURCE_SHA = None

RAG-Enhanced Translation System (Clean Rewrite)
Loaded 76 source segments


## 2. Load Translation Memory (TM) & Glossary

In [3]:
def load_translation_memory() -> Dict[str, Dict[str, str]]:
    """Read ``data/translation_memory.csv`` into per-language lookup tables.

    Rows are expected to carry ``tgt_lang``, ``src_text`` and ``tgt_text``.
    A row is kept only when the language is one of fr/ja/it and both texts
    are non-empty strings; a later row with the same source text overwrites
    an earlier one.

    Returns:
        ``{lang: {source_text: target_text}}`` for fr, ja and it. The tables
        are empty when the file is missing or cannot be read.
    """
    supported = ("fr", "ja", "it")
    tm_dict = {code: {} for code in supported}
    tm_file = Path("data/translation_memory.csv")

    if not tm_file.exists():
        print("No translation_memory.csv found")
        return tm_dict

    try:
        for record in pd.read_csv(tm_file).to_dict("records"):
            lang = record.get("tgt_lang")
            src = record.get("src_text")
            tgt = record.get("tgt_text")
            both_text = isinstance(src, str) and isinstance(tgt, str)
            if lang in tm_dict and both_text and src and tgt:
                tm_dict[lang][src] = tgt
        for code in supported:
            print(f"TM {code.upper()}: {len(tm_dict[code])} entries")
    except Exception as e:
        print(f"Could not load TM: {e}")
    return tm_dict


def load_glossary() -> Tuple[List[str], Dict[str, Dict[str, str]], List[str]]:
    """Read ``data/glossary.csv`` into terms, per-language mappings and DNT terms.

    The CSV must have a ``source_term`` column. Optional ``fr``/``ja``/``it``
    columns hold the approved target term, and an optional ``dnt`` column
    marks do-not-translate terms (any case of "TRUE").

    Returns:
        A tuple ``(glossary_terms, glossary_map, dnt_terms)`` where
        ``glossary_terms`` lists every non-empty source term as ``str``,
        ``glossary_map`` is ``{lang: {source_term: target_term}}`` and
        ``dnt_terms`` is a sorted, de-duplicated list that always includes
        "NaiLit". On a missing or unreadable file the defaults are returned
        and a message is printed.
    """
    glossary_terms: List[str] = []
    glossary_map: Dict[str, Dict[str, str]] = {"fr": {}, "ja": {}, "it": {}}
    dnt_terms: List[str] = ["NaiLit"]

    gl_file = Path("data/glossary.csv")
    if gl_file.exists():
        try:
            df = pd.read_csv(gl_file)
            if "source_term" not in df.columns:
                raise ValueError("glossary.csv must have a 'source_term' column")
            glossary_terms = [t for t in df["source_term"].dropna().astype(str).tolist() if t]
            for lang in ["fr", "ja", "it"]:
                if lang in df.columns:
                    mapping: Dict[str, str] = {}
                    # Test for missing cells on the raw values: casting the
                    # column to str first would turn NaN into the literal
                    # "nan" and produce bogus "term → nan" mappings.
                    for st, tt in zip(df["source_term"], df[lang]):
                        if pd.isna(st) or pd.isna(tt):
                            continue
                        target = str(tt)
                        if not target.strip():
                            continue
                        # Keys are str-cast so they match glossary_terms.
                        mapping[str(st)] = target
                    glossary_map[lang] = mapping
                    print(f"Glossary {lang.upper()}: {len(glossary_map[lang])} mappings")
            if "dnt" in df.columns:
                mask = df["dnt"].astype(str).str.upper() == "TRUE"
                dnt_terms.extend(df.loc[mask, "source_term"].dropna().astype(str).tolist())
        except Exception as e:
            print(f"Could not load glossary: {e}")
    else:
        print("No glossary.csv found")

    dnt_terms = sorted({t for t in dnt_terms if t})
    return glossary_terms, glossary_map, dnt_terms

# Load the translation memory and glossary once; the retrieval and translation
# helpers in later cells read these module-level names.
TM_DICT = load_translation_memory()
GLOSSARY_TERMS, GLOSSARY_MAP, DNT_TERMS = load_glossary()
print(f"DNT terms: {DNT_TERMS}")

TM FR: 2 entries
TM JA: 2 entries
TM IT: 2 entries
Glossary FR: 18 mappings
Glossary JA: 18 mappings
Glossary IT: 18 mappings
DNT terms: ['Gel-X', 'NaiLit']


## 3. Embeddings & Vector DB

In [4]:
print("Loading embedding model…")
EMB_MODEL = SentenceTransformer("intfloat/multilingual-e5-base")

# Persistent on-disk Chroma store; the glossary collection uses cosine distance.
CHROMA_PATH = ".chroma"
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)
GLOSSARY_COL = chroma_client.get_or_create_collection(
    name="glossary",
    metadata={"hnsw:space": "cosine"}
)

if GLOSSARY_TERMS:
    print("Indexing glossary…")
    # IDs are the positions of the terms in GLOSSARY_TERMS.
    ids_all = [str(i) for i in range(len(GLOSSARY_TERMS))]
    wanted = dict(zip(ids_all, GLOSSARY_TERMS))

    # Snapshot what is already persisted (id -> document). Because the store
    # survives across runs, an ID being present does not mean its document
    # still matches the current glossary row.
    try:
        snapshot = GLOSSARY_COL.get()
        indexed = dict(zip(snapshot.get("ids") or [], snapshot.get("documents") or []))
    except Exception:
        indexed = {}
    existing = set(indexed)

    # Drop rows left over from a previous, longer glossary so retrieval can
    # never return terms that are no longer in GLOSSARY_TERMS.
    stale_ids = sorted(existing - set(wanted))
    if stale_ids:
        GLOSSARY_COL.delete(ids=stale_ids)

    # (Re-)embed only rows that are missing or whose text changed.
    to_add = [(i, t) for i, t in enumerate(GLOSSARY_TERMS) if indexed.get(str(i)) != t]
    if to_add:
        batch_terms = [t for _, t in to_add]
        # NOTE(review): terms are embedded without an E5 prefix while queries
        # use "query: " (see retrieve_glossary_terms). Left unchanged because
        # the score thresholds were tuned against this setup — confirm.
        embs = EMB_MODEL.encode(batch_terms, batch_size=64, normalize_embeddings=True, show_progress_bar=True)
        # upsert (not add) so a changed row overwrites the stale entry.
        GLOSSARY_COL.upsert(
            ids=[str(i) for i, _ in to_add],
            documents=batch_terms,
            embeddings=[e.tolist() for e in embs]
        )
    print("Glossary ready in Chroma")

Loading embedding model…
Indexing glossary…
Glossary ready in Chroma


## 4. Retrieval & Utility Helpers

In [5]:
HTML_TAG = re.compile(r"</?\w+(?:\s+[^>]*?)?>", re.IGNORECASE)
_WORD = re.compile(r"\w+", re.UNICODE)

def _normalize_for_retrieval(s: str) -> str:
    """Strip HTML tags, collapse whitespace runs to one space, trim and lowercase.

    ``None`` or an empty value yields an empty string.
    """
    without_tags = HTML_TAG.sub("", s or "")
    single_spaced = re.sub(r"\s+", " ", without_tags)
    return single_spaced.strip().lower()

def _tokenize(s: str) -> set:
    """Return the set of lowercased word tokens found in ``s`` (empty for falsy input)."""
    return {hit.group(0).lower() for hit in _WORD.finditer(s or "")}

def tm_lookup(src_text: str, lang: str) -> Optional[str]:
    """Return the exact-match TM translation of ``src_text`` for ``lang``, or None."""
    per_language = TM_DICT.get(lang, {})
    return per_language.get(src_text)

def tags_preserved(src: str, tgt: str) -> bool:
    """True when ``tgt`` contains exactly the same HTML tags as ``src``, in the same order."""
    source_tags, target_tags = (HTML_TAG.findall(text or "") for text in (src, tgt))
    return source_tags == target_tags

def retrieve_glossary_terms(
    segment_text: str,
    top_k: int = 5,
    min_score: float = 0.45,
    overfetch: int = 24,
    require_lex_for_long: bool = True
) -> List[str]:
    """Return up to ``top_k`` glossary source terms relevant to a segment.

    The segment is normalised, embedded as an E5-style ``query: …`` string and
    matched against the glossary collection. Each candidate's similarity is
    raised by a lexical boost (+0.12 when the lowercased term is a substring
    of the normalised segment, plus 0.05 per shared token, capped at 0.15).
    Candidates scoring below ``min_score`` are dropped.

    Args:
        segment_text: Source text; HTML tags are ignored.
        top_k: Maximum number of terms to return.
        min_score: Minimum boosted score a term must reach.
        overfetch: Lower bound on neighbours requested from the index (the
            request size is ``max(top_k * 3, overfetch)``).
        require_lex_for_long: When true, segments of 80+ normalised characters
            only accept terms that received some lexical boost.

    Returns:
        Terms ordered by descending score. Empty when the index or glossary
        is unavailable, or when the query raises.
    """
    if not (GLOSSARY_COL and GLOSSARY_TERMS):
        return []

    query_text = _normalize_for_retrieval(segment_text)
    # E5 query style
    query_vec = EMB_MODEL.encode([f"query: {query_text}"], normalize_embeddings=True)[0].tolist()

    try:
        hits = GLOSSARY_COL.query(query_embeddings=[query_vec], n_results=max(top_k * 3, overfetch))
        if hits:
            candidates: List[str] = hits.get("documents", [[]])[0]
            distances = hits.get("distances", [[]])[0]
        else:
            candidates, distances = [], []
        if distances:
            # Values in [0, 2] are treated as cosine distances and flipped to
            # similarities; anything else is passed through unchanged.
            similarities = [(1.0 - d) if (0.0 <= d <= 2.0) else d for d in distances]
        else:
            similarities = [0.0] * len(candidates)
    except Exception as e:
        print(f"Retrieval error: {e}")
        return []

    segment_tokens = _tokenize(query_text)
    needs_lexical_hit = require_lex_for_long and len(query_text) >= 80

    best_scores: Dict[str, float] = {}
    for term, similarity in zip(candidates, similarities):
        lowered = term.lower()
        boost = 0.12 if lowered in query_text else 0.0
        shared_tokens = len(segment_tokens & _tokenize(lowered))
        if shared_tokens:
            boost += min(0.05 * shared_tokens, 0.15)

        if needs_lexical_hit and boost == 0.0:
            continue

        total = similarity + boost
        if total >= min_score and (term not in best_scores or total > best_scores[term]):
            best_scores[term] = total

    # Stable sort: ties keep the order in which the index returned them.
    ordered_terms = sorted(best_scores, key=best_scores.get, reverse=True)
    return ordered_terms[:top_k]

def build_constraints(src_text: str, lang: str, top_k: int = 3) -> List[str]:
    """Build ``"source → target"`` glossary constraint strings for a segment.

    Retrieves up to ``top_k`` relevant glossary terms and keeps those with a
    non-empty string mapping for ``lang``. Returns an empty list for languages
    that have no glossary map.
    """
    if lang not in GLOSSARY_MAP:
        return []
    lang_glossary = GLOSSARY_MAP[lang]
    retrieved = retrieve_glossary_terms(src_text, top_k=top_k, min_score=0.45)
    resolved = ((term, lang_glossary.get(term)) for term in retrieved)
    return [
        f"{term} → {target}"
        for term, target in resolved
        if isinstance(target, str) and target
    ]

## 5. OpenAI Client

In [6]:
# Fail fast when the API key is absent from the environment.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise RuntimeError("Missing OPENAI_API_KEY")

OPENAI_CLIENT = OpenAI(api_key=OPENAI_API_KEY)

# Reuse the model registries when an earlier cell already created them;
# otherwise start from empty dicts.
if "OPENAI_MODELS" not in globals():
    OPENAI_MODELS = {}
if "BASELINE_MODELS" not in globals():
    BASELINE_MODELS = {}

# Register the baseline model; the concrete model id can be overridden via
# the OPENAI_BASELINE_MODEL environment variable.
OPENAI_MODELS["gpt-4o-mini"] = dict(model=os.getenv("OPENAI_BASELINE_MODEL", "gpt-4o-mini"))
BASELINE_MODELS["gpt-4o-mini"] = ("openai", OPENAI_MODELS["gpt-4o-mini"])

print("OPENAI models:", OPENAI_MODELS)

OPENAI models: {'gpt-4o-mini': {'model': 'gpt-4o-mini'}}


## 6. Few-shots & Prompt helpers

In [7]:
# Few-shot examples injected into every translation prompt.
# Each language code maps to a list of (English source, reference translation)
# pairs; the same three English sources are used for all three languages.
FEWSHOT_EXAMPLES = {
    "fr": [
        ("Each beautiful press-on nail is carefully crafted by <strong>our in-house</strong> nail techs.",
            "Chaque beau press-on nail est soigneusement conçu par nos stylistes ongulaires <strong>maison</strong>."),
        ("Instead of using brittle acrylic nail bases, NaiLit uses Gel-X soft gel nail bases that provide just the right curvature and comfort for your natural nails.",
            "Au lieu d’utiliser des bases d’ongles fragiles en acrylic, NaiLit utilise des bases en gel souple Gel-X qui offrent la courbure et le confort idéaux pour vos ongles naturels."),
        ("Whether you feel like elegant nude French coffin nails or trendy seafoam cat eye stiletto nails, NaiLit has just the right options for you to fully customize.",
            "Que vous ayez envie d’un nude French ballerine élégant ou d’un vert d’eau cat-eye pointu tendance, NaiLit vous propose les options idéales pour une personnalisation totale."),
    ],
    "ja": [
        ("Each beautiful press-on nail is carefully crafted by <strong>our in-house</strong> nail techs.",
            "それぞれの美しい press-on nail は、<strong>当社専属</strong>のネイリストが丁寧に仕上げています。"),
        ("Instead of using brittle acrylic nail bases, NaiLit uses Gel-X soft gel nail bases that provide just the right curvature and comfort for your natural nails.",
            "割れやすいアクリルベースの代わりに、NaiLit は柔らかいジェルの Gel-X ベースを使用し、地爪に最適なカーブと快適さを提供します。"),
        ("Whether you feel like elegant nude French coffin nails or trendy seafoam cat eye stiletto nails, NaiLit has just the right options for you to fully customize.",
            "エレガントなヌーディーのバレリーナフレンチや、トレンディなミント系のマグネット（cat-eye）ポイントネイルなど、NaiLit なら思い通りにカスタマイズできます。"),
    ],
    "it": [
        ("Each beautiful press-on nail is carefully crafted by <strong>our in-house</strong> nail techs.",
            "Ogni bellissima press-on nail è accuratamente realizzata dalle nostre onicotecniche <strong>della casa</strong>."),
        ("Instead of using brittle acrylic nail bases, NaiLit uses Gel-X soft gel nail bases that provide just the right curvature and comfort for your natural nails.",
            "Invece di utilizzare basi per unghie fragili in acrylic, NaiLit impiega basi in gel morbido Gel-X che garantiscono la curvatura e il comfort perfetti per le tue unghie naturali."),
        ("Whether you feel like elegant nude French coffin nails or trendy seafoam cat eye stiletto nails, NaiLit has just the right options for you to fully customize.",
            "Che tu abbia voglia di un’elegante nude French ballerina o di un trendy verde acqua cat-eye stiletto, NaiLit ha le opzioni perfette per personalizzare al massimo."),
    ],
}

# Human-readable language names used when wording the prompt.
LANGUAGE_NAMES_HUMAN = {"fr": "French", "ja": "Japanese", "it": "Italian"}

def render_fewshots(target_lang: str, constraints: list[str] | None = None) -> str:
    """Render the few-shot block (and optional glossary constraints) for a prompt.

    Args:
        target_lang: Language code used to pick examples from FEWSHOT_EXAMPLES.
        constraints: Optional glossary constraint lines; when given they are
            listed as bullets ahead of the examples.

    Returns:
        Text made of the optional constraints section, a
        "### Few-shot Examples" heading and the numbered examples.
    """
    sections = []

    if constraints:
        bullets = "\n".join(f"- {c}" for c in constraints)
        sections.append(f"Glossary constraints (use exactly when relevant):\n{bullets}\n\n")

    sections.append("### Few-shot Examples\n")

    numbered_pairs = enumerate(FEWSHOT_EXAMPLES.get(target_lang, []), 1)
    rendered_examples = [
        f"Example {i}:\nSource:\n{src}\nTarget ({target_lang}):\n{tgt}\n"
        for i, (src, tgt) in numbered_pairs
    ]
    sections.append("\n".join(rendered_examples))
    sections.append("\n")

    return "".join(sections)

## 7. Usage Meter

In [8]:
def _backoff_sleep(attempt: int, base: float = 0.5, jitter: float = 0.2):
    import time, random
    time.sleep(base * (2 ** attempt) + random.random() * jitter)

# In-memory cache of finished translations for this kernel session.
# Key: (lang, src_text, sorted_constraints) -> translated text.
CACHE_TRANSLATIONS: Dict[Tuple[str, str, Tuple[str, ...]], str] = {}

def safe_usage_tokens(resp):
    """Return ``(input_tokens, output_tokens)`` read from a response's ``usage``.

    Supports both naming schemes (``prompt_tokens``/``completion_tokens`` and
    ``input_tokens``/``output_tokens``), checked in that order. Any missing
    usage data or conversion problem yields ``(0, 0)``.
    """
    naming_schemes = (
        ("prompt_tokens", "completion_tokens"),
        ("input_tokens", "output_tokens"),
    )
    try:
        usage = getattr(resp, "usage", None)
        if usage:
            for in_field, out_field in naming_schemes:
                if hasattr(usage, in_field) and hasattr(usage, out_field):
                    consumed = int(getattr(usage, in_field) or 0)
                    produced = int(getattr(usage, out_field) or 0)
                    return consumed, produced
    except Exception:
        # Best effort: metering must never break a translation call.
        pass
    return 0, 0

import threading, time

class UsageMeter:
    """Lock-protected running totals of token usage and request time."""

    _FIELDS = ("input_tokens", "output_tokens", "wall_seconds")

    def __init__(self):
        self.lock = threading.Lock()
        self.input_tokens = 0
        self.output_tokens = 0
        self.wall_seconds = 0.0

    def add(self, in_toks: int, out_toks: int, dt: float):
        """Add one call's token counts and duration; falsy values count as zero."""
        with self.lock:
            self.input_tokens = self.input_tokens + int(in_toks or 0)
            self.output_tokens = self.output_tokens + int(out_toks or 0)
            self.wall_seconds = self.wall_seconds + float(dt or 0.0)

    def snapshot(self):
        """Return the current totals as a plain dict, read under the lock."""
        with self.lock:
            return {field: getattr(self, field) for field in self._FIELDS}

# Meter that openai_chat_with_usage reports into; None means usage is not recorded.
CURRENT_METER: Optional[UsageMeter] = None

def openai_chat_with_usage(model: str, system_prompt: str, user_prompt: str) -> str:
    """Send one system + user chat request and return the stripped reply text.

    When the module-level ``CURRENT_METER`` is set, the call's token usage and
    elapsed time are added to it. Exceptions raised by the client propagate
    to the caller.
    """
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user",   "content": user_prompt},
    ]

    started_at = time.time()
    resp = OPENAI_CLIENT.chat.completions.create(
        model=model,
        messages=conversation,
        temperature=0.2,
        max_tokens=2048,
    )
    elapsed = time.time() - started_at

    in_toks, out_toks = safe_usage_tokens(resp)
    if CURRENT_METER is not None:
        CURRENT_METER.add(in_toks, out_toks, elapsed)

    reply = resp.choices[0].message.content or ""
    return reply.strip()

## 8. Translation Function (Single Segment)

In [9]:
# Total attempts per segment (not "extra" retries). Clamped to at least one:
# a value of 0 would otherwise skip the API call and yield an empty translation.
MAX_RAG_RETRIES = max(1, int(os.getenv("RAG_MAX_RETRIES", "3")))

def _extract_translation_block(text: str) -> str:
    # Return content inside <translation>...</translation>, or raw fallback
    m = re.search(r"<translation>([\s\S]*?)</translation>", text or "", re.IGNORECASE)
    return (m.group(1).strip() if m else (text or "").strip())

def _build_user_prompt(src_text: str, lang: str, constraints: list[str] | None = None) -> str:
    """Assemble the user prompt: instructions, few-shot block, source, open tag.

    The prompt deliberately ends with an opening ``<translation>`` tag.

    Args:
        src_text: Source text to translate.
        lang: Target language code; mapped to a readable name when
            LANGUAGE_NAMES_HUMAN is defined, otherwise used as-is.
        constraints: Optional glossary constraint lines passed on to
            render_fewshots.
    """
    if "LANGUAGE_NAMES_HUMAN" in globals():
        lang_name = LANGUAGE_NAMES_HUMAN.get(lang, lang)
    else:
        lang_name = lang
    fewshots_block = render_fewshots(lang, constraints=constraints)

    requirement_lines = (
        "- Preserve all HTML tags exactly (do not add/remove/reorder tags).\n",
        "- Keep brand names and DNT terms as-is (case-sensitive), e.g., NaiLit.\n",
        "- Be natural, fluent, and consistent with terminology.\n",
        "- When glossary constraints are provided, apply them exactly when relevant.\n",
        "- Return ONLY the translation between <translation> and </translation>. Do not add notes.\n",
    )
    pieces = [
        f"You are a professional UX translator. Translate the source into {lang_name}.\n",
        "Requirements:\n",
        *requirement_lines,
        "\n",
        f"{fewshots_block}",
        "Source:\n",
        f"{src_text}\n\n",
        "<translation>",
    ]
    return "".join(pieces)

def _model_name_for_rag() -> str:
    # Resolve the model string with fallbacks
    return (
        (OPENAI_MODELS.get("gpt-4o-mini", {}).get("model") if "OPENAI_MODELS" in globals() else None)
        or (globals().get("RAG_MODEL", {}).get("config", {}).get("model"))
        or os.getenv("OPENAI_RAG_MODEL", "gpt-4o-mini")
    )

def translate_segment_with_rag(src_text: str, lang: str, precomputed_constraints: Optional[List[str]] = None) -> str:
    """Translate one segment using TM lookup, glossary constraints and the LLM.

    Steps:
      0. Return the translation-memory entry on an exact source match.
      1. Resolve glossary constraints (given or retrieved) and check the cache.
      2. Call the model, retrying with exponential backoff on errors.
      3. Extract the ``<translation>`` block.
      4. If the HTML tags differ from the source, make one corrective call and
         adopt its result only when it restores tag fidelity.
      5. Cache and return the translation.

    Args:
        src_text: Source text to translate.
        lang: Target language code.
        precomputed_constraints: Glossary constraint strings to use; when
            ``None`` they are retrieved via build_constraints.

    Returns:
        The translated text, or the sentinel ``"[RAG_TRANSLATION_ERROR]"``
        when every attempt failed. The sentinel is not cached, so a later
        call for the same segment tries again.
    """
    # 0) TM exact match
    tm_hit = tm_lookup(src_text, lang)
    if tm_hit:
        return tm_hit

    # 1) Constraints + cache key
    constraints = (
        precomputed_constraints
        if precomputed_constraints is not None
        else build_constraints(src_text, lang, top_k=3)
    )
    constraints_sorted = tuple(sorted(constraints)) if constraints else tuple()
    cache_key = (lang, src_text, constraints_sorted)
    if cache_key in CACHE_TRANSLATIONS:
        return CACHE_TRANSLATIONS[cache_key]

    # 2) Prompts
    system_prompt = (
        "You are a precise, format-strict translator. "
        "Reply with only the target text enclosed in <translation>...</translation>."
    )
    user_prompt = _build_user_prompt(src_text, lang, constraints=list(constraints_sorted) or None)
    model_name = _model_name_for_rag()

    def _call_openai(prompt: str) -> str:
        # Records usage/latency if CURRENT_METER is set in the notebook
        return openai_chat_with_usage(model_name, system_prompt, prompt)

    # 3) Call with retries. Always make at least one attempt, even if the
    #    configured retry count is zero or negative.
    attempts = max(1, MAX_RAG_RETRIES)
    raw = None
    for attempt in range(attempts):
        try:
            raw = _call_openai(user_prompt)
            break
        except Exception as e:
            if attempt == attempts - 1:
                print(f"RAG call failed (final): {e}")
                # Deliberately not cached: the failure may be transient
                # (rate limit, network), and caching it would pin the error
                # for the rest of the session.
                return "[RAG_TRANSLATION_ERROR]"
            _backoff_sleep(attempt)

    # 4) Extract translation block
    translation = _extract_translation_block(raw)

    # 5) Validate tag fidelity; one corrective retry if needed
    if not tags_preserved(src_text, translation):
        # Insert the reminder before the *final* <translation> marker only.
        # The instructions also mention "<translation>", and a plain
        # str.replace would rewrite that sentence too.
        head, marker, tail = user_prompt.rpartition("<translation>")
        retry_prompt = (
            head
            + "IMPORTANT: Copy every HTML tag exactly as in the source. Only the translation.\n\n"
            + marker
            + tail
        )
        try:
            raw2 = _call_openai(retry_prompt)
            translation2 = _extract_translation_block(raw2)
            if tags_preserved(src_text, translation2):
                translation = translation2
        except Exception as e:
            # Best effort: keep the first translation, but report the failure.
            print(f"Tag-fix retry failed; keeping first translation: {e}")

    # 6) Cache + return
    CACHE_TRANSLATIONS[cache_key] = translation
    return translation


## 8. End-to-End RAG Batch T9N Pipeline Config

In [10]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import os, json, time
from pathlib import Path
from typing import Dict, List, Tuple, Any
from tqdm import tqdm

# Pipeline configuration
# Languages to translate into, and their display names for log messages.
TARGET_LANGUAGES = ["fr", "ja", "it"]
LANGUAGE_NAMES = {"fr": "French", "ja": "Japanese", "it": "Italian"}
# Thread-pool size; 1 selects the sequential path in translate_all_segments_rag.
MAX_WORKERS = max(1, int(os.getenv("RAG_MAX_WORKERS", "4"))) 
# Label written to the "model" field of each output row.
# NOTE(review): the model actually called comes from _model_name_for_rag(),
# which honours OPENAI_BASELINE_MODEL; the two can differ — confirm intent.
MODEL_NAME = (globals().get("RAG_MODEL", {}).get("name") or "gpt-4o-mini")

print("Pipeline configuration:")
print(f" Languages: {', '.join(TARGET_LANGUAGES)}")
print(f" Segments: {len(en_segments)}")
print(f" Max workers: {MAX_WORKERS}")
print("=" * 70)


# RAG translation over all segments for a single target language
def translate_all_segments_rag(target_lang: str) -> List[Dict[str, Any]]:
    """Translate every source segment into ``target_lang`` and save the result.

    Identical source strings are translated once and the result is fanned
    back out to every path that uses them. Output rows are written to
    ``translations/rag/{target_lang}.json`` and also returned.

    Args:
        target_lang: Target language code (e.g. "fr").

    Returns:
        One dict per original segment, in the order of ``en_segments``, with
        the translation plus metadata (TM hit flag, constraints, timestamp…).
        Failed segments carry the sentinel "[RAG_TRANSLATION_ERROR]".

    Raises:
        KeyboardInterrupt: re-raised after queued work has been cancelled.
    """
    start = time.time()

    # 1) Deduplicate by source text (sorted for a deterministic order)
    src_to_paths: Dict[str, List[str]] = {}
    for path, src in en_segments:
        src_to_paths.setdefault(src, []).append(path)
    unique_srcs = sorted(src_to_paths.keys())
    uniq_count = len(unique_srcs)

    # 2) Precompute constraints and TM flags once per unique source
    src_to_constraints: Dict[str, List[str]] = {}
    src_tm_hit: Dict[str, bool] = {}
    total_constraints_used = 0
    for src in unique_srcs:
        cons = build_constraints(src, target_lang, top_k=3)
        src_to_constraints[src] = cons
        total_constraints_used += len(cons)
        src_tm_hit[src] = bool(tm_lookup(src, target_lang))
    tm_hits = sum(1 for v in src_tm_hit.values() if v)

    # 3) Translate unique sources (sequential if MAX_WORKERS=1; otherwise threaded)
    results_map: Dict[str, str] = {}

    if MAX_WORKERS == 1:
        # Sequential path: friendlier to rate limits and easier to debug
        for src in tqdm(unique_srcs, total=uniq_count, desc=f"RAG → {target_lang} (seq)"):
            try:
                results_map[src] = translate_segment_with_rag(src, target_lang, src_to_constraints[src])
            except Exception as e:
                snippet = (src[:60] + "…") if len(src) > 60 else src
                print(f"Translate failed: {e} | src≈ {snippet!r}")
                results_map[src] = "[RAG_TRANSLATION_ERROR]"
    else:
        # Threaded path: faster but be mindful of provider rate limits
        futures = {}
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
            try:
                for src in unique_srcs:
                    fut = ex.submit(translate_segment_with_rag, src, target_lang, src_to_constraints[src])
                    futures[fut] = src

                for fut in tqdm(as_completed(futures), total=len(futures), desc=f"RAG → {target_lang}"):
                    src = futures[fut]
                    try:
                        results_map[src] = fut.result()
                    except Exception as e:
                        snippet = (src[:60] + "…") if len(src) > 60 else src
                        print(f"⚠️ Translate failed: {e} | src≈ {snippet!r}")
                        results_map[src] = "[RAG_TRANSLATION_ERROR]"
            except KeyboardInterrupt:
                # Cancel queued work *inside* the `with` block. The executor's
                # __exit__ waits for all submitted jobs, so cancelling after
                # leaving the block would have no effect.
                for fut in futures:
                    fut.cancel()
                raise

    # 4) Expand back to all original segments (preserve original order)
    out: List[Dict[str, Any]] = []
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    for path, src in en_segments:
        constraints = src_to_constraints.get(src, [])
        out.append({
            "path": path,
            "source": src,
            "translation": results_map.get(src, "[RAG_TRANSLATION_ERROR]"),
            "model": MODEL_NAME,
            "approach": "RAG",
            "target_lang": target_lang,
            "tm_hit": src_tm_hit.get(src, False),
            "constraints_found": len(constraints),
            "constraints_list": constraints,
            "timestamp": now,
            # Only present when the source file could be hashed.
            **({"source_sha": SOURCE_SHA} if SOURCE_SHA else {}),
        })

    # 5) Save + logging
    dur = time.time() - start
    seg_per_sec = len(out) / max(dur, 1e-6)

    out_dir = Path("translations/rag")
    out_dir.mkdir(parents=True, exist_ok=True)
    out_file = out_dir / f"{target_lang}.json"
    with open(out_file, "w", encoding="utf-8") as f:
        json.dump(out, f, ensure_ascii=False, indent=2)

    print(f"RAG translation completed: {target_lang}")
    print(f"   • Unique sources: {uniq_count}  | Segments: {len(out)}")
    print(f"   • Duration: {dur:.1f}s  | Speed: {seg_per_sec:.2f} seg/s")
    print(f"   • TM hits (unique-src): {tm_hits}/{uniq_count} ({tm_hits/max(uniq_count,1):.1%})")
    print(f"   • Constraints found (unique-src): {total_constraints_used}")
    print(f"   • Saved: {out_file}")

    return out

Pipeline configuration:
 Languages: fr, ja, it
 Segments: 76
 Max workers: 4


## 9. Execute RAG T9N

In [11]:
import time
from pathlib import Path
from typing import Dict, List, Any
import pandas as pd

# Per-language outputs of this run:
#   RAG_RESULTS  - rows returned by translate_all_segments_rag
#   RAG_SUMMARY  - aggregate metrics (or an error record) per language
#   RAG_METERS   - token / time usage snapshot per language
#   summary_rows - the same summaries as a list, written to CSV below
RAG_RESULTS: Dict[str, List[Dict[str, Any]]] = {}
RAG_SUMMARY: Dict[str, Any] = {}
RAG_METERS:  Dict[str, Dict[str, float]] = {}
summary_rows: List[Dict[str, Any]] = []

for lang in TARGET_LANGUAGES:
    print(f"\nStarting {LANGUAGE_NAMES[lang]} ({lang})…")

    # Start a fresh usage meter per language if metering is available.
    # CURRENT_METER is the module-level name that openai_chat_with_usage reads.
    if "UsageMeter" in globals():
        CURRENT_METER = UsageMeter()  # type: ignore[name-defined]
    else:
        CURRENT_METER = None  # metering disabled

    t0 = time.time()
    try:
        # Run translation (writes translations/rag/{lang}.json internally)
        res = translate_all_segments_rag(lang)
        # Lower bound avoids a zero divisor in the throughput figure below.
        dt = max(time.time() - t0, 1e-9)

        # Collect usage if available
        usage = {"input_tokens": 0, "output_tokens": 0, "wall_seconds": 0.0}
        if CURRENT_METER is not None and hasattr(CURRENT_METER, "snapshot"):
            usage = CURRENT_METER.snapshot()  # type: ignore[assignment]

        RAG_METERS[lang] = dict(usage)
        RAG_RESULTS[lang] = res

        # Aggregate quick metrics (kept minimal; detailed eval happens later).
        # These are counted per output segment, not per unique source string.
        total_segments   = len(res)
        unique_sources   = len({r.get("source") for r in res})
        tm_hits          = sum(1 for r in res if r.get("tm_hit"))
        errors           = sum(1 for r in res if "[RAG_TRANSLATION_ERROR]" in (r.get("translation") or ""))
        total_constraints= sum(int(r.get("constraints_found", 0)) for r in res)
        out_file         = Path("translations/rag") / f"{lang}.json"

        RAG_SUMMARY[lang] = {
            "language": lang,
            "total_segments": total_segments,
            "unique_sources": unique_sources,
            "tm_hits_segments": tm_hits,
            "tm_hit_rate_segments": (tm_hits / total_segments) if total_segments else 0.0,
            "errors": errors,
            "error_rate": (errors / total_segments) if total_segments else 0.0,
            "total_constraints": total_constraints,
            "avg_constraints_per_segment": (total_constraints / total_segments) if total_segments else 0.0,
            "duration_sec": round(dt, 2),
            "segments_per_sec": (total_segments / dt) if dt > 0 else 0.0,
            "output_file": str(out_file),
            "input_tokens": int(usage.get("input_tokens", 0)),
            "output_tokens": int(usage.get("output_tokens", 0)),
            # Sum of per-request durations recorded by the meter.
            "wall_seconds_metered": float(usage.get("wall_seconds", 0.0)),
        }
        summary_rows.append(RAG_SUMMARY[lang])

    except Exception as e:
        # A failure in one language is recorded and the loop moves on to the next.
        print(f"Failed {LANGUAGE_NAMES[lang]}: {e}")
        RAG_RESULTS[lang] = []
        err_row = {"error": str(e), "language": lang}
        RAG_SUMMARY[lang] = err_row
        summary_rows.append(err_row)

# Concise recap (details & charts are handled in later evaluation cells)
print("\n" + "=" * 70)
print("RAG TRANSLATIONS COMPLETE")
print("=" * 70)
for lang in TARGET_LANGUAGES:
    s = RAG_SUMMARY.get(lang, {})
    if "error" in s:
        print(f"{LANGUAGE_NAMES[lang]} ({lang}): {s['error']}")
    else:
        print(f"{LANGUAGE_NAMES[lang]} ({lang}) — saved: {s['output_file']}")

# Persist a one-line-per-language summary for downstream analysis
Path("eval").mkdir(parents=True, exist_ok=True)
summary_path = Path("eval/rag_run_summary.csv")
pd.DataFrame(summary_rows).to_csv(summary_path, index=False)
print(f"\nRun summary saved to: {summary_path}")


Starting French (fr)…


RAG → fr: 100%|████████████████████████████████████████████████████████████████████████| 73/73 [00:24<00:00,  3.02it/s]


RAG translation completed: fr
   • Unique sources: 73  | Segments: 76
   • Duration: 39.4s  | Speed: 1.93 seg/s
   • TM hits (unique-src): 2/73 (2.7%)
   • Constraints found (unique-src): 203
   • Saved: translations\rag\fr.json

Starting Japanese (ja)…


RAG → ja: 100%|████████████████████████████████████████████████████████████████████████| 73/73 [00:26<00:00,  2.72it/s]


RAG translation completed: ja
   • Unique sources: 73  | Segments: 76
   • Duration: 35.1s  | Speed: 2.17 seg/s
   • TM hits (unique-src): 2/73 (2.7%)
   • Constraints found (unique-src): 203
   • Saved: translations\rag\ja.json

Starting Italian (it)…


RAG → it: 100%|████████████████████████████████████████████████████████████████████████| 73/73 [00:26<00:00,  2.79it/s]

RAG translation completed: it
   • Unique sources: 73  | Segments: 76
   • Duration: 42.5s  | Speed: 1.79 seg/s
   • TM hits (unique-src): 2/73 (2.7%)
   • Constraints found (unique-src): 203
   • Saved: translations\rag\it.json

RAG TRANSLATIONS COMPLETE
French (fr) — saved: translations\rag\fr.json
Japanese (ja) — saved: translations\rag\ja.json
Italian (it) — saved: translations\rag\it.json

Run summary saved to: eval\rag_run_summary.csv





## 10. Baseline Loader

In [16]:
# Baseline loader
# gpt-4o-mini was selected in phase one of the project as the baseline.

from pathlib import Path
from typing import Dict, List, Optional, Tuple
import os, json

# Whitespace-separated folder-name tokens used to rank baseline directories;
# tokens listed earlier score higher. Overridable via BASELINE_MODEL_HINT.
_PREFERRED_TOKENS = tuple(os.getenv("BASELINE_MODEL_HINT", "gpt-4o-mini gpt openai claude gemini").split())

def _choose_baseline_dir(
    base_dir: Path,
    langs: List[str],
    prefer: Tuple[str, ...] = _PREFERRED_TOKENS,
) -> Optional[Path]:
    """
    Select the subdirectory of `base_dir` holding a `{lang}.json` for every lang.

    Ranking among complete directories:
      (1) best-matching token from `prefer` found in the folder name
          (case-insensitive; earlier token = higher score)
      (2) most recent modification time
    Returns the winning Path, or None when `base_dir` is missing or no
    directory is complete.
    """
    if not base_dir.exists():
        return None

    def _preference_score(folder_name: str) -> int:
        # Highest rank among matching tokens; 0 when none match.
        ranks = [
            len(prefer) - position
            for position, token in enumerate(prefer)
            if token and token.lower() in folder_name
        ]
        return max(ranks, default=0)

    complete_dirs: List[Tuple[int, float, Path]] = []
    for entry in sorted(base_dir.iterdir(), key=lambda p: p.name.lower()):
        if not entry.is_dir():
            continue
        if all((entry / f"{lang}.json").exists() for lang in langs):
            complete_dirs.append(
                (_preference_score(entry.name.lower()), entry.stat().st_mtime, entry)
            )

    if not complete_dirs:
        return None

    # max() keeps the first of equally ranked entries, i.e. the alphabetically
    # first folder, matching a stable descending sort.
    _, _, winner = max(complete_dirs, key=lambda item: (item[0], item[1]))
    return winner


def load_baseline_results(
    target_languages: Optional[List[str]] = None,
    *,
    base_dir: Path = Path("translations/baseline"),
    prefer_hint_env: str = "BASELINE_MODEL_HINT",
    verbose: bool = True,
) -> Dict[str, List[Dict]]:
    """
    Read baseline translations from one folder under `translations/baseline/`.

    The folder must contain `{lang}.json` for every requested language. When
    several qualify, names containing tokens from the `prefer_hint_env`
    environment variable win (default tokens:
    "gpt-4o-mini gpt openai claude gemini").

    Languages default to the global TARGET_LANGUAGES when not given.
    Returns {lang: parsed JSON content}; a language whose file fails to load
    maps to []. Returns {} when no languages or no suitable folder exist.
    """
    requested = target_languages or globals().get("TARGET_LANGUAGES", [])
    if not requested:
        if verbose:
            print("No target languages provided and TARGET_LANGUAGES not defined.")
        return {}

    hint = os.getenv(prefer_hint_env, "gpt-4o-mini gpt openai claude gemini")
    chosen = _choose_baseline_dir(base_dir, requested, prefer=tuple(hint.split()))
    if not chosen:
        if verbose:
            print(f"No baseline folder with all languages {requested} under: {base_dir}")
        return {}

    if verbose:
        print(f"Loading baseline from: {chosen}")

    loaded: Dict[str, List[Dict]] = {}
    for lang in requested:
        lang_file = chosen / f"{lang}.json"
        try:
            with lang_file.open("r", encoding="utf-8") as fh:
                loaded[lang] = json.load(fh)
            if verbose:
                print(f"  • {lang}: {len(loaded[lang])} segments")
        except Exception as e:
            if verbose:
                print(f"  Failed to load {lang} from {lang_file}: {e}")
            loaded[lang] = []
    return loaded


# Baseline Metrics loader
def _choose_baseline_metrics_dir(
    base_dir: Path = Path("eval/baseline"),
    langs: Optional[List[str]] = None,
    prefer: Tuple[str, ...] = _PREFERRED_TOKENS,
) -> Optional[Path]:
    """
    Select the sub-folder of `base_dir` holding a complete set of metrics files.

    A folder qualifies only if it contains `metrics_{lang}.json` for every
    language. Among qualifying folders the winner is the one whose name
    contains the earliest token of `prefer` (case-insensitive substring match);
    ties go to the most recently modified folder, and remaining ties to the
    folder that sorts first by lower-cased name.

    Args:
        base_dir: Folder whose immediate children are inspected.
        langs: Language codes required; falls back to the module-level
            TARGET_LANGUAGES when None or empty.
        prefer: Name tokens ordered from most to least preferred.

    Returns:
        The selected folder, or None when `base_dir` does not exist, no
        languages are known, or no folder qualifies.
    """
    if not base_dir.exists():
        return None
    wanted = langs or globals().get("TARGET_LANGUAGES", [])
    if not wanted:
        return None

    def _preference(folder_name: str) -> int:
        # Earlier tokens weigh more; 0 means no token occurs in the name.
        lowered = folder_name.lower()
        weights = [
            len(prefer) - position
            for position, token in enumerate(prefer)
            if token and token.lower() in lowered
        ]
        return max(weights, default=0)

    winner: Optional[Path] = None
    winner_key: Optional[Tuple[int, float]] = None
    for entry in sorted(base_dir.iterdir(), key=lambda p: p.name.lower()):
        if not entry.is_dir():
            continue
        if any(not (entry / f"metrics_{code}.json").exists() for code in wanted):
            continue
        entry_key = (_preference(entry.name), entry.stat().st_mtime)
        # Strict comparison keeps the first folder seen among equal keys.
        if winner_key is None or entry_key > winner_key:
            winner, winner_key = entry, entry_key
    return winner


def load_baseline_eval_metrics(
    *,
    base_dir: Path = Path("eval/baseline"),
    prefer_hint_env: str = "BASELINE_MODEL_HINT",
    verbose: bool = True,
) -> Dict[str, dict]:
    """
    Read `metrics_{lang}.json` for every language in TARGET_LANGUAGES.

    The folder is picked by `_choose_baseline_metrics_dir`, using the
    whitespace-separated tokens of the environment variable named by
    `prefer_hint_env` (fallback: "gpt-4o-mini gpt openai claude gemini") as
    the preference list.

    Args:
        base_dir: Root folder searched for a metrics run.
        prefer_hint_env: Environment variable that overrides the preference.
        verbose: When True, print the chosen folder and any read failure.

    Returns:
        {lang: parsed metrics}; a language whose file cannot be read or parsed
        maps to {}. An empty dict is returned when TARGET_LANGUAGES is missing
        or empty, or when no folder is found.
    """
    wanted = globals().get("TARGET_LANGUAGES", [])
    if not wanted:
        if verbose:
            print("TARGET_LANGUAGES not defined; cannot load baseline metrics.")
        return {}

    raw_hint = os.getenv(prefer_hint_env, "gpt-4o-mini gpt openai claude gemini")
    metrics_dir = _choose_baseline_metrics_dir(
        base_dir=base_dir,
        langs=wanted,
        prefer=tuple(raw_hint.split()),
    )
    if not metrics_dir:
        if verbose:
            print(f"No baseline metrics folder found under: {base_dir}")
        return {}

    if verbose:
        print(f"Using baseline eval metrics from: {metrics_dir}")

    metrics: Dict[str, dict] = {}
    for code in wanted:
        metrics_path = metrics_dir / f"metrics_{code}.json"
        try:
            with metrics_path.open("r", encoding="utf-8") as handle:
                payload = json.load(handle)
        except Exception as exc:
            if verbose:
                print(f"  couldn't read {metrics_path}: {exc}")
            payload = {}
        metrics[code] = payload
    return metrics

## 11. Quality Evaluation

In [17]:
from __future__ import annotations
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
import json, os, re, time
import numpy as np
import pandas as pd

# Helpers
# Matches opening, closing and self-closing tags: <b>, </b>, <a href="x">,
# <br />, and also <br/> (the optional "/" right before ">" covers the
# self-closing form written without whitespace, which was previously missed).
_HTML_TAG = re.compile(r"</?\w+(?:\s+[^>]*?)?/?>", re.IGNORECASE)

def tags_preserved(src: str, tgt: str) -> bool:
    """
    Tell whether the translation carries the same markup tags as the source.

    Every tag is extracted from both strings, in order of appearance, and the
    two sequences are compared for exact equality, so a dropped, added,
    reordered or altered tag (attributes included) yields False.

    Args:
        src: Source text; None is treated as an empty string.
        tgt: Translated text; None is treated as an empty string.

    Returns:
        True when both texts contain the identical sequence of tags.
    """
    return _HTML_TAG.findall(src or "") == _HTML_TAG.findall(tgt or "")

def dnt_preserved(src: str, tgt: str) -> bool:
    """
    Check that do-not-translate terms survive translation unchanged.

    For every entry of the module-level DNT_TERMS that occurs in the source,
    the number of (case-sensitive) occurrences in the target must equal the
    number in the source.

    Args:
        src: Source text; None is treated as an empty string.
        tgt: Translated text; None is treated as an empty string.

    Returns:
        False as soon as one term's counts differ, True otherwise.
    """
    source_text = src or ""
    target_text = tgt or ""
    return all(
        source_text.count(term) == target_text.count(term)
        for term in DNT_TERMS
        if term in source_text
    )

def glossary_adherence(src: str, tgt: str, lang: str) -> float:
    """
    Fraction of applicable glossary entries honoured by the translation.

    An entry of GLOSSARY_MAP[lang] is applicable when both of its sides are
    non-empty and its English term occurs in the source (case-sensitive
    substring). It is honoured when its target term occurs in the translation.

    Args:
        src: Source text; None is treated as an empty string.
        tgt: Translated text; None is treated as an empty string.
        lang: Language code used to look up the glossary.

    Returns:
        honoured / applicable, or 1.0 when the language has no glossary or no
        entry is applicable.
    """
    mapping = GLOSSARY_MAP.get(lang, {})
    if not mapping:
        return 1.0

    source_text, target_text = src or "", tgt or ""
    outcomes = [
        target_term in target_text
        for en_term, target_term in mapping.items()
        if en_term and target_term and en_term in source_text
    ]
    if not outcomes:
        return 1.0
    return sum(outcomes) / len(outcomes)

def retrieval_precision(cands: List[str], src: str) -> float:
    """
    Share of retrieved candidate terms that actually occur in the source.

    Matching is a case-insensitive substring test; a None or empty candidate
    counts as occurring, because the empty string is contained in any text.

    Args:
        cands: Retrieved terms.
        src: Source text; None is treated as an empty string.

    Returns:
        matches / len(cands), or 1.0 when there are no candidates.
    """
    if not cands:
        return 1.0

    haystack = (src or "").lower()
    matched = [term for term in cands if (term or "").lower() in haystack]
    return len(matched) / (len(cands) or 1)

def _fmt(x, unit=""):
    return "n/a" if x is None else f"{x:.2f}{unit}"

# Optional: thin shim so we can call a public loader if it exists
def _try_load_baseline_eval_metrics() -> Dict[str, dict]:
    # Prefer the public helper if you pasted it elsewhere
    if "load_baseline_eval_metrics" in globals() and callable(globals()["load_baseline_eval_metrics"]):
        return globals()["load_baseline_eval_metrics"]()
    # Fallback to newest metrics folder
    root = Path("eval/baseline")
    if not root.exists():
        return {}
    cands = [d for d in root.iterdir() if d.is_dir()]
    cands = [d for d in cands if all((d / f"metrics_{lang}.json").exists() for lang in TARGET_LANGUAGES)]
    if not cands:
        return {}
    chosen = max(cands, key=lambda p: p.stat().st_mtime)
    out: Dict[str, dict] = {}
    for lang in TARGET_LANGUAGES:
        try:
            with open(chosen / f"metrics_{lang}.json", "r", encoding="utf-8") as fh:
                out[lang] = json.load(fh)
        except Exception:
            out[lang] = {}
    print(f"Using baseline eval metrics from: {chosen}")
    return out

# -- main ----------------------------------------------------------------------
def evaluate_rag_compact():
    """
    Score the RAG translations per language and compare timing with the baseline.

    Reads the module-level RAG_RESULTS, RAG_SUMMARY, TARGET_LANGUAGES,
    LANGUAGE_NAMES and EMB_MODEL, plus the baseline translations returned by
    `load_baseline_results()` and the baseline metrics returned by
    `_try_load_baseline_eval_metrics()`.

    For every language that has RAG results it computes, over all segments:
    DNT preservation rate, average glossary adherence, tag preservation rate,
    average retrieval precision and the share of segments whose translation
    contains "[RAG_TRANSLATION_ERROR]". For segments whose `path` also exists
    in the baseline it additionally averages an embedding similarity between
    the RAG and the baseline translation.

    Side effects:
        Prints a per-language summary and writes
        `eval/rag_comprehensive_evaluation.csv` (quality metrics) and
        `eval/rag_quality_vs_baseline.csv` (model name, durations and
        throughput) when the respective table has rows.

    Returns:
        (quality DataFrame, comparison DataFrame); either element is None when
        its table is empty.
    """
    print("\nRAG EVALUATION (compact)")
    print("=" * 60)

    # Baseline translations: the loader may hand back a dict or a tuple whose
    # first element is that dict; only the dict is used.
    _loaded = load_baseline_results()
    if isinstance(_loaded, tuple):
        baseline_results, _ = _loaded
    else:
        baseline_results = _loaded or {}

    baseline_eval = _try_load_baseline_eval_metrics()

    results_rows: List[Dict[str, Any]] = []
    compare_rows: List[Dict[str, Any]] = []

    # Model label for the comparison table, resolved in this order:
    # OPENAI_MODELS["gpt-4o-mini"]["model"] (if OPENAI_MODELS is defined),
    # the MODEL_NAME global, then the OPENAI_RAG_MODEL environment variable
    # with "gpt-4o-mini" as last resort.
    rag_model_name = (
        (OPENAI_MODELS.get("gpt-4o-mini", {}).get("model") if "OPENAI_MODELS" in globals() else None)
        or globals().get("MODEL_NAME")
        or os.getenv("OPENAI_RAG_MODEL", "gpt-4o-mini")
    )

    for lang in TARGET_LANGUAGES:
        rag = RAG_RESULTS.get(lang) or []
        if not rag:
            print(f"- No RAG results for {lang}")
            continue

        # path -> baseline translation, used to pair RAG and baseline output.
        base = (baseline_results or {}).get(lang, [])
        base_lookup = {x.get("path"): x.get("translation", "") for x in base if "path" in x}

        seg_rows: List[Dict[str, Any]] = []
        rag_hyps, base_refs = [], []

        for it in rag:
            src = it.get("source", "")
            tgt = it.get("translation", "")
            path = it.get("path", "")
            retrieved = it.get("constraints_list", []) or []

            # Per-segment checks. Only the text left of " → " in each retrieved
            # constraint is scored (presumably the source-side term — confirm
            # against the code that builds constraints_list).
            seg_rows.append({
                "dnt":   dnt_preserved(src, tgt),
                "gloss": glossary_adherence(src, tgt, lang),
                "tags":  tags_preserved(src, tgt),
                "retp":  retrieval_precision([t.split(" → ")[0] for t in retrieved], src),
                "err":   "[RAG_TRANSLATION_ERROR]" in (tgt or ""),
            })

            # Keep aligned RAG/baseline pairs for the similarity computation.
            if path in base_lookup:
                rag_hyps.append(tgt or "")
                base_refs.append(base_lookup[path] or "")

        tot = len(seg_rows)
        if tot == 0:
            print(f"- No evaluable segments for {lang}")
            continue

        # Booleans sum as 0/1, so these are rates (or means for float scores).
        dnt_rate  = sum(r["dnt"]   for r in seg_rows) / tot
        gloss_avg = sum(r["gloss"] for r in seg_rows) / tot
        tag_rate  = sum(r["tags"]  for r in seg_rows) / tot
        retp_avg  = sum(r["retp"]  for r in seg_rows) / tot
        err_rate  = sum(r["err"]   for r in seg_rows) / tot

        # Mean row-wise dot product of the paired embeddings, clipped to
        # [-1, 1]; with normalize_embeddings=True this is presumably the cosine
        # similarity — confirm for the EMB_MODEL in use. Any failure is
        # reported and leaves sem_sim as None.
        sem_sim = None
        if rag_hyps and base_refs:
            try:
                h = EMB_MODEL.encode(rag_hyps, batch_size=64, normalize_embeddings=True)
                b = EMB_MODEL.encode(base_refs, batch_size=64, normalize_embeddings=True)
                sims = (h * b).sum(axis=1).astype(float)
                sem_sim = float(np.clip(sims, -1.0, 1.0).mean())
            except Exception as e:
                print(f"  (semantic sim failed for {lang}: {e})")

        rag_lat   = (RAG_SUMMARY.get(lang) or {}).get("duration_sec")
        rag_speed = (RAG_SUMMARY.get(lang) or {}).get("segments_per_sec")

        # Baseline throughput is stored per minute; convert to per second so it
        # is comparable with the RAG figure.
        base_lat = base_speed = None
        if lang in baseline_eval:
            base_lat   = baseline_eval[lang].get("total_duration_sec")
            spm        = baseline_eval[lang].get("segments_per_minute")
            if isinstance(spm, (int, float)) and spm:
                base_speed = round(spm / 60.0, 4)

        results_rows.append({
            "language": lang,
            "total_segments": tot,
            "dnt_preservation_rate": round(dnt_rate, 3),
            "glossary_adherence_avg": round(gloss_avg, 3),
            "tag_preservation_rate": round(tag_rate, 3),
            "retrieval_precision_avg": round(retp_avg, 3),
            "semantic_similarity_avg": round(sem_sim, 3) if sem_sim is not None else None,
            "error_rate": round(err_rate, 3),
        })

        compare_rows.append({
            "language": lang,
            "rag_model": rag_model_name,
            "rag_duration_sec": rag_lat,
            "rag_segments_per_sec": rag_speed,
            "baseline_duration_sec": base_lat,
            "baseline_segments_per_sec": base_speed,
        })

        # Console summary; LANGUAGE_NAMES must contain an entry for `lang`.
        print(f"\n{LANGUAGE_NAMES[lang]} ({lang.upper()})")
        print("-" * 40)
        print(f"  DNT {dnt_rate:.1%} | Glossary {gloss_avg:.1%} | Tags {tag_rate:.1%} | Ret@k {retp_avg:.1%}")
        if sem_sim is not None:
            print(f"  Semantic similarity (vs baseline): {sem_sim:.3f}")
        print(f"  RAG  : { _fmt(rag_lat,'s')} | { _fmt(rag_speed,' seg/s')}")
        print(f"  Base : { _fmt(base_lat,'s')} | { _fmt(base_speed,' seg/s')}")

    # Save CSVs (only tables that have at least one row are written).
    out_dir = Path("eval"); out_dir.mkdir(parents=True, exist_ok=True)
    rag_df = pd.DataFrame(results_rows)
    cmp_df = pd.DataFrame(compare_rows)
    if not rag_df.empty:
        rag_df.to_csv(out_dir / "rag_comprehensive_evaluation.csv", index=False)
        print(f"\nSaved: {out_dir / 'rag_comprehensive_evaluation.csv'}")
    if not cmp_df.empty:
        cmp_df.to_csv(out_dir / "rag_quality_vs_baseline.csv", index=False)
        print(f"Saved: {out_dir / 'rag_quality_vs_baseline.csv'}")

    # Empty tables are reported as None rather than as empty DataFrames.
    return (rag_df if not rag_df.empty else None,
            cmp_df if not cmp_df.empty else None)

# Execute the evaluation when the cell runs; each name is bound to a DataFrame,
# or to None when evaluate_rag_compact() produced no rows for that table.
RAG_DF, RAG_VS_BASELINE_DF = evaluate_rag_compact()


RAG EVALUATION (compact)
Loading baseline from: translations\baseline\gpt-4o-mini
  • fr: 76 segments
  • ja: 76 segments
  • it: 76 segments
Using baseline eval metrics from: eval\baseline\gpt-4o-mini

French (FR)
----------------------------------------
  DNT 100.0% | Glossary 96.1% | Tags 97.4% | Ret@k 18.4%
  Semantic similarity (vs baseline): 0.980
  RAG  : 39.40s | 1.93 seg/s
  Base : 32.69s | 2.33 seg/s

Japanese (JA)
----------------------------------------
  DNT 100.0% | Glossary 99.3% | Tags 97.4% | Ret@k 18.4%
  Semantic similarity (vs baseline): 0.977
  RAG  : 35.09s | 2.17 seg/s
  Base : 22.81s | 3.33 seg/s

Italian (IT)
----------------------------------------
  DNT 100.0% | Glossary 95.6% | Tags 97.4% | Ret@k 18.4%
  Semantic similarity (vs baseline): 0.983
  RAG  : 42.48s | 1.79 seg/s
  Base : 21.91s | 3.47 seg/s

Saved: eval\rag_comprehensive_evaluation.csv
Saved: eval\rag_quality_vs_baseline.csv
