# Programming Task Description

## Programming Task: Implementing a Character-Level GPT Model

### Introduction
In this task, you will create a Python script using PyTorch to implement a simplified GPT (Generative Pre-trained Transformer) model for character-level language modeling. The model will be trained on the text in input.txt to predict the next character in a sequence and generate new text based on a given context. The architecture follows the decoder part of the transformer model from the "Attention is All You Need" paper by Vaswani et al., focusing on masked multi-head self-attention to ensure predictions depend only on previous positions.

## Task Description
### Your goal is to write a Python jupyter notebook that:

1. Reads and processes the text from input.txt.
2. Implements a decoder-only transformer model with manual attention mechanisms.
3. Trains the model on the processed data.
4. Generates new text using the trained model.

You will use PyTorch and implement the attention mechanism from scratch, following the decoder structure outlined in the "Attention is All You Need" paper.

### Step-by-step Guide

1. Data Preparation
* Read all text from input.txt using UTF-8 encoding.
* Create a sorted list of unique characters (vocabulary) from the text.
* Build two dictionaries:
    * stoi: Maps characters to integers (e.g., 'a' -> 0).
    * itos: Maps integers to characters (e.g., 0 -> 'a').
* Define functions:
    * encode(s): Converts a string to a list of integers using stoi.
    * decode(l): Converts a list of integers to a string using itos.
* Encode the entire text into a tensor of integers using torch.tensor.
* Split the data: 90% for training, 10% for validation.

2. Data Loading
* Implement a function get_batch(split):
    * Input: split is either 'train' or 'val'.
    * Select the appropriate dataset (training or validation).
    * Randomly sample batch_size starting indices, ensuring each sequence fits within block_size.
* Return:
    * x: A tensor of shape (batch_size, block_size) with input sequences.
    * y: A tensor of shape (batch_size, block_size) with target sequences (shifted by one position).
* Move tensors to the device (CPU or GPU).

3. Model Architecture
* Implement the following components in a decoder-only transformer:
    * Embedding Layers:
        * Token embedding: nn.Embedding(vocab_size, n_embd) for character indices.
        * Position embedding: nn.Embedding(block_size, n_embd) for positions 0 to block_size-1.
    * Transformer Blocks:
        * Each block includes:
            * Masked Multi-Head Self-Attention:
                * Implement manually (do not use nn.MultiheadAttention).
                * For each head:
                    * Linear layers for queries (Q), keys (K), and values (V).
                    * Scaled dot-product attention: attention = softmax((Q @ K.T) / sqrt(d_k)) @ V.
                    * Mask future positions with a lower triangular matrix (e.g., tril) by setting future weights to -inf before softmax.
                * Concatenate heads and apply a projection layer.
            * Feed-Forward Network: nn.Linear(n_embd, 4 * n_embd) → ReLU → nn.Linear(4 * n_embd, n_embd).
            * Layer Normalization: Apply nn.LayerNorm(n_embd) before each sub-layer (pre-norm).
            * Residual Connections: Add input to output of each sub-layer.
        * Use n_layer blocks in sequence.
    * Final Layers:
        * nn.LayerNorm(n_embd) for final normalization.
        * nn.Linear(n_embd, vocab_size) to produce logits.
* Define a GPTLanguageModel class with:
    * forward(idx, targets=None): Computes logits and loss (if targets provided).
    * generate(idx, max_new_tokens): Autoregressively generates new tokens.

4. Training
* Use the AdamW optimizer with learning_rate = 3e-4.
* Train for max_iters = 5000 iterations.
* Estimate and print training and validation losses:
* Compute loss using F.cross_entropy on flattened logits and targets.

5. Text Generation
* Implement generate(idx, max_new_tokens):
    * Start with an initial context idx (shape (B, T)).
    * For max_new_tokens steps:
        * Crop idx to the last block_size tokens.
        * Get logits from forward.
        * Apply softmax to the last time step’s logits to get probabilities.
        * Sample the next token using torch.multinomial.
        * Append the sampled token to idx.
    * Return the extended sequence.

### Hyperparameters
Use these values:

* batch_size = 64
* block_size = 256
* n_embd = 384
* n_head = 6
* n_layer = 6
* dropout = 0.2
* learning_rate = 3e-4
* max_iters = 5000

### Understanding the Decoder
The "Attention is All You Need" paper describes a transformer with an encoder and decoder. For this task, you focus on the decoder-only architecture used in GPT:

* Masked Self-Attention: Ensures the model only attends to previous positions in the sequence, making it autoregressive. This is achieved by masking future tokens in the attention computation, as shown below:

$Attention (Q, K, V) = softmax((Q@K.T)/sqrt(d_{k}) + mask) @V$ 

where $mask$ sets future positions to $-inf$

* Decoder Role: In the original paper, the decoder generates output sequences while attending to the encoder’s output. Here, without an encoder, it uses self-attention on the input sequence alone, predicting the next token step-by-step.

### Additional Notes
* Manual Attention: Implement attention from scratch to understand its mechanics (no pre-built PyTorch modules).
* Masking: Use a lower triangular matrix (e.g., torch.tril) to mask future positions.
* Device Handling: Set device = 'cuda' if torch.cuda.is_available() else 'cpu' and move tensors/models accordingly.
* Dropout: Apply nn.Dropout(dropout) in attention and feed-forward layers for regularization.

### Deliverables
A Python script that:
* Implements all steps above.
* Prints training and validation losses every 500/100? iterations (up to you).
* Generates and prints a 500-character sample after training.

### Evaluation Criteria
* Correct data preparation and batch loading.
* Accurate implementation of the transformer model, especially masked self-attention.
* Successful training with decreasing loss.
* Generation of coherent (for character-level) text.

# Code

In [1]:
!pip install matplotlib

[1;31merror[0m: [1mexternally-managed-environment[0m

[31m×[0m This environment is externally managed
[31m╰─>[0m To install Python packages system-wide, try 'pacman -S
[31m   [0m python-xyz', where xyz is the package you are trying to
[31m   [0m install.
[31m   [0m 
[31m   [0m If you wish to install a non-Arch-packaged Python package,
[31m   [0m create a virtual environment using 'python -m venv path/to/venv'.
[31m   [0m Then use path/to/venv/bin/python and path/to/venv/bin/pip.
[31m   [0m 
[31m   [0m If you wish to install a non-Arch packaged Python application,
[31m   [0m it may be easiest to use 'pipx install xyz', which will manage a
[31m   [0m virtual environment for you. Make sure you have python-pipx
[31m   [0m installed via pacman.

[1;35mnote[0m: If you believe this is a mistake, please contact your Python installation or OS distribution provider. You can override this, at the risk of breaking your Python installation or OS, by passing --break-s

In [2]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import random_split
from typing import Optional, Tuple

import random
#import torchvision
#import torchvision.transforms as transforms
import matplotlib.pyplot as plt

In [3]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

## Data Preprocessing

In [4]:
with open("data/input-2.txt", "r", encoding="utf-8") as file:
    data = file.read()

sorted_chars = sorted(list(set(data)))
sorted_str = "".join(sorted_chars)

In [5]:
data[25:50]

'proceed any further, hear'

In [6]:
vocab_size = len(sorted_chars)

In [7]:
class Tokenizer:
    def __init__(self):
        self.tokens: dict = {}
        self.idx: int = 0

    def encode(self, chars: str):
        result = []
        for c in chars:
            if c not in self.tokens.keys():
                self.tokens[c] = self.idx
                self.idx += 1
            result.append(self.tokens[c])
        
        return result

    def decode(self, nums: list[int]):
        result = [self.tokens[i] for i in nums]
        return result

In [8]:
tokenizer = Tokenizer()
encoded_str = tokenizer.encode(sorted_str)

In [9]:
stoi = {x: y for x, y in zip(sorted_str, encoded_str)}
itos = {x: y for x, y in zip(encoded_str, sorted_str)}

print("String to Int:\n", stoi, "\n")
print("Int to String:\n", itos)

String to Int:
 {'\n': 0, ' ': 1, '!': 2, '$': 3, '&': 4, "'": 5, ',': 6, '-': 7, '.': 8, '3': 9, ':': 10, ';': 11, '?': 12, 'A': 13, 'B': 14, 'C': 15, 'D': 16, 'E': 17, 'F': 18, 'G': 19, 'H': 20, 'I': 21, 'J': 22, 'K': 23, 'L': 24, 'M': 25, 'N': 26, 'O': 27, 'P': 28, 'Q': 29, 'R': 30, 'S': 31, 'T': 32, 'U': 33, 'V': 34, 'W': 35, 'X': 36, 'Y': 37, 'Z': 38, 'a': 39, 'b': 40, 'c': 41, 'd': 42, 'e': 43, 'f': 44, 'g': 45, 'h': 46, 'i': 47, 'j': 48, 'k': 49, 'l': 50, 'm': 51, 'n': 52, 'o': 53, 'p': 54, 'q': 55, 'r': 56, 's': 57, 't': 58, 'u': 59, 'v': 60, 'w': 61, 'x': 62, 'y': 63, 'z': 64} 

Int to String:
 {0: '\n', 1: ' ', 2: '!', 3: '$', 4: '&', 5: "'", 6: ',', 7: '-', 8: '.', 9: '3', 10: ':', 11: ';', 12: '?', 13: 'A', 14: 'B', 15: 'C', 16: 'D', 17: 'E', 18: 'F', 19: 'G', 20: 'H', 21: 'I', 22: 'J', 23: 'K', 24: 'L', 25: 'M', 26: 'N', 27: 'O', 28: 'P', 29: 'Q', 30: 'R', 31: 'S', 32: 'T', 33: 'U', 34: 'V', 35: 'W', 36: 'X', 37: 'Y', 38: 'Z', 39: 'a', 40: 'b', 41: 'c', 42: 'd', 43: 'e', 4

In [10]:
encoded_data = torch.tensor(tokenizer.encode(data))
encoded_data

tensor([18, 47, 56,  ..., 45,  8,  0])

In [22]:
train_size = int(len(encoded_data) * 0.9)
val_size = len(encoded_data) - train_size

train_data, val_data = encoded_data[:train_size], encoded_data[train_size + 1: len(encoded_data)]
train_data

tensor([18, 47, 56,  ..., 43, 56, 43])

In [23]:
# hyperparameters

batch_size = 64
block_size = 256
n_embd = 384
n_head = 6
n_layer = 6
dropout = 0.2
learning_rate = 3e-4
max_iters = 5000

In [42]:
def get_batch(split: str):
    global train_data, val_data
    global train_size, val_size
    global block_size, batch_size, device

    data = {
        'train': train_data,
        'test': val_data
    }.get(split.lower(), None)

    data_size = {
        'train': train_size,
        'test': val_size
    }.get(split.lower(), None)

    if data is None or data_size is None:
        raise ValueError(f"Invalid split: {split}")
    
    start_indices = [random.randint(0, data_size - block_size - 1) for _ in range(batch_size)]

    x_list, y_list = [], []
    for idx in start_indices:
        x = data[idx: idx + block_size]
        y = data[idx + 1: idx + block_size + 1]

        x_list.append(x.unsqueeze(0))
        y_list.append(y.unsqueeze(0))

    x_batch = torch.cat(x_list, dim=0).to(device)
    y_batch = torch.cat(y_list, dim=0).to(device)

    return (x_batch, y_batch)
        
train_xbatch, train_ybatch = get_batch('train')
trainT, trainB = train_xbatch.shape

In [45]:
train_xbatch.size()

torch.Size([64, 256])

## Model Training

In [60]:
class Head(nn.Module):
    def __init__(self, n_embd: int, head_size: int, block_size: int, dropout: int):
        super().__init__()
        self.query = nn.Linear(n_embd, head_size)
        self.key = nn.Linear(n_embd, head_size)
        self.value = nn.Linear(n_embd, head_size)
        
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))           

        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        Q = self.query(x)
        K = self.key(x)
        V = self.value(x)
        
        B, T, C = x.shape
        d_k = K.shape[-1]
        # (1, T, T) -> (B, T, T)
        mask = self.tril[:T, :T].unsqueeze(0).expand(B, -1, -1).to(x.device)

        weights = Q @ K.transpose(-2, 1) * d_k ** -0.5
        weights = weights.masked_fill(mask == 0, float('inf'))

        weights = F.softmax(weights, dim=-1)
        weights = self.dropout(weights)
        
        out = weights @ V
        return out
        

class MultiHeadAttention(nn.Module):
    def __init__(self, vocab_size, block_size, n_embd, n_head, dropout):
        super().__init__()
        head_size = n_embd // n_head
        self.heads = nn.ModuleList([Head(n_embd, head_size, block_size, dropout) for _ in range(n_head)])
        self.projection = nn.Linear(n_head * head_size, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        cat_heads = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.projection(cat_heads)
        out = self.dropout(out)
        return out

class FeedForward(nn.Module):
    def __init__(self, n_embd, dropout):
        super().__init__()
        self.linear1 = nn.Linear(n_embd, 4 * n_embd)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(4 * n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        out = self.linear1(x)
        out = self.relu(out)
        out = self.linear2(out)
        out = self.dropout(out)
        return out

class Block(nn.Module):
    def __init__(self, vocab_size, block_size, n_embd, n_head, dropout):
        super().__init__()

        self.attention = MultiHeadAttention(
            vocab_size = vocab_size,
            block_size = block_size,
            n_embd = n_embd,
            n_head = n_head,
            dropout = dropout
        )
        self.feed_forward = FeedForward(n_embd, dropout)

        self.norm1 = nn.LayerNorm(n_embd)
        self.norm2 = nn.LayerNorm(n_embd)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # original input are added back intentionally as residuals
        x = x + self.attention(self.norm1(x))
        x = x + self.feed_forward(self.norm2(x))
        return x
    

class GPTLanguageModel(nn.Module):
    def __init__(self, vocab_size, block_size, n_embd, n_head, n_layer, dropout, device):        
        super().__init__()
        self.block_size = block_size
        self.device = device

        self.token_embd = nn.Embedding(vocab_size, n_embd)
        self.pos_embd = nn.Embedding(block_size, n_embd)
        
        self.blocks = nn.Sequential(*[
            Block(vocab_size, block_size, n_embd, n_head, dropout) for _ in range(n_layer)
        ])
        
        self.norm = nn.LayerNorm(n_embd)
        self.out = nn.Linear(n_embd, vocab_size)

        self.to(device)

    def forward(self, idx: torch.Tensor, targets: Optional[torch.Tensor] = None) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        B, T = idx.size()

        tk_embedding = self.token_embd(idx)
        pos_embedding = self.pos_embd(torch.arange(T, device=self.device))

        x = tk_embedding + pos_embedding
        x = self.blocks(x)
        x = self.norm(x)
        logits = self.out(x)

        loss = None
        if targets:
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx: torch.Tensor, max_new_tokens: int) -> torch.Tensor:
        for _ in range(max_new_tokens):
            # crop idx to last block_size tokens
            crop_input = idx[:, -self.block_size:]

            # get logits from forward
            logits, _ = self(crop_input)
            logits = logits[:, -1, :]

            # apply softmax to the last time step's logits
            prob = F.softmax(logits, dim=-1)

            # sample next token using torch.multinomial
            next_idx = torch.multinomial(prob, num_samples=1)

            # append sampled token to idx
            idx = torch.cat((idx, next_idx), dim=1)
        return idx
        

In [61]:
model = GPTLanguageModel(vocab_size, block_size, n_embd, n_head, n_layer, dropout, device)

optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

In [62]:
for epoch in range(max_iters):
    x, y = get_batch('train')

    model.train()
    x.shape
    outputs, loss = model(x, y)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if (i + 1) % 100 == 0:
        model.eval()
        with torch.no_grad():
            val_x, val_y = get_batch('val')
            _, val_loss = model(val_x, val_y)
        print(f"Epoch [{epoch + 1}/{max_iters}], Step [{i + 1}/{batch_size}], Train Loss: {loss.item():.4f} | Val Loss: {val_loss.item():.4f}")

RuntimeError: Expected size for first two dimensions of batch2 tensor to be: [64, 64] but got: [64, 256].