### Configuration

In [None]:
from pathlib import Path


# ----- Data locations -----
ROOT_DIR = Path("/home/work/djyoon/project/cot-data-validation")
DATA_DIR, OUTPUT_DIR = ROOT_DIR / "data", ROOT_DIR / "output"

# Dataset / experiment tag; also names the saved dataset directory.
VERSION = "final_251120"
# Directory searched (recursively) for the source JSON files.
INPUT_DATA_PATH = DATA_DIR / "가공데이터"
# Where the generated train/val/test DatasetDict is cached.
OUTPUT_DATA_PATH = DATA_DIR / VERSION

# ----- Training -----
# Training settings are currently disabled; kept for reference:
# BASE_MODEL = "google/gemma-3-1b-it"
# TOKENIZER_MODEL = BASE_MODEL
# BASE_MODEL_NAME = BASE_MODEL.split("/")[1]
# EXP_NAME = f"cot/{BASE_MODEL_NAME}-{VERSION}"
# OUTPUT_MODEL_PATH = OUTPUT_DIR / "models" / EXP_NAME

# Seed for the dataset shuffle (reproducible splits).
SEED = 42

### Load dataset

In [None]:
import json
import random
from glob import glob
from pathlib import Path
from os.path import exists

from datasets import Dataset, DatasetDict, load_from_disk


# Tag name wrapping the final numeric answer: <answer>...</answer>
SPECIAL_ANS: str = "answer"
# Fraction of records held out for EACH of val and test -> train:val:test = 8:1:1
TEST_SIZE: float = 0.1


def generate_dataset(
    input_path: str | Path, output_path: str | Path, test_size: float = TEST_SIZE
) -> DatasetDict:
    """Build (or load a cached) train/val/test DatasetDict of QA pairs.

    If ``output_path`` already exists, the dataset saved there is loaded and
    returned as-is. Otherwise every record under ``input_path`` is loaded,
    shuffled deterministically with ``SEED``, and split as follows: the first
    ``int(n * test_size)`` records go to test, the records up to
    ``int(n * 2 * test_size)`` go to val, and the rest go to train. Each split
    is then mapped through ``generate_pair`` and saved to ``output_path``.

    Args:
        input_path: Directory searched recursively for ``*.json`` files.
        output_path: Cache directory for ``save_to_disk`` / ``load_from_disk``.
        test_size: Fraction of records used for each of the val and test splits.

    Returns:
        DatasetDict with ``train``, ``val`` and ``test`` splits.

    Raises:
        ValueError: If no records could be loaded from ``input_path``.
    """
    if exists(output_path):
        return load_from_disk(output_path)

    # Load first: the original shuffled `data` before it was assigned.
    data = load_json(input_path)
    if not data:
        raise ValueError(f"No records loaded from {input_path}")

    # Private RNG seeded with SEED: reproducible shuffle without reseeding the
    # global `random` state (same permutation as random.seed(SEED); shuffle).
    random.Random(SEED).shuffle(data)

    n = len(data)
    n_test = int(n * test_size)
    n_holdout = int(n * 2 * test_size)
    splits = {
        "train": data[n_holdout:],
        "val": data[n_test:n_holdout],
        "test": data[:n_test],
    }

    # Turn raw records into question/answer pairs; target_answer is kept.
    ds = DatasetDict(
        {
            name: Dataset.from_list(rows).map(
                generate_pair, remove_columns=["instruction", "context", "target_steps"]
            )
            for name, rows in splits.items()
        }
    )

    ds.save_to_disk(output_path)
    print(ds)
    print(f"Dataset is saved to {output_path}.")
    return ds


def load_json(base_path: Path | str) -> list[dict]:
    """base_path 하위 모든 폴더를 재귀 탐색하여 JSON 파일 로드"""
    json_files = glob(f"{base_path}/**/*.json", recursive=True)
    data = []
    for path in json_files:
        try:
            with open(path, "r", encoding="utf-8") as f:
                obj = json.load(f)
            if isinstance(obj, list):
                data.extend([build_record(o) for o in obj])
            else:
                data.append(build_record(obj))
        except Exception as e:
            print(f"⚠️ {f} 로드 실패: {e}")
    print(f"✅ 총 {len(data)}개 JSON 파일 로드 완료 (경로: {base_path})")
    return data


def build_record(obj):
    """Convert one raw JSON object into a CoT training record (valuation-focused).

    Reads the ``patent_info``, ``Company_info``, ``instruction_id`` and
    ``valuation_id`` sections. A missing or ``null`` section is treated as
    empty. From these it auto-generates a multi-line context string.

    Returns:
        dict with keys ``instruction``, ``context``, ``target_steps`` and
        ``target_answer``. ``target_answer`` is ``instruction_id["answer"]``
        unchanged, or None when it is absent.
    """
    # `or {}` also covers sections that are present but null in the JSON.
    p = obj.get("patent_info") or {}
    c = obj.get("Company_info") or {}
    ins = obj.get("instruction_id") or {}
    val = obj.get("valuation_id") or {}

    # ----- Instruction -----
    # Fall back to a generic prompt built from the Korean title.
    instruction = (
        ins.get("input") or f"{ins.get('title_ko') or ''} 특허의 가치를 단계별로 계산하라."
    )

    # ----- Dynamic Context Builder -----
    ctx_parts = []

    # 1. Basic patent info (any falsy value is skipped)
    for key, label in [
        ("invention_title", "특허명"),
        ("application_number", "출원번호"),
        ("open_number", "공개번호"),
        ("register_number", "등록번호"),
        ("ipc_all", "IPC"),
        ("applicant_name", "출원인"),
    ]:
        val_ = p.get(key)
        if val_:
            ctx_parts.append(f"{label}: {val_}")

    # 2. Company / industry info (None, "" and 0 are treated as missing)
    for key, label in [
        ("company_name", "회사명"),
        ("industry", "산업분류"),
        ("ksic", "KSIC 코드"),
        ("sales", "매출액"),
        ("net_income", "당기순이익"),
        ("asset", "총자산"),
        ("liabilities", "부채"),
        ("equity", "자본"),
    ]:
        val_ = c.get(key)
        if val_ not in [None, "", 0]:
            ctx_parts.append(f"{label}: {val_}")

    # 3. Valuation parameters: valuation_id first, then instruction_id as fallback
    for key, label in [
        ("royalty_rate", "로열티율(%)"),
        ("useful_life_years", "경제적 수명(년)"),
        ("wacc", "WACC(%)"),
        ("business_risk", "사업위험도"),
    ]:
        val_ = val.get(key, ins.get(key))
        if val_ not in [None, "", 0]:
            ctx_parts.append(f"{label}: {val_}")

    # 4. Keywords and abstract
    keywords = ins.get("keywords")
    if keywords:
        # A bare string would otherwise be joined character by character.
        kw_text = keywords if isinstance(keywords, str) else ", ".join(map(str, keywords))
        ctx_parts.append(f"핵심 키워드: {kw_text}")
    abstract = ins.get("abstract_ko")
    if abstract:
        max_len = 300
        # Only mark the abstract with "..." when it was actually truncated.
        summary = abstract if len(abstract) <= max_len else f"{abstract[:max_len]}..."
        ctx_parts.append(f"요약: {summary}")

    # 5. Goal
    ctx_parts.append(
        "이 데이터를 기반으로 로열티 공제법(Royalty Relief Method)을 사용하여 특허의 경제적 가치를 계산하라."
    )

    # ----- Target Steps & Answer -----
    output = ins.get("output")
    if output is None:
        steps = ""
    elif isinstance(output, str):
        # Already a single text; joining would put every character on its own line.
        steps = output
    else:
        steps = "\n".join(map(str, output))
    ans = ins.get("answer")

    return {
        "instruction": instruction.strip(),
        "context": "\n".join(ctx_parts).strip(),
        "target_steps": steps.strip(),
        "target_answer": ans,
    }


def generate_pair(d: dict) -> dict:
    """Turn one record from build_record into a prompt/completion pair.

    The question is a fixed expert-persona prompt. It embeds the record's
    ``instruction`` and ``context`` and asks for the final value inside
    ``<SPECIAL_ANS>...</SPECIAL_ANS>`` tags. The answer is ``target_steps``.
    When ``target_answer`` is not None, the tagged integer (with thousands
    separators) is appended after a blank line.

    Returns:
        ``{"question": str, "answer": str}``
    """
    open_tag = f"<{SPECIAL_ANS}>"
    close_tag = f"</{SPECIAL_ANS}>"

    prompt_parts = (
        "당신은 특허 가치평가 전문가입니다.\n",
        "주어진 데이터를 바탕으로 로열티 공제법(Royalty Relief Method)을 사용하여 특허의 경제적 가치를 추정하세요.\n",
        "모든 계산은 단계별로 명확히 제시하고, 각 단계마다 수식과 계산 근거를 설명하세요.\n",
        "최종 단계에서는 할인 후 현재가치를 계산하여 결과를 제시합니다.\n\n",
        f"[INSTRUCTION]\n{d['instruction']}\n\n",
        f"[CONTEXT]\n{d.get('context', '없음')}\n\n",
        "[OUTPUT FORMAT]\n",
        "1. 단계별 계산 과정 (예: 매출 추정 → 로열티율 적용 → 세후 조정 → 할인 계산)\n",
        f'2. 마지막 줄에는 반드시 "{open_tag}{{정수}}원{close_tag}" 형식으로 최종 특허 가치를 표시하세요.\n\n',
        "[EXAMPLE]\n",
        "① ...\n",
        "② ...\n",
        "...\n\n",
        f"{open_tag}152,000,000원{close_tag}",
    )
    question = "".join(prompt_parts)

    steps = d.get("target_steps", "")
    target = d.get("target_answer")
    if target is not None:
        answer = f"{steps}\n\n{open_tag}{int(target):,}원{close_tag}"
    else:
        answer = steps
    return {"question": question, "answer": answer}


def combine_datasets(
    ds1: DatasetDict, ds2: DatasetDict, ds_dir: str | Path, version: str
) -> DatasetDict:
    """Concatenate two DatasetDicts split by split and save the result.

    For each of ``train``, ``val`` and ``test``, the rows of ``ds1`` come first,
    followed by the rows of ``ds2``. Each split is rebuilt with
    ``Dataset.from_list``, so features are inferred from the combined rows.

    Args:
        ds1: First DatasetDict. Must contain train/val/test splits.
        ds2: Second DatasetDict. Must contain train/val/test splits.
        ds_dir: Parent directory, as ``str`` or ``Path``.
        version: Sub-directory name under ``ds_dir`` to save into.

    Returns:
        The combined DatasetDict, also written to ``ds_dir / version``.
    """
    ds = DatasetDict(
        {
            split: Dataset.from_list(ds1[split].to_list() + ds2[split].to_list())
            for split in ("train", "val", "test")
        }
    )

    # Path() so that a plain str ds_dir (as annotated) supports the `/` operator.
    ds.save_to_disk(Path(ds_dir) / version)
    print(
        f"Data length: train({len(ds['train'])}), val({len(ds['val'])}), test({len(ds['test'])})"
    )
    return ds


# Build the QA dataset, or load the cached copy if OUTPUT_DATA_PATH exists.
dataset = generate_dataset(input_path=INPUT_DATA_PATH, output_path=OUTPUT_DATA_PATH)

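### Serve the trained model

Start the vLLM server from the project root (`ROOT_DIR`) so that the relative model path resolves. The evaluation cell below connects to it on port 8010.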

```bash
vllm serve "output/models/unsloth/cot/gemma-3-1b-it-final_251120" \
    --gpu-memory-utilization 0.8 \
    --max-num-seqs 512 \
    --enable-chunked-prefill \
    --port 8010
```

### Evaluation

In [None]:
import re
from time import time
from os.path import exists

import os
import asyncio

import pandas as pd
from openai import AsyncOpenAI
from tqdm.asyncio import tqdm_asyncio


def extract_answer(txt: str, marker: str = SPECIAL_ANS) -> int | None:
    """Parse the final number wrapped in ``<marker>...</marker>`` tags.

    Accepts:
    - an optional sign;
    - comma thousands separators and a decimal part;
    - an optional trailing "원";
    - whitespace just inside the tags.

    If the tag appears several times, the last occurrence wins.

    Returns:
        The value rounded to the nearest int. Returns None when ``txt`` is
        empty or None, has no tagged number, or the number cannot be parsed.
    """
    if not txt:
        return None
    # Escape so markers containing regex metacharacters are matched literally.
    tag = re.escape(marker)
    # NOTE: findall + [-1] keeps the last occurrence, so an example value that
    # appears earlier in the text does not override the final answer.
    matches = re.findall(rf"<{tag}>\s*([-+]?\d[\d.,]*)\s*원?\s*</{tag}>", txt)
    if not matches:
        return None
    # Drop commas and runs of 2+ dots ("..."); a single dot stays a decimal point.
    num = re.sub(r"(,|\.{2,})", "", matches[-1])
    try:
        return round(float(num))
    except (ValueError, OverflowError):  # malformed number / huge digit string -> inf
        return None


async def evaluate_results(
    ds,
    result_path: str,
    url: str,
    max_new_tokens: int = 2048,
    *,
    max_concurrency: int = 256,
    tolerance: float = 0.3,
) -> pd.DataFrame:
    """Send every question in ``ds`` to an OpenAI-compatible server and score it.

    Results are cached: if ``result_path`` exists, it is read back and returned
    without contacting the server. Otherwise:
    1. All prompts are sent concurrently, with at most ``max_concurrency`` in
       flight, to the first model listed by ``{url}/v1``.
    2. The tagged number in each reply is parsed with ``extract_answer``.
    3. The per-row relative error and a within-tolerance flag are computed.
    4. Everything is written to ``result_path`` as CSV.

    Args:
        ds: Split with ``question``, ``answer`` and ``target_answer`` columns.
        result_path: CSV cache file.
        url: Server base URL without the ``/v1`` suffix.
        max_new_tokens: ``max_tokens`` for each completion.
        max_concurrency: Maximum number of simultaneous requests.
        tolerance: A row is accurate when its relative error is <= this value.

    Returns:
        DataFrame with columns question, target_answer_pred, target_answer,
        answer_pred, answer, diff and accuracy.
    """
    if exists(result_path):
        return pd.read_csv(result_path)

    # Generate predictions
    questions = list(ds["question"])

    async with AsyncOpenAI(base_url=f"{url}/v1", api_key="empty") as client:
        semaphore = asyncio.Semaphore(max_concurrency)
        # Evaluate whichever model the server lists first.
        model = (await client.models.list()).data[0].id

        async def safe_create(prompt: str) -> str:
            """Return the completion text, or "" if the request fails."""
            async with semaphore:
                try:
                    response = await client.chat.completions.create(
                        model=model,
                        messages=[{"role": "user", "content": prompt}],
                        max_tokens=max_new_tokens,
                    )
                    # content can be None; normalize so downstream parsing is safe.
                    return response.choices[0].message.content or ""
                except Exception as e:
                    # Keep the batch running, but do not hide the failure.
                    # The original returned "" here and crashed later on `.choices`.
                    print(f"⚠️ Request failed: {e!r}")
                    return ""

        answers = await tqdm_asyncio.gather(*(safe_create(q) for q in questions))

    # Analyze results
    target_answers_pred = [extract_answer(s) for s in answers]
    target_answers = list(ds["target_answer"])
    diffs = [_relative_error(p, l) for p, l in zip(target_answers_pred, target_answers)]
    result = {
        "question": questions,
        "target_answer_pred": target_answers_pred,
        "target_answer": target_answers,
        "answer_pred": answers,
        "answer": list(ds["answer"]),
        "diff": diffs,
        "accuracy": [d <= tolerance if d is not None else None for d in diffs],
    }
    df = pd.DataFrame(result)
    df.to_csv(result_path, index=False)
    print(f"Result has been successfully saved to {result_path}.")
    return df


def _relative_error(pred, label) -> float | None:
    """Return |pred - label| / |label|, or None if pred/label is missing or label is 0."""
    # Use explicit None/NaN checks: a prediction of 0 is a real (wrong) answer
    # and must count, unlike the previous truthiness test `if p and l`.
    if pred is None or pd.isna(label) or label == 0:
        return None
    return abs(pred - label) / abs(label)


eval_dir = OUTPUT_DIR / "evaluation" / "unsloth/cot/gemma-3-1b-it-final_251120"
os.makedirs(eval_dir, exist_ok=True)

tasks = []
data_id = "test"
model_id = "final_251120"
url = "http://localhost:8010"
result_path = f"{eval_dir}/{model_id}-{data_id}.csv"
tasks.append(evaluate_results(dataset[data_id], result_path, url))

result = await asyncio.gather(*tasks)


for path in sorted(glob(f"{eval_dir}/*-test.csv")):
    df = pd.read_csv(path)
    print(f"MAP({path}): {df['diff'].mean():.2%}")