In [1]:
import os
import pandas as pd
from tqdm import tqdm
import requests
import json
from openai import OpenAI
import configparser
import argparse

import tools.ProcessFiles as pf

In [2]:
excel = pf.get_excel_data(1, pf.BASE_PATH.replace('jinym', 'yejin'))
json_files = pf.get_filelist(1, pf.FINAL_DATA_PATH.replace('jinym', 'yejin'))
data_dir = '/Users/yejin/Desktop/Desktop_AICenter✨/SFAIcenter/data_yejin/FIN_workbook/1C'
data_dir

'/Users/yejin/Desktop/Desktop_AICenter✨/SFAIcenter/data_yejin/FIN_workbook/1C'

In [None]:
# excel[excel['분류'] != 'Lv5']

In [3]:
multiple, short, essay = [],[],[]
origin = json.load(open(data_dir+'/merged_extracted_qna_domain.json', 'r', encoding='utf-8'))
for qna in origin:
    if len(qna['qna_data']['description']['answer']) > 0:
        if qna.get('qna_type') == "multiple-choice":
            multiple.append(qna)
        elif qna.get('qna_type') == "short-answer":
            short.append(qna)
        elif qna.get('qna_type') == "essay":
            essay.append(qna)
# data_dir = os.path.join(base_dir,'Lv2', 're')
# origin = json.load(open(os.path.join(base_dir, 'merged_qna_set.json'), 'r', encoding='utf-8'))

In [4]:
len(multiple), len(short), len(essay)

(2296, 8, 384)

In [None]:
short

In [22]:
import pandas as pd
import numpy as np
import re
from typing import List, Dict, Tuple, Iterable, Set
from dataclasses import dataclass

# -----------------------------
# 0) 유틸: 텍스트 정규화
# -----------------------------
CIRCLED_MAP = {"①":"1","②":"2","③":"3","④":"4","⑤":"5"}

def normalize_option_text(s: str) -> str:
    """선지 앞에 붙은 ①~⑤, 1), (1), 1. 등 번호 표기를 제거하고 본문만 남김."""
    if s is None:
        return ""
    s = str(s).strip()
    # ①~⑤ 제거
    s = re.sub(r"^\s*[①-⑤]\s*", "", s)
    # 1), (1), 1. 등 제거
    s = re.sub(r"^\s*(?:\(?([1-5])\)?[.)])\s*", "", s)
    return s.strip()

def parse_answer_set(ans: str) -> Set[int]:
    """'①, ⑤' 같은 복수정답도 {1,5}로 파싱. 빈/이상값은 빈 set."""
    if not ans:
        return set()
    s = str(ans)
    # ①~⑤ 를 1~5로 치환
    for k, v in CIRCLED_MAP.items():
        s = s.replace(k, v)
    # 쉼표/슬래시/공백 구분 모두 허용하여 1~5 추출
    nums = re.findall(r"[1-5]", s)
    return set(int(n) for n in nums)

# -----------------------------
# 1) JSON → df_all 변환
# -----------------------------
def json_to_df_all(json_list: List[dict]) -> pd.DataFrame:
    """
    입력 JSON(list[dict])을 파싱해 df_all 생성.
    컬럼: book_id, tag, id, question, opt1..opt5, answer_set
    """
    rows = []
    for item in json_list:
        book_id = str(item.get("file_id", ""))
        qna = item.get("qna_data", {}) or {}
        tag  = qna.get("tag", "")
        desc = qna.get("description", {}) or {}
        q    = (desc.get("question") or "").strip()
        opts = desc.get("options") or []
        # 5지선다 기준으로 빈칸 보정
        opts = list(opts)[:5] + [""] * max(0, 5 - len(opts))
        opts = [normalize_option_text(x) for x in opts]
        ans_set = parse_answer_set(desc.get("answer", ""))

        rows.append({
            "book_id": book_id,
            "tag": tag,
            "id": f"{book_id}_{tag}",
            "question": q,
            "opt1": opts[0], "opt2": opts[1], "opt3": opts[2], "opt4": opts[3], "opt5": opts[4],
            "answer_set": ans_set
        })
    df = pd.DataFrame(rows)
    # 혹시 id 중복이 있으면 마지막 것 유지(필요시 정책 변경)
    df = df.drop_duplicates("id", keep="last").reset_index(drop=True)
    return df

# -----------------------------
# 2) 배치 사용자 프롬프트 생성 (50문제)
# -----------------------------
SYSTEM_PROMPT = """당신은 금융전문가이자 객관식 문제 풀이 전문가입니다.
여러 금융 객관식 문제에 대해, 각 문제의 정답 "번호만" 하나 선택합니다.

규칙
- 각 문제는 고유 ID와 함께 제시됩니다.
- 출력은 반드시 한 줄당 "ID<TAB>번호" 형식으로만 합니다. (예: 9791166791123_q_0377_0001<TAB>3)
- 다른 글자, 마크다운, 이유, 기호는 절대 출력하지 않습니다.
- 모든 문제는 보기(1~5) 중 하나만 고릅니다.
- 출력 줄 수는 입력 문제 개수와 동일해야 합니다.
"""

def build_user_prompt(batch_df: pd.DataFrame) -> str:
    lines = []
    lines.append("다음은 금융 객관식 문제들입니다. 각 문제에 대해 정답 번호만 고르세요.\n")
    lines.append("문제들")
    for _, r in batch_df.iterrows():
        lines.append(f"ID: {r['id']}")
        lines.append(f"Q: {r['question']}")
        lines.append(f"1) {r['opt1']}")
        lines.append(f"2) {r['opt2']}")
        lines.append(f"3) {r['opt3']}")
        lines.append(f"4) {r['opt4']}")
        lines.append(f"5) {r['opt5']}\n")
    lines.append("출력 형식(중요)")
    for _, r in batch_df.iterrows():
        lines.append(f"{r['id']}\\t{{번호}}")
    return "\n".join(lines)

# -----------------------------
# 3) LLM 호출 추상화 (모의/실제)
# -----------------------------
def call_llm(model_name: str, system_prompt: str, user_prompt: str, mock_mode: bool=False) -> str:
    """
    - mock_mode=True면 임의 번호(1~5)를 생성해 파이프라인 검증용 출력 반환.
    - 실제 환경에서는 이 함수를 OpenAI/사내엔진 호출로 교체.
    """
    if mock_mode:
        # 입력 user_prompt에서 ID 목록 회수
        ids = [ln.split("\t")[0] for ln in user_prompt.splitlines() if "\t{번호}" in ln]
        # 무작위 예측(1~5)
        rng = np.random.default_rng(42)
        preds = rng.integers(1, 6, size=len(ids))
        return "\n".join(f"{_id}\t{int(a)}" for _id, a in zip(ids, preds))
        raise NotImplementedError("call_llm를 실제 API로 연결하세요 (mock_mode=True로 파이프라인 검증 가능).")

    # ---- 실제 호출 예시 (의사코드)
    # from openai import OpenAI
    # client = OpenAI()
    # resp = client.chat.completions.create(
    #     model=model_name,
    #     temperature=0,
    #     top_p=1,
    #     messages=[
    #         {"role": "system", "content": system_prompt},
    #         {"role": "user", "content": user_prompt},
    #     ],
    # )
    # return resp.choices[0].message.content
    else:
        import tools.Openrouter as Openrouter
        ans = Openrouter.query_model_openrouter(system_prompt, user_prompt, model_name)
        return ans

# -----------------------------
# 4) 모델 출력 파싱 (ID<TAB>번호)
# -----------------------------
def parse_model_output(raw: str, expected_ids: List[str]) -> Dict[str, float]:
    """
    모델 원시 출력(raw)을 {id: answer(1~5)}로 변환.
    - 'ID\\t번호' 포맷 기준
    - 잘못된 줄/누락 줄은 NaN 처리
    """
    id_set = set(expected_ids)
    out: Dict[str, float] = {k: np.nan for k in expected_ids}

    for ln in (raw or "").splitlines():
        ln = ln.strip()
        if not ln or "\t" not in ln:
            continue
        left, right = ln.split("\t", 1)
        _id = left.strip()
        if _id not in id_set:
            continue
        # 첫 번째 1~5 추출
        m = re.search(r"[1-5]", right)
        if m:
            out[_id] = float(m.group(0))
    return out

# -----------------------------
# 5) 파이프라인: 300 샘플 → 50개씩 × 모델 호출 → DF 정리/정확도
# -----------------------------
def run_eval_pipeline(
    json_list: List[dict],
    models: List[str],
    sample_size: int = 300,
    batch_size: int = 50,
    seed: int = 42,
    mock_mode: bool = False,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    반환:
      df_all      : 전체 원장 (정규화 선지 + answer_set)
      pred_long   : (id, model_name, answer) 롱 포맷
      pred_wide   : id 기준 모델별 예측 와이드
      acc_by_model: 모델별 정확도 (복수정답 지원: 예측 ∈ answer_set 이면 정답)
    """
    # (1) JSON → df_all
    df_all = json_to_df_all(json_list)
    df_all = df_all.sort_values(by=['book_id', 'tag'], ascending=False).reset_index(drop=True)

    # (2) 300개 샘플 (재현성 유지)
    df_sample = df_all.sample(n=sample_size, random_state=seed).reset_index(drop=True)

    # (3) 50문제 배치 분할
    batches = [df_sample.iloc[i:i+batch_size] for i in range(0, len(df_sample), batch_size)]

    # (4) 모델 호출/파싱 누적
    rows = []
    for bidx, bdf in enumerate(batches, 1):
        user_prompt = build_user_prompt(bdf)
        ids = bdf["id"].tolist()
        for model in models:
            raw = call_llm(model, SYSTEM_PROMPT, user_prompt, mock_mode=mock_mode)
            parsed = parse_model_output(raw, ids)
            for _id in ids:
                rows.append({"id": _id, "model_name": model, "answer": parsed[_id]})

    pred_long = pd.DataFrame(rows)
    pred_long = pred_long.sort_values(by=['id'], ascending=True).reset_index(drop=True)
    # (5) 와이드 포맷
    pred_wide = pred_long.pivot(index="id", columns="model_name", values="answer").reset_index()
    pred_wide = pred_wide.sort_values(by=['id'], ascending=True).reset_index(drop=True)

    # (6) 정확도 (복수정답 집합 매칭)
    key = df_sample[["id", "answer_set"]].copy()
    # 정답 집합이 비어있으면 채점 제외 (NaN)
    def _is_correct(pred: float, s: Set[int]) -> float:
        if np.isnan(pred) or not s:
            return np.nan
        return float(int(pred) in s)

    merged = pred_long.merge(key, on="id", how="left")
    merged["correct"] = merged.apply(lambda r: _is_correct(r["answer"], r["answer_set"]), axis=1)

    acc_by_model = (
        merged.groupby("model_name", dropna=False)["correct"]
        .mean()
        .reset_index()
        .rename(columns={"correct": "accuracy"})
        .sort_values("accuracy", ascending=False)
    )
    acc_by_model = acc_by_model.sort_values("accuracy", ascending=False)

    return df_all, pred_long, pred_wide, acc_by_model

# -----------------------------
# 6) 사용 예시
# -----------------------------
# data = [...]  # 네가 제공한 JSON 리스트
data = multiple
models = ['anthropic/claude-sonnet-4', 'google/gemini-2.5-flash', 'x-ai/grok-4-fast', 'deepseek/deepseek-chat-v3-0324', 'openai/gpt-4.1', 'openai/gpt-oss-120b', 'openai/gpt-5', 'meta-llama/llama-3.3-70b-instruct']  # 실제 모델명
# models = ['x-ai/grok-4-fast:free']
df_all, pred_long, pred_wide, acc = run_eval_pipeline(
    data, models, sample_size=300, batch_size=50, seed=42, mock_mode=False
)

In [23]:
with pd.ExcelWriter("evaluation_results.xlsx", engine="openpyxl") as w:
    df_all.to_excel(w, index=False, sheet_name="final_report")   # 통합 뷰
    pred_wide.to_excel(w, index=False, sheet_name="pred_wide")         # 모델별 예측(가로)
    acc.to_excel(w, index=False, sheet_name="accuracy")       # 모델별 정확도 요약

print(acc)
# final_report 시트만 보면 거의 끝납니다.

                          model_name  accuracy
2            google/gemini-2.5-flash  0.753333
0          anthropic/claude-sonnet-4  0.750000
4                     openai/gpt-4.1  0.683333
1     deepseek/deepseek-chat-v3-0324  0.673575
7                   x-ai/grok-4-fast  0.670000
3  meta-llama/llama-3.3-70b-instruct  0.533333
5                       openai/gpt-5       NaN
6                openai/gpt-oss-120b       NaN
