In [1]:
# Scratch cell: prints a fixed greeting (its recorded output is shown below).
print("Hello worldds")

Hello worldds


In [None]:
# Video: https://www.youtube.com/watch?v=oLUrXDFiJAc

# How do LLMs learn while predicting the next token?

## The Deceptive Simplicity of Next-Token Prediction

## The Mechanics of Token Prediction - minimal implementation of the Transformer language model

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import numpy as np 
from torch.utils.data import Dataset, DataLoader

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to sequence-first embeddings.

    Args:
        d_model: Size of the embedding dimension (odd values are supported).
        max_len: Number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()

        # Create position encodings once and for all
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer cosine column than sine column.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Registered as a buffer: moves with .to(device) but is not a parameter.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encoding of each position.

        Args:
            x: Tensor of shape [seq_len, batch_size, embedding_dim].

        Raises:
            ValueError: If ``seq_len`` exceeds the precomputed ``max_len``.
        """
        seq_len = x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(0)}"
            )
        # pe[:seq_len] is [seq_len, d_model]; insert a singleton batch axis so it
        # broadcasts over batch_size instead of being aligned with it.
        return x + self.pe[:seq_len].unsqueeze(1)
    
class TransformerLM(nn.Module):
    """Minimal causal Transformer language model (sequence-first layout).

    Args:
        vocab_size: Number of distinct token ids.
        d_model: Embedding / hidden size.
        nhead: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        dim_feedforward: Hidden size of each layer's feed-forward sub-block.
        droupout: Dropout probability passed to each encoder layer. The
            spelling is kept so existing keyword callers keep working.
    """

    def __init__(self, vocab_size, d_model=512, nhead=8, num_layers=6, dim_feedforward=2048, droupout=0.1):
        super().__init__()
        self.model_type = 'Transformer'
        self.d_model = d_model

        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model)

        # Create a standard transformer encoder
        encoder_layers = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward, droupout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)

        # Final layer to predict token logits
        self.output_layer = nn.Linear(d_model, vocab_size)

        self.init_weights()

    def init_weights(self):
        """Initialise embedding and output weights uniformly; zero the output bias."""
        initrange = 0.1
        nn.init.uniform_(self.embedding.weight, -initrange, initrange)
        nn.init.zeros_(self.output_layer.bias)
        nn.init.uniform_(self.output_layer.weight, -initrange, initrange)

    def forward(self, src, src_mask=None):
        """Compute next-token logits for every position.

        Args:
            src: Long tensor of token ids, shape [seq_len, batch_size].
            src_mask: Optional attention mask of shape [seq_len, seq_len]. When
                omitted, a causal mask is built so position i cannot attend to
                positions after i.

        Returns:
            Tensor of shape [seq_len, batch_size, vocab_size] with unnormalised
            logits.
        """
        # Scale embeddings by sqrt(d_model) before adding position encodings
        src = self.embedding(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)

        if src_mask is None:
            # Additive causal mask: -inf strictly above the diagonal, 0 elsewhere.
            seq_len = src.size(0)
            src_mask = torch.triu(
                torch.full((seq_len, seq_len), float('-inf'), device=src.device),
                diagonal=1,
            )

        output = self.transformer_encoder(src, src_mask)

        # Project hidden states to vocabulary logits
        return self.output_layer(output)

### The learning process, implementing the training loop - Maximizing log-likelihood: how does the LLM actually "learn"?

In [None]:
def train_transformer_lm(model, data_loader, optimizer,  criterion, device, clip_grad=1.0):
    """Train ``model`` for one pass over ``data_loader``.

    Args:
        model: Module called as ``model(data)``; its output's last dimension
            is treated as the class/vocabulary dimension.
        data_loader: Iterable of ``(data, targets)`` batches.
        optimizer: Optimizer over ``model.parameters()``.
        criterion: Loss called as ``criterion(flat_output, flat_targets)``.
        device: Device the batches are moved to.
        clip_grad: Maximum gradient norm applied before each optimizer step.

    Returns:
        Mean per-batch loss as a float, or NaN when the loader yields no batches.
    """
    model.train()
    total_loss = 0.
    num_batches = 0
    for batch_idx, (data, targets) in enumerate(data_loader):
        data, targets = data.to(device), targets.to(device)

        # Zero gradients from previous iteration
        optimizer.zero_grad()

        # Forward pass
        output = model(data)

        # Flatten to [N, num_classes] and [N]; reshape also handles
        # non-contiguous tensors, which view() would reject.
        output = output.reshape(-1, output.size(-1))
        targets = targets.reshape(-1)

        # Compute loss (negative log-likelihood)
        loss = criterion(output, targets)

        # Backward pass
        loss.backward()

        # Clip gradients to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip_grad)

        # Update weights
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

        if batch_idx % 200 == 0:
            print(f"Batch {batch_idx}, Loss: {loss.item():.4f}")

    # Counting batches ourselves avoids len() (unsupported by iterable-style
    # loaders) and a division by zero on an empty loader.
    if num_batches == 0:
        return float("nan")
    return total_loss / num_batches

### Emergent capabilities: Beyond Next-Token Prediction

In [None]:
def generate_text(model, start_sequence, max_new_tokens=50, temperature=1.0):
    """Autoregressively generate token ids from a sequence-first language model.

    The model is called as ``model(input_ids, attn_mask)`` with ``input_ids``
    of shape [seq_len, 1] and an additive causal mask of shape
    [seq_len, seq_len]; it must return logits of shape [seq_len, 1, vocab].

    Args:
        model: The language model (put into eval mode by this function).
        start_sequence: Non-empty 1-D sequence of prompt token ids.
        max_new_tokens: Number of tokens to generate.
        temperature: Sampling temperature. Values <= 0 select the most likely
            token (greedy decoding) instead of sampling.

    Returns:
        List of the newly generated token ids (the prompt is not included).

    Raises:
        ValueError: If ``start_sequence`` is empty.
    """
    model.eval()

    # Keep every tensor on the model's device; fall back to CPU for
    # parameter-free models.
    device = next((p.device for p in model.parameters()), torch.device("cpu"))
    input_ids = torch.as_tensor(start_sequence, dtype=torch.long, device=device).reshape(-1, 1)
    if input_ids.size(0) == 0:
        raise ValueError("start_sequence must contain at least one token")

    generated_tokens = []

    with torch.no_grad():
        for _ in range(max_new_tokens):
            # Additive mask: -inf above the diagonal blocks attention to
            # future tokens, 0 elsewhere leaves scores unchanged.
            seq_len = input_ids.size(0)
            attn_mask = torch.triu(
                torch.full((seq_len, seq_len), float("-inf"), device=device),
                diagonal=1,
            )

            outputs = model(input_ids, attn_mask)
            # Logits of the last position for the single batch element.
            next_token_logits = outputs[-1, 0, :]

            if temperature > 0:
                probs = F.softmax(next_token_logits / temperature, dim=-1)
                next_token = torch.multinomial(probs, num_samples=1).item()
            else:
                next_token = torch.argmax(next_token_logits).item()

            generated_tokens.append(next_token)

            # Append the predicted token along the sequence axis
            next_step = torch.tensor([[next_token]], dtype=torch.long, device=device)
            input_ids = torch.cat([input_ids, next_step], dim=0)
    return generated_tokens
    
def analyze_activation_patterns(model, input_text, layer_idx= -1):
    """Project one encoder layer's activations for a single sequence to 2-D.

    Args:
        model: Sequence-first model exposing ``transformer_encoder.layers``.
        input_text: 1-D sequence of token ids.
        layer_idx: Index of the encoder layer whose output is captured.

    Returns:
        Array of shape [seq_len, 2]: the layer output of every position
        projected onto its first two principal components.
    """
    from sklearn.decomposition import PCA

    model.eval()
    device = next((p.device for p in model.parameters()), torch.device("cpu"))
    # Shape [seq_len, 1]: sequence-first with a single batch element, which is
    # what the squeeze(1) below relies on.
    input_ids = torch.as_tensor(input_text, dtype=torch.long, device=device).reshape(-1, 1)

    # Register hook to get activations
    activations = {}
    hook_name = f'transformer_layer_{layer_idx}'

    def get_activation(name):
        def hook(module, inputs, output):
            activations[name] = output.detach()
        return hook

    # Attach hook to the specified transformer layer
    handle = model.transformer_encoder.layers[layer_idx].register_forward_hook(
        get_activation(hook_name)
    )
    try:
        with torch.no_grad():
            model(input_ids)
    finally:
        # Always detach the hook so repeated calls do not stack hooks on the layer.
        handle.remove()

    layer_activations = activations[hook_name]

    # Compute PCA for visualization (example analysis)
    pca = PCA(n_components=2)
    activation_2d = pca.fit_transform(layer_activations.squeeze(1).cpu().numpy())

    return activation_2d

### Information Compression and Internal Representations


In [None]:
def compute_mutual_information(model, dataset, num_samples=1000):
    """Estimate mutual information between input tokens and first-layer features.

    For up to ``num_samples`` sequences, the output of the first encoder layer
    is mean-pooled over positions, giving one ``d_model`` vector per sequence.
    For each feature, the mutual information with every input position is
    estimated and averaged.

    Args:
        model: Sequence-first model exposing ``embedding``, ``d_model``,
            ``pos_encoder`` and ``transformer_encoder.layers``.
        dataset: Dataset of ``(input_ids, target)`` pairs. All sequences must
            have the same length so the inputs can be stacked.
        num_samples: Maximum number of sequences to use.

    Returns:
        Array of shape [d_model] with one averaged MI score per feature.

    Raises:
        ValueError: If no samples were collected.
    """
    from sklearn.feature_selection import mutual_info_regression

    model.eval()
    device = next((p.device for p in model.parameters()), torch.device("cpu"))
    representations = []
    inputs = []

    # Collect samples
    data_loader = DataLoader(dataset, batch_size=1, shuffle=True)
    with torch.no_grad():
        for i, (input_ids, _) in enumerate(data_loader):
            if i >= num_samples:
                break

            # The loader yields [1, seq_len]; the model is sequence-first, so
            # feed it [seq_len, 1].
            seq_first = input_ids.to(device).reshape(1, -1).t()

            # Forward pass through embedding layer
            embedded = model.embedding(seq_first) * math.sqrt(model.d_model)
            embedded = model.pos_encoder(embedded)

            # Same additive causal mask the model applies in its forward pass.
            seq_len = embedded.size(0)
            causal_mask = torch.triu(
                torch.full((seq_len, seq_len), float("-inf"), device=device),
                diagonal=1,
            )

            # Representation after the first transformer layer: [seq_len, 1, d_model]
            layer_output = model.transformer_encoder.layers[0](embedded, src_mask=causal_mask)

            # Mean-pool over positions and drop the batch axis -> [d_model]
            representations.append(layer_output.mean(dim=0).squeeze(0).cpu().numpy())
            inputs.append(input_ids.cpu().numpy())

    if not representations:
        raise ValueError("no samples collected: dataset is empty or num_samples <= 0")

    representations = np.array(representations)  # [n_samples, d_model]
    inputs = np.array(inputs)

    # Flatten inputs for MI calculation -> [n_samples, seq_len]
    flat_inputs = inputs.reshape(inputs.shape[0], -1)

    # Mutual information between all input positions and each feature
    mi_scores = [
        mutual_info_regression(flat_inputs, representations[:, feature]).mean()
        for feature in range(representations.shape[1])
    ]

    return np.array(mi_scores)

### The Emergence of In-Context Learning - adapts to new tasks without updating model params--all from next-token prediction

In [None]:
def demonstrate_in_context_learning(model, tokenizer, examples, test_input):
    """Show how a model can pick up a task from examples placed in its prompt.

    Args:
        model: Model exposing ``eval()`` and ``generate(input_ids, ...)``.
        tokenizer: Tokenizer exposing ``encode``, ``decode`` and
            ``eos_token_id``.
        examples: Iterable of ``(input, output)`` demonstration pairs.
        test_input: The input for which the model should produce an output.

    Returns:
        The decoded text that follows the last ``"Output:"`` marker, stripped
        of surrounding whitespace.
    """
    # Format prompt with examples, then the test case left open for completion
    demonstrations = "".join(f"Input: {x}\nOutput: {y}\n\n" for x, y in examples)
    prompt = f"Here are some examples:\n\n{demonstrations}Input: {test_input}\nOutput:"

    # Tokenize. The keyword is `return_tensors` (plural); the misspelled
    # singular form does not yield the tensor that generate() needs.
    input_ids = tokenizer.encode(prompt, return_tensors="pt")

    # Generate response
    model.eval()
    with torch.no_grad():
        output_ids = model.generate(
            input_ids,
            max_new_tokens=50,
            temperature=0.7,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id
        )

    # Decode and return the text after the final "Output:" marker
    output_text = tokenizer.decode(output_ids[0], skip_special_tokens=True)
    return output_text.split("Output:")[-1].strip()



### Scaling laws and emerging abilities

In [None]:
def analyze_scaling_laws(model_sizes, task_performances):
    """Plot performance against log model size and fit a power law per task.

    Args:
        model_sizes: Sequence of positive model sizes.
        task_performances: Mapping of task name to a sequence of performance
            values, one per entry of ``model_sizes``.

    Returns:
        The ``matplotlib.pyplot`` module holding the current figure.
    """
    import matplotlib.pyplot as plt

    # Convert to log scale
    log_sizes = np.log(np.asarray(model_sizes, dtype=float))

    # Plot scaling relationship
    plt.figure(figsize=(10,6))
    for task_name, performances in task_performances.items():
        plt.plot(log_sizes, performances, marker='o', label=task_name)

    # Compute power law fit
    for task_name, performances in task_performances.items():
        perf = np.asarray(performances, dtype=float)
        if np.any(perf <= 0):
            # log(y) is undefined for non-positive values, so no power law exists.
            print(f"Task: {task_name}, Power law: skipped (needs strictly positive performance values)")
            continue

        # y = a * x^b  ->  log(y) = log(a) + b * log(x): fit a line in log-log space
        exponent, log_scale = np.polyfit(log_sizes, np.log(perf), 1)
        # Map the fitted line back to the plot's linear performance axis.
        fitted = np.exp(log_scale + exponent * log_sizes)
        plt.plot(log_sizes, fitted, '--', alpha=0.7)
        print(f"Task: {task_name}, Power law: y = {np.exp(log_scale):.4f} * X^{exponent:.4f}")

    plt.xlabel('Log(Model Size)')
    plt.ylabel('Performance')
    plt.title('Scaling laws for different tasks')
    plt.legend()
    plt.grid(True, alpha=0.3)

    return plt

### Probing internal knowledge representation

In [None]:
def train_linear_probe(model, dataset, task='part_of_speech', hidden_layer=6):
    """Train a linear probe to extract information from model representations.

    Args:
        model: Model exposing ``eval()`` and
            ``get_hidden_states(input_ids, layer=...)``.
        dataset: Iterable of ``(input_ids, label)`` pairs. Labels may be
            tensors or plain numbers, scalar or one per hidden-state row.
        task: Name used only in the printed accuracy message.
        hidden_layer: Layer index forwarded to ``get_hidden_states``.

    Returns:
        Tuple ``(clf, accuracy)``: the fitted classifier and its accuracy on
        the held-out 20% split.

    Raises:
        ValueError: If the dataset is empty or the number of feature rows does
            not match the number of labels.
    """
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score

    model.eval()

    # Collect representations and labels
    representations = []
    labels = []

    with torch.no_grad():
        for input_ids, label in dataset:
            # Get hidden states from the specified layer
            hidden_states = model.get_hidden_states(input_ids, layer=hidden_layer)
            features = hidden_states.cpu().numpy()
            # Flatten any leading axes so every row is one feature vector.
            representations.append(features.reshape(-1, features.shape[-1]))
            # as_tensor accepts plain numbers; reshape(-1) turns a scalar
            # label into a length-1 array that can be concatenated.
            labels.append(torch.as_tensor(label).cpu().numpy().reshape(-1))

    if not representations:
        raise ValueError("dataset is empty: nothing to train the probe on")

    X = np.vstack(representations)
    y = np.concatenate(labels)
    if len(X) != len(y):
        raise ValueError(
            f"got {len(X)} feature rows but {len(y)} labels; they must match"
        )

    # Train a linear classifier
    X_train, X_test, y_train, y_test = train_test_split(X,y,test_size=0.2, random_state=42)

    clf = LogisticRegression(max_iter=1000)
    clf.fit(X_train, y_train)

    # Evaluate
    y_pred = clf.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Probe accuracy for {task}: {accuracy:.4f}")

    return clf, accuracy

### Interpretable Mechanistic Analysis - Identifying important attention patterns

In [None]:
def analyze_attention_patterns(model, input_text, head_idx=0, layer_idx=0, tokenizer=None):
    """Analyze and visualize attention patterns in a specific attention head.

    Args:
        model: Model called as ``model(input_ids, output_attentions=True)``
            whose result exposes ``attentions`` indexed by layer, each of
            shape [batch_size, num_heads, seq_len, seq_len].
        input_text: Text to tokenize and feed to the model.
        head_idx: Index of the attention head to inspect.
        layer_idx: Index of the layer to inspect.
        tokenizer: Tokenizer exposing ``encode`` and ``convert_ids_to_tokens``.
            When omitted, a module-level ``tokenizer`` variable is used.

    Returns:
        Tuple ``(attention_weights, plt)``: the [seq_len, seq_len] array of
        the selected head and the ``matplotlib.pyplot`` module with the heatmap.

    Raises:
        ValueError: If no tokenizer is passed and no module-level
            ``tokenizer`` exists.
    """
    import matplotlib.pyplot as plt
    import seaborn as sns

    if tokenizer is None:
        # Backward compatibility: earlier versions read a global `tokenizer`.
        tokenizer = globals().get("tokenizer")
        if tokenizer is None:
            raise ValueError(
                "no tokenizer available: pass tokenizer=... or define a "
                "module-level `tokenizer`"
            )

    model.eval()

    # Tokenize input (the keyword is `return_tensors`, plural)
    input_ids = tokenizer.encode(input_text, return_tensors="pt")

    # Forward pass, capturing attention weights
    with torch.no_grad():
        outputs = model(input_ids, output_attentions=True)

    # Select batch element 0 and the requested head of the requested layer
    attention_weights = outputs.attentions[layer_idx][0, head_idx].cpu().numpy()

    # Get tokens for visualization
    tokens = tokenizer.convert_ids_to_tokens(input_ids[0])

    # Visualize attention
    plt.figure(figsize=(10, 8))
    sns.heatmap(attention_weights,
                xticklabels=tokens,
                yticklabels=tokens,
                cmap="viridis")
    plt.title(f"Attention patterns for Layer {layer_idx}, head {head_idx}")
    plt.tight_layout()

    return attention_weights, plt
    

### Role of data in LLM Learning

In [None]:
def compare_models_With_different_data(model_a, model_b, test_prompts):
    """Run both models on every prompt and record how their outputs differ.

    Returns a list with one dict per prompt, holding the prompt, each model's
    generated output and the difference score from ``compare_outputs``.
    """
    def _compare_on(prompt):
        # Model A is always generated before model B, then the two are scored.
        output_a = generate_text(model_a, prompt)
        output_b = generate_text(model_b, prompt)
        return {
            "prompt": prompt,
            "model_a_output": output_a,
            "model_b_output": output_b,
            "difference": compare_outputs(output_a, output_b),
        }

    return [_compare_on(prompt) for prompt in test_prompts]

def compare_outputs(output_a, output_b):
    """Compute a measure of difference between two model outputs.

    Args:
        output_a: Reference output, either a string (split on whitespace) or
            a sequence of tokens such as the id list from ``generate_text``.
        output_b: Hypothesis output, in the same form as ``output_a``.

    Returns:
        ``1.0 - BLEU(output_a, output_b)``, so a higher value means the
        outputs are more different.
    """
    from nltk.translate.bleu_score import sentence_bleu

    def to_tokens(output):
        # Strings are split into words; any other sequence is taken as
        # already tokenised and its items are compared by their text form.
        if isinstance(output, str):
            return output.split()
        return [str(token) for token in output]

    tokens_a = to_tokens(output_a)
    tokens_b = to_tokens(output_b)

    # Compute BLEU score as similarity measure
    bleu = sentence_bleu([tokens_a], tokens_b)

    return 1.0 - bleu # Return difference rather than similarity

### From prediction to Reasoning: Chain-of-Thought and Self-Consistency

In [None]:
def standard_vs_chain_of_thought(model, tokenizer, problems):
    """Compare standard prompting vs. chain-of-thought prompting.

    Args:
        model: Model forwarded to ``generate_completion``.
        tokenizer: Tokenizer forwarded to ``generate_completion``.
        problems: Iterable of problem statements.

    Returns:
        List with one dict per problem containing both raw outputs, both
        extracted answers and whether each answer was judged correct.
    """
    results = []

    for problem in problems:

        # Standard prompting
        standard_prompt = f"Problem: {problem}\nAnswer:"
        standard_output = generate_completion(model, tokenizer, standard_prompt)

        # Chain of thought prompting
        cot_prompt = f"Problem: {problem}\nLet's solve this step-by-step:"
        cot_output = generate_completion(model, tokenizer, cot_prompt)

        # Extract final answers
        standard_answer = extract_answer(standard_output)
        cot_answer = extract_answer(cot_output)

        results.append({
            "problem": problem,
            "standard_output": standard_output,
            "cot_output": cot_output,
            "standard_answer": standard_answer,
            "cot_answer": cot_answer,
            "standard_correct": evaluate_answer(problem, standard_answer),
            "cot_correct": evaluate_answer(problem, cot_answer)
        })

    return results

def self_consistency_sampling(model, tokenizer, problem, n_samples=5):
    """Implement self-consistency sampling for more reliable reasoning.

    Samples ``n_samples`` chain-of-thought completions for the same problem
    and takes a majority vote over the extracted answers.

    Args:
        model: Model forwarded to ``generate_completion``.
        tokenizer: Tokenizer forwarded to ``generate_completion``.
        problem: The problem statement.
        n_samples: Number of reasoning paths to sample; must be at least 1.

    Returns:
        Dict with the problem, every sampled path, every extracted answer,
        the most common answer and whether that answer was judged correct.

    Raises:
        ValueError: If ``n_samples`` is less than 1 (there would be no
            answers to vote on).
    """
    from collections import Counter

    if n_samples < 1:
        raise ValueError(f"n_samples must be at least 1, got {n_samples}")

    cot_prompt = f"Problem: {problem}\n Lets solve this step by step"

    # Generate multiple chain of thought paths
    paths = []
    answers = []

    for _ in range(n_samples):
        output = generate_completion(model, tokenizer, cot_prompt, temperature=0.7)
        paths.append(output)
        answers.append(extract_answer(output))

    # Majority vote; on ties Counter keeps the answer that was seen first
    most_common_answer = Counter(answers).most_common(1)[0][0]

    return {
        "problem": problem,
        "paths": paths,
        "answers": answers,
        "most_common_answer": most_common_answer,
        "is_correct": evaluate_answer(problem, most_common_answer)
    }


# MULTI-HEAD ATTENTION - Implementing it in PyTorch

In [None]:
class PrepareForMultiHeadAttention(nn.Module):
    """Linearly project the input and split the result into attention heads.

    Args:
        d_model: Size of the input feature dimension.
        heads: Number of attention heads.
        d_k: Number of features per head.
        bias: Whether the linear projection has a bias term.
    """

    def __init__(self, d_model: int, heads: int, d_k : int, bias: bool):
        super().__init__()
        # Linear layer for transformation (assigned, so it is registered as a
        # submodule and its weights are trained)
        self.linear = nn.Linear(d_model, heads * d_k, bias=bias)

        # Number of attention heads
        self.heads = heads
        # Dimension of each head
        self.d_k = d_k

    def forward(self, x: torch.Tensor):
        """Map ``x`` of shape [..., d_model] to shape [..., heads, d_k]."""
        # Save original shape (except last dimension)
        head_shape = x.shape[:-1]

        # Apply linear transformation
        x = self.linear(x)

        # Reshape to separate heads
        return x.view(*head_shape, self.heads, self.d_k)

## Multi-Head Attention implementation

In [None]:
class MultiHeadAttention(nn.Module):
    """Scaled dot-product multi-head attention over sequence-first tensors.

    Args:
        heads: Number of attention heads; must divide ``d_model``.
        d_model: Size of the input and output feature dimension.
        dropout_prob: Dropout probability applied to the attention weights.
        bias: Whether the query and key projections have a bias term (the
            value projection always has one).

    Raises:
        ValueError: If ``d_model`` is not divisible by ``heads``.
    """

    def __init__(self, heads: int, d_model: int, dropout_prob: float = 0.1, bias: bool = True):
        super().__init__()

        if d_model % heads != 0:
            raise ValueError(f"d_model ({d_model}) must be divisible by heads ({heads})")

        # Features per head
        self.d_k = d_model // heads
        self.heads = heads

        # Transform query, key, value
        self.query = PrepareForMultiHeadAttention(d_model,  heads, self.d_k, bias=bias)
        self.key = PrepareForMultiHeadAttention(d_model, heads, self.d_k,  bias=bias)
        self.value = PrepareForMultiHeadAttention(d_model, heads, self.d_k, bias=True)

        # Scores are laid out [query_pos, key_pos, batch, head]; normalise over keys.
        self.softmax = nn.Softmax(dim=1)
        self.output = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout_prob)
        self.scale = 1 / math.sqrt(self.d_k) # prevents the dot products from growing too large when the dimension increases.

        # Attention weights of the most recent forward pass (detached)
        self.attn = None

    def get_scores(self, query: torch.Tensor, key: torch.Tensor):
        """Dot product of every query with every key.

        Inputs are [seq_len, batch, heads, d_k]; the result is
        [query_len, key_len, batch, heads].
        """
        return torch.einsum('ibhd,jbhd->ijbh', query, key)

    def prepare_mask(self, mask: torch.Tensor, query_shape: torch.Size, key_shape: torch.Size):
        """Validate a [query_len or 1, key_len, batch or 1] mask and add a head axis."""
        assert mask.dim() == 3, "mask must have shape [query_len or 1, key_len, batch or 1]"
        assert mask.shape[0] == 1 or mask.shape[0] == query_shape[0]
        assert mask.shape[1] == key_shape[0]
        assert mask.shape[2] == 1 or mask.shape[2] == query_shape[1]

        # Trailing singleton axis broadcasts the mask over all heads
        return mask.unsqueeze(-1)

    def forward(self, *, query, key, value, mask=None):
        """Attend from ``query`` to ``key``/``value``.

        Args:
            query: Tensor of shape [seq_len, batch_size, d_model].
            key: Tensor of shape [key_len, batch_size, d_model].
            value: Tensor of shape [key_len, batch_size, d_model].
            mask: Optional tensor of shape [seq_len or 1, key_len,
                batch_size or 1]; entries equal to 0 block attention.

        Returns:
            Tensor of shape [seq_len, batch_size, d_model].
        """
        seq_len, batch_size, _ = query.shape

        if mask is not None:
            mask = self.prepare_mask(mask, query.shape, key.shape)

        # Transform inputs to [seq_len, batch_size, heads, d_k]
        query = self.query(query)
        key = self.key(key)
        value = self.value(value)

        # Calculate and scale attention scores
        scores = self.get_scores(query, key) * self.scale

        # Apply mask if provided
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))

        # Softmax over the key axis, then dropout
        attn = self.softmax(scores)
        attn = self.dropout(attn)

        # Weighted sum of values for every query position
        x = torch.einsum('ijbh,jbhd->ibhd', attn, value)

        # Save attention weights
        self.attn = attn.detach()

        # Concatenate heads and apply output layer
        x = x.reshape(seq_len, batch_size, -1)
        return self.output(x)

### Attention Mechanism step-by-step
#### First, we transform the query, key, and value vectors using the preparation class.
#### Then, we calculate attention scores using the get_scores method.

In [None]:
def get_scores(self, query: torch.Tensor, key: torch.Tensor):
    """Return the dot product of every query vector with every key vector.

    Takes ``self`` so it can serve as a method of the attention module, but
    reads no instance state. Both inputs are 4-D with a shared second and
    third axis; the result has the two leading axes of ``query`` and ``key``
    first, followed by the shared axes.
    """
    # i: query position, j: key position, b and h: shared axes, d: summed features
    equation = 'ibhd,jbhd->ijbh'
    return torch.einsum(equation, query, key)

### MASKING PROCESS

In [None]:
def prepare_mask(self, mask: torch.Tensor, query_shape: torch.Size, key_shape: torch.Size):
    """Validate an attention mask and add a trailing axis for the heads.

    Takes ``self`` so it can serve as a method of the attention module, but
    reads no instance state.

    Args:
        mask: 3-D tensor of shape [query_len or 1, key_len, batch or 1].
        query_shape: Shape of the query tensor; index 0 is its sequence
            length and index 1 its batch size. Any indexable sequence works.
        key_shape: Shape of the key tensor; index 0 is its sequence length.

    Returns:
        ``mask`` with one extra trailing dimension of size 1.

    Raises:
        AssertionError: If the mask is not 3-D or its dimensions do not match.
    """
    # Checked first so a 2-D mask gets a clear message instead of an IndexError.
    assert mask.dim() == 3, "mask must have shape [query_len or 1, key_len, batch or 1]"
    # Ensure mask dimensions match
    assert mask.shape[0] == 1 or mask.shape[0] == query_shape[0]
    assert mask.shape[1] == key_shape[0]
    assert mask.shape[2] == 1 or mask.shape[2] == query_shape[1]

    # Add dimension for heads
    return mask.unsqueeze(-1)

In [None]:
def create_causal_mask(seq_len):
    """Build a lower-triangular attention mask of shape [seq_len, seq_len, 1].

    Entry (i, j, 0) is 1 where attention is allowed (j <= i) and 0 where it
    is blocked. The result has dtype uint8.
    """
    positions = torch.arange(seq_len)
    # Row index i down the first axis, column index j along the second.
    allowed = positions.unsqueeze(0) <= positions.unsqueeze(1)

    # Add a trailing singleton axis and convert to uint8
    return allowed.unsqueeze(2).to(torch.uint8)

In [None]:
def test_multi_head_attention():
    print("Testing multihead attention")

    # Test parameters
    batch_size = 2
    seq_len = 5
    d_model = 64
    heads = 8

    # Create random test inputs
    query = torch.randn(seq_len, batch_size, d_model)
    key = torch.randn(seq_len, batch_size, d_model)
    value = torch.randn(seq_len, batch_size, d_model)

    # Create nodel
    mha = MultiHeadAttention(heads=heads, d_model=d_model)

    # Test without mask