# Module 4 - Exercise 6: PyTorch Advanced Topics

<a href="https://colab.research.google.com/github/jumpingsphinx/jumpingsphinx.github.io/blob/main/notebooks/module4-neural-networks/exercise6-pytorch-advanced.ipynb" target="_blank"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Learning Objectives

By the end of this exercise, you will be able to:

- Implement custom layers and loss functions
- Use learning rate schedulers and advanced optimizers
- Apply data augmentation techniques
- Implement early stopping and model checkpointing
- Use TensorBoard for visualization
- Build production-ready training pipelines

## Prerequisites

- Completion of Exercise 5 (PyTorch Basics)
- Understanding of advanced deep learning concepts
- Familiarity with the PyTorch ecosystem

## Setup

Run this cell first to import required libraries:

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset, random_split
from torchvision import datasets, transforms, models
from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau, CosineAnnealingLR

import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
from tqdm import tqdm
import copy

# Set random seeds
torch.manual_seed(42)
np.random.seed(42)

# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(f"PyTorch version: {torch.__version__}")
print(f"Device: {device}")
print("Setup complete!")

---

## Part 1: Custom Layers and Modules

### Background

When the built-in layers do not cover the behavior you need, you can define your own by subclassing `nn.Module`: declare the submodules in `__init__` and the computation in `forward`.

### Exercise 1.1: Create a Custom Residual Block

**Task:** Implement a residual connection block.

In [None]:
class ResidualBlock(nn.Module):
    """
    Residual block with skip connection.
    
    out = F(x) + x
    """
    
    def __init__(self, in_features, out_features):
        super(ResidualBlock, self).__init__()
        
        # Main path
        self.fc1 = nn.Linear(in_features, out_features)
        self.bn1 = nn.BatchNorm1d(out_features)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(out_features, out_features)
        self.bn2 = nn.BatchNorm1d(out_features)
        
        # Skip connection (if dimensions don't match)
        self.skip = None
        if in_features != out_features:
            self.skip = nn.Linear(in_features, out_features)

    def forward(self, x):
        """
        Forward pass with residual connection.
        """
        identity = x
        
        # Main path
        out = self.fc1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.bn2(out)
        
        # Skip connection
        if self.skip is not None:
            identity = self.skip(identity)
        
        # Add and activate
        out += identity
        out = self.relu(out)
        return out

# Test residual block
print("Testing Residual Block")
print("=" * 70)

res_block = ResidualBlock(64, 128)
x_test = torch.randn(32, 64)  # Batch of 32
output = res_block(x_test)

print(f"Input shape: {x_test.shape}")
print(f"Output shape: {output.shape}")
print("✓ Residual block working!")

### Exercise 1.2: Create a Custom Attention Layer

**Task:** Implement a simple self-attention mechanism.

In [None]:
class SelfAttention(nn.Module):
    """
    Simple self-attention layer.
    """
    
    def __init__(self, in_features):
        super(SelfAttention, self).__init__()
        
        # Create query, key, value projections
        self.query = nn.Linear(in_features, in_features)
        self.key = nn.Linear(in_features, in_features)
        self.value = nn.Linear(in_features, in_features)
        
        self.scale = np.sqrt(in_features)

    def forward(self, x):
        """
        Apply self-attention.
        
        Parameters:
        -----------
        x : torch.Tensor, shape (batch_size, in_features)
        
        Returns:
        --------
        out : torch.Tensor, shape (batch_size, in_features)
        """
        Q = self.query(x)
        K = self.key(x)
        V = self.value(x)
        
        # Compute attention scores between the rows of the batch;
        # the result has shape (batch_size, batch_size)
        attention_scores = torch.matmul(Q, K.T) / self.scale
        attention_weights = F.softmax(attention_scores, dim=-1)
        
        # Apply attention to values
        out = torch.matmul(attention_weights, V)
        return out

# Test attention
attention = SelfAttention(64)
x_test = torch.randn(32, 64)
output = attention(x_test)
print(f"\nAttention input: {x_test.shape}")
print(f"Attention output: {output.shape}")
print("✓ Self-attention working!")
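
To make the arithmetic inside `forward` concrete, here is a minimal pure-Python sketch of scaled dot-product attention on small lists, leaving out the learned query, key, and value projections. The helper names `softmax` and `scaled_dot_product_attention` are illustrative assumptions, not part of the notebook or of PyTorch.

```python
import math

def softmax(row):
    """Numerically stable softmax over a list of floats."""
    m = max(row)
    exps = [math.exp(v - m) for v in row]
    total = sum(exps)
    return [e / total for e in exps]

def scaled_dot_product_attention(Q, K, V):
    """Q, K, V are lists of equal-length rows. Returns (output, weights)."""
    scale = math.sqrt(len(Q[0]))
    # scores[i][j] = dot(Q[i], K[j]) / sqrt(d)
    scores = [[sum(q * k for q, k in zip(q_row, k_row)) / scale for k_row in K]
              for q_row in Q]
    # Each row of weights is a probability distribution over the rows of K
    weights = [softmax(r) for r in scores]
    # Each output row is a weighted average of the rows of V
    out = [[sum(w * v_row[c] for w, v_row in zip(w_row, V))
            for c in range(len(V[0]))]
           for w_row in weights]
    return out, weights

rows = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
out, weights = scaled_dot_product_attention(rows, rows, rows)
for w in weights:
    print([round(x, 3) for x in w])
```

Every row of `weights` sums to 1, so each output row stays inside the range spanned by the rows of `V`.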

---

## Part 2: Learning Rate Schedulers

### Background

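A learning rate scheduler changes the learning rate over the course of training, typically starting high for fast early progress and decaying it later so the optimizer can settle into a minimum.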
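
As a rough illustration, two common schedules can be written as closed-form functions of the epoch. The sketch below uses only the standard library; the function names `step_lr` and `cosine_annealing_lr` are hypothetical, and the formulas assume the scheduler is stepped once per epoch within a single cosine period.

```python
import math

def step_lr(base_lr, epoch, step_size, gamma):
    """Multiply the learning rate by gamma once every step_size epochs."""
    return base_lr * gamma ** (epoch // step_size)

def cosine_annealing_lr(base_lr, epoch, t_max, eta_min):
    """Decay from base_lr to eta_min along half a cosine wave over t_max epochs."""
    return eta_min + 0.5 * (base_lr - eta_min) * (1 + math.cos(math.pi * epoch / t_max))

for epoch in (0, 10, 25, 50):
    print(epoch,
          round(step_lr(0.1, epoch, step_size=10, gamma=0.5), 6),
          round(cosine_annealing_lr(0.1, epoch, t_max=50, eta_min=0.001), 6))
```

The step schedule drops in discrete jumps, while the cosine schedule falls slowly at first, fastest in the middle, and slowly again as it approaches the minimum.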

### Exercise 2.1: Compare Different Schedulers

**Task:** Test various learning rate schedules.

In [None]:
print("Learning Rate Schedulers")
print("=" * 70)

# Create a dummy model and optimizer
model = nn.Linear(10, 1)
optimizer = optim.SGD(model.parameters(), lr=0.1)

# Scheduler factories: each one builds a fresh scheduler for a given optimizer
scheduler_factories = {
    'StepLR': lambda opt: StepLR(opt, step_size=10, gamma=0.5),
    'ReduceLROnPlateau': lambda opt: ReduceLROnPlateau(opt, mode='min', factor=0.5, patience=5),
    'CosineAnnealing': lambda opt: CosineAnnealingLR(opt, T_max=50, eta_min=0.001)
}

# Simulate training and track learning rates
epochs = 50
lr_history = {name: [] for name in scheduler_factories}

for name, make_scheduler in scheduler_factories.items():
    # Use a fresh optimizer per scheduler so the runs do not interfere
    optimizer = optim.SGD(model.parameters(), lr=0.1)
    scheduler = make_scheduler(optimizer)
    
    
    for epoch in range(epochs):
        # Simulate validation loss (decreasing with noise)
        val_loss = 1.0 / (epoch + 1) + np.random.randn() * 0.05
        
        # Record learning rate
        lr_history[name].append(optimizer.param_groups[0]['lr'])
        
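        # Step the optimizer first (a no-op here, since no gradients were computed)
        # so PyTorch does not warn about scheduler.step() preceding optimizer.step()
        optimizer.step()
        if name == 'ReduceLROnPlateau':
            scheduler.step(val_loss)
        else:
            scheduler.step()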

# Plot learning rate schedules
plt.figure(figsize=(12, 6))
for name, lrs in lr_history.items():
    plt.plot(lrs, label=name, linewidth=2)

plt.xlabel('Epoch', fontsize=12)
plt.ylabel('Learning Rate', fontsize=12)
plt.title('Learning Rate Schedulers Comparison', fontsize=14, fontweight='bold')
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.yscale('log')
plt.show()

print("\nScheduler Descriptions:")
print("  • StepLR: Reduces LR by factor every N epochs")
print("  • ReduceLROnPlateau: Reduces LR when metric stops improving")
print("  • CosineAnnealing: Smooth cosine decay to minimum LR")

---

## Part 3: Regularization Techniques

### Exercise 3.1: Implement Network with Dropout and Batch Normalization

**Task:** Create a well-regularized network.

In [None]:
class RegularizedMLP(nn.Module):
    """
    MLP with dropout and batch normalization.
    """
    
    def __init__(self, input_size, hidden_sizes, output_size, dropout_rate=0.5):
        super(RegularizedMLP, self).__init__()
        
        layers = []
        
        # Input layer
        layers.append(nn.Linear(input_size, hidden_sizes[0]))
        layers.append(nn.BatchNorm1d(hidden_sizes[0]))
        layers.append(nn.ReLU())
        layers.append(nn.Dropout(dropout_rate))
        
        # Hidden layers
        for i in range(len(hidden_sizes) - 1):
            layers.append(nn.Linear(hidden_sizes[i], hidden_sizes[i+1]))
            layers.append(nn.BatchNorm1d(hidden_sizes[i+1]))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout_rate))
        
        # Output layer
        layers.append(nn.Linear(hidden_sizes[-1], output_size))
        
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)

# Create regularized model
model_reg = RegularizedMLP(input_size=784, 
                          hidden_sizes=[256, 128, 64], 
                          output_size=10,
                          dropout_rate=0.3)

print("Regularized MLP Architecture:")
print(model_reg)

total_params = sum(p.numel() for p in model_reg.parameters())
print(f"\nTotal parameters: {total_params:,}")
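
The parameter total can be checked by hand. The sketch below is a standard-library calculation under the assumption that each linear layer contributes weights plus biases and each batch-norm layer contributes a learnable scale and shift; ReLU and dropout have no parameters, and batch-norm running statistics are buffers rather than parameters. The helper names are hypothetical.

```python
def linear_params(n_in, n_out):
    """Weights (n_in * n_out) plus one bias per output unit."""
    return n_in * n_out + n_out

def batchnorm_params(n_features):
    """Learnable scale and shift; running mean/variance are not counted."""
    return 2 * n_features

def mlp_param_count(input_size, hidden_sizes, output_size):
    """Count parameters for Linear -> BatchNorm blocks followed by a Linear output."""
    total = 0
    prev = input_size
    for h in hidden_sizes:
        total += linear_params(prev, h) + batchnorm_params(h)
        prev = h
    return total + linear_params(prev, output_size)

print(mlp_param_count(784, [256, 128, 64], 10))  # 243658
```

Almost all of the parameters sit in the first linear layer (784 × 256 weights), which is typical for MLPs on flattened images.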

### Exercise 3.2: Train with L2 Regularization (Weight Decay)

**Task:** Compare training with and without weight decay.
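
As a simplified picture of what the `weight_decay` argument does, the sketch below applies one plain SGD update with an L2 penalty, where `weight_decay * w` is added to each gradient. This is an illustration with a hypothetical helper `sgd_step`, not the optimizer's actual code; `optim.Adam` adds the same term to the gradient before its adaptive scaling, so its effect is related but not identical.

```python
def sgd_step(weights, grads, lr, weight_decay=0.0):
    """One SGD update with an L2 penalty: w <- w - lr * (g + weight_decay * w)."""
    return [w - lr * (g + weight_decay * w) for w, g in zip(weights, grads)]

# With zero gradients, weight decay alone shrinks every weight toward zero
# by a factor of (1 - lr * weight_decay) per step.
w = [1.0, -2.0]
zero_grads = [0.0, 0.0]
for step in range(3):
    w = sgd_step(w, zero_grads, lr=0.1, weight_decay=0.5)
    print(step + 1, [round(v, 6) for v in w])
```

The shrink factor here is 0.95 per step, which shows why larger weights are penalized more in absolute terms.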

In [None]:
# Load MNIST data
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root='./data', train=False, transform=transform)

# Split train into train/val
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")
print(f"Test samples: {len(test_dataset)}")

In [None]:
def train_model(model, train_loader, val_loader, epochs=20, lr=0.001, weight_decay=0):
    """
    Train model and track metrics.
    """
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    
    history = {'train_loss': [], 'val_loss': [], 'val_acc': []}
    
    for epoch in range(epochs):
        # Training
        model.train()
        train_loss = 0
        for data, target in train_loader:
            data = data.view(data.size(0), -1).to(device)
            target = target.to(device)
            
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
        
        # Validation
        model.eval()
        val_loss = 0
        correct = 0
        total = 0
        
        with torch.no_grad():
            for data, target in val_loader:
                data = data.view(data.size(0), -1).to(device)
                target = target.to(device)
                
                output = model(data)
                loss = criterion(output, target)
                
                val_loss += loss.item()
                _, predicted = output.max(1)
                total += target.size(0)
                correct += predicted.eq(target).sum().item()
        
        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        val_acc = 100. * correct / total
        
        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        
        if (epoch + 1) % 5 == 0:
            print(f"Epoch {epoch+1}/{epochs}: Train Loss: {train_loss:.4f}, "
                  f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")
    
    return history

# Train without weight decay
print("Training WITHOUT weight decay")
print("=" * 70)
model_no_wd = RegularizedMLP(784, [256, 128, 64], 10, dropout_rate=0.3)
history_no_wd = train_model(model_no_wd, train_loader, val_loader, epochs=20, weight_decay=0)

# Train with weight decay
print("\nTraining WITH weight decay")
print("=" * 70)
model_wd = RegularizedMLP(784, [256, 128, 64], 10, dropout_rate=0.3)
history_wd = train_model(model_wd, train_loader, val_loader, epochs=20, weight_decay=0.001)

# Compare
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

ax1.plot(history_no_wd['train_loss'], label='No WD - Train', linewidth=2)
ax1.plot(history_no_wd['val_loss'], label='No WD - Val', linewidth=2)
ax1.plot(history_wd['train_loss'], label='WD - Train', linewidth=2, linestyle='--')
ax1.plot(history_wd['val_loss'], label='WD - Val', linewidth=2, linestyle='--')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.set_title('Loss Comparison')
ax1.legend()
ax1.grid(True, alpha=0.3)

ax2.plot(history_no_wd['val_acc'], label='No Weight Decay', linewidth=2)
ax2.plot(history_wd['val_acc'], label='With Weight Decay', linewidth=2)
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Validation Accuracy (%)')
ax2.set_title('Validation Accuracy')
ax2.legend()
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\nFinal Val Acc (No WD): {history_no_wd['val_acc'][-1]:.2f}%")
print(f"Final Val Acc (With WD): {history_wd['val_acc'][-1]:.2f}%")

---

## Part 4: Data Augmentation

### Exercise 4.1: Apply Augmentation Transforms

**Task:** Create augmented datasets for better generalization.

In [None]:
print("Data Augmentation")
print("=" * 70)

# Define augmentation transforms
train_transform = transforms.Compose([
    transforms.RandomRotation(10),  # Rotate by ±10 degrees
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),  # Shift by up to 10% of width/height
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# No augmentation at test time: only convert and normalize
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# Load datasets with augmentation
train_dataset_aug = datasets.MNIST(root='./data', train=True,
                                   download=True, transform=train_transform)
test_dataset_aug = datasets.MNIST(root='./data', train=False,
                                  download=True, transform=test_transform)

print("Data augmentation applies random transformations during training.")
print("This helps the model generalize better to variations in the data.")
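
To show what a random translation does to the pixels, here is a small standard-library sketch that shifts a 2D grid and fills the vacated cells with zeros. The functions `shift_image` and `random_shift` are hypothetical stand-ins for illustration; they are not how `transforms.RandomAffine` is implemented.

```python
import random

def shift_image(image, dx, dy, fill=0):
    """Shift a 2D list by dx columns and dy rows; pixels moved off-grid are dropped."""
    h, w = len(image), len(image[0])
    shifted = [[fill] * w for _ in range(h)]
    for y in range(h):
        for x in range(w):
            ny, nx = y + dy, x + dx
            if 0 <= ny < h and 0 <= nx < w:
                shifted[ny][nx] = image[y][x]
    return shifted

def random_shift(image, max_fraction=0.1, rng=random):
    """Shift by a random offset of at most max_fraction of each dimension."""
    h, w = len(image), len(image[0])
    max_dx, max_dy = int(max_fraction * w), int(max_fraction * h)
    return shift_image(image, rng.randint(-max_dx, max_dx), rng.randint(-max_dy, max_dy))

image = [[0, 0, 0],
         [0, 1, 0],
         [0, 0, 0]]
for row in shift_image(image, dx=1, dy=0):
    print(row)
```

Because a new offset is drawn every time a sample is loaded, the model sees a slightly different version of each digit in every epoch while the label stays the same.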

---

## Part 5: Model Checkpointing

### Exercise 5.1: Implement Checkpoint Saving

**Task:** Save the best model during training and stop early when the validation loss stops improving.

In [None]:
class EarlyStopping:
    """
    Early stopping to stop training when validation loss doesn't improve.
    """
    
    def __init__(self, patience=5, min_delta=0, verbose=True):
        self.patience = patience
        self.min_delta = min_delta
        self.verbose = verbose
        self.counter = 0
        self.best_loss = None
        self.early_stop = False
        self.best_model = None
    
    def __call__(self, val_loss, model):
        if self.best_loss is None:
            self.best_loss = val_loss
            self.best_model = copy.deepcopy(model.state_dict())
        elif val_loss > self.best_loss - self.min_delta:
            self.counter += 1
            if self.verbose:
                print(f"EarlyStopping counter: {self.counter}/{self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.best_model = copy.deepcopy(model.state_dict())
            self.counter = 0

def train_with_checkpointing(model, train_loader, val_loader, epochs=50, lr=0.001):
    """
    Train with checkpointing and early stopping.
    """
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)
    early_stopping = EarlyStopping(patience=7, verbose=True)
    
    history = {'train_loss': [], 'val_loss': [], 'val_acc': []}
    best_val_acc = 0
    
    for epoch in range(epochs):
        # Training
        model.train()
        train_loss = 0
        for data, target in train_loader:
            data = data.view(data.size(0), -1).to(device)
            target = target.to(device)
            
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
        
        # Validation
        model.eval()
        val_loss = 0
        correct = 0
        total = 0
        
        with torch.no_grad():
            for data, target in val_loader:
                data = data.view(data.size(0), -1).to(device)
                target = target.to(device)
                
                output = model(data)
                loss = criterion(output, target)
                
                val_loss += loss.item()
                _, predicted = output.max(1)
                total += target.size(0)
                correct += predicted.eq(target).sum().item()
        
        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        val_acc = 100. * correct / total
        
        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        
        # Learning rate scheduling
        scheduler.step(val_loss)
        
        # Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_acc': val_acc,
                'val_loss': val_loss
            }, 'best_model.pth')
            print(f"✓ Saved new best model (Val Acc: {val_acc:.2f}%)")
        
        # Early stopping
        early_stopping(val_loss, model)
        if early_stopping.early_stop:
            print(f"Early stopping at epoch {epoch+1}")
            model.load_state_dict(early_stopping.best_model)
            break
        
        print(f"Epoch {epoch+1}/{epochs}: Train Loss: {train_loss:.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%, "
              f"LR: {optimizer.param_groups[0]['lr']:.6f}")
    
    return history

# Train with checkpointing
print("Training with Checkpointing and Early Stopping")
print("=" * 70)

model_checkpoint = RegularizedMLP(784, [256, 128, 64], 10, dropout_rate=0.3)
history_checkpoint = train_with_checkpointing(model_checkpoint, train_loader, val_loader, epochs=50)
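
The patience logic in `EarlyStopping` can be traced on a list of made-up validation losses. The sketch below mirrors the same comparison (`val_loss > best_loss - min_delta`) in a standalone function; `epochs_until_stop` is a hypothetical helper and the loss values are invented for illustration.

```python
def epochs_until_stop(val_losses, patience=5, min_delta=0.0):
    """Return the 1-based epoch at which training would stop, or None if it never does."""
    best = None
    counter = 0
    for epoch, loss in enumerate(val_losses, start=1):
        if best is None:
            best = loss
        elif loss > best - min_delta:
            # No sufficient improvement: count this epoch against the patience budget
            counter += 1
            if counter >= patience:
                return epoch
        else:
            # Improvement: record it and reset the counter
            best = loss
            counter = 0
    return None

losses = [0.50, 0.40, 0.35, 0.36, 0.37, 0.34, 0.35, 0.36, 0.38]
print(epochs_until_stop(losses, patience=3))  # 9
print(epochs_until_stop(losses, patience=2))  # 5
```

With `patience=2` training stops at epoch 5 and never reaches the better loss at epoch 6, which is the trade-off a small patience value makes.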

---

## Part 6: Transfer Learning Concepts

### Exercise 6.1: Use Pre-trained Features

**Task:** Demonstrate feature extraction with a pre-trained network.

In [None]:
print("Transfer Learning Concepts")
print("=" * 70)

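# Load pre-trained ResNet18 (concept demo only; it is not trained on MNIST here).
# torchvision >= 0.13 selects weights via the `weights` argument;
# older releases used the now-deprecated pretrained=True.
pretrained_model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)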

print("Pre-trained ResNet18:")
print(pretrained_model)

# Freeze all layers
for param in pretrained_model.parameters():
    param.requires_grad = False

# Replace final layer
num_features = pretrained_model.fc.in_features
pretrained_model.fc = nn.Linear(num_features, 10)  # 10 classes for MNIST

print("\nModified for MNIST (last layer):")
print(pretrained_model.fc)

# Count trainable parameters
trainable_params = sum(p.numel() for p in pretrained_model.parameters() if p.requires_grad)
total_params = sum(p.numel() for p in pretrained_model.parameters())

print(f"\nTotal parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")
print(f"Frozen parameters: {total_params - trainable_params:,}")

print("\nTransfer Learning Benefits:")
print("  • Faster training (fewer parameters to update)")
print("  • Better performance with less data")
print("  • Leverage knowledge from large datasets")

---

## Part 7: Advanced Architecture

### Exercise 7.1: Build a Multi-Path Network

**Task:** Create a network with parallel paths (Inception-style).

In [None]:
class MultiPathBlock(nn.Module):
    """
    Multi-path block with parallel processing.
    """
    
    def __init__(self, in_features):
        super(MultiPathBlock, self).__init__()
        
        # Path 1: Direct
        self.path1 = nn.Sequential(
            nn.Linear(in_features, in_features),
            nn.ReLU()
        )
        
        # Path 2: Deeper
        self.path2 = nn.Sequential(
            nn.Linear(in_features, in_features // 2),
            nn.ReLU(),
            nn.Linear(in_features // 2, in_features),
            nn.ReLU()
        )
        
        # Combine paths
        self.combine = nn.Linear(in_features * 2, in_features)
    
    def forward(self, x):
        # Process through parallel paths
        out1 = self.path1(x)
        out2 = self.path2(x)
        
        # Concatenate and combine
        combined = torch.cat([out1, out2], dim=1)
        output = self.combine(combined)
        
        return output

class AdvancedNetwork(nn.Module):
    """
    Advanced network with multiple techniques.
    """
    
    def __init__(self, input_size, output_size):
        super(AdvancedNetwork, self).__init__()
        
        self.input_layer = nn.Sequential(
            nn.Linear(input_size, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.3)
        )
        
        self.multi_path = MultiPathBlock(256)
        
        self.residual = ResidualBlock(256, 128)
        
        self.output_layer = nn.Linear(128, output_size)
    
    def forward(self, x):
        x = self.input_layer(x)
        x = self.multi_path(x)
        x = self.residual(x)
        x = self.output_layer(x)
        return x

# Create advanced network
advanced_model = AdvancedNetwork(784, 10)
print("Advanced Network Architecture:")
print(advanced_model)

total_params = sum(p.numel() for p in advanced_model.parameters())
print(f"\nTotal parameters: {total_params:,}")

---

## Challenge Problems (Optional)

### Challenge 1: Implement Gradient Clipping

Add gradient clipping to prevent exploding gradients.

In [None]:
# Use torch.nn.utils.clip_grad_norm_() between loss.backward() and optimizer.step()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

print("Challenge: Implement gradient clipping!")
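
Norm-based clipping rescales all gradients by a common factor whenever their combined L2 norm exceeds `max_norm`. The sketch below shows that arithmetic on a flat list of numbers; `clip_by_global_norm` is a hypothetical helper, and the small `eps` term is an assumption added to avoid division by zero.

```python
import math

def clip_by_global_norm(grads, max_norm, eps=1e-6):
    """Scale grads so their L2 norm is at most max_norm. Returns (clipped, norm_before)."""
    total_norm = math.sqrt(sum(g * g for g in grads))
    # The factor is capped at 1.0, so small gradients pass through unchanged
    coef = min(1.0, max_norm / (total_norm + eps))
    return [g * coef for g in grads], total_norm

clipped, norm_before = clip_by_global_norm([3.0, 4.0], max_norm=1.0)
print(norm_before, [round(g, 4) for g in clipped])
```

The direction of the gradient is preserved; only its length changes, which is why this is usually preferred over clipping each value independently.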

### Challenge 2: Mixed Precision Training

Use automatic mixed precision for faster training.

In [None]:
# Use torch.cuda.amp.autocast and GradScaler
# (loss scaling only matters on CUDA, so it is disabled when no GPU is present)
scaler = torch.cuda.amp.GradScaler(enabled=torch.cuda.is_available())

# In training loop:
# optimizer.zero_grad()
# with torch.cuda.amp.autocast():
#     output = model(data)
#     loss = criterion(output, target)
# scaler.scale(loss).backward()
# scaler.step(optimizer)
# scaler.update()

print("Challenge: Implement mixed precision training!")

### Challenge 3: Custom Loss Function

Create a custom loss function combining multiple objectives.

In [None]:
class CustomLoss(nn.Module):
    """
    Cross-entropy plus an L1 penalty on all model parameters.

    Note: `weight_decay` here is the L1 coefficient, not the L2 weight
    decay applied by the optimizer.
    """

    def __init__(self, weight_decay=0.0):
        super().__init__()
        self.ce = nn.CrossEntropyLoss()
        self.weight_decay = weight_decay

    def forward(self, output, target, model):
        loss = self.ce(output, target)
        # Sum of absolute values of every parameter (the L1 norm)
        l1_reg = sum(param.abs().sum() for param in model.parameters())
        return loss + self.weight_decay * l1_reg

---

## Reflection Questions

1. **When should you use residual connections?**
   - Think about very deep networks

2. **How does batch normalization help training?**
   - Consider internal covariate shift

3. **What's the difference between dropout and weight decay?**
   - How do they prevent overfitting differently?

4. **Why is early stopping important?**
   - What problems does it solve?

5. **When is transfer learning most effective?**
   - Think about dataset size and similarity

---

## Summary

In this exercise, you learned:

- Creating custom layers (residual blocks, attention)
- Using learning rate schedulers for adaptive learning
- Applying regularization (dropout, batch norm, weight decay)
- Data augmentation for better generalization
- Model checkpointing and early stopping
- Transfer learning concepts
- Building advanced architectures with multiple techniques

**Key Takeaways:**

- Custom modules enable flexible architecture design
- Learning rate scheduling improves convergence
- Multiple regularization techniques work together
- Data augmentation is crucial for small datasets
- Checkpointing prevents loss of progress
- Transfer learning leverages pre-trained knowledge
- Combining techniques creates robust models

**Next Steps:**

- Explore convolutional neural networks (CNNs)
- Study recurrent neural networks (RNNs)
- Learn about attention mechanisms and transformers
- Practice on real-world datasets (ImageNet, COCO, etc.)

---

**Congratulations!** You've completed the Neural Networks module. You now have a solid foundation in both the theory and practice of deep learning with PyTorch.

**Need help?** Check the solution notebook or open an issue on [GitHub](https://github.com/jumpingsphinx/jumpingsphinx.github.io/issues).