In [2]:
%load_ext autoreload
%autoreload 2

import os
import zipfile

# Unpack the bundled Multi30K archive into the working directory on first run;
# the extraction is skipped whenever a datasets/ path already exists.
if not os.path.exists("datasets/"):
    with zipfile.ZipFile("Multi30K.zip", "r") as zip_ref:
        zip_ref.extractall()

In [3]:
from transformers import BertTokenizer
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

# 1. Initialise the tokenizers. Both languages load the same multilingual BERT
#    checkpoint (this is a BertTokenizer, not a GPT-2 tokenizer), as two
#    separate tokenizer instances.
en_tokenizer = BertTokenizer.from_pretrained("bert-base-multilingual-cased")
de_tokenizer = BertTokenizer.from_pretrained("bert-base-multilingual-cased")

# Adding custom special tokens (currently disabled)
# special_tokens = {"bos_token": "<sos>", "eos_token": "<eos>", "pad_token": "<pad>"}
# en_tokenizer.add_special_tokens(special_tokens)
# de_tokenizer.add_special_tokens(special_tokens)

# 2. Custom dataset
class Multi30KDataset(Dataset):
    """Parallel English/German sentence pairs that are tokenised on access.

    Each file is read as UTF-8, one sentence per line, with surrounding
    whitespace stripped. Both files must yield the same number of lines.
    Indexing returns ``(en_ids, de_ids)``: the ``input_ids`` tensor produced by
    each tokenizer with its leading dimension squeezed away.
    """

    def __init__(self, en_path, de_path, en_tokenizer, de_tokenizer):
        self.en_sentences, self.de_sentences = (
            self._read_file(path) for path in (en_path, de_path)
        )
        self.en_tokenizer, self.de_tokenizer = en_tokenizer, de_tokenizer
        assert len(self.en_sentences) == len(self.de_sentences), "数据不匹配！"

    def _read_file(self, path):
        """Return the stripped lines of ``path`` as a list of strings."""
        with open(path, 'r', encoding='utf-8') as handle:
            return list(map(str.strip, handle))

    def __len__(self):
        """Number of sentence pairs."""
        return len(self.en_sentences)

    @staticmethod
    def _encode(tokenizer, sentence):
        """Tokenise one sentence (special tokens on, no padding) into its ``input_ids``."""
        encoding = tokenizer(
            sentence,
            return_tensors="pt",
            padding=False,
            truncation=True,
            add_special_tokens=True,
        )
        return encoding["input_ids"].squeeze(0)

    def __getitem__(self, idx):
        """Return the tokenised (English, German) pair at position ``idx``."""
        en_ids = self._encode(self.en_tokenizer, self.en_sentences[idx])
        de_ids = self._encode(self.de_tokenizer, self.de_sentences[idx])
        return en_ids, de_ids

# 3. Batch collation
def collate_fn(batch):
    """Pad a list of ``(en_ids, de_ids)`` pairs into two batch-first tensors.

    Each side is padded to the longest sequence on that side of the batch,
    using the pad token id of the module-level ``en_tokenizer`` /
    ``de_tokenizer`` respectively.
    """
    en_seqs, de_seqs = zip(*batch)

    def _pad(sequences, tokenizer):
        return pad_sequence(sequences, batch_first=True, padding_value=tokenizer.pad_token_id)

    return _pad(en_seqs, en_tokenizer), _pad(de_seqs, de_tokenizer)

# 4. Build the dataset and the data loader
en_file_path = 'datasets/train/train.en'
de_file_path = 'datasets/train/train.de'

dataset = Multi30KDataset(en_file_path, de_file_path, en_tokenizer, de_tokenizer)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True, collate_fn=collate_fn)

# 5. Sanity-check the data loader: inspect the first batch only, then stop
for en_batch, de_batch in dataloader:
    print("English batch shape:", en_batch.shape)
    print("German batch shape:", de_batch.shape)
    print("English batch example (tokens):", en_batch[0])
    print("German batch example (tokens):", de_batch[0])
    print("Decoded English:", en_tokenizer.decode(en_batch[0], skip_special_tokens=False))
    print("Decoded German:", de_tokenizer.decode(de_batch[0], skip_special_tokens=False))
    break


English batch shape: torch.Size([64, 39])
German batch shape: torch.Size([64, 45])
English batch example (tokens): tensor([  101, 46361, 78125, 24324, 10301, 18084, 54156, 10108,   169, 11342,
        49274, 14025,   119,   102,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0])
German batch example (tokens): tensor([   101,  68570,  29896, 100449,  10166,  10745,  44271,  16757,  18599,
         49582,    119,    102,      0,      0,      0,      0,      0,      0,
             0,      0,      0,      0,      0,      0,      0,      0,      0,
             0,      0,      0,      0,      0,      0,      0,      0,      0,
             0,      0,      0,      0,      0,      0,      0,      0,      0])
Decoded English: [CLS] Spectators are taking pictures of a bicycle race. [SEP] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [P

In [4]:
from transformer import Transformer

# 1. Transformer hyperparameters
# The vocabulary size is taken from en_tokenizer; de_tokenizer is loaded from
# the same checkpoint, so one size is used for both source and target.
vocab_size = len(en_tokenizer)
d_model = 512
num_heads = 8
num_layers = 2
d_ff = 2048
max_seq_len = 100
dropout = 0.1

# 2. Padding token indices
src_pad_idx = en_tokenizer.pad_token_id
tgt_pad_idx = de_tokenizer.pad_token_id

# 3. Build the Transformer
transformer = Transformer(vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_len, dropout)

# Print a short summary for verification
print(f"Transformer initialized.")
print(f"Source padding index: {src_pad_idx}, Target padding index: {tgt_pad_idx}")
print(f"Vocabulary size: {vocab_size}")



Transformer initialized.
Source padding index: 0, Target padding index: 0
Vocabulary size: 119547


In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm

# Use the first CUDA device when available, otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# 1. Loss function and optimizer
criterion = nn.CrossEntropyLoss(ignore_index=tgt_pad_idx)  # padding targets are excluded from the loss
optimizer = optim.AdamW(transformer.parameters(), lr=0.0001)
num_epochs = 5

# 2. Training function
def train_epoch(transformer, dataloader, criterion, optimizer, device):
    """Run one teacher-forced training pass over ``dataloader``.

    Args:
        transformer: model exposing ``make_src_mask``, ``make_trg_mask`` and a
            forward call ``transformer(src, tgt_input, src_mask, tgt_mask)``.
        dataloader: iterable yielding ``(src, tgt)`` batches of token ids.
        criterion: loss applied to the flattened logits and flattened targets.
        optimizer: optimizer stepped once per batch.
        device: device the batches are moved to before the forward pass.

    Returns:
        The mean of the per-batch losses (sum of batch losses divided by
        ``len(dataloader)``).

    Note:
        The padding indices are read from the module-level ``src_pad_idx`` and
        ``tgt_pad_idx``.
    """
    transformer.train()  # switch to training mode
    total_loss = 0

    progress_bar = tqdm(dataloader, desc="Training")
    for src, tgt in progress_bar:
        src, tgt = src.to(device), tgt.to(device)

        # Teacher forcing: the decoder sees tokens [0, T-1) and is trained to
        # predict tokens [1, T), i.e. the same sequence shifted by one.
        tgt_input = tgt[:, :-1]
        tgt_target = tgt[:, 1:]

        # Masks are built from the decoder *input*, not the full target.
        src_mask = transformer.make_src_mask(src, src_pad_idx)
        tgt_mask = transformer.make_trg_mask(tgt_input, tgt_pad_idx)

        # Forward pass
        output = transformer(src, tgt_input, src_mask, tgt_mask)

        # ----------------------------------------------------
        # Optional debugging aid: uncomment to inspect the first sample of the
        # batch. Kept commented out so the argmax over the whole vocabulary is
        # not computed on every training step.
        # print(f"Decoded tgt_input: {de_tokenizer.decode(tgt_input[0])}")
        # print(f"Decoded tgt_target: {de_tokenizer.decode(tgt_target[0])}")
        # output_tokens = output.argmax(dim=-1)
        # print("DEBUG: Output tokens (predicted):", output_tokens[0])
        # print("Decoded output (predicted):", de_tokenizer.decode(output_tokens[0].tolist(), skip_special_tokens=False))
        # ----------------------------------------------------

        # Flatten to (tokens, classes) / (tokens,) for the loss. The class
        # count comes from the logits themselves rather than a global.
        output = output.reshape(-1, output.size(-1))
        tgt_target = tgt_target.reshape(-1)

        loss = criterion(output, tgt_target)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Read the scalar once and reuse it for the running total and the bar.
        batch_loss = loss.item()
        total_loss += batch_loss
        progress_bar.set_postfix(batch_loss=batch_loss)

    return total_loss / len(dataloader)

# 3. Main training loop
def train_model(transformer, dataloader, num_epochs, device, pretrain=None):
    """Train ``transformer`` for ``num_epochs`` epochs, optionally resuming first.

    When ``pretrain`` is truthy it is treated as a checkpoint path: its state
    dict is loaded (mapped onto ``device``) into the model before training.
    Each epoch delegates to ``train_epoch`` using the module-level ``criterion``
    and ``optimizer``, and prints the epoch's mean loss.
    """
    if pretrain:
        state_dict = torch.load(pretrain, map_location=device)
        transformer.load_state_dict(state_dict)
        print(f"Loaded pre-trained model from {pretrain}")

    for epoch_number in range(1, num_epochs + 1):
        print(f"Epoch {epoch_number}/{num_epochs}")
        epoch_loss = train_epoch(transformer, dataloader, criterion, optimizer, device)
        print(f"Epoch Loss: {epoch_loss:.4f}")

# 4. Prepare for training
transformer = transformer.to(device)

# Resume from an earlier checkpoint only when one exists. On a fresh run there
# is no transformer.pth yet (it is first written by the save cell after
# training), and passing None makes train_model skip loading instead of
# failing inside torch.load.
checkpoint_file = 'transformer.pth'
pretrain_path = checkpoint_file if os.path.exists(checkpoint_file) else None

In [None]:
# Train for num_epochs epochs, passing pretrain_path as the checkpoint to resume from.
train_model(transformer, dataloader, num_epochs, device, pretrain=pretrain_path)


In [6]:
# Persist the model weights (state dict only) to transformer.pth.
torch.save(transformer.state_dict(), "transformer.pth")

---

如果你保留上面训练时的调试语句，你可以看到output句子带有奇怪的重复[SEP]符号

我起初猜想是BertTokenizer的问题；不过更可能的原因是：目标序列中第一个 [SEP] 之后全是 [PAD]，而损失函数通过 `ignore_index` 忽略了这些位置，所以模型在这些位置上的输出不受约束。我将在下面通过后处理尽量减少预测时出现这种情况。

In [23]:
import torch

# Strip [PAD] tokens and repeated [SEP] tokens from a decoded prediction
def clean_prediction(output, tokenizer):
    """Decode ``output`` and tidy the special tokens for display.

    The ids are decoded with special tokens kept, every pad token is removed,
    one pass collapses adjacent ``sep sep`` pairs, and the text is then cut
    right after the first separator (which is kept). The result is returned
    with surrounding whitespace stripped.
    """
    text = tokenizer.decode(output, skip_special_tokens=False)
    pad, sep = tokenizer.pad_token, tokenizer.sep_token

    # Drop padding, then merge directly repeated separators.
    text = text.replace(pad, "").replace(f"{sep} {sep}", sep)

    # Keep everything up to and including the first separator.
    head, found, _ = text.partition(sep)
    if found:
        text = head + sep
    return text.strip()

# Reload the saved weights and run a single teacher-forced prediction on the
# first sample of the first batch drawn from the (shuffled) training loader.
transformer.load_state_dict(torch.load("transformer.pth", map_location=torch.device('cpu'))) # change this path to the checkpoint you want to load
transformer.eval()  # switch to evaluation mode

with torch.no_grad():
    for batch in dataloader:
        src, tgt = batch
        # Keep only the first sample of the batch (batch dimension preserved).
        src, tgt = src[0:1].to(device), tgt[0:1].to(device)
        
        # Decoder input is the target without its last token; tgt_target is the
        # shifted target (not used further in this cell).
        tgt_input = tgt[:, :-1]
        tgt_target = tgt[:, 1:]

        # Build the masks (the target mask is built from tgt_input)
        src_mask = transformer.make_src_mask(src, src_pad_idx)
        tgt_mask = transformer.make_trg_mask(tgt_input, tgt_pad_idx)

        # Forward pass; the decoder is fed the ground-truth tgt_input, so this
        # is a teacher-forced prediction rather than free-running generation.
        output = transformer(src, tgt_input, src_mask, tgt_mask)

        # Most likely token at each position
        output = output.argmax(dim=-1)

        # Show the source, reference and predicted sentences
        print("Source Sentence:", en_tokenizer.decode(src[0], skip_special_tokens=True))
        print("Target Sentence:", clean_prediction(tgt[0].tolist(), de_tokenizer))
        print("Predicted Sentence:", clean_prediction(output[0].tolist(), de_tokenizer))
        break

Source Sentence: A snowboarder is grinding down a long concrete rail.
Target Sentence: [CLS] Ein Snowboardfahrer gleitet an einer Betonstange hinunter. [SEP]
Predicted Sentence: Ein Snowboarder grleitet eine einem Betontue.unter. [SEP]


取得了还不错的效果

我们接下来试试自定义句子

In [24]:
# Custom sentence to translate
custom_sentence = "A snowboarder is grinding down a long concrete rail."

# Put the model in evaluation mode here as well, so this cell does not depend
# on another cell having switched it out of training mode (train_epoch leaves
# the model in train() mode).
transformer.eval()

# Encode the sentence into the model's input format (batch of one)
src = en_tokenizer.encode(custom_sentence, return_tensors="pt", padding=False, truncation=True).to(device)

# Start the target sequence with [CLS], which serves as the start-of-sequence token
tgt_input = torch.tensor([[de_tokenizer.cls_token_id]], device=device)

# Source mask is fixed for the whole decoding loop
src_mask = transformer.make_src_mask(src, src_pad_idx)

# Greedy decoding: append the most likely next token one step at a time
with torch.no_grad():
    # Cap the number of decoding steps at the sequence length the model was built with.
    max_length = max_seq_len
    for _ in range(max_length):
        # The target mask has to be rebuilt as the sequence grows
        tgt_mask = transformer.make_trg_mask(tgt_input, tgt_pad_idx)

        # Forward pass
        output = transformer(src, tgt_input, src_mask, tgt_mask)

        # Prediction at the last position only
        next_token = output[:, -1, :].argmax(dim=-1, keepdim=True)

        # Append the predicted token to the target sequence
        tgt_input = torch.cat([tgt_input, next_token], dim=-1)

        # Stop as soon as [SEP] is produced
        if next_token.item() == de_tokenizer.sep_token_id:
            break

    # Decode and tidy the generated ids
    predicted_sentence = clean_prediction(tgt_input[0].tolist(), de_tokenizer)

    # Show the result
    print("Source Sentence:", custom_sentence)
    print("Predicted Sentence:", predicted_sentence)

Source Sentence: A snowboarder is grinding down a long concrete rail.
Predicted Sentence: [CLS] Ein Snowboarder fährt eine Betonwand hinunter. [SEP]


---

观察👀：

1. 预测的句子的首字母总是缺少，我猜测是GPT2分词器的缘故
2. 翻译的句子末尾总会有乱码一样的幻觉，可能也是分词器？

我将使用BERT分词器尝试

使用BERT之后，句子开头的首字母的缺少的情况解决了

但是末尾总会出现乱码，并且使用自定义source sentence的时候总会出现开头的 `##er`。`##` 前缀其实是BERT（WordPiece）分词器的子词标记，而不是GPT2分词器的特征，很奇怪



---

训练结语：

取得了初具人型的结果，我认为可以加大epochs。

我将会使用自制tokenizer代替BertTokenizer。