In [None]:
from google.colab import drive
drive.mount('/content/drive')

! pip install kaggle

! mkdir ~/.kaggle

!cp /content/drive/MyDrive/kaggle.json ~/.kaggle/kaggle.json

! chmod 600 ~/.kaggle/kaggle.json

! kaggle competitions download -c challenges-in-representation-learning-facial-expression-recognition-challenge

! unzip challenges-in-representation-learning-facial-expression-recognition-challenge

!pip install -q wandb

Mounted at /content/drive
Downloading challenges-in-representation-learning-facial-expression-recognition-challenge.zip to /content
 95% 271M/285M [00:00<00:00, 525MB/s]
100% 285M/285M [00:00<00:00, 538MB/s]
Archive:  challenges-in-representation-learning-facial-expression-recognition-challenge.zip
  inflating: example_submission.csv  
  inflating: fer2013.tar.gz          
  inflating: icml_face_data.csv      
  inflating: test.csv                
  inflating: train.csv               


In [4]:
import os
import pandas as pd
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torch.optim.lr_scheduler import ReduceLROnPlateau
import wandb
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight

# Initialize W&B and Configuration
wandb.init(project="facial-expression-recognition", name="combined-vgg-style-net-v1.0")

# Hyper-parameters shared by the rest of the notebook; mirrored to W&B below
# so every run records the settings it was trained with.
config = {
    "epochs": 75,  # A deeper model with augmentation needs more epochs to train
    "batch_size": 64,  # A larger model may require a smaller batch size to fit in memory
    "learning_rate": 1e-3,
    "image_size": 48,
    "num_classes": 7,
    "num_workers": 2,
    "seed": 42,  # Seed for weight initialisation, shuffling and augmentation
}
wandb.config.update(config)

# Seed the random number generators used by numpy and torch so that weight
# initialisation, batch shuffling and random augmentations are repeatable
# between runs (GPU kernels may still introduce small nondeterminism).
np.random.seed(config["seed"])
torch.manual_seed(config["seed"])

# Data Loading and Efficient Pre-processing
# Process pixel strings once for speed.
def string_to_array(pixel_string, image_size=None):
    """Decode a whitespace-separated pixel string into a square uint8 image.

    Args:
        pixel_string: Grey levels written as integers separated by whitespace,
            listed row by row.
        image_size: Side length of the square image. When omitted, the value
            of ``config["image_size"]`` is used, which is the original
            behaviour of this function.

    Returns:
        A ``numpy.ndarray`` of shape ``(image_size, image_size)`` and dtype
        ``uint8``.

    Raises:
        ValueError: If the number of values in ``pixel_string`` is not
            ``image_size * image_size`` (raised by ``reshape``).
    """
    # Only fall back to the notebook-level config when no size is given, so
    # the function can also be used (and tested) independently of it.
    side = config["image_size"] if image_size is None else image_size
    return np.array(pixel_string.split(), dtype=np.uint8).reshape(side, side)

# Load the training CSV, decode every pixel string once up front, and carve
# out a stratified validation split.
data_path = os.path.expanduser("/content/train.csv")
if not os.path.exists(data_path):
    # Stop here with an explicit message instead of printing and carrying on
    # into pd.read_csv, which would fail on the same missing file anyway.
    raise FileNotFoundError(f"Error: Data file not found at {data_path}")

full_train_df = pd.read_csv(data_path)
# Decoding here (once) keeps string parsing out of Dataset.__getitem__.
full_train_df['pixels_array'] = full_train_df['pixels'].apply(string_to_array)

# Hold out 10% of the rows for validation; stratifying on the label keeps the
# class proportions the same in both splits, and the fixed random_state makes
# the split repeatable.
train_df, val_df = train_test_split(
    full_train_df,
    test_size=0.1,
    stratify=full_train_df['emotion'],
    random_state=42
)

# Dataset Class
class FacialExpressionDataset(Dataset):
    """Map-style dataset yielding ``(image, label)`` pairs from a DataFrame.

    The DataFrame must provide a ``pixels_array`` column (one 2-D uint8 array
    per row) and an ``emotion`` column (integer class label).

    Args:
        dataframe: Source rows; they are addressed by position, not by index
            label.
        transform: Optional callable applied to the PIL image built from
            ``pixels_array``; its result is returned as the image.
    """

    def __init__(self, dataframe, transform=None):
        self.df = dataframe
        self.transform = transform
        # Pull the two columns out once. Indexing these is far cheaper than
        # materialising a whole row Series with ``.iloc[idx]`` (twice) for
        # every sample requested by the DataLoader.
        self.images = dataframe['pixels_array'].tolist()
        self.labels = dataframe['emotion'].to_numpy()

    def __len__(self):
        """Return the number of samples (rows of the DataFrame)."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair at position ``idx``.

        The image is a PIL image, or whatever ``transform`` returns when a
        transform was supplied; the label is a plain Python ``int``.
        """
        image = Image.fromarray(self.images[idx])
        label = int(self.labels[idx])

        if self.transform:
            image = self.transform(image)

        return image, label

# Rich Data Augmentation
# Random geometric / photometric perturbations are applied to training images
# only; validation images are just converted and normalised.
def _tensor_and_normalise():
    """Return fresh ToTensor + Normalize steps shared by both pipelines."""
    return [
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ]

# Steps that operate on the PIL image, before conversion to a tensor.
_pil_augmentations = [
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(15),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1), shear=10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
]

train_transform = transforms.Compose(
    _pil_augmentations
    + _tensor_and_normalise()
    # RandomErasing works on tensors, so it comes after the conversion.
    + [transforms.RandomErasing(p=0.5, scale=(0.02, 0.1), ratio=(0.3, 3.3), value=0)]
)

val_transform = transforms.Compose(_tensor_and_normalise())

train_dataset = FacialExpressionDataset(train_df, transform=train_transform)
val_dataset = FacialExpressionDataset(val_df, transform=val_transform)

# Settings common to both loaders; only the shuffling differs.
_loader_options = dict(
    batch_size=config["batch_size"],
    num_workers=config["num_workers"],
    pin_memory=True,
)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_options)

# Model Architecture
# A deep VGG-style convolutional feature extractor followed by a
# fully-connected classifier head.
class CombinedVGGNet(nn.Module):
    """VGG-style CNN for single-channel images.

    ``features`` stacks four stages; each stage is two
    3x3 conv -> batch-norm -> ReLU units followed by a 2x2 max-pool that
    halves the spatial size. Channel widths grow 64 -> 128 -> 256 -> 512.
    The result is globally average-pooled to 1x1 and classified by a
    three-layer MLP with batch-norm and dropout.

    Args:
        num_classes: Number of output logits.
    """

    # (in_channels, out_channels) for each convolutional stage, in order.
    _STAGE_CHANNELS = ((1, 64), (64, 128), (128, 256), (256, 512))

    @staticmethod
    def _conv_unit(in_channels, out_channels):
        """Return the layers of one 3x3 conv -> batch-norm -> ReLU unit."""
        return [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ]

    def __init__(self, num_classes):
        super().__init__()

        # Feature extractor: layers are appended in the same order as a
        # hand-written Sequential, so module indices are 0..27.
        stack = []
        for in_channels, out_channels in self._STAGE_CHANNELS:
            stack.extend(self._conv_unit(in_channels, out_channels))
            stack.extend(self._conv_unit(out_channels, out_channels))
            # Halves height and width (48 -> 24 -> 12 -> 6 -> 3 for 48x48 input).
            stack.append(nn.MaxPool2d(kernel_size=2, stride=2))
        self.features = nn.Sequential(*stack)

        # Collapse each of the 512 feature maps to a single value.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

        # Classifier head operating on the flattened 512-dim vector.
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512 * 1 * 1, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for ``x``."""
        pooled = self.avgpool(self.features(x))
        return self.classifier(pooled)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CombinedVGGNet(num_classes=config["num_classes"]).to(device)

# Loss, Optimizer, and Scheduler
# Handle class imbalance and adapt the learning rate.
# 'balanced' weights are inversely proportional to class frequency in the
# training split, so rare classes contribute more to the loss.
class_weights = compute_class_weight('balanced', classes=np.unique(train_df['emotion']), y=train_df['emotion'].to_numpy())
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)

criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = torch.optim.Adam(model.parameters(), lr=config["learning_rate"])
# Halve the learning rate when the monitored metric (validation accuracy,
# hence mode='max') has not improved for 5 consecutive epochs.
# The `verbose` argument is intentionally not passed: it is deprecated in
# recent PyTorch releases and may be rejected by newer ones. The current
# learning rate is logged to W&B every epoch instead.
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5)

# Full Training and Validation Loop
def _run_epoch(loader, training):
    """Run one pass over ``loader`` and return ``(mean_loss, accuracy)``.

    When ``training`` is true the model is put in train mode and the
    optimizer is stepped after every batch; otherwise the model is put in
    eval mode and gradients are disabled. Both returned values are averaged
    over the number of samples in ``loader.dataset``.
    """
    model.train(training)  # train(False) is what eval() does
    loss_sum, num_correct = 0.0, 0
    with torch.set_grad_enabled(training):
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            if training:
                optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            # loss is a batch mean; weight it by the batch size so the final
            # average is per sample even when the last batch is smaller.
            loss_sum += loss.item() * images.size(0)
            _, preds = torch.max(outputs, 1)
            num_correct += (preds == labels).sum().item()

    num_samples = len(loader.dataset)
    return loss_sum / num_samples, num_correct / num_samples


wandb.watch(model, log="all")
best_val_acc = 0.0

for epoch in range(config["epochs"]):
    train_loss, train_acc = _run_epoch(train_loader, training=True)
    val_loss, val_acc = _run_epoch(val_loader, training=False)

    # The scheduler monitors validation accuracy.
    scheduler.step(val_acc)

    metrics = {
        "epoch": epoch + 1,
        "train_loss": train_loss,
        "train_accuracy": train_acc,
        "val_loss": val_loss,
        "val_accuracy": val_acc,
        # Read after scheduler.step, so this is the rate for the next epoch.
        "learning_rate": optimizer.param_groups[0]['lr'],
    }
    wandb.log(metrics)

    print(f"Epoch {epoch+1:02d}: Train Acc={train_acc:.4f}, Val Acc={val_acc:.4f}, Train Loss={train_loss:.4f}, Val Loss={val_loss:.4f}")

    # Keep only the weights with the best validation accuracy seen so far.
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), "best_combined_model.pth")
        wandb.save("best_combined_model.pth")
        print(f"New best model saved with validation accuracy: {val_acc:.4f}")

wandb.finish()



Epoch 01: Train Acc=0.1593, Val Acc=0.1916, Train Loss=2.0018, Val Loss=1.9202
New best model saved with validation accuracy: 0.1916
Epoch 02: Train Acc=0.1744, Val Acc=0.2013, Train Loss=1.9400, Val Loss=1.9907
New best model saved with validation accuracy: 0.2013
Epoch 03: Train Acc=0.2173, Val Acc=0.2334, Train Loss=1.8765, Val Loss=1.7672
New best model saved with validation accuracy: 0.2334
Epoch 04: Train Acc=0.2874, Val Acc=0.2470, Train Loss=1.7751, Val Loss=2.0837
New best model saved with validation accuracy: 0.2470
Epoch 05: Train Acc=0.3525, Val Acc=0.3608, Train Loss=1.6747, Val Loss=1.5889
New best model saved with validation accuracy: 0.3608
Epoch 06: Train Acc=0.3788, Val Acc=0.3866, Train Loss=1.6195, Val Loss=1.5355
New best model saved with validation accuracy: 0.3866
Epoch 07: Train Acc=0.4037, Val Acc=0.3612, Train Loss=1.5596, Val Loss=1.5167
Epoch 08: Train Acc=0.4218, Val Acc=0.3856, Train Loss=1.5120, Val Loss=1.4805
Epoch 09: Train Acc=0.4411, Val Acc=0.4330, 

0,1
epoch,▁▁▁▁▁▂▂▂▂▃▃▃▃▃▃▃▄▄▄▄▅▅▅▅▅▅▆▆▆▆▇▇▇▇▇▇████
learning_rate,█████████████████████████████████████▁▁▁
train_accuracy,▁▂▃▄▅▅▅▅▆▆▆▆▆▆▆▆▆▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇▇█████
train_loss,██▆▅▅▅▄▄▄▄▄▃▃▃▃▃▃▃▃▃▂▃▂▂▂▂▂▂▂▂▂▂▂▂▂▂▁▁▁▁
val_accuracy,▁▂▃▄▃▅▅▆▆▅▇▇▇▆▇▇▇▇▇▇▇▇▇▇▇▇██████████████
val_loss,▇█▅▃▃▃▂▂▂▂▂▁▁▂▂▁▁▁▁▁▁▁▂▁▁▁▁▁▁▁▁▁▂▂▁▁▁▁▁▂

0,1
epoch,75.0
learning_rate,0.0005
train_accuracy,0.71751
train_loss,0.70047
val_accuracy,0.67607
val_loss,1.11631
