# Build a transformer architecture

## The following steps are performed in this notebook

1. Tokenization using the pretrained tokenizer of the BERT base uncased model
2. Get dense embeddings from the tokenized input
3. Show the attention computation using scaled dot-product attention

##### Sample text

In [1]:
text = "time flies like an arrow"


#### Use BERT's pretrained AutoTokenizer  

In [2]:
from transformers import AutoTokenizer
model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)


In [4]:
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)
inputs.input_ids

tensor([[ 2051, 10029,  2066,  2019,  8612]])

### Get dense embeddings

In [5]:
from torch import nn
from transformers import AutoConfig

##### Load the chosen model's config to be used for generating embeddings

In [6]:
config = AutoConfig.from_pretrained(model_ckpt)

In [7]:
config 

BertConfig {
  "_name_or_path": "bert-base-uncased",
  "architectures": [
    "BertForMaskedLM"
  ],
  "attention_probs_dropout_prob": 0.1,
  "classifier_dropout": null,
  "gradient_checkpointing": false,
  "hidden_act": "gelu",
  "hidden_dropout_prob": 0.1,
  "hidden_size": 768,
  "initializer_range": 0.02,
  "intermediate_size": 3072,
  "layer_norm_eps": 1e-12,
  "max_position_embeddings": 512,
  "model_type": "bert",
  "num_attention_heads": 12,
  "num_hidden_layers": 12,
  "pad_token_id": 0,
  "position_embedding_type": "absolute",
  "transformers_version": "4.28.1",
  "type_vocab_size": 2,
  "use_cache": true,
  "vocab_size": 30522
}

In [9]:
token_emb = nn.Embedding(num_embeddings=config.vocab_size, embedding_dim=config.hidden_size)

In [10]:
inputs_embeds = token_emb(inputs.input_ids)
inputs_embeds.size()

torch.Size([1, 5, 768])

#### Now we will take the embeddings of the sample text and walk through the attention mechanism

In [11]:
import torch
from math import sqrt

# Self-attention: the same embeddings play the role of query, key and value.
query, key, value = inputs_embeds, inputs_embeds, inputs_embeds
dim_k = key.shape[-1]
# Batched matrix product of the queries with the transposed keys, scaled by
# the square root of the key dimension.
scores = query.bmm(key.transpose(1, 2)) / sqrt(dim_k)
scores.shape

torch.Size([1, 5, 5])

##### Note: the scaled dot products computed above are called the attention scores

In [12]:
key.shape, key.transpose(1,2).shape

(torch.Size([1, 5, 768]), torch.Size([1, 768, 5]))

In [13]:
import torch.nn.functional as F

#### Apply softmax on the scores

In [14]:
weights = F.softmax(scores, dim=-1)
weights.sum(dim=-1)

tensor([[1., 1., 1., 1., 1.]], grad_fn=<SumBackward1>)

In [15]:
weights.shape, value.shape

(torch.Size([1, 5, 5]), torch.Size([1, 5, 768]))

In [16]:
weights

tensor([[[1.0000e+00, 2.6015e-11, 7.3310e-12, 1.1523e-11, 8.3914e-12],
         [3.1404e-12, 1.0000e+00, 2.1801e-12, 1.1948e-12, 2.3536e-13],
         [2.5877e-12, 6.3748e-12, 1.0000e+00, 1.1552e-12, 4.8856e-12],
         [2.9615e-12, 2.5437e-12, 8.4108e-13, 1.0000e+00, 1.2079e-12],
         [4.4658e-12, 1.0376e-12, 7.3659e-12, 2.5011e-12, 1.0000e+00]]],
       grad_fn=<SoftmaxBackward0>)

#### Get attention-weighted embeddings by multiplying the attention weights (probabilities) with the individual embeddings (value)

In [17]:
attn_outputs = torch.bmm(weights, value)
attn_outputs.shape

torch.Size([1, 5, 768])

#### Consolidating the above steps into a single function

In [18]:
def scaled_dot_product_attention(query, key, value):
    """Compute scaled dot-product attention over batched 3-D tensors.

    The scores are the batched product of ``query`` with ``key`` (dims 1 and
    2 swapped), divided by the square root of the size of ``key``'s last
    dimension. A softmax over the last axis turns the scores into weights,
    which are then used to combine ``value``.

    Args:
        query: Left operand of the score product (must be valid for ``bmm``).
        key: Tensor that is transposed on dims 1 and 2 before the product.
        value: Tensor that is mixed according to the attention weights.

    Returns:
        The batched product of the attention weights with ``value``.
    """
    scale = sqrt(key.size(-1))
    similarity = query.bmm(key.transpose(1, 2)) / scale
    attention = similarity.softmax(dim=-1)
    return attention.bmm(value)

##### What we computed above is called self-attention

### Multi-head attention

In [20]:
class AttentionHead(nn.Module):
    """A single attention head: three linear projections followed by attention.

    Args:
        embed_dim: Input feature size of each projection.
        head_dim: Output feature size of each projection.
    """

    def __init__(self, embed_dim, head_dim):
        super().__init__()
        # Created in the order q, k, v so parameter registration is unchanged.
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    def forward(self, hidden_state):
        """Project ``hidden_state`` into query/key/value and attend over them."""
        queries = self.q(hidden_state)
        keys = self.k(hidden_state)
        values = self.v(hidden_state)
        return scaled_dot_product_attention(queries, keys, values)

In [21]:
class MultiHeadAttention(nn.Module):
    """Run several attention heads side by side and merge their outputs.

    ``config.hidden_size`` is split evenly across
    ``config.num_attention_heads`` heads. The per-head outputs are
    concatenated on the last axis and passed through a final linear layer of
    size ``hidden_size -> hidden_size``.

    Args:
        config: Object exposing ``hidden_size`` and ``num_attention_heads``.

    Raises:
        ValueError: If ``hidden_size`` is not a multiple of
            ``num_attention_heads``. The concatenated heads would then not
            match the input size of the output projection.
    """

    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"hidden_size ({embed_dim}) must be divisible by "
                f"num_attention_heads ({num_heads})"
            )
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, hidden_state):
        """Apply every head to ``hidden_state`` and project the concatenation.

        Prints the number of heads and, for the first head, the input and
        per-head output shapes on every call.
        """
        print("Number of heads ", len(self.heads))
        # Evaluate each head exactly once; the shape report below reuses the
        # first result instead of running the first head a second time.
        head_outputs = [h(hidden_state) for h in self.heads]
        if head_outputs:
            print("Dimension of hidden state ",  hidden_state.shape)
            print("Dimension per head ", head_outputs[0].shape)
        # Concatenate all head outputs along the feature axis.
        x = torch.cat(head_outputs, dim=-1)
        x = self.output_linear(x)
        return x

In [22]:
multihead_attn = MultiHeadAttention(config)
attn_output = multihead_attn(inputs_embeds)
attn_output.size()

Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])


torch.Size([1, 5, 768])

### Visualise the attention for a word like "flies", which has a different meaning in each of the two sentences

In [34]:
# !pip install bertviz

In [23]:
from bertviz import head_view
from transformers import AutoModel

# Load the pretrained checkpoint and ask it to return attention weights
# alongside its regular outputs.
model = AutoModel.from_pretrained(model_ckpt, output_attentions=True)

# Two sentences that share the word "flies".
sentence_a = "time flies like an arrow"
sentence_b = "fruit flies like a banana"

# Encode both sentences together as one sentence-pair input.
viz_inputs = tokenizer(sentence_a, sentence_b, return_tensors='pt')
attention = model(**viz_inputs).attentions
# Count the positions whose token_type_id is 0; this count is used as the
# index at which the second sentence starts.
sentence_b_start = (viz_inputs.token_type_ids == 0).sum(dim=1)
tokens = tokenizer.convert_ids_to_tokens(viz_inputs.input_ids[0])

# NOTE(review): heads=[8] presumably restricts the view to head 8 — confirm
# against the bertviz documentation.
head_view(attention, tokens, sentence_b_start, heads=[8])

  with safe_open(checkpoint_file, framework="pt") as f:
Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


<IPython.core.display.Javascript object>

### Add a feed forward layer after the multi-head attention

In [24]:
class FeedForward(nn.Module):
    """Two-layer feed-forward block with GELU activation and dropout.

    Expands ``config.hidden_size`` to ``config.intermediate_size``, applies
    GELU, projects back to ``config.hidden_size`` and finally applies dropout
    with probability ``config.hidden_dropout_prob``.
    """

    def __init__(self, config):
        super().__init__()
        hidden, inner = config.hidden_size, config.intermediate_size
        self.linear_1 = nn.Linear(hidden, inner)
        self.linear_2 = nn.Linear(inner, hidden)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, x):
        """Return ``dropout(linear_2(gelu(linear_1(x))))``."""
        expanded = self.gelu(self.linear_1(x))
        return self.dropout(self.linear_2(expanded))

In [25]:
feed_forward = FeedForward(config)
# Feed the multi-head attention result (`attn_output`) through the block.
# `attn_outputs` (with a trailing "s") is the earlier raw single-head
# self-attention result and is not what this section is about.
ff_outputs = feed_forward(attn_output)
ff_outputs.size()

torch.Size([1, 5, 768])

### Transformer layer using layer normalisation, multi-head attention and feed-forward

In [26]:
class TransformerEncoderLayer(nn.Module):
    """Encoder layer that normalises before each sub-layer (pre-layer-norm).

    Data flow:
        x -> LayerNorm -> multi-head attention, added back to x (skip)
          -> LayerNorm -> feed-forward, added back to the attention result (skip)

    Both normalisations are ``nn.LayerNorm`` over ``config.hidden_size``.
    """

    def __init__(self, config):
        super().__init__()
        width = config.hidden_size
        self.layer_norm_1 = nn.LayerNorm(width)
        self.layer_norm_2 = nn.LayerNorm(width)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x):
        """Apply attention and feed-forward, each with a skip connection."""
        # Sub-layer 1: normalise, attend, then add the un-normalised input.
        attended = x + self.attention(self.layer_norm_1(x))
        # Sub-layer 2: normalise, feed forward, then add the attention result.
        normed = self.layer_norm_2(attended)
        return attended + self.feed_forward(normed)

In [27]:
encoder_layer = TransformerEncoderLayer(config)
inputs_embeds.shape, encoder_layer(inputs_embeds).size()

Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])


(torch.Size([1, 5, 768]), torch.Size([1, 5, 768]))

#### So far, the model has no information about the position of each word in the sequence

### Positional embeddings

In [31]:
seq_length = inputs.input_ids.size(1)
print(seq_length)
position_ids = torch.arange(seq_length, dtype=torch.long).unsqueeze(0)
position_ids

5


tensor([[0, 1, 2, 3, 4]])

In [32]:
class Embeddings(nn.Module):
    """Token plus learned position embeddings, then LayerNorm and dropout.

    Args:
        config: Object exposing ``vocab_size``, ``hidden_size``,
            ``max_position_embeddings`` and ``hidden_dropout_prob``. If it
            also has ``layer_norm_eps`` that value is used for the layer
            norm; otherwise ``1e-12`` is used.
    """

    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size,
                                             config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings,
                                                config.hidden_size)
        # Read epsilon from the config when available; the fallback keeps the
        # previously hard-coded value.
        self.layer_norm = nn.LayerNorm(
            config.hidden_size, eps=getattr(config, "layer_norm_eps", 1e-12)
        )
        # Use the configured dropout probability, as the other modules in this
        # notebook do. A bare nn.Dropout() would silently use p=0.5.
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, input_ids):
        """Embed ``input_ids``.

        Args:
            input_ids: Integer tensor; dimension 1 is taken as the sequence
                length.

        Returns:
            The token embeddings plus position embeddings, after layer
            normalisation and dropout.
        """
        # Position ids 0..seq_length-1, created on the same device as the
        # input so the lookup also works when the inputs are not on the CPU.
        seq_length = input_ids.size(1)
        position_ids = torch.arange(
            seq_length, dtype=torch.long, device=input_ids.device
        ).unsqueeze(0)
        # Look up token and position embeddings.
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        # Combine them (the position embeddings broadcast over the batch).
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

In [33]:
embedding_layer = Embeddings(config)
embedding_layer(inputs.input_ids).size()

torch.Size([1, 5, 768])

### Transformer encoder using positional embeddings
We will stack multiple encoder layers equal to the config.num_hidden_layers

In [34]:
class TransformerEncoder(nn.Module):
    """Embedding layer followed by a stack of encoder layers.

    The stack holds ``config.num_hidden_layers`` separately constructed
    ``TransformerEncoderLayer`` modules. The layer count is printed when the
    encoder is constructed.
    """

    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        print("Number of encoder layers ",config.num_hidden_layers)
        self.layers = nn.ModuleList()
        for _ in range(config.num_hidden_layers):
            self.layers.append(TransformerEncoderLayer(config))

    def forward(self, x):
        """Embed ``x`` and pass the result through every layer in order."""
        hidden = self.embeddings(x)
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden)
        return hidden


In [35]:
encoder = TransformerEncoder(config)
encoder(inputs.input_ids).size()

Number of encoder layers  12
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([

torch.Size([1, 5, 768])

### We have built a transformer architecture with stacked encoder layers

Note that this is just the body, which is not yet trained for a specific task like classification

### Classification Head

In [36]:
class TransformerForSequenceClassification(nn.Module):
    """Encoder body with a dropout + linear classification head.

    The head maps ``config.hidden_size`` features to ``config.num_labels``
    outputs and is applied to the hidden state at sequence position 0.
    """

    def __init__(self, config):
        super().__init__()
        self.encoder = TransformerEncoder(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, x):
        """Return the classifier output for each sequence in ``x``."""
        # Position 0 is meant to be the [CLS] token. That only holds when the
        # inputs were tokenized with special tokens enabled.
        first_token_state = self.encoder(x)[:, 0, :]
        return self.classifier(self.dropout(first_token_state))

In [37]:
config.num_labels = 3
encoder_classifier = TransformerForSequenceClassification(config)
encoder_classifier(inputs.input_ids).size()

Number of encoder layers  12
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([1, 5, 768])
Dimension per head  torch.Size([1, 5, 64])
Number of heads  12
Dimension of hidden state  torch.Size([

torch.Size([1, 3])

#### When a decoder is also used, as in other architectures, it is important to mask the future tokens in the sequence to avoid information leaks

In [49]:
seq_len = inputs.input_ids.size(-1)
mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0)
mask[0]

tensor([[1., 0., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 1., 0., 0.],
        [1., 1., 1., 1., 0.],
        [1., 1., 1., 1., 1.]])

In [50]:
scores.masked_fill(mask == 0, -float("inf"))

tensor([[[26.5800,    -inf,    -inf,    -inf,    -inf],
         [-0.2824, 29.6897,    -inf,    -inf,    -inf],
         [ 0.5962,  0.7480, 26.2963,    -inf,    -inf],
         [ 0.8027,  1.2319, -0.3900, 30.2816,    -inf],
         [-0.2163,  1.0643, -2.7430, -0.8506, 28.5274]]],
       grad_fn=<MaskedFillBackward0>)

#### The previous function can be extended to support masking

In [51]:
def scaled_dot_product_attention(query, key, value, mask=None):
    """Scaled dot-product attention with an optional mask.

    Args:
        query: Left operand of the batched score product.
        key: Tensor that is transposed on dims 1 and 2 before the product.
        value: Tensor that is combined using the attention weights.
        mask: Optional tensor; wherever it equals 0 the corresponding score
            is replaced by ``-inf`` before the softmax, so that position
            receives zero weight.

    Returns:
        The batched product of the softmax-normalised scores with ``value``.
    """
    scale = sqrt(query.size(-1))
    logits = query.bmm(key.transpose(1, 2)) / scale
    if mask is not None:
        # Masked-out positions get -inf so they vanish after the softmax.
        logits = logits.masked_fill(mask == 0, float("-inf"))
    return F.softmax(logits, dim=-1).bmm(value)

# END