# Homework 3 - Name, SCIPER

In this homework, we are going to work with the transformer. The homework has three parts.

- In the first part, we are going to implement **positional encoding** and **self-attention** and test them on a simple text dataset of around 100 sentences. We will use a small transformer in this task.

- In the second part, we will detect promoters in DNA sequences. The main difference from the previous task is that the DNA sequences must be tokenized. Thus, our task here is to build the **tokenizer** for DNA sequences. For the model, we will continue using the small transformer.

- In the third part, we will use a **foundation model** DNABERT to perform promoter detection. In this part, you do not need to train the transformer. Instead, you need to find and load the correct pre-trained model and then use it to get the embedding of the DNA sequence. Then, you will build a simple classifier to perform promoter detection based on the DNA embedding.



## 0. Initialization

Import the packages you are going to use here.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from transformers import AutoTokenizer, AutoModel
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import random
import numpy as np
# from torchmetrics.classification import BinaryF1Score

import ipywidgets as widgets
from types import SimpleNamespace
from utils import data, evaluation, models, visualization, text_exercise

import math

Set the hyperparameters.

In [None]:
# Set seeds
seed = 128
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)

# Set device
device = torch.device(
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()  # torch.has_mps is deprecated
    else "cpu"
)
print(f"Using device: {device}")

## 1. Positional Encoding and Self-Attention (7 pts)

### 1.1. Sinusoidal Positional Encoding (1 pt)

In this section, you are going to implement the sinusoidal positional encoding. The formula is as follows:

<div>
<img src="./imgs/positional embedding.png" width="400"/>
</div>

where $t$ is the desired position in the input and $\mathsf{\omega}_k$ follows:

<div>
<img src="./imgs/omega.png" width="200"/>
</div>

To see the details of sinusoidal positional encoding, you can check this [link](https://kazemnejad.com/blog/transformer_architecture_positional_encoding/).
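For reference, the standard sinusoidal encoding from the linked post can be written out as below. This is a sketch of what the two images above presumably show, using the same symbols ($t$ for the position, $d$ for the encoding dimension, $i$ for the index within the encoding vector, $k$ for the index of the sine/cosine pair):

$$
p_t^{(i)} =
\begin{cases}
\sin(\omega_k \, t), & \text{if } i = 2k \\
\cos(\omega_k \, t), & \text{if } i = 2k + 1
\end{cases}
\qquad \text{with} \qquad
\omega_k = \frac{1}{10000^{2k/d}}
$$

Even indices therefore hold sines and odd indices hold cosines, with the frequency $\omega_k$ decreasing as $k$ grows.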

In [None]:
class PositionalEmbedding(nn.Module):
    """Returns the positional embedding for inputs of a maximum length and dimension."""

    def __init__(self, max_position_embeddings, hidden_size, device):
        """
        Initialises the PositionalEmbedding class.

        Args:
            max_position_embeddings (int): maximum length of the input - related to t in the previous formula
            hidden_size (int): encoding dimension - d in the previous formula
            device (torch.device): device on which to store the positional embedding
        """
        super().__init__()

        # Save parameters
        self.max_position_embeddings = max_position_embeddings
        self.hidden_size = hidden_size

        # Compute positional embeddings
        t = torch.arange(max_position_embeddings).unsqueeze(1)
        w = 1 / 10000 ** (torch.arange(0, hidden_size, 2) / hidden_size)
        self.positional_embedding = torch.zeros(max_position_embeddings, hidden_size, device=device)
        self.positional_embedding[:, 0::2] = torch.sin(w * t)
        self.positional_embedding[:, 1::2] = torch.cos(w * t)

    def forward(self, x):
        return self.positional_embedding

    def embedding(self):
        return self.positional_embedding

Here, you can visualize your positional encoding. If you implement everything correctly, you can get a figure that is similar to Figure 2 in this [link](https://kazemnejad.com/blog/transformer_architecture_positional_encoding/).

In [None]:
(
    visualize_embedding,
    dimension_selector,
    max_len_selector,
) = visualization.display_positional_encoding(PositionalEmbedding)
ui = widgets.HBox([max_len_selector, dimension_selector])
out = widgets.interactive_output(
    visualize_embedding, {"max_len": max_len_selector, "dimension": dimension_selector}
)
display(ui, out)

### 1.2. Self-Attention Mechanism (5 pts)

In this section, you are going to implement the self-attention mechanism. Please check the section 'Self-Attention in Detail' in this [link](https://jalammar.github.io/illustrated-transformer/) for the details of the self-attention mechanism. (We encourage you to go through the link carefully since it is a very good tutorial on the transformer.)

The specific steps will be provided in the comments of the following code. (The steps are only for reference. You do not need to follow the steps if you have a better way to implement it.)

I will split the implementation of the `BertSelfAttention` class into several chunks to make the code more readable and modular. The multi-headed self-attention essentially consists of several parts: `ScaledDotProductAttention`, `SingleHeadAttention`, `MultiHeadAttention` and `SelfAttention` in that order.
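As a rough illustration of the steps involved, here is a minimal, standalone sketch of multi-head scaled dot-product attention on random toy tensors. The sizes and the names `split_heads`, `w_q`, `w_k` and `w_v` are chosen only for this example and are not part of the homework code.

```python
import math

import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)

# Toy sizes (assumptions for illustration only)
batch, seq_len, num_heads, head_size = 2, 5, 3, 4
hidden = num_heads * head_size

x = torch.randn(batch, seq_len, hidden)
w_q, w_k, w_v = (nn.Linear(hidden, hidden) for _ in range(3))


def split_heads(t):
    # (batch, seq_len, hidden) -> (batch, num_heads, seq_len, head_size)
    return t.view(batch, seq_len, num_heads, head_size).permute(0, 2, 1, 3)


q, k, v = split_heads(w_q(x)), split_heads(w_k(x)), split_heads(w_v(x))

# Scaled dot product: divide by sqrt of the per-head dimension
scores = q @ k.transpose(-1, -2) / math.sqrt(head_size)  # (batch, heads, seq, seq)
probs = F.softmax(scores, dim=-1)  # each row sums to 1
context = probs @ v  # (batch, heads, seq, head_size)

# Move the head axis back next to head_size before merging the heads
context = context.permute(0, 2, 1, 3).contiguous().view(batch, seq_len, hidden)

print(probs.shape, context.shape)
```

Note the `permute` before the final `view`: without it, the reshape would interleave values from different heads and positions whenever there is more than one head.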

In [None]:
class BertSelfAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        if config.hidden_size % config.num_attention_heads != 0:
            raise ValueError(
                "The hidden size (%d) is not a multiple of the number of attention "
                "heads (%d)" % (config.hidden_size, config.num_attention_heads)
            )

        # Save configuration parameters
        self.hidden_size = config.hidden_size
        self.output_attentions = config.output_attentions
        self.num_attention_heads = config.num_attention_heads
        self.attention_head_size = int(config.hidden_size / config.num_attention_heads)
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        # Initialise attention weights matrices of all heads in a single linear layer
        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)

        # Add dropout layer
        self.dropout = nn.Dropout(config.attention_probs_dropout_prob)

    def transpose_for_scores(self, x):
        """
        Permutes an input tensor from (batch_size, seq_len, all_head_size) to
        (batch_size, num_heads, seq_len, head_size)
        """
        new_x_shape = x.size()[:-1] + (
            self.num_attention_heads,
            self.attention_head_size,
        )
        x = x.view(*new_x_shape)
        return x.permute(0, 2, 1, 3)

    def forward(
        self,
        hidden_states,
        attention_mask=None,
        head_mask=None,
        encoder_hidden_states=None,
        encoder_attention_mask=None,
    ):
        # The parameters encoder_hidden_states and encoder_attention_mask are for cross-attention.
        # We do not use them in this homework.

        # Compute the query, key, value matrices in all heads
        # Output dimension: (num_batches, seq_len, num_heads * head_size) = (num_batches, seq_len, all_head_size)
        mixed_query_layer = self.query(hidden_states)
        mixed_key_layer = self.key(hidden_states)
        mixed_value_layer = self.value(hidden_states)

        # Transpose K, Q, V to get the queries, keys and values in each head
        # Output dimension: (num_batches, num_heads, seq_len, head_size)
        query_layer = self.transpose_for_scores(mixed_query_layer)
        key_layer = self.transpose_for_scores(mixed_key_layer)
        value_layer = self.transpose_for_scores(mixed_value_layer)

        # Compute attention scores through scaled dot product attention
        # (scale by the per-head dimension, not the full hidden size)
        attention_scores = (
            query_layer
            @ key_layer.transpose(-1, -2)
            / math.sqrt(self.attention_head_size)
        )

        # You do not need to change this part.
        # Apply attention mask
        if attention_mask is not None:
            attention_scores = attention_scores + attention_mask

        # Normalize the attention scores to probabilities (+ dropout)
        attention_probs = self.dropout(F.softmax(attention_scores, dim=-1))

        # You do not need to change this part.
        if head_mask is not None:
            attention_probs = attention_probs * head_mask

        # Compute output as weighted sum of value by the score
        context_layer = attention_probs @ value_layer

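        # Permute context layer back to (num_batches, seq_len, num_heads, head_size)
        # and merge the heads to recover (num_batches, seq_len, all_head_size)
        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
        new_context_shape = context_layer.size()[:-2] + (self.all_head_size,)
        context_layer = context_layer.view(*new_context_shape)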

        # Get the output
        outputs = (
            (context_layer, attention_probs)
            if self.output_attentions
            else (context_layer,)
        )
        return outputs

Let's test your implementation using simple text data! First, let's load the data.

We use a small dataset in this homework for a shorter training time.

In [None]:
# ChatGPT generated text data about BERT
text = text_exercise.get()
sentences_df, vocab = data.to_sentence_df(text)

After loading the data, you can train your model. Here we train our model using masked token prediction.

Hint: The final model accuracy should be higher than 0.9.

In [None]:
text_max_len = 11

text_config = SimpleNamespace(
    vocab_size=len(vocab),
    hidden_size=60,
    max_position_embeddings=text_max_len,
    type_vocab_size=1,
    layer_norm_eps=1e-12,
    hidden_dropout_prob=0.0,
    attention_probs_dropout_prob=0.0,
    num_attention_heads=1,
    hidden_act="gelu",
    intermediate_size=160,
    num_hidden_layers=1,
    is_decoder=False,
    output_attentions=True,
    output_hidden_states=False,
    pruned_heads={},
    initializer_range=0.02,
    device="cpu",
)

tokenizer = data.TextTokenizer(vocab)
(
    input_ids,
    segment_ids,
    masked_lm_labels,
    labels_idx,
    labels,
    attention_masks,
) = data.generate_masked_data(
    sentences_df, tokenizer, k=1, max_len=text_max_len, noise_rate=0.4
)

model = models.BertForMaskedLM(
    config=text_config,
    positional_embedding=PositionalEmbedding,
    attention=BertSelfAttention,
)
optimizer = optim.AdamW(model.parameters(), lr=0.001)
print(
    f"Number of trainable model parameters: {models.number_of_model_parameters(model)}"
)

for epoch in range(200):
    optimizer.zero_grad()
    loss, outputs, attentions = model(
        input_ids=input_ids,
        token_type_ids=segment_ids,
        masked_lm_labels=masked_lm_labels,
        attention_mask=attention_masks,
    )
    if (epoch + 1) % 20 == 0:
        print("Epoch:", "%04d" % (epoch + 1), "loss =", "{:.6f}".format(loss))
    loss.backward()
    optimizer.step()

print(
    f"Final model accuracy: {evaluation.masked_label_accuracy(labels, labels_idx, outputs.data)}"
)

### 1.3. Visualize Attention (1 pt)

Here, you can visualize the self-attention. 

Question: Can you interpret the visualization of the self-attention?

**Write down your answer here (1 pt):** The visualisation shows a heatmap of the attention weight matrix of one head in a self-attention layer for a given sample. Each row of the matrix sums to $1$, and the element $a_{uv}$ can be interpreted as the importance of the $v$-th token to the $u$-th token. The darker the color, the higher the importance.

In [None]:
visualize_attention, sample_id_selector = visualization.display_attantion(
    attentions=attentions, input_ids=input_ids, tokenizer=tokenizer
)
widgets.interactive(visualize_attention, sample_id=sample_id_selector)

### 1.4. Train on small Wikitext Dataset

Here, you can **optionally** test your model on the smallest wikitext dataset. You should get a test accuracy of around 0.4 after training for 50 epochs.

This part is only for you to test your code. You can choose to run it or not. It takes around 1 hour to train the model for 50 epochs on the smallest wikitext dataset with Google Colab.

In [None]:
# Don't train
# text_exercise.train_wikitext(device, positional_embedding=PositionalEmbedding, attention=BertSelfAttention)

## 2. Promoter detection (7 pts)

In this section, we detect promoters in DNA sequences.

A promoter is a region of DNA upstream of a gene where relevant proteins (such as RNA polymerase and transcription factors) bind to initiate transcription of that gene. Promoter detection is to identify if there are promoter regions in the given DNA sequence. We have covered this in the lecture. (If you are interested in the promoter, you can check this [link](https://www.genome.gov/genetics-glossary/Promoter) for more details.)

Here, we use a transformer and a classifier. The transformer first embeds the DNA sequences into features, and then the classifier detects the promoter based on the features.

The main difference between text and DNA sequence is how to tokenize the sequence. Thus, you need to implement a tokenizer for the DNA sequence.

### 2.1. DNA Tokenizer (1 pt)

Here, you will implement the same DNA tokenizer as in DNABERT. Please check this [paper](https://academic.oup.com/bioinformatics/article/37/15/2112/6128680) for implementation details. Also, you need to check the data type and shape of both the input and the output.
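To make the idea concrete, the following standalone sketch splits a short sequence into overlapping k-mers with a stride of one. The helper name `kmers` is hypothetical and only serves this illustration. It is independent of `data.Tokenizer`, which additionally maps tokens to vocabulary indices.

```python
from typing import List


def kmers(sequence: str, k: int) -> List[str]:
    """Return all overlapping substrings of length k (stride 1)."""
    return [sequence[i : i + k] for i in range(len(sequence) - k + 1)]


tokens = kmers("ATGGCT", 3)
print(tokens)  # ['ATG', 'TGG', 'GGC', 'GCT']
```

A sequence of length $n$ yields $n - k + 1$ tokens, so neighbouring tokens share $k - 1$ nucleotides.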

In [None]:
class DNATokenizer(data.Tokenizer):
    def __init__(self, k, vocab, unknown="[UNK]"):
        """
        DNA tokenizer that splits a DNA sequence into k-mers. Inherits from
        data.Tokenizer which implements functionality for mapping the parsed tokens
        to indices and vice versa.
        """
        super().__init__(vocab, unknown)
        # Save parameters
        self.k = k

    def _parse_text(self, text):
        """
        Parse a text into a list of tokens.

        Args:
            text (str): text to parse

        Returns:
            list[str]: list of k-mer token strings
        """
        return [text[i : i + self.k] for i in range(len(text) - self.k + 1)]

### 2.2. Test BERT on DNA Sequence

In this section, you will train BERT on DNA sequences to learn their embeddings. The code is provided below and you do not need to write anything.

Hint: the final evaluation accuracy should be higher than 0.2.

In [None]:
kmer = 3
mask_length = kmer
VOCAB_3MER = [
    "[PAD]",
    "[UNK]",
    "[CLS]",
    "[SEP]",
    "[MASK]",
    "AAA",
    "AAT",
    "AAC",
    "AAG",
    "ATA",
    "ATT",
    "ATC",
    "ATG",
    "ACA",
    "ACT",
    "ACC",
    "ACG",
    "AGA",
    "AGT",
    "AGC",
    "AGG",
    "TAA",
    "TAT",
    "TAC",
    "TAG",
    "TTA",
    "TTT",
    "TTC",
    "TTG",
    "TCA",
    "TCT",
    "TCC",
    "TCG",
    "TGA",
    "TGT",
    "TGC",
    "TGG",
    "CAA",
    "CAT",
    "CAC",
    "CAG",
    "CTA",
    "CTT",
    "CTC",
    "CTG",
    "CCA",
    "CCT",
    "CCC",
    "CCG",
    "CGA",
    "CGT",
    "CGC",
    "CGG",
    "GAA",
    "GAT",
    "GAC",
    "GAG",
    "GTA",
    "GTT",
    "GTC",
    "GTG",
    "GCA",
    "GCT",
    "GCC",
    "GCG",
    "GGA",
    "GGT",
    "GGC",
    "GGG",
]

raw_training_data = data.load_csv("./data/train.csv")
raw_test_data = data.load_csv("./data/test.csv")

dna_max_len = 298
batch_size = 128
max_dna_mask = 100
dataset_size = 1000
num_layers = 3
num_heads = 6
dna_config = SimpleNamespace(
    vocab_size=len(VOCAB_3MER),
    hidden_size=60,
    max_position_embeddings=dna_max_len,
    type_vocab_size=1,
    layer_norm_eps=1e-12,
    hidden_dropout_prob=0.0,
    attention_probs_dropout_prob=0.0,
    num_attention_heads=num_heads,
    hidden_act="gelu",
    intermediate_size=160,
    num_hidden_layers=num_layers,
    is_decoder=False,
    output_attentions=True,
    output_hidden_states=True,
    pruned_heads={},
    initializer_range=0.02,
    device=device,
)

tokenizer = DNATokenizer(k=kmer, vocab=VOCAB_3MER)
(
    input_ids,
    segment_ids,
    masked_lm_labels,
    labels_idx,
    labels,
    attention_masks,
) = data.generate_masked_data(
    raw_training_data,
    tokenizer,
    max_len=dna_max_len,
    max_mask=max_dna_mask,
    k=mask_length,
    mask_rate=0.05,
    max_size=dataset_size,
)
(
    test_input_ids,
    test_segment_ids,
    test_masked_lm_labels,
    test_labels_idx,
    test_labels,
    test_attention_masks,
) = data.generate_masked_data(
    raw_test_data,
    tokenizer,
    max_len=dna_max_len,
    max_mask=max_dna_mask,
    k=mask_length,
    mask_rate=0.05,
    max_size=dataset_size,
)

model = models.BertForMaskedLM(
    config=dna_config,
    positional_embedding=PositionalEmbedding,
    attention=BertSelfAttention,
).to(device)
optimizer = optim.AdamW(model.parameters(), lr=0.002)
print(
    f"Number of trainable model parameters: {models.number_of_model_parameters(model)}"
)

train_dataset = TensorDataset(
    input_ids, segment_ids, masked_lm_labels, labels_idx, labels, attention_masks
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = TensorDataset(
    test_input_ids,
    test_segment_ids,
    test_masked_lm_labels,
    test_labels_idx,
    test_labels,
    test_attention_masks,
)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

for epoch in range(50):
    total_train_loss = 0
    model.train()
    for (
        batch_input_ids,
        batch_segment_ids,
        batch_masked_lm_labels,
        _,
        _,
        batch_attention_mask,
    ) in train_loader:
        optimizer.zero_grad()
        loss, outputs, hidden_states, _ = model(
            input_ids=batch_input_ids.to(device),
            token_type_ids=batch_segment_ids.to(device),
            masked_lm_labels=batch_masked_lm_labels.to(device),
            attention_mask=batch_attention_mask.to(device),
        )
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()
    avg_train_loss = total_train_loss / len(train_loader)

    if (epoch + 1) % 10 == 0:
        model.eval()
        total_eval_loss = 0
        for (
            batch_input_ids,
            batch_segment_ids,
            batch_masked_lm_labels,
            _,
            _,
            batch_attention_mask,
        ) in test_loader:
            with torch.no_grad():
                loss, outputs, hidden_states, _ = model(
                    input_ids=batch_input_ids.to(device),
                    token_type_ids=batch_segment_ids.to(device),
                    masked_lm_labels=batch_masked_lm_labels.to(device),
                    attention_mask=batch_attention_mask.to(device),
                )
                if batch_attention_mask.sum() - torch.numel(batch_attention_mask) > 0:
                    print("found padding", batch_attention_mask.sum())
                total_eval_loss += loss.item()
        avg_eval_loss = total_eval_loss / len(test_loader)
        print(
            "Epoch:",
            "%04d" % (epoch + 1),
            "train cost =",
            "{:.6f}".format(avg_train_loss),
            "eval cost =",
            "{:.6f}".format(avg_eval_loss),
        )

average_train_acc, _ = evaluation.model_masked_label_accuracy(
    model, train_loader, device
)
average_test_acc, last_test_attention = evaluation.model_masked_label_accuracy(
    model, test_loader, device
)
print(
    "Train Acc =",
    "{:.6f}".format(average_train_acc),
    "Eval Acc =",
    "{:.6f}".format(average_test_acc),
)

### 2.3. Visualize the Attentions (1 pt)

Here, you can visualize the self-attention. 

Question: compare the visualization to Section 1.3, what can you find here? How do you explain it?

**Write down your answer here (1 pt):**

In [None]:
(
    visualize_attention,
    sample_id_selector,
    layer_selector,
    head_selector,
) = visualization.display_multi_attantion(
    attentions=last_test_attention,
    tokenizer=tokenizer,
    input_ids=input_ids,
    layers=range(1, num_layers + 1),
    heads=range(1, num_heads + 1),
)
ui = widgets.HBox([sample_id_selector, layer_selector, head_selector])
out = widgets.interactive_output(
    visualize_attention,
    {"sample_id": sample_id_selector, "layer": layer_selector, "head": head_selector},
)
display(ui, out)

### 2.4. Use your pretrained model for promoter detection (5 pts)

You already have the embeddings for the DNA sequence. Now, you are going to build a classifier based on the DNA embeddings. The classifier is to perform promoter detection. Specifically, the DNA sequence will be classified into *'contains promoter'* or *'does not contain promoter'*.

Hint: 
- We now want to annotate data (get the label for each sample), not predict masked data anymore!
- You can reuse some parts of the code in the previous sections, e.g. dataloader and training pipeline in Section 2.2.
- If you implemented the previous section correctly (Eval Acc > 0.2 in Section 2.2), you already have a pre-trained object named 'model' of class models.BertForMaskedLM. You can use it directly.
- The evaluation accuracy of this task should be around 0.6.

We first define a new model `PromoterDetector` which is a binary classification head that is "stacked" on top of a pre-trained DNABERT model. The model assumes that the BERT model is pre-trained and all weights are frozen for faster training.

In [None]:
class PromoterDetector(nn.Module):
    """PromoterDetector classification head on top of DNA BERT."""
    def __init__(self, model):
        """
        Classification head to stack on top of a pre-trained BERT model for promoter detection.

        Args:
            model (nn.Module): A pre-trained encoder model
        """
        super().__init__()

        # Save parameters
        self.model = model
        self.hidden_size = model.config.hidden_size

        # Initialise linear layer from class token
        self.linear = nn.Linear(self.hidden_size, 1)

    def forward(self, input_ids):
        """
        Feed forward function of the classification head. Returns the logits of the classification
        of each sequence in the batch. Assumes that the hidden states are returned by BERT model
        as the second output. And that the last tensor in the hidden state list corresponds to the
        hidden states in the last encoder block (i.e. the final output of the encoder) and that the
        first token in the sequence is the class token.

        Args:
            input_ids (torch.Tensor): Input tensor of shape (batch_size, seq_len)

        Returns:
            torch.Tensor: Output tensor of shape (batch_size, )
        """

        # Compute BERT output
        _, hidden_states, _ = self.model(input_ids=input_ids)
        
        # Extract last hidden state
        last_hidden_state = hidden_states[-1]

        # Get the embedding of the first token (class token)
        class_tokens = last_hidden_state[:, 0, :]

        # Get the classification logits
        logits = self.linear(class_tokens)

        # Squeeze only the last dimension so a batch of size 1 keeps shape (1,)
        return logits.squeeze(-1)

Next, we define new dataset classes and loaders which sample the tokenised input indices and sequence labels. We can use the utility function `data.generate_labeled_data` and PyTorch functions for this.

In [None]:
# Generate labelled training data
train_input_ids, train_labels = data.generate_labeled_data(
    raw_training_data,
    tokenizer,
    max_len=dna_max_len,
    max_size=dataset_size,
)

# Generate labelled test data
test_input_ids, test_labels = data.generate_labeled_data(
    raw_test_data,
    tokenizer,
    max_len=dna_max_len,
    max_size=dataset_size,
)

# Convert to PyTorch tensors
train_dataset = TensorDataset(train_input_ids, train_labels)
test_dataset = TensorDataset(test_input_ids, test_labels)

# Create batched data loader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

Let's initialise the `PromoterDetector` from the pre-trained DNABERT model with frozen weights.

In [None]:
# Freeze all BERT parameters
for params in model.parameters():
    params.requires_grad = False

# Initialise the classifier
clf = PromoterDetector(model).to(device)
optimizer = optim.AdamW(clf.parameters(), lr=0.002)
criterion = nn.BCEWithLogitsLoss()

print(f"Number of model parameters: {models.number_of_model_parameters(clf)}")
print(f"Number of trainable model parameters: {sum([p.numel() for p in clf.parameters() if p.requires_grad])}")

We train the model for `50` epochs and print the training and evaluation loss every other epoch.

In [None]:
# Training loop
for epoch in range(50):
    total_train_loss = 0
    clf.train()
    for (input_ids, labels) in train_loader:
        # Zero gradients
        optimizer.zero_grad()
        
        # Move data to device
        input_ids = input_ids.to(device)
        labels = labels.to(device).float()

        # Forward pass
        logits = clf(input_ids)
        loss = criterion(logits.squeeze(), labels)

        # Backward pass
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()

    avg_train_loss = total_train_loss / len(train_loader)

    if (epoch + 1) % 2 == 0:
        clf.eval()
        total_eval_loss = 0
        for (input_ids, labels) in test_loader:
            with torch.no_grad():
                # Move data to device
                input_ids = input_ids.to(device)
                labels = labels.to(device).float()

                # Forward pass and compute loss
                logits = clf(input_ids)
                loss = criterion(logits.squeeze(), labels)

                total_eval_loss += loss.item()

        avg_eval_loss = total_eval_loss / len(test_loader)
        print(
            "Epoch:",
            "%04d" % (epoch + 1),
            "train cost =",
            "{:.6f}".format(avg_train_loss),
            "eval cost =",
            "{:.6f}".format(avg_eval_loss),
        )

Finally, we evaluate the model on the test set using accuracy score.
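As a small illustration of how accuracy follows from raw logits, the sketch below uses made-up logits and labels (not results from the model): it applies a sigmoid, thresholds at 0.5, and averages the matches.

```python
import torch

# Made-up values for illustration only
logits = torch.tensor([2.0, -1.0, 0.3, -0.2])
labels = torch.tensor([1.0, 0.0, 0.0, 0.0])

probs = torch.sigmoid(logits)  # probability of 'contains promoter'
preds = (probs > 0.5).float()  # equivalent to checking logits > 0
accuracy = (preds == labels).float().mean().item()

print(preds.tolist(), accuracy)  # [1.0, 0.0, 1.0, 0.0] 0.75
```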

In [None]:
# Evaluate
y_pred, y_true = [], []
for (input_ids, labels) in test_loader:
    with torch.no_grad():
        # Move data to device
        input_ids = input_ids.to(device)
        labels = labels.to(device).float()

        # Forward pass and threshold the predicted probabilities at 0.5
        logits = clf(input_ids)
        probs = torch.sigmoid(logits.squeeze())
        preds = torch.round(probs)

        y_pred.extend(preds.tolist())
        y_true.extend(labels.tolist())

y_pred = torch.Tensor(y_pred)
y_true = torch.Tensor(y_true)
eval_acc = (y_pred == y_true).float().mean()

print("Eval Acc =", "{:.6f}".format(eval_acc))

### 2.5. Additional question (1 pt)

Now we change mask_length = 1 (already changed, you do not need to implement anything).
Let's run the code below and check the accuracy.

Question: What is the final masked token prediction accuracy? How do you explain this?

**Write down your answer here (1 pt):**

In [None]:
kmer = 3
mask_length = 1

dna_max_len = 298
batch_size = 128
max_dna_mask = 100
dataset_size = 1000
num_layers = 3
num_heads = 6
dna_config = SimpleNamespace(
    vocab_size=len(VOCAB_3MER),
    hidden_size=60,
    max_position_embeddings=dna_max_len,
    type_vocab_size=1,
    layer_norm_eps=1e-12,
    hidden_dropout_prob=0.0,
    attention_probs_dropout_prob=0.0,
    num_attention_heads=num_heads,
    hidden_act="gelu",
    intermediate_size=160,
    num_hidden_layers=num_layers,
    is_decoder=False,
    output_attentions=True,
    output_hidden_states=True,
    pruned_heads={},
    initializer_range=0.02,
    device=device,
)

tokenizer = DNATokenizer(k=3, vocab=VOCAB_3MER)
(
    input_ids,
    segment_ids,
    masked_lm_labels,
    labels_idx,
    labels,
    attention_masks,
) = data.generate_masked_data(
    raw_training_data,
    tokenizer,
    max_len=dna_max_len,
    max_mask=max_dna_mask,
    k=mask_length,
    mask_rate=0.05,
    max_size=dataset_size,
)
(
    test_input_ids,
    test_segment_ids,
    test_masked_lm_labels,
    test_labels_idx,
    test_labels,
    test_attention_masks,
) = data.generate_masked_data(
    raw_test_data,
    tokenizer,
    max_len=dna_max_len,
    max_mask=max_dna_mask,
    k=mask_length,
    mask_rate=0.05,
    max_size=dataset_size,
)

model = models.BertForMaskedLM(
    config=dna_config,
    positional_embedding=PositionalEmbedding,
    attention=BertSelfAttention,
).to(device)
optimizer = optim.AdamW(model.parameters(), lr=0.002)
print(
    f"Number of trainable model parameters: {models.number_of_model_parameters(model)}"
)

train_dataset = TensorDataset(
    input_ids, segment_ids, masked_lm_labels, labels_idx, labels, attention_masks
)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = TensorDataset(
    test_input_ids,
    test_segment_ids,
    test_masked_lm_labels,
    test_labels_idx,
    test_labels,
    test_attention_masks,
)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

for epoch in range(50):
    total_train_loss = 0
    model.train()
    for (
        batch_input_ids,
        batch_segment_ids,
        batch_masked_lm_labels,
        _,
        _,
        batch_attention_mask,
    ) in train_loader:
        optimizer.zero_grad()
        loss, outputs, hidden_states, _ = model(
            input_ids=batch_input_ids.to(device),
            token_type_ids=batch_segment_ids.to(device),
            masked_lm_labels=batch_masked_lm_labels.to(device),
            attention_mask=batch_attention_mask.to(device),
        )
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()
    avg_train_loss = total_train_loss / len(train_loader)

    if (epoch + 1) % 10 == 0:
        model.eval()
        total_eval_loss = 0
        for (
            batch_input_ids,
            batch_segment_ids,
            batch_masked_lm_labels,
            _,
            _,
            batch_attention_mask,
        ) in test_loader:
            with torch.no_grad():
                loss, outputs, hidden_states, _ = model(
                    input_ids=batch_input_ids.to(device),
                    token_type_ids=batch_segment_ids.to(device),
                    masked_lm_labels=batch_masked_lm_labels.to(device),
                    attention_mask=batch_attention_mask.to(device),
                )
                if batch_attention_mask.sum() - torch.numel(batch_attention_mask) > 0:
                    print("found padding", batch_attention_mask.sum())
                total_eval_loss += loss.item()
        avg_eval_loss = total_eval_loss / len(test_loader)
        print(
            "Epoch:",
            "%04d" % (epoch + 1),
            "train cost =",
            "{:.6f}".format(avg_train_loss),
            "eval cost =",
            "{:.6f}".format(avg_eval_loss),
        )

average_train_acc, _ = evaluation.model_masked_label_accuracy(
    model, train_loader, device
)
average_test_acc, last_test_attention = evaluation.model_masked_label_accuracy(
    model, test_loader, device
)
print(
    "Train Acc =",
    "{:.6f}".format(average_train_acc),
    "Eval Acc =",
    "{:.6f}".format(average_test_acc),
)

## 3. Using foundation model (5 pts)

### 3.1. Introduction

In this section, we aim to use a foundation model, DNABERT, to perform promoter detection.
A foundation model is a model pretrained on large datasets. Foundation models serve as the foundational building blocks upon which various applications can be constructed.

Here, we use DNABERT as the foundation model. We first apply it to the DNA sequences to get their embeddings. Then, we train a classifier on the embeddings as in Section 2. Please follow this [link](https://github.com/Zhihan1996/DNABERT_2) to load the foundation model.

### 3.2. Implementation

**Consider this situation:** You get a dataset about promoter detection, and you build your model to perform the task as in Section 2. However, the performance is not good since the model is not strong enough. Suddenly, you think we can use a large pre-trained model to embed DNA sequences. Then, you search online and find the pre-trained model [DNABERT](https://github.com/Zhihan1996/DNABERT_2). Now, you want to perform promoter detection using the pre-trained DNABERT.

There is no coding framework in this section. Just make things work (get good test accuracy) using the pre-trained model!

Hints:
- We encourage you to create a **new environment** following the instructions in Section 3 of this [link](https://github.com/Zhihan1996/DNABERT_2). (If you encounter the error "The model class you are passing has a config_class attribute that is not consistent with the config class you passed ...", creating a new environment can resolve it.)
- Section 4 of this [link](https://github.com/Zhihan1996/DNABERT_2) shows how to load and use the pre-trained foundation model.

**Note**: Switch the environment at this point.

In [None]:
# Imports
import os

import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from transformers import AutoTokenizer, AutoModel
from torch.utils.data import DataLoader, TensorDataset
from utils import data

# Disable tokenizers parallelism to avoid fork-related warnings
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Define the device again in case the kernel was restarted when switching environments
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

a. Load the dataset.

In [None]:
# Load the train and test datasets from train.csv and test.csv
raw_training_data = data.load_csv("./data/train.csv")
raw_test_data = data.load_csv("./data/test.csv")

b. Get the embeddings of the DNA sequences using the pre-trained model.

Hints:
- This step can take some time, so start with a small sample and increase it once you have verified that everything works correctly.
- After computing the embeddings, save them so that you can load them directly next time without re-running the foundation model.
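
One possible way to pick a small sample for a first test run is to draw the same number of sequences from each class, so that the subset stays balanced. The helper below is a minimal sketch; `stratified_subsample` and the toy data are hypothetical and not part of the provided `utils`.

```python
import random
from collections import defaultdict

def stratified_subsample(sequences, labels, n_per_class, seed=0):
    """Return up to n_per_class (sequence, label) pairs for each label."""
    by_label = defaultdict(list)
    for sequence, label in zip(sequences, labels):
        by_label[label].append(sequence)

    rng = random.Random(seed)  # local RNG, leaves the global seed untouched
    subset = []
    for label in sorted(by_label):
        group = by_label[label]
        chosen = rng.sample(group, min(n_per_class, len(group)))
        subset.extend((sequence, label) for sequence in chosen)
    rng.shuffle(subset)  # mix the classes again
    return subset

# Toy data standing in for the real sequences and labels
toy_sequences = ["ACGT", "TTGA", "GGCA", "CATG", "AACC", "GTGT"]
toy_labels = [1, 0, 1, 0, 1, 1]
subset = stratified_subsample(toy_sequences, toy_labels, n_per_class=2)
print(len(subset))
```

Once the pipeline runs end to end on the subset, the same code can be applied to the full lists.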

In [None]:
# ToDo: Load the pretrained DNABERT model and use this to get the embeddings of the train and test DNA sequences.
# Load pre-trained DNABERT model and tokeniser
tokeniser = AutoTokenizer.from_pretrained("zhihan1996/DNABERT-2-117M", trust_remote_code=True)
model = AutoModel.from_pretrained("zhihan1996/DNABERT-2-117M", trust_remote_code=True)

Let's familiarise ourselves with the API provided by the pre-trained `AutoTokenizer` and `AutoModel` classes. Passing a `str` to the tokeniser returns the tokenised input indices, and passing those indices to the model returns the hidden states, from which we compute the sequence embedding.

In [None]:
dna = "ACGTAGCATCGGATCTATCTATCGACACTTGGTTATCGATCTACGAGCATCTCGTTAGC"
input_ids = tokeniser(dna, return_tensors = 'pt')["input_ids"] # (B, L)
hidden_states, _ = model(input_ids) # (B, L, H)

# Compute sequence embedding using mean pooling
embedding = torch.mean(hidden_states[0], dim=0)

print(f"input_ids: {input_ids.shape}")
print(f"hidden_states: {hidden_states.shape}")
print(f"embedding: {embedding.shape}")

Let's check whether we can pass batched input.

In [None]:
# Batched input to tokeniser
input_ids = tokeniser([dna, dna], return_tensors = 'pt')["input_ids"] # (B, L)
hidden_states, _ = model(input_ids) # (B, L, H)

# Compute one embedding per sequence by mean pooling over the token dimension
embedding = torch.mean(hidden_states, dim=1) # (B, H)

print(f"input_ids: {input_ids.shape}")
print(f"hidden_states: {hidden_states.shape}")
print(f"embedding: {embedding.shape}")

Works as expected. We can now iterate over the DNA sequences in our training and testing data.
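
Note that the batched example uses two identical sequences, so no padding is involved. For batches of sequences with different lengths, a plain mean over the token dimension would also average the padding positions. A minimal sketch of padding-aware mean pooling follows, assuming the usual attention-mask convention (1 for real tokens, 0 for padding); `masked_mean_pool` is a hypothetical helper and the tensors are toy values, not DNABERT outputs.

```python
import torch

def masked_mean_pool(hidden_states, attention_mask):
    """Average hidden states over real tokens only.

    hidden_states: (B, L, H) float tensor
    attention_mask: (B, L) tensor with 1 for real tokens and 0 for padding
    """
    mask = attention_mask.unsqueeze(-1).to(hidden_states.dtype)  # (B, L, 1)
    summed = (hidden_states * mask).sum(dim=1)                   # (B, H)
    counts = mask.sum(dim=1).clamp(min=1.0)                      # (B, 1), avoids division by zero
    return summed / counts

# Toy batch: the first sequence has one padded position with large values
hidden = torch.tensor([
    [[1.0, 2.0], [3.0, 4.0], [100.0, 100.0]],
    [[1.0, 1.0], [2.0, 2.0], [3.0, 3.0]],
])
mask = torch.tensor([[1, 1, 0], [1, 1, 1]])
pooled = masked_mean_pool(hidden, mask)
print(pooled)  # tensor([[2., 3.], [2., 2.]])
```

The loop below sidesteps the issue by embedding one sequence at a time, which is slower but needs no padding.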

In [None]:
def load_embeddings(path):
    return torch.load(path)

def save_embeddings(embeddings, path):
    return torch.save(embeddings, path)

def get_embeddings(model, tokeniser, sequences):
    """Embed each sequence by mean pooling the hidden states returned by the model."""
    model.eval()
    embeddings = []
    with torch.no_grad(): # inference only, no gradients needed
        for sequence in tqdm.tqdm(sequences):
            input_ids = tokeniser(sequence, return_tensors = 'pt')["input_ids"] # (1, L)
            hidden_states, _ = model(input_ids) # (1, L, H)
            embedding = torch.mean(hidden_states[0], dim=0) # (H,)

            embeddings.append(embedding)

    return torch.stack(embeddings) # (N, H)

# Extract training and testing sequences
# sub = 1000
train_sequences = raw_training_data["sequence"].tolist()
test_sequences = raw_test_data["sequence"].tolist()
train_labels = raw_training_data["label"].tolist()
test_labels = raw_test_data["label"].tolist()

# Set paths for saving and loading embeddings
data_path = os.path.join(os.getcwd(), "data")
train_embeddings_path = os.path.join(data_path, "train_embeddings.pt")
test_embeddings_path = os.path.join(data_path, "test_embeddings.pt")

# Compute and save embeddings if either file is missing
if not os.path.exists(train_embeddings_path) or not os.path.exists(test_embeddings_path):
    train_embeddings = get_embeddings(model, tokeniser, train_sequences)
    test_embeddings = get_embeddings(model, tokeniser, test_sequences)

    save_embeddings(train_embeddings, train_embeddings_path)
    save_embeddings(test_embeddings, test_embeddings_path)

# Load embeddings from disk
train_embeddings = load_embeddings(train_embeddings_path)
test_embeddings = load_embeddings(test_embeddings_path)

print("Loaded embeddings ✅.")
print(f"train_embeddings: {train_embeddings.shape}")
print(f"test_embeddings: {test_embeddings.shape}")

In [None]:
# ToDo: Use t-SNE or UMAP to visualize the embedding space.
from sklearn.manifold import TSNE
from sklearn.preprocessing import StandardScaler
from umap.umap_ import UMAP
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_style("darkgrid")

# Hint: you can import other packages here for visualization.
tsne = TSNE(n_components=2)
umap = UMAP(n_components=2)

# Use a subset for visualisation
sub = 1000
sub_train_embeddings = train_embeddings[:sub]
sub_train_labels = train_labels[:sub]

# Standardize embeddings for better visualisation
scaler = StandardScaler()
scaled_train_embeddings = scaler.fit_transform(sub_train_embeddings)

# Compute t-SNE and UMAP embeddings
tsne_train_embeddings = tsne.fit_transform(scaled_train_embeddings)
umap_train_embeddings = umap.fit_transform(scaled_train_embeddings)

fig, axs = plt.subplots(ncols=2, figsize=(10, 5))
sns.scatterplot(
    x=tsne_train_embeddings[:, 0], 
    y=tsne_train_embeddings[:, 1],
    hue=sub_train_labels,
    s=50,
    ax=axs[0]
    );

sns.scatterplot(
    x=umap_train_embeddings[:, 0], 
    y=umap_train_embeddings[:, 1],
    hue=sub_train_labels,
    s=50,
    ax=axs[1]
    );

axs[0].set_title("t-SNE of DNABERT embeddings");
axs[1].set_title("UMAP of DNABERT embeddings");

c. Train a classifier.

Hint: It is easy to overfit on the training set. Try to avoid overfitting.
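
One common way to limit overfitting is early stopping: training is halted once the loss on held-out data stops improving. The class below is a minimal sketch of that idea; `EarlyStopping`, `patience`, and `min_delta` are hypothetical names, and the loss values are made up for illustration. In practice the monitored loss should come from a validation split carved out of the training data, not from the test set.

```python
class EarlyStopping:
    """Signal a stop once the monitored loss has not improved for `patience` epochs."""

    def __init__(self, patience=3, min_delta=0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = float("inf")
        self.bad_epochs = 0

    def step(self, loss):
        """Record one epoch's loss and return True if training should stop."""
        if loss < self.best_loss - self.min_delta:
            self.best_loss = loss
            self.bad_epochs = 0
        else:
            self.bad_epochs += 1
        return self.bad_epochs >= self.patience

# Made-up validation losses: improvement stalls after the third epoch
stopper = EarlyStopping(patience=2)
val_losses = [0.70, 0.55, 0.50, 0.52, 0.51, 0.49]
stopped_at = None
for epoch, loss in enumerate(val_losses, start=1):
    if stopper.step(loss):
        stopped_at = epoch
        break

print(stopped_at, stopper.best_loss)  # 5 0.5
```

Dropout and weight decay (both available through the `Classifier` and `AdamW` settings used in this section) are complementary options.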

In [None]:
class Classifier(nn.Module):
    """Predicts whether a sequence is a promoter from the embedding of the sequence."""

    def __init__(self, 
                 hidden_dims = [256, 128, 64],
                 activation = nn.ReLU(),
                 dropout = 0.1):

        super().__init__()

        # Initialise linear layers
        dims = [768] + hidden_dims + [1]
        self.layers = nn.ModuleList([
            nn.Linear(dims[i], dims[i+1]) for i in range(len(dims) - 1)
        ])

        # Initialise dropout and activation
        self.dropout = nn.Dropout(dropout)
        self.activation = activation

    def forward(self, x):
        """
        Feed forward function of the classification head. Returns the logits of the classification
        of each sequence in the batch.

        Args:
            x (torch.Tensor): Tensor of sequence embeddings of shape (batch_size, embedding_size)
        
        Returns:
            torch.Tensor: Output tensor of shape (batch_size, )
        """

        for layer in self.layers[:-1]:
            x = layer(x)
            x = self.dropout(x)
            x = self.activation(x)

        x = self.layers[-1](x)

        # Squeeze only the last dimension so that a batch of size 1 keeps shape (1,)
        return x.squeeze(-1)

In [None]:
# Data classes
batch_size = 128

# Create PyTorch dataset from embeddings and labels
train_labels = torch.Tensor(raw_training_data["label"].tolist())
test_labels = torch.Tensor(raw_test_data["label"].tolist())

train_dataset = TensorDataset(train_embeddings, train_labels)
test_dataset = TensorDataset(test_embeddings, test_labels)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Training hyperparameters
epochs = 10
lr = 0.001

# Initialise model and optimiser
model = Classifier().to(device)
optimiser = optim.AdamW(model.parameters(), lr=lr)
criterion = nn.BCEWithLogitsLoss()

for epoch in range(epochs):
    model.train()
    total_train_loss = 0.

    for batch_embeddings, batch_labels in train_loader:
        # Zero gradients
        optimiser.zero_grad()

        # Move data to device
        batch_embeddings = batch_embeddings.to(device)
        batch_labels = batch_labels.to(device)

        # Forward pass
        logits = model(batch_embeddings)
        loss = criterion(logits, batch_labels)

        # Backward pass
        loss.backward()
        optimiser.step()

        total_train_loss += loss.item()

    avg_train_loss = total_train_loss / len(train_loader)

    if (epoch + 1) % 1 == 0:
        model.eval()
        total_eval_loss = 0.
        with torch.no_grad(): # no gradients needed for evaluation
            for batch_embeddings, batch_labels in test_loader:
                # Move data to device
                batch_embeddings = batch_embeddings.to(device)
                batch_labels = batch_labels.to(device)

                # Forward pass
                logits = model(batch_embeddings)
                loss = criterion(logits, batch_labels)

                total_eval_loss += loss.item()

        avg_eval_loss = total_eval_loss / len(test_loader)

        print(f"Epoch [{epoch+1}/{epochs}] | Train Loss {avg_train_loss:.4f} | Eval Loss {avg_eval_loss:.4f}")

In [None]:
# Evaluate performance
@torch.no_grad()
def validate(model, loader, device):
    """Compute the accuracy of the model over all batches in the loader."""
    model.to(device)
    model.eval() # disable dropout during evaluation
    y_true, y_pred = [], []
    for batch_embeddings, batch_labels in loader:
        # Move data to device
        batch_embeddings = batch_embeddings.to(device)
        batch_labels = batch_labels.to(device)

        # Forward pass
        logits = model(batch_embeddings)
        probs = torch.sigmoid(logits)
        preds = torch.round(probs)

        y_true.extend(batch_labels.reshape(-1).tolist())
        y_pred.extend(preds.reshape(-1).tolist())

    y_true = torch.Tensor(y_true)
    y_pred = torch.Tensor(y_pred)

    # Compute accuracy over the whole dataset, not only the last batch
    acc = (y_pred == y_true).float().mean().item()

    return {
        "acc": acc
    }

train_results = validate(model, train_loader, device)
test_results = validate(model, test_loader, device)

print(f"Train Acc: {train_results['acc']:.4f}")
print(f"Test Acc: {test_results['acc']:.4f}")
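
Accuracy is the only metric reported here. If the two classes are not balanced, precision, recall, and F1 may give a fuller picture. The function below is a minimal sketch in plain Python; `binary_classification_report` is a hypothetical helper, and the labels and predictions are made up rather than taken from the model.

```python
def binary_classification_report(y_true, y_pred):
    """Compute precision, recall and F1 for binary labels in {0, 1}."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)

    # Guard against empty denominators
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}

# Made-up labels and predictions for illustration
y_true = [1, 0, 1, 1, 0, 0]
y_pred = [1, 0, 0, 1, 1, 0]
report = binary_classification_report(y_true, y_pred)
print(report)
```

With two true positives, one false positive, and one false negative, precision, recall, and F1 are all 2/3 in this toy example.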