# Attention-based Seq2Seq

+ 视频：[35、基于PyTorch手写Attention-based Seq2Seq模型](https://www.bilibili.com/video/BV1ML411w7k7/)

以 NMT 任务为例，实现基于注意力机制的 Seq2Seq 模型。

**NMT**：Nerual Machine Translation

In [21]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import Tensor

## Encoder

此处 Encoder 使用 LSTM 来实现，将 input sequence 建模为上下文相关的 vector sequence。

In [22]:
class Seq2SeqEncoder(nn.Module):
    """
    实现基于 LSTM 的 encoder，也可以是其他类型的，比如 CNN、TransformerEncoder
    """
    
    def __init__(self, embed_dim: int, hidden_size: int, src_vocab_size: int) -> None:
        """
        :param embed_dim: token 的嵌入维度
        :param hidden_size: LSTM 的隐藏状态的大小
        :param src_vocab_size: 输入词汇表的大小
        """
        super().__init__()
        
        self.lstm_layer = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_size,
            batch_first=True
        )
        
        self.embedding_table = nn.Embedding(src_vocab_size, embed_dim)
    
    def forward(self, input_ids: Tensor):
        """
        :param input_ids: 由 token id 组成的输入序列
        :return: LSTM 输出的 output 和 h_final
        LSTM 的 output: 各时刻输出的 hidden state，形状 [batch, seq_len, num_directions * hidden_size]
        h_final: LSTM 最后时刻的 hidden state，形状 [num_directions * num_layers, hidden_size]
        """
        input_seq = self.embedding_table(input_ids)  # [batch, seq_len, embed_dim]
        output_states, (h_final, c_final) = self.lstm_layer(input_seq)
        return output_states, h_final

## 注意力机制

这里使用 dot-product 的 Attention 机制，其中没有可学习的参数。

我们假设 input 不是流式的，也就是可以一次性拿到一整个 input sequence，进而经过 encoder 直接得到整个 `encoder_states`。

由于 decoder 是自回归形式来产生结果的，所以 decoder 每次运算都会调用一次 AttentionMachanism 的 forward 函数来获得 context vector，并使用它来进行 decode。

这里的 Attention 是假设了 encoder 与 decoder 的 hidden_size 是大小一致的。如果不一致的话，可以在实现中加一层 MLP 映射一下。

In [23]:
class Seq2SeqAttentionMachanism(nn.Module):
    """
    实现了 dot-product 的 Attention
    """
    def __init__(self) -> None:
        super().__init__()
    
    def forward(self, decoder_state_t: Tensor, encoder_states: Tensor):
        """

        :param decoder_state_t: t 时刻的 decoder state，[batch, hidden_size]
        :param encoder_states: 输入序列的全部位置的 encoder states，[batch, src_len, hidden_size]
        """
        bs, src_len, hidden_size = encoder_states.shape
        
        # 计算 t 时刻的 decoder state 与全部位置的 encoder states 的 attention scores
        # 由于 decoder_state_t 只是一个时刻的 state，因此需要先对 decoder_state_t 进行扩维
        decoder_state_t = decoder_state_t.unsqueeze(1)  # [batch, 1, hidden_size]
        decoder_state_t = decoder_state_t.tile([1, src_len, 1])  # [batch, src_len, hidden_size]
        # 计算 scores 与 scores 的规范化
        scores = torch.sum(decoder_state_t * encoder_states, dim=-1)  # [bs, src_len]
        attn_probs = F.softmax(scores, dim=-1)  # 也就是 attention weights，[bs, src_len]
        
        # 获得 context vector，它就是 t 时刻 decoder 所需要的上下文向量
        context = torch.sum(attn_probs.unsqueeze(-1) * encoder_states, dim=1)  # 按照 attention weights 加权求和
        
        return attn_probs, context

## Decoder

训练阶段使用 **Teacher Force** 的训练方式。

In [24]:
class Seq2SeqDecoder(nn.Module):
    def __init__(
        self,
        embed_dim: int,
        hidden_size: int,
        num_classes: int,
        target_vocab_size: int,
        start_id: int,
        end_id: int
    ) -> None:
        super().__init__()
        
        self.num_classes = num_classes  # 目标领域的 token 个数，在 NMT 任务中，其实就是 target_vocab_size
        self.start_id = start_id  # 最开始的输入
        self.end_id = end_id  # 指示预测的结束
        
        self.lstm_cell = nn.LSTMCell(embed_dim, hidden_size)
        self.proj_layer = nn.Linear(hidden_size * 2, num_classes)  # 映射到分类的分布上，这里输入选择 hidden_size * 2，是因为我们参考论文将 context 和 decoder state 一起拼起来送给 linear 层来分类
        self.attn_machanism = Seq2SeqAttentionMachanism()  # 用于获得 decoder 所需要的 context vector
        self.embedding_table = nn.Embedding(target_vocab_size, embed_dim)  # 目标序列的 embedding
    
    def forward(self, shifted_target_ids: Tensor, encoder_states: Tensor):
        """
        训练阶段调用
        这里每一时刻都是把 target token 输给 decoder，因此这种训练方式也叫“Teacher Force”的训练
        :param shifted_target_ids: Decoder 的输入，这与预测向右偏移了一位，因此这个序列的第一个位置是一个 start_id
        :param encoder_states: 完整的 encoder 的输出 states
        """
        shifted_target = self.embedding_table(shifted_target_ids)
        
        bs, target_len, embed_dim = shifted_target.shape
        bs, src_len, hidden_size = encoder_states.shape
        
        logits = torch.zeros(bs, target_len, self.num_classes)  # 每一个时刻的分类的分布
        probs = torch.zeros(bs, target_len, src_len)
        
        # 在训练阶段，我们的 target length 是知道的，因此这里可以用一个 for 循环来完成一个 sequence 的训练
        # 若在 inference 阶段，那么 target length 是不知道的，那么无法使用 for 循环
        for t in range(target_len):
            decoder_input_t = shifted_target[:, t, :]  # 当前时刻的 decoder 的 input，[bs, embed_dim]
            # 由于只有一步，所以我们才使用 LSTMCell
            if t == 0:
                h_t, c_t = self.lstm_cell(decoder_input_t)
            else:
                h_t, c_t = self.lstm_cell(decoder_input_t, (h_t, c_t))
            
            # 让当前时刻 decoder state 与 encoder states 做 Attention，得到 context vector
            attn_prob, context = self.attn_machanism(h_t, encoder_states)
            
            # 将 context 和 decoder state 拼接在一起，一同丢给 linear 来做分类
            decoder_output = torch.cat([context, h_t], dim=-1)
            
            logits[:, t, :] = self.proj_layer(decoder_output)  # 当前时刻的分类分布
            probs[:, t, :] = attn_prob  # 当前 decoder state 与 encoder states 的 attention weights
            
        return probs, logits
    
    def inference(self, encoder_states: Tensor):
        """
        推理阶段使用
        :param encoder_states: encoder 的完整 states
        """
        target_id = torch.tensor([self.start_id], dtype=torch.int32)  # 固定以 start_id 开始
        print('target_id', target_id)
        h_t = None
        result = []
        
        while True:
            decoder_input_t = self.embedding_table(target_id)
            if h_t is None:
                h_t, c_t = self.lstm_cell(decoder_input_t)
            else:
                h_t, c_t = self.lstm_cell(decoder_input_t, (h_t, c_t))
            
            attn_prob, context = self.attn_machanism(h_t, encoder_states)
            
            decoder_output = torch.cat([context, h_t], dim=-1)
            logits = self.proj_layer(decoder_output)
            
            target_id = torch.argmax(logits, -1)  # 分类结果
            print('target_id', target_id)
            result.append(target_id)
            
            if torch.any(target_id == self.end_id):  # 解码的终止条件
                # stop decoding
                break
        
        predicted_ids = torch.tensor(result).reshape([-1])
        return predicted_ids

## Seq2Seq

将 Encoder 与 Decoder 结合起来，形成一个完整的 Seq2Seq model。

In [25]:
class Seq2Seq(nn.Module):
    
    def __init__(
        self,
        embed_dim: int,
        hidden_size: int,
        num_classes: int,
        src_vocab_size: int,
        target_vocab_size: int,
        start_id: int,
        end_id: int
    ) -> None:
        super().__init__()
        
        self.encoder = Seq2SeqEncoder(embed_dim, hidden_size, src_vocab_size)
        
        self.decoder = Seq2SeqDecoder(embed_dim, hidden_size, num_classes,
                                      target_vocab_size, start_id, end_id)
        
    def forward(self, input_seq_ids: Tensor, shifted_target_ids: Tensor):
        """
        训练阶段使用
        :param input_seq_ids: src sequence 的 id 序列，[batch, src_seq_len]
        :param shifted_target_ids: target sequence 的向右偏移一个单位的 id 序列，[batch, target_seq_len]
        """
        encoder_states, h_final = self.encoder(input_seq_ids)
        probs, logits = self.decoder(shifted_target_ids, encoder_states)
        return probs, logits
    
    def inference(self, input_seq_ids: Tensor):
        """
        推理阶段使用
        :param input_seq_ids: src sequence 的 id 序列，[]
        """
        encoder_states, h_final = self.encoder(input_seq_ids)
        predicted_ids = self.decoder.inference(encoder_states)
        return predicted_ids

## 主程序

这里没有进行训练，在实际任务，根据任务类型来选择 loss function：

+ 若是分类任务，则采用 cross-entropy；
+ 若是回归任务，则采用欧氏距离 L1 或 L2。

In [26]:
SRC_LEN = 3  # src seq length
TGT_LEN = 4  # target seq length
EMBED_DIM = 8
HIDDEN_SIZE = 16
BATCH_SIZE = 2
START_ID = 0
END_ID = 0
SRC_VOCAB_SIZE = 100  # src 的词表大小
TGT_VOCAB_SIZE = 100  # tgt 的词表大小
NUM_CLASSES = TGT_VOCAB_SIZE

input_seq_ids = torch.randint(low=0, high=SRC_VOCAB_SIZE, size=[BATCH_SIZE, SRC_LEN]).to(torch.int32)

target_ids = torch.randint(low=0, high=TGT_VOCAB_SIZE, size=[BATCH_SIZE, TGT_LEN])
target_ids = torch.cat([target_ids, END_ID * torch.ones(BATCH_SIZE, 1)], dim=1)  # 在每个 seq 的结尾加了个 end_id

# 将 target seq 右移一位，这里丢弃了第一个 token，实际上不应该丢弃的，但为了方便
shifted_target_ids = torch.cat([START_ID * torch.ones(BATCH_SIZE, 1), target_ids[:, 1:]], dim=1).to(torch.int32)

model = Seq2Seq(EMBED_DIM, HIDDEN_SIZE, NUM_CLASSES, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, START_ID, END_ID)
probs, logits = model(input_seq_ids, shifted_target_ids)

print(f'probs shape:{probs.shape}')
print(f'logits shape:{logits.shape}')

probs shape:torch.Size([2, 5, 3])
logits shape:torch.Size([2, 5, 100])


往下就是 inference 过程了，但由于 model 没有被训练，这会导致出现不正常的输出，因此下面就不运行了

In [27]:
raise

RuntimeError: No active exception to reraise

In [None]:
test_input_seq_ids = torch.randint(low=0, high=SRC_VOCAB_SIZE, size=[1, SRC_LEN]).to(torch.int32)
print('input: ', test_input_seq_ids)
output_seq = model.inference(test_input_seq_ids)
print('output: ', output_seq)