In [1]:
import math
import random

import torch
import torch.nn as nn
import numpy as np

In [2]:
max_length=16

In [3]:
class PositionalEncoding(nn.Module):
    "Implement the PE function."

    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # 初始化Shape为(max_len, d_model)的PE (positional encoding)
        pe = torch.zeros(max_len, d_model)
        # 初始化一个tensor [[0, 1, 2, 3, ...]]
        position = torch.arange(0, max_len).unsqueeze(1)
        # 这里就是sin和cos括号中的内容，通过e和ln进行了变换
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )
        # 计算PE(pos, 2i)
        pe[:, 0::2] = torch.sin(position * div_term)
        # 计算PE(pos, 2i+1)
        pe[:, 1::2] = torch.cos(position * div_term)
        # 为了方便计算，在最外面在unsqueeze出一个batch
        pe = pe.unsqueeze(0)
        # 如果一个参数不参与梯度下降，但又希望保存model的时候将其保存下来
        # 这个时候就可以用register_buffer
        self.register_buffer("pe", pe)

    def forward(self, x):
        """
        x 为embedding后的inputs，例如(1,7, 128)，batch size为1,7个单词，单词维度为128
        """
        # 将x和positional encoding相加。
        x = x + self.pe[:, : x.size(1)].requires_grad_(False)
        return self.dropout(x)

In [4]:
class CopyTaskModel(nn.Module):

    def __init__(self, d_model=128):
        super(CopyTaskModel, self).__init__()

        # 定义词向量，词典数为10。我们不预测两位小数。
        self.embedding = nn.Embedding(num_embeddings=10, embedding_dim=128)
        # 定义Transformer。超参是我拍脑袋想的
        self.transformer = nn.Transformer(d_model=128, num_encoder_layers=2, num_decoder_layers=2, dim_feedforward=512, batch_first=True)

        # 定义位置编码器
        self.positional_encoding = PositionalEncoding(d_model, dropout=0)

        # 定义最后的线性层，这里并没有用Softmax，因为没必要。
        # 因为后面的CrossEntropyLoss中自带了
        self.predictor = nn.Linear(128, 10)

    def forward(self, src, tgt):
        # 生成mask
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt.size()[-1])
        src_key_padding_mask = CopyTaskModel.get_key_padding_mask(src)
        tgt_key_padding_mask = CopyTaskModel.get_key_padding_mask(tgt)

        # 对src和tgt进行编码
        src = self.embedding(src)
        tgt = self.embedding(tgt)
        # 给src和tgt的token增加位置信息
        src = self.positional_encoding(src)
        tgt = self.positional_encoding(tgt)

        # 将准备好的数据送给transformer
        out = self.transformer(src, tgt,
                               tgt_mask=tgt_mask,
                               src_key_padding_mask=src_key_padding_mask,
                               tgt_key_padding_mask=tgt_key_padding_mask)

        """
        这里直接返回transformer的结果。因为训练和推理时的行为不一样，
        所以在该模型外再进行线性层的预测。
        """
        return out

    @staticmethod
    def get_key_padding_mask(tokens):
        """
        用于key_padding_mask
        """
        key_padding_mask = torch.zeros(tokens.size())
        key_padding_mask[tokens == 2] = -torch.inf
        return key_padding_mask

In [5]:
model = CopyTaskModel()

In [126]:
src = torch.LongTensor([[0, 3, 4, 5, 6, 1, 2, 2]])
tgt = torch.LongTensor([[3, 4, 5, 6, 3, 4, 5, 6, 1, 2, 2, 2, 2]])
out = model(src, tgt)
print(out.size())
print(out)

torch.Size([1, 13, 128])
tensor([[[-0.2952, -0.8189, -0.2171,  ...,  1.3933,  1.8787,  1.0751],
         [-0.5632, -0.3965,  0.8395,  ...,  0.8429,  2.5393,  0.9666],
         [-0.7840, -1.1798, -0.9036,  ...,  0.3380,  1.7237,  0.1108],
         ...,
         [-0.8202,  1.1193,  0.9514,  ..., -1.3758, -1.0707, -0.6261],
         [-0.7995,  1.0968,  0.9023,  ..., -1.3757, -1.0640, -0.7312],
         [-0.7636,  1.0694,  0.8826,  ..., -1.3629, -1.0563, -0.7993]]],
       grad_fn=<NativeLayerNormBackward0>)


In [7]:
criteria = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)

In [110]:
def generate_random_batch(batch_size, max_length=16):
    src = []
    src_copy2 = []
    for i in range(batch_size):
        # 随机生成句子长度
        random_len = random.randint(1, max_length - 2)
        # 随机生成句子词汇，并在开头和结尾增加<bos>和<eos>
        random_num = [random.randint(3, 9) for _ in range(random_len)]
        
        src_random_nums = [0] + random_num + [1]
        src_copy2_random_nums = [0] + random_num * 2 + [1]
        
        # 如果句子长度不足max_length，进行填充
        src_random_nums = src_random_nums + [2] * (max_length - random_len - 2)
        src.append(src_random_nums)
        
        src_copy2_random_nums = src_copy2_random_nums + [2] * (max_length*2 - random_len*2 - 2)
        src_copy2.append(src_copy2_random_nums)
    
    src_tensor = torch.LongTensor(src)
    
    src_copy2_tensor = torch.LongTensor(src_copy2)
    
    # tgt不要最后一个token
    tgt = src_copy2_tensor[:, :-1]
    
    
    # tgt_y不要第一个的token
    tgt_y = src_copy2_tensor[:, 1:]
    # 计算tgt_y，即要预测的有效token的数量
    n_tokens = (tgt_y != 2).sum()

    # 这里的n_tokens指的是我们要预测的tgt_y中有多少有效的token，后面计算loss要用
    return src_tensor, tgt, tgt_y, n_tokens

In [143]:
generate_random_batch(2, max_length=6)

(tensor([[0, 5, 7, 3, 1, 2],
         [0, 8, 5, 7, 1, 2]]),
 tensor([[0, 5, 7, 3, 5, 7, 3, 1, 2, 2, 2],
         [0, 8, 5, 7, 8, 5, 7, 1, 2, 2, 2]]),
 tensor([[5, 7, 3, 5, 7, 3, 1, 2, 2, 2, 2],
         [8, 5, 7, 8, 5, 7, 1, 2, 2, 2, 2]]),
 tensor(14))

In [112]:
total_loss = 0

In [118]:
for step in range(2000):
    # 生成数据
    src, tgt, tgt_y, n_tokens = generate_random_batch(batch_size=30, max_length=max_length)

    # 清空梯度
    optimizer.zero_grad()
    # 进行transformer的计算
    out = model(src, tgt)
    # 将结果送给最后的线性层进行预测
    out = model.predictor(out)
    """
    计算损失。由于训练时我们的是对所有的输出都进行预测，所以需要对out进行reshape一下。
            我们的out的Shape为(batch_size, 词数, 词典大小)，view之后变为：
            (batch_size*词数, 词典大小)。
            而在这些预测结果中，我们只需要对非<pad>部分进行，所以需要进行正则化。也就是
            除以n_tokens。
    """
    loss = criteria(out.contiguous().view(-1, out.size(-1)), tgt_y.contiguous().view(-1)) / n_tokens
    # 计算梯度
    loss.backward()
    # 更新参数
    optimizer.step()

    total_loss += loss

    # 每40次打印一下loss
    if step != 0 and step % 40 == 0:
        print("Step {}, total_loss: {}".format(step, total_loss))
        total_loss = 0

Step 40, total_loss: 0.2376667708158493
Step 80, total_loss: 0.006520564667880535
Step 120, total_loss: 0.004718097858130932
Step 160, total_loss: 0.004160034004598856
Step 200, total_loss: 0.0037665499839931726
Step 240, total_loss: 0.0034146145917475224
Step 280, total_loss: 0.002977953990921378
Step 320, total_loss: 0.0029661322478204966
Step 360, total_loss: 0.002745328238233924
Step 400, total_loss: 0.002540662419050932
Step 440, total_loss: 0.0025036060251295567
Step 480, total_loss: 0.0023454115726053715
Step 520, total_loss: 0.002332857111468911
Step 560, total_loss: 0.002048691501840949
Step 600, total_loss: 0.001919168746098876
Step 640, total_loss: 0.0018854098161682487
Step 680, total_loss: 0.001782771898433566
Step 720, total_loss: 0.0017636781558394432
Step 760, total_loss: 0.0016696611419320107
Step 800, total_loss: 0.001660187030211091
Step 840, total_loss: 0.001568673411384225
Step 880, total_loss: 0.001480542472563684
Step 920, total_loss: 0.0014961248962208629
Step 9

In [142]:
model = model.eval()
# 随便定义一个src
src = torch.LongTensor([[0, 4, 3, 4, 6, 7, 3, 1, 2]])
# tgt从<bos>开始，看看能不能重新输出src中的值
tgt = torch.LongTensor([[0]])

# 一个一个词预测，直到预测为<eos>，或者达到句子最大长度
for i in range(max_length):
    # 进行transformer计算
    out = model(src, tgt)
    
    # 预测结果，因为只需要看最后一个词，所以取`out[:, -1]`
    predict = model.predictor(out[:, -1])
    # 找出最大值的index
    y = torch.argmax(predict, dim=1)
    # 和之前的预测结果拼接到一起
    tgt = torch.concat([tgt, y.unsqueeze(0)], dim=1)

    # 如果为<eos>，说明预测结束，跳出循环
    if y == 1:
        break
print(tgt)

tensor([[0, 4, 3, 4, 6, 7, 3, 4, 4, 3, 4, 6, 7, 6, 3, 3, 7]])


## 测试 transformer的输入

In [137]:
# 定义transformer，模型维度为128（也就是词向量的维度）
transformer = nn.Transformer(d_model=5, nhead=1, batch_first=True) # batch_first一定不要忘记

# 定义源句子，可以想想成是 <bos> 我 爱 吃 肉 和 菜 <eos> <pad> <pad>
src = torch.Tensor([[[0, 3, 4, 1, 2]]])

# 定义目标句子，可以想想是 <bos> I like eat meat and vegetables <eos> <pad>
tgt = torch.Tensor([[[0, 3, 4, 1, 2]]])

# 将token编码后送给transformer（这里暂时不加Positional Encoding）
outputs = transformer(src, tgt)

outputs.size()

torch.Size([1, 1, 5])