# 5-DPO

直接偏好优化（Direct Preference Optimization，DPO）是后训练阶段中，使用正反样例激励大模型产生符合人类偏好的回答的策略，为人类反馈强化学习（Reinforcement Learning from Human Feedback, RLHF）提供了一个高效简化的替代方案。通过这一阶段的训练，大模型将会学会依照人类的喜好生成回复.

在这个笔记本中，我们仅对 DPO 的训练流程进行展示和学习，因此只给出必要的代码片段，如 wandb 和 ddp 不会在此笔记本中涉及.

此笔记本的完整实现见主仓库 `/minimind/train_dpo.py`

In [1]:
# 导入依赖
import os
import platform
import argparse
import time
import math
import warnings

import pandas as pd
import torch
import torch.nn.functional as F
import torch.distributed as dist
from contextlib import nullcontext

from torch import optim, nn
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader, DistributedSampler
from transformers import AutoTokenizer, AutoModelForCausalLM
from model.model import MiniMindLM
from model.LMConfig import LMConfig
from model.dataset import DPODataset

In [2]:
warnings.filterwarnings('ignore')

## 可选参数设置

首先，查看训练的可选参数，这些参数在实际使用时通过命令行导入，为了保持笔记本的易用性，选择用 class 进行包装.

In [3]:
class args:
    # out_dir: str = "out" # pytorch 格式权重文件保存位置 我们只展示训练过程 所以不使用
    epochs: int = 1 # 训练轮数
    batch_size: int = 2 # pretrain 数据集仅两个样本，设置 batch 为 2
    # sft阶段学习率为 「5e-6」->「5e-7」长度512，建议离线正负样本「概率」偏好对齐阶段lr <=「1e-8」长度3000，否则很容易遗忘训坏
    learning_rate: float = 5e-4 # 学习率
    device: str = 'cuda' if torch.cuda.is_available() else 'cpu'
    dtype: str = 'bfloat16' # 16 bit 浮点数：8 bit 指数 + 7 bit 尾数
    # use_wandb: bool = False # 是否使用 wandb 我们不使用
    wandb_project: str = 'MiniMind-Notebook'
    num_workers: int = 1 # 工作进程数
    # ddp：bool = False # 单机多卡
    accumulation_steps: int = 1 # 梯度累积步数
    grad_clip: float = 1.0 # 梯度剪裁
    warmup_iters: int = 0 # 学习率热启动
    log_interval: int = 1 # 每一步打印日志 仅用于观察
    # save_interval: int = 100 # checkpoint 保存点 我们不使用
    local_rank: int = 1 # device 设备号
    dim: int = 512 # 词嵌入维度 模型超参数
    n_layers: int = 1 # MiniMind Block 数量 模型超参数 | 由于 dpo 要加载两个模型 我们出于演示目的设定 n_layers = 1
    max_seq_len: int = 512 # 序列长度阈值
    use_moe: bool = False # 是否启用混合专家
    data_path: str = './toydata/dpo_data.jsonl' # 数据集路径

In [4]:
print(f'查看工作设备 {args.device}')

查看工作设备 cuda


## 初始化训练

接下来，我们对一些重要模块进行初始化，我们已经了解过，分词器，模型和数据集是大模型的基本组件，我们对其进行初始化.

> 在这一阶段 我们调整的是大模型的问答偏好 因此与 sft 阶段同理 我们需要载入在 sft 阶段微调好的问答模型

In [5]:
def init_model(lm_config):
    """注意 在此处我们注释了加载模型部分代码 但应当认识到 actor 和 ref 两个模型都具备 sft 阶段获得的权重"""
    tokenizer = AutoTokenizer.from_pretrained('../model/minimind_tokenizer')
    model = MiniMindLM(lm_config)
    moe_path = '_moe' if lm_config.use_moe else ''
    # ckp = f'./out/full_sft_{lm_config.dim}{moe_path}.pth' # 指示上一阶段训练保存的模型文件位置
    # state_dict = torch.load(ckp, map_location=args.device) # 载入模型状态字典
    # model.load_state_dict(state_dict, strict=False) # 装入模型
    # 初始化参考模型
    ref_model = MiniMindLM(lm_config)
    # ref_model.load_state_dict(state_dict, strict=False)
    ref_model.eval()
    ref_model.requires_grad_(False)

    print(f'LLM总参数量：{sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} 百万')
    model = model.to(args.device)
    ref_model = ref_model.to(args.device)

    return model, ref_model, tokenizer

In [6]:
lm_config = LMConfig(dim=args.dim, n_layers=args.n_layers, max_seq_len=args.max_seq_len, use_moe=args.use_moe)
model, ref_model ,tokenizer = init_model(lm_config)

train_ds = DPODataset(args.data_path, tokenizer, max_length=lm_config.max_seq_len)

train_loader = DataLoader(
    train_ds,
    batch_size=args.batch_size,
    pin_memory=True,
    drop_last=False,
    shuffle=False,
    num_workers=args.num_workers,
)

print(f'模型位于设备：{model.device}, 词表长度：{tokenizer.vocab_size}, DataLoader：{train_loader}')

LLM总参数量：6.096 百万
模型位于设备：cuda:0, 词表长度：6400, DataLoader：<torch.utils.data.dataloader.DataLoader object at 0x000001AF0BBB71C0>


In [7]:
loader = iter(train_loader)
print(f'打印一个 iter 的数据:\n{next(loader)}\n')
print(f'数据集大小：{len(train_ds)}, DataLoader 大小：{len(loader)}')

打印一个 iter 的数据:
{'x_chosen': tensor([[  1,  85, 736,  ...,   0,   0,   0],
        [  1,  85, 736,  ...,   0,   0,   0]]), 'y_chosen': tensor([[ 85, 736, 201,  ...,   0,   0,   0],
        [ 85, 736, 201,  ...,   0,   0,   0]]), 'mask_chosen': tensor([[0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0]]), 'x_rejected': tensor([[  1,  85, 736,  ...,   0,   0,   0],
        [  1,  85, 736,  ...,   0,   0,   0]]), 'y_rejected': tensor([[ 85, 736, 201,  ...,   0,   0,   0],
        [ 85, 736, 201,  ...,   0,   0,   0]]), 'mask_rejected': tensor([[0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0]])}

数据集大小：2, DataLoader 大小：1


我们发现，train loader 的每一个 iter 都包含一个拥有六个键值对的字典，这是因为 train_dataset 每一次取数据都会返回:

- chosen 样本 X: 包含 \<bos> 在内的输入 content
- chosen 标签 Y: 包含 \<eos> 在内的输出 content
- chosen 掩码 loss_mask: 指示需要计算损失的 token 位置
- rejected 样本 X: 包含 \<bos> 在内的输入 content
- rejected 标签 Y: 包含 \<eos> 在内的输出 content
- rejected 掩码 loss_mask: 指示需要计算损失的 token 位置

由于我们的数据集只有两条数据，而 batch size 设置为 2，因此我们的 dataloader 只有一个 iter.

# 启动训练

训练一个深度学习模型，还涉及到了优化器，损失函数和学习率调度. 接下来，我们查看 MiniMind 训练部分的代码，并进行一轮简单的训练.

> DPO 阶段涉及 DPO 损失函数涉及 因此与前两个阶段相比内容略有增加 不过整体流程与逻辑类似

In [8]:
# 学习率调度方面 采用余弦退火学习率
def get_lr(current_step, total_steps, lr):
    return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))

# 优化器方面 选择 AdamW 优化器 并在混精度场景下创建 scaler 进行梯度缩放避免数值下溢
scaler = torch.cuda.amp.GradScaler(enabled=(args.dtype in ['float16', 'bfloat16']))
optimizer = optim.AdamW(model.parameters(), lr=args.learning_rate)

device_type = "cuda" if "cuda" in args.device else "cpu"
ctx = nullcontext() if device_type == "cpu" else torch.cuda.amp.autocast() # 在 cuda 上启动混精度训练，否则空白上下文

DPO 的原理是增加偏好样本的对数概率与减小非偏好样本响应的对数概率.

该阶段引入 DPO 损失函数，通过计算选择样本和拒绝样本的对数比率，然后基于这些比率计算 DPO 损失，适用于偏好学习任务.

In [9]:
def logits_to_probs(logits, labels):
    # logits shape: (batch_size, seq_len, vocab_size)
    # labels shape: (batch_size, seq_len)
    # probs shape: (batch_size, seq_len)
    log_probs = F.log_softmax(logits, dim=2)
    probs = torch.gather(log_probs, dim=2, index=labels.unsqueeze(2)).squeeze(-1)
    return probs


def dpo_loss(ref_probs, probs, beta):
    # ref_probs 和 probs 都是 shape: (batch_size, seq_len)
    # 计算每个样本的平均概率
    ref_probs = ref_probs.mean(dim=1)
    probs = probs.mean(dim=1)

    # 将 chosen 和 rejected 数据分开
    batch_size = ref_probs.shape[0]
    chosen_ref_probs = ref_probs[:batch_size // 2]
    reject_ref_probs = ref_probs[batch_size // 2:]
    chosen_probs = probs[:batch_size // 2]
    reject_probs = probs[batch_size // 2:]

    # 计算对数比率，比较偏好差异
    pi_logratios = chosen_probs - reject_probs
    ref_logratios = chosen_ref_probs - reject_ref_probs
    logits = pi_logratios - ref_logratios
    loss = -F.logsigmoid(beta * logits)
    return loss.mean()

接下来，我们来看看 MiniMind 的训练函数

In [10]:
def train_epoch(epoch):
    start_time = time.time()
    for step, batch in enumerate(train_loader):
        # 提取数据
        x_chosen = batch['x_chosen'].to(args.device)
        x_rejected = batch['x_rejected'].to(args.device)
        y_chosen = batch['y_chosen'].to(args.device)
        y_rejected = batch['y_rejected'].to(args.device)
        mask_chosen = batch['mask_chosen'].to(args.device)
        mask_rejected = batch['mask_rejected'].to(args.device)
        # 正反例拼接
        x = torch.cat([x_chosen, x_rejected], dim=0)
        y = torch.cat([y_chosen, y_rejected], dim=0)
        mask = torch.cat([mask_chosen, mask_rejected], dim=0)

        lr = get_lr(epoch * iter_per_epoch + step, args.epochs * iter_per_epoch, args.learning_rate)
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr

        with ctx:
            with torch.no_grad(): # 计算 ref 模型输出
                ref_outputs = ref_model(x)
                ref_logits = ref_outputs.logits
            ref_probs = logits_to_probs(ref_logits, y)
            ref_probs = ref_probs * mask # 得到 ref 概率
            outputs = model(x) # 计算 actor 模型输出
            logits = outputs.logits
            probs = logits_to_probs(logits, y)
            probs = probs * mask # 得到 actor 概率
            loss = dpo_loss(ref_probs, probs, beta=0.1) # dpo 损失
            loss = loss / args.accumulation_steps

        scaler.scale(loss).backward() # 梯度缩放

        if (step + 1) % args.accumulation_steps == 0:
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip) # 梯度剪裁
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad(set_to_none=True)

        if step % args.log_interval == 0:
            spend_time = time.time() - start_time
            print(
                'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} epoch_Time:{}min:'.format(
                    epoch + 1,
                    args.epochs,
                    step,
                    iter_per_epoch,
                    loss.item(),
                    optimizer.param_groups[-1]['lr'],
                    spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))

        # 到达指定保存步数时，save as PyTorch
        # if (step + 1) % args.save_interval == 0 and (not ddp or dist.get_rank() == 0):
        #     model.eval()
        #     moe_path = '_moe' if lm_config.use_moe else ''
        #     ckp = f'{args.save_dir}/rlhf_{lm_config.dim}{moe_path}.pth'

        #     if isinstance(model, torch.nn.parallel.DistributedDataParallel):
        #         state_dict = model.module.state_dict()
        #     else:
        #         state_dict = model.state_dict()

        #     torch.save(state_dict, ckp)
        #     model.train()

准备完毕，我们尝试一轮长度 1 个 iter 的训练.

In [11]:
iter_per_epoch = len(train_loader)
for epoch in range(args.epochs):
    train_epoch(epoch)

Epoch:[1/1](0/1) loss:0.694 lr:0.000550000000 epoch_Time:0.0min:


In [12]:
del model