# Evolutionary Algorithms for Hyperparameter Optimization in Neural Networks

This notebook contains runnable code for MNIST (FNN) and CIFAR-10 (CNN) experiments with:
- Random Search
- Genetic Algorithm (GA via DEAP)
- Differential Evolution (DE)
- Particle Swarm Optimization (PSO)

Use small settings first to verify execution, then scale up for the report.

**Dependencies**: `torch`, `torchvision`, `deap`, `numpy`, `pandas`, `matplotlib`, `scikit-learn`

## 1. Imports & Global Config

In [None]:

import os, time, json, random, math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms

from deap import base, creator, tools, algorithms
from sklearn.metrics import accuracy_score

# Seed the Python, NumPy and PyTorch random number generators with one shared value.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Use the GPU when PyTorch reports one as available, otherwise fall back to the CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
DEVICE  # bare expression: shown as the notebook cell output


## 2. Data Loading (MNIST & CIFAR-10)

In [None]:

# MNIST transforms: images are only converted to tensors (no normalization step).
mnist_transform = transforms.Compose([
    transforms.ToTensor()
])

# CIFAR-10 transforms: convert to tensors, then normalize the three channels
# with the (mean, std) tuples below.
# NOTE(review): the constants look like CIFAR-10 per-channel statistics — confirm their source.
cifar_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465),
                         (0.2023, 0.1994, 0.2010))
])

# Download datasets (first run fetches them) into ./data; train and test splits
# of each dataset share the same transform.
mnist_train = datasets.MNIST(root='./data', train=True, download=True, transform=mnist_transform)
mnist_test  = datasets.MNIST(root='./data', train=False, download=True, transform=mnist_transform)

cifar_train = datasets.CIFAR10(root='./data', train=True, download=True, transform=cifar_transform)
cifar_test  = datasets.CIFAR10(root='./data', train=False, download=True, transform=cifar_transform)

# Sanity check: display the number of samples in each split as the cell output.
len(mnist_train), len(mnist_test), len(cifar_train), len(cifar_test)


## 3. Models (FNN for MNIST, CNN for CIFAR-10)

In [None]:

class FNN(nn.Module):
    """Fully connected classifier: flatten -> [Linear -> ReLU -> Dropout]* -> Linear.

    Args:
        hidden_layers: Iterable of hidden-layer widths; one
            Linear/ReLU/Dropout stage is created per entry. An empty iterable
            yields a single linear layer from input to output.
        dropout: Dropout probability used after every hidden ReLU.
        input_dim: Number of features per sample once flattened
            (defaults to 28*28).
        num_classes: Width of the final (logit) layer (defaults to 10).
    """

    # The default is a tuple rather than a list so that it cannot be mutated
    # and shared between instances.
    def __init__(self, hidden_layers=(128, 64), dropout=0.2, input_dim=28*28, num_classes=10):
        super().__init__()
        layers = []
        last = input_dim
        for h in hidden_layers:
            h = int(h)
            layers += [nn.Linear(last, h), nn.ReLU(), nn.Dropout(dropout)]
            last = h
        layers.append(nn.Linear(last, num_classes))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Flatten every sample in the batch and return the raw logits."""
        # reshape (unlike view) also accepts non-contiguous inputs.
        x = x.reshape(x.size(0), -1)
        return self.net(x)

class SimpleCNN(nn.Module):
    """Small CNN: two 3x3 conv layers, one 2x2 max-pool, then a two-layer classifier.

    Args:
        channels: Pair ``(c1, c2)`` with the output channels of the first and
            second convolution.
        dropout: Dropout probability used after pooling and after the hidden
            fully connected layer.
        fc: Width of the hidden fully connected layer.
        in_channels: Number of channels of the input images (defaults to 3).
        num_classes: Width of the final (logit) layer (defaults to 10).
        image_size: Height/width of the square input images (defaults to 32).
            The convolutions keep the spatial size (padding=1) and the single
            pooling layer halves it, which fixes the classifier input width.

    Raises:
        ValueError: If ``channels`` does not contain exactly two entries.
    """

    # The default is a tuple rather than a list so that it cannot be mutated
    # and shared between instances.
    def __init__(self, channels=(32, 64), dropout=0.25, fc=128,
                 in_channels=3, num_classes=10, image_size=32):
        super().__init__()
        c1, c2 = channels
        pooled = image_size // 2  # spatial side after MaxPool2d(2)
        self.features = nn.Sequential(
            nn.Conv2d(in_channels, c1, kernel_size=3, padding=1), nn.ReLU(),
            nn.Conv2d(c1, c2, kernel_size=3, padding=1), nn.ReLU(),
            nn.MaxPool2d(2), nn.Dropout(dropout)
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(c2 * pooled * pooled, fc), nn.ReLU(), nn.Dropout(dropout),
            nn.Linear(fc, num_classes)
        )

    def forward(self, x):
        """Return the raw logits for a batch of images."""
        x = self.features(x)
        return self.classifier(x)


## 4. Training & Evaluation Utilities

In [None]:

def train_one_epoch(model, loader, criterion, optimizer):
    """Run one optimisation pass over ``loader`` with the model in train mode.

    Every batch is moved to ``DEVICE``, followed by a forward pass, a backward
    pass and one optimizer step.

    Returns:
        ``(mean_loss, accuracy)`` over all samples seen; the mean loss weights
        each batch loss by the batch size.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    for inputs, targets in loader:
        inputs = inputs.to(DEVICE)
        targets = targets.to(DEVICE)

        optimizer.zero_grad()
        logits = model(inputs)
        batch_loss = criterion(logits, targets)
        batch_loss.backward()
        optimizer.step()

        # criterion returns a per-batch value; scale it by the batch size so
        # the final division by the sample count gives a per-sample mean.
        loss_sum += batch_loss.item() * inputs.size(0)
        predicted = logits.argmax(1)
        n_correct += (predicted == targets).sum().item()
        n_seen += targets.size(0)
    return loss_sum / n_seen, n_correct / n_seen

def evaluate(model, loader, criterion):
    """Score ``model`` on ``loader`` in eval mode without tracking gradients.

    Returns:
        ``(mean_loss, accuracy)`` over all samples seen; the mean loss weights
        each batch loss by the batch size.
    """
    with torch.no_grad():
        model.eval()
        loss_sum = 0.0
        n_correct = 0
        n_seen = 0
        for inputs, targets in loader:
            inputs = inputs.to(DEVICE)
            targets = targets.to(DEVICE)
            logits = model(inputs)
            batch_loss = criterion(logits, targets)
            loss_sum += batch_loss.item() * inputs.size(0)
            predicted = logits.argmax(1)
            n_correct += (predicted == targets).sum().item()
            n_seen += targets.size(0)
        return loss_sum / n_seen, n_correct / n_seen


## 5. Hyperparameter Search Space & Decoders

In [None]:

# Search space. A candidate is a sequence of five genes, each nominally in
# [0, 1], that decode() maps onto the ranges / choices below.
LR_BOUNDS = (1e-4, 1e-1)          # learning rate, interpolated in log10 space
BATCH_CHOICES = [32, 64, 128]
DROPOUT_BOUNDS = (0.1, 0.5)       # dropout probability, interpolated linearly
HIDDEN_LAYER_CHOICES = [
    [128],
    [256],
    [256, 128],
    [512, 256]
]
OPTIMIZERS = ['sgd', 'adam', 'rmsprop']


def _unit(gene):
    """Clip a gene to the closed interval [0, 1]."""
    return min(1.0, max(0.0, float(gene)))


def _pick(choices, gene):
    """Map a gene in [0, 1] to one of ``choices`` using equal-width bins."""
    # gene == 1.0 would index one past the end; pin it to the last bin instead
    # of wrapping around to the first one.
    return choices[min(int(gene * len(choices)), len(choices) - 1)]


def decode(ind):
    """Translate a 5-gene individual into a hyperparameter dict.

    Genes are clipped to [0, 1] first, so individuals pushed out of range by
    an optimizer (e.g. unbounded Gaussian mutation) still decode to values
    inside the search space instead of, say, a negative dropout.

    Args:
        ind: Sequence with at least five numbers, in the order
            (lr, batch, dropout, hidden, opt).

    Returns:
        dict with keys ``'lr'`` (float), ``'batch'`` (int), ``'dropout'``
        (float), ``'hidden'`` (new list of layer widths) and ``'opt'`` (str).
    """
    g_lr, g_batch, g_drop, g_hidden, g_opt = (_unit(ind[k]) for k in range(5))

    log_lo, log_hi = np.log10(LR_BOUNDS[0]), np.log10(LR_BOUNDS[1])
    lr = 10 ** (log_lo + g_lr * (log_hi - log_lo))
    dropout = DROPOUT_BOUNDS[0] + g_drop * (DROPOUT_BOUNDS[1] - DROPOUT_BOUNDS[0])

    return {'lr': float(lr),
            'batch': int(_pick(BATCH_CHOICES, g_batch)),
            'dropout': float(dropout),
            # Copy so callers cannot mutate the shared search-space definition.
            'hidden': list(_pick(HIDDEN_LAYER_CHOICES, g_hidden)),
            'opt': _pick(OPTIMIZERS, g_opt)}

def get_optimizer(name, params, lr):
    """Build the torch optimizer called ``name`` for ``params``.

    Supported names are ``'sgd'`` and ``'rmsprop'`` (both with momentum 0.9)
    and ``'adam'``.

    Raises:
        ValueError: If ``name`` is not one of the supported names.
    """
    if name == 'sgd':
        optimizer = optim.SGD(params, lr=lr, momentum=0.9)
    elif name == 'adam':
        optimizer = optim.Adam(params, lr=lr)
    elif name == 'rmsprop':
        optimizer = optim.RMSprop(params, lr=lr, momentum=0.9)
    else:
        raise ValueError(name)
    return optimizer


## 6. Fitness Functions (MNIST/FNN and CIFAR-10/CNN)

In [None]:

def fitness_mnist(ind, epochs=1):
    """Train an FNN on MNIST with the decoded hyperparameters and score it.

    The score is the accuracy on a validation split held out from
    ``mnist_train``. ``mnist_test`` is deliberately not used here: selecting
    hyperparameters on the test set would make the final test accuracy
    optimistically biased.

    Args:
        ind: 5-gene individual understood by ``decode``.
        epochs: Number of training epochs before scoring.

    Returns:
        One-element tuple ``(validation_accuracy,)``.
    """
    hp = decode(ind)
    # Hold out 10% of the training data. The generator is re-seeded on every
    # call so that all candidates are trained and scored on the same split.
    n_val = len(mnist_train) // 10
    split_rng = torch.Generator().manual_seed(SEED)
    train_set, val_set = random_split(
        mnist_train, [len(mnist_train) - n_val, n_val], generator=split_rng)
    # Dataloaders
    train_loader = DataLoader(train_set, batch_size=hp['batch'], shuffle=True)
    val_loader   = DataLoader(val_set,   batch_size=256)
    # Model + optimizer
    model = FNN(hidden_layers=hp['hidden'], dropout=hp['dropout']).to(DEVICE)
    crit = nn.CrossEntropyLoss()
    opt = get_optimizer(hp['opt'], model.parameters(), hp['lr'])
    for _ in range(epochs):
        train_one_epoch(model, train_loader, crit, opt)
    _, vacc = evaluate(model, val_loader, crit)
    return (vacc,)

def fitness_cifar(ind, epochs=1):
    """Train a SimpleCNN on CIFAR-10 with the decoded hyperparameters and score it.

    The score is the accuracy on a validation split held out from
    ``cifar_train``. ``cifar_test`` is deliberately not used here: selecting
    hyperparameters on the test set would make the final test accuracy
    optimistically biased.

    The CNN architecture is fixed (channels 32/64, fc 128), so the decoded
    ``'hidden'`` entry has no effect on this fitness.

    Args:
        ind: 5-gene individual understood by ``decode``.
        epochs: Number of training epochs before scoring.

    Returns:
        One-element tuple ``(validation_accuracy,)``.
    """
    hp = decode(ind)
    # Hold out 10% of the training data. The generator is re-seeded on every
    # call so that all candidates are trained and scored on the same split.
    n_val = len(cifar_train) // 10
    split_rng = torch.Generator().manual_seed(SEED)
    train_set, val_set = random_split(
        cifar_train, [len(cifar_train) - n_val, n_val], generator=split_rng)
    # Dataloaders
    train_loader = DataLoader(train_set, batch_size=hp['batch'], shuffle=True)
    val_loader   = DataLoader(val_set,   batch_size=256)
    # Model + optimizer (CNN)
    model = SimpleCNN(channels=[32, 64], dropout=hp['dropout'], fc=128).to(DEVICE)
    crit = nn.CrossEntropyLoss()
    opt = get_optimizer(hp['opt'], model.parameters(), hp['lr'])
    for _ in range(epochs):
        train_one_epoch(model, train_loader, crit, opt)
    _, vacc = evaluate(model, val_loader, crit)
    return (vacc,)


## 7. Baseline: Random Search

In [None]:

def random_search(fitness_fn, trials=5):
    """Baseline: score ``trials`` uniformly random 5-gene individuals.

    Args:
        fitness_fn: Callable taking an individual and returning a tuple whose
            first element is the accuracy to maximise.
        trials: Number of random individuals to evaluate.

    Returns:
        ``((best_acc, best_hp), hist)`` where ``best_hp`` is the decoded
        hyperparameter dict of the best trial and ``hist`` lists the accuracy
        of every trial in order. With no improving trial the first element
        stays ``(-1, None)``.
    """
    best_acc, best_hp = -1, None
    hist = []
    for trial_no in range(1, trials + 1):
        genes = [random.random() for _ in range(5)]
        acc = fitness_fn(genes)[0]
        hist.append(acc)
        if acc > best_acc:
            best_acc, best_hp = acc, decode(genes)
        print(f"Trial {trial_no}/{trials} -> acc={acc:.4f}")
    return (best_acc, best_hp), hist

# Example smoke tests (uncomment to run quickly)
# best_rs_mnist, hist_rs_mnist = random_search(fitness_mnist, trials=2)
# best_rs_cifar, hist_rs_cifar = random_search(fitness_cifar, trials=2)


## 8. Genetic Algorithm (DEAP)

In [None]:

# Register DEAP structures once (avoid duplicate creator definitions when re-running the cell).
# Each class is checked on its own so that a partially initialised `creator`
# module (FitnessMax present, Individual missing) is still completed, and no
# unrelated exception is swallowed.
if not hasattr(creator, 'FitnessMax'):
    creator.create('FitnessMax', base.Fitness, weights=(1.0,))
if not hasattr(creator, 'Individual'):
    creator.create('Individual', list, fitness=creator.FitnessMax)

def run_ga(fitness_fn, pop_size=8, gens=3, cxpb=0.5, mutpb=0.3):
    """Maximise ``fitness_fn`` with DEAP's simple generational GA.

    Individuals are lists of five genes in [0, 1]. Uniform crossover and
    Gaussian mutation are wrapped so that offspring genes are clipped back
    into [0, 1]; unbounded mutation would otherwise produce individuals that
    decode outside the search space.

    Args:
        fitness_fn: Callable taking an individual and returning a one-element
            tuple with the value to maximise.
        pop_size: Number of individuals in the population.
        gens: Number of generations run by ``eaSimple``.
        cxpb: Crossover probability.
        mutpb: Mutation probability.

    Returns:
        ``(best_fitness, best_hp, per_gen_best)`` where ``best_hp`` is the
        decoded hall-of-fame individual and ``per_gen_best`` holds the
        best-so-far fitness after the initial population and after each
        generation (``gens + 1`` entries).
    """
    def keep_in_unit_interval(operator):
        # Decorator for variation operators: they return a tuple of modified
        # individuals, whose genes are clipped in place.
        def bounded(*args, **kwargs):
            offspring = operator(*args, **kwargs)
            for child in offspring:
                for k, gene in enumerate(child):
                    child[k] = min(1.0, max(0.0, gene))
            return offspring
        return bounded

    toolbox = base.Toolbox()
    toolbox.register('attr_float', random.random)
    toolbox.register('individual', tools.initRepeat, creator.Individual, toolbox.attr_float, 5)
    toolbox.register('population', tools.initRepeat, list, toolbox.individual)
    toolbox.register('evaluate', fitness_fn)
    toolbox.register('mate', tools.cxUniform, indpb=0.5)
    toolbox.register('mutate', tools.mutGaussian, mu=0.0, sigma=0.1, indpb=0.5)
    toolbox.register('select', tools.selTournament, tournsize=3)
    toolbox.decorate('mate', keep_in_unit_interval)
    toolbox.decorate('mutate', keep_in_unit_interval)

    pop = toolbox.population(n=pop_size)
    hof = tools.HallOfFame(1)

    # Let eaSimple record the best fitness of the population in every
    # generation; the logbook then provides the data for convergence plots.
    stats = tools.Statistics(key=lambda ind: ind.fitness.values[0])
    stats.register('max', max)

    _, logbook = algorithms.eaSimple(pop, toolbox, cxpb=cxpb, mutpb=mutpb, ngen=gens,
                                     stats=stats, halloffame=hof, verbose=False)

    # eaSimple has no elitism, so a generation's maximum can drop below an
    # earlier one. Report the running maximum, which matches what the hall of
    # fame tracks and what run_de / run_pso log.
    per_gen_best = np.maximum.accumulate(logbook.select('max')).tolist()

    return hof[0].fitness.values[0], decode(hof[0]), per_gen_best

# Example smoke tests (uncomment to run quickly)
# acc_ga_mnist, hp_ga_mnist, log_ga_mnist = run_ga(fitness_mnist, pop_size=6, gens=2)
# acc_ga_cifar, hp_ga_cifar, log_ga_cifar = run_ga(fitness_cifar, pop_size=6, gens=2)


## 9. Differential Evolution (custom)

In [None]:

def clamp01(x):
    """Limit ``x`` to the closed interval [0.0, 1.0]."""
    capped = x if x < 1.0 else 1.0
    return capped if capped > 0.0 else 0.0

def run_de(fitness_fn, pop_size=8, gens=3, F=0.5, CR=0.9):
    """Maximise ``fitness_fn`` with differential evolution (rand/1/bin).

    Individuals are 5-dimensional vectors kept inside [0, 1]. A trial vector
    replaces its parent immediately when it scores strictly higher.

    Args:
        fitness_fn: Callable taking a list of genes and returning a tuple
            whose first element is the value to maximise.
        pop_size: Population size; at least 4 is needed to draw three
            distinct donors different from the target.
        gens: Number of generations.
        F: Differential weight applied to the donor difference.
        CR: Per-gene crossover probability.

    Returns:
        ``(best_fitness, best_hp, per_gen_best)`` where ``best_hp`` is the
        decoded best individual and ``per_gen_best`` holds the best fitness
        of the initial population and after each generation.

    Raises:
        ValueError: If ``gens > 0`` and ``pop_size < 4``.
    """
    # Fail before spending any (expensive) fitness evaluations.
    if gens > 0 and pop_size < 4:
        raise ValueError(
            f"run_de needs pop_size >= 4 to pick three distinct donors, got {pop_size}")

    dim = 5
    pop = [np.random.rand(dim) for _ in range(pop_size)]
    fit = [fitness_fn(ind.tolist())[0] for ind in pop]
    per_gen_best = [max(fit)]
    for _ in range(gens):
        for i in range(pop_size):
            donors = [j for j in range(pop_size) if j != i]
            a, b, c = np.random.choice(donors, 3, replace=False)
            mutant = pop[a] + F * (pop[b] - pop[c])
            # Binomial crossover. One randomly chosen gene always comes from
            # the mutant so the trial is never an exact copy of its parent
            # (which would waste a full training run).
            take_mutant = np.random.rand(dim) < CR
            take_mutant[np.random.randint(dim)] = True
            trial = np.clip(np.where(take_mutant, mutant, pop[i]), 0.0, 1.0)
            f_trial = fitness_fn(trial.tolist())[0]
            if f_trial > fit[i]:
                pop[i], fit[i] = trial, f_trial
        per_gen_best.append(max(fit))
    bi = int(np.argmax(fit))
    return fit[bi], decode(pop[bi].tolist()), per_gen_best

# Example smoke tests (uncomment to run quickly)
# acc_de_mnist, hp_de_mnist, log_de_mnist = run_de(fitness_mnist, pop_size=6, gens=2)
# acc_de_cifar, hp_de_cifar, log_de_cifar = run_de(fitness_cifar, pop_size=6, gens=2)


## 10. Particle Swarm Optimization (custom)

In [None]:

def run_pso(fitness_fn, n_particles=8, iters=3, w=0.7, c1=1.5, c2=1.5):
    """Maximise ``fitness_fn`` with a global-best particle swarm.

    Particles live in a 5-dimensional box [0, 1]^5 and start with zero
    velocity. Positions are clipped to the box after every move, and the
    global best is updated as soon as a particle improves on it.

    Args:
        fitness_fn: Callable taking a list of genes and returning a tuple
            whose first element is the value to maximise.
        n_particles: Number of particles.
        iters: Number of swarm iterations.
        w: Inertia weight.
        c1: Weight of the pull towards the particle's own best position.
        c2: Weight of the pull towards the global best position.

    Returns:
        ``(best_fitness, best_hp, per_iter_best)`` where ``best_hp`` is the
        decoded global best and ``per_iter_best`` holds the global best
        fitness after initialisation and after each iteration.
    """
    dim = 5
    positions = np.random.rand(n_particles, dim)
    velocities = np.zeros((n_particles, dim))

    # Personal bests start at the initial positions.
    pbest = positions.copy()
    pfit = np.array([fitness_fn(p.tolist())[0] for p in positions])

    leader = int(np.argmax(pfit))
    gbest = pbest[leader].copy()
    gfit = pfit[leader]
    per_iter_best = [gfit]

    for _ in range(iters):
        for i in range(n_particles):
            r1 = np.random.rand(dim)
            r2 = np.random.rand(dim)
            inertia = w*velocities[i]
            cognitive = c1*r1*(pbest[i]-positions[i])
            social = c2*r2*(gbest-positions[i])
            velocities[i] = inertia + cognitive + social
            positions[i] = np.clip(positions[i] + velocities[i], 0.0, 1.0)

            score = fitness_fn(positions[i].tolist())[0]
            if not score > pfit[i]:
                continue
            pfit[i] = score
            pbest[i] = positions[i].copy()
            # A new global best is necessarily also a new personal best, so it
            # only needs to be checked here.
            if score > gfit:
                gfit = score
                gbest = positions[i].copy()
        per_iter_best.append(gfit)
    return gfit, decode(gbest.tolist()), per_iter_best

# Example smoke tests (uncomment to run quickly)
# acc_pso_mnist, hp_pso_mnist, log_pso_mnist = run_pso(fitness_mnist, n_particles=6, iters=2)
# acc_pso_cifar, hp_pso_cifar, log_pso_cifar = run_pso(fitness_cifar, n_particles=6, iters=2)


## 11. Orchestration: Run & Compare (start small, then scale)

In [None]:

# Collected outcomes, filled by the (commented-out) runs below as
# RESULTS[dataset][algorithm] = {'acc': ..., 'hp': ..., 'log': ...}.
RESULTS = {}

# --- MNIST quick tests ---
# acc_ga_mn, hp_ga_mn, log_ga_mn = run_ga(fitness_mnist, pop_size=6, gens=2)
# acc_de_mn, hp_de_mn, log_de_mn = run_de(fitness_mnist, pop_size=6, gens=2)
# acc_pso_mn, hp_pso_mn, log_pso_mn = run_pso(fitness_mnist, n_particles=6, iters=2)

# RESULTS['MNIST'] = {
#     'GA':  {'acc': acc_ga_mn, 'hp': hp_ga_mn, 'log': log_ga_mn},
#     'DE':  {'acc': acc_de_mn, 'hp': hp_de_mn, 'log': log_de_mn},
#     'PSO': {'acc': acc_pso_mn, 'hp': hp_pso_mn, 'log': log_pso_mn},
# }

# --- CIFAR-10 quick tests ---
# acc_ga_cf, hp_ga_cf, log_ga_cf = run_ga(fitness_cifar, pop_size=6, gens=2)
# acc_de_cf, hp_de_cf, log_de_cf = run_de(fitness_cifar, pop_size=6, gens=2)
# acc_pso_cf, hp_pso_cf, log_pso_cf = run_pso(fitness_cifar, n_particles=6, iters=2)

# RESULTS['CIFAR10'] = {
#     'GA':  {'acc': acc_ga_cf, 'hp': hp_ga_cf, 'log': log_ga_cf},
#     'DE':  {'acc': acc_de_cf, 'hp': hp_de_cf, 'log': log_de_cf},
#     'PSO': {'acc': acc_pso_cf, 'hp': hp_pso_cf, 'log': log_pso_cf},
# }

# Bare expression: shows the collected results as the notebook cell output
# (an empty dict while the runs above stay commented out).
RESULTS


## 12. Convergence Plot Helper

In [None]:

def plot_convergence(values, title='Convergence'):
    """Draw ``values`` as a line in a new matplotlib figure and show it.

    Args:
        values: Sequence of accuracies, one per generation / iteration.
        title: Title placed above the plot.
    """
    plt.figure()
    plt.plot(values)
    plt.title(title)
    plt.ylabel('Validation Accuracy')
    plt.xlabel('Generation / Iteration')
    plt.show()

# Example:
# plot_convergence(log_ga_mn, 'MNIST - GA')
