# Transformers

© Data Trainers LLC. GPL v 3.0.

Author: Axel Sirota


---



Heavily inspired by the tutorial [NMT with Transformers](https://www.tensorflow.org/text/tutorials/transformer), which implements the Transformer model proposed in ["Attention is all you need"](https://arxiv.org/abs/1706.03762) by Vaswani et al. (2017).

## Prep

In [57]:
!pip install --upgrade  textblob gensim pytorch-nlp swifter




In [58]:

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import itertools
import sys
from textblob import TextBlob, Word
import numpy as np
import random
import re
import swifter
from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader

import os
import pandas as pd
import gensim
import warnings
import nltk


def set_seeds_and_trace():
  torch.manual_seed(0)
  os.environ['PYTHONHASHSEED'] = '0'
  np.random.seed(42)
  random.seed(42)

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
set_seeds_and_trace()
warnings.filterwarnings('ignore')
nltk.download('punkt')
textblob_tokenizer = lambda x: TextBlob(x).words

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


## The Transformer Layers

In this demo we will build the Transformer architecture from scratch, with the same tools the original authors had. Why? To understand how it works, why it works, and exactly what is novel!

<table>
<tr>
  <th colspan=1>The original Transformer diagram</th>
  <th colspan=1>A representation of a 4-layer Transformer</th>
</tr>
<tr>
  <td>
   <img width=400 src="https://www.tensorflow.org/images/tutorials/transformer/transformer.png"/>
  </td>
  <td>
   <img width=307 src="https://www.tensorflow.org/images/tutorials/transformer/Transformer-4layer-compact.png"/>
  </td>
</tr>
</table>

Each of the components in these two diagrams will be explained as you progress through the demo.


### What did we have before?

Before, we used cross attention or self attention, remember? For sequence data we basically used it like this:

<table>
<tr>
  <th colspan=1>Seq2Seq with attention</th>
</tr>
<tr>
  <td>
   <img src="https://www.dropbox.com/s/r6u7ll5nlt96t9f/seq2seq.png?raw=1"/>
  </td>
</tr>
</table>



There, we combined the attention output with the hidden state to produce an updated hidden state that was fed into the next cell. This worked well on medium-sized sentences, but it was hard to train and unstable. With that in mind, the Transformer basically gets rid of the RNN by using **only** attention.
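
To make "only attention" concrete, here is a minimal sketch of the scaled dot-product attention defined in the paper, `softmax(Q K^T / sqrt(d_k)) V`. The function name and the toy tensor sizes are assumptions chosen for illustration; the layers later in this demo rely on `nn.MultiheadAttention` instead.

```python
import math
import torch

def scaled_dot_product_attention(query, key, value):
    """softmax(Q K^T / sqrt(d_k)) V for batched inputs."""
    d_k = query.size(-1)
    # Similarity of every query position with every key position: (N, T, S)
    scores = query @ key.transpose(-2, -1) / math.sqrt(d_k)
    # Normalize over the key axis so each query's weights sum to 1
    weights = torch.softmax(scores, dim=-1)
    # Weighted average of the values: (N, T, depth)
    return weights @ value, weights

torch.manual_seed(0)
q = torch.randn(2, 4, 8)   # (batch, target length, depth)
k = torch.randn(2, 6, 8)   # (batch, source length, depth)
v = torch.randn(2, 6, 8)
attn_out, attn_weights = scaled_dot_product_attention(q, k, v)
print(attn_out.shape, attn_weights.shape)
```

No recurrence is involved: every query position is compared with every key position in a single matrix product.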

### The embedding and positional encoding layer

The inputs to both the encoder and decoder use the same embedding and positional encoding logic.

<table>
<tr>
  <th colspan=1>The embedding and positional encoding layer</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/PositionalEmbedding.png"/>
  </td>
</tr>
</table>

In [59]:
## Sinusoidal encoding from the paper (the sin and cos halves are concatenated rather than interleaved)

def positional_encoding(length, depth):
    depth = depth // 2
    positions = np.arange(length)[:, np.newaxis]
    depths = np.arange(depth)[np.newaxis, :] / depth

    angle_rates = 1 / (10000 ** depths)
    angle_rads = positions * angle_rates

    pos_encoding = np.concatenate([np.sin(angle_rads), np.cos(angle_rads)], axis=-1)
    return torch.tensor(pos_encoding, dtype=torch.float32)


In [60]:
class PositionalEmbedding(nn.Module):
    def __init__(self, vocab_size, d_model):
        super(PositionalEmbedding, self).__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.register_buffer('pos_encoding', positional_encoding(2048, d_model))

    def forward(self, x):
        length = x.size(1)
        x = self.embedding(x)
        x *= torch.sqrt(torch.tensor(self.d_model, dtype=torch.float32))
        x = x + self.pos_encoding[None, :length, :]
        return x

In [61]:
pos = PositionalEmbedding(5000, 100)

In [62]:
input = torch.tensor(np.random.randint(1,5000, size=(3,26)))
response = pos(input)
response.shape

torch.Size([3, 26, 100])
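
A quick sanity check of the sinusoidal encoding can help build intuition. The sketch below repeats `positional_encoding` so that it runs on its own; the sizes (`length=50`, `depth=16`) are arbitrary values picked for illustration.

```python
import numpy as np
import torch

def positional_encoding(length, depth):
    # Same computation as the cell above, repeated so the sketch is self-contained
    depth = depth // 2
    positions = np.arange(length)[:, np.newaxis]
    depths = np.arange(depth)[np.newaxis, :] / depth
    angle_rads = positions / (10000 ** depths)
    pos_encoding = np.concatenate([np.sin(angle_rads), np.cos(angle_rads)], axis=-1)
    return torch.tensor(pos_encoding, dtype=torch.float32)

pe = positional_encoding(length=50, depth=16)

# Position 0: sin(0) = 0 fills the first half, cos(0) = 1 fills the second half
first_row = pe[0]

# Every position has the same norm, sqrt(depth / 2), because sin^2 + cos^2 = 1 per frequency
norms = pe.norm(dim=-1)
print(pe.shape, first_row, norms[:3])
```

Since all positions have the same norm, the encoding shifts each embedding by a vector of constant length; only its direction depends on the position.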

### Add and normalize

<table>
<tr>
  <th colspan=2>Add and normalize</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/Add+Norm.png"/>
  </td>
</tr>
</table>


We will create a `BaseAttention` layer that holds the multi-head attention and the Add & Norm step; each attention subclass then implements the `forward` pass it needs.

In [63]:
class BaseAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(BaseAttention, self).__init__()
        self.mha = nn.MultiheadAttention(embed_dim=d_model, num_heads=num_heads)
        self.layernorm = nn.LayerNorm(d_model)

    def forward(self, x):
        # In PyTorch, MultiheadAttention expects inputs in the shape (S, N, E)
        # S: source sequence length, N: batch size, E: embedding dimension
        attn_output, _ = self.mha(x, x, x)
        x = x + attn_output
        return self.layernorm(x)
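
The Add & Norm step can also be inspected in isolation. This is a small sketch with made-up tensor sizes, where `sublayer_out` is a random stand-in for an attention output (an assumption, not part of the model above).

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
d_model = 8
x = torch.randn(2, 5, d_model)               # (batch, sequence, d_model)
sublayer_out = torch.randn(2, 5, d_model)    # stand-in for an attention output

layernorm = nn.LayerNorm(d_model)
y = layernorm(x + sublayer_out)              # residual add, then normalize

# With the default affine parameters (weight 1, bias 0), each token vector
# ends up with mean ~0 and (biased) variance ~1 across its d_model features.
token_mean = y.mean(dim=-1)
token_var = y.var(dim=-1, unbiased=False)
print(token_mean.abs().max(), token_var.mean())
```

The normalization runs over the feature axis of each token separately, so it does not mix information between positions or between batch elements.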

### Self Attention layer

<table>
<tr>
  <th colspan=1>The global self attention layer</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/SelfAttention.png"/>
  </td>
</tr>
</table>

In [64]:
class GlobalSelfAttention(BaseAttention):
    def __init__(self, d_model, num_heads):
        super(GlobalSelfAttention, self).__init__(d_model, num_heads)

    def forward(self, x):
        # Transpose to match the input shape requirements of nn.MultiheadAttention
        x = x.transpose(0, 1)  # From (N, S, E) to (S, N, E)
        return super().forward(x).transpose(0, 1)  # Transpose back to (N, S, E)

Let's test it!

In [65]:
embedding_dim = 100
vocab_size = 5000
input = torch.tensor(np.random.randint(1,vocab_size, size=(3,26)))

# First we apply the PositionalEmbedding to embed into what the attention layer expects
pos = PositionalEmbedding(vocab_size, embedding_dim)

# Then we do the self attention, the n_heads is arbitrary
gsa = GlobalSelfAttention(num_heads=5, d_model=embedding_dim)


response = gsa(pos(input))
response.shape

torch.Size([3, 26, 100])

Notice the shape is unchanged: MHA concatenates all 5 heads back into `d_model` features, and then we add the residual and normalize.
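
The bookkeeping behind "concatenating the heads" can be sketched with plain reshapes. This is only an illustration of the shapes involved: the real `nn.MultiheadAttention` also applies learned linear projections before the split and after the merge, which are left out here.

```python
import torch

d_model, num_heads = 100, 5
head_dim = d_model // num_heads   # 20 features per head

x = torch.randn(3, 26, d_model)   # (batch, sequence, d_model)

# Split the feature axis into heads: (batch, heads, sequence, head_dim)
heads = x.view(3, 26, num_heads, head_dim).transpose(1, 2)

# ...each head would run its own attention over the sequence here...

# Concatenate the heads back along the feature axis: (batch, sequence, d_model)
merged = heads.transpose(1, 2).reshape(3, 26, d_model)
print(heads.shape, merged.shape)
```

This is why `d_model` has to be divisible by `num_heads`: with 100 features and 5 heads, each head works on 20 features.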

### The cross attention layer

This layer connects the encoder and decoder. It is the most straightforward use of attention in the model: it performs the same task as the attention block in the previous demo (and we will copy it).

<table>
<tr>
  <th colspan=1>The cross attention layer</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/CrossAttention.png"/>
  </td>
</tr>
</table>

In [66]:
class CrossAttention(BaseAttention):
    def __init__(self, d_model, num_heads):
        super(CrossAttention, self).__init__(d_model, num_heads)

    def forward(self, x, context):
        x = x.transpose(0, 1)  # Transpose to match the shape (S, N, E)
        context = context.transpose(0, 1)
        attn_output, attn_scores = self.mha(x, context, context, need_weights=True)
        self.last_attn_scores = attn_scores
        x = x + attn_output
        return self.layernorm(x).transpose(0, 1)


In [67]:
embedding_dim_es = 100
vocab_size_es = 5000

embedding_dim_en = 100
vocab_size_en = 6000

# We are supposing the model will translate Spanish to English, so context for CrossAttention will be the spanish input.

input_es = torch.tensor(np.random.randint(1,vocab_size_es, size=(3,26)))
input_en = torch.tensor(np.random.randint(1,vocab_size_en, size=(3,24)))


pos_es = PositionalEmbedding(vocab_size_es, embedding_dim_es)
pos_en = PositionalEmbedding(vocab_size_en, embedding_dim_en)


gsa = GlobalSelfAttention(num_heads=5, d_model=embedding_dim_es)
cross = CrossAttention(num_heads=5, d_model=embedding_dim_en)


context = gsa(pos_es(input_es)) # Forget about the feed forwards

response = cross(pos_en(input_en), context=context) # Forget about masked attention for now, assume it is the identity

response.shape

torch.Size([3, 24, 100])

Notice the shape is (batch_size, number of words in the output sentence, embedding_dim), even though the input sentence had more words (26 versus 24). We are making good progress!
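
To see where the output length comes from, it can help to look at the attention weights themselves. The sketch below calls `nn.MultiheadAttention` directly with `batch_first=True` on random stand-in tensors (an assumption made to keep it self-contained, instead of reusing the `CrossAttention` class above).

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
mha = nn.MultiheadAttention(embed_dim=100, num_heads=5, batch_first=True)
target = torch.randn(3, 24, 100)    # queries: the English side
context = torch.randn(3, 26, 100)   # keys and values: the Spanish side

out, weights = mha(target, context, context, need_weights=True)
print(out.shape)      # one output vector per target position
print(weights.shape)  # head-averaged weights: (batch, target length, source length)
```

Each of the 24 target positions holds a distribution over the 26 source positions, so the output length always follows the query, never the context.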

### The causal self attention layer (Masked Multi Headed Attention)

<table>
<tr>
  <th colspan=1>The causal self attention layer</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/CausalSelfAttention.png"/>
  </td>
</tr>
</table>



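The only big difference in masked multi-headed attention is that we cannot attend to words in the future, so we will use a mask such that position `N` can only attend to positions up to and including `N`, not the whole sentence. Because the decoder input is shifted right by one token during training, the prediction of the `N`th word only sees the first `N-1` words.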

In [68]:
class CausalSelfAttention(BaseAttention):
    def __init__(self, d_model, num_heads):
        super(CausalSelfAttention, self).__init__(d_model, num_heads)

    def forward(self, x):
        x = x.transpose(0, 1)  # Transpose to match the shape (S, N, E)
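        # True above the diagonal blocks attention to future positions; build it on the same device as x
        attn_mask = torch.triu(torch.ones(x.size(0), x.size(0), device=x.device), diagonal=1).bool()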
        attn_output, _ = self.mha(x, x, x, attn_mask=attn_mask)
        x = x + attn_output
        return self.layernorm(x).transpose(0, 1)

<table>
<tr>
  <th colspan=1>The causal self attention layer</th>
</tr>
<tr>
  <td>
   <img width=330 src="https://www.tensorflow.org/images/tutorials/transformer/CausalSelfAttention-new-full.png"/>
  </td>
</tr>
</table>

Notice in the diagram above how the query can only attend to values from the past.
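
The causal property can be verified numerically. In this sketch (toy sizes and a bare `nn.MultiheadAttention`, both assumptions for illustration) we change only the last token and check that the outputs at earlier positions do not move.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
seq_len, d_model = 4, 8

# True marks a blocked (future) position
mask = torch.triu(torch.ones(seq_len, seq_len), diagonal=1).bool()
print(mask)

mha = nn.MultiheadAttention(embed_dim=d_model, num_heads=2, batch_first=True)
x = torch.randn(1, seq_len, d_model)
out_a, _ = mha(x, x, x, attn_mask=mask)

# Change only the last token: earlier positions cannot see it
x_changed = x.clone()
x_changed[0, -1] = torch.randn(d_model)
out_b, _ = mha(x_changed, x_changed, x_changed, attn_mask=mask)

print(torch.allclose(out_a[0, :-1], out_b[0, :-1], atol=1e-6))
```

Without the mask the same check would fail, since every position would mix in information from the modified last token.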

In [69]:
embedding_dim_en = 100
vocab_size_en = 6000

# The causal self attention works on the English (target) side of the translation.

input_en = torch.tensor(np.random.randint(1,vocab_size_en, size=(3,24)))


pos_en = PositionalEmbedding(vocab_size_en, embedding_dim_en)

csa = CausalSelfAttention(num_heads =5, d_model=embedding_dim_en)

response = csa(pos_en(input_en))

response.shape

torch.Size([3, 24, 100])

### The feed forward network

The transformer also includes a point-wise feed-forward network in both the encoder and decoder. We will build it directly inside the encoder and decoder layers:

<table>
<tr>
  <th colspan=1>The feed forward network</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/FeedForward.png"/>
  </td>
</tr>
</table>
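
As a standalone illustration, the feed-forward block could be written as its own module. The `FeedForward` class below is a hypothetical helper, not used elsewhere in this demo, and its layer sizes are arbitrary.

```python
import torch
import torch.nn as nn

class FeedForward(nn.Module):
    """Position-wise feed-forward block with its own Add & Norm (hypothetical helper)."""
    def __init__(self, d_model, dim_feedforward, dropout=0.1):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.ReLU(),
            nn.Linear(dim_feedforward, d_model),
            nn.Dropout(dropout),
        )
        self.layernorm = nn.LayerNorm(d_model)

    def forward(self, x):
        # nn.Linear acts on the last axis only, so every position is transformed independently
        return self.layernorm(x + self.net(x))

ffn = FeedForward(d_model=100, dim_feedforward=512).eval()  # eval() disables dropout
x = torch.randn(3, 26, 100)
y = ffn(x)

# "Point-wise": feeding one position alone gives the same vector as inside the full sequence
y_single = ffn(x[:, 5:6, :])
print(y.shape, torch.allclose(y[:, 5:6, :], y_single, atol=1e-4))
```

Unlike attention, this block never mixes positions; it only transforms the features of each token.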

### The encoder layer

The encoder contains a stack of `N` encoder layers, where each `EncoderLayer` contains a global self attention block and a feed-forward block, each followed by Add & Norm:

<table>
<tr>
  <th colspan=1>The encoder layer</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/EncoderLayer.png"/>
  </td>
</tr>
</table>

In [70]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, dim_feedforward, dropout=0.1):
        super(EncoderLayer, self).__init__()
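        # batch_first=True: inputs arrive as (N, S, E); the default (S, N, E) would attend across the batch axis
        self.self_attn = nn.MultiheadAttention(d_model, num_heads, dropout=dropout, batch_first=True)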
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(dim_feedforward, d_model),
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        attn_output, _ = self.self_attn(src, src, src, attn_mask=src_mask,
                                        key_padding_mask=src_key_padding_mask)
        src = self.norm1(src + self.dropout(attn_output))
        ff_output = self.feed_forward(src)
        src = self.norm2(src + self.dropout(ff_output))
        return src

In [71]:
embedding_dim = 100
vocab_size = 5000
input = torch.tensor(np.random.randint(1,vocab_size, size=(3,26)))
pos = PositionalEmbedding(vocab_size, embedding_dim)
sample_encoder_layer = EncoderLayer(d_model=embedding_dim, num_heads=5, dim_feedforward=1012)
response = sample_encoder_layer(pos(input))
response.shape

torch.Size([3, 26, 100])

### The encoder

Notice we need to be able to repeat the previous `EncoderLayer` `N` times, so we need another layer that does exactly that.

<table>
<tr>
  <th colspan=1>The encoder</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/Encoder.png"/>
  </td>
</tr>
</table>

In [72]:
import math

class Encoder(nn.Module):
    def __init__(self, num_layers, d_model, num_heads, dim_feedforward, dropout, vocab_size, max_seq_length=5000):
        super(Encoder, self).__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout, max_seq_length)
        self.layers = nn.ModuleList([EncoderLayer(d_model, num_heads, dim_feedforward, dropout) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)

    def forward(self, src, mask=None, src_key_padding_mask=None):
        src = self.embedding(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        for layer in self.layers:
            src = layer(src, src_mask=mask, src_key_padding_mask=src_key_padding_mask)
        src = self.norm(src)
        return src

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
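        # Shape (1, max_len, d_model) so it broadcasts over batch-first inputs (N, S, E)
        pe = torch.zeros(1, max_len, d_model)
        pe[0, :, 0::2] = torch.sin(position * div_term)
        pe[0, :, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (N, S, E); add the encoding of the first S positions to every sentence in the batch
        x = x + self.pe[:, :x.size(1)]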
        return self.dropout(x)

In [73]:
embedding_dim = 100
vocab_size = 5000
input = torch.tensor(np.random.randint(1,vocab_size, size=(3,26)))
sample_encoder = Encoder(num_layers=4,
                         d_model=embedding_dim,
                         num_heads=5,
                         dim_feedforward=512,
                         dropout=0.1,
                         vocab_size=vocab_size)
response = sample_encoder(input)
response.shape

torch.Size([3, 26, 100])

We got our Encoder!! Yahoo!!
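
The encoder accepts a `src_key_padding_mask`, which the test above does not exercise. Here is a minimal sketch of how such a mask could be built and what it does, assuming token id 0 is reserved for padding (the name `PAD_ID` and the toy vocabulary are assumptions made for illustration).

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
PAD_ID = 0  # assumption: token id 0 is reserved for padding
tokens = torch.tensor([[5, 9, 2, PAD_ID, PAD_ID],
                       [7, 3, 8, 4, 6]])

# (batch, sequence) boolean mask, True = ignore this key position
key_padding_mask = tokens.eq(PAD_ID)

embedding = nn.Embedding(10, 8, padding_idx=PAD_ID)
mha = nn.MultiheadAttention(embed_dim=8, num_heads=2, batch_first=True)
x = embedding(tokens)
_, weights = mha(x, x, x, key_padding_mask=key_padding_mask)

# weights: (batch, query position, key position); padded keys receive zero weight
print(weights[0])
```

The same boolean tensor could be passed as `src_key_padding_mask`, so that real tokens do not attend to padding.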

### The decoder layer

As before, we need a decoder layer that uses the attention layers, and then another layer that stacks `N` of these decoder layers.

<table>
<tr>
  <th colspan=1>The decoder layer</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/DecoderLayer.png"/>
  </td>
</tr>
</table>

In [74]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, dim_feedforward, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, num_heads, dropout=dropout)
        self.multihead_attn = nn.MultiheadAttention(d_model, num_heads, dropout=dropout)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(dim_feedforward, d_model),
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None,
                tgt_key_padding_mask=None, memory_key_padding_mask=None):
        # nn.MultiheadAttention defaults to (S, N, E), so move the batch axis to second place
        x = tgt.transpose(0, 1)
        memory = memory.transpose(0, 1)

        # Masked self attention, then Add & Norm
        # (dropout on the sublayer output, as in EncoderLayer and in the paper)
        attn = self.self_attn(x, x, x, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask)[0]
        x = self.norm1(x + self.dropout(attn))

        # Cross attention with the encoder's output, then Add & Norm
        attn = self.multihead_attn(x, memory, memory, attn_mask=memory_mask, key_padding_mask=memory_key_padding_mask)[0]
        x = self.norm2(x + self.dropout(attn))

        # Feed forward, then Add & Norm
        x = self.norm3(x + self.dropout(self.feed_forward(x)))

        # Back to (batch_size, seq_length, d_model)
        return x.transpose(0, 1)

In [75]:
embedding_dim_es = 100
vocab_size_es = 5000

embedding_dim_en = 100
vocab_size_en = 6000

# We are supposing the model will translate Spanish to English, so context for CrossAttention will be the spanish input.

input_es = torch.tensor(np.random.randint(1,vocab_size_es, size=(3,26)))
input_en = torch.tensor(np.random.randint(1,vocab_size_en, size=(3,24)))

pos_en = PositionalEmbedding(vocab_size_en, embedding_dim_en)


encoder =  Encoder(num_layers=4,
                         d_model=embedding_dim_es,
                         num_heads=5,
                         dim_feedforward=512,
                         dropout=0.1,
                         vocab_size=vocab_size_es)

context = encoder(input_es)

print(pos_en(input_en).shape, context.shape)

decoder_layer = DecoderLayer(d_model=embedding_dim_en, num_heads=5, dim_feedforward=218, dropout=0.2)

response = decoder_layer(pos_en(input_en), memory=context)

response.shape

torch.Size([3, 24, 100]) torch.Size([3, 26, 100])


torch.Size([3, 24, 100])

### The Decoder

Similar to the `Encoder`, the `Decoder` consists of an embedding with positional encoding and a stack of `DecoderLayer`s:

<table>
<tr>
  <th colspan=1>The decoder</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/Decoder.png"/>
  </td>
</tr>
</table>

In [76]:
class Decoder(nn.Module):
    def __init__(self, num_layers, d_model, num_heads, dim_feedforward, dropout, vocab_size, max_seq_length=5000):
        super(Decoder, self).__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout, max_seq_length)
        self.layers = nn.ModuleList([DecoderLayer(d_model, num_heads, dim_feedforward, dropout) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None,
                tgt_key_padding_mask=None, memory_key_padding_mask=None):
        tgt = self.embedding(tgt) * math.sqrt(self.d_model)
        tgt = self.pos_encoder(tgt)
        for layer in self.layers:
            tgt = layer(tgt, memory, tgt_mask=tgt_mask, memory_mask=memory_mask,
                        tgt_key_padding_mask=tgt_key_padding_mask, memory_key_padding_mask=memory_key_padding_mask)
        tgt = self.norm(tgt)
        return tgt

In [77]:
embedding_dim_es = 100
vocab_size_es = 5000

embedding_dim_en = 100
vocab_size_en = 6000

# We are supposing the model will translate Spanish to English, so context for CrossAttention will be the spanish input.

input_es = torch.tensor(np.random.randint(1,vocab_size_es, size=(3,26)))
input_en = torch.tensor(np.random.randint(1,vocab_size_en, size=(3,24)))

encoder =  Encoder(num_layers=4,
                         d_model=embedding_dim_es,
                         num_heads=5,
                         dim_feedforward=512,
                         dropout=0.1,
                         vocab_size=vocab_size_es)

context = encoder(input_es)

decoder = Decoder(num_layers=4, d_model=embedding_dim_en, num_heads=5, dim_feedforward=124, dropout=0.1, vocab_size=vocab_size_en)

response = decoder(input_en, memory=context)

response.shape

torch.Size([3, 24, 100])

## The Transformer Model

You now have `Encoder` and `Decoder`. To complete the `Transformer` model, you need to put them together and add a final linear layer (`nn.Linear`, the equivalent of a Keras `Dense` layer) which converts the resulting vector at each location into output token logits; a softmax over those logits gives the token probabilities.

The output of the decoder is the input to this final linear layer.
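
The step from decoder output to token probabilities can be sketched on its own. In the snippet below, `decoder_output` and `labels` are random stand-ins (assumptions for illustration), and `ignore_index=0` assumes token id 0 is padding.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
d_model, tgt_vocab_size = 100, 6000
decoder_output = torch.randn(3, 24, d_model)        # stand-in for the decoder output
output_layer = nn.Linear(d_model, tgt_vocab_size)

logits = output_layer(decoder_output)               # (batch, target length, vocab)
probs = F.softmax(logits, dim=-1)                   # probabilities over the vocabulary
predicted_ids = probs.argmax(dim=-1)                # greedy choice per position

# For training, cross entropy takes the raw logits (it applies log-softmax internally)
labels = torch.randint(1, tgt_vocab_size, (3, 24))  # random stand-in labels
loss = F.cross_entropy(logits.reshape(-1, tgt_vocab_size), labels.reshape(-1), ignore_index=0)
print(logits.shape, predicted_ids.shape, loss.item())
```

Keeping the model output as logits and leaving the softmax to the loss function is the usual PyTorch pattern, since it is numerically more stable.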

<table>
<tr>
  <th colspan=1>The transformer</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/transformer.png"/>
  </td>
</tr>
</table>

In [78]:
class Translator(nn.Module):
    def __init__(self, encoder, decoder, src_vocab_size, tgt_vocab_size, d_model):
        super(Translator, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.output_layer = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, src_mask, tgt_mask, src_key_padding_mask, tgt_key_padding_mask, memory_key_padding_mask):
        memory = self.encoder(src, src_mask, src_key_padding_mask)
        output = self.decoder(tgt, memory, tgt_mask, memory_mask=None,
                              tgt_key_padding_mask=tgt_key_padding_mask, memory_key_padding_mask=memory_key_padding_mask)
        output = self.output_layer(output)
        return output

In [79]:
embedding_dim = 100
vocab_size_es = 5000
vocab_size_en = 6000

num_layers = 4
dim_feedforward = 512
num_heads = 5
dropout_rate = 0.1

# We are supposing the model will translate Spanish to English, so context for CrossAttention will be the spanish input.

input_es = torch.tensor(np.random.randint(1,vocab_size_es, size=(3,26)))
input_en = torch.tensor(np.random.randint(1,vocab_size_en, size=(3,24)))

# Boolean attention masks have shape (seq_len, seq_len); True marks a position that must NOT be attended to.
# The encoder may look at the whole Spanish sentence, so it needs no mask.
src_mask = None
# The decoder gets a causal mask so each English position only sees itself and earlier positions.
tgt_mask = torch.triu(torch.ones(input_en.size(1), input_en.size(1)), diagonal=1).bool()

encoder =  Encoder(num_layers=num_layers,
                         d_model=embedding_dim,
                         num_heads=num_heads,
                         dim_feedforward=dim_feedforward,
                         dropout=dropout_rate,
                         vocab_size=vocab_size_es)

decoder = Decoder(num_layers=num_layers, d_model=embedding_dim, num_heads=num_heads, dim_feedforward=dim_feedforward, dropout=dropout_rate, vocab_size=vocab_size_en)

translator = Translator(encoder, decoder, vocab_size_es, vocab_size_en, d_model=embedding_dim)

output = translator(input_es, input_en, src_mask, tgt_mask, None, None, None)
output.shape

torch.Size([3, 24, 6000])

In [80]:
translator

Translator(
  (encoder): Encoder(
    (embedding): Embedding(5000, 100)
    (pos_encoder): PositionalEncoding(
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (layers): ModuleList(
      (0-3): 4 x EncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=100, out_features=100, bias=True)
        )
        (feed_forward): Sequential(
          (0): Linear(in_features=100, out_features=512, bias=True)
          (1): ReLU()
          (2): Dropout(p=0.1, inplace=False)
          (3): Linear(in_features=512, out_features=100, bias=True)
        )
        (norm1): LayerNorm((100,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((100,), eps=1e-05, elementwise_affine=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
    )
    (norm): LayerNorm((100,), eps=1e-05, elementwise_affine=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(6000, 100)
    (pos_encoder): PositionalEncoding(
     