<a href="https://colab.research.google.com/github/pushkar-hue/Emotion_detection_using_ConvNeXt/blob/main/fer_using_ConvNeXt.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import kagglehub

# Download latest version
# `path` receives whatever kagglehub.dataset_download returns for the
# "msambare/fer2013" dataset; it is printed here and later handed to
# walk_through(path). Presumably a local directory path — confirm.
path = kagglehub.dataset_download("msambare/fer2013")

print("Path to dataset files:", path)

In [None]:
# Explicit %cd magic instead of the bare `cd` form: the bare form depends on
# IPython automagic, which stops working if a variable named `cd` exists or if
# the command shares a cell with other lines (such as this comment).
%cd /kaggle/input/fer2013/

In [None]:
# List the dataset folder; written as an explicit magic so it does not depend
# on IPython automagic being enabled.
%ls

In [None]:
import os

def walk_through(dir_path):
  """Print one summary line per directory found under `dir_path`.

  Each line reports how many immediate sub-directories and how many files
  (reported as "images") the directory contains, as yielded by os.walk.
  """
  for current_dir, subdirs, files in os.walk(dir_path):
    n_dirs, n_files = len(subdirs), len(files)
    print(f"There are {n_dirs} directories and {n_files} images in '{current_dir}'.")

In [None]:
# Print a per-directory summary of the dataset located at `path`.
walk_through(path)

In [None]:
import numpy as np
import torch
import torch.nn as nn

In [None]:
# Both splits live under the same dataset root; build the two paths from it.
_fer_root = '/kaggle/input/fer2013'
train_dir = _fer_root + '/train'
test_dir = _fer_root + '/test'


In [None]:
import os
from torchvision import transforms, datasets
from torch.utils.data import DataLoader


In [None]:
# Settings shared by both pipelines: target image size and the per-channel
# normalisation statistics.
target_size = (224, 224)
norm_mean = [0.485, 0.456, 0.406]
norm_std = [0.229, 0.224, 0.225]

# Training pipeline: resize, random flip and rotation (augmentation), then
# tensor conversion and normalisation.
train_transforms = transforms.Compose([
    transforms.Resize(target_size),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std),
])

# Evaluation pipeline: same preprocessing without any random augmentation.
test_transforms = transforms.Compose([
    transforms.Resize(target_size),
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std),
])


In [None]:
# Build one ImageFolder dataset per split: the training split gets the
# augmenting transform pipeline, the test split the plain one.
train_dataset = datasets.ImageFolder(root=train_dir, transform=train_transforms)
test_dataset = datasets.ImageFolder(root=test_dir, transform=test_transforms)


In [None]:
# Batches of 32 loaded by 2 worker processes; only the training loader shuffles,
# the test loader keeps a fixed order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=2)


In [None]:
# Class labels as discovered by ImageFolder on the training split.
class_names = train_dataset.classes
print("Classes:", class_names)


In [None]:
# NOTE(review): `class_name` holds the same list as `class_names` assigned in the
# previous cell; consider keeping only one of the two names.
class_name = train_dataset.classes

class_name

In [None]:
# Mapping from class label to the integer target index used by the dataset.
class_idx = train_dataset.class_to_idx

class_idx

In [None]:
# Fall back to the CPU only when no CUDA device is available.
device = "cpu" if not torch.cuda.is_available() else "cuda"

In [None]:
import torchvision

# Load EfficientNet-B3 with torchvision's default pretrained weights.
weights = torchvision.models.EfficientNet_B3_Weights.DEFAULT
model = torchvision.models.efficientnet_b3(weights=weights)

# Move the network to the selected device before training.
model = model.to(device)

In [None]:
# Unfreeze more layers
# Marks every parameter of the last five entries of model.features as trainable.
# NOTE(review): no earlier cell visible in this notebook sets requires_grad to
# False on this model, so this loop appears to change nothing — confirm whether
# a freezing step is missing.
for layer in list(model.features)[-5:]: # Try -8 or -10 instead of -5
    for param in layer.parameters():
        param.requires_grad = True

In [None]:
# Seed the CPU and CUDA generators so the new head is initialised reproducibly.
torch.manual_seed(42)
torch.cuda.manual_seed(42)


# One output logit per class discovered in the training dataset.
output_shape = len(train_dataset.classes)

# Read the feature width from the existing head instead of hard-coding 1536, so
# the cell keeps working if a different EfficientNet variant is loaded above.
# torchvision's EfficientNet head is Sequential(Dropout, Linear), and the head
# built below has the same layout, so re-running this cell is safe.
head_in_features = model.classifier[1].in_features

model.classifier = torch.nn.Sequential(
    torch.nn.Dropout(p=0.3, inplace=True),
    torch.nn.Linear(in_features=head_in_features,
                    out_features=output_shape,
                    bias=True)).to(device)

In [None]:
import torch.optim as optim

# Cross-entropy loss for multi-class classification on raw logits.
criterion=nn.CrossEntropyLoss()
# AdamW over every parameter of the model, created after the classifier head was
# replaced so the new head's parameters are included.
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.1)

In [None]:
# Plateau scheduler in 'min' mode: multiplies the learning rate by 0.5 once the
# value passed to scheduler.step(...) has stopped decreasing for longer than
# `patience` (2) consecutive steps.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=2
)

In [None]:
import torch

from tqdm.auto import tqdm
from typing import Dict, List, Tuple
def train_step(model: torch.nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               device: torch.device,
               scheduler: torch.optim.lr_scheduler._LRScheduler = None) -> Tuple[float, float]:
    """Train `model` for one pass over `dataloader`.

    Args:
        model: Network to optimise; switched to training mode.
        dataloader: Iterable yielding (inputs, targets) batches.
        loss_fn: Callable returning the batch loss from (logits, targets).
            Assumed to return a per-batch mean so that scaling by the batch
            size gives a per-sample sum — confirm for custom losses.
        optimizer: Optimizer updating the model parameters.
        device: Device the batches are moved to.
        scheduler: Optional learning-rate scheduler. A ReduceLROnPlateau is
            stepped once, after the epoch, with the epoch's mean training loss;
            any other scheduler is stepped after every optimizer update.

    Returns:
        (mean loss per sample, accuracy) over all samples seen in this pass.
    """
    model.train()

    # ReduceLROnPlateau counts "bad" steps against its patience. Feeding it the
    # noisy loss of every single batch exhausts that patience within a handful
    # of iterations and collapses the learning rate, so it is stepped per epoch.
    plateau_scheduler = isinstance(
        scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau)

    train_loss, train_correct = 0.0, 0
    total_samples = 0

    for X, y in dataloader:
        X, y = X.to(device), y.to(device)

        y_pred = model(X)
        loss = loss_fn(y_pred, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Batch-level schedulers take no metric argument.
        if scheduler is not None and not plateau_scheduler:
            scheduler.step()

        train_loss += loss.item() * X.size(0)  # Scale loss by batch size
        # argmax over logits equals argmax over softmax, so softmax is skipped.
        train_correct += (y_pred.argmax(dim=1) == y).sum().item()
        total_samples += y.size(0)

    train_loss /= total_samples  # Normalize by number of samples seen
    train_acc = train_correct / total_samples

    if plateau_scheduler:
        scheduler.step(train_loss)

    # Report the learning rate once per epoch rather than once per batch.
    current_lr = optimizer.param_groups[0]['lr']
    print(f"LR: {current_lr:.6f}")

    return train_loss, train_acc

In [None]:
def test_step(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              device: torch.device) -> Tuple[float, float]:


    model.eval()


    test_loss, test_acc = 0, 0


    with torch.inference_mode():

        for batch, (X, y) in enumerate(dataloader):

            X, y = X.to(device), y.to(device)



            test_pred_logits = model(X)


            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()

            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))


    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc

In [None]:
def train_and_validate(model,
                       train_loader,
                       val_loader,
                       criterion,
                       optimizer,
                       device,
                       num_epochs=50,
                       patience=5,
                       checkpoint_dir='./checkpoints',
                       scheduler=None): # Add scheduler argument
    """
    Train and validate the model with early stopping and checkpointing

    Per-epoch loss and accuracy are weighted by the number of samples in each
    batch, so a shorter final batch does not distort the reported figures or
    the early-stopping / checkpoint decisions based on them.

    Args:
    - model: PyTorch model
    - train_loader: DataLoader for training data
    - val_loader: DataLoader for validation data
    - criterion: Loss function (assumed to return a per-batch mean)
    - optimizer: Optimizer
    - device: Computing device (cuda/cpu)
    - num_epochs: Maximum number of training epochs
    - patience: Number of epochs with no improvement after which training will be stopped
    - checkpoint_dir: Directory to save model checkpoints
    - scheduler: (Optional) Learning rate scheduler, stepped once per epoch.
      A ReduceLROnPlateau receives the validation loss; any other scheduler is
      stepped without arguments.

    Returns:
    - history: dict with per-epoch lists under 'train_loss', 'train_acc',
      'val_loss' and 'val_acc'
    """
    # Create checkpoint directory if it doesn't exist
    os.makedirs(checkpoint_dir, exist_ok=True)

    # Training history tracking
    history = {
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': []
    }

    # Early stopping variables
    best_val_loss = float('inf')
    epochs_no_improve = 0

    # Training loop
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss_sum, train_correct, train_seen = 0.0, 0, 0

        train_progress_bar = tqdm(train_loader,
                                  desc=f'Epoch {epoch+1}/{num_epochs}',
                                  unit='batch')

        for X, y in train_progress_bar:
            X, y = X.to(device), y.to(device)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(X)
            loss = criterion(outputs, y)

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            # Accumulate sample-weighted metrics; argmax over logits equals
            # argmax over softmax, so the softmax is skipped.
            batch_size = y.size(0)
            train_loss_sum += loss.item() * batch_size
            train_correct += (outputs.argmax(dim=1) == y).sum().item()
            train_seen += batch_size

            # Update progress bar with the running accuracy so far
            train_progress_bar.set_postfix({
                'Train Loss': loss.item(),
                'Train Acc': train_correct / train_seen
            })

        # Average epoch metrics over samples
        train_loss = train_loss_sum / train_seen
        train_acc = train_correct / train_seen

        # Validation phase
        model.eval()
        val_loss_sum, val_correct, val_seen = 0.0, 0, 0

        with torch.inference_mode():
            for X, y in val_loader:
                X, y = X.to(device), y.to(device)

                outputs = model(X)
                loss = criterion(outputs, y)

                batch_size = y.size(0)
                val_loss_sum += loss.item() * batch_size
                val_correct += (outputs.argmax(dim=1) == y).sum().item()
                val_seen += batch_size

        # Average validation metrics over samples
        val_loss = val_loss_sum / val_seen
        val_acc = val_correct / val_seen

        # Store history
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        # Print epoch summary
        print(f'Epoch {epoch+1}/{num_epochs}:')
        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}')
        print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

        # Step the scheduler: only plateau schedulers take a metric argument
        if scheduler is not None:
            if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                scheduler.step(val_loss)
            else:
                scheduler.step()
            print(f"Current LR: {optimizer.param_groups[0]['lr']:.6f}")

        # Early stopping and model checkpointing
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_no_improve = 0

            # Save best model
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'train_loss': train_loss,
                'val_loss': val_loss
            }, os.path.join(checkpoint_dir, 'best_model.pth'))
        else:
            epochs_no_improve += 1

        # Early stopping
        if epochs_no_improve >= patience:
            print(f'Early stopping triggered after {epoch+1} epochs')
            break

    return history

In [None]:
# Train the EfficientNet model. The plateau scheduler created earlier for this
# optimizer is passed in explicitly; without it the scheduler is never stepped
# and the learning rate stays fixed for the whole run.
history = train_and_validate(
    model=model,
    train_loader=train_loader,
    val_loader=test_loader,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
    num_epochs=6,
    patience=3,
    checkpoint_dir='/kaggle/working/checkpoints',
    scheduler=scheduler
)

In [None]:
import shutil

# 1. Define paths to original directories
# (same locations as train_dir / test_dir defined earlier in the notebook)
original_train_dir = '/kaggle/input/fer2013/train'
original_test_dir = '/kaggle/input/fer2013/test'

# 2. Create new directories
# The filtered copies are written under /kaggle/working; exist_ok=True makes
# re-running this cell safe.
filtered_train_dir = '/kaggle/working/train_filtered'
filtered_test_dir = '/kaggle/working/test_filtered'
os.makedirs(filtered_train_dir, exist_ok=True)
os.makedirs(filtered_test_dir, exist_ok=True)

# 3. Define classes to keep
# Only these six class folders are copied; any other class folder in the
# original dataset is left out of the filtered copy.
classes_to_keep = ['angry', 'fear', 'happy', 'neutral', 'sad', 'surprise']

# 4. Write a function to copy filtered data
def filter_and_copy_data(original_dir, new_dir, classes):
    """Copy selected class folders from `original_dir` into `new_dir`.

    For every name in `classes`, a folder of that name is created under
    `new_dir` (if missing) and every entry listed in the matching folder of
    `original_dir` is copied into it with shutil.copy.
    """
    for label in classes:
        source_folder = os.path.join(original_dir, label)
        target_folder = os.path.join(new_dir, label)

        # The destination is created before the source is listed.
        os.makedirs(target_folder, exist_ok=True)

        for entry in os.listdir(source_folder):
            shutil.copy(os.path.join(source_folder, entry), target_folder)

# 5. Call the function for both training and testing datasets
filter_and_copy_data(original_train_dir, filtered_train_dir, classes_to_keep)
filter_and_copy_data(original_test_dir, filtered_test_dir, classes_to_keep)

# Report completion and print a per-directory summary of both filtered trees.
print("Filtered datasets created successfully.")
walk_through(filtered_train_dir)
walk_through(filtered_test_dir)

In [None]:
# 1. Create a new ImageFolder dataset for the filtered training data
# (uses the same augmenting transform pipeline as the original training set)
filtered_train_dir = '/kaggle/working/train_filtered'
filtered_train_dataset = datasets.ImageFolder(root=filtered_train_dir, transform=train_transforms)

# 2. Extract the class names and class-to-index mapping
filtered_class_names = filtered_train_dataset.classes
filtered_class_idx = filtered_train_dataset.class_to_idx

print("Filtered Classes:", filtered_class_names)
print("Filtered Class Index Mapping:", filtered_class_idx)

# 3. Update the output_shape variable
# This rebinds the notebook-level `output_shape` to the filtered class count;
# models built before this cell keep whatever head size they were given.
output_shape = len(filtered_class_names)
print("Updated output shape:", output_shape)

In [None]:
from torch.optim.lr_scheduler import ReduceLROnPlateau

# NOTE(review): model_b0, train_loader_b0 and test_loader_b0 are not defined in
# any cell visible in this notebook, so this cell fails on a fresh kernel unless
# they are created elsewhere — confirm where they come from or remove the cell.
print("\nContinuing training with a lower learning rate...")

# Separate, smaller learning rates for the backbone and the classifier head.
continued_optimizer = optim.Adam([
    {'params': model_b0.features.parameters(), 'lr': 2e-6},  # Was 1e-5
    {'params': model_b0.classifier.parameters(), 'lr': 2e-5}   # Was 1e-4
], weight_decay=1e-4)

# New plateau scheduler bound to the new optimizer. The `verbose` flag is left
# out because it is deprecated in recent PyTorch releases; the
# train_and_validate defined above already prints the current LR every epoch
# when a scheduler is supplied.
continued_scheduler = ReduceLROnPlateau(continued_optimizer, 'min', patience=2)

# Train for up to 5 more epochs
history_continued = train_and_validate(
    model=model_b0,
    train_loader=train_loader_b0,
    val_loader=test_loader_b0,
    criterion=criterion,
    optimizer=continued_optimizer,
    scheduler=continued_scheduler,
    device=device,
    num_epochs=5,
    patience=3,  # Lower patience as we expect smaller improvements

    checkpoint_dir='/kaggle/working/checkpoints_b0_continued' # New checkpoint directory
)

### **1. Load ConvNeXt-Tiny Model**

First, we'll load the `convnext_tiny` model with its default pretrained weights from ImageNet.

In [None]:
# Locations of the filtered train/test copies (same values as assigned in the
# filtering cell; repeated here so the following cells can read them directly).
filtered_train_dir = '/kaggle/working/train_filtered'
filtered_test_dir = '/kaggle/working/test_filtered'


In [None]:
from torchvision import transforms
# ConvNeXt_Tiny_Weights was used without being imported anywhere in the notebook.
from torchvision.models import ConvNeXt_Tiny_Weights

weights = ConvNeXt_Tiny_Weights.DEFAULT
# Preprocessing preset bundled with the pretrained weights. It is kept for
# reference only; the pipelines actually used are the two defined below.
transform = weights.transforms()


# Training pipeline with augmentation.
train_transform = transforms.Compose([
    transforms.Resize(256),  # Resize first
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),  # Zoomed-in crops
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],  # ConvNeXt pretrained on ImageNet
                         std=[0.229, 0.224, 0.225])
])

# Deterministic evaluation pipeline: resize, centre crop, normalise.
val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])


In [None]:
# WeightedRandomSampler was used without being imported anywhere in the notebook.
from torch.utils.data import WeightedRandomSampler

# Rebinds train_dataset / test_dataset (and the loaders below) to the filtered
# data, replacing the objects built from the unfiltered dataset earlier.
train_dataset = datasets.ImageFolder(root=filtered_train_dir, transform=train_transform)
test_dataset = datasets.ImageFolder(root=filtered_test_dir, transform=val_transform)

# Calculate class counts
class_counts = [0] * len(train_dataset.classes)
for _, label in train_dataset.samples:
    class_counts[label] += 1

# Weighted sampling: each sample is drawn with probability inversely
# proportional to the size of its class, with replacement, so rare classes
# appear about as often as common ones within an epoch.
class_weights = 1. / torch.tensor(class_counts, dtype=torch.float)
sample_weights = class_weights[train_dataset.targets]
sampler = WeightedRandomSampler(weights=sample_weights, num_samples=len(sample_weights), replacement=True)

# DataLoaders (the sampler replaces shuffling for the training loader)
train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=2)


In [None]:
# convnext_tiny was used without being imported anywhere in the notebook.
from torchvision.models import convnext_tiny

model = convnext_tiny(weights=weights)

# Replace the final linear layer of the classifier with one sized to the
# dataset (one logit per class folder) instead of a hard-coded 6.
num_classes = len(train_dataset.classes)
in_features = model.classifier[2].in_features
model.classifier[2] = nn.Linear(in_features, num_classes)

# Move the whole network, including the new head, to the selected device.
model = model.to(device)


In [None]:
# Freeze all layers first
# Freeze every parameter of the network first.
model.requires_grad_(False)

# Re-enable gradients for the last two children of model.features ...
for block in list(model.features.children())[-2:]:
    block.requires_grad_(True)

# ... and for the whole classifier head.
model.classifier.requires_grad_(True)


In [None]:
# ReduceLROnPlateau was used without being imported anywhere in the notebook.
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Cross-entropy with label smoothing of 0.1.
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

# Two parameter groups: a smaller learning rate for the pretrained feature
# extractor than for the freshly initialised classifier head.
optimizer = optim.Adam([
    {'params': model.features.parameters(), 'lr': 1e-5},
    {'params': model.classifier.parameters(), 'lr': 1e-4}
], weight_decay=1e-4)

# Plateau scheduler in 'min' mode. The `verbose` flag is left out because it is
# deprecated in recent PyTorch releases; read optimizer.param_groups[i]['lr']
# to inspect the current learning rates.
scheduler = ReduceLROnPlateau(optimizer, 'min', patience=2)


In [None]:
def train_and_validate(model, train_loader, val_loader, criterion, optimizer, scheduler, device, num_epochs=10, patience=5, checkpoint_dir=None):
    """Train `model` with per-epoch validation, LR scheduling and early stopping.

    Args:
        model: Network to train.
        train_loader: DataLoader for training batches; its `.dataset` length is
            used as the denominator for the training metrics.
        val_loader: DataLoader for validation batches; its `.dataset` length is
            used as the denominator for the validation metrics.
        criterion: Loss function (assumed to return a per-batch mean).
        optimizer: Optimizer updating the model parameters.
        scheduler: Scheduler stepped once per epoch with the mean validation
            loss (i.e. a ReduceLROnPlateau-style scheduler), or None to skip.
        device: Device the batches are moved to.
        num_epochs: Maximum number of epochs.
        patience: Number of consecutive epochs without a validation-accuracy
            improvement after which training stops.
        checkpoint_dir: If given, the best model's state_dict is written to
            `<checkpoint_dir>/best_model.pth`.

    Returns:
        dict with per-epoch lists under 'train_loss', 'train_acc', 'val_loss'
        and 'val_acc'.
    """
    # Per-epoch metrics, returned so callers that store the result get data
    # rather than None.
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}

    best_val_acc = 0
    patience_counter = 0
    for epoch in range(num_epochs):
        model.train()
        train_loss, train_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Sample-weighted accumulation.
            train_loss += loss.item() * images.size(0)
            train_correct += (outputs.argmax(1) == labels).sum().item()

        val_loss, val_correct = 0, 0
        model.eval()
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)

                val_loss += loss.item() * images.size(0)
                val_correct += (outputs.argmax(1) == labels).sum().item()

        train_acc = train_correct / len(train_loader.dataset)
        val_acc = val_correct / len(val_loader.dataset)
        avg_train_loss = train_loss / len(train_loader.dataset)
        avg_val_loss = val_loss / len(val_loader.dataset)

        history['train_loss'].append(avg_train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(avg_val_loss)
        history['val_acc'].append(val_acc)

        print(f"Epoch {epoch+1}: Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

        if scheduler is not None:
            scheduler.step(avg_val_loss)
            # Report the learning rate of every parameter group after the step.
            current_lrs = ", ".join(f"{group['lr']:.2e}" for group in optimizer.param_groups)
            print(f"Current LR(s): {current_lrs}")

        # Save best model (selected by validation accuracy)
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            patience_counter = 0
            if checkpoint_dir:
                os.makedirs(checkpoint_dir, exist_ok=True)
                torch.save(model.state_dict(), os.path.join(checkpoint_dir, 'best_model.pth'))
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping.")
                break

    return history


In [None]:
# Train the ConvNeXt model on the filtered data for up to 20 epochs, validating
# on the test loader and writing the best weights under the given checkpoint
# directory. `history` receives whatever train_and_validate returns.
history = train_and_validate(
    model=model,
    train_loader=train_loader,
    val_loader=test_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    device=device,
    num_epochs=20,
    patience=5,
    checkpoint_dir='/kaggle/working/convnext_tiny_checkpoint'
)


In [None]:
# Save the trained model weights
# Saves the weights the model holds right now, i.e. after the last epoch that
# ran. These are not necessarily the best-validation weights, which
# train_and_validate writes separately to best_model.pth in its checkpoint dir.
model_path = "/kaggle/working/convnext_fer2013.pth" # Save to the working directory
torch.save(model.state_dict(), model_path)

In [None]:
# Download the saved weights file through the google.colab helper.
# NOTE(review): this import is presumably only available on Colab — confirm
# before running the notebook in another environment.
from google.colab import files
files.download(model_path)


In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import torch

# Restore the saved weights (deserialised onto the CPU), switch to evaluation
# mode and place the model on the working device.
saved_state = torch.load(model_path, map_location=torch.device('cpu'))
model.load_state_dict(saved_state)
model.eval()
model.to(device)

# Gather predictions and ground-truth labels over the whole test loader.
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs.to(device))
        preds = outputs.argmax(dim=1)  # index of the largest logit per sample
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Per-class precision / recall / F1
print(classification_report(all_labels, all_preds, target_names=filtered_class_names))

# Raw-count confusion matrix drawn as a heatmap
cm = confusion_matrix(all_labels, all_preds)
fig, ax = plt.subplots(figsize=(10, 7))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    xticklabels=filtered_class_names,
    yticklabels=filtered_class_names,
    cmap="Blues",
    ax=ax,
)
ax.set_xlabel("Predicted")
ax.set_ylabel("Actual")
ax.set_title("Confusion Matrix")
plt.show()

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import numpy as np

# Label the axes with the classes of the dataset that was actually evaluated.
# `class_names` was captured from the unfiltered training set earlier in the
# notebook, so it does not describe the rows and columns of this matrix.
eval_class_names = test_dataset.classes

# Fix the label set so the matrix always has one row and column per class,
# even if a class never appears in the labels or predictions.
cm = confusion_matrix(all_labels, all_preds, labels=list(range(len(eval_class_names))))

# Row-normalise so each row shows the share of that true class assigned to each
# prediction. Empty rows are divided by 1 instead of 0 to avoid NaN cells.
row_totals = cm.sum(axis=1, keepdims=True)
cm_normalized = cm.astype('float') / np.maximum(row_totals, 1)

plt.figure(figsize=(10, 8))
sns.heatmap(cm_normalized, annot=True, fmt=".2f", cmap='Blues', xticklabels=eval_class_names, yticklabels=eval_class_names)
plt.title("Normalized Confusion Matrix")
plt.ylabel('Actual')
plt.xlabel('Predicted')
plt.show()
