In [None]:
# === Cell 1 ===
import os, json, time, random, re, hashlib
from typing import List, Dict, Any, Optional
from tqdm import tqdm

# 試著載入 .env（若未安裝 python-dotenv 或檔案不存在，仍可略過）
try:
    from dotenv import load_dotenv
    load_dotenv()
except Exception:
    pass

# 也允許直接從環境變數讀取
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "").strip()
assert OPENAI_API_KEY, "缺少 OPENAI_API_KEY，請在 .env 或系統環境變數設定。"

from openai import OpenAI
client = OpenAI(api_key=OPENAI_API_KEY)

# 推薦用 gpt-4o（品質較好），也可換 gpt-4o-mini（省成本）
OPENAI_MODEL = "gpt-4o"

# === 產出資料量（可依預算調整）===
TOTAL_TARGET = 10_000      # 目標總筆數
TRAIN_RATIO  = 0.80
VAL_RATIO    = 0.10
TEST_RATIO   = 0.10

# 題型比例
HARD_MULTIHOP_PCT   = 0.82  # 多跳難題（強化推理） 
BASIC_KNOWLEDGE_PCT = 0.10  # 基礎知識（避免災忘）
AUGMENT_PCT         = 0.08  # 對抗/改寫補強（減少背誦）

MIN_STEPS = 3               # 每題至少 3 步
SEED = 42
random.seed(SEED)

# 輸出檔名（ChatML：train/val/test；診斷：steps 等）
OUT_CHATML_TRAIN = "train_chatml.jsonl"
OUT_CHATML_VAL   = "val_chatml.jsonl"
OUT_CHATML_TEST  = "test_chatml.jsonl"
OUT_DIAG         = "dataset_diagnostics.jsonl"   # 含 steps/depends_on，用於之後子題評測

# 系統提示：訓練時只讓 assistant 輸出最終答案（你現有的 answer-only loss 相容）
SYSTEM_TRAIN = "你會在腦中分步推理，但只輸出最後一行：最終答案：XXXX"
SYSTEM_COT = "先在腦中推理；若我要求理由摘要，請給 1~2 句；最後輸出一行：最終答案：XXXX"
  

In [2]:
# === Cell 2 ===
def norm_text(s: str) -> str:
    s = (s or "").strip()
    s = re.sub(r"\s+", " ", s)
    return s

def sha1(s: str) -> str:
    return hashlib.sha1(s.encode("utf-8")).hexdigest()

def call_gpt_json(prompt: str, *, max_retries: int = 3, temp: float = 0.7) -> Optional[Dict[str, Any]]:
    """呼叫 OpenAI，強制請求 JSON 物件；自動重試簡單失敗。"""
    for attempt in range(1, max_retries + 1):
        try:
            resp = client.chat.completions.create(
                model=OPENAI_MODEL,
                temperature=temp,
                response_format={"type": "json_object"},
                messages=[{"role": "user", "content": prompt}],
            )
            txt = resp.choices[0].message.content or ""
            return json.loads(txt)
        except Exception as e:
            if attempt == max_retries:
                print(f"[API] 放棄：{e}")
                return None
            time.sleep(1.2 * attempt)

def valid_item(d: Dict[str, Any]) -> bool:
    """
    驗證生成項目：
    - question: str
    - steps: list of {sub_question, short_answer, depends_on: [idx], justification(<=2句)}
    - final_answer: str
    - 至少 MIN_STEPS 步，且每步的 depends_on 合法
    """
    try:
        if not isinstance(d.get("question"), str): return False
        steps = d.get("steps", [])
        if not isinstance(steps, list) or len(steps) < MIN_STEPS: return False
        for i, st in enumerate(steps):
            if not isinstance(st.get("sub_question"), str): return False
            if not isinstance(st.get("short_answer"), str): return False
            deps = st.get("depends_on", [])
            if not isinstance(deps, list): return False
            # 0..i-1 之內才是合法依賴
            if any((not isinstance(x, int)) or (x < 0) or (x >= i) for x in deps):
                return False
            # justification 限制為 0~2 句，避免長篇 COT
            just = st.get("justification", "")
            if not isinstance(just, str): return False
            if len(re.split(r"[。\.!?]\s*", just.strip())) > 3:  # 3 段切分 ≈ 2 句
                return False
        if not isinstance(d.get("final_answer"), str): return False
        return True
    except Exception:
        return False

def make_chatml_entry(question: str, final_answer: str) -> Dict[str, Any]:
    """輸出一筆 ChatML 訓練樣本（assistant 只留最終答案行）。"""
    return {
        "messages": [
            {"role": "system", "content": SYSTEM_TRAIN},
            {"role": "user", "content": question},
            {"role": "assistant", "content": f"最終答案：{final_answer}"}
        ]
    }

def write_jsonl(path: str, obj: Dict[str, Any]) -> None:
    with open(path, "a", encoding="utf-8") as f:
        json.dump(obj, f, ensure_ascii=False)
        f.write("\n")


In [3]:
# === Cell 3 ===
MULTIHOP_PROMPT = f"""
你是一位資料標註員，請依下列「嚴格 JSON 格式」產生一題「高難度的多跳歷史題」：
- 每題至少 {MIN_STEPS} 步。
- 每一步 steps[i] 必須透過 depends_on 參考先前某步的結論（至少依賴一個前步），形成真正的推理鏈。
- 僅在 steps[*].justification 提供最多 2 句「簡短理由」，不要長篇推理。
- 內容真實可靠，避免年份/朝代矛盾；面向可含：制度演變、跨時代因果、國際格局轉折、史實比較、假設性情境的可行性判斷等。
- final_answer 必須能由 steps 的結論綜合得到。

JSON 格式：
{{
  "question": "題目（單句描述，要求多跳整合）",
  "steps": [
    {{
      "sub_question":  "子題 1（明確回答範圍）",
      "short_answer":   "子題 1 的簡短結論",
      "depends_on":     [],    # 第 1 步通常為 []
      "justification":  "為何此步合理（最多 2 句）"
    }},
    {{
      "sub_question":  "子題 2（需用到步驟 1 的結論）",
      "short_answer":   "子題 2 的簡短結論",
      "depends_on":     [0],
      "justification":  "最多 2 句"
    }},
    ...
  ],
  "final_answer": "綜合 steps 的最終答案（簡潔）"
}}
務必輸出有效 JSON，且 steps 至少 {MIN_STEPS} 項，且每一步的 depends_on 合法且非空（除了 i=0 可空）。
"""


In [4]:
# === Cell 4 ===
COUNTERFACTUAL_PROMPT = """
你會收到一個題目物件（JSON）。請挑選其中一個「關鍵史實」做出合理、最小幅度的改動，
導致最終答案改變，並回傳「新的完整題目物件」（相同結構：question/steps/final_answer）。
要求：
- 只改動一個關鍵事實，其餘保持一致。
- steps[*].depends_on 仍合規；justification 最多 2 句。
- 新 final_answer 與原題不同，且對應合理。
請僅輸出新的 JSON。
"""

PARAPHRASE_PROMPT = """
你會收到一個題目物件（JSON）。請「改寫 question 句子」但不改變其語義與最終答案，
步驟不變；回傳一個相同結構的 JSON，其中只有 question 的寫法不同（同義轉述、調序等）。
請僅輸出新的 JSON。
"""


In [5]:
# === Cell 5 ===
BASIC_KNOWLEDGE_PROMPT = """
請產生一題「基礎歷史/通識」問答，仍保持 JSON 結構，但 steps 僅 1~2 步、依賴關係可空或簡單，
用來幫助模型保持舊有常識，避免災難性遺忘。
限制：
- steps[*].justification 最多 2 句。
- final_answer 簡潔。
JSON 結構與多跳題一致（question/steps/final_answer）。
"""


In [6]:
# === Cell 6 ===
def gen_multihop() -> Optional[Dict[str, Any]]:
    obj = call_gpt_json(MULTIHOP_PROMPT, temp=0.8)
    return obj if obj and valid_item(obj) else None

def gen_basic() -> Optional[Dict[str, Any]]:
    obj = call_gpt_json(BASIC_KNOWLEDGE_PROMPT, temp=0.7)
    return obj if obj and valid_item(obj) else None

def make_counterfactual(src: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    prompt = COUNTERFACTUAL_PROMPT + "\n原題：\n" + json.dumps(src, ensure_ascii=False)
    obj = call_gpt_json(prompt, temp=0.8)
    return obj if obj and valid_item(obj) else None

def make_paraphrase(src: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    prompt = PARAPHRASE_PROMPT + "\n原題：\n" + json.dumps(src, ensure_ascii=False)
    obj = call_gpt_json(prompt, temp=0.6)
    # 只接受 question 改寫、final_answer 不變
    if obj and valid_item(obj):
        if norm_text(obj["final_answer"]) == norm_text(src["final_answer"]):
            return obj
    return None

# 全局去重集合（以 question 正規化+final_answer 雜湊避免重覆）
seen_keys = set()

def unique_key(q: str, a: str) -> str:
    return sha1(norm_text(q) + "||" + norm_text(a))

def is_new_item(q: str, a: str) -> bool:
    k = unique_key(q, a)
    if k in seen_keys: return False
    seen_keys.add(k)
    return True


In [7]:
# === Cell 7 ===
def pick_split(idx: int, total: int) -> str:
    # 固定比率切分
    if idx < int(total * TRAIN_RATIO): return "train"
    if idx < int(total * (TRAIN_RATIO + VAL_RATIO)): return "val"
    return "test"

def write_item(item: Dict[str, Any], idx: int, total: int) -> None:
    split = pick_split(idx, total)
    chatml = make_chatml_entry(item["question"], item["final_answer"])
    diag = {
        "split": split,
        "question": item["question"],
        "steps": item["steps"],
        "final_answer": item["final_answer"]
    }
    if split == "train":
        write_jsonl(OUT_CHATML_TRAIN, chatml)
    elif split == "val":
        write_jsonl(OUT_CHATML_VAL, chatml)
    else:
        write_jsonl(OUT_CHATML_TEST, chatml)
    write_jsonl(OUT_DIAG, diag)


In [8]:
# === Cell 8 ===
def generate_all():
    os.makedirs(".", exist_ok=True)
    # 清空舊檔
    for p in [OUT_CHATML_TRAIN, OUT_CHATML_VAL, OUT_CHATML_TEST, OUT_DIAG]:
        if os.path.exists(p): os.remove(p)

    n_multi = int(TOTAL_TARGET * HARD_MULTIHOP_PCT)
    n_basic = int(TOTAL_TARGET * BASIC_KNOWLEDGE_PCT)
    n_aug   = TOTAL_TARGET - n_multi - n_basic  # 對抗/改寫

    print(f"[目標] 多跳:{n_multi}  基礎:{n_basic}  增強:{n_aug}  → 總:{TOTAL_TARGET}")

    bucket: List[Dict[str, Any]] = []

    # 1) 先產多跳
    with tqdm(total=n_multi, desc="多跳題") as bar:
        attempts = 0
        while len(bucket) < n_multi and attempts < n_multi * 4:
            attempts += 1
            obj = gen_multihop()
            if not obj: 
                continue
            if not is_new_item(obj["question"], obj["final_answer"]):
                continue
            bucket.append(obj)
            bar.update(1)

    # 2) 再產基礎
    with tqdm(total=n_basic, desc="基礎題") as bar:
        got = 0
        attempts = 0
        while got < n_basic and attempts < n_basic * 4:
            attempts += 1
            obj = gen_basic()
            if not obj: 
                continue
            if not is_new_item(obj["question"], obj["final_answer"]):
                continue
            bucket.append(obj)
            got += 1
            bar.update(1)

    # 3) 增強：在現有 bucket 上做對抗 or 改寫（各半）
    random.shuffle(bucket)
    pool_for_aug = bucket[:max(1, min(len(bucket), n_aug))]
    half = len(pool_for_aug) // 2
    aug_items: List[Dict[str, Any]] = []

    with tqdm(total=len(pool_for_aug), desc="增強(對抗/改寫)") as bar:
        # 對抗
        for src in pool_for_aug[:half]:
            cf = make_counterfactual(src)
            if cf and is_new_item(cf["question"], cf["final_answer"]):
                aug_items.append(cf)
            bar.update(1)
        # 改寫
        for src in pool_for_aug[half:]:
            pf = make_paraphrase(src)
            if pf and is_new_item(pf["question"], pf["final_answer"]):
                aug_items.append(pf)
            bar.update(1)

    # 整併 & 補量（若增強不足則再補多跳）
    all_items = bucket + aug_items
    if len(all_items) < TOTAL_TARGET:
        need = TOTAL_TARGET - len(all_items)
        with tqdm(total=need, desc="補量(多跳)") as bar:
            attempts = 0
            while need > 0 and attempts < need * 5:
                attempts += 1
                obj = gen_multihop()
                if not obj: 
                    continue
                if not is_new_item(obj["question"], obj["final_answer"]):
                    continue
                all_items.append(obj)
                need -= 1
                bar.update(1)

    # 寫檔：依固定切分（train/val/test）
    for i, it in enumerate(tqdm(all_items[:TOTAL_TARGET], desc="寫入檔案")):
        write_item(it, i, TOTAL_TARGET)

    print("完成：", OUT_CHATML_TRAIN, OUT_CHATML_VAL, OUT_CHATML_TEST, OUT_DIAG)


In [9]:
# === Cell 9 ===
if __name__ == "__main__":
    print("開始生成 ChatML + 診斷資料集 ...")
    generate_all()

    # 檢查各 split 筆數
    def count_lines(p):
        return sum(1 for _ in open(p, "r", encoding="utf-8")) if os.path.exists(p) else 0

    n_tr = count_lines(OUT_CHATML_TRAIN)
    n_va = count_lines(OUT_CHATML_VAL)
    n_te = count_lines(OUT_CHATML_TEST)
    n_dg = count_lines(OUT_DIAG)
    print(f"ChatML: train={n_tr}, val={n_va}, test={n_te} | diagnostics={n_dg}")


開始生成 ChatML + 診斷資料集 ...
[目標] 多跳:8200  基礎:1000  增強:800  → 總:10000


多跳題: 100%|██████████| 8200/8200 [14:22:32<00:00,  6.31s/it]  
基礎題:   0%|          | 0/1000 [2:12:32<?, ?it/s]
增強(對抗/改寫): 100%|██████████| 800/800 [41:28<00:00,  3.11s/it]  
補量(多跳):  83%|████████▎ | 1001/1201 [1:26:16<17:14,  5.17s/it]
寫入檔案: 100%|██████████| 9800/9800 [00:07<00:00, 1232.06it/s]


完成： train_chatml.jsonl val_chatml.jsonl test_chatml.jsonl dataset_diagnostics.jsonl
ChatML: train=8000, val=1000, test=800 | diagnostics=9800


In [1]:
import json
import os
from typing import Dict, Any

# 檔案名稱
DIAG_FILE = "dataset_diagnostics.jsonl"
OUT_COT_FILE = "train_chatml_cot.jsonl"

# 確保診斷檔存在
if not os.path.exists(DIAG_FILE):
    print(f"錯誤：找不到診斷檔 {DIAG_FILE}。請先運行一次完整的資料生成腳本以創建它。")
else:
    # 這裡重新定義你需要的格式化函式，確保它與你期望的格式一致
    SYSTEM_COT = "先在腦中分步推理；若我要求理由摘要，請給 1~2 句；最後輸出一行：最終答案：XXXX"
    
    def make_chatml_entry_cot(item: Dict[str, Any]) -> Dict[str, Any]:
        """
        將完整的 JSON 項目，轉為帶詳細推理過程的 ChatML 格式。
        """
        cot_parts = []
        
        # 腦中推理：逐一呈現解題步驟
        cot_parts.append("腦中推理：")
        for i, step in enumerate(item["steps"]):
            # 顯示子問題
            cot_parts.append(f"步驟 {i+1}：")
            cot_parts.append(f"子問題：{step['sub_question']}")
            
            # 顯示依賴關係（如果有的話）
            if step["depends_on"]:
                deps_str = ", ".join([f"步驟 {j+1}" for j in step["depends_on"]])
                cot_parts.append(f"解題依據：這個問題需要依賴 {deps_str} 的結論。")
                
            # 顯示簡短結論與理由
            cot_parts.append(f"結論：{step['short_answer']}")
            cot_parts.append(f"簡要理由：{step['justification']}")
            cot_parts.append("") # 新增空行以區隔步驟
        
        # 理由摘要
        cot_parts.append("理由摘要：")
        cot_parts.append(item['steps'][-1]['justification'])
        cot_parts.append("")
        
        # 最終答案
        cot_parts.append(f"最終答案：{item['final_answer']}")
        
        assistant_content = "\n".join(cot_parts)

        return {
            "messages": [
                {"role": "system", "content": SYSTEM_COT},
                {"role": "user", "content": item["question"]},
                {"role": "assistant", "content": assistant_content}
            ]
        }
    
    # 開始轉換並寫入新檔案
    with open(DIAG_FILE, "r", encoding="utf-8") as diag_file, open(OUT_COT_FILE, "w", encoding="utf-8") as out_file:
        count = 0
        for line in diag_file:
            data = json.loads(line)
            # 只處理訓練集 split
            if data["split"] == "train":
                cot_entry = make_chatml_entry_cot(data)
                json.dump(cot_entry, out_file, ensure_ascii=False)
                out_file.write("\n")
                count += 1
    
    print(f"成功從診斷檔生成 {count} 筆 CoT 訓練資料，已寫入 {OUT_COT_FILE}")

成功從診斷檔生成 8000 筆 CoT 訓練資料，已寫入 train_chatml_cot.jsonl
