## 安装依赖

如果当前环境已经有相关依赖了，则不用执行

In [None]:
!pip install transformers[torch] datasets==3.6.0 evaluate

## 加载数据

In [None]:
from datasets import load_dataset

# Checkpoint name reused later for both the tokenizer and the model
model_checkpoint = "bert-base-uncased"
# Per-device batch size used for training and evaluation in TrainingArguments
batch_size = 16

# Load the "regular" configuration of the SWAG dataset
datasets = load_dataset("swag","regular")


In [None]:
# Display the loaded dataset object to see what it contains
datasets

## 数据集可视化

为了能够进一步理解数据长什么样子，下面的函数将从数据集里随机选择几个例子进行展示。

In [None]:
from datasets import ClassLabel, Sequence
import random
import pandas as pd
from IPython.display import display, HTML

def show_random_elements(dataset, num_examples=10):
    """Render ``num_examples`` distinct, randomly chosen rows of ``dataset`` as an HTML table.

    Columns whose feature type is a ``ClassLabel`` (or a ``Sequence`` of
    ``ClassLabel``) are shown with their label names instead of integer ids.

    Args:
        dataset: object supporting ``len()``, indexing with a list of row
            indices, and a ``features`` mapping of column name to feature type.
        num_examples: how many distinct rows to display; must not exceed
            ``len(dataset)``.

    Raises:
        AssertionError: if ``num_examples`` is larger than the dataset.
        ValueError: if ``num_examples`` is negative (raised by ``random.sample``).
    """
    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset."
    # random.sample draws without replacement, so no retry loop is needed to
    # keep the picked indices unique.
    picks = random.sample(range(len(dataset)), num_examples)

    df = pd.DataFrame(dataset[picks])
    for column, typ in dataset.features.items():
        if isinstance(typ, ClassLabel):
            # Bind the names as a default argument so the lambda does not rely
            # on the loop variable still pointing at this column's type.
            df[column] = df[column].transform(lambda i, names=typ.names: names[i])
        elif isinstance(typ, Sequence) and isinstance(typ.feature, ClassLabel):
            df[column] = df[column].transform(
                lambda ids, names=typ.feature.names: [names[i] for i in ids]
            )
    display(HTML(df.to_html()))

# Preview two random rows of the training split
show_random_elements(datasets["train"], num_examples=2)

以下是一个完整的数据样本示例：

In [None]:
def show_one(example):
    """Print one example: its context, the four candidate continuations, and the answer.

    Args:
        example: mapping with the keys ``sent1`` (context), ``sent2`` (shared
            start of every continuation), ``ending0`` .. ``ending3`` (the four
            candidate endings) and ``label`` (index of the correct ending).

    A label outside ``0..3`` (for instance a negative placeholder on an
    unlabeled row) is reported as unavailable rather than being used as a list
    index, where a negative value would silently select a wrong letter.
    """
    option_letters = ["A", "B", "C", "D"]
    ending_keys = ("ending0", "ending1", "ending2", "ending3")

    print(f"Context: {example['sent1']}")
    for letter, ending_key in zip(option_letters, ending_keys):
        print(f"  {letter} - {example['sent2']} {example[ending_key]}")

    label = example["label"]
    if label is not None and 0 <= label < len(option_letters):
        print(f"\nGround truth: option {option_letters[label]}")
    else:
        print(f"\nGround truth: unavailable (label={label})")

# Print the first training example in a readable form
show_one(datasets["train"][0])

## 数据预处理

In [None]:
from transformers import AutoTokenizer

# Load the tokenizer that belongs to the chosen checkpoint (fast implementation requested)
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, use_fast=True)
# Column names holding the four candidate endings of each example
ending_names = ["ending0", "ending1", "ending2", "ending3"]

def preprocess_function(examples):
    """Tokenize a batch of examples into per-question groups of (context, candidate) pairs.

    Args:
        examples: column-oriented batch, i.e. a mapping from column name to a
            list with one value per row. Reads ``sent1`` (context), ``sent2``
            (shared start of every candidate) and the columns listed in the
            module-level ``ending_names``.

    Returns:
        A dict with the tokenizer's output keys. Each value is a list with one
        entry per row, and each entry is a list of ``len(ending_names)``
        token sequences (one per candidate), so a later collator can build
        ``(batch_size, num_choices, seq_len)`` inputs.
    """
    num_choices = len(ending_names)

    # Repeat every context once per candidate. The list is built flat right
    # away, which avoids the quadratic copying of ``sum(list_of_lists, [])``.
    first_sentences = [
        context for context in examples["sent1"] for _ in range(num_choices)
    ]
    # Prefix each candidate ending with its row's shared header, row by row,
    # in the same flat order as ``first_sentences``.
    second_sentences = [
        f"{header} {examples[end][i]}"
        for i, header in enumerate(examples["sent2"])
        for end in ending_names
    ]

    tokenized_examples = tokenizer(first_sentences, second_sentences, truncation=True)
    # Regroup the flat tokenizer output: every ``num_choices`` consecutive
    # entries belong to the same question.
    return {
        k: [v[i:i + num_choices] for i in range(0, len(v), num_choices)]
        for k, v in tokenized_examples.items()
    }

### 代码详解

In [None]:
# first_sentences = [[context] * 4 for context in examples["sent1"]]

假设examples的数据是 examples["sent1"] = ["A man is cooking", "She is running"]

那么就会输出：

> first_sentences = [
>
>   ["A man is cooking", "A man is cooking", "A man is cooking", "A man is cooking"],
>
>   ["She is running", "She is running", "She is running", "She is running"]
>
> ]

In [None]:
# question_headers = examples["sent2"]

假设你的数据集有两个样本，因为每个样本都只会有一个sent2，所以这里的question_headers就是：

question_headers = ["He then", "She then"]

因为 `map` 使用了 `batched=True`，传入的 `examples` 是按列组织的字典：每个字段（属性）对应一个 list，里面依次存放这一批中每个样本在该字段上的取值。

In [None]:
# second_sentences = [[f"{header} {examples[end][i]}" for end in ending_names] for i, header in enumerate(question_headers)]

假设你的datasets的训练集有两个样本，每个样本对应4个选项，那他的数据结构就是这样的:

> examples["ending0"] = ["eats a sandwich", "jumps over a hurdle"]
>
> examples["ending1"] = ["cooks pasta", "falls down"]
>
> examples["ending2"] = ["drives a car", "wins a race"]
>
> examples["ending3"] = ["washes dishes", "sits down"]
>

然后对每个样本 i，把 `examples["sent2"][i]` 分别与 `examples["ending0"][i]`、`examples["ending1"][i]`、`examples["ending2"][i]`、`examples["ending3"][i]` 拼接（中间用空格隔开），得到 second_sentences：

> second_sentences = [
>
>     ["He then eats a sandwich", "He then cooks pasta", "He then drives a car", "He then washes dishes"],
>
>     ["She then jumps over a hurdle", "She then falls down", "She then wins a race", "She then sits down"]
>
> ]

In [None]:
# 因为现在处理后的first_sent和second_sent都是二维的，tokenizer需要一维的数据，所以把他展开
# first_sentences = sum(first_sentences, [])
# second_sentences = sum(second_sentences, [])

变成这样：

> first_sentences = [
>
> "A man is cooking", "A man is cooking", "A man is cooking", "A man is cooking",
>
> "She is running", "She is running", "She is running", "She is running"
>
> ]
>
> second_sentences = [
>
> "He then eats a sandwich", "He then cooks pasta", "He then drives a car", "He then washes dishes",
>
> "She then jumps over a hurdle", "She then falls down", "She then wins a race", "She then sits down"
>
> ]

In [None]:
# tokenized_examples.items()会返回一个kv对的迭代对象，类似于java的Map.entry

# return {k: [v[i:i+4] for i in range(0, len(v), 4)] for k,v in tokenized_examples.items()}

2个样本，每个样本有4个选项，所以tokenizer会输出8个特征集；

input_ids = [[101, 123, 102], [101, 456, 102], ..., 共8条]

返回时再把这8条特征按每4条一组重新分组，每一组对应一个样本（一道题的4个选项）；

> input_ids = [
>
>     [[101, 123, 102], [101, 456, 102], [101, 789, 102], [101, 321, 102]],  # 第1题4个选项
>
>     [[101, 654, 102], [101, 987, 102], [101, 741, 102], [101, 852, 102]]   # 第2题4个选项
>
> ]

我们把预处理后的样本特征进行解码，进一步理解：

In [None]:
# features = preprocess_function(examples)
# print([tokenizer.decode(features["input_ids"][3][i]) for i in range(4)])

可以看到，解码后的数据样本就是【题目+选项的序列对】，4个序列对组成一个样本；

> [
>
>     "[CLS] a drum line passes by walking down the street playing their instruments. [SEP] members of the procession are playing ping pong and celebrating one left each in quick. [SEP]",
>
>     "[CLS] a drum line passes by walking down the street playing their instruments. [SEP] members of the procession wait slowly towards the cadets. [SEP]",
>
>     "[CLS] a drum line passes by walking down the street playing their instruments. [SEP] members of the procession makes a square call and ends by jumping down into snowy streets where fans begin to take their positions. [SEP]",
>
>     "[CLS] a drum line passes by walking down the street playing their instruments. [SEP] members of the procession play and go back and forth hitting the drums while the audience claps for them. [SEP]"
> ]
>

同样的，我们需要将预处理后的字典数据映射回datasets，供模型训练时提取；

In [None]:
# Run preprocess_function over the dataset; batched=True passes it batches of
# rows (a dict of column lists) instead of one row at a time
encoded_datasets = datasets.map(preprocess_function, batched=True)

现在需要自定义数据整理器（data collator），让训练器知道如何把预处理后的若干样本填充并组装成一个批次；

In [None]:
from dataclasses import dataclass
from transformers.tokenization_utils_base import PreTrainedTokenizerBase, PaddingStrategy
from typing import Optional, Union
import torch

@dataclass
class DataCollatorForMultipleChoice:
    """
    Data collator that dynamically pads multiple-choice features into one batch.

    Each feature maps every tokenizer output key (for example ``input_ids``)
    to one sequence per answer choice, plus an optional ``label``/``labels``
    entry. Calling the collator flattens all choices of all features, pads
    them together through ``tokenizer.pad`` and reshapes every padded tensor
    to ``(batch_size, num_choices, seq_len)``.

    The incoming feature dicts are not modified, and a ``labels`` tensor is
    added to the batch only when the features carry a label.
    """
    tokenizer: PreTrainedTokenizerBase  # tokenizer whose ``pad`` method performs the padding
    padding: Union[bool, str, PaddingStrategy] = True  # padding strategy, forwarded to ``tokenizer.pad``
    max_length: Optional[int] = None  # optional maximum length, forwarded to ``tokenizer.pad``
    pad_to_multiple_of: Optional[int] = None  # optional length multiple, forwarded to ``tokenizer.pad``

    def __call__(self, features):
        """Pad ``features`` and return a dict of batched tensors.

        Args:
            features: non-empty list of dicts; every non-label value holds one
                sequence per choice, and all features share the same number
                of choices as the first one's ``input_ids``.

        Returns:
            Dict of tensors shaped ``(batch_size, num_choices, seq_len)``, plus
            an int64 ``labels`` tensor when the features contain labels.
        """
        label_name = "label" if "label" in features[0].keys() else "labels"
        has_labels = label_name in features[0]
        batch_size = len(features)
        # Number of answer choices per example
        num_choices = len(features[0]["input_ids"])
        # One flat entry per (example, choice) pair. The label key is skipped
        # here instead of being popped, so the caller's dicts stay intact, and
        # the flat list is built directly rather than via sum(..., []).
        flattened_features = [
            {k: v[i] for k, v in feature.items() if k != label_name}
            for feature in features
            for i in range(num_choices)
        ]
        # Pad all flattened sequences together and get PyTorch tensors back
        batch = self.tokenizer.pad(
            flattened_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )
        # Restore the multiple-choice structure: (batch_size, num_choices, seq_len)
        batch = {k: v.view(batch_size, num_choices, -1) for k, v in batch.items()}
        if has_labels:
            labels = [feature[label_name] for feature in features]
            batch["labels"] = torch.tensor(labels, dtype=torch.int64)
        return batch

测试一下这个数据整理器能否正常工作，因为这类升维、降维操作很容易出错；

In [None]:
# Keep only the columns the collator consumes: tokenizer outputs plus the label
accepted_keys = ["input_ids", "attention_mask", "label"]
# Build plain feature dicts from the first 10 encoded training rows
features = [{k: v for k, v in encoded_datasets["train"][i].items() if k in accepted_keys} for i in range(10)]
batch = DataCollatorForMultipleChoice(tokenizer)(features)
# The collator reshapes to (batch_size, num_choices, seq_len)
print(batch["input_ids"].shape)
print("=============")
# Decode the four choices of the row at index 8 of the batch ...
print([tokenizer.decode(batch["input_ids"][8][i].tolist()) for i in range(4)])
print("=============")
# ... and print the raw row with the same index for a side-by-side comparison
show_one(datasets["train"][8])

## 封装评估函数

In [None]:
import numpy as np

def compute_metrics(eval_predictions):
    """Return the accuracy of the highest-scoring choice against the reference labels.

    Args:
        eval_predictions: pair ``(predictions, label_ids)`` where
            ``predictions`` holds one row of per-choice scores per example and
            ``label_ids`` the index of the correct choice for each example.

    Returns:
        ``{"accuracy": value}`` with ``value`` a Python float, computed as the
        float32 mean of the per-example hit indicators.
    """
    scores, references = eval_predictions
    # Index of the best-scoring choice for every example
    chosen = np.argmax(scores, axis=1)
    hits = (chosen == references).astype(np.float32)
    accuracy = hits.mean().item()
    return {"accuracy": accuracy}

## 构建训练参数配置器

In [None]:
from transformers import AutoModelForMultipleChoice, TrainingArguments

# Load the checkpoint with a multiple-choice head on top
model = AutoModelForMultipleChoice.from_pretrained(model_checkpoint)

# Training configuration; the first positional argument is the output directory
args = TrainingArguments(
    "test-swag",
    eval_strategy = "epoch",  # run evaluation once per epoch
    learning_rate=5e-5,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=3,
    weight_decay=0.01,
    report_to="none"  # do not report to any logging integration
)

## 构建训练器

In [None]:
from transformers import Trainer

import inspect

# Newer transformers releases deprecate Trainer's `tokenizer` argument in favor
# of `processing_class`. Pick whichever keyword the installed version accepts
# so this cell works on both sides of the rename.
_tokenizer_kwarg = (
    "processing_class"
    if "processing_class" in inspect.signature(Trainer.__init__).parameters
    else "tokenizer"
)

# Wire model, training arguments, encoded splits, collator and metric together
trainer = Trainer(
    model,
    args,
    train_dataset=encoded_datasets["train"],
    eval_dataset=encoded_datasets["validation"],
    data_collator=DataCollatorForMultipleChoice(tokenizer),
    compute_metrics=compute_metrics,
    **{_tokenizer_kwarg: tokenizer},
)

## 开始训练

In [None]:
# Start fine-tuning with the trainer configured above
trainer.train()

## 模型评估

In [None]:
# Evaluate on the eval_dataset given to the Trainer; compute_metrics supplies the accuracy
trainer.evaluate()