### 1.1 词嵌入向量

程序段1.1：词向量层（Input Embedding）的编程实现

In [5]:
import torch
import torch.nn as nn
import math

class InputEmbeddings(nn.Module):
    """Token-embedding layer whose looked-up vectors are scaled by sqrt(d_model)."""

    def __init__(self, vocab_size: int, d_model: int):
        """
        Args:
            vocab_size: number of rows in the embedding table.
            d_model: width of every embedding vector.
        """
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        # Learnable lookup table of shape (vocab_size, d_model)
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Map indices of shape (batch, seq_len) to vectors of shape (batch, seq_len, d_model)."""
        # Cast to long so the values are valid integer indices for the lookup
        token_ids = x.long()
        scale = math.sqrt(self.d_model)
        vectors = self.embedding(token_ids)
        return vectors * scale

In [6]:
# Test: embed a random batch of token ids
vocab_size = 21128  # vocabulary size
d_model = 4     # model (embedding) dimension
batch_size = 4   # batch size
seq_len = 8      # maximum sequence length
# Draw random integer ids in [0, vocab_size) with torch.randint to stand in for the input sequences
x = torch.randint(0, vocab_size, (batch_size, seq_len))
print(f'词向量编码前{x.shape}：\n, {x}')  # x has shape (batch_size, seq_len)
input_embedding = InputEmbeddings(vocab_size, d_model)
# x is rebound from token ids to their embeddings; the following cells keep using this name
x = input_embedding(x)
print(f'词向量编码后{x.shape}：\n {x}')  # x now has shape (batch_size, seq_len, d_model)

词向量编码前torch.Size([4, 8])：
, tensor([[ 5187,  8910, 11108,  4598, 16055,  4120, 19649,  5527],
        [14470,  8284,  1161, 18348, 19652,  3536,  3066, 17416],
        [10857,  2649,  1603,  1797, 16212, 12583,  2065,  4399],
        [10251, 16728,   349,  4992, 16588,  9539,  2361, 14609]])
词向量编码后torch.Size([4, 8, 4])：
 tensor([[[ 1.2678e-01,  1.2483e-01, -7.6980e-01, -7.9810e-01],
         [-3.0506e+00, -2.6890e+00,  3.1259e+00,  1.1606e+00],
         [-1.4637e+00, -5.7394e-01,  1.9243e-01, -1.6307e+00],
         [-2.7608e+00, -4.5781e+00, -2.7612e+00,  2.5439e+00],
         [ 1.6018e+00, -2.2088e+00,  2.9062e+00,  6.3336e-01],
         [ 3.5289e+00,  8.7768e-01, -3.2656e+00, -2.5945e-01],
         [-8.6179e-01,  2.1509e+00, -2.2301e+00,  3.0614e+00],
         [ 6.8590e-01,  9.4952e-01, -1.3004e+00,  2.5137e+00]],

        [[-2.2569e-02, -1.7442e+00, -2.3781e+00,  2.8922e+00],
         [-1.3270e+00, -1.4574e+00,  6.1618e-01, -3.8060e+00],
         [ 2.5347e-01, -2.9460e-01,  5.7769e-

### 1.2 位置编码

程序段1.2：位置编码层（Positional Encoding）的编程实现

In [7]:
class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding followed by dropout.

    A table ``pe`` of shape (1, seq_len, d_model) is computed once in
    ``__init__`` and registered as a buffer, so it follows the module across
    devices but is not a trainable parameter.

    Args:
        seq_len: maximum sequence length the table covers.
        d_model: width of the vectors the encoding is added to (even or odd).
        dropout: dropout probability applied after the addition.
    """

    def __init__(self, seq_len: int, d_model: int, dropout: float):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        # Positions 0 .. seq_len-1 as a column vector: (seq_len, 1)
        position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
        # angle[pos, i] = pos / 10000 ** (2i / d_model), where 2i comes from
        # torch.arange(0, d_model, 2); shape (seq_len, ceil(d_model / 2))
        angle = position / torch.pow(10000, torch.arange(0, d_model, 2).float() / d_model)
        # Table with a leading broadcast (batch) axis: (1, seq_len, d_model)
        pe = torch.zeros(1, seq_len, d_model)
        # Even columns take the sine of the angle
        pe[:, :, 0::2] = torch.sin(angle)
        # Odd columns take the cosine. When d_model is odd there is one fewer
        # odd column than even columns, so trim the angle matrix to match.
        pe[:, :, 1::2] = torch.cos(angle[:, : d_model // 2])
        # Buffer: saved in state_dict and moved by .to(device), never trained
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the positional encoding to ``x`` and apply dropout.

        Args:
            x: tensor of shape (batch, length, d_model) with length <= seq_len.

        Returns:
            A pair ``(x, position_encode)``: the encoded input after dropout, of
            the same shape as the argument, and the full stored table of shape
            (1, seq_len, d_model).

        Raises:
            ValueError: if ``x`` is longer than the precomputed table.
        """
        length = x.shape[1]
        if length > self.pe.shape[1]:
            # Without this guard a table of length 1 would broadcast silently
            raise ValueError(
                f"input length {length} exceeds the positional-encoding table "
                f"length {self.pe.shape[1]}"
            )
        # Buffers never require grad, so no requires_grad_(False) call is needed
        x = x + self.pe[:, :length, :]
        x = self.dropout(x)
        return x, self.pe

In [8]:
# 测试
dropout = 0.1
position_encoding = PositionalEncoding(seq_len, d_model, dropout)
# forward returns a pair: x with the encoding added and dropout applied, plus the stored encoding table
x, positon_encode = position_encoding(x)
print(f'得到的位置编码{positon_encode.shape}：\n {positon_encode}') 
print(f'\n位置编码与词向量相加后的x {x.shape}：\n {x}')  

得到的位置编码torch.Size([1, 8, 4])：
 tensor([[[ 0.0000,  1.0000,  0.0000,  1.0000],
         [ 0.8415,  0.5403,  0.0100,  0.9999],
         [ 0.9093, -0.4161,  0.0200,  0.9998],
         [ 0.1411, -0.9900,  0.0300,  0.9996],
         [-0.7568, -0.6536,  0.0400,  0.9992],
         [-0.9589,  0.2837,  0.0500,  0.9988],
         [-0.2794,  0.9602,  0.0600,  0.9982],
         [ 0.6570,  0.7539,  0.0699,  0.9976]]])

位置编码与词向量相加后的x torch.Size([4, 8, 4])：
 tensor([[[ 0.1409,  1.2498, -0.8553,  0.2243],
         [-0.0000, -2.3875,  0.0000,  2.4006],
         [-0.6160, -1.1001,  0.2360, -0.7011],
         [-2.9107, -6.1868, -3.0346,  3.9371],
         [ 0.9389, -0.0000,  3.2736,  1.8140],
         [ 2.8555,  1.2904, -3.5729,  0.8214],
         [-1.2680,  3.4567, -2.4113,  4.5107],
         [ 1.4921,  1.8927, -1.3672,  3.9014]],

        [[-0.0251, -0.8268, -2.6423,  4.3247],
         [-0.5395, -1.0190,  0.6958, -3.1179],
         [ 1.2920, -0.0000,  0.6641,  6.6702],
         [ 0.4319, -0.7878, -1.24

### 1.3 Q、K、V矩阵

程序段1.3实现了Q、K、V矩阵的生成逻辑。

In [9]:
# Build three bias-free linear projections and apply them to x (the output of
# the positional-encoding test cell) to obtain the Q, K and V matrices.
W_q = nn.Linear(d_model, d_model, bias=False) # W_q projection
W_k = nn.Linear(d_model, d_model, bias=False) # W_k projection
W_v = nn.Linear(d_model, d_model, bias=False) # W_v projection
print(f'W_q矩阵{W_q.weight.shape}：\n {W_q.weight}') 
print(f'W_k矩阵{W_k.weight.shape}：\n {W_k.weight}') 
print(f'W_v矩阵{W_v.weight.shape}：\n {W_v.weight}') 
# Each projection output has shape (batch_size, seq_len, d_model)
Q = W_q(x)
K = W_k(x)
V = W_v(x)
print(f'Q矩阵{Q.shape}：\n {Q}') 
print(f'K矩阵{K.shape}：\n {K}') 
print(f'V矩阵{V.shape}：\n {V}') 

W_q矩阵torch.Size([4, 4])：
 Parameter containing:
tensor([[-0.2306, -0.2550,  0.2075,  0.4690],
        [-0.3763, -0.1178, -0.3161, -0.1143],
        [-0.2948,  0.1617,  0.3861,  0.2138],
        [ 0.0054,  0.2621,  0.0655, -0.2242]], requires_grad=True)
W_k矩阵torch.Size([4, 4])：
 Parameter containing:
tensor([[-0.4327,  0.4112, -0.2834,  0.4604],
        [ 0.2423, -0.0924, -0.4268, -0.4884],
        [ 0.1975,  0.2601, -0.0412,  0.3488],
        [-0.1899, -0.2365, -0.2957,  0.3863]], requires_grad=True)
W_v矩阵torch.Size([4, 4])：
 Parameter containing:
tensor([[ 0.0900,  0.4058, -0.4526, -0.3298],
        [ 0.0822, -0.4185,  0.1396, -0.0061],
        [ 0.1199, -0.3806, -0.1170, -0.0273],
        [ 0.4441, -0.0816, -0.3588,  0.1857]], requires_grad=True)
Q矩阵torch.Size([4, 8, 4])：
 tensor([[[-0.4234,  0.0446, -0.1217,  0.2220],
         [ 1.7346,  0.0067,  0.1271, -1.1641],
         [ 0.1427,  0.3669, -0.0550, -0.1190],
         [ 3.4653,  2.3331, -0.4724, -2.7189],
         [ 1.3135, -1.5956

### 1.4 自注意力

In [10]:
def attention(Q, K, V, dropout: nn.Dropout):
    """Scaled dot-product attention without masking.

    Args:
        Q: queries, (..., q_len, d_k).
        K: keys, (..., k_len, d_k).
        V: values, (..., k_len, d_v).
        dropout: dropout module applied to the attention weights, or None to skip it.

    Returns:
        A pair ``(X, attention_matrix)``: the weighted sum of the values,
        (..., q_len, d_v), and the attention weights, (..., q_len, k_len),
        returned so they can be visualised later.
    """
    # Step 1: similarity scores scaled by sqrt(d_k), normalised over the key axis
    scale = math.sqrt(Q.shape[-1])
    scores = torch.matmul(Q, K.transpose(-2, -1)) / scale
    attention_matrix = torch.softmax(scores, dim=-1)
    if dropout is not None:
        attention_matrix = dropout(attention_matrix)
    # Step 2: weighted sum of the values
    return torch.matmul(attention_matrix, V), attention_matrix

In [11]:
# 测试
# Self-attention: Q, K and V were all projected from the same x in the previous cell.
# Dropout is applied to the attention weights inside attention(), so printed rows
# may contain zeros and need not sum to 1.
dropout_layer = nn.Dropout(p = 0.1)
X, attention_matrix = attention(Q, K, V, dropout_layer)
print(f'自注意力分布矩阵{attention_matrix.shape}：\n {attention_matrix}') 
print(f'自注意力计算结果{X.shape}：\n {X}') 

自注意力分布矩阵torch.Size([4, 8, 8])：
 tensor([[[1.2754e-01, 1.7161e-01, 1.8320e-01, 1.9311e-01, 1.4973e-01,
          1.3565e-01, 5.8319e-02, 9.1963e-02],
         [6.8067e-02, 1.5752e-02, 1.9029e-02, 8.0425e-03, 2.9105e-02,
          4.8730e-02, 7.7898e-01, 1.4340e-01],
         [1.6402e-01, 0.0000e+00, 1.5378e-01, 1.1436e-01, 1.0149e-01,
          2.0378e-01, 1.2857e-01, 1.2787e-01],
         [1.3547e-01, 0.0000e+00, 1.6308e-02, 4.0948e-04, 1.8914e-03,
          2.6820e-01, 6.3158e-01, 5.5634e-02],
         [5.0111e-03, 5.7917e-03, 9.1927e-04, 4.6598e-03, 1.5348e-02,
          2.3897e-03, 9.6273e-01, 1.1427e-01],
         [7.4134e-02, 1.5589e-01, 0.0000e+00, 1.3638e-01, 1.8234e-01,
          3.0691e-02, 0.0000e+00, 5.9186e-03],
         [6.1080e-02, 2.6243e-02, 1.7509e-02, 2.2492e-02, 2.6068e-02,
          1.0408e-01, 6.3973e-01, 0.0000e+00],
         [7.5446e-02, 6.4655e-02, 4.1244e-02, 4.4399e-02, 1.3055e-01,
          3.7818e-02, 0.0000e+00, 1.9520e-01]],

        [[1.1469e-01, 8.7725e-

### 1.5 交叉注意力

程序段1.5：交叉注意力计算逻辑

In [12]:
# Cross-attention: Q is derived from sequence X1, while K and V are derived from sequence X2.
# NOTE(review): this cell calls the 4-argument attention(Q, K, V, dropout); a later cell
# redefines attention with an extra `mask` parameter, so re-running this cell after that
# one would fail. It also reuses W_q / W_k / W_v and `dropout` from earlier cells.
X1 = torch.randint(0, vocab_size, (batch_size, seq_len)) # simulated sequence X1
X2 = torch.randint(0, vocab_size, (batch_size, seq_len)) # simulated sequence X2
input_embedding = InputEmbeddings(vocab_size, d_model)  # token embedding
X1 = input_embedding(X1)
X2 = input_embedding(X2)
position_encoding = PositionalEncoding(seq_len, d_model, dropout)
X1, positon_encode = position_encoding(X1)
X2, positon_encode = position_encoding(X2)

# Q and K/V come from different sequences; each has shape (batch_size, seq_len, d_model)
Q = W_q(X1)
K = W_k(X2)
V = W_v(X2)

dropout_layer = nn.Dropout(p = 0.1)
X, attention_matrix = attention(Q, K, V, dropout_layer)
print(f'交叉注意力分布矩阵{attention_matrix.shape}：\n {attention_matrix}') 
print(f'\n交叉注意力计算结果{X.shape}：\n {X}') 

交叉注意力分布矩阵torch.Size([4, 8, 8])：
 tensor([[[1.5774e-01, 4.4293e-02, 3.0673e-01, 1.3085e-01, 4.6552e-02,
          2.9525e-01, 9.9089e-02, 0.0000e+00],
         [0.0000e+00, 1.0799e+00, 2.0402e-03, 1.5450e-04, 1.9553e-02,
          2.5313e-04, 5.2604e-03, 1.3821e-03],
         [1.0317e-01, 0.0000e+00, 9.1092e-02, 7.1809e-02, 2.2481e-01,
          5.8494e-02, 1.5557e-01, 1.5196e-01],
         [4.8893e-02, 0.0000e+00, 4.9056e-02, 1.3419e-01, 2.5282e-01,
          3.6709e-02, 2.1352e-01, 3.7255e-01],
         [3.0923e-02, 9.9031e-01, 3.9583e-02, 5.0485e-03, 8.1219e-03,
          2.6019e-02, 9.0277e-03, 2.0823e-03],
         [0.0000e+00, 1.0285e+00, 1.1949e-02, 5.5123e-03, 1.7375e-02,
          9.8177e-03, 9.5383e-03, 9.6989e-03],
         [5.3914e-03, 1.0879e+00, 2.6269e-03, 1.2270e-03, 6.0681e-03,
          2.1552e-03, 2.4521e-03, 3.3031e-03],
         [9.2723e-02, 1.1099e-02, 1.2354e-01, 1.4503e-01, 2.1857e-01,
          8.2328e-02, 2.4435e-01, 1.9347e-01]],

        [[5.5223e-03, 2.2540e

### 1.6 掩码注意力

程序段1.6实现了掩码注意力的计算逻辑。

In [13]:
def attention(Q, K, V, mask, dropout: nn.Dropout):
    """Scaled dot-product attention with an optional mask.

    Args:
        Q: queries, (..., q_len, d_k).
        K: keys, (..., k_len, d_k).
        V: values, (..., k_len, d_v).
        mask: tensor broadcastable to (..., q_len, k_len); positions where it
            equals 0 receive (effectively) zero attention. None disables masking.
        dropout: dropout module applied to the attention weights, or None to skip it.

    Returns:
        A pair ``(X, attention_matrix)``: the weighted sum of the values,
        (..., q_len, d_v), and the attention weights, (..., q_len, k_len),
        returned so they can be visualised later.
    """
    d_k = Q.shape[-1]   # per-vector feature width
    # (1) Attention distribution: (..., q_len, d_k) --> (..., q_len, k_len)
    attention_matrix = (Q @ K.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        # Push masked scores to the most negative finite value of the score
        # dtype. A hard-coded -1e9 is not representable in float16; the dtype
        # minimum still yields a zero weight after softmax.
        fill_value = torch.finfo(attention_matrix.dtype).min
        attention_matrix = attention_matrix.masked_fill(mask == 0, fill_value)
    attention_matrix = attention_matrix.softmax(dim=-1)  # normalise over the key axis
    if dropout is not None:
        attention_matrix = dropout(attention_matrix)
    # (2) Weighted sum: (..., q_len, k_len) --> (..., q_len, d_v)
    X = torch.matmul(attention_matrix, V)
    return X, attention_matrix

In [14]:
# 测试
dropout_layer = nn.Dropout(p = 0.1)
# Lower-triangular matrix that hides future positions; unsqueeze(0) adds a leading
# axis of size 1 so the same mask broadcasts over the batch
mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0)
# Q, K and V are whatever the most recently executed cell left in the kernel
X, attention_matrix = attention(Q, K, V, mask, dropout_layer)
print(f'掩码注意力分布矩阵{attention_matrix.shape}：\n {attention_matrix}') 
print(f'\n掩码注意力计算结果{X.shape}：\n {X}') 

掩码注意力分布矩阵torch.Size([4, 8, 8])：
 tensor([[[1.1111e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00,
          0.0000e+00, 0.0000e+00, 0.0000e+00],
         [2.6720e-03, 1.1084e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00,
          0.0000e+00, 0.0000e+00, 0.0000e+00],
         [2.5561e-01, 6.2981e-01, 0.0000e+00, 0.0000e+00, 0.0000e+00,
          0.0000e+00, 0.0000e+00, 0.0000e+00],
         [2.3067e-01, 0.0000e+00, 2.3144e-01, 6.3306e-01, 0.0000e+00,
          0.0000e+00, 0.0000e+00, 0.0000e+00],
         [3.1992e-02, 1.0245e+00, 4.0952e-02, 5.2230e-03, 0.0000e+00,
          0.0000e+00, 0.0000e+00, 0.0000e+00],
         [1.9086e-02, 1.0466e+00, 1.2159e-02, 5.6094e-03, 1.7682e-02,
          9.9907e-03, 0.0000e+00, 0.0000e+00],
         [5.4074e-03, 1.0911e+00, 2.6347e-03, 1.2306e-03, 6.0861e-03,
          2.1616e-03, 2.4595e-03, 0.0000e+00],
         [9.2723e-02, 1.1099e-02, 1.2354e-01, 1.4503e-01, 2.1857e-01,
          8.2328e-02, 2.4435e-01, 1.9347e-01]],

        [[1.1111e+00, 0.0000e

### 1.7 多头注意力

程序段1.7实现了多头注意力的计算逻辑。

In [15]:
# 多头注意力模块，带掩码设置，也可计算交叉注意力
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with optional masking.

    Acts as self-attention when Query, Key and Value are the same tensor and as
    cross-attention when Key/Value come from a different sequence.

    Args:
        d_model: model width; must be divisible by ``heads``.
        heads: number of attention heads.
        dropout: dropout probability, applied to the attention weights and to
            the output of the final projection.
    """

    def __init__(self, d_model: int, heads: int, dropout: float):
        super().__init__()
        self.d_model = d_model   # model width
        self.heads = heads  # number of heads
        # d_model has to split evenly across the heads
        assert d_model % heads == 0, "d_model 不能被 heads 整除"

        self.d_k = d_model // heads  # per-head feature width
        self.W_q = nn.Linear(d_model, d_model, bias=False)  # query projection
        self.W_k = nn.Linear(d_model, d_model, bias=False)  # key projection
        self.W_v = nn.Linear(d_model, d_model, bias=False)  # value projection
        self.W_o = nn.Linear(d_model, d_model, bias=False)  # output projection
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def attention(Q, K, V, mask, dropout: nn.Dropout):
        """Scaled dot-product attention on per-head tensors.

        Args:
            Q: (batch_size, heads, q_len, d_k).
            K: (batch_size, heads, k_len, d_k).
            V: (batch_size, heads, k_len, d_k).
            mask: broadcastable to (batch_size, heads, q_len, k_len); positions
                equal to 0 are masked out. None disables masking.
            dropout: dropout module for the attention weights, or None.

        Returns:
            ``(X, attention_matrix)`` with shapes (batch_size, heads, q_len, d_k)
            and (batch_size, heads, q_len, k_len).
        """
        d_k = Q.shape[-1]   # per-head feature width
        # (1) Scores: (batch_size, heads, q_len, d_k) --> (batch_size, heads, q_len, k_len)
        attention_matrix = (Q @ K.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            # Use the most negative finite value of the score dtype: a literal
            # -1e9 is not representable in float16.
            fill_value = torch.finfo(attention_matrix.dtype).min
            attention_matrix = attention_matrix.masked_fill(mask == 0, fill_value)
        # Normalise over the key axis
        attention_matrix = attention_matrix.softmax(dim=-1)
        if dropout is not None:
            attention_matrix = dropout(attention_matrix)
        # (2) Weighted sum: (batch_size, heads, q_len, k_len) --> (batch_size, heads, q_len, d_k)
        X = torch.matmul(attention_matrix, V)
        # The weights are returned too so they can be visualised later
        return X, attention_matrix

    def forward(self, Query, Key, Value, mask):
        """Run multi-head attention.

        Args:
            Query: (batch_size, q_len, d_model) input used to build Q.
            Key: (batch_size, k_len, d_model) input used to build K.
            Value: (batch_size, k_len, d_model) input used to build V.
            mask: None, or a mask of shape (q_len, k_len), (1 or batch_size,
                q_len, k_len), or anything broadcastable to
                (batch_size, heads, q_len, k_len). Zeros mark hidden positions.

        Returns:
            Tensor of shape (batch_size, q_len, d_model). The attention weights
            of the call are kept on ``self.attention_matrix``.
        """
        # (1) Project the inputs to Q, K, V
        Q = self.W_q(Query)
        K = self.W_k(Key)
        V = self.W_v(Value)
        # (2) Split into heads:
        # (batch_size, seq_len, d_model) --> (batch_size, seq_len, heads, d_k)
        # --> (batch_size, heads, seq_len, d_k)
        Q = Q.view(Q.shape[0], Q.shape[1], self.heads, self.d_k).transpose(1, 2)
        K = K.view(K.shape[0], K.shape[1], self.heads, self.d_k).transpose(1, 2)
        V = V.view(V.shape[0], V.shape[1], self.heads, self.d_k).transpose(1, 2)

        # A 3-D mask is (batch, q_len, k_len). Give it a heads axis so its first
        # dimension lines up with the batch axis of the scores rather than the
        # heads axis (which would be silently wrong whenever batch == heads).
        if mask is not None and mask.dim() == 3:
            mask = mask.unsqueeze(1)

        # (3) Attention on the per-head tensors
        X, self.attention_matrix = MultiHeadAttention.attention(Q, K, V, mask, self.dropout)

        # (4) Merge the heads back:
        # (batch_size, heads, seq_len, d_k) --> (batch_size, seq_len, heads, d_k)
        # --> (batch_size, seq_len, d_model)
        X = X.transpose(1, 2).contiguous().view(X.shape[0], -1, self.heads * self.d_k)

        # (5) Final linear projection W_o, followed by dropout
        X = self.W_o(X)
        X = self.dropout(X)
        return X

In [16]:
# 测试
# NOTE(review): 2118 here versus 21128 in the earlier test cells; possibly a typo, harmless for this demo
vocab_size = 2118  # vocabulary size
batch_size = 4    # batch size
seq_len = 8       # maximum sequence length
d_model = 512     # model width
heads = 8  # number of attention heads
dropout = 0.1
# Lower-triangular matrix that hides future positions, used as the attention mask
mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0)

# Draw random integer ids in [0, vocab_size) with torch.randint to stand in for the input sequences
X = torch.randint(0, vocab_size, (batch_size, seq_len))

# Token embedding
input_embedding = InputEmbeddings(vocab_size, d_model)
X = input_embedding(X)

# Add the positional encoding to the token embeddings
position_encoding = PositionalEncoding(seq_len, d_model, dropout)
X, positon_encode = position_encoding(X)

# Multi-head self-attention: the same X is passed as Query, Key and Value
mh_attention_block = MultiHeadAttention(d_model, heads, dropout)
X = mh_attention_block(X,X,X,mask)

print(f'多头注意力计算结果{X.shape}：\n {X}') 

多头注意力计算结果torch.Size([4, 8, 512])：
 tensor([[[ -7.3791,   2.1448,   0.7829,  ...,   6.9030,   9.3861,   0.0000],
         [  2.5428,  -5.7630,   0.0000,  ...,  -1.1007, -13.4121,   5.6321],
         [-13.7783,   1.8668, -11.1591,  ...,  -8.8603,  -7.9482, -14.5927],
         ...,
         [ -5.5646,   4.3508,  -5.0623,  ...,   3.9187,  -1.9886,  -4.7542],
         [  3.5289,   2.2999,   5.4827,  ..., -10.9634,  -9.4228,  -3.6248],
         [ 11.2869,   9.8718,  -4.5895,  ...,   4.5762,  -7.1419,  -3.2897]],

        [[  5.2099,  13.9342,   1.2197,  ..., -14.3893,  18.5205,  -5.3147],
         [  8.7584,  -3.1686,   0.0000,  ..., -10.5332,   9.2925,  10.2795],
         [  2.5875,   7.0298,   4.4812,  ...,  -7.1233,   6.6587,   5.7045],
         ...,
         [  2.9565,   4.6786,   6.4331,  ...,   5.4211, -13.6371,   6.2226],
         [ 11.8787,   4.6417,  -0.0000,  ...,  11.8295,   3.2298,  -9.1687],
         [-13.2332,  12.3829, -20.7847,  ..., -10.4894,  13.8055,  19.5107]],

        [

### 1.8 层标准化

程序段1.8实现了层标准化的计算逻辑。

In [17]:
# 自定义的层标准化
class LayerNormalization(nn.Module):
    """Hand-written layer normalisation over the last dimension.

    Each vector along the last axis is normalised to zero mean and unit
    variance, then scaled by ``alpha`` and shifted by ``bias``. The variance is
    the biased (population) estimate and ``eps`` is added inside the square
    root, which is the formulation ``torch.nn.LayerNorm`` uses, so for equal
    ``eps`` the two agree.

    Args:
        d_model: size of the last dimension of the input.
        eps: small constant added to the variance for numerical stability.
    """

    def __init__(self, d_model: int, eps:float=10**-8):
        super().__init__()
        self.eps = eps
        self.alpha = nn.Parameter(torch.ones(d_model))  # trainable scale
        self.bias = nn.Parameter(torch.zeros(d_model))  # trainable shift

    def forward(self, x):
        """Normalise ``x`` of shape (batch_size, seq_len, d_model) over its last axis."""
        mean = x.mean(dim = -1, keepdim = True)  # (batch_size, seq_len, 1)
        centered = x - mean
        # Biased variance. The unbiased estimate divides by d_model - 1, which
        # disagrees with nn.LayerNorm and is NaN when d_model == 1.
        var = centered.pow(2).mean(dim = -1, keepdim = True)  # (batch_size, seq_len, 1)
        return self.alpha * centered / torch.sqrt(var + self.eps) + self.bias

In [18]:
# 测试
# Normalise the same X with the hand-written layer and with PyTorch's built-in one, then print both
LN = LayerNormalization(d_model)  # the hand-written layer normalisation
x1_ln = LN(X)  # layer-normalise X
LN2 = nn.LayerNorm(d_model)  # PyTorch's built-in layer normalisation
x2_ln = LN2(X)
print(f'层标准化之前的X：\n {X}')
print(f'\n自定义的层标准化：\n {x1_ln}')
print(f'\nPytorch定义的层标准化：\n {x2_ln}')

层标准化之前的X：
 tensor([[[ -7.3791,   2.1448,   0.7829,  ...,   6.9030,   9.3861,   0.0000],
         [  2.5428,  -5.7630,   0.0000,  ...,  -1.1007, -13.4121,   5.6321],
         [-13.7783,   1.8668, -11.1591,  ...,  -8.8603,  -7.9482, -14.5927],
         ...,
         [ -5.5646,   4.3508,  -5.0623,  ...,   3.9187,  -1.9886,  -4.7542],
         [  3.5289,   2.2999,   5.4827,  ..., -10.9634,  -9.4228,  -3.6248],
         [ 11.2869,   9.8718,  -4.5895,  ...,   4.5762,  -7.1419,  -3.2897]],

        [[  5.2099,  13.9342,   1.2197,  ..., -14.3893,  18.5205,  -5.3147],
         [  8.7584,  -3.1686,   0.0000,  ..., -10.5332,   9.2925,  10.2795],
         [  2.5875,   7.0298,   4.4812,  ...,  -7.1233,   6.6587,   5.7045],
         ...,
         [  2.9565,   4.6786,   6.4331,  ...,   5.4211, -13.6371,   6.2226],
         [ 11.8787,   4.6417,  -0.0000,  ...,  11.8295,   3.2298,  -9.1687],
         [-13.2332,  12.3829, -20.7847,  ..., -10.4894,  13.8055,  19.5107]],

        [[-15.7769,   0.0000,  11

### 1.9 前馈网络

程序段1.9实现了前馈网络的计算逻辑。

In [19]:
class FeedForward(nn.Module):
    """Position-wise feed-forward network that layer-normalises its input first.

    Pipeline: LayerNorm -> Linear(d_model, d_ff) -> ReLU -> Dropout
    -> Linear(d_ff, d_model) -> Dropout.

    Args:
        d_model: input and output width.
        d_ff: hidden width of the first linear layer.
        dropout: dropout probability; one dropout module serves both positions.
    """

    def __init__(self, d_model: int, d_ff: int, dropout: float):
        super().__init__()
        self.linear_1 = nn.Linear(d_model, d_ff)   # expand to the hidden width
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)   # project back to the model width
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x):
        """Map (batch_size, seq_len, d_model) to a tensor of the same shape."""
        # (batch_size, seq_len, d_model) --> (batch_size, seq_len, d_ff)
        hidden = torch.relu(self.linear_1(self.norm(x)))
        hidden = self.dropout(hidden)
        # (batch_size, seq_len, d_ff) --> (batch_size, seq_len, d_model)
        return self.dropout(self.linear_2(hidden))

In [20]:
# 测试
d_model = 512
d_ff = 2048
dropout = 0.1
feed_forward = FeedForward(d_model, d_ff, dropout)  # feed-forward network
# Residual block; X is whatever the previously executed cell left in the kernel
X = X + feed_forward(X)

print(f'前馈网络结合残差块之后的X的形状：\n {X.shape}')
print(f'\n前馈网络结合残差块之后的X：\n {X}')

前馈网络结合残差块之后的X的形状：
 torch.Size([4, 8, 512])

前馈网络结合残差块之后的X：
 tensor([[[-7.1005e+00,  1.9521e+00,  7.4475e-01,  ...,  7.0929e+00,
           9.3393e+00,  3.2801e-01],
         [ 2.8510e+00, -5.8337e+00,  1.8062e-01,  ..., -1.1898e+00,
          -1.3069e+01,  5.2116e+00],
         [-1.3757e+01,  1.4670e+00, -1.1211e+01,  ..., -8.8951e+00,
          -7.7223e+00, -1.5258e+01],
         ...,
         [-5.4868e+00,  4.1346e+00, -5.1322e+00,  ...,  4.1459e+00,
          -1.2711e+00, -4.8632e+00],
         [ 4.0392e+00,  1.8754e+00,  5.5188e+00,  ..., -1.0963e+01,
          -8.7869e+00, -3.9294e+00],
         [ 1.1364e+01,  9.7671e+00, -4.5206e+00,  ...,  4.5762e+00,
          -7.1844e+00, -3.4258e+00]],

        [[ 5.5804e+00,  1.3896e+01,  1.2083e+00,  ..., -1.4566e+01,
           1.8533e+01, -5.3147e+00],
         [ 9.1912e+00, -3.0121e+00, -1.1040e-01,  ..., -1.0125e+01,
           9.3409e+00,  1.0558e+01],
         [ 2.7110e+00,  7.1455e+00,  4.4812e+00,  ..., -6.5864e+00,
           6.928

### 1.10 Transformer编码器定义

程序段1.10：编码器单层和整体的定义

In [21]:
class EncoderLayer(nn.Module):
    """One encoder layer: pre-norm self-attention and feed-forward, each in a residual block.

    Args:
        d_model: model width.
        heads: number of attention heads.
        d_ff: hidden width of the feed-forward network.
        dropout: dropout probability passed to both sub-layers.
    """

    def __init__(self,
                 d_model: int,
                 heads: int,
                 d_ff: int = 2048,
                 dropout: float = 0.1):
        super().__init__()
        # NOTE(review): a single LayerNorm is shared by both residual blocks, and
        # FeedForward normalises its input again internally. Confirm this is
        # intended before giving each sub-layer its own norm.
        self.norm = nn.LayerNorm(d_model)
        self.self_attention = MultiHeadAttention(d_model, heads, dropout)  # multi-head self-attention
        self.feed_forward = FeedForward(d_model, d_ff, dropout)  # feed-forward network

    def forward(self, x, src_mask):
        """Encode ``x``.

        Args:
            x: encoder input; the LayerNorm is applied over its last dimension,
                which has size d_model.
            src_mask: mask forwarded to the self-attention, or None.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # Normalise once and reuse the result as Q, K and V. LayerNorm is
        # deterministic, so this equals calling it three times.
        normed = self.norm(x)
        # Self-attention residual block
        x = x + self.self_attention(normed, normed, normed, src_mask)
        # Feed-forward residual block
        x = x + self.feed_forward(self.norm(x))
        return x

class Encoder(nn.Module):
    """Stack of ``num_layers`` encoder layers applied one after another.

    Args:
        d_model: model width.
        heads: number of attention heads per layer.
        d_ff: hidden width of each feed-forward network.
        dropout: dropout probability used inside every layer.
        num_layers: number of encoder layers in the stack.
    """

    def __init__(self,
                 d_model: int,
                 heads: int,
                 d_ff: int = 2048,
                 dropout: float = 0.1,
                 num_layers: int = 6):
        super().__init__()
        # Build the layers in order and register them as sub-modules
        self.layers = nn.ModuleList(
            [EncoderLayer(d_model, heads, d_ff, dropout) for _ in range(num_layers)]
        )

    def forward(self, x, src_mask):
        """Feed ``x`` through every layer in turn, passing the same ``src_mask`` to each."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, src_mask)
        return hidden

In [22]:
# 测试
num_layers = 6  # number of encoder layers
d_model = 512
heads = 8
dropout = 0.1
d_ff = 2048
# Build the encoder
encoder = Encoder(d_model, heads, d_ff, dropout, num_layers)
src_mask = None  # no masking in this demo
# Run the encoder forward pass; X comes from the previous cells and is overwritten with the output
X = encoder(X, src_mask)
print(f'编码器输出X的形状：\n {X.shape}')
print(f'编码器输出X的内容：\n {X}')

编码器输出X的形状：
 torch.Size([4, 8, 512])
编码器输出X的内容：
 tensor([[[ -6.9696,   2.0169,   1.7395,  ...,   5.6665,  10.2217,   0.8741],
         [  3.5567,  -4.6368,   0.0454,  ...,  -2.2614, -13.6203,   5.2182],
         [-12.5085,   1.6310, -10.4852,  ...,  -9.9112,  -7.5822, -14.9854],
         ...,
         [ -5.0721,   4.7207,  -5.2429,  ...,   3.1427,  -1.5077,  -4.7265],
         [  3.9986,   2.7173,   6.1194,  ..., -12.2413,  -8.1812,  -3.7893],
         [ 11.7539,  10.0915,  -4.7571,  ...,   3.1658,  -6.9866,  -2.9779]],

        [[  6.4644,  14.3475,   1.0192,  ..., -15.8006,  19.5611,  -4.8293],
         [  9.8280,  -3.5177,   0.1406,  ..., -11.4785,   9.8215,  10.5814],
         [  2.4028,   6.3767,   4.5184,  ...,  -6.8328,   8.0347,   6.7899],
         ...,
         [  2.8619,   4.0514,   7.9163,  ...,   4.9041, -13.2488,   4.7620],
         [ 12.6679,   4.4329,   0.8263,  ...,  11.2225,   3.1525,  -8.7614],
         [-12.9275,  12.8445, -20.0032,  ..., -10.8710,  14.1446,  19.8254]

### 1.11 Transformer解码器定义

程序段1.11：解码器单层和整体的定义

In [23]:
class DecoderLayer(nn.Module):
    """One decoder layer: masked self-attention, cross-attention and feed-forward.

    Each sub-layer is applied pre-norm inside a residual block.

    Args:
        d_model: model width.
        heads: number of attention heads.
        d_ff: hidden width of the feed-forward network.
        dropout: dropout probability passed to every sub-layer.
    """

    def __init__(self,
                 d_model: int,
                 heads: int,
                 d_ff: int = 2048,
                 dropout: float = 0.1):
        super().__init__()
        # NOTE(review): one LayerNorm is shared by all three residual blocks
        # (and also normalises encoder_output), and FeedForward normalises its
        # input again internally. Confirm this is intended.
        self.norm = nn.LayerNorm(d_model)
        self.self_attention = MultiHeadAttention(d_model, heads, dropout)   # masked multi-head self-attention
        self.cross_attention = MultiHeadAttention(d_model, heads, dropout)  # multi-head cross-attention
        self.feed_forward = FeedForward(d_model, d_ff, dropout)  # feed-forward network

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Decode ``x`` against ``encoder_output``.

        Args:
            x: decoder input; the LayerNorm is applied over its last dimension,
                which has size d_model.
            encoder_output: encoder result that supplies the cross-attention keys and values.
            src_mask: mask forwarded to the cross-attention, or None.
            tgt_mask: mask forwarded to the self-attention, or None.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # Self-attention residual block. Normalise once and reuse the result as
        # Q, K and V; LayerNorm is deterministic so the output is unchanged.
        normed = self.norm(x)
        x = x + self.self_attention(normed, normed, normed, tgt_mask)
        # Cross-attention residual block: Q from the decoder state, K and V from
        # the normalised encoder output (computed once for both).
        memory = self.norm(encoder_output)
        x = x + self.cross_attention(self.norm(x), memory, memory, src_mask)
        # Feed-forward residual block
        x = x + self.feed_forward(self.norm(x))
        return x

class Decoder(nn.Module):
    """Stack of ``num_layers`` decoder layers applied one after another.

    Args:
        d_model: model width.
        heads: number of attention heads per layer.
        d_ff: hidden width of each feed-forward network.
        dropout: dropout probability used inside every layer.
        num_layers: number of decoder layers in the stack.
    """

    def __init__(self,
                 d_model: int,
                 heads: int,
                 d_ff: int = 2048,
                 dropout: float = 0.1,
                 num_layers: int = 6):
        super().__init__()
        # Build the layers in order and register them as sub-modules
        self.layers = nn.ModuleList(
            [DecoderLayer(d_model, heads, d_ff, dropout) for _ in range(num_layers)]
        )

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Feed ``x`` through every layer; each receives the same encoder output and masks."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, encoder_output, src_mask, tgt_mask)
        return hidden

In [24]:
# 测试
num_layers = 6  # number of decoder layers
d_model = 512
heads = 8
dropout = 0.1
d_ff = 2048

# Build the decoder
decoder = Decoder(d_model, heads, d_ff, dropout, num_layers)
src_mask = None  # no masking in this demo
tgt_mask = None
encoder_output = X  # treat X (left by the previous cells) as the encoder output

# For demonstration only, the same X is also used as the decoder input
X = decoder(X, encoder_output, src_mask, tgt_mask)  # decoder forward pass
print(f'解码器输出X的形状：\n {X.shape}')
print(f'解码器输出的X内容：\n {X}')

解码器输出X的形状：
 torch.Size([4, 8, 512])
解码器输出的X内容：
 tensor([[[ -6.6993,   2.1376,   0.7858,  ...,   5.9735,  10.4620,  -0.2455],
         [  3.8111,  -4.0369,  -1.0978,  ...,  -2.3895, -13.1481,   4.6516],
         [-11.8709,   3.3796, -10.1133,  ...,  -9.0457,  -7.6680, -15.5601],
         ...,
         [ -4.1956,   4.9070,  -5.7385,  ...,   3.4915,  -1.3652,  -5.5563],
         [  4.3013,   2.8888,   4.9513,  ..., -11.2582,  -9.6134,  -5.4908],
         [ 12.3409,  11.4703,  -5.1791,  ...,   2.9760,  -6.9357,  -4.3056]],

        [[  6.8585,  13.3532,   0.4404,  ..., -16.0193,  19.0073,  -3.9782],
         [  9.9097,  -4.5325,  -0.1822,  ..., -13.1542,  10.2615,  10.2845],
         [  1.8819,   7.1601,   4.4242,  ...,  -7.0217,   7.2066,   7.6939],
         ...,
         [  2.7568,   4.4886,   7.6113,  ...,   3.4948, -12.9694,   5.8626],
         [ 13.5333,   4.8222,   0.4031,  ...,   9.4285,   3.4756,  -9.1734],
         [-12.6108,  12.9711, -19.6789,  ..., -11.6283,  12.9637,  20.3004]

### 1.12 Transformer模型定义

In [25]:
 # 字典映射层，将解码器输出的词向量映射为词典长度的向量
class OutputLayer(nn.Module):
    """Linear projection from model-width vectors to vocabulary-sized vectors."""

    def __init__(self, d_model, vocab_size):
        """
        Args:
            d_model: width of the incoming vectors.
            vocab_size: width of the projected vectors (one entry per vocabulary item).
        """
        super().__init__()
        self.proj = nn.Linear(in_features=d_model, out_features=vocab_size)

    def forward(self, x):
        """Project (batch, seq_len, d_model) to (batch, seq_len, vocab_size)."""
        logits = self.proj(x)
        return logits

In [26]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer assembled from the building blocks defined in this notebook.

    Args:
        src_vocab_size: source vocabulary size.
        tgt_vocab_size: target vocabulary size.
        src_seq_len: maximum source sequence length.
        tgt_seq_len: maximum target sequence length.
        d_model: model width.
        heads: number of attention heads.
        num_encoder_layers: number of encoder layers.
        num_decoder_layers: number of decoder layers.
        d_ff: hidden width of the feed-forward networks.
        dropout: dropout probability used throughout the model.
    """

    def __init__(self,
                 src_vocab_size: int,
                 tgt_vocab_size: int,
                 src_seq_len: int,
                 tgt_seq_len: int,
                 d_model: int = 512,
                 heads: int = 8,
                 num_encoder_layers: int = 6,
                 num_decoder_layers: int = 6,
                 d_ff: int = 2048,
                 dropout: float = 0.1):
        super().__init__()
        # Token-embedding layers
        self.src_embed = InputEmbeddings(src_vocab_size, d_model)
        self.tgt_embed = InputEmbeddings(tgt_vocab_size, d_model)

        # Positional-encoding layers
        self.src_pos = PositionalEncoding(src_seq_len, d_model, dropout)
        self.tgt_pos = PositionalEncoding(tgt_seq_len, d_model, dropout)
        # Encoder stack
        self.encoder = Encoder(d_model, heads, d_ff, dropout, num_encoder_layers)
        # Decoder stack
        self.decoder = Decoder(d_model, heads, d_ff, dropout, num_decoder_layers)
        # Projection from model width to the target vocabulary
        self.project_layer = OutputLayer(d_model, tgt_vocab_size)

    def encode(self, src, src_mask):
        """Embed ``src``, add the positional encoding and run the encoder.

        Args:
            src: source token ids, (batch_size, seq_len).
            src_mask: mask forwarded to the encoder, or None.

        Returns:
            Encoder output of shape (batch_size, seq_len, d_model).
        """
        src = self.src_embed(src)   # token embedding
        src, _ = self.src_pos(src)  # positional encoding (the returned table is discarded)
        encoder_output = self.encoder(src, src_mask)
        return encoder_output

    def decode(self,
               tgt: torch.Tensor,
               encoder_output: torch.Tensor,
               src_mask: torch.Tensor,
               tgt_mask: torch.Tensor):
        """Embed ``tgt``, add the positional encoding and run the decoder.

        Args:
            tgt: target token ids, (batch_size, seq_len).
            encoder_output: result of :meth:`encode`.
            src_mask: mask forwarded to the decoder for cross-attention, or None.
            tgt_mask: mask forwarded to the decoder for self-attention, or None.

        Returns:
            Decoder output of shape (batch_size, seq_len, d_model).
        """
        tgt = self.tgt_embed(tgt)   # token embedding
        tgt, _ = self.tgt_pos(tgt)  # positional encoding (the returned table is discarded)
        decoder_output = self.decoder(tgt, encoder_output, src_mask, tgt_mask)
        return decoder_output

    def output_layer(self, x):
        """Project decoder output to target-vocabulary width: (..., d_model) --> (..., vocab_size)."""
        return self.project_layer(x)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        """Run encode, decode and the output projection in one call.

        Defining ``forward`` lets the model be invoked as ``model(src, tgt, ...)``
        like any other ``nn.Module``; without it that call raises.

        Args:
            src: source token ids, (batch_size, src_len).
            tgt: target token ids, (batch_size, tgt_len).
            src_mask: mask for encoder self-attention and decoder cross-attention, or None.
            tgt_mask: mask for decoder self-attention, or None.

        Returns:
            Output of shape (batch_size, tgt_len, tgt_vocab_size), one vector per target position.
        """
        encoder_output = self.encode(src, src_mask)
        decoder_output = self.decode(tgt, encoder_output, src_mask, tgt_mask)
        return self.output_layer(decoder_output)

In [27]:
# 测试
src_seq_len = 8  # maximum source sequence length
tgt_seq_len = 8  # maximum target sequence length
src_vocab_size = 21128  # encoder-side (source) vocabulary size
tgt_vocab_size = 30522  # decoder-side (target) vocabulary size
d_model = 512     # model width
heads = 8   # number of attention heads
num_encoder_layers = 6   # number of encoder layers
num_decoder_layers = 6   # number of decoder layers
d_ff = 2048   # hidden width of the feed-forward networks
dropout = 0.1   
batch_size = 4   # batch size
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create the Transformer model instance
model = Transformer(src_vocab_size, 
              tgt_vocab_size, 
              src_seq_len, 
              tgt_seq_len, 
              d_model, 
              heads, 
              num_encoder_layers, 
              num_decoder_layers, 
              d_ff,
              dropout)

# Initialise the parameters: every parameter with more than one dimension gets Xavier-uniform initialisation
for p in model.parameters():  # iterate over the learnable parameters
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)
# Move the model to the selected device
model = model.to(device)

# Draw random integer ids in [0, src_vocab_size) with torch.randint as the encoder input
src = torch.randint(0, src_vocab_size, (batch_size, src_seq_len))
print(f'编码器输入的形状：\n {src.shape}')
print(f'\n编码器输入的内容：\n {src}')

# Draw random integer ids in [0, tgt_vocab_size) with torch.randint as the decoder input
tgt = torch.randint(0, tgt_vocab_size, (batch_size, tgt_seq_len))
print(f'\n解码器输入的形状：\n {tgt.shape}')
print(f'\n解码器输入的内容：\n {tgt}')

# Mask matrices
# NOTE(review): src_mask is lower-triangular here as well, i.e. the encoder is masked causally
# in this demo, whereas the encoder code describes src_mask as a padding mask; confirm intent.
src_mask = torch.tril(torch.ones(src_seq_len, src_seq_len)).unsqueeze(0)
tgt_mask = torch.tril(torch.ones(tgt_seq_len, tgt_seq_len)).unsqueeze(0)
print(f'\n编码器掩码矩阵的形状：\n {src_mask.shape}')
print(f'\n解码器掩码矩阵的形状：\n {tgt_mask.shape}')

# Make sure the inputs live on the same device as the model
src = src.to(device)
tgt = tgt.to(device)
src_mask = src_mask.to(device)
tgt_mask = tgt_mask.to(device)

# NOTE(review): no model.eval() call is visible in this cell, so the dropout layers
# presumably stay active during these forward passes; confirm whether that is intended.
# Encoder forward pass
encoder_output = model.encode(src, src_mask)
# Decoder forward pass
decoder_output = model.decode(tgt, encoder_output, src_mask, tgt_mask)

# In training mode, the whole decoder_output would be projected (dimensions unchanged)
# In inference mode, project decoder_output[:, -1], i.e. only the last position of every sample
# Training-mode shapes:  (batch, seq_len, d_model) --> (batch, seq_len, vocab_size)
# Inference-mode shapes: (batch, d_model) --> (batch, vocab_size)
logits = model.output_layer(decoder_output[:,-1])

print(f'Transformer输出的形状：\n {logits.shape}')
print(f'Transformer输出的内容：\n {logits}')

编码器输入的形状：
 torch.Size([4, 8])

编码器输入的内容：
 tensor([[ 4118,  7961, 15944, 20728, 18026,  2926,  4422, 14583],
        [ 1391, 13255, 18307,  5554, 10778, 14992, 20213, 10499],
        [ 3916,  6704, 10057, 18068, 19197,  2098, 15748,  5110],
        [ 8677, 13728, 15260, 19508,  5855,  8106,  7820, 16374]])

解码器输入的形状：
 torch.Size([4, 8])

解码器输入的内容：
 tensor([[29213, 19024,  8361, 19524,   825, 15783, 25248, 25059],
        [15110, 10750, 25489,   993, 20811,  8838, 22323, 23622],
        [ 7417, 27051, 21660, 27075, 15680,  3570, 12608, 15916],
        [21175,  5771, 23228, 21306, 28363, 17342,  4529,  4211]])

编码器掩码矩阵的形状：
 torch.Size([1, 8, 8])

解码器掩码矩阵的形状：
 torch.Size([1, 8, 8])
Transformer输出的形状：
 torch.Size([4, 30522])
Transformer输出的内容：
 tensor([[-2.2468e-01,  9.3363e-01, -4.9548e-02,  ..., -5.1730e-02,
          8.5177e-01,  3.4542e-01],
        [-8.5632e-01,  4.6375e-01,  6.7897e-01,  ..., -5.5763e-04,
          5.7953e-01,  5.4860e-01],
        [-1.3713e+00,  4.3081e-01,  4.3034e-02

查看模型需要学习和训练的参数总量：

In [29]:
# Sum the element counts of every model parameter that requires gradients
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'模型可训练参数数量是: {trainable_params}')

模型可训练参数数量是: 86198074
