In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

In [55]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with padding and optional causal masking.

    Queries, keys and values are projected from ``d_model`` to ``num_hidden``
    features, split into ``num_heads`` heads, attended independently, merged
    back and projected to ``d_model`` again.

    After every forward pass the (pre-dropout) attention weights of shape
    [batch_size, num_heads, query_seq_len, key_seq_len] are available as
    ``self.attention_weights``.
    """
    def __init__(self, d_model=256, num_hidden=512, num_heads=8, dropout=0.1, bias=False, Masked=False, **kwargs):
        """
        :param d_model: feature dimension of the inputs (and of the output)
        :param num_hidden: total projection width shared by all heads
        :param num_heads: number of attention heads; must divide ``num_hidden``
        :param dropout: dropout probability applied to the attention weights
        :param bias: whether the linear projections use a bias term
        :param Masked: if truthy, additionally apply a causal (look-ahead) mask
        :raises ValueError: if ``num_hidden`` is not divisible by ``num_heads``
        """
        super(MultiHeadAttention, self).__init__()
        if num_hidden % num_heads != 0:
            raise ValueError(
                f"num_hidden ({num_hidden}) must be divisible by num_heads ({num_heads})"
            )
        # Kept for backward compatibility only; masks are built on the device
        # of the score tensor, so this attribute is not used internally.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.Masked = Masked
        self.num_heads = num_heads
        self.dropout = nn.Dropout(dropout)
        self.attention_weights = None  # filled in by scaled_dot_product_attention
        self.WQ = nn.Linear(d_model, num_hidden, bias=bias)  # [batch, seq, d_model] -> [batch, seq, num_hidden]
        self.WK = nn.Linear(d_model, num_hidden, bias=bias)
        self.WV = nn.Linear(d_model, num_hidden, bias=bias)
        self.WO = nn.Linear(num_hidden, d_model, bias=bias)  # [batch, seq, num_hidden] -> [batch, seq, d_model]

    def split_heads(self, x):
        """
        :param x: [batch_size, seq_len, num_hidden]
        :return: [batch_size, num_heads, seq_len, heads_features]
        """
        x = x.view(x.size(0), x.size(1), self.num_heads, -1)  # split the feature axis into heads x per-head features
        return x.permute(0, 2, 1, 3)  # move the head axis next to the batch axis

    def combine_heads(self, x):
        """
        :param x: [batch_size, num_heads, seq_len, heads_features]
        :return: [batch_size, seq_len, num_hidden]
        """
        x = x.permute(0, 2, 1, 3)  # move the head axis back behind the sequence axis
        return x.contiguous().view(x.size(0), x.size(1), -1)  # merge heads x per-head features

    def get_padding_mask(self, X, valid_lens, value=-1e9):
        """Build a boolean mask that is True at key positions that are padding.

        :param X: attention scores, [batch_size, num_heads, query_seq_len, key_seq_len]
        :param valid_lens: [batch_size] number of valid key positions per sample,
            or None when nothing is padded
        :param value: unused; kept so existing call sites keep working
        :return: bool tensor broadcastable to the shape of ``X`` (True = masked out)
        :raises ValueError: if ``valid_lens`` is not one-dimensional
        """
        if valid_lens is None:
            # No padding: nothing is masked. (Returning softmax(X) here, as the
            # previous version did, is not a mask and breaks masked_fill.)
            return torch.zeros_like(X, dtype=torch.bool)
        if valid_lens.dim() != 1:
            raise ValueError("valid_lens must be 1D")
        shape = X.shape
        # Build the mask on the same device as the scores to avoid CPU/GPU mismatches.
        key_positions = torch.arange(shape[-1], device=X.device)
        mask = key_positions[None, :] >= valid_lens.to(X.device)[:, None]  # [batch, key_seq_len]
        # expand() gives the full shape without copying memory.
        return mask[:, None, None, :].expand(-1, shape[1], shape[2], -1)

    def scaled_dot_product_attention(self, Q_head, K_head, V_head, valid_lens):
        """
        :param Q_head: [batch_size, num_heads, query_seq_len, heads_features]
        :param K_head: [batch_size, num_heads, key_seq_len, heads_features]
        :param V_head: [batch_size, num_heads, value_seq_len, heads_features] -> key_seq_len == value_seq_len
        :param valid_lens: [batch_size] valid length of every sample, or None
        :return: [batch_size, num_heads, query_seq_len, heads_features]
        """
        d_k = Q_head.size(-1)
        query_seq_len = Q_head.size(-2)
        key_seq_len = K_head.size(-2)
        scores = torch.matmul(Q_head, K_head.transpose(-2, -1)) / (d_k ** 0.5)
        mask = self.get_padding_mask(scores, valid_lens)  # True = ignore this key
        if self.Masked:
            # tril marks the positions a query MAY see (itself and the past);
            # everything else is the future and has to be masked as well.
            visible = torch.tril(torch.ones(query_seq_len, key_seq_len, device=scores.device)).bool()
            mask = mask | ~visible
        masked_scores = scores.masked_fill(mask, -1e9)
        weights = F.softmax(masked_scores, dim=-1)
        self.attention_weights = weights
        # Dropout acts on the normalized weights; dropping raw scores would
        # merely zero a logit instead of removing the connection.
        return torch.matmul(self.dropout(weights), V_head)

    def forward(self, Q, K, V, valid_lens):
        """
        :param Q: [batch_size, query_seq_len, d_model]
        :param K: [batch_size, key_seq_len, d_model]
        :param V: [batch_size, value_seq_len, d_model] -> key_seq_len == value_seq_len
        :param valid_lens: [batch_size] valid key length per sample, or None
        :return: [batch_size, query_seq_len, d_model]
        """
        Q_head = self.split_heads(self.WQ(Q))  # [batch_size, num_heads, seq_len, heads_features]
        K_head = self.split_heads(self.WK(K))
        V_head = self.split_heads(self.WV(V))
        attention = self.scaled_dot_product_attention(Q_head, K_head, V_head, valid_lens)
        attention = self.combine_heads(attention)
        return self.WO(attention)

In [56]:
# Smoke test: push random queries/keys/values of shape [8, 5, 256] through a
# freshly initialised attention layer and show the shape of the result.
Q, K, V = (torch.rand(8, 5, 256) * 10 for _ in range(3))
model = MultiHeadAttention(d_model=256, num_hidden=512, num_heads=8)
value_lens = torch.tensor([1, 2, 3, 4, 3, 2, 3, 4])  # valid key length of each of the 8 samples
output = model(Q, K, V, value_lens)
print(output.shape)

scores: torch.Size([8, 8, 5, 5])
weights: torch.Size([8, 8, 5, 5])
torch.Size([8, 5, 512])


In [11]:
class LayerNorm(nn.Module):
    """Layer normalization over the last dimension.

    BatchNorm normalizes every channel across the batch;
    LayerNorm normalizes every sample across its own features.
    """
    def __init__(self, normalized_shape, eps=1e-6):
        """
        :param normalized_shape: size of the last (feature) dimension
        :param eps: constant added to the variance for numerical stability
        """
        super(LayerNorm, self).__init__()
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(normalized_shape))   # learnable scale
        self.beta = nn.Parameter(torch.zeros(normalized_shape))   # learnable shift

    def forward(self, x):
        """
        :param x: tensor whose last dimension matches ``normalized_shape``
        :return: tensor of the same shape, normalized along the last dimension
        """
        mean = x.mean(-1, keepdim=True)
        # Layer norm is defined with the biased (population) variance. The
        # default unbiased estimator divides by n - 1, which rescales the
        # output and yields NaN when the feature dimension has size 1.
        var = x.var(-1, unbiased=False, keepdim=True)
        x = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma * x + self.beta

In [59]:
class PositionWiseFFN(nn.Module):
    """Two-layer feed-forward network applied to the last dimension of its input.

    Pipeline: Linear(input_size -> num_hidden) -> ReLU -> Dropout ->
    Linear(num_hidden -> output_size).
    """
    def __init__(self, input_size, num_hidden, output_size, dropout=0.1, **kwargs):
        """
        :param input_size: number of input features
        :param num_hidden: width of the hidden layer
        :param output_size: number of output features
        :param dropout: dropout probability applied after the activation
        """
        super().__init__()
        self.fc1 = nn.Linear(in_features=input_size, out_features=num_hidden)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=dropout)
        self.fc2 = nn.Linear(in_features=num_hidden, out_features=output_size)

    def forward(self, x):
        """Run the network on ``x`` (features on the last axis)."""
        hidden = self.fc1(x)
        hidden = self.relu(hidden)
        hidden = self.dropout(hidden)
        return self.fc2(hidden)

In [57]:
class AddNorm(nn.Module):
    """Residual connection followed by layer normalization."""
    def __init__(self, normalized_shape, dropout, **kwargs):
        """
        :param normalized_shape: shape passed to ``nn.LayerNorm`` (the feature size)
        :param dropout: dropout probability applied to the sublayer output
        """
        super(AddNorm, self).__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = nn.LayerNorm(normalized_shape)

    def forward(self, X, Y):
        """
        :param X: sublayer input (the residual branch)
        :param Y: sublayer output (from multi-head attention or the FFN)
        :return: LayerNorm(X + Dropout(Y)), same shape as ``X``
        """
        # Dropout regularizes only the sublayer output; applying it to X + Y
        # would also randomly zero the residual (identity) path.
        return self.norm(X + self.dropout(Y))

In [None]:
class PositionEncoding(nn.Module):
    """Fixed sinusoidal positional encoding followed by dropout.

    Even feature indices hold sin(pos / 10000^(i / d_model)) and odd feature
    indices hold the matching cosine, for positions 0 .. max_len - 1.
    """
    def __init__(self, d_model, dropout, max_len=1000):
        """
        :param d_model: feature dimension of the embeddings (odd values are supported)
        :param dropout: dropout probability applied after adding the encoding
        :param max_len: longest sequence length that can be encoded
        """
        super(PositionEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)
        angles = torch.arange(max_len).reshape(-1, 1) / torch.pow(10000, torch.arange(0, d_model, 2) / d_model)
        table = torch.zeros((1, max_len, d_model))
        table[:, :, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd index than even index, so the
        # cosine columns are trimmed to fit.
        table[:, :, 1::2] = torch.cos(angles)[:, :d_model // 2]
        # A non-persistent buffer follows .to()/.cuda() with the module but is
        # kept out of state_dict, so existing checkpoints still load.
        self.register_buffer('P', table, persistent=False)

    def forward(self, X):
        """
        :param X: [batch_size, seq_len, d_model]
        :return: ``X`` plus the positional encoding, after dropout; same shape
        :raises ValueError: if seq_len exceeds the ``max_len`` given at construction
        """
        seq_len = X.size(1)
        if seq_len > self.P.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds the maximum supported length {self.P.size(1)}"
            )
        # .to() is a no-op when the buffer already lives on X's device.
        X = X + self.P[:, :seq_len, :].to(X.device)
        return self.dropout(X)

In [None]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention and a stack of position-wise FFNs,
    each wrapped in a residual connection with layer normalization.
    """
    def __init__(self, d_model, num_hidden, num_heads, ffn_num_hidden, num_ffn=1, dropout=0, **kwargs):
        """
        :param d_model: feature dimension of the block's input and output
        :param num_hidden: projection width inside the attention layer
        :param num_heads: number of attention heads
        :param ffn_num_hidden: hidden width of every position-wise FFN
        :param num_ffn: how many FFNs are chained between the two AddNorm layers
        :param dropout: dropout probability shared by all sublayers
        """
        super().__init__()
        self.attention = MultiHeadAttention(
            d_model, num_hidden, num_heads, dropout, bias=False, Masked=False
        )
        self.addnorm1 = AddNorm(d_model, dropout)
        self.ffn = nn.Sequential()
        for index in range(num_ffn):
            feed_forward = PositionWiseFFN(d_model, ffn_num_hidden, d_model, dropout)
            self.ffn.add_module(f'ffn_{index}', feed_forward)
        self.addnorm2 = AddNorm(d_model, dropout)

    def forward(self, X, valid_lens):
        """
        :param X: block input; used as query, key and value of the self-attention
        :param valid_lens: forwarded unchanged to the attention layer
        :return: output of the second AddNorm
        """
        attended = self.addnorm1(X, self.attention(X, X, X, valid_lens))
        # Chain the FFNs; the residual for addnorm2 is the attention branch output.
        transformed = attended
        for layer in self.ffn:
            transformed = layer(transformed)
        return self.addnorm2(attended, transformed)

In [None]:
class TransformerEncoder(nn.Module):
    """Token embedding + positional encoding followed by a stack of EncoderBlocks."""
    def __init__(self, vocab_size, d_model, num_hidden, num_heads, ffn_num_hidden, num_ffn=1, num_layers=1, dropout=0, **kwargs):
        """
        :param vocab_size: size of the vocabulary (number of distinct tokens)
        :param d_model: dimension of the token embeddings
        :param num_hidden: projection width inside the attention layers
        :param num_heads: number of attention heads
        :param ffn_num_hidden: hidden width of the FFNs
        :param num_ffn: number of FFNs per encoder block
        :param num_layers: number of encoder blocks
        :param dropout: dropout probability
        """
        super(TransformerEncoder, self).__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_embedding = PositionEncoding(d_model, dropout)
        self.encoder = nn.Sequential()
        for i in range(num_layers):
            self.encoder.add_module(f'encoder_{i}', EncoderBlock(d_model, num_hidden, num_heads, ffn_num_hidden, num_ffn, dropout))
        # Defined up front so the attention_weights property is safe to read
        # before the first forward pass.
        self._attention_weights = [None] * num_layers

    def forward(self, X, valid_lens):
        """
        :param X: [batch_size, seq_len] token indices
        :param valid_lens: forwarded to every encoder block
        :return: [batch_size, seq_len, d_model]
        """
        # The positional encoding lies in [-1, 1], so the embeddings are scaled
        # by sqrt(d_model) to keep them from being dominated by it.
        X = self.embedding(X) * np.sqrt(self.d_model)
        X = self.pos_embedding(X)
        self._attention_weights = [None] * len(self.encoder)
        for i, encoder in enumerate(self.encoder):
            X = encoder(X, valid_lens)
            # The attention module itself is expected to expose the weights;
            # it has no nested `.attention` attribute. Falls back to None when
            # the module does not record them.
            self._attention_weights[i] = getattr(encoder.attention, 'attention_weights', None)
        return X

    @property
    def attention_weights(self):
        """Per-layer attention weights recorded during the most recent forward pass."""
        return self._attention_weights

In [None]:
class DecoderBlock(nn.Module):
    """One decoder layer: masked self-attention, encoder-decoder attention and
    a stack of position-wise FFNs, each followed by AddNorm.

    NOTE(review): a class with the same name is defined again in a later cell
    and rebinds ``DecoderBlock``; this variant's forward returns only the
    output tensor.
    """
    def __init__(self, d_model, num_hidden, num_heads, ffn_num_hidden, num_ffn=1, dropout=0, num_layers=1, **kwargs):
        """
        :param d_model: feature dimension of the block's input and output
        :param num_hidden: projection width inside the attention layers
        :param num_heads: number of attention heads
        :param ffn_num_hidden: hidden width of every position-wise FFN
        :param num_ffn: how many FFNs are chained
        :param dropout: dropout probability shared by all sublayers
        :param num_layers: stored on the instance as ``self.num_layers``
        """
        super().__init__()
        self.attention1 = MultiHeadAttention(
            d_model, num_hidden, num_heads, dropout, bias=False, Masked=True
        )
        self.addnorm1 = AddNorm(d_model, dropout)
        self.attention2 = MultiHeadAttention(
            d_model, num_hidden, num_heads, dropout, bias=False, Masked=False
        )
        self.addnorm2 = AddNorm(d_model, dropout)
        self.ffn = nn.Sequential()
        self.num_layers = num_layers
        for index in range(num_ffn):
            feed_forward = PositionWiseFFN(d_model, ffn_num_hidden, d_model, dropout)
            self.ffn.add_module(f'ffn_{index}', feed_forward)
        self.addnorm3 = AddNorm(d_model, dropout)

    def forward(self, X, state):
        """
        :param X: decoder input; query, key and value of the masked self-attention
        :param state: sequence holding (enc_outputs, enc_valid_lens, dec_valid_lens)
        :return: output of the third AddNorm
        """
        # At prediction time X holds the current token plus all earlier ones;
        # after every step the newest prediction has to be appended to the
        # input. That prediction loop must be implemented separately.
        enc_outputs, enc_valid_lens, dec_valid_lens = state[0], state[1], state[2]
        self_attended = self.addnorm1(X, self.attention1(X, X, X, dec_valid_lens))
        cross = self.attention2(self_attended, enc_outputs, enc_outputs, enc_valid_lens)
        cross_attended = self.addnorm2(self_attended, cross)
        transformed = cross_attended
        for layer in self.ffn:
            transformed = layer(transformed)
        return self.addnorm3(cross_attended, transformed)

In [None]:
class DecoderBlock(nn.Module):
    """One decoder layer: masked self-attention, encoder-decoder attention and
    a stack of position-wise FFNs, each followed by AddNorm.
    """
    def __init__(self, d_model, num_hidden, num_heads, ffn_num_hidden, num_ffn=1, dropout=0, num_layers=1, **kwargs):
        """
        :param d_model: feature dimension of the block's input and output
        :param num_hidden: projection width inside the attention layers
        :param num_heads: number of attention heads
        :param ffn_num_hidden: hidden width of every position-wise FFN
        :param num_ffn: how many FFNs are chained
        :param dropout: dropout probability shared by all sublayers
        :param num_layers: stored on the instance as ``self.num_layers``
        """
        super().__init__()
        self.attention1 = MultiHeadAttention(
            d_model, num_hidden, num_heads, dropout, bias=False, Masked=True
        )
        self.addnorm1 = AddNorm(d_model, dropout)
        self.attention2 = MultiHeadAttention(
            d_model, num_hidden, num_heads, dropout, bias=False, Masked=False
        )
        self.addnorm2 = AddNorm(d_model, dropout)
        self.ffn = nn.Sequential()
        self.num_layers = num_layers
        for index in range(num_ffn):
            feed_forward = PositionWiseFFN(d_model, ffn_num_hidden, d_model, dropout)
            self.ffn.add_module(f'ffn_{index}', feed_forward)
        self.addnorm3 = AddNorm(d_model, dropout)

    def forward(self, X, state):
        """Run the block and hand the (unchanged) state back to the caller.

        For the masked self-attention, Q, K and V are all ``X``.
        During training the whole target sequence is fed in and the whole
        sequence comes out. During prediction the first step receives a single
        token (the start symbol) and later steps receive the current token
        together with all earlier ones, so after every step the newest
        prediction (the last position) has to be appended to the input.
        Every step re-predicts all earlier positions, but only the last
        position's prediction is appended and used for the next step; the
        earlier positions only serve as context for it.

        :param X: decoder input
        :param state: sequence holding (enc_outputs, enc_valid_lens, dec_valid_lens)
        :return: tuple (output of the third AddNorm, state)
        """
        enc_outputs, enc_valid_lens, dec_valid_lens = state[0], state[1], state[2]
        self_attended = self.addnorm1(X, self.attention1(X, X, X, dec_valid_lens))
        cross = self.attention2(self_attended, enc_outputs, enc_outputs, enc_valid_lens)
        cross_attended = self.addnorm2(self_attended, cross)
        transformed = cross_attended
        for layer in self.ffn:
            transformed = layer(transformed)
        return self.addnorm3(cross_attended, transformed), state

In [None]:
class TransformerDecoder(nn.Module):
    """Token embedding + positional encoding, a stack of DecoderBlocks and a
    final linear projection onto the vocabulary.
    """
    def __init__(self, vocab_size, d_model, num_hidden, num_heads, ffn_num_hidden, num_ffn=1, num_layers=1, dropout=0, training=True, **kwargs):
        """
        :param vocab_size: size of the target vocabulary
        :param d_model: dimension of the token embeddings
        :param num_hidden: projection width inside the attention layers
        :param num_heads: number of attention heads
        :param ffn_num_hidden: hidden width of the FFNs
        :param num_ffn: number of FFNs per decoder block
        :param num_layers: number of decoder blocks
        :param dropout: dropout probability
        :param training: initial mode of the module (True = train, False = eval)
        """
        super(TransformerDecoder, self).__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_embedding = PositionEncoding(d_model, dropout)
        self.decoder = nn.Sequential()
        for i in range(num_layers):
            # The trailing positional argument is the block's index in the stack.
            self.decoder.add_module(f'decoder_{i}', DecoderBlock(d_model, num_hidden, num_heads, ffn_num_hidden, num_ffn, dropout, i))
        self.fc = nn.Linear(d_model, vocab_size)
        # Defined up front so the attention_weights property is safe to read
        # before the first forward pass.
        self._attention_weights = [[None] * num_layers for _ in range(2)]
        # `training` is nn.Module's own mode flag. Assigning it directly would
        # flip this module only and leave the children (e.g. dropout layers) in
        # train mode; train() propagates the mode consistently.
        self.train(training)

    def init_state(self, enc_outputs, enc_valid_lens, dec_valid_lens):
        """Build the decoder state ``[enc_outputs, enc_valid_lens, dec_valid_lens]``.

        In train mode all target sentences of the batch are decoded at once, so
        the third entry is the per-sentence valid length, shape [batch_size].
        In eval mode the third entry is None.
        """
        if self.training:
            return [enc_outputs, enc_valid_lens, dec_valid_lens]
        return [enc_outputs, enc_valid_lens, None]

    def forward(self, X, state):
        """
        :param X: [batch_size, seq_len] token indices
        :param state: [enc_outputs, enc_valid_lens, dec_valid_lens] as built by ``init_state``
        :return: tuple (logits of shape [batch_size, seq_len, vocab_size], state)
        """
        X = self.embedding(X) * np.sqrt(self.d_model)
        X = self.pos_embedding(X)
        self._attention_weights = [[None] * len(self.decoder) for _ in range(2)]
        for i, decoder in enumerate(self.decoder):
            X, state = decoder(X, state)
            # The attention modules themselves are expected to expose the
            # weights; they have no nested `.attention` attribute. Falls back
            # to None when a module does not record them.
            self._attention_weights[0][i] = getattr(decoder.attention1, 'attention_weights', None)  # decoder self-attention
            self._attention_weights[1][i] = getattr(decoder.attention2, 'attention_weights', None)  # encoder-decoder attention
        return self.fc(X), state

    @property
    def attention_weights(self):
        """``[self_attention, cross_attention]``, each a per-layer list from the last forward pass."""
        return self._attention_weights

In [None]:
class EncoderDecoder(nn.Module):
    """Glue module that wires an encoder and a decoder together for training
    (``forward``) and greedy decoding (``predict``).

    Both sub-modules consume and produce token indices.
    """
    def __init__(self, encoder, decoder, **kwargs):
        """
        :param encoder: module called as ``encoder(enc_X, enc_valid_lens)``
        :param decoder: module providing ``init_state(...)`` and returning
            ``(logits, state)`` when called as ``decoder(dec_X, state)``
        """
        super(EncoderDecoder, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.start_token = 0  # index of the start-of-sequence token
        self.end_token = 1    # index of the end-of-sequence token
        self.output = None    # tokens predicted at the most recent decoding step

    def forward(self, enc_X, dec_X, enc_valid_lens, dec_valid_lens):
        """Teacher-forced pass; returns whatever the decoder returns."""
        enc_outputs = self.encoder(enc_X, enc_valid_lens)
        dec_state = self.decoder.init_state(enc_outputs, enc_valid_lens, dec_valid_lens)
        return self.decoder(dec_X, dec_state)

    def predict(self, enc_X, dec_X, enc_valid_lens, max_len=100):
        """Greedy autoregressive decoding.

        The previous implementation could not complete: it read an undefined
        attribute, indexed the decoder's ``(logits, state)`` tuple as a tensor
        and compared a batch tensor with an int in the loop condition.

        :param enc_X: [batch_size, seq_len] source token indices
        :param dec_X: [batch_size, 1] initial decoder input (start tokens)
        :param enc_valid_lens: [batch_size] valid source lengths
        :param max_len: upper bound on the number of generated tokens, so that
            decoding terminates even if the end token is never produced
        :return: [batch_size, 1 + generated_steps] the initial ``dec_X`` followed
            by the generated tokens; sequences that finish early are padded
            with the end token. ``self.output`` keeps the last step's tokens,
            shape [batch_size, 1].
        """
        # Gradients are not needed for inference. The caller is responsible for
        # putting the model in eval mode so dropout is disabled.
        with torch.no_grad():
            enc_outputs = self.encoder(enc_X, enc_valid_lens)
            # Tokens are decoded step by step, so there is no target valid length.
            dec_state = self.decoder.init_state(enc_outputs, enc_valid_lens, None)
            finished = torch.zeros(dec_X.size(0), dtype=torch.bool, device=dec_X.device)
            self.output = None
            for _ in range(max_len):
                logits, dec_state = self.decoder(dec_X, dec_state)
                next_token = torch.argmax(logits[:, -1, :], dim=-1)  # [batch_size]
                # Sequences that already ended keep emitting the end token.
                next_token = next_token.masked_fill(finished, self.end_token)
                self.output = next_token[:, None]  # [batch_size, 1]
                dec_X = torch.cat((dec_X, self.output), dim=1)
                finished = finished | (next_token == self.end_token)
                if bool(finished.all()):
                    break
        return dec_X

In [None]:
# 先将词元转换为词元的索引，然后将词元的索引输入EncoderDecoder
# EncoderDecoder的输入是词元的索引，输出也是词元的索引
# 词元的索引是词典中词元的位置
# 1. 构建词典
# 2. 构建函数，将词典中的词元转换为词元的索引，并将词元的索引转换为词典中的词元

## 词典介绍：
# 词典中的词元是唯一的，每个词元对应一个索引
# 词典中的词元的索引是从0开始的，0表示起始符，1表示结束符
# 词典中的词元的索引是按照词典中词元的出现频率排序的，出现频率越高，索引越小