In [1]:
# Fix the random seed (set before any data is sampled or any model is built)
from my_mingpt.utils import set_seed
set_seed(42)

# 获得数据集
import hashlib
import pickle

import torch
from torch.utils.data import Dataset
class SortDataset(Dataset):
    """
    Synthetic dataset for the Sort problem, generated on the fly.

    E.g. for problem length 6:
    Input: 0 0 2 1 0 1 -> Output: 0 0 0 1 1 2
    Which will feed into the transformer concatenated as:
    input:  0 0 2 1 0 1 0 0 0 1 1
    output: I I I I I 0 0 0 1 1 2
    where I is "ignore" (stored as -1), as the transformer is reading the input sequence

    Every candidate input sequence is assigned to exactly one split by a
    content hash, so the 'train' and 'test' splits never share an input.

    Args:
        split: either 'train' or 'test'.
        length: number of digits in the unsorted input sequence.
        num_digits: digits are drawn from range(num_digits); also the vocab size.
    """

    def __init__(self, split, length=6, num_digits=3):
        assert split in {'train', 'test'}
        self.split = split
        self.length = length
        self.num_digits = num_digits

    def __len__(self):
        # nominal size only: samples are drawn at random in __getitem__
        return 10000 # ...

    def get_vocab_size(self):
        """Number of distinct tokens, i.e. the number of digits."""
        return self.num_digits

    def get_block_size(self):
        """Length of the sequence fed to the transformer."""
        # the length of the sequence that will feed into transformer,
        # containing concatenated input and the output, but -1 because
        # the transformer starts making predictions at the last input element
        return self.length * 2 - 1

    @staticmethod
    def _split_of(digits):
        """Return 'train' or 'test' for a sequence of ints, about 25% being 'test'."""
        # The built-in hash() of bytes/str is salted per interpreter process
        # (PYTHONHASHSEED), so it would assign the same sequence to different
        # splits in different sessions. A cryptographic digest over a canonical
        # text encoding is stable across processes, machines and Python versions.
        key = ','.join(str(d) for d in digits).encode('utf-8')
        bucket = hashlib.sha256(key).digest()[0] % 4
        return 'test' if bucket == 0 else 'train'

    def __getitem__(self, idx):
        """
        Draw one random example belonging to this dataset's split.

        `idx` is ignored: every call samples a fresh sequence.

        Returns:
            (x, y): two 1-D long tensors of length `2 * length - 1`. `x` is the
            input followed by all but the last sorted digit; `y` is `x` shifted
            by one position, with the first `length - 1` entries set to -1.
        """
        # use rejection sampling to generate an input example from the desired split
        while True:
            # generate some random integers
            inp = torch.randint(self.num_digits, size=(self.length,), dtype=torch.long)
            # half of the time let's try to boost the number of examples that
            # have a large number of repeats, as this is what the model seems to struggle
            # with later in training, and they are kind of rare
            if torch.rand(1).item() < 0.5:
                if inp.unique().nelement() > self.length // 2:
                    # too many unique digits, re-sample
                    continue
            # figure out if this generated example is train or test based on its hash
            if self._split_of(inp.tolist()) == self.split:
                break # ok

        # solve the task: i.e. sort
        sol = torch.sort(inp)[0]

        # concatenate the problem specification and the solution
        cat = torch.cat((inp, sol), dim=0)

        # the inputs to the transformer will be the offset sequence
        x = cat[:-1].clone()
        y = cat[1:].clone()
        # we only want to predict at output locations, mask out the loss at the input locations
        # (-1 is presumably the loss's ignore index -- confirm against the model's loss)
        y[:self.length-1] = -1
        return x, y
    
# Instantiate both splits, then show one training sample and the nominal sizes
train_data, test_data = SortDataset('train'), SortDataset('test')
x, y = train_data[0]
print('某样本的x:', x)
print('某样本的y:', y)
print(f'训练集样本数量{len(train_data)}, 测试集样本数量{len(test_data)}')

某样本的x: tensor([0, 2, 1, 1, 0, 2, 0, 0, 1, 1, 2])
某样本的y: tensor([-1, -1, -1, -1, -1,  0,  0,  1,  1,  2,  2])
训练集样本数量10000, 测试集样本数量10000


In [2]:
# Build the model
from my_mingpt.model import minGPT

model_cfg = minGPT.get_default_config()
print('默认配置参数:')
print(model_cfg)

# Overrides: a small network, with vocabulary and context length taken from the dataset
cfg = dict(
    n_layer=3,
    n_head=3,
    n_embd=48,
    vocab_size=train_data.get_vocab_size(),
    block_size=train_data.get_block_size(),
)
model_cfg.merge_from_dict(cfg)
print('设置完后的配置:')
print(model_cfg)

model = minGPT(model_cfg)

默认配置参数:
n_layer: None
n_head: None
n_embd: None
vocab_size: None
block_size: None
embd_pdrop: 0.1
resid_pdrop: 0.1
atten_pdrop: 0.1

设置完后的配置:
n_layer: 3
n_head: 3
n_embd: 48
vocab_size: 3
block_size: 11
embd_pdrop: 0.1
resid_pdrop: 0.1
atten_pdrop: 0.1



In [3]:
# Configure and run training
from my_mingpt.train import Trainer
train_cfg = Trainer.get_default_config()
print('默认训练参数:')
print(train_cfg)
cfg = {
    # Use the GPU when one is present, otherwise fall back to the CPU so the
    # notebook still runs end to end on machines without CUDA.
    'device': 'cuda' if torch.cuda.is_available() else 'cpu',
    'Epochs': 10,
}
train_cfg.merge_from_dict(cfg)
print('设置完后的训练参数:')
print(train_cfg)

optimizer = torch.optim.Adam(model.parameters(), lr=5e-4)
trainer = Trainer(train_cfg, model, optimizer, train_data, test_data)
trainer.run_train()

默认训练参数:
device: auto
Epochs: None
batch_size: 64
grad_norm_clip: 1.0

设置完后的训练参数:
device: cuda
Epochs: 10
batch_size: 64
grad_norm_clip: 1.0

Epoch: 1 --------------------------------

loss:   1.11 | 64/10000
loss:   0.13 | 6464/10000
loss:   0.17 | 10000/10000
Epoch: 2 --------------------------------

loss:   0.07 | 64/10000
loss:   0.03 | 6464/10000
loss:   0.06 | 10000/10000
Epoch: 3 --------------------------------

loss:   0.02 | 64/10000
loss:   0.04 | 6464/10000
loss:   0.01 | 10000/10000
Epoch: 4 --------------------------------

loss:   0.03 | 64/10000
loss:   0.01 | 6464/10000
loss:   0.03 | 10000/10000
Epoch: 5 --------------------------------

loss:   0.02 | 64/10000
loss:   0.03 | 6464/10000
loss:   0.01 | 10000/10000
Epoch: 6 --------------------------------

loss:   0.01 | 64/10000
loss:   0.06 | 6464/10000
loss:   0.02 | 10000/10000
Epoch: 7 --------------------------------

loss:   0.01 | 64/10000
loss:   0.02 | 6464/10000
loss:   0.00 | 10000/10000
Epoch: 8 ----------

In [4]:
# Evaluate the trained model. The problem length is passed in, presumably so the
# evaluator can split each sequence into prompt and answer -- TODO confirm against Trainer.run_eval
trainer.run_eval(train_data.length)

acc: 1.00


In [5]:
# Sort one freshly sampled sequence with the trained model and compare to the true answer
model.eval()  # inference mode for modules such as dropout

n = train_data.length # naughty direct access, shrug
x = torch.randint(0, train_data.num_digits, (1, n), dtype=torch.long).to(trainer.device)
# torch.sort() returns a (values, indices) pair: [0] is the sorted tensor,
# [1] holds the original positions of the sorted elements
y = torch.sort(x[0])[0]

# Pure inference: do not record an autograd graph while generating
with torch.no_grad():
    pred = model.generate(x, n, do_sample=False)
# keep only the tokens after the n-token prompt
# (assumes generate returns prompt + new tokens -- confirm against minGPT.generate)
pred = pred[:, n:]

print('输入序列:', x)
print('标准答案:', y)
print('预测答案:', pred)
print('是否正确:', (pred == y).all())

输入序列: tensor([[2, 0, 0, 2, 1, 0]], device='cuda:0')
标准答案: tensor([0, 0, 0, 1, 2, 2], device='cuda:0')
预测答案: tensor([[0, 0, 0, 1, 2, 2]], device='cuda:0')
是否正确: tensor(True, device='cuda:0')
