### 机器翻译网络总览

使用编码器-解码器架构的 seq to seq 模型图解如下：

![](md-img\机器翻译.jpg)

其中编码器使用 RNN 读取输入的 seq 的特征，解码器使用另一个 RNN 输出预测的 seq

编码器和解码器的 RNN 层数以及隐藏层的大小是一样的

<br>

使用编码器最后一个时间步输出的各层隐藏状态来初始化解码器 RNN 的隐藏状态（粉色箭头）

解码器中 RNN 的输入使用的是标签 seq 与编码器最后一个时间步最后一层隐藏状态合并输入（红色箭头）

<br>

此外还需要对特征 seq 以及标签 seq 做预处理，特征 seq 尾部添加 '\<eos>'，标签 seq 头部添加 '\<bos>'

且输入的 seq 只是将 token 转为 数值（索引），不需要使用独热编码

如下图所示：

![](md-img\机器翻译2.png)

<br>

### 代码实现

In [2]:
import torch
from torch import nn

编码器设计：

In [3]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, embed_size, hiden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.GRU(embed_size, hiden_size, num_layers)

    # 输入 x 形状：(batch_size, seq_len)
    def forward(self, x):
        x = self.embedding(x).permute(1, 0, 2)
        output, state = self.rnn(x)
        # output 形状：(seq_len, batch_size, hiden_size)
        # state 形状：(num_layers, batch_size, hiden_size)
        return output, state

<br>

解码器设计：

In [4]:
class Decoder(nn.Module):
    def __init__(self, vocab_size, embed_size, hiden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.GRU(embed_size + hiden_size, hiden_size, num_layers)
        self.dense = nn.Linear(hiden_size, vocab_size)

    # 输入 y 形状：(batch_size, seq_len)
    # 输入 init_state 形状：(num_layers, batch_size, hiden_size)
    # 输入 context 形状为：(batch_size, hiden_size)，一般是编码器最后一个时间步、最后一层的隐藏状态
    def forward(self, y, init_state, context):
        y = self.embedding(y).permute(1, 0, 2)
        context = context.repeat(y.shape[0], 1, 1)
        y_and_context = torch.cat((y, context), dim=2)
        output, state = self.rnn(y_and_context, init_state)
        output = self.dense(output).permute(1, 0, 2)
        # output 形状：(batch_size, seq_len, vocab_size)
        # state 形状：(num_layers, batch_size, hiden_size)
        return output, state

<br>

整合编码器和解码器：

In [5]:
class SeqToSeqModel(nn.Module):
    def __init__(self, encoder: Encoder, decoder: Decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, x, y):
        _, state = self.encoder(x)
        return self.decoder(y, state, state[-1])

<br>

损失函数设计：

In [6]:
# weights 的形状：(batch_size, seq_len)，表示每一个词元损失的权重
# valid_len 的形状：(batch_size,)

# 设计函数，用于屏蔽填充项的损失，获取每一个词元损失的权重
def seq_mask(weights, valid_len, value=0):
    seq_len = weights.shape[1]
    mask = torch.arange(seq_len)[None, :] < valid_len[:, None]
    # mask 的形状：(batch_size, seq_len)，保留部分为 True，不保留部分为 False

    weights[~mask] = value     # 选出 False 在的地方进行赋值
    # 舍弃一个填充词元的损失等于舍弃一整排vocab_size
    return weights


# 改写 CrossEntropyLoss 类，为其屏蔽填充项
class CELossWithMask(nn.CrossEntropyLoss):
    # 改写 forward 函数
    # 模型输出的 pred 形状：(batch_size, seq_len, vocab_size)
    # label 的形状：(batch_size, seq_len)，非独热编码
    # valid_len 的形状：(batch_size,)
    def forward(self, pred, label, valid_len):
        weights = torch.ones_like(label)    # 初始权重均为 1
        weights = seq_mask(weights, valid_len)

        # 设置不对每一个词元损失进行累加
        self.reduction = 'none'

        unweighted_loss = super(CELossWithMask, self).forward(pred.permute(0, 2, 1), label)
        # unweighted_loss 的形状：(batch_size, seq_len)，表示每个词元的损失

        weighted_loss = unweighted_loss * weights

        # 返回每一个样本的损失，形状为：(batch_size,)
        return weighted_loss.mean(dim=1)

<br>

训练过程如下：

In [7]:
from torch.utils.data import DataLoader
from torch.nn.utils import clip_grad_norm_


def train(model: SeqToSeqModel, data_loader: DataLoader, lr, num_epochs, device):
    model.to(device)

    # 损失函数与优化器的选择
    loss_func = CELossWithMask().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr)

    model.train()
    for _ in range(num_epochs):
        for x, y, y_valid_len in data_loader:
            x.to(device)
            y.to(device)
            y_valid_len.to(device)

            # 损失的计算
            optimizer.zero_grad()
            y_hat, _ = model(x, y)
            loss = loss_func(y_hat, y, y_valid_len).sum()

            # 反向传播
            loss.backward()

            # 梯度裁剪与梯度更新
            clip_grad_norm_(model.parameters(), max_norm=1, norm_type=2)
            optimizer.step()

<br>

使用模型进行预测的过程如下：

In [None]:
# 模型预测时，编码器处理与训练时一致（提取特征）
# 解码器进行预测时，使用前一个时间步的预测词元作为下一个时间步的输入
# x 是一个样本形状为：(seq_len,)
# token_to_idx 是输出序列的词表
# max_len 是输出序列的最大长度
def predict(net: SeqToSeqModel, x, token_to_idx, max_len):
    _, state = net.encoder(x.reshape(1, -1))
    context = state[-1]
    dec_input = torch.tensor([token_to_idx['<bos>']]).reshape(1, 1)

    preds = []

    for _ in range(max_len):
        output, state = net.decoder(dec_input, state, context)
        dec_input = output.argmax(dim=2)
        pred = dec_input[0][0].item()

        if pred == token_to_idx['<eos>']:
            break
        preds.append(pred)
    return preds

# 该预测函数用于预测一个样本