Exercise 5.1: Temperature-scaled softmax scores and sampling probabilities

In [1]:
import torch

# Define vocabulary
vocab = { 
    "closer": 0,
    "every": 1, 
    "effort": 2, 
    "forward": 3,
    "inches": 4,
    "moves": 5, 
    "pizza": 6,
    "toward": 7,
    "you": 8,
} 
inverse_vocab = {v: k for k, v in vocab.items()}

# Sample logits for next token prediction
next_token_logits = torch.tensor(
    [4.51, 0.89, -1.90, 6.75, 1.63, -1.62, -1.89, 6.28, 1.79]
)

def print_sampled_tokens(probas):
    """Print frequency of sampled tokens over 1000 samples."""
    torch.manual_seed(123)
    sample = [torch.multinomial(probas, num_samples=1).item() for i in range(1_000)]
    sampled_ids = torch.bincount(torch.tensor(sample))
    for i, freq in enumerate(sampled_ids):
        print(f"{freq} x {inverse_vocab[i]}")

def softmax_with_temperature(logits, temperature):
    """Apply temperature scaling to logits and compute softmax."""
    scaled_logits = logits / temperature
    return torch.softmax(scaled_logits, dim=0)

# Test different temperatures
temperatures = [1, 0.1, 5]  # Original, lower, and higher temperature
scaled_probas = [softmax_with_temperature(next_token_logits, T) for T in temperatures]

# Print results for each temperature
for i, probas in enumerate(scaled_probas):
    print(f"\nTemperature: {temperatures[i]}")
    print_sampled_tokens(probas)

# Check actual probability for "pizza" with temperature=5
temp5_idx = 2
pizza_idx = 6
print(f"\nActual probability for 'pizza' at temperature 5: {scaled_probas[temp5_idx][pizza_idx]:.4f}")


Temperature: 1
73 x closer
0 x every
0 x effort
582 x forward
2 x inches
0 x moves
0 x pizza
343 x toward

Temperature: 0.1
0 x closer
0 x every
0 x effort
985 x forward
0 x inches
0 x moves
0 x pizza
15 x toward

Temperature: 5
165 x closer
75 x every
42 x effort
239 x forward
71 x inches
46 x moves
32 x pizza
227 x toward
103 x you

Actual probability for 'pizza' at temperature 5: 0.0430


Exercise 5.2: Different temperature and top-k settings


In [2]:
import torch
import torch.nn.functional as F

def generate_with_temp_and_topk(logits, temperature=1.0, top_k=None):
    """Generate next token using temperature and optional top-k filtering."""
    # Apply temperature
    scaled_logits = logits / temperature
    
    if top_k is not None:
        # Get top k logits and their indices
        top_k_logits, top_k_indices = torch.topk(scaled_logits, k=min(top_k, len(scaled_logits)))
        
        # Create a mask of selected tokens
        mask = torch.zeros_like(scaled_logits)
        mask.scatter_(0, top_k_indices, 1)
        
        # Set non-top-k logits to negative infinity
        scaled_logits = torch.where(mask.bool(), scaled_logits, torch.tensor(float('-inf')))
    
    # Apply softmax to get probabilities
    probabilities = F.softmax(scaled_logits, dim=0)
    
    # Sample from the distribution
    next_token = torch.multinomial(probabilities, num_samples=1)
    
    return next_token, probabilities

# Test different combinations
test_logits = torch.tensor([4.5, 0.9, -1.9, 6.7, 1.6, -1.6, -1.9, 6.3, 1.8])
settings = [
    {"temp": 1.0, "top_k": None},
    {"temp": 0.5, "top_k": None},
    {"temp": 1.0, "top_k": 3},
    {"temp": 0.5, "top_k": 3}
]

for setting in settings:
    print(f"\nTemperature: {setting['temp']}, Top-k: {setting['top_k']}")
    token, probs = generate_with_temp_and_topk(
        test_logits, 
        temperature=setting['temp'], 
        top_k=setting['top_k']
    )
    print(f"Token probabilities: {probs}")


Temperature: 1.0, Top-k: None
Token probabilities: tensor([6.1615e-02, 1.6836e-03, 1.0238e-04, 5.5608e-01, 3.3903e-03, 1.3819e-04,
        1.0238e-04, 3.7275e-01, 4.1409e-03])

Temperature: 0.5, Top-k: None
Token probabilities: tensor([8.3993e-03, 6.2708e-06, 2.3189e-08, 6.8413e-01, 2.5429e-05, 4.2252e-08,
        2.3189e-08, 3.0740e-01, 3.7936e-05])

Temperature: 1.0, Top-k: 3
Token probabilities: tensor([0.0622, 0.0000, 0.0000, 0.5614, 0.0000, 0.0000, 0.0000, 0.3763, 0.0000])

Temperature: 0.5, Top-k: 3
Token probabilities: tensor([0.0084, 0.0000, 0.0000, 0.6842, 0.0000, 0.0000, 0.0000, 0.3074, 0.0000])


Exercise 5.3: Implementing top-p sampling


In [3]:
def top_p_sampling(logits, p=0.9):
    """
    Implement nucleus (top-p) sampling.
    
    Args:
        logits: Raw logits from the model
        p: Cumulative probability threshold (default: 0.9)
    
    Returns:
        Sampled token index
    """
    # Convert logits to probabilities
    probs = F.softmax(logits, dim=-1)
    
    # Sort probabilities in descending order
    sorted_probs, sorted_indices = torch.sort(probs, descending=True)
    
    # Calculate cumulative probabilities
    cumulative_probs = torch.cumsum(sorted_probs, dim=-1)
    
    # Create mask for tokens within top-p probability mass
    mask = cumulative_probs <= p
    
    # Add the first token that exceeds p (to ensure we have at least one token)
    mask[0] = True
    
    # Filter probabilities using the mask
    filtered_probs = torch.zeros_like(probs)
    filtered_probs[sorted_indices[mask]] = sorted_probs[mask]
    
    # Renormalize probabilities
    filtered_probs = filtered_probs / filtered_probs.sum()
    
    # Sample from the filtered distribution
    return torch.multinomial(filtered_probs, num_samples=1)

# Test the implementation
test_logits = torch.tensor([4.5, 0.9, -1.9, 6.7, 1.6, -1.6, -1.9, 6.3, 1.8])
p_values = [0.5, 0.9, 0.95]

for p in p_values:
    print(f"\nTop-p sampling with p={p}")
    sampled_token = top_p_sampling(test_logits, p=p)
    print(f"Sampled token index: {sampled_token.item()}")


Top-p sampling with p=0.5
Sampled token index: 3

Top-p sampling with p=0.9
Sampled token index: 3

Top-p sampling with p=0.95
Sampled token index: 7


Exercise 5.4: Implementing a custom tokenizer


In [4]:
class SimpleTokenizer:
    def __init__(self):
        # Define basic vocabulary with special tokens
        self.vocab = {
            "<|endoftext|>": 0,
            "<|unk|>": 1,
        }
        self.inverse_vocab = {v: k for k, v in self.vocab.items()}
        self.unk_token_id = 1

    def train(self, texts):
        """Train tokenizer on a list of texts."""
        # Simple word-based tokenization
        words = set()
        for text in texts:
            words.update(text.split())
        
        # Add words to vocabulary
        for word in sorted(words):
            if word not in self.vocab:
                self.vocab[word] = len(self.vocab)
        
        # Update inverse vocabulary
        self.inverse_vocab = {v: k for k, v in self.vocab.items()}

    def encode(self, text):
        """Convert text to token ids."""
        tokens = []
        for word in text.split():
            token_id = self.vocab.get(word, self.unk_token_id)
            tokens.append(token_id)
        return tokens

    def decode(self, token_ids):
        """Convert token ids back to text."""
        return " ".join(self.inverse_vocab.get(id, "<|unk|>") for id in token_ids)

# Test the tokenizer
texts = [
    "hello world",
    "world of AI",
    "hello AI world"
]

tokenizer = SimpleTokenizer()
tokenizer.train(texts)

# Test encoding and decoding
test_text = "hello AI world"
encoded = tokenizer.encode(test_text)
decoded = tokenizer.decode(encoded)

print(f"Original: {test_text}")
print(f"Encoded: {encoded}")
print(f"Decoded: {decoded}")

Original: hello AI world
Encoded: [3, 2, 5]
Decoded: hello AI world


Exercise 5.5: Implementing a custom dataset class


In [5]:
import torch
from torch.utils.data import Dataset, DataLoader

class LLMDataset(Dataset):
    def __init__(self, texts, tokenizer, max_length=1024, stride=512):
        self.input_ids = []
        self.target_ids = []
        
        for text in texts:
            # Tokenize text
            tokens = tokenizer.encode(text)
            
            # Create overlapping sequences
            for i in range(0, len(tokens) - max_length, stride):
                input_chunk = tokens[i:i + max_length]
                target_chunk = tokens[i + 1:i + max_length + 1]
                
                # Pad if necessary
                if len(input_chunk) < max_length:
                    input_chunk = input_chunk + [tokenizer.vocab["<|endoftext|>"]] * (max_length - len(input_chunk))
                    target_chunk = target_chunk + [tokenizer.vocab["<|endoftext|>"]] * (max_length - len(target_chunk))
                
                self.input_ids.append(torch.tensor(input_chunk))
                self.target_ids.append(torch.tensor(target_chunk))

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.target_ids[idx]

# Test the dataset
texts = [
    "hello world of artificial intelligence",
    "language models are fascinating",
    "deep learning is amazing"
]

# Create and test dataset
tokenizer = SimpleTokenizer()
tokenizer.train(texts)
dataset = LLMDataset(texts, tokenizer, max_length=4, stride=2)

# Create dataloader
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

# Test iteration
for batch_idx, (inputs, targets) in enumerate(dataloader):
    print(f"\nBatch {batch_idx + 1}:")
    print(f"Input shape: {inputs.shape}")
    print(f"Target shape: {targets.shape}")
    print(f"Input tokens:\n{inputs}")
    print(f"Target tokens:\n{targets}")


Batch 1:
Input shape: torch.Size([1, 4])
Target shape: torch.Size([1, 4])
Input tokens:
tensor([[ 7, 14, 13,  4]])
Target tokens:
tensor([[14, 13,  4,  8]])


Exercise 5.6: Implementing a custom training loop


In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import time

def train_model(model, train_dataloader, val_dataloader, 
                num_epochs=3, learning_rate=1e-4, device="cuda"):
    """
    Custom training loop for the LLM.
    
    Args:
        model: The LLM model
        train_dataloader: DataLoader for training data
        val_dataloader: DataLoader for validation data
        num_epochs: Number of training epochs
        learning_rate: Learning rate for optimization
        device: Device to train on ("cuda" or "cpu")
    """
    # Move model to device
    model = model.to(device)
    
    # Initialize optimizer and loss function
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()
    
    # Training loop
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        start_time = time.time()
        
        # Training phase
        for batch_idx, (inputs, targets) in enumerate(train_dataloader):
            # Move batch to device
            inputs = inputs.to(device)
            targets = targets.to(device)
            
            # Forward pass
            outputs = model(inputs)
            
            # Reshape outputs and targets for loss calculation
            b, t, c = outputs.shape
            outputs = outputs.view(b * t, c)
            targets = targets.view(b * t)
            
            # Calculate loss
            loss = criterion(outputs, targets)
            
            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            
            total_loss += loss.item()
            
            # Print progress
            if (batch_idx + 1) % 100 == 0:
                print(f"Epoch [{epoch+1}/{num_epochs}], "
                      f"Step [{batch_idx+1}/{len(train_dataloader)}], "
                      f"Loss: {loss.item():.4f}")
        
        # Validation phase
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for inputs, targets in val_dataloader:
                inputs = inputs.to(device)
                targets = targets.to(device)
                outputs = model(inputs)
                
                b, t, c = outputs.shape
                outputs = outputs.view(b * t, c)
                targets = targets.view(b * t)
                
                val_loss += criterion(outputs, targets).item()
        
        # Print epoch statistics
        epoch_time = time.time() - start_time
        avg_train_loss = total_loss / len(train_dataloader)
        avg_val_loss = val_loss / len(val_dataloader)
        
        print(f"\nEpoch {epoch+1} Summary:")
        print(f"Time: {epoch_time:.2f}s")
        print(f"Average Training Loss: {avg_train_loss:.4f}")
        print(f"Average Validation Loss: {avg_val_loss:.4f}\n")


