In [1]:
import torch
from torch import nn
from torch.nn import functional as F
import math

In [2]:
# Scaled dot-product attention
def scaled_dot_product_attention(Q, K, V, mask=None):
    """Compute ``softmax(Q K^T / sqrt(d)) V``.

    Args:
        Q: query tensor; the size of its last dimension is the scaling
            dimension ``d``.
        K: key tensor; its last two dimensions are transposed before the
            product with ``Q``.
        V: value tensor that is combined using the attention weights.
        mask: optional tensor broadcastable to the score matrix. Positions
            where ``mask == 0`` are excluded from attention.

    Returns:
        A tuple ``(output, atten_weights)``: the weighted sum of ``V`` and the
        softmax-normalised attention weights.

    Note:
        A row whose positions are all masked contains only ``-inf`` scores,
        so its softmax (and therefore its output) is NaN.
    """
    # Scale factor: square root of the size of the last dimension of Q
    scale = math.sqrt(Q.size(-1))
    scores = (Q @ K.transpose(-2, -1)) / scale

    # Masked positions are set to -inf so that softmax gives them zero weight
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float('-inf'))

    # Normalise over the key axis, then take the weighted sum of the values
    atten_weights = scores.softmax(dim=-1)
    output = torch.matmul(atten_weights, V)
    return output, atten_weights

In [6]:
# 单头注意力机制
# class Attention(nn.Module):
#     def __init__(self, x):
#         super(Attention, self).__init__()
#         self.x = x

#     def forward(mask=None):
#         embed_size = x.size(-1)
        
#         Q = nn.Linear(embed_size, embed_size)
#         K = nn.Linear(embed_size, embed_size)
#         V = nn.Linear(embed_size, embed_size)

#         output, atten_weights = scaled_dot_product_attention(Q, K, V, mask)
        
#         return output, atten_weights

# Single-head attention
class Attention(nn.Module):
    """Single-head attention with learned query, key and value projections."""

    def __init__(self, embed_size):
        """Create the three ``embed_size -> embed_size`` linear projections."""
        super().__init__()
        self.embed_size = embed_size

        def make_projection():
            return nn.Linear(embed_size, embed_size)

        # Linear layers that produce the query, key and value matrices
        self.w_q = make_projection()
        self.w_k = make_projection()
        self.w_v = make_projection()

    def forward(self, q, k, v, mask=None):
        """Project the inputs and run scaled dot-product attention.

        Returns:
            The ``(output, atten_weights)`` tuple produced by
            ``scaled_dot_product_attention``.
        """
        # Turn the input sequences into the Q, K and V matrices
        projected = (self.w_q(q), self.w_k(k), self.w_v(v))
        return scaled_dot_product_attention(*projected, mask)

In [7]:
# Self-attention
class SelfAttention(nn.Module):
    """Attention whose queries, keys and values all come from the same input."""

    def __init__(self, embed_size):
        """Wrap a single ``Attention`` module of width ``embed_size``."""
        super().__init__()
        self.attention = Attention(embed_size)
        self.embed_size = embed_size

    def forward(self, x, mask=None):
        """Attend from ``x`` to itself.

        Returns:
            The ``(output, atten_weights)`` pair returned by the wrapped
            attention module.
        """
        # In self-attention q, k and v are all the same input sequence
        return self.attention(x, x, x, mask)

In [9]:
# Cross-attention
class CrossAttention(nn.Module):
    """Attention whose queries come from one sequence and keys/values from another."""

    def __init__(self, embed_size):
        """Wrap a single ``Attention`` module of width ``embed_size``."""
        # nn.Module must be initialised before any submodule is assigned;
        # without this call the assignment of self.attention raises AttributeError.
        super(CrossAttention, self).__init__()
        self.embed_size = embed_size
        self.attention = Attention(embed_size)

    def forward(self, q, kv, mask=None):
        """Attend from ``q`` to ``kv``.

        Args:
            q: input used as the query.
            kv: input used as both key and value.
            mask: optional mask forwarded to the attention module.

        Returns:
            The ``(output, atten_weights)`` pair returned by the wrapped
            attention module.
        """
        # In cross-attention q comes from the decoder and kv from the encoder
        output, atten_weights = self.attention(q, kv, kv, mask)
        return output, atten_weights

In [11]:
# Multi-head attention
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on ``scaled_dot_product_attention``.

    ``embed_size`` must be divisible by ``num_heads``; each head works on
    ``d_k = embed_size // num_heads`` features.
    """

    def __init__(self, embed_size, num_heads):
        super().__init__()
        self.embed_size = embed_size
        self.num_heads = num_heads

        assert self.embed_size % self.num_heads == 0

        # Per-head feature size
        self.d_k = self.embed_size // self.num_heads

        # Linear layers that produce the q, k and v matrices
        self.w_q = nn.Linear(embed_size, embed_size)
        self.w_k = nn.Linear(embed_size, embed_size)
        self.w_v = nn.Linear(embed_size, embed_size)

        # Output projection applied to the concatenated heads
        self.fc_out = nn.Linear(embed_size, embed_size)

    def forward(self, q, k, v, mask=None):
        """Run attention over all heads and return the projected result.

        Only the projected output is returned; the attention weights are
        discarded.
        """
        n_batch = q.size(0)

        def split_heads(projected):
            # (n_batch, seq_len, embed_size) -> (n_batch, num_heads, seq_len, d_k)
            return projected.view(n_batch, -1, self.num_heads, self.d_k).transpose(1, 2)

        query = split_heads(self.w_q(q))
        key = split_heads(self.w_k(k))
        value = split_heads(self.w_v(v))

        # context: (n_batch, num_heads, seq_len_q, d_k)
        context, _ = scaled_dot_product_attention(query, key, value, mask)

        # Merge the heads back: (n_batch, seq_len_q, num_heads * d_k)
        merged = context.transpose(1, 2).contiguous()
        merged = merged.view(n_batch, -1, self.num_heads * self.d_k)

        return self.fc_out(merged)

In [19]:
# Demo: scaled dot-product attention with a lower-triangular (causal) mask.
batch_size = 2
num_heads = 2
seq_len_q = 3 # query sequence length
seq_len_k = 3 # key sequence length
head_dim = 4

# Simulated query matrix Q and key/value matrices K, V
# NOTE(review): no random seed is set in the visible cells, so these values differ between runs.
Q = torch.randn(batch_size, num_heads, seq_len_q, head_dim)
K = torch.randn(batch_size, num_heads, seq_len_k, head_dim)
V = torch.randn(batch_size, num_heads, seq_len_k, head_dim)

# Build a lower-triangular mask of shape (1, 1, seq_len_q, seq_len_k); broadcasting applies it to every batch item and head
mask = torch.tril(torch.ones(seq_len_q, seq_len_k)).unsqueeze(0).unsqueeze(0)

# Run scaled dot-product attention with the lower-triangular mask applied
output, atten_weights = scaled_dot_product_attention(Q, K, V, mask)

In [21]:
# Display the attention weights from the masked demo; entries above the diagonal are zero.
atten_weights

tensor([[[[1.0000, 0.0000, 0.0000],
          [0.5771, 0.4229, 0.0000],
          [0.1815, 0.2287, 0.5898]],

         [[1.0000, 0.0000, 0.0000],
          [0.4707, 0.5293, 0.0000],
          [0.8778, 0.0980, 0.0242]]],


        [[[1.0000, 0.0000, 0.0000],
          [0.9347, 0.0653, 0.0000],
          [0.1374, 0.4930, 0.3695]],

         [[1.0000, 0.0000, 0.0000],
          [0.3168, 0.6832, 0.0000],
          [0.3102, 0.4320, 0.2579]]]])

In [23]:
# FFN: position-wise feed-forward network
# Computes w_2(dropout(relu(w_1(x)))): two linear layers with a ReLU and dropout in between
class PositionwiseFeedForward(nn.Module):
    """
    Position-wise feed-forward network.

    Args:
        d_model: size of the input and output vectors
        d_ff: size of the hidden (intermediate) layer
        dropout: dropout probability applied to the hidden activations
            to reduce overfitting
    """
    def __init__(self, d_model, d_ff, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        # The dropout argument used to be accepted but ignored; it is now honoured.
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply linear -> ReLU -> dropout -> linear to ``x``."""
        return self.w_2(self.dropout(self.relu(self.w_1(x))))

In [25]:
# Residual connection, which helps against vanishing gradients
class ResidualConnection(nn.Module):
    """Add the dropout-regularised output of a sublayer back onto its input."""

    def __init__(self, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, sublayer):
        """Return ``x + dropout(sublayer(x))``.

        Args:
            x: input tensor.
            sublayer: callable applied to ``x``.
        """
        branch = self.dropout(sublayer(x))
        return x + branch

In [26]:
class LayerNorm(nn.Module):
    """Layer normalisation over the last dimension.

    Output = gamma * (x - mean) / (std + epsilon) + beta, where gamma and beta
    are learnable parameters that rescale and shift the normalised values.

    Args:
        feature_size: size of the last dimension of the input.
        epsilon: small constant added to the standard deviation to avoid
            division by zero.
    """
    def __init__(self, feature_size, epsilon=1e-9):
        # nn.Module must be initialised before parameters can be registered;
        # without this call assigning nn.Parameter raises AttributeError.
        super(LayerNorm, self).__init__()
        # Learnable scale, initialised to 1
        self.gamma = nn.Parameter(torch.ones(feature_size))
        # Learnable shift, initialised to 0
        self.beta = nn.Parameter(torch.zeros(feature_size))
        self.epsilon = epsilon

    def forward(self, x):
        """Normalise ``x`` along its last dimension."""
        mean = x.mean(dim=-1, keepdim=True)
        # torch.Tensor.std defaults to the unbiased (n - 1) estimator
        std = x.std(dim=-1, keepdim=True)
        return self.gamma * (x - mean) / (std + self.epsilon) + self.beta

In [27]:
# Residual connection followed by layer normalisation
# Output = LayerNorm(x + Dropout(Sublayer(x)))
# NOTE: a later cell defines SublayerConnection again; once that cell runs it replaces this class.
class SublayerConnection(nn.Module):
    """Apply a sublayer with a residual connection, then normalise the result."""

    def __init__(self, feature_size, dropout=0.1, epsilon=1e-9):
        super().__init__()
        # Residual connection handled by ResidualConnection (attribute name kept as in the original)
        self.redidual = ResidualConnection(dropout)
        # Layer normalisation
        self.norm = LayerNorm(feature_size, epsilon)

    def forward(self, x, sublayer):
        """Return ``norm(residual(x, sublayer))``."""
        # Dropout and the residual sum happen inside the residual module; normalise afterwards
        residual_out = self.redidual(x, sublayer)
        return self.norm(residual_out)

In [28]:
class SublayerConnection(nn.Module):
    """Residual connection with dropout, followed by layer normalisation.

    Computes ``LayerNorm(x + Dropout(sublayer(x)))``.
    """

    def __init__(self, feature_size, dropout=0.1, epsilon=1e-9):
        super().__init__()
        self.norm = LayerNorm(feature_size, epsilon)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, sublayer):
        """Apply ``sublayer`` to ``x``, add the residual and normalise."""
        # Dropout on the sublayer output, then the residual sum, then normalisation
        update = self.dropout(sublayer(x))
        return self.norm(x + update)

In [29]:
class Embeddings(nn.Module):
    """Token embedding lookup scaled by ``sqrt(d_model)``."""

    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)
        # Constant multiplier applied to every embedding vector
        self.scale_factor = math.sqrt(d_model)

    def forward(self, x):
        """Look up the embeddings for the indices in ``x`` and scale them."""
        token_vectors = self.embed(x)
        return token_vectors * self.scale_factor

In [35]:
class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding added to the input, followed by dropout.

    Args:
        d_model: size of the last dimension of the input (even or odd).
        dropout: dropout probability applied after adding the encoding.
        max_len: number of positions for which the encoding is precomputed.
    """
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)  # position indices, (max_len, 1)

        # Frequency for each pair of dimensions
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))

        # Combine positions and frequencies: (max_len, ceil(d_model / 2))
        angles = position * div_term

        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even column, so
        # trim the angles to d_model // 2 columns (previously a shape error).
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Add a leading dimension so it broadcasts over the batch: (1, max_len, d_model)
        pe = pe.unsqueeze(0)

        # Register as a buffer: saved with the module but not updated as a parameter
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the encoding for the first ``x.size(1)`` positions and apply dropout."""
        # Slice the encoding to the length of dimension 1 of the input and add it
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)

In [36]:
class SourceEmbedding(nn.Module):
    """Source-side input pipeline: token embedding followed by positional encoding."""

    def __init__(self, src_vocab_size, d_model, dropout=0.1):
        super().__init__()
        # Token embedding layer
        self.embed = Embeddings(src_vocab_size, d_model)
        # Positional encoding (applies dropout internally)
        self.positional_encoding = PositionalEncoding(d_model, dropout)

    def forward(self, x):
        """Embed the token indices in ``x`` and add positional information."""
        embedded = self.embed(x)
        return self.positional_encoding(embedded)

In [38]:
class TargetEmbedding(nn.Module):
    """Target-side input pipeline: scaled token embedding plus positional encoding.

    Args:
        tgt_vocab_size: number of entries in the target embedding table.
        d_model: embedding dimension.
        dropout: dropout probability passed to the positional encoding.
    """
    def __init__(self, tgt_vocab_size, d_model, dropout=0.1):
        super(TargetEmbedding, self).__init__()
        self.embed = nn.Embedding(tgt_vocab_size, d_model)
        # Same sqrt(d_model) scaling that the source side gets through Embeddings;
        # kept here (not by swapping in Embeddings) so parameter names stay unchanged.
        self.scale_factor = math.sqrt(d_model)
        self.positional_encoding = PositionalEncoding(d_model, dropout)

    def forward(self, x):
        """Embed the token indices in ``x``, scale them and add positional information."""
        x = self.embed(x) * self.scale_factor
        return self.positional_encoding(x)

In [41]:
# Padding mask
def create_padding_mask(seq, pad_token_id=0):
    """Build a boolean mask that is True wherever ``seq`` is not padding.

    Args:
        seq: token-id tensor; the notebook uses shape (batch_size, seq_len).
        pad_token_id: id that marks padding positions.

    Returns:
        ``seq != pad_token_id`` with two singleton dimensions inserted at
        positions 1 and 2, i.e. (batch_size, 1, 1, seq_len) for a 2-D input.
    """
    is_real_token = seq.ne(pad_token_id)
    with_head_axis = is_real_token.unsqueeze(1)
    return with_head_axis.unsqueeze(2)

In [40]:
# seq = torch.tensor([[5, 7, 9, 0, 0], [8, 6, 0, 0, 0]])  # 0 表示 <PAD>
# print(create_padding_mask(seq))

tensor([[[[ True,  True,  True, False, False]]],


        [[[ True,  True, False, False, False]]]])


In [44]:
# Look-ahead (future information) mask
def create_look_ahead_mask(size):
    """Return a (size, size) boolean lower-triangular mask.

    Entry ``[i, j]`` is True when ``j <= i``, so a position can only see
    itself and earlier positions.
    """
    # Lower-triangular matrix of ones, converted to booleans
    lower_triangle = torch.ones(size, size).tril()
    return lower_triangle.to(torch.bool)

In [45]:
# Show the 5x5 look-ahead mask: row i is True for columns 0..i.
print(create_look_ahead_mask(5))

tensor([[ True, False, False, False, False],
        [ True,  True, False, False, False],
        [ True,  True,  True, False, False],
        [ True,  True,  True,  True, False],
        [ True,  True,  True,  True,  True]])


In [52]:
# Combined mask: merge the padding mask and the look-ahead mask so both take effect at once
def create_decoder_mask(tgt_seq, pad_token_id=0):
    """Build the decoder self-attention mask for ``tgt_seq``.

    Args:
        tgt_seq: target token-id tensor; ``tgt_seq.size(1)`` is used as the
            sequence length.
        pad_token_id: id that marks padding positions.

    Returns:
        The element-wise AND of the look-ahead mask and the padding mask,
        broadcast to (batch_size, 1, seq_len_tgt, seq_len_tgt).
    """
    tgt_len = tgt_seq.size(1)

    # (1, seq_len_tgt, seq_len_tgt), moved to the same device as the input
    causal = create_look_ahead_mask(tgt_len).to(tgt_seq.device).unsqueeze(0)
    # (batch_size, 1, 1, seq_len_tgt)
    not_padding = create_padding_mask(tgt_seq, pad_token_id)

    # Broadcasting aligns shapes from the right: (batch_size, 1, seq_len_tgt, seq_len_tgt)
    return causal & not_padding

In [53]:
# tgt_seq = torch.tensor([[1, 2, 3, 4, 0]])  # 0 表示 <PAD>
# print(create_decoder_mask(tgt_seq))

In [54]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, h, d_ff, dropout):
        """
        Encoder layer: multi-head self-attention followed by a feed-forward
        network, each wrapped in a SublayerConnection.

        Args:
            d_model: embedding dimension
            h: number of attention heads
            d_ff: hidden size of the feed-forward network
            dropout: dropout probability
        """
        super(EncoderLayer, self).__init__()
        self.self_atten = MultiHeadAttention(d_model, h)
        # Pass the dropout rate through, matching how DecoderLayer builds its feed-forward block
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)

        # Two sublayer connections: one for self-attention, one for the feed-forward network
        self.sublayers = nn.ModuleList([SublayerConnection(d_model, dropout) for _ in range(2)])
        self.d_model = d_model

    def forward(self, x, src_mask):
        """Run self-attention and the feed-forward network on ``x``.

        Args:
            x: layer input.
            src_mask: mask forwarded to the self-attention module.

        Returns:
            The output of the second sublayer connection.
        """
        # Self-attention sublayer
        x = self.sublayers[0](x, lambda x: self.self_atten(x, x, x, src_mask))
        # Feed-forward sublayer
        x = self.sublayers[1](x, self.feed_forward)
        # The result must be returned; previously forward() fell through and returned None
        return x

In [55]:
class DecoderLayer(nn.Module):
    """Decoder layer: masked self-attention, cross-attention and a feed-forward
    network, each wrapped in a SublayerConnection."""

    def __init__(self, d_model, h, d_ff, dropout):
        """
        Args:
            d_model: embedding dimension
            h: number of attention heads
            d_ff: hidden size of the feed-forward network
            dropout: dropout probability
        """
        super().__init__()
        self.d_model = d_model

        # Masked multi-head self-attention
        self.self_attn = MultiHeadAttention(d_model, h)
        # Cross-attention over the encoder output (multi-head)
        self.cross_attn = MultiHeadAttention(d_model, h)
        # Feed-forward network
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)

        # One sublayer connection per stage: self-attention, cross-attention, feed-forward
        connections = [SublayerConnection(d_model, dropout) for _ in range(3)]
        self.sublayers = nn.ModuleList(connections)

    def forward(self, x, memory, src_mask, tgt_mask):
        """Run the three decoder stages in order and return the result.

        Args:
            x: decoder input.
            memory: tensor used as key and value in cross-attention.
            src_mask: mask forwarded to cross-attention.
            tgt_mask: mask forwarded to self-attention.
        """
        self_stage, cross_stage, ffn_stage = self.sublayers

        def attend_to_self(hidden):
            return self.self_attn(hidden, hidden, hidden, tgt_mask)

        def attend_to_memory(hidden):
            return self.cross_attn(hidden, memory, memory, src_mask)

        # Stage 1: masked multi-head self-attention
        x = self_stage(x, attend_to_self)
        # Stage 2: multi-head cross-attention
        x = cross_stage(x, attend_to_memory)
        # Stage 3: feed-forward network
        return ffn_stage(x, self.feed_forward)

In [56]:
class Encoder(nn.Module):
    """Encoder: a stack of N EncoderLayer modules followed by a final LayerNorm."""

    def __init__(self, d_model, N, h, d_ff, dropout=0.1):
        """
        Args:
            d_model: embedding dimension
            N: number of encoder layers
            h: number of attention heads
            d_ff: hidden size of the feed-forward network
            dropout: dropout probability
        """
        super().__init__()
        stack = [EncoderLayer(d_model, h, d_ff, dropout) for _ in range(N)]
        self.layers = nn.ModuleList(stack)
        # One more normalisation after the last layer
        self.norm = LayerNorm(d_model)

    def forward(self, x, mask):
        """Pass ``x`` through every layer in order, then normalise the result."""
        hidden = x
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden, mask)
        # Final layer normalisation
        return self.norm(hidden)

In [None]:
class Decoder(nn.Module):
    """Decoder: a stack of N DecoderLayer modules followed by a final LayerNorm."""

    def __init__(self, d_model, N, h, d_ff, dropout=0.1):
        """
        Args:
            d_model: embedding dimension
            N: number of decoder layers
            h: number of attention heads
            d_ff: hidden size of the feed-forward network
            dropout: dropout probability
        """
        super(Decoder, self).__init__()
        # DecoderLayer takes (d_model, h, d_ff, dropout); d_ff was previously
        # omitted, which raised TypeError for the missing positional argument.
        self.layers = nn.ModuleList([DecoderLayer(d_model, h, d_ff, dropout) for _ in range(N)])
        self.norm = LayerNorm(d_model) # normalisation after the last layer

    def forward(self, x, memory, src_mask, tgt_mask):
        """
        Forward pass.

        Args:
            x: decoder input (batch_size, seq_len_tgt, d_model)
            memory: encoder output (batch_size, seq_len_src, d_model)
            src_mask: source-sequence mask used for cross-attention
            tgt_mask: target-sequence mask used for self-attention

        Returns:
            The decoder output.
        """
        for layer in self.layers:
            x = layer(x, memory, src_mask, tgt_mask)
        return self.norm(x) # normalisation after the last layer