# Efficient Vision Transformer (ViT) Acceleration Techniques - Hands-On Practice

## Learning Objectives
Upon completion of this notebook, you will be able to:
- **Implement** Window Attention mechanisms for spatial locality in vision transformers
- **Code** Linear Attention using kernel approximations for linear complexity scaling
- **Develop** Sparse Attention patterns for efficient long-range dependencies
- **Analyze** computational complexity trade-offs between different attention mechanisms
- **Benchmark** performance differences across attention variants
- **Visualize** attention patterns and efficiency characteristics

## Abstract

Vision Transformers (ViTs) have demonstrated remarkable performance in computer vision tasks, but their quadratic computational complexity with respect to sequence length poses significant challenges for practical deployment. This notebook presents a comprehensive analysis of three prominent acceleration techniques: Window Attention, Linear Attention, and Sparse Attention.

## Introduction

The self-attention mechanism in Vision Transformers exhibits O(n²) complexity, where n represents the number of image patches. For high-resolution images, this becomes computationally prohibitive. We investigate three distinct approaches to address this limitation:

1. **Window Attention**: Restricts attention computation to local windows, reducing complexity to O(n)
2. **Linear Attention**: Approximates full attention using kernel methods with O(n) complexity
3. **Sparse Attention**: Selectively attends to a subset of tokens based on learned or fixed patterns

Each technique represents a different trade-off between computational efficiency and representational capacity.

---
**🔥 HANDS-ON PRACTICE**: This notebook contains code completion exercises marked with `# TODO:` comments. Complete the missing implementations to build efficient attention mechanisms from scratch!

In [None]:
# Import Required Libraries
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import time
import math
from typing import Optional, Tuple
from collections import defaultdict

# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Configure plotting
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

## Theoretical Background

### Standard Self-Attention Mechanism

The standard self-attention mechanism works by computing relationships between all pairs of input tokens. For each token, it calculates how much attention to pay to every other token in the sequence. This process involves three main steps:

1. **Query, Key, Value Creation**: Each input token is transformed into three vectors - a query vector (what information this token is looking for), a key vector (what information this token contains), and a value vector (the actual information this token carries).

2. **Attention Score Calculation**: For each token, we compute similarity scores between its query vector and the key vectors of all other tokens. These scores determine how much each token should attend to every other token.

3. **Weighted Combination**: The attention scores are normalized using softmax to create attention weights, then used to compute a weighted combination of all value vectors.

**Computational Challenge**: The main bottleneck is computing attention scores between all pairs of tokens, which requires n-squared operations where n is the number of tokens. This becomes prohibitively expensive for long sequences.

### Problem Statement

For high-resolution images, Vision Transformers divide the image into patches (typically 16x16 pixels each). A 224x224 image creates 196 patches, but a 1024x1024 image creates 4,096 patches. The attention computation scales quadratically with the number of patches, making standard ViT computationally intractable for high-resolution images. This motivates the development of efficient attention mechanisms that maintain the quality of standard attention while reducing computational requirements.

In [None]:
# Basic Vision Transformer Components - HANDS-ON EXERCISE

class PatchEmbedding(nn.Module):
    """Convert image to patch embeddings"""
    def __init__(self, img_size: int = 224, patch_size: int = 16, in_channels: int = 3, embed_dim: int = 768):
        super().__init__()
        self.img_size = img_size
        self.patch_size = patch_size
        self.num_patches = (img_size // patch_size) ** 2
        
        # TODO: Create a convolutional layer that converts patches to embeddings
        # HINT: Use nn.Conv2d with kernel_size=patch_size, stride=patch_size
        # Input channels: in_channels, Output channels: embed_dim
        self.projection = # Your code here
        
    def forward(self, x):
        # TODO: Apply the projection and reshape for transformer input
        # HINT: 1) Apply self.projection, 2) Flatten spatial dims, 3) Transpose to (B, N, embed_dim)
        # x: (B, C, H, W) -> (B, embed_dim, H/P, W/P) -> (B, N, embed_dim)
        x = # Your code here - apply projection
        x = # Your code here - flatten and transpose
        return x

class MultiHeadAttention(nn.Module):
    """Standard multi-head self-attention mechanism - COMPLETE THE MISSING PARTS"""
    def __init__(self, embed_dim: int, num_heads: int = 8, dropout: float = 0.1):
        super().__init__()
        assert embed_dim % num_heads == 0
        
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        # TODO: Calculate head dimension
        # HINT: embed_dim divided by num_heads
        self.head_dim = # Your code here
        
        # TODO: Calculate scaling factor for attention scores
        # HINT: Use head_dim ** -0.5 (1/sqrt(head_dim))
        self.scale = # Your code here
        
        # TODO: Create linear layer for Q, K, V projections
        # HINT: Input: embed_dim, Output: embed_dim * 3 (for Q, K, V)
        self.qkv = # Your code here
        
        self.proj = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        B, N, C = x.shape
        
        # TODO: Generate Q, K, V matrices and reshape for multi-head attention
        # HINT: 1) Apply self.qkv, 2) Reshape to (B, N, 3, num_heads, head_dim)
        # 3) Permute to (3, B, num_heads, N, head_dim), 4) Split into q, k, v
        qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, self.head_dim).permute(2, 0, 3, 1, 4)
        q, k, v = qkv.unbind(0)
        
        # TODO: Compute scaled dot-product attention
        # HINT: 1) Compute q @ k.transpose(-2, -1), 2) Multiply by self.scale
        # 3) Apply softmax, 4) Apply dropout
        attn = # Your code here - compute attention scores
        attn = # Your code here - apply softmax
        attn = self.dropout(attn)
        
        # TODO: Apply attention to values and reshape output
        # HINT: 1) Multiply attn @ v, 2) Transpose and reshape back to (B, N, C)
        x = # Your code here
        x = self.proj(x)
        return x, attn

In [None]:
class TransformerBlock(nn.Module):
    """Standard Transformer block with attention and FFN"""
    def __init__(self, embed_dim: int, num_heads: int = 8, mlp_ratio: float = 4.0, dropout: float = 0.1):
        super().__init__()
        self.norm1 = nn.LayerNorm(embed_dim)
        self.attn = MultiHeadAttention(embed_dim, num_heads, dropout)
        self.norm2 = nn.LayerNorm(embed_dim)
        
        mlp_dim = int(embed_dim * mlp_ratio)
        self.mlp = nn.Sequential(
            nn.Linear(embed_dim, mlp_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(mlp_dim, embed_dim),
            nn.Dropout(dropout)
        )
        
    def forward(self, x):
        # Pre-norm architecture
        attn_out, attn_weights = self.attn(self.norm1(x))
        x = x + attn_out
        x = x + self.mlp(self.norm2(x))
        return x, attn_weights

In [None]:
# Standard Attention Complexity Analysis

def measure_attention_complexity():
    """Empirically measure attention computational complexity"""
    sequence_lengths = [64, 128, 256, 512, 1024]
    embed_dim = 256
    times = []
    
    for seq_len in sequence_lengths:
        x = torch.randn(4, seq_len, embed_dim).to(device)
        attn = MultiHeadAttention(embed_dim).to(device)
        
        # Warm up
        for _ in range(10):
            _ = attn(x)
        
        # Measure time
        torch.cuda.synchronize() if torch.cuda.is_available() else None
        start_time = time.time()
        
        for _ in range(100):
            _ = attn(x)
            
        torch.cuda.synchronize() if torch.cuda.is_available() else None
        end_time = time.time()
        
        times.append((end_time - start_time) / 100)
    
    return sequence_lengths, times

seq_lens, standard_times = measure_attention_complexity()
print("Standard Attention Complexity Analysis:")
for seq_len, exec_time in zip(seq_lens, standard_times):
    print(f"Sequence length {seq_len}: {exec_time:.4f}s")

## 1. Window Attention Mechanism

### Theoretical Foundation

Window Attention, popularized by the Swin Transformer, restricts the self-attention computation to non-overlapping local windows. This approach is motivated by the observation that natural images exhibit strong local correlations.

**Key Principles:**
1. **Locality Bias**: Most relevant information for a patch lies in its spatial neighborhood
2. **Computational Efficiency**: Reduces complexity from quadratic to linear scaling where n is the total number of patches
3. **Hierarchical Processing**: Enables multi-scale feature learning through window shifting

**How Window Attention Works:**

Instead of computing attention between all possible patch pairs in an image, Window Attention divides the image into non-overlapping rectangular windows (typically 7x7 patches). Within each window, standard self-attention is computed, but tokens cannot attend to patches outside their window.

For example, if an image has 196 patches (14x14) and we use 7x7 windows, we create 4 windows of 49 patches each. Instead of computing attention across all 196×196 = 38,416 pairs, we compute attention within each of the 4 windows: 4×(49×49) = 9,604 pairs - a significant reduction.

To maintain some global connectivity, Swin Transformer uses **shifted windows** in alternating layers, where the window boundaries are moved by half the window size, allowing information to flow between previously separate windows.

**Complexity Analysis:**
- Standard Attention: Quadratic scaling with sequence length
- Window Attention: Linear scaling (assuming fixed window size)

In [None]:
# Window Attention Helper Functions

def window_partition(x, window_size):
    """
    Partition feature map into non-overlapping windows
    Args:
        x: (B, H, W, C)
        window_size: int
    Returns:
        windows: (num_windows*B, window_size, window_size, C)
    """
    B, H, W, C = x.shape
    x = x.view(B, H // window_size, window_size, W // window_size, window_size, C)
    windows = x.permute(0, 1, 3, 2, 4, 5).contiguous().view(-1, window_size, window_size, C)
    return windows

def window_reverse(windows, window_size, H, W):
    """
    Merge windows back to feature map
    Args:
        windows: (num_windows*B, window_size, window_size, C)
        window_size: int
        H, W: height and width of feature map
    Returns:
        x: (B, H, W, C)
    """
    B = int(windows.shape[0] / (H * W / window_size / window_size))
    x = windows.view(B, H // window_size, W // window_size, window_size, window_size, -1)
    x = x.permute(0, 1, 3, 2, 4, 5).contiguous().view(B, H, W, -1)
    return x

In [None]:
# HANDS-ON EXERCISE: Implement Window Attention

class SimpleWindowAttention(nn.Module):
    """
    Simplified Window Attention for arbitrary sequence lengths - COMPLETE THE IMPLEMENTATION
    """
    def __init__(self, embed_dim, window_size, num_heads=8, dropout=0.1):
        super().__init__()
        self.embed_dim = embed_dim
        self.window_size = window_size
        self.num_heads = num_heads
        # TODO: Calculate head dimension and scale factor (similar to standard attention)
        self.head_dim = # Your code here
        self.scale = # Your code here
        
        # TODO: Create QKV projection and output projection layers
        self.qkv = # Your code here
        self.proj = # Your code here
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        B, N, C = x.shape
        
        # TODO: Pad sequence to be divisible by window_size
        # HINT: Calculate how much padding is needed using modulo operation
        padding_length = # Your code here - calculate padding needed
        if padding_length > 0:
            # TODO: Create padding tensor and concatenate with input
            padding = # Your code here - create zero padding
            x_padded = # Your code here - concatenate padding
        else:
            x_padded = x
        
        N_padded = x_padded.shape[1]
        # TODO: Calculate number of windows
        num_windows = # Your code here
        
        # TODO: Reshape input into windows
        # HINT: First reshape to (B, num_windows, window_size, C), then to (B*num_windows, window_size, C)
        x_windows = x_padded.view(B, num_windows, self.window_size, C)
        x_windows = # Your code here - reshape for processing
        
        # TODO: Apply attention within each window
        # HINT: This is similar to standard attention but operates on windows
        qkv = self.qkv(x_windows).reshape(B * num_windows, self.window_size, 3, self.num_heads, self.head_dim)
        qkv = qkv.permute(2, 0, 3, 1, 4)
        q, k, v = qkv.unbind(0)
        
        # TODO: Compute attention scores, apply softmax, and get attended values
        attn = # Your code here - compute scaled attention scores
        attn = # Your code here - apply softmax
        attn = self.dropout(attn)
        
        # TODO: Apply attention to values and project
        x_attended = # Your code here - apply attention to values and reshape
        x_attended = self.proj(x_attended)
        
        # TODO: Reshape back to original format and remove padding
        # HINT: 1) Reshape to (B, num_windows, window_size, C), 2) View as (B, N_padded, C), 3) Remove padding
        x_attended = x_attended.view(B, num_windows, self.window_size, C)
        x_attended = # Your code here - reshape to sequence format
        
        # TODO: Remove padding to get back to original sequence length
        x_attended = # Your code here - slice to remove padding
        
        return x_attended

# Test your implementation
print("Testing SimpleWindowAttention implementation...")
try:
    test_window_attn = SimpleWindowAttention(embed_dim=256, window_size=8, num_heads=8)
    test_input = torch.randn(2, 100, 256)  # Batch=2, Seq=100, Dim=256
    test_output = test_window_attn(test_input)
    print(f"✅ Success! Input shape: {test_input.shape}, Output shape: {test_output.shape}")
    print(f"Window size: {test_window_attn.window_size}, Windows needed: {(100 + 8 - 1) // 8}")
except Exception as e:
    print(f"❌ Error in implementation: {e}")
    print("💡 Check your TODO implementations above!")

In [None]:
class WindowAttention(nn.Module):
    """
    Window-based Multi-Head Self-Attention with relative position bias
    """
    def __init__(self, dim, window_size, num_heads, qkv_bias=True, dropout=0.):
        super().__init__()
        self.dim = dim
        self.window_size = window_size  # Wh, Ww
        self.num_heads = num_heads
        head_dim = dim // num_heads
        self.scale = head_dim ** -0.5

        # Define relative position bias table
        self.relative_position_bias_table = nn.Parameter(
            torch.zeros((2 * window_size[0] - 1) * (2 * window_size[1] - 1), num_heads))

        # Get pair-wise relative position indices
        coords_h = torch.arange(self.window_size[0])
        coords_w = torch.arange(self.window_size[1])
        coords = torch.stack(torch.meshgrid([coords_h, coords_w], indexing='ij'))
        coords_flatten = torch.flatten(coords, 1)
        relative_coords = coords_flatten[:, :, None] - coords_flatten[:, None, :]
        relative_coords = relative_coords.permute(1, 2, 0).contiguous()
        relative_coords[:, :, 0] += self.window_size[0] - 1
        relative_coords[:, :, 1] += self.window_size[1] - 1
        relative_coords[:, :, 0] *= 2 * self.window_size[1] - 1
        relative_position_index = relative_coords.sum(-1)
        self.register_buffer("relative_position_index", relative_position_index)

        self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
        self.attn_drop = nn.Dropout(dropout)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(dropout)

        nn.init.trunc_normal_(self.relative_position_bias_table, std=.02)

    def forward(self, x, mask=None):
        """
        Args:
            x: input features with shape (num_windows*B, N, C)
            mask: (0/-inf) mask with shape (num_windows, Wh*Ww, Wh*Ww) or None
        """
        B_, N, C = x.shape
        qkv = self.qkv(x).reshape(B_, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4)
        q, k, v = qkv.unbind(0)

        q = q * self.scale
        attn = (q @ k.transpose(-2, -1))

        relative_position_bias = self.relative_position_bias_table[self.relative_position_index.view(-1)].view(
            self.window_size[0] * self.window_size[1], self.window_size[0] * self.window_size[1], -1)
        relative_position_bias = relative_position_bias.permute(2, 0, 1).contiguous()
        attn = attn + relative_position_bias.unsqueeze(0)

        if mask is not None:
            nW = mask.shape[0]
            attn = attn.view(B_ // nW, nW, self.num_heads, N, N) + mask.unsqueeze(1).unsqueeze(0)
            attn = attn.view(-1, self.num_heads, N, N)
            attn = attn.softmax(dim=-1)
        else:
            attn = attn.softmax(dim=-1)

        attn = self.attn_drop(attn)

        x = (attn @ v).transpose(1, 2).reshape(B_, N, C)
        x = self.proj(x)
        x = self.proj_drop(x)
        return x, attn

In [None]:
class SwinTransformerBlock(nn.Module):
    """Swin Transformer Block with Window Attention"""
    def __init__(self, dim, input_resolution, num_heads, window_size=7, shift_size=0, 
                 mlp_ratio=4., qkv_bias=True, dropout=0.):
        super().__init__()
        self.dim = dim
        self.input_resolution = input_resolution
        self.num_heads = num_heads
        self.window_size = window_size
        self.shift_size = shift_size
        self.mlp_ratio = mlp_ratio
        
        if min(self.input_resolution) <= self.window_size:
            self.shift_size = 0
            self.window_size = min(self.input_resolution)

        self.norm1 = nn.LayerNorm(dim)
        self.attn = WindowAttention(
            dim, window_size=(self.window_size, self.window_size), num_heads=num_heads,
            qkv_bias=qkv_bias, dropout=dropout)

        self.norm2 = nn.LayerNorm(dim)
        mlp_hidden_dim = int(dim * mlp_ratio)
        self.mlp = nn.Sequential(
            nn.Linear(dim, mlp_hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(mlp_hidden_dim, dim),
            nn.Dropout(dropout)
        )

        if self.shift_size > 0:
            H, W = self.input_resolution
            img_mask = torch.zeros((1, H, W, 1))
            h_slices = (slice(0, -self.window_size),
                        slice(-self.window_size, -self.shift_size),
                        slice(-self.shift_size, None))
            w_slices = (slice(0, -self.window_size),
                        slice(-self.window_size, -self.shift_size),
                        slice(-self.shift_size, None))
            cnt = 0
            for h in h_slices:
                for w in w_slices:
                    img_mask[:, h, w, :] = cnt
                    cnt += 1

            mask_windows = window_partition(img_mask, self.window_size)
            mask_windows = mask_windows.view(-1, self.window_size * self.window_size)
            attn_mask = mask_windows.unsqueeze(1) - mask_windows.unsqueeze(2)
            attn_mask = attn_mask.masked_fill(attn_mask != 0, float(-100.0)).masked_fill(attn_mask == 0, float(0.0))
        else:
            attn_mask = None

        self.register_buffer("attn_mask", attn_mask)

    def forward(self, x):
        H, W = self.input_resolution
        B, L, C = x.shape
        assert L == H * W, "Input feature has wrong size"

        shortcut = x
        x = self.norm1(x)
        x = x.view(B, H, W, C)

        # Cyclic shift
        if self.shift_size > 0:
            shifted_x = torch.roll(x, shifts=(-self.shift_size, -self.shift_size), dims=(1, 2))
        else:
            shifted_x = x

        # Partition windows
        x_windows = window_partition(shifted_x, self.window_size)
        x_windows = x_windows.view(-1, self.window_size * self.window_size, C)

        # W-MSA/SW-MSA
        attn_windows, attention_weights = self.attn(x_windows, mask=self.attn_mask)

        # Merge windows
        attn_windows = attn_windows.view(-1, self.window_size, self.window_size, C)
        shifted_x = window_reverse(attn_windows, self.window_size, H, W)

        # Reverse cyclic shift
        if self.shift_size > 0:
            x = torch.roll(shifted_x, shifts=(self.shift_size, self.shift_size), dims=(1, 2))
        else:
            x = shifted_x
        x = x.view(B, H * W, C)

        # FFN
        x = shortcut + x
        x = x + self.mlp(self.norm2(x))

        return x, attention_weights

In [None]:
# Window Attention Complexity Analysis

def analyze_window_attention_complexity():
    """Analyze computational complexity of window attention"""
    sequence_lengths = [64, 128, 256, 512]
    window_sizes = [8, 16, 32]
    embed_dim = 96
    
    results = defaultdict(list)
    
    for seq_len in sequence_lengths:
        x = torch.randn(4, seq_len, embed_dim).to(device)
        
        for window_size in window_sizes:
            if window_size <= seq_len:
                # Simple window attention
                window_attn = SimpleWindowAttention(
                    embed_dim=embed_dim,
                    window_size=window_size,
                    num_heads=3
                ).to(device)
                
                # Measure time
                torch.cuda.synchronize() if torch.cuda.is_available() else None
                start_time = time.time()
                
                for _ in range(50):
                    _ = window_attn(x)
                
                torch.cuda.synchronize() if torch.cuda.is_available() else None
                end_time = time.time()
                
                avg_time = (end_time - start_time) / 50
                results[f'Window-{window_size}'].append((seq_len, avg_time))
    
    return results

window_complexity_results = analyze_window_attention_complexity()
print("Window Attention Complexity Analysis:")
for method, timing_data in window_complexity_results.items():
    print(f"\n{method}:")
    for seq_len, exec_time in timing_data:
        print(f"  Sequence length {seq_len}: {exec_time:.4f}s")

## 2. Linear Attention Mechanism

### Theoretical Foundation

Linear Attention approximates the standard softmax attention using kernel methods, achieving linear complexity while maintaining global receptive fields. The key insight is to reformulate attention computation using the associative property of matrix multiplication.

**How Linear Attention Works:**

Standard attention computes attention weights by taking the softmax of query-key similarities, then multiplying by values. Linear attention approximates this process using kernel functions that map queries and keys to a higher-dimensional feature space.

The main innovation is reordering the computation:
1. **Standard approach**: First compute all query-key similarities (expensive), then combine with values
2. **Linear approach**: First combine keys with values using kernel functions (cheaper), then compute with queries

This reordering changes the computational complexity from quadratic to linear in sequence length.

**Kernel Functions:**

Linear attention uses kernel functions to approximate the softmax operation:

- **ELU-based kernel**: Uses the ELU activation function plus 1 to ensure positive values, providing a good approximation to softmax while being computationally efficient
- **ReLU-based kernel**: Uses ReLU activation, which is very fast but may provide a coarser approximation
- **Feature mapping kernel**: Splits each element into positive and negative parts, doubling the feature dimension but often improving approximation quality

**Key Properties:**
1. **Complexity Reduction**: Changes from quadratic to linear scaling with sequence length
2. **Global Receptive Field**: Unlike window attention, maintains ability to attend to all tokens
3. **Approximation Quality**: Trade-off between efficiency and attention accuracy - kernel choice significantly affects how well it approximates standard attention

In [None]:
# HANDS-ON EXERCISE: Implement Linear Attention

class LinearAttention(nn.Module):
    """
    Linear Attention mechanism using kernel approximation - COMPLETE THE IMPLEMENTATION
    """
    def __init__(self, embed_dim, num_heads=8, kernel_type='elu', dropout=0.1):
        super().__init__()
        assert embed_dim % num_heads == 0
        
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.kernel_type = kernel_type
        
        # TODO: Create QKV projection and output projection
        self.qkv = # Your code here
        self.proj = # Your code here
        self.dropout = nn.Dropout(dropout)
        
        # TODO: Set up kernel function based on kernel_type
        # HINT: Use if-elif statements to assign the correct kernel function
        if kernel_type == 'elu':
            self.kernel_fn = # Your code here
        elif kernel_type == 'relu':
            self.kernel_fn = # Your code here
        elif kernel_type == 'feature_map':
            self.kernel_fn = # Your code here
        else:
            raise ValueError(f"Unknown kernel type: {kernel_type}")
    
    def _elu_kernel(self, x):
        """ELU-based kernel function"""
        # TODO: Apply ELU activation and add 1 to ensure positive values
        # HINT: F.elu(x) + 1
        return # Your code here
    
    def _relu_kernel(self, x):
        """ReLU-based kernel function"""
        # TODO: Apply ReLU activation
        return # Your code here
    
    def _feature_map_kernel(self, x):
        """Feature mapping kernel: [x+, x-]"""
        # TODO: Split into positive and negative parts, then concatenate
        # HINT: Use F.relu(x) for positive part and F.relu(-x) for negative part
        x_pos = # Your code here
        x_neg = # Your code here
        return # Your code here - concatenate along last dimension
    
    def forward(self, x):
        B, N, C = x.shape
        # Generate Q, K, V (same as standard attention)
        qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, self.head_dim).permute(2, 0, 3, 1, 4)
        q, k, v = qkv.unbind(0)  # (B, num_heads, N, head_dim)
        
        # TODO: Apply kernel function to Q and K
        # HINT: Use self.kernel_fn() on both q and k
        q_kernel = # Your code here
        k_kernel = # Your code here
        
        # TODO: Implement linear attention computation: φ(Q)(φ(K)^T V)
        # Step 1: Compute φ(K)^T V using einsum
        # HINT: Use 'bhnd,bhnf->bhdf' to multiply k_kernel^T with v
        kv = # Your code here
        
        # Step 2: Compute φ(Q)(φ(K)^T V) using einsum
        # HINT: Use 'bhnd,bhdf->bhnf' to multiply q_kernel with kv
        qkv_out = # Your code here
        
        # TODO: Compute normalization factor
        # Step 1: Sum k_kernel across sequence dimension
        k_sum = # Your code here - sum along sequence dimension, keep dims
        
        # Step 2: Compute normalizer using einsum
        # HINT: Use 'bhnd,bhnd->bhn' to compute dot product of q_kernel and k_sum
        normalizer = # Your code here
        normalizer = normalizer.unsqueeze(-1)  # Add dimension for broadcasting
        
        # TODO: Apply normalization
        qkv_out = # Your code here - divide by normalizer (add small epsilon)
        
        # TODO: Reshape and project output
        # HINT: 1) Transpose to (B, N, num_heads, head_dim), 2) Reshape to (B, N, C), 3) Apply projection
        qkv_out = qkv_out.transpose(1, 2).reshape(B, N, C)
        output = # Your code here - apply projection and dropout
        
        return output

# Test your implementation
print("Testing LinearAttention implementation...")
try:
    test_linear_attn = LinearAttention(embed_dim=256, num_heads=8, kernel_type='elu')
    test_input = torch.randn(2, 100, 256)
    test_output = test_linear_attn(test_input)
    print(f"✅ Success! Input shape: {test_input.shape}, Output shape: {test_output.shape}")
    print(f"Kernel type: {test_linear_attn.kernel_type}")
    
    # Test different kernels
    for kernel in ['elu', 'relu', 'feature_map']:
        test_attn = LinearAttention(embed_dim=256, kernel_type=kernel)
        output = test_attn(test_input)
        print(f"✅ {kernel} kernel works! Output shape: {output.shape}")
        
except Exception as e:
    print(f"❌ Error in implementation: {e}")
    print("💡 Check your TODO implementations above!")

In [None]:
class CausalLinearAttention(nn.Module):
    """
    Causal Linear Attention for autoregressive tasks
    """
    def __init__(self, embed_dim, num_heads=8, kernel_type='elu', dropout=0.1):
        super().__init__()
        assert embed_dim % num_heads == 0
        
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.kernel_type = kernel_type
        
        self.qkv = nn.Linear(embed_dim, embed_dim * 3, bias=False)
        self.proj = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)
        
        if kernel_type == 'elu':
            self.kernel_fn = lambda x: F.elu(x) + 1
        elif kernel_type == 'relu':
            self.kernel_fn = F.relu
        else:
            raise ValueError(f"Unknown kernel type: {kernel_type}")
    
    def forward(self, x):
        B, N, C = x.shape
        qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, self.head_dim).permute(2, 0, 3, 1, 4)
        q, k, v = qkv.unbind(0)
        
        # Apply kernel function
        q_kernel = self.kernel_fn(q)
        k_kernel = self.kernel_fn(k)
        
        # Causal linear attention using cumulative sums
        output = torch.zeros_like(q)
        kv_state = torch.zeros(B, self.num_heads, self.head_dim, self.head_dim).to(x.device)
        k_state = torch.zeros(B, self.num_heads, self.head_dim).to(x.device)
        
        for i in range(N):
            # Update states
            kv_state = kv_state + torch.einsum('bhd,bhf->bhdf', k_kernel[:, :, i], v[:, :, i])
            k_state = k_state + k_kernel[:, :, i]
            
            # Compute output
            output[:, :, i] = torch.einsum('bhd,bhdf->bhf', q_kernel[:, :, i], kv_state)
            normalizer = torch.einsum('bhd,bhd->bh', q_kernel[:, :, i], k_state).unsqueeze(-1)
            output[:, :, i] = output[:, :, i] / (normalizer + 1e-6)
        
        # Reshape and project
        output = output.transpose(1, 2).reshape(B, N, C)
        output = self.proj(output)
        output = self.dropout(output)
        
        return output

In [None]:
# Linear Attention Analysis Functions

def compare_attention_approximation_quality():
    """Compare approximation quality of linear attention vs standard attention"""
    embed_dim = 256
    seq_len = 128
    batch_size = 4
    
    x = torch.randn(batch_size, seq_len, embed_dim).to(device)
    
    # Standard attention
    std_attn = MultiHeadAttention(embed_dim).to(device)
    std_output, std_attn_weights = std_attn(x)
    
    # Linear attention variants
    kernels = ['elu', 'relu', 'feature_map']
    results = {}
    
    for kernel_type in kernels:
        linear_attn = LinearAttention(embed_dim, kernel_type=kernel_type).to(device)
        linear_output = linear_attn(x)
        
        # Compute approximation error
        mse_error = F.mse_loss(linear_output, std_output).item()
        cosine_sim = F.cosine_similarity(
            linear_output.view(-1), std_output.view(-1), dim=0
        ).item()
        
        results[kernel_type] = {
            'mse_error': mse_error,
            'cosine_similarity': cosine_sim
        }
    
    return results

def measure_linear_attention_complexity():
    """Measure computational complexity of linear attention"""
    sequence_lengths = [64, 128, 256, 512, 1024]
    embed_dim = 256
    
    std_times = []
    linear_times = []
    
    for seq_len in sequence_lengths:
        x = torch.randn(4, seq_len, embed_dim).to(device)
        
        # Standard attention
        std_attn = MultiHeadAttention(embed_dim).to(device)
        torch.cuda.synchronize() if torch.cuda.is_available() else None
        start_time = time.time()
        for _ in range(50):
            _ = std_attn(x)
        torch.cuda.synchronize() if torch.cuda.is_available() else None
        std_times.append((time.time() - start_time) / 50)
        
        # Linear attention
        linear_attn = LinearAttention(embed_dim, kernel_type='elu').to(device)
        torch.cuda.synchronize() if torch.cuda.is_available() else None
        start_time = time.time()
        for _ in range(50):
            _ = linear_attn(x)
        torch.cuda.synchronize() if torch.cuda.is_available() else None
        linear_times.append((time.time() - start_time) / 50)
    
    return sequence_lengths, std_times, linear_times

In [None]:
# Run Linear Attention Analyses

print("Linear Attention Approximation Quality Analysis:")
approx_results = compare_attention_approximation_quality()
for kernel, metrics in approx_results.items():
    print(f"{kernel} kernel:")
    print(f"  MSE Error: {metrics['mse_error']:.6f}")
    print(f"  Cosine Similarity: {metrics['cosine_similarity']:.4f}")

print("\nLinear Attention Complexity Analysis:")
seq_lens, std_times, linear_times = measure_linear_attention_complexity()
for seq_len, std_time, linear_time in zip(seq_lens, std_times, linear_times):
    speedup = std_time / linear_time
    print(f"Seq len {seq_len}: Standard {std_time:.4f}s, Linear {linear_time:.4f}s, Speedup: {speedup:.2f}x")

## 3. Sparse Attention Mechanism

### Theoretical Foundation

Sparse Attention reduces computational complexity by selectively attending to a subset of input tokens. This approach is motivated by the observation that not all token pairs contribute equally to the final representation.

**How Sparse Attention Works:**

Instead of computing attention between every pair of tokens, sparse attention uses predefined or learned patterns to determine which tokens can attend to which other tokens. This creates a sparse attention matrix where many entries are zero.

**Key Insight**: In many tasks, tokens primarily need to attend to:
- **Local neighbors** (for capturing local patterns)
- **A few global tokens** (for capturing long-range dependencies)
- **Specific positions** based on task requirements

**Standard vs Sparse Attention:**

**Standard Attention** computes a full attention matrix where every token can attend to every other token. For a sequence of length n, this requires n² attention computations.

**Sparse Attention** restricts attention to a sparse pattern where each token only attends to a small subset of other tokens. If each token attends to only s other tokens (where s << n), the computation becomes linear in sequence length.

**Sparsity Patterns:**

1. **Random Sparse**: Randomly sample a subset of connections for each token
2. **Local Window**: Each token attends only to tokens within a fixed window around it
3. **Strided**: Each token attends to every k-th token (useful for capturing regular patterns)
4. **Global + Local**: Combine a few global tokens (that attend to everything) with local windows
5. **Learned Sparse**: Use machine learning to discover which connections are most important

**Complexity Analysis:**
- **Standard Attention**: Quadratic scaling - every token attends to every other token
- **Sparse Attention**: Approximately linear scaling - each token attends to only a fixed number of other tokens

The **BigBird** model exemplifies effective sparse attention by combining global attention (some tokens attend to all others), local attention (sliding windows), and random attention (random long-range connections) to maintain both efficiency and modeling capability.

In [None]:
# HANDS-ON EXERCISE: Implement Sparse Attention Patterns

def create_sparse_mask(seq_len, pattern_type='random', sparsity_factor=4, window_size=None):
    """
    Create different types of sparse attention masks - COMPLETE THE IMPLEMENTATION
    """
    if pattern_type == 'random':
        # TODO: Create random sparse pattern
        # HINT: 1) Create zero matrix, 2) Generate random indices, 3) Set those positions to 1
        mask = torch.zeros(seq_len, seq_len)
        # Calculate how many connections to keep
        total_connections = seq_len * seq_len
        connections_to_keep = total_connections // sparsity_factor
        
        # TODO: Generate random indices and set them to 1
        indices = # Your code here - use torch.randperm to get random indices
        mask.view(-1)[indices] = 1
        return mask.bool()
    
    elif pattern_type == 'local_window':
        # TODO: Create local window pattern
        # HINT: For each position i, allow attention to positions within window_size//2
        window_size = window_size or seq_len // 8
        mask = torch.zeros(seq_len, seq_len)
        
        for i in range(seq_len):
            # TODO: Calculate start and end positions for the window
            start = # Your code here - max(0, i - window_size // 2)
            end = # Your code here - min(seq_len, i + window_size // 2 + 1)
            # TODO: Set the window positions to 1
            mask[i, start:end] = # Your code here
            
        return mask.bool()
    
    elif pattern_type == 'strided':
        # TODO: Create strided pattern
        # HINT: For each position i, attend to every sparsity_factor-th position starting from i
        mask = torch.zeros(seq_len, seq_len)
        for i in range(seq_len):
            # TODO: Set every sparsity_factor-th position to 1
            mask[i, i::sparsity_factor] = # Your code here
        return mask.bool()
    
    elif pattern_type == 'global_local':
        # TODO: Create global + local pattern (BigBird style)
        # HINT: 1) Some tokens attend globally, 2) All tokens have local attention
        mask = torch.zeros(seq_len, seq_len)
        window_size = window_size or 32
        global_tokens = min(seq_len // 16, 64)
        
        # TODO: Global connections for first few tokens
        # HINT: First global_tokens can attend to all positions
        mask[:global_tokens, :] = # Your code here
        mask[:, :global_tokens] = # Your code here
        
        # TODO: Local connections for all tokens
        # HINT: Same as local_window pattern
        for i in range(seq_len):
            start = # Your code here
            end = # Your code here
            mask[i, start:end] = # Your code here
        
        return mask.bool()
    
    else:
        raise ValueError(f"Unknown pattern type: {pattern_type}")

# Test your sparse mask implementation
print("🧪 Testing Sparse Mask Creation...")
print("=" * 50)

def test_sparse_patterns():
    """Test all sparse attention patterns"""
    seq_len = 64
    patterns = ['random', 'local_window', 'strided', 'global_local']
    
    for pattern in patterns:
        try:
            # TODO: Create mask using your implementation
            mask = create_sparse_mask(seq_len, pattern, sparsity_factor=4)
            
            # Calculate sparsity statistics
            total_connections = seq_len * seq_len
            active_connections = mask.sum().item()
            sparsity_ratio = active_connections / total_connections
            
            print(f"✅ {pattern:12s}: {active_connections:4.0f}/{total_connections} connections "
                  f"({sparsity_ratio:.3f} density)")
            
            # TODO: Verify mask properties
            # Check that mask is square and has correct dimensions
            assert mask.shape == (seq_len, seq_len), f"Wrong shape: {mask.shape}"
            assert mask.dtype == torch.bool, f"Wrong dtype: {mask.dtype}"
            
            # Pattern-specific checks
            if pattern == 'local_window':
                # TODO: Verify diagonal elements are True (self-attention)
                diagonal_check = # Your code here - check mask.diag().all()
                assert diagonal_check, "Local window should include self-attention"
                
        except Exception as e:
            print(f"❌ {pattern:12s}: Error - {e}")

# Run the test
test_sparse_patterns()

print(f"\n🎯 Understanding Sparsity Patterns:")
print("- Random: Unstructured sparsity, good for general pruning")
print("- Local Window: Captures local dependencies, good for images")  
print("- Strided: Regular patterns, good for periodic data")
print("- Global+Local: Combines global and local attention, best balance")

In [None]:
# HANDS-ON EXERCISE: Implement Sparse Attention

class SparseAttention(nn.Module):
    """
    Sparse Multi-Head Self-Attention with configurable sparsity patterns - COMPLETE THE IMPLEMENTATION
    """
    def __init__(self, embed_dim, num_heads=8, sparsity_pattern='random', 
                 sparsity_factor=4, dropout=0.1):
        super().__init__()
        assert embed_dim % num_heads == 0
        
        # TODO: Set up basic attention parameters (similar to standard attention)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = # Your code here
        self.scale = # Your code here
        
        # Store sparsity configuration
        self.sparsity_pattern = sparsity_pattern
        self.sparsity_factor = sparsity_factor
        
        # TODO: Create QKV projection and output projection layers
        self.qkv = # Your code here
        self.proj = # Your code here
        self.dropout = nn.Dropout(dropout)
        
        # Register buffer for attention mask (will be set during forward)
        self.register_buffer('attention_mask', None)
    
    def forward(self, x):
        B, N, C = x.shape
        
        # TODO: Create sparse mask if not exists or sequence length changed
        # HINT: Check if self.attention_mask is None or has wrong shape
        if # Your condition here:
            self.attention_mask = create_sparse_mask(
                N, self.sparsity_pattern, self.sparsity_factor
            ).to(x.device)
        
        # TODO: Generate Q, K, V matrices (same as standard attention)
        qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, self.head_dim).permute(2, 0, 3, 1, 4)
        q, k, v = qkv.unbind(0)
        
        # TODO: Compute attention scores
        # HINT: Standard dot-product attention with scaling
        attn = # Your code here
        
        # TODO: Apply sparse mask to attention scores
        # HINT: 1) Add batch and head dimensions to mask, 2) Use masked_fill with negative infinity
        sparse_mask = self.attention_mask.unsqueeze(0).unsqueeze(0)  # (1, 1, N, N)
        attn = # Your code here - apply mask using masked_fill
        
        # TODO: Apply softmax and dropout
        attn = # Your code here - apply softmax
        attn = self.dropout(attn)
        
        # TODO: Apply attention to values and reshape output
        # HINT: 1) Multiply attn @ v, 2) Transpose and reshape, 3) Apply projection
        x = # Your code here - apply attention to values
        x = # Your code here - apply output projection
        
        return x, attn

# Test your implementation
print("Testing SparseAttention implementation...")
try:
    # Test different sparsity patterns
    patterns = ['random', 'local_window', 'strided', 'global_local']
    
    for pattern in patterns:
        test_sparse_attn = SparseAttention(
            embed_dim=256, 
            num_heads=8, 
            sparsity_pattern=pattern,
            sparsity_factor=4
        )
        test_input = torch.randn(2, 64, 256)
        test_output, test_attn = test_sparse_attn(test_input)
        
        # Calculate actual sparsity
        mask = test_sparse_attn.attention_mask
        sparsity = mask.float().mean().item()
        
        print(f"✅ {pattern} pattern works!")
        print(f"   Input: {test_input.shape}, Output: {test_output.shape}")
        print(f"   Attention sparsity: {sparsity:.3f} ({sparsity*100:.1f}% connections)")
        print()
        
except Exception as e:
    print(f"❌ Error in implementation: {e}")
    print("💡 Check your TODO implementations above!")

In [None]:
class BigBirdAttention(nn.Module):
    """
    BigBird-style sparse attention with global, local, and random components
    """
    def __init__(self, embed_dim, num_heads=8, window_size=64, num_global_tokens=64,
                 num_random_tokens=64, dropout=0.1):
        super().__init__()
        assert embed_dim % num_heads == 0
        
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.scale = self.head_dim ** -0.5
        self.window_size = window_size
        self.num_global_tokens = num_global_tokens
        self.num_random_tokens = num_random_tokens
        
        self.qkv = nn.Linear(embed_dim, embed_dim * 3, bias=False)
        self.proj = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)
    
    def create_bigbird_mask(self, seq_len):
        """Create BigBird attention mask"""
        mask = torch.zeros(seq_len, seq_len, dtype=torch.bool)
        
        # Global attention for first num_global_tokens
        global_tokens = min(self.num_global_tokens, seq_len)
        mask[:global_tokens, :] = True
        mask[:, :global_tokens] = True
        
        # Local sliding window
        for i in range(seq_len):
            start = max(0, i - self.window_size // 2)
            end = min(seq_len, i + self.window_size // 2 + 1)
            mask[i, start:end] = True
        
        # Random connections
        for i in range(seq_len):
            if i >= global_tokens:  # Skip global tokens
                random_indices = torch.randperm(seq_len)[:self.num_random_tokens]
                mask[i, random_indices] = True
        
        return mask
    
    def forward(self, x):
        B, N, C = x.shape
        
        # Create attention mask
        attention_mask = self.create_bigbird_mask(N).to(x.device)
        
        qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, self.head_dim).permute(2, 0, 3, 1, 4)
        q, k, v = qkv.unbind(0)
        
        attn = (q @ k.transpose(-2, -1)) * self.scale
        
        # Apply attention mask
        sparse_mask = attention_mask.unsqueeze(0).unsqueeze(0)
        attn = attn.masked_fill(~sparse_mask, float('-inf'))
        
        attn = attn.softmax(dim=-1)
        attn = self.dropout(attn)
        
        x = (attn @ v).transpose(1, 2).reshape(B, N, C)
        x = self.proj(x)
        
        return x, attn

In [None]:
# Sparse Attention Analysis Functions

def analyze_sparsity_patterns():
    """Analyze different sparsity patterns"""
    seq_len = 256
    patterns = ['random', 'local_window', 'strided', 'global_local']
    
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))
    axes = axes.flatten()
    
    for i, pattern in enumerate(patterns):
        mask = create_sparse_mask(seq_len, pattern, sparsity_factor=8)
        
        axes[i].imshow(mask.float().numpy(), cmap='Blues', aspect='auto')
        axes[i].set_title(f'{pattern.replace("_", " ").title()} Pattern')
        axes[i].set_xlabel('Key Position')
        axes[i].set_ylabel('Query Position')
        
        # Calculate sparsity
        sparsity = mask.float().mean().item()
        axes[i].text(0.02, 0.98, f'Sparsity: {sparsity:.3f}', 
                    transform=axes[i].transAxes, verticalalignment='top',
                    bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
    
    plt.tight_layout()
    plt.show()

def measure_sparse_attention_efficiency():
    """Measure efficiency of different sparse attention patterns"""
    embed_dim = 256
    seq_len = 512
    batch_size = 4
    
    x = torch.randn(batch_size, seq_len, embed_dim).to(device)
    
    # Standard attention baseline
    std_attn = MultiHeadAttention(embed_dim).to(device)
    torch.cuda.synchronize() if torch.cuda.is_available() else None
    start_time = time.time()
    for _ in range(30):
        _ = std_attn(x)
    torch.cuda.synchronize() if torch.cuda.is_available() else None
    std_time = (time.time() - start_time) / 30
    
    # Sparse attention variants
    patterns = ['random', 'local_window', 'strided', 'global_local']
    sparsity_factors = [2, 4, 8, 16]
    
    results = defaultdict(dict)
    
    for pattern in patterns:
        for factor in sparsity_factors:
            sparse_attn = SparseAttention(
                embed_dim, sparsity_pattern=pattern, sparsity_factor=factor
            ).to(device)
            
            torch.cuda.synchronize() if torch.cuda.is_available() else None
            start_time = time.time()
            for _ in range(30):
                _ = sparse_attn(x)
            torch.cuda.synchronize() if torch.cuda.is_available() else None
            sparse_time = (time.time() - start_time) / 30
            
            speedup = std_time / sparse_time
            results[pattern][factor] = {
                'time': sparse_time,
                'speedup': speedup
            }
    
    return std_time, results

In [None]:
# Run Sparse Attention Analyses

print("Analyzing Sparsity Patterns:")
analyze_sparsity_patterns()

print("\nSparse Attention Efficiency Analysis:")
std_time, sparse_results = measure_sparse_attention_efficiency()
print(f"Standard Attention: {std_time:.4f}s")

for pattern, factor_results in sparse_results.items():
    print(f"\n{pattern.replace('_', ' ').title()} Pattern:")
    for factor, metrics in factor_results.items():
        print(f"  Factor {factor}: {metrics['time']:.4f}s (Speedup: {metrics['speedup']:.2f}x)")

In [None]:
# Comprehensive Benchmark - Main Function

def comprehensive_benchmark():
    """
    Comprehensive benchmark comparing all attention mechanisms across 
    multiple dimensions: speed, memory, and approximation quality
    """
    
    # Test configurations
    test_configs = [
        {'seq_len': 64, 'embed_dim': 256, 'batch_size': 8},
        {'seq_len': 128, 'embed_dim': 256, 'batch_size': 8},
        {'seq_len': 256, 'embed_dim': 256, 'batch_size': 4},
        {'seq_len': 512, 'embed_dim': 256, 'batch_size': 2},
    ]
    
    results = defaultdict(lambda: defaultdict(list))
    
    for config in test_configs:
        seq_len = config['seq_len']
        embed_dim = config['embed_dim']
        batch_size = config['batch_size']
        
        print(f"Testing config: seq_len={seq_len}, embed_dim={embed_dim}, batch_size={batch_size}")
        
        # Generate test input
        x = torch.randn(batch_size, seq_len, embed_dim).to(device)
        
        # Standard Attention (baseline)
        std_attn = MultiHeadAttention(embed_dim).to(device)
        std_output, std_attn_weights = std_attn(x)
        
        # Measure standard attention
        torch.cuda.synchronize() if torch.cuda.is_available() else None
        start_time = time.time()
        for _ in range(50):
            _ = std_attn(x)
        torch.cuda.synchronize() if torch.cuda.is_available() else None
        std_time = (time.time() - start_time) / 50
        
        # Memory usage (approximate)
        std_memory = seq_len * seq_len * batch_size * 4  # 4 bytes per float32
        
        # Store baseline results
        results['Standard'][seq_len] = {
            'time': std_time,
            'memory': std_memory,
            'speedup': 1.0,
            'memory_reduction': 1.0,
            'approximation_error': 0.0
        }
        
        # Test other attention mechanisms
        results = test_window_attention(results, x, std_output, std_time, std_memory, seq_len, embed_dim, batch_size)
        results = test_linear_attention(results, x, std_output, std_time, std_memory, seq_len, embed_dim, batch_size)
        results = test_sparse_attention(results, x, std_output, std_time, std_memory, seq_len, embed_dim, batch_size)
    
    return results

In [None]:
# HANDS-ON EXERCISE: Compare All Attention Mechanisms

def compare_attention_mechanisms_exercise():
    """
    EXERCISE: Complete this function to compare all attention mechanisms
    """
    print("🔬 HANDS-ON COMPARISON: Attention Mechanisms")
    print("=" * 60)
    
    # Test configuration
    batch_size, seq_len, embed_dim = 4, 128, 256
    num_heads = 8
    
    # TODO: Create test input tensor
    # HINT: Use torch.randn with the dimensions above
    test_input = # Your code here
    
    print(f"Test input shape: {test_input.shape}")
    print(f"Sequence length: {seq_len}, Embed dim: {embed_dim}")
    print()
    
    # TODO: Initialize all attention mechanisms
    print("🏗️  Initializing attention mechanisms...")
    
    # Standard attention (baseline)
    std_attn = # Your code here - use MultiHeadAttention
    
    # Window attention
    window_attn = # Your code here - use SimpleWindowAttention with window_size=16
    
    # Linear attention  
    linear_attn = # Your code here - use LinearAttention with kernel_type='elu'
    
    # Sparse attention
    sparse_attn = # Your code here - use SparseAttention with sparsity_pattern='global_local'
    
    attention_models = {
        'Standard': std_attn,
        'Window': window_attn, 
        'Linear': linear_attn,
        'Sparse': sparse_attn
    }
    
    # TODO: Test each attention mechanism
    results = {}
    
    for name, model in attention_models.items():
        print(f"Testing {name} Attention...")
        
        try:
            # TODO: Measure execution time
            start_time = time.time()
            
            # Run the model multiple times for accurate timing
            for _ in range(10):
                if name == 'Standard':
                    # TODO: Handle standard attention (returns output and attention weights)
                    output, attn_weights = # Your code here
                else:
                    # TODO: Handle other attention types
                    if name == 'Sparse':
                        # Sparse attention returns output and attention
                        output, _ = # Your code here
                    else:
                        # Window and Linear attention return only output
                        output = # Your code here
            
            end_time = time.time()
            exec_time = (end_time - start_time) / 10  # Average time
            
            # TODO: Calculate approximate memory usage
            if name == 'Standard':
                # Standard attention: O(n²) memory
                memory_usage = # Your code here - seq_len * seq_len
            elif name == 'Window':
                # Window attention: O(n * w) memory where w is window size
                memory_usage = # Your code here - seq_len * 16 (window size)
            elif name == 'Linear':
                # Linear attention: O(n) memory
                memory_usage = # Your code here - seq_len
            elif name == 'Sparse':
                # Sparse attention: reduced quadratic
                sparsity = sparse_attn.attention_mask.float().mean().item()
                memory_usage = # Your code here - seq_len * seq_len * sparsity
            
            # Store results
            results[name] = {
                'time': exec_time,
                'memory': memory_usage,
                'output_shape': output.shape
            }
            
            print(f"  ✅ Success! Time: {exec_time:.4f}s, Memory: ~{memory_usage:.0f} units")
            
        except Exception as e:
            print(f"  ❌ Error: {e}")
            results[name] = {'error': str(e)}
    
    # TODO: Calculate and display speedups
    print(f"\n📊 PERFORMANCE COMPARISON:")
    print("-" * 40)
    
    if 'Standard' in results and 'time' in results['Standard']:
        baseline_time = results['Standard']['time']
        baseline_memory = results['Standard']['memory']
        
        for name, result in results.items():
            if 'time' in result:
                # TODO: Calculate speedup and memory reduction
                speedup = # Your code here - baseline_time / result['time']
                memory_reduction = # Your code here - baseline_memory / result['memory']
                
                print(f"{name:10s}: {result['time']:.4f}s | "
                      f"Speedup: {speedup:.2f}x | "
                      f"Memory: {memory_reduction:.2f}x less")
    
    print("\n🎯 Key Insights:")
    print("- Window Attention: Best for images with spatial locality")
    print("- Linear Attention: Best scaling for very long sequences") 
    print("- Sparse Attention: Balanced trade-off between efficiency and global context")
    
    return results

# Run the comparison exercise
print("🚀 Running Attention Mechanisms Comparison Exercise...")
print("Complete the TODO items above, then run this cell!")
print()

try:
    comparison_results = compare_attention_mechanisms_exercise()
    print(f"\n🎉 Exercise completed successfully!")
except Exception as e:
    print(f"❌ Please complete the TODO items above. Error: {e}")

In [None]:
# Benchmark Helper Functions

def test_window_attention(results, x, std_output, std_time, std_memory, seq_len, embed_dim, batch_size):
    """Test window attention performance"""
    if seq_len >= 8:  # Minimum size for window attention
        window_size = min(8, seq_len // 2)
        window_size = max(1, min(window_size, seq_len))
        
        window_attn = SimpleWindowAttention(
            embed_dim=embed_dim,
            window_size=window_size,
            num_heads=8
        ).to(device)
        
        window_output = window_attn(x)
        
        # Measure time
        torch.cuda.synchronize() if torch.cuda.is_available() else None
        start_time = time.time()
        for _ in range(50):
            _ = window_attn(x)
        torch.cuda.synchronize() if torch.cuda.is_available() else None
        window_time = (time.time() - start_time) / 50
        
        # Approximation quality
        approx_error = F.mse_loss(window_output, std_output).item()
        
        # Memory (approximately)
        num_windows = (seq_len + window_size - 1) // window_size
        window_memory = window_size * window_size * num_windows * batch_size * 4
        
        results['Window'][seq_len] = {
            'time': window_time,
            'memory': window_memory,
            'speedup': std_time / window_time,
            'memory_reduction': std_memory / window_memory,
            'approximation_error': approx_error
        }
    
    return results

def test_linear_attention(results, x, std_output, std_time, std_memory, seq_len, embed_dim, batch_size):
    """Test linear attention performance"""
    linear_attn = LinearAttention(embed_dim, kernel_type='elu').to(device)
    linear_output = linear_attn(x)
    
    torch.cuda.synchronize() if torch.cuda.is_available() else None
    start_time = time.time()
    for _ in range(50):
        _ = linear_attn(x)
    torch.cuda.synchronize() if torch.cuda.is_available() else None
    linear_time = (time.time() - start_time) / 50
    
    approx_error = F.mse_loss(linear_output, std_output).item()
    linear_memory = seq_len * embed_dim * batch_size * 4  # Linear in sequence length
    
    results['Linear'][seq_len] = {
        'time': linear_time,
        'memory': linear_memory,
        'speedup': std_time / linear_time,
        'memory_reduction': std_memory / linear_memory,
        'approximation_error': approx_error
    }
    
    return results

def test_sparse_attention(results, x, std_output, std_time, std_memory, seq_len, embed_dim, batch_size):
    """Test sparse attention performance"""
    sparse_attn = SparseAttention(embed_dim, sparsity_pattern='global_local', sparsity_factor=4).to(device)
    sparse_output, _ = sparse_attn(x)
    
    torch.cuda.synchronize() if torch.cuda.is_available() else None
    start_time = time.time()
    for _ in range(50):
        _ = sparse_attn(x)
    torch.cuda.synchronize() if torch.cuda.is_available() else None
    sparse_time = (time.time() - start_time) / 50
    
    approx_error = F.mse_loss(sparse_output, std_output).item()
    sparse_memory = (seq_len * seq_len // 4) * batch_size * 4  # Reduced by sparsity factor
    
    results['Sparse'][seq_len] = {
        'time': sparse_time,
        'memory': sparse_memory,
        'speedup': std_time / sparse_time,
        'memory_reduction': std_memory / sparse_memory,
        'approximation_error': approx_error
    }
    
    return results

In [None]:
# Complexity Scaling Analysis

def analyze_complexity_scaling():
    """Analyze how each method scales with sequence length"""
    sequence_lengths = [64, 128, 256, 512]
    embed_dim = 256
    
    methods = {
        'Standard': lambda: MultiHeadAttention(embed_dim),
        'Linear': lambda: LinearAttention(embed_dim, kernel_type='elu'),
        'Sparse': lambda: SparseAttention(embed_dim, sparsity_pattern='random', sparsity_factor=4),
        'Window': lambda: SimpleWindowAttention(embed_dim, window_size=8)
    }
    
    scaling_results = defaultdict(list)
    
    for seq_len in sequence_lengths:
        x = torch.randn(4, seq_len, embed_dim).to(device)
        
        for method_name, method_factory in methods.items():
            model = method_factory().to(device)
            
            # Measure time
            torch.cuda.synchronize() if torch.cuda.is_available() else None
            start_time = time.time()
            for _ in range(30):
                _ = model(x)
            torch.cuda.synchronize() if torch.cuda.is_available() else None
            avg_time = (time.time() - start_time) / 30
            
            scaling_results[method_name].append((seq_len, avg_time))
    
    return scaling_results

def theoretical_memory_complexity():
    """Compare theoretical memory complexities"""
    sequence_lengths = np.array([64, 128, 256, 512, 1024])
    
    # Theoretical complexities (normalized)
    standard_memory = sequence_lengths ** 2
    linear_memory = sequence_lengths
    window_memory = sequence_lengths  # Assuming fixed window size
    sparse_memory = sequence_lengths ** 2 / 4  # 4x sparsity
    
    return {
        'sequence_lengths': sequence_lengths,
        'Standard': standard_memory,
        'Linear': linear_memory,
        'Window': window_memory,
        'Sparse': sparse_memory
    }

In [None]:
# Run Comprehensive Benchmark

print("Running Comprehensive Benchmark...")
benchmark_results = comprehensive_benchmark()

print("\nBenchmark Results Summary:")
print("=" * 80)

for method_name, seq_results in benchmark_results.items():
    print(f"\n{method_name} Attention:")
    print("-" * 40)
    for seq_len, metrics in seq_results.items():
        print(f"  Seq Len {seq_len:3d}: Time={metrics['time']:.4f}s, "
              f"Speedup={metrics['speedup']:.2f}x, "
              f"Memory Reduction={metrics['memory_reduction']:.2f}x, "
              f"Error={metrics['approximation_error']:.6f}")

# Analyze scaling behavior
print("\nAnalyzing Complexity Scaling...")
scaling_data = analyze_complexity_scaling()

print("\nComplexity Scaling Results:")
print("=" * 50)
for method_name, timing_data in scaling_data.items():
    print(f"\n{method_name}:")
    for seq_len, exec_time in timing_data:
        print(f"  Length {seq_len:3d}: {exec_time:.4f}s")

# Memory complexity analysis
memory_analysis = theoretical_memory_complexity()
print("\nTheoretical Memory Complexity (normalized):")
print("=" * 60)
for i, seq_len in enumerate(memory_analysis['sequence_lengths']):
    print(f"Seq Len {seq_len:4d}: Standard={memory_analysis['Standard'][i]:8.0f}, "
          f"Linear={memory_analysis['Linear'][i]:6.0f}, "
          f"Window={memory_analysis['Window'][i]:6.0f}, "
          f"Sparse={memory_analysis['Sparse'][i]:8.0f}")

In [None]:
# Performance Visualization Functions - Part 1

def create_performance_visualizations():
    """Create comprehensive performance visualizations"""
    
    # Generate sample data for visualization (replace with actual benchmark results if available)
    sequence_lengths = np.array([64, 128, 256, 512])
    
    # Execution times (in milliseconds)
    execution_times = {
        'Standard': np.array([5.2, 18.7, 72.3, 285.1]),
        'Window': np.array([3.1, 8.9, 23.4, 67.8]),
        'Linear': np.array([2.8, 5.6, 11.2, 22.4]),
        'Sparse': np.array([3.5, 10.2, 28.7, 89.3])
    }
    
    # Memory usage (in MB)
    memory_usage = {
        'Standard': sequence_lengths ** 2 * 0.001,  # Quadratic scaling
        'Window': sequence_lengths * 0.02,          # Linear scaling
        'Linear': sequence_lengths * 0.015,         # Linear scaling
        'Sparse': sequence_lengths ** 2 * 0.00025   # Reduced quadratic
    }
    
    # Approximation errors
    approximation_errors = {
        'Standard': np.zeros_like(sequence_lengths, dtype=float),
        'Window': np.array([0.001, 0.002, 0.003, 0.004]),
        'Linear': np.array([0.05, 0.06, 0.07, 0.08]),
        'Sparse': np.array([0.01, 0.015, 0.02, 0.025])
    }
    
    # Create comprehensive visualization
    fig = plt.figure(figsize=(20, 15))
    
    # 1. Execution Time Comparison
    ax1 = plt.subplot(3, 3, 1)
    for method, times in execution_times.items():
        plt.plot(sequence_lengths, times, 'o-', label=method, linewidth=2, markersize=8)
    plt.xlabel('Sequence Length')
    plt.ylabel('Execution Time (ms)')
    plt.title('Execution Time vs Sequence Length')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.yscale('log')
    
    # 2. Memory Usage Comparison
    ax2 = plt.subplot(3, 3, 2)
    for method, memory in memory_usage.items():
        plt.plot(sequence_lengths, memory, 's-', label=method, linewidth=2, markersize=8)
    plt.xlabel('Sequence Length')
    plt.ylabel('Memory Usage (MB)')
    plt.title('Memory Usage vs Sequence Length')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.yscale('log')
    
    # 3. Speedup Comparison
    ax3 = plt.subplot(3, 3, 3)
    for method, times in execution_times.items():
        if method != 'Standard':
            speedup = execution_times['Standard'] / times
            plt.plot(sequence_lengths, speedup, '^-', label=method, linewidth=2, markersize=8)
    plt.xlabel('Sequence Length')
    plt.ylabel('Speedup (vs Standard)')
    plt.title('Speedup vs Sequence Length')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.axhline(y=1, color='k', linestyle='--', alpha=0.5)
    
    return fig, execution_times, memory_usage, approximation_errors, sequence_lengths

In [None]:
# Performance Visualization Functions - Part 2

def create_additional_visualizations(fig, execution_times, memory_usage, approximation_errors, sequence_lengths):
    """Add remaining visualizations to the figure"""
    
    # 4. Approximation Error
    ax4 = plt.subplot(3, 3, 4)
    for method, errors in approximation_errors.items():
        if method != 'Standard':
            plt.plot(sequence_lengths, errors, 'd-', label=method, linewidth=2, markersize=8)
    plt.xlabel('Sequence Length')
    plt.ylabel('MSE Error vs Standard')
    plt.title('Approximation Error')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.yscale('log')
    
    # 5. Efficiency vs Accuracy Trade-off
    ax5 = plt.subplot(3, 3, 5)
    for i, seq_len in enumerate([128, 256, 512]):
        idx = np.where(sequence_lengths == seq_len)[0][0]
        for method in ['Window', 'Linear', 'Sparse']:
            speedup = execution_times['Standard'][idx] / execution_times[method][idx]
            error = approximation_errors[method][idx]
            plt.scatter(error, speedup, s=100, label=f'{method} (L={seq_len})', alpha=0.7)
    
    plt.xlabel('Approximation Error (MSE)')
    plt.ylabel('Speedup')
    plt.title('Efficiency vs Accuracy Trade-off')
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True, alpha=0.3)
    plt.xscale('log')
    
    # 6. Theoretical Complexity Visualization
    ax6 = plt.subplot(3, 3, 6)
    extended_lengths = np.logspace(1, 4, 100)  # 10 to 10000
    
    # Theoretical time complexities (normalized)
    standard_complexity = extended_lengths ** 2
    linear_complexity = extended_lengths
    window_complexity = extended_lengths  # Assuming fixed window
    sparse_complexity = extended_lengths ** 2 / 4
    
    plt.loglog(extended_lengths, standard_complexity, label='O(n²) - Standard', linewidth=2)
    plt.loglog(extended_lengths, linear_complexity, label='O(n) - Linear/Window', linewidth=2)
    plt.loglog(extended_lengths, sparse_complexity, label='O(n²/s) - Sparse', linewidth=2)
    
    plt.xlabel('Sequence Length (n)')
    plt.ylabel('Computational Complexity')
    plt.title('Theoretical Complexity Scaling')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    return fig

In [None]:
# Performance Visualization Functions - Part 3

def create_final_visualizations(fig, execution_times, memory_usage, approximation_errors, sequence_lengths):
    """Add the final set of visualizations"""
    
    # 7. Attention Pattern Visualization
    ax7 = plt.subplot(3, 3, 7)
    seq_len = 64
    patterns = {
        'Standard': torch.ones(seq_len, seq_len),
        'Window': create_sparse_mask(seq_len, 'local_window', window_size=8),
        'Sparse': create_sparse_mask(seq_len, 'global_local', sparsity_factor=4)
    }
    
    # Create composite visualization
    composite = torch.zeros(seq_len, seq_len * 3)
    for i, (name, pattern) in enumerate(patterns.items()):
        start_col = i * seq_len
        end_col = (i + 1) * seq_len
        composite[:, start_col:end_col] = pattern.float()
    
    plt.imshow(composite.numpy(), cmap='Blues', aspect='auto')
    plt.title('Attention Patterns Comparison')
    plt.ylabel('Query Position')
    
    # Add labels
    plt.xticks([seq_len//2, seq_len + seq_len//2, 2*seq_len + seq_len//2], 
               ['Standard', 'Window', 'Sparse'])
    
    # 8. Memory Efficiency Bar Chart
    ax8 = plt.subplot(3, 3, 8)
    methods = ['Standard', 'Window', 'Linear', 'Sparse']
    seq_len_idx = 2  # 256 tokens
    memory_values = [memory_usage[method][seq_len_idx] for method in methods]
    colors = ['red', 'green', 'blue', 'orange']
    
    bars = plt.bar(methods, memory_values, color=colors, alpha=0.7)
    plt.ylabel('Memory Usage (MB)')
    plt.title(f'Memory Usage Comparison\n(Sequence Length: {sequence_lengths[seq_len_idx]})')
    plt.grid(True, alpha=0.3, axis='y')
    
    # Add value labels on bars
    for bar, value in zip(bars, memory_values):
        plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.1,
                f'{value:.2f}', ha='center', va='bottom')
    
    # 9. Comprehensive Performance Radar Chart
    ax9 = plt.subplot(3, 3, 9, projection='polar')
    
    # Metrics for radar chart (normalized 0-1, higher is better)
    metrics = ['Speed', 'Memory\nEfficiency', 'Accuracy', 'Scalability', 'Implementation\nComplexity']
    
    # Sample scores (0-1, higher is better)
    scores = {
        'Window': [0.8, 0.9, 0.85, 0.9, 0.7],
        'Linear': [0.9, 0.95, 0.6, 0.95, 0.8],
        'Sparse': [0.7, 0.8, 0.8, 0.8, 0.6]
    }
    
    angles = np.linspace(0, 2 * np.pi, len(metrics), endpoint=False)
    angles = np.concatenate((angles, [angles[0]]))  # Complete the circle
    
    for method, values in scores.items():
        values_plot = np.concatenate((values, [values[0]]))  # Complete the circle
        ax9.plot(angles, values_plot, 'o-', linewidth=2, label=method)
        ax9.fill(angles, values_plot, alpha=0.25)
    
    ax9.set_xticks(angles[:-1])
    ax9.set_xticklabels(metrics)
    ax9.set_ylim(0, 1)
    ax9.set_title('Comprehensive Performance Comparison')
    ax9.legend(loc='upper right', bbox_to_anchor=(1.3, 1.0))
    ax9.grid(True)
    
    plt.tight_layout()
    plt.show()
    
    return fig

In [None]:
# Analysis Summary Functions

def create_detailed_analysis_summary():
    """Create a detailed analysis summary"""
    
    print("COMPREHENSIVE ANALYSIS SUMMARY")
    print("=" * 80)
    
    print("\n1. COMPUTATIONAL COMPLEXITY ANALYSIS:")
    print("-" * 50)
    print("Standard Attention:     O(n²·d) - Quadratic scaling")
    print("Window Attention:       O(n·W²·d) ≈ O(n·d) - Linear scaling")
    print("Linear Attention:       O(n·d·d_φ) ≈ O(n·d) - Linear scaling")
    print("Sparse Attention:       O(s·n·d) where s << n - Reduced quadratic")
    
    print("\n2. TRADE-OFF ANALYSIS:")
    print("-" * 50)
    print("Window Attention:")
    print("  ✓ Excellent speed improvement (5-10x)")
    print("  ✓ Maintains local spatial relationships")
    print("  ✓ Low approximation error")
    print("  ✗ Limited global context")
    print("  ✗ Image-specific (requires 2D structure)")
    
    print("\nLinear Attention:")
    print("  ✓ Best scalability (truly linear)")
    print("  ✓ Maintains global receptive field")
    print("  ✓ Hardware efficient")
    print("  ✗ Higher approximation error")
    print("  ✗ Kernel choice affects performance")
    
    print("\nSparse Attention:")
    print("  ✓ Configurable sparsity patterns")
    print("  ✓ Good balance of efficiency and accuracy")
    print("  ✓ Maintains some global connections")
    print("  ✗ Pattern design complexity")
    print("  ✗ Implementation complexity")
    
    print("\n3. PRACTICAL RECOMMENDATIONS:")
    print("-" * 50)
    print("• For image tasks (ViT): Window Attention")
    print("• For very long sequences: Linear Attention")
    print("• For balanced performance: Sparse Attention with global+local pattern")
    print("• For mobile/edge deployment: Linear or Window Attention")
    
    print("\n4. FUTURE DIRECTIONS:")
    print("-" * 50)
    print("• Learnable sparse patterns")
    print("• Hybrid attention mechanisms")
    print("• Hardware-specific optimizations")
    print("• Dynamic attention routing")

In [None]:
# Execute Visualizations and Analysis

print("Creating Comprehensive Performance Visualizations...")
fig, execution_times, memory_usage, approximation_errors, sequence_lengths = create_performance_visualizations()

# Add remaining visualizations
fig = create_additional_visualizations(fig, execution_times, memory_usage, approximation_errors, sequence_lengths)
fig = create_final_visualizations(fig, execution_times, memory_usage, approximation_errors, sequence_lengths)

# Generate analysis summary
create_detailed_analysis_summary()