<a href="https://colab.research.google.com/github/oUesio/MLP-CIFAR10/blob/main/MLP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [35]:
from google.colab import drive
drive.mount('/content/drive')
import numpy as np
from torchvision import transforms, datasets
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt
import pandas as pd
import os
import csv
from itertools import product
import copy


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [36]:
# @title
def image_to_feature_vector(img, p):
    """Flatten a channels-first image into a patch-ordered 1D feature vector.

    The image is rearranged to (height, width, channels), cut into a grid of
    non-overlapping ``p`` x ``p`` patches, and the patches are written out one
    after another (row-major over the grid). Inside a patch the values are
    ordered by row, then column, then channel.

    Args:
        img: Array with axes (channels, height, width) that supports the
            numpy-style ``transpose`` and ``reshape`` methods.
        p: Patch side length. Height and width are expected to be multiples
            of ``p``; otherwise the reshape below cannot succeed.

    Returns:
        A 1D array holding every value of ``img`` in patch order.
    """
    # Channels-first -> channels-last
    hwc = img.transpose(1, 2, 0)
    height, width, channels = hwc.shape

    # Split rows and columns into (patch index, offset inside the patch)
    grid = hwc.reshape(height // p, p, width // p, p, channels)

    # Move both patch indices to the front so each patch becomes contiguous
    return grid.transpose(0, 2, 1, 3, 4).reshape(-1)


def dataset_to_loader(dataset, batch_size, patch_size, shuffle):
    """Materialise ``dataset`` as patch feature vectors and wrap it in a DataLoader.

    The dataset is iterated exactly once here, so whatever transform it applies
    is evaluated a single time per sample when the loader is built; the
    resulting tensors are then reused for every pass over the loader.

    Args:
        dataset: Iterable of ``(image, label)`` pairs; each image is converted
            with ``np.array`` and passed to ``image_to_feature_vector``.
        batch_size: Batch size given to the DataLoader.
        patch_size: Patch side length given to ``image_to_feature_vector``.
        shuffle: Whether the DataLoader reshuffles the samples.

    Returns:
        A DataLoader over a TensorDataset of float32 features and int64 labels.
    """
    features = []
    labels = []
    for image, label in dataset:
        features.append(image_to_feature_vector(np.array(image), patch_size))
        labels.append(label)

    # Stack into one (num_samples, num_features) array before converting
    inputs = torch.tensor(np.stack(features), dtype=torch.float32)
    targets = torch.tensor(np.array(labels), dtype=torch.long)

    return DataLoader(TensorDataset(inputs, targets), batch_size=batch_size, shuffle=shuffle)

In [37]:
# @title
# Function to check if dictionary exists
def dict_exists_in_csv(data, csv_file, ignore_keys=('train_acc', 'val_acc', 'val_loss')):
    """Return True if ``csv_file`` already holds a row matching ``data``.

    Rows are compared on their string form because the csv module reads every
    cell back as text. Keys listed in ``ignore_keys`` are result metrics rather
    than part of a configuration's identity, so they are left out of the
    comparison.

    Args:
        data: Mapping of column name to value describing one configuration.
        csv_file: Path of the CSV file to search.
        ignore_keys: Keys of ``data`` that are not compared.

    Returns:
        True when some row matches every compared key, otherwise False. A
        missing file, or a file lacking one of the compared columns, yields
        False.
    """
    if not os.path.isfile(csv_file):
        return False

    skipped = set(ignore_keys)
    # str() covers lists as well: they are stored in the CSV as their str() form
    expected = {key: str(value) for key, value in data.items() if key not in skipped}

    with open(csv_file, mode='r', newline='') as f:
        for row in csv.DictReader(f):
            # row.get() returns None for an absent column, which never equals a str
            if all(row.get(key) == text for key, text in expected.items()):
                return True
    return False

# Write to CSV if not already present
def append_dict_to_csv(data, csv_file):
    """Append ``data`` as one CSV row unless ``dict_exists_in_csv`` finds it already.

    A header row (the keys of ``data``) is written first when the file does
    not exist yet or exists but is empty, so the file can always be read back
    with ``csv.DictReader``. List values are stored as their ``str()`` form.

    Args:
        data: Mapping of column name to value for the row.
        csv_file: Path of the CSV file to append to.
    """
    if dict_exists_in_csv(data, csv_file):
        print("Dictionary already exists in CSV.")
        return

    # An existing zero-byte file has no header either
    needs_header = not os.path.isfile(csv_file) or os.path.getsize(csv_file) == 0

    # Convert lists to strings to store in CSV
    row_to_write = {k: str(v) if isinstance(v, list) else v for k, v in data.items()}

    with open(csv_file, mode='a', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=list(data.keys()))
        if needs_header:
            writer.writeheader()
        writer.writerow(row_to_write)
    print("Dictionary added to CSV.")

def load_params_from_csv(csv_file):
    """Read the first data row of ``csv_file`` and convert it to typed parameters.

    Args:
        csv_file: Path of a CSV file with the columns ``patch_size``,
            ``optimizer``, ``learning_rate``, ``batch_size``, ``hidden_sizes``
            (integers joined by ``-``), ``dropout_rate``, ``activation``,
            ``weight_decay``, ``epochs``, ``batch_norm`` and
            ``learning_rate_scheduler``.

    Returns:
        A dict with the converted values. ``activation`` is the ``nn`` module
        class (not an instance) and the scheduler name is stored under the key
        ``lr_scheduler``.

    Raises:
        ValueError: If the file contains no data row.
        KeyError: If a column is missing or the activation name is not one of
            ``ReLU``, ``LeakyReLU``, ``GELU``.
    """
    # newline="" is what the csv module expects for files it parses itself
    with open(csv_file, "r", newline="") as f:
        row = next(csv.DictReader(f), None)  # only the first row is used

    if row is None:
        raise ValueError(f"No parameter row found in {csv_file!r}")

    hidden_sizes = [int(x) for x in row["hidden_sizes"].split("-")]

    # Activation mapping: stored class name -> nn module class
    activation_map = {
        "ReLU": nn.ReLU,
        "LeakyReLU": nn.LeakyReLU,
        "GELU": nn.GELU
    }

    params = {
        "patch_size": int(row["patch_size"]),
        "optimizer": row["optimizer"].lower(),
        "learning_rate": float(row["learning_rate"]),
        "batch_size": int(row["batch_size"]),
        "hidden_sizes": hidden_sizes,
        "dropout_rate": float(row["dropout_rate"]),
        "activation": activation_map[row["activation"]],
        "weight_decay": float(row["weight_decay"]),
        "epochs": int(row["epochs"]),
        # Booleans are stored as the text "True" / "False"
        "batch_norm": row["batch_norm"] == "True",
        "lr_scheduler": row["learning_rate_scheduler"]
    }

    return params




## MLP Model

In [38]:
class MLP(nn.Module):
    """Fully connected classifier built from a list of hidden layer widths.

    Each hidden block is Linear -> (optional BatchNorm1d) -> activation ->
    (optional Dropout); a final Linear layer produces the class logits.

    Args:
        hidden_sizes: Width of each hidden layer, in order.
        activation: Callable returning an activation module, e.g. ``nn.ReLU``
            (the class, not an instance).
        dropout_rate: Dropout probability; no Dropout layer is added when it
            is not greater than 0.
        use_batchnorm: Whether to insert ``nn.BatchNorm1d`` after every hidden
            Linear layer.
        input_size: Length of the flattened input vector (3072 by default).
        num_classes: Number of output logits (10 by default).

    Note:
        The layer list, and therefore the ``state_dict`` keys, depends on
        ``use_batchnorm``. A checkpoint that contains BatchNorm parameters can
        only be loaded into a model built with ``use_batchnorm=True``.
    """

    def __init__(self, hidden_sizes, activation, dropout_rate, use_batchnorm,
                 input_size=3072, num_classes=10):
        super().__init__()
        layers = []
        last_size = input_size
        # Hidden layers
        for h in hidden_sizes:
            layers.append(nn.Linear(last_size, h))
            if use_batchnorm:
                layers.append(nn.BatchNorm1d(h))
            layers.append(activation())
            if dropout_rate > 0:
                layers.append(nn.Dropout(dropout_rate))
            last_size = h
        # Output layer
        layers.append(nn.Linear(last_size, num_classes))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the logits for a batch of flattened inputs ``x``."""
        return self.model(x)


## Training and Validation

In [39]:
# Optimiser constructors keyed by the names train_mlp accepts. Every entry is
# called as factory(parameters, lr=..., weight_decay=...).
_OPTIMIZER_FACTORIES = {
    'adam': optim.Adam,
    'sgd': lambda parameters, **kwargs: optim.SGD(parameters, momentum=0.9, **kwargs),
    'adamw': optim.AdamW,
    'rmsprop': optim.RMSprop,
    'adagrad': optim.Adagrad,
    'adadelta': optim.Adadelta,
    'nadam': optim.NAdam,
}

# Learning-rate scheduler constructors keyed by name; each takes the optimiser.
_SCHEDULER_FACTORIES = {
    'reduce': lambda opt: torch.optim.lr_scheduler.ReduceLROnPlateau(opt, mode='min', factor=0.5, patience=2),
    'step': lambda opt: torch.optim.lr_scheduler.StepLR(opt, step_size=20, gamma=0.1),
    'expon': lambda opt: torch.optim.lr_scheduler.ExponentialLR(opt, gamma=0.95),
    'cosine': lambda opt: torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=50),
}


def train_mlp(train_split, val_split, hidden_sizes, activation, dropout_rate, optimizer_name, learning_rate, batch_size, epochs, patience, patch_size, weight_decay, use_batchnorm, learning_rate_scheduler):
    """Train an MLP with early stopping and return the best checkpoint.

    Args:
        train_split: Dataset of (image, label) pairs used for training.
        val_split: Dataset of (image, label) pairs used for validation.
        hidden_sizes: Hidden layer widths passed to ``MLP``.
        activation: Activation module class passed to ``MLP``.
        dropout_rate: Dropout probability passed to ``MLP``.
        optimizer_name: One of 'adam', 'sgd', 'adamw', 'rmsprop', 'adagrad',
            'adadelta', 'nadam'.
        learning_rate: Initial learning rate.
        batch_size: Batch size for both loaders.
        epochs: Maximum number of epochs.
        patience: Number of consecutive epochs without a lower validation
            loss after which training stops.
        patch_size: Patch side length passed to ``dataset_to_loader``.
        weight_decay: Weight decay given to the optimiser.
        use_batchnorm: Batch-norm flag passed to ``MLP``.
        learning_rate_scheduler: One of 'reduce', 'step', 'expon', 'cosine',
            or ``None`` / 'none' to keep the learning rate fixed.

    Returns:
        ``(model, train_acc, val_acc, best_val_loss)``. The model carries the
        weights of the epoch with the lowest validation loss, and both
        accuracies are the ones measured in that same epoch. If no epoch
        improved on the initial state (for example ``epochs == 0``), the
        accuracies are NaN and the loss is ``inf``.

    Raises:
        ValueError: If the optimiser or scheduler name is not recognised.
    """
    # Validate the names up front so a typo fails before any data is processed
    if optimizer_name not in _OPTIMIZER_FACTORIES:
        raise ValueError(f"Unknown optimizer {optimizer_name!r}; expected one of {sorted(_OPTIMIZER_FACTORIES)}")
    use_scheduler = learning_rate_scheduler not in (None, 'none')
    if use_scheduler and learning_rate_scheduler not in _SCHEDULER_FACTORIES:
        raise ValueError(f"Unknown learning rate scheduler {learning_rate_scheduler!r}; expected one of {sorted(_SCHEDULER_FACTORIES)}")

    # Loss
    criterion = nn.CrossEntropyLoss()

    # Create data loaders
    train_loader = dataset_to_loader(train_split, batch_size, patch_size, shuffle=True)
    val_loader = dataset_to_loader(val_split, batch_size, patch_size, shuffle=False)

    model = MLP(hidden_sizes, activation, dropout_rate, use_batchnorm)

    # Optimiser with weight decay
    optimizer = _OPTIMIZER_FACTORIES[optimizer_name](model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    # Learning rate scheduler (optional)
    scheduler = _SCHEDULER_FACTORIES[learning_rate_scheduler](optimizer) if use_scheduler else None

    # Early stopping state; the metrics are tracked together with the weights
    # so that everything returned describes the same epoch.
    best_val_loss = float("inf")
    best_train_acc = float("nan")
    best_val_acc = float("nan")
    best_model_wts = copy.deepcopy(model.state_dict())
    epochs_no_improve = 0

    for epoch in range(epochs):
        # Training
        model.train()
        correct = 0
        total = 0
        for X, y in train_loader:
            # Clear previous gradients
            optimizer.zero_grad()
            # Forward pass (predictions); inputs are used exactly as produced
            # by dataset_to_loader, no extra scaling is applied here
            outputs = model(X)
            # Compute loss
            loss = criterion(outputs, y)
            # Compute gradients (backpropagation)
            loss.backward()
            # Update model weights
            optimizer.step()

            # Running training accuracy (measured in train mode)
            preds = outputs.argmax(dim=1)
            correct += (preds == y).sum().item()
            total += y.size(0)
        train_acc = correct / total

        # Validation
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        # No gradients needed for validation
        with torch.no_grad():
            for val_X, val_y in val_loader:
                outputs = model(val_X)
                val_loss += criterion(outputs, val_y).item()

                # Calculate validation accuracy
                preds = outputs.argmax(dim=1)
                correct += (preds == val_y).sum().item()
                total += val_y.size(0)
        # Mean of the per-batch losses
        val_loss /= len(val_loader)
        val_acc = correct / total

        # Learning rate adjustment; only ReduceLROnPlateau takes the metric
        if scheduler is not None:
            if learning_rate_scheduler == 'reduce':
                scheduler.step(val_loss)
            else:
                scheduler.step()

        # Early stopping bookkeeping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_train_acc = train_acc
            best_val_acc = val_acc
            best_model_wts = copy.deepcopy(model.state_dict())
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print(f"Early stopping triggered at epoch {epoch+1}")
            break

    # Restore the weights of the best epoch before returning
    model.load_state_dict(best_model_wts)
    return model, best_train_acc, best_val_acc, best_val_loss

In [40]:
# Disabled draft: everything below is a plain string literal, so none of it is
# executed as code. The transforms that are actually used (`train_transform`
# and `val_transform`) are defined in the following cell.
'''

train_transforms = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465),
                         (0.2023, 0.1994, 0.2010))
])

val_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465),
                         (0.2023, 0.1994, 0.2010))
])


'''

'\n\ntrain_transforms = transforms.Compose([\n    transforms.RandomHorizontalFlip(),\n    transforms.RandomCrop(32, padding=4),\n    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3),\n    transforms.ToTensor(),\n    transforms.Normalize((0.4914, 0.4822, 0.4465),\n                         (0.2023, 0.1994, 0.2010))\n])\n\nval_transforms = transforms.Compose([\n    transforms.ToTensor(),\n    transforms.Normalize((0.4914, 0.4822, 0.4465),\n                         (0.2023, 0.1994, 0.2010))\n])\n\n\n'

In [None]:
# Alternative augmentation that was tried instead of the pipeline below:
#   transforms.AutoAugment(transforms.AutoAugmentPolicy.CIFAR10) followed by transforms.ToTensor()

# Training pipeline: augmentation, then per-channel normalisation
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465),
                         (0.2023, 0.1994, 0.2010))
])

# Validation pipeline: same normalisation, no augmentation
val_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465),
                         (0.2023, 0.1994, 0.2010))
])

# Two views of the same training images: augmented for fitting, plain for validation
train_aug = datasets.CIFAR10(root='./data', train=True, download=True, transform=train_transform)
train_noaug = datasets.CIFAR10(root='./data', train=True, download=True, transform=val_transform)

# 90:10 train/validation split
val_size = int(0.1 * len(train_aug))
train_size = len(train_aug) - val_size

# Fixed seed: results are compared against a best loss persisted on disk, so
# every session has to evaluate on the same validation images.
SPLIT_SEED = 42
train_indices, val_indices = torch.utils.data.random_split(
    range(len(train_aug)),
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

train_split = torch.utils.data.Subset(train_aug, train_indices)
val_split = torch.utils.data.Subset(train_noaug, val_indices)


# Model parameters
epochs = 5 # keep as 5 until get better results
patience = 5

# Hyperparameters (grid); trailing notes list other values that were tried
patch_sizes = [8] # 4
optimizers = ['adamw', 'adam'] # 'sgd', 'rmsprop', 'adagrad', 'adadelta', 'nadam'
learning_rates = [0.001] # 0.002, 0.01
batch_sizes = [16, 32] # 64, 128, 256
hidden_sizes_options = [[1024, 512, 256, 256, 128, 64], [1024, 512, 256, 256, 128, 128, 64]] # [256, 128], [512, 256, 128], [512, 256, 128, 64], [1024, 512, 256, 128, 64]
dropout_rates = [0.1] # 0, 0.05, 0.2
activations = [nn.GELU, nn.LeakyReLU, nn.ReLU]
weight_decays = [0.0005] # 0, 0.00001, 0.001
batch_norm = [True, False]
learning_rate_schedulers = ['expon', 'cosine', 'reduce'] # 'step'

# Current focus: comparing the learning rate schedulers


grid = product(patch_sizes, optimizers, learning_rates, batch_sizes, hidden_sizes_options, dropout_rates, activations, weight_decays, batch_norm, learning_rate_schedulers)

results = []
csv_file = "/content/drive/MyDrive/data/resultsQ5.csv"
csv_best_params = "/content/drive/MyDrive/data/Q5_best_params.csv"

# Resume from the best validation loss recorded by an earlier session, if any
if os.path.exists(csv_best_params):
    best_val_loss = pd.read_csv(csv_best_params).loc[0, "val_loss"]
else:
    best_val_loss = float("inf")

for pat, opt, lr, bs, hs, dr, act, wd, bn, lrs in grid:
    params = {"patch_size": pat, "optimizer": opt, "learning_rate": lr, "batch_size": bs, "hidden_sizes": hs, "dropout_rate": dr, "activation": act.__name__, "weight_decay": wd, "epochs": epochs, "batch_norm": bn, "learning_rate_scheduler": lrs}
    # Skip combinations that already have a row in the results file
    if not dict_exists_in_csv(params, csv_file):
        model, params["train_acc"], params["val_acc"], params["val_loss"] = train_mlp(train_split, val_split, hs, act, dr, opt, lr, bs, epochs, patience, pat, wd, bn, lrs)
        append_dict_to_csv(params, csv_file)
        results.append(params)
        if params["val_loss"] < best_val_loss:
            # Single quotes inside the f-string keep this valid before Python 3.12
            print(f"!!! Found new best: {params['val_loss']}")
            best_val_loss = params["val_loss"]
            # Write a copy with hidden_sizes joined by '-' so the dict stored in
            # `results` keeps its list
            best_row = dict(params, hidden_sizes='-'.join(map(str, hs)))
            pd.DataFrame(best_row, index=[0]).to_csv(csv_best_params, index=False) # Replace if found better
            torch.save(model.state_dict(), "/content/drive/MyDrive/data/Q5_mlp.pt")
        print(f"PATCH: {pat}, OPT: {opt}, LR: {lr}, BS: {bs}, HS: {hs}, DR: {dr}, ACT: {act.__name__}, WD: {wd}, BN: {bn}, LRS: {lrs}, EPOCHS: {epochs}, Train Acc: {params['train_acc']:.4f}, Val Acc: {params['val_acc']:.4f}, Val Loss: {params['val_loss']:.4f}")



Dictionary added to CSV.
PATCH: 8, OPT: adamw, LR: 0.001, BS: 16, HS: [1024, 512, 256, 256, 128, 64], DR: 0.1, ACT: GELU, WD: 0.0005, BN: True, LRS: expon, EPOCHS: 5, Train Acc: 0.4037, Val Acc: 0.4504, Val Loss: 1.5136


In [None]:
csv_file = "/content/drive/MyDrive/data/resultsQ5.csv"

# Load every recorded run, keep the 5-epoch ones and rank them by validation accuracy
results_df = pd.read_csv(csv_file)
five_epoch_runs = results_df["epochs"].eq(5)
results_df = results_df.loc[five_epoch_runs].sort_values("val_acc", ascending=False)

# Top 20 combinations
top_combinations = results_df.head(20)

# Display top combinations
print(top_combinations)

# Best parameter combination



## Testing

In [None]:

def test_mlp(model_path):
    """Evaluate the saved best model on the CIFAR-10 test set.

    The architecture and loader settings are read from the best-parameters
    CSV. Test images go through ``ToTensor`` + ``Normalize`` with the same
    statistics as the validation transform used for training, so the model
    sees channels-first, normalised inputs just as it did while being fitted.

    Args:
        model_path: Path of the ``state_dict`` file written with ``torch.save``.

    Returns:
        ``(test_acc, all_predicted, all_labels)`` where ``test_acc`` is a
        percentage and the two lists hold the predicted and true class index
        of every test sample.
    """
    csv_best_params = "/content/drive/MyDrive/data/Q5_best_params.csv"
    params = load_params_from_csv(csv_best_params)

    # Without a transform the dataset yields PIL images (height, width,
    # channels), which image_to_feature_vector would misread as channels-first.
    test_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465),
                             (0.2023, 0.1994, 0.2010))
    ])

    # Create test loader
    test_dataset = datasets.CIFAR10(root='./data', train=False, download=True, transform=test_transform)
    test_loader = dataset_to_loader(test_dataset, params["batch_size"], params["patch_size"], False)

    # Recreate model
    model = MLP(params["hidden_sizes"], params["activation"], params["dropout_rate"], params["batch_norm"])

    # Load saved weights; map_location keeps this working on a CPU-only runtime
    model.load_state_dict(torch.load(model_path, map_location="cpu"))
    model.eval()

    correct = 0
    total = 0
    all_predicted = []
    all_labels = []

    with torch.no_grad():
        for X, y in test_loader:
            # Inputs are already normalised by test_transform; no /255 rescaling
            outputs = model(X)
            preds = outputs.argmax(dim=1)

            correct += (preds == y).sum().item()
            total += y.size(0)

            all_predicted.extend(preds.numpy())
            all_labels.extend(y.numpy())

    test_acc = (correct / total) * 100 # Percentage
    return test_acc, all_predicted, all_labels



In [None]:
# Evaluate the checkpoint saved by the grid search on the CIFAR-10 test set
model_path = "/content/drive/MyDrive/data/Q5_mlp.pt"
acc, preds, labels = test_mlp(model_path)

# Test accuracy as a percentage
print(acc)

# TODO: add visualisations of the results (`preds` and `labels` are kept for this)