# 3. Custom Convolutional Neural Network (CNN)

## Overview

In this notebook, we'll build a **CNN from scratch** for facial expression recognition.

### Why CNNs for Images?

CNNs are specifically designed for image data because they:

1. **Learn hierarchical features** - From edges → textures → patterns → objects
2. **Are tolerant to translation** - Filters detect a pattern wherever it appears, and pooling makes the output robust to small shifts
3. **Share parameters** - Same filter applied across entire image (efficient)
4. **Preserve spatial structure** - Unlike flattening to a vector
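
Point 2 can be checked numerically. Strictly speaking, a convolution on its own is translation *equivariant* (shift the input and the feature map shifts with it); invariance only appears once a pooling step discards position. The sketch below is a toy illustration, not part of the notebook's model: it assumes circular padding so that the equality also holds at the borders, and the layer and tensor names are made up for the demo.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# Toy single-channel conv; circular padding makes shifts wrap around cleanly
conv = nn.Conv2d(1, 1, kernel_size=3, padding=1,
                 padding_mode="circular", bias=False)

x = torch.randn(1, 1, 8, 8)
x_shifted = torch.roll(x, shifts=2, dims=-1)  # shift 2 pixels to the right

with torch.no_grad():
    y = conv(x)
    y_shifted = conv(x_shifted)

# Equivariance: shifting the input shifts the feature map by the same amount
equivariant = torch.allclose(
    y_shifted, torch.roll(y, shifts=2, dims=-1), atol=1e-6
)

# Invariance: a global max over all positions ignores where the response occurred
invariant = torch.allclose(
    y.amax(dim=(-2, -1)), y_shifted.amax(dim=(-2, -1)), atol=1e-6
)

print(f"Equivariant conv: {equivariant}")
print(f"Invariant after global max: {invariant}")
```

With the zero padding and 2x2 max pooling used later in this notebook, the property holds only approximately, which is one reason data augmentation still helps.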

### CNN Building Blocks:

| Layer | Purpose |
|-------|--------|
| **Conv2D** | Extract local features using learnable filters |
| **BatchNorm** | Normalize activations, stabilize training |
| **ReLU** | Non-linearity (negative values → 0) |
| **MaxPool** | Reduce spatial size, add translation invariance |
| **Dropout** | Prevent overfitting by randomly dropping neurons |
| **Linear** | Final classification layers |
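
Dropout is the one building block in the table that behaves differently during training and evaluation, which is why the mode switches (`model.train()` / `model.eval()`) matter later on. The following is a minimal sketch on a dummy tensor of ones; the variable names are illustrative only.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

drop = nn.Dropout(p=0.5)
x = torch.ones(1, 1000)

drop.train()        # training mode: units are zeroed at random
y_train = drop(x)   # surviving units are scaled by 1 / (1 - p) = 2.0

drop.eval()         # evaluation mode: dropout passes the input through
y_eval = drop(x)

kept_fraction = (y_train != 0).float().mean().item()
train_values = sorted(y_train.unique().tolist())

print(f"Fraction kept in train mode: {kept_fraction:.2f}")
print(f"Distinct values in train mode: {train_values}")
print(f"Eval output equals input: {torch.equal(y_eval, x)}")
```

The rescaling in training mode keeps the expected activation the same in both modes, so no correction is needed at inference time.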

## Step 1: Import Libraries

In [None]:
# Standard imports
import numpy as np
import pandas as pd
from pathlib import Path
import pickle
import json
import time

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# Image processing
from PIL import Image
from torchvision import transforms

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Progress bars
from tqdm.notebook import tqdm

# Scikit-learn for metrics
from sklearn.metrics import classification_report, confusion_matrix

# Set seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

In [None]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

## Step 2: Configuration and Constants

In [None]:
# =============================================================================
# CONFIGURATION
# =============================================================================

# Image settings
IMG_SIZE = 224
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Training settings
BATCH_SIZE = 32
NUM_EPOCHS = 25
LEARNING_RATE = 0.001
PATIENCE = 5  # Early stopping patience

# Model settings
DROPOUT_RATE = 0.5

# Emotion classes
EMOTION_CLASSES = [
    "anger", "disgust", "fear", "happiness",
    "neutral", "sadness", "surprise"
]
NUM_CLASSES = len(EMOTION_CLASSES)
EMOTION_TO_IDX = {e: i for i, e in enumerate(EMOTION_CLASSES)}
IDX_TO_EMOTION = {i: e for i, e in enumerate(EMOTION_CLASSES)}

print("Configuration:")
print(f"  Image size: {IMG_SIZE}x{IMG_SIZE}")
print(f"  Batch size: {BATCH_SIZE}")
print(f"  Epochs: {NUM_EPOCHS}")
print(f"  Learning rate: {LEARNING_RATE}")
print(f"  Dropout: {DROPOUT_RATE}")
print(f"  Classes: {NUM_CLASSES}")

## Step 3: Load and Prepare Data

In [None]:
# Load processed data from notebook 1
with open('processed_data.pkl', 'rb') as f:
    data = pickle.load(f)

train_df = data['train_df']
val_df = data['val_df']
test_df = data['test_df']

print(f"Training samples: {len(train_df)}")
print(f"Validation samples: {len(val_df)}")
print(f"Test samples: {len(test_df)}")

In [None]:
# Define transforms
train_transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD)
])

test_transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD)
])

print("Transforms defined!")
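
To make the last transform concrete: `Normalize` applies `(x - mean) / std` per channel. The sketch below reproduces that arithmetic with plain tensors and adds a hypothetical `denormalize` helper (not defined anywhere in this notebook) that is handy when plotting augmented images. The random tensor stands in for a `ToTensor()` output.

```python
import torch

IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]


def normalize(img, mean, std):
    """Per-channel (x - mean) / std, the arithmetic behind transforms.Normalize."""
    mean_t = torch.tensor(mean).view(3, 1, 1)
    std_t = torch.tensor(std).view(3, 1, 1)
    return (img - mean_t) / std_t


def denormalize(img, mean, std):
    """Hypothetical helper: undo the normalization to recover [0, 1] pixels."""
    mean_t = torch.tensor(mean).view(3, 1, 1)
    std_t = torch.tensor(std).view(3, 1, 1)
    return img * std_t + mean_t


torch.manual_seed(0)
img = torch.rand(3, 4, 4)  # stand-in for a ToTensor() output in [0, 1]

normalized = normalize(img, IMAGENET_MEAN, IMAGENET_STD)
restored = denormalize(normalized, IMAGENET_MEAN, IMAGENET_STD)
roundtrip_ok = torch.allclose(restored, img, atol=1e-6)

print(f"Round trip recovers the image: {roundtrip_ok}")
```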

In [None]:
# Dataset class
class FacialExpressionDataset(Dataset):
    """
    PyTorch Dataset for facial expression images.
    """
    def __init__(self, dataframe, transform=None):
        self.data = dataframe.reset_index(drop=True)
        self.transform = transform
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        image = Image.open(row['image_path']).convert('RGB')
        label = row['label']
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

# Create datasets
train_dataset = FacialExpressionDataset(train_df, transform=train_transform)
val_dataset = FacialExpressionDataset(val_df, transform=test_transform)
test_dataset = FacialExpressionDataset(test_df, transform=test_transform)

# Create data loaders
# pin_memory only helps when batches are copied to a CUDA device
PIN_MEMORY = torch.cuda.is_available()
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0, pin_memory=PIN_MEMORY)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=PIN_MEMORY)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=PIN_MEMORY)

print(f"Training batches: {len(train_loader)}")
print(f"Validation batches: {len(val_loader)}")
print(f"Test batches: {len(test_loader)}")
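
The batch counts printed above follow `ceil(num_samples / batch_size)`, and the last batch is usually smaller than the rest. The sketch below shows this on synthetic data; `TensorDataset` and the tiny 8x8 tensors are stand-ins for `FacialExpressionDataset` and are assumptions made only for illustration.

```python
import math

import torch
from torch.utils.data import DataLoader, TensorDataset

torch.manual_seed(0)

# 70 fake "images" and integer class labels in [0, 7)
fake_images = torch.zeros(70, 3, 8, 8)
fake_labels = torch.randint(0, 7, (70,))

demo_loader = DataLoader(
    TensorDataset(fake_images, fake_labels), batch_size=32, shuffle=False
)

batch_sizes = [images.size(0) for images, _ in demo_loader]
first_images, first_labels = next(iter(demo_loader))

print(f"Number of batches: {len(demo_loader)}")
print(f"Batch sizes: {batch_sizes}")
print(f"Image batch shape: {tuple(first_images.shape)}")
print(f"Label dtype: {first_labels.dtype}")
```

`CrossEntropyLoss` expects labels as 64-bit integer class indices, so it is worth confirming the label dtype on a real batch before training.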

## Step 4: Build the CNN Architecture

We'll build a CNN with the following structure:

```
Input (3 x 224 x 224)
    ↓
ConvBlock1: 3 → 32 channels  (output: 32 x 112 x 112)
    ↓
ConvBlock2: 32 → 64 channels (output: 64 x 56 x 56)
    ↓
ConvBlock3: 64 → 128 channels (output: 128 x 28 x 28)
    ↓
ConvBlock4: 128 → 256 channels (output: 256 x 14 x 14)
    ↓
Global Average Pooling (output: 256)
    ↓
FC Layer: 256 → 128 + ReLU + Dropout
    ↓
FC Layer: 128 → 7 (num_classes)
    ↓
Output (7 class logits)
```
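
The spatial sizes in the diagram can be derived with the standard output-size formula `(size + 2 * padding - kernel) // stride + 1`. The sketch below applies it to the layer settings used in this notebook (3x3 convolution with padding 1, followed by 2x2 max pooling with stride 2); the helper name `conv_output_size` is made up for the example.

```python
def conv_output_size(size, kernel_size, padding=0, stride=1):
    """Output length along one spatial axis for a conv or pooling layer."""
    return (size + 2 * padding - kernel_size) // stride + 1


size = 224
sizes = [size]
for _ in range(4):
    # 3x3 conv with padding 1 keeps the spatial size unchanged
    size = conv_output_size(size, kernel_size=3, padding=1, stride=1)
    # 2x2 max pool with stride 2 halves it
    size = conv_output_size(size, kernel_size=2, padding=0, stride=2)
    sizes.append(size)

print(" -> ".join(str(s) for s in sizes))
```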

In [None]:
class ConvBlock(nn.Module):
    """
    A single convolutional block.
    
    Structure: Conv2D → BatchNorm → ReLU → MaxPool
    
    Why this order?
    - Conv2D: Extract features using learnable filters
    - BatchNorm: Normalize to stabilize training, allows higher learning rates
    - ReLU: Non-linearity. Without it, stacking layers would just be linear!
    - MaxPool: Reduce spatial dimensions by 2x, adds translation invariance
    """
    
    def __init__(self, in_channels, out_channels, kernel_size=3):
        super().__init__()
        
        # Convolutional layer
        # padding = kernel_size // 2 maintains spatial dimensions
        self.conv = nn.Conv2d(
            in_channels, 
            out_channels,
            kernel_size=kernel_size,
            padding=kernel_size // 2  # 'same' padding
        )
        
        # Batch normalization
        # Normalizes activations across the batch
        self.bn = nn.BatchNorm2d(out_channels)
        
        # Max pooling
        # Reduces spatial dimensions by factor of 2
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
    
    def forward(self, x):
        x = self.conv(x)    # Apply convolution
        x = self.bn(x)      # Normalize
        x = F.relu(x)       # Non-linearity
        x = self.pool(x)    # Downsample
        return x

print("ConvBlock defined!")

In [None]:
class CustomCNN(nn.Module):
    """
    Custom CNN for facial expression recognition.
    
    Architecture:
    - 4 convolutional blocks (progressively increasing channels)
    - Global average pooling (reduces to 1x1 spatial size)
    - 2 fully connected layers with dropout
    
    Why Global Average Pooling?
    - Far fewer parameters than flattening the 256 x 14 x 14 map into an FC layer
    - More robust to spatial translations
    - Acts as a structural regularizer (helps prevent overfitting)
    """
    
    def __init__(self, num_classes=NUM_CLASSES, dropout=DROPOUT_RATE):
        super().__init__()
        
        # Convolutional layers
        # Each block halves the spatial dimensions; channels double from block 2 onward
        self.conv1 = ConvBlock(3, 32)      # 224 → 112
        self.conv2 = ConvBlock(32, 64)     # 112 → 56
        self.conv3 = ConvBlock(64, 128)    # 56 → 28
        self.conv4 = ConvBlock(128, 256)   # 28 → 14
        
        # Global Average Pooling
        # Takes any spatial size and outputs 1x1
        self.global_pool = nn.AdaptiveAvgPool2d((1, 1))
        
        # Classifier head
        self.fc1 = nn.Linear(256, 128)
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(128, num_classes)
    
    def forward(self, x):
        # Feature extraction
        x = self.conv1(x)  # (B, 32, 112, 112)
        x = self.conv2(x)  # (B, 64, 56, 56)
        x = self.conv3(x)  # (B, 128, 28, 28)
        x = self.conv4(x)  # (B, 256, 14, 14)
        
        # Global pooling and flatten
        x = self.global_pool(x)  # (B, 256, 1, 1)
        x = x.view(x.size(0), -1)  # (B, 256)
        
        # Classification
        x = F.relu(self.fc1(x))  # (B, 128)
        x = self.dropout(x)
        x = self.fc2(x)  # (B, num_classes)
        
        return x
    
    def get_feature_maps(self, x):
        """Get feature maps from last conv layer (for Grad-CAM)."""
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        return x

print("CustomCNN defined!")
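
To put a number on the parameter savings from global average pooling, we can compare the size of the first fully connected layer with and without it. This is plain arithmetic on the shapes used above (256 channels, 14x14 feature map, 128 hidden units); `linear_params` is a helper introduced just for this comparison.

```python
def linear_params(in_features, out_features):
    """Weights plus biases of a fully connected layer."""
    return in_features * out_features + out_features


channels, height, width, hidden = 256, 14, 14, 128

# Option A: flatten the 256 x 14 x 14 feature map, then FC to 128 units
flatten_params = linear_params(channels * height * width, hidden)

# Option B: global average pooling to 256 values, then FC to 128 units
gap_params = linear_params(channels, hidden)

print(f"Flatten + FC: {flatten_params:,} parameters")
print(f"GAP + FC:     {gap_params:,} parameters")
print(f"Reduction:    {flatten_params / gap_params:.0f}x")
```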

In [None]:
# Create model and move to device
model = CustomCNN(num_classes=NUM_CLASSES, dropout=DROPOUT_RATE)
model = model.to(device)

# Count parameters
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print("=" * 60)
print("MODEL SUMMARY")
print("=" * 60)
print(f"\nTotal parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")

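# Test forward pass
# eval() + no_grad() so the dummy batch neither updates BatchNorm running
# statistics nor builds a gradient graph; training calls model.train() later
model.eval()
test_input = torch.randn(1, 3, IMG_SIZE, IMG_SIZE).to(device)
with torch.no_grad():
    test_output = model(test_input)
print(f"\nInput shape: {test_input.shape}")
print(f"Output shape: {test_output.shape}")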

In [None]:
# Visualize the architecture
print("\nModel Architecture:")
print(model)

## Step 5: Training Setup

We need:
1. **Loss function**: CrossEntropyLoss for multi-class classification
2. **Optimizer**: Adam (adaptive learning rate)
3. **Learning rate scheduler**: Reduce LR when validation loss plateaus
4. **Early stopping**: Stop if no improvement for N epochs
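
A detail about item 1 that is easy to get wrong: `CrossEntropyLoss` applies the softmax internally, so the model should output raw logits. The sketch below checks on random data that it matches `log_softmax` followed by `nll_loss`; the logits and targets are made up for the demonstration.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)

logits = torch.randn(4, 7)             # batch of 4 samples, 7 emotion classes
targets = torch.tensor([0, 3, 6, 2])   # integer class indices

ce_loss = nn.CrossEntropyLoss()(logits, targets)
manual_loss = F.nll_loss(F.log_softmax(logits, dim=1), targets)
losses_match = torch.allclose(ce_loss, manual_loss)

print(f"CrossEntropyLoss:       {ce_loss.item():.6f}")
print(f"log_softmax + nll_loss: {manual_loss.item():.6f}")
print(f"Match: {losses_match}")
```

Adding an explicit softmax layer at the end of the network would therefore apply softmax twice and distort the loss.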

In [None]:
# Loss function
# CrossEntropyLoss combines LogSoftmax + NLLLoss
# It expects raw logits (not probabilities)
criterion = nn.CrossEntropyLoss()

# Optimizer
# Adam: Adaptive Moment Estimation
# Combines momentum + adaptive learning rates per parameter
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

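# Learning rate scheduler
# Halves the LR once validation loss has gone more than `patience` (2) epochs without improving
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',      # Minimize validation loss
    factor=0.5,      # Reduce LR by half
    patience=2,      # Tolerate 2 stagnant epochs before reducing
)
# Note: `verbose=True` is omitted because it is deprecated in recent PyTorch
# releases; the training loop records the current LR in history['lr'] instead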

print("Training components:")
print(f"  Loss function: CrossEntropyLoss")
print(f"  Optimizer: Adam (lr={LEARNING_RATE})")
print(f"  Scheduler: ReduceLROnPlateau")
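
The `patience` setting is worth a quick experiment, because the scheduler tolerates that many stagnant epochs and only cuts the learning rate on the next one. The sketch below feeds a flat validation loss to a throwaway optimizer; `demo_param`, `demo_optimizer`, and `demo_scheduler` are names invented for this example and do not touch the real training objects.

```python
import torch
import torch.nn as nn
import torch.optim as optim

# Throwaway parameter so the optimizer has something to manage
demo_param = nn.Parameter(torch.zeros(1))
demo_optimizer = optim.Adam([demo_param], lr=0.001)
demo_scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    demo_optimizer, mode="min", factor=0.5, patience=2
)

lrs = []
for val_loss in [1.0, 1.0, 1.0, 1.0]:  # validation loss that never improves
    demo_optimizer.step()              # no gradients, so this is a no-op
    demo_scheduler.step(val_loss)
    lrs.append(demo_optimizer.param_groups[0]["lr"])

for epoch, lr in enumerate(lrs, start=1):
    print(f"Epoch {epoch}: lr = {lr}")
```

The first epoch sets the baseline, epochs 2 and 3 count as stagnant, and the reduction happens at epoch 4.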

In [None]:
class EarlyStopping:
    """
    Early stopping to prevent overfitting.
    
    How it works:
    - Track the best validation loss
    - If no improvement for 'patience' epochs, stop training
    - Returns True when model improved (should save checkpoint)
    """
    
    def __init__(self, patience=PATIENCE, min_delta=0.001):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = float('inf')
        self.should_stop = False
    
    def __call__(self, val_loss):
        """
        Check if training should stop.
        
        Returns:
            True if this is the best model so far
        """
        if val_loss < self.best_loss - self.min_delta:
            # Improvement! Reset counter
            self.best_loss = val_loss
            self.counter = 0
            return True  # Best model, should save
        else:
            # No improvement
            self.counter += 1
            if self.counter >= self.patience:
                self.should_stop = True
            return False

print("EarlyStopping defined!")
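
To see the stopping rule in action without training anything, we can run it on a hand-written sequence of validation losses. The sketch below uses a condensed copy of the class (named `EarlyStoppingDemo` so it stays self-contained) with `patience=3`; the loss values are invented.

```python
class EarlyStoppingDemo:
    """Condensed copy of the early-stopping logic, for illustration only."""

    def __init__(self, patience=3, min_delta=0.001):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = float("inf")
        self.should_stop = False

    def __call__(self, val_loss):
        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
            return True   # improvement: caller should save a checkpoint
        self.counter += 1
        if self.counter >= self.patience:
            self.should_stop = True
        return False


stopper = EarlyStoppingDemo(patience=3)
val_losses = [1.0, 0.8, 0.79, 0.795, 0.7995, 0.81, 0.5]

best_epochs = []
stopped_at = None
for epoch, loss in enumerate(val_losses, start=1):
    if stopper(loss):
        best_epochs.append(epoch)
    if stopper.should_stop:
        stopped_at = epoch
        break

print(f"Improvements at epochs: {best_epochs}")
print(f"Stopped at epoch: {stopped_at}")
```

Training halts before the final value of 0.5 is ever seen, which illustrates the trade-off: a small `patience` saves time but can miss a late improvement.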

## Step 6: Training Loop

For each epoch, the training loop:
1. **Train**: Forward pass, compute loss, backward pass, update weights
2. **Validate**: Forward pass only, compute loss (no weight updates)
3. Steps the learning rate scheduler with the validation loss
4. Saves the best model when validation loss improves
5. Checks early stopping

In [None]:
def train_one_epoch(model, train_loader, criterion, optimizer, device):
    """
    Train the model for one epoch.
    
    Returns:
        Average loss and accuracy for the epoch
    """
    model.train()  # Set to training mode (enables dropout, batchnorm updates)
    
    running_loss = 0.0
    correct = 0
    total = 0
    
    pbar = tqdm(train_loader, desc="Training", leave=False)
    
    for images, labels in pbar:
        # Move data to device
        images = images.to(device)
        labels = labels.to(device)
        
        # Zero gradients from previous step
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        # Backward pass
        loss.backward()
        
        # Update weights
        optimizer.step()
        
        # Track metrics
        running_loss += loss.item() * images.size(0)
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
        
        # Update progress bar
        pbar.set_postfix({
            'loss': f'{loss.item():.4f}',
            'acc': f'{100.0 * correct / total:.2f}%'
        })
    
    avg_loss = running_loss / total
    accuracy = correct / total
    
    return avg_loss, accuracy


def validate(model, val_loader, criterion, device):
    """
    Validate the model.
    
    Returns:
        Average loss and accuracy
    """
    model.eval()  # Set to evaluation mode (disables dropout, BatchNorm uses running statistics)
    
    running_loss = 0.0
    correct = 0
    total = 0
    
    with torch.no_grad():  # No gradient computation needed
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device)
            
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            running_loss += loss.item() * images.size(0)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
    
    avg_loss = running_loss / total
    accuracy = correct / total
    
    return avg_loss, accuracy

print("Training functions defined!")

In [None]:
# =============================================================================
# MAIN TRAINING LOOP
# =============================================================================

# Initialize tracking
history = {
    'train_loss': [],
    'train_acc': [],
    'val_loss': [],
    'val_acc': [],
    'lr': []
}

early_stopping = EarlyStopping(patience=PATIENCE)
best_model_state = None
best_epoch = 0

print("=" * 60)
print("TRAINING CUSTOM CNN")
print("=" * 60)
print(f"\nStarting training for {NUM_EPOCHS} epochs...")
print("-" * 60)

start_time = time.time()

for epoch in range(NUM_EPOCHS):
    print(f"\nEpoch {epoch + 1}/{NUM_EPOCHS}")
    
    # Train
    train_loss, train_acc = train_one_epoch(
        model, train_loader, criterion, optimizer, device
    )
    
    # Validate
    val_loss, val_acc = validate(model, val_loader, criterion, device)
    
    # Update learning rate
    scheduler.step(val_loss)
    current_lr = optimizer.param_groups[0]['lr']
    
    # Log metrics
    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)
    history['lr'].append(current_lr)
    
    print(f"  Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"  Val Loss:   {val_loss:.4f} | Val Acc:   {val_acc:.4f}")
    
    # Check early stopping
    is_best = early_stopping(val_loss)
    if is_best:
        print(f"  ✓ New best model!")
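        # Clone the tensors: state_dict().copy() is shallow and would keep tracking later updates
        best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}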
        best_epoch = epoch
    
    if early_stopping.should_stop:
        print(f"\nEarly stopping triggered after {epoch + 1} epochs")
        break

training_time = time.time() - start_time
print(f"\nTraining completed in {training_time/60:.1f} minutes")

In [None]:
# Load best model
model.load_state_dict(best_model_state)
print(f"Loaded best model from epoch {best_epoch + 1}")
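
Restoring the best weights only works if the saved snapshot is independent of the live model. A state dict holds references to the parameter tensors, so a dictionary-level `.copy()` keeps following later optimizer updates. The sketch below demonstrates the difference on a tiny `nn.Linear` layer; the in-place addition is a stand-in for a training step.

```python
import copy

import torch
import torch.nn as nn

torch.manual_seed(0)

layer = nn.Linear(2, 1)
original_weight = layer.weight.detach().clone()

shallow = layer.state_dict().copy()        # new dict, same underlying tensors
deep = copy.deepcopy(layer.state_dict())   # independent tensors

with torch.no_grad():
    layer.weight.add_(1.0)                 # stand-in for an optimizer update

shallow_changed = not torch.equal(shallow["weight"], original_weight)
deep_unchanged = torch.equal(deep["weight"], original_weight)

print(f"Shallow copy followed the update: {shallow_changed}")
print(f"Deep copy kept the old weights:   {deep_unchanged}")
```

Either `copy.deepcopy(...)` or cloning each tensor gives a safe snapshot; for models on a GPU, cloning to CPU is a common way to save device memory.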

## Step 7: Visualize Training History

In [None]:
# Plot training history
fig, axes = plt.subplots(1, 3, figsize=(15, 4))

epochs = range(1, len(history['train_loss']) + 1)

# Loss
axes[0].plot(epochs, history['train_loss'], 'b-', label='Training')
axes[0].plot(epochs, history['val_loss'], 'r-', label='Validation')
axes[0].axvline(best_epoch + 1, color='green', linestyle='--', label=f'Best (epoch {best_epoch + 1})')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss')
axes[0].set_title('Loss Curve')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Accuracy
axes[1].plot(epochs, history['train_acc'], 'b-', label='Training')
axes[1].plot(epochs, history['val_acc'], 'r-', label='Validation')
axes[1].axvline(best_epoch + 1, color='green', linestyle='--', label=f'Best (epoch {best_epoch + 1})')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('Accuracy')
axes[1].set_title('Accuracy Curve')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

# Learning rate
axes[2].plot(epochs, history['lr'], 'g-')
axes[2].set_xlabel('Epoch')
axes[2].set_ylabel('Learning Rate')
axes[2].set_title('Learning Rate Schedule')
axes[2].grid(True, alpha=0.3)

plt.suptitle('Custom CNN Training History', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()

## Step 8: Evaluate on Test Set

In [None]:
# Final evaluation
test_loss, test_acc = validate(model, test_loader, criterion, device)

print("=" * 60)
print("FINAL RESULTS (Custom CNN)")
print("=" * 60)
print(f"\nTest Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_acc:.4f} ({test_acc*100:.2f}%)")

In [None]:
# Get all predictions
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        outputs = model(images)
        _, preds = outputs.max(1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.numpy())

all_preds = np.array(all_preds)
all_labels = np.array(all_labels)

# Classification report
print("\nClassification Report:")
print(classification_report(
    all_labels, all_preds,
    labels=list(range(NUM_CLASSES)),  # keep all 7 rows even if a class is absent from the test set
    target_names=EMOTION_CLASSES
))

In [None]:
# Confusion matrix
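cm = confusion_matrix(all_labels, all_preds, labels=list(range(NUM_CLASSES)))
# Guard against empty rows (a class with no test samples) to avoid division by zero
cm_normalized = cm.astype('float') / np.maximum(cm.sum(axis=1, keepdims=True), 1)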

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=EMOTION_CLASSES, yticklabels=EMOTION_CLASSES,
            ax=axes[0])
axes[0].set_xlabel('Predicted')
axes[0].set_ylabel('True')
axes[0].set_title('Confusion Matrix (Counts)')

sns.heatmap(cm_normalized, annot=True, fmt='.1%', cmap='Blues',
            xticklabels=EMOTION_CLASSES, yticklabels=EMOTION_CLASSES,
            ax=axes[1])
axes[1].set_xlabel('Predicted')
axes[1].set_ylabel('True')
axes[1].set_title('Confusion Matrix (Normalized)')

plt.suptitle('Custom CNN Results', fontsize=14, y=1.02)
plt.tight_layout()
plt.show()
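
When reading the normalized matrix, note that dividing each row by its total turns the diagonal into per-class recall. The sketch below shows this on a made-up 3-class matrix, including a class with no samples; the `np.maximum` guard is one possible way to avoid dividing by zero in that case.

```python
import numpy as np

# Rows are true classes, columns are predictions (toy numbers)
toy_cm = np.array([
    [8, 2, 0],
    [1, 6, 3],
    [0, 0, 0],   # this class has no samples in the toy test set
])

row_totals = toy_cm.sum(axis=1, keepdims=True)
toy_cm_normalized = toy_cm / np.maximum(row_totals, 1)  # empty rows stay at 0

per_class_recall = np.diag(toy_cm_normalized)
overall_accuracy = np.trace(toy_cm) / toy_cm.sum()

print(f"Per-class recall: {per_class_recall}")
print(f"Overall accuracy: {overall_accuracy:.2f}")
```

Per-class recall is often more informative than overall accuracy when some emotions are much rarer than others.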

## Step 9: Save Model and Results

In [None]:
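# Save model
# Note: 'epoch' is 0-indexed, and the optimizer state reflects the final
# training epoch rather than the best one (the weights were restored above)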
torch.save({
    'epoch': best_epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'val_loss': early_stopping.best_loss,
    'test_acc': test_acc,
}, 'custom_cnn_best.pth')

print("Model saved to custom_cnn_best.pth")

# Save history
with open('custom_cnn_history.json', 'w') as f:
    json.dump(history, f, indent=2)

print("Training history saved to custom_cnn_history.json")

# Save results for comparison
cnn_results = {
    'model_name': 'Custom CNN',
    'test_accuracy': test_acc,
    'test_loss': test_loss,
    'best_epoch': best_epoch + 1,
    'training_time_minutes': training_time / 60,
    'confusion_matrix': cm.tolist()
}

with open('custom_cnn_results.pkl', 'wb') as f:
    pickle.dump(cnn_results, f)

print("Results saved for comparison!")
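
For later notebooks, a checkpoint saved in this dictionary format can be loaded back into a freshly constructed model. The sketch below shows the round trip on a small `nn.Linear` stand-in and a temporary file, so it runs without the trained CNN; the file name and the stand-in model are assumptions made for the example.

```python
import os
import tempfile

import torch
import torch.nn as nn

torch.manual_seed(0)

trained = nn.Linear(4, 2)   # stand-in for the trained model

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "demo_checkpoint.pth")
    torch.save({"epoch": 3, "model_state_dict": trained.state_dict()}, path)
    # map_location lets a checkpoint saved on GPU load on a CPU-only machine
    checkpoint = torch.load(path, map_location="cpu")

restored = nn.Linear(4, 2)  # must have the same architecture as the saved model
restored.load_state_dict(checkpoint["model_state_dict"])
restored.eval()             # switch to inference mode before evaluating

weights_match = all(
    torch.equal(a, b)
    for a, b in zip(trained.state_dict().values(), restored.state_dict().values())
)

print(f"Checkpoint epoch: {checkpoint['epoch']}")
print(f"Weights restored exactly: {weights_match}")
```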

## Summary

### What We Built:
- **Custom CNN** with 4 convolutional blocks
- **Global Average Pooling** for efficiency
- **Dropout** for regularization

### Key Techniques:
- **Data augmentation** to prevent overfitting
- **Early stopping** to find optimal training duration
- **Learning rate scheduling** for better convergence

### Results Comparison:

| Model | Test Accuracy |
|-------|---------------|
| HOG + SVM (Baseline) | ~XX% |
| Custom CNN | ~XX% |

### Next Steps:
- **Notebook 4**: Use transfer learning for even better results
- **Notebook 5**: Compare all models
- **Notebook 6**: Visualize with Grad-CAM