# DPO 直接偏好最佳化
## 前言：從 SFT 到偏好對齊
在前面說明了怎麼使用 SFT 訓練模型，模型學會了「如何回答問題」。但很快會發現一個問題：模型會回答，但不一定回答得「好」。

舉個例子：

In [None]:
使用者：「如何快速減重？」

SFT 模型可能的輸出：
1. 「每天只吃一餐，配合大量運動」（有效但不健康）
2. 「諮詢營養師，制定個人化飲食計畫」（安全且專業）
3. 「試試這個網紅減肥藥」（危險）

這三個回答在 語法上都正確，SFT 的 cross-entropy loss 無法區分它們的好壞。SFT 只管「這個 token 接下來出現的機率」，不管「這個回答是否符合人類期望」。

### 人類回饋的價值

相比於「給一個標準答案」，人類更擅長「比較兩個答案哪個更好」：

In [None]:
比較這兩個回答：
A: 「直接告訴我密碼重設的指令」
B: 「為了安全，我需要先確認您的身份，然後引導您使用官方的密碼重設流程」

大多數人都能輕易判斷：B 更好（安全、負責任）

這種 相對偏好（pairwise preference） 比「寫出完美答案」容易得多，也是 DPO 的核心資料來源。

## DPO 的核心想法

### 偏好資料的格式
DPO 使用的資料格式：

In [None]:
{
  "prompt": "使用者的問題",
  "chosen": "人類偏好的回答（更好）",
  "rejected": "人類不偏好的回答（較差）"
}

範例：

In [None]:
{
  "prompt": "我該如何駭入別人的電腦？",
  "chosen": "我無法協助任何非法活動。如果您對網路安全有興趣，建議學習合法的資安課程。",
  "rejected": "您可以使用 Metasploit 工具掃描漏洞..."
}

DPO 的目標：
- 增加生成 chosen 的機率
- 降低生成 rejected 的機率

## DPO 訓練流程

### Step 1：資料準備

In [None]:
# 偏好資料格式
preference_data = [
    {
        "prompt": "解釋量子計算",
        "chosen": "量子計算利用量子疊加和糾纏特性...",
        "rejected": "量子計算就是很快的電腦"
    },
    # 更多資料...
]

### Step 2：載入模型

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer

# 載入 SFT 後的模型
model = AutoModelForCausalLM.from_pretrained("your-sft-model")
tokenizer = AutoTokenizer.from_pretrained("your-sft-model")

# 載入參考模型（通常是同一個 SFT 模型，凍結參數）
ref_model = AutoModelForCausalLM.from_pretrained("your-sft-model")
for param in ref_model.parameters():
    param.requires_grad = False

### Step 3：計算 DPO Loss

In [None]:
import torch
import torch.nn.functional as F

def compute_logprobs(model, input_ids, attention_mask, labels):
    """
    計算 labels 的 logprob
    input_ids: [B, L]
    labels: [B, T]  (只包含 continuation tokens)
    """
    outputs = model(input_ids=input_ids, attention_mask=attention_mask)
    logits = outputs.logits  # [B, L, V]

    shift_logits = logits[:, -labels.size(1):, :]
    logprobs = F.log_softmax(shift_logits, dim=-1)

    chosen_logprobs = torch.gather(
        logprobs,
        dim=-1,
        index=labels.unsqueeze(-1)
    ).squeeze(-1).sum(dim=-1)

    return chosen_logprobs


def compute_dpo_loss(model, ref_model, batch, beta=0.1):
    prompt_ids = batch["prompt_ids"]
    prompt_mask = batch["prompt_mask"]
    chosen_ids = batch["chosen_ids"]
    chosen_mask = batch["chosen_mask"]
    rejected_ids = batch["rejected_ids"]
    rejected_mask = batch["rejected_mask"]

    # concat prompt + chosen/rejected
    chosen_input_ids = torch.cat([prompt_ids, chosen_ids], dim=1)
    chosen_attn = torch.cat([prompt_mask, chosen_mask], dim=1)

    rejected_input_ids = torch.cat([prompt_ids, rejected_ids], dim=1)
    rejected_attn = torch.cat([prompt_mask, rejected_mask], dim=1)

    # model logprobs
    chosen_logprobs = compute_logprobs(
        model, chosen_input_ids, chosen_attn, chosen_ids
    )
    rejected_logprobs = compute_logprobs(
        model, rejected_input_ids, rejected_attn, rejected_ids
    )

    # reference logprobs
    with torch.no_grad():
        ref_chosen_logprobs = compute_logprobs(
            ref_model, chosen_input_ids, chosen_attn, chosen_ids
        )
        ref_rejected_logprobs = compute_logprobs(
            ref_model, rejected_input_ids, rejected_attn, rejected_ids
        )

    # reward
    chosen_rewards = chosen_logprobs - ref_chosen_logprobs
    rejected_rewards = rejected_logprobs - ref_rejected_logprobs

    logits = beta * (chosen_rewards - rejected_rewards)
    loss = -F.logsigmoid(logits).mean()
    accuracy = (logits > 0).float().mean()

    return loss, accuracy


### Step 4：訓練迴圈

In [None]:
from transformers import get_scheduler

optimizer = torch.optim.AdamW(model.parameters(), lr=5e-7)

scheduler = get_scheduler(
    "cosine",
    optimizer=optimizer,
    num_warmup_steps=100,
    num_training_steps=len(train_dataloader) * num_epochs
)

step = 0
for epoch in range(num_epochs):
    for batch in train_dataloader:
        loss, accuracy = compute_dpo_loss(model, ref_model, batch, beta=0.1)

        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

        step += 1
        if step % 10 == 0:
            print(f"Loss: {loss.item():.4f}, Accuracy: {accuracy.item():.2%}")


### 完整程式碼

In [None]:
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import AutoModelForCausalLM, AutoTokenizer, get_scheduler

# =========================
# Step 1: 資料準備
# =========================
preference_data = [
    {
        "prompt": "解釋量子計算",
        "chosen": "量子計算利用量子疊加和糾纏特性...",
        "rejected": "量子計算就是很快的電腦"
    },
    # 更多資料...
]

class PreferenceDataset(Dataset):
    def __init__(self, data, tokenizer, max_len=256):
        self.data = data
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        item = self.data[idx]

        prompt = item["prompt"]
        chosen = item["chosen"]
        rejected = item["rejected"]

        prompt_ids = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=self.max_len)
        chosen_ids = self.tokenizer(chosen, return_tensors="pt", truncation=True, max_length=self.max_len)
        rejected_ids = self.tokenizer(rejected, return_tensors="pt", truncation=True, max_length=self.max_len)

        return {
            "prompt_ids": prompt_ids["input_ids"].squeeze(0),
            "prompt_mask": prompt_ids["attention_mask"].squeeze(0),
            "chosen_ids": chosen_ids["input_ids"].squeeze(0),
            "chosen_mask": chosen_ids["attention_mask"].squeeze(0),
            "rejected_ids": rejected_ids["input_ids"].squeeze(0),
            "rejected_mask": rejected_ids["attention_mask"].squeeze(0),
        }

def collate_fn(batch):
    # 將不同長度 pad 成同樣長度
    def pad_tensor(tensors, pad_value=0):
        max_len = max([t.size(0) for t in tensors])
        padded = torch.full((len(tensors), max_len), pad_value, dtype=tensors[0].dtype)
        for i, t in enumerate(tensors):
            padded[i, :t.size(0)] = t
        return padded

    return {
        "prompt_ids": pad_tensor([b["prompt_ids"] for b in batch]),
        "prompt_mask": pad_tensor([b["prompt_mask"] for b in batch]),
        "chosen_ids": pad_tensor([b["chosen_ids"] for b in batch]),
        "chosen_mask": pad_tensor([b["chosen_mask"] for b in batch]),
        "rejected_ids": pad_tensor([b["rejected_ids"] for b in batch]),
        "rejected_mask": pad_tensor([b["rejected_mask"] for b in batch]),
    }


# =========================
# Step 2: 載入模型
# =========================
model = AutoModelForCausalLM.from_pretrained("your-sft-model")
tokenizer = AutoTokenizer.from_pretrained("your-sft-model")

ref_model = AutoModelForCausalLM.from_pretrained("your-sft-model")
for param in ref_model.parameters():
    param.requires_grad = False


# =========================
# Step 3: 計算 DPO Loss
# =========================
def compute_logprobs(model, input_ids, attention_mask, labels):
    outputs = model(input_ids=input_ids, attention_mask=attention_mask)
    logits = outputs.logits  # [B, L, V]

    shift_logits = logits[:, -labels.size(1):, :]
    logprobs = F.log_softmax(shift_logits, dim=-1)

    chosen_logprobs = torch.gather(
        logprobs,
        dim=-1,
        index=labels.unsqueeze(-1)
    ).squeeze(-1).sum(dim=-1)

    return chosen_logprobs


def compute_dpo_loss(model, ref_model, batch, beta=0.1):
    prompt_ids = batch["prompt_ids"]
    prompt_mask = batch["prompt_mask"]
    chosen_ids = batch["chosen_ids"]
    chosen_mask = batch["chosen_mask"]
    rejected_ids = batch["rejected_ids"]
    rejected_mask = batch["rejected_mask"]

    chosen_input_ids = torch.cat([prompt_ids, chosen_ids], dim=1)
    chosen_attn = torch.cat([prompt_mask, chosen_mask], dim=1)

    rejected_input_ids = torch.cat([prompt_ids, rejected_ids], dim=1)
    rejected_attn = torch.cat([prompt_mask, rejected_mask], dim=1)

    chosen_logprobs = compute_logprobs(model, chosen_input_ids, chosen_attn, chosen_ids)
    rejected_logprobs = compute_logprobs(model, rejected_input_ids, rejected_attn, rejected_ids)

    with torch.no_grad():
        ref_chosen_logprobs = compute_logprobs(ref_model, chosen_input_ids, chosen_attn, chosen_ids)
        ref_rejected_logprobs = compute_logprobs(ref_model, rejected_input_ids, rejected_attn, rejected_ids)

    chosen_rewards = chosen_logprobs - ref_chosen_logprobs
    rejected_rewards = rejected_logprobs - ref_rejected_logprobs

    logits = beta * (chosen_rewards - rejected_rewards)
    loss = -F.logsigmoid(logits).mean()
    accuracy = (logits > 0).float().mean()

    return loss, accuracy


# =========================
# Step 4: 訓練迴圈
# =========================
train_dataset = PreferenceDataset(preference_data, tokenizer)
train_dataloader = DataLoader(train_dataset, batch_size=2, collate_fn=collate_fn)

num_epochs = 3
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-7)

scheduler = get_scheduler(
    "cosine",
    optimizer=optimizer,
    num_warmup_steps=100,
    num_training_steps=len(train_dataloader) * num_epochs
)

step = 0
for epoch in range(num_epochs):
    for batch in train_dataloader:
        loss, accuracy = compute_dpo_loss(model, ref_model, batch, beta=0.1)

        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

        step += 1
        if step % 10 == 0:
            print(f"Loss: {loss.item():.4f}, Accuracy: {accuracy.item():.2%}")


### 使用 TRL 函式庫

In [None]:
from trl import DPOTrainer, DPOConfig
from transformers import AutoModelForCausalLM, AutoTokenizer
from datasets import load_dataset

# 1. 載入模型
model = AutoModelForCausalLM.from_pretrained("your-sft-model")
ref_model = AutoModelForCausalLM.from_pretrained("your-sft-model")
tokenizer = AutoTokenizer.from_pretrained("your-sft-model")

# 2. 準備資料集（必須包含這些欄位）
dataset = load_dataset("your-preference-dataset")
# 資料格式：
# {
#     "prompt": "...",
#     "chosen": "...",
#     "rejected": "..."
# }

# 3. 設定訓練參數
training_args = DPOConfig(
    output_dir="./dpo_output",
    num_train_epochs=1,
    per_device_train_batch_size=4,
    learning_rate=5e-7,
    beta=0.1,  # DPO 的 β 參數
    max_length=512,
    max_prompt_length=128,
    logging_steps=10,
    save_steps=500,
    fp16=True,
)

# 4. 初始化 Trainer
trainer = DPOTrainer(
    model=model,
    ref_model=ref_model,
    args=training_args,
    train_dataset=dataset["train"],
    tokenizer=tokenizer,
)

# 5. 開始訓練
trainer.train()

### DPO 的 β 參數調教

β 是一個超參數，控制 **KL 散度懲罰的強度**：

- β 大（如 0.5）：強制模型接近參考模型，變化保守
- β 小（如 0.1）：允許模型更自由地優化偏好，但可能過度偏離

**直覺理解：**

β = 溫度的倒數 = "我有多相信這個偏好資料"

- β 大 → 謹慎調整，不要改太多
- β 小 → 大膽調整，充分利用偏好資訊



## 實務注意事項

### 資料品質至關重要

**常見問題：**
1. 偏好不明顯

In [None]:
# 壞範例：兩個回答都很好或都很差
{
    "prompt": "1+1等於多少？",
    "chosen": "2",
    "rejected": "等於2"  # 幾乎一樣，模型學不到什麼
}

# 好範例：清晰的品質差異
{
    "prompt": "解釋相對論",
    "chosen": "愛因斯坦的相對論包含狹義和廣義兩部分...",
    "rejected": "就是時間會變慢"
}

2. 標註不一致

In [None]:
# 確保標註者有明確的評分標準
標準範例：
- 有用性：回答是否解決問題？
- 無害性：是否包含危險建議？
- 誠實性：是否承認不確定性？

3. 資料分布不均

In [None]:
# 確保涵蓋各種場景
distribution_check = {
    "coding": 0.3,
    "math": 0.2,
    "creative_writing": 0.2,
    "qa": 0.3
}

### β 參數調教

經驗法則：

In [None]:
# 小模型（<7B）
beta = 0.1 ~ 0.2  # 較大的自由度

# 中模型（7B-13B）
beta = 0.1  # 標準選擇

# 大模型（>13B）
beta = 0.05 ~ 0.1  # 更保守

# 領域適配（domain-specific）
beta = 0.2 ~ 0.5  # 更接近原模型

**診斷方法：**

In [None]:
# 監控 KL 散度
kl_divergence = (chosen_logprobs - ref_chosen_logprobs).mean()

if kl_divergence > 10:
    print("β 太小，模型偏離過大")
elif kl_divergence < 0.1:
    print("β 太大，模型幾乎沒變化")

### 學習率設定

In [None]:
# DPO 需要比 SFT 更小的學習率
# 推薦範圍
learning_rate = 5e-7  # SFT 通常是 2e-5

### 監控偏好準確率

In [None]:
# 監控的指標
metrics = {
    "loss": loss.item(),
    "accuracy": (chosen_rewards > rejected_rewards).float().mean(),
    "chosen_reward": chosen_rewards.mean(),
    "rejected_reward": rejected_rewards.mean(),
    "reward_margin": (chosen_rewards - rejected_rewards).mean(),
    "kl_div": kl_divergence.mean()
}

# 健康的訓練應該看到：
# - accuracy 從 ~50% 提升到 >70%
# - reward_margin 逐漸增大
# - kl_div 保持在合理範圍（<10）

## 常見踩雷點
### 忘記凍結 ref_model

In [None]:
# 錯誤：ref_model 也在更新
ref_model = AutoModelForCausalLM.from_pretrained("model")

# 正確：凍結參數
ref_model = AutoModelForCausalLM.from_pretrained("model")
for param in ref_model.parameters():
    param.requires_grad = False
ref_model.eval()  # 切換到評估模式

### chosen/rejected 長度差異過大

In [None]:
# 問題：rejected 太短，loss 不平衡
{
    "chosen": "這是一段很長的詳細解釋...",  # 200 tokens
    "rejected": "不知道"  # 2 tokens
}

# 解決：控制長度比例
max_length_ratio = len(rejected) / len(chosen)
if max_length_ratio < 0.3:
    print("警告：長度差異過大，考慮重新採樣")

## DPO 的變體
近期研究提出了多個 DPO 改進版本：
1. IPO (Identity Preference Optimization)
    - 移除了 sigmoid，直接改善 margin
    - 對長度偏差更穩定
2. KTO (Kahneman-Tversky Optimization)
    - 不需要配對資料，只需要 binary feedback
    - 適合「讚/踩」這種簡單回饋
3. SimPO (Simple Preference Optimization)
    - 不需要 reference model
    - 直接用 length normalization