<a href="https://colab.research.google.com/github/jacobrdavis/CSE546_image_classification_on_cifar_10/blob/main/CSE546_image_classification_on_cifar_10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Image Classification on CIFAR-10

In [None]:
import torch
from torch import nn
from torch.distributions import uniform
import numpy as np

from typing import Tuple, Union, List, Callable
from torch.optim import SGD
import torchvision
from torch.utils.data import DataLoader, TensorDataset, random_split
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

Let's verify that we are using a GPU:

In [None]:
# Query CUDA availability once and reuse the answer for both the guard and
# the device string.
_cuda_available = torch.cuda.is_available()
assert _cuda_available, "GPU is not available, check the directions above (or disable this assertion to use CPU)"

# With the assertion above disabled, this falls back to the CPU.
DEVICE = "cuda" if _cuda_available else "cpu"
print(DEVICE)  # prints "cuda" when a GPU is in use

Load CIFAR-10 data.

In [None]:
# Both splits are downloaded to ./data (if missing) and every image is
# converted with the same ToTensor transform.
_to_tensor = torchvision.transforms.ToTensor()
train_dataset = torchvision.datasets.CIFAR10(
    "./data", train=True, download=True, transform=_to_tensor
)
test_dataset = torchvision.datasets.CIFAR10(
    "./data", train=False, download=True, transform=_to_tensor
)

Create the data loaders using a subset and a full set of data.

In [None]:
batch_size_full = 512
batch_size_subset = 256

# The parameter searches run on the first 10,000 training images only.
subset = list(range(0, 10000))
train_subset = torch.utils.data.Subset(train_dataset, indices=subset)

# 90/10 train/validation splits. The validation length is derived from the
# train length so the two always sum to the dataset size, which random_split
# requires; int(0.9 * n) + int(0.1 * n) can fall short of n.
n_train_subset = int(0.9 * len(train_subset))
train_data_subset, val_data_subset = random_split(
    train_subset, [n_train_subset, len(train_subset) - n_train_subset]
)

n_train_full = int(0.9 * len(train_dataset))
train_data_full, val_data_full = random_split(
    train_dataset, [n_train_full, len(train_dataset) - n_train_full]
)

# Create separate dataloaders for the train, test, and validation set.
# Only the training loaders are shuffled; the validation and test loaders
# are used for evaluation only, so they iterate in a fixed order.
train_subset_loader = DataLoader(
    train_data_subset,
    batch_size=batch_size_subset,
    shuffle=True
)

val_subset_loader = DataLoader(
    val_data_subset,
    batch_size=batch_size_subset,
    shuffle=False
)

train_full_loader = DataLoader(
    train_data_full,
    batch_size=batch_size_full,
    shuffle=True
)

val_full_loader = DataLoader(
    val_data_full,
    batch_size=batch_size_full,
    shuffle=False
)

test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size_full,
    shuffle=False
)

In [None]:
# Report the number of samples behind each train/validation loader.
for _loader in (
    train_subset_loader,
    val_subset_loader,
    train_full_loader,
    val_full_loader,
):
    print(len(_loader.dataset))

Define the train function.

In [None]:
def train(
    model: nn.Module, optimizer: SGD,
    train_loader: DataLoader, val_loader: DataLoader,
    epochs: int = 20
)-> Tuple[List[float], List[float], List[float], List[float]]:
    """
    Trains a model for the specified number of epochs using the loaders.

    Each epoch runs one full pass over train_loader (with optimizer updates)
    followed by one full pass over val_loader (without gradients). Inputs and
    labels are moved to DEVICE before the forward pass.

    Args:
    model: the network to train; updated in place.
    optimizer: the optimizer that steps the model parameters.
    train_loader: loader iterated for the optimisation pass.
    val_loader: loader iterated for the validation pass.
    epochs: number of epochs to run.

    Returns:
    Lists of training loss, training accuracy, validation loss, validation
    accuracy for each epoch. Losses are the mean of the per-batch
    cross-entropy values; accuracies are the fraction of correctly
    classified samples among the samples actually seen.
    """
    loss = nn.CrossEntropyLoss()
    train_losses = []
    train_accuracies = []
    val_losses = []
    val_accuracies = []
    for _ in tqdm(range(epochs)):
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_seen = 0

        # Main training loop; iterate over train_loader. The loop
        # terminates when the train loader finishes iterating, which is one epoch.
        for (x_batch, labels) in train_loader:
            x_batch, labels = x_batch.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            labels_pred = model(x_batch)
            batch_loss = loss(labels_pred, labels)
            train_loss = train_loss + batch_loss.item()

            labels_pred_max = torch.argmax(labels_pred, 1)
            train_correct += torch.sum(labels_pred_max == labels).item()
            # Count the samples seen rather than assuming every batch is
            # full: the last batch of an epoch may be smaller.
            train_seen += labels.size(0)

            batch_loss.backward()
            optimizer.step()
        train_losses.append(train_loss / len(train_loader))
        train_accuracies.append(train_correct / train_seen)

        # Validation loop; use .no_grad() context manager to save memory.
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_seen = 0

        with torch.no_grad():
            for (v_batch, labels) in val_loader:
                v_batch, labels = v_batch.to(DEVICE), labels.to(DEVICE)
                labels_pred = model(v_batch)
                v_batch_loss = loss(labels_pred, labels)
                val_loss = val_loss + v_batch_loss.item()

                v_pred_max = torch.argmax(labels_pred, 1)
                val_correct += torch.sum(v_pred_max == labels).item()
                val_seen += labels.size(0)
        val_losses.append(val_loss / len(val_loader))
        # Normalise by the validation sample count, not by the train
        # loader's batch size.
        val_accuracies.append(val_correct / val_seen)

    return train_losses, train_accuracies, val_losses, val_accuracies


### Fully-connected output, 1 fully-connected hidden layer:

$x^{out} = W_2 \mathrm{relu} (W_1 (x^{in}) + b_1 ) + b_2$

In [None]:
def fully_connected_neural_network(dim_in, m, dim_out) -> nn.Module:
    """
    Build the single-hidden-layer fully-connected classifier.

    The input is flattened, mapped to m hidden units, passed through a ReLU,
    and mapped to dim_out outputs. The assembled model is moved to DEVICE
    before it is returned.
    """
    layers = [
        nn.Flatten(),
        nn.Linear(dim_in, m),  # hidden layer: dim_in -> m
        nn.ReLU(),
        nn.Linear(m, dim_out),  # output layer: m -> dim_out
    ]
    network = nn.Sequential(*layers)
    return network.to(DEVICE)

#### Parameter search over the fully-connected output, 1 fully-connected hidden layer:

In [None]:
def parameter_search_fully_connected_nn(
    train_loader: DataLoader,
    val_loader: DataLoader,
    model_fn: Callable[[int, int, int], nn.Module],
    num_searches: int = 50,
    n_epochs: int = 8,
) -> dict:
    """
    Random parameter search for the neural network with fully-connected
    output, and 1 fully-connected hidden layer.

    Each search iteration draws a learning rate, a momentum factor, and a
    hidden layer size uniformly at random from fixed grids, builds a fresh
    model, and trains it with SGD for n_epochs epochs.

    Args:
    train_loader: the train dataloader.
    val_loader: the validation dataloader.
    model_fn: a function called as model_fn(dim_in, m, dim_out) that returns
        a torch.nn.Module.
    num_searches: number of random hyperparameter draws to evaluate.
    n_epochs: number of training epochs per draw.

    Returns:
    A dict of lists with one entry per search iteration. The keys
    'learning_rate', 'momentum_factor', and 'hidden_layer_size' hold the
    sampled values; 'train_loss', 'train_acc', 'val_loss', and 'val_acc'
    hold the per-epoch lists returned by train().
    """
    dim_in = 3072
    dim_out = 10
    learning_rates = torch.logspace(-6, 0, 15)
    hidden_layer_sizes = torch.linspace(100, 600, 11)  # M
    # NOTE(review): this grid extends above 1.0; momentum factors that
    # large presumably make SGD diverge -- confirm they are intended.
    momentum_factors = torch.linspace(0.05, 1.25, 13)

    results = {
        'learning_rate': [],
        'momentum_factor': [],
        'hidden_layer_size': [],
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': [],
    }

    for _ in range(num_searches):
        # Draw one random grid index per hyperparameter.
        learning_rate_sampler = torch.randint(low=0, high=len(learning_rates), size=(1,))
        hidden_layer_sampler = torch.randint(low=0, high=len(hidden_layer_sizes), size=(1,))
        momentum_factor_sampler = torch.randint(low=0, high=len(momentum_factors), size=(1,))

        lr = learning_rates[learning_rate_sampler].item()
        momentum = momentum_factors[momentum_factor_sampler].item()
        m = int(hidden_layer_sizes[hidden_layer_sampler].item())

        print(f"lr: {lr}; momentum: {momentum}; m: {m}")

        model = model_fn(dim_in, m, dim_out)
        optim = SGD(model.parameters(), lr=lr, momentum=momentum)

        train_loss, train_acc, val_loss, val_acc = train(
            model,
            optim,
            train_loader,
            val_loader,
            epochs=n_epochs
        )

        results['learning_rate'].append(lr)
        results['momentum_factor'].append(momentum)
        results['hidden_layer_size'].append(m)
        results['train_loss'].append(train_loss)
        results['train_acc'].append(train_acc)
        results['val_loss'].append(val_loss)
        results['val_acc'].append(val_acc)

    return results

In [None]:
# Run the random search on the 10,000-image subset loaders.
parameters = parameter_search_fully_connected_nn(
    train_subset_loader,
    val_subset_loader,
    fully_connected_neural_network,
)

# Turn every result list into a NumPy array so later cells can slice
# across searches and epochs.
parameters = {key: np.array(item) for key, item in parameters.items()}

Extract the best 3 search results.

In [None]:
# Score every search by its best validation accuracy over the last 5 epochs,
# and use that same score both to pick the top 3 and to order them, so the
# selection and ordering criteria agree.
search_scores = np.max(parameters['val_acc'][:, -5:], axis=1)
best_3_search_indices = np.argpartition(search_scores, -3)[-3:]
# Ascending order: the last index is the best-scoring search.
best_3_search_indices = best_3_search_indices[np.argsort(search_scores[best_3_search_indices])]
best_3_search_indices

In [None]:
# One validation-accuracy curve per search, labelled at its final epoch with
# the hyperparameters that produced it.
val_acc_curves = parameters['val_acc']
epochs = range(0, len(val_acc_curves[0]))
num_searches = len(parameters['learning_rate'])

fig, ax = plt.subplots(figsize=(8,8))

for i in range(num_searches):  # or in best_3_search_indices:
    curve = val_acc_curves[i]
    lr_text = np.round(parameters['learning_rate'][i], 8)
    mom_text = np.round(parameters['momentum_factor'][i], 5)
    m_text = parameters['hidden_layer_size'][i]
    line_label = f"lr={lr_text}; mom={mom_text}; m={m_text}"

    ax.plot(epochs, curve, label='validation')
    ax.set_ylabel('validation accuracy')
    ax.set_xlabel('epoch')
    ax.set_ylim([0, 0.6])
    ax.annotate(line_label, (epochs[-1], curve[-1]))


Retrain the best-performing models on the full training dataset.



In [None]:
dim_in = 3072
dim_out = 10
n_epochs = 50  # also read by the plotting cell that follows

best_results = {
    'learning_rate': [],
    'momentum_factor': [],
    'hidden_layer_size': [],
    'train_loss': [],
    'train_acc': [],
    'val_loss': [],
    'val_acc': [],
}

# Retrain each of the three selected hyperparameter sets from scratch on the
# full train/validation loaders.
for i in best_3_search_indices:
    # The search results were converted to NumPy arrays; convert the scalars
    # back to plain Python numbers before handing them to torch.
    lr = float(parameters['learning_rate'][i])
    momentum = float(parameters['momentum_factor'][i])
    m = int(parameters['hidden_layer_size'][i])

    # The full loaders use batch_size_full (there is no global `batch_size`).
    print(f"lr={np.round(lr, 5)}; "
          f"mom={np.round(momentum, 5)}; "
          f"m={m}; "
          f"batch_size={batch_size_full}")

    model = fully_connected_neural_network(dim_in, m, dim_out)
    optim = SGD(model.parameters(), lr=lr, momentum=momentum)

    train_loss, train_acc, val_loss, val_acc = train(
        model,
        optim,
        train_full_loader,
        val_full_loader,
        epochs=n_epochs
    )

    print(f'train_loss: {train_loss}')
    print(f'train_acc: {train_acc}')
    print(f'val_loss: {val_loss}')
    print(f'val_acc: {val_acc}')

    best_results['learning_rate'].append(lr)
    best_results['momentum_factor'].append(momentum)
    best_results['hidden_layer_size'].append(m)
    best_results['train_loss'].append(train_loss)
    best_results['train_acc'].append(train_acc)
    best_results['val_loss'].append(val_loss)
    best_results['val_acc'].append(val_acc)

# Convert the retraining results (not the search results, which are already
# arrays) to NumPy arrays.
for key, item in best_results.items():
    best_results[key] = np.array(item)

Plot the retrained, best-performing models.

In [None]:
# Two stacked panels sharing the epoch axis: accuracy on top, loss below.
# Each retrained model gets one colour; validation curves are dotted and
# training curves are solid.
epochs = range(0, n_epochs)

fig, ax = plt.subplots(2, 1, figsize=(8,8), sharex=True)
plt.subplots_adjust(wspace=0, hspace=0.1)
acc_ax, loss_ax = ax[0], ax[1]

title_str = []
model_colors = ['seagreen', 'rebeccapurple', 'darkorange']

for i in range(len(best_results['learning_rate'])):
    val_style = dict(linestyle=':', color=model_colors[i])
    train_style = dict(linestyle='-', color=model_colors[i])

    # Accuracy panel, with a thin dashed reference line at 0.5.
    acc_ax.plot(epochs, best_results['val_acc'][i], label=f'model {i} validation', **val_style)
    acc_ax.plot(epochs, best_results['train_acc'][i], label=f'model {i} train', **train_style)
    acc_ax.set_ylabel('accuracy')
    acc_ax.set_ylim([0, 0.7])
    acc_ax.axhline(0.5, color='k', linestyle='--', linewidth=0.5)

    # Loss panel; the legend sits below this axis.
    loss_ax.plot(epochs, best_results['val_loss'][i], label=f'model {i} validation', **val_style)
    loss_ax.plot(epochs, best_results['train_loss'][i], label=f'model {i} train', **train_style)
    loss_ax.set_ylabel('loss')
    loss_ax.set_xlabel('epoch')
    loss_ax.set_ylim([1, 3])
    loss_ax.legend(frameon=False, ncols=3, loc='upper center', bbox_to_anchor=(0.5, -.2))

    # One title line per model summarising its hyperparameters.
    lr_text = np.round(best_results['learning_rate'][i], 5)
    mom_text = np.round(best_results['momentum_factor'][i], 5)
    m_text = best_results['hidden_layer_size'][i]
    title_str.append(f"model {i}: lr={lr_text}; mom={mom_text}; m={m_text}")

acc_ax.set_title('\n'.join(title_str))
fig.tight_layout()
# fig.savefig('best_3_fully_connected_neural_networks.png',  dpi=400)

### Convolutional layer with max-pool and fully-connected output:

$x^{out} = W_2 (\mathrm{MaxPool} (\mathrm{relu} ( \mathrm{Conv2d} (x^{in}, W_1) + b_1 ))) + b_2$

Where,

$\mathrm{Conv2d} (x^{in}, W_1) \in \mathbb{R}^{(33-k) \times (33-k) \times M}$

$\mathrm{MaxPool} (\mathrm{relu} ( \mathrm{Conv2d} (x^{in}, W_1) + b_1 )) \in \mathbb{R}^{(\frac{33-k}{N}) \times (\frac{33-k}{N}) \times M}$

$W_2 \in \mathbb{R}^{10 \times M(\frac{33-k}{N})^2};\; b_2 \in \mathbb{R}^{10}$

such that $M$, $k$, $N$ are model-specific hyperparameters.

In [None]:
def convolutional_neural_network(dim_in, m, k, n, dim_out) -> nn.Module:
    """
    Convolutional layer with max-pool and fully-connected output.

    Args:
    dim_in: number of input channels of the convolution.
    m: number of convolution filters (M).
    k: convolution kernel size.
    n: max-pool window size (N).
    dim_out: number of outputs of the final linear layer.

    Returns:
    The assembled nn.Sequential model, moved to DEVICE.
    """
    # Accept NumPy integer scalars (e.g. values read back from result
    # arrays) by normalising them to plain Python ints.
    m, k, n = int(m), int(k), int(n)

    # The (33 - k) conv output side assumes 32x32 inputs with the default
    # stride and no padding. MaxPool2d (default stride = window size, floor
    # rounding) then yields floor((33 - k) / n) per side, so use integer
    # division: true division over-counts features when (33 - k) is not a
    # multiple of n.
    pooled_side = (33 - k) // n
    fc_input_size = m * pooled_side ** 2

    model =  nn.Sequential(
            nn.Conv2d(dim_in, m, k),  # (in, # filters, kernel size)
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(n, n)),
            nn.Flatten(),
            nn.Linear(fc_input_size, dim_out),
         )
    return model.to(DEVICE)

Determine pooled output size of the MaxPool layer (p) using the MaxPool size (N) and Conv2D size (k).

In [None]:
# Candidate (Conv2D size k, MaxPool size N) pairs.
k_n_combs = [(5, 14), (5, 7), (6, 9), (8, 5)]

# For each pair, print "k,n" and then the pooled output size p = (33 - k) / n.
for k, n in k_n_combs:
    p = np.divide(33 - k, n)
    print(f"{k},{n}\np={p}")

#### Parameter search over the convolutional layer with max-pool and fully-connected output:

In [None]:
def parameter_search_convolutional_nn(
    train_loader: DataLoader,
    val_loader: DataLoader,
    model_fn: Callable[[int, int, int, int, int], nn.Module],
    num_searches: int = 50,
    n_epochs: int = 8,
) -> dict:
    """
    Random parameter search over the neural network with a convolutional
    layer with max-pool and fully-connected output.

    Each search iteration draws a learning rate, a momentum factor, a number
    of Conv2D filters, and a (Conv2D size, MaxPool size) pair uniformly at
    random from fixed grids, builds a fresh model, and trains it with SGD for
    n_epochs epochs.

    Args:
    train_loader: the train dataloader.
    val_loader: the validation dataloader.
    model_fn: a function called as model_fn(dim_in, m, k, n, dim_out) that
        returns a torch.nn.Module.
    num_searches: number of random hyperparameter draws to evaluate.
    n_epochs: number of training epochs per draw.

    Returns:
    A dict of lists with one entry per search iteration. The keys
    'learning_rate', 'momentum_factor', 'conv2d_size', 'conv2d_filters', and
    'maxpool_size' hold the sampled values; 'train_loss', 'train_acc',
    'val_loss', and 'val_acc' hold the per-epoch lists returned by train().
    """
    dim_in = 3
    dim_out = 10
    learning_rates = torch.logspace(-4, 0, 15)
    # NOTE(review): this grid extends above 1.0; momentum factors that
    # large presumably make SGD diverge -- confirm they are intended.
    momentum_factors = torch.linspace(0.05, 1.5, 20)

    conv2d_filters = torch.tensor([10, 20, 50, 100, 120, 150, 200])  # m_typ = 100;
    # Every (k, n) pair here makes (33 - k) an exact multiple of n.
    k_n_combs = [(5, 14), (5, 7), (6, 9), (8, 5)]  # k_typ = 5, n_typ = 14; pool_size = np.divide(33-k, n)

    results = {
        'learning_rate': [],
        'momentum_factor': [],
        'conv2d_size': [],  # k
        'conv2d_filters': [],  # M
        'maxpool_size': [],  # N
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': [],
    }

    for _ in range(num_searches):
        # Draw one random grid index per hyperparameter.
        learning_rate_sampler = torch.randint(low=0, high=len(learning_rates), size=(1,))
        momentum_factor_sampler = torch.randint(low=0, high=len(momentum_factors), size=(1,))
        conv2d_filters_sampler = torch.randint(low=0, high=len(conv2d_filters), size=(1,))
        k_n_combs_sampler = torch.randint(low=0, high=len(k_n_combs), size=(1,))

        lr = learning_rates[learning_rate_sampler].item()
        momentum = momentum_factors[momentum_factor_sampler].item()
        m = int(conv2d_filters[conv2d_filters_sampler].item())
        # k_n_combs is a plain list, so index it with a Python int.
        k, n = k_n_combs[int(k_n_combs_sampler.item())]

        print(f"lr: {lr}; "
              f"momentum: {momentum}; "
              f"m: {m}; "
              f"k: {k}; "
              f"n: {n}; ")

        model = model_fn(dim_in, m, k, n, dim_out)

        optim = SGD(model.parameters(), lr=lr, momentum=momentum)

        train_loss, train_acc, val_loss, val_acc = train(
            model,
            optim,
            train_loader,
            val_loader,
            epochs=n_epochs
        )

        results['learning_rate'].append(lr)
        results['momentum_factor'].append(momentum)
        results['conv2d_size'].append(k)
        results['conv2d_filters'].append(m)
        results['maxpool_size'].append(n)
        results['train_loss'].append(train_loss)
        results['train_acc'].append(train_acc)
        results['val_loss'].append(val_loss)
        results['val_acc'].append(val_acc)

    return results

In [None]:
# Run the random search on the 10,000-image subset loaders.
parameters = parameter_search_convolutional_nn(
    train_subset_loader,
    val_subset_loader,
    convolutional_neural_network,
)

# Turn every result list into a NumPy array so later cells can slice
# across searches and epochs.
parameters = {key: np.array(item) for key, item in parameters.items()}

Extract the best-performing model indices.

In [None]:
# Single best search, judged by its highest validation accuracy over all epochs.
best_search_index = np.argmax(np.max(parameters['val_acc'], axis=1))

# Score every search by its best validation accuracy over the last 5 epochs,
# and use that same score both to pick the top 3 and to order them, so the
# selection and ordering criteria agree.
search_scores = np.max(parameters['val_acc'][:, -5:], axis=1)
best_3_search_indices = np.argpartition(search_scores, -3)[-3:]
# Ascending order: the last index is the best-scoring search.
best_3_search_indices = best_3_search_indices[np.argsort(search_scores[best_3_search_indices])]
best_3_search_indices

In [None]:
epochs = range(0, len(parameters['val_acc'][0]))
num_searches = len(parameters['learning_rate'])

# Validation accuracy of every search, labelled at the final epoch.
fig, ax = plt.subplots(figsize=(8,6))
for i in range(num_searches):  # or in best_3_search_indices:
    ax.plot(epochs, parameters['val_acc'][i], label='validation')
    ax.set_ylabel('validation accuracy')
    ax.set_xlabel('epoch')
    ax.set_ylim([0, 0.6])
    line_label = (f"lr={np.round(parameters['learning_rate'][i], 5)}; "
                  f"mom={np.round(parameters['momentum_factor'][i], 2)}; "
                  f"k={np.round(parameters['conv2d_size'][i], 5)}; "
                  f"M={np.round(parameters['conv2d_filters'][i], 5)}; "
                  f"N={parameters['maxpool_size'][i]}")

    ax.annotate(line_label, (epochs[-1], parameters['val_acc'][i][-1]))

# Validation loss of every search, labelled at the final epoch.
fig, ax = plt.subplots(figsize=(8,6))
for i in range(num_searches):
    ax.plot(epochs, parameters['val_loss'][i], label='validation')
    ax.set_ylabel('loss')
    ax.set_xlabel('epoch')
    ax.set_ylim([1, 3])
    line_label = (f"lr={np.round(parameters['learning_rate'][i], 5)}; "
                  f"mom={np.round(parameters['momentum_factor'][i], 2)}; "
                  f"k={np.round(parameters['conv2d_size'][i], 5)}; "
                  f"M={np.round(parameters['conv2d_filters'][i], 5)}; "
                  f"N={parameters['maxpool_size'][i]}")

    # Anchor the label on the loss curve itself; anchoring it at the
    # accuracy value would place it off the curve on this axis.
    ax.annotate(line_label, (epochs[-1], parameters['val_loss'][i][-1]))

Retrain the best three models

In [None]:
dim_in = 3
dim_out = 10
n_epochs = 65  # also read by the plotting cell that follows

best_results = {
    'learning_rate': [],
    'momentum_factor': [],
    'conv2d_size': [],  # k
    'conv2d_filters': [],  # M
    'maxpool_size': [],  # N
    'train_loss': [],
    'train_acc': [],
    'val_loss': [],
    'val_acc': [],
}

# Retrain each of the three selected hyperparameter sets from scratch on the
# full train/validation loaders.
for i in best_3_search_indices:
    # The search results were converted to NumPy arrays; convert the scalars
    # back to plain Python numbers before handing them to torch.
    lr = float(parameters['learning_rate'][i])
    momentum = float(parameters['momentum_factor'][i])
    k = int(parameters['conv2d_size'][i])
    m = int(parameters['conv2d_filters'][i])
    n = int(parameters['maxpool_size'][i])

    print(f"lr={np.round(lr, 5)}; "
          f"mom={np.round(momentum, 5)}; "
          f"k={k}; "
          f"m={m}; "
          f"n={n}; "
          f"batch_size={batch_size_full}")

    model = convolutional_neural_network(dim_in, m, k, n, dim_out)
    optim = SGD(model.parameters(), lr=lr, momentum=momentum)

    train_loss, train_acc, val_loss, val_acc = train(
        model,
        optim,
        train_full_loader,
        val_full_loader,
        epochs=n_epochs
    )

    print(f'train_loss: {train_loss}')
    print(f'train_acc: {train_acc}')
    print(f'val_loss: {val_loss}')
    print(f'val_acc: {val_acc}')

    best_results['learning_rate'].append(lr)
    best_results['momentum_factor'].append(momentum)
    best_results['conv2d_size'].append(k)
    best_results['conv2d_filters'].append(m)
    best_results['maxpool_size'].append(n)
    best_results['train_loss'].append(train_loss)
    best_results['train_acc'].append(train_acc)
    best_results['val_loss'].append(val_loss)
    best_results['val_acc'].append(val_acc)

# Convert the retraining results (not the search results, which are already
# arrays) to NumPy arrays.
for key, item in best_results.items():
    best_results[key] = np.array(item)

Plot the retrained best-performing models.

In [None]:
# Two stacked panels sharing the epoch axis: accuracy on top, loss below.
# Each retrained model gets one colour; validation curves are dotted and
# training curves are solid.
epochs = range(0, n_epochs)

fig, ax = plt.subplots(2, 1, figsize=(8,8), sharex=True)
plt.subplots_adjust(wspace=0, hspace=0.1)
acc_ax, loss_ax = ax[0], ax[1]

title_str = []
model_colors = ['seagreen', 'rebeccapurple', 'darkorange']

for i in range(len(best_results['learning_rate'])):
    val_style = dict(linestyle=':', color=model_colors[i])
    train_style = dict(linestyle='-', color=model_colors[i])

    # Accuracy panel, with a thin dashed reference line at 0.65.
    acc_ax.plot(epochs, best_results['val_acc'][i], label=f'model {i} validation', **val_style)
    acc_ax.plot(epochs, best_results['train_acc'][i], label=f'model {i} train', **train_style)
    acc_ax.set_ylabel('accuracy')
    acc_ax.set_ylim([0, 0.8])
    acc_ax.axhline(0.65, color='k', linestyle='--', linewidth=0.5)

    # Loss panel; the legend sits below this axis.
    loss_ax.plot(epochs, best_results['val_loss'][i], label=f'model {i} validation', **val_style)
    loss_ax.plot(epochs, best_results['train_loss'][i], label=f'model {i} train', **train_style)
    loss_ax.set_ylabel('loss')
    loss_ax.set_xlabel('epoch')
    loss_ax.set_ylim([0.5, 3])
    loss_ax.legend(frameon=False, ncols=3, loc='upper center', bbox_to_anchor=(0.5, -.2))

    # One title line per model summarising its hyperparameters.
    lr_text = np.round(best_results['learning_rate'][i], 5)
    mom_text = np.round(best_results['momentum_factor'][i], 5)
    k_text = np.round(best_results['conv2d_size'][i], 5)
    m_text = np.round(best_results['conv2d_filters'][i], 5)
    n_text = np.round(best_results['maxpool_size'][i], 5)
    title_str.append(
        f"model {i}: lr={lr_text}; mom={mom_text}; k={k_text}; "
        f"m={m_text}; n={n_text}; batch_size={batch_size_full}"
    )

acc_ax.set_title('\n'.join(title_str))
fig.tight_layout()
# fig.savefig('best_3_convolutional_neural_networks.png',  dpi=400)

#### Evaluate

In [None]:
def evaluate(
    model: nn.Module, loader: DataLoader
) -> Tuple[float, float]:
    """
    Computes test loss and accuracy of model on loader.

    The model is put in eval mode and run without gradients. Inputs and
    labels are moved to DEVICE before the forward pass.

    Args:
    model: the network to evaluate.
    loader: the dataloader to evaluate on.

    Returns:
    A (loss, accuracy) tuple. The loss is the mean of the per-batch
    cross-entropy values; the accuracy is the fraction of correctly
    classified samples among the samples actually seen.
    """
    loss = nn.CrossEntropyLoss()
    model.eval()
    test_loss = 0.0
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for (batch, labels) in loader:
            batch, labels = batch.to(DEVICE), labels.to(DEVICE)
            y_batch_pred = model(batch)
            batch_loss = loss(y_batch_pred, labels)
            test_loss = test_loss + batch_loss.item()

            pred_max = torch.argmax(y_batch_pred, 1)
            n_correct += torch.sum(pred_max == labels).item()
            # Count samples directly: there is no global `batch_size`, and
            # the last batch may be smaller than the rest.
            n_seen += labels.size(0)
    test_loss = test_loss / len(loader)
    test_acc = n_correct / n_seen
    return test_loss, test_acc

In [None]:
# Evaluate `model` (the most recently trained network) on the held-out test set.
test_metrics = evaluate(model=model, loader=test_loader)
test_loss, test_acc = test_metrics
print(f"Test Accuracy: {test_acc}")