In [5]:
"""
AEQG Normalization — Notebook (Exact‑Keyword Only)
-------------------------------------------------
This module is **normalization only** (no metric computation). Use directly in a
notebook to convert LLM‑noisy rubric cells into strict canonical labels.

Rules:
  • Cells may contain extra prose, but the exact label appears somewhere.
  • Case‑insensitive; supports minor separator variants (e.g., `more_or_less`).
  • Priority when extracting a label from a cell:
      1) Explicit final markers:  "final[ answer|decision|verdict]: <label>" or
                                  "overall[ answer|decision|verdict]: <label>"
      2) Field‑labeled spans:     "Clear: <label>", "WouldYouUseIt=<label>", etc.
      3) Fallback:                **last** canonical token in the cell (word boundaries)
  • Strips any `<think>…</think>` or `<think>…<\\think>` blocks before parsing.

Canonical columns (order for no‑header CSVs):
  0 Understandable | 1 TopicRelated | 2 Grammatical | 3 Clear | 4 Rephrase |
  5 Answerable | 6 Central | 7 WouldYouUseIt | 8 Bloom’sLevel

Canonical value sets:
  • Binary:        {"yes","no"}
  • Clear:         {"yes","no","more_or_less"}
  • WouldYouUseIt: {"yes","maybe","no"}
  • Bloom’sLevel:  {"remember","understand","apply","analyze","evaluate","create"}

Notebook usage:
    raw_df = read_csv_no_header("PS1_gemma3:latest.csv")
    clean_df, issues_df = normalize_exact_dataframe(raw_df, has_header=False)
    # -> pass clean_df to your metrics code separately
"""
from __future__ import annotations
from typing import List, Optional, Tuple
import re
import unicodedata
import pandas as pd
import os

# ============================
# Canonical schema / constants
# ============================
# Canonical column names, in the positional order used for header-less CSVs.
COLS = [
    "Understandable", "TopicRelated", "Grammatical",
    "Clear", "Rephrase", "Answerable",
    "Central", "WouldYouUseIt", "Bloom’sLevel",
]

# Allowed labels per column. Every rubric item starts out binary; the three
# non-binary items are then overridden in place, so key order stays COLS order.
ALLOWED = {column: {"yes", "no"} for column in COLS}
ALLOWED.update(
    {
        "Clear": {"yes", "no", "more_or_less"},
        "WouldYouUseIt": {"yes", "maybe", "no"},
        "Bloom’sLevel": {
            "remember",
            "understand",
            "apply",
            "analyze",
            "evaluate",
            "create",
        },
    }
)

# ======================
# Text + pattern helpers
# ======================
# Matches one whole think block, non-greedy and across newlines (DOTALL).
# The closing tag may be written with "/" or "\" and whitespace inside the
# angle brackets is tolerated. An opening tag with no closing tag does not match.
_THINK_RE = re.compile(r"<\s*think\s*>.*?<\s*[\\/]\s*think\s*>", re.IGNORECASE | re.DOTALL)
# Any run of whitespace; _norm collapses each run to a single space with it.
_WS_RE = re.compile(r"\s+")

# Exact labels (word boundaries, case-insensitive).
# NOTE(review): none of these three patterns is referenced by the extractors in
# this section (they call _find_last_token instead) — confirm there is no other
# user before removing them.
_RX_YES   = re.compile(r"\byes\b", re.IGNORECASE)
_RX_NO    = re.compile(r"\bno\b", re.IGNORECASE)
_RX_MAYBE = re.compile(r"\bmaybe\b", re.IGNORECASE)

# Clear middle state: three spellings of "more or less". extract_clear_exact
# treats a hit on any one of them as the canonical label "more_or_less".
_RX_MOL_SPACE = re.compile(r"\bmore\s+or\s+less\b", re.IGNORECASE)   # whitespace-separated
_RX_MOL_DASH  = re.compile(r"\bmore-?or-?less\b", re.IGNORECASE)   # more-or-less / moreorless
_RX_MOL_UND   = re.compile(r"\bmore[_ ]or[_ ]less\b", re.IGNORECASE)   # underscore or single space, mixable

# Bloom levels as (pattern, canonical label) pairs.
# The analyze pattern accepts "analye", "analyse" (UK) and "analyze"; all map to "analyze".
_BLOOM_RXS: List[Tuple[re.Pattern, str]] = [
    (re.compile(r"\bremember\b", re.IGNORECASE),   "remember"),
    (re.compile(r"\bunderstand\b", re.IGNORECASE), "understand"),
    (re.compile(r"\bapply\b", re.IGNORECASE),      "apply"),
    (re.compile(r"\banal(?:y|ys|yz)e\b", re.IGNORECASE), "analyze"),
    (re.compile(r"\bevaluate\b", re.IGNORECASE),   "evaluate"),
    (re.compile(r"\bcreate\b", re.IGNORECASE),     "create"),
]

# Field signatures to clip labeled spans (we still extract exact tokens inside).
# Each signature is interpolated verbatim (not regex-escaped) into the patterns
# built by _compile_field_label_regex, so entries must be regex-safe text.
# extract_yes_no_exact additionally maps a cell that is exactly equal to one of
# its field's signatures to "yes".
# Keys are the canonical column names, including the typographic apostrophe in
# "Bloom’sLevel".
_FIELD_SIGS = {
    "Understandable": ["understandable", "understand"],
    "TopicRelated":   ["topicrelated", "topic"],
    "Grammatical":    ["grammatical", "grammar"],
    "Clear":          ["clear"],
    "Rephrase":       ["rephrase", "rephrased", "reword"],
    "Answerable":     ["answerable", "answer"],
    "Central":        ["central", "core"],
    "WouldYouUseIt":  ["wouldyouuseit", "would_use_it", "useit", "use"],
    "Bloom’sLevel":   ["bloom", "bloomslevel", "bloomlevel", "level"],
}

# Generic value snippets used only to bound labeled spans. Each one is a single
# capturing group followed by a word boundary.
# NOTE(review): _VAL_CLEAR only allows optional whitespace between the words
# ("more or less", "moreorless"); the "_" and "-" spellings accepted by the
# _RX_MOL_* patterns above do not match here.
_VAL_YESNO = r"(yes|no)\b"
_VAL_MAYBE = r"(maybe|yes|no)\b"
_VAL_CLEAR = r"(more\s*or\s*less|yes|no)\b"
_VAL_BLOOM = r"(remember|understand|apply|anal(?:y|ys|yz)e|evaluate|create)\b"


def _strip_think(text: object) -> str:
    """Return ``str(text)`` with every think block replaced by one space.

    ``None`` and float NaN are treated as missing and yield an empty string.
    """
    is_missing = text is None or (isinstance(text, float) and pd.isna(text))
    if is_missing:
        return ""
    as_text = str(text)
    return _THINK_RE.sub(" ", as_text)


def _norm(text: object) -> str:
    """Canonicalise a raw cell for matching.

    Steps, in order: drop think blocks, apply Unicode NFKC normalisation,
    trim, lowercase, and collapse every whitespace run to a single space.
    """
    without_think = _strip_think(text)
    folded = unicodedata.normalize("NFKC", without_think)
    trimmed = folded.strip().lower()
    return _WS_RE.sub(" ", trimmed)


def _find_last_token(t: str, tokens: List[str]) -> Optional[str]:
    last_label, last_pos = None, -1
    for tok in tokens:
        for m in re.finditer(rf"\b{re.escape(tok)}\b", t, flags=re.IGNORECASE):
            if m.start() > last_pos:
                last_pos, last_label = m.start(), tok
    return last_label


def _compile_field_label_regex(field: str, value_regex: str) -> List[re.Pattern]:
    """Build the labeled-span patterns for *field*.

    For each signature of the field two case-insensitive patterns are emitted,
    in this order: one anchored at the start of the text (leading whitespace
    allowed) and one that may start at any word boundary. Both expect
    ``<signature> [:=-] <value>``, with *value_regex* supplying the value part.
    """
    compiled: List[re.Pattern] = []
    for signature in _FIELD_SIGS[field]:
        labeled = rf"{signature}\s*[:=-]\s*{value_regex}"
        variants = (rf"^(?:\s*{labeled})", rf"\b{labeled}")
        compiled.extend(re.compile(variant, re.IGNORECASE) for variant in variants)
    return compiled

# Rule 2: per-field patterns recognising "<field signature> [:=-] <value>" spans.
# Built from (field, value-snippet) pairs; key order follows the pair order.
_RULE2_RXS = {
    field: _compile_field_label_regex(field, value_rx)
    for field, value_rx in (
        ("Understandable", _VAL_YESNO),
        ("TopicRelated", _VAL_YESNO),
        ("Grammatical", _VAL_YESNO),
        ("Clear", _VAL_CLEAR),
        ("Rephrase", _VAL_YESNO),
        ("Answerable", _VAL_YESNO),
        ("Central", _VAL_YESNO),
        ("WouldYouUseIt", _VAL_MAYBE),
        ("Bloom’sLevel", _VAL_BLOOM),
    )
}

# Rule 1: explicit final markers. Group 1 captures everything that follows the
# marker and its ":", "=" or "-" separator, up to the end of the line.
_RULE1_RXS = [
    re.compile(
        r"\bfinal(?:\s*(?:answer|decision|verdict))?\s*[:=-]\s*(.+)$",
        re.IGNORECASE,
    ),
    re.compile(
        r"\boverall\s*(?:answer|decision|verdict)?\s*[:=-]\s*(.+)$",
        re.IGNORECASE,
    ),
    # Disabled patterns, kept for reference:
    # re.compile(r"\banswer\s*[:=-]\s*\**([a-z0-9_ -]+)\**", re.IGNORECASE),
    # re.compile(r"\bskilllevel\s*[:=-]\s*\**([a-z0-9_ -]+)\**", re.IGNORECASE)
]


def _first_group(t: str, rx: re.Pattern) -> Optional[str]:
    m = rx.search(t)
    return m.group(1) if m else None

# =============================
# Column extractors (exact only)
# =============================

def extract_yes_no_exact(text: str, field: str) -> str:
    """Reduce a raw cell of a binary rubric column to ``"yes"`` or ``"no"``.

    Resolution order on the normalised cell:
      0. a cell that is exactly one of the field's signatures counts as "yes";
      1. the last yes/no found after an explicit final/overall marker;
      2. the value of a "<field signature>: <value>" span;
      3. the last yes/no anywhere in the cell.
    If nothing matches, the normalised cell text itself is returned so the
    caller can flag it as an issue.
    """
    binary = ["yes", "no"]
    cell = _norm(text)

    # 0) bare field-name mention
    if cell in _FIELD_SIGS[field]:
        return "yes"

    # 1) explicit final marker: last label in the text after it wins
    for marker_rx in _RULE1_RXS:
        tail = _first_group(cell, marker_rx)
        if not tail:
            continue
        verdict = _find_last_token(_norm(tail), binary)
        if verdict:
            return verdict

    # 2) field-labeled span
    for labeled_rx in _RULE2_RXS[field]:
        hit = labeled_rx.search(cell)
        if hit is None:
            continue
        verdict = _find_last_token(hit.group(0), binary) or hit.group(hit.lastindex)
        if not verdict:
            continue
        verdict = verdict.lower()
        if verdict.startswith("y"):
            return "yes"
        if verdict.startswith("n"):
            return "no"
        return verdict

    # 3) fallback: last label in the whole cell, else the cell unchanged
    return _find_last_token(cell, binary) or cell


def extract_clear_exact(text: str) -> str:
    """Reduce a raw "Clear" cell to ``"yes"``, ``"no"`` or ``"more_or_less"``.

    Within each stage a "more or less" spelling takes precedence over yes/no.
    Stages, in order: text after a final/overall marker, a "clear: <value>"
    span, then the whole cell. If nothing matches, the normalised cell text is
    returned so the caller can flag it as an issue.
    """

    def mentions_middle(fragment: str) -> bool:
        """True if *fragment* contains any accepted spelling of "more or less"."""
        spellings = (_RX_MOL_SPACE, _RX_MOL_DASH, _RX_MOL_UND)
        return any(rx.search(fragment) for rx in spellings)

    binary = ["yes", "no"]
    cell = _norm(text)

    # 1) explicit final marker
    for marker_rx in _RULE1_RXS:
        tail = _first_group(cell, marker_rx)
        if not tail:
            continue
        tail = _norm(tail)
        if mentions_middle(tail):
            return "more_or_less"
        verdict = _find_last_token(tail, binary)
        if verdict:
            return verdict

    # 2) field-labeled span
    for labeled_rx in _RULE2_RXS["Clear"]:
        hit = labeled_rx.search(cell)
        if hit is None:
            continue
        span = _norm(hit.group(0))
        if mentions_middle(span):
            return "more_or_less"
        verdict = _find_last_token(span, binary) or hit.group(hit.lastindex)
        if verdict:
            verdict = verdict.lower()
            for label in ("yes", "no"):
                if verdict.startswith(label):
                    return label

    # 3) fallback: "more or less" anywhere, else last yes/no, else the cell
    if mentions_middle(cell):
        return "more_or_less"
    return _find_last_token(cell, binary) or cell


def extract_would_use_exact(text: str) -> str:
    """Reduce a raw "WouldYouUseIt" cell to ``"yes"``, ``"maybe"`` or ``"no"``.

    Stages, in order: the last label after a final/overall marker, the value
    of a "<field signature>: <value>" span, then the last label anywhere in
    the cell. If nothing matches, the normalised cell text is returned so the
    caller can flag it as an issue.
    """
    labels = ["maybe", "yes", "no"]
    prefix_to_label = (("may", "maybe"), ("yes", "yes"), ("no", "no"))
    cell = _norm(text)

    # 1) explicit final marker
    for marker_rx in _RULE1_RXS:
        tail = _first_group(cell, marker_rx)
        if not tail:
            continue
        verdict = _find_last_token(_norm(tail), labels)
        if verdict:
            return verdict

    # 2) field-labeled span
    for labeled_rx in _RULE2_RXS["WouldYouUseIt"]:
        hit = labeled_rx.search(cell)
        if hit is None:
            continue
        span = _norm(hit.group(0))
        verdict = _find_last_token(span, labels) or hit.group(hit.lastindex)
        if verdict:
            verdict = verdict.lower()
            for prefix, label in prefix_to_label:
                if verdict.startswith(prefix):
                    return label

    # 3) fallback: last label in the whole cell, else the cell unchanged
    return _find_last_token(cell, labels) or cell


def extract_bloom_exact(text: str) -> str:
    """Reduce a raw Bloom's-level cell to one canonical level name.

    Stages, in order: the last level after a final/overall marker, the level
    inside a "<field signature>: <value>" span, then the last level anywhere
    in the cell. If nothing matches, the normalised cell text is returned so
    the caller can flag it as an issue.
    """

    def last_level(fragment: str) -> Optional[str]:
        """Canonical label of the level match that starts furthest right, or None."""
        winner, winner_start = None, -1
        for level_rx, label in _BLOOM_RXS:
            for hit in level_rx.finditer(fragment):
                if hit.start() > winner_start:
                    winner_start, winner = hit.start(), label
        return winner

    cell = _norm(text)

    # 1) explicit final marker
    for marker_rx in _RULE1_RXS:
        tail = _first_group(cell, marker_rx)
        if tail:
            level = last_level(_norm(tail))
            if level:
                return level

    # 2) field-labeled span
    for labeled_rx in _RULE2_RXS["Bloom’sLevel"]:
        span_hit = labeled_rx.search(cell)
        if span_hit:
            level = last_level(_norm(span_hit.group(0)))
            if level:
                return level

    # 3) fallback: last level in the whole cell, else the cell unchanged
    return last_level(cell) or cell

# ======================================
# DataFrame‑level normalization (notebook)
# ======================================

def normalize_exact_dataframe(df: pd.DataFrame, has_header: bool = False) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Normalize AEQG data using **exact** labels only (case/spacing tolerant).

    Parameters
    ----------
    df : DataFrame read from your CSV; if no header, read with header=None first.
        The input is copied and never modified.
    has_header : set True only if `df` already has the 9 canonical column names.
        When False, the columns are renamed positionally to ``COLS`` unless
        their labels already are exactly the canonical names (in any order),
        in which case they are used by name.

    Returns
    -------
    clean_df : DataFrame with canonical columns and strict labels
    issues_df: DataFrame (columns ``row``, ``column``, ``value``) listing
        rows/columns that did not reduce to allowed labels; ``row`` is the
        index label of the offending row.

    Raises
    ------
    ValueError : `has_header` is False and `df` does not have ``len(COLS)`` columns.
    KeyError   : `has_header` is True and a canonical column is missing.
    """
    raw = df.copy()

    if not has_header:
        integer_labels = all(isinstance(c, int) for c in raw.columns)
        if raw.shape[1] != len(COLS):
            if integer_labels:
                raise ValueError(f"Expected {len(COLS)} columns; got {raw.shape[1]}.")
            raise ValueError("Header/DataFrame does not match expected 9 columns.")
        # Keep by-name access when the labels already are the canonical set;
        # otherwise trust the positional order. The previous code skipped the
        # rename for non-integer labels and then failed with a KeyError below.
        if set(raw.columns) != set(COLS):
            raw.columns = COLS
    else:
        missing = [c for c in COLS if c not in raw.columns]
        if missing:
            # Same exception type as the failing column lookup would raise,
            # but naming every missing column at once.
            raise KeyError(f"Missing canonical column(s): {missing}")

    # Columns with their own extractor; every other column is plain yes/no.
    dedicated = {
        "Clear": extract_clear_exact,
        "WouldYouUseIt": extract_would_use_exact,
        "Bloom’sLevel": extract_bloom_exact,
    }

    # Column-wise extraction, in canonical order.
    out = pd.DataFrame(index=raw.index)
    for col in COLS:
        extractor = dedicated.get(col)
        if extractor is None:
            # Bind the column name now; a plain closure would see the last `col`.
            extractor = lambda cell, _field=col: extract_yes_no_exact(cell, _field)
        out[col] = raw[col].map(extractor)

    # Validate & collect issues. Iterating the masked Series (rather than
    # looking cells up by label) stays correct when index labels repeat.
    rows, cols, vals = [], [], []
    for col, allowed in ALLOWED.items():
        bad = ~out[col].isin(allowed)
        for row_label, value in out.loc[bad, col].items():
            rows.append(row_label)
            cols.append(col)
            vals.append(value)
    issues = pd.DataFrame({"row": rows, "column": cols, "value": vals})

    return out, issues

# ============================
# Minimal helper for CSV input
# ============================

def read_csv_no_header(path: str, delimiter: str = ",", encoding: Optional[str] = None) -> pd.DataFrame:
    """Load a header-less CSV and label its columns with the canonical names.

    *delimiter* and *encoding* are forwarded to ``pandas.read_csv``.
    Raises ``ValueError`` when the file does not have exactly ``len(COLS)``
    columns.
    """
    frame = pd.read_csv(path, header=None, sep=delimiter, encoding=encoding)
    found = frame.shape[1]
    if found == len(COLS):
        frame.columns = COLS
        return frame
    raise ValueError(
        f"Expected {len(COLS)} columns in the CSV (no header). Got {found}.\n"
        f"Ensure the file has exactly these columns in order: {COLS}"
    )


In [None]:
# Batch driver: normalise every (model, prompt) CSV and write the cleaned table
# and its issue report next to each other under `clean_output/`.
# models = ['deepseek-r1:14b', 'phi4:latest', 'gemma3:latest', 'phi4-mini:latest', 'llama3.2:latest']
models = ['gpt-oss:latest', 'granite4:latest', 'mistral-small3.2:latest']
prompts = ['PS1','PS2','PS3','PS4','PS5']
output_dir_clean = 'clean_output'
output_dir_issues = 'clean_output/issues'

# to_csv does not create missing folders; make sure both targets exist first.
os.makedirs(output_dir_clean, exist_ok=True)
os.makedirs(output_dir_issues, exist_ok=True)

for model in models:
    print(f"----------------{model}----------------")
    for prompt in prompts:
        # Input files are expected in the working directory.
        raw_df = read_csv_no_header(f"large_{prompt}_{model}.csv")
        clean_df, issues_df = normalize_exact_dataframe(raw_df, has_header=False)
        # Both outputs are written without index or header row.
        clean_df_path = os.path.join(output_dir_clean, f'{prompt}_{model}_clean.csv')
        clean_df.to_csv(clean_df_path, index=False, header=False)
        issues_df_path = os.path.join(output_dir_issues, f'{prompt}_{model}_issues.csv')
        issues_df.to_csv(issues_df_path, index=False, header=False)
    print("-------------------------------------")
    print()

----------------gpt-oss:latest----------------
-------------------------------------

----------------granite4:latest----------------
-------------------------------------

----------------mistral-small3.2:latest----------------
-------------------------------------

