<a href="https://colab.research.google.com/github/sunyux/Exploring-Thompson-Sampling-for-CNN-Hyperparameter-Optimization/blob/main/Exploring_Thompson_Sampling_for_CNN_Hyperparameter_Optimization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from torchvision import datasets, transforms,models
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt

# get_data_loaders

*  MNIST
*  CIFAR-10


In [2]:
def get_data_loaders(dataset_name, batch_size):
    """Build the train and test ``DataLoader`` objects for a named dataset.

    Args:
        dataset_name: Either ``"mnist"`` or ``"cifar10"``.
        batch_size: Batch size used for both loaders.

    Returns:
        A ``(train_loader, test_loader, input_channels)`` tuple, where
        ``input_channels`` is 1 for MNIST and 3 for CIFAR-10.

    Raises:
        ValueError: If ``dataset_name`` is not one of the supported names.
    """
    # Reject unknown names before touching any dataset machinery.
    if dataset_name not in ("mnist", "cifar10"):
        raise ValueError(f"Unsupported dataset: {dataset_name}")

    if dataset_name == "mnist":
        dataset_cls = datasets.MNIST
        mean, std = (0.1307,), (0.3081,)
        input_channels = 1
    else:
        dataset_cls = datasets.CIFAR10
        mean, std = (0.5, 0.5, 0.5), (0.5, 0.5, 0.5)
        input_channels = 3

    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize(mean, std)])

    # Both splits live under ./data and are downloaded on demand.
    train_dataset = dataset_cls(root="./data", train=True, download=True, transform=transform)
    test_dataset = dataset_cls(root="./data", train=False, download=True, transform=transform)

    # Only the training split is shuffled.
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    return train_loader, test_loader, input_channels

Models

*  LeNet
*  VGG16 & VGG19
*  ResNet50
*  Transformer (uses the `Transpose` helper module)





In [3]:
def select_model(model_choice):
    """Instantiate the 10-class model identified by ``model_choice``.

    Args:
        model_choice: One of ``"LeNet"``, ``"VGG16"``, ``"VGG19"``,
            ``"ResNet50"`` or ``"Transformer"``.

    Returns:
        A freshly constructed model.

    Raises:
        ValueError: If ``model_choice`` is not a supported name.
    """
    if model_choice == "LeNet":
        return LeNet()
    if model_choice == "VGG16":
        return get_vgg_model("VGG16", num_classes=10)
    if model_choice == "VGG19":
        return get_vgg_model("VGG19", num_classes=10)
    if model_choice == "ResNet50":
        return get_resnet50_model(num_classes=10)
    if model_choice == "Transformer":
        return TransformerModel(num_classes=10)
    # List every supported choice (including 'Transformer') and echo the
    # rejected value so the caller can see what went wrong.
    raise ValueError(
        f"Invalid model_choice {model_choice!r}. Choose from 'LeNet', 'VGG16', "
        "'VGG19', 'ResNet50', or 'Transformer'."
    )

In [4]:
class LeNet(nn.Module):
    """LeNet-style CNN: two 5x5 conv + 2x2 max-pool stages, then three linear layers.

    Args:
        input_channels: Number of channels in the input images.
        num_classes: Size of the output layer.
        input_size: Height/width of the square input images. It determines
            the flattened feature size fed to ``fc1``; the default of 28
            yields the 16 * 4 * 4 features the network was originally
            hard-coded for.

    Raises:
        ValueError: If ``input_size`` is too small to survive both
            conv/pool stages.
    """

    def __init__(self, input_channels=1, num_classes=10, input_size=28):
        super().__init__()
        # Each stage is an unpadded 5x5 convolution (side - 4) followed by a
        # 2x2 max-pool with floor rounding (side // 2).
        side = ((input_size - 4) // 2 - 4) // 2
        if side < 1:
            raise ValueError(
                f"input_size={input_size} is too small for LeNet's two 5x5 conv + 2x2 pool stages"
            )
        self.flat_features = 16 * side * side

        self.conv1 = nn.Conv2d(input_channels, 6, kernel_size=5)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        self.fc1 = nn.Linear(self.flat_features, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for image batch ``x``."""
        x = torch.relu(self.conv1(x))
        x = torch.max_pool2d(x, 2)
        x = torch.relu(self.conv2(x))
        x = torch.max_pool2d(x, 2)
        # Flatten per sample so the batch dimension is never re-inferred from
        # a hard-coded feature count.
        x = torch.flatten(x, 1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [5]:
def get_vgg_model(model_name, num_classes):
    """Return a pretrained VGG with frozen features and a new classifier head.

    Args:
        model_name: ``"VGG16"`` or ``"VGG19"``.
        num_classes: Output size of the replacement final linear layer.

    Returns:
        The torchvision VGG model, with ``model.features`` frozen and
        ``model.classifier[6]`` replaced.

    Raises:
        ValueError: If ``model_name`` is not a supported VGG variant.
    """
    # Validate first; previously an unknown name fell through both branches
    # and failed later with an UnboundLocalError on ``model``.
    if model_name not in ("VGG16", "VGG19"):
        raise ValueError(f"Unsupported VGG variant: {model_name!r}. Choose 'VGG16' or 'VGG19'.")

    # NOTE(review): newer torchvision releases prefer the ``weights=`` argument
    # over ``pretrained=True`` - confirm the installed version before switching.
    if model_name == "VGG16":
        model = models.vgg16(pretrained=True)
    else:
        model = models.vgg19(pretrained=True)

    # Freeze all convolutional layers to fine-tune only the classifier
    for param in model.features.parameters():
        param.requires_grad = False

    # Modify the classifier for the specific number of classes
    model.classifier[6] = nn.Linear(model.classifier[6].in_features, num_classes)
    return model

In [6]:
def get_resnet50_model(num_classes):
    """Return a pretrained ResNet-50 in which only a new final layer is trainable.

    Args:
        num_classes: Output size of the replacement ``fc`` layer.

    Returns:
        The torchvision ResNet-50 with every pretrained parameter frozen and
        ``fc`` swapped for a fresh ``nn.Linear``.
    """
    backbone = models.resnet50(pretrained=True)

    # Freeze every pretrained weight; the head created below is new and
    # therefore still requires gradients.
    for weight in backbone.parameters():
        weight.requires_grad_(False)

    head_inputs = backbone.fc.in_features
    backbone.fc = nn.Linear(head_inputs, num_classes)
    return backbone

In [7]:
class Transpose(nn.Module):
    """Module that swaps two dimensions of its input tensor.

    Args:
        dim0: First dimension to swap.
        dim1: Second dimension to swap.
    """

    def __init__(self, dim0, dim1):
        super().__init__()
        self.dim0, self.dim1 = dim0, dim1

    def forward(self, x):
        """Return ``x`` with dimensions ``dim0`` and ``dim1`` exchanged."""
        return torch.transpose(x, self.dim0, self.dim1)

class TransformerModel(nn.Module):
    """Transformer-encoder image classifier operating on image patches.

    A strided convolution turns the image into a sequence of patch tokens, a
    learned positional embedding is added, the sequence is processed by a
    Transformer encoder, and the mean token is layer-normalised and fed to a
    linear classification head.

    Args:
        num_classes: Size of the output layer.
        input_channels: Number of channels in the input images.
        patch_size: Side length (and stride) of each square patch.
        hidden_dim: Token embedding width.
        num_heads: Number of attention heads per encoder layer.
        num_layers: Number of encoder layers.
        dropout: Dropout probability inside the encoder layers.
        image_size: Height/width of the square input images. Defaults to 32,
            the value that used to be hard-coded.

    Raises:
        ValueError: If ``image_size`` is smaller than ``patch_size``.
    """

    def __init__(self, num_classes=10, input_channels=3, patch_size=4, hidden_dim=256, num_heads=8, num_layers=6, dropout=0.1, image_size=32):
        super().__init__()

        if image_size < patch_size:
            raise ValueError(
                f"image_size ({image_size}) must be at least patch_size ({patch_size})"
            )

        self.patch_size = patch_size
        self.hidden_dim = hidden_dim
        self.image_size = image_size

        # A kernel=stride=patch_size convolution yields floor(image_size /
        # patch_size) patches per side; the positional table must match.
        self.num_patches = (image_size // patch_size) ** 2
        self.patch_embed = nn.Sequential(
            nn.Conv2d(input_channels, hidden_dim, kernel_size=patch_size, stride=patch_size),
            nn.Flatten(2),    # (B, hidden, H', W') -> (B, hidden, H'*W')
            Transpose(1, 2),  # -> (B, H'*W', hidden), i.e. batch-first tokens
        )
        self.pos_embed = nn.Parameter(torch.zeros(1, self.num_patches, hidden_dim))
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim,
            nhead=num_heads,
            dim_feedforward=hidden_dim * 4,
            dropout=dropout,
            activation='gelu',
            batch_first=True,
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.norm = nn.LayerNorm(hidden_dim)
        self.head = nn.Linear(hidden_dim, num_classes)
        nn.init.trunc_normal_(self.pos_embed, std=0.02)
        self.apply(self._init_weights)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for image batch ``x``.

        Raises:
            ValueError: If the images do not produce ``num_patches`` tokens,
                i.e. their size does not match ``image_size``.
        """
        x = self.patch_embed(x)
        if x.size(1) != self.num_patches:
            # Fail with a readable message instead of a broadcasting error
            # when adding the positional embedding.
            raise ValueError(
                f"Input produced {x.size(1)} patch tokens but the model was built for "
                f"{self.num_patches} (image_size={self.image_size}, patch_size={self.patch_size})"
            )
        x = x + self.pos_embed
        x = self.transformer(x)
        x = torch.mean(x, dim=1)  # average over the token dimension
        x = self.norm(x)
        x = self.head(x)
        return x

    def _init_weights(self, m):
        """Initialise linear layers with truncated normals and reset LayerNorms."""
        if isinstance(m, nn.Linear):
            nn.init.trunc_normal_(m.weight, std=0.02)
            if m.bias is not None:
                nn.init.zeros_(m.bias)
        elif isinstance(m, nn.LayerNorm):
            nn.init.zeros_(m.bias)
            nn.init.ones_(m.weight)

Thompson Sampling

*   LinearThompsonSampler
*   NonLinearThompsonSampler(Zhang, W., Zhou, D., Li, L., & Gu, Q. (2020). Neural Thompson Sampling. Proceedings of the International Conference on Learning Representations.)



In [8]:
def ThompsonSampling1(input_dim):
    """Create a linear Thompson sampler over ``input_dim`` dimensions.

    Args:
        input_dim: Dimensionality of the sampled parameter vector.

    Returns:
        A ``LinearThompsonSampler`` exposing ``sample()`` and
        ``update(x_t, reward)``.
    """

    class LinearThompsonSampler:
        """Keeps a matrix ``A`` and vector ``b`` and samples from the Gaussian
        with mean ``A^-1 b`` and covariance ``A^-1``."""

        def __init__(self, input_dim):
            self.b = np.zeros(input_dim)
            self.A = np.identity(input_dim)

        def sample(self):
            """Draw one parameter vector from the current Gaussian."""
            covariance = np.linalg.inv(self.A)
            mean = np.dot(covariance, self.b)
            return np.random.multivariate_normal(mean, covariance)

        def update(self, x_t, reward):
            """Fold the observation ``(x_t, reward)`` into ``A`` and ``b``."""
            column = np.reshape(x_t, (-1, 1))
            self.A += np.dot(column, column.T)  # rank-one update x x^T
            self.b += reward * column.flatten()

    return LinearThompsonSampler(input_dim)

In [9]:
def ThompsonSampling(input_dim):
    """Create a neural (non-linear) sampler over ``input_dim`` hyperparameters.

    Args:
        input_dim: Dimensionality of the hyperparameter vector.

    Returns:
        A ``NonLinearThompsonSampler`` exposing ``sample()`` and
        ``update(x_t, reward)``.
    """

    class NonLinearThompsonSampler:
        """Reward model with Monte Carlo dropout as its uncertainty estimate.

        A small MLP predicts the reward of a hyperparameter vector. Dropout
        stays active while sampling, so repeated forward passes differ and
        their spread measures uncertainty. Without dropout the network is
        deterministic, every pass is identical and the std term is always 0.
        """

        def __init__(self, input_dim, dropout=0.1, num_candidates=100, num_mc_passes=10):
            self.input_dim = input_dim
            self.num_candidates = num_candidates
            self.num_mc_passes = num_mc_passes

            # Neural network for mean approximation; the dropout layers make
            # the Monte Carlo passes in ``_posterior_stats`` stochastic.
            self.network = nn.Sequential(
                nn.Linear(input_dim, 64),
                nn.ReLU(),
                nn.Dropout(dropout),
                nn.Linear(64, 32),
                nn.ReLU(),
                nn.Dropout(dropout),
                nn.Linear(32, 1),
            )

            self.memory_x = []
            self.memory_y = []
            self.memory_size = 1000
            self.batch_size = 32
            self.optimizer = optim.Adam(self.network.parameters(), lr=0.001)
            self.beta = 1.0
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            self.network.to(self.device)

        def _posterior_stats(self, points):
            """Return ``(mean, std)`` over ``num_mc_passes`` dropout forward passes."""
            self.network.train()  # keep dropout active so the passes differ
            with torch.no_grad():
                draws = torch.stack(
                    [self.network(points) for _ in range(self.num_mc_passes)]
                )
            return draws.mean(dim=0), draws.std(dim=0)

        def sample(self):
            """
            Pick the most promising of a set of random candidate points.

            Candidates are drawn from a standard normal and scored by
            ``mean + beta * std`` of the Monte Carlo predictions.

            Returns:
                numpy.ndarray: Sampled hyperparameters, shape ``(input_dim,)``
            """
            candidates = torch.randn(self.num_candidates, self.input_dim, device=self.device)
            mean_pred, std_pred = self._posterior_stats(candidates)

            # Optimistic score: predicted reward plus scaled uncertainty
            scores = mean_pred + self.beta * std_pred
            best_idx = torch.argmax(scores)
            return candidates[best_idx].cpu().numpy()

        def update(self, x_t, reward):
            """
            Update the model with new observation
            Args:
                x_t (numpy.ndarray): Input hyperparameters
                reward (float): Observed reward (accuracy)
            """
            x_t = torch.as_tensor(x_t, dtype=torch.float32).to(self.device)
            reward = torch.tensor([float(reward)], dtype=torch.float32, device=self.device)
            self.memory_x.append(x_t)
            self.memory_y.append(reward)

            # Keep memory within size limit (drop the oldest observation)
            if len(self.memory_x) > self.memory_size:
                del self.memory_x[0]
                del self.memory_y[0]

            # Train only once a full batch of observations is available
            if len(self.memory_x) < self.batch_size:
                return

            # Sample a batch without replacement from the replay memory
            indices = np.random.choice(len(self.memory_x), self.batch_size, replace=False)
            batch_x = torch.stack([self.memory_x[i] for i in indices])
            batch_y = torch.stack([self.memory_y[i] for i in indices])

            # One gradient step on the squared prediction error
            self.network.train()
            self.optimizer.zero_grad()
            pred = self.network(batch_x)
            loss = nn.MSELoss()(pred, batch_y)
            loss.backward()
            self.optimizer.step()

            # Decay the exploration weight, never below 0.1
            self.beta = max(0.1, self.beta * 0.995)

    return NonLinearThompsonSampler(input_dim)

## Train and validate

In [10]:
def train_model(model, train_loader, config, device, num_epochs=3):
    """Train ``model`` and return its accuracy on the training data.

    Args:
        model: Network to train; it is moved to ``device``.
        train_loader: Loader yielding ``(inputs, labels)`` batches. It is
            used for both the optimisation steps and the accuracy passes.
        config: Mapping with ``"optimizer"`` (name of a class in
            ``torch.optim``) and ``"learning_rate"``.
        device: Device on which training runs.
        num_epochs: Number of passes over ``train_loader``. Defaults to 3,
            the value that used to be hard-coded.

    Returns:
        Accuracy on ``train_loader`` measured after the last epoch.
    """
    model = model.to(device)
    optimizer_cls = getattr(optim, config["optimizer"])
    optimizer = optimizer_cls(model.parameters(), lr=config["learning_rate"])
    criterion = nn.CrossEntropyLoss()

    accuracy = None
    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch}")
        model.train()  # validate_model switches to eval mode each epoch
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        accuracy = validate_model(model, train_loader, criterion, device)
        print(f"\naccuracy1 {accuracy}")

    # The weights do not change after the last epoch, so its accuracy is the
    # final result; only evaluate again if no epoch ran at all.
    if accuracy is None:
        accuracy = validate_model(model, train_loader, criterion, device)
    return accuracy

In [11]:
def validate_model(model, loader, criterion, device):
    """Return the classification accuracy of ``model`` over ``loader``.

    Args:
        model: Network to evaluate; it is switched to eval mode and left there.
        loader: Iterable of ``(inputs, labels)`` batches.
        criterion: Unused; kept so existing call sites keep working.
        device: Device the batches are moved to.

    Returns:
        Fraction of correctly classified samples among those actually
        yielded by ``loader``, or ``0.0`` if it yields none.
    """
    model.eval()
    correct = 0
    seen = 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            predictions = model(inputs).argmax(1)
            correct += (predictions == labels).sum().item()
            # Count evaluated samples rather than trusting len(loader.dataset),
            # which is wrong with drop_last or a subset sampler.
            seen += labels.size(0)
    return correct / seen if seen else 0.0

# Main function for finding the best-performing hyperparameters

In [None]:
def _run_trial(model_choice, config, device):
    """Train a fresh ``model_choice`` model on CIFAR-10 with ``config`` and return its accuracy."""
    train_loader, _, _ = get_data_loaders("cifar10", batch_size=config["batch_size"])
    model = select_model(model_choice)
    return train_model(model, train_loader, config, device)


def main(model_choice, num_iterations=100):
    """Compare Thompson-sampling-guided search with random search on CIFAR-10.

    Each iteration evaluates one configuration proposed by the sampler and
    one drawn at random. At the end the best configuration of each strategy
    is printed and the per-iteration accuracies are plotted.

    Args:
        model_choice: Model name understood by ``select_model``.
        num_iterations: Number of configurations evaluated per strategy.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Initialize Thompson Sampling and Random Search
    sampler = ThompsonSampling(input_dim=3)
    random_search_results = []
    thompson_results = []
    accuracies_random_search = []
    accuracies_thompson = []

    for iteration in range(num_iterations):
        print(f"Testing Iteration {iteration}", flush=True)

        # --- Thompson Sampling ---
        theta_sampled = sampler.sample()

        # Map the three sampled coordinates onto the three hyperparameters.
        # The optimizer comes from the third coordinate (not a random pick)
        # so the observed reward depends only on what the sampler proposed.
        thompson_config = {
            "learning_rate": float(1e-5 + (abs(theta_sampled[0] % 1) * (1e-3 - 1e-5))),
            "batch_size": int(abs(theta_sampled[1] * 64)) % 128 + 32,
            "optimizer": "Adam" if theta_sampled[2] >= 0 else "SGD",
        }
        accuracy_thompson = _run_trial(model_choice, thompson_config, device)
        accuracies_thompson.append(accuracy_thompson)
        # Credit the reward to the point that was actually evaluated; feeding
        # an unrelated random vector teaches the sampler nothing.
        sampler.update(theta_sampled, accuracy_thompson)
        thompson_results.append((thompson_config, accuracy_thompson))
        print(f"Thompson Sampling Iteration {iteration}: Accuracy = {accuracy_thompson:.4f}, Config = {thompson_config}")

        # --- Random Search ---
        random_search_config = {
            "learning_rate": random.uniform(1e-5, 1e-3),
            "batch_size": random.choice([32, 64, 128]),
            "optimizer": random.choice(["SGD", "Adam"]),
        }
        accuracy_random_search = _run_trial(model_choice, random_search_config, device)
        accuracies_random_search.append(accuracy_random_search)
        random_search_results.append((random_search_config, accuracy_random_search))
        print(f"Random Search Iteration {iteration}: Accuracy = {accuracy_random_search:.4f}, Config = {random_search_config}")

    if not thompson_results:
        # Nothing was evaluated (num_iterations <= 0); max() would raise.
        print("No iterations were run; nothing to compare.")
        return

    best_config_thompson, best_accuracy_thompson = max(thompson_results, key=lambda x: x[1])
    best_config_random_search, best_accuracy_random_search = max(random_search_results, key=lambda x: x[1])

    print(f"\nBest Thompson Sampling Config: {best_config_thompson}, Best Accuracy: {best_accuracy_thompson:.4f}")
    print(f"Best Random Search Config: {best_config_random_search}, Best Accuracy: {best_accuracy_random_search:.4f}")

    # Plot accuracy comparison over iterations
    plt.plot(range(len(accuracies_thompson)), accuracies_thompson, label='Thompson Sampling', color='b')
    plt.plot(range(len(accuracies_random_search)), accuracies_random_search, label='Random Search', color='r')
    plt.xlabel('Iteration')
    plt.ylabel('Accuracy')
    plt.title('Thompson Sampling vs Random Search Accuracy Trend')
    plt.legend()
    plt.show()

# Script entry point: run the hyperparameter search once per listed model.
if __name__ == "__main__":
    model_choices = ["ResNet50"]
    for model_choice in model_choices:
        main(model_choice)

Testing Iteration 0
Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170M/170M [00:02<00:00, 78.0MB/s]


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified


Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 124MB/s]



Epoch 0


## Main function for testing the best-performing hyperparameters


* Increase the number of epochs in `train_model`
* Put the hyperparameters in the `configurations` list (e.g. `("ResNet50", 6.238483e-05, 66, "Adam")`)



In [None]:
def main():
    """Train each listed configuration on CIFAR-10 and report its accuracy.

    Every entry of ``configurations`` is trained with ``train_model`` and
    then also evaluated on the held-out test split, which was previously
    loaded but never used.

    Returns:
        A list with one result dict per configuration. ``"Accuracy"`` is the
        value returned by ``train_model``; ``"Test Accuracy"`` is the
        accuracy on the CIFAR-10 test split.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using device: {device}")
    configurations = [
        # put data here !!!!!!!!!
        ("ResNet50", 6.238483e-05, 66, "Adam"),
    ]

    results = []
    best_configs = {}
    for model_name, learning_rate, batch_size, optimizer in configurations:
        print(f"\nTesting: {model_name}")
        train_loader, test_loader, _ = get_data_loaders("cifar10", batch_size=batch_size)
        model = select_model(model_name)
        config = {
            "learning_rate": learning_rate,
            "batch_size": batch_size,
            "optimizer": optimizer,
        }

        accuracy = train_model(model, train_loader, config, device)
        # nn.Module.to() moves the module in place, so ``model`` is already
        # on ``device`` after train_model and can be evaluated directly.
        test_accuracy = validate_model(model, test_loader, nn.CrossEntropyLoss(), device)
        result = {
            "Model": model_name,
            "Learning Rate": learning_rate,
            "Batch Size": batch_size,
            "Optimizer": optimizer,
            "Accuracy": accuracy,
            "Test Accuracy": test_accuracy,
        }
        results.append(result)
        # Rank configurations by held-out accuracy rather than training accuracy.
        if model_name not in best_configs or test_accuracy > best_configs[model_name]["Test Accuracy"]:
            best_configs[model_name] = result

        print(f"Final Accuracy: {accuracy:.4f}")
        print(f"Test Accuracy: {test_accuracy:.4f}")

    print("\nBest Configurations per Model Type:")
    for model_name, best_config in best_configs.items():
        print(f"\nBest {model_name} Configuration:")
        print(f"Learning Rate: {best_config['Learning Rate']:.6e}")
        print(f"Batch Size: {best_config['Batch Size']}")
        print(f"Optimizer: {best_config['Optimizer']}")
        print(f"Accuracy: {best_config['Accuracy']:.4f}")
        print(f"Test Accuracy: {best_config['Test Accuracy']:.4f}")

    print("\nFinal Detailed Results:")
    for result in results:
        print(f"{result['Model']}\tLR: {result['Learning Rate']:.6e}\tBatch: {result['Batch Size']}\t"
              f"Optimizer: {result['Optimizer']}\tAccuracy: {result['Accuracy']:.4f}\t"
              f"Test Accuracy: {result['Test Accuracy']:.4f}")

    return results

# Script entry point: evaluate the listed configurations and keep the result list.
if __name__ == "__main__":
    results = main()