## The transformer architecture

<div>
<img src="https://heidloff.net/assets/img/2023/02/transformers.png" width="800"/>
</div>

- #### how does a generative pre-trained transformer differ from a transformer?
    - a gpt has no encoder and multi-head attention (what the encoder plugs into)
    - so it will basically :
        - i) masked multi head attention
        - ii) add and normalise
        - iii) feed forward
        - iv) add and normalise
        - v) linear transformation
        - vi) softmax

# Converting the Romeo and Juliet model to a GPT model

## 1) import necessary modules and set the hyperparameters

In [1]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import mmap
import random
import pickle
import argparse

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

block_size = 64
batch_size = 128
max_iterations = 5000
learning_rate = 3e-4
evaluation_iterations = 100
evaluation_interval = 200
n_embd = 384
n_head = 8
n_layer = 4
dropout = 0.2

cuda


## 2) read the text file with data and make a sorted set of characters to get the vocab_size 

In [2]:
with open('data.txt', 'r', encoding='utf-8') as file:
    text = file.read()  
    
chars = sorted(set(text))
vocab_size = len(chars)
print(vocab_size)

71


## 3) make a character-level tokenizer and encode the text corpus

In [3]:
string_to_integer = { ch:i for i,ch in enumerate(chars) }
integer_to_string = { i:ch for i,ch in enumerate(chars) }
encoder = lambda s: [string_to_integer[c] for c in s]
decoder = lambda l: ''.join([integer_to_string[i] for i in l])

data = torch.tensor(encoder(text), dtype=torch.long)

## 4) Create training and Validation splits and define the get_batch function

In [4]:
split_size = int(0.8*len(data))
training_data = data[:split_size]
validation_data = data[split_size:]

def get_batch(split):
    data = training_data if split == 'train' else validation_data
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x,y = x.to(device), y.to(device)
    return x, y


## 5) Define the estimate loss function

In [5]:
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(evaluation_iterations)
        for k in range(evaluation_iterations):
            inputs, targets = get_batch(split)
            logits, loss = model(inputs, targets)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

## 6) Define the gpt model class and initialise a model
- #### i) we add positional encoding as per the architechture

In [6]:
class Head(nn.Module):
    """One head of self-attention."""
    
    def __init__(self, head_size):
        """
        Initializes the self-attention head.
        
        Args:
            head_size (int): The dimensionality of each attention head.
        """
        super().__init__()
        # Linear layers to project input embeddings to key, query, and value vectors
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        
        # Create a lower triangular matrix for causal masking to prevent attention to future tokens
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        
        # Dropout layer for regularization
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        """
        Performs the forward pass for the self-attention head.
        
        Args:
            x (torch.Tensor): Input tensor of shape (B, T, C), where
                              B = Batch size,
                              T = Sequence length,
                              C = Embedding dimension.
        
        Returns:
            torch.Tensor: Output tensor after applying self-attention, shape (B, T, head_size).
        """
        B, T, C = x.shape  # Unpack the input shape
        
        # Project input embeddings to keys and queries
        k = self.key(x)    # Shape: (B, T, head_size)
        q = self.query(x)  # Shape: (B, T, head_size)
        
        # Compute attention scores by taking the dot product of queries and keys
        # Transpose k to shape (B, head_size, T) for batch matrix multiplication
        wei = q @ k.transpose(-2, -1) * (k.shape[-1] ** -0.5)  # Shape: (B, T, T)
        
        # Apply causal masking to ensure each position can only attend to previous positions
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))  # Shape: (B, T, T)
        
        # Apply softmax to obtain attention weights
        wei = F.softmax(wei, dim=-1)  # Shape: (B, T, T)
        
        # Apply dropout to the attention weights for regularization
        wei = self.dropout(wei)
        
        # Project input embeddings to values
        v = self.value(x)  # Shape: (B, T, head_size)
        
        # Perform weighted aggregation of the values based on attention weights
        out = wei @ v  # Shape: (B, T, head_size)
        
        return out

class MultiHeadAttention(nn.Module):
    """Multiple heads of self-attention in parallel."""
    
    def __init__(self, num_heads, head_size):
        """
        Initializes the multi-head self-attention module.
        
        Args:
            num_heads (int): Number of attention heads.
            head_size (int): Dimensionality of each attention head.
        """
        super().__init__()
        # Create a list of Head modules
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        
        # Linear layer to project concatenated head outputs back to embedding dimension
        self.proj = nn.Linear(head_size * num_heads, n_embd)
        
        # Dropout layer for regularization
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        """
        Performs the forward pass for multi-head self-attention.
        
        Args:
            x (torch.Tensor): Input tensor of shape (B, T, C).
        
        Returns:
            torch.Tensor: Output tensor after multi-head attention, shape (B, T, n_embd).
        """
        # Concatenate outputs from all attention heads along the embedding dimension
        out = torch.cat([h(x) for h in self.heads], dim=-1)  # Shape: (B, T, head_size * num_heads)
        
        # Project the concatenated outputs back to the original embedding dimension
        out = self.dropout(self.proj(out))  # Shape: (B, T, n_embd)
        
        return out

class FeedForward(nn.Module):
    """A simple linear layer followed by a non-linearity."""
    
    def __init__(self, n_embd):
        """
        Initializes the feedforward network.
        
        Args:
            n_embd (int): Embedding dimension.
        """
        super().__init__()
        # Define a sequential network comprising linear layers and ReLU activation
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),  # Expand embedding dimension
            nn.ReLU(),                      # Apply ReLU activation
            nn.Linear(4 * n_embd, n_embd),  # Project back to original embedding dimension
            nn.Dropout(dropout),            # Apply dropout for regularization
        )
    
    def forward(self, x):
        """
        Performs the forward pass for the feedforward network.
        
        Args:
            x (torch.Tensor): Input tensor of shape (B, T, C).
        
        Returns:
            torch.Tensor: Output tensor after feedforward processing, shape (B, T, C).
        """
        return self.net(x)

class Block(nn.Module):
    """Transformer block: communication followed by computation."""
    
    def __init__(self, n_embd, n_head):
        """
        Initializes the Transformer block.
        
        Args:
            n_embd (int): Embedding dimension.
            n_head (int): Number of attention heads.
        """
        super().__init__()
        head_size = n_embd // n_head  # Determine head size based on embedding dimension and number of heads
        self.sa = MultiHeadAttention(n_head, head_size)  # Multi-head self-attention module
        self.ffwd = FeedForward(n_embd)                  # Feedforward network
        self.ln1 = nn.LayerNorm(n_embd)                  # Layer normalization after attention
        self.ln2 = nn.LayerNorm(n_embd)                  # Layer normalization after feedforward
    
    def forward(self, x):
        """
        Performs the forward pass for the Transformer block.
        
        Args:
            x (torch.Tensor): Input tensor of shape (B, T, C).
        
        Returns:
            torch.Tensor: Output tensor after processing, shape (B, T, C).
        """
        # Apply multi-head self-attention
        y = self.sa(x)  # Shape: (B, T, C)
        
        # Add residual connection and apply layer normalization
        x = self.ln1(x + y)  # Shape: (B, T, C)
        
        # Apply feedforward network
        y = self.ffwd(x)  # Shape: (B, T, C)
        
        # Add residual connection and apply layer normalization
        x = self.ln2(x + y)  # Shape: (B, T, C)
        
        return x

class GPTLanguageModel(nn.Module):
    """GPT Language Model implementing Transformer architecture."""
    
    def __init__(self, vocab_size):
        """
        Initializes the GPT language model.
        
        Args:
            vocab_size (int): Size of the vocabulary (number of unique tokens).
        """
        super().__init__()
        # Token embedding table maps each token index to an embedding vector
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        
        # Position embedding table provides positional information for each token in the sequence
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        
        # Stack multiple Transformer blocks to build the model's depth
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        
        # Final layer normalization for stabilizing the output
        self.ln_f = nn.LayerNorm(n_embd)
        
        # Language modeling head projects the final embeddings to vocabulary logits
        self.lm_head = nn.Linear(n_embd, vocab_size)
        
        # Initialize weights using the defined method
        self.apply(self._init_weights)
    
    def _init_weights(self, module):
        """
        Initializes weights of the model's layers.
        
        Args:
            module (nn.Module): A module within the model.
        """
        if isinstance(module, nn.Linear):
            # Initialize linear layers with normal distribution (mean=0, std=0.02)
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                # Initialize biases to zero
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            # Initialize embedding layers with normal distribution (mean=0, std=0.02)
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
    
    def forward(self, index, targets=None):
        """
        Performs the forward pass for the GPT language model.
        
        Args:
            index (torch.Tensor): Input tensor of token indices, shape (B, T).
            targets (torch.Tensor, optional): Target tensor of token indices, shape (B, T).
        
        Returns:
            Tuple[torch.Tensor, Optional[torch.Tensor]]:
                - logits: Predicted logits for each token, shape (B, T, vocab_size).
                - loss: Cross-entropy loss (if targets are provided), else None.
        """
        B, T = index.shape  # Unpack batch size and sequence length
        
        # Retrieve token embeddings for the input indices
        tok_emb = self.token_embedding_table(index)  # Shape: (B, T, C)
        
        # Create a range of positions and retrieve their embeddings
        pos_emb = self.position_embedding_table(torch.arange(T, device=device))  # Shape: (T, C)
        
        # Add token and position embeddings to incorporate positional information
        x = tok_emb + pos_emb  # Shape: (B, T, C)
        
        # Pass the embeddings through the stack of Transformer blocks
        x = self.blocks(x)  # Shape: (B, T, C)
        
        # Apply final layer normalization
        x = self.ln_f(x)  # Shape: (B, T, C)
        
        # Project the normalized embeddings to vocabulary logits
        logits = self.lm_head(x)  # Shape: (B, T, vocab_size)
        
        # If targets are provided, compute the cross-entropy loss
        if targets is None:
            loss = None
        else:
            # Reshape logits and targets for loss computation
            logits = logits.view(B * T, -1)    # Shape: (B*T, vocab_size)
            targets = targets.view(B * T)      # Shape: (B*T)
            loss = F.cross_entropy(logits, targets)  # Scalar loss value
        
        return logits, loss
    
    def generate(self, index, max_new_tokens):
        """
        Generates new tokens based on the input context.
        
        Args:
            index (torch.Tensor): Input tensor of token indices, shape (B, T).
            max_new_tokens (int): Number of new tokens to generate.
        
        Returns:
            torch.Tensor: Generated token indices, shape (B, T + max_new_tokens).
        """
        for _ in range(max_new_tokens):
            # Crop the input indices to the last block_size tokens to adhere to the model's maximum context length
            index_cond = index[:, -block_size:]
            
            # Perform a forward pass to get the logits for the current context
            logits, _ = self.forward(index_cond)
            
            # Focus on the logits of the last time step (the most recent token)
            logits = logits[:, -1, :]  # Shape: (B, C)
            
            # Apply softmax to convert logits to probabilities
            probs = F.softmax(logits, dim=-1)  # Shape: (B, C)
            
            # Sample the next token index from the probability distribution
            index_next = torch.multinomial(probs, num_samples=1)  # Shape: (B, 1)
            
            # Append the sampled token to the sequence
            index = torch.cat((index, index_next), dim=1)  # Shape: (B, T + 1)
        
        return index

# Instantiate the GPT language model with the specified vocabulary size
model = GPTLanguageModel(vocab_size)

# Move the model to the specified device (CPU or GPU)
m = model.to(device)

## 7) create an AdamW optimiser and define the training loop

In [7]:

optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for iter in range(max_iterations):
    if iter % evaluation_iterations == 0:
        losses = estimate_loss()
        print(f"Iteration {iter}, training loss {losses['train']:.2f}, validation loss {losses['val']:.2f}")
    inputs, targets = get_batch('train')
    logits, loss = model.forward(inputs, targets)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

print(loss.item())

Iteration 0, training loss 4.38, validation loss 4.39
Iteration 100, training loss 2.34, validation loss 2.40
Iteration 200, training loss 1.97, validation loss 2.08
Iteration 300, training loss 1.77, validation loss 1.89
Iteration 400, training loss 1.63, validation loss 1.79
Iteration 500, training loss 1.54, validation loss 1.73
Iteration 600, training loss 1.47, validation loss 1.69
Iteration 700, training loss 1.41, validation loss 1.66
Iteration 800, training loss 1.35, validation loss 1.64
Iteration 900, training loss 1.30, validation loss 1.65
Iteration 1000, training loss 1.25, validation loss 1.65
Iteration 1100, training loss 1.20, validation loss 1.65
Iteration 1200, training loss 1.14, validation loss 1.64
Iteration 1300, training loss 1.09, validation loss 1.66
Iteration 1400, training loss 1.05, validation loss 1.67
Iteration 1500, training loss 1.00, validation loss 1.69
Iteration 1600, training loss 0.95, validation loss 1.72
Iteration 1700, training loss 0.90, validat

In [10]:
test_prompt = 'hi i am yung ting'
context = torch.tensor(encoder(test_prompt), dtype=torch.long, device=device)
# context = torch.zeros((1,1), dtype=torch.long, device=device)
generated_tokens = m.generate(context.unsqueeze(0), max_new_tokens=100)
generated_chars = decoder(generated_tokens[0].tolist())
print(generated_chars)

hi i am yung tings bers in cold make him list,
Enor kinsman is old, what passes’d it shall.
In tell you go to thee ni
