In [1]:
!pip install bertviz

Collecting bertviz
  Downloading bertviz-1.4.0-py3-none-any.whl (157 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m157.6/157.6 kB[0m [31m947.4 kB/s[0m eta [36m0:00:00[0m
Installing collected packages: bertviz
Successfully installed bertviz-1.4.0
[0m

In [2]:
from transformers import AutoTokenizer
from bertviz.transformers_neuron_view import BertModel
from bertviz.neuron_view import show

model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
model = BertModel.from_pretrained(model_ckpt)
text = "time flies like an arrow"
show(model, "bert", tokenizer, text, display_mode="light", layer=0, head=8)

Downloading:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/570 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/226k [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/455k [00:00<?, ?B/s]

100%|██████████| 433/433 [00:00<00:00, 130160.80B/s]
100%|██████████| 440473133/440473133 [00:14<00:00, 31308369.80B/s]


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [3]:
# Use PyTorch to implement the Transformer architecture.

# Step 1: Tokenize the text
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)
inputs.input_ids

tensor([[ 2051, 10029,  2066,  2019,  8612]])

In [4]:
# Step 2: Create dense embeddings (i.e. every entry in the embeddings contain a nonzero value)

from torch import nn
from transformers import AutoConfig

config = AutoConfig.from_pretrained(model_ckpt)
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
token_emb

#30522 embedding vectors stored in nn.Embedding, each with a size of 768

Embedding(30522, 768)

In [5]:
# Step 3: Given lookup table, can generate the embeddings by feeding in the token IDs
inputs_embeds = token_emb(inputs.input_ids)
inputs_embeds.size()

# tensor with shape [batch_size, seq_len, hidden_dim]

torch.Size([1, 5, 768])

In [6]:
# Step 4: Create the query, key, and value vectors and calculate attention scores using the dot product as the similarity function
import torch
from math import sqrt

# Query, key and value vectors are generated by applying independent weight matrices to the embeddings, but they are set equal for now for simplicity
query = key = value = inputs_embeds
dim_k = key.size(-1)
# torch.bmm performs batched matrix multiplication. Scaled by size of the embeddings vectors to prevent saturation when we apply softmax
scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k)
scores.size()

torch.Size([1, 5, 5])

In [7]:
# Step 5: Apply softmax
import torch.nn.functional as F

weights = F.softmax(scores, dim=-1)
weights.sum(dim=-1)

tensor([[1., 1., 1., 1., 1.]], grad_fn=<SumBackward1>)

In [8]:
# Step 6: Multiply attention weights by the values
attn_outputs = torch.bmm(weights, value)
attn_outputs.shape

torch.Size([1, 5, 768])

In [9]:
# Combine the above into a single function
def scaled_dot_product_attention(query, key, value):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k)
    weights = F.softmax(scores, dim = -1)
    return torch.bmm(weights, value)

### Multi-headed Attention
to apply three independent transformations to each embedding to generate the query, key, and value vectors, allowing the self-attention layer to focus on different semantic aspects of the sequence.

In [10]:
# First code up a single attention head
class AttentionHead(nn.Module):
    def __init__(self, embed_dim, head_dim):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)
        
    def forward(self, hidden_state):
        attn_outputs = scaled_dot_product_attention(self.q(hidden_state), self.k(hidden_state), self.v(hidden_state))
        return attn_outputs

In [11]:
# Concatenate the output from the attention heads to implement the full multi-head attention layer
class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList([AttentionHead(embed_dim, head_dim) for _ in range(num_heads)])
        self.output_linear = nn.Linear(embed_dim, embed_dim)
    
    def forward(self, hidden_state):
        x = torch.cat([h(hidden_state) for h in self.heads], dim = -1)
        x = self.output_linear(x)
        return x

In [12]:
multihead_attn = MultiHeadAttention(config)
attn_output = multihead_attn(inputs_embeds)
attn_output.size()

torch.Size([1, 5, 768])

In [13]:
from bertviz import head_view
from transformers import AutoModel

model = AutoModel.from_pretrained(model_ckpt, output_attentions=True)

sentence_a = "time flies like an arrow"
sentence_b = "fruit flies like a banana"

viz_inputs = tokenizer(sentence_a, sentence_b, return_tensors='pt')
attention = model(**viz_inputs).attentions
sentence_b_start = (viz_inputs.token_type_ids == 0).sum(dim=1)
tokens = tokenizer.convert_ids_to_tokens(viz_inputs.input_ids[0])

head_view(attention, tokens, sentence_b_start, heads=[8])

Downloading:   0%|          | 0.00/420M [00:00<?, ?B/s]

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.decoder.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


<IPython.core.display.Javascript object>

## Feed-Forward Layer

In [14]:
class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, x):
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x

In [15]:
feed_forward = FeedForward(config)
ff_outputs = feed_forward(attn_outputs)
ff_outputs.size()

torch.Size([1, 5, 768])

## Layer Normalization

In [16]:
# Pre layer normaliztion: place layer normalization within the span of skip connections
class TransformerEncoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x):
        # Apply layer normalization and then copy input into query, key, value
        hidden_state = self.layer_norm_1(x)
        # Apply attention with a skip connection
        x = x + self.attention(hidden_state)
        # Apply feed-forward layer with a skip connection
        x = x + self.feed_forward(self.layer_norm_2(x))
        return x

In [17]:
encoder_layer = TransformerEncoderLayer(config)
inputs_embeds.shape, encoder_layer(inputs_embeds).size()

(torch.Size([1, 5, 768]), torch.Size([1, 5, 768]))

## Position Embeddings

Idea: learn an efficient way of encoding the positions of tokens during pretraining

In [18]:
# Create a custom Embeddings module that combines a token embedding layer that projects the input_ids to a dense hidden state together with
# the positional embedding that does the same for position_ids.
# The resulting embedding is the sum of both embeddings.

class Embeddings(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout()
    
    def forward(self, input_ids):
        # Create position IDs for input sequence
        seq_length = input_ids.size(1)
        position_ids = torch.arange(seq_length, dtype=torch.long).unsqueeze(0)
        # Create token and position embeddings
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        # Combine token and position embeddings
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

In [19]:
embedding_layer = Embeddings(config)
embedding_layer(inputs.input_ids).size()

torch.Size([1, 5, 768])

Alternatives to learnable position embeddings:
1. Absolute positional representations: Use static patterns consisting of modulated sine and cosine signals to encode positions of the tokens. Works well when there are not large volumes of data available.
2. Relative positional representations: Based on the idea that when computing an embedding, the surrounding tokens are the most important. Encodes the relative position between tokens by modifying the attention mechanism with additional terms that take the relative position between tokens into account.

## Full Transformer Encoder

In [20]:
class TransformerEncoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        self.layers = nn.ModuleList([TransformerEncoderLayer(config)
                                     for _ in range(config.num_hidden_layers)])

    def forward(self, x):
        x = self.embeddings(x)
        for layer in self.layers:
            x = layer(x)
        return x

## Adding a Classification Head

In [21]:
# Use the first token for prediction. Attach a dropout and linear layer to make a classification prediction.
class TransformerForSequenceClassification(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.encoder = TransformerEncoder(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, x):
        x = self.encoder(x)[:, 0, :] # select hidden state of [CLS] token
        x = self.dropout(x)
        x = self.classifier(x)
        return x

In [22]:
config.num_labels = 3
encoder_classifier = TransformerForSequenceClassification(config)
encoder_classifier(inputs.input_ids).size()

torch.Size([1, 3])

# Decoder

Has two attention sublayers:
1. Masked multi-head self-attention layer:
Ensures that the tokens generated at each timestep are only based on the past outputs and the current token being predicted. Without this, the decoder could cheat during training by simply copying the target translations; masking the inputs ensures the task is not trivial.

2. Encoder-decoder attention layer:
Performs multi-head attention over the output key and value vectors of the encoder stack, with the intermediate representations of the decoder acting as the queries. This way the encoder-decoder attention layer learns how to relate tokens from two different sequences, such as two different languages. The decoder has access to the encoder keys and values in each block.

In [23]:
# Masked Self-Attention by introducing a mask matrix with ones on the lower diagonal and zeros above

seq_len = inputs.input_ids.size(-1)
mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0)
mask[0]

tensor([[1., 0., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 1., 0., 0.],
        [1., 1., 1., 1., 0.],
        [1., 1., 1., 1., 1.]])

In [24]:
# Prevent each attention head from peaking at future tokens by replacing all the zeros with negative infinity. This guarantees the attention
# weights are all zero once we take the softmax because e to the power of negative infinity = 0
scores.masked_fill(mask == 0, -float("inf"))

tensor([[[ 2.9039e+01,        -inf,        -inf,        -inf,        -inf],
         [ 1.8737e+00,  2.9877e+01,        -inf,        -inf,        -inf],
         [ 2.1971e+00, -1.2978e+00,  2.7556e+01,        -inf,        -inf],
         [-1.4096e+00, -7.8071e-01, -1.9659e-01,  2.7037e+01,        -inf],
         [-2.8562e-01, -1.0545e+00, -9.9515e-01,  6.0634e-04,  2.5167e+01]]],
       grad_fn=<MaskedFillBackward0>)

In [25]:
# Include this masking behavior with a small change to the scaled dot-product attention function
def scaled_dot_product_attention(query, key, value, mask=None):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float("-inf"))
    weights = F.softmax(scores, dim=-1)
    return weights.bmm(value)