# Transformer

![img](https://zh.d2l.ai/_images/transformer.svg)


In [33]:
import math
import pandas as pd
import torch
from torch import nn

from NeuralNetwork import Attention
from d2l import torch as d2l

In [34]:
class PositionWiseFFN(nn.Module):
    """Position-wise feed-forward network (a two-layer MLP).

    A ``Linear -> ReLU -> Linear`` stack is applied along the last axis of
    the input, so the leading axes are carried through unchanged.

    Args:
        ffn_num_input: size of the last axis of the input.
        ffn_num_hiddens: width of the hidden layer.
        ffn_num_outputs: size of the last axis of the output.
        **kwargs: accepted for call-site compatibility and ignored.

    Inputs:
        X (torch.Tensor) : shape->(batch_size, seq_len, ffn_num_input)
    Outputs:
        shape->(batch_size, seq_len, ffn_num_outputs)
    """

    def __init__(self, ffn_num_input, ffn_num_hiddens, ffn_num_outputs, **kwargs):
        super().__init__()
        # Layers are created in this order on purpose: it fixes both the
        # parameter-initialisation order and the submodule names.
        self.dense1 = nn.Linear(in_features=ffn_num_input, out_features=ffn_num_hiddens)
        self.relu = nn.ReLU()
        self.dense2 = nn.Linear(in_features=ffn_num_hiddens, out_features=ffn_num_outputs)

    def forward(self, X):
        hidden = self.relu(self.dense1(X))
        return self.dense2(hidden)

In [35]:
# Shape check: only the last axis is changed by the FFN (4 -> 8).
ffn = PositionWiseFFN(ffn_num_input=4, ffn_num_hiddens=4, ffn_num_outputs=8)
ffn.eval()
ffn(torch.ones((2, 3, 4))).shape

torch.Size([2, 3, 8])

In [36]:
# Compare the two normalisations on the same 2x2 input:
# LayerNorm normalises each row (sample), BatchNorm1d (in training mode)
# normalises each column (feature) across the batch.
X = torch.tensor([[1, 2], [2, 3]], dtype=torch.float)
bn = nn.BatchNorm1d(num_features=2)
ln = nn.LayerNorm(normalized_shape=2)
print('X:', X)
print('ln: ', ln(X))
print('bn: ', bn(X))

X: tensor([[1., 2.],
        [2., 3.]])
ln:  tensor([[-1.0000,  1.0000],
        [-1.0000,  1.0000]], grad_fn=<NativeLayerNormBackward0>)
bn:  tensor([[-1.0000, -1.0000],
        [ 1.0000,  1.0000]], grad_fn=<NativeBatchNormBackward0>)


## 使用残差连接和层规范化实现 AddNorm 类

In [37]:
class AddNorm(nn.Module):
    """Residual connection followed by layer normalization.

    Args:
        normalized_shape: shape forwarded to ``nn.LayerNorm``.
        dropout: drop probability applied to the sublayer output ``Y``.

    Inputs:
        X: the residual (skip) input.
        Y: the sublayer output; must be addable to ``X``.
    Outputs:
        ``LayerNorm(Dropout(Y) + X)``
    """

    def __init__(self, normalized_shape, dropout):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        self.ln = nn.LayerNorm(normalized_shape)

    def forward(self, X, Y):
        # Dropout is applied to the sublayer output only, never to the skip path.
        residual_sum = self.dropout(Y) + X
        return self.ln(residual_sum)

In [38]:
# In eval mode dropout is a no-op, so the sum is constant everywhere and
# LayerNorm maps it to zeros.
add_norm = AddNorm(normalized_shape=[3, 4], dropout=0.5)
add_norm.eval()
add_norm(torch.ones((2, 3, 4)), torch.ones((2, 3, 4)))

tensor([[[0., 0., 0., 0.],
         [0., 0., 0., 0.],
         [0., 0., 0., 0.]],

        [[0., 0., 0., 0.],
         [0., 0., 0., 0.],
         [0., 0., 0., 0.]]], grad_fn=<NativeLayerNormBackward0>)

## Transformer Encoder Block

In [39]:
from NeuralNetwork import Attention


class EncoderBlock(nn.Module):
    """A single Transformer encoder block.

    Applies multi-head self-attention and then a position-wise feed-forward
    network, wrapping each sublayer in ``AddNorm``.

    Args:
        key_size, query_size, value_size: forwarded to
            ``Attention.MultiHeadAttention``.
        num_hiddens: hidden size of the attention layer; also used as the
            output width of the feed-forward network.
        norm_shape: normalized shape used by both ``AddNorm`` layers.
        ffn_num_input: input width of the feed-forward network.
        ffn_num_hiddens: hidden width of the feed-forward network.
        num_heads: number of attention heads.
        dropout: dropout rate shared by the attention and ``AddNorm`` layers.
        use_bias: forwarded to the attention layer as ``bias``.

    Inputs:
        X (torch.Tensor) : shape->(batch_size, seq_len, feature_size)
        valid_lens : passed through unchanged to the attention layer.
    Outputs:
        Y (torch.Tensor) : shape->(batch_size, seq_len, feature_size)
    """

    def __init__(self, key_size, query_size, value_size, num_hiddens, norm_shape,
                 ffn_num_input, ffn_num_hiddens, num_heads, dropout, use_bias=False):
        super().__init__()
        # Sublayer 1: self-attention + residual/norm.
        self.attention = Attention.MultiHeadAttention(
            key_size=key_size, query_size=query_size, value_size=value_size,
            num_hiddens=num_hiddens, num_heads=num_heads, dropout=dropout,
            bias=use_bias,
        )
        self.addnorm1 = AddNorm(normalized_shape=norm_shape, dropout=dropout)
        # Sublayer 2: position-wise FFN + residual/norm.
        self.ffn = PositionWiseFFN(
            ffn_num_input=ffn_num_input, ffn_num_hiddens=ffn_num_hiddens,
            ffn_num_outputs=num_hiddens,
        )
        self.addnorm2 = AddNorm(normalized_shape=norm_shape, dropout=dropout)

    def forward(self, X, valid_lens):
        # Queries, keys and values are all X (self-attention).
        attended = self.attention(X, X, X, valid_lens)
        Y = self.addnorm1(X, attended)
        transformed = self.ffn(Y)
        return self.addnorm2(Y, transformed)


In [40]:
# Shape check for one encoder block: the output shape matches the input.
# NOTE: eval() is not called here, so the block runs in training mode
# (dropout active); only the shape is inspected.
X = torch.ones((2, 100, 24))
valid_lens = torch.tensor([3, 2])
encoder_blk = EncoderBlock(
    key_size=24, query_size=24, value_size=24, num_hiddens=24,
    norm_shape=[100, 24], ffn_num_input=24, ffn_num_hiddens=48,
    num_heads=8, dropout=0.5,
)
y = encoder_blk(X, valid_lens)
y.shape

torch.Size([2, 100, 24])

In [41]:
class TransformerEncoder(nn.Module):
    """Transformer encoder: embedding, positional encoding, stacked blocks.

    Args:
        vocab_size: number of rows in the token embedding table.
        key_size, query_size, value_size, num_hiddens, norm_shape,
        ffn_num_input, ffn_num_hiddens, num_heads, dropout, use_bias:
            forwarded to every ``EncoderBlock``; ``num_hiddens`` is also the
            embedding width and ``dropout`` is also used by the positional
            encoding.
        num_layers: number of stacked ``EncoderBlock`` modules.

    After ``forward`` the attribute ``attention_weights`` holds one entry per
    block, read from ``blk.attention.attention.attention_weights``.
    """

    def __init__(self, vocab_size, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, num_layers,
                 dropout, use_bias=False):
        super().__init__()
        self.num_hiddens = num_hiddens
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.pos_encoding = Attention.PositionalEncoding(num_hiddens, dropout=dropout)
        self.blks = nn.Sequential()
        for layer_idx in range(num_layers):
            block = EncoderBlock(
                key_size, query_size, value_size, num_hiddens, norm_shape,
                ffn_num_input, ffn_num_hiddens, num_heads, dropout, use_bias,
            )
            self.blks.add_module(name=f'block{layer_idx}', module=block)

    def forward(self, X, valid_lens):
        """
        Args:
            X(torch.Tensor) :shape->(batch_size, seq_len)
            valid_lens: passed unchanged to every block.
        """
        # Embeddings are scaled by sqrt(num_hiddens) before the positional
        # encoding is applied.
        scale = math.sqrt(self.num_hiddens)
        X = self.pos_encoding(self.embedding(X) * scale)
        self.attention_weights = [None] * len(self.blks)
        for idx, layer in enumerate(self.blks):
            X = layer(X, valid_lens)
            self.attention_weights[idx] = layer.attention.attention.attention_weights
        return X

In [42]:
# Two-layer encoder on a batch of token ids: the output gains a trailing
# feature axis of size num_hiddens.
encoder = TransformerEncoder(
    vocab_size=200,
    key_size=24, query_size=24, value_size=24,
    num_hiddens=24, norm_shape=[100, 24],
    ffn_num_input=24, ffn_num_hiddens=48,
    num_heads=8, num_layers=2, dropout=0.5,
)
encoder.eval()
valid_lens = torch.tensor([3, 2])
X = torch.ones((2, 100), dtype=torch.long)
encoder(X, valid_lens).shape

torch.Size([2, 100, 24])

In [43]:
class DecoderBlock(nn.Module):
    """The ``i``-th block of the Transformer decoder.

    Three sublayers, each wrapped in ``AddNorm``: decoder self-attention,
    encoder-decoder attention, and a position-wise feed-forward network.

    Args:
        key_size, query_size, value_size, num_hiddens, num_heads, dropout:
            forwarded to both ``Attention.MultiHeadAttention`` layers.
        norm_shape: normalized shape used by the three ``AddNorm`` layers.
        ffn_num_input, ffn_num_hiddens: sizes of the feed-forward network
            (its output width is ``num_hiddens``).
        i: index of this block; selects this block's slot in ``state[2]``.
        **kwargs: accepted for call-site compatibility and ignored.

    Inputs:
        X: decoder input for this block, shape (batch_size, seq_len, feature).
        state: ``[enc_outputs, enc_valid_lens, per_block_cache]`` where
            ``per_block_cache[i]`` is ``None`` or the inputs this block has
            already seen. The list is updated in place.
    Outputs:
        ``(dec_outputs, state)``
    """

    def __init__(self, key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, dropout, i, **kwargs):
        super().__init__()
        self.i = i
        self.attention1 = Attention.MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout)
        self.addnorm1 = AddNorm(norm_shape, dropout)
        self.attention2 = Attention.MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout)
        self.addnorm2 = AddNorm(norm_shape, dropout)
        self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens, num_hiddens)
        self.addnorm3 = AddNorm(norm_shape, dropout)

    def forward(self, X, state):
        enc_outputs, enc_valid_lens = state[0], state[1]

        # Append the current input to what this block has already seen, so
        # self-attention can look at every earlier position. On the first
        # call the cache slot is None and the keys/values are just X.
        cached = state[2][self.i]
        key_values = X if cached is None else torch.cat((cached, X), dim=1)
        state[2][self.i] = key_values

        if self.training:
            # Each row is [1, 2, ..., seq_len].
            # NOTE(review): presumably read by MultiHeadAttention as the
            # number of valid keys per query position (causal masking) —
            # confirm against the Attention module.
            batch_size, seq_len, _ = X.shape
            dec_valid_lens = torch.arange(1, seq_len + 1, device=X.device).repeat(batch_size, 1)
        else:
            dec_valid_lens = None

        # Decoder self-attention.
        X2 = self.attention1(X, key_values, key_values, dec_valid_lens)
        Y = self.addnorm1(X, X2)
        # Encoder-decoder attention: keys/values come from the encoder, so
        # the mask must be the encoder's valid lengths, not the decoder's.
        Y2 = self.attention2(Y, enc_outputs, enc_outputs, enc_valid_lens)
        Z = self.addnorm2(Y, Y2)
        Z2 = self.ffn(Z)
        dec_outputs = self.addnorm3(Z, Z2)
        return dec_outputs, state

In [45]:
# Shape check for one decoder block. The encoder block and valid_lens from
# the earlier cells supply the encoder side of the state; the per-block
# cache starts empty ([None] for the single block i=0).
decoder_blk = DecoderBlock(
    key_size=24, query_size=24, value_size=24,
    num_hiddens=24, norm_shape=[100, 24],
    ffn_num_input=24, ffn_num_hiddens=48,
    num_heads=8, dropout=0.5, i=0,
)
X = torch.ones((2, 100, 24))
state = [encoder_blk(X, valid_lens), valid_lens, [None]]
decoder_blk(X, state)[0].shape

torch.Size([2, 100, 24])

In [None]:
class TransformerDecoder(nn.Module):
    """Transformer decoder: embedding, positional encoding, stacked
    ``DecoderBlock`` modules and a final projection to the vocabulary.

    Args:
        vocab_size: size of the embedding table and of the output projection.
        key_size, query_size, value_size, num_hiddens, norm_shape,
        ffn_num_input, ffn_num_hiddens, num_heads, dropout:
            forwarded to every ``DecoderBlock``; ``num_hiddens`` is also the
            embedding width and ``dropout`` is also used by the positional
            encoding.
        num_layers: number of stacked decoder blocks.
        **kwargs: forwarded to ``nn.Module.__init__``.
    """

    def __init__(self, vocab_size, key_size, query_size, value_size,
                 num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, num_layers, dropout, **kwargs):
        super().__init__(**kwargs)
        self.num_hiddens = num_hiddens
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.pos_encoding = Attention.PositionalEncoding(num_hiddens, dropout)
        self.blks = nn.Sequential()
        for layer_idx in range(num_layers):
            block = DecoderBlock(
                key_size, query_size, value_size, num_hiddens,
                norm_shape, ffn_num_input, ffn_num_hiddens,
                num_heads, dropout, layer_idx,
            )
            self.blks.add_module(f"block{layer_idx}", block)
        self.dense = nn.Linear(num_hiddens, vocab_size)

    def init_state(self, enc_outputs, enc_valid_lens, *args):
        """Build the decoding state: encoder outputs, encoder valid lengths,
        and one empty (``None``) cache slot per decoder block."""
        per_block_cache = [None] * self.num_layers
        return [enc_outputs, enc_valid_lens, per_block_cache]

    def forward(self, X, state):
        """Run every block in order and project the result with ``dense``.

        Returns:
            ``(self.dense(X), state)``
        """
        scale = math.sqrt(self.num_hiddens)
        X = self.pos_encoding(self.embedding(X) * scale)
        num_blocks = len(self.blks)
        # Row 0: decoder self-attention weights; row 1: encoder-decoder
        # attention weights. One column per block.
        self._attention_weights = [[None] * num_blocks for _ in range(2)]
        self_attn_weights, cross_attn_weights = self._attention_weights
        for idx, layer in enumerate(self.blks):
            X, state = layer(X, state)
            self_attn_weights[idx] = layer.attention1.attention.attention_weights
            cross_attn_weights[idx] = layer.attention2.attention.attention_weights
        return self.dense(X), state

    @property
    def attention_weights(self):
        """Attention weights recorded by the most recent ``forward`` call."""
        return self._attention_weights