**第15章   自然语言处理：应用**

In [2]:
import email
import torch
import torch.nn as nn
def corr1d(x,k):
    w=k.shape[0]
    Y=torch.zeros((x.shape[0]-w+1))
    for i in range(Y.shape[0]):
        Y[i]=(x[i:i+w]*k).sum()
    return Y
def corr1d_multi_in(x,k):
    y=sum(corr1d(x,k) for x,k in zip(x,k))
    
    
    return y
x=torch.tensor([0,1,2,3,4,5,6,7])
k=torch.tensor([1,2,3])
print(corr1d(x,k))
x=torch.tensor([[0,1,2,3,4,5,6],[1,2,3,4,5,6,7],[2,3,4,5,6,7,8]])
k=torch.tensor([[1,2],[3,4],[-1,-3]])
print(corr1d_multi_in(x,k))
for x,k in zip(x,k):
    print("k",k)
def mlp(num_inputs,num_hiddens,flatten):
    net=[]
    net.append(nn.Dropout(0.2))
    net.append(nn.Linear(num_inputs,num_hiddens))
    net.append(nn.ReLU())
    if flatten:
        net.append(nn.Flatten(start_dim=1))
    net.append(nn.Dropout(0.2))
    net.append(nn.Linear(num_hiddens,num_inputs))
    net.append(nn.ReLU())
    if flatten:
        net.append(nn.Flatten(start_dim=1))
    return nn.Sequential(*net)
class Attend(nn.Module):
    def __init__(self,num_inputs,num_hiddens,**kwargs):
        super().__init__()
        self.f=mlp(num_inputs,num_hiddens,flatten=False)
    def forward(self,A,B):
        f_A=self.f(A)
        f_B=self.f(B)
        e=torch.bmm(f_A,f_B.permute(0,2,1))
        beta=torch.softmax(F.softmax(e,dim=-1),B)
        alpha=torch.bmm(F.softmax(e.permute(0,2,1),dim=-1),A)
        return alpha,beta
class Compare(nn.Module):
    def __init__(self,num_inputs,num_hiddens,**kwargs):
        super().__init__()
        self.g=mlp(num_inputs,num_hiddens,flatten=False)
    def forward(self,A,B,beta,alpha):
        V_A=self.g(torch.cat([A,beta],dim=2))
        V_B=self.g(torch.cat([B,alpha],dim=2))

        return V_A,V_B
class Aggregate(nn.Module):
    def __init__(self,num_inputs,num_hiddens,num_outputs,**kwargs):
        super().__init__()
        self.h=mlp(num_inputs,num_hiddens,flatten=True)
        self.linear=nn.Linear(num_hiddens,num_outputs)
    def forward(self,V_A,V_B):
        V_A=V_A.sum(dim=1)
        V_B=V_B.sum(dim=1)
        Y_hat=self.linear(self.h(torch.cat([V_A,V_B],dim=1)))
        return Y_hat

class DecomposableAttention(nn.Module):
    def __init__(self,vocab,embed_size,num_hiddens,num_inputs_attend=100,num_inputs_compare=200,
                 num_inputs_agg=400,**kwargs):
        super().__init__()
        self.embedding=nn.Embedding(len(vocab),embed_size)
        self.attend=Attend(num_inputs_attend,num_hiddens)
        self.compare=Compare(num_inputs_compare,num_hiddens)

        self.aggregate=Aggregate(num_inputs_agg,num_hiddens,num_outputs=3)
    def forward(self,x):
        premises,hypotheses=x
        A=self.embedding(premises)
        B=self.embedding(hypotheses)
        beta,alpha=self.attend(A,B)
        V_A,V_B=self.compare(A,B,beta,alpha)
        Y_hat=self.aggregate(V_A,V_B)
        return Y_hat
                
        


tensor([ 8., 14., 20., 26., 32., 38.])
tensor([ 2.,  8., 14., 20., 26., 32.])
k tensor([1, 2])
k tensor([3, 4])
k tensor([-1, -3])


In [None]:
import torch
import torch.nn as nn

class BiRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(BiRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
    
    def forward(self, text): # text: [sent len, batch size]
        embedded = self.embedding(text)
        output, (hidden, cell) = self.rnn(embedded)
        hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1)

        return self.fc(hidden.squeeze(0)) # [batch size, output_dim]
# 15.1.1 词汇表
# 15.1.2 词嵌入
# 15.1.3 循环神经网络
# 15.1.4 双向循环神经网络
# 15.1.5 双向长短期记忆网络

# 15.2 机器翻译
# 15.2.1 翻译模型
# 15.2.2 注意力机制
# 15.2.3 Transformer
# 15.2.4 Transformer的编码器和解码器
# 15.2.5 Transformer的注意力机制
# 15.2.6 Transformer的掩蔽多头注意力
# 15.2.7 Transformer的编码器和解码器

# 15.3 文本分类
# 15.3.1 文本分类模型
# 15.3.2 文本分类的损失函数
# 15.3.3 文本分类的评估指标
# 15.3.4 文本分类的模型训练
# 15.3.5 文本分类的模型评估
# 15.3.6 文本分类的模型预测
# 15.3.7 文本分类的模型保存和加载
# 15.3.8 文本分类的模型微调
# 15.3.9 文本分类的模型部署
# 15.4 文本生成
# 15.4.1 文本生成模型

# 15.4.2 文本生成模型的训练
# 15.4.3 文本生成模型的评估
# 15.4.4 文本生成模型的预测
# 15.4.5 文本生成模型的保存和加载
# 15.4.6 文本生成模型的微调
# 15.4.7 文本生成模型的部署
# 15.5 文本摘要
# 15.5.1 文本摘要模型
# 15.5.2 文本摘要模型的训练
# 15.5.3 文本摘要模型的评估
# 15.5.4 文本摘要模型的预测
# 15.5.5 文本摘要模型的保存和加载

# 15.5.6 文本摘要模型的微调
# 15.5.7 文本摘要模型的部署
# 15.6 文本情感分析
# 15.6.1 文本情感分析模型
# 15.6.2 文本情感分析模型的训练
# 15.6.3 文本情感分析模型的评估
# 15.6.4 文本情感分析模型的预测
# 15.6.5 文本情感分析模型的保存和加载

# 15.6.6 文本情感分析模型的微调
# 15.6.7 文本情感分析模型的部署


以下是针对你列出的 **NLP核心主题** 整理的 **笔记、可运行代码、数据集推荐**，所有代码基于PyTorch实现（与你已有BiRNN代码风格一致），结构按你的列表顺序排列，方便循序渐进学习：


# 15.1 基础组件：词汇表、词嵌入、循环神经网络
## 15.1.1 词汇表（Vocabulary）
### 核心笔记
- **作用**：将文本中的离散词（如“苹果”“喜欢”）映射为连续整数ID（如1→“苹果”，2→“喜欢”），是文本转向量的第一步。
- **核心概念**：
  - 词表大小（vocab_size）：包含的唯一词数量（通常过滤低频词，避免词表过大）；
  - 特殊符号：`<PAD>`（填充，统一序列长度）、`<UNK>`（未登录词，处理词表外的词）、`<SOS>`/`<EOS>`（生成任务的开始/结束符）。

### 代码：构建词汇表
```python
from collections import Counter
import torch

class Vocabulary:
    def __init__(self, min_freq=1):
        # 初始化特殊符号（优先级最高，ID从0开始）
        self.special_tokens = ['<PAD>', '<UNK>', '<SOS>', '<EOS>']
        self.min_freq = min_freq  # 过滤低频词的阈值
        self.word2id = {}  # 词→ID映射
        self.id2word = {}  # ID→词映射
        self.vocab_size = 0  # 词表大小

    def build_vocab(self, texts):
        """
        从文本列表构建词表
        texts: list，每个元素是一个句子（已分词，如["我 喜欢 编程", "苹果 很好吃"]）
        """
        # 1. 统计词频
        word_counts = Counter()
        for text in texts:
            words = text.split()
            word_counts.update(words)
        
        # 2. 过滤低频词，加入词表
        # 先添加特殊符号
        for token in self.special_tokens:
            self.word2id[token] = len(self.word2id)
        # 再添加普通词（词频≥min_freq）
        for word, count in word_counts.items():
            if count >= self.min_freq:
                self.word2id[word] = len(self.word2id)
        
        # 3. 构建反向映射
        self.id2word = {id: word for word, id in self.word2id.items()}
        # 4. 更新词表大小
        self.vocab_size = len(self.word2id)

    def text_to_ids(self, text):
        """将单个句子（分词后）转为ID序列"""
        words = text.split()
        return [
            self.word2id.get(word, self.word2id['<UNK>'])  # 未登录词用<UNK>的ID
            for word in words
        ]

    def ids_to_text(self, ids):
        """将ID序列转回文本（跳过<PAD>）"""
        words = []
        for id in ids:
            word = self.id2word[id]
            if word == '<PAD>':
                continue
            words.append(word)
        return ' '.join(words)


# 示例：使用
if __name__ == "__main__":
    # 样本文本（已分词）
    texts = ["我 喜欢 编程", "编程 很 有趣", "我 喜欢 苹果", "苹果 是 水果"]
    # 构建词表（过滤词频<1的词，这里所有词都保留）
    vocab = Vocabulary(min_freq=1)
    vocab.build_vocab(texts)
    # 查看词表
    print("词表大小：", vocab.vocab_size)
    print("词→ID：", vocab.word2id)
    # 文本转ID
    text = "我 喜欢 水果"
    ids = vocab.text_to_ids(text)
    print(f"文本'{text}'→ID：", ids)
    # ID转文本
    print(f"ID {ids}→文本：", vocab.ids_to_text(ids))
```

### 数据集
- 无需额外数据集，可基于任意文本（如你之前的text8、自己的句子）构建词表。


## 15.1.2 词嵌入（Word Embedding）
### 核心笔记
- **作用**：将词汇表的整数ID映射为低维连续向量（如1→[0.2, 0.5, -0.1]），让向量蕴含语义（如“国王”-“男人”+“女人”≈“女王”）。
- **两种方式**：
  1. 随机初始化：嵌入层随模型一起训练（适合特定任务）；
  2. 预训练嵌入：加载Word2Vec/GloVe等预训练向量（适合数据量小时迁移学习）。

### 代码1：随机初始化嵌入层（与模型训练）
```python
import torch
import torch.nn as nn

# 1. 先构建词表（复用上面的Vocabulary类）
texts = ["我 喜欢 编程", "编程 很 有趣", "我 喜欢 苹果", "苹果 是 水果"]
vocab = Vocabulary(min_freq=1)
vocab.build_vocab(texts)

# 2. 定义嵌入层（随机初始化）
embedding_dim = 10  # 嵌入向量维度
embedding = nn.Embedding(
    num_embeddings=vocab.vocab_size,  # 词表大小
    embedding_dim=embedding_dim,      # 嵌入维度
    padding_idx=vocab.word2id['<PAD>']# <PAD>的ID，嵌入向量固定为0
)

# 3. 示例：ID序列→嵌入向量
text = "我 喜欢 编程"
ids = torch.tensor(vocab.text_to_ids(text)).unsqueeze(0)  # [batch_size=1, seq_len=3]
embedded = embedding(ids)  # [1, 3, 10]（批次大小×序列长度×嵌入维度）
print("嵌入向量形状：", embedded.shape)
print("'<PAD>'的嵌入向量：", embedding(torch.tensor([vocab.word2id['<PAD>']])))  # 全0
```

### 代码2：加载预训练GloVe嵌入
```python
import torch
import torch.nn as nn
import numpy as np

def load_glove_embedding(glove_path, vocab, embedding_dim):
    """
    加载GloVe预训练向量，构建嵌入层
    glove_path: GloVe文件路径（如glove.6B.100d.txt）
    vocab: 自定义Vocabulary对象
    embedding_dim: 嵌入维度（需与GloVe一致，如100）
    """
    # 1. 读取GloVe向量，存储为{词: 向量}
    glove_dict = {}
    with open(glove_path, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split()
            word = parts[0]
            vec = np.array(parts[1:], dtype=np.float32)
            if len(vec) == embedding_dim:
                glove_dict[word] = vec

    # 2. 初始化嵌入矩阵
    embedding_matrix = torch.randn(vocab.vocab_size, embedding_dim)  # 随机初始化未匹配的词
    embedding_matrix[vocab.word2id['<PAD>']] = torch.zeros(embedding_dim)  # <PAD>设为0

    # 3. 填充匹配的预训练向量
    matched_count = 0
    for word, id in vocab.word2id.items():
        if word in glove_dict:
            embedding_matrix[id] = torch.tensor(glove_dict[word])
            matched_count += 1

    print(f"匹配到{matched_count}/{vocab.vocab_size}个词的预训练向量")

    # 4. 构建嵌入层（冻结或微调）
    embedding = nn.Embedding(
        num_embeddings=vocab.vocab_size,
        embedding_dim=embedding_dim,
        padding_idx=vocab.word2id['<PAD>']
    )
    embedding.weight.data.copy_(embedding_matrix)
    # 可选：冻结嵌入层（不训练，适合数据少）
    # embedding.weight.requires_grad = False

    return embedding

# 示例：使用（需先下载GloVe）
if __name__ == "__main__":
    # 1. 构建词表
    texts = ["i love programming", "programming is fun", "i love apple", "apple is fruit"]
    vocab = Vocabulary(min_freq=1)
    vocab.build_vocab(texts)

    # 2. 加载GloVe（需自行下载：https://nlp.stanford.edu/projects/glove/）
    glove_path = "glove.6B.100d.txt"  # 100维GloVe
    embedding_dim = 100
    embedding = load_glove_embedding(glove_path, vocab, embedding_dim)

    # 3. 测试
    text = "i love programming"
    ids = torch.tensor(vocab.text_to_ids(text)).unsqueeze(0)
    embedded = embedding(ids)
    print("预训练嵌入向量形状：", embedded.shape)
```

### 数据集（预训练嵌入）
- GloVe：https://nlp.stanford.edu/projects/glove/（推荐`glove.6B.100d.txt`，适合英文任务）；
- Word2Vec：https://code.google.com/archive/p/word2vec/（Google预训练英文/中文向量）；
- 中文预训练嵌入：https://github.com/Embedding/Chinese-Word-Vectors（包含多种中文Word2Vec/GloVe）。


## 15.1.3 循环神经网络（RNN）
### 核心笔记
- **作用**：处理序列数据（如文本）时，通过“循环单元”保留历史信息（如读“我喜欢”时，记住“我”的信息，再处理“喜欢”）。
- **局限**：梯度消失/爆炸（无法处理长序列，如超过50个词的句子），后续被LSTM/GRU替代。
- **PyTorch接口**：`nn.RNN(input_size, hidden_size, num_layers, bidirectional)`，输入需为`[seq_len, batch_size, input_size]`（序列长度×批次大小×输入维度）。

### 代码：基础RNN（序列分类示例）
```python
import torch
import torch.nn as nn
import torch.optim as optim

# 1. 数据准备（简单情感分类：正面=1，负面=0）
texts = [
    "我 喜欢 编程", "编程 很 有趣", "苹果 很好吃",  # 正面
    "我 讨厌 下雨", "作业 很难", "考试 不及格"     # 负面
]
labels = torch.tensor([1, 1, 1, 0, 0, 0], dtype=torch.long)

# 2. 构建词表和ID序列
vocab = Vocabulary(min_freq=1)
vocab.build_vocab(texts)
# 文本转ID，并统一序列长度（用<PAD>填充到最长序列长度=3）
seq_len = 3
ids_list = []
for text in texts:
    ids = vocab.text_to_ids(text)
    # 填充或截断到seq_len
    if len(ids) < seq_len:
        ids += [vocab.word2id['<PAD>']] * (seq_len - len(ids))
    else:
        ids = ids[:seq_len]
    ids_list.append(ids)
ids = torch.tensor(ids_list).permute(1, 0)  # 转置为[seq_len=3, batch_size=6]


# 3. 定义RNN分类模型
class RNNClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=vocab.word2id['<PAD>'])
        self.rnn = nn.RNN(
            input_size=embedding_dim,  # 输入维度=嵌入维度
            hidden_size=hidden_dim,    # 隐藏层维度
            num_layers=1,              # 1层RNN
            bidirectional=False        # 单向RNN
        )
        self.fc = nn.Linear(hidden_dim, output_dim)  # 分类头

    def forward(self, text_ids):
        # text_ids: [seq_len, batch_size]
        embedded = self.embedding(text_ids)  # [seq_len, batch_size, embedding_dim]
        output, hidden = self.rnn(embedded)  # output: [seq_len, batch_size, hidden_dim]; hidden: [1, batch_size, hidden_dim]
        # 用最后一步的隐藏态做分类（RNN最后一步包含整个序列的信息）
        return self.fc(hidden.squeeze(0))  # hidden.squeeze(0): [batch_size, hidden_dim]


# 4. 训练模型
embedding_dim = 10
hidden_dim = 20
output_dim = 2  # 二分类（正面/负面）
model = RNNClassifier(vocab.vocab_size, embedding_dim, hidden_dim, output_dim)
criterion = nn.CrossEntropyLoss()  # 分类损失
optimizer = optim.Adam(model.parameters(), lr=1e-3)

model.train()
for epoch in range(50):
    optimizer.zero_grad()
    logits = model(ids)  # [batch_size=6, output_dim=2]
    loss = criterion(logits, labels)
    loss.backward()
    optimizer.step()
    # 计算准确率
    preds = torch.argmax(logits, dim=1)
    acc = (preds == labels).float().mean()
    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}, Acc: {acc.item():.4f}")

# 5. 预测
model.eval()
test_text = "我 喜欢 苹果"
test_ids = vocab.text_to_ids(test_text)
test_ids = torch.tensor(test_ids).unsqueeze(1)  # [seq_len=3, batch_size=1]
with torch.no_grad():
    logits = model(test_ids)
    pred = torch.argmax(logits, dim=1).item()
print(f"测试文本：{test_text}，预测情感：{'正面' if pred == 1 else '负面'}")
```

### 数据集
- 简单任务：可自定义文本（如上面的情感句子）；
- 真实任务：IMDB情感数据集（英文，用于情感分类）、THUCNews（中文，用于文本分类）。


## 15.1.4 双向循环神经网络（BiRNN）
### 核心笔记
- **作用**：解决普通RNN“只看过去，不看未来”的问题，通过两个方向的RNN（正向：从左到右；反向：从右到左）捕捉双向上下文。
- **关键变化**：
  - 隐藏层维度：双向RNN的输出维度=2×hidden_dim（正向+反向）；
  - 隐藏态拼接：最后一步需将正向最后隐藏态和反向最后隐藏态拼接。

### 代码：BiRNN分类（复用你之前的代码，补充训练）
```python
import torch
import torch.nn as nn
import torch.optim as optim

# 1. 数据准备（同15.1.3）
texts = [
    "我 喜欢 编程", "编程 很 有趣", "苹果 很好吃",
    "我 讨厌 下雨", "作业 很难", "考试 不及格"
]
labels = torch.tensor([1, 1, 1, 0, 0, 0], dtype=torch.long)
vocab = Vocabulary(min_freq=1)
vocab.build_vocab(texts)
seq_len = 3
ids_list = []
for text in texts:
    ids = vocab.text_to_ids(text)
    ids = ids + [vocab.word2id['<PAD>']]*(seq_len-len(ids)) if len(ids)<seq_len else ids[:seq_len]
    ids_list.append(ids)
ids = torch.tensor(ids_list).permute(1, 0)  # [seq_len=3, batch_size=6]


# 2. 你的BiRNN代码（补充注释）
class BiRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(BiRNN, self).__init__()
        # 词嵌入层
        self.embedding = nn.Embedding(
            vocab_size, embedding_dim, 
            padding_idx=vocab.word2id['<PAD>']  # 填充符嵌入为0
        )
        # 双向LSTM（用LSTM替代RNN，解决梯度消失）
        self.rnn = nn.LSTM(
            input_size=embedding_dim,  # 输入维度=嵌入维度
            hidden_size=hidden_dim,    # 单向隐藏层维度
            bidirectional=True         # 双向：True
        )
        # 分类头：输入维度=2×hidden_dim（正向+反向隐藏态）
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
    
    def forward(self, text):  # text: [sent len, batch size]
        # 1. 词嵌入：[sent len, batch size] → [sent len, batch size, embedding_dim]
        embedded = self.embedding(text)
        # 2. 双向LSTM：输出output=[sent len, batch size, 2×hidden_dim]；hidden=(正向隐藏态, 反向隐藏态)
        # hidden形状：(num_layers×num_directions, batch size, hidden_dim) → 这里num_layers=1，num_directions=2
        output, (hidden, cell) = self.rnn(embedded)
        # 3. 拼接正向最后隐藏态（hidden[-2,:,:]）和反向最后隐藏态（hidden[-1,:,:]）
        hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1)  # [batch size, 2×hidden_dim]
        # 4. 分类：[batch size, 2×hidden_dim] → [batch size, output_dim]
        return self.fc(hidden)  # 原代码的squeeze(0)多余，因为hidden已无num_layers维度


# 3. 训练与预测
embedding_dim = 10
hidden_dim = 20
output_dim = 2
model = BiRNN(vocab.vocab_size, embedding_dim, hidden_dim, output_dim)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# 训练
model.train()
for epoch in range(50):
    optimizer.zero_grad()
    logits = model(ids)  # [6, 2]
    loss = criterion(logits, labels)
    loss.backward()
    optimizer.step()
    preds = torch.argmax(logits, dim=1)
    acc = (preds == labels).float().mean()
    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}, Acc: {acc.item():.4f}")

# 预测
model.eval()
test_text = "作业 很难"
test_ids = vocab.text_to_ids(test_text)
test_ids = torch.tensor(test_ids).unsqueeze(1)  # [3, 1]
with torch.no_grad():
    logits = model(test_ids)
    pred = torch.argmax(logits, dim=1).item()
print(f"测试文本：{test_text}，预测情感：{'正面' if pred == 1 else '负面'}")
```


## 15.1.5 双向长短期记忆网络（BiLSTM）
### 核心笔记
- **作用**：在BiRNN基础上，用LSTM的“门控机制”（输入门、遗忘门、输出门）解决普通RNN的梯度消失问题，能处理更长序列。
- **与BiRNN的关系**：BiLSTM是BiRNN的“升级版”（将RNN单元替换为LSTM单元），代码上仅需将`nn.RNN`改为`nn.LSTM`（如你之前的BiRNN代码已用LSTM）。
- **关键结构**：
  - 细胞状态（cell state）：类似“传送带”，可长期保存信息；
  - 门控：控制信息的加入/遗忘/输出，避免梯度消失。

### 代码：BiLSTM序列标注（命名实体识别示例）
```python
import torch
import torch.nn as nn
import torch.optim as optim

# 1. 数据准备（命名实体识别：O=非实体，PER=人名，LOC=地点）
# 格式：(文本, 标签)，标签与词一一对应
data = [
    ("小明 在 北京 工作", ["PER", "O", "LOC", "O"]),
    ("小红 去 上海 旅游", ["PER", "O", "LOC", "O"]),
    ("小李 在 广州 学习", ["PER", "O", "LOC", "O"]),
    ("小张 住 在 深圳", ["PER", "O", "O", "LOC"])
]
# 标签映射：O→0，PER→1，LOC→2
label2id = {"O": 0, "PER": 1, "LOC": 2}
id2label = {0: "O", 1: "PER", 2: "LOC"}

# 2. 词表与ID转换
texts = [item[0] for item in data]
vocab = Vocabulary(min_freq=1)
vocab.build_vocab(texts)
seq_len = 4  # 最长序列长度
# 文本转ID
ids_list = []
for text in texts:
    ids = vocab.text_to_ids(text)
    ids = ids + [vocab.word2id['<PAD>']]*(seq_len-len(ids)) if len(ids)<seq_len else ids[:seq_len]
    ids_list.append(ids)
ids = torch.tensor(ids_list).permute(1, 0)  # [seq_len=4, batch_size=4]
# 标签转ID
labels_list = []
for _, labels in data:
    label_ids = [label2id[label] for label in labels]
    label_ids = label_ids + [0]*(seq_len-len(label_ids)) if len(label_ids)<seq_len else label_ids[:seq_len]
    labels_list.append(label_ids)
labels = torch.tensor(labels_list).permute(1, 0)  # [seq_len=4, batch_size=4]（序列标注需每个词对应一个标签）


# 3. 定义BiLSTM序列标注模型
class BiLSTM_NER(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_classes):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=vocab.word2id['<PAD>'])
        self.bilstm = nn.LSTM(
            embedding_dim, hidden_dim, 
            bidirectional=True, batch_first=False
        )
        # 序列标注：每个词输出一个分类结果，输入维度=2×hidden_dim
        self.fc = nn.Linear(hidden_dim * 2, num_classes)

    def forward(self, text_ids):
        # text_ids: [seq_len, batch_size]
        embedded = self.embedding(text_ids)  # [seq_len, batch_size, embedding_dim]
        output, (hidden, cell) = self.bilstm(embedded)  # output: [seq_len, batch_size, 2×hidden_dim]
        logits = self.fc(output)  # [seq_len, batch_size, num_classes]（每个词的分类 logit）
        return logits


# 4. 训练与预测
embedding_dim = 10
hidden_dim = 20
num_classes = 3  # O, PER, LOC
model = BiLSTM_NER(vocab.vocab_size, embedding_dim, hidden_dim, num_classes)
criterion = nn.CrossEntropyLoss(ignore_index=0)  # 忽略<PAD>对应的标签（这里标签0是O，实际应单独设<PAD>标签，简化用O）
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# 训练
model.train()
for epoch in range(100):
    optimizer.zero_grad()
    logits = model(ids)  # [4, 4, 3]
    # 交叉熵损失要求输入为[seq_len×batch_size, num_classes]，标签为[seq_len×batch_size]
    loss = criterion(logits.reshape(-1, num_classes), labels.reshape(-1))
    loss.backward()
    optimizer.step()
    # 计算准确率（忽略<PAD>，这里简化不忽略）
    preds = torch.argmax(logits, dim=2)  # [4, 4]
    acc = (preds == labels).float().mean()
    if (epoch + 1) % 20 == 0:
        print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}, Acc: {acc.item():.4f}")

# 预测
model.eval()
test_text = "小刚 在 杭州 出差"
test_ids = vocab.text_to_ids(test_text)
test_ids = torch.tensor(test_ids).unsqueeze(1)  # [4, 1]
with torch.no_grad():
    logits = model(test_ids)  # [4, 1, 3]
    preds = torch.argmax(logits, dim=2).squeeze(1).tolist()  # [4]
    pred_labels = [id2label[id] for id in preds]
print(f"测试文本：{test_text}")
print(f"预测标签：{pred_labels}")  # 期望：["PER", "O", "LOC", "O"]
```

### 数据集
- 命名实体识别：CoNLL2003（英文）、MSRA（中文）；
- 序列标注通用：HuggingFace `datasets`库可直接加载（`from datasets import load_dataset`）。


# 15.2 机器翻译
## 15.2.1 翻译模型（基础Seq2Seq）
### 核心笔记
- **作用**：将一种语言（源语言，如中文）的序列转为另一种语言（目标语言，如英文）的序列。
- **基础架构**：Seq2Seq（Encoder-Decoder）：
  - Encoder：将源语言序列编码为固定长度的“上下文向量”；
  - Decoder：基于上下文向量，逐词生成目标语言序列（自回归生成）。

### 代码：基础Seq2Seq翻译（中→英，简化版）
```python
import torch
import torch.nn as nn
import torch.optim as optim
import random

# 1. 数据准备（简单中译英）
# (中文文本, 英文文本)
data = [
    ("我 喜欢 编程", "i love programming"),
    ("苹果 很好吃", "apple is delicious"),
    ("编程 很 有趣", "programming is fun"),
    ("我 讨厌 下雨", "i hate rain")
]

# 2. 构建源语言（中文）和目标语言（英文）词表
# 中文词表
src_texts = [item[0] for item in data]
src_vocab = Vocabulary(min_freq=1)
src_vocab.build_vocab(src_texts)
# 英文词表（需包含<SOS>/<EOS>）
tgt_texts = [item[1] for item in data]
tgt_vocab = Vocabulary(min_freq=1)
tgt_vocab.build_vocab(tgt_texts)

# 3. 数据预处理（源语言ID、目标语言输入ID、目标语言标签ID）
seq_len = 3
src_ids_list = []  # 源语言ID：[batch_size, seq_len]
tgt_input_ids_list = []  # 目标语言输入（加<SOS>）：[batch_size, seq_len+1]
tgt_label_ids_list = []  # 目标语言标签（加<EOS>）：[batch_size, seq_len+1]

for src_text, tgt_text in data:
    # 源语言ID
    src_ids = src_vocab.text_to_ids(src_text)
    src_ids = src_ids + [src_vocab.word2id['<PAD>']]*(seq_len-len(src_ids)) if len(src_ids)<seq_len else src_ids[:seq_len]
    src_ids_list.append(src_ids)
    
    # 目标语言输入（开头加<SOS>）
    tgt_ids = tgt_vocab.text_to_ids(tgt_text)
    tgt_input_ids = [tgt_vocab.word2id['<SOS>']] + tgt_ids
    tgt_input_ids = tgt_input_ids + [tgt_vocab.word2id['<PAD>']]*(seq_len+1-len(tgt_input_ids)) if len(tgt_input_ids)<seq_len+1 else tgt_input_ids[:seq_len+1]
    tgt_input_ids_list.append(tgt_input_ids)
    
    # 目标语言标签（结尾加<EOS>）
    tgt_label_ids = tgt_ids + [tgt_vocab.word2id['<EOS>']]
    tgt_label_ids = tgt_label_ids + [tgt_vocab.word2id['<PAD>']]*(seq_len+1-len(tgt_label_ids)) if len(tgt_label_ids)<seq_len+1 else tgt_label_ids[:seq_len+1]
    tgt_label_ids_list.append(tgt_label_ids)

# 转为张量并调整维度（Seq2Seq输入需[seq_len, batch_size]）
src_ids = torch.tensor(src_ids_list).permute(1, 0)  # [3, 4]
tgt_input_ids = torch.tensor(tgt_input_ids_list).permute(1, 0)  # [4, 4]（seq_len+1=4）
tgt_label_ids = torch.tensor(tgt_label_ids_list).permute(1, 0)  # [4, 4]


# 4. 定义Seq2Seq模型（Encoder+Decoder）
class Encoder(nn.Module):
    def __init__(self, src_vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(src_vocab_size, embedding_dim, padding_idx=src_vocab.word2id['<PAD>'])
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, bidirectional=False)
    
    def forward(self, src_ids):
        # src_ids: [src_seq_len, batch_size]
        embedded = self.embedding(src_ids)  # [src_seq_len, batch_size, embedding_dim]
        output, (hidden, cell) = self.lstm(embedded)  # hidden: [1, batch_size, hidden_dim]
        # Encoder输出上下文向量：hidden（最后一步隐藏态）
        return hidden, cell


class Decoder(nn.Module):
    def __init__(self, tgt_vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(tgt_vocab_size, embedding_dim, padding_idx=tgt_vocab.word2id['<PAD>'])
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, bidirectional=False)
        self.fc = nn.Linear(hidden_dim, tgt_vocab_size)  # 预测下一个词
    
    def forward(self, tgt_input_id, hidden, cell):
        # tgt_input_id：当前步输入词的ID，形状[1, batch_size]（每次输入一个词）
        embedded = self.embedding(tgt_input_id)  # [1, batch_size, embedding_dim]
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))  # output: [1, batch_size, hidden_dim]
        logits = self.fc(output.squeeze(0))  # [batch_size, tgt_vocab_size]
        return logits, hidden, cell


class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
    
    def forward(self, src_ids, tgt_input_ids):
        # 1. Encoder编码源语言
        hidden, cell = self.encoder(src_ids)
        # 2. Decoder逐词生成目标语言
        tgt_seq_len = tgt_input_ids.shape[0]  # 目标序列长度（含<SOS>）
        batch_size = src_ids.shape[1]
        tgt_vocab_size = self.decoder.fc.out_features
        # 存储每一步的预测logits
        outputs = torch.zeros(tgt_seq_len, batch_size, tgt_vocab_size)
        
        # 第一步输入：<SOS>（tgt_input_ids[0]）
        input_id = tgt_input_ids[0].unsqueeze(0)  # [1, batch_size]
        for t in range(tgt_seq_len):
            logits, hidden, cell = self.decoder(input_id, hidden, cell)
            outputs[t] = logits
            # 下一步输入：当前步预测的词（贪心选择）
            input_id = torch.argmax(logits, dim=1).unsqueeze(0)  # [1, batch_size]
        return outputs


# 5. 训练模型
embedding_dim = 10
hidden_dim = 20
# 初始化模型
encoder = Encoder(src_vocab.vocab_size, embedding_dim, hidden_dim)
decoder = Decoder(tgt_vocab.vocab_size, embedding_dim, hidden_dim)
model = Seq2Seq(encoder, decoder)
# 损失与优化器
criterion = nn.CrossEntropyLoss(ignore_index=tgt_vocab.word2id['<PAD>'])
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# 训练
model.train()
for epoch in range(200):
    optimizer.zero_grad()
    outputs = model(src_ids, tgt_input_ids)  # [4, 4, tgt_vocab_size]
    # 损失计算：outputs→[tgt_seq_len×batch_size, tgt_vocab_size]；labels→[tgt_seq_len×batch_size]
    loss = criterion(outputs.reshape(-1, tgt_vocab_size), tgt_label_ids.reshape(-1))
    loss.backward()
    optimizer.step()
    # 计算准确率
    preds = torch.argmax(outputs, dim=2)  # [4, 4]
    acc = (preds == tgt_label_ids).float().mean()
    if (epoch + 1) % 40 == 0:
        print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}, Acc: {acc.item():.4f}")


# 6. 预测（翻译）
def translate(model, src_text, src_vocab, tgt_vocab, max_len=10):
    model.eval()
    # 源文本转ID
    src_ids = src_vocab.text_to_ids(src_text)
    src_ids = torch.tensor(src_ids).unsqueeze(1)  # [src_seq_len, 1]（batch_size=1）
    # Encoder编码
    hidden, cell = model.encoder(src_ids)
    # Decoder生成
    translated_ids = []
    input_id = torch.tensor([[tgt_vocab.word2id['<SOS>']]])  # [1, 1]（<SOS>）
    
    for _ in range(max_len):
        logits, hidden, cell = model.decoder(input_id, hidden, cell)
        pred_id = torch.argmax(logits, dim=1).item()  # 贪心选择
        # 若生成<EOS>，停止
        if pred_id == tgt_vocab.word2id['<EOS>']:
            break
        translated_ids.append(pred_id)
        # 下一步输入
        input_id = torch.tensor([[pred_id]])
    
    # ID转文本
    translated_text = tgt_vocab.ids_to_text(translated_ids)
    return translated_text


# 测试翻译
test_src_text = "我 喜欢 苹果"  # 未在训练集中
translated_text = translate(model, test_src_text, src_vocab, tgt_vocab)
print(f"中文：{test_src_text} → 英文：{translated_text}")  # 期望接近"i love apple"（视训练效果）
```


## 15.2.2 注意力机制（Attention）
### 核心笔记
- **问题**：基础Seq2Seq的Encoder输出固定长度上下文向量，无法处理长序列（信息丢失）。
- **作用**：Decoder生成每个词时，动态“关注”Encoder中与当前词相关的源语言词（如翻译“programming”时，关注源语言的“编程”）。
- **核心公式**：  
  注意力权重 \( \alpha_{t,i} = \frac{\exp(\text{score}(h_t, s_i))}{\sum_j \exp(\text{score}(h_t, s_j))} \)（\( h_t \)是Decoder当前隐藏态，\( s_i \)是Encoder第i步输出）；  
  上下文向量 \( c_t = \sum_i \alpha_{t,i} s_i \)（加权求和Encoder输出）。

### 代码：带注意力的Seq2Seq
```python
import torch
import torch.nn as nn
import torch.nn.functional as F

# 1. 复用15.2.1的数据和词表（src_vocab, tgt_vocab, src_ids, tgt_input_ids, tgt_label_ids）

# 2. 定义带注意力的Decoder
class AttentionDecoder(nn.Module):
    def __init__(self, tgt_vocab_size, embedding_dim, hidden_dim, encoder_output_dim):
        super().__init__()
        self.embedding = nn.Embedding(tgt_vocab_size, embedding_dim, padding_idx=tgt_vocab.word2id['<PAD>'])
        # LSTM输入：嵌入维度 + 注意力上下文向量维度（=encoder_output_dim）
        self.lstm = nn.LSTM(embedding_dim + encoder_output_dim, hidden_dim)
        # 注意力分数计算：Decoder隐藏态（hidden_dim） + Encoder输出（encoder_output_dim）→ 1维分数
        self.attention = nn.Linear(hidden_dim + encoder_output_dim, 1)
        self.fc = nn.Linear(hidden_dim, tgt_vocab_size)

    def forward(self, tgt_input_id, hidden, cell, encoder_outputs):
        # tgt_input_id: [1, batch_size]
        # hidden: [1, batch_size, hidden_dim]
        # encoder_outputs: [src_seq_len, batch_size, encoder_output_dim]
        
        # 1. 词嵌入
        embedded = self.embedding(tgt_input_id)  # [1, batch_size, embedding_dim]
        
        # 2. 计算注意力权重（对每个batch的每个源语言词）
        src_seq_len = encoder_outputs.shape[0]
        batch_size = encoder_outputs.shape[1]
        # 扩展Decoder隐藏态到[src_seq_len, batch_size, hidden_dim]（与Encoder输出对齐）
        hidden_expanded = hidden.repeat(src_seq_len, 1, 1)  # [src_seq_len, batch_size, hidden_dim]
        # 拼接隐藏态和Encoder输出：[src_seq_len, batch_size, hidden_dim + encoder_output_dim]
        concat = torch.cat((hidden_expanded, encoder_outputs), dim=2)
        # 计算注意力分数：[src_seq_len, batch_size, 1] → 压缩为[src_seq_len, batch_size]
        scores = self.attention(concat).squeeze(2)
        # 归一化权重：[src_seq_len, batch_size]（每行和为1）
        attn_weights = F.softmax(scores, dim=0)
        
        # 3. 计算上下文向量（加权求和Encoder输出）
        # attn_weights扩展为[src_seq_len, batch_size, 1]，与Encoder输出做元素乘
        attn_weights_expanded = attn_weights.unsqueeze(2)  # [src_seq_len, batch_size, 1]
        # 加权求和：[batch_size, encoder_output_dim]（对src_seq_len维度求和）
        context_vec = torch.sum(attn_weights_expanded * encoder_outputs, dim=0).unsqueeze(0)  # [1, batch_size, encoder_output_dim]
        
        # 4. 拼接嵌入向量和上下文向量（作为LSTM输入）
        lstm_input = torch.cat((embedded, context_vec), dim=2)  # [1, batch_size, embedding_dim + encoder_output_dim]
        
        # 5. LSTM和分类
        output, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))  # [1, batch_size, hidden_dim]
        logits = self.fc(output.squeeze(0))  # [batch_size, tgt_vocab_size]
        
        return logits, hidden, cell, attn_weights


# 3. 定义带注意力的Seq2Seq
class Seq2SeqWithAttention(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
    
    def forward(self, src_ids, tgt_input_ids):
        # 1. Encoder编码：输出encoder_outputs（所有步输出）和最后隐藏态/细胞态
        encoder_outputs, (hidden, cell) = self.encoder(src_ids)  # encoder_outputs: [src_seq_len, batch_size, hidden_dim]
        
        # 2. Decoder生成
        tgt_seq_len = tgt_input_ids.shape[0]
        batch_size = src_ids.shape[1]
        tgt_vocab_size = self.decoder.fc.out_features
        outputs = torch.zeros(tgt_seq_len, batch_size, tgt_vocab_size)
        attn_weights_list = []  # 存储注意力权重，用于可视化
        
        input_id = tgt_input_ids[0].unsqueeze(0)
        for t in range(tgt_seq_len):
            logits, hidden, cell, attn_weights = self.decoder(input_id, hidden, cell, encoder_outputs)
            outputs[t] = logits
            attn_weights_list.append(attn_weights)
            input_id = torch.argmax(logits, dim=1).unsqueeze(0)
        
        return outputs, attn_weights_list


# 4. 重新定义Encoder（输出所有步的输出，而非仅最后隐藏态）
class AttentionEncoder(nn.Module):
    def __init__(self, src_vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(src_vocab_size, embedding_dim, padding_idx=src_vocab.word2id['<PAD>'])
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, bidirectional=False)
    
    def forward(self, src_ids):
        embedded = self.embedding(src_ids)  # [src_seq_len, batch_size, embedding_dim]
        encoder_outputs, (hidden, cell) = self.lstm(embedded)  # encoder_outputs: [src_seq_len, batch_size, hidden_dim]
        return encoder_outputs, hidden, cell  # 输出所有步的encoder_outputs


# 5. 训练与预测（复用15.2.1的训练逻辑，仅替换模型）
embedding_dim = 10
hidden_dim = 20
encoder = AttentionEncoder(src_vocab.vocab_size, embedding_dim, hidden_dim)
decoder = AttentionDecoder(tgt_vocab.vocab_size, embedding_dim, hidden_dim, hidden_dim)
model = Seq2SeqWithAttention(encoder, decoder)
criterion = nn.CrossEntropyLoss(ignore_index=tgt_vocab.word2id['<PAD>'])
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# 训练
model.train()
for epoch in range(200):
    optimizer.zero_grad()
    outputs, _ = model(src_ids, tgt_input_ids)
    loss = criterion(outputs.reshape(-1, tgt_vocab_size), tgt_label_ids.reshape(-1))
    loss.backward()
    optimizer.step()
    preds = torch.argmax(outputs, dim=2)
    acc = (preds == tgt_label_ids).float().mean()
    if (epoch + 1) % 40 == 0:
        print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}, Acc: {acc.item():.4f}")

# 预测（修改translate函数，支持注意力）
def translate_with_attention(model, src_text, src_vocab, tgt_vocab, max_len=10):
    model.eval()
    src_ids = src_vocab.text_to_ids(src_text)
    src_ids = torch.tensor(src_ids).unsqueeze(1)  # [src_seq_len, 1]
    encoder_outputs, hidden, cell = model.encoder(src_ids)
    translated_ids = []
    attn_weights_list = []
    input_id = torch.tensor([[tgt_vocab.word2id['<SOS>']]])
    
    for _ in range(max_len):
        logits, hidden, cell, attn_weights = model.decoder(input_id, hidden, cell, encoder_outputs)
        pred_id = torch.argmax(logits, dim=1).item()
        if pred_id == tgt_vocab.word2id['<EOS>']:
            break
        translated_ids.append(pred_id)
        attn_weights_list.append(attn_weights.squeeze(1).detach().numpy())  # 存储注意力权重
        input_id = torch.tensor([[pred_id]])
    
    translated_text = tgt_vocab.ids_to_text(translated_ids)
    return translated_text, attn_weights_list

# 测试
test_src_text = "我 喜欢 苹果"
translated_text, attn_weights = translate_with_attention(model, test_src_text, src_vocab, tgt_vocab)
print(f"中文：{test_src_text} → 英文：{translated_text}")
print("注意力权重（每行对应目标词，每列对应源词）：")
for i, (word, weights) in enumerate(zip(translated_text.split(), attn_weights)):
    print(f"目标词'{word}'的注意力权重：{weights.round(3)}")  # 权重高的源词是关注重点
```


## 15.2.3~15.2.7 Transformer（编码器/解码器/注意力/掩蔽）
### 核心笔记
- **问题**：RNN/LSTM无法并行计算（序列依赖），Transformer用“自注意力”替代循环结构，实现并行，同时通过“多头注意力”捕捉多维度依赖。
- **核心架构**：
  - 编码器（N层）：多头自注意力 + 前馈网络；
  - 解码器（N层）：掩码多头自注意力（防止偷看未来词） + 编码器-解码器注意力 + 前馈网络；
  - 位置编码：弥补无循环结构的位置信息丢失。

### 代码：简化版Transformer（机器翻译）
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

# 1. 位置编码（正弦余弦编码）
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        # 初始化位置编码矩阵：[max_len, d_model]
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # [max_len, 1]
        # 频率公式：10000^(-2i/d_model)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))  # [d_model/2]
        # 偶数位置用sin，奇数位置用cos
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # 扩展为[1, max_len, d_model]，方便批次处理
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)  # 不参与训练的缓冲区

    def forward(self, x):
        # x: [batch_size, seq_len, d_model]
        x = x + self.pe[:, :x.size(1), :]  # 加位置编码
        return x


# 2. 多头注意力（Multi-Head Attention）
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, n_heads):
        super().__init__()
        assert d_model % n_heads == 0, "d_model必须能被n_heads整除"
        self.d_k = d_model // n_heads  # 每个头的维度
        self.n_heads = n_heads
        # Q/K/V线性变换层
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        # 输出线性变换层
        self.w_o = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        # q/k/v: [batch_size, seq_len, d_model]
        # mask: [batch_size, 1, seq_len]（用于掩盖PAD）或[batch_size, seq_len, seq_len]（用于掩码自注意力）
        batch_size = q.size(0)
        
        # 1. 线性变换并分多头：[batch_size, seq_len, d_model] → [batch_size, n_heads, seq_len, d_k]
        q = self.w_q(q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        
        # 2. 计算注意力分数：Q*K^T / sqrt(d_k) → [batch_size, n_heads, seq_len_q, seq_len_k]
        scores = torch.matmul(q, k.transpose(2, 3)) / math.sqrt(self.d_k)
        
        # 3. 应用掩码（将掩码位置设为-1e9，softmax后为0）
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        
        # 4. 计算注意力权重和上下文向量
        attn_weights = F.softmax(scores, dim=-1)  # [batch_size, n_heads, seq_len_q, seq_len_k]
        context = torch.matmul(attn_weights, v)  # [batch_size, n_heads, seq_len_q, d_k]
        
        # 5. 拼接多头结果并线性变换：[batch_size, seq_len_q, d_model]
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, self.n_heads * self.d_k)
        output = self.w_o(context)
        
        return output, attn_weights


# 3. 前馈网络（Feed-Forward Network）
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=2048):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        # x: [batch_size, seq_len, d_model]
        return self.fc2(self.relu(self.fc1(x)))  # [batch_size, seq_len, d_model]


# 4. 编码器层（Encoder Layer）
class EncoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.ff = FeedForward(d_model, d_ff)
        # 层归一化
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        # Dropout
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        # 1. 多头自注意力 + 残差连接 + 层归一化
        attn_output, _ = self.self_attn(x, x, x, mask)  # 自注意力：Q=K=V
        x = self.norm1(x + self.dropout1(attn_output))
        # 2. 前馈网络 + 残差连接 + 层归一化
        ff_output = self.ff(x)
        x = self.norm2(x + self.dropout2(ff_output))
        return x


# 5. 解码器层（Decoder Layer）
class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.masked_self_attn = MultiHeadAttention(d_model, n_heads)  # 掩码自注意力
        self.cross_attn = MultiHeadAttention(d_model, n_heads)        # 编码器-解码器注意力
        self.ff = FeedForward(d_model, d_ff)
        # 层归一化
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        # Dropout
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x, enc_output, tgt_mask, src_mask):
        # 1. 掩码自注意力（防止偷看未来词）
        masked_attn_output, _ = self.masked_self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout1(masked_attn_output))
        # 2. 编码器-解码器注意力（关注Encoder输出）
        cross_attn_output, _ = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout2(cross_attn_output))
        # 3. 前馈网络
        ff_output = self.ff(x)
        x = self.norm3(x + self.dropout3(ff_output))
        return x


# 6. 完整Transformer
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, n_layers=6, 
                 n_heads=8, d_ff=2048, dropout=0.1, max_len=5000):
        super().__init__()
        # 嵌入层
        self.src_emb = nn.Embedding(src_vocab_size, d_model, padding_idx=src_vocab.word2id['<PAD>'])
        self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model, padding_idx=tgt_vocab.word2id['<PAD>'])
        # 位置编码
        self.pos_enc = PositionalEncoding(d_model, max_len)
        # 编码器和解码器
        self.encoder = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)])
        self.decoder = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)])
        # 输出层（预测目标词）
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)
        # Dropout
        self.dropout = nn.Dropout(dropout)
        # 模型参数初始化
        self.d_model = d_model

    def make_src_mask(self, src_ids):
        # 源语言掩码：掩盖<PAD>，形状[batch_size, 1, src_seq_len]
        src_mask = (src_ids != src_vocab.word2id['<PAD>']).unsqueeze(1)  # [batch_size, 1, src_seq_len]
        return src_mask

    def make_tgt_mask(self, tgt_ids):
        # 目标语言掩码：1. 掩盖<PAD>；2. 掩盖未来词（下三角矩阵）
        batch_size = tgt_ids.size(0)
        tgt_seq_len = tgt_ids.size(1)
        # 未来词掩码：[batch_size, tgt_seq_len, tgt_seq_len]，下三角为1，上三角为0
        nopeak_mask = (1 - torch.triu(torch.ones(1, tgt_seq_len, tgt_seq_len, device=tgt_ids.device), diagonal=1)).bool()
        # PAD掩码：[batch_size, 1, tgt_seq_len]
        pad_mask = (tgt_ids != tgt_vocab.word2id['<PAD>']).unsqueeze(1)
        # 合并掩码：两者都为1才保留
        tgt_mask = pad_mask & nopeak_mask  # [batch_size, tgt_seq_len, tgt_seq_len]
        return tgt_mask

    def forward(self, src_ids, tgt_ids):
        # src_ids: [batch_size, src_seq_len]
        # tgt_ids: [batch_size, tgt_seq_len]（目标语言输入，不含<EOS>）
        
        # 1. 嵌入 + 位置编码
        src_emb = self.dropout(self.pos_enc(self.src_emb(src_ids)))  # [batch_size, src_seq_len, d_model]
        tgt_emb = self.dropout(self.pos_enc(self.tgt_emb(tgt_ids)))  # [batch_size, tgt_seq_len, d_model]
        
        # 2. 生成掩码
        src_mask = self.make_src_mask(src_ids)
        tgt_mask = self.make_tgt_mask(tgt_ids)
        
        # 3. 编码器
        enc_output = src_emb
        for enc_layer in self.encoder:
            enc_output = enc_layer(enc_output, src_mask)  # [batch_size, src_seq_len, d_model]
        
        # 4. 解码器
        dec_output = tgt_emb
        for dec_layer in self.decoder:
            dec_output = dec_layer(dec_output, enc_output, tgt_mask, src_mask)  # [batch_size, tgt_seq_len, d_model]
        
        # 5. 输出层（预测下一个词）
        logits = self.fc_out(dec_output)  # [batch_size, tgt_seq_len, tgt_vocab_size]
        
        return logits


# 7. 训练与预测（复用15.2.1的数据，调整维度为[batch_size, seq_len]）
# 重新处理数据（将之前的[seq_len, batch_size]转为[batch_size, seq_len]）
src_ids = torch.tensor(src_ids_list)  # [4, 3]（batch_size=4, src_seq_len=3）
tgt_input_ids = torch.tensor(tgt_input_ids_list)  # [4, 4]（batch_size=4, tgt_seq_len=4）
tgt_label_ids = torch.tensor(tgt_label_ids_list)  # [4, 4]

# 初始化模型（简化参数：d_model=16, n_layers=2, n_heads=2）
model = Transformer(
    src_vocab_size=src_vocab.vocab_size,
    tgt_vocab_size=tgt_vocab.vocab_size,
    d_model=16,
    n_layers=2,
    n_heads=2,
    d_ff=64,
    dropout=0.1
)
# 初始化参数（Transformer需特殊初始化）
for p in model.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

# 损失与优化器
criterion = nn.CrossEntropyLoss(ignore_index=tgt_vocab.word2id['<PAD>'])
optimizer = optim.Adam(model.parameters(), lr=1e-3, betas=(0.9, 0.98), eps=1e-9)

# 训练
model.train()
for epoch in range(300):
    optimizer.zero_grad()
    logits = model(src_ids, tgt_input_ids[:, :-1])  # tgt_input_ids[:, :-1]：去掉最后一列（避免与标签错位）
    # 标签：tgt_label_ids[:, 1:]（去掉<SOS>）
    loss = criterion(logits.reshape(-1, tgt_vocab.vocab_size), tgt_label_ids[:, 1:].reshape(-1))
    loss.backward()
    optimizer.step()
    # 准确率
    preds = torch.argmax(logits, dim=2)
    acc = (preds == tgt_label_ids[:, 1:]).float().mean()
    if (epoch + 1) % 50 == 0:
        print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}, Acc: {acc.item():.4f}")

# 预测
def transformer_translate(model, src_text, src_vocab, tgt_vocab, max_len=10):
    model.eval()
    # 源文本转ID：[1, src_seq_len]（batch_size=1）
    src_ids = src_vocab.text_to_ids(src_text)
    src_ids = torch.tensor(src_ids).unsqueeze(0)
    # 初始化目标序列：[1, 1]（<SOS>）
    tgt_ids = torch.tensor([[tgt_vocab.word2id['<SOS>']]])
    
    for _ in range(max_len):
        # 预测
        logits = model(src_ids, tgt_ids)  # [1, tgt_seq_len, tgt_vocab_size]
        # 取最后一步的预测
        pred_id = torch.argmax(logits[:, -1, :], dim=1).item()
        # 若生成<EOS>，停止
        if pred_id == tgt_vocab.word2id['<EOS>']:
            break
        # 追加到目标序列
        tgt_ids = torch.cat((tgt_ids, torch.tensor([[pred_id]])), dim=1)
    
    # ID转文本（去掉<SOS>）
    translated_ids = tgt_ids.squeeze(0)[1:].tolist()
    translated_text = tgt_vocab.ids_to_text(translated_ids)
    return translated_text

# 测试
test_src_text = "我 喜欢 苹果"
translated_text = transformer_translate(model, test_src_text, src_vocab, tgt_vocab)
print(f"中文：{test_src_text} → 英文：{translated_text}")
```

### 数据集（机器翻译）
- 小规模：Multi30k（英德/英法，适合测试，HuggingFace可加载）；
- 中规模：IWSLT2017（多语言，如中英，句子较短）；
- 大规模：WMT2014（英法/英德，适合训练高性能模型）；
- 中文相关：OPUS-100（包含中英，https://www.opus.nlpl.eu/opus-100.php）。


# 15.3 文本分类（完整流程）
## 15.3.1~15.3.9 文本分类全流程（以IMDB情感分类为例）
### 核心笔记
- **任务**：将文本（如电影评论）分为预定义类别（如正面/负面）。
- **完整流程**：数据加载→预处理→模型构建→损失函数→训练→评估→预测→保存/加载→微调→部署。
- **常用模型**：CNN、BiLSTM、BERT（微调）。

### 代码：BERT微调文本分类（IMDB情感分类）
```python
from datasets import load_dataset
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
import torch
import numpy as np
from sklearn.metrics import accuracy_score, f1_score

# 15.3.1 数据加载（IMDB情感分类：正面=1，负面=0）
dataset = load_dataset("imdb")  # 自动下载IMDB数据集
print("数据集结构：", dataset)  # train(25k) + test(25k)
print("样本示例：", dataset["train"][0])

# 15.3.2 数据预处理（Tokenizer）
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")  # 加载BERT分词器

def preprocess_function(examples):
    # 分词：max_length=128（截断/填充）
    return tokenizer(
        examples["text"],
        padding="max_length",
        truncation=True,
        max_length=128
    )

# 应用预处理
tokenized_datasets = dataset.map(preprocess_function, batched=True)
# 转换为PyTorch格式
tokenized_datasets.set_format("torch", columns=["input_ids", "attention_mask", "label"])
# 简化：用小数据集加速训练
small_train_dataset = tokenized_datasets["train"].shuffle(seed=42).select(range(1000))
small_test_dataset = tokenized_datasets["test"].shuffle(seed=42).select(range(1000))

# 15.3.3 模型构建（BERT分类模型）
model = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels=2  # 二分类（正面/负面）
)


In [None]:
import torch

"""
# 第15章 自然语言处理：应用

本章讨论了自然语言处理的多个任务，例如：
- 15.1 双向循环神经网络（BiRNN/双向LSTM）：用于模型编码
- 15.2 机器翻译及注意力机制
- 15.3 文本分类：包括模型定义、损失函数、评估、训练、预测、保存和加载
- 15.4 文本生成：模型训练、评估和部署
- 15.5 文本摘要：模型构建、训练、预测和微调
- 15.6 文本情感分析：建立情感分析模型并进行训练和评估

以下代码展示了文本分类的一个示例，使用前面定义的 BiRNN 模型进行前向传播，并构造了一个简单的语料库及词汇表。
"""

# 构造一个简单的语料库，及文本分类的标签（例如 1 为正面，0 为负面）
texts = [
    "this is good",
    "this is bad",
    "i love this",
    "i hate that"
]
labels = [1, 0, 1, 0]

# 15.1.1 词汇表：构建简单的单词到索引映射
word2idx = {"<pad>": 0}  # 用 0 表示 padding
for text in texts:
    for word in text.lower().split():
        if word not in word2idx:
            word2idx[word] = len(word2idx)

# 打印词汇表
print("Vocabulary:", word2idx)

# 将文本转换为索引列表
tokenized_texts = []
for text in texts:
    tokens = [word2idx[word] for word in text.lower().split()]
    tokenized_texts.append(tokens)

# 为了批量处理，需要 padding 每个序列到相同长度
max_len = max(len(t) for t in tokenized_texts)
padded_texts = [t + [word2idx["<pad>"]] * (max_len - len(t)) for t in tokenized_texts]


# 转换成 tensor，并调整形状为 [句子长度, batch size]
text_tensor = torch.tensor(padded_texts, dtype=torch.long).transpose(0, 1)
label_tensor = torch.tensor(labels, dtype=torch.long)

print("Text Tensor shape:", text_tensor.shape)
print("Label Tensor shape:", label_tensor.shape)

# 参数设置
vocab_size = len(word2idx)
embedding_dim = 10
hidden_dim = 16
output_dim = 2  # 假设有两个类别：正面和负面

# 实例化之前定义的 BiRNN 模型 (已在前面的单元中定义)
model = BiRNN(vocab_size, embedding_dim, hidden_dim, output_dim)

# 前向传播，获得预测结果
outputs = model(text_tensor)
print("Model outputs:", outputs)

# （后续可继续完成训练过程、损失计算、以及模型评估等任务）