先导入需要的库函数

In [22]:
import math
import copy
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, List, Tuple
from collections import Counter
import matplotlib.pyplot as plt

此处,用于验证CUDA是否可用

In [23]:
# 1. 验证包能否 import
try:
    import torch
    print("✅ torch 导入成功，版本：", torch.__version__)
except ImportError as e:
    raise RuntimeError("❌ 未安装 torch") from e

# 2. 验证 CUDA 是否编译进 PyTorch
print("CUDA 是否被编译进 PyTorch：", torch.cuda.is_available())

# 3. 验证 GPU 设备能否被 PyTorch 识别 & 创建张量
if torch.cuda.is_available():
    gpu_id = torch.cuda.current_device()
    print("当前可见 GPU 编号：", gpu_id)
    print("GPU 名称：", torch.cuda.get_device_name(gpu_id))
    # 在 GPU 上随机生成 2×2 张量
    x = torch.randn(2, 2, device=f"cuda:{gpu_id}")
    print("张量 x 的设备：", x.device, "\n张量值：\n", x)
else:
    print("⚠️  PyTorch 未检测到 GPU，已自动退回 CPU")
    x = torch.randn(2, 2)
    print("张量 x 的设备：", x.device)

✅ torch 导入成功，版本： 2.4.1+cpu
CUDA 是否被编译进 PyTorch： False
⚠️  PyTorch 未检测到 GPU，已自动退回 CPU
张量 x 的设备： cpu


1.Embeding

In [24]:
# ---------------------------
# 1. 定义原始英文句子变量（保留所有翻译结果）
# ---------------------------
english_sentences: List[str] = [
    "Early morning sunlight filters through the window and falls on the desk.",
    "Xiao Ming carries his schoolbag and goes to school happily.",
    "Mom is carefully making breakfast in the kitchen.",
    "The flowers in the park are blooming extraordinarily brightly.",
    "The little dog is running around cheerfully on the grass.",
    "Dad takes me to the library to read books on weekends.",
    "The stars at night hang in the sky like countless diamonds.",
    "The teacher patiently explains difficult problems to the students.",
    "My younger sister likes to hold her teddy bear while sleeping.",
    "Autumn has arrived, and the leaves on the trees are slowly turning yellow.",
    "The spring birds are very lively and fly everywhere."
]

# ---------------------------
# 2. 文本预处理与分词（基础分词，适配Embedding输入）
# ---------------------------
def preprocess_and_tokenize(sentences: List[str]) -> List[List[str]]:
    """预处理：小写、去除标点、分词"""
    tokenized_sentences = []
    for sent in sentences:
        # 小写转换+去除标点
        cleaned = sent.lower().replace(".", "").replace(",", "").replace(";", "").replace("!", "").replace("?", "")
        # 空格分词（基础分词，复杂场景可替换为NLTK/Spacy分词器）
        tokens = cleaned.split()
        tokenized_sentences.append(tokens)
    return tokenized_sentences

# 执行分词，得到token列表（变量保留）
tokenized_data: List[List[str]] = preprocess_and_tokenize(english_sentences)
print("分词结果：", tokenized_data)
# ---------------------------
# 3. 构建词汇表（变量保留，支持OOV和特殊token）
# ---------------------------
def build_vocab(tokenized_data: List[List[str]], min_freq: int = 1) -> Tuple[dict, dict]:
    """构建词→索引、索引→词的映射"""
    # 统计词频
    word_counter = Counter([token for sent in tokenized_data for token in sent])
    # 过滤低频词，加入特殊token（PAD:填充, UNK:未登录词）
    vocab = {
        "<PAD>": 0,
        "<UNK>": 1
    }
    # 词频≥min_freq的词加入词汇表
    for word, freq in word_counter.items():
        if freq >= min_freq:
            vocab[word] = len(vocab)
    # 反向词汇表（索引→词）
    inv_vocab = {idx: word for word, idx in vocab.items()}
    return vocab, inv_vocab

# 构建词汇表（变量保留）
vocab: dict = build_vocab(tokenized_data)[0]  # 词→索引
print("\n词->索引：", vocab)
inv_vocab: dict = build_vocab(tokenized_data)[1]  # 索引→词
print("\n索引->词：", inv_vocab)
vocab_size: int = len(vocab)  # 词汇表大小（变量保留）

# ---------------------------
# 4. 整数编码（将token转换为索引，变量保留）
# ---------------------------
def encode_tokens(tokenized_data: List[List[str]], vocab: dict) -> List[torch.Tensor]:
    """将句子token转换为整数索引序列"""
    encoded_sentences = []
    for tokens in tokenized_data:
        # 未登录词映射为<UNK>（索引1）
        encoded = [vocab.get(token, vocab["<UNK>"]) for token in tokens]
        encoded_sentences.append(torch.tensor(encoded, dtype=torch.long))
    return encoded_sentences

# 执行编码（变量保留）
encoded_data: List[torch.Tensor] = encode_tokens(tokenized_data, vocab)
print("\n编码结果:", encoded_data)
# ---------------------------
# 5. 序列填充（统一长度，变量保留）
# ---------------------------
def pad_sequences(encoded_data: List[torch.Tensor], pad_idx: int = 0) -> torch.Tensor:
    """填充序列到最大长度，返回(batch_size, max_seq_len)"""
    max_seq_len = max([len(seq) for seq in encoded_data])
    padded_batch = torch.full((len(encoded_data), max_seq_len), pad_idx, dtype=torch.long)
    for i, seq in enumerate(encoded_data):
        padded_batch[i, :len(seq)] = seq
    return padded_batch

# 执行填充（变量保留，shape: [171, max_seq_len]）
padded_input: torch.Tensor = pad_sequences(encoded_data, vocab["<PAD>"])
print("\n为保证等长，对句子进行统一填充:\n", padded_input)
# ---------------------------
# 6. Embedding层定义与嵌入操作（核心步骤，变量保留）
# ---------------------------
# 超参数（变量保留，可调整）
english_embedding_dim: int = 64  # Embedding向量维度（与Transformer的d_model对应）

# 定义Embedding层（变量保留，可训练）
embedding_layer: nn.Embedding = nn.Embedding(
    num_embeddings=vocab_size,  # 词汇表大小
    embedding_dim=english_embedding_dim,  # 嵌入维度
    padding_idx=vocab["<PAD>"]  # PAD token的嵌入设为0且不训练
)

# 执行Embedding操作（变量保留最终嵌入结果）
english_embedded_output: torch.Tensor = embedding_layer(padded_input)

# ---------------------------
# 关键变量输出（验证结果）
# ---------------------------
print(f"总结：")
print(f"词汇表大小: {vocab_size}")
print(f"填充后输入形状: {padded_input.shape}")  # (句子数量, 最大序列长度)
print(f"Embedding输出形状: {english_embedded_output.shape}")  # (句子数量, 最大序列长度, 嵌入维度)
print(f"示例：'sunlight'的索引: {vocab.get('sunlight', vocab['<UNK>'])}")
# 取出第一个句子前 2 个 token 的索引，再反查词汇表
first3_idx = padded_input[0, :2].tolist()          # [idx1, idx2, idx3]
first3_words = [inv_vocab[i] for i in first3_idx]  # [word1, word2, word3]

print(f"示例：第一个句子前 2 个 token 对应的词: {first3_words}")
print(f"示例：第一个句子的嵌入向量前 2 个 token:\n{english_embedded_output[0, :2, :10]}")


分词结果： [['early', 'morning', 'sunlight', 'filters', 'through', 'the', 'window', 'and', 'falls', 'on', 'the', 'desk'], ['xiao', 'ming', 'carries', 'his', 'schoolbag', 'and', 'goes', 'to', 'school', 'happily'], ['mom', 'is', 'carefully', 'making', 'breakfast', 'in', 'the', 'kitchen'], ['the', 'flowers', 'in', 'the', 'park', 'are', 'blooming', 'extraordinarily', 'brightly'], ['the', 'little', 'dog', 'is', 'running', 'around', 'cheerfully', 'on', 'the', 'grass'], ['dad', 'takes', 'me', 'to', 'the', 'library', 'to', 'read', 'books', 'on', 'weekends'], ['the', 'stars', 'at', 'night', 'hang', 'in', 'the', 'sky', 'like', 'countless', 'diamonds'], ['the', 'teacher', 'patiently', 'explains', 'difficult', 'problems', 'to', 'the', 'students'], ['my', 'younger', 'sister', 'likes', 'to', 'hold', 'her', 'teddy', 'bear', 'while', 'sleeping'], ['autumn', 'has', 'arrived', 'and', 'the', 'leaves', 'on', 'the', 'trees', 'are', 'slowly', 'turning', 'yellow'], ['the', 'spring', 'birds', 'are', 'very', 'livel

In [25]:
# ---------------------------
# 1. 定义原始中文分词句子变量（保留所有句子）
# ---------------------------
chinese_tokenized_sentences: List[str] = [
    "清晨 阳光 透过 窗户 洒 在 书桌 上",
    "小明 背着 书包 高高兴兴 地 去 学校",
    "妈妈 在 厨房 里 认真 地 做 早餐",
    "公园里 的 花儿 开 得 格外 鲜艳",
    "小狗 欢快 地 在 草地上 跑 来 跑 去",
    "爸爸 周末 带 我 去 图书馆 看书",
    "夜晚 的 星星 像 无数 颗 钻石 挂 在 天空",
    "老师 耐心 地 给 同学们 讲解 难题",
    "妹妹 喜欢 抱着 她 的 玩具 熊 睡觉",
    "秋天 到了 ， 树上 的 叶子 慢慢 变黄",
    "春天 的 小鸟 很 活泼 ， 到处 飞"
]

# 转换为token列表（按空格拆分，变量保留）
chinese_tokens_list: List[List[str]] = [sent.split() for sent in chinese_tokenized_sentences]
print("分词结果：", chinese_tokens_list)

# ---------------------------
# 2. 构建中文词汇表（变量保留，支持OOV和特殊token）
# ---------------------------
def build_chinese_vocab(tokenized_data: List[List[str]], min_freq: int = 1) -> Tuple[dict, dict]:
    """构建中文词→索引、索引→词的映射"""
    word_counter = Counter([token for sent in tokenized_data for token in sent])
    # 过滤标点，加入特殊token（PAD:填充, UNK:未登录词）
    vocab = {
        "<PAD>": 0,
        "<UNK>": 1
    }
    for word, freq in word_counter.items():
        if freq >= min_freq and word not in [",", "，"]:  # 过滤标点
            vocab[word] = len(vocab)
    inv_vocab = {idx: word for word, idx in vocab.items()}
    return vocab, inv_vocab

# 构建词汇表（变量保留）
chinese_vocab: dict = build_chinese_vocab(chinese_tokens_list)[0]  # 词→索引
print("\n词->索引：", chinese_vocab)
chinese_inv_vocab: dict = build_chinese_vocab(chinese_tokens_list)[1]  # 索引→词
print("\n索引->词：", chinese_inv_vocab)
chinese_vocab_size: int = len(chinese_vocab)  # 词汇表大小（变量保留）

# ---------------------------
# 3. 整数编码（将token转换为索引，变量保留）
# ---------------------------
def encode_chinese_tokens(tokenized_data: List[List[str]], vocab: dict) -> List[torch.Tensor]:
    """中文token转换为整数索引序列"""
    encoded = []
    for tokens in tokenized_data:
        seq = [vocab.get(token, vocab["<UNK>"]) for token in tokens]  # OOV映射为UNK
        encoded.append(torch.tensor(seq, dtype=torch.long))
    return encoded

# 执行编码（变量保留）
chinese_encoded_data: List[torch.Tensor] = encode_chinese_tokens(chinese_tokens_list, chinese_vocab)
print("\n编码结果:", chinese_encoded_data)

# ---------------------------
# 4. 序列填充（统一长度，变量保留）
# ---------------------------
def pad_chinese_sequences(encoded_data: List[torch.Tensor], pad_idx: int = 0) -> torch.Tensor:
    """填充序列到最大长度，返回(batch_size, max_seq_len)"""
    max_seq_len = max([len(seq) for seq in encoded_data])
    padded_batch = torch.full((len(encoded_data), max_seq_len), pad_idx, dtype=torch.long)
    for i, seq in enumerate(encoded_data):
        padded_batch[i, :len(seq)] = seq
    return padded_batch

# 执行填充（变量保留）
chinese_padded_input: torch.Tensor = pad_chinese_sequences(chinese_encoded_data, chinese_vocab["<PAD>"])
print("\n为保证等长，对句子进行统一填充:\n", chinese_padded_input)

# ---------------------------
# 5. 中文Embedding层定义与嵌入操作（核心步骤，变量保留）
# ---------------------------
# 超参数（变量保留，可调整）
chinese_embedding_dim: int = 64  # 嵌入维度（与Transformer的d_model对应）

# 定义Embedding层（变量保留，可训练）
chinese_embedding_layer: nn.Embedding = nn.Embedding(
    num_embeddings=chinese_vocab_size,
    embedding_dim=chinese_embedding_dim,
    padding_idx=chinese_vocab["<PAD>"]  # PAD token的嵌入设为0且不训练
)

# 执行Embedding操作（变量保留最终嵌入结果）
chinese_embedded_output: torch.Tensor = chinese_embedding_layer(chinese_padded_input)

# ---------------------------
# 关键变量输出（验证结果，与英文格式一致）
# ---------------------------
print(f"\n总结：")
print(f"词汇表大小: {chinese_vocab_size}")
print(f"填充后输入形状: {chinese_padded_input.shape}")  # (句子数量, 最大序列长度)
print(f"Embedding输出形状: {chinese_embedded_output.shape}")  # (句子数量, 最大序列长度, 嵌入维度)
print(f"示例：'阳光'的索引: {chinese_vocab.get('阳光', chinese_vocab['<UNK>'])}")
# 取出第一个句子前 2 个 token 的索引，再反查词汇表
first3_idx = chinese_padded_input[0, :2].tolist()          # [idx1, idx2, idx3]
first3_words = [chinese_inv_vocab[i] for i in first3_idx]  # [word1, word2, word3]

print(f"示例：第一个句子前 2 个 token 对应的词: {first3_words}")
print(f"示例：第一个句子的嵌入向量前 2 个 token:\n{chinese_embedded_output[0, :2, :10]}")


分词结果： [['清晨', '阳光', '透过', '窗户', '洒', '在', '书桌', '上'], ['小明', '背着', '书包', '高高兴兴', '地', '去', '学校'], ['妈妈', '在', '厨房', '里', '认真', '地', '做', '早餐'], ['公园里', '的', '花儿', '开', '得', '格外', '鲜艳'], ['小狗', '欢快', '地', '在', '草地上', '跑', '来', '跑', '去'], ['爸爸', '周末', '带', '我', '去', '图书馆', '看书'], ['夜晚', '的', '星星', '像', '无数', '颗', '钻石', '挂', '在', '天空'], ['老师', '耐心', '地', '给', '同学们', '讲解', '难题'], ['妹妹', '喜欢', '抱着', '她', '的', '玩具', '熊', '睡觉'], ['秋天', '到了', '，', '树上', '的', '叶子', '慢慢', '变黄'], ['春天', '的', '小鸟', '很', '活泼', '，', '到处', '飞']]

词->索引： {'<PAD>': 0, '<UNK>': 1, '清晨': 2, '阳光': 3, '透过': 4, '窗户': 5, '洒': 6, '在': 7, '书桌': 8, '上': 9, '小明': 10, '背着': 11, '书包': 12, '高高兴兴': 13, '地': 14, '去': 15, '学校': 16, '妈妈': 17, '厨房': 18, '里': 19, '认真': 20, '做': 21, '早餐': 22, '公园里': 23, '的': 24, '花儿': 25, '开': 26, '得': 27, '格外': 28, '鲜艳': 29, '小狗': 30, '欢快': 31, '草地上': 32, '跑': 33, '来': 34, '爸爸': 35, '周末': 36, '带': 37, '我': 38, '图书馆': 39, '看书': 40, '夜晚': 41, '星星': 42, '像': 43, '无数': 44, '颗': 45, '钻石': 46, '挂': 47, '天空': 4

2、位置编码

In [26]:
# ------------------------------------------------------------------
# 1. Positional Encoding（位置编码）
# ------------------------------------------------------------------
class PositionalEncoding(nn.Module):
    """
     sinusoid 位置编码，与《Attention is All You Need》原文一致
    输出 shape 与 word embedding 相同，直接相加
    """
    def __init__(self, d_model: int, max_len: int = 5000):
        super().__init__()
        # 预先计算好 [max_len, d_model] 的编码矩阵，节省训练时 CPU->GPU 拷贝
        pe = torch.zeros(max_len, d_model)
        # [0,1,2,...,max_len-1]^T -> [max_len,1]
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)

        # 分母项：10000^(2i/d_model) 取对数后一次性计算
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() *
            (-math.log(10000.0) / d_model)          # 负号保证指数递减
        )                                           # shape [d_model/2]

        # 偶数列 sin，奇数列 cos
        pe[:, 0::2] = torch.sin(position * div_term)   # [max_len, d_model/2]
        if d_model % 2 == 1:                           # 奇数维最后一列单独处理
            pe[:, 1::2] = torch.cos(position * div_term[:-1])
        else:
            pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)                           # [1, max_len, d_model]
        self.register_buffer('pe', pe)                 # 不参与训练，持久化到 state_dict

    def forward(self, x):
        # x: [batch, seq_len, d_model]
        # 截取与当前序列等长的一段位置编码，并加到 x 上
        x = x + self.pe[:, :x.size(1), :].to(x.device)
        return x

In [27]:
# 复用之前定义的PositionalEncoding类（无需重新定义）
# 初始化位置编码层（与英文Embedding维度一致）
english_pos_encoder = PositionalEncoding(d_model=english_embedding_dim)

# 打印位置编码矩阵信息
print(f"英文位置编码矩阵形状：{english_pos_encoder.pe.shape}")  # (1, 5000, 64)
print(f"前5个位置的前10维编码：")
print(english_pos_encoder.pe[0, :5, :10])

# 对英文Embedding结果添加位置编码
english_embedded_with_pos = english_pos_encoder(english_embedded_output)

# 打印关键结果验证
print("="*50)
print("英文 Embedding + 位置编码 结果验证")
print("="*50)
print(f"原始Embedding形状：{english_embedded_output.shape}")
print(f"添加位置编码后形状：{english_embedded_with_pos.shape}")  # 形状保持一致
print(f"\n第一个句子前2个token的最终表示（前10维）：")
print("token1（early）：", english_embedded_with_pos[0, 0, :10])
print("token2（morning）：", english_embedded_with_pos[0, 1, :10])


英文位置编码矩阵形状：torch.Size([1, 5000, 64])
前5个位置的前10维编码：
tensor([[ 0.0000,  1.0000,  0.0000,  1.0000,  0.0000,  1.0000,  0.0000,  1.0000,
          0.0000,  1.0000],
        [ 0.8415,  0.5403,  0.6816,  0.7318,  0.5332,  0.8460,  0.4093,  0.9124,
          0.3110,  0.9504],
        [ 0.9093, -0.4161,  0.9975,  0.0709,  0.9021,  0.4315,  0.7469,  0.6649,
          0.5911,  0.8066],
        [ 0.1411, -0.9900,  0.7783, -0.6279,  0.9933, -0.1160,  0.9536,  0.3010,
          0.8126,  0.5828],
        [-0.7568, -0.6536,  0.1415, -0.9899,  0.7785, -0.6277,  0.9933, -0.1157,
          0.9536,  0.3011]])
英文 Embedding + 位置编码 结果验证
原始Embedding形状：torch.Size([11, 13, 64])
添加位置编码后形状：torch.Size([11, 13, 64])

第一个句子前2个token的最终表示（前10维）：
token1（early）： tensor([-1.9077,  0.1819,  1.3342, -0.6592, -2.0412,  0.5700, -1.1602,  1.3132,
         0.8132,  0.9044], grad_fn=<SliceBackward0>)
token2（morning）： tensor([ 1.0494,  1.5846,  2.7223,  1.1437,  0.5340,  0.9870,  2.2092,  1.1820,
        -0.6553, -0.6178], grad_

In [28]:
# 初始化位置编码层
pos_encoder = PositionalEncoding(d_model=chinese_embedding_dim)
# 打印位置编码矩阵信息
print(f"位置编码矩阵形状：{pos_encoder.pe.shape}")  # (1, 5000, 64)
print(f"前5个位置的前10维编码：")
print(pos_encoder.pe[0, :5, :10])
# 对Embedding结果添加位置编码
embedded_with_pos = pos_encoder(chinese_embedded_output)

# 打印关键结果验证
print("="*50)
print("Embedding + 位置编码 结果验证")
print("="*50)
print(f"原始Embedding形状：{chinese_embedded_output.shape}")
print(f"添加位置编码后形状：{embedded_with_pos.shape}")  # 形状保持一致
print(f"\n第一个句子前2个token的最终表示（前10维）：")
print("token1（清晨）：", embedded_with_pos[0, 0, :10])
print("token2（阳光）：", embedded_with_pos[0, 1, :10])


位置编码矩阵形状：torch.Size([1, 5000, 64])
前5个位置的前10维编码：
tensor([[ 0.0000,  1.0000,  0.0000,  1.0000,  0.0000,  1.0000,  0.0000,  1.0000,
          0.0000,  1.0000],
        [ 0.8415,  0.5403,  0.6816,  0.7318,  0.5332,  0.8460,  0.4093,  0.9124,
          0.3110,  0.9504],
        [ 0.9093, -0.4161,  0.9975,  0.0709,  0.9021,  0.4315,  0.7469,  0.6649,
          0.5911,  0.8066],
        [ 0.1411, -0.9900,  0.7783, -0.6279,  0.9933, -0.1160,  0.9536,  0.3010,
          0.8126,  0.5828],
        [-0.7568, -0.6536,  0.1415, -0.9899,  0.7785, -0.6277,  0.9933, -0.1157,
          0.9536,  0.3011]])
Embedding + 位置编码 结果验证
原始Embedding形状：torch.Size([11, 10, 64])
添加位置编码后形状：torch.Size([11, 10, 64])

第一个句子前2个token的最终表示（前10维）：
token1（清晨）： tensor([-0.2951,  0.2095,  0.0197,  1.3987,  0.4551,  1.2550,  1.2473,  1.1035,
        -1.2611,  1.2596], grad_fn=<SliceBackward0>)
token2（阳光）： tensor([ 0.7890,  1.8493,  0.5053,  0.0667,  1.7511, -0.6152, -0.3826, -0.2176,
        -0.4186,  0.4364], grad_fn=<SliceBack

3.单头注意力

In [29]:
# ------------------------------------------------------------------
# 2. Scaled Dot-Product Attention（单头）
# ------------------------------------------------------------------
def attention(query, key, value,
              mask: Optional[torch.Tensor] = None,
              dropout: Optional[nn.Module] = None):
    """
    单头缩放点积注意力
    Args:
        query/key/value: [batch, heads, seq_q/k/v, head_dim]
        mask: 任意可 broadcast 到 [..., seq_q, seq_k] 的 0/1 张量，0 表示遮盖
    Returns:
        out: [..., seq_q, head_dim]
        attn_weights: [..., seq_q, seq_k]
    """
    d_k = query.size(-1)                                 # head_dim
    # 1) 点积 + 缩放
    scores = torch.matmul(query, key.transpose(-2, -1))  # [..., seq_q, seq_k]
    scores = scores / math.sqrt(d_k)                     # 缩放防止梯度消失

    # 2) mask：把填充位置变成 -inf，softmax 后概率≈0
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float('-inf'))

    # 3) softmax 得到注意力权重
    p_attn = F.softmax(scores, dim=-1)

    # 4) dropout（可选）
    if dropout is not None:
        p_attn = dropout(p_attn)

    # 5) 加权求和得到输出
    out = torch.matmul(p_attn, value)                    # [..., seq_q, head_dim]
    return out, p_attn

测试用例

In [30]:
d_model = 64          # 与 Embedding 维度一致
head_dim = 64         # 单头
assert d_model % head_dim == 0
n_heads = d_model // head_dim   # 1

# 一个线性层即可，因为单头
W_q = nn.Linear(d_model, d_model, bias=False)
print("W_q：\n", W_q)
W_k = nn.Linear(d_model, d_model, bias=False)
W_v = nn.Linear(d_model, d_model, bias=False)

# 初始化（可选）
nn.init.xavier_uniform_(W_q.weight)
nn.init.xavier_uniform_(W_k.weight)
nn.init.xavier_uniform_(W_v.weight)

# 假设 chinese_embedded_output 是你「Embedding + 位置编码」后的张量，形状 [11, 10, 64]
Q = W_q(chinese_embedded_output)   # [11, 10, 64]
K = W_k(chinese_embedded_output)   # [11, 10, 64]
V = W_v(chinese_embedded_output)   # [11, 10, 64]

# 为了复用你之前写的 attention()，需要把 heads 维显式拆出来
# 单头场景：直接 unsqueeze(1) 即可
Q = Q.unsqueeze(1)   # [11, 1, 10, 64]
K = K.unsqueeze(1)
V = V.unsqueeze(1)

# 如果序列长度一致且没有 padding，mask 可以不给
out, attn_weights = attention(Q, K, V)   # out: [11, 1, 10, 64]

# 把 heads 维合并回去
out = out.squeeze(1)                     # [11, 10, 64]

# 打印张量维度，用中文一看就懂
print(f"Q 的维度：{Q.shape}")            # Q 的维度：torch.Size([11, 1, 10, 64])
print(f"注意力输出维度：{out.shape}")     # 注意力输出维度：torch.Size([11, 10, 64])
print(f"注意力权重维度：{attn_weights.shape}")  # 注意力权重维度：torch.Size([11, 1, 10, 10])

W_q：
 Linear(in_features=64, out_features=64, bias=False)
Q 的维度：torch.Size([11, 1, 10, 64])
注意力输出维度：torch.Size([11, 10, 64])
注意力权重维度：torch.Size([11, 1, 10, 10])


4.多头注意力

In [31]:
# ------------------------------------------------------------------
# 3. Multi-Head Attention
# ------------------------------------------------------------------
class MultiHeadAttention(nn.Module):
    def __init__(self, heads: int, d_model: int, dropout: float = 0.1):
        super().__init__()
        assert d_model % heads == 0
        self.d_k = d_model // heads   # 单头维度
        self.heads = heads
        # 3 个线性映射：把 Q/K/V 从 d_model -> d_model，再拆成 heads 个头
        self.linear_q = nn.Linear(d_model, d_model)
        self.linear_k = nn.Linear(d_model, d_model)
        self.linear_v = nn.Linear(d_model, d_model)
        self.linear_out = nn.Linear(d_model, d_model)  # 最后 concat 后再投影
        self.attn_dropout = nn.Dropout(dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, query, key, value, mask=None):
        # query/key/value: [batch, seq_len, d_model]
        batch_size = query.size(0)

        # 辅助函数：reshape + transpose 得到 [batch, heads, seq, d_k]
        def shape(x):
            return x.view(batch_size, -1, self.heads, self.d_k).transpose(1, 2)

        # 1) 线性投影 + 拆头
        q = shape(self.linear_q(query))
        k = shape(self.linear_k(key))
        v = shape(self.linear_v(value))

        # 2) mask 加一维适配 heads
        if mask is not None:
            mask = mask.unsqueeze(1)   # [batch, 1, 1, seq_k]

        # 3) 缩放点积注意力
        x, attn = attention(q, k, v, mask=mask, dropout=self.attn_dropout)
        # x: [batch, heads, seq_q, d_k]

        # 4) 合并 heads
        x = (x.transpose(1, 2)                       # [batch, seq_q, heads, d_k]
             .contiguous()
             .view(batch_size, -1, self.heads * self.d_k))  # [batch, seq_q, d_model]

        # 5) 输出投影
        return self.linear_out(x)

5.前馈神经网络

In [32]:
# ------------------------------------------------------------------
# 4. Position-wise Feed-Forward Network
# ------------------------------------------------------------------
class PositionwiseFeedForward(nn.Module):
    """
    两层全连接+ReLU+Dropout，同一序列位置共享参数
    """
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.w1 = nn.Linear(d_model, d_ff)
        self.w2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # x: [batch, seq, d_model]
        return self.w2(self.dropout(F.relu(self.w1(x))))   # 先升维再降维

6.编码层

In [33]:
# ------------------------------------------------------------------
# 5. Encoder Layer
# ------------------------------------------------------------------
class EncoderLayer(nn.Module):
    """
    每个 EncoderLayer = 多头自注意力 + Add&Norm + FFN + Add&Norm
    """
    def __init__(self, d_model, heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(heads, d_model, dropout)
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, src_mask=None):
        # 1) 自注意力
        attn_out = self.self_attn(x, x, x, mask=src_mask)
        x = self.norm1(x + self.dropout(attn_out))   # 残差+LayerNorm

        # 2) FFN
        ff_out = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_out))
        return x

7.解码层

In [34]:
# ------------------------------------------------------------------
# 6. Decoder Layer
# ------------------------------------------------------------------
class DecoderLayer(nn.Module):
    """
    每个 DecoderLayer = Masked 自注意力 + 交叉注意力 + FFN
    每一步都残差+LayerNorm
    """
    def __init__(self, d_model, heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(heads, d_model, dropout)
        self.src_attn = MultiHeadAttention(heads, d_model, dropout)  # 交叉注意力
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, memory, tgt_mask=None, memory_mask=None):
        # 1) Masked 自注意力（防止看到未来）
        attn1 = self.self_attn(x, x, x, mask=tgt_mask)
        x = self.norm1(x + self.dropout(attn1))

        # 2) 交叉注意力：Query=解码器，Key/Value=编码器输出
        attn2 = self.src_attn(x, memory, memory, mask=memory_mask)
        x = self.norm2(x + self.dropout(attn2))

        # 3) FFN
        ff = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff))
        return x

8.编码解码堆叠

In [35]:
# ------------------------------------------------------------------
# 7. Encoder / Decoder 堆叠
# ------------------------------------------------------------------
class Encoder(nn.Module):
    def __init__(self, layer, N):
        super().__init__()
        # 深拷贝 N 个 EncoderLayer
        self.layers = nn.ModuleList([copy.deepcopy(layer) for _ in range(N)])
        # 最后再加一次 LayerNorm（与原始论文一致）
        self.norm = nn.LayerNorm(layer.self_attn.linear_out.out_features)

    def forward(self, x, src_mask=None):
        for layer in self.layers:
            x = layer(x, src_mask=src_mask)
        return self.norm(x)

class Decoder(nn.Module):
    def __init__(self, layer, N):
        super().__init__()
        self.layers = nn.ModuleList([copy.deepcopy(layer) for _ in range(N)])
        self.norm = nn.LayerNorm(layer.self_attn.linear_out.out_features)

    def forward(self, x, memory, tgt_mask=None, memory_mask=None):
        for layer in self.layers:
            x = layer(x, memory, tgt_mask=tgt_mask, memory_mask=memory_mask)
        return self.norm(x)

9.完整的Transfomer

In [36]:
# ------------------------------------------------------------------
# 8. 完整 Transformer
# ------------------------------------------------------------------
class Transformer(nn.Module):
    def __init__(self, src_vocab, tgt_vocab,
                 d_model=512, N=6, heads=8, d_ff=2048,
                 dropout=0.1, max_len=5000):
        super().__init__()
        # 源/目标嵌入 + 位置编码
        self.src_embed = nn.Sequential(
            nn.Embedding(src_vocab, d_model),
            PositionalEncoding(d_model, max_len)
        )
        self.tgt_embed = nn.Sequential(
            nn.Embedding(tgt_vocab, d_model),
            PositionalEncoding(d_model, max_len)
        )

        # 编码器 & 解码器
        encoder_layer = EncoderLayer(d_model, heads, d_ff, dropout)
        decoder_layer = DecoderLayer(d_model, heads, d_ff, dropout)
        self.encoder = Encoder(encoder_layer, N)
        self.decoder = Decoder(decoder_layer, N)

        # 输出 softmax 前的线性层
        self.out = nn.Linear(d_model, tgt_vocab)

        # Xavier 均匀初始化（>1 维参数）
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    # 编码阶段：src -> memory
    def encode(self, src, src_mask=None):
        return self.encoder(self.src_embed(src), src_mask=src_mask)

    # 解码阶段：tgt + memory -> 隐状态
    def decode(self, tgt, memory, tgt_mask=None, memory_mask=None):
        return self.decoder(self.tgt_embed(tgt), memory,
                            tgt_mask=tgt_mask, memory_mask=memory_mask)

    # 完整前向：src+tgt -> logits
    def forward(self, src, tgt,
                src_mask=None, tgt_mask=None, memory_mask=None):
        memory = self.encode(src, src_mask=src_mask)
        dec = self.decode(tgt, memory,
                          tgt_mask=tgt_mask, memory_mask=memory_mask)
        return self.out(dec)          # [batch, tgt_len, tgt_vocab]

10.掩码

In [37]:
# ------------------------------------------------------------------
# 9. Mask 工具函数
# ------------------------------------------------------------------
def make_src_mask(src, pad_idx=0):
    """
    屏蔽 <PAD> 位置，返回 [batch, 1, src_len]
    在多头注意力里会自动广播到 [batch, heads, src_len, src_len]
    """
    mask = (src != pad_idx).unsqueeze(-2)   # [batch, 1, src_len]
    return mask

def make_tgt_mask(tgt, pad_idx=0):
    """
    结合：
      1) padding mask：屏蔽 <PAD>
      2) subsequent mask：屏蔽未来信息（上三角）
    返回 [batch, 1, tgt_len, tgt_len]
    """
    batch_size, tgt_len = tgt.size()
    # padding mask
    pad_mask = (tgt != pad_idx).unsqueeze(-2)        # [batch, 1, tgt_len]
    # subsequent mask（下三角为 1）
    subsequent = torch.triu(
        torch.ones((1, tgt_len, tgt_len), device=tgt.device),
        diagonal=1
    ).bool()                                           # 上三角 True
    subsequent_mask = ~subsequent                      # 下三角 True

    # 合并：既要非 PAD，也要非未来
    mask = pad_mask & subsequent_mask                  # 广播到 [batch, 1, tgt_len, tgt_len]
    return mask