# experiment 2

In [None]:
!pip install kaggle wandb onnx -Uq
from google.colab import drive
drive.mount('/content/drive')

In [None]:
! mkdir ~/.kaggle

In [3]:
# Copy the Kaggle API credentials file from the mounted Drive into ~/.kaggle.
!cp /content/drive/MyDrive/ColabNotebooks/kaggle_API_credentials/kaggle.json ~/.kaggle/kaggle.json

In [4]:
# Restrict the credentials file to owner read/write only (mode 600).
! chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download the facial-expression-recognition competition archive with the Kaggle CLI.
!kaggle competitions download -c challenges-in-representation-learning-facial-expression-recognition-challenge

In [None]:
! unzip challenges-in-representation-learning-facial-expression-recognition-challenge.zip

In [7]:
!pip install wandb onnx -Uq

# data

In [8]:
import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from tqdm.auto import tqdm

import wandb

In [None]:
# Seed every random number generator used in the notebook with the same value.
SEED = 42
for seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    seed_fn(SEED)
# Set cuDNN's deterministic flag.
torch.backends.cudnn.deterministic = True

# Device configuration
# Use the GPU when CUDA is available, otherwise fall back to the CPU
# (change "cuda" to "mps" if you have a Mac).
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
print("Device available: ", device)

In [None]:
# Authenticate this session with Weights & Biases.
wandb.login()

# training

In [11]:
# Sweep definition: 'random' search method, maximizing the `val_accuracy` metric.
# Each hyperparameter lists the discrete options the sweep may pick from.
_search_space = {
    'learning_rate': [0.01, 0.005, 0.001, 0.0005],
    'batch_size': [32, 64, 128],
    'dropout_rate': [0.2, 0.3, 0.4],
    'weight_decay': [1e-4, 1e-5, 1e-6],
    'hidden_dim': [64, 128, 256],
}
sweep_config = dict(
    method='random',
    metric=dict(name='val_accuracy', goal='maximize'),
    parameters={name: {'values': options} for name, options in _search_space.items()},
)

In [None]:
# Register the sweep under the W&B project; the returned id is used later by
# the sweep agent and by the API lookup of the best run.
sweep_id = wandb.sweep(sweep_config, project="Facial_Expression_Recognition_3")

In [13]:
# Load the labelled data from the CSV.
original_train_df = pd.read_csv('train.csv')

# First split: hold out 10% of the original data as a test set,
# stratified on the emotion label.
train_val_df, test_df = train_test_split(
    original_train_df,
    test_size=0.1,
    stratify=original_train_df['emotion'],
    random_state=42,
)

# Second split: divide the remaining data into training (80%) and
# validation (20%), again stratified on the emotion label.
train_df, val_df = train_test_split(
    train_val_df,
    test_size=0.2,
    stratify=train_val_df['emotion'],
    random_state=42,
)

# Define emotion labels (class index -> display name)
emotion_labels = dict(enumerate(
    ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
))

In [None]:
# Report the size of each split, absolute and as a share of the original data.
print(f"Original data size: {len(original_train_df)}")
print(f"Training set size: {len(train_df)} ({len(train_df)/len(original_train_df)*100:.1f}%)")
print(f"Validation set size: {len(val_df)} ({len(val_df)/len(original_train_df)*100:.1f}%)")
print(f"Test set size: {len(test_df)} ({len(test_df)/len(original_train_df)*100:.1f}%)")

# Print class distribution in each set
print("\nEmotion distribution:")
for class_idx, emotion in emotion_labels.items():
    # Count the rows of each split that carry this class index.
    train_count, val_count, test_count = (
        (split_df['emotion'] == class_idx).sum()
        for split_df in (train_df, val_df, test_df)
    )
    print(f"  {emotion}: Train={train_count}, Val={val_count}, Test={test_count}")


In [15]:
class FER2013Dataset(Dataset):
    """Dataset over a DataFrame with a `pixels` string column and an `emotion` column.

    Each item decodes the whitespace-separated `pixels` string into a 48x48
    uint8 image (as a PIL image), optionally applies `transform`, and returns
    the result together with the row's `emotion` value.
    """

    def __init__(self, dataframe, transform=None):
        self.transform = transform
        self.dataframe = dataframe

    def __len__(self):
        """Number of rows in the wrapped DataFrame."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return `(image, label)` for the row at positional index `idx`."""
        sample = self.dataframe.iloc[idx]

        # Decode the whitespace-separated intensities into a 48x48 uint8 grid.
        values = list(map(int, sample['pixels'].split()))
        grid = np.array(values, dtype=np.uint8).reshape(48, 48)

        # Convert to PIL Image for transforms
        image = Image.fromarray(grid)
        if self.transform:
            image = self.transform(image)

        return image, sample['emotion']

In [16]:
# Training pipeline: random horizontal flip and rotation (RandomRotation(10))
# as augmentation, then tensor conversion and normalization with 0.5 / 0.5.
_train_steps = [
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5,), std=(0.5,)),
]
train_transform = transforms.Compose(_train_steps)

# Validation and test sets only need basic transformations
_eval_steps = [
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5,), std=(0.5,)),
]
val_test_transform = transforms.Compose(_eval_steps)

In [17]:
# One dataset per split; only the training split uses the augmenting transform.
train_dataset, val_dataset, test_dataset = (
    FER2013Dataset(split_df, transform=split_transform)
    for split_df, split_transform in (
        (train_df, train_transform),
        (val_df, val_test_transform),
        (test_df, val_test_transform),
    )
)

# Create a small subset for overfitting test (20 samples)
indices = [*range(20)]
overfit_dataset = torch.utils.data.Subset(train_dataset, indices)

In [18]:
class ImprovedCNN(nn.Module):
    """Three-block CNN classifier for single-channel 48x48 images, 7 output classes.

    Each block is Conv(3x3, padding=1) -> BatchNorm -> ReLU -> MaxPool(2), so
    the spatial size halves three times (48 -> 24 -> 12 -> 6) while the channel
    count grows 1 -> 32 -> 64 -> 128. The flattened 128*6*6 features pass
    through a hidden linear layer (BatchNorm, ReLU, dropout) and a final
    7-way linear layer that outputs raw logits.

    Args:
        dropout_rate: Dropout probability applied after the hidden layer.
        hidden_dim: Width of the hidden fully connected layer.
    """

    def __init__(self, dropout_rate=0.3, hidden_dim=128):
        super().__init__()

        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(kernel_size=2)

        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(kernel_size=2)

        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool3 = nn.MaxPool2d(kernel_size=2)

        # 128 channels * 6 * 6 spatial positions after three 2x poolings of 48x48.
        self.fc1 = nn.Linear(128 * 6 * 6, hidden_dim)
        self.bn4 = nn.BatchNorm1d(hidden_dim)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.fc2 = nn.Linear(hidden_dim, 7)

    def forward(self, x):
        """Return logits of shape (batch, 7) for input of shape (batch, 1, 48, 48).

        Raises:
            RuntimeError: if the input's spatial size does not yield 128*6*6
                features per sample (the shape mismatch surfaces in `fc1`).
        """
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))

        x = self.pool2(F.relu(self.bn2(self.conv2(x))))

        x = self.pool3(F.relu(self.bn3(self.conv3(x))))

        # Flatten per sample, keeping the batch dimension fixed. The previous
        # view(-1, 128 * 6 * 6) let a wrongly sized input be silently regrouped
        # into a different number of "samples" instead of failing.
        x = torch.flatten(x, 1)
        x = F.relu(self.bn4(self.fc1(x)))
        x = self.dropout1(x)
        x = self.fc2(x)

        return x


In [19]:
def create_dataloaders(batch_size):
    """Build the train, validation, test and overfit DataLoaders for `batch_size`.

    Returns:
        Tuple `(train_loader, val_loader, test_loader, overfit_loader)`. The
        train and overfit loaders shuffle; the train/val/test loaders use two
        worker processes, while the overfit loader keeps the default worker setting.
    """
    worker_kwargs = dict(num_workers=2, pin_memory=True)
    return (
        DataLoader(train_dataset, batch_size=batch_size, shuffle=True, **worker_kwargs),
        DataLoader(val_dataset, batch_size=batch_size, shuffle=False, **worker_kwargs),
        DataLoader(test_dataset, batch_size=batch_size, shuffle=False, **worker_kwargs),
        DataLoader(overfit_dataset, batch_size=batch_size, shuffle=True, pin_memory=True),
    )


In [20]:
def compute_accuracy(loader, model, device):
    """Evaluate `model` on `loader` and return its accuracy plus the raw predictions.

    Args:
        loader: Iterable yielding `(images, labels)` tensor batches.
        model: Module mapping an image batch to per-class scores of shape
            (batch, num_classes).
        device: Device the batches are moved to before the forward pass.

    Returns:
        Tuple `(accuracy, all_preds, all_labels)`: accuracy as a percentage and
        flat lists of predicted and true class indices in iteration order. An
        empty loader yields `(0.0, [], [])` instead of dividing by zero.

    Note:
        The model is switched to eval mode and is not switched back.
    """
    model.eval()
    correct = 0
    total = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for images, labels in loader:
            images = images.to(device)
            labels = labels.to(device)

            # The predicted class is the index of the highest score per sample.
            predicted = model(images).argmax(dim=1)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            all_preds.extend(predicted.tolist())
            all_labels.extend(labels.tolist())

    # Guard against an empty loader (no samples seen).
    accuracy = 100 * correct / total if total else 0.0
    return accuracy, all_preds, all_labels


# Overfitting test on 20 samples

In [21]:
# Subset of the first 20 training samples, served in shuffled batches of 16
# by a dedicated loader for the overfitting test.
indices = [*range(20)]
overfit_dataset = torch.utils.data.Subset(train_dataset, indices)
overfit_loader = DataLoader(
    overfit_dataset,
    batch_size=16,
    shuffle=True,
    pin_memory=True,
)


In [22]:
def test_overfitting():
    """Sanity-check the architecture by training on the small `overfit_loader` subset.

    Trains a fresh ImprovedCNN for 30 epochs inside a W&B run named
    "overfitting_test", logs per-epoch loss and accuracy, plots both curves
    (saved to 'overfit_test.png' and logged to W&B), and reports whether the
    best epoch accuracy exceeded 95%.

    Returns:
        True when the maximum per-epoch accuracy is above 95, False otherwise.
    """
    print("Running overfitting test to check model architecture...")

    # Initialize model for overfitting test
    net = ImprovedCNN(dropout_rate=0.3, hidden_dim=128).to(device)

    # Define loss function and optimizer for overfitting test
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(net.parameters(), lr=0.001, weight_decay=1e-5)

    # Initialize wandb run for overfitting test
    with wandb.init(project="Facial_Expression_Recognition_3", name="overfitting_test"):
        overfit_losses, overfit_accs = [], []

        epochs = 30
        for epoch in range(epochs):
            net.train()
            loss_sum = 0.0
            n_correct = 0
            n_seen = 0

            for images, labels in overfit_loader:
                images, labels = images.to(device), labels.to(device)

                # Forward pass
                adam.zero_grad()
                logits = net(images)
                batch_loss = loss_fn(logits, labels)

                # Backward pass and optimize
                batch_loss.backward()
                adam.step()

                loss_sum += batch_loss.item()

                # Calculate accuracy
                predicted = logits.detach().max(dim=1).indices
                n_seen += labels.size(0)
                n_correct += (predicted == labels).sum().item()

            # Mean of the per-batch losses and accuracy in percent for this epoch.
            overfit_loss = loss_sum / len(overfit_loader)
            overfit_acc = 100 * n_correct / n_seen
            overfit_losses.append(overfit_loss)
            overfit_accs.append(overfit_acc)

            # Log to wandb
            wandb.log({
                "overfit_epoch": epoch + 1,
                "overfit_loss": overfit_loss,
                "overfit_accuracy": overfit_acc
            })

            print(f"Overfit Epoch {epoch+1}/{epochs}, Loss: {overfit_loss:.4f}, Acc: {overfit_acc:.2f}%")

        # Plot the overfitting test results: loss on the left, accuracy on the right.
        plt.figure(figsize=(12, 5))
        panels = (
            (1, overfit_losses, 'Overfitting Test - Loss', 'Loss'),
            (2, overfit_accs, 'Overfitting Test - Accuracy', 'Accuracy (%)'),
        )
        for position, series, title, y_label in panels:
            plt.subplot(1, 2, position)
            plt.plot(series)
            plt.title(title)
            plt.xlabel('Epoch')
            plt.ylabel(y_label)

        plt.tight_layout()
        plt.savefig('overfit_test.png')
        plt.show()

        wandb.log({"overfit_test_plot": wandb.Image('overfit_test.png')})

        # Check if model can overfit
        passed = max(overfit_accs) > 95
        if passed:
            print("Model passed the overfitting test! Proceeding with full training.")
        else:
            print("Warning: Model may have issues with gradient flow as it didn't achieve high accuracy on the small dataset.")
        return passed


In [None]:
# Run the overfitting sanity check. The boolean result is stored here; none of
# the cells shown in this notebook read it afterwards.
passed_overfit_test = test_overfitting()

# Training model

In [24]:
def compute_loss(loader, model, criterion, device):
    """Compute the loss on a dataset without updating model parameters.

    Args:
        loader: Iterable yielding `(images, labels)` tensor batches. It does
            not need to support `len()`; batches are counted while iterating.
        model: Module applied to each image batch.
        criterion: Callable `criterion(outputs, labels)` returning a scalar tensor.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The mean of the per-batch loss values as a float, or NaN when the
        loader yields no batches (instead of raising ZeroDivisionError).

    Note:
        The model is switched to eval mode and is not switched back.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for images, labels in loader:
            images = images.to(device)
            labels = labels.to(device)
            total_loss += criterion(model(images), labels).item()
            num_batches += 1
    if num_batches == 0:
        # Nothing was evaluated; NaN makes that visible instead of faking a loss of 0.
        return float("nan")
    return total_loss / num_batches

def visualize_predictions(loader, model, emotion_labels, device, num_samples=8):
    """Visualize model predictions on a few samples.

    Batches are read from `loader` until at least `num_samples` images are
    collected; `num_samples` of them are picked at random and drawn in a grid
    (at most 4 columns) titled with the true and predicted labels. The figure
    is saved to 'prediction_samples.png', logged to W&B when a run is active,
    shown, and then closed.

    Args:
        loader: Iterable yielding `(images, labels)` tensor batches.
        model: Module mapping an image batch to per-class scores.
        emotion_labels: Mapping from class index to display name.
        device: Device the images are moved to before the forward pass.
        num_samples: Maximum number of images to display (default 8 -> 2x4 grid).
    """
    model.eval()
    all_images = []
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for images, labels in loader:
            images = images.to(device)
            preds = model(images).argmax(dim=1)

            # Convert to CPU for visualization
            all_images.extend(images.cpu().numpy())
            all_labels.extend(labels.tolist())
            all_preds.extend(preds.tolist())

            if len(all_images) >= num_samples:
                break

    num_shown = min(num_samples, len(all_images))
    if num_shown <= 0:
        print("No samples available to visualize.")
        return

    # Select a subset of images to display
    indices = np.random.choice(len(all_images), num_shown, replace=False)

    # Size the grid from the number of images actually shown, so a request for
    # more than 8 samples is not silently truncated to a fixed 2x4 layout.
    n_cols = min(4, num_shown)
    n_rows = (num_shown + n_cols - 1) // n_cols
    fig, axes = plt.subplots(n_rows, n_cols,
                             figsize=(3.75 * n_cols, 4 * n_rows), squeeze=False)
    axes = axes.flatten()

    # Hide every axis up front so unused grid cells stay blank.
    for ax in axes:
        ax.axis('off')

    for ax, idx in zip(axes, indices):
        img = all_images[idx].squeeze()  # Remove channel dimension for grayscale
        img = (img * 0.5) + 0.5  # Denormalize

        true_label = emotion_labels[all_labels[idx]]
        pred_label = emotion_labels[all_preds[idx]]

        ax.imshow(img, cmap='gray')
        ax.set_title(f"True: {true_label}\nPred: {pred_label}")

    fig.tight_layout()
    fig.savefig('prediction_samples.png')

    # wandb.log raises when no run is active (e.g. after a sweep has finished),
    # so only log inside a run; the PNG on disk is written either way.
    if wandb.run is not None:
        wandb.log({"prediction_samples": wandb.Image('prediction_samples.png')})

    # Show the figure, then release it so repeated calls do not pile up open figures.
    plt.show()
    plt.close(fig)


In [28]:
def _log_confusion_matrix(y_true, y_pred, title, filename, log_key):
    """Plot a confusion matrix over all emotion classes, save it and log it to W&B."""
    class_ids = list(emotion_labels.keys())
    class_names = list(emotion_labels.values())

    # Passing `labels` keeps one row/column per emotion class, aligned with the
    # tick labels, even when a class is absent from this split's labels and predictions.
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)

    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names, ax=ax)
    ax.set_xlabel('Predicted')
    ax.set_ylabel('True')
    ax.set_title(title)
    fig.tight_layout()
    fig.savefig(filename)
    wandb.log({log_key: wandb.Image(filename)})

    # Up to one figure per epoch is created; close it so figures do not
    # accumulate in memory across epochs and sweep runs. The image remains
    # available on disk and in W&B.
    plt.close(fig)


def train_model(config=None):
    """Train ImprovedCNN inside one W&B run and evaluate its best checkpoint on the test set.

    Hyperparameters (`learning_rate`, `batch_size`, `dropout_rate`,
    `weight_decay`, `hidden_dim`) are read from `wandb.config`, so the function
    works both with an explicit `config` dict and as a sweep-agent target
    (called without arguments).

    The model is trained for 20 epochs with Adam and a ReduceLROnPlateau
    scheduler driven by validation accuracy. Whenever validation accuracy
    improves, the weights are saved to `best_model_<run id>.pt` and a validation
    confusion matrix is logged. After training, the best checkpoint is reloaded
    and test accuracy, macro and per-class metrics, a test confusion matrix and
    sample predictions are logged.

    Args:
        config: Optional dict of hyperparameters passed to `wandb.init`.

    Returns:
        Tuple `(model, test_accuracy)` with the best-checkpoint model and its
        test accuracy in percent.
    """
    with wandb.init(project="Facial_Expression_Recognition_3", config=config):
        # Access all hyperparameter values from wandb.config
        config = wandb.config

        # Create model and dataloaders with the current config
        model = ImprovedCNN(dropout_rate=config.dropout_rate, hidden_dim=config.hidden_dim).to(device)
        train_loader, val_loader, test_loader, _ = create_dataloaders(config.batch_size)

        # Define loss function and optimizer
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=config.learning_rate, weight_decay=config.weight_decay)

        # Learning rate scheduler: halve the LR after 3 epochs without a
        # validation-accuracy improvement. The deprecated `verbose` flag is not
        # passed; the current LR is logged to W&B every epoch instead.
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=3, factor=0.5)

        # Start below any reachable accuracy so the first epoch always writes a
        # checkpoint, which guarantees the file exists when it is reloaded below.
        best_val_acc = float('-inf')
        best_model_path = f"best_model_{wandb.run.id}.pt"

        # Training loop
        num_epochs = 20
        for epoch in range(num_epochs):
            # Training phase
            model.train()
            train_loss = 0
            train_correct = 0
            train_total = 0

            for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Train]"):
                images = images.to(device)
                labels = labels.to(device)

                # Forward pass
                optimizer.zero_grad()
                outputs = model(images)
                loss = criterion(outputs, labels)

                # Backward pass and optimize
                loss.backward()
                optimizer.step()

                # Calculate accuracy
                predicted = outputs.argmax(dim=1)
                train_total += labels.size(0)
                train_correct += (predicted == labels).sum().item()
                train_loss += loss.item()

            train_loss /= len(train_loader)
            train_acc = 100 * train_correct / train_total

            # Validation phase
            val_acc, val_preds, val_labels = compute_accuracy(val_loader, model, device)
            val_loss = compute_loss(val_loader, model, criterion, device)

            # Log metrics to wandb
            wandb.log({
                "epoch": epoch + 1,
                "train_loss": train_loss,
                "train_accuracy": train_acc,
                "val_loss": val_loss,
                "val_accuracy": val_acc,
                "learning_rate": optimizer.param_groups[0]['lr']
            })

            # Print progress
            print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")

            # Update learning rate scheduler
            scheduler.step(val_acc)

            # Save the best model
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                torch.save(model.state_dict(), best_model_path)
                print(f"Model improved! Saved checkpoint (Val Acc: {val_acc:.2f}%)")

                # Log confusion matrix for best model
                _log_confusion_matrix(
                    val_labels, val_preds,
                    title=f'Confusion Matrix (Validation) - Epoch {epoch+1}',
                    filename=f'confusion_matrix_epoch_{epoch+1}.png',
                    log_key="confusion_matrix",
                )

        # `val_accuracy` in the run summary is the last epoch's value; also
        # record the accuracy of the checkpoint that is actually kept.
        wandb.run.summary["best_val_accuracy"] = best_val_acc

        # Load the best model for final evaluation (map_location keeps this
        # working when the checkpoint is loaded on a different device).
        model.load_state_dict(torch.load(best_model_path, map_location=device))

        # Test the model
        test_accuracy, test_preds, test_labels = compute_accuracy(test_loader, model, device)

        # Compute per-class metrics. `labels` pins the report to all emotion
        # classes so `target_names` always lines up, even if a class is missing
        # from the test labels and predictions.
        class_ids = list(emotion_labels.keys())
        class_names = list(emotion_labels.values())
        report = classification_report(test_labels, test_preds,
                                       labels=class_ids,
                                       target_names=class_names,
                                       output_dict=True,
                                       zero_division=0)

        # Collect overall and per-class test metrics and log them in one call,
        # so they land on a single W&B step instead of one step per class.
        test_metrics = {
            "test_accuracy": test_accuracy,
            "test_f1_macro": report['macro avg']['f1-score'],
            "test_precision_macro": report['macro avg']['precision'],
            "test_recall_macro": report['macro avg']['recall']
        }
        for emotion_name in class_names:
            if emotion_name in report:
                class_report = report[emotion_name]
                test_metrics[f"test_{emotion_name}_f1"] = class_report['f1-score']
                test_metrics[f"test_{emotion_name}_precision"] = class_report['precision']
                test_metrics[f"test_{emotion_name}_recall"] = class_report['recall']
        wandb.log(test_metrics)

        # Create and log confusion matrix for test set
        _log_confusion_matrix(
            test_labels, test_preds,
            title='Confusion Matrix (Test)',
            filename='test_confusion_matrix.png',
            log_key="test_confusion_matrix",
        )

        # Visualize some predictions
        visualize_predictions(test_loader, model, emotion_labels, device)

        print(f"Final Test Accuracy: {test_accuracy:.2f}%")
        return model, test_accuracy


In [None]:
print("Running full model training...")
# Hyperparameters for the single training run launched with this config.
default_config = dict(
    learning_rate=0.001,
    batch_size=64,
    dropout_rate=0.3,
    weight_decay=1e-5,
    hidden_dim=128,
)

In [None]:
# Train once with the default hyperparameters; train_model returns the model
# and its test accuracy in percent.
model, test_accuracy = train_model(default_config)
print(f"Training completed with test accuracy: {test_accuracy:.2f}%")


In [None]:
# Launch a W&B agent for the sweep registered above; it calls train_model
# (without arguments) and is started with count=3.
print("\nRunning hyperparameter sweep to find the best model...")
wandb.agent(sweep_id, train_model, count=3)


In [None]:
# Look up the sweep through the W&B public API and rank its runs by the
# `val_accuracy` stored in each run's summary (runs without it count as 0).
api = wandb.Api()
sweep = api.sweep(f"konstantine25b-free-university-of-tbilisi-/Facial_Expression_Recognition_3/sweeps/{sweep_id}")
runs = sorted(sweep.runs, key=lambda run: run.summary.get("val_accuracy", 0), reverse=True)
if runs:
    best_run = runs[0]
    print(f"\nBest run found: {best_run.name}")
    print(f"Best validation accuracy: {best_run.summary.get('val_accuracy', 0):.2f}%")
    print(f"Hyperparameters: {best_run.config}")

    # Load the best model for evaluation. The checkpoint is the local file
    # written by train_model for that run id.
    best_model_path = f"best_model_{best_run.id}.pt"

    try:
        # Create model with the best hyperparameters
        best_model = ImprovedCNN(
            dropout_rate=best_run.config.get('dropout_rate', 0.3),
            hidden_dim=best_run.config.get('hidden_dim', 128)
        ).to(device)

        # Load the model weights (map_location keeps this working when the
        # checkpoint is loaded on a different device than it was saved from).
        best_model.load_state_dict(torch.load(best_model_path, map_location=device))

        # Create dataloaders with the best batch size
        _, _, test_loader, _ = create_dataloaders(best_run.config.get('batch_size', 64))

        # Test the model
        test_accuracy, test_preds, test_labels = compute_accuracy(test_loader, best_model, device)

        # Pin the metrics to all emotion classes so names and matrix axes stay
        # aligned even if a class is absent from the test labels and predictions.
        class_ids = list(emotion_labels.keys())
        class_names = list(emotion_labels.values())

        # Compute per-class metrics
        report = classification_report(test_labels, test_preds,
                                       labels=class_ids,
                                       target_names=class_names,
                                       output_dict=True,
                                       zero_division=0)

        print(f"\nFinal Test Accuracy with best model: {test_accuracy:.2f}%")
        print("\nClassification Report:")
        print(classification_report(test_labels, test_preds, labels=class_ids,
                                    target_names=class_names, zero_division=0))

        # Create confusion matrix for test set
        cm = confusion_matrix(test_labels, test_preds, labels=class_ids)
        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=class_names,
                    yticklabels=class_names)
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.title('Confusion Matrix (Test) - Best Model')
        plt.tight_layout()
        plt.savefig('best_model_confusion_matrix.png')

        # Visualize some predictions with the best model. visualize_predictions
        # logs its figure with wandb.log, which needs an active run; the sweep's
        # runs are already finished here, so open a short evaluation run when
        # none is active.
        if wandb.run is None:
            with wandb.init(project="Facial_Expression_Recognition_3",
                            name=f"best_model_eval_{best_run.id}",
                            job_type="evaluation"):
                visualize_predictions(test_loader, best_model, emotion_labels, device)
        else:
            visualize_predictions(test_loader, best_model, emotion_labels, device)

    except Exception as e:
        print(f"Error loading or evaluating best model: {e}")
        print("You may need to manually download the best model from W&B.")
else:
    print("No completed runs found in the sweep.")

In [33]:
# Finish the W&B run if one is still active.
wandb.finish()