In [1]:
# conda activate py_gpu

import torch
from torch import nn
import torch.nn.functional as F
import math

In [2]:
# Sanity check that torch is importable and working: draw a random 4x4
# matrix (uniform on [0, 1)) and print it.
random_torch = torch.rand((4, 4))
print(random_torch)

tensor([[0.7011, 0.5376, 0.9051, 0.7693],
        [0.6151, 0.5230, 0.9866, 0.0510],
        [0.9894, 0.8698, 0.7650, 0.3095],
        [0.2850, 0.5232, 0.0903, 0.1520]])


### Embedding层

In [3]:
from torch import Tensor
# 将输入的词汇表索引转换为指定维度的Embedding向量


class TokenEmbedding(nn.Embedding):
    """Lookup table that maps vocabulary indices to ``d_model``-sized vectors.

    Args:
        vocab_size: Number of entries in the vocabulary (rows of the table).
        d_model: Dimension of each embedding vector.
        padding_idx: Index of the padding token. Defaults to 1, which is the
            value this class previously hard-coded; pass the tokenizer's real
            padding index when it differs.
    """

    def __init__(self, vocab_size, d_model, padding_idx=1):
        # nn.Embedding treats the row at `padding_idx` specially (see the
        # PyTorch docs), so it has to match the padding id used by the data.
        super().__init__(vocab_size, d_model, padding_idx=padding_idx)

In [4]:
class PositionalEmbedding(nn.Module):
    """Fixed (non-learned) sinusoidal positional encoding table.

    The table has shape ``(max_len, d_model)``; even columns hold
    ``sin(pos / 10000^(2i / d_model))`` and odd columns hold the matching
    ``cos`` values.

    Args:
        d_model: Width of each positional vector.
        max_len: Number of positions to precompute.
        device: Device on which the table is created.
    """

    def __init__(self, d_model, max_len, device):
        # This module has no learnable weights, so it derives from nn.Module.
        # (Deriving from nn.Embedding and calling its __init__ without
        # arguments raises a TypeError.)
        super().__init__()

        # Column vector of positions: (max_len, 1)
        pos = torch.arange(0, max_len, device=device).float().unsqueeze(dim=1)
        # Even feature indices 0, 2, 4, ... : (ceil(d_model / 2),)
        _2i = torch.arange(0, d_model, step=2, device=device).float()
        angle = pos / (10000 ** (_2i / d_model))

        encoding = torch.zeros(max_len, d_model, device=device)
        encoding[:, 0::2] = torch.sin(angle)
        # With an odd d_model there is one fewer odd column than even columns,
        # so trim the cosine block to fit.
        encoding[:, 1::2] = torch.cos(angle[:, : d_model // 2])

        # A buffer follows the module across .to(device) calls and is never
        # returned by parameters(); persistent=False keeps it out of
        # state_dict since it can always be recomputed.
        self.register_buffer("encoding", encoding, persistent=False)

    def forward(self, x):
        """Return the first ``seq_len`` rows of the table.

        Args:
            x: 2-D tensor ``(batch_size, seq_len)``; only its shape is used.

        Returns:
            Tensor of shape ``(seq_len, d_model)``.

        Raises:
            ValueError: If ``seq_len`` exceeds the precomputed ``max_len``.
        """
        _, seq_len = x.size()
        max_len = self.encoding.size(0)
        if seq_len > max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {max_len}"
            )
        return self.encoding[:seq_len, :]


In [5]:
class TransformerEmbedding(nn.Module):
    """Token embedding plus positional embedding, followed by dropout.

    Args:
        vocab_size: Vocabulary size handed to ``TokenEmbedding``.
        d_model: Embedding dimension shared by both sub-embeddings.
        max_len: Number of positions handed to ``PositionalEmbedding``.
        drop_prob: Dropout probability applied to the summed embedding.
        device: Device handed to ``PositionalEmbedding``.
    """

    def __init__(self, vocab_size, d_model, max_len, drop_prob, device):
        super().__init__()
        self.tok_emb = TokenEmbedding(vocab_size, d_model)
        self.pos_emb = PositionalEmbedding(d_model, max_len, device)
        self.drop_out = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Embed the token-index tensor ``x`` and apply dropout to the sum."""
        summed = self.tok_emb(x) + self.pos_emb(x)
        return self.drop_out(summed)

### Multi-head Attention层

In [6]:
# Hyperparameters for the demo attention layer: model width and head count.
d_model, n_head = 512, 8

In [7]:
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention computed over ``n_head`` parallel heads.

    Args:
        d_model: Width of the input and output vectors.
        n_head: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``n_head``.
    """

    def __init__(self, d_model, n_head):
        super().__init__()
        if d_model % n_head != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by n_head ({n_head})"
            )
        self.n_head = n_head
        self.d_model = d_model
        # The projections keep the full d_model width. forward() then splits
        # that width into n_head chunks of d_model // n_head, so projecting
        # down to n_head features would make the split impossible.
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        # Output projection applied after the heads are concatenated.
        self.W_o = nn.Linear(d_model, d_model)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, q, k, v, mask=None):
        """Attend from ``q`` over ``k``/``v``.

        Args:
            q: Queries, ``(batch, len_q, d_model)``.
            k: Keys, ``(batch, len_k, d_model)``.
            v: Values, ``(batch, len_k, d_model)``.
            mask: Optional tensor broadcastable to
                ``(batch, n_head, len_q, len_k)``; positions where it equals 0
                are excluded from attention.

        Returns:
            Tensor of shape ``(batch, len_q, d_model)``.
        """
        batch, len_q, dimension = q.shape
        # Keys/values may have a different length than the queries
        # (cross-attention), so they are reshaped with their own length.
        len_k = k.size(1)
        n_d = self.d_model // self.n_head  # per-head width

        q, k, v = self.W_q(q), self.W_k(k), self.W_v(v)

        # Split the feature axis into heads: (batch, n_head, length, n_d)
        q_s = q.view(batch, len_q, self.n_head, n_d).permute(0, 2, 1, 3)
        k_s = k.view(batch, len_k, self.n_head, n_d).permute(0, 2, 1, 3)
        v_s = v.view(batch, len_k, self.n_head, n_d).permute(0, 2, 1, 3)

        # Scaled dot-product scores: (batch, n_head, len_q, len_k)
        score = q_s @ k_s.transpose(2, 3) / math.sqrt(n_d)

        if mask is not None:
            # A large negative score becomes ~0 weight after the softmax.
            score = score.masked_fill(mask == 0, -1e9)

        # Weighted sum of values, then merge the heads back together.
        context = self.softmax(score) @ v_s
        context = context.permute(0, 2, 1, 3).contiguous().view(batch, len_q, dimension)
        return self.W_o(context)

# Demo instance built from the notebook-level hyperparameters.
attention = MultiHeadAttention(d_model=d_model, n_head=n_head)

In [8]:
# out = attention(x, x, x)
# print(out)

### Layer Normalization层

In [9]:
class LayerNorm(nn.Module):
    """Layer normalization over the last dimension with learnable scale/shift.

    Args:
        d_model: Size of the last dimension of the input.
        eps: Small constant added to the variance before the square root.
    """

    def __init__(self, d_model, eps=1e-12):
        super().__init__()
        # Learnable affine parameters: scale starts at 1, shift starts at 0.
        self.gamma = nn.Parameter(torch.ones(d_model))
        self.beta = nn.Parameter(torch.zeros(d_model))
        self.eps = eps

    def forward(self, x):
        """Normalize ``x`` along its last axis, then apply gamma and beta."""
        mu = x.mean(dim=-1, keepdim=True)
        # Biased (population) variance, keeping the reduced axis for broadcasting.
        sigma_sq = x.var(dim=-1, unbiased=False, keepdim=True)
        normalized = (x - mu) / torch.sqrt(sigma_sq + self.eps)
        return self.gamma * normalized + self.beta

### Encoder

In [10]:
# 定义 position-wised 前馈神经网络 类
class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied along the last dimension.

    Args:
        d_model: Input and output width.
        hidden: Width of the intermediate layer.
        dropout: Dropout probability applied after the activation.
    """

    def __init__(self, d_model, hidden, dropout=0.1):
        super().__init__()
        self.fc1 = nn.Linear(d_model, hidden)  # expand: d_model -> hidden
        self.fc2 = nn.Linear(hidden, d_model)  # project back: hidden -> d_model
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Compute ``fc2(dropout(relu(fc1(x))))``."""
        activated = F.relu(self.fc1(x))
        return self.fc2(self.dropout(activated))

        

In [11]:
# 定义一个Transformer的Encoder Layer类
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward sub-layers.

    Each sub-layer is followed by dropout, a residual connection and
    layer normalization.

    Args:
        d_model: Model (embedding) width.
        n_head: Number of attention heads.
        hidden: Hidden width of the feed-forward network.
        dropout: Dropout probability used throughout the block.
    """

    def __init__(self, d_model, n_head, hidden, dropout=0.1):
        super().__init__()
        # Self-attention sub-layer
        self.attention = MultiHeadAttention(d_model, n_head)
        self.norm1 = LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        # Feed-forward sub-layer
        self.ffn = PositionwiseFeedForward(d_model, hidden, dropout)
        self.norm2 = LayerNorm(d_model)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run both sub-layers on ``x``; ``mask`` is passed to the attention."""
        # Self-attention: queries, keys and values all come from x.
        residual = x
        attended = self.dropout1(self.attention(x, x, x, mask))
        x = self.norm1(attended + residual)

        # Feed-forward, with the attention output as the new residual.
        residual = x
        transformed = self.dropout2(self.ffn(x))
        return self.norm2(transformed + residual)


In [12]:
class Encoder(nn.Module):
    """Embedding layer followed by a stack of ``n_layers`` encoder blocks.

    Args:
        enc_voc_size: Source vocabulary size.
        max_len: Maximum sequence length for the positional embedding.
        d_model: Model (embedding) width.
        ffn_hidden: Hidden width of each block's feed-forward network.
        n_head: Number of attention heads per block.
        n_layers: Number of stacked ``EncoderLayer`` blocks.
        dropout: Dropout probability for the embedding and every block.
        device: Device handed to the embedding layer.
    """

    def __init__(self, enc_voc_size, max_len, d_model, ffn_hidden, n_head, n_layers, dropout, device):
        super(Encoder, self).__init__()
        self.embedding = TransformerEmbedding(enc_voc_size, d_model, max_len, dropout, device)
        # EncoderLayer's signature is (d_model, n_head, hidden, dropout):
        # the head count precedes the hidden width, and the last argument is
        # the dropout rate, not the device.
        self.layers = nn.ModuleList(
            [
                EncoderLayer(d_model, n_head, ffn_hidden, dropout) for _ in range(n_layers)
            ]
        )

    def forward(self, x, mask=None):
        """Embed ``x`` and pass it through every encoder block in order.

        Args:
            x: Token-index tensor fed to the embedding layer.
            mask: Optional attention mask forwarded to every block.

        Returns:
            The output of the last encoder block.
        """
        x = self.embedding(x)
        for layer in self.layers:
            x = layer(x, mask)
        return x

### Decoder

In [13]:
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention and feed-forward.

    Each sub-layer is followed by dropout, a residual connection and
    layer normalization.

    Args:
        d_model: Model (embedding) width.
        ffn_hidden: Hidden width of the feed-forward network.
        n_head: Number of attention heads.
        drop_prob: Dropout probability used throughout the block.
    """

    def __init__(self, d_model, ffn_hidden, n_head, drop_prob):
        super(DecoderLayer, self).__init__()
        # Self-attention over the decoder input
        self.attention1 = MultiHeadAttention(d_model, n_head)
        self.norm1 = LayerNorm(d_model)
        self.dropout1 = nn.Dropout(drop_prob)
        # Cross-attention: keys/values come from the encoder output
        self.cross_attention = MultiHeadAttention(d_model, n_head)
        self.norm2 = LayerNorm(d_model)
        self.dropout2 = nn.Dropout(drop_prob)
        # Forward drop_prob so the feed-forward network honours the configured
        # rate instead of silently using its own default (EncoderLayer already
        # passes its rate through).
        self.ffn = PositionwiseFeedForward(d_model, ffn_hidden, drop_prob)
        self.norm3 = LayerNorm(d_model)
        self.dropout3 = nn.Dropout(drop_prob)

    def forward(self, dec, enc, self_mask=None, enc_mask=None):
        """Run the three sub-layers.

        Args:
            dec: Decoder-side input to this block.
            enc: Encoder output, used as keys and values in cross-attention.
            self_mask: Optional mask for the self-attention sub-layer.
            enc_mask: Optional mask for the cross-attention sub-layer.

        Returns:
            The block output after the final layer normalization.
        """
        # Self-attention: queries, keys and values all come from dec.
        _x = dec
        x = self.attention1(dec, dec, dec, self_mask)
        x = self.dropout1(x)
        x = self.norm1(x + _x)

        # Cross-attention: queries from the decoder, keys/values from enc.
        _x = x
        x = self.cross_attention(x, enc, enc, enc_mask)
        x = self.dropout2(x)
        x = self.norm2(x + _x)

        # Position-wise feed-forward network.
        _x = x
        x = self.ffn(x)
        x = self.dropout3(x)
        x = self.norm3(x + _x)

        return x


In [14]:
class Decoder(nn.Module):
    """Embedding, a stack of decoder blocks, and a vocabulary projection.

    Args:
        dec_voc_size: Target vocabulary size (also the output width).
        max_len: Maximum sequence length for the positional embedding.
        d_model: Model (embedding) width.
        ffn_hidden: Hidden width of each block's feed-forward network.
        n_head: Number of attention heads per block.
        n_layers: Number of stacked ``DecoderLayer`` blocks.
        drop_prob: Dropout probability for the embedding and every block.
        device: Device handed to the embedding layer.
    """

    def __init__(self, dec_voc_size, max_len, d_model, ffn_hidden, n_head, n_layers, drop_prob, device):
        super().__init__()
        # Turns target token indices into d_model-wide vectors.
        self.embedding = TransformerEmbedding(dec_voc_size, d_model, max_len, drop_prob, device)
        blocks = []
        for _ in range(n_layers):
            blocks.append(DecoderLayer(d_model, ffn_hidden, n_head, drop_prob))
        self.layers = nn.ModuleList(blocks)
        # Maps each d_model-wide vector onto the target vocabulary.
        self.fc = nn.Linear(d_model, dec_voc_size)

    def forward(self, dec, enc, self_mask=None, enc_mask=None):
        """Decode ``dec`` against the encoder output ``enc``.

        Every block receives the running decoder state, the encoder output
        and both masks (self-attention and cross-attention).
        """
        hidden = self.embedding(dec)
        for block in self.layers:
            hidden = block(hidden, enc, self_mask, enc_mask)
        logits = self.fc(hidden)
        return logits

### Transformer实现

In [15]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer operating on token-index tensors.

    Args:
        src_pad_idx: Padding index in the source vocabulary.
        trg_pad_idx: Padding index in the target vocabulary.
        enc_voc_size: Source vocabulary size.
        dec_voc_size: Target vocabulary size.
        d_model: Model (embedding) width.
        max_len: Maximum sequence length.
        n_heads: Number of attention heads.
        ffn_hidden: Hidden width of the feed-forward networks.
        n_layers: Number of encoder blocks and of decoder blocks.
        drop_prob: Dropout probability.
        device: Device handed to the encoder and decoder.
    """

    def __init__(self,
                src_pad_idx,
                trg_pad_idx,
                enc_voc_size,
                dec_voc_size,
                d_model,
                max_len,
                n_heads,
                ffn_hidden,
                n_layers,
                drop_prob,
                device):
        super(Transformer, self).__init__()
        # Encoder and Decoder both take (voc_size, max_len, d_model, ...):
        # max_len comes BEFORE d_model.
        self.encoder = Encoder(enc_voc_size, max_len, d_model, ffn_hidden, n_heads, n_layers, drop_prob, device)
        self.decoder = Decoder(dec_voc_size, max_len, d_model, ffn_hidden, n_heads, n_layers, drop_prob, device)
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    def make_pad_mask(self, q, k, pad_idx_q, pad_idx_k):
        """Build a boolean padding mask of shape ``(batch, 1, len_q, len_k)``.

        An entry is True only when neither the query token nor the key token
        at that position is a padding token.
        """
        q_keep = q.ne(pad_idx_q).unsqueeze(1).unsqueeze(3)  # (batch, 1, len_q, 1)
        k_keep = k.ne(pad_idx_k).unsqueeze(1).unsqueeze(2)  # (batch, 1, 1, len_k)
        # Broadcasting expands both operands to (batch, 1, len_q, len_k).
        return q_keep & k_keep

    def make_casual_mask(self, q, k):
        """Build a boolean lower-triangular ("causal") mask ``(len_q, len_k)``.

        Entry ``[i, j]`` is True when ``j <= i``, i.e. a query position may
        only look at itself and earlier key positions. The mask is created
        on the same device as ``q``.
        """
        len_q, len_k = q.size(1), k.size(1)
        return torch.ones(len_q, len_k, device=q.device).tril().bool()

    def forward(self, src, trg):
        """Run the full model.

        Args:
            src: Source token indices, ``(batch, len_src)``.
            trg: Target token indices, ``(batch, len_trg)``.

        Returns:
            The decoder output for ``trg`` conditioned on ``src``.
        """
        # Encoder self-attention: hide source padding.
        src_mask = self.make_pad_mask(src, src, self.src_pad_idx, self.src_pad_idx)
        # Decoder self-attention: hide target padding AND future positions.
        trg_mask = self.make_pad_mask(trg, trg, self.trg_pad_idx, self.trg_pad_idx) & self.make_casual_mask(trg, trg)
        # Cross-attention: queries are target tokens, keys are source tokens,
        # so this mask is (batch, 1, len_trg, len_src).
        cross_mask = self.make_pad_mask(trg, src, self.trg_pad_idx, self.src_pad_idx)

        enc_src = self.encoder(src, src_mask)
        out = self.decoder(trg, enc_src, trg_mask, cross_mask)
        return out