In [None]:
1. Configuration

In [None]:
"""
Complete Training Script for Facial Emotion Recognition using ResNet-18
7 emotions (listed in label-index order): angry, disgust, fear, happy, neutral, sad, surprise
"""

# ============================================================================
# CONFIGURATION
# ============================================================================

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
from tqdm import tqdm
from torchvision.models import resnet18, ResNet18_Weights   #added

# Class labels. The position of a name in this list is the integer label used
# for that class, and each name is also the sub-folder name looked up inside
# TRAIN_DIR / TEST_DIR.
CLASS_NAMES = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']
CLASS_TO_IDX = {c: i for i, c in enumerate(CLASS_NAMES)}

# Paths. Each directory is expected to hold one sub-folder per emotion.
# Set the FER_TRAIN_DIR / FER_TEST_DIR environment variables to point at your
# own copy of the data; the literals below are only the fallback defaults.
TRAIN_DIR = Path(os.environ.get(
    "FER_TRAIN_DIR",
    r"C:\Users\seanr\fer2013Prototype\data\fer2013_folders\train",
))
TEST_DIR = Path(os.environ.get(
    "FER_TEST_DIR",
    r"C:\Users\seanr\fer2013Prototype\data\fer2013_folders\test",
))

# Fail fast with a readable message instead of deep inside the data loading code.
assert TRAIN_DIR.exists(), f"TRAIN_DIR not found: {TRAIN_DIR}"
assert TEST_DIR.exists(), f"TEST_DIR not found: {TEST_DIR}"


# Hyperparameters
BATCH_SIZE = 32
NUM_EPOCHS = 50
LEARNING_RATE = 0.001
VAL_SPLIT = 0.2            # fraction of the training folder held out for validation
NUM_WORKERS = 4            # DataLoader worker processes; set to 0 if you get issues on Windows
PATIENCE = 7               # epochs without validation improvement before early stopping
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# ============================================================================
# DATASET CLASS
# ============================================================================

class EmotionDataset(Dataset):
    """Map-style dataset pairing image file paths with their class labels.

    Images are opened lazily, one per ``__getitem__`` call, and are always
    converted to 3-channel RGB before the optional transform is applied.

    Args:
        image_paths: Sequence of image file paths.
        labels: Sequence of labels; ``labels[i]`` belongs to ``image_paths[i]``.
        transform: Optional callable applied to the RGB PIL image.
    """

    def __init__(self, image_paths, labels, transform=None):
        self.image_paths, self.labels = image_paths, labels
        self.transform = transform

    def __len__(self):
        # The number of samples is driven by the number of paths.
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``."""
        path, target = self.image_paths[idx], self.labels[idx]
        image = Image.open(path).convert('RGB')

        # A falsy transform (e.g. None) means the PIL image is returned as-is.
        if not self.transform:
            return image, target
        return self.transform(image), target


In [None]:
# ============================================================================
# DATA LOADING FUNCTIONS
# ============================================================================

def load_emotion_data(data_dir, val_split=0.2, random_state=42):
    """Collect image paths/labels from a class-per-folder tree and optionally split them.

    For every name in ``CLASS_NAMES`` the sub-folder ``data_dir/<name>`` is
    scanned for image files. A missing sub-folder only produces a warning.

    Files are matched by extension case-insensitively and are sorted, so the
    sample order (and therefore the seeded split) does not depend on the order
    in which the filesystem happens to list directory entries.

    Args:
        data_dir: Root directory containing one sub-folder per emotion.
        val_split: Fraction of samples held out for validation. A value of 0
            (or less) disables the split.
        random_state: Seed forwarded to ``train_test_split``.

    Returns:
        ``(train_paths, train_labels, val_paths, val_labels)``. Paths are
        strings and labels are indices from ``CLASS_TO_IDX``. When the split is
        disabled, all samples are returned as the "train" part and the two
        validation lists are empty.
    """
    data_dir = Path(data_dir)
    image_suffixes = {'.jpg', '.jpeg', '.png', '.bmp', '.gif'}
    all_paths = []
    all_labels = []

    for emotion_name in CLASS_NAMES:
        emotion_dir = data_dir / emotion_name

        if not emotion_dir.is_dir():
            print(f"Warning: Folder '{emotion_name}' not found in {data_dir}")
            continue

        # sorted() pins the order; suffix.lower() also accepts e.g. "IMG.JPG".
        image_files = sorted(
            p for p in emotion_dir.iterdir()
            if p.is_file() and p.suffix.lower() in image_suffixes
        )

        label_idx = CLASS_TO_IDX[emotion_name]
        all_paths.extend(str(p) for p in image_files)
        all_labels.extend([label_idx] * len(image_files))

        print(f"Found {len(image_files)} images for '{emotion_name}'")

    print(f"Total images: {len(all_paths)}")

    if val_split > 0:
        # Stratify so every class keeps the same proportion in both parts.
        train_paths, val_paths, train_labels, val_labels = train_test_split(
            all_paths, all_labels,
            test_size=val_split,
            random_state=random_state,
            stratify=all_labels
        )
        print(f"Train images: {len(train_paths)}")
        print(f"Validation images: {len(val_paths)}")
        return train_paths, train_labels, val_paths, val_labels
    else:
        return all_paths, all_labels, [], []


def get_data_loaders(train_dir, test_dir, batch_size=32, val_split=0.2, num_workers=4):
    """Create train, validation, and test data loaders.

    The training folder is split into train/validation parts by
    ``load_emotion_data``; the test folder is loaded without a split.

    Validation and test images go through the same resize and normalisation
    as training images (only the random augmentations are dropped), so the
    network is evaluated on inputs with the same size and value range it was
    trained on.

    Args:
        train_dir: Directory with one sub-folder per emotion (train + val source).
        test_dir: Directory with one sub-folder per emotion (held-out test set).
        batch_size: Batch size used by all three loaders.
        val_split: Fraction of the training folder reserved for validation.
        num_workers: Number of DataLoader worker processes.

    Returns:
        ``(train_loader, val_loader, test_loader)``.
    """
    IMG_SIZE = 224
    # ImageNet channel statistics, matching the pretrained ResNet-18 backbone.
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

    # Data augmentation for training
    train_transform = transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        normalize
    ])

    # No augmentation for validation/test, but identical size and normalisation
    val_test_transform = transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.ToTensor(),
        normalize
    ])

    # Load training data (includes validation data)
    print("=" * 60)
    print("LOADING TRAINING DATA")
    print("=" * 60)
    # random_state is left at load_emotion_data's default, so the split is seeded.
    train_paths, train_labels, val_paths, val_labels = load_emotion_data(
        train_dir, val_split=val_split
    )

    # Load test data
    print("\n" + "=" * 60)
    print("LOADING TEST DATA")
    print("=" * 60)
    test_paths, test_labels, _, _ = load_emotion_data(test_dir, val_split=0.0)

    # Create datasets
    train_dataset = EmotionDataset(train_paths, train_labels, transform=train_transform)
    val_dataset = EmotionDataset(val_paths, val_labels, transform=val_test_transform)
    test_dataset = EmotionDataset(test_paths, test_labels, transform=val_test_transform)

    # Pinned host memory only speeds up host-to-GPU copies; requesting it
    # without CUDA just triggers a warning.
    pin_memory = torch.cuda.is_available()

    # Create data loaders; only the training loader shuffles.
    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True,
        num_workers=num_workers, pin_memory=pin_memory
    )
    val_loader = DataLoader(
        val_dataset, batch_size=batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=pin_memory
    )
    test_loader = DataLoader(
        test_dataset, batch_size=batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=pin_memory
    )

    return train_loader, val_loader, test_loader


In [None]:
# ============================================================================
# MODEL CREATION
# ============================================================================

def create_resnet18_model(num_classes=7, pretrained=True):
    """Build a ResNet-18 whose only trainable part is a new classification head.

    Args:
        num_classes: Number of output logits of the new head.
        pretrained: If True, start from the ``IMAGENET1K_V1`` weights;
            otherwise the network is created without pretrained weights.

    Returns:
        The model with ``fc`` replaced by ``Dropout(0.3) -> Linear`` and every
        other parameter frozen (``requires_grad = False``).
    """
    network = resnet18(weights=ResNet18_Weights.IMAGENET1K_V1 if pretrained else None)

    # Freeze everything the network ships with (fast head-only training;
    # unfreeze more layers here if better performance is needed).
    for weight in network.parameters():
        weight.requires_grad = False

    # Parameters of newly constructed modules default to requires_grad=True,
    # so the replacement head below is the only part the optimizer will update.
    head_inputs = network.fc.in_features
    network.fc = nn.Sequential(
        nn.Dropout(0.3),
        nn.Linear(head_inputs, num_classes),
    )

    return network


In [None]:
# ============================================================================
# TRAINING FUNCTIONS
# ============================================================================

def train_one_epoch(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: Network to train; it is switched to training mode.
        train_loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``
            (e.g. ``nn.CrossEntropyLoss``).
        optimizer: Optimizer that updates the model parameters
            (e.g. ``torch.optim.Adam`` or SGD).
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(epoch_loss, epoch_acc)``: the mean of the per-batch losses and the
        accuracy in percent over all samples seen.
    """
    model.train()

    loss_sum, hits, seen, batches_done = 0.0, 0, 0, 0

    # tqdm wraps the loader purely for a live progress display.
    progress = tqdm(train_loader, desc='Training')

    for batches_done, (images, labels) in enumerate(progress, start=1):
        images = images.to(device)
        labels = labels.to(device)

        # Clear gradients left over from the previous batch.
        optimizer.zero_grad()

        # Forward pass gives one row of raw class scores per sample.
        logits = model(images)
        batch_loss = criterion(logits, labels)

        # Backward pass, then parameter update.
        batch_loss.backward()
        optimizer.step()

        # .item() turns the scalar loss tensor into a Python float.
        loss_sum += batch_loss.item()

        # The predicted class is the index of the largest score in each row.
        seen += labels.size(0)
        hits += (logits.argmax(dim=1) == labels).sum().item()

        progress.set_postfix({
            'loss': loss_sum / batches_done,
            'acc': 100. * hits / seen
        })

    return loss_sum / batches_done, 100. * hits / seen


def validate(model, val_loader, criterion, device):
    """Evaluate the model on ``val_loader`` without updating any weights.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        val_loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(epoch_loss, epoch_acc)``: the mean of the per-batch losses and the
        accuracy in percent over all samples seen.

    Raises:
        ValueError: If the loader yields no samples (for example when the
            validation split was disabled), since no metric can be computed.
    """
    model.eval()                      # Dropout off, BatchNorm uses running statistics
    running_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0                   # counted directly, as in train_one_epoch

    with torch.no_grad():             # no gradient tracking needed for evaluation
        for images, labels in tqdm(val_loader, desc='Validation'):
            images, labels = images.to(device), labels.to(device)

            outputs = model(images)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            num_batches += 1

            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    if num_batches == 0 or total == 0:
        raise ValueError(
            "Validation loader produced no samples; use a validation split "
            "greater than 0 or provide validation data."
        )

    epoch_loss = running_loss / num_batches
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc


def train_model(model, train_loader, val_loader, criterion, optimizer,
                scheduler, device, num_epochs=50, patience=7,
                checkpoint_path='best_model.pth'):
    """Main training loop with checkpointing and early stopping.

    Each epoch trains on ``train_loader``, evaluates on ``val_loader``, steps
    the scheduler with the validation loss, and saves the model's state dict
    to ``checkpoint_path`` whenever validation accuracy improves. The first
    completed epoch is always checkpointed, so the file exists even if
    accuracy never rises above 0. Training stops early after ``patience``
    consecutive epochs without improvement.

    Args:
        model: Network to train (modified in place).
        train_loader: Loader passed to ``train_one_epoch``.
        val_loader: Loader passed to ``validate``.
        criterion: Loss function.
        optimizer: Optimizer updating the trainable parameters.
        scheduler: Scheduler stepped with the validation loss
            (e.g. ``ReduceLROnPlateau``).
        device: Device used for training and for re-loading the checkpoint.
        num_epochs: Maximum number of epochs.
        patience: Epochs without improvement tolerated before stopping.
        checkpoint_path: Where the best state dict is written.

    Returns:
        ``(model, best_val_acc)``. If at least one checkpoint was written,
        the model carries the weights of the best epoch, so it corresponds
        to the returned accuracy.
    """
    best_val_acc = 0.0
    checkpoint_saved = False
    patience_counter = 0
    train_losses = []
    val_losses = []
    train_accs = []
    val_accs = []

    print("\n" + "=" * 60)
    print("STARTING TRAINING")
    print("=" * 60)

    for epoch in range(num_epochs):
        print(f'\nEpoch {epoch+1}/{num_epochs}')
        print('-' * 60)

        # Train
        train_loss, train_acc = train_one_epoch(
            model, train_loader, criterion, optimizer, device
        )

        # Validate
        val_loss, val_acc = validate(model, val_loader, criterion, device)

        # Update learning rate based on the validation loss
        scheduler.step(val_loss)
        current_lr = optimizer.param_groups[0]['lr']   # read back only for logging

        # Store metrics
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_accs.append(train_acc)
        val_accs.append(val_acc)

        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')
        print(f'Learning Rate: {current_lr:.6f}')

        # Save best model. "not checkpoint_saved" forces a checkpoint on the
        # first epoch so a stale file from an earlier run is never picked up.
        if val_acc > best_val_acc or not checkpoint_saved:
            best_val_acc = val_acc
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
            print(f'✓ New best model saved! (Val Acc: {val_acc:.2f}%)')
            patience_counter = 0
        else:
            patience_counter += 1
            print(f'No improvement. Patience: {patience_counter}/{patience}')

        # Early stopping
        if patience_counter >= patience:
            print(f'\n⚠ Early stopping triggered at epoch {epoch+1}')
            break

    # Plot training history
    plot_training_history(train_losses, val_losses, train_accs, val_accs)

    # Hand back the best weights rather than whatever the last epoch left.
    if checkpoint_saved:
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))

    return model, best_val_acc


In [None]:
# ============================================================================
# VISUALIZATION FUNCTIONS
# ============================================================================

def plot_training_history(train_losses, val_losses, train_accs, val_accs):
    """Save side-by-side loss and accuracy curves to 'training_history.png'.

    Args:
        train_losses: Per-epoch training loss values.
        val_losses: Per-epoch validation loss values.
        train_accs: Per-epoch training accuracy values (percent).
        val_accs: Per-epoch validation accuracy values (percent).
    """
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))

    # One row per panel: (train series, val series, train legend, val legend,
    # y-axis label, panel title). Left panel is loss, right panel is accuracy.
    panels = (
        (train_losses, val_losses, 'Train Loss', 'Val Loss',
         'Loss', 'Training and Validation Loss'),
        (train_accs, val_accs, 'Train Acc', 'Val Acc',
         'Accuracy (%)', 'Training and Validation Accuracy'),
    )

    for axis, (train_series, val_series, train_tag, val_tag, y_label, title) in zip(axes, panels):
        axis.plot(train_series, label=train_tag, marker='o')
        axis.plot(val_series, label=val_tag, marker='s')
        axis.set_xlabel('Epoch')
        axis.set_ylabel(y_label)
        axis.set_title(title)
        axis.legend()
        axis.grid(True)

    plt.tight_layout()
    plt.savefig('training_history.png', dpi=300, bbox_inches='tight')
    print("\n✓ Training history plot saved as 'training_history.png'")
    # The figure is written to disk only; close it so it is not kept open.
    plt.close()


def plot_confusion_matrix(y_true, y_pred, class_names):
    """Save a confusion-matrix heatmap to 'confusion_matrix.png'.

    Args:
        y_true: Ground-truth class indices.
        y_pred: Predicted class indices.
        class_names: Display names; ``class_names[i]`` labels class index ``i``.
            The labels are assumed to be integer indices in
            ``range(len(class_names))``, which is what ``evaluate_model``
            passes in.
    """
    # Fix the label set explicitly. Without it the matrix only has rows and
    # columns for classes that actually occur, so a class missing from both
    # y_true and y_pred would shrink it and misalign the tick labels.
    label_ids = list(range(len(class_names)))
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)

    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names,
                cbar_kws={'label': 'Count'}, ax=ax)
    ax.set_title('Confusion Matrix', fontsize=16, pad=20)
    ax.set_ylabel('True Label', fontsize=12)
    ax.set_xlabel('Predicted Label', fontsize=12)
    fig.tight_layout()
    fig.savefig('confusion_matrix.png', dpi=300, bbox_inches='tight')
    print("✓ Confusion matrix saved as 'confusion_matrix.png'")
    # Close this specific figure; it is only needed on disk.
    plt.close(fig)


In [None]:
# ============================================================================
# Model Evaluation
# ============================================================================

def evaluate_model(model, test_loader, device, class_names):
    """Measure test accuracy and save a confusion matrix of the predictions.

    Args:
        model: Trained network; it is switched to evaluation mode.
        test_loader: Iterable yielding ``(images, labels)`` batches.
        device: Device the batches are moved to before the forward pass.
        class_names: Names forwarded to ``plot_confusion_matrix``.

    Returns:
        Test accuracy in percent.
    """
    model.eval()

    predictions, targets = [], []
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for images, labels in tqdm(test_loader, desc="Testing"):
            images = images.to(device)
            labels = labels.to(device)

            # The predicted class is the index of the largest score per row.
            predicted = model(images).argmax(dim=1)

            # Keep every prediction/label on the CPU for the confusion matrix.
            predictions.extend(predicted.cpu().numpy())
            targets.extend(labels.cpu().numpy())

            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()

    test_acc = 100.0 * n_correct / n_seen
    print(f"\n✓ Test Accuracy: {test_acc:.2f}%")

    plot_confusion_matrix(targets, predictions, class_names)

    return test_acc


In [None]:
# ============================================================================
# MAIN EXECUTION
# ============================================================================

def main():
    """Run the whole pipeline: load data, train, reload the best checkpoint, test.

    All settings come from the module-level configuration constants. The best
    weights are read back from 'best_model.pth' before the test-set evaluation.
    """
    print("\n" + "=" * 60)
    print("FACIAL EMOTION RECOGNITION - ResNet18")
    print("=" * 60)
    print(f"Device: {DEVICE}")
    print(f"Batch Size: {BATCH_SIZE}")
    print(f"Learning Rate: {LEARNING_RATE}")
    print(f"Number of Epochs: {NUM_EPOCHS}")
    print(f"Validation Split: {VAL_SPLIT}")
    print("=" * 60)

    # Load data
    train_loader, val_loader, test_loader = get_data_loaders(
        train_dir=TRAIN_DIR,
        test_dir=TEST_DIR,
        batch_size=BATCH_SIZE,
        val_split=VAL_SPLIT,
        num_workers=NUM_WORKERS
    )

    print(f"\nData loaders ready:")
    print(f"  Train batches: {len(train_loader)}")
    print(f"  Val batches: {len(val_loader)}")
    print(f"  Test batches: {len(test_loader)}")

    # Create model
    print("\n" + "=" * 60)
    print("INITIALIZING MODEL")
    print("=" * 60)
    model = create_resnet18_model(num_classes=len(CLASS_NAMES), pretrained=True)
    model = model.to(DEVICE)
    print("✓ ResNet-18 model created (pretrained on ImageNet)")
    print(f"✓ Final layer modified for {len(CLASS_NAMES)} classes")

    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()

    # The backbone is frozen, so only parameters that still require gradients
    # (the classification head) are handed to the optimizer.
    optimizer = optim.Adam(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=LEARNING_RATE
    )

    # Halve the learning rate after 3 epochs without a lower validation loss.
    # The `verbose` argument is intentionally not passed: it is deprecated
    # since PyTorch 2.2 (and not accepted by newer releases), and the training
    # loop already prints the learning rate every epoch.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', patience=3, factor=0.5
    )

    # Train model
    model, best_val_acc = train_model(
        model, train_loader, val_loader, criterion,
        optimizer, scheduler, DEVICE,
        num_epochs=NUM_EPOCHS, patience=PATIENCE
    )

    # Load best model for testing
    print("\n" + "=" * 60)
    print("LOADING BEST MODEL FOR TESTING")
    print("=" * 60)
    # map_location makes the checkpoint load onto DEVICE regardless of where it was saved.
    state_dict = torch.load('best_model.pth', map_location=DEVICE)
    model.load_state_dict(state_dict)
    model.to(DEVICE)              # redundant safeguard: model is already on DEVICE
    print(f"✓ Loaded best model (Val Acc: {best_val_acc:.2f}%)")

    # Evaluate on test set
    test_acc = evaluate_model(model, test_loader, DEVICE, CLASS_NAMES)

    # Final summary
    print("\n" + "=" * 60)
    print("TRAINING COMPLETE - SUMMARY")
    print("=" * 60)
    print(f"Best Validation Accuracy: {best_val_acc:.2f}%")
    print(f"Test Accuracy: {test_acc:.2f}%")
    print(f"Model saved as: best_model.pth")
    print("=" * 60)


# Entry point: start the full training/evaluation pipeline when this code is
# executed directly. In a notebook kernel __name__ is also "__main__", so
# running this cell starts training.
if __name__ == "__main__":
    main()