A cute little demo showing the simplest usage of minGPT. Configured to run fine on Macbook Air in like a minute.

In [1]:
import torch
from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader
from mingpt.utils import set_seed
set_seed(3407)

In [2]:
import pickle
import torch
from torch.utils.data import Dataset

class SortDataset(Dataset):
    """ 
    排序问题的 Dataset。例如，对于长度为 6 的问题：
    输入：0 0 2 1 0 1 -> 输出：0 0 0 1 1 2
    这将连接后输入到 Transformer 中：
    input:  0 0 2 1 0 1 0 0 0 1 1
    output: I I I I I 0 0 0 1 1 2
    其中 I 是“忽略”，因为 Transformer 正在读取输入序列
    """

    def __init__(self, split, length=6, num_digits=3):
        """
        初始化 SortDataset。

        参数：
            split (str)：数据集的划分，'train' 或 'test'。
            length (int)：每个排序问题的长度。
            num_digits (int)：排序问题中使用的数字范围。
        """
        assert split in {'train', 'test'}
        self.split = split
        self.length = length
        self.num_digits = num_digits
    
    def __len__(self):
        """
        返回数据集的大小。
        """
        return 10000 # ...
    
    def get_vocab_size(self):
        """
        返回词汇表的大小，这里指数字的范围。
        """
        return self.num_digits
    
    def get_block_size(self):
        """
        返回输入 Transformer 的序列长度，包含连接的输入和输出，但 -1 因为
        Transformer 从最后一个输入元素开始进行预测。
        """
        return self.length * 2 - 1

    def __getitem__(self, idx):
        """
        获取指定索引的数据项。

        参数：
            idx (int)：数据项的索引。

        返回：
            tuple：包含输入序列和目标序列的元组。
        """
        
        # 使用拒绝采样从所需的划分中生成输入示例
        while True:
            # 生成一些随机整数
            inp = torch.randint(self.num_digits, size=(self.length,), dtype=torch.long)
            # 一半的时间尝试增加具有大量重复数字的示例数量，因为这似乎是模型在训练后期难以处理的问题，而且它们出现的频率很高
            if torch.rand(1).item() < 0.5:
                if inp.unique().nelement() > self.length // 2:
                    # 太多唯一的数字，重新采样
                    continue
            # 根据其哈希值确定这个生成的示例是训练集还是测试集
            h = hash(pickle.dumps(inp.tolist()))
            inp_split = 'test' if h % 4 == 0 else 'train' # 将 25% 的示例指定为测试集
            if inp_split == self.split:
                break # ok
        
        # 解决任务：即排序
        sol = torch.sort(inp)[0]

        # 连接问题描述和解决方案
        cat = torch.cat((inp, sol), dim=0)

        # Transformer 的输入将是偏移序列
        x = cat[:-1].clone() # 输入序列
        y = cat[1:].clone() # 目标序列
        # 我们只想预测输出位置，屏蔽掉输入位置的损失
        y[:self.length-1] = -1
        return x, y

In [3]:
# 导入 SortDataset 类，该类用于加载排序任务的数据集
from dataset import SortDataset 

# 创建训练数据集，传入 'train' 参数表示加载训练数据
train_dataset = SortDataset('train')
# 创建测试数据集，传入 'test' 参数表示加载测试数据
test_dataset = SortDataset('test')

# 从训练数据集中获取索引为 0 的样本，该样本包含待排序序列 x 和排序后的结果 y
x, y = train_dataset[0]

# 遍历待排序序列 x 和排序后的结果 y 中的每个元素
# a 表示待排序序列中的元素，b 表示排序后的结果中的元素
for a, b in zip(x,y):
    # 将 a 和 b 转换为整数类型并打印出来
    print(int(a),int(b)) 

1 -1
0 -1
1 -1
0 -1
0 -1
0 0
0 0
0 0
0 0
0 1
1 1


In [4]:
# 导入 GPT 类，该类用于创建 GPT 模型
from mingpt.model import GPT

# 获取 GPT 模型的默认配置
model_config = GPT.get_default_config()

# 设置模型类型为 'gpt-nano'，这可能是一个小型 GPT 模型
model_config.model_type = 'gpt-nano'

# 设置模型的词表大小，使用训练数据集的词表大小
model_config.vocab_size = train_dataset.get_vocab_size()

# 设置模型的块大小，使用训练数据集的块大小
model_config.block_size = train_dataset.get_block_size()

# 使用配置好的参数创建 GPT 模型实例
model = GPT(model_config)

number of parameters: 0.09M


In [5]:
# 导入 Trainer 类，该类用于训练 GPT 模型
from mingpt.trainer import Trainer

# 获取 Trainer 的默认配置
train_config = Trainer.get_default_config()

# 设置学习率为 5e-4，因为模型较小，可以使用更大的学习率
train_config.learning_rate = 5e-4 

# 设置最大迭代次数为 2000
train_config.max_iters = 2000

# 设置工作进程数量为 0，表示使用主进程进行训练
train_config.num_workers = 0

# 使用配置好的参数、模型和训练数据集创建 Trainer 对象
trainer = Trainer(train_config, model, train_dataset)

running on device cuda


In [6]:
def batch_end_callback(trainer):
    if trainer.iter_num % 100 == 0:
        print(f"iter_dt {trainer.iter_dt * 1000:.2f}ms; iter {trainer.iter_num}: train loss {trainer.loss.item():.5f}")
trainer.set_callback('on_batch_end', batch_end_callback)

trainer.run()

iter_dt 0.00ms; iter 0: train loss 1.06407
iter_dt 18.17ms; iter 100: train loss 0.14712
iter_dt 18.70ms; iter 200: train loss 0.05315
iter_dt 19.65ms; iter 300: train loss 0.04404
iter_dt 31.64ms; iter 400: train loss 0.04724
iter_dt 18.43ms; iter 500: train loss 0.02521
iter_dt 19.83ms; iter 600: train loss 0.03352
iter_dt 19.58ms; iter 700: train loss 0.00539
iter_dt 18.72ms; iter 800: train loss 0.02057
iter_dt 18.26ms; iter 900: train loss 0.00360
iter_dt 18.50ms; iter 1000: train loss 0.00788
iter_dt 20.64ms; iter 1100: train loss 0.01162
iter_dt 18.63ms; iter 1200: train loss 0.00963
iter_dt 18.32ms; iter 1300: train loss 0.02066
iter_dt 18.40ms; iter 1400: train loss 0.01739
iter_dt 18.37ms; iter 1500: train loss 0.00376
iter_dt 18.67ms; iter 1600: train loss 0.00133
iter_dt 18.38ms; iter 1700: train loss 0.00179
iter_dt 18.66ms; iter 1800: train loss 0.00079
iter_dt 18.48ms; iter 1900: train loss 0.00042


In [7]:
# now let's perform some evaluation
model.eval()

In [8]:
def eval_split(trainer, split, max_batches):
    """
    评估模型在给定数据集上的性能。

    Args:
        trainer: 训练器对象，包含模型和设备信息。
        split: 数据集分割，'train' 或 'test'。
        max_batches: 最大批次数，用于限制评估的数据量。

    Returns:
        正确预测的数量。
    """
    dataset = {'train':train_dataset, 'test':test_dataset}[split] # 根据 split 选择数据集
    n = train_dataset.length # 获取输入序列长度
    results = [] # 存储每个样本的预测结果
    mistakes_printed_already = 0 # 记录已打印的错误数量
    loader = DataLoader(dataset, batch_size=100, num_workers=0, drop_last=False) # 创建数据加载器
    for b, (x, y) in enumerate(loader): # 遍历数据批次
        x = x.to(trainer.device) # 将输入数据移动到设备
        y = y.to(trainer.device) # 将标签数据移动到设备
        # 分离输入模式
        inp = x[:, :n] 
        sol = y[:, -n:]
        # 让模型生成剩余的序列
        cat = model.generate(inp, n, do_sample=False) # 使用贪婪算法，不进行采样
        sol_candidate = cat[:, n:] # 分离填充的序列
        # 比较预测序列和真实序列
        correct = (sol == sol_candidate).all(1).cpu() # 判断预测是否完全正确
        for i in range(x.size(0)):
            results.append(int(correct[i])) # 记录预测结果
            if not correct[i] and mistakes_printed_already < 3: # 只打印最多3个错误
                mistakes_printed_already += 1
                print("GPT claims that %s sorted is %s but gt is %s" % (inp[i].tolist(), sol_candidate[i].tolist(), sol[i].tolist()))
        if max_batches is not None and b+1 >= max_batches: # 限制评估的批次数
            break
    rt = torch.tensor(results, dtype=torch.float)
    print("%s final score: %d/%d = %.2f%% correct" % (split, rt.sum(), len(results), 100*rt.mean())) # 打印评估结果
    return rt.sum()

# 从训练集和测试集中运行大量示例，并验证输出的正确性
with torch.no_grad():
    train_score = eval_split(trainer, 'train', max_batches=50)
    test_score  = eval_split(trainer, 'test',  max_batches=50)

train final score: 5000/5000 = 100.00% correct
test final score: 5000/5000 = 100.00% correct


In [9]:
# 运行一个随机给定的序列并通过模型进行预测
n = train_dataset.length  # 获取输入序列长度
inp = torch.tensor([[0, 0, 2, 1, 0, 1]], dtype=torch.long).to(trainer.device) # 创建一个随机输入序列
assert inp[0].nelement() == n # 确保输入序列长度正确
with torch.no_grad(): # 禁用梯度计算
    cat = model.generate(inp, n, do_sample=False) # 使用模型生成预测序列
sol = torch.sort(inp[0])[0] # 对输入序列进行排序，获取真实排序结果
sol_candidate = cat[:, n:] # 获取预测的排序结果
print('input sequence  :', inp.tolist()) # 打印输入序列
print('predicted sorted:', sol_candidate.tolist()) # 打印预测的排序结果
print('gt sort         :', sol.tolist()) # 打印真实的排序结果
print('matches         :', bool((sol == sol_candidate).all())) # 判断预测结果是否与真实结果一致

input sequence  : [[0, 0, 2, 1, 0, 1]]
predicted sorted: [[0, 0, 0, 1, 1, 2]]
gt sort         : [0, 0, 0, 1, 1, 2]
matches         : True
