<a href="https://colab.research.google.com/github/johnrudrapogu/CSC-Final-Project/blob/main/E1_Coding_Kit.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# E1: Unimodal Language Representations (Coding Part)
---
Welcome to the coding part of the first exercise sheet. It will be about training `Transformer Language Models` (LMs) on text. It consists of five major components:
* **Data Preparation**: For that, please download the `lotr.txt` file from Moodle or by clicking on the link in the exercise sheet. This file, which contains the text from `The Lord of the Rings`, will be used to train our LMs in this exercise. Upload this file to this Google Colab / Kaggle instance or alternatively to your Google Drive if you decide to mount your personal Drive.
* **1 - Tokenization**: After the training text is ready, we have to think about how we want to tokenize it before training our model. As an initial approach to this, a `CharTokenizer` class is already provided, which implements a naive character-level tokenization method. Based on that, your task is to implement a data-driven `SubwordTokenizer` and accelerate it with a prefix tree data structure.
* **2 - Model Architecture**: In the second coding task, you will complete the already provided Transformer model definition.
* **3 - Position Embeddings**: The provided model architecture does not include any sort of position embeddings. It's part of this exercise to implement a well-known type of positional embeddings.
* **4 - Training and Evaluation**: After all the steps above, you will now be able to finally train your model(s).

### ⚠️ **Note**: You do not need GPU acceleration until Task 4
You can safely do the first three exercises without any GPU attached to your runtime.

In [None]:
# Some necessary imports
import torch
import torch.nn as nn
import torch.optim as optim

import math
import random
import os
import json

import matplotlib.pyplot as plt
import numpy as np

from tqdm import tqdm
from dataclasses import dataclass

In [None]:
# Specify the path to your training text
data_path = 'lotr.txt'

# Optional: uncomment the following lines to load the file from your Google Drive instead
# from google.colab import drive
# drive.mount('/content/drive')
# data_path = '/content/drive/MyDrive/MAI_SS25_Exercises/lotr.txt'

# Load the training data
with open(data_path, 'r', encoding='latin-1') as f:
    lotr_text = f.read()

print(f"Dataset length: {len(lotr_text)} characters")
print(f"Sample:\n{lotr_text[:2000]}")

## 1 - Tokenization
The following `CharTokenizer` class implements the provided abstract (data-driven) Tokenizer interface. It is provided to you as a simple character-level tokenisation approach. It treats every character that occurs in the data as its own token.

In [None]:
# Tokenizer Interface
class Tokenizer:

    def __init__(self, text, vocab_size=None):
        self.train(text, vocab_size)

    def train(self, text, vocab_size=None):
        raise NotImplementedError

    def encode(self, s):
        raise NotImplementedError

    def decode(self, tokens):
        raise NotImplementedError


# Character-level Tokenizer
class CharTokenizer(Tokenizer):
    def __init__(self, text):
        super(CharTokenizer, self).__init__(text)

    def train(self, text, vocab_size):
        # Build vocab from sorted unique characters
        self.vocab = sorted(list(set(text)))
        self.token_to_id = {ch: i for i, ch in enumerate(self.vocab)}
        self.id_to_token = {i: ch for ch, i in self.token_to_id.items()}

    def encode(self, s):
        """Encode a string to a list of token indices"""
        return [self.token_to_id[c] for c in s]

    def decode(self, tokens):
        """Decode a list of token indices back to a string"""
        return ''.join([self.id_to_token[i] for i in tokens])

# Instantiate tokenizer and process data
tokenizer = CharTokenizer(lotr_text)
data = torch.tensor(tokenizer.encode(lotr_text), dtype=torch.long)

print(f"Vocab size: {len(tokenizer.vocab)}")
print(f"Vocab elements: {tokenizer.vocab}")
print(f"Sample tokenization: 'Gandalf runs' -> {tokenizer.encode('Gandalf runs')}")
print(f"Decoded back: {tokenizer.decode(tokenizer.encode('Gandalf runs'))}")


### a) 💻 Implement Subword Frequency Tokenization
The `SubwordTokenizer` class shall do the following during training:


1.   Start with a vocabulary $V$ that contains exactly the characters occurring in the text.
2.   Then repeat until `len(vocab)` equals the desired `vocab_size`:
     *   Compute the frequencies of adjacent token pairs in the text
     *   Take the most frequent pair of tokens
     *   Merge them into a new token and update the vocabulary
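
To make one iteration of the loop above concrete, here is a minimal sketch of a single merge step on a toy string. The helper names `most_frequent_pair` and `merge_pair` are hypothetical and not part of the provided kit. It is an illustration under these assumptions, not the reference solution.

```python
from collections import Counter

def most_frequent_pair(tokens):
    """Return the most frequent adjacent token pair and its count."""
    pair_counts = Counter(zip(tokens, tokens[1:]))
    return pair_counts.most_common(1)[0]

def merge_pair(tokens, pair):
    """Replace every non-overlapping occurrence of `pair` with one merged token."""
    merged, i = [], 0
    while i < len(tokens):
        if i + 1 < len(tokens) and (tokens[i], tokens[i + 1]) == pair:
            merged.append(tokens[i] + tokens[i + 1])
            i += 2
        else:
            merged.append(tokens[i])
            i += 1
    return merged

toy_tokens = list("abababc")           # start from single characters
toy_vocab = sorted(set(toy_tokens))    # initial vocabulary: the characters

pair, count = most_frequent_pair(toy_tokens)   # one iteration of the loop
toy_tokens = merge_pair(toy_tokens, pair)
toy_vocab.append(pair[0] + pair[1])

print(pair, count)   # the pair that was merged and how often it occurred
print(toy_tokens)    # the text re-tokenized with the new token
print(toy_vocab)     # the vocabulary grown by one entry
```

Recounting all pairs from scratch in every iteration is simple but slow on the full text, so you may want to think about how to keep this step efficient.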

For encoding, it shall greedily take the longest matching token from the vocabulary as the next token in the encoded sequence. You are supposed to build a simple prefix tree after training to accelerate this search for the longest matching prefix token. This will modify the inner logic of your `encode(...)` method but significantly improve its performance. You may use the `pygtrie` package to implement the data structure (`pip install pygtrie`).
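
The greedy longest-match idea can be sketched with a hand-rolled prefix tree built from nested dictionaries. This sketch deliberately does not use `pygtrie`. The names `build_trie` and `encode_greedy` and the `"$id"` end marker are assumptions made for illustration only.

```python
def build_trie(vocab):
    """Build a nested-dict prefix tree; the key '$id' marks the end of a token."""
    root = {}
    for token_id, token in enumerate(vocab):
        node = root
        for ch in token:
            node = node.setdefault(ch, {})
        node["$id"] = token_id  # multi-character key, cannot clash with a single character
    return root

def encode_greedy(s, trie):
    """Encode `s` by repeatedly taking the longest vocabulary token that matches."""
    ids, i = [], 0
    while i < len(s):
        node, best_id, best_end = trie, None, i
        j = i
        # Walk down the trie as far as the text allows, remembering the last full token
        while j < len(s) and s[j] in node:
            node = node[s[j]]
            j += 1
            if "$id" in node:
                best_id, best_end = node["$id"], j
        if best_id is None:
            raise KeyError(f"No token matches at position {i}: {s[i]!r}")
        ids.append(best_id)
        i = best_end
    return ids

toy_vocab = ["a", "b", "c", "ab", "abc"]
toy_trie = build_trie(toy_vocab)
toy_ids = encode_greedy("abcab", toy_trie)
toy_decoded = "".join(toy_vocab[i] for i in toy_ids)

print(toy_ids)      # token ids chosen by the greedy search
print(toy_decoded)  # decoding reproduces the input string
```

Each character of the input is visited only a bounded number of times per token, instead of comparing the remaining text against every vocabulary entry.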

In [None]:
# Install the pygtrie package for the Prefix Trie data structure
!pip install pygtrie

In [None]:
import pygtrie
from collections import Counter

# ================ STUDENT PART (START) ==================================

class SubwordTokenizer(Tokenizer):

    def __init__(self, text, vocab_size=1000):
        super(SubwordTokenizer, self).__init__(text, vocab_size)

    def train(self, text, vocab_size):
        raise NotImplementedError

    def encode(self, s):
        raise NotImplementedError

    def decode(self, tokens):
        raise NotImplementedError

# ================ STUDENT PART (END) ==================================

Next, we provide a PyTorch dataset class that holds the tokenized input and output sequences for us. Make sure to understand what it is doing before moving on.
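
As a quick aid for understanding the dataset, the following plain-Python sketch mimics the slicing used for input and target sequences on a toy list of token ids. The function `window` and the toy values are hypothetical and only illustrate the indexing.

```python
toy_encoded = [10, 11, 12, 13, 14, 15]  # pretend these are token ids
seq_length = 3

def window(data, idx, seq_length):
    """Return the input window and the target window shifted by one token."""
    inputs = data[idx: idx + seq_length]
    targets = data[idx + 1: idx + seq_length + 1]
    return inputs, targets

# Same count as the dataset length: len(data) - seq_length
n_windows = len(toy_encoded) - seq_length
pairs = [window(toy_encoded, i, seq_length) for i in range(n_windows)]

for inputs, targets in pairs:
    print(inputs, "->", targets)
```

Every target position holds the token that follows the corresponding input position, which is exactly what a next-token prediction loss needs.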

In [None]:
from torch.utils.data import Dataset, DataLoader

class TextTokenDataset(Dataset):
    def __init__(self, encoded_text, seq_length):
        self.data = encoded_text
        self.seq_length = seq_length

    def __len__(self):
        return len(self.data) - self.seq_length  # Total possible sequences

    def __getitem__(self, idx):
        input_sequence = self.data[idx: idx + self.seq_length]  # input sequence of tokens
        target_sequence = self.data[idx + 1: idx + self.seq_length + 1]  # target sequence (shifted by 1)

        return torch.tensor(input_sequence), torch.tensor(target_sequence)

## 2 - Model Architecture
Below you can find an almost complete PyTorch model that we will use as our LM. It consists of `TransformerBlock` modules.


### a) 💻 Complete the TransformerBlock class
The `TransformerBlock` class is currently incomplete; it is your task to implement its `forward` method. Make sure that you create and apply a causal mask for the attention mechanism.
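
To illustrate what a causal mask does, here is a small plain-Python sketch that needs neither PyTorch nor a GPU. The helpers `causal_mask` and `masked_softmax` are hypothetical names. In PyTorch, the same boolean pattern can be created with `torch.triu(torch.ones(seq_len, seq_len, dtype=torch.bool), diagonal=1)`, and for boolean masks `nn.MultiheadAttention` interprets `True` as "not allowed to attend".

```python
import math

def causal_mask(seq_len):
    """mask[i][j] is True where position i must NOT attend to position j (j > i)."""
    return [[j > i for j in range(seq_len)] for i in range(seq_len)]

def masked_softmax(scores, mask):
    """Row-wise softmax that gives zero weight to masked (future) positions."""
    weights = []
    for row, mask_row in zip(scores, mask):
        exps = [0.0 if blocked else math.exp(s) for s, blocked in zip(row, mask_row)]
        total = sum(exps)
        weights.append([e / total for e in exps])
    return weights

toy_mask = causal_mask(3)
toy_scores = [[0.0, 0.0, 0.0] for _ in range(3)]  # equal attention scores everywhere
toy_weights = masked_softmax(toy_scores, toy_mask)

for row in toy_weights:
    print(row)  # each position spreads its attention only over itself and the past
```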

In [None]:
class TransformerBlock(nn.Module):

    def __init__(self, embed_dim, num_heads):
        super(TransformerBlock, self).__init__()

        self.embed_dim = embed_dim
        self.num_heads = num_heads

        # Multi-head attention layer
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)

        # Feedforward layers
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, embed_dim),
            nn.ReLU(),
            nn.Linear(embed_dim, embed_dim)
        )

        # Normalization
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)


    def forward(self, x, is_training=True):

        batch_size, seq_len, _ = x.size()

        # ============== STUDENT PART (START) ==================================

        # ================ STUDENT PART (END) ==================================

        return x


### b) 💻 Complete the TransformerModel class
After that, complete the `forward` method of the `TransformerModel` class as well. This method combines the (optional) positional embedding, which we will implement later, and the token embeddings by an element-wise sum before passing them through the blocks and final layer.

In [None]:
class TransformerModel(nn.Module):

    def __init__(self, vocab_size, embed_dim=128, num_heads=4, n_blocks=2, context_length=128, pos_emb_class=None):

        super(TransformerModel, self).__init__()
        self.embed_dim = embed_dim
        self.token_embed = nn.Embedding(vocab_size, embed_dim)

        if pos_emb_class is not None:
            self.pos_emb = pos_emb_class(embed_dim, max_len=context_length)
        else:
            self.pos_emb = None

        self.blocks = nn.ModuleList([TransformerBlock(embed_dim, num_heads) for _ in range(n_blocks)])
        self.lm_head = nn.Linear(embed_dim, vocab_size)

    def forward(self, idx, is_training=True):

        # ============== STUDENT PART (START) ==================================

        # ================ STUDENT PART (END) ==================================

        return logits

## 3 - Position Embeddings
Unlike RNNs or CNNs, Transformers don't have any built-in notion of sequence order. To help the model understand the order of tokens in a sequence, we add position embeddings to the token embeddings. These tell the model where in the sequence each token is.

### a) 💻 Implement a PositionEmbeddings class
Implement the `SinusoidalPositionEmbeddings` class, which is the type of positional encoding used and described in [Attention is All You Need](https://arxiv.org/abs/1706.03762) (Vaswani et al. 2017).

The final output of this PyTorch module for an input of `x.shape = (batch_size, seq_len, _)` should be of shape `(1, seq_len, embed_dim)`.
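
For reference, the sinusoidal encoding from the paper uses $PE_{(pos, 2i)} = \sin(pos / 10000^{2i/d})$ and $PE_{(pos, 2i+1)} = \cos(pos / 10000^{2i/d})$, where $d$ is the embedding dimension. The plain-Python sketch below computes such a table for a toy size. The function name `sinusoidal_table` is hypothetical, and the sketch leaves out the PyTorch module structure and the leading batch dimension of size 1.

```python
import math

def sinusoidal_table(seq_len, embed_dim, base=10000.0):
    """Return a seq_len x embed_dim table: sin on even dims, cos on odd dims."""
    table = []
    for pos in range(seq_len):
        row = []
        for k in range(embed_dim):
            # Dimensions 2i and 2i+1 share the frequency 1 / base**(2i / embed_dim)
            angle = pos / base ** ((k - k % 2) / embed_dim)
            row.append(math.sin(angle) if k % 2 == 0 else math.cos(angle))
        table.append(row)
    return table

toy_table = sinusoidal_table(seq_len=4, embed_dim=6)

for row in toy_table:
    print([round(v, 3) for v in row])
```

In a module, a table like this is typically computed once up to `max_len` and then sliced to the current `seq_len` in `forward`.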

In [None]:
class SinusoidalPositionEmbeddings(nn.Module):

    def __init__(self, embed_dim, max_len=512):
        super(SinusoidalPositionEmbeddings, self).__init__()
        self.embed_dim = embed_dim

    def forward(self, x):

        # ============== STUDENT PART (START) ==================================

        # ================ STUDENT PART (END) ==================================

        return ...


### c) 💻 Implement a helpful visualisation of your implemented positional embeddings.
Be creative and include the result together with an explanation in your report.

In [None]:
# ============== STUDENT PART (START) ==================================

# ================ STUDENT PART (END) ==================================

### ⚠️ **Note**: You should now restart your runtime with GPU access.

## 4 - Training and Evaluation
Now it is finally time to train and also evaluate your language model. The training and sampling functions are already provided:

In [None]:
def train(model, train_loader, optimizer, criterion, tokenizer, context_length, num_epochs=5):
    n_batches_between_validation = 100

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        loop = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", ncols=100)

        for batch_idx, (input_seq, target_seq) in enumerate(loop):
            input_seq, target_seq = input_seq.cuda(), target_seq.cuda()

            optimizer.zero_grad()

            # Forward pass
            logits = model(input_seq)

            # Flatten logits and target sequences for loss computation
            logits = logits.view(-1, logits.size(-1))  # Reshape for loss
            target_seq = target_seq.view(-1)  # Flatten the target sequence

            # Compute the loss
            loss = criterion(logits, target_seq)
            loss.backward()

            # Update weights
            optimizer.step()

            running_loss += loss.item()

            # Update the progress bar with the loss
            loop.set_postfix(loss=running_loss / (batch_idx + 1))

            if batch_idx % n_batches_between_validation == 0:
                model.eval()
                prompt = """Gandalf """
                generated_tokens = generate(model, prompt, max_len=context_length)
                model.train()
                print(" -> " + tokenizer.decode(generated_tokens[0].tolist()))

In [None]:
def generate(model, prompt, max_len=128, temperature=1.0):
    model.eval()  # Set model to evaluation mode
    prompt_tokens = torch.tensor(tokenizer.encode(prompt), dtype=torch.long).unsqueeze(0).cuda()
    generated = prompt_tokens

    for _ in range(max_len - prompt_tokens.shape[1]):
        # Feed the current sequence into the model
        logits = model(generated, is_training=False)

        # Only get the logits for the last token (the token being predicted)
        logits = logits[:, -1, :]

        # Apply temperature (optional, can control randomness of predictions)
        logits = logits / temperature

        # Sample from the probability distribution
        probs = torch.nn.functional.softmax(logits, dim=-1)
        next_token = torch.multinomial(probs, 1)  # Sample from the distribution

        # Append the generated token to the sequence
        generated = torch.cat([generated, next_token], dim=1)

    return generated
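
The effect of the `temperature` parameter in `generate` can be seen in a small plain-Python sketch. The helper `softmax_with_temperature` and the toy logits are hypothetical and serve only as an illustration of dividing the logits before the softmax.

```python
import math

def softmax_with_temperature(logits, temperature=1.0):
    """Softmax over logits divided by the temperature (numerically stabilised)."""
    scaled = [x / temperature for x in logits]
    m = max(scaled)
    exps = [math.exp(x - m) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]

toy_logits = [2.0, 1.0, 0.0]
p_sharp = softmax_with_temperature(toy_logits, temperature=0.5)
p_default = softmax_with_temperature(toy_logits, temperature=1.0)
p_flat = softmax_with_temperature(toy_logits, temperature=2.0)

print(p_sharp)    # low temperature: mass concentrates on the largest logit
print(p_default)
print(p_flat)     # high temperature: distribution moves towards uniform
```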

### a) 💻 Train your tokenizers and inspect their learned vocabularies

In [None]:
# ============== STUDENT PART (START) ==================================

char_tokenizer = ...

subword_tokenizer = ...

# ================ STUDENT PART (END) ==================================

In [None]:
# ============== STUDENT PART (START) ==================================

# Tokenizer inspection here

# ================ STUDENT PART (END) ==================================

### Provided default configuration

In [None]:
@dataclass
class DefaultConfig:

  context_length: int = 64
  embed_dim: int = 64
  n_heads: int = 8
  n_blocks: int = 4
  ffn_dim: int = 512
  batch_size: int = 64

  n_epochs: int = 1
  weight_decay: float = 0.01
  learning_rate: float = 0.0005

### Provided default setup

In [None]:
# Choose your tokenizer
tokenizer = ...

# Create the configuration
config = DefaultConfig()
vocab_size = len(tokenizer.vocab)

# Encoding the entire text
encoded_text = tokenizer.encode(lotr_text)
train_dataset = TextTokenDataset(encoded_text, config.context_length)

# Create a DataLoader to batch data
train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True)

# Create Model
model = TransformerModel(
      vocab_size,
      config.embed_dim,
      num_heads=config.n_heads,
      n_blocks=config.n_blocks,
      context_length=config.context_length,
      pos_emb_class=SinusoidalPositionEmbeddings
    ).cuda()

# Print model size
print(f"Model size: {sum(p.numel() for p in model.parameters())/1e6:.2f}M parameters")

# Optimizer and Loss function
optimizer = optim.AdamW(model.parameters(), lr=config.learning_rate, weight_decay=config.weight_decay)
criterion = nn.CrossEntropyLoss()

# Train the model
train(model, train_loader, optimizer, criterion, tokenizer, config.context_length, num_epochs=config.n_epochs)

### Evaluation metric: Perplexity
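
Perplexity is the exponential of the average negative log-likelihood per target token. As a sanity check, the plain-Python sketch below computes it directly from the probabilities a model assigned to the correct tokens. The helper `perplexity_from_probs` and the toy values are hypothetical.

```python
import math

def perplexity_from_probs(target_probs):
    """exp of the mean negative log-likelihood over all target tokens."""
    nll = [-math.log(p) for p in target_probs]
    return math.exp(sum(nll) / len(nll))

# A model that always spreads its probability uniformly over 4 tokens
uniform_ppl = perplexity_from_probs([0.25] * 8)
# A model that is always fully certain about the correct token
perfect_ppl = perplexity_from_probs([1.0] * 3)

print(uniform_ppl, perfect_ppl)
```

A uniform guess over $k$ tokens gives a perplexity of about $k$, so values are only comparable between models that share the same tokenizer.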

In [None]:
def perplexity(model, test_text, tokenizer, context_length):
    model.eval()
    test_encoded = tokenizer.encode(test_text)
    test_dataset = TextTokenDataset(test_encoded, context_length)
    test_loader = DataLoader(test_dataset, batch_size=config.batch_size, shuffle=False)
    total_loss = 0.0
    total_tokens = 0

    with torch.no_grad():
        for input_seq, target_seq in tqdm(test_loader):
            input_seq, target_seq = input_seq.cuda(), target_seq.cuda()

            # Forward pass
            logits = model(input_seq)
            logits = logits.view(-1, logits.size(-1))  # Reshape for loss
            target_seq = target_seq.view(-1)  # Flatten the target sequence

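            # Compute the loss. The criterion returns the mean loss per token,
            # so weight it by the number of tokens in this batch
            # (the last batch may be smaller than the others)
            loss = criterion(logits, target_seq)
            total_loss += loss.item() * target_seq.size(0)
            total_tokens += target_seq.size(0)

    # Average negative log-likelihood per token over the whole test set
    avg_loss = total_loss / total_tokens

    return math.exp(avg_loss)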

In [None]:
value = perplexity(model, lotr_text[:len(lotr_text)//10], tokenizer, config.context_length)
print(f"Perplexity: {value:.2f}")

### c) 💻 Train and evaluate your final models (obtained with the two tokenizers), with and without the positional embeddings (4 combinations). Stick to the default configuration for now.

In [None]:
# ============== STUDENT PART (START) ==================================

# Your experiment here. Ensure it reproduces the results in your report.

# ================ STUDENT PART (END) ==================================

### Test your model with prompts

In [None]:
# Test your model
prompt = "Gandalf"
generated_tokens = generate(model, prompt, max_len=config.context_length)

print(tokenizer.decode(generated_tokens[0].tolist()))

### **Optional** (Only Karma Points)
Generate a story about Gandalf with your best LM (*feel free to change the configuration*) and share it in the respective **Content Q&A forum** thread.


In [None]:
# ============== STUDENT PART (START) ==================================

prompt = "..."

gandalfs_story = ...

# ================ STUDENT PART (END) ==================================