In [19]:
# Transformer code directly from library.
from transformers import AutoTokenizer
from bertviz.transformers_neuron_view import BertModel
from bertviz.neuron_view import show

model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
model = BertModel.from_pretrained(model_ckpt)
text = "time flies like an arrow"
show(model, "bert", tokenizer, text, display_mode = "light", layer = 0, head = 8)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [20]:
# Now we implement that from scratch. First we load the tokenizer. See how each word is mapped to
# an unique id in the tokenizer's vocabulary. Note that this specific tokenizer has 30522 elements.
from transformers import AutoTokenizer

model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
text = "time flies like an arrow"

inputs = tokenizer(text, return_tensors = "pt", add_special_tokens = False)
inputs.input_ids

tensor([[ 2051, 10029,  2066,  2019,  8612]])

In [21]:
# Now our goal is to generate dense embedding for each word. Lets generate a lookup table so that we can look them up.
from torch import nn
from transformers import AutoConfig

config = AutoConfig.from_pretrained(model_ckpt)
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
token_emb

Embedding(30522, 768)

In [22]:
# With the help of lookup table, we can now generate the embeddings by feeding the input ids.
inputs_embeds = token_emb(inputs.input_ids)
inputs_embeds.size()
# [1, 5, 768] => [batch_size, seq_len, hidden_dim]

torch.Size([1, 5, 768])

In [23]:
# For now, lets skip the positional embedding and move on to calculating attention scores/vectors/weights.
# Usually the query, key, and value are generated by applying independent weight metrices to the embeddings.
# But for now, we kept them equal for simplicity.

# Note that this is a scaled dot product attention. torch.bmm() is used for batch matrix multiplication (aka dot product).
# Also note the transpose, [5 x 768] * [768 x 5] = [5, 5]. So the output is a [5 x 5] matrix. Finally we scale the values
# to get rid of large values.
import torch
from math import sqrt

query = key = value = inputs_embeds
dim_k = key.size(-1)
scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k)
scores.size()

torch.Size([1, 5, 5])

In [25]:
# Lets apply softmax to generate attention weights. This turns attention scores to a probability distribution that sums up to 1.
import torch.nn.functional as F
weights = F.softmax(scores, dim = -1)
weights.sum(dim = -1)

tensor([[[1.0000e+00, 1.7354e-13, 4.9758e-13, 1.6994e-12, 1.3259e-12],
         [1.3908e-13, 1.0000e+00, 8.7665e-13, 2.0918e-13, 2.8190e-13],
         [1.4901e-13, 3.2759e-13, 1.0000e+00, 2.1522e-13, 1.9273e-13],
         [6.3952e-12, 9.8226e-13, 2.7045e-12, 1.0000e+00, 8.0183e-13],
         [4.0846e-12, 1.0836e-12, 1.9826e-12, 6.5638e-13, 1.0000e+00]]],
       grad_fn=<SoftmaxBackward0>)


In [26]:
# Finally multiply the attention weights by the values to finish the process.
# weights = [5 x 5], value = [5 x 768]. The output is [5 x 768].
# Pay attention here: We had 5 words and the embeddings were retrieved from the token vocab [5 x 768].
# After multiplying with weights [5 x 5], they are converted into new embeddings [5 x 768].
# These embeddings store contextual relationship of words in that sentence. For example,
# "Time flies like an arrow" -> We know that flies has strong relationship with arrow.
# So thats all attention did, it learned the lanugage, context, grammer etc.
attn_outputs = torch.bmm(weights, value)
attn_outputs.shape

torch.Size([1, 5, 768])

In [27]:
# Lets wrap everything together in a function
def scaled_dot_product_attention(query, key, value):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
    weights = F.softmax(scores, dim=-1)
    return torch.bmm(weights, value)

In [28]:
# So far so good. But the coding we did so far missed two important points:
# 1) In practice, the self-attention layer applies three independent linear xformations to each embedding
# to generate query, key, and value vectors. These projections carries its own set of learnable params,
# which allows the self-attention layer to focus on different semantic aspects of the sequence. Lets code it up.

class AttentionHead(nn.Module):
    def __init__(self, embed_dim, head_dim):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    def forward(self, hidden_state):
        attn_outputs = scaled_dot_product_attention(
            self.q(hidden_state), self.k(hidden_state), self.v(hidden_state))
        return attn_outputs
    
# What did we do here?
# We inititalized three independent linear layers that apply matrix multiplication to the embedding vectors
# to produce tensors of shape [batch_size, seq_len, head_dim] where 'head_dim' is the number of dimenstions we are
# projecting into. Conventionally, head_dim is a multiple of embed_dim (e.g., for BERT: 768 / 12 = 64 (12 heads))
# So q, k, and v turns into [5, 64]. Hence the attention output also turns into [5, 64].

In [30]:
# 2) Multi-headed attention: Why? If we just run the same code 12 times in a for loop, they should generate the same thing right?
# Well the answer is softmax [weights = F.softmax(scores, dim = -1)]. Without going into detail, its sufficent to say that softmax
# is a probability distribution algo so the results are not deterministic.
# For example, one head focuses on subject-verb interaction while another focuses on nearby adjectives.
# We don't handcraft these relations into the model, they are fully learned from the data. This is very similar to filters
# in CNN Vision algos where one filter is responsible for detecting faces and another one finds wheels of cars.

class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, hidden_state):
        x = torch.cat([h(hidden_state) for h in self.heads], dim=-1)
        x = self.output_linear(x)
        return x

    
# Note the line 'x = self.output_linear(x)'. The concat output from the attention heads is fed through a final
# linear layer to produce an output tensor of shape [batch_size, seq_len, hidden_dim] = [1, 5, 768], which is
# suitable for the downstream layers.

In [35]:
# Time to test our code. To keep things comparable, we use the same config params.
print(config.hidden_size) # 768
print(config.num_attention_heads) # 12
multihead_attn = MultiHeadAttention(config)
attn_output = multihead_attn(inputs_embeds)    
print(attn_output.size())

768
12
torch.Size([1, 5, 768])


In [39]:
# Just a one off thing. Lets use BertViz again to check the attention of 'flies' in two different sentences. 

from bertviz import head_view
from transformers import AutoModel

model = AutoModel.from_pretrained(model_ckpt, output_attentions=True)

sentence_a = "time flies like an arrow"
sentence_b = "fruit flies like a banana"

viz_inputs = tokenizer(sentence_a, sentence_b, return_tensors='pt')
attention = model(**viz_inputs).attentions
sentence_b_start = (viz_inputs.token_type_ids == 0).sum(dim=1)
tokens = tokenizer.convert_ids_to_tokens(viz_inputs.input_ids[0])

head_view(attention, tokens, sentence_b_start, heads=[8])

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.seq_relationship.bias', 'cls.predictions.decoder.weight', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.weight', 'cls.predictions.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.LayerNorm.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


<IPython.core.display.Javascript object>

In [42]:
# So we are done with the attention block. Only thing left is the feed-forward layer. The key here is:
# instead of processing the whole sequence, it processes each embedding independently. The rule of thumb,
# Hidden size of the first layer should be 4 times of the embedding size (config.intermediate_size = 3072 = 768 * 4).
# Also a GELU activation function is used. Lets code it up and give a test.

class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        
    def forward(self, x):
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x

feed_forward = FeedForward(config)
ff_outputs = feed_forward(attn_outputs)
ff_outputs.size()

# Finally we are done with our little attention block. But we skipped a few bits, such as Layer Normalization and Skip Connections.

torch.Size([1, 5, 768])

In [44]:
# Skip connections: Simply passes a tensor to the next layer without processing and add it to the processed tensors.
# Layer normalization: We can do pre or post. Lets code up the 'pre layer normalization' and add it to our input embeddings.

class TransformerEncoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x):
        # Apply layer normalization and then copy input into query, key, value
        hidden_state = self.layer_norm_1(x)
        # Apply attention with a skip connection
        x = x + self.attention(hidden_state)
        # Apply feed-forward layer with a skip connection
        x = x + self.feed_forward(self.layer_norm_2(x))
        return x
    
encoder_layer = TransformerEncoderLayer(config)
inputs_embeds.shape, encoder_layer(inputs_embeds).size()

(torch.Size([1, 5, 768]), torch.Size([1, 5, 768]))

In [55]:
# We are finally done with implementing our first encoder layer from scratch! But we need to add the Positional Embedding first.
# Lets create a custom embedding module that combines a token embedding layer that projects the input_ids to a dense hidden state
# together with the positional embedding that does the same with position_ids. The result is just the sum of both embeddings.

# config.max_position_embeddings => 512
# seq_length => 5
class Embeddings(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps = 1e-12)
        self.dropout = nn.Dropout()

    def forward(self, input_ids):
        # Create position IDs for input sequence
        seq_length = input_ids.size(1) # 5
        position_ids = torch.arange(seq_length, dtype=torch.long).unsqueeze(0) # tensor([[0, 1, 2, 3, 4]])
        # Create token and position embeddings
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        # Combine token and position embeddings
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

In [56]:
# Finally time to put everything together
class TransformerEncoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config) # Combines embedding with positional infos
        self.layers = nn.ModuleList([TransformerEncoderLayer(config) 
                                     for _ in range(config.num_hidden_layers)]) # Adds full encoder layer

    def forward(self, x):
        x = self.embeddings(x)
        for layer in self.layers:
            x = layer(x)
        return x
    
encoder = TransformerEncoder(config)
encoder(inputs.input_ids).size()

# Now we are done with the full encoding block (Just as the input, output size should be [1, 5, 768])

torch.Size([1, 5, 768])

In [59]:
# So far we coded up the body of the xformer. Adding a classification head to this quite easy.
class TransformerForSequenceClassification(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.encoder = TransformerEncoder(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)
        
    def forward(self, x):
        x = self.encoder(x)[:, 0, :] # Traditionally, hidden state of [CLS] token is used for prediction. 
        x = self.dropout(x)
        x = self.classifier(x)
        return x
    
config.num_labels = 3 # How many classes do we want to classifiy in.
encoder_classifier = TransformerForSequenceClassification(config)
encoder_classifier(inputs.input_ids).size()

torch.Size([1, 3])

In [63]:
# The main difference between an encoder and decoder are the followings:
# 1) Masked multi-head self-attention layer
# 2) Encoder-decoder attention layer.

# Lets code up the masked attention layer. We use the trill function to create a traingular matrix.
# Then we use that mask to replace '0' with -inf.
seq_len = inputs.input_ids.size(-1) # 5
mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0)

scores.masked_fill(mask == 0, -float("inf"))

# Why -inf, because softmax calculates the normalized exponential and e^-inf = 0


tensor([[[28.7090,    -inf,    -inf,    -inf,    -inf],
         [-0.6734, 28.9304,    -inf,    -inf,    -inf],
         [ 0.3800,  1.1677, 29.9147,    -inf,    -inf],
         [ 1.6082, -0.2652,  0.7476, 27.3837,    -inf],
         [ 1.3601,  0.0331,  0.6373, -0.4682, 27.5839]]],
       grad_fn=<MaskedFillBackward0>)

In [64]:
# We can easily include this masking behavior with a small change to our scaled dot-product attention.
def scaled_dot_product_attention(query, key, value, mask=None):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float("-inf"))
    weights = F.softmax(scores, dim=-1)
    return weights.bmm(value)

In [65]:
# The encoder-decoder attention layer is something I want to do in the future.
# This is a good reference point to start (https://github.com/karpathy/minGPT)