# Evaluating the Chatbot System (Fashion RAG)

This notebook evaluates your chatbot along five main groups:

1) **Retrieval/RAG** (when *gold* labels are available): Hit@K, Recall@K, MRR, nDCG@K.
2) **Task correctness (no reference needed)**: degree of **constraint adherence** (budget, color, usage/occasion, gender, articleType).
3) **Answer faithfulness/groundedness**: whether the answer “makes up” content beyond the returned `products` list (heuristic).
4) **Safety** (heuristic): regex-based detection of PII leaks (email/phone/CC) and sensitive content.
5) **Performance**: latency p50/p90/p95, error rate, empty-result rate.

> Note: the API is currently **English-only** (in `app/main.py`). The notebook tests the rejection behavior with a Vietnamese query.

---

## Suggested metrics (summary)

### Retrieval / RAG
- **Hit@K**: at least one relevant document appears in the top-K.
- **Recall@K**: $\frac{|\text{relevant} \cap \text{topK}|}{|\text{relevant}|}$.
- **MRR**: $\text{MRR} = \frac{1}{N}\sum_i \frac{1}{\text{rank}_i}$ (rank of the first correct result; a query with no correct result contributes 0).
- **nDCG@K**: measures ranking quality; with binary relevance, $\text{DCG@K} = \sum_{j=1}^{K} \frac{rel_j}{\log_2(j+1)}$ and $\text{nDCG@K} = \frac{\text{DCG@K}}{\text{IDCG@K}}$, where IDCG@K is the DCG@K of an ideal ranking.
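
The definitions above can be checked on a toy ranking. The block below is a minimal, self-contained sketch; the ids `p1`…`p9` and the short function names are made up for illustration and are not part of the project.

```python
import math

def hit(pred, gold, k):
    """1.0 if any gold id appears in the top-k predictions, else 0.0."""
    return 1.0 if set(pred[:k]) & set(gold) else 0.0

def recall(pred, gold, k):
    """Share of gold ids found in the top-k predictions."""
    return len(set(pred[:k]) & set(gold)) / len(set(gold))

def reciprocal_rank(pred, gold):
    """1 / rank of the first gold id, or 0.0 if none is retrieved."""
    for rank, pid in enumerate(pred, start=1):
        if pid in gold:
            return 1.0 / rank
    return 0.0

def ndcg(pred, gold, k):
    """Binary-relevance nDCG@k: DCG of the ranking divided by DCG of an ideal ranking."""
    gold_set = set(gold)
    dcg = sum(1.0 / math.log2(j + 1) for j, pid in enumerate(pred[:k], start=1) if pid in gold_set)
    idcg = sum(1.0 / math.log2(j + 1) for j in range(1, min(k, len(gold_set)) + 1))
    return dcg / idcg if idcg > 0 else 0.0

# Hypothetical ranking and gold labels (made-up ids).
pred = ["p7", "p2", "p9", "p4", "p1"]
gold = ["p2", "p4"]

print("hit@3    =", hit(pred, gold, 3))          # a gold item (p2) sits at rank 2
print("recall@3 =", recall(pred, gold, 3))       # 1 of the 2 gold items is in the top 3
print("RR       =", reciprocal_rank(pred, gold)) # first gold item is at rank 2
print("nDCG@5   =", round(ndcg(pred, gold, 5), 4))
```

MRR is then the mean of `reciprocal_rank` over all labeled queries.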

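### Task / Constraint adherence
- **Budget adherence**: every returned item has a price within [$min$, $max$] (if a price is present).
- **Color/Usage/Gender/Type adherence**: share of items matching the constraint. The code in this notebook applies the strict version: all returned items must match.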
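
As a rough illustration of these two checks, here is a minimal sketch on made-up product cards. The helper names `budget_ok` and `match_rate` are hypothetical; the notebook's own `constraint_checks` is stricter in places (it requires every item to match and treats a result set with no prices as a budget failure).

```python
from typing import Optional

# Hypothetical product cards (made-up values, for illustration only).
products = [
    {"id": "1", "color": "Black", "price": 59.0},
    {"id": "2", "color": "black", "price": 85.0},
    {"id": "3", "color": "Navy Blue", "price": None},
]

def budget_ok(items, min_price: Optional[float] = None, max_price: Optional[float] = None) -> bool:
    """True when every item that has a price falls inside [min_price, max_price]."""
    prices = [p["price"] for p in items if p.get("price") is not None]
    return all(
        (min_price is None or price >= min_price) and (max_price is None or price <= max_price)
        for price in prices
    )

def match_rate(items, field: str, wanted: str) -> float:
    """Share of items whose `field` equals `wanted`, ignoring case and surrounding spaces."""
    if not items:
        return 0.0
    target = wanted.strip().lower()
    hits = sum(1 for p in items if str(p.get(field) or "").strip().lower() == target)
    return hits / len(items)

print(budget_ok(products, max_price=80))       # False: the 85.0 item exceeds the cap
print(budget_ok(products, max_price=100))      # True
print(match_rate(products, "color", "Black"))  # 2 of 3 items match
```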

### Performance
- **Latency p50/p90/p95**, **Error rate**, **Empty products rate**.
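
A minimal sketch of how these operational numbers can be derived from a request log, using only the standard library. The log values are made up, and counting every non-2xx status (including `0` for connection failures) as an error is an assumption of this sketch.

```python
import statistics

# Hypothetical per-request log: (status, latency_ms, n_products). Made-up values.
log = [
    (200, 40.0, 5),
    (200, 55.0, 0),
    (500, 900.0, 0),
    (200, 60.0, 4),
    (0, 30000.0, 0),  # status 0 stands for a connection error or timeout
]

latencies = sorted(lat for _, lat, _ in log)
# quantiles(n=100) returns the 99 percentile cut points; index i-1 is the i-th percentile.
cuts = statistics.quantiles(latencies, n=100, method="inclusive")
p50, p95 = cuts[49], cuts[94]

ok = [row for row in log if 200 <= row[0] < 300]
error_rate = 1 - len(ok) / len(log)
# Empty-result rate among successful calls only (an assumption of this sketch).
empty_rate = sum(1 for _, _, n in ok if n == 0) / len(ok)

print({"p50_ms": p50, "p95_ms": p95, "error_rate": error_rate, "empty_rate": empty_rate})
```

With so few samples the high percentiles are dominated by a single slow request, so p95 is only informative on larger test sets.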

---


In [2]:
# 1) Setup & library imports
# The notebook is meant to run with the existing requirements.txt.
# Some libraries (tqdm, matplotlib, seaborn, rouge_score, sacrebleu, bert_score, tiktoken, jsonschema)
# are imported as optional dependencies.

from __future__ import annotations

import os
import re
import json
import time
import math
import hashlib
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import requests
import pandas as pd

# Optional imports
try:
    from tqdm.auto import tqdm
except Exception:
    tqdm = None

try:
    import numpy as np
except Exception:
    np = None

# Optional plotting
try:
    import matplotlib.pyplot as plt
except Exception:
    plt = None

ARTIFACTS_DIR = Path("artifacts")
OUTPUTS_DIR = Path("outputs")
ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True)
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)

print("Artifacts:", ARTIFACTS_DIR.resolve())
print("Outputs:", OUTPUTS_DIR.resolve())

Artifacts: D:\Study\CS311\CS311\artifacts
Outputs: D:\Study\CS311\CS311\outputs


In [14]:
# 2) Experiment configuration

API_BASE = os.getenv("API_BASE", "http://127.0.0.1:8081").rstrip("/")
TIMEOUT_S = float(os.getenv("EVAL_TIMEOUT_S", "30"))
MAX_RETRIES = int(os.getenv("EVAL_MAX_RETRIES", "2"))
TOP_K_DEFAULT = int(os.getenv("EVAL_TOP_K", "5"))

CONFIG = {
    "api_base": API_BASE,
    "timeout_s": TIMEOUT_S,
    "max_retries": MAX_RETRIES,
    "top_k_default": TOP_K_DEFAULT,
    "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
}

(ARTIFACTS_DIR / "eval_config.json").write_text(json.dumps(CONFIG, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(CONFIG, ensure_ascii=False, indent=2))


def _request_json(method: str, url: str, payload: Optional[dict] = None) -> Tuple[int, dict, float, Optional[str]]:
    """HTTP helper with retries.

    Returns: (status_code, json_or_error, latency_ms, error_text)
    """
    last_err = None
    for attempt in range(MAX_RETRIES + 1):
        t0 = time.perf_counter()
        try:
            resp = requests.request(
                method=method,
                url=url,
                json=payload,
                timeout=TIMEOUT_S,
            )
            latency_ms = (time.perf_counter() - t0) * 1000
            try:
                data = resp.json()
            except Exception:
                data = {"_raw": resp.text}
            if 200 <= resp.status_code < 300:
                return resp.status_code, data, latency_ms, None
            last_err = f"HTTP {resp.status_code}: {data}"
            # retry on 5xx
            if resp.status_code >= 500 and attempt < MAX_RETRIES:
                time.sleep(0.5 * (attempt + 1))
                continue
            return resp.status_code, data, latency_ms, last_err
        except Exception as e:
            latency_ms = (time.perf_counter() - t0) * 1000
            last_err = f"{type(e).__name__}: {e}"
            if attempt < MAX_RETRIES:
                time.sleep(0.5 * (attempt + 1))
                continue
            return 0, {}, latency_ms, last_err


def healthcheck() -> bool:
    status, data, ms, err = _request_json("GET", f"{API_BASE}/health")
    print("/health", {"status": status, "latency_ms": round(ms, 2), "data": data, "err": err})
    return status == 200


_healthy = healthcheck()
if not _healthy:
    print(
        "\n[HINT] The API is not running or API_BASE is wrong.\n"
        "- If running with docker compose: open http://127.0.0.1:8081/docs\n"
        "- If running uvicorn locally: check the port and the API_BASE variable\n"
    )

{
  "api_base": "http://127.0.0.1:8081",
  "timeout_s": 30.0,
  "max_retries": 2,
  "top_k_default": 5,
  "timestamp": "2026-01-14 09:19:58"
}
/health {'status': 200, 'latency_ms': 6.08, 'data': {'status': 'ok'}, 'err': None}


In [4]:
# 3) Normalize the test data schema & loader (simple)

# In this project, the /chat endpoint accepts:
# { query: str, top_k: int, filters?: dict, messages?: [{role,content}], max_tokens?: int, temperature?: float }
# and returns: { answer: str, products: [...], sources: [...] }

@dataclass
class TestCase:
    name: str
    query: str
    expected_ids: Optional[List[str]] = None  # fill in if you have labeled correct product ids
    expected_constraints: Optional[Dict[str, Any]] = None
    should_reject: bool = False
    top_k: int = TOP_K_DEFAULT


def default_testset() -> List[TestCase]:
    # NOTE: the API is English-only, so queries should be in English.
    # expected_ids: you can fill these in later (after ingestion) to compute retrieval metrics.
    return [
        TestCase(
            name="basic_men_shoes_budget",
            query="men black sneakers under $80",
            expected_constraints={"gender": "Men", "color": "Black", "max_price": 80},
        ),
        TestCase(
            name="formal_office",
            query="women formal office outfit under $100",
            expected_constraints={"usage": "Formal", "max_price": 100},
        ),
        TestCase(
            name="color_strict",
            query="women red dress",
            expected_constraints={"color": "Red"},
        ),
        TestCase(
            name="type_tshirts",
            query="men white t-shirt",
            expected_constraints={"articleType": "Tshirts", "color": "White", "gender": "Men"},
        ),
        # English-only test: a Vietnamese query with diacritics should be rejected
        TestCase(
            name="reject_vietnamese",
            query="Tôi muốn mua áo sơ mi trắng đi làm dưới 40 đô",
            should_reject=True,
        ),
    ]


TESTSET = default_testset()
print(f"Loaded {len(TESTSET)} test cases")
pd.DataFrame([t.__dict__ for t in TESTSET])

Loaded 5 test cases


Unnamed: 0,name,query,expected_ids,expected_constraints,should_reject,top_k
0,basic_men_shoes_budget,men black sneakers under $80,,"{'gender': 'Men', 'color': 'Black', 'max_price...",False,5
1,formal_office,women formal office outfit under $100,,"{'usage': 'Formal', 'max_price': 100}",False,5
2,color_strict,women red dress,,{'color': 'Red'},False,5
3,type_tshirts,men white t-shirt,,"{'articleType': 'Tshirts', 'color': 'White', '...",False,5
4,reject_vietnamese,Tôi muốn mua áo sơ mi trắng đi làm dưới 40 đô,,,True,5


In [5]:
# 4) Run the chatbot in batch + logging

CACHE_PATH = ARTIFACTS_DIR / "eval_cache.json"
try:
    _CACHE = json.loads(CACHE_PATH.read_text(encoding="utf-8")) if CACHE_PATH.exists() else {}
except Exception:
    _CACHE = {}


def _hash_payload(payload: dict) -> str:
    s = json.dumps(payload, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(s.encode("utf-8")).hexdigest()


def call_chat(query: str, top_k: int = 5, filters: Optional[dict] = None, messages: Optional[list] = None) -> dict:
    payload: Dict[str, Any] = {
        "query": query,
        "top_k": int(top_k),
    }
    if filters:
        payload["filters"] = filters
    if messages:
        payload["messages"] = messages

    key = _hash_payload(payload)
    if key in _CACHE:
        out = dict(_CACHE[key])
        out["_cached"] = True
        return out

    status, data, latency_ms, err = _request_json("POST", f"{API_BASE}/chat", payload)
    out = {
        "status": status,
        "latency_ms": latency_ms,
        "error": err,
        "response": data,
        "_cached": False,
    }
    # Cache only successful calls so that failures are retried on the next run.
    if err is None:
        _CACHE[key] = out
    return out


def run_eval(testset: List[TestCase]) -> pd.DataFrame:
    rows = []
    iterator = tqdm(testset, desc="Evaluating") if tqdm else testset
    for tc in iterator:
        res = call_chat(tc.query, top_k=tc.top_k)
        resp = res.get("response") or {}
        products = resp.get("products") or []
        sources = resp.get("sources") or []
        answer = resp.get("answer")
        rows.append(
            {
                "name": tc.name,
                "query": tc.query,
                "should_reject": bool(tc.should_reject),
                "expected_ids": tc.expected_ids,
                "expected_constraints": tc.expected_constraints,
                "status": res.get("status"),
                "latency_ms": float(res.get("latency_ms") or 0.0),
                "error": res.get("error"),
                "cached": bool(res.get("_cached")),
                "answer": answer,
                "products": products,
                "sources": sources,
                "n_products": len(products) if isinstance(products, list) else 0,
            }
        )

    df = pd.DataFrame(rows)
    # Persist cache and raw results
    CACHE_PATH.write_text(json.dumps(_CACHE, ensure_ascii=False, indent=2), encoding="utf-8")
    (ARTIFACTS_DIR / "predictions.jsonl").write_text(
        "\n".join(json.dumps(r, ensure_ascii=False) for r in rows),
        encoding="utf-8",
    )
    return df


DF = run_eval(TESTSET)
DF.head()

Unnamed: 0,name,query,should_reject,expected_ids,expected_constraints,status,latency_ms,error,cached,answer,products,sources,n_products
0,basic_men_shoes_budget,men black sneakers under $80,False,,"{'gender': 'Men', 'color': 'Black', 'max_price...",200,531.8409,,True,Quick summary: Here are the closest matches fo...,"[{'id': '24626', 'name': 'Converse Men Black R...","[{'id': '24626', 'text': 'Converse Men Black R...",5
1,formal_office,women formal office outfit under $100,False,,"{'usage': 'Formal', 'max_price': 100}",200,1042.7743,,True,Quick summary: Here are the closest matches fo...,"[{'id': '57116', 'name': 'Elle Women White Sem...","[{'id': '57116', 'text': 'Elle Women White Sem...",4
2,color_strict,women red dress,False,,{'color': 'Red'},200,43.0992,,True,Quick summary: Here are the closest matches fo...,"[{'id': '45777', 'name': 'Remanika Women Red D...","[{'id': '45777', 'text': 'Remanika Women Red D...",5
3,type_tshirts,men white t-shirt,False,,"{'articleType': 'Tshirts', 'color': 'White', '...",200,39.2038,,True,Quick summary: Here are the closest matches fo...,"[{'id': '2853', 'name': 'Mr.Men Printed White ...","[{'id': '2853', 'text': 'Mr.Men Printed White ...",5
4,reject_vietnamese,Tôi muốn mua áo sơ mi trắng đi làm dưới 40 đô,True,,,200,3.6427,,True,English only: please rephrase your request in ...,[],[],0


In [6]:
# 5) Metrics: retrieval + constraint adherence + English-only + performance

def _safe_float(x: Any) -> Optional[float]:
    try:
        if x is None:
            return None
        return float(x)
    except Exception:
        return None


def _normalize_str(x: Any) -> str:
    return ("" if x is None else str(x)).strip()


def _normalize_color(x: Any) -> str:
    s = _normalize_str(x).lower()
    s = re.sub(r"[^a-z]+", "", s)
    if s == "grey":
        return "gray"
    return s


def _normalize_article_type(x: Any) -> str:
    s = _normalize_str(x).lower()
    s = re.sub(r"[^a-z0-9]+", "", s)
    return s


def _extract_prices(products: list) -> List[float]:
    out = []
    for p in products or []:
        if not isinstance(p, dict):
            continue
        v = _safe_float(p.get("price"))
        if v is None:
            continue
        if v >= 0:
            out.append(v)
    return out


def _ids_from_products(products: list) -> List[str]:
    out: List[str] = []
    for p in products or []:
        if isinstance(p, dict) and p.get("id") is not None:
            out.append(str(p.get("id")))
    return out


def _ids_from_sources(sources: list) -> List[str]:
    out: List[str] = []
    for s in sources or []:
        if isinstance(s, dict) and s.get("id") is not None:
            out.append(str(s.get("id")))
    return out


def hit_at_k(pred_ids: List[str], gold_ids: List[str], k: int) -> float:
    if not gold_ids:
        return float("nan")
    g = set(gold_ids)
    return 1.0 if any(pid in g for pid in pred_ids[:k]) else 0.0


def recall_at_k(pred_ids: List[str], gold_ids: List[str], k: int) -> float:
    if not gold_ids:
        return float("nan")
    g = set(gold_ids)
    # Use a set intersection so duplicate predicted ids cannot push recall above 1.
    return len(set(pred_ids[:k]) & g) / len(g)


def mrr(pred_ids: List[str], gold_ids: List[str]) -> float:
    if not gold_ids:
        return float("nan")
    g = set(gold_ids)
    for i, pid in enumerate(pred_ids, start=1):
        if pid in g:
            return 1.0 / i
    return 0.0


def ndcg_at_k(pred_ids: List[str], gold_ids: List[str], k: int) -> float:
    if not gold_ids:
        return float("nan")
    g = set(gold_ids)

    def dcg(ids: List[str]) -> float:
        s = 0.0
        for j, pid in enumerate(ids[:k], start=1):
            rel = 1.0 if pid in g else 0.0
            s += rel / math.log2(j + 1)
        return s

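    # Ideal ranking: gold items first (at most k of them), padded with non-relevant placeholders.
    # With k or more gold items, the ideal ranking has a relevant item at every position.
    ideal = list(g)[:k]
    if len(ideal) < k:
        ideal = ideal + ["__non__"] * (k - len(ideal))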
    denom = dcg(ideal)
    if denom <= 0:
        return 0.0
    return dcg(pred_ids) / denom


def english_only_rejected(answer: str, products: list, response_obj: dict) -> bool:
    # /chat returns answer string; for /query it returns error field.
    a = (answer or "").lower()
    if "english only" in a:
        return True
    if isinstance(response_obj, dict) and "error" in response_obj:
        if "english" in str(response_obj.get("error") or "").lower():
            return True
    # Heuristic: reject if no products and message indicates english
    if (not products) and ("rephrase" in a and "english" in a):
        return True
    return False


def constraint_checks(products: list, constraints: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Return a dict of constraint adherence checks.

    We evaluate on returned `products` (UI cards) because in this project the answer is derived from them.
    """
    constraints = constraints or {}
    out: Dict[str, Any] = {}

    if not products:
        out["has_products"] = False
        # If user expects constraints but there are no products, treat as fail.
        out["constraint_pass"] = False if constraints else True
        return out

    out["has_products"] = True

    # --- Color ---
    if "color" in constraints and constraints["color"] is not None:
        want = _normalize_color(constraints["color"])
        colors = [_normalize_color((p or {}).get("color")) for p in products if isinstance(p, dict)]
        # strict: all returned products must match
        out["color_all_match"] = all(c == want and c for c in colors)
    else:
        out["color_all_match"] = None

    # --- Usage ---
    if "usage" in constraints and constraints["usage"] is not None:
        want = _normalize_str(constraints["usage"]).lower()
        usages = [_normalize_str((p or {}).get("usage")).lower() for p in products if isinstance(p, dict)]
        out["usage_all_match"] = all(u == want and u for u in usages)
    else:
        out["usage_all_match"] = None

    # --- Gender ---
    if "gender" in constraints and constraints["gender"] is not None:
        want = _normalize_str(constraints["gender"]).lower()
        genders = [_normalize_str((p or {}).get("gender")).lower() for p in products if isinstance(p, dict)]
        out["gender_all_match"] = all(g == want and g for g in genders)
    else:
        out["gender_all_match"] = None

    # --- Article type ---
    if "articleType" in constraints and constraints["articleType"] is not None:
        want = _normalize_article_type(constraints["articleType"])
        ats = [_normalize_article_type((p or {}).get("subcategory") or (p or {}).get("category") or (p or {}).get("articleType")) for p in products if isinstance(p, dict)]
        # NOTE: dataset stores articleType in metadata but API card currently doesn't include it.
        # If missing, we can't reliably check.
        out["type_all_match"] = None if all(a == "" for a in ats) else all(a == want for a in ats if a)
    else:
        out["type_all_match"] = None

    # --- Budget ---
    prices = _extract_prices(products)
    min_p = _safe_float(constraints.get("min_price"))
    max_p = _safe_float(constraints.get("max_price"))
    if min_p is not None or max_p is not None:
        if not prices:
            out["budget_all_match"] = False
        else:
            ok = True
            for p in prices:
                if min_p is not None and p < min_p - 1e-9:
                    ok = False
                if max_p is not None and p > max_p + 1e-9:
                    ok = False
            out["budget_all_match"] = ok
    else:
        out["budget_all_match"] = None

    # Aggregate
    bool_checks = [v for v in [out.get("color_all_match"), out.get("usage_all_match"), out.get("gender_all_match"), out.get("budget_all_match"), out.get("type_all_match")] if isinstance(v, bool)]
    out["constraint_pass"] = all(bool_checks) if bool_checks else True
    return out


def faithfulness_heuristic(answer: str, products: list) -> Dict[str, Any]:
    """Heuristic groundedness checks.

    This project builds answer deterministically from `products` in llm_client._answer_from_products,
    so violations should be rare. Still, we check for:
    - Mentioning IDs not in products
    - Mentioning prices not in products (very heuristic)
    """
    ans = answer or ""
    pids = set(_ids_from_products(products))

    # Extract parenthesized numeric tokens such as "(12345)" and treat them as ids.
    mentioned = set(re.findall(r"\((\d+)\)", ans))
    extra_ids = sorted([x for x in mentioned if x not in pids])

    # Extract $numbers in answer
    money = [float(x) for x in re.findall(r"\$(\d+(?:\.\d{1,2})?)", ans)]
    prices = set(round(p, 2) for p in _extract_prices(products))
    # If answer mentions money but we have no product prices, flag.
    money_ok = True
    if money and not prices:
        money_ok = False
    else:
        for m in money:
            if round(m, 2) not in prices:
                # allow small formatting differences (integers)
                if round(m, 0) not in {round(p, 0) for p in prices}:
                    money_ok = False

    return {
        "mentioned_ids": sorted(list(mentioned)),
        "extra_ids_not_in_products": extra_ids,
        "money_mentions": money,
        "money_matches_products": money_ok,
        "faithful": (len(extra_ids) == 0) and money_ok,
    }


def pii_safety_heuristic(text: str) -> Dict[str, Any]:
    s = text or ""
    email = re.findall(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", s)
    phone = re.findall(r"\b(?:\+?\d{1,3}[-.\s]?)?(?:\(?\d{2,4}\)?[-.\s]?)?\d{3,4}[-.\s]?\d{3,4}\b", s)
    cc = re.findall(r"\b(?:\d[ -]*?){13,19}\b", s)

    # Reduce false positives for CC: keep only sequences with >=13 digits
    cc_clean = []
    for c in cc:
        digits = re.sub(r"\D", "", c)
        if len(digits) >= 13:
            cc_clean.append(digits)

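    # NOTE: phone_found is reported but not counted in pii_violation; the phone regex
    # also matches plain digit runs as short as six digits, so it is prone to false positives.
    return {
        "email_found": len(email) > 0,
        "phone_found": len(phone) > 0,
        "cc_like_found": len(cc_clean) > 0,
        "pii_violation": (len(email) > 0) or (len(cc_clean) > 0),
    }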


def compute_metrics(df: pd.DataFrame, k_values: Tuple[int, ...] = (1, 3, 5)) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    rows = []
    for _, r in df.iterrows():
        products = r.get("products") or []
        sources = r.get("sources") or []
        answer = r.get("answer") or ""
        # `response` is optional: run_eval() does not store it, so fall back to {}.
        resp_obj = r.get("response") if "response" in df.columns else {}

        pred_ids = _ids_from_sources(sources) or _ids_from_products(products)
        gold = r.get("expected_ids") or []
        gold = [str(x) for x in gold] if isinstance(gold, list) else []

        faith = faithfulness_heuristic(answer, products)
        constraints = constraint_checks(products, r.get("expected_constraints"))
        rejected = english_only_rejected(answer, products, resp_obj)
        pii = pii_safety_heuristic(answer)

        row = {
            "name": r.get("name"),
            "status": r.get("status"),
            "latency_ms": r.get("latency_ms"),
            "n_products": r.get("n_products"),
            "rejected_english_only": rejected,
            **{f"faith_{k}": v for k, v in faith.items()},
            **{f"c_{k}": v for k, v in constraints.items()},
            **{f"pii_{k}": v for k, v in pii.items()},
        }

        for k in k_values:
            row[f"hit@{k}"] = hit_at_k(pred_ids, gold, k) if gold else float("nan")
            row[f"recall@{k}"] = recall_at_k(pred_ids, gold, k) if gold else float("nan")
            row[f"ndcg@{k}"] = ndcg_at_k(pred_ids, gold, k) if gold else float("nan")
        row["mrr"] = mrr(pred_ids, gold) if gold else float("nan")

        rows.append(row)

    mdf = pd.DataFrame(rows)

    # Summary
    summary: Dict[str, Any] = {}

    # Basic operational
    summary["n_cases"] = int(len(df))
    # Count every non-2xx status as an error (status 0 means the request itself failed).
    summary["error_rate"] = float((~df["status"].fillna(0).astype(int).between(200, 299)).mean()) if len(df) else 0.0
    summary["empty_products_rate"] = float((df["n_products"].fillna(0).astype(int) == 0).mean()) if len(df) else 0.0

    lat = df["latency_ms"].astype(float) if len(df) else pd.Series([], dtype=float)
    if len(lat):
        summary["latency_p50_ms"] = float(lat.quantile(0.50))
        summary["latency_p90_ms"] = float(lat.quantile(0.90))
        summary["latency_p95_ms"] = float(lat.quantile(0.95))
        summary["latency_mean_ms"] = float(lat.mean())

    # English-only behavior
    if "should_reject" in df.columns:
        want_reject = df["should_reject"].astype(bool)
        got_reject = mdf["rejected_english_only"].astype(bool)
        if want_reject.any():
            # Measured only on cases that should be rejected (i.e. the rejection recall).
            summary["english_only_reject_accuracy"] = float(got_reject[want_reject].mean())

    # Constraint
    if "c_constraint_pass" in mdf.columns:
        # Only count rows where constraints exist or has_products? We'll just average boolean where it's bool.
        vals = mdf["c_constraint_pass"].dropna()
        if len(vals):
            summary["constraint_pass_rate"] = float(vals.astype(bool).mean())

    # Faithfulness
    vals = mdf["faith_faithful"].dropna() if "faith_faithful" in mdf.columns else pd.Series([], dtype=bool)
    if len(vals):
        summary["faithfulness_pass_rate"] = float(vals.astype(bool).mean())

    # Retrieval metrics: average over rows that have gold
    gold_mask = df["expected_ids"].apply(lambda x: isinstance(x, list) and len(x) > 0)
    if gold_mask.any():
        for k in k_values:
            summary[f"hit@{k}"] = float(mdf.loc[gold_mask, f"hit@{k}"].mean())
            summary[f"recall@{k}"] = float(mdf.loc[gold_mask, f"recall@{k}"].mean())
            summary[f"ndcg@{k}"] = float(mdf.loc[gold_mask, f"ndcg@{k}"].mean())
        summary["mrr"] = float(mdf.loc[gold_mask, "mrr"].mean())

    return mdf, summary


MDF, SUMMARY = compute_metrics(DF)
print(json.dumps(SUMMARY, ensure_ascii=False, indent=2))
MDF

{
  "n_cases": 5,
  "error_rate": 0.0,
  "empty_products_rate": 0.2,
  "latency_p50_ms": 43.099200000142446,
  "latency_p90_ms": 838.4009399997012,
  "latency_p95_ms": 940.5876199998601,
  "latency_mean_ms": 332.11217999923974,
  "english_only_reject_accuracy": 1.0,
  "constraint_pass_rate": 0.8,
  "faithfulness_pass_rate": 0.8
}


Unnamed: 0,name,status,latency_ms,n_products,rejected_english_only,faith_mentioned_ids,faith_extra_ids_not_in_products,faith_money_mentions,faith_money_matches_products,faith_faithful,...,hit@1,recall@1,ndcg@1,hit@3,recall@3,ndcg@3,hit@5,recall@5,ndcg@5,mrr
0,basic_men_shoes_budget,200,531.8409,5,False,"[24626, 3585, 6344, 6652]",[],"[80.0, 45.0, 25.0, 80.0, 61.0]",True,True,...,,,,,,,,,,
1,formal_office,200,1042.7743,4,False,"[12514, 2880, 32407, 57116]",[],"[100.0, 16.0, 5.0, 16.0, 87.0]",False,False,...,,,,,,,,,,
2,color_strict,200,43.0992,5,False,"[33199, 43680, 45777, 57057]",[],"[110.0, 150.0, 38.0, 27.0]",True,True,...,,,,,,,,,,
3,type_tshirts,200,39.2038,5,False,"[23945, 2853, 8271, 8274]",[],"[140.0, 111.0, 103.0, 188.0]",True,True,...,,,,,,,,,,
4,reject_vietnamese,200,3.6427,0,True,[],[],[],True,True,...,,,,,,,,,,


In [7]:
# 6) Report + save artifacts

# Merge metrics back to main DF for convenience
OUT = DF.merge(MDF, on="name", how="left", suffixes=("", "_m"))

csv_path = OUTPUTS_DIR / "eval_results.csv"
OUT.to_csv(csv_path, index=False, encoding="utf-8")
(ARTIFACTS_DIR / "eval_summary.json").write_text(json.dumps(SUMMARY, ensure_ascii=False, indent=2), encoding="utf-8")

print("Saved:")
print("-", csv_path.resolve())
print("-", (ARTIFACTS_DIR / "eval_summary.json").resolve())

# Pretty display
cols_show = [
    "name",
    "status",
    "latency_ms",
    "n_products",
    "should_reject",
    "rejected_english_only",
    "c_constraint_pass",
    "faith_faithful",
]
cols_show = [c for c in cols_show if c in OUT.columns]
# display() is needed here: only the last expression of a cell is rendered automatically.
display(OUT[cols_show])


# Optional: plot latency histogram
if plt is not None and len(OUT):
    plt.figure(figsize=(7, 4))
    plt.hist(OUT["latency_ms"].astype(float), bins=20)
    plt.title("Latency histogram (ms)")
    plt.xlabel("ms")
    plt.ylabel("count")
    plt.grid(True, alpha=0.2)
    plt.show()

Saved:
- D:\Study\CS311\CS311\outputs\eval_results.csv
- D:\Study\CS311\CS311\artifacts\eval_summary.json


## Addendum: how do you obtain *gold labels* for retrieval metrics?

Because the `styles.csv` dataset is fairly large, the most practical approach is:

1) Run a few representative queries.
2) Inspect the top results in `products`/`sources`.
3) Pick the 1-3 `product id`s you consider **most correct** → fill them into `expected_ids` in `TESTSET`.

Then rerun the notebook to get **Hit@K / MRR / nDCG**.
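
One possible way to keep hand-picked labels separate from the test definitions is sketched below. `LabeledCase`, `GOLD`, and `attach_gold` are hypothetical names, and the ids are placeholders to be replaced with ids from your own index; the dataclass only mirrors the relevant fields of the notebook's `TestCase`.

```python
from dataclasses import dataclass, replace
from typing import Dict, List, Optional

@dataclass
class LabeledCase:  # hypothetical stand-in mirroring some of the notebook's TestCase fields
    name: str
    query: str
    expected_ids: Optional[List[str]] = None

# Placeholder ids picked by hand after inspecting results; replace with real product ids.
GOLD: Dict[str, List[str]] = {
    "color_strict": ["<id-1>", "<id-2>"],
}

def attach_gold(cases: List[LabeledCase], gold: Dict[str, List[str]]) -> List[LabeledCase]:
    """Return copies of `cases` with expected_ids filled in where a label exists."""
    return [replace(c, expected_ids=gold.get(c.name, c.expected_ids)) for c in cases]

cases = [
    LabeledCase(name="color_strict", query="women red dress"),
    LabeledCase(name="type_tshirts", query="men white t-shirt"),
]
labeled = attach_gold(cases, GOLD)
print([(c.name, c.expected_ids) for c in labeled])
```

Cases without a label keep `expected_ids=None`, so they are skipped when retrieval metrics are averaged.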

---

## (Optional) Reference-answer metrics (BLEU/ROUGE/BERTScore)

In the current project, the `answer` is generated deterministically from `products` (to avoid hallucination), so reference-answer metrics usually **do not reflect** search quality.

If you have a set of `reference` answers (e.g., sample answers written by human graders), you can add a `reference` column to the test cases and install additional libraries such as `sacrebleu`, `rouge_score`, `bert_score` to compute them.
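
To give a feel for what an overlap-based reference metric measures without installing anything, here is a standard-library sketch of a unigram-overlap F1. It is only a rough ROUGE-1-style stand-in, not the official BLEU/ROUGE/BERTScore implementations, and the example texts are made up.

```python
import re
from collections import Counter

def unigram_f1(candidate: str, reference: str) -> float:
    """Token-overlap F1 between two strings (rough ROUGE-1-style score, not the official metric)."""
    cand = Counter(re.findall(r"\w+", candidate.lower()))
    ref = Counter(re.findall(r"\w+", reference.lower()))
    overlap = sum((cand & ref).values())  # multiset intersection of tokens
    if overlap == 0:
        return 0.0
    precision = overlap / sum(cand.values())
    recall = overlap / sum(ref.values())
    return 2 * precision * recall / (precision + recall)

# Hypothetical answer/reference pair (made-up text, for illustration only).
answer = "Here are three red dresses under 50 dollars"
reference = "Three red dresses priced under 50 dollars"
print(round(unigram_f1(answer, reference), 3))
```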


## C) Evaluating Image RAG (image-based search)

Your system has a `POST /search/image/upload` endpoint (it embeds the image with OpenCLIP and queries the `products_image` collection).

### “Optimal” evaluation approach for Image RAG

1) **Self-retrieval**: take an image from the dataset as the query → the top-K results should contain **that image's own id**.
   - This is the cheapest form of *gold label* because it needs no manual labeling.

2) **Robustness**: apply a few mild transformations (resize/crop/brightness) to the query image → the id should still be hit.

3) **Performance**: latency p50/p95 for the image endpoint.

> Prerequisite: you need to ingest the images first (run `ingest_images.py` or call `POST /ingest_image`). If they have not been ingested, the image endpoint may return empty results or an error.
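
The self-retrieval idea reduces to a hit rate over query images. The sketch below assumes the ranked ids have already been extracted from the endpoint's responses into a plain dict; that shape, the function name `self_retrieval_hit_rate`, and the returned ids are illustrative assumptions, not the API's actual response format.

```python
from typing import Dict, List

def self_retrieval_hit_rate(results: Dict[str, List[str]], k: int) -> float:
    """Fraction of query images whose own id appears in their top-k returned ids."""
    if not results:
        return 0.0
    hits = sum(1 for query_id, returned in results.items() if query_id in returned[:k])
    return hits / len(results)

# Hypothetical results: query image id -> ranked ids returned for that image (made up).
results = {
    "10000": ["10000", "10431", "10002"],
    "10001": ["10555", "10001", "10000"],
    "10002": ["10777", "10888", "10999"],
}
print("hit@1 =", self_retrieval_hit_rate(results, k=1))  # only 10000 is ranked first
print("hit@3 =", self_retrieval_hit_rate(results, k=3))  # 10000 and 10001 are found
```

The same function can be applied per robustness variant to compare how much each transformation lowers the hit rate.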


In [8]:
# C.1) Utilities for calling the image endpoint + automatic testset creation

import io
from dataclasses import dataclass

try:
    from PIL import Image, ImageEnhance
except Exception:
    Image = None
    ImageEnhance = None


IMAGE_DIR = Path(os.getenv("EVAL_IMAGE_DIR", "datasets/archive/fashion-dataset/images"))
IMAGE_TOP_K = int(os.getenv("EVAL_IMAGE_TOP_K", "5"))
IMAGE_N_SAMPLES = int(os.getenv("EVAL_IMAGE_N_SAMPLES", "20"))


@dataclass
class ImageTestCase:
    name: str
    image_path: Path
    expected_id: Optional[str] = None
    top_k: int = IMAGE_TOP_K


def _list_image_files(img_dir: Path) -> List[Path]:
    if not img_dir.exists():
        return []
    exts = {".jpg", ".jpeg", ".png"}
    files = [p for p in img_dir.glob("*") if p.suffix.lower() in exts]
    files.sort(key=lambda p: p.name)
    return files


def _expected_id_from_filename(p: Path) -> Optional[str]:
    # dataset uses <id>.jpg
    m = re.match(r"^(\d+)", p.stem)
    return m.group(1) if m else None


def call_image_search_file(image_path: Path, top_k: int = 5) -> dict:
    """Call POST /search/image/upload with a local file.

    Returns dict with: status, latency_ms, error, response
    """
    url = f"{API_BASE}/search/image/upload"

    t0 = time.perf_counter()
    try:
        with image_path.open("rb") as f:
            files = {"file": (image_path.name, f, "application/octet-stream")}
            resp = requests.post(url, files=files, params={"top_k": int(top_k)}, timeout=TIMEOUT_S)
        latency_ms = (time.perf_counter() - t0) * 1000
        try:
            data = resp.json()
        except Exception:
            data = {"_raw": resp.text}

        err = None
        if not (200 <= resp.status_code < 300):
            err = f"HTTP {resp.status_code}: {data}"

        return {"status": resp.status_code, "latency_ms": latency_ms, "error": err, "response": data}
    except Exception as e:
        latency_ms = (time.perf_counter() - t0) * 1000
        return {"status": 0, "latency_ms": latency_ms, "error": f"{type(e).__name__}: {e}", "response": {}}


def call_image_search_bytes(image_bytes: bytes, filename: str = "query.jpg", top_k: int = 5) -> dict:
    """Call POST /search/image/upload with bytes (for robustness variants)."""
    url = f"{API_BASE}/search/image/upload"

    t0 = time.perf_counter()
    try:
        bio = io.BytesIO(image_bytes)
        files = {"file": (filename, bio, "application/octet-stream")}
        resp = requests.post(url, files=files, params={"top_k": int(top_k)}, timeout=TIMEOUT_S)
        latency_ms = (time.perf_counter() - t0) * 1000
        try:
            data = resp.json()
        except Exception:
            data = {"_raw": resp.text}

        err = None
        if not (200 <= resp.status_code < 300):
            err = f"HTTP {resp.status_code}: {data}"

        return {"status": resp.status_code, "latency_ms": latency_ms, "error": err, "response": data}
    except Exception as e:
        latency_ms = (time.perf_counter() - t0) * 1000
        return {"status": 0, "latency_ms": latency_ms, "error": f"{type(e).__name__}: {e}", "response": {}}


def build_image_testset(n_samples: int = 20) -> List[ImageTestCase]:
    files = _list_image_files(IMAGE_DIR)
    if not files:
        return []

    # Deterministic sample: first N files (stable across runs)
    pick = files[: max(1, min(n_samples, len(files)))]
    out = []
    for p in pick:
        out.append(
            ImageTestCase(
                name=f"img_{p.stem}",
                image_path=p,
                expected_id=_expected_id_from_filename(p),
                top_k=IMAGE_TOP_K,
            )
        )
    return out


IMAGE_TESTSET = build_image_testset(IMAGE_N_SAMPLES)
print(f"IMAGE_DIR={IMAGE_DIR} exists={IMAGE_DIR.exists()} | cases={len(IMAGE_TESTSET)}")
if IMAGE_TESTSET[:3]:
    display(pd.DataFrame([{"name": t.name, "image": str(t.image_path), "expected_id": t.expected_id, "top_k": t.top_k} for t in IMAGE_TESTSET[:3]]))
else:
    print("[HINT] No images found. Check EVAL_IMAGE_DIR or the dataset path.")

IMAGE_DIR=datasets\archive\fashion-dataset\images exists=True | cases=20


Unnamed: 0,name,image,expected_id,top_k
0,img_10000,datasets\archive\fashion-dataset\images\10000.jpg,10000,5
1,img_10001,datasets\archive\fashion-dataset\images\10001.jpg,10001,5
2,img_10002,datasets\archive\fashion-dataset\images\10002.jpg,10002,5


In [9]:
# C.2) Robustness variants (optional) + runner


def make_variants(image_path: Path) -> Dict[str, bytes]:
    """Return a dict variant_name -> encoded image bytes.

    If PIL is unavailable, returns only original bytes.
    """
    raw = image_path.read_bytes()
    variants: Dict[str, bytes] = {"orig": raw}

    if Image is None:
        return variants

    try:
        img = Image.open(io.BytesIO(raw)).convert("RGB")
    except Exception:
        return variants

    # NOTE: avoid referencing Image.Image in type annotations because this notebook
    # intentionally sets Image=None when Pillow isn't installed; that confuses some linters.
    def _to_jpeg_bytes(im, quality: int = 90) -> bytes:
        b = io.BytesIO()
        im.save(b, format="JPEG", quality=quality)
        return b.getvalue()

    # Resize smaller (simulate thumbnail)
    try:
        im = img.copy()
        im.thumbnail((224, 224))
        variants["thumb_224"] = _to_jpeg_bytes(im)
    except Exception:
        pass

    # Center crop square
    try:
        im = img.copy()
        w, h = im.size
        side = min(w, h)
        left = (w - side) // 2
        top = (h - side) // 2
        im = im.crop((left, top, left + side, top + side)).resize((224, 224))
        variants["center_crop_224"] = _to_jpeg_bytes(im)
    except Exception:
        pass

    # Brightness up/down
    if ImageEnhance is not None:
        try:
            variants["bright_1p2"] = _to_jpeg_bytes(ImageEnhance.Brightness(img).enhance(1.2))
        except Exception:
            pass
        try:
            variants["bright_0p8"] = _to_jpeg_bytes(ImageEnhance.Brightness(img).enhance(0.8))
        except Exception:
            pass

    # Horizontal flip
    try:
        variants["flip_lr"] = _to_jpeg_bytes(img.transpose(Image.FLIP_LEFT_RIGHT))
    except Exception:
        pass

    return variants


def _ids_from_image_results(resp: dict) -> List[str]:
    results = (resp or {}).get("results") or []
    out: List[str] = []
    for r in results:
        if isinstance(r, dict) and r.get("id") is not None:
            out.append(str(r.get("id")))
    return out


def run_image_eval(testset: List[ImageTestCase], k_values: List[int] = [1, 3, 5], with_variants: bool = True) -> pd.DataFrame:
    rows = []
    iterator = tqdm(testset, desc="ImageEval") if tqdm else testset

    for tc in iterator:
        if not tc.image_path.exists():
            rows.append({"name": tc.name, "status": 0, "error": "missing_file", "image": str(tc.image_path)})
            continue

        expected = tc.expected_id
        gold = [expected] if expected else []

        if with_variants:
            var_map = make_variants(tc.image_path)
        else:
            var_map = {"orig": tc.image_path.read_bytes()}

        for vname, vbytes in var_map.items():
            res = call_image_search_bytes(vbytes, filename=f"{tc.image_path.stem}_{vname}.jpg", top_k=tc.top_k)
            pred_ids = _ids_from_image_results(res.get("response") or {})

            row = {
                "name": tc.name,
                "variant": vname,
                "image": str(tc.image_path),
                "expected_id": expected,
                "status": res.get("status"),
                "latency_ms": float(res.get("latency_ms") or 0.0),
                "error": res.get("error"),
                "pred_ids": pred_ids,
                "n_results": len(pred_ids),
            }
            for k in k_values:
                row[f"hit@{k}"] = hit_at_k(pred_ids, gold, k) if gold else float("nan")
                row[f"recall@{k}"] = recall_at_k(pred_ids, gold, k) if gold else float("nan")
                row[f"ndcg@{k}"] = ndcg_at_k(pred_ids, gold, k) if gold else float("nan")
            row["mrr"] = mrr(pred_ids, gold) if gold else float("nan")

            rows.append(row)

    return pd.DataFrame(rows)


IMG_DF = run_image_eval(IMAGE_TESTSET, with_variants=True)
IMG_DF.head()

Unnamed: 0,name,variant,image,expected_id,status,latency_ms,error,pred_ids,n_results,hit@1,recall@1,ndcg@1,hit@3,recall@3,ndcg@3,hit@5,recall@5,ndcg@5,mrr
0,img_10000,orig,datasets\archive\fashion-dataset\images\10000.jpg,10000,0,30014.203,ReadTimeout: HTTPConnectionPool(host='127.0.0....,[],0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,img_10001,orig,datasets\archive\fashion-dataset\images\10001.jpg,10001,0,30004.3613,ReadTimeout: HTTPConnectionPool(host='127.0.0....,[],0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,img_10002,orig,datasets\archive\fashion-dataset\images\10002.jpg,10002,200,1152.1375,,"[10002, 41000, 41001, 38503, 38938]",5,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
3,img_10003,orig,datasets\archive\fashion-dataset\images\10003.jpg,10003,200,150.0781,,"[10003, 22627, 22579, 22600, 17923]",5,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
4,img_10004,orig,datasets\archive\fashion-dataset\images\10004.jpg,10004,200,220.5692,,"[10004, 38566, 32648, 14024, 38568]",5,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
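
The successful rows shown above all have the expected id at rank 1, so every metric is 1.0. To see how the metrics diverge, here is a minimal standalone sketch with a single gold id at rank 3. The `*_sketch` functions are simplified re-implementations that follow the formulas in the notebook introduction and may differ from the notebook's own `hit_at_k`, `mrr`, and `ndcg_at_k` helpers; the ranking itself is hypothetical.

```python
import math
from typing import List


def hit_at_k_sketch(pred: List[str], gold: List[str], k: int) -> float:
    """1.0 if any gold id appears in the top-k predictions, else 0.0."""
    return 1.0 if any(p in gold for p in pred[:k]) else 0.0


def mrr_sketch(pred: List[str], gold: List[str]) -> float:
    """Reciprocal rank of the first correct prediction (0.0 if none)."""
    for rank, p in enumerate(pred, start=1):
        if p in gold:
            return 1.0 / rank
    return 0.0


def ndcg_at_k_sketch(pred: List[str], gold: List[str], k: int) -> float:
    """Binary-relevance nDCG@k: DCG of the ranking divided by the ideal DCG."""
    dcg = sum(1.0 / math.log2(j + 1) for j, p in enumerate(pred[:k], start=1) if p in gold)
    ideal = sum(1.0 / math.log2(j + 1) for j in range(1, min(k, len(gold)) + 1))
    return dcg / ideal if ideal else 0.0


# Hypothetical ranking: the expected id "10002" is returned at rank 3.
pred = ["41000", "38503", "10002", "41001", "38938"]
gold = ["10002"]

print("hit@1 :", hit_at_k_sketch(pred, gold, 1))
print("hit@3 :", hit_at_k_sketch(pred, gold, 3))
print("mrr   :", mrr_sketch(pred, gold))
print("ndcg@3:", ndcg_at_k_sketch(pred, gold, 3))
```

With one gold id at rank 3, hit@1 is 0 while hit@3 is 1, MRR is 1/3, and nDCG@3 is 1/log2(4) = 0.5.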


In [10]:
# C.3) Aggregate metrics for Image RAG + export artifacts


def summarize_image_eval(img_df: pd.DataFrame, k_values: List[int] = [1, 3, 5]) -> Dict[str, Any]:
    if img_df is None or len(img_df) == 0:
        return {"n_rows": 0}

    out: Dict[str, Any] = {
        "n_rows": int(len(img_df)),
        "n_cases": int(img_df["name"].nunique()) if "name" in img_df.columns else None,
        # Count transport failures (status 0) and non-2xx HTTP responses as errors.
        "error_rate": float((~img_df["status"].fillna(0).astype(int).between(200, 299)).mean()) if "status" in img_df.columns else None,
        "empty_results_rate": float((img_df["n_results"].fillna(0).astype(int) == 0).mean()) if "n_results" in img_df.columns else None,
    }

    if "latency_ms" in img_df.columns:
        lat = img_df["latency_ms"].astype(float)
        out.update(
            {
                "latency_p50_ms": float(lat.quantile(0.50)),
                "latency_p90_ms": float(lat.quantile(0.90)),
                "latency_p95_ms": float(lat.quantile(0.95)),
                "latency_mean_ms": float(lat.mean()),
            }
        )

    for k in k_values:
        col = f"hit@{k}"
        if col in img_df.columns:
            out[col] = float(img_df[col].mean())

    if "mrr" in img_df.columns:
        out["mrr"] = float(img_df["mrr"].mean())

    # Optional: per-variant breakdown
    if "variant" in img_df.columns:
        per_variant = (
            img_df.groupby("variant")[[c for c in ["mrr"] + [f"hit@{k}" for k in k_values] if c in img_df.columns]]
            .mean(numeric_only=True)
            .reset_index()
        )
        out["per_variant"] = per_variant.to_dict(orient="records")

    return out


IMG_SUMMARY = summarize_image_eval(IMG_DF)
print(json.dumps(IMG_SUMMARY, ensure_ascii=False, indent=2))

img_csv = OUTPUTS_DIR / "eval_image_results.csv"
IMG_DF.to_csv(img_csv, index=False, encoding="utf-8")
(ARTIFACTS_DIR / "image_eval_summary.json").write_text(json.dumps(IMG_SUMMARY, ensure_ascii=False, indent=2), encoding="utf-8")

print("Saved:")
print("-", img_csv.resolve())
print("-", (ARTIFACTS_DIR / "image_eval_summary.json").resolve())

# Optional plot
if plt is not None and len(IMG_DF) and "latency_ms" in IMG_DF.columns:
    plt.figure(figsize=(7, 4))
    plt.hist(IMG_DF["latency_ms"].astype(float), bins=20)
    plt.title("Image search latency histogram (ms)")
    plt.xlabel("ms")
    plt.ylabel("count")
    plt.grid(True, alpha=0.2)
    plt.show()

{
  "n_rows": 20,
  "n_cases": 20,
  "error_rate": 0.1,
  "empty_results_rate": 0.1,
  "latency_p50_ms": 150.89359999910812,
  "latency_p90_ms": 4037.3598800012087,
  "latency_p95_ms": 30004.85338500148,
  "latency_mean_ms": 3186.539654999433,
  "hit@1": 0.9,
  "hit@3": 0.9,
  "hit@5": 0.9,
  "mrr": 0.9,
  "per_variant": [
    {
      "variant": "orig",
      "mrr": 0.9,
      "hit@1": 0.9,
      "hit@3": 0.9,
      "hit@5": 0.9
    }
  ]
}
Saved:
- D:\Study\CS311\CS311\outputs\eval_image_results.csv
- D:\Study\CS311\CS311\artifacts\image_eval_summary.json
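
The two `ReadTimeout` rows are scored as misses, so the reported `hit@1` of 0.9 mixes availability with retrieval quality. Below is a minimal sketch of reporting both views, using toy rows shaped like `IMG_DF` records (18 successful hits plus 2 timeouts, the same proportions as the summary above). `hit_rate` is a hypothetical helper, not part of the notebook.

```python
from typing import Dict, List, Optional


def hit_rate(rows: List[Dict], k: int, only_ok: bool = False) -> Optional[float]:
    """Mean hit@k; with only_ok=True, skip rows whose request was not 2xx."""
    col = f"hit@{k}"
    vals = [
        float(r[col])
        for r in rows
        if col in r and (not only_ok or 200 <= int(r.get("status") or 0) < 300)
    ]
    return sum(vals) / len(vals) if vals else None


# Toy data (assumption): 18 successful hits and 2 timeouts recorded as status 0.
rows = [{"status": 200, "hit@1": 1.0}] * 18 + [{"status": 0, "hit@1": 0.0}] * 2

print("hit@1 (all rows)       :", hit_rate(rows, 1))
print("hit@1 (successful only):", hit_rate(rows, 1, only_ok=True))
```

Reporting the second number next to `error_rate` makes it clear whether a low score comes from the ranking or from the server being unreachable.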


## Text LLM Evaluation (rubric + optional judge + A/B win-rate)

This section evaluates the quality of the **text answers**.

Because the current system builds `answer` deterministically from `products`, the most practical ways to evaluate the text are:

1) **Rubric scoring (rule-based)**: cheap, stable, and runnable in CI.
2) **LLM-as-a-judge (optional)**: enabled only when `LLM_API_KEY` and `LLM_MODEL` are set; scores soft criteria (usefulness, clarity, ...).
3) **A/B win-rate**: compares two versions of the API (A and B) by pairwise win rate.

### When to use A/B?
- When you change the embedding model, the filtering threshold, the prompt, or turn on a generative LLM.
- A and B should be **two different base URLs** (e.g. two containers or two config branches).

> Configuration: set `API_BASE_A` and `API_BASE_B` in the environment. If `API_BASE_A` is unset it falls back to the current `API_BASE`; if `API_BASE_B` is unset, the A/B comparison is skipped.
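
As a minimal sketch of the configuration described above, the two base URLs can also be set from inside the notebook before the A/B cell runs. The addresses are assumptions for a local setup (they match the hint printed by the A/B cell); `setdefault` leaves any value already exported in the shell untouched.

```python
import os

# Assumed local endpoints; replace with the servers you actually run.
os.environ.setdefault("API_BASE_A", "http://127.0.0.1:8081")
os.environ.setdefault("API_BASE_B", "http://127.0.0.1:8082")

# Normalize the same way the notebook does: drop any trailing slash.
api_base_a = os.environ["API_BASE_A"].rstrip("/")
api_base_b = os.environ["API_BASE_B"].rstrip("/")

print({"API_BASE_A": api_base_a, "API_BASE_B": api_base_b})
```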


In [11]:
# T.1) Rule-based rubric scoring

# We keep this section dependency-light: only stdlib + pandas.

API_BASE_A = os.getenv("API_BASE_A", API_BASE).rstrip("/")
API_BASE_B = os.getenv("API_BASE_B", "").rstrip("/")


def _extract_pick_lines(answer: str) -> List[str]:
    """Extract lines that look like enumerated picks: '1) ...'"""
    lines = (answer or "").splitlines()
    picks = []
    for ln in lines:
        if re.match(r"^\s*\d+\)\s+", ln):
            picks.append(ln.strip())
    return picks


def rubric_score_rule_based(
    query: str,
    answer: str,
    products: list,
    expected_constraints: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return rubric scores (0..2) using deterministic rules.

    Dimensions (0..2):
    - faithfulness: consistent with products (uses existing heuristic)
    - format: has a 'Top picks' section with enumerated items
    - completeness: has 2-4 picks and cites at least two valid product ids (fewer if fewer products exist)
    - constraint_support: if constraints exist, returned products satisfy them
    - conciseness: not too long / not empty

    Note: for 'constraint_support' we reuse constraint_checks() which checks returned products.
    """
    ans = answer or ""

    # Faithfulness
    faith = faithfulness_heuristic(ans, products)
    faithfulness = 2 if faith.get("faithful") else 0

    # Format
    picks = _extract_pick_lines(ans)
    has_top_picks_header = bool(re.search(r"(?im)^\s*top\s+picks\s*:\s*$", ans))
    if has_top_picks_header and len(picks) >= 2:
        format_score = 2
    elif len(picks) >= 1:
        format_score = 1
    else:
        format_score = 0

    # Completeness
    # Expect 2-4 pick lines and ids cited in parentheses, e.g. '(123)', that exist in products.
    mentioned_ids = set(re.findall(r"\((\d+)\)", ans))
    prod_ids = set(_ids_from_products(products))
    mentioned_valid = [x for x in mentioned_ids if x in prod_ids]
    if 2 <= len(picks) <= 4 and len(mentioned_valid) >= min(2, len(prod_ids)):
        completeness = 2
    elif len(picks) >= 1:
        completeness = 1
    else:
        completeness = 0

    # Constraint support
    c = constraint_checks(products, expected_constraints)
    constraint_support = 2 if c.get("constraint_pass") else (1 if c.get("has_products") else 0)

    # Conciseness
    n_chars = len(ans.strip())
    if n_chars == 0:
        conciseness = 0
    elif n_chars <= 1200:
        conciseness = 2
    else:
        conciseness = 1

    total = faithfulness + format_score + completeness + constraint_support + conciseness

    return {
        "rubric_faithfulness": faithfulness,
        "rubric_format": format_score,
        "rubric_completeness": completeness,
        "rubric_constraint": constraint_support,
        "rubric_conciseness": conciseness,
        "rubric_total": total,
        # helpful debug fields
        "rubric_pick_count": len(picks),
        "rubric_answer_chars": n_chars,
    }


def score_df_rule_based(df: pd.DataFrame) -> pd.DataFrame:
    rows = []
    for _, r in df.iterrows():
        rs = rubric_score_rule_based(
            query=r.get("query") or "",
            answer=r.get("answer") or "",
            products=r.get("products") or [],
            expected_constraints=r.get("expected_constraints"),
        )
        rows.append({"name": r.get("name"), **rs})
    return pd.DataFrame(rows)


RUBRIC_DF = score_df_rule_based(DF)
display(RUBRIC_DF)
print("Rule-based rubric avg:")
print(RUBRIC_DF[[c for c in RUBRIC_DF.columns if c.startswith('rubric_')]].mean(numeric_only=True))

Unnamed: 0,name,rubric_faithfulness,rubric_format,rubric_completeness,rubric_constraint,rubric_conciseness,rubric_total,rubric_pick_count,rubric_answer_chars
0,basic_men_shoes_budget,2,2,2,2,2,10,4,594
1,formal_office,0,2,2,2,2,8,4,586
2,color_strict,2,2,2,2,2,10,4,543
3,type_tshirts,2,2,2,1,2,9,4,561
4,reject_vietnamese,2,0,0,2,2,6,0,54


Rule-based rubric avg:
rubric_faithfulness      1.6
rubric_format            1.6
rubric_completeness      1.6
rubric_constraint        1.8
rubric_conciseness       2.0
rubric_total             8.6
rubric_pick_count        3.2
rubric_answer_chars    467.6
dtype: float64
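
To make the format and completeness checks concrete, the sketch below applies the same three regular expressions to a made-up answer. The answer text and product names are hypothetical (the real API output may be formatted differently); only the regexes come from the rubric cell.

```python
import re

# Hypothetical answer text, written to match what the rubric looks for.
answer = """Here are some options.
Top picks:
1) Classic Oxford Shoes (10002) - formal, black
2) Leather Loafers (41000) - within budget
Let me know if you want more."""

# Enumerated pick lines such as '1) ...'
picks = [ln.strip() for ln in answer.splitlines() if re.match(r"^\s*\d+\)\s+", ln)]

# A standalone 'Top picks:' header line (case-insensitive, multiline)
has_header = bool(re.search(r"(?im)^\s*top\s+picks\s*:\s*$", answer))

# Product ids cited in parentheses, e.g. '(10002)'
mentioned_ids = sorted(set(re.findall(r"\((\d+)\)", answer)))

print(len(picks), has_header, mentioned_ids)
```

An answer like this one would receive `rubric_format = 2` (header plus at least two picks). A refusal such as the `reject_vietnamese` case has no pick lines, which is why its format and completeness scores are 0.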


In [12]:
# T.2) Optional LLM-as-a-judge (enabled when LLM_* is set)

# We support OpenAI-compatible endpoints via the `openai` Python package.
# The project already depends on `openai` in requirements.txt.

LLM_BASE_URL = (os.getenv("LLM_BASE_URL", "") or "").strip()
LLM_API_KEY = (os.getenv("LLM_API_KEY", "") or "").strip()
LLM_MODEL = (os.getenv("LLM_MODEL", "") or "").strip()


def _llm_judge_enabled() -> bool:
    return bool(LLM_API_KEY and LLM_MODEL)


def _get_openai_client_for_judge():
    from openai import OpenAI

    base_url = LLM_BASE_URL.strip() or "https://api.openai.com/v1"
    return OpenAI(base_url=base_url, api_key=LLM_API_KEY)


def llm_judge_pairwise(
    query: str,
    a: Dict[str, Any],
    b: Dict[str, Any],
    max_tokens: int = 400,
) -> Dict[str, Any]:
    """Pairwise judge between system A and B.

    Inputs a/b should contain:
    - answer: str
    - products: list[dict]

    Returns dict with winner in {"A","B","TIE"} and per-dimension scores.
    """
    if not _llm_judge_enabled():
        return {"enabled": False, "winner": "TIE", "reason": "LLM judge disabled (missing LLM_API_KEY/LLM_MODEL)"}

    # Keep products slim to reduce prompt size and avoid leaking irrelevant fields.
    def slim_products(ps: list) -> list:
        out = []
        for p in ps or []:
            if not isinstance(p, dict):
                continue
            out.append(
                {
                    "id": p.get("id"),
                    "name": p.get("name"),
                    "price": p.get("price"),
                    "color": p.get("color"),
                    "gender": p.get("gender"),
                    "category": p.get("category"),
                    "subcategory": p.get("subcategory"),
                    "usage": p.get("usage"),
                }
            )
        return out

    payload = {
        "query": query,
        "system_A": {"answer": a.get("answer") or "", "products": slim_products(a.get("products") or [])},
        "system_B": {"answer": b.get("answer") or "", "products": slim_products(b.get("products") or [])},
        "rubric": {
            "faithfulness": "Answer must not contradict or invent facts not present in its own product list.",
            "usefulness": "Clear, actionable, and matches the shopping intent.",
            "format": "2–4 picks with id + short reason.",
        },
    }

    prompt = (
        "You are a strict evaluator for a shopping assistant.\n"
        "You will compare System A vs System B for the same user query.\n"
        "CRITICAL: Each system has its own PRODUCT LIST. Treat each list as authoritative for that system.\n"
        "Penalize hallucinations: mentioning prices/colors/ids not in that system's product list.\n"
        "Return ONLY valid JSON with this schema:\n"
        "{\n"
        "  \"winner\": \"A\"|\"B\"|\"TIE\",\n"
        "  \"scores\": {\"A\": {\"faithfulness\":0|1|2,\"usefulness\":0|1|2,\"format\":0|1|2},\n"
        "             \"B\": {\"faithfulness\":0|1|2,\"usefulness\":0|1|2,\"format\":0|1|2}},\n"
        "  \"reason\": \"short explanation\"\n"
        "}\n\n"
        f"INPUT:\n{json.dumps(payload, ensure_ascii=False)}"
    )

    client = _get_openai_client_for_judge()
    try:
        resp = client.chat.completions.create(
            model=LLM_MODEL,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=max_tokens,
        )
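        txt = (resp.choices[0].message.content or "").strip()
        # Best-effort parse: models often wrap JSON in code fences or add prose,
        # so fall back to the outermost {...} span when direct parsing fails.
        try:
            data = json.loads(txt)
        except json.JSONDecodeError:
            data = json.loads(txt[txt.find("{") : txt.rfind("}") + 1])
        if not isinstance(data, dict):
            raise ValueError("judge output is not a JSON object")
        data["enabled"] = True
        return data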
    except Exception as e:
        return {"enabled": True, "winner": "TIE", "reason": f"Judge error: {type(e).__name__}: {e}"}


print({"llm_judge_enabled": _llm_judge_enabled(), "LLM_BASE_URL": LLM_BASE_URL or "(default)", "LLM_MODEL": LLM_MODEL or "(unset)"})

{'llm_judge_enabled': True, 'LLM_BASE_URL': 'https://router.huggingface.co/v1', 'LLM_MODEL': 'meta-llama/Llama-3.1-8B-Instruct'}
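
A small instruction-tuned judge does not always follow the requested schema, so it can help to validate the reply before counting it. The sketch below checks a reply against the schema given in the judge prompt. `validate_judge_output` is a hypothetical helper and the sample reply is invented for illustration.

```python
import json
from typing import Any, Dict

DIMENSIONS = ("faithfulness", "usefulness", "format")


def validate_judge_output(txt: str) -> Dict[str, Any]:
    """Parse judge text and check it against the schema requested in the prompt."""
    data = json.loads(txt)
    if not isinstance(data, dict):
        raise ValueError("judge output must be a JSON object")
    if data.get("winner") not in {"A", "B", "TIE"}:
        raise ValueError(f"unexpected winner: {data.get('winner')!r}")
    scores = data.get("scores") or {}
    for system in ("A", "B"):
        for dim in DIMENSIONS:
            val = (scores.get(system) or {}).get(dim)
            if val not in (0, 1, 2):
                raise ValueError(f"{system}.{dim} must be 0, 1 or 2, got {val!r}")
    return data


# Invented sample reply that satisfies the schema.
sample = json.dumps(
    {
        "winner": "A",
        "scores": {
            "A": {"faithfulness": 2, "usefulness": 2, "format": 1},
            "B": {"faithfulness": 1, "usefulness": 2, "format": 1},
        },
        "reason": "A cites only products from its own list.",
    }
)

print(validate_judge_output(sample)["winner"])
```

Replies that fail validation can be recorded as `TIE` with the error in `reason`, which is how the notebook already treats judge errors.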


In [13]:
# T.3) A/B evaluation + win-rate report


def call_chat_against_base(api_base: str, query: str, top_k: int = 5, filters: Optional[dict] = None, messages: Optional[list] = None) -> dict:
    api_base = (api_base or "").rstrip("/")
    if not api_base:
        return {"status": 0, "latency_ms": 0.0, "error": "missing api_base", "response": {}}

    payload: Dict[str, Any] = {"query": query, "top_k": int(top_k)}
    if filters:
        payload["filters"] = filters
    if messages:
        payload["messages"] = messages

    # Reuse _request_json logic, but with explicit base
    status, data, latency_ms, err = _request_json("POST", f"{api_base}/chat", payload)
    return {"status": status, "latency_ms": latency_ms, "error": err, "response": data}


def run_eval_on_base(api_base: str, testset: List[TestCase]) -> pd.DataFrame:
    rows = []
    iterator = tqdm(testset, desc=f"Eval {api_base}") if tqdm else testset
    for tc in iterator:
        res = call_chat_against_base(api_base, tc.query, top_k=tc.top_k)
        resp = res.get("response") or {}
        rows.append(
            {
                "name": tc.name,
                "query": tc.query,
                "status": res.get("status"),
                "latency_ms": float(res.get("latency_ms") or 0.0),
                "error": res.get("error"),
                "answer": resp.get("answer"),
                "products": resp.get("products") or [],
                "sources": resp.get("sources") or [],
                "expected_constraints": tc.expected_constraints,
            }
        )
    return pd.DataFrame(rows)


def ab_winrate_rule_based(df_a: pd.DataFrame, df_b: pd.DataFrame) -> pd.DataFrame:
    """Compare A vs B using rule-based rubric_total."""
    a_sc = score_df_rule_based(df_a).set_index("name")
    b_sc = score_df_rule_based(df_b).set_index("name")

    names = sorted(set(a_sc.index) & set(b_sc.index))
    rows = []
    for n in names:
        a = a_sc.loc[n].to_dict()
        b = b_sc.loc[n].to_dict()
        a_total = float(a.get("rubric_total") or 0)
        b_total = float(b.get("rubric_total") or 0)
        if a_total > b_total:
            winner = "A"
        elif b_total > a_total:
            winner = "B"
        else:
            winner = "TIE"
        rows.append({"name": n, "winner_rule": winner, "A_total": a_total, "B_total": b_total})
    return pd.DataFrame(rows)


def ab_winrate_llm_judge(df_a: pd.DataFrame, df_b: pd.DataFrame, max_cases: int = 30) -> pd.DataFrame:
    """Compare A vs B using optional LLM judge."""
    names = sorted(set(df_a["name"]) & set(df_b["name"]))
    names = names[: max_cases]

    rows = []
    for n in (tqdm(names, desc="LLM-judge A/B") if tqdm else names):
        ra = df_a[df_a["name"] == n].iloc[0].to_dict()
        rb = df_b[df_b["name"] == n].iloc[0].to_dict()

        out = llm_judge_pairwise(
            query=ra.get("query") or "",
            a={"answer": ra.get("answer"), "products": ra.get("products")},
            b={"answer": rb.get("answer"), "products": rb.get("products")},
        )

        rows.append({"name": n, **out})

    return pd.DataFrame(rows)


def summarize_winrate(win_df: pd.DataFrame, winner_col: str) -> Dict[str, Any]:
    if win_df is None or len(win_df) == 0:
        return {"n": 0}
    counts = win_df[winner_col].value_counts(dropna=False).to_dict()
    n = int(len(win_df))
    return {
        "n": n,
        "A_wins": int(counts.get("A", 0)),
        "B_wins": int(counts.get("B", 0)),
        "ties": int(counts.get("TIE", 0)),
        "A_win_rate": float(counts.get("A", 0) / max(1, n)),
        "B_win_rate": float(counts.get("B", 0) / max(1, n)),
    }


print({"API_BASE_A": API_BASE_A, "API_BASE_B": API_BASE_B or "(not set)"})

# Only run A/B if API_BASE_B is provided
if API_BASE_B:
    DF_A = run_eval_on_base(API_BASE_A, TESTSET)
    DF_B = run_eval_on_base(API_BASE_B, TESTSET)

    WIN_RULE = ab_winrate_rule_based(DF_A, DF_B)
    print("Rule-based win-rate summary:")
    print(json.dumps(summarize_winrate(WIN_RULE, "winner_rule"), ensure_ascii=False, indent=2))
    display(WIN_RULE)

    # Optional LLM judge win-rate
    if _llm_judge_enabled():
        WIN_JUDGE = ab_winrate_llm_judge(DF_A, DF_B, max_cases=len(TESTSET))
        # Normalize winner field if missing
        if "winner" in WIN_JUDGE.columns:
            WIN_JUDGE["winner"] = WIN_JUDGE["winner"].fillna("TIE")
        print("LLM-judge win-rate summary:")
        print(json.dumps(summarize_winrate(WIN_JUDGE, "winner"), ensure_ascii=False, indent=2))
        display(WIN_JUDGE[[c for c in ["name", "winner", "reason"] if c in WIN_JUDGE.columns]])

        # Save judge artifacts
        (ARTIFACTS_DIR / "ab_llm_judge.jsonl").write_text(
            "\n".join(json.dumps(r, ensure_ascii=False) for r in WIN_JUDGE.to_dict(orient="records")),
            encoding="utf-8",
        )

    # Save A/B artifacts
    WIN_RULE.to_csv(ARTIFACTS_DIR / "ab_rule_winrate.csv", index=False, encoding="utf-8")
    DF_A.to_csv(OUTPUTS_DIR / "eval_text_A.csv", index=False, encoding="utf-8")
    DF_B.to_csv(OUTPUTS_DIR / "eval_text_B.csv", index=False, encoding="utf-8")

    print("Saved A/B artifacts to outputs/ and artifacts/")
else:
    print(
        "A/B is disabled because API_BASE_B is not set.\n"
        "To enable A/B, set environment variables:\n"
        "- API_BASE_A=http://127.0.0.1:8081 (or your A server)\n"
        "- API_BASE_B=http://127.0.0.1:8082 (or your B server)\n"
    )

{'API_BASE_A': 'http://127.0.0.1:8081', 'API_BASE_B': '(not set)'}
A/B is disabled because API_BASE_B is not set.
To enable A/B, set environment variables:
- API_BASE_A=http://127.0.0.1:8081 (or your A server)
- API_BASE_B=http://127.0.0.1:8082 (or your B server)
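
Because `API_BASE_B` was not set in this run, no win-rate table was produced. The sketch below shows the rule-based comparison on toy data: the A totals are the `rubric_total` values from the rubric table above, while the B totals are invented purely for illustration and do not come from any real run.

```python
from collections import Counter

# rubric_total per test case for system A (from the rubric table above).
a_totals = {
    "basic_men_shoes_budget": 10,
    "formal_office": 8,
    "color_strict": 10,
    "type_tshirts": 9,
    "reject_vietnamese": 6,
}
# Hypothetical totals for system B (assumption, illustration only).
b_totals = {
    "basic_men_shoes_budget": 10,
    "formal_office": 10,
    "color_strict": 9,
    "type_tshirts": 10,
    "reject_vietnamese": 6,
}


def pick_winner(a_total: float, b_total: float) -> str:
    """Higher rubric_total wins; equal totals are a tie."""
    if a_total > b_total:
        return "A"
    if b_total > a_total:
        return "B"
    return "TIE"


# Compare only the cases present in both runs, as the notebook does.
names = sorted(set(a_totals) & set(b_totals))
counts = Counter(pick_winner(a_totals[n], b_totals[n]) for n in names)
n = len(names)

summary = {
    "n": n,
    "A_wins": counts["A"],
    "B_wins": counts["B"],
    "ties": counts["TIE"],
    "A_win_rate": counts["A"] / max(1, n),
    "B_win_rate": counts["B"] / max(1, n),
}
print(summary)
```

Ties stay in the denominator here, matching `summarize_winrate`, so the two win rates do not have to sum to 1.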

