## Introduction to Natural Language Processing
[**CC-BY-NC-SA**](https://creativecommons.org/licenses/by-nc-sa/4.0/deed.en)<br/>
Prof. Dr. Annemarie Friedrich<br/>
Faculty of Applied Computer Science, University of Augsburg<br/>
Date: **SS 2025**

# 9. Attention + 10. Transformers (Homework)

__Recommendation:__ Use a GPU for the second part of the homework, e.g., in Google Colab Runtime --> Change Runtime --> GPU --> T4.

**Learning Goals**

* Explain why we need contextualized word embeddings
* Compute the forward pass of attention
* Explain the encoder-decoder architecture
* Explain the encoder block of the transformer
* Use BERT-specific implementation details
* Obtain an overview of influential pre-trained transformers
* Scientific reading + presentations


In [None]:
# Installations
!pip install -U datasets
!pip install transformers

# Imports
import torch
import transformers
from datasets import load_dataset
import numpy as np
import random
import os

# This is true if you are working on a GPU.
cuda_available = torch.cuda.is_available()

## Implementing Attention in PyTorch

❓ Your task is to implement the matrix-based calculation of self-attention using PyTorch. You will make use of the following functions and classes:

* `torch.Tensor`: keep in mind that even scalar values must be implemented as a Tensor object.
* `torch.sqrt`: computes the square root.
* `torch.mm`: matrix multiplication.

PyTorch provides a function for computing the softmax over a tensor. It must first be instantiated as an object of the `torch.nn.Softmax` class. (In Python, even functions are objects.)

Note over which dimension the softmax is computed:

In [7]:
a = torch.Tensor([[1, 1, 1], [1, 2, 3]])
print(a)

softmax = torch.nn.Softmax(dim=0)
print("\nsoftmax with dim=0\n", softmax(a))
print("-> softmax is computed for each column\n")

softmax = torch.nn.Softmax(dim=1)

print("softmax with dim=1\n", softmax(a))
print("-> softmax is computed for each row")

tensor([[1., 1., 1.],
        [1., 2., 3.]])

softmax with dim=0
 tensor([[0.5000, 0.2689, 0.1192],
        [0.5000, 0.7311, 0.8808]])
-> softmax is computed for each column

softmax with dim=1
 tensor([[0.3333, 0.3333, 0.3333],
        [0.0900, 0.2447, 0.6652]])
-> softmax is computed for each row


To compute the matrix-based calculation of self-attention for one input sentence (we do not have a batch dimension here!), follow the explanations of [Jay Alammar - The Illustrated Transformer](http://jalammar.github.io/illustrated-transformer/).

First compute all query, key, and value vectors:

![Image illustrating how to compute Q, K and V](http://jalammar.github.io/images/t/self-attention-matrix-calculation.png)

Then, compute the attention weights via softmax and use them to create sums of the value vectors to get Z:

![Image illustrating how to compute Z using the attention weights computed using softmax](http://jalammar.github.io/images/t/self-attention-matrix-calculation-2.png)
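
The two steps above can be sketched as follows. This is a minimal illustration, not the homework solution: the helper name `self_attention` is hypothetical, and the toy input and weight matrices are invented for this example. It uses only the functions listed earlier and assumes that all tensors are float tensors, since `torch.mm` expects both operands to have the same dtype.

```python
import torch

def self_attention(X, W_q, W_k, W_v):
    """Single-head self-attention for one sentence (no batch dimension).

    X: (num_tokens, d_model); W_q, W_k, W_v: (d_model, d_k).
    """
    Q = torch.mm(X, W_q)  # one query vector per token (row)
    K = torch.mm(X, W_k)  # one key vector per token
    V = torch.mm(X, W_v)  # one value vector per token

    d_k = torch.tensor(K.shape[1], dtype=torch.float32)  # scalars are tensors, too
    scores = torch.mm(Q, K.T) / torch.sqrt(d_k)          # (num_tokens, num_tokens)

    softmax = torch.nn.Softmax(dim=1)  # normalize each row: one distribution per query
    weights = softmax(scores)
    Z = torch.mm(weights, V)           # weighted sums of the value vectors
    return Z, weights

# Toy data, invented for this illustration (two tokens, d_model=3, d_k=2)
X = torch.tensor([[1.0, 0.0, 1.0],
                  [0.0, 2.0, 0.0]])
W_q = torch.tensor([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
W_k = torch.tensor([[0.0, 1.0], [1.0, 0.0], [1.0, 1.0]])
W_v = torch.tensor([[1.0, 2.0], [0.0, 1.0], [1.0, 0.0]])

Z, weights = self_attention(X, W_q, W_k, W_v)
print(weights)  # each row sums to 1
print(Z)        # one output vector per token
```

The softmax runs over `dim=1` because each row of the score matrix belongs to one query token, and that token's attention weights over all keys must sum to 1.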



This is your input:

$x_1 = [0.2, 0.04, 0.8, 0.09]$

$x_2 = [0.1, 0.31, 0.13, 0.06]$

$x_3 = [0.1, 0.4, 0.07, 0.1]$

$W_q = \begin{pmatrix}
4 & 2\\
1 & 6\\
1 & 1\\
2 & 1\\
\end{pmatrix}
W_k = \begin{pmatrix}
1 & 1\\
2 & 5\\
0 & 3\\
8 & 2\\
\end{pmatrix}
W_v = \begin{pmatrix}
3 & 1\\
3 & 5\\
1 & 0\\
2 & 4\\
\end{pmatrix}$


In [None]:
x1 = torch.Tensor([[0.2, 0.04, 0.8, 0.09]])
print(x1)
x2 = torch.Tensor([[0.1, 0.31, 0.13, 0.06]])
print(x2)
x3 = torch.Tensor([[0.1, 0.4, 0.07, 0.1]])
print(x3)

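# Weight matrices as float tensors: torch.mm expects both operands to have the same dtype
w_q = torch.tensor([[4, 2], [1, 6], [1, 1], [2, 1]], dtype=torch.float32)
print(w_q)
w_k = torch.tensor([[1, 1], [2, 5], [0, 3], [8, 2]], dtype=torch.float32)
print(w_k)
w_v = torch.tensor([[3, 1], [3, 5], [1, 0], [2, 4]], dtype=torch.float32)
print(w_v)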

## Finetuning BERT

In this homework, we'll use a "vanilla BERT" as provided by the HuggingFace transformers library and modify it only slightly for binary classification: predicting whether a sentence (in our dataset) is objective or subjective. We focus on the English part of the dataset described in this paper:
[Antici et al.: A Corpus for Sentence-level Subjectivity Detection on English News Articles. 2023.](https://arxiv.org/abs/2305.18034)

Take a brief look at the paper and at the HuggingFace [dataset card](https://huggingface.co/datasets/tasksource/subjectivity/viewer/tasksource--subjectivity/train?p=7) to get an idea of what the task is about.

In [None]:
# Define the device we'll use for tensor computations
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Computing on:", device)

# Imports
from transformers import BertForSequenceClassification
import torch.optim as optim
from transformers import BertTokenizer
from torch.utils.data import Dataset, DataLoader

# Load the dataset
train_data = load_dataset("tasksource/subjectivity", split="train")
val_data = load_dataset("tasksource/subjectivity", split="validation")
test_data = load_dataset("tasksource/subjectivity", split="test")

❓ It's ALWAYS a good idea to first LOOK AT THE DATA. Compute the label distributions for the train, val, and test splits. (Recommendation: Write a function `print_label_dist(data_set)` that takes in a data split and prints out its label distribution.) Also print out a few instances to familiarize yourself with the data structure.

In [None]:
# Your code here


### Tokenization

We can call the tokenizer object directly (see `__call__`: https://huggingface.co/docs/transformers/main_classes/tokenizer).
It splits the text into word-piece tokens and returns a dictionary-like object containing tensors for the `input_ids`, the `token_type_ids` (which we will not need today), and the `attention_mask` (more on this below).
The `encoding` data structure looks like this:

```
{'input_ids': tensor([[  101,  6620, 22933,  2869,  2018,  2815, 27836,
      1010,  3038,  1996,  6514,  2231, 10858,  1996,  3423,  3691,  2000,
      17542,  1012,   102]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0,
      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]), 'attention_mask': tensor([[1,
      1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])}

```

In [None]:
# Tokenization
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased') # instantiate the tokenizer that corresponds to the model that we will use

def tokenize(data_set):
  input_data = []
  max_len = 0
  for i in range(len(data_set)):
    # use the tokenizer to tokenize the data
    encoding = tokenizer(data_set[i]["Sentence"], return_tensors='pt', add_special_tokens=True,
                         return_attention_mask=True)
    max_len = max(max_len, len(encoding['input_ids'].squeeze()))
    encoding["input_ids"] = encoding["input_ids"].squeeze()
    encoding["attention_mask"] = encoding["attention_mask"].squeeze()
    encoding["token_type_ids"] = encoding["token_type_ids"].squeeze()
    encoding = encoding.to(device)
    input_data.append(encoding)
  return input_data, max_len

train_input, max_len = tokenize(train_data)
val_input, _ = tokenize(val_data)
test_input, _ = tokenize(test_data)

print(train_input[0])
print(tokenizer.convert_ids_to_tokens(train_input[0]["input_ids"].squeeze()))
print()
print("Maximum number of tokens in training set:", max_len)

### Padding and Truncation
For the sake of understanding, we will perform __padding__ and __truncation__ manually as a preprocessing step today. (Later, you can use a `collate_fn` function that you pass to the dataloader, which only pads/truncates per batch to optimize speed, or you can use the tokenizer (see link above) to perform these steps for you.)

The inputs to the neural network, if computed using tensors on a GPU, must have the exact same dimensions. Remember that our input tensors are a list of token IDs. But not every sentence has the same number of tokens! Also, even large language models have a limited size of tensors that they accept as input, e.g., for the bert-base model, this number is 512 word-piece tokens. Hence, we need to do two things to make our inputs compatible such that they can be passed to the model as one batch:

* Define a maximum length of the vectors that represent each input (sentence, short text, ...). Today, we'll use the maximum length of the training set inputs. They fit into BERT.
* But wait, what if the validation or test set contains a longer sentence? We also need to cut off (truncate) the input sequences at the predefined maximum length to ensure that they will fit into a tensor (in our case: a tensor that looks like a matrix).
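
To make the two steps concrete before looking at the tensor version, here is a small plain-Python sketch on lists of token IDs. The helper name `pad_or_truncate` and the example IDs are made up for illustration; the padding ID 0 is an assumption that matches the zeros used for padding in this notebook.

```python
def pad_or_truncate(ids, max_len, pad_id=0):
    """Return (ids, mask), each with exactly max_len entries."""
    ids = ids[:max_len]          # truncate if the sequence is too long
    mask = [1] * len(ids)        # 1 marks a real token
    n_pad = max_len - len(ids)   # 0 if there is nothing to pad
    return ids + [pad_id] * n_pad, mask + [0] * n_pad  # 0 marks padding

short_ids = [101, 2023, 102]                          # illustrative IDs only
long_ids = [101, 2023, 2003, 1037, 2146, 6251, 102]   # illustrative IDs only

print(pad_or_truncate(short_ids, max_len=5))
# ([101, 2023, 102, 0, 0], [1, 1, 1, 0, 0])
print(pad_or_truncate(long_ids, max_len=5))
# ([101, 2023, 2003, 1037, 2146], [1, 1, 1, 1, 1])
```

Note that plain slicing also cuts off the final token of an over-long sequence, which for BERT inputs is the separator token.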

The following function performs these steps. Read it carefully and try to understand each step.

In [None]:
# Padding / Truncation
def pad_truncate(data_set, max_len):
  # Make sure all input tensors are of length max_len
  for i in range(len(data_set)):
    instance = data_set[i]
    # Number of padding positions; 0 if the sequence already has max_len or more tokens
    n_pad = max(0, max_len - len(instance["input_ids"]))
    zeros = torch.zeros(n_pad, dtype=torch.long, device=device)
    # Pad the input_ids with zeros (0 is the ID of [PAD] in bert-base-uncased),
    # then truncate by slicing as in Python lists
    instance["input_ids"] = torch.cat((instance["input_ids"], zeros))[:max_len]
    # token_type_ids are only needed for sentence-pair tasks, so they are all zeros here
    instance["token_type_ids"] = torch.zeros(max_len, dtype=torch.long, device=device)
    # The attention mask tells the model where to perform attention:
    # 1 for real tokens, 0 for padding; it is padded and truncated like the input_ids
    instance["attention_mask"] = torch.cat((instance["attention_mask"], zeros))[:max_len]
  return data_set


train_input = pad_truncate(train_input, max_len)
val_input = pad_truncate(val_input, max_len)
test_input = pad_truncate(test_input, max_len)
print(train_input[0])
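
The per-batch alternative mentioned at the start of this section can be sketched with a `collate_fn`. This is a minimal sketch under stated assumptions, not the approach used in the rest of this notebook: the function name `collate_batch`, the toy data, and the instance format (a pair of a 1-D tensor of token IDs and a label) are hypothetical. It relies on `torch.nn.utils.rnn.pad_sequence`, which pads only up to the longest sequence in the current batch.

```python
import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader

def collate_batch(batch, max_len=512, pad_id=0):
    """Turn a list of (input_ids, label) pairs into one padded batch."""
    ids = [x[:max_len] for x, _ in batch]            # truncate over-long sequences
    lengths = torch.tensor([len(x) for x in ids])
    input_ids = pad_sequence(ids, batch_first=True, padding_value=pad_id)
    positions = torch.arange(input_ids.shape[1])
    # 1 where a position holds a real token, 0 where it holds padding
    attention_mask = (positions[None, :] < lengths[:, None]).long()
    labels = torch.tensor([y for _, y in batch], dtype=torch.float32)
    return {"input_ids": input_ids, "attention_mask": attention_mask}, labels

# Toy instances with illustrative token IDs
toy = [(torch.tensor([101, 7, 8, 102]), 1.0),
       (torch.tensor([101, 9, 102]), 0.0)]

loader = DataLoader(toy, batch_size=2, collate_fn=collate_batch)
X, y = next(iter(loader))
print(X["input_ids"])       # second row is padded with one 0
print(X["attention_mask"])  # second row ends in 0
```

Because each batch is only as wide as its longest sentence, batches of short sentences need less computation than with one global `max_len`.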

### Labels to Tensors

❓ We have now represented our input data as tensors. Next, we need to map the labels to a single list of labels, respecting the order in our dataset. Write a function `get_labels(data_set)` that returns such a list for each datasplit. Replace the None values below with the return values of the calls to this function.


In [None]:
# Your code here
train_labels = None
val_labels = None  # named val_labels because the dataset cell below uses this name
test_labels = None

### PyTorch Dataset: BERT Input

The pre-trained BERT model expects not only the input_ids as input, but also the token_type_ids and the attention_masks that we have computed earlier.
Each input instance will consist of one dictionary (as above). The DataLoader will combine the values of these dictionaries into tensors that contain all the instances that will be used in one batch. The `__getitem__` method only needs to return this data structure for one instance.

In [None]:
# Preparing a custom dataset for BERT
class SubjectivityDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        label = torch.tensor(self.labels[idx], dtype=torch.float32, device=device)
        item = {"input_ids" : self.encodings[idx]["input_ids"],
                "token_type_ids" : self.encodings[idx]["token_type_ids"],
                "attention_mask" : self.encodings[idx]["attention_mask"]}
        return item, label

    def __len__(self):
        return len(self.labels)

train_dataset = SubjectivityDataset(train_input, train_labels)
print(train_dataset[0])
val_dataset = SubjectivityDataset(val_input, val_labels)
test_dataset = SubjectivityDataset(test_input, test_labels)
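
To see how the DataLoader combines per-instance dictionaries into batch tensors, here is a small self-contained sketch. `ToyDataset` and its contents are invented for this illustration; it only mimics the structure of `SubjectivityDataset` (a dictionary of 1-D tensors plus a label per instance).

```python
import torch
from torch.utils.data import Dataset, DataLoader

class ToyDataset(Dataset):
    """Returns (dict of 1-D tensors, label) per instance, like SubjectivityDataset."""
    def __init__(self, n, seq_len):
        self.items = [{"input_ids": torch.full((seq_len,), i, dtype=torch.long),
                       "attention_mask": torch.ones(seq_len, dtype=torch.long)}
                      for i in range(n)]
        self.labels = [float(i % 2) for i in range(n)]

    def __getitem__(self, idx):
        return self.items[idx], torch.tensor(self.labels[idx])

    def __len__(self):
        return len(self.labels)

loader = DataLoader(ToyDataset(n=5, seq_len=4), batch_size=2, shuffle=False)

# The default collation stacks the values for each dictionary key along a new batch dimension
shapes = [(tuple(X["input_ids"].shape), tuple(y.shape)) for X, y in loader]
print(shapes)
# [((2, 4), (2,)), ((2, 4), (2,)), ((1, 4), (1,))]
```

The last batch is smaller because 5 instances do not divide evenly into batches of 2. Stacking only works because every instance has the same sequence length, which is why padding and truncation came first.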

### Adapting BERT
The code below shows an example of using a pre-trained BERT model as an "embedding layer" in a neural network. The original model was pre-trained with a classification layer, which we will ignore. Instead, we retrieve the embedding for the CLS token (the first embedding of the last hidden layer of BERT), feed it into a linear layer that predicts a score z (the logit), and pass z through the sigmoid function to obtain a probability. During fine-tuning, the BERT embeddings also change slightly. If we trained a huge, randomly initialized model of this kind on just our small training dataset, we would likely not achieve anything meaningful. Adapting the pre-trained model to our task and domain, however, is highly effective.

In [None]:
class MyFinetunedModel(torch.nn.Module):

  def __init__(self):
    # Pre-trained BERT followed by a single linear classification layer
    super(MyFinetunedModel, self).__init__()
    self.bert = BertForSequenceClassification.from_pretrained('bert-base-uncased') #, return_dict=True)
    self.linear = torch.nn.Linear(768, 1) # map from one BERT token embedding to a single scalar
    self.sigmoid = torch.nn.Sigmoid()

  def forward(self, inputs):
    outputs = self.bert(**inputs, output_hidden_states=True) # obtain embeddings for inputs
    last_hidden_state = outputs.hidden_states[-1] # hidden state values of last BERT layer
    cls_embedding = last_hidden_state[:,0,:] # selects the 768-dimensional embedding output for CLS
    # see: https://huggingface.co/docs/transformers/main_classes/output
    # (torch.FloatTensor of shape (batch_size, sequence_length, hidden_size))
    logit = self.linear(cls_embedding) # Classifier layer mapping embedding of CLS token to logit
    score = self.sigmoid(logit)
    return score
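
The tensor shapes in `forward` can be traced without loading BERT. The sketch below replaces the real hidden states with random values (an assumption made purely for illustration) and follows the same steps: select the CLS position, apply a linear layer, apply the sigmoid.

```python
import torch

torch.manual_seed(0)
batch_size, seq_len, hidden_size = 3, 5, 768

# Stand-in for outputs.hidden_states[-1]: random values, not real BERT output
last_hidden_state = torch.randn(batch_size, seq_len, hidden_size)

cls_embedding = last_hidden_state[:, 0, :]  # position 0 of every sequence
linear = torch.nn.Linear(hidden_size, 1)
logit = linear(cls_embedding)               # one logit per sequence
score = torch.sigmoid(logit)                # probabilities between 0 and 1

print(cls_embedding.shape)     # torch.Size([3, 768])
print(logit.shape)             # torch.Size([3, 1])
print(score.squeeze(1).shape)  # torch.Size([3])
```

The final `squeeze(1)` corresponds to the step in the evaluation and training code that turns the `(batch_size, 1)` output into a `(batch_size,)` vector matching the labels.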

In [None]:
def evaluate(model, data_loader):
  # Compute accuracy of model on data provided by data_loader
  correct = 0
  num_instances = len(data_loader.dataset)
  with torch.no_grad(): # Disables gradient tracking: we are evaluating, not training
                        # No gradients are stored for this block
    model.eval()        # Switches layers such as dropout to evaluation mode
    for X, y in iter(data_loader):
      y_probs = model(X)
      y_probs = y_probs.squeeze(1)
      y_pred = torch.where(y_probs >= 0.5, 1.0, 0.0)
      correct += (y_pred == y).float().sum()

  accuracy = 100 * correct / num_instances
  return accuracy

In [None]:
# Always fun with the random seeds ...
# We need to set them such that our results will be replicable.
# (Hint: for an experiment later, you can change the random seed here and check what happens.
# But for now, let's keep the answer to all questions of the universe, 42.)
seed=42
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(seed)
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
if cuda_available:
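  # cuBLAS needs this setting to behave deterministically (CUDA 10.2 or later);
  # without it, torch.use_deterministic_algorithms(True) below raises an error for some GPU operations.
  # If you are working in a different GPU environment and this line results in errors, you can probably omit it.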
  os.environ["CUBLAS_WORKSPACE_CONFIG"]=":4096:8"

# Should we still have some source for non-determinism in our code, this will complain:
torch.use_deterministic_algorithms(True)


#####################################
# Instantiate the model             #
#####################################

model = MyFinetunedModel()
model = model.to(device)

#####################################
# Training / Fine-tuning the model  #
#####################################

num_epochs = 12
batch_size = 64
learning_rate = 1e-5

optimizer = optim.AdamW(model.parameters(), lr=learning_rate) # AdamW is the usual optimizer choice for fine-tuning BERT
loss_fn = torch.nn.BCELoss() # This loss function does not include sigmoid.
# Side note: if you exclude the sigmoid above, you have to use:
# https://pytorch.org/docs/stable/generated/torch.nn.BCEWithLogitsLoss.html

# ... and the rest of the code: is just as before!

# Data Loaders
data_loader_train = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
data_loader_val = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
data_loader_test = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Training
for n in range(num_epochs):
  model.train()
  it = iter(data_loader_train)  # Create the iterator from the training dataset
  epoch_loss, steps = 0, 0      # To keep track of the current epoch's loss

  for  X, y in it:              # Obtain a tensor X = batch of X-values, y accordingly
    y_pred = model(X)           # Have our model with current weights make a prediction
    y_pred = y_pred.squeeze(1)  # Removes the trailing dimension of size 1: (batch_size, 1) -> (batch_size,)
    loss = loss_fn(y_pred, y)   # Have the loss function compute the loss value
    optimizer.zero_grad()       # Reset the gradients (otherwise they accumulate across steps - would be wrong here)
    loss.backward()             # Compute the gradients (partial derivatives)
    optimizer.step()            # Update the network's weights
    epoch_loss += loss.item()   # For tracking the epoch's loss (as a plain Python float)
    steps += 1

  print("\nEpoch:", n+1, "    Loss: {:0.4f}".format(epoch_loss/steps))
  # evaluate model at end of epoch
  print("Training accuracy:   {:2.1f}".format(evaluate(model, data_loader_train)))
  print("Validation accuracy: {:2.1f}".format(evaluate(model, data_loader_val)))
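
The side note on `BCEWithLogitsLoss` in the cell above can be checked numerically. The sketch below uses three made-up logits and targets and compares both variants: sigmoid followed by `BCELoss`, and `BCEWithLogitsLoss` applied to the raw logits.

```python
import torch

# Made-up logits and binary targets for illustration
logits = torch.tensor([-2.0, 0.0, 3.0])
targets = torch.tensor([0.0, 1.0, 1.0])

# Variant used in this notebook: apply the sigmoid first, then BCELoss
loss_a = torch.nn.BCELoss()(torch.sigmoid(logits), targets)

# Alternative: the loss applies the sigmoid internally, so it receives raw logits
loss_b = torch.nn.BCEWithLogitsLoss()(logits, targets)

print(loss_a.item(), loss_b.item())  # the two values agree up to floating-point error
```

If you switch to `BCEWithLogitsLoss`, the model's `forward` method must return the logit rather than the sigmoid score, and the 0.5 threshold in `evaluate` then has to be applied to the sigmoid of the output.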



In [None]:
# Test accuracy
print("     Test accuracy: {:2.1f}".format(evaluate(model, data_loader_test)))

❓ When you look at the training logs above: does the model overfit? After which epoch could we have stopped the training?
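
One common way to decide when to stop is early stopping on the validation accuracy. The sketch below is a plain-Python illustration with hypothetical accuracy values (not results from this notebook); the function name `best_epoch` and the `patience` parameter are assumptions made for this example.

```python
def best_epoch(val_accuracies, patience=2):
    """Return (epoch with the best validation accuracy, epoch at which training stops).

    Epochs are 1-based. Training stops once the validation accuracy has not
    improved for `patience` consecutive epochs.
    """
    best_acc, best_ep = float("-inf"), 0
    for epoch, acc in enumerate(val_accuracies, start=1):
        if acc > best_acc:
            best_acc, best_ep = acc, epoch
        elif epoch - best_ep >= patience:
            return best_ep, epoch  # no improvement for `patience` epochs
    return best_ep, len(val_accuracies)

# Hypothetical validation accuracies per epoch, for illustration only
val_acc = [70.1, 74.3, 76.0, 75.8, 75.2, 74.9]
print(best_epoch(val_acc))  # (3, 5)
```

In a real training loop, you would also save the model weights whenever the validation accuracy improves, so that the best checkpoint can be restored after stopping.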

Congratulations, you have just fine-tuned your first BERT-based model. Make sure to work through the code above carefully such that you understand each line. Take notes. In future tasks, you will have to modify the code further.