# RAGAS Evaluation Notebook (Clean)

This notebook evaluates **baseline vs experiment** RAG pipelines using RAGAS and saves **summary/detail** outputs per run.

In [7]:
# %pip install datasets

In [8]:
# %pip install ragas

In [9]:
# import sys, subprocess, textwrap

# def sh(cmd):
#     print(">", cmd)
#     r = subprocess.run(cmd, shell=True, capture_output=True, text=True)
#     print(r.stdout)
#     if r.stderr.strip():
#         print("[stderr]")
#         print(r.stderr)

# print("python:", sys.executable)
# print("version:", sys.version)

# # 현재 패키지 상태 확인
# sh("python -c \"import numpy; print('numpy', numpy.__version__)\"")
# sh("python -c \"import pyarrow; print('pyarrow', pyarrow.__version__)\"")
# sh("python -c \"import datasets; print('datasets', datasets.__version__)\"")

In [10]:
# import sys, subprocess

# def pip(cmd):
#     print(">", cmd)
#     r = subprocess.run([sys.executable, "-m", "pip"] + cmd.split(), capture_output=True, text=True)
#     print(r.stdout)
#     if r.stderr.strip():
#         print("[stderr]")
#         print(r.stderr)

# # 1) 제거
# pip("uninstall -y pyarrow datasets numpy")

# # 2) 재설치: numpy<2 + 최신 pyarrow + datasets(너가 쓰던 버전)
# pip("install numpy<2 pyarrow>=14 datasets==2.19.2")

In [11]:
# ---- RAGAS metrics: version-tolerant loader ----
def build_metrics():
    """Return the RAGAS metric objects for the "A-plan" (no AnswerRelevancy).

    AnswerRelevancy is left out on purpose because it is the metric most likely
    to depend on embeddings; the goal here is to get a run to complete first.

    Three strategies are tried in order, so the loader works across RAGAS
    releases that expose metrics differently:

    1. function-style singletons (``context_precision`` ...),
    2. class-style metrics instantiated with no arguments,
    3. whichever of the class-style names exist on ``ragas.metrics``.

    Returns:
        list: metric objects, in the order precision, recall, faithfulness
        (strategy 3 may return a subset).

    Raises:
        ImportError: when no strategy yields a metric. The message lists the
            error captured from each failed strategy so the cause is visible.
    """
    # Remember why each strategy failed instead of discarding the exception.
    failures = []

    # 1) function-style metrics
    try:
        from ragas.metrics import context_precision, context_recall, faithfulness
        return [context_precision, context_recall, faithfulness]
    except Exception as exc:  # best-effort: fall through to the next strategy
        failures.append(f"function-style: {exc!r}")

    # 2) class-style metrics
    try:
        from ragas.metrics import ContextPrecision, ContextRecall, Faithfulness
        return [ContextPrecision(), ContextRecall(), Faithfulness()]
    except Exception as exc:  # best-effort: fall through to the next strategy
        failures.append(f"class-style: {exc!r}")

    # 3) fallback: pick up whichever class-style names are present
    try:
        import ragas.metrics as m
    except ImportError as exc:
        failures.append(f"import ragas.metrics: {exc!r}")
    else:
        wanted = ["ContextPrecision", "ContextRecall", "Faithfulness"]
        found = []
        for name in wanted:
            if hasattr(m, name):
                found.append(getattr(m, name)())
        if found:
            return found

    raise ImportError(
        "RAGAS metrics import failed for A-plan (without AnswerRelevancy). "
        "Paste `pip show ragas` and `python -c \"import ragas; print(ragas.__version__)\"`."
        " Underlying errors: " + "; ".join(failures)
    )

# Resolve the metric list once at cell run time and print each metric's name
# (its __name__ when it has one, otherwise its class name).
METRICS = build_metrics()
print("✅ METRICS:", [getattr(x, '__name__', x.__class__.__name__) for x in METRICS])


  from .autonotebook import tqdm as notebook_tqdm


✅ METRICS: ['ContextPrecision', 'ContextRecall', 'Faithfulness']


  from ragas.metrics import context_precision, context_recall, faithfulness
  from ragas.metrics import context_precision, context_recall, faithfulness
  from ragas.metrics import context_precision, context_recall, faithfulness


In [12]:
# Print the installed ragas version; falls back to "unknown" when the package
# does not expose __version__.
import ragas
print("ragas version:", getattr(ragas, "__version__", "unknown"))

ragas version: 0.4.3


In [13]:
from ragas.metrics import context_precision, context_recall, faithfulness, answer_relevancy

# Rebind METRICS to the four function-style metrics imported above; unlike the
# earlier build_metrics() result this list also contains answer_relevancy.
METRICS = [context_precision, context_recall, faithfulness, answer_relevancy]
print("✅ METRICS:", [getattr(m, "__name__", m.__class__.__name__) for m in METRICS])

✅ METRICS: ['ContextPrecision', 'ContextRecall', 'Faithfulness', 'AnswerRelevancy']


  from ragas.metrics import context_precision, context_recall, faithfulness, answer_relevancy
  from ragas.metrics import context_precision, context_recall, faithfulness, answer_relevancy
  from ragas.metrics import context_precision, context_recall, faithfulness, answer_relevancy
  from ragas.metrics import context_precision, context_recall, faithfulness, answer_relevancy


## 0) Environment & Paths

In [14]:
# ============================================================
# PATH CONFIG (only this cell is modified)
# ============================================================
import json
import pandas as pd
import sys, importlib
from pathlib import Path
from dotenv import load_dotenv

# Project root (new location).
# NOTE(review): absolute, machine-specific Windows path -- edit before running elsewhere.
PROJECT_ROOT = Path(r"C:\ai\source\chatbot_app")

# Module directory (same layout as before)
MODULE_DIR = PROJECT_ROOT / "modules"

# Environment-variable file
ENV_PATH = PROJECT_ROOT / ".env"

# Root directory for saved results
RUNS_DIR = PROJECT_ROOT / "results" / "ragas_runs"

# Choose which test set to use here (uncomment exactly one)
# TESTSET_PATH = PROJECT_ROOT / "ragas_testset_single.jsonl"
# TESTSET_PATH = PROJECT_ROOT / "ragas_testset_v1_from_docx.jsonl"
TESTSET_PATH = PROJECT_ROOT / "ragas_testset_10_selected.jsonl"

# ------------------------------------------------------------
# setup
# ------------------------------------------------------------
# Put the project root and the module directory at the front of sys.path;
# MODULE_DIR is inserted last, so it is searched first.
sys.path.insert(0, str(PROJECT_ROOT))
sys.path.insert(0, str(MODULE_DIR))
importlib.invalidate_caches()

# Load .env only when the file is present.
if ENV_PATH.exists():
    load_dotenv(ENV_PATH)

# Create the results directory (and parents) if it does not exist yet.
RUNS_DIR.mkdir(parents=True, exist_ok=True)

print("PROJECT_ROOT:", PROJECT_ROOT)
print("TESTSET_PATH:", TESTSET_PATH)
print("exists:", TESTSET_PATH.exists())

PROJECT_ROOT: C:\ai\source\chatbot_app
TESTSET_PATH: C:\ai\source\chatbot_app\ragas_testset_10_selected.jsonl
exists: True


## 1) Load testset (JSONL)

In [16]:
# Load the test set chosen in the PATH CONFIG cell. Reusing TESTSET_PATH keeps the
# selection in one place; previously this cell hard-coded its own path, so
# changing TESTSET_PATH had no effect on what was actually evaluated.
TESTSET_JSONL = TESTSET_PATH
if not TESTSET_JSONL.exists():
    # Explicit exception instead of `assert`, which is skipped under `python -O`.
    raise FileNotFoundError(f"❌ JSONL not found: {TESTSET_JSONL}")

rows = []
# utf-8-sig reads plain UTF-8 as well and drops a leading BOM if an editor added one.
with open(TESTSET_JSONL, "r", encoding="utf-8-sig") as f:
    for line_no, line in enumerate(f, start=1):
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between records
        try:
            rows.append(json.loads(line))
        except json.JSONDecodeError as exc:
            # Point at the offending line so a broken record is easy to find.
            raise ValueError(f"{TESTSET_JSONL}:{line_no}: invalid JSON ({exc})") from exc

print("✅ rows:", len(rows))
# Guard the preview so an empty file reports cleanly instead of raising IndexError.
print("✅ keys example:", rows[0].keys() if rows else "(no rows)")
pd.DataFrame(rows[:3])


✅ rows: 10
✅ keys example: dict_keys(['question_id', 'question', 'ground_truth', 'contexts', 'meta'])


Unnamed: 0,question_id,question,ground_truth,contexts,meta
0,q010,조정에서 합의했는데 집주인이 안 지켜요. 이거 강제할 수 있나요?,"네, 강제할 수 있습니다.\n주택임대차보호법 제27조에 따르면,\n 제26조제4항 ...","[주택임대차보호법 제27조 제1항, 주택임대차보호법 시행령 제34조, 주택임대차보호...","{'source': 'RAGAS 학습용 질문.docx', 'version': 'v1..."
1,q016,"임대인이 보증금을 안 돌려줘서 제가 일부러 전입신고를 유지하고 있는데, 다른 집으로...","다른 집으로 전입신고를 하면,\n 기존 주택에 대한 대항력을 상실하게 되어 임차인으...",[주택임대차보호법 제3조 제1항],"{'source': 'RAGAS 학습용 질문.docx', 'version': 'v1..."
2,q002,"계약서에 1년이라고 써 있으면, 1년 지나면 무조건 나가야 하나요?","아니요. 계약서에 1년이라고 적혀 있어도, 1년이 지나면 무조건 나가야 하는 것은 ...",[주택임대차보호법 제4조 제1항],"{'source': 'RAGAS 학습용 질문.docx', 'version': 'v1..."


## 2) Define baseline & experiment configs

- Keep **base_cfg** stable.
- Only put **changed knobs** in `exp_cfg = replace(base_cfg, ...)`.

In [None]:
from dataclasses import replace
from rag_module import RAGConfig

# =========================
# Base config (edit as needed)
# =========================
# Baseline RAGConfig. The meaning of each field is defined by rag_module.RAGConfig,
# which is not part of this notebook.
base_cfg = RAGConfig(
    # ---- LLM ----
    normalize_model="solar-pro2",
    generation_model="solar-pro2",
    temperature=0.1,
    normalize_temperature=0.0,

    # ---- Embedding ----
    embedding_backend="upstage",
    embedding_model="solar-embedding-1-large-passage",

    # ---- Dense Retrieval ----
    k_law=7,
    k_rule=7,
    k_case=3,
    search_multiplier=4,

    # ---- Hybrid Fusion ----
    hybrid_dense_weight=0.5,
    hybrid_sparse_weight=0.5,

    # ---- BM25 / Sparse ----
    enable_bm25=True,
    sparse_mode="auto",
    bm25_algorithm="okapi",
    bm25_k1=1.5,
    bm25_b=0.85,
    bm25_use_kiwi=True,

    # ---- BM25-title ----
    enable_bm25_title=True,
    bm25_title_field="title",
    hybrid_sparse_title_ratio=0.35,

    # ---- Rerank ----
    enable_rerank=True,
    rerank_model="rerank-multilingual-v3.0",
    rerank_threshold=0.22,
    rerank_max_documents=18,

    # ---- Output trimming ----
    bm25_max_doc_chars=3000,
    rerank_doc_max_chars=3000,

    # ---- Dedupe ----
    dedupe_key_fields=["chunk_id", "id"],
)

# =========================
# Experiment config (only diffs here)
# =========================
# dataclasses.replace copies base_cfg and overrides only the listed fields:
# the dense/sparse fusion weights move from 0.5/0.5 to 0.7/0.3.
exp_cfg = replace(
    base_cfg,
    hybrid_dense_weight=0.7,
    hybrid_sparse_weight=0.3,
)
# Show both configs as the cell output.
base_cfg, exp_cfg


## 3) Build pipelines

In [None]:
from rag_module import create_pipeline

# Build one pipeline per config; both come from the same create_pipeline factory
# and differ only in the config object passed in.
base_pipe = create_pipeline(config=base_cfg)
exp_pipe  = create_pipeline(config=exp_cfg)

print("✅ pipelines ready")


## 4) (Optional) Quick trace sanity check

In [None]:
# If your rag_module exposes a trace / debug method, call it here.
# Otherwise you can skip this cell.

# Example (adjust to your actual API):
# ans, trace = base_pipe.answer_with_trace("테스트 질문 ...")
# display(trace)

print("ℹ️ Skip or customize depending on your pipeline API.")


## 5) Build RAGAS samples from your pipeline outputs

This converts each testset row into the RAGAS format:
- `question`
- `answer`
- `contexts` (list[str])
- `ground_truth`

In [None]:
def shrink_contexts(ctxs, max_chars=2400, max_contexts=30):
    """Normalise retrieved contexts into a bounded list of non-empty strings.

    Each entry is converted with ``str()``, stripped, dropped when empty or
    ``None``, and truncated to ``max_chars`` characters. Collection stops once
    ``max_contexts`` entries have been kept.

    Args:
        ctxs: iterable of contexts, a single context string, or ``None``.
        max_chars: maximum number of characters kept per context.
        max_contexts: maximum number of contexts returned.

    Returns:
        list[str]: cleaned contexts, at most ``max_contexts`` long.
    """
    # A non-positive budget means "keep nothing"; the append-then-check loop
    # below would otherwise still let the first context through.
    if max_contexts <= 0:
        return []

    # A bare string is one context. Iterating it directly would yield single
    # characters and produce up to max_contexts one-letter "contexts".
    if isinstance(ctxs, str):
        ctxs = [ctxs]

    out = []
    for c in (ctxs or []):
        if c is None:
            continue
        s = str(c).strip()
        if not s:
            continue
        out.append(s[:max_chars])
        if len(out) >= max_contexts:
            break
    return out

def row_get_ground_truth(r: dict):
    """Return the reference answer stored on a test-set row.

    The keys ``ground_truth``, ``reference`` and ``gt`` are checked in that
    order and the first truthy value wins. If none is truthy, whatever is
    stored under ``answer`` is returned (``None`` when that key is absent).
    """
    for key in ("ground_truth", "reference", "gt"):
        value = r.get(key)
        if value:
            return value
    return r.get("answer")

def run_pipe_to_samples(pipe, rows, max_chars=2400, max_contexts=30, limit=None):
    """Run a pipeline over test-set rows and collect RAGAS-style sample dicts.

    For each of the first ``limit`` rows (all rows when ``limit`` is ``None``)
    the question is read from ``question`` or ``query``; rows without one are
    skipped. The pipeline is invoked through ``pipe.answer_with_trace`` and its
    return value is normalised into answer / contexts / trace.

    Returns:
        list[dict]: one dict per evaluated row with the keys ``question``,
        ``answer``, ``contexts``, ``ground_truth`` and ``_trace``.
    """

    def _unpack(raw):
        """Normalise the pipeline output into an (answer, contexts, trace) triple."""
        if isinstance(raw, dict):
            answer = ""
            for key in ("answer", "result", "output", "text"):
                if raw.get(key):
                    answer = raw.get(key)
                    break

            contexts = []
            for key in ("contexts", "context", "docs"):
                if raw.get(key):
                    contexts = raw.get(key)
                    break

            # Mirrors an `or` chain: the last candidate is returned even if falsy.
            trace = raw.get("trace") or raw.get("debug") or raw.get("meta")
            return answer, contexts, trace

        if isinstance(raw, tuple):
            defaults = ("", [], None)
            # Tuples longer than three items are not interpreted at all.
            if len(raw) > len(defaults):
                return defaults
            # Shorter tuples are padded on the right with the defaults.
            return tuple(raw) + defaults[len(raw):]

        # Anything else is treated as the answer text itself.
        return str(raw), [], None

    total = len(rows) if limit is None else min(limit, len(rows))
    collected = []

    for idx in range(total):
        row = rows[idx]
        question = row.get("question") or row.get("query")
        if question:
            # The pipeline has to be called through answer_with_trace.
            answer, contexts, trace = _unpack(pipe.answer_with_trace(question))
            collected.append({
                "question": question,
                "answer": answer or "",
                "contexts": shrink_contexts(contexts, max_chars=max_chars, max_contexts=max_contexts),
                "ground_truth": row_get_ground_truth(row) or "",
                "_trace": trace,  # kept alongside the sample so it can be saved later
            })

    return collected

# Run every test-set row through both pipelines (limit=None means no cap).
# For a quick one-question smoke test, pass limit=1 instead.
BASE_SAMPLES = run_pipe_to_samples(base_pipe, rows, limit=None)
EXP_SAMPLES = run_pipe_to_samples(exp_pipe, rows, limit=None)

print("✅ BASE_SAMPLES:", len(BASE_SAMPLES))
print("✅ EXP_SAMPLES :", len(EXP_SAMPLES))
# Preview the first baseline sample without its (potentially large) trace.
pd.DataFrame([{k:v for k,v in BASE_SAMPLES[0].items() if k != "_trace"}])

## 6) RAGAS evaluation (prepared cell)

- Creates per-sample detail dataframe (when supported by your RAGAS version)
- Creates summary dataframe (mean over samples)
- Keeps timing metrics

In [None]:
# ============================================================
# RAGAS compare + clean saving (ragas==0.3.2 compatible)
# FIXES:
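#  1) Drop _trace from detail.csv (traces go to trace.jsonl only)
#  2) Inject run_tag into samples up front so the tag survives into trace.jsonl
#  3) Add a ground_truths=[...] fallback column (better version/environment compatibility)
#  4) Avoid samples/detail column collisions (merge as safely as possible)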
# ============================================================

import time, json, re
from pathlib import Path
from datetime import datetime

import pandas as pd
from datasets import Dataset

from ragas import evaluate
from ragas.llms import llm_factory

# ----------------------------
# LLM + METRICS (ragas 0.3.2)
# ----------------------------

from openai import OpenAI

# Evaluator LLM. OpenAI() is created without arguments, so the API key is taken
# from the OPENAI_API_KEY environment variable.
client = OpenAI()  # uses the OPENAI_API_KEY environment variable
llm = llm_factory("gpt-4o-mini", client=client)

def build_metrics_032():
    """Return the function-style RAGAS metrics used for evaluation.

    ``context_precision``, ``context_recall`` and ``faithfulness`` are required
    (an ImportError propagates if they are missing). ``answer_relevancy`` is
    appended only when it can be imported, since it may be absent in some
    environments.
    """
    from ragas.metrics import context_precision, context_recall, faithfulness

    try:
        from ragas.metrics import answer_relevancy
    except ImportError:
        optional = []
    else:
        optional = [answer_relevancy]

    return [context_precision, context_recall, faithfulness] + optional

# Rebind the module-level METRICS used by eval_ragas_with_details below and
# print each metric's name (its __name__ if present, else its class name).
METRICS = build_metrics_032()
print("✅ METRICS:", [getattr(m, "__name__", m.__class__.__name__) for m in METRICS])

# ----------------------------
# utils
# ----------------------------
def _json_safe(obj):
    """Make config/meta safe to dump to json."""
    try:
        json.dumps(obj, ensure_ascii=False)
        return obj
    except TypeError:
        if hasattr(obj, "model_dump"):
            return obj.model_dump()
        if hasattr(obj, "dict"):
            return obj.dict()
        if hasattr(obj, "__dict__"):
            return obj.__dict__
        return str(obj)

def _write_json(path: Path, data):
    """Write ``data`` to ``path`` as pretty-printed UTF-8 JSON.

    ``data`` first goes through ``_json_safe``. ``default=str`` is a last-resort
    fallback so that a value which is still not serialisable is stored as its
    string form instead of aborting the save after evaluation has already run.
    """
    text = json.dumps(_json_safe(data), ensure_ascii=False, indent=2, default=str)
    path.write_text(text, encoding="utf-8")

def _write_jsonl(path: Path, rows):
    """Write ``rows`` to ``path`` as UTF-8 JSON Lines (one JSON document per line).

    Each row goes through ``_json_safe``. ``default=str`` is a last-resort
    fallback so that one non-serialisable nested value (for example a trace
    object) is stored as text rather than aborting the whole file.
    """
    with path.open("w", encoding="utf-8") as f:
        for r in rows:
            f.write(json.dumps(_json_safe(r), ensure_ascii=False, default=str) + "\n")

def _next_run_dir(project_root: Path, prefix: str):
    runs_root = Path(project_root) / "results" / "ragas_runs"
    runs_root.mkdir(parents=True, exist_ok=True)

    pat = re.compile(rf"^{re.escape(prefix)}_(\d{{4}})_")
    nums = []
    for p in runs_root.iterdir():
        if p.is_dir():
            m = pat.match(p.name)
            if m:
                nums.append(int(m.group(1)))
    next_idx = (max(nums) + 1) if nums else 1

    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    run_dir = runs_root / f"{prefix}_{next_idx:04d}_{ts}"
    run_dir.mkdir(parents=True, exist_ok=False)
    return run_dir, next_idx, ts

def _strip_trace(samples):
    """detail.csv에는 _trace를 넣지 않기(파일 폭발 방지)."""
    out = []
    for s in samples:
        if isinstance(s, dict):
            out.append({k: v for k, v in s.items() if k != "_trace"})
        else:
            out.append(s)
    return out

# ----------------------------
# core eval
# ----------------------------
def eval_ragas_with_details(samples, run_tag: str):
    """Evaluate one run's samples with RAGAS and build detail/summary frames.

    Side effect: every dict in ``samples`` is mutated in place -- ``run_tag`` is
    set and, when missing, ``ground_truths`` is added as a list derived from
    ``ground_truth``. Non-dict entries are left untouched.

    Relies on the module-level ``METRICS`` and ``llm``.

    Args:
        samples: list of sample records passed to ``Dataset.from_list``.
        run_tag: label written onto each sample and into the summary row.

    Returns:
        tuple: ``(res, out_detail, summary_df)`` -- the raw ``evaluate`` result,
        the per-sample frame (samples without ``_trace``, plus the result columns
        when both frames have the same non-zero length) and a one-row summary
        DataFrame holding the numeric column means, ``run_tag`` and timings.
    """
    # Inject run_tag into the samples up front so trace.jsonl keeps the tag
    for s in samples:
        if isinstance(s, dict):
            s["run_tag"] = run_tag
            # Safety net: also provide ground_truths (list form)
            if "ground_truths" not in s:
                gt = s.get("ground_truth") or ""
                s["ground_truths"] = [gt] if isinstance(gt, str) else (gt or [])

    # NOTE(review): the dataset is built from the full samples, including
    # `_trace` and `run_tag` -- confirm Dataset.from_list accepts the trace objects.
    ds = Dataset.from_list(samples)

    t0 = time.time()
    res = evaluate(dataset=ds, metrics=METRICS, llm=llm)  # call pattern written for ragas 0.3.2
    eval_sec = time.time() - t0

    t1 = time.time()
    detail_df = res.to_pandas() if hasattr(res, "to_pandas") else pd.DataFrame()
    to_pandas_sec = time.time() - t1

    # The detail frame excludes _trace
    samples_df = pd.DataFrame(_strip_trace(samples))

    # merge per-sample metrics back onto samples (only when the lengths match)
    if len(detail_df) == len(samples_df) and len(detail_df) > 0:
        # Avoid column collisions: prefix result columns that already exist on samples_df
        overlap = set(samples_df.columns) & set(detail_df.columns)
        if overlap:
            detail_df = detail_df.rename(columns={c: f"metric__{c}" for c in overlap})

        out_detail = pd.concat(
            [samples_df.reset_index(drop=True), detail_df.reset_index(drop=True)],
            axis=1
        )
    else:
        out_detail = samples_df.copy()

    # Timings are per run, so the same value is repeated on every row.
    out_detail["eval_seconds"] = round(eval_sec, 3)
    out_detail["to_pandas_seconds"] = round(to_pandas_sec, 3)

    # summary (mean of numeric metric columns if available)
    summary = {}
    if len(detail_df) > 0:
        summary = detail_df.mean(numeric_only=True).to_dict()
    elif isinstance(res, dict):
        summary = {k: float(v) for k, v in res.items() if isinstance(v, (int, float))}

    summary["run_tag"] = run_tag
    summary["eval_seconds"] = round(eval_sec, 3)
    summary["to_pandas_seconds"] = round(to_pandas_sec, 3)

    return res, out_detail, pd.DataFrame([summary])

# ----------------------------
# compare + save (clean)
# ----------------------------
def run_compare_and_save(
    base_samples,
    exp_samples,
    project_root: Path,
    prefix="ragas_compare",
    base_cfg=None,
    exp_cfg=None,
):
    """Evaluate baseline and experiment samples and save everything for the run.

    Both sample sets are scored with ``eval_ragas_with_details``; the results are
    then written into a freshly allocated run directory:

    * ``summary.csv`` / ``detail.csv`` -- concatenated baseline + experiment frames
    * ``config.json`` -- best-effort snapshot of the configs, LLM and metric names
    * ``samples_base.jsonl`` / ``samples_exp.jsonl`` -- input samples incl. ``_trace``
    * ``trace.jsonl`` -- the samples' ``_trace`` payloads, or minimal per-row info
      when no sample carries a trace
    * ``meta.json`` -- run id, timestamps, sample counts and output paths

    Args:
        base_samples: samples produced by the baseline pipeline.
        exp_samples: samples produced by the experiment pipeline.
        project_root: root under which ``results/ragas_runs`` is created.
        prefix: run-directory name prefix.
        base_cfg: optional baseline config to snapshot.
        exp_cfg: optional experiment config to snapshot.

    Returns:
        dict: raw results, summary/detail frames, the run directory, every
        output path and the run id.
    """
    # --- sanity ---
    print(f"✅ base_samples: {len(base_samples)} | exp_samples: {len(exp_samples)}")

    base_res, base_detail_df, base_summary_df = eval_ragas_with_details(base_samples, "baseline")
    exp_res,  exp_detail_df,  exp_summary_df  = eval_ragas_with_details(exp_samples,  "experiment")

    summary_df = pd.concat([base_summary_df, exp_summary_df], ignore_index=True)
    detail_df  = pd.concat([base_detail_df,  exp_detail_df],  ignore_index=True)

    run_dir, run_id, ts = _next_run_dir(project_root, prefix)

    out_summary = run_dir / "summary.csv"
    out_detail  = run_dir / "detail.csv"
    out_meta    = run_dir / "meta.json"
    out_config  = run_dir / "config.json"
    out_base_in = run_dir / "samples_base.jsonl"
    out_exp_in  = run_dir / "samples_exp.jsonl"
    out_trace   = run_dir / "trace.jsonl"

    summary_df.to_csv(out_summary, index=False, encoding="utf-8-sig")
    detail_df.to_csv(out_detail, index=False, encoding="utf-8-sig")

    # config snapshot (best-effort)
    cfg_payload = {
        "base_cfg": _json_safe(base_cfg) if base_cfg is not None else None,
        "exp_cfg":  _json_safe(exp_cfg)  if exp_cfg  is not None else None,
        "llm": {"model": "gpt-4o-mini"},
        "metrics": [getattr(m, "__name__", m.__class__.__name__) for m in METRICS],
        "created_at": datetime.now().isoformat(),
    }
    _write_json(out_config, cfg_payload)

    # input snapshots (kept as-is, _trace included)
    _write_jsonl(out_base_in, base_samples)
    _write_jsonl(out_exp_in,  exp_samples)

    # trace snapshot (best-effort): collect only the samples' _trace payloads
    trace_rows = []
    for s in list(base_samples) + list(exp_samples):
        if isinstance(s, dict) and ("_trace" in s) and (s.get("_trace") is not None):
            trace_rows.append({
                "run_tag": s.get("run_tag"),
                "question": s.get("question"),
                "_trace": s.get("_trace"),
            })

    # When no sample carries a trace, still record minimal per-row information
    if not trace_rows:
        cols = [c for c in ["run_tag", "question", "eval_seconds"] if c in detail_df.columns]
        trace_rows = detail_df[cols].to_dict(orient="records") if cols else []

    _write_jsonl(out_trace, trace_rows)

    # Record the ragas version that is actually installed; a hard-coded string
    # goes stale as soon as the environment is upgraded.
    try:
        from importlib.metadata import version as _dist_version
        ragas_version = _dist_version("ragas")
    except Exception:
        ragas_version = "unknown"

    meta = {
        "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "ragas_version": ragas_version,
        "run_id": run_id,
        "timestamp": ts,
        "run_dir": str(run_dir),
        "prefix": prefix,
        "n_base_samples": len(base_samples),
        "n_exp_samples": len(exp_samples),
        "saved": {
            "summary": str(out_summary),
            "detail": str(out_detail),
            "meta": str(out_meta),
            "config": str(out_config),
            "samples_base": str(out_base_in),
            "samples_exp": str(out_exp_in),
            "trace": str(out_trace),
        },
    }
    _write_json(out_meta, meta)

    print(f"✅ Saved to: {run_dir}")
    print(f"   - summary: {out_summary.name}")
    print(f"   - detail : {out_detail.name}")
    print(f"   - meta   : {out_meta.name}")
    print(f"   - config : {out_config.name}")
    print(f"   - inputs : {out_base_in.name}, {out_exp_in.name}")
    print(f"   - trace  : {out_trace.name}")

    return {
        "base_res": base_res,
        "exp_res": exp_res,
        "summary_df": summary_df,
        "detail_df": detail_df,
        "run_dir": run_dir,
        "out_summary": out_summary,
        "out_detail": out_detail,
        "out_meta": out_meta,
        "out_config": out_config,
        "out_samples_base": out_base_in,
        "out_samples_exp": out_exp_in,
        "out_trace": out_trace,
        "run_id": run_id,
    }

# ============================
# USAGE (예시)
# ============================
# result = run_compare_and_save(
#     base_samples=BASE_SAMPLES,
#     exp_samples=EXP_SAMPLES,
#     project_root=PROJECT_ROOT,
#     prefix="ragas_compare",
#     base_cfg=base_cfg,
#     exp_cfg=exp_cfg,
# )
# display(result["summary_df"])
# display(result["detail_df"].head(3))


## 7) Run + compare + save

In [None]:
# ============================================================
# One-cell: RAGAS compare + save (+ delta outputs + dataset fingerprint)
# Adds:
#  - delta_summary.csv
#  - delta_detail.csv
#  - top_regressions.csv
#  - top_improvements.csv
#  - meta.json: testset fingerprint (path/lines/sha1) if TESTSET_JSONL exists
# ============================================================

import time, re, json, hashlib
from pathlib import Path
from datetime import datetime

import pandas as pd
from datasets import Dataset

from openai import OpenAI
from ragas import evaluate
from ragas.llms import llm_factory


# ----------------------------
# LLM (non-deprecated)
# ----------------------------
# Evaluator LLM: an OpenAI client created with default arguments is handed to
# llm_factory explicitly.
client = OpenAI()
llm = llm_factory("gpt-4o-mini", client=client)  # the client is passed in explicitly


# ----------------------------
# METRICS (version-tolerant)
# ----------------------------
def build_metrics():
    """Return the function-style RAGAS metrics for this cell.

    The three core metrics must import successfully (an ImportError propagates
    otherwise); ``answer_relevancy`` is added only when it can be imported.
    """
    from ragas.metrics import context_precision, context_recall, faithfulness

    selected = [context_precision, context_recall, faithfulness]
    try:
        from ragas.metrics import answer_relevancy
    except ImportError:
        return selected
    selected.append(answer_relevancy)
    return selected

# Rebind the module-level METRICS read by eval_ragas_with_details and the meta
# writer in this cell, then print each metric's name.
METRICS = build_metrics()
print("✅ METRICS:", [getattr(m, "__name__", m.__class__.__name__) for m in METRICS])


# ----------------------------
# JSON helpers (safe)
# ----------------------------
def _json_safe(obj):
    try:
        json.dumps(obj, ensure_ascii=False)
        return obj
    except TypeError:
        if hasattr(obj, "model_dump"):
            return obj.model_dump()
        if hasattr(obj, "dict"):
            return obj.dict()
        if hasattr(obj, "__dict__"):
            return obj.__dict__
        return str(obj)

def _write_json(path: Path, data):
    """Write ``data`` to ``path`` as indented UTF-8 JSON.

    ``data`` is first passed through ``_json_safe``; ``default=str`` then acts as
    a final fallback so a value that is still not serialisable is written as its
    string form instead of raising after the evaluation has already completed.
    """
    text = json.dumps(_json_safe(data), ensure_ascii=False, indent=2, default=str)
    path.write_text(text, encoding="utf-8")

def _write_jsonl(path: Path, rows):
    """Write ``rows`` to ``path`` as UTF-8 JSON Lines, one document per line.

    Each row is passed through ``_json_safe``; ``default=str`` is a final
    fallback so a single non-serialisable nested value is stored as text rather
    than aborting the file part-way through.
    """
    with path.open("w", encoding="utf-8") as f:
        for r in rows:
            f.write(json.dumps(_json_safe(r), ensure_ascii=False, default=str) + "\n")

def _df_to_jsonl_rows(df, prefer_cols):
    cols = [c for c in prefer_cols if c in df.columns]
    if cols:
        df = df[cols].copy()
    return df.to_dict(orient="records"), cols


# ----------------------------
# dataset fingerprint helpers
# ----------------------------
def _file_sha1(path: Path, chunk_size=1024 * 1024) -> str:
    h = hashlib.sha1()
    with path.open("rb") as f:
        while True:
            b = f.read(chunk_size)
            if not b:
                break
            h.update(b)
    return h.hexdigest()

def _count_jsonl_lines(path: Path) -> int:
    n = 0
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                n += 1
    return n


# ----------------------------
# core: eval + detail/summary frames
# ----------------------------
def eval_ragas_with_details(samples, run_tag: str):
    """Evaluate one run's samples with RAGAS and build detail/summary frames.

    Relies on the module-level ``METRICS`` and ``llm``. Unlike the version in the
    previous cell, this one does not modify ``samples`` and keeps every sample
    key (including ``_trace``, when present) in the detail frame.

    Args:
        samples: list of sample records passed to ``Dataset.from_list``.
        run_tag: label written to the detail rows and the summary row.

    Returns:
        tuple: ``(res, out_detail, summary_df)`` -- the raw ``evaluate`` result,
        the per-sample frame (samples plus the result columns when both frames
        have the same non-zero length) and a one-row summary DataFrame holding
        the numeric column means, ``run_tag`` and timings.
    """
    ds = Dataset.from_list(samples)

    t0 = time.time()
    res = evaluate(dataset=ds, metrics=METRICS, llm=llm)  # the evaluator LLM is passed here
    eval_sec = time.time() - t0

    t1 = time.time()
    detail_df = res.to_pandas() if hasattr(res, "to_pandas") else pd.DataFrame()
    to_pandas_sec = time.time() - t1

    samples_df = pd.DataFrame(samples)

    # merge
    # NOTE(review): columns present in both frames are NOT renamed here, so the
    # concat can yield duplicate column names -- confirm that is acceptable.
    if len(detail_df) == len(samples_df) and len(detail_df) > 0:
        out_detail = pd.concat(
            [samples_df.reset_index(drop=True), detail_df.reset_index(drop=True)],
            axis=1
        )
    else:
        out_detail = samples_df.copy()

    # Run-level values, repeated on every row.
    out_detail["run_tag"] = run_tag
    out_detail["eval_seconds"] = round(eval_sec, 3)
    out_detail["to_pandas_seconds"] = round(to_pandas_sec, 3)

    # summary
    summary = {}
    if len(detail_df) > 0:
        summary = detail_df.mean(numeric_only=True).to_dict()
    elif isinstance(res, dict):
        summary = {k: float(v) for k, v in res.items() if isinstance(v, (int, float))}

    summary["run_tag"] = run_tag
    summary["eval_seconds"] = round(eval_sec, 3)
    summary["to_pandas_seconds"] = round(to_pandas_sec, 3)

    return res, out_detail, pd.DataFrame([summary])


# ----------------------------
# run dir allocator
# ----------------------------
def _next_run_dir(project_root: Path, prefix: str):
    runs_root = Path(project_root) / "results" / "ragas_runs"
    runs_root.mkdir(parents=True, exist_ok=True)

    pat = re.compile(rf"^{re.escape(prefix)}_(\d{{4}})_")
    nums = []
    for p in runs_root.iterdir():
        if p.is_dir():
            m = pat.match(p.name)
            if m:
                nums.append(int(m.group(1)))
    next_id = (max(nums) + 1) if nums else 1

    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    run_dir = runs_root / f"{prefix}_{next_id:04d}_{ts}"
    run_dir.mkdir(parents=True, exist_ok=False)
    return run_dir, next_id, ts


# ----------------------------
# delta builders
# ----------------------------
def _pick_question_col(df: pd.DataFrame) -> str:
    for c in ["question", "normalized_question", "normalized_query", "query"]:
        if c in df.columns:
            return c
    return ""

def _ensure_question_id(df: pd.DataFrame) -> pd.DataFrame:
    # prefer an existing stable id
    for c in ["question_id", "id", "sample_id"]:
        if c in df.columns:
            df = df.copy()
            df["__qid__"] = df[c].astype(str)
            return df
    # fallback: hash question text
    qcol = _pick_question_col(df)
    df = df.copy()
    if qcol:
        def _h(x: str) -> str:
            s = (x or "").strip().encode("utf-8")
            return hashlib.sha1(s).hexdigest()[:12]
        df["__qid__"] = df[qcol].astype(str).map(_h)
    else:
        df["__qid__"] = [f"row{i:04d}" for i in range(len(df))]
    return df

def _metric_cols(df: pd.DataFrame) -> list:
    # heuristic: numeric columns from ragas result + common metric names
    prefer = [
        "context_precision", "context_recall", "faithfulness", "answer_relevancy",
        "ContextPrecision", "ContextRecall", "Faithfulness", "AnswerRelevancy",
    ]
    cols = [c for c in prefer if c in df.columns]
    if cols:
        return cols

    # fallback: any numeric columns that are not obvious non-metrics
    exclude = set(["eval_seconds", "to_pandas_seconds"])
    num_cols = []
    for c in df.columns:
        if c in exclude:
            continue
        if pd.api.types.is_numeric_dtype(df[c]):
            num_cols.append(c)
    return num_cols


def _make_delta_summary(summary_df: pd.DataFrame) -> pd.DataFrame:
    # summary_df has rows: baseline/experiment
    metric_cols = [c for c in summary_df.columns if c not in ["run_tag"]]
    base = summary_df[summary_df["run_tag"] == "baseline"].iloc[0].to_dict()
    exp  = summary_df[summary_df["run_tag"] == "experiment"].iloc[0].to_dict()

    rows = []
    for c in metric_cols:
        if c == "run_tag":
            continue
        b = base.get(c)
        e = exp.get(c)
        if isinstance(b, (int, float)) and isinstance(e, (int, float)):
            rows.append({"metric": c, "baseline": float(b), "experiment": float(e), "delta": float(e - b)})
        else:
            # keep non-numeric too
            rows.append({"metric": c, "baseline": b, "experiment": e, "delta": None})
    return pd.DataFrame(rows)


def _make_delta_detail(detail_df: pd.DataFrame) -> pd.DataFrame:
    """Build a per-question baseline-vs-experiment comparison frame.

    Steps:
      1. add a ``__qid__`` key via ``_ensure_question_id``;
      2. split the rows by ``run_tag`` into baseline and experiment;
      3. keep the question column, the available info columns and the metric
         columns, suffixing them with ``_base`` / ``_exp``;
      4. outer-merge the two sides on ``__qid__``;
      5. add ``<metric>_delta = <metric>_exp - <metric>_base``;
      6. add a unified question column (experiment value, falling back to the
         baseline value) and reorder the columns.

    Args:
        detail_df: concatenated detail frame; must contain a ``run_tag`` column.

    Returns:
        pd.DataFrame: a copy with columns ordered as ``__qid__``, question, the
        metric columns grouped as base/exp/delta, any remaining columns, then the
        suffixed info columns.
    """
    df = _ensure_question_id(detail_df)

    base = df[df["run_tag"] == "baseline"].copy()
    exp  = df[df["run_tag"] == "experiment"].copy()

    # --- choose question column ---
    qcol = _pick_question_col(df)

    # --- metric columns ---
    mcols = _metric_cols(df)

    # --- info columns to keep (these overlap across base/exp) ---
    info_cols = ["__qid__"]
    if qcol:
        info_cols.append(qcol)
    for c in ["ground_truth", "reference", "answer", "contexts"]:
        if c in df.columns:
            info_cols.append(c)

    # --- select + rename (so no overlap) ---
    base_small = base[info_cols + mcols].copy()
    exp_small  = exp[info_cols + mcols].copy()

    # Everything except the join key gets a side-specific suffix.
    rename_base = {c: f"{c}_base" for c in info_cols if c != "__qid__"}
    rename_exp  = {c: f"{c}_exp"  for c in info_cols if c != "__qid__"}
    rename_base.update({c: f"{c}_base" for c in mcols})
    rename_exp.update({c: f"{c}_exp"  for c in mcols})

    base_small = base_small.rename(columns=rename_base)
    exp_small  = exp_small.rename(columns=rename_exp)

    # --- merge safely ---
    # Outer join: questions present in only one run are kept (other side is NaN).
    merged = exp_small.merge(base_small, on="__qid__", how="outer")

    # --- compute deltas ---
    for c in mcols:
        cb = f"{c}_base"
        ce = f"{c}_exp"
        if cb in merged.columns and ce in merged.columns:
            merged[f"{c}_delta"] = merged[ce] - merged[cb]

    # --- convenience: make a unified question column (prefer exp, fallback base) ---
    if qcol:
        qe = f"{qcol}_exp"
        qb = f"{qcol}_base"
        if qe in merged.columns or qb in merged.columns:
            merged[qcol] = None
            if qe in merged.columns:
                merged[qcol] = merged[qe]
            if qb in merged.columns:
                merged[qcol] = merged[qcol].fillna(merged[qb])

    # --- order columns nicely ---
    ordered = ["__qid__"]
    if qcol and qcol in merged.columns:
        ordered.append(qcol)

    # keep references (unified view is optional; we keep exp/base separately)
    # NOTE: this loop is intentionally a no-op placeholder.
    for c in ["ground_truth", "reference"]:
        # add unified if you want; here we keep exp/base columns only
        pass

    # metrics grouped
    for c in mcols:
        for suf in ["_base", "_exp", "_delta"]:
            col = f"{c}{suf}"
            if col in merged.columns:
                ordered.append(col)

    # then common info columns (exp/base)
    tail_info = []
    for c in ["ground_truth", "reference", "answer", "contexts"]:
        ce, cb = f"{c}_exp", f"{c}_base"
        if ce in merged.columns:
            tail_info.append(ce)
        if cb in merged.columns:
            tail_info.append(cb)

    # Anything not placed yet (e.g. the suffixed question columns) goes in between.
    remaining = [c for c in merged.columns if c not in ordered and c not in tail_info]
    return merged[ordered + remaining + tail_info].copy()



def _make_top_changes(delta_detail_df: pd.DataFrame, top_k=10) -> tuple[pd.DataFrame, pd.DataFrame, str]:
    # choose primary metric for sorting
    candidates = [
        "answer_relevancy_delta", "AnswerRelevancy_delta",
        "faithfulness_delta", "Faithfulness_delta",
        "context_precision_delta", "ContextPrecision_delta",
        "context_recall_delta", "ContextRecall_delta",
    ]
    sort_col = next((c for c in candidates if c in delta_detail_df.columns), None)
    if sort_col is None:
        # fallback: first *_delta numeric column
        delta_cols = [c for c in delta_detail_df.columns if c.endswith("_delta") and pd.api.types.is_numeric_dtype(delta_detail_df[c])]
        sort_col = delta_cols[0] if delta_cols else ""

    if not sort_col:
        # nothing to rank
        return pd.DataFrame(), pd.DataFrame(), ""

    # pick minimal view columns
    qcol = _pick_question_col(delta_detail_df)
    view_cols = [c for c in ["__qid__", qcol, sort_col] if c and c in delta_detail_df.columns]
    # add also base/exp of that metric if present
    base_col = sort_col.replace("_delta", "_base")
    exp_col  = sort_col.replace("_delta", "_exp")
    for c in [base_col, exp_col]:
        if c in delta_detail_df.columns and c not in view_cols:
            view_cols.append(c)

    regress = delta_detail_df.sort_values(sort_col, ascending=True).head(top_k)[view_cols].copy()
    improve = delta_detail_df.sort_values(sort_col, ascending=False).head(top_k)[view_cols].copy()
    return regress, improve, sort_col


# ----------------------------
# main: compare + save (csv/json + optional snapshots)
# ----------------------------
def run_compare_and_save_all(
    base_samples,
    exp_samples,
    project_root: Path,
    prefix="ragas_compare",
    save_snapshots=True,
    save_config=True,
    save_samples_jsonl=True,
    save_trace_jsonl=True,
    trace_cols_priority=None,
    save_delta_outputs=True,
    top_k=10,
    llm_model="gpt-4o-mini",
):
    """Evaluate baseline and experiment samples and persist every artifact of the run.

    Both sample sets are scored with ``eval_ragas_with_details`` (tagged
    ``"baseline"`` and ``"experiment"``), their summary and detail frames are
    concatenated, and the results are written into a run directory obtained
    from ``_next_run_dir(project_root, prefix)``.

    Files written:
        * always: ``summary.csv``, ``detail.csv``, ``meta.json``
        * if ``save_delta_outputs``: ``delta_summary.csv``, ``delta_detail.csv``,
          ``top_regressions.csv``, ``top_improvements.csv``
        * if ``save_snapshots`` and the matching flag: ``config.json``,
          ``samples_base.jsonl`` / ``samples_exp.jsonl``, ``trace.jsonl``

    Notebook globals read: ``METRICS`` (required), and optionally
    ``TESTSET_JSONL``, ``base_cfg`` and ``exp_cfg``.

    Args:
        base_samples: Baseline samples passed to the evaluator; must support ``len``.
        exp_samples: Experiment samples passed to the evaluator; must support ``len``.
        project_root: Root passed to ``_next_run_dir`` to allocate the run directory.
        prefix: Run-directory prefix, also stored in ``meta.json``.
        save_snapshots: Master switch for the config / samples / trace snapshots.
        save_config: Write ``config.json`` (requires ``save_snapshots``).
        save_samples_jsonl: Write the input samples as JSONL (requires ``save_snapshots``).
        save_trace_jsonl: Write ``trace.jsonl`` built from the detail frame
            (requires ``save_snapshots``).
        trace_cols_priority: Column priority list handed to ``_df_to_jsonl_rows``;
            a built-in list is used when ``None``.
        save_delta_outputs: Write the delta and top-changes CSV files.
        top_k: Row limit forwarded to ``_make_top_changes``.
        llm_model: Model name recorded in ``meta.json`` and ``config.json``.
            It is stored for provenance only; this function does not pass it
            to the evaluator, so set it to whatever model the evaluator uses.

    Returns:
        dict with ``base_res``, ``exp_res``, ``summary_df``, ``detail_df``,
        ``run_dir``, ``out_summary``, ``out_detail``, ``out_meta`` plus one
        ``out_*`` path entry (and ``top_rank_metric`` / ``trace_cols_used``)
        for each optional artifact that was written.
    """
    # 1) evaluate
    base_res, base_detail_df, base_summary_df = eval_ragas_with_details(base_samples, "baseline")
    exp_res,  exp_detail_df,  exp_summary_df  = eval_ragas_with_details(exp_samples,  "experiment")

    summary_df = pd.concat([base_summary_df, exp_summary_df], ignore_index=True)
    detail_df  = pd.concat([base_detail_df,  exp_detail_df],  ignore_index=True)

    # 2) run dir
    run_dir, run_id, ts = _next_run_dir(project_root, prefix)

    # 3) basic outputs
    out_summary = run_dir / "summary.csv"
    out_detail  = run_dir / "detail.csv"
    out_meta    = run_dir / "meta.json"

    summary_df.to_csv(out_summary, index=False, encoding="utf-8-sig")
    detail_df.to_csv(out_detail, index=False, encoding="utf-8-sig")

    # Looked up once; used for the optional testset / base_cfg / exp_cfg globals.
    notebook_globals = globals()

    # Metric names are shared by meta.json and config.json, so derive them once.
    metric_names = [getattr(m, "__name__", m.__class__.__name__) for m in METRICS]

    # --- dataset fingerprint (optional) ---
    testset_info = None
    if "TESTSET_JSONL" in notebook_globals:
        p = Path(notebook_globals["TESTSET_JSONL"])
        if p.exists():
            testset_info = {
                "testset_path": str(p),
                "testset_lines": _count_jsonl_lines(p),
                "testset_sha1": _file_sha1(p),
            }

    meta = {
        "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "run_id": run_id,
        "timestamp": ts,
        "run_dir": str(run_dir),
        "prefix": prefix,
        "n_base_samples": len(base_samples),
        "n_exp_samples": len(exp_samples),
        "metrics": list(metric_names),
        "llm": {"model": llm_model},
        "testset": testset_info,
    }
    _write_json(out_meta, meta)

    # 3-1) delta outputs; paths of everything optional are collected in `extra`
    extra = {}
    if save_delta_outputs:
        # delta_summary
        delta_summary_df = _make_delta_summary(summary_df)
        out_delta_summary = run_dir / "delta_summary.csv"
        delta_summary_df.to_csv(out_delta_summary, index=False, encoding="utf-8-sig")
        extra["out_delta_summary"] = str(out_delta_summary)

        # delta_detail
        delta_detail_df = _make_delta_detail(detail_df)
        out_delta_detail = run_dir / "delta_detail.csv"
        delta_detail_df.to_csv(out_delta_detail, index=False, encoding="utf-8-sig")
        extra["out_delta_detail"] = str(out_delta_detail)

        # top changes (regressions/improvements)
        top_regress, top_improve, sort_col = _make_top_changes(delta_detail_df, top_k=top_k)
        out_top_regress = run_dir / "top_regressions.csv"
        out_top_improve = run_dir / "top_improvements.csv"
        top_regress.to_csv(out_top_regress, index=False, encoding="utf-8-sig")
        top_improve.to_csv(out_top_improve, index=False, encoding="utf-8-sig")
        extra["out_top_regressions"] = str(out_top_regress)
        extra["out_top_improvements"] = str(out_top_improve)
        extra["top_rank_metric"] = sort_col

    # 4) optional snapshots
    if save_snapshots:
        # 4-1) config.json
        if save_config:
            cfg_payload = {
                "created_at": datetime.now().isoformat(),
                "llm": {"model": llm_model},
                "metrics": list(metric_names),
                # A config that was never defined in the notebook is stored as null.
                "base_cfg": _json_safe(notebook_globals.get("base_cfg")) if "base_cfg" in notebook_globals else None,
                "exp_cfg":  _json_safe(notebook_globals.get("exp_cfg"))  if "exp_cfg"  in notebook_globals else None,
            }
            out_config = run_dir / "config.json"
            _write_json(out_config, cfg_payload)
            extra["out_config"] = str(out_config)

        # 4-2) input samples jsonl
        if save_samples_jsonl:
            out_samples_base = run_dir / "samples_base.jsonl"
            out_samples_exp  = run_dir / "samples_exp.jsonl"
            _write_jsonl(out_samples_base, base_samples)
            _write_jsonl(out_samples_exp,  exp_samples)
            extra["out_samples_base"] = str(out_samples_base)
            extra["out_samples_exp"]  = str(out_samples_exp)

        # 4-3) trace jsonl (from detail_df)
        if save_trace_jsonl:
            if trace_cols_priority is None:
                trace_cols_priority = [
                    "id", "sample_id", "__qid__",
                    "question", "normalized_question", "normalized_query", "query",
                    "answer", "ground_truth", "reference",
                    "contexts",
                    "_trace",
                    "retrieved_doc_ids", "retrieved_docs", "retrieval_scores",
                    "rerank_selected_ids", "rerank_scores",
                    "final_context_ids", "final_contexts",
                    "latency_ms", "latency_sec",
                    "run_tag",
                ]
            trace_rows, used_cols = _df_to_jsonl_rows(detail_df, trace_cols_priority)
            out_trace = run_dir / "trace.jsonl"
            _write_jsonl(out_trace, trace_rows)
            extra["out_trace"] = str(out_trace)
            extra["trace_cols_used"] = used_cols

    return {
        "base_res": base_res,
        "exp_res": exp_res,
        "summary_df": summary_df,
        "detail_df": detail_df,
        "run_dir": str(run_dir),
        "out_summary": str(out_summary),
        "out_detail": str(out_detail),
        "out_meta": str(out_meta),
        **extra,
    }


# ----------------------------
# RUN
# ----------------------------
# Evaluate both pipelines and write every artifact for this run.
result = run_compare_and_save_all(
    base_samples=BASE_SAMPLES,
    exp_samples=EXP_SAMPLES,
    project_root=PROJECT_ROOT,
    prefix="ragas_compare",
    # delta / top-changes reports
    save_delta_outputs=True,
    top_k=10,
    # snapshot files (config, input samples, trace)
    save_snapshots=True,
    save_config=True,
    save_samples_jsonl=True,
    save_trace_jsonl=True,
)

# Core outputs: these keys are always present in the result.
print("✅ run_dir:", result["run_dir"])
print("✅ saved:", *(result[key] for key in ("out_summary", "out_detail", "out_meta")))

# Delta outputs: reported only when the delta summary was written.
if "out_delta_summary" in result:
    print(
        "✅ delta saved:",
        *(result.get(key) for key in ("out_delta_summary", "out_delta_detail")),
    )
    print("✅ top changes metric:", result.get("top_rank_metric"))
    print("✅ top regressions:", result.get("out_top_regressions"))
    print("✅ top improvements:", result.get("out_top_improvements"))

# Snapshot outputs: each one is reported only if its file was written.
if "out_config" in result:
    print("✅ extra saved config :", result["out_config"])

if "out_samples_base" in result:
    print(
        "✅ extra saved samples:",
        result["out_samples_base"],
        "and",
        result.get("out_samples_exp"),
    )

if "out_trace" in result:
    print("✅ extra saved trace  :", result["out_trace"])
    print("✅ trace columns used :", result.get("trace_cols_used"))
