In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from IPython.display import display

import copy


## TODO 00: Add GPU support to the script - modify wherever needed (including, possibly, other places whether or not they are marked 'TODO')

In [None]:
# Pick the compute device: the first CUDA GPU when one is available, otherwise the CPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda', index=0)

#### 1. Config

In [None]:
# TODO 01: Add the required (but missing here) configuration parameters and their values

config = {
    # --- Data ---
    "data_directory": "[INSERT DIRECTORY HERE]",  # root folder passed to the CIFAR10 loader

    # --- Architecture ---
    "num_layers": 4,                          # NOTE(review): not read by the cells below; presumably len(hidden_sizes)
    "hidden_sizes": [768, 512, 384, 256],     # width of each hidden layer, in order
    "activation": "relu",                     # relu, leakyrelu, sigmoid
    "leakyrelu_negative_slope": 0.02,         # only used when activation is "leakyrelu"
    "dropout_probs": [0.35, 0.3, 0.25, 0.2],  # dropout probability after each hidden layer
    "normalization": "layernorm",             # "none", "batchnorm", "layernorm", "groupnorm"

    # --- Optimization ---
    "batch_size": 128,
    "epochs": 50,
    "learning_rate": 3e-4,
    "weight_decay": 2e-3,
    "optimizer": "adam",                      # adam or sgd
    "lr_scheduler": "OneCycle",               # None, 'OneCycle', 'ReduceOnPlateau'
    "OneCycle": {"max_lr": 0.003},            # settings read when lr_scheduler is 'OneCycle'
    "patience": 10,                           # early-stopping patience, in epochs

    # TODO 01: Missing Parameters
    "prototyping": False,                     # True -> work on a fraction of the training data
    "prototyping_train_frac": 0.2,            # fraction of the training data kept when prototyping
}

#### 2. Dataset

In [None]:
# Define transforms for the CIFAR-10 dataset: convert to tensor, then normalize each RGB channel
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465),      # per-channel CIFAR-10 mean
                         (0.2470, 0.2435, 0.2616))])    # per-channel standard deviation

# Load CIFAR-10 from the configured directory (downloaded there if not already present)
full_train = datasets.CIFAR10(config["data_directory"], train=True, download=True, transform=transform)
test_dataset = datasets.CIFAR10(config["data_directory"], train=False, download=True, transform=transform)

# Dataset parameters
input_size = 32 * 32 * 3    # 32x32 images in RGB (3 channels)
num_classes = 10            # 10 class outputs

# Seeded generator so that every Restart & Run All produces the same train/val membership.
# Without it, re-running this cell reshuffles the split and validation samples can end up
# in the training set of an already-trained model. "seed" is an optional config key.
split_generator = torch.Generator().manual_seed(config.get("seed", 42))

if config["prototyping"]:
    # Work with a fraction of the full training set for quicker prototyping (e.g. 20%);
    # the train:val ratio of that fraction stays at 4:1.
    train_size_frac = config["prototyping_train_frac"]
    if not 0 < train_size_frac <= 1:
        raise ValueError(
            f"prototyping_train_frac must be in (0, 1], got {train_size_frac}"
        )

    subset_len = int(train_size_frac * len(full_train))
    remaining_len = len(full_train) - subset_len

    # Split full_train --> prototyping subset (train_val) + remaining (discarded)
    train_val, _ = random_split(full_train, [subset_len, remaining_len], generator=split_generator)
else:
    # Use the complete training set
    train_val = full_train

# Split train/val: 80% for training, the remaining 20% for validation
train_size = int(0.8 * len(train_val))
val_size = len(train_val) - train_size
train_dataset, val_dataset = random_split(train_val, [train_size, val_size], generator=split_generator)

# Only the training loader shuffles; validation and test keep a fixed order
train_loader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=config["batch_size"], shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=config["batch_size"], shuffle=False)



100%|██████████| 170M/170M [00:03<00:00, 42.7MB/s]


#### 3. Model

In [None]:
# TODO 03: Complete the following class
# Hint: try to minimize code duplication and use a loop where it makes sense

class MLP(nn.Module):
    """Multi-layer perceptron classifier.

    The input is flattened to (batch, features) and passed through one
    ``Linear -> [normalization] -> activation -> [dropout]`` group per entry of
    ``hidden_sizes``, followed by a final ``Linear`` layer producing ``num_classes`` logits.

    Args:
        input_size: Number of features per sample after flattening.
        hidden_sizes: Width of each hidden layer, in order.
        num_classes: Number of output logits.
        activation: 'relu', 'leakyrelu' or 'sigmoid'.
        dropout_probs: Dropout probability per hidden layer; hidden layers beyond the
            length of this sequence get no dropout.
        normalization: 'batchnorm', 'layernorm', 'groupnorm', or 'none'/None for no
            normalization layer.
        leakyrelu_negative_slope: Negative slope, used only when activation is 'leakyrelu'.

    Raises:
        ValueError: If ``activation`` or ``normalization`` is not one of the supported values.
    """

    def __init__(self, input_size, hidden_sizes, num_classes, activation, dropout_probs, normalization, leakyrelu_negative_slope=0.01):
        super().__init__()
        self.input_size = input_size
        self.layers = nn.ModuleList()

        # Choose a factory for the activation so each hidden layer gets its own module
        if activation == 'relu':
            act_fn = nn.ReLU
        elif activation == 'leakyrelu':
            act_fn = lambda: nn.LeakyReLU(negative_slope=leakyrelu_negative_slope)
        elif activation == 'sigmoid':
            act_fn = nn.Sigmoid
        else:
            raise ValueError(f"Unsupported activation: {activation}")

        # Reject typos up front; otherwise the model would silently train without normalization
        if normalization not in (None, 'none', 'batchnorm', 'layernorm', 'groupnorm'):
            raise ValueError(f"Unsupported normalization: {normalization}")

        # Build hidden layers
        prev_size = input_size
        for i, hidden_size in enumerate(hidden_sizes):
            self.layers.append(nn.Linear(prev_size, hidden_size))

            if normalization == 'batchnorm':
                self.layers.append(nn.BatchNorm1d(hidden_size))
            elif normalization == 'layernorm':
                self.layers.append(nn.LayerNorm(hidden_size))
            elif normalization == 'groupnorm':
                # GroupNorm requires the group count to divide the channel count exactly
                self.layers.append(nn.GroupNorm(self._num_groups(hidden_size), hidden_size))

            self.layers.append(act_fn())

            # Dropout is optional for trailing layers when fewer probabilities are given
            if i < len(dropout_probs):
                self.layers.append(nn.Dropout(dropout_probs[i]))

            prev_size = hidden_size

        # Output layer (raw logits; no activation)
        self.layers.append(nn.Linear(prev_size, num_classes))

    @staticmethod
    def _num_groups(num_channels, max_groups=32):
        """Return the largest group count <= max_groups that divides num_channels evenly."""
        for groups in range(min(max_groups, num_channels), 0, -1):
            if num_channels % groups == 0:
                return groups
        return 1

    def forward(self, x):
        """Flatten each sample and apply the layers in order; returns the class logits."""
        # reshape (unlike view) also accepts non-contiguous inputs such as permuted tensors
        x = x.reshape(x.size(0), -1)
        for layer in self.layers:
            x = layer(x)
        return x


# Build the MLP from the configured hyper-parameters, then place it on the selected device
model = MLP(
    input_size=input_size,
    hidden_sizes=config["hidden_sizes"],
    num_classes=num_classes,
    activation=config["activation"],
    dropout_probs=config["dropout_probs"],
    normalization=config["normalization"],
    leakyrelu_negative_slope=config["leakyrelu_negative_slope"],
)
model = model.to(device)

print(model)


#### 4. Training

In [None]:
# TODO 04 (optional): Add code to implement the scheduler(s) logic

def train_model(model, train_loader, val_loader, config):
    """Train ``model`` with early stopping and return it with the best validation weights loaded.

    Args:
        model: ``nn.Module`` to train. Batches are moved to the device its parameters live on.
        train_loader: Iterable of ``(inputs, labels)`` batches used for optimization.
        val_loader: Iterable of ``(inputs, labels)`` batches used for validation.
        config: Dict read for ``optimizer``, ``learning_rate``, ``weight_decay``,
            ``lr_scheduler``, ``OneCycle`` (only with the OneCycle scheduler),
            ``epochs`` and ``patience``.

    Returns:
        The same ``model`` object. If any epoch improved validation accuracy, the weights
        of the best such epoch are restored before returning.

    Raises:
        ValueError: For an unsupported optimizer or lr_scheduler name, or when a loader
            yields no samples.
    """
    # TODO 05: Add code to initialize the optimizer according to config["optimizer"]
    if config['optimizer'] == 'adam':
        optimizer = optim.Adam(model.parameters(), lr=config['learning_rate'], weight_decay=config['weight_decay'])
    elif config['optimizer'] == 'sgd':
        optimizer = optim.SGD(model.parameters(), lr=config['learning_rate'], weight_decay=config['weight_decay'], momentum=0.9)
    else:
        raise ValueError(f"Unsupported optimizer: {config['optimizer']}")

    # Use the device the model already lives on rather than depending on a notebook global
    device = next(model.parameters()).device

    # TODO 06: Define the loss function
    criterion = nn.CrossEntropyLoss()

    # Initialize scheduler based on config
    scheduler_name = config['lr_scheduler']
    scheduler = None
    if scheduler_name == 'OneCycle':
        scheduler = optim.lr_scheduler.OneCycleLR(
            optimizer,
            max_lr=config['OneCycle']['max_lr'],
            steps_per_epoch=len(train_loader),
            epochs=config['epochs']
        )
    elif scheduler_name == 'ReduceOnPlateau':
        # The `verbose` flag is deprecated in recent PyTorch releases, so it is not passed;
        # learning-rate reductions are reported explicitly in the epoch loop instead.
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
            mode='max',            # the monitored quantity is validation accuracy
            factor=0.5,
            patience=config['patience'] // 2
        )
    elif scheduler_name not in (None, 'None', 'none'):
        # A misspelled name would otherwise silently train with a constant learning rate
        raise ValueError(f"Unsupported lr_scheduler: {scheduler_name}")

    # Early stopping variables
    best_val_acc = 0.0
    patience_counter = 0
    best_model_wts = None

    # Training loop
    for epoch in range(config["epochs"]):
        model.train()
        train_loss = 0.0
        correct = 0
        total = 0

        for X, y in train_loader:
            # TODO 07: Add code to train the model (with GPU support)
            X, y = X.to(device), y.to(device)

            optimizer.zero_grad()
            outputs = model(X)
            loss = criterion(outputs, y)
            loss.backward()
            optimizer.step()

            # OneCycleLR is defined per optimizer step, so it advances once per batch
            if scheduler_name == 'OneCycle':
                scheduler.step()

            # criterion returns the batch mean; weight by batch size for a per-sample average
            train_loss += loss.item() * X.size(0)
            preds = outputs.argmax(dim=1)
            correct += (preds == y).sum().item()
            total += y.size(0)

        if total == 0:
            raise ValueError("train_loader produced no samples")
        train_loss /= total
        train_acc = correct / total

        # Validation
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        with torch.no_grad():
            for X_val, y_val in val_loader:
                # TODO 08: Add code to validate the model (with GPU support)
                X_val, y_val = X_val.to(device), y_val.to(device)

                outputs_val = model(X_val)
                loss_val = criterion(outputs_val, y_val)

                val_loss += loss_val.item() * X_val.size(0)
                preds_val = outputs_val.argmax(dim=1)
                val_correct += (preds_val == y_val).sum().item()
                val_total += y_val.size(0)

        if val_total == 0:
            raise ValueError("val_loader produced no samples")
        val_loss /= val_total
        val_acc = val_correct / val_total

        # ReduceLROnPlateau advances once per epoch on the monitored metric
        if scheduler_name == 'ReduceOnPlateau':
            lr_before = optimizer.param_groups[0]['lr']
            scheduler.step(val_acc)
            lr_after = optimizer.param_groups[0]['lr']
            if lr_after < lr_before:
                print(f"Reducing learning rate to {lr_after:.4e}")

        print(f"Epoch {epoch+1}/{config['epochs']}: "
              f"Train Loss={train_loss:.4f}, Train Acc={train_acc:.4f}, "
              f"Val Loss={val_loss:.4f}, Val Acc={val_acc:.4f}")

        # TODO 09: Add code to implement early stopping logic
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            # deepcopy: state_dict() returns references to the live tensors
            best_model_wts = copy.deepcopy(model.state_dict())
            patience_counter = 0
        else:
            patience_counter += 1

        if patience_counter >= config['patience']:
            print(f"Early stopping at epoch {epoch+1}")
            break

    # TODO 10: Add code to restore the best model weights
    if best_model_wts is not None:
        model.load_state_dict(best_model_wts)

    return model


In [None]:
# Run training; train_model returns the model after restoring its best-validation weights
model = train_model(model=model, train_loader=train_loader, val_loader=val_loader, config=config)

#### 5. Testing

In [None]:

def evaluate(model, loader):
    """Run ``model`` over ``loader`` in eval mode and collect its predictions.

    Args:
        model: ``nn.Module`` producing per-class scores of shape (batch, classes).
        loader: Iterable of ``(inputs, labels)`` batches.

    Returns:
        Tuple ``(accuracy, preds, labels)``: the fraction of correct predictions, and
        NumPy arrays of the predicted and true class indices in loader order.
        When ``loader`` yields no samples, accuracy is NaN and both arrays are empty.
    """
    model.eval()

    # Use the device the model lives on rather than depending on a notebook global;
    # a parameter-free model falls back to the CPU.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    correct = 0
    total = 0
    all_preds, all_labels = [], []
    with torch.no_grad():
        for X, y in loader:
            # TODO 11: Add the missing lines to make testing work (with GPU support)
            X, y = X.to(device), y.to(device)
            outputs = model(X)

            preds = outputs.argmax(dim=1)      # predicted class = index of the largest score
            correct += (preds == y).sum().item()
            total += y.size(0)
            # Move to CPU before converting: NumPy cannot read GPU tensors
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(y.cpu().numpy())

    # Avoid ZeroDivisionError on an empty loader; NaN marks "no data" explicitly
    accuracy = correct / total if total else float('nan')
    return accuracy, np.array(all_preds), np.array(all_labels)

# Score the trained model on the held-out test set
test_acc, preds, labels = evaluate(model, test_loader)
print(f"Overall Test Accuracy: {test_acc:.4f}")

# Per-class accuracy: share of correct predictions among the samples of each true class
hits = preds == labels
for i in range(num_classes):
    idx = labels == i
    acc_i = hits[idx].mean()
    print(f"Class {i}: {acc_i:.4f}")

# Confusion matrix built from true labels and predictions
cm = confusion_matrix(labels, preds)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=range(num_classes))
disp.plot(cmap=plt.cm.Blues)
plt.show()
