# Assignment - Transformer and BERT








Fill your information here & run the cell

## Transformer and BERT

In this assignment, you will:
- Implement a simplified BERT from scratch
- Visualize attention in your implemented model
- Fine-tune a pre-trained BERT model

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from transformers import BertTokenizer, BertModel

### Encoder

To implement BERT, we first need the encoder layer of the Transformer. An encoder has two main sub-layers: a multi-headed attention layer and a simple feed-forward layer. The multi-headed attention layer is already implemented, but you should implement the feed-forward sub-layer.
<br>
<center>

![](https://github.com/iust-deep-learning/982/raw/master/static_files/assignments/asg04_assets/encoder.PNG)

</center>
<br>
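
The attention sub-layer is built on scaled dot-product attention with a padding mask. The snippet below is a minimal standalone sketch of that single step, not the assignment's `MultiHeadAttention` class; the function name `masked_attention` and the toy tensor sizes are chosen here purely for illustration.

```python
import math
import torch
import torch.nn.functional as F

def masked_attention(query, key, value, pad_mask=None):
    """Scaled dot-product attention.

    query, key, value: [batch, heads, seq_len, head_dim]
    pad_mask: [batch, seq_len] with 1 for real tokens and 0 for padding.
    """
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(key.shape[-1])
    if pad_mask is not None:
        # Broadcast over heads and query positions: [batch, 1, 1, seq_len]
        scores = scores.masked_fill(pad_mask[:, None, None, :] == 0, -1e9)
    weights = F.softmax(scores, dim=-1)
    return torch.matmul(weights, value), weights

torch.manual_seed(0)
q = k = v = torch.randn(2, 4, 5, 8)   # batch=2, heads=4, seq_len=5, head_dim=8
mask = torch.tensor([[1, 1, 1, 0, 0],
                     [1, 1, 1, 1, 1]])
out, w = masked_attention(q, k, v, mask)
print(out.shape)    # torch.Size([2, 4, 5, 8])
print(w[0, 0, 0])   # the last two weights are 0 because those positions are padding
```

Filling masked scores with a large negative number before the softmax drives the attention weights of padded positions to zero, so padding never contributes to the output.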

In [None]:


# ------------------------------ Encoder ------------------------------
class MultiHeadAttention(nn.Module):
    def __init__(self, hidden_size, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.projection_dim = hidden_size // num_heads
        self.Q = nn.Linear(hidden_size, hidden_size)
        self.K = nn.Linear(hidden_size, hidden_size)
        self.V = nn.Linear(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, hidden_size)

    def attention(self, query, key, value, mask):
        # print(f'Query shape before reshape: {query.shape}')
        # print(f'Key shape before reshape: {key.shape}')

        query = query.reshape(query.shape[0], query.shape[1], -1, self.projection_dim)
        key = key.reshape(key.shape[0], key.shape[1], -1, self.projection_dim)
        value = value.reshape(value.shape[0], value.shape[1], -1, self.projection_dim)

        score = torch.matmul(query, key.transpose(-2, -1))
        dim_key = torch.tensor(key.shape[-1], dtype=torch.float32)
        scaled_score = score / torch.sqrt(dim_key)

        if mask is not None:
            # mask: [batch, seq_len] -> [batch, 1, 1, seq_len], then expanded
            # over heads and query positions to match the score tensor
            mask = mask.unsqueeze(1).unsqueeze(2)
            mask = mask.expand_as(scaled_score)
            masked_score = scaled_score.masked_fill(mask == 0, -1e9)
        else:
            masked_score = scaled_score

        weights = F.softmax(masked_score, dim=-1)
        output = torch.matmul(weights, value)

        #print(f'Output shape after matmul: {output.shape}')
        return output, weights


    def separate_heads(self, x, batch_size):
        x = x.reshape(batch_size, -1, self.num_heads, self.projection_dim)
        return x.permute(0, 2, 1, 3)

    def forward(self, inputs, att_mask):
        batch_size = inputs.shape[0]
        query = self.separate_heads(self.Q(inputs), batch_size)
        key = self.separate_heads(self.K(inputs), batch_size)
        value = self.separate_heads(self.V(inputs), batch_size)

        # print(f'Separate heads shapes - Query: {query.shape}, Key: {key.shape}, Value: {value.shape}')

        attention, self.att_weights = self.attention(query, key, value, att_mask)
        attention = attention.permute(0, 2, 1, 3)

        # print(f'Attention shape after permute: {attention.shape}')

        concat_attention = attention.reshape(batch_size, -1, self.hidden_size)

        # print(f'Concat attention shape: {concat_attention.shape}')

        output = self.out(concat_attention)

        # print(f'Output shape: {output.shape}')
        return output


**Question**: Why does the transformer use multi-headed attention instead of just a single self-attention?

<font color=red> Write your answer here</font>

#### Feed-Forward Sub-Layer

The feed-forward sub-layer of the encoder has two dense layers. The first dense layer is called the "intermediate" layer, and the second is the "output" layer, which projects back down to the hidden layer size. Dropout is also applied to the output of the intermediate layer. Unlike the original Transformer, which uses ReLU, BERT uses the GELU activation function in the intermediate dense layer. PyTorch already provides GELU (`torch.nn.functional.gelu`), but for this assignment you should implement it yourself!

Here is the GELU paper: https://arxiv.org/abs/1606.08415 . Or you can just search the internet!
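
For reference, the definition from the paper linked above can be written as follows, where $\Phi$ is the standard normal cumulative distribution function. The second line is the tanh-based approximation given in the same paper. Either form is a reasonable starting point for the implementation.

$$\mathrm{GELU}(x) = x\,\Phi(x) = \frac{x}{2}\left[1 + \operatorname{erf}\!\left(\frac{x}{\sqrt{2}}\right)\right]$$

$$\mathrm{GELU}(x) \approx \frac{x}{2}\left[1 + \tanh\!\left(\sqrt{\frac{2}{\pi}}\,\bigl(x + 0.044715\,x^{3}\bigr)\right)\right]$$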

In [None]:

def GELU(x):
  # Exact form from the GELU paper: x * Phi(x), with Phi the standard normal CDF
  return 0.5 * x * (1.0 + torch.erf(x / 2 ** 0.5))

In [None]:
class FFN(nn.Module):
    def __init__(self, intermediate_size, hidden_size, drop_rate):
        super(FFN, self).__init__()
        self.intermediate = nn.Linear(hidden_size, intermediate_size)
        self.out = nn.Linear(intermediate_size, hidden_size)
        self.dropout = nn.Dropout(drop_rate)

    def forward(self, inputs):
        x = self.intermediate(inputs)
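        x = GELU(x)          # GELU activation on the intermediate layer
        x = self.dropout(x)  # dropout on the intermediate output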
        x = self.out(x)
        return x

#### Residual Connections

In the encoder, dropout is applied to each sub-layer's output, which is then added to the sub-layer's input (residual connection) and finally goes through a layer normalization step. You should implement all of these steps in the **AddNorm** custom layer in the cell below!

In [None]:
class AddNorm(nn.Module):
    def __init__(self, hidden_size, LNepsilon, drop_rate):
        super(AddNorm, self).__init__()
        self.LN = nn.LayerNorm(hidden_size, eps=LNepsilon)
        self.dropout = nn.Dropout(drop_rate)

    def forward(self, sub_layer_in, sub_layer_out):
        x = self.dropout(sub_layer_out)  # dropout on the sub-layer output
        x = sub_layer_in + x             # residual connection
        x = self.LN(x)                   # layer normalization
        return x

Now we have everything we need to implement an encoder layer!

In [None]:
class Encoder(nn.Module):
    def __init__(self, hidden_size, num_heads, intermediate_size, drop_rate=0.1, LNepsilon=1e-12):
        super(Encoder, self).__init__()
        self.attention = MultiHeadAttention(hidden_size, num_heads)
        self.ffn = FFN(intermediate_size, hidden_size, drop_rate)
        self.addnorm1 = AddNorm(hidden_size, LNepsilon, drop_rate)
        self.addnorm2 = AddNorm(hidden_size, LNepsilon, drop_rate)

    def forward(self, inputs, mask):
        # Attention sub-layer followed by add & norm
        att_out = self.attention(inputs, mask)
        att_out = self.addnorm1(inputs, att_out)
        # Feed-forward sub-layer followed by add & norm
        ffn_out = self.ffn(att_out)
        ffn_out = self.addnorm2(att_out, ffn_out)
        return ffn_out

### BERT

In the previous part, you implemented the encoder layer. We only need two more layers to implement BERT. The first is the embedding layer. The final embedding for each token in BERT is the sum of three types of embeddings: aside from token embeddings, there are also segment embeddings and position embeddings. For this assignment we ignore the segment embeddings, since we only want to do single-sentence classification! <br>
Unlike the original Transformer, which uses fixed positional encodings, BERT uses learned positional embeddings.

<br>
<center>

![](https://github.com/iust-deep-learning/982/raw/master/static_files/assignments/asg04_assets/bert_emb.PNG)

</center>
<br>

Note that layer normalization followed by dropout is applied to the final embeddings (after adding all the embeddings).
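
To make the difference between fixed and learned positional embeddings concrete, here is a small sketch. The sinusoidal formula is the one from the original Transformer paper. The helper name `sinusoidal_positions` and the toy sizes are illustrative assumptions, not part of the assignment code.

```python
import math
import torch
import torch.nn as nn

def sinusoidal_positions(maxlen, hidden_size):
    """Fixed, non-trainable encodings (hidden_size is assumed to be even)."""
    position = torch.arange(maxlen, dtype=torch.float32).unsqueeze(1)   # [maxlen, 1]
    div_term = torch.exp(torch.arange(0, hidden_size, 2, dtype=torch.float32)
                         * (-math.log(10000.0) / hidden_size))          # [hidden_size / 2]
    pe = torch.zeros(maxlen, hidden_size)
    pe[:, 0::2] = torch.sin(position * div_term)   # even dimensions
    pe[:, 1::2] = torch.cos(position * div_term)   # odd dimensions
    return pe

maxlen, hidden_size, seq_len = 32, 16, 10
fixed = sinusoidal_positions(maxlen, hidden_size)             # plain tensor, never updated
learned = nn.Parameter(torch.randn(maxlen, hidden_size))      # trainable, as in BERT

print(fixed[:seq_len].shape, fixed.requires_grad)       # torch.Size([10, 16]) False
print(learned[:seq_len].shape, learned.requires_grad)   # torch.Size([10, 16]) True
```

In both cases the first `seq_len` rows are added to the token embeddings. The only difference is whether the table is computed once from a formula or updated by the optimizer during training.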



**Question**: What is segment embedding's functionality in BERT?

<font color=red> Write your answer here</font>

In [None]:

# ------------------------------ BERT ------------------------------

class BertEmbedding(nn.Module):
    def __init__(self, vocab_size, maxlen, hidden_size):
        super(BertEmbedding, self).__init__()
        self.TokEmb = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
        self.PosEmb = nn.Parameter(torch.randn(maxlen, hidden_size))
        self.LN = nn.LayerNorm(hidden_size)
        self.dropout = nn.Dropout(0.1)

    def forward(self, inputs):
        tok_emb = self.TokEmb(inputs)
        pos_emb = self.PosEmb[:inputs.shape[1], :]
        emb = tok_emb + pos_emb
        emb = self.LN(emb)
        emb = self.dropout(emb)
        return emb


The last layer you need to implement is the "pooler". The pooler converts the hidden states of the last encoder layer (which are of shape **[batch_size, sequence_length, hidden_size]**) to a vector representation (of shape **[batch_size, hidden_size]**) for each input sentence. The pooler does this by simply taking the hidden state corresponding to the first token (a special token at the beginning of each sentence) and feeding it to a dense layer (tanh is used as the activation function of this dense layer in the original implementation).

In [None]:
class Pooler(nn.Module):
    def __init__(self, hidden_size):
        super(Pooler, self).__init__()
        self.dense = nn.Linear(hidden_size, hidden_size)

    def forward(self, encoder_out):
        first_token = encoder_out[:, 0, :]
        pooled_out = torch.tanh(self.dense(first_token))  # tanh, as in the original implementation
        return pooled_out


**Question**: As it was explained earlier, the pooler's job is to create a single vector representation of a sentence (or sentence pair) by taking the hidden state corresponding to the first token. Can you suggest another form of pooling that could work for BERT?

<font color=red> Write your answer here</font>

Now you can use the **create_BERT** function in the cell below. This function takes BERT's hyper-parameters as its inputs and returns a BERT model built from the modules defined above.<br>
Note that the returned model must have two outputs (just like the pre-trained BERTs):
- The hidden states of the last encoder layer
- Output of the pooler
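
One possible way to expose both outputs from a single module is sketched below. The class names `TwoOutputBert`, `ToyEncoder` and `ToyPooler` are hypothetical; the two toy classes are simple stand-ins so that the snippet runs on its own, and they would be replaced by the embedding, encoder and pooler layers of this assignment.

```python
import torch
import torch.nn as nn

class TwoOutputBert(nn.Module):
    """Hypothetical wrapper: embedding -> encoder layers -> pooler."""
    def __init__(self, embedding, encoder_layers, pooler):
        super().__init__()
        self.embedding = embedding
        self.encoder_layers = nn.ModuleList(encoder_layers)
        self.pooler = pooler

    def forward(self, input_ids, attention_mask=None):
        hidden = self.embedding(input_ids)
        for layer in self.encoder_layers:
            hidden = layer(hidden, attention_mask)
        # Two outputs: last hidden states and the pooled vector
        return hidden, self.pooler(hidden)

class ToyEncoder(nn.Module):
    """Stand-in encoder layer; it ignores the mask."""
    def __init__(self, hidden_size):
        super().__init__()
        self.dense = nn.Linear(hidden_size, hidden_size)

    def forward(self, inputs, mask):
        return torch.tanh(self.dense(inputs))

class ToyPooler(nn.Module):
    """Stand-in pooler: dense + tanh on the first token."""
    def __init__(self, hidden_size):
        super().__init__()
        self.dense = nn.Linear(hidden_size, hidden_size)

    def forward(self, encoder_out):
        return torch.tanh(self.dense(encoder_out[:, 0, :]))

vocab_size, hidden_size, seq_len = 100, 16, 8
bert = TwoOutputBert(nn.Embedding(vocab_size, hidden_size),
                     [ToyEncoder(hidden_size) for _ in range(2)],
                     ToyPooler(hidden_size))
input_ids = torch.randint(0, vocab_size, (4, seq_len))
last_hidden, pooled = bert(input_ids)
print(last_hidden.shape)   # torch.Size([4, 8, 16])
print(pooled.shape)        # torch.Size([4, 16])
```

Wrapping the parts in one `nn.Module` keeps the forward pass in a single place, so training and evaluation code can call the model directly instead of chaining the layers by hand.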

In [None]:
def create_BERT(vocab_size, maxlen, hidden_size, num_layers, num_att_heads, intermediate_size, drop_rate=0.1):
    """
    Creates a BERT model based on the arguments provided.

    Arguments:
    vocab_size: number of words in the vocabulary
    maxlen: maximum length of each sentence
    hidden_size: dimension of the hidden state of each encoder layer
    num_layers: number of encoder layers
    num_att_heads: number of attention heads in the multi-headed attention layer
    intermediate_size: dimension of the intermediate layer in the feed-forward sublayer of the encoders
    drop_rate: dropout rate of the dropout layers in the encoder layers
    returns:
    model: an nn.ModuleList holding [embedding, encoder layers, pooler]
    """

    emb = BertEmbedding(vocab_size, maxlen, hidden_size)
    encoder_layers = nn.ModuleList([Encoder(hidden_size, num_att_heads, intermediate_size, drop_rate, LNepsilon=1e-12) for _ in range(num_layers)])
    pooler = Pooler(hidden_size)

    # The three parts are applied in order: embedding, each encoder layer, then the pooler
    model = nn.ModuleList([emb, encoder_layers, pooler])

    return model

The Rotten Tomatoes critic reviews dataset is used for this assignment. This dataset consists of about 350,000 short reviews.

In [None]:
train_reviews, test_reviews = pd.read_csv('train_reviews.csv').values[:, 1:], pd.read_csv('test_reviews.csv').values[:, 1:]

(train_texts, train_labels), (test_texts, test_labels) = (train_reviews[:, 0], train_reviews[:, 1]), (test_reviews[:, 0], test_reviews[:, 1])

train_texts = [s.lower() for s in train_texts]
test_texts = [s.lower() for s in test_texts]

aprx_vocab_size = 20000
cls_token = '[cls]'
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')  # Using pre-trained BERT tokenizer
MAXLEN = 32

class ReviewsDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        encoded_text = tokenizer.encode_plus(text, add_special_tokens=True, max_length=MAXLEN, padding='max_length', truncation=True, return_tensors='pt')
        input_ids = encoded_text['input_ids'].squeeze()
        attention_mask = encoded_text['attention_mask'].squeeze()
        return {'input_ids': input_ids, 'attention_mask': attention_mask, 'label': torch.tensor(label, dtype=torch.long)}

Now complete the **encode_sentence** function in the cell below. This function receives a sentence and an integer denoting the maximum length of the sentence as inputs and returns a list of token ids. Here are the steps to implement this function:
- encode the input sentence using the trained tokenizer to receive a token id list
- zero-pad the token id list to the maximum length
- add the id corresponding to the special token to the beginning of the token id list
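
The steps above can be sketched as follows. This is only an illustration under stated assumptions: `vocab` is a hypothetical word-to-id dictionary standing in for the trained tokenizer, the ids used for the special, unknown and padding tokens are made up, and the result is assumed to have exactly `maxlen` entries including the special token.

```python
def encode_sentence(sentence, maxlen, vocab, cls_id=1, unk_id=2, pad_id=0):
    """Return exactly `maxlen` token ids: special token + tokens + zero padding."""
    # Step 1: encode the sentence (a whitespace split stands in for the tokenizer)
    token_ids = [vocab.get(word, unk_id) for word in sentence.lower().split()]
    token_ids = token_ids[:maxlen - 1]                       # leave room for the special token
    # Step 2: zero-pad up to the maximum length
    token_ids += [pad_id] * (maxlen - 1 - len(token_ids))
    # Step 3: put the special token id at the beginning
    return [cls_id] + token_ids

toy_vocab = {"a": 3, "great": 4, "movie": 5}
print(encode_sentence("A great movie", 6, toy_vocab))          # [1, 3, 4, 5, 0, 0]
print(encode_sentence("A great boring movie", 3, toy_vocab))   # [1, 3, 4]
```

Truncating to `maxlen - 1` before padding is a design choice made here so that every encoded sentence has the same length once the special token is added.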

Now use the **create_BERT** function you implemented earlier to create a classifier for the movie reviews dataset.
Note that the size of the intermediate layer in the feed-forward sub-layer of the encoders is set to $4\times H$ in the original BERT implementation, where $H$ is the hidden layer size.

In [None]:
# ------------------------------ Training ------------------------------

train_dataset = ReviewsDataset(train_texts, train_labels)
test_dataset = ReviewsDataset(test_texts, test_labels)

train_dataloader = DataLoader(train_dataset, batch_size=256, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=256, shuffle=False)

# BERT-Base hidden size, with fewer heads and layers than BERT-Base (which uses 12 of each)
hidden_size = 768
num_heads = 6
num_layers = 6

# Creating BERT model
model = create_BERT(vocab_size=tokenizer.vocab_size, maxlen=MAXLEN, hidden_size=hidden_size, num_layers=num_layers, num_att_heads=num_heads, intermediate_size=4*hidden_size)

# Classification head on top of the pooled output.
# Assumption: labels are integer class ids in the range 0..num_classes-1.
num_classes = len(set(train_labels))
classifier = nn.Linear(hidden_size, num_classes)

# Defining optimizer and loss function
optimizer = torch.optim.Adam(list(model.parameters()) + list(classifier.parameters()), lr=5e-5)
loss_fn = nn.CrossEntropyLoss()

# Training loop
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
classifier.to(device)
model.train()
classifier.train()

epochs = 2

for epoch in range(epochs):
    total_loss = 0
    for batch in train_dataloader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)

        optimizer.zero_grad()

        # Pass input through the embedding, the encoder stack, the pooler and the classifier
        emb = model[0](input_ids)
        for encoder in model[1]:
            emb = encoder(emb, attention_mask)
        pooled = model[2](emb)
        logits = classifier(pooled)

        # Calculate loss on the class logits
        loss = loss_fn(logits, labels)

        # Backpropagate and update weights
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f'Epoch {epoch+1}: Loss = {total_loss/len(train_dataloader)}')

In [None]:
# write an evaluation function for the test data
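
One way such an evaluation function could look is sketched below. It assumes batches shaped like those produced by `ReviewsDataset`, and a hypothetical callable `forward_fn(input_ids, attention_mask)` that wraps the model and returns class logits of shape [batch_size, num_classes]. The toy batch and the `always_one` predictor exist only to make the snippet runnable on its own.

```python
import torch

def evaluate(forward_fn, dataloader, device):
    """Return the accuracy of `forward_fn` over all batches of `dataloader`."""
    correct, total = 0, 0
    with torch.no_grad():   # gradients are not needed for evaluation
        for batch in dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)
            logits = forward_fn(input_ids, attention_mask)
            correct += (logits.argmax(dim=-1) == labels).sum().item()
            total += labels.shape[0]
    return correct / max(total, 1)

# Toy check with a fake predictor that always chooses class 1
batches = [{'input_ids': torch.zeros(4, 8, dtype=torch.long),
            'attention_mask': torch.ones(4, 8, dtype=torch.long),
            'label': torch.tensor([1, 0, 1, 1])}]
always_one = lambda ids, mask: torch.tensor([[0.0, 1.0]]).repeat(ids.shape[0], 1)
print(evaluate(always_one, batches, torch.device('cpu')))   # 0.75
```

When applying this to the real model, switch its modules to evaluation mode first (for example with `model.eval()`) so that dropout is disabled while measuring accuracy.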