In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

**词嵌入和位置编码：**

In [8]:
class TokenEmbedding(nn.Module):
    """Token lookup table whose output vectors are scaled by sqrt(d_model).

    Args:
        vocab_size: Number of entries in the embedding table (vocabulary size).
        d_model: Width of each embedding vector.
    """

    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.d_model = d_model

    def forward(self, x):
        """Look up the embeddings for token ids ``x`` and scale them by sqrt(d_model)."""
        scale = math.sqrt(self.d_model)
        vectors = self.embedding(x)
        return vectors * scale


class PositionalEncoding(nn.Module):
    """Adds a fixed sinusoidal positional encoding to its input.

    The table is computed once in the constructor and stored as the
    non-trainable buffer ``pe`` of shape ``(1, max_len, d_model)``. Even
    columns hold sines and odd columns hold cosines of
    ``position * exp(-ln(10000) * 2i / d_model)``.

    Args:
        d_model: Width of each encoding vector. May be even or odd.
        max_len: Number of positions to precompute (longest supported sequence).
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        # Column vector of position indices: (max_len, 1).
        position = torch.arange(0, max_len).unsqueeze(1).float()
        # One frequency per even column index: ceil(d_model / 2) entries.
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns, so
        # trim the angles to fit; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        pe = pe.unsqueeze(0)  # (1, max_len, d_model)
        # A buffer is saved/loaded with the module and follows it across
        # devices, but it is not a parameter and receives no gradient.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        ``x`` must broadcast against ``(1, x.size(1), d_model)``.
        """
        return x + self.pe[:, :x.size(1)]

**注意力机制：**

In [9]:
def scaled_dot_product_attention(query, key, value, mask=None, dropout=0.0, training=True):
    """Compute softmax(Q K^T / sqrt(d_k)) V.

    Args:
        query: Tensor whose last dimension is ``d_k``.
        key: Tensor whose last two dimensions are transposed and multiplied
            with ``query``.
        value: Tensor multiplied by the attention weights.
        mask: Optional tensor broadcastable to the score matrix. Positions
            where the mask is ``False`` / ``0`` are excluded from attention.
            Boolean masks and 0/1 integer or float masks are both accepted.
        dropout: Dropout probability applied to the attention weights.
        training: Whether dropout is active. Defaults to ``True`` to keep the
            previous behaviour; pass ``False`` (e.g. ``module.training``) to
            disable dropout during evaluation.

    Returns:
        Tuple ``(output, attn_weights)``.
    """
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        # ``mask == 0`` yields a boolean tensor for any mask dtype, whereas
        # ``~mask`` is only a logical NOT for boolean masks.
        scores = scores.masked_fill(mask == 0, -1e9)
    attn_weights = F.softmax(scores, dim=-1)
    if dropout > 0.0:
        # Honour the caller's mode instead of always dropping weights.
        attn_weights = F.dropout(attn_weights, p=dropout, training=training)
    output = torch.matmul(attn_weights, value)
    return output, attn_weights


**多头注意力机制：**

In [10]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on ``scaled_dot_product_attention``.

    Args:
        d_model: Width of the input and output features. Must be divisible
            by ``num_heads``.
        num_heads: Number of attention heads.
        dropout: Dropout probability applied to the attention weights while
            the module is in training mode.
    """

    def __init__(self, d_model, num_heads, dropout=0.0):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0  # d_model must split evenly across heads
        self.d_k = d_model // num_heads  # width of each head
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)  # query projection
        self.w_k = nn.Linear(d_model, d_model)  # key projection
        self.w_v = nn.Linear(d_model, d_model)  # value projection
        self.w_o = nn.Linear(d_model, d_model)  # output projection
        self.dropout_p = dropout

    def forward(self, query, key, value, mask=None):
        """Project the inputs, attend per head, and merge the heads.

        Args:
            query, key, value: Tensors whose first dimension is the batch
                size and whose last dimension is ``d_model``.
            mask: Optional mask forwarded unchanged to
                ``scaled_dot_product_attention``.

        Returns:
            Tensor of shape ``(batch, query_len, d_model)``.
        """
        batch_size = query.size(0)

        # Linear projection, then split the feature axis into heads with
        # ``view`` and move the head axis in front of the sequence axis so
        # every head attends independently:
        # (batch, seq, d_model) -> (batch, num_heads, seq, d_k).
        Q = self.w_q(query).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.w_k(key).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.w_v(value).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        # The attention helper applies dropout whenever the rate is positive,
        # so pass 0.0 outside training mode to make ``model.eval()`` disable
        # attention dropout.
        dropout_p = self.dropout_p if self.training else 0.0
        attn_output, _ = scaled_dot_product_attention(Q, K, V, mask, dropout_p)

        # Merge heads: (batch, num_heads, seq, d_k) -> (batch, seq, d_model).
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, -1, self.num_heads * self.d_k)

        # Final output projection.
        output = self.w_o(attn_output)
        return output


**前馈网络：**

In [11]:
class FeedForwardNetwork(nn.Module):
    """Two-layer feed-forward block: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: Width of the input and output features.
        d_ff: Width of the hidden layer.
        dropout: Dropout probability applied after the ReLU.
    """

    def __init__(self, d_model, d_ff, dropout=0.0):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Expand ``x`` to ``d_ff``, apply ReLU and dropout, project back to ``d_model``."""
        hidden = F.relu(self.linear1(x))
        return self.linear2(self.dropout(hidden))

**编码器层（单个）：**

In [12]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and a feed-forward network.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalisation (post-norm).

    Args:
        d_model: Feature width.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward network.
        dropout: Dropout probability shared by all sub-layers.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.0):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.ffn = FeedForwardNetwork(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Run ``x`` through the self-attention and feed-forward sub-layers."""
        # Sub-layer 1: self-attention, then residual + norm.
        attended = self.dropout(self.self_attn(x, x, x, mask))
        hidden = self.norm1(x + attended)

        # Sub-layer 2: feed-forward, then residual + norm.
        transformed = self.dropout(self.ffn(hidden))
        return self.norm2(hidden + transformed)


**解码器层（单个）：**

In [13]:
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, encoder-decoder attention, feed-forward.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalisation (post-norm).

    Args:
        d_model: Feature width.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward network.
        dropout: Dropout probability shared by all sub-layers.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.0):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.enc_dec_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.ffn = FeedForwardNetwork(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, src_mask, tgt_mask):
        """Apply the three decoder sub-layers to ``x`` in order."""
        # Sub-layer 1: self-attention over the target, using ``tgt_mask``.
        self_attended = self.dropout(self.self_attn(x, x, x, tgt_mask))
        hidden = self.norm1(x + self_attended)

        # Sub-layer 2: cross-attention; queries come from the decoder, keys
        # and values from the encoder output, using ``src_mask``.
        cross_attended = self.dropout(self.enc_dec_attn(hidden, enc_out, enc_out, src_mask))
        hidden = self.norm2(hidden + cross_attended)

        # Sub-layer 3: position-wise feed-forward network.
        transformed = self.dropout(self.ffn(hidden))
        return self.norm3(hidden + transformed)

**编码器和解码器（整体，由多个单层堆叠而成）：**

In [14]:
class Encoder(nn.Module):
    """Token embedding, positional encoding, and a stack of ``EncoderLayer`` blocks.

    Args:
        num_layers: Number of encoder layers in the stack.
        d_model: Feature width.
        num_heads: Number of attention heads per layer.
        d_ff: Hidden width of each feed-forward network.
        vocab_size: Size of the source vocabulary.
        dropout: Dropout probability used on the embeddings and in every layer.
    """

    def __init__(self, num_layers, d_model, num_heads, d_ff, vocab_size, dropout=0.0):
        super().__init__()
        self.d_model = d_model
        self.embedding = TokenEmbedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        blocks = [EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        self.layers = nn.ModuleList(blocks)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask):
        """Encode the token ids ``src``, passing ``src_mask`` to every layer."""
        hidden = self.dropout(self.positional_encoding(self.embedding(src)))
        for block in self.layers:
            hidden = block(hidden, src_mask)
        return hidden


class Decoder(nn.Module):
    """Token embedding, positional encoding, and a stack of ``DecoderLayer`` blocks.

    Args:
        num_layers: Number of decoder layers in the stack.
        d_model: Feature width.
        num_heads: Number of attention heads per layer.
        d_ff: Hidden width of each feed-forward network.
        vocab_size: Size of the target vocabulary.
        dropout: Dropout probability used on the embeddings and in every layer.
    """

    def __init__(self, num_layers, d_model, num_heads, d_ff, vocab_size, dropout=0.0):
        super().__init__()
        self.embedding = TokenEmbedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        blocks = [DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        self.layers = nn.ModuleList(blocks)
        self.dropout = nn.Dropout(dropout)

    def forward(self, tgt, enc_out, src_mask, tgt_mask):
        """Decode the token ids ``tgt`` against the encoder output ``enc_out``."""
        hidden = self.dropout(self.positional_encoding(self.embedding(tgt)))
        for block in self.layers:
            hidden = block(hidden, enc_out, src_mask, tgt_mask)
        return hidden


**Transformer 模型：**

In [15]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer that outputs log-probabilities over the target vocabulary.

    Args:
        src_vocab_size: Size of the source vocabulary.
        tgt_vocab_size: Size of the target vocabulary.
        d_model: Feature width.
        num_heads: Number of attention heads per layer.
        num_layers: Number of layers in both the encoder and the decoder.
        d_ff: Hidden width of the feed-forward networks.
        dropout: Dropout probability.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, num_heads=8, num_layers=6, d_ff=2048, dropout=0.1):
        super(Transformer, self).__init__()
        self.encoder = Encoder(num_layers, d_model, num_heads, d_ff, src_vocab_size, dropout)
        self.decoder = Decoder(num_layers, d_model, num_heads, d_ff, tgt_vocab_size, dropout)
        # Maps d_model-wide decoder states to one logit per target token.
        self.output_projection = nn.Linear(d_model, tgt_vocab_size)

    def make_src_mask(self, src, pad_idx):
        """Padding mask: ``True`` where ``src`` is not ``pad_idx``.

        For ``src`` of shape ``(batch, seq_len)`` the result has shape
        ``(batch, 1, 1, seq_len)``.
        """
        return (src != pad_idx).unsqueeze(-2).unsqueeze(-2)

    def make_tgt_mask(self, tgt, pad_idx):
        """Target mask: padding mask combined with a look-ahead (causal) mask.

        For ``tgt`` of shape ``(batch, seq_len)`` the result has shape
        ``(batch, 1, seq_len, seq_len)``. Entry ``[b, 0, i, j]`` is ``True``
        when ``j <= i`` and ``tgt[b, j]`` is not ``pad_idx``.
        """
        tgt_pad_mask = (tgt != pad_idx).unsqueeze(-2).unsqueeze(-2)  # (batch, 1, 1, seq_len)
        tgt_len = tgt.size(1)
        # Allocate the lower-triangular mask directly on the target's device
        # rather than building it on the CPU and copying it on every call.
        tgt_sub_mask = torch.tril(torch.ones(tgt_len, tgt_len, device=tgt.device)).bool()
        return tgt_pad_mask & tgt_sub_mask  # (batch, 1, seq_len, seq_len)

    def forward(self, src, tgt, src_pad_idx, tgt_pad_idx):
        """Run the full model.

        Args:
            src: Source token ids.
            tgt: Target token ids.
            src_pad_idx: Padding id to mask out in ``src``.
            tgt_pad_idx: Padding id to mask out in ``tgt``.

        Returns:
            Log-probabilities (``log_softmax`` of the logits over the last
            dimension), one distribution per target position. Apply ``exp``
            to obtain probabilities.
        """
        # Build the masks first; both attention stacks need them.
        src_mask = self.make_src_mask(src, src_pad_idx)
        tgt_mask = self.make_tgt_mask(tgt, tgt_pad_idx)

        enc_out = self.encoder(src, src_mask)
        dec_out = self.decoder(tgt, enc_out, src_mask, tgt_mask)
        logits = self.output_projection(dec_out)  # unnormalised scores per target token
        log_probs = F.log_softmax(logits, dim=-1)
        return log_probs

**接下来是示例运行：**

In [17]:
if __name__ == "__main__":
    # Model hyper-parameters.
    SRC_VOCAB_SIZE = 1000  # source-language vocabulary size
    TGT_VOCAB_SIZE = 1000  # target-language vocabulary size
    D_MODEL = 512
    NUM_HEADS = 8
    NUM_LAYERS = 6
    D_FF = 2048
    DROPOUT = 0.1
    PAD_IDX = 0  # index of the padding token

    # Build the model.
    model = Transformer(
        src_vocab_size=SRC_VOCAB_SIZE,
        tgt_vocab_size=TGT_VOCAB_SIZE,
        d_model=D_MODEL,
        num_heads=NUM_HEADS,
        num_layers=NUM_LAYERS,
        d_ff=D_FF,
        dropout=DROPOUT,
    )

    # Example input sizes.
    batch_size = 2
    src_seq_len = 10
    tgt_seq_len = 12

    # Random token ids in [1, vocab_size); starting at 1 avoids PAD_IDX = 0.
    # src has shape (batch_size, src_seq_len) = (2, 10);
    # tgt has shape (batch_size, tgt_seq_len) = (2, 12).
    src = torch.randint(1, SRC_VOCAB_SIZE, (batch_size, src_seq_len))
    tgt = torch.randint(1, TGT_VOCAB_SIZE, (batch_size, tgt_seq_len))

    # Forward pass; the same padding index is used for source and target.
    output_probs = model(src, tgt, PAD_IDX, PAD_IDX)

    print("输入源序列形状:", src.shape)
    print("输入目标序列形状:", tgt.shape)
    print("输出概率分布形状:", output_probs.shape)  # expected: (batch, tgt_seq_len, vocab_size)

输入源序列形状: torch.Size([2, 10])
输入目标序列形状: torch.Size([2, 12])
输出概率分布形状: torch.Size([2, 12, 1000])
