In [None]:
#!/usr/bin/env python3
"""
bayes_opt_cifar10.py

A reproducible, modular example that:
- Defines a small, parameterized CNN for CIFAR-10 (PyTorch).
- Uses scikit-optimize (skopt) for Bayesian Optimization (gp_minimize).
- Compares to a Random Search baseline.
- Logs validation accuracy and saves best model.
- Designed to "run without error" if dependencies are installed.

Usage:
    pip install -r requirements.txt
    python bayes_opt_cifar10.py

Requirements (example):
    torch>=1.8, torchvision, scikit-optimize, numpy, tqdm

Author: ChatGPT (GPT-5 Thinking mini)
Date: 2025-11-19
"""
from __future__ import annotations

import math
import os
import random
import time
from typing import Dict, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from skopt import gp_minimize
from skopt.space import Categorical, Integer, Real
from skopt.utils import use_named_args
from torch.utils.data import DataLoader, random_split
from tqdm import tqdm

# -----------------------
# Configuration / Globals
# -----------------------
SEED = 42
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DATA_DIR = "./data"
BEST_MODEL_PATH = "best_cifar10_model.pth"
NUM_CLASSES = 10
DEFAULT_EPOCHS = 12  # keep small by default to run quickly for testing
WORKERS = 4 if torch.cuda.is_available() else 0

# -----------------------
# Determinism
# -----------------------
def set_seed(seed: int = SEED) -> None:
    """Set random seeds for reproducibility."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
    # Make CUDA deterministic â€” might affect performance
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


set_seed(SEED)

# -----------------------
# Data helpers
# -----------------------
def get_dataloaders(
    batch_size: int = 128, val_fraction: float = 0.1
) -> Tuple[DataLoader, DataLoader, DataLoader]:
    """
    Returns train, val, test dataloaders for CIFAR-10.

    Args:
        batch_size: batch size for loaders.
        val_fraction: fraction of training set used for validation.

    Returns:
        train_loader, val_loader, test_loader
    """
    transform_train = transforms.Compose(
        [
            transforms.RandomHorizontalFlip(),
            transforms.RandomCrop(32, padding=4),
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ]
    )
    transform_test = transforms.Compose(
        [
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ]
    )

    os.makedirs(DATA_DIR, exist_ok=True)
    full_train = torchvision.datasets.CIFAR10(
        root=DATA_DIR, train=True, download=True, transform=transform_train
    )
    test_set = torchvision.datasets.CIFAR10(
        root=DATA_DIR, train=False, download=True, transform=transform_test
    )

    val_size = int(len(full_train) * val_fraction)
    train_size = len(full_train) - val_size
    train_set, val_set = random_split(
        full_train, [train_size, val_size], generator=torch.Generator().manual_seed(SEED)
    )

    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=WORKERS)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False, num_workers=WORKERS)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=WORKERS)

    return train_loader, val_loader, test_loader


# -----------------------
# Model
# -----------------------
class SimpleCNN(nn.Module):
    """A small configurable convolutional network for CIFAR-10."""

    def __init__(self, num_filters: int = 32, num_conv_layers: int = 3, dropout: float = 0.3):
        """
        Create a simple CNN.

        Args:
            num_filters: base number of filters in first conv layer.
            num_conv_layers: how many conv blocks (each block may double filters).
            dropout: dropout probability before final classifier.
        """
        super().__init__()
        layers = []
        in_ch = 3
        out_ch = num_filters
        for i in range(num_conv_layers):
            layers.append(nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1))
            layers.append(nn.BatchNorm2d(out_ch))
            layers.append(nn.ReLU(inplace=True))
            layers.append(nn.MaxPool2d(2))
            in_ch = out_ch
            out_ch = min(out_ch * 2, 512)
        self.encoder = nn.Sequential(*layers)
        # compute flattened feature size for CIFAR-10 (32x32) after pooling each block halves dims
        final_spatial = 32 // (2 ** num_conv_layers)
        final_channels = in_ch
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(final_channels * final_spatial * final_spatial, NUM_CLASSES)

    def forward(self, x):
        x = self.encoder(x)
        x = torch.flatten(x, 1)
        x = self.dropout(x)
        x = self.classifier(x)
        return x


# -----------------------
# Training + Evaluation
# -----------------------
def train_one_epoch(
    model: nn.Module,
    optimizer: optim.Optimizer,
    criterion,
    dataloader: DataLoader,
    device: torch.device,
) -> float:
    """Train model for one epoch; return average training loss."""
    model.train()
    running_loss = 0.0
    total = 0
    for x, y in dataloader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        out = model(x)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()
        batch_size = x.size(0)
        running_loss += loss.item() * batch_size
        total += batch_size
    return running_loss / total if total else 0.0


def evaluate(model: nn.Module, dataloader: DataLoader, device: torch.device) -> Tuple[float, float]:
    """
    Evaluate model: returns (avg_loss, accuracy)
    """
    model.eval()
    criterion = nn.CrossEntropyLoss()
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for x, y in dataloader:
            x, y = x.to(device), y.to(device)
            out = model(x)
            loss = criterion(out, y)
            running_loss += loss.item() * x.size(0)
            preds = out.argmax(dim=1)
            correct += (preds == y).sum().item()
            total += x.size(0)
    avg_loss = running_loss / total if total else 0.0
    acc = correct / total if total else 0.0
    return avg_loss, acc


# -----------------------
# Objective for Bayesian Optimization
# -----------------------
# Define the hyperparameter search space
search_space = [
    Real(1e-4, 1e-1, prior="log-uniform", name="lr"),
    Integer(2, 4, name="num_conv_layers"),  # keeps model small
    Integer(16, 128, name="num_filters"),
    Real(0.0, 0.5, name="dropout"),
]


@use_named_args(search_space)
def objective(**params) -> float:
    """
    Objective function for Bayesian optimization.
    Returns negative validation accuracy (since skopt minimizes).
    """
    # Unpack parameters
    lr = float(params["lr"])
    num_conv_layers = int(params["num_conv_layers"])
    num_filters = int(params["num_filters"])
    dropout = float(params["dropout"])

    # Fixed training parameters for speed / reproducibility
    epochs = DEFAULT_EPOCHS
    batch_size = 128

    # Prepare data and model
    train_loader, val_loader, _ = get_dataloaders(batch_size=batch_size)
    model = SimpleCNN(num_filters=num_filters, num_conv_layers=num_conv_layers, dropout=dropout).to(DEVICE)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

    best_val_acc = 0.0
    # small training loop
    for epoch in range(epochs):
        _train_loss = train_one_epoch(model, optimizer, criterion, train_loader, DEVICE)
        val_loss, val_acc = evaluate(model, val_loader, DEVICE)
        scheduler.step()
        if val_acc > best_val_acc:
            best_val_acc = val_acc
        # Optional: early stopping (if convergence)
        # If training is noisy for small epochs, we avoid aggressive stopping.
    # We minimize so return negative validation accuracy
    return -best_val_acc


# -----------------------
# Random Search Baseline
# -----------------------
def random_search_trials(n_trials: int = 10):
    """
    Run random sampling across the same search space. Returns best validation accuracy and params.
    """
    rng = np.random.RandomState(SEED)
    best_acc = 0.0
    best_params = None
    for i in range(n_trials):
        # sample uniformly in log space for lr
        lr = float(10 ** rng.uniform(np.log10(1e-4), np.log10(1e-1)))
        num_conv_layers = int(rng.randint(2, 5))  # upper bound is exclusive
        num_filters = int(rng.randint(16, 129))
        dropout = float(rng.uniform(0.0, 0.5))
        # evaluate
        params = {"lr": lr, "num_conv_layers": num_conv_layers, "num_filters": num_filters, "dropout": dropout}
        acc = -objective(**params)  # objective returns -val_acc
        print(f"[Random] Trial {i+1}/{n_trials} - val_acc={acc:.4f} params={params}")
        if acc > best_acc:
            best_acc = acc
            best_params = params
    return best_acc, best_params


# -----------------------
# Run optimization and compare
# -----------------------
def run_optimization(n_calls: int = 12, random_search_trials_count: int = 6) -> None:
    """
    Run Bayesian Optimization and Random Search baseline, save best model found by Bayesian opt.
    """
    print("Starting Bayesian Optimization (skopt.gp_minimize)...")
    start_time = time.time()
    # gp_minimize will run the objective function n_calls times (including initial points).
    res = gp_minimize(
        func=objective,
        dimensions=search_space,
        acq_func="EI",  # expected improvement
        n_calls=n_calls,
        n_initial_points=4,
        random_state=SEED,
    )
    bo_time = time.time() - start_time
    best_index = res.x_iters.index(res.x)
    best_params = dict(lr=res.x[0], num_conv_layers=int(res.x[1]), num_filters=int(res.x[2]), dropout=float(res.x[3]))
    best_val_acc = -res.fun  # because we minimized negative val acc
    print(f"Bayes Opt finished in {bo_time:.1f}s. Best val acc: {best_val_acc:.4f}")
    print(f"Best params (BO): {best_params}")

    # Run a short random search baseline
    print("\nStarting Random Search baseline...")
    rs_start = time.time()
    rs_best_acc, rs_best_params = random_search_trials(n_trials=random_search_trials_count)
    rs_time = time.time() - rs_start
    print(f"Random Search finished in {rs_time:.1f}s. Best val acc: {rs_best_acc:.4f}")
    print(f"Best params (Random): {rs_best_params}")

    # As a final step, train a model with best BO params for a bit longer and save it
    print("\nTraining final model with BO best params and saving to disk...")
    train_loader, val_loader, test_loader = get_dataloaders(batch_size=128)
    final_model = SimpleCNN(
        num_filters=int(best_params["num_filters"]),
        num_conv_layers=int(best_params["num_conv_layers"]),
        dropout=float(best_params["dropout"]),
    ).to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(final_model.parameters(), lr=float(best_params["lr"]), momentum=0.9, weight_decay=5e-4)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
    final_epochs = max(DEFAULT_EPOCHS, 20)  # train a bit longer for final model
    best_val = 0.0
    for epoch in range(final_epochs):
        train_loss = train_one_epoch(final_model, optimizer, criterion, train_loader, DEVICE)
        val_loss, val_acc = evaluate(final_model, val_loader, DEVICE)
        scheduler.step()
        print(f"[Final] Epoch {epoch+1}/{final_epochs} train_loss={train_loss:.4f} val_acc={val_acc:.4f}")
        if val_acc > best_val:
            best_val = val_acc
            torch.save(final_model.state_dict(), BEST_MODEL_PATH)
    print(f"Final model saved to {BEST_MODEL_PATH} with val acc {best_val:.4f}")

    # Evaluate saved model on test set
    print("\nEvaluating saved model on test set...")
    final_model.load_state_dict(torch.load(BEST_MODEL_PATH, map_location=DEVICE))
    test_loss, test_acc = evaluate(final_model, test_loader, DEVICE)
    print(f"Test loss: {test_loss:.4f} Test accuracy: {test_acc:.4f}")

    # Summarize
    print("\nSUMMARY:")
    print(f"BO best val acc: {best_val_acc:.4f} params: {best_params}")
    print(f"Random best val acc: {rs_best_acc:.4f} params: {rs_best_params}")
    print(f"BO runtime: {bo_time:.1f}s  Random runtime: {rs_time:.1f}s")


# -----------------------
# Entrypoint
# -----------------------
if __name__ == "__main__":
    # Keep runtime reasonable by default. User can edit n_calls / trials above.
    run_optimization(n_calls=12, random_search_trials_count=6)
