In [None]:
!pip install d2l==0.14.2

# **10.5 Multi-Head Attention**

10.5.1 Model

10.5.2 Implementation


In [None]:
import math
import torch
from torch import nn
from d2l import torch as d2l

In [None]:
#@save
class MultiHeadAttention(nn.Module):
  """Multi-head attention.

  Queries, keys and values are each passed through their own linear layer,
  split into `num_heads` heads with `transpose_qkv`, attended to by a single
  `d2l.DotProductAttention` instance, merged back with `transpose_output`
  and finally passed through an output linear layer.

  Args:
    key_size: Input feature size of the key projection `W_k`.
    query_size: Input feature size of the query projection `W_q`.
    value_size: Input feature size of the value projection `W_v`.
    num_hiddens: Output size of every projection. The heads are obtained by
      reshaping this axis into (`num_heads`, -1), so it has to be divisible
      by `num_heads`.
    num_heads: Number of attention heads.
    dropout: Passed to `d2l.DotProductAttention`.
    bias: Whether the four linear layers use a bias term.
    **kwargs: Forwarded to `nn.Module.__init__`.
  """
  def __init__(self, key_size, query_size, value_size, num_hiddens,
               num_heads, dropout, bias=False, **kwargs):
    super(MultiHeadAttention, self).__init__(**kwargs)
    self.num_heads = num_heads
    self.attention = d2l.DotProductAttention(dropout)
    self.W_q = nn.Linear(query_size, num_hiddens, bias=bias)
    self.W_k = nn.Linear(key_size, num_hiddens, bias=bias)
    self.W_v = nn.Linear(value_size, num_hiddens, bias=bias)
    self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias)

  def forward(self, queries, keys, values, valid_lens):
    """Apply multi-head attention.

    Args:
      queries, keys, values: Tensors of shape
        (`batch_size`, no. of queries or key-value pairs, feature size).
      valid_lens: `None`, or a tensor of shape (`batch_size`,) or
        (`batch_size`, no. of queries).

    Returns:
      Tensor of shape (`batch_size`, no. of queries, `num_hiddens`).
    """
    # After projecting and transposing, shape of `queries`, `keys`, or
    # `values`:
    # (`batch_size` * `num_heads`, no. of queries or key-value pairs,
    # `num_hiddens` / `num_heads`)
    queries = transpose_qkv(self.W_q(queries), self.num_heads)
    keys = transpose_qkv(self.W_k(keys), self.num_heads)
    values = transpose_qkv(self.W_v(values), self.num_heads)

    if valid_lens is not None:
      # On axis 0, copy the first item (scalar or vector) for
      # `num_heads` times, then copy the next item, and so on, so that the
      # lengths line up with the (`batch_size` * `num_heads`) leading axis
      valid_lens = torch.repeat_interleave(
        valid_lens, repeats=self.num_heads, dim=0)

    # Shape of `output`: (`batch_size` * `num_heads`, no. of queries,
    # `num_hiddens` / `num_heads`)
    output = self.attention(queries, keys, values, valid_lens)

    # Shape of `output_concat`:
    # (`batch_size`, no. of queries, `num_hiddens`)
    output_concat = transpose_output(output, self.num_heads)
    return self.W_o(output_concat)

In [None]:
#@save
def transpose_qkv(X, num_heads):
  """Fold the attention heads into the batch axis for parallel computation.

  Input `X` has shape
  (`batch_size`, no. of queries or key-value pairs, `num_hiddens`); the
  result has shape (`batch_size` * `num_heads`, no. of queries or key-value
  pairs, `num_hiddens` / `num_heads`).
  """
  batch_size, num_items = X.shape[0], X.shape[1]
  # Split the feature axis into heads:
  # (`batch_size`, no. of items, `num_heads`, `num_hiddens` / `num_heads`),
  # then move the head axis next to the batch axis:
  # (`batch_size`, `num_heads`, no. of items, `num_hiddens` / `num_heads`)
  per_head = X.reshape(batch_size, num_items, num_heads, -1).permute(
    0, 2, 1, 3)
  # Merge batch and head axes into a single leading axis
  return per_head.reshape(-1, num_items, per_head.shape[3])

#@save
def transpose_output(X, num_heads):
  """Undo `transpose_qkv`: unfold the heads and concatenate their features.

  Input `X` has shape (`batch_size` * `num_heads`, no. of items, head size);
  the result has shape (`batch_size`, no. of items, `num_heads` * head size).
  """
  num_items, head_size = X.shape[1], X.shape[2]
  # (`batch_size`, `num_heads`, no. of items, head size)
  # -> (`batch_size`, no. of items, `num_heads`, head size)
  per_item = X.reshape(-1, num_heads, num_items, head_size).permute(
    0, 2, 1, 3)
  # Flatten the trailing (head, feature) axes into one feature axis
  return per_item.reshape(per_item.shape[0], per_item.shape[1], -1)

In [None]:
# Build a 5-head attention layer whose key, query, value and hidden sizes are
# all 100, then switch it to evaluation mode
num_hiddens, num_heads = 100, 5
attention = MultiHeadAttention(
    key_size=num_hiddens, query_size=num_hiddens, value_size=num_hiddens,
    num_hiddens=num_hiddens, num_heads=num_heads, dropout=0.5)
attention.eval()

In [None]:
# 2 sequences with 4 queries and 6 key-value pairs each
batch_size, num_queries, num_kvpairs = 2, 4, 6
valid_lens = torch.tensor([3, 2])
X = torch.ones((batch_size, num_queries, num_hiddens))
Y = torch.ones((batch_size, num_kvpairs, num_hiddens))
# Queries come from `X`; keys and values both come from `Y`
attention(X, Y, Y, valid_lens).shape

# **10.6 Self-Attention and Positional Encoding**

In [None]:
import math
import torch
from torch import nn
from d2l import torch as d2l

10.6.1 Self-Attention

In [None]:
# Same configuration as before, now using the implementation saved in `d2l`
num_hiddens, num_heads = 100, 5
attention = d2l.MultiHeadAttention(
    num_hiddens, num_hiddens, num_hiddens, num_hiddens, num_heads, 0.5)
attention.eval()

In [None]:
batch_size, num_queries = 2, 4
valid_lens = torch.tensor([3, 2])
X = torch.ones((batch_size, num_queries, num_hiddens))
# Self-attention: the same tensor serves as queries, keys and values
attention(X, X, X, valid_lens).shape

10.6.2 Comparing CNNs, RNNs, and Self-Attention

10.6.3 Positional Encoding

In [None]:
#@save
class PositionalEncoding(nn.Module):
  """Positional encoding.

  Precomputes a table `P` of shape (1, `max_len`, `num_hiddens`) whose even
  columns hold sines and whose odd columns hold cosines of the position
  divided by 10000^(2j / `num_hiddens`), and adds the leading rows of that
  table to the input before applying dropout.

  Args:
    num_hiddens: Size of the encoding (last) axis; may be even or odd.
    dropout: Dropout probability applied after the encoding is added.
    max_len: Number of positions stored in `P`. Inputs are expected to have
      at most this many steps.
  """
  def __init__(self, num_hiddens, dropout, max_len=1000):
    super(PositionalEncoding, self).__init__()
    self.dropout = nn.Dropout(dropout)
    # Create a long enough `P`
    self.P = torch.zeros((1, max_len, num_hiddens))
    # Shape of `X`: (`max_len`, ceil(`num_hiddens` / 2)), one column per
    # frequency
    X = torch.arange(max_len, dtype=torch.float32).reshape(
      -1, 1) / torch.pow(10000, torch.arange(
       0, num_hiddens, 2, dtype=torch.float32) / num_hiddens)
    self.P[:, :, 0::2] = torch.sin(X)
    # For an odd `num_hiddens` there is one fewer odd column than even
    # columns, so drop the surplus frequency; for an even `num_hiddens` the
    # slice keeps all of `X`
    self.P[:, :, 1::2] = torch.cos(X[:, :num_hiddens // 2])

  def forward(self, X):
    """Add the encodings of the first `X.shape[1]` positions, then dropout.

    `X` has shape (`batch_size`, no. of steps, `num_hiddens`); the output has
    the same shape.
    """
    # `P` is a plain tensor attribute, so it is moved to the device of `X`
    # explicitly on every call
    X = X + self.P[:, :X.shape[1], :].to(X.device)
    return self.dropout(X)

In [None]:
# Encode 60 positions with 32 dimensions and plot columns 6 to 9 of the
# encoding table against the position index
encoding_dim, num_steps = 32, 60
pos_encoding = PositionalEncoding(num_hiddens=encoding_dim, dropout=0)
pos_encoding.eval()
X = pos_encoding(torch.zeros((1, num_steps, encoding_dim)))
P = pos_encoding.P[:, :X.shape[1], :]
d2l.plot(
    torch.arange(num_steps), P[0, :, 6:10].T,
    xlabel='Row (position)', figsize=(6, 2.5),
    legend=["Col %d" % d for d in torch.arange(6, 10)])

In [None]:
# Print 0..7 as zero-padded 3-digit binary numbers
for i in range(8):
  print(f'{i} in binary is {i:>03b}')

In [None]:
# Take the first entry of `P` and prepend two singleton axes before plotting.
# NOTE: this rebinds `P`, so the cell is meant to be run once after the
# previous cell
P = P[0][None, None]
d2l.show_heatmaps(
    P,
    xlabel='Column (encoding dimension)',
    ylabel='Row (position)',
    figsize=(3.5, 4),
    cmap='Blues')

# **10.7 Transformer**

10.7.1 Model

In [None]:
import math
import pandas as pd
import torch
from torch import nn
from d2l import torch as d2l

10.7.2 Positionwise Feed-Forward Networks

In [None]:
#@save
class PositionWiseFFN(nn.Module):
  """Positionwise feed-forward network.

  Two linear layers with a ReLU in between, applied to the last axis of the
  input: `ffn_num_input` -> `ffn_num_hiddens` -> `ffn_num_outputs`.
  """
  def __init__(self, ffn_num_input, ffn_num_hiddens, ffn_num_outputs,
               **kwargs):
    super().__init__(**kwargs)
    self.dense1 = nn.Linear(ffn_num_input, ffn_num_hiddens)
    self.relu = nn.ReLU()
    self.dense2 = nn.Linear(ffn_num_hiddens, ffn_num_outputs)

  def forward(self, X):
    hidden = self.relu(self.dense1(X))
    return self.dense2(hidden)

In [None]:
# Map 4 input features to 8 output features through 4 hidden units and show
# the output for the first example of the batch
ffn = PositionWiseFFN(ffn_num_input=4, ffn_num_hiddens=4, ffn_num_outputs=8)
ffn.eval()
ffn(torch.ones((2, 3, 4)))[0]

10.7.3 Residual Connection and Layer Normalization

In [None]:
# Normalize the same 2x2 input with layer normalization and with batch
# normalization
ln = nn.LayerNorm(2)
bn = nn.BatchNorm1d(2)
X = torch.tensor([[1.0, 2.0], [2.0, 3.0]], dtype=torch.float32)
# Both modules are in training mode here, so mean and variance are computed
# from `X` itself
print('layer norm:', ln(X), '\nbatch norm:', bn(X))

In [None]:
#@save
class AddNorm(nn.Module):
  """Residual connection followed by layer normalization.

  `forward(X, Y)` applies dropout to `Y`, adds `X` and layer-normalizes the
  sum over `normalized_shape`.
  """
  def __init__(self, normalized_shape, dropout, **kwargs):
    super().__init__(**kwargs)
    self.dropout = nn.Dropout(dropout)
    self.ln = nn.LayerNorm(normalized_shape)

  def forward(self, X, Y):
    residual_sum = self.dropout(Y) + X
    return self.ln(residual_sum)

In [None]:
# `normalized_shape` is `input.size()[1:]`
add_norm = AddNorm(normalized_shape=[3, 4], dropout=0.5)
add_norm.eval()
add_norm(X=torch.ones((2, 3, 4)), Y=torch.ones((2, 3, 4))).shape

10.7.4 Encoder

In [None]:
#@save
class EncoderBlock(nn.Module):
  """Transformer encoder block.

  Multi-head self-attention and a positionwise feed-forward network, each
  followed by an `AddNorm` residual connection.
  """
  def __init__(self, key_size, query_size, value_size, num_hiddens,
               norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
               dropout, use_bias=False, **kwargs):
    super().__init__(**kwargs)
    self.attention = d2l.MultiHeadAttention(
      key_size, query_size, value_size, num_hiddens, num_heads, dropout,
      use_bias)
    self.addnorm1 = AddNorm(norm_shape, dropout)
    self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens, num_hiddens)
    self.addnorm2 = AddNorm(norm_shape, dropout)

  def forward(self, X, valid_lens):
    # Self-attention sublayer: `X` is used as queries, keys and values
    attended = self.attention(X, X, X, valid_lens)
    Y = self.addnorm1(X, attended)
    # Feed-forward sublayer
    transformed = self.ffn(Y)
    return self.addnorm2(Y, transformed)

In [None]:
# Run one encoder block on a batch of 2 sequences with 100 steps and 24
# features; the output shape should match the input shape
X = torch.ones((2, 100, 24))
valid_lens = torch.tensor([3, 2])
encoder_blk = EncoderBlock(
    key_size=24, query_size=24, value_size=24, num_hiddens=24,
    norm_shape=[100, 24], ffn_num_input=24, ffn_num_hiddens=48,
    num_heads=8, dropout=0.5)
encoder_blk.eval()
encoder_blk(X, valid_lens).shape

In [None]:
#@save
class TransformerEncoder(d2l.Encoder):
  """Transformer encoder.

  Embeds the input tokens, adds positional encodings and runs the result
  through `num_layers` `EncoderBlock`s. After each forward pass,
  `attention_weights` holds one entry per block.
  """
  def __init__(self, vocab_size, key_size, query_size, value_size,
               num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
               num_heads, num_layers, dropout, use_bias=False, **kwargs):
    super().__init__(**kwargs)
    self.num_hiddens = num_hiddens
    self.embedding = nn.Embedding(vocab_size, num_hiddens)
    self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)
    self.blks = nn.Sequential()
    for layer_idx in range(num_layers):
      block = EncoderBlock(
        key_size, query_size, value_size, num_hiddens, norm_shape,
        ffn_num_input, ffn_num_hiddens, num_heads, dropout, use_bias)
      self.blks.add_module(f"block{layer_idx}", block)

  def forward(self, X, valid_lens, *args):
    # Positional encoding values lie between -1 and 1, so the embeddings are
    # scaled by the square root of the embedding dimension before the two
    # are summed
    scaled_embedding = self.embedding(X) * math.sqrt(self.num_hiddens)
    X = self.pos_encoding(scaled_embedding)
    self.attention_weights = [None] * len(self.blks)
    for layer_idx, blk in enumerate(self.blks):
      X = blk(X, valid_lens)
      # Keep the weights recorded by the attention module inside the block
      self.attention_weights[layer_idx] = (
        blk.attention.attention.attention_weights)
    return X

In [None]:
# Two-layer encoder over a vocabulary of 200 tokens, applied to a batch of 2
# sequences with 100 token indices each
encoder = TransformerEncoder(
    vocab_size=200, key_size=24, query_size=24, value_size=24,
    num_hiddens=24, norm_shape=[100, 24], ffn_num_input=24,
    ffn_num_hiddens=48, num_heads=8, num_layers=2, dropout=0.5)
encoder.eval()
encoder(torch.ones((2, 100), dtype=torch.long), valid_lens).shape

10.7.5 Decoder

In [None]:
class DecoderBlock(nn.Module):
  """The `i`-th block in the Transformer decoder.

  Consists of decoder self-attention, encoder-decoder attention and a
  positionwise feed-forward network, each followed by an `AddNorm`.

  Args:
    key_size, query_size, value_size, num_hiddens, num_heads, dropout:
      Passed to both `d2l.MultiHeadAttention` layers.
    norm_shape: Shape normalized by the three `AddNorm` layers.
    ffn_num_input, ffn_num_hiddens: Input and hidden sizes of the
      feed-forward network; its output size is `num_hiddens`.
    i: Index of this block; selects this block's slot in `state[2]`.
    **kwargs: Forwarded to `nn.Module.__init__`.
  """
  def __init__(self, key_size, query_size, value_size, num_hiddens,
               norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
               dropout, i, **kwargs):
    super(DecoderBlock, self).__init__(**kwargs)
    self.i = i
    self.attention1 = d2l.MultiHeadAttention(
      key_size, query_size, value_size, num_hiddens, num_heads, dropout)
    self.addnorm1 = AddNorm(norm_shape, dropout)
    self.attention2 = d2l.MultiHeadAttention(
      key_size, query_size, value_size, num_hiddens, num_heads, dropout)
    self.addnorm2 = AddNorm(norm_shape, dropout)
    self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens,
      num_hiddens)
    self.addnorm3 = AddNorm(norm_shape, dropout)

  def forward(self, X, state):
    """Run the block and update this block's cache in `state`.

    Args:
      X: Decoder input of shape (`batch_size`, `num_steps`, `num_hiddens`).
      state: Indexable with `state[0]` the encoder outputs, `state[1]` the
        encoder valid lengths and `state[2]` a per-block list of cached
        decoder inputs (`None` until the block has been called).

    Returns:
      A tuple of the block output and the (mutated) `state`.
    """
    enc_outputs, enc_valid_lens = state[0], state[1]
    # During training, all the tokens of any output sequence are processed
    # at the same time, so `state[2][self.i]` is `None` as initialized.
    # When decoding any output sequence token by token during prediction,
    # `state[2][self.i]` contains representations of the decoded output at
    # the `i`-th block up to the current time step
    if state[2][self.i] is None:
      key_values = X
    else:
      key_values = torch.cat((state[2][self.i], X), dim=1)
    # Store the cache in both cases: if it were only written when it already
    # exists, it would stay `None` forever and prediction would never see
    # the previously decoded steps
    state[2][self.i] = key_values
    if self.training:
      batch_size, num_steps, _ = X.shape
      # Shape of `dec_valid_lens`: (`batch_size`, `num_steps`), where
      # every row is [1, 2, ..., `num_steps`]
      dec_valid_lens = torch.arange(
        1, num_steps + 1, device=X.device).repeat(batch_size, 1)
    else:
      dec_valid_lens = None

    # Self-attention
    X2 = self.attention1(X, key_values, key_values, dec_valid_lens)
    Y = self.addnorm1(X, X2)
    # Encoder-decoder attention. Shape of `enc_outputs`:
    # (`batch_size`, `num_steps`, `num_hiddens`)
    Y2 = self.attention2(Y, enc_outputs, enc_outputs, enc_valid_lens)
    Z = self.addnorm2(Y, Y2)
    return self.addnorm3(Z, self.ffn(Z)), state