<!--Copyright © ZOMI. Licensed under the [License](https://github.com/Infrasys-AI/AIInfra)-->

# Hands-On Transformer Machine Translation

Author: ZOMI

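This experiment applies the Transformer model implemented earlier to a real machine translation task using the [Multi30k dataset](https://github.com/multi30k/dataset). Multi30k is a German-English parallel corpus of image descriptions with about 29,000 training sentence pairs. Its sentences are short, so it is well suited to checking how a Transformer performs on a low- to medium-resource translation task. Here we translate from German into English.

We apply several established training practices (learning-rate scheduling, label smoothing and gradient clipping), implement greedy search and beam search for decoding at inference time, and finally evaluate translation quality with the BLEU score.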

## 1. Environment Setup and Data Loading

First, import the required libraries and configure the environment.

In [62]:
import torch
import torch.nn as nn
import torch.optim as optim
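# Legacy torchtext API: Field and BucketIterator live in torchtext.data up to 0.8,
# moved to torchtext.legacy.data in 0.9-0.11, and were removed in 0.12.
from torchtext.data import Field, BucketIterator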
import spacy
import numpy as np
import random
import math
import time
import os
from torchtext.datasets import Multi30k
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction

# Set the random seeds so that results are reproducible
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda


### 1.1 Loading and Preprocessing the Data

Use the torchtext library to load the Multi30k dataset, tokenize it with spaCy and build the vocabularies.

In [63]:
# Load the spaCy English and German models used for tokenization
try:
    spacy_en = spacy.load('en_core_web_sm')
    spacy_de = spacy.load('de_core_news_sm')
except OSError:
    print("Downloading spaCy models...")
    try:
        import urllib.request
        # Quick connectivity check before attempting the download
        urllib.request.urlopen("https://github.com", timeout=3)
        import spacy.cli as spacy_cli
        spacy_cli.download('en_core_web_sm')
        spacy_cli.download('de_core_news_sm')
        spacy_en = spacy.load('en_core_web_sm')
        spacy_de = spacy.load('de_core_news_sm')
    except Exception as e:
        print(f"spaCy model download failed, falling back to blank tokenizers: {e}")
        spacy_en = spacy.blank('en')
        spacy_de = spacy.blank('de')

def tokenize_en(text):
    return [token.text for token in spacy_en.tokenizer(text)]

def tokenize_de(text):
    return [token.text for token in spacy_de.tokenizer(text)]

# Define the Field objects that process the text (source: German, target: English)
SRC = Field(tokenize=tokenize_de, 
            init_token='<sos>',
            eos_token='<eos>',
            lower=True,
            batch_first=True)

TRG = Field(tokenize=tokenize_en, 
            init_token='<sos>', 
            eos_token='<eos>', 
            lower=True,
            batch_first=True)

# Load the Multi30k dataset
print("Loading the Multi30k dataset...")
DATA_ROOT = '/workspace/AIInfra/06AlgoData/01Basic/data'
DATA_PATH = os.path.join(DATA_ROOT, 'multi30k')
train_data, valid_data, test_data = Multi30k.splits(
    exts=('.de', '.en'),
    fields=(SRC, TRG),
    path=DATA_PATH,
    train='train',
    validation='val',
    test='test_2016_flickr'
)

# Build the vocabularies from the training split only;
# tokens seen fewer than min_freq times are mapped to <unk>
print("Building vocabularies...")
SRC.build_vocab(train_data, min_freq=2)
TRG.build_vocab(train_data, min_freq=2)

print(f"Source vocabulary size: {len(SRC.vocab)}")
print(f"Target vocabulary size: {len(TRG.vocab)}")

# Create the data iterators
BATCH_SIZE = 128
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=BATCH_SIZE,
    device=device)

print("Data loading complete!")

Loading the Multi30k dataset...
Building vocabularies...
Source vocabulary size: 7853
Target vocabulary size: 5893
Data loading complete!
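
To make the effect of `min_freq=2` concrete, here is a minimal pure-Python sketch of the kind of vocabulary that `build_vocab` produces. It is an illustration only, not torchtext's implementation: the helpers `build_vocab_sketch` and `numericalize` and the toy corpus are assumptions made for this example. The order of the special tokens mirrors the indices printed later in this notebook (`<pad>` = 1, `<sos>` = 2, `<eos>` = 3).

```python
from collections import Counter

SPECIALS = ["<unk>", "<pad>", "<sos>", "<eos>"]

def build_vocab_sketch(tokenized_sentences, min_freq=2):
    """Return (itos, stoi), keeping only tokens seen at least min_freq times."""
    counts = Counter(tok for sent in tokenized_sentences for tok in sent)
    # Most frequent first; ties broken alphabetically so the order is deterministic
    kept = sorted((t for t, c in counts.items() if c >= min_freq),
                  key=lambda t: (-counts[t], t))
    itos = SPECIALS + kept
    stoi = {tok: idx for idx, tok in enumerate(itos)}
    return itos, stoi

def numericalize(tokens, stoi):
    """Wrap a sentence in <sos>/<eos> and map out-of-vocabulary tokens to <unk>."""
    unk = stoi["<unk>"]
    wrapped = ["<sos>"] + tokens + ["<eos>"]
    return [stoi.get(tok, unk) for tok in wrapped]

# Toy corpus (hypothetical): only "ein" and "sitzt" occur twice
corpus = [
    ["ein", "mann", "sitzt"],
    ["ein", "hund", "sitzt"],
    ["eine", "frau", "läuft"],
]
itos, stoi = build_vocab_sketch(corpus, min_freq=2)
print(itos)                                          # specials, then 'ein', 'sitzt'
print(numericalize(["ein", "hund", "läuft"], stoi))  # [2, 4, 0, 0, 3]
```

Tokens that appear only once ("hund", "läuft") collapse to index 0, which is why a higher `min_freq` shrinks the vocabulary and increases the number of `<unk>` tokens the model sees.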


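## 2. Model Construction and Optimization Techniques

### 2.1 Building the Transformer Model

We reuse the Transformer implemented earlier and adapt its configuration to the machine translation task.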

In [64]:
from transformer_components import Embedding, PositionalEncoding, MultiHeadAttention
from transformer_components import FeedForward, SublayerConnection, EncoderLayer
from transformer_components import DecoderLayer, Encoder, Decoder, Transformer, Generator
from copy import deepcopy

def make_model(src_vocab_size, trg_vocab_size, d_model=512, N=6, d_ff=2048, h=8, dropout=0.1):
    """
    Build the full Transformer model.
    """
    attn = MultiHeadAttention(d_model, h, dropout)
    ff = FeedForward(d_model, d_ff, dropout)
    position = PositionalEncoding(d_model, dropout)
    
    # Note: DecoderLayer receives the same `attn` object for self-attention and
    # cross-attention, so the two share weights unless the layer copies them internally.
    # Pass deepcopy(attn) for each argument to give them separate parameters.
    model = Transformer(
        Encoder(EncoderLayer(d_model, attn, ff, dropout), N),
        Decoder(DecoderLayer(d_model, attn, attn, ff, dropout), N),
        nn.Sequential(Embedding(src_vocab_size, d_model), deepcopy(position)),
        nn.Sequential(Embedding(trg_vocab_size, d_model), deepcopy(position)),
        Generator(d_model, trg_vocab_size)
    )
    
    # Xavier uniform initialization for all weight matrices
    for p in model.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)
            
    return model

# Create the model
INPUT_DIM = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)
D_MODEL = 256
N_LAYERS = 3
HID_DIM = 512
N_HEADS = 8
DROPOUT = 0.1

model = make_model(INPUT_DIM, OUTPUT_DIM, D_MODEL, N_LAYERS, HID_DIM, N_HEADS, DROPOUT).to(device)

print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

Model parameters: 8,198,661
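
The reported count can be cross-checked by hand. The sketch below assumes a conventional layout for the modules in `transformer_components`: linear layers with bias, a LayerNorm with scale and shift around every sublayer plus one final LayerNorm per stack, and a parameter-free positional encoding. These are assumptions about code that is not shown here, and `count_params` is a hypothetical helper. Under those assumptions the total matches 8,198,661 only when each decoder layer's self-attention and cross-attention are counted once, which is what passing the same `attn` object twice to `DecoderLayer` would produce.

```python
def linear(n_in, n_out):
    """Parameters of a linear layer with bias."""
    return n_in * n_out + n_out

def count_params(src_vocab, trg_vocab, d_model, n_layers, d_ff, shared_decoder_attn):
    attn = 4 * linear(d_model, d_model)        # Q, K, V and output projections
    ff = linear(d_model, d_ff) + linear(d_ff, d_model)
    norm = 2 * d_model                          # LayerNorm scale and shift

    enc_layer = attn + ff + 2 * norm            # two sublayers
    dec_attn = attn if shared_decoder_attn else 2 * attn
    dec_layer = dec_attn + ff + 3 * norm        # three sublayers

    encoder = n_layers * enc_layer + norm       # plus a final LayerNorm
    decoder = n_layers * dec_layer + norm
    embeddings = (src_vocab + trg_vocab) * d_model
    generator = linear(d_model, trg_vocab)
    return encoder + decoder + embeddings + generator

shared = count_params(7853, 5893, 256, 3, 512, shared_decoder_attn=True)
separate = count_params(7853, 5893, 256, 3, 512, shared_decoder_attn=False)
print(f"{shared:,}")    # 8,198,661
print(f"{separate:,}")  # 8,988,165
```

Giving the two attention mechanisms separate weights, as in the original Transformer design, would add one extra attention block per decoder layer, about 0.79M parameters for this configuration.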


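### 2.2 Label Smoothing

Label smoothing is a regularization technique that softens the one-hot targets so the model does not become over-confident, which tends to improve generalization.

The standard formulation is:
$$
y_{\text{smooth}} = (1 - \epsilon) \cdot y + \frac{\epsilon}{K}
$$

Here $y$ is the one-hot target, $\epsilon$ is the smoothing strength and $K$ is the number of classes (the target vocabulary size). The implementation below uses a common variant: the true token receives $1 - \epsilon$, the remaining mass $\epsilon$ is spread evenly over the other $K - 2$ tokens, and `<pad>` always receives zero probability.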

In [65]:
class LabelSmoothing(nn.Module):
    def __init__(self, smoothing=0.1, pad_idx=0):
        super().__init__()
        assert 0.0 <= smoothing < 1.0
        self.smoothing = smoothing
        self.pad_idx = pad_idx
        self.confidence = 1.0 - smoothing
        self.criterion = nn.KLDivLoss(reduction='sum')

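    def forward(self, pred_log_probs, target):
        # pred_log_probs must be log-probabilities, as nn.KLDivLoss expects
        pred = pred_log_probs.view(-1, pred_log_probs.size(-1))
        tgt = target.view(-1)

        # Count real (non-pad) tokens so the loss can be averaged per token
        non_pad_mask = tgt != self.pad_idx
        valid_cnt = non_pad_mask.sum()
        if valid_cnt == 0:
            return torch.tensor(0.0, device=pred.device)

        # Smoothed targets: eps / (K - 2) everywhere, 1 - eps at the true token,
        # zero in the <pad> column and in every row whose target is <pad>
        smooth_dist = pred.detach().clone()
        smooth_dist.fill_(self.smoothing / (pred.size(1) - 2))
        smooth_dist.scatter_(1, tgt.unsqueeze(1), self.confidence)
        smooth_dist[:, self.pad_idx] = 0.0
        smooth_dist[~non_pad_mask] = 0.0

        loss = self.criterion(pred, smooth_dist)
        return loss / valid_cnt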
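
As a small worked example, the following pure-Python sketch builds the smoothed distribution for a single target token using the same rule as the class above. The helper name `smoothed_target` and the toy sizes (6 classes, pad index 1) are assumptions chosen for illustration.

```python
def smoothed_target(target, num_classes, smoothing=0.1, pad_idx=1):
    """Smoothed distribution for one non-pad target token, as a plain list."""
    dist = [smoothing / (num_classes - 2)] * num_classes
    dist[target] = 1.0 - smoothing   # confidence on the true token
    dist[pad_idx] = 0.0              # <pad> never receives probability mass
    return dist

dist = smoothed_target(target=3, num_classes=6, smoothing=0.1, pad_idx=1)
print([round(p, 3) for p in dist])  # [0.025, 0.0, 0.025, 0.9, 0.025, 0.025]
print(round(sum(dist), 6))          # 1.0
```

Dividing by `num_classes - 2` rather than `num_classes` is what keeps the row summing to 1 once the target and `<pad>` entries are overwritten.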

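### 2.3 Learning Rate Scheduling

The Transformer uses a learning-rate schedule with warmup: the rate increases linearly for the first `warmup_steps` steps and then decays in proportion to the inverse square root of the step number.

The formula is: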
$$
\text{lrate} = d_{\text{model}}^{-0.5} \cdot \min(\text{step\_num}^{-0.5}, \text{step\_num} \cdot \text{warmup\_steps}^{-1.5})
$$

In [66]:
class TransformerOptimizer:
    def __init__(self, optimizer, d_model, warmup_steps=4000, factor=1.0):
        self.optimizer = optimizer
        self.d_model = d_model
        self.warmup_steps = warmup_steps
        self.factor = factor
        self.step_num = 0
        self.lr = 0
        
    def step(self):
        self.step_num += 1
        lr = self._get_lr()
        for p in self.optimizer.param_groups:
            p['lr'] = lr
        self.optimizer.step()
        
    def zero_grad(self):
        self.optimizer.zero_grad()
        
    def _get_lr(self):
        lr = self.factor * (self.d_model ** -0.5) * \
             min(self.step_num ** -0.5, self.step_num * self.warmup_steps ** -1.5)
        self.lr = lr
        return lr

# Create the Adam optimizer and wrap it in the warmup learning-rate scheduler
optimizer = TransformerOptimizer(
    optim.Adam(model.parameters(), betas=(0.9, 0.98), eps=1e-9),
    d_model=D_MODEL,
    warmup_steps=2000,
    factor=1.0
)
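
To see the shape of the schedule, the sketch below evaluates the formula at a few steps with this notebook's settings (`d_model=256`, `warmup_steps=2000`). The standalone function `noam_lr` is a hypothetical helper written for this illustration; it mirrors `_get_lr` above.

```python
def noam_lr(step, d_model=256, warmup_steps=2000, factor=1.0):
    """Learning rate at a given step (step >= 1) under the warmup schedule."""
    return factor * d_model ** -0.5 * min(step ** -0.5, step * warmup_steps ** -1.5)

# 227 is the number of batches per epoch implied by the training log (11350 / 50)
for step in (1, 227, 1000, 2000, 4000, 11350):
    print(f"step {step:>5}: lr = {noam_lr(step):.6f}")
```

The rate peaks at `step == warmup_steps`, where both branches of the `min` coincide, at roughly 1.4e-3 for these settings. With 227 steps per epoch, warmup therefore lasts for about the first nine epochs.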

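### 2.4 Gradient Clipping

Gradient clipping rescales the gradients whenever their global L2 norm exceeds `max_norm`, which guards against exploding gradients and makes training more stable.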

In [67]:
def clip_gradients(model, max_norm=1.0):
    total_norm = 0
    for p in model.parameters():
        if p.grad is not None:
            param_norm = p.grad.data.norm(2)
            total_norm += param_norm.item() ** 2
    total_norm = total_norm ** (1. / 2)
    
    clip_coef = max_norm / (total_norm + 1e-6)
    if clip_coef < 1:
        for p in model.parameters():
            if p.grad is not None:
                p.grad.data.mul_(clip_coef)
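
The arithmetic is easy to verify on plain numbers. The following sketch applies the same global-norm rule to nested lists instead of tensors; `clip_by_global_norm` is a hypothetical helper for illustration. In practice PyTorch's built-in `torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)` performs this kind of global-norm rescaling.

```python
import math

def clip_by_global_norm(grads, max_norm=1.0, eps=1e-6):
    """Rescale a list of gradient vectors so their joint L2 norm is at most max_norm."""
    total_norm = math.sqrt(sum(g * g for vec in grads for g in vec))
    clip_coef = max_norm / (total_norm + eps)
    if clip_coef < 1:
        grads = [[g * clip_coef for g in vec] for vec in grads]
    return grads, total_norm

grads = [[3.0, 4.0], [12.0]]   # joint norm = sqrt(9 + 16 + 144) = 13
clipped, norm_before = clip_by_global_norm(grads, max_norm=1.0)
norm_after = math.sqrt(sum(g * g for vec in clipped for g in vec))
print(norm_before)             # 13.0
print(round(norm_after, 4))    # 1.0
```

All gradients are scaled by the same coefficient, so the update direction is preserved and only its length changes.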

## 3. Training and Validation

### 3.1 Training Loop

In [68]:
def train_epoch(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    
    for i, batch in enumerate(iterator):
        src = batch.src
        trg = batch.trg
        
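        # Build the masks; True marks a position that may be attended to
        src_mask = (src != SRC.vocab.stoi['<pad>']).unsqueeze(1).unsqueeze(1)
        
        # Teacher forcing: the decoder input drops the last token,
        # and the prediction target (trg[:, 1:] below) drops <sos>
        tgt_in = trg[:, :-1]
        B, L = tgt_in.size()
        pad_mask = (tgt_in != TRG.vocab.stoi['<pad>']).unsqueeze(1).unsqueeze(1).repeat(1, 1, L, 1)
        look_ahead = torch.triu(torch.ones((1, 1, L, L), device=device), diagonal=1) == 0
        tgt_mask = pad_mask & look_ahead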
        
        optimizer.zero_grad()
        
        output = model(src, tgt_in, src_mask, tgt_mask)
        output = model.generator(output)
        
        output_flat = output.contiguous().view(-1, output.size(-1))
        target_flat = trg[:, 1:].contiguous().view(-1)
        loss = criterion(output_flat, target_flat)
        
        loss.backward()
        clip_gradients(model, clip)
        optimizer.step()
        
        epoch_loss += loss.item()
        
        if i % 100 == 0:
            print(f"Batch {i}, loss: {loss.item():.4f}, lr: {optimizer.lr:.6f}")
            
    return epoch_loss / len(iterator)
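
The target mask combines two conditions: a key position must not be padding, and it must not lie in the future of the query position. The following pure-Python sketch reproduces that logic for one short sequence so the resulting pattern can be inspected. The helper `target_mask` and the token ids are hypothetical, and the pad index of 1 is assumed to match the `PAD_IDX` printed in this notebook.

```python
PAD = 1  # assumed pad index

def target_mask(token_ids, pad_idx=PAD):
    """mask[i][j] is True when query position i may attend to key position j."""
    n = len(token_ids)
    return [[(j <= i) and (token_ids[j] != pad_idx) for j in range(n)]
            for i in range(n)]

# Sequence: <sos> w1 w2 <pad>
mask = target_mask([2, 10, 11, 1])
for row in mask:
    print("".join("x" if visible else "." for visible in row))
# x...
# xx..
# xxx.
# xxx.
```

The lower-triangular shape comes from the look-ahead condition, and the empty last column comes from the padding condition, matching `pad_mask & look_ahead` in the training loop.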

### 3.2 Validation Loop

In [69]:
def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            src = batch.src
            trg = batch.trg
            
            src_mask = (src != SRC.vocab.stoi['<pad>']).unsqueeze(1).unsqueeze(1)
            tgt_in = trg[:, :-1]
            B, L = tgt_in.size()
            pad_mask = (tgt_in != TRG.vocab.stoi['<pad>']).unsqueeze(1).unsqueeze(1).repeat(1, 1, L, 1)
            look_ahead = torch.triu(torch.ones((1, 1, L, L), device=device), diagonal=1) == 0
            tgt_mask = pad_mask & look_ahead

            output = model(src, tgt_in, src_mask, tgt_mask)
            output = model.generator(output)
            
            output_flat = output.contiguous().view(-1, output.size(-1))
            target_flat = trg[:, 1:].contiguous().view(-1)
            loss = criterion(output_flat, target_flat)
            epoch_loss += loss.item()
            
    return epoch_loss / len(iterator)

### 3.3 Training the Model

In [70]:
# Indices of the special tokens
PAD_IDX = TRG.vocab.stoi['<pad>']
SOS_IDX = TRG.vocab.stoi['<sos>']
EOS_IDX = TRG.vocab.stoi['<eos>']

criterion = LabelSmoothing(smoothing=0.1, pad_idx=PAD_IDX)

print(f"PAD_IDX: {PAD_IDX}")
print(f"SOS_IDX: {SOS_IDX}")
print(f"EOS_IDX: {EOS_IDX}")

PAD_IDX: 1
SOS_IDX: 2
EOS_IDX: 3


In [71]:
# Training parameters
N_EPOCHS = 50
CLIP = 1.0
best_valid_loss = float('inf')
patience = 10
patience_counter = 0

print("Starting training...")
print(f"Total training steps (approx.): {len(train_iterator) * N_EPOCHS}")
print("Warmup steps: 2000")
print("-" * 60)

for epoch in range(N_EPOCHS):
    start_time = time.time()
    
    train_loss = train_epoch(model, train_iterator, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iterator, criterion)
    
    end_time = time.time()
    elapsed = end_time - start_time
    mins, secs = divmod(int(elapsed), 60)
    
    # Keep the checkpoint with the lowest validation loss
    is_best = valid_loss < best_valid_loss
    if is_best:
        best_valid_loss = valid_loss
        patience_counter = 0
        torch.save(model.state_dict(), 'best-model.pt')
        tag = "★ new best model"
    else:
        patience_counter += 1
        tag = f"no improvement ({patience_counter}/{patience})"
    
    print(f'Epoch: {epoch+1:02d} | Time: {mins}m {secs}s | {tag}')
    print(f'\tTrain Loss: {train_loss:.4f} | Train PPL: {math.exp(min(train_loss, 20)):7.3f}')
    print(f'\t Val  Loss: {valid_loss:.4f} |  Val PPL: {math.exp(min(valid_loss, 20)):7.3f}')
    print(f'\tCurrent learning rate: {optimizer.lr:.6f}')
    print("=" * 70)
    
    # Early stopping
    if patience_counter >= patience:
        print(f"Early stopping: validation loss has not improved for {patience} consecutive epochs")
        break
    
print("\nTraining finished!")
print(f"Best validation loss: {best_valid_loss:.4f}")
print(f"Best validation perplexity: {math.exp(best_valid_loss):.3f}")

Starting training...
Total training steps (approx.): 11350
Warmup steps: 2000
------------------------------------------------------------
Batch 0, loss: 7.4962, lr: 0.000001
Batch 100, loss: 5.9606, lr: 0.000071
Batch 200, loss: 4.4937, lr: 0.000140
Epoch: 01 | Time: 0m 4s | ★ new best model
	Train Loss: 5.7926 | Train PPL: 327.873
	 Val  Loss: 4.1621 |  Val PPL:  64.205
	Current learning rate: 0.000159
Batch 0, loss: 4.2669, lr: 0.000159
Batch 100, loss: 3.7943, lr: 0.000229
Batch 200, loss: 3.3209, lr: 0.000299
Epoch: 02 | Time: 0m 4s | ★ new best model
	Train Loss: 3.7030 | Train PPL:  40.570
	 Val  Loss: 3.1395 |  Val PPL:  23.091
	Current learning rate: 0.000317
Batch 0, loss: 3.3088, lr: 0.000318
Batch 100, loss: 2.9615, lr: 0.000388
Batch 200, loss: 2.9238, lr: 0.000458
Epoch: 03 | Time: 0m 4s | ★ new best model
	Train Loss: 3.0572 | Train PPL:  21.267
	 Val  Loss: 2.7050 |  Val PPL:  14.954
	Current learning rate: 0.000476
Batch 0, loss: 2.7909, lr: 0.000477
Batch 100, loss: 2.6827, lr: 0.000546
Batch 200, loss: 2.5862, lr: 0.000616
Epoch: 04 | Time: 0m 4s | ★ new best model
	Train Loss: 2.6765 | Train PPL:  14.534
	 Val  Loss: 2.3942 |  Val

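As training progresses, the training and validation losses should both fall steadily, which indicates that the model is learning the translation task. In the log above, the validation loss drops from 4.16 to 2.39 over the first four epochs. The reported perplexity is `exp(loss)`; because the loss here is the label-smoothed KL divergence rather than plain cross-entropy, these values are not directly comparable with perplexities computed without smoothing.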

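## 4. Decoding Algorithms for Inference

### 4.1 Greedy Search

Greedy search picks the highest-probability token at every step. It is fast, but a choice that is best locally can lead to a sentence that is suboptimal overall.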

In [72]:
@torch.no_grad()  # decoding needs no gradients
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    model.eval()
    
    memory = model.encode(src, src_mask)
    ys = torch.ones(1, 1).fill_(start_symbol).type_as(src.data)
    
    for i in range(max_len-1):
        L = ys.size(1)
        pad_mask = (ys != TRG.vocab.stoi['<pad>']).unsqueeze(1).unsqueeze(1).repeat(1, 1, L, 1)
        look_ahead = torch.triu(torch.ones((1, 1, L, L), device=device), diagonal=1) == 0
        trg_mask = pad_mask & look_ahead
        
        out = model.decode(memory, src_mask, ys, trg_mask)
        prob = model.generator(out[:, -1])
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.item()
        
        ys = torch.cat([ys, torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=1)
        
        if next_word == TRG.vocab.stoi['<eos>']:
            break
            
    return ys

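### 4.2 Beam Search

Beam search keeps the `beam_size` highest-scoring partial translations at every step instead of a single one. Exploring several paths in parallel makes it less likely to commit to a locally best but globally poor choice, so it usually produces better translations than greedy search at the cost of slower decoding.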

In [73]:
def beam_search_decode(model, src, src_mask, max_len, start_symbol, beam_size, length_penalty=0.6):
    model.eval()
    import torch.nn.functional as F
    
    memory = model.encode(src, src_mask)
    beams = [([start_symbol], 0.0)]
    
    for i in range(max_len):
        all_candidates = []
        
        for seq, score in beams:
            if seq[-1] == TRG.vocab.stoi['<eos>']:
                all_candidates.append((seq, score))
                continue
                
            ys = torch.tensor(seq).unsqueeze(0).to(device)
            
            L = ys.size(1)
            pad_mask = (ys != TRG.vocab.stoi['<pad>']).unsqueeze(1).unsqueeze(1).repeat(1, 1, L, 1)
            look_ahead = torch.triu(torch.ones((1, 1, L, L), device=device), diagonal=1) == 0
            trg_mask = pad_mask & look_ahead
            
            with torch.no_grad():
                out = model.decode(memory, src_mask, ys, trg_mask)
                prob = model.generator(out[:, -1])
                log_prob = F.log_softmax(prob, dim=1)
                
            topk_prob, topk_idx = torch.topk(log_prob, beam_size, dim=1)
            
            for j in range(beam_size):
                candidate_seq = seq + [topk_idx[0, j].item()]
                candidate_score = score + topk_prob[0, j].item()
                all_candidates.append((candidate_seq, candidate_score))
                
        ordered = sorted(all_candidates, key=lambda x: x[1], reverse=True)
        beams = ordered[:beam_size]
        
        if all(seq[-1] == TRG.vocab.stoi['<eos>'] for seq, _ in beams):
            break
            
    # Choose the beam with the best length-normalized log-probability
    best_seq, best_score = max(
        beams,
        key=lambda x: x[1] / (len(x[0]) ** length_penalty),
    )
            
    return best_seq
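
The difference between the two decoders is easiest to see on a toy problem. The sketch below replaces the Transformer with a hand-written table of next-token probabilities (`TABLE`, entirely hypothetical) and runs the same keep-the-top-`beam_size` loop, without length normalization. With `beam_size=1` it behaves like greedy search and commits to the locally best first token; with `beam_size=2` it finds a sequence with higher overall probability.

```python
import math

EOS = "<eos>"
# Hypothetical next-token distributions keyed by the prefix generated so far
TABLE = {
    (): {"A": 0.6, "B": 0.4},
    ("A",): {"C": 0.5, "D": 0.5},
    ("B",): {"C": 0.9, "D": 0.1},
}

def next_log_probs(prefix):
    """Log-probabilities of the next token; unknown prefixes end the sentence."""
    dist = TABLE.get(tuple(prefix), {EOS: 1.0})
    return {tok: math.log(p) for tok, p in dist.items()}

def beam_search(beam_size, max_len=5):
    beams = [([], 0.0)]  # (tokens, cumulative log-probability)
    for _ in range(max_len):
        candidates = []
        for seq, score in beams:
            if seq and seq[-1] == EOS:
                candidates.append((seq, score))  # finished beams are carried over
                continue
            for tok, lp in next_log_probs(seq).items():
                candidates.append((seq + [tok], score + lp))
        beams = sorted(candidates, key=lambda c: c[1], reverse=True)[:beam_size]
        if all(seq[-1] == EOS for seq, _ in beams):
            break
    return beams[0]

greedy_seq, greedy_score = beam_search(beam_size=1)
beam_seq, beam_score = beam_search(beam_size=2)
print(greedy_seq, round(math.exp(greedy_score), 2))  # ['A', 'C', '<eos>'] 0.3
print(beam_seq, round(math.exp(beam_score), 2))      # ['B', 'C', '<eos>'] 0.36
```

Greedy search takes "A" because 0.6 > 0.4, but the best continuation of "B" (0.4 × 0.9 = 0.36) beats every continuation of "A" (0.6 × 0.5 = 0.30).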

## 5. Model Evaluation

### 5.1 Translation Function

In [74]:
def translate_sentence(sentence, model, src_field, trg_field, max_len=50, beam_size=5):
    model.eval()

    # Text preprocessing
    sentence = sentence.strip().lower()

    # Tokenize, add boundary tokens and convert to indices
    tokenized = src_field.tokenize(sentence)
    tokenized = [src_field.init_token] + tokenized + [src_field.eos_token]
    numericalized = [src_field.vocab.stoi.get(token, src_field.vocab.stoi.get(src_field.unk_token, 0))
                     for token in tokenized]

    src_tensor = torch.LongTensor(numericalized).unsqueeze(0).to(device)
    src_mask = (src_tensor != src_field.vocab.stoi['<pad>']).unsqueeze(1).unsqueeze(1)

    # Decode with beam search
    trg_indexes = beam_search_decode(
        model,
        src_tensor,
        src_mask,
        max_len,
        trg_field.vocab.stoi[trg_field.init_token],
        beam_size
    )

    # Convert indices back to tokens
    trg_tokens = [trg_field.vocab.itos[i] for i in trg_indexes]
    
    # Strip the boundary tokens
    if trg_tokens and trg_tokens[0] == trg_field.init_token:
        trg_tokens = trg_tokens[1:]
    if trg_tokens and trg_tokens[-1] == trg_field.eos_token:
        trg_tokens = trg_tokens[:-1]
    
    # Filter out any remaining special tokens
    trg_tokens = [tok for tok in trg_tokens 
                  if tok not in ['<unk>', '<pad>', trg_field.init_token, trg_field.eos_token]]

    return ' '.join(trg_tokens)

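### 5.2 BLEU Score Evaluation

BLEU (Bilingual Evaluation Understudy) is the most widely used automatic metric in machine translation. It scores a system's output by its n-gram overlap with reference translations, combining clipped n-gram precision with a brevity penalty for outputs that are too short.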
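
The two ingredients can be sketched in a few lines of plain Python. This is a simplified illustration for a single sentence pair, not NLTK's `corpus_bleu`; the helper names and the reference sentence are assumptions made for the example.

```python
import math
from collections import Counter

def ngrams(tokens, n):
    return [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

def modified_precision(hypothesis, reference, n):
    """Clipped n-gram precision as (matched, total): each hypothesis n-gram
    counts at most as often as it occurs in the reference."""
    hyp_counts = Counter(ngrams(hypothesis, n))
    ref_counts = Counter(ngrams(reference, n))
    matched = sum(min(count, ref_counts[gram]) for gram, count in hyp_counts.items())
    return matched, sum(hyp_counts.values())

def brevity_penalty(hyp_len, ref_len):
    """Penalize hypotheses that are shorter than the reference."""
    return 1.0 if hyp_len >= ref_len else math.exp(1 - ref_len / hyp_len)

hyp = "a man sitting on a bench .".split()
ref = "a man sits on a bench .".split()   # hypothetical reference
print(modified_precision(hyp, ref, 1))    # (6, 7)
print(modified_precision(hyp, ref, 2))    # (4, 6)
print(brevity_penalty(len(hyp), len(ref)))            # 1.0
print(brevity_penalty(len(hyp), len(ref) + 2) < 1.0)  # True
```

A single wrong word costs one unigram but two bigrams, which is why BLEU drops quickly as errors accumulate. The last line shows that a reference two tokens longer than the hypothesis already triggers the brevity penalty.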

In [75]:
def calculate_bleu(data, model, src_field, trg_field, max_len=50, beam_size=5):
    from tqdm import tqdm
    
    trgs = []
    pred_trgs = []
    
    for example in tqdm(data, desc="Computing BLEU"):
        src = vars(example)['src']
        trg = vars(example)['trg']
        
        # Note: the reference is wrapped in <sos>/<eos>, but translate_sentence strips
        # both from the hypothesis. References are therefore two tokens longer, which
        # can lower the score through the brevity penalty; append [trg] unwrapped
        # for a like-for-like comparison.
        trg = [trg_field.init_token] + trg + [trg_field.eos_token]
        trgs.append([trg])
        
        pred_trg = translate_sentence(' '.join(src), model, src_field, trg_field, max_len, beam_size)
        pred_trgs.append(pred_trg.split())
    
    smooth = SmoothingFunction().method4
    bleu_score = corpus_bleu(trgs, pred_trgs, smoothing_function=smooth)
    
    return bleu_score

# Load the best checkpoint
model.load_state_dict(torch.load('best-model.pt', weights_only=True))

# Compute the BLEU score on the test set
bleu_score = calculate_bleu(test_data, model, SRC, TRG)
print(f'BLEU score: {bleu_score*100:.2f}')

Computing BLEU: 100%|██████████| 1000/1000 [02:32<00:00,  6.57it/s]

BLEU score: 26.52





### 5.3 Translation Examples

Check the model's translation quality on a few concrete examples.

In [76]:
print("=" * 70)
print("Translation examples (image-caption style sentences)")
print("=" * 70)

examples = [
    "Ein Mann sitzt auf einer Bank",
    "Eine Frau läuft durch den Park",
    "Zwei Kinder spielen im Garten",
    "Ein Hund springt über einen Zaun",
    "Menschen stehen vor einem Gebäude",
]

for i, sent in enumerate(examples, 1):
    translation = translate_sentence(sent, model, SRC, TRG, max_len=60, beam_size=5)
    print(f"\n{i}. German: {sent}")
    print(f"   English: {translation}")

print("\n" + "=" * 70)

Translation examples (image-caption style sentences)

1. German: Ein Mann sitzt auf einer Bank
   English: a man sitting on a bench .

2. German: Eine Frau läuft durch den Park
   English: a woman running through the park .

3. German: Zwei Kinder spielen im Garten
   English: two children are playing in the yard .

4. German: Ein Hund springt über einen Zaun
   English: a dog jumps over a fence .

5. German: Menschen stehen vor einem Gebäude
   English: people standing in front of a building .



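## 6. Summary

In this experiment we:

1. Loaded the Multi30k dataset with torchtext, tokenized it and built the vocabularies
2. Built a Transformer model and applied three training techniques (label smoothing, learning-rate scheduling, gradient clipping)
3. Implemented greedy search and beam search decoding
4. Evaluated translation quality with the BLEU score

The experiment applies the Transformer to German-to-English translation and reaches a BLEU score of 26.52 on the test set with these techniques in place. The same techniques are standard practice when training much larger models.

Try adjusting the hyperparameters (model size, learning-rate schedule parameters, beam width and so on) and observe how they affect translation quality to deepen your understanding of the machine translation task.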