## Transformer Architecture

Implementation of Transformer from scratch
#### Self Attention

In [None]:
from transformers import AutoTokenizer

env: PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True


In [5]:
model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
text = "time flies like an arrow"

In [7]:
inputs = tokenizer(text, return_tensors = "pt", add_special_tokens = False)
inputs.input_ids

tensor([[ 2051, 10029,  2066,  2019,  8612]])

In [8]:
## Create Dense Embeddings

from torch import nn
from transformers import AutoConfig

config = AutoConfig.from_pretrained(model_ckpt)
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
token_emb

Embedding(30522, 768)

In [9]:
inputs_embeds = token_emb(inputs.input_ids)
inputs_embeds.size()

torch.Size([1, 5, 768])

In [11]:
## Create query, key and value vectors and calculate attentions using dot product as a similarity function

import torch 
from math import sqrt

query = key = value = inputs_embeds
dim_k = key.size(-1)
scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k)
scores.size()

torch.Size([1, 5, 5])

In [12]:
## Apply Softmax 

import torch.nn.functional as F

weights = F.softmax(scores, dim = -1)
weights.sum(dim = -1)

tensor([[1., 1., 1., 1., 1.]], grad_fn=<SumBackward1>)

In [13]:
## Multiply attention weights by values

attn_outputs = torch.bmm(weights, value)
attn_outputs.shape

torch.Size([1, 5, 768])

We've implemented a simple form of self-attention. The whole process is just two matrix multiplications and a softmax.
Let's wrap these steps in a function so we can reuse them.
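
As a sanity check on the "two matrix multiplications and a softmax" description, here is a minimal sketch on a toy tensor. The sizes are hypothetical, and the comparison against `F.scaled_dot_product_attention` assumes PyTorch 2.0 or later, so it is skipped when that function is missing.

```python
import torch
import torch.nn.functional as F
from math import sqrt

torch.manual_seed(0)
# Hypothetical toy input: batch of 1, 3 tokens, 4-dimensional embeddings
x = torch.randn(1, 3, 4)

# First matmul: pairwise similarity scores, scaled by sqrt(d_k)
scores = torch.bmm(x, x.transpose(1, 2)) / sqrt(x.size(-1))
# Softmax turns each row of scores into weights that sum to 1
weights = F.softmax(scores, dim=-1)
# Second matmul: weighted average of the value vectors
manual_out = torch.bmm(weights, x)

# Optional cross-check against PyTorch's built-in (assumed available in 2.0+)
if hasattr(F, "scaled_dot_product_attention"):
    builtin_out = F.scaled_dot_product_attention(x, x, x)
    print(torch.allclose(manual_out, builtin_out, atol=1e-5))
```

If the built-in is available, the two results should agree up to floating-point tolerance.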

In [14]:
def scaled_dot_product_attention(query, key, value):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k)
    weights = F.softmax(scores, dim = -1)
    return torch.bmm(weights, value)

#### Multi-Headed Attention

In practice, the self-attention layer applies three independent linear transformations to each embedding to generate the query, key, and value vectors. Each projection carries its own set of learnable parameters, which allows the self-attention layer to focus on different semantic aspects of the sequence.

Multi-head attention allows the model to focus on several aspects at once. For example, one head might focus on the similarity between tokens.

In [15]:
## Lets first build a single attention head

class AttentionHead(nn.Module):
    def __init__(self, embed_dim, head_dim):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    def forward(self, hidden_state):
        attn_outputs = scaled_dot_product_attention(
            self.q(hidden_state), self.k(hidden_state), self.v(hidden_state)
        )    
        return attn_outputs


Here we've initialized three independent linear layers that apply a matrix multiplication to the embedding vectors to produce tensors of shape **[batch_size, seq_len, head_dim]**, where `head_dim` is the number of dimensions we are projecting into.
In practice `head_dim` is chosen so that `embed_dim` is a multiple of it, usually `embed_dim` divided by the number of heads. For example, BERT has 12 attention heads, so the dimension of each head is 768/12 = 64.
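
The shape bookkeeping can be checked with a small sketch. It uses the BERT-base numbers quoted above. The random input and the standalone list of query projections are illustrative assumptions, not part of the notebook's classes.

```python
import torch
from torch import nn

embed_dim, num_heads = 768, 12      # BERT-base values quoted above
head_dim = embed_dim // num_heads   # 64

# Hypothetical input: 1 sequence of 5 token embeddings
hidden_state = torch.randn(1, 5, embed_dim)

# One query projection per head (key and value projections work the same way)
q_projections = nn.ModuleList(
    [nn.Linear(embed_dim, head_dim) for _ in range(num_heads)]
)
per_head = [proj(hidden_state) for proj in q_projections]
concatenated = torch.cat(per_head, dim=-1)

print(per_head[0].shape)    # torch.Size([1, 5, 64])
print(concatenated.shape)   # torch.Size([1, 5, 768])
```

Concatenating the 12 head outputs along the last dimension recovers the original embedding size, which is why the output projection in a multi-head layer can map `embed_dim` to `embed_dim`.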

In [16]:
## Lets implement the full multi-head attention layer

class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, hidden_state):
        x = torch.cat([h(hidden_state) for h in self.heads], dim = -1)
        x = self.output_linear(x)
        return x    

In [17]:
multihead_attn = MultiHeadAttention(config)
attn_output = multihead_attn(inputs_embeds)
attn_output.size()

torch.Size([1, 5, 768])

#### Feed Forward Layer

It is often referred to as a **position-wise feed-forward layer** because it processes each embedding independently instead of processing the whole sequence of embeddings as a single vector. It can also be viewed as a one-dimensional convolution with a kernel size of one.

```
The hidden size of the first layer is typically four times the size of the embeddings, and GELU is the most commonly used activation function.
```
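
The one-dimensional convolution view mentioned above can be verified numerically. The sketch below assumes toy sizes (8 and 32, hypothetical) and copies the weights of an `nn.Linear` into an `nn.Conv1d` with `kernel_size=1` so that both layers compute the same map.

```python
import torch
from torch import nn

torch.manual_seed(0)
hidden_size, intermediate_size = 8, 32   # hypothetical toy sizes (4x expansion)
x = torch.randn(2, 5, hidden_size)       # [batch_size, seq_len, hidden_size]

linear = nn.Linear(hidden_size, intermediate_size)
conv = nn.Conv1d(hidden_size, intermediate_size, kernel_size=1)

# Copy the linear weights into the convolution so both compute the same map
with torch.no_grad():
    conv.weight.copy_(linear.weight.unsqueeze(-1))  # [out, in] -> [out, in, 1]
    conv.bias.copy_(linear.bias)

linear_out = linear(x)
# Conv1d expects [batch, channels, length], so swap the last two axes
conv_out = conv(x.transpose(1, 2)).transpose(1, 2)
print(torch.allclose(linear_out, conv_out, atol=1e-5))

# Position-wise: applying the layer to one token alone gives the same result
single_token_out = linear(x[:, 2])
print(torch.allclose(single_token_out, linear_out[:, 2], atol=1e-5))
```

Both checks should agree up to floating-point tolerance, since the same weights are applied to every position independently.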

In [None]:
class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, x):
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x

In [19]:
## Pass attention outputs through feed forward layer

feed_forward = FeedForward(config)
ff_outputs = feed_forward(attn_outputs)
ff_outputs.size()

torch.Size([1, 5, 768])

We now have all the ingredients to build a fully fledged transformer encoder layer!

#### Adding Layer Normalization

The Transformer architecture makes use of layer normalization and skip connections. The former normalizes each input in the batch to have zero mean and unit variance. Skip connections pass a tensor to the next layer of the model without processing and add it to the processed tensor. When it comes to placing the layer normalization in the encoder or decoder layers of a transformer, there are two main choices adopted in the literature:

1. Post layer Normalization
2. Pre layer Normalization (**More Common and Stable** )
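
To make the two arrangements concrete, here is a minimal sketch. It assumes a toy hidden size and uses a plain `nn.Linear` as a hypothetical stand-in for the attention or feed-forward sublayer. It also checks the zero-mean, unit-variance property, which `nn.LayerNorm` computes per token over the hidden dimension.

```python
import torch
from torch import nn

torch.manual_seed(0)
hidden_size = 16                                 # hypothetical toy size
x = torch.randn(2, 5, hidden_size) * 3.0 + 1.0   # deliberately off-centre input

layer_norm = nn.LayerNorm(hidden_size)
sublayer = nn.Linear(hidden_size, hidden_size)   # stand-in for attention / feed-forward

normed = layer_norm(x)
# Statistics are computed per token over the hidden dimension
print(normed.mean(dim=-1).abs().max())             # close to 0
print(normed.var(dim=-1, unbiased=False).mean())   # close to 1

# Post layer normalization: normalize after adding the skip connection
post_ln = layer_norm(x + sublayer(x))
# Pre layer normalization: normalize the sublayer input, leave the skip path untouched
pre_ln = x + sublayer(layer_norm(x))
```

In the pre-layer-normalization form the skip path carries `x` through unchanged, which is the arrangement used by the encoder layer in this notebook.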

In [21]:
class TransformerEncoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x):
        ## Apply layer normalization and then copy input into query, key, value
        hidden_state = self.layer_norm_1(x)
        ## Apply attention with a skip connection
        x = x + self.attention(hidden_state)
        ## Apply feed-forward layer with a skip connection
        x = x + self.feed_forward(self.layer_norm_2(x))
        return x

In [23]:
encoder_layer = TransformerEncoderLayer(config)
inputs_embeds.shape, encoder_layer(inputs_embeds).size()

(torch.Size([1, 5, 768]), torch.Size([1, 5, 768]))

We've now implemented our first transformer encoder layer from scratch!
However, there is a caveat with the way we set up the encoder layers: they are totally invariant to the positions of the tokens.
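
A small experiment illustrates this caveat. The sketch below assumes toy embeddings and an arbitrary permutation (both hypothetical) and uses plain self-attention without learned projections. Shuffling the input tokens only shuffles the output rows in the same way, so the output for each token does not depend on where that token sits in the sequence.

```python
import torch
import torch.nn.functional as F
from math import sqrt

def self_attention(x):
    # Same computation as scaled dot-product attention with query = key = value = x
    scores = torch.bmm(x, x.transpose(1, 2)) / sqrt(x.size(-1))
    return torch.bmm(F.softmax(scores, dim=-1), x)

torch.manual_seed(0)
x = torch.randn(1, 5, 8)               # hypothetical toy embeddings
perm = torch.tensor([4, 2, 0, 3, 1])   # an arbitrary reordering of the 5 tokens

out = self_attention(x)
out_shuffled = self_attention(x[:, perm])

# Shuffling the input only shuffles the output rows in the same way
print(torch.allclose(out[:, perm], out_shuffled, atol=1e-5))
```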

#### Positional Encodings

Idea: Augment the token embeddings with a position-dependent pattern of values arranged in a vector. If the pattern is characteristic for each position, the attention heads and feed-forward layers in each stack can learn to incorporate positional information into their transformations.

Let’s create a custom Embeddings module that combines a token embedding layer that projects the input_ids to a dense hidden state together with the positional embedding that does the same for position_ids. The resulting embedding is simply the sum of both embeddings:

In [27]:
class Embeddings(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size,
                                             config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings,
                                                config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout()

    def forward(self, input_ids):
        ## Create position IDs for input sequence (on the same device as input_ids)
        seq_length = input_ids.size(1)
        position_ids = torch.arange(seq_length, dtype=torch.long,
                                    device=input_ids.device).unsqueeze(0)
        ## Create token and position embeddings 
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        ## Combine token and position embeddings
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

In [28]:
embedding_layer = Embeddings(config)
embedding_layer(inputs.input_ids).size()

torch.Size([1, 5, 768])

Let’s put all of this together now by building the full transformer encoder, combining the embeddings with the encoder layers:

In [32]:
class TransformerEncoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        self.layers = nn.ModuleList([TransformerEncoderLayer(config)
        for _ in range(config.num_hidden_layers)])
    def forward(self, x):
        x = self.embeddings(x)
        for layer in self.layers:
            x = layer(x)
        return x

In [33]:
encoder = TransformerEncoder(config)
encoder(inputs.input_ids).size()

torch.Size([1, 5, 768])

#### Adding a Classification Head

In [37]:
class TransformerForSequenceClassification(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.encoder = TransformerEncoder(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)
    def forward(self, x):
        x = self.encoder(x)[:, 0, :] # select hidden state of the first token ([CLS] when special tokens are added)
        x = self.dropout(x)
        x = self.classifier(x)
        return x

In [38]:
config.num_labels = 3
encoder_classifier = TransformerForSequenceClassification(config)
encoder_classifier(inputs.input_ids).size()

torch.Size([1, 3])

For each example in the batch we get the unnormalized logits for each class in the output.
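
Since the classifier returns unnormalized logits, a softmax over the class dimension turns them into probabilities. The sketch below uses hypothetical logit values for one example and three classes, not actual outputs from the model above.

```python
import torch
import torch.nn.functional as F

# Hypothetical logits for a batch of one example and three classes
logits = torch.tensor([[1.0, -0.5, 2.0]])

probs = F.softmax(logits, dim=-1)        # normalize over the class dimension
predicted_class = probs.argmax(dim=-1)   # index of the most likely class

print(probs)
print(predicted_class)   # tensor([2])
```

Because softmax preserves the ordering of its inputs, taking the `argmax` of the logits directly gives the same predicted class.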

#### The Decoder

Let’s take a look at the modifications we need to make to include masking in our self-attention layer. The trick with masked self-attention is to introduce a mask matrix with ones on and below the diagonal and zeros above:

In [39]:
seq_len = inputs.input_ids.size(-1)
mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0)
mask[0]

tensor([[1., 0., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 1., 0., 0.],
        [1., 1., 1., 1., 0.],
        [1., 1., 1., 1., 1.]])

Here we’ve used PyTorch’s `tril()` function to create the lower triangular matrix. Once we have this mask matrix, we can prevent each attention head from peeking at future tokens by using `Tensor.masked_fill()` to replace all the zeros with negative infinity:

In [41]:
scores.masked_fill(mask == 0 , -float("inf"))

tensor([[[ 2.5460e+01,        -inf,        -inf,        -inf,        -inf],
         [-6.3789e-01,  2.5376e+01,        -inf,        -inf,        -inf],
         [ 6.5489e-01, -1.0682e-02,  2.7282e+01,        -inf,        -inf],
         [ 6.5625e-01, -4.8393e-01,  1.0954e+00,  2.8371e+01,        -inf],
         [ 1.7219e-01, -1.7702e-01,  1.4043e+00,  2.6288e-01,  2.7081e+01]]],
       grad_fn=<MaskedFillBackward0>)

By setting the upper values to negative infinity, we guarantee that the attention weights for those positions are exactly zero once we take the softmax over the scores, because the exponential of negative infinity is zero. We can easily include this masking behavior with a small change to the scaled dot-product attention function that we implemented earlier:

In [43]:
def scaled_dot_product_attention(query, key, value, mask = None):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k)
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float("-inf"))
    weights = F.softmax(scores, dim = -1)
    return weights.bmm(value)    
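
The effect of the mask can be checked with a short sketch. It assumes toy sizes and a hypothetical helper, `causal_self_attention`, which mirrors the masked function above but also returns the attention weights so they can be inspected.

```python
import torch
import torch.nn.functional as F
from math import sqrt

def causal_self_attention(x, mask):
    # Hypothetical helper: masked self-attention with query = key = value = x
    scores = torch.bmm(x, x.transpose(1, 2)) / sqrt(x.size(-1))
    scores = scores.masked_fill(mask == 0, float("-inf"))
    weights = F.softmax(scores, dim=-1)
    return weights, torch.bmm(weights, x)

torch.manual_seed(0)
seq_len, dim = 5, 8                      # hypothetical toy sizes
x = torch.randn(1, seq_len, dim)
mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0)

weights, out = causal_self_attention(x, mask)

# Weights on future positions (above the diagonal) are exactly zero
print(weights[0].triu(diagonal=1).abs().sum())
# Each row is still a valid distribution over the visible positions
print(weights.sum(dim=-1))

# Changing the last token leaves the outputs of all earlier positions untouched
x_changed = x.clone()
x_changed[:, -1] += 1.0
_, out_changed = causal_self_attention(x_changed, mask)
print(torch.allclose(out[:, :-1], out_changed[:, :-1]))
```

The last check is the property the decoder relies on: the output at a position depends only on that position and the ones before it.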