In [None]:
"""
Contains functionality for creating PyTorch DataLoaders for
image classification data.
"""
from typing import List, Tuple

import torch
from torchvision import datasets, transforms

def data_loader(data_dir: str, batch_size: int, shuffle: bool=True) \
    -> Tuple[torch.utils.data.DataLoader, torch.utils.data.DataLoader, List[str]]:
    """
    Build train and test data loaders for grey-scaled CIFAR-10.

    The dataset is downloaded into ``data_dir`` when it is not already there.
    Random augmentation (padded crop and horizontal flip) is applied to the
    training split only; the test split gets the deterministic preprocessing
    alone and is never shuffled, so evaluation is repeatable.

    Parameters
    ----------
    data_dir: str
        relative path to directory containing dataset
    batch_size: int
        batch size to load into torch.utils.data.DataLoader
    shuffle: bool
        whether to shuffle the training torch.utils.data.DataLoader

    Returns
    -------
    Tuple[torch.utils.data.DataLoader, torch.utils.data.DataLoader, List[str]]
        (train_dataloader, test_dataloader, class_names). class_names is a list of target classes.

    """
    # Single-channel statistics; presumably measured on grey-scaled CIFAR-10 -- TODO confirm
    normalize = transforms.Normalize(
        mean=[0.4799],
        std=[0.2386],
    )

    # Deterministic preprocessing shared by both splits
    base_steps = [
        transforms.Grayscale(num_output_channels=1),
        transforms.ToTensor(),
        normalize,
    ]

    # Augmentation belongs to training only: augmenting the test images would
    # make the reported test metrics vary from one evaluation to the next.
    train_transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4, padding_mode='reflect'),
        transforms.RandomHorizontalFlip(),
        *base_steps,
    ])
    test_transform = transforms.Compose(base_steps)

    # Load the dataset
    test_data = datasets.CIFAR10(
        root=data_dir, train=False,
        download=True, transform=test_transform,
    )
    train_data = datasets.CIFAR10(
        root=data_dir, train=True,
        download=True, transform=train_transform,
    )

    # Get class names
    class_names = train_data.classes

    # Turn images into data loaders
    train_loader = torch.utils.data.DataLoader(
        train_data, batch_size=batch_size, shuffle=shuffle)
    # Evaluation order has no benefit from shuffling, so keep it fixed
    test_loader = torch.utils.data.DataLoader(
        test_data, batch_size=batch_size, shuffle=False)

    return train_loader, test_loader, class_names


# Model

In [None]:
"""
Contains PyTorch model code to instantiate a VGG-16 model.
"""
import torch
from torch import nn


class VGG16(nn.Module):
    """
    VGG-16 style network for single-channel (grey-scale) images, with batch
    normalisation after every convolution. Architecture reference:
    https://arxiv.org/pdf/1409.1556.pdf?ref=blog.paperspace.com

    Thirteen 3x3 convolution stages (``layer1`` .. ``layer13``) are followed by
    three fully connected stages (``fc``, ``fc1``, ``fc2``). Five of the
    convolution stages end in a 2x2 max-pool, and the classifier expects 512
    flattened features, i.e. a 1x1 feature map after the last pool.
    """
    def __init__(self, num_classes=10):
        super().__init__()

        def conv_stage(in_channels, out_channels, pool=False):
            # Conv -> BatchNorm -> ReLU, optionally followed by a 2x2 max-pool
            modules = [
                nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(),
            ]
            if pool:
                modules.append(nn.MaxPool2d(kernel_size=2, stride=2))
            return nn.Sequential(*modules)

        # 64-channel group
        self.layer1 = conv_stage(1, 64)
        self.layer2 = conv_stage(64, 64, pool=True)
        # 128-channel group
        self.layer3 = conv_stage(64, 128)
        self.layer4 = conv_stage(128, 128, pool=True)
        # 256-channel group
        self.layer5 = conv_stage(128, 256)
        self.layer6 = conv_stage(256, 256)
        self.layer7 = conv_stage(256, 256, pool=True)
        # first 512-channel group
        self.layer8 = conv_stage(256, 512)
        self.layer9 = conv_stage(512, 512)
        self.layer10 = conv_stage(512, 512, pool=True)
        # second 512-channel group
        self.layer11 = conv_stage(512, 512)
        self.layer12 = conv_stage(512, 512)
        self.layer13 = conv_stage(512, 512, pool=True)

        # Classifier head
        self.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(512 * 1 * 1, 4096),
            nn.ReLU(),
        )
        self.fc1 = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(),
        )
        self.fc2 = nn.Sequential(
            nn.Linear(4096, num_classes),
        )

    def forward(self, x: torch.Tensor):
        """Run the convolution stages in order, flatten, and return the class logits."""
        conv_stages = (
            self.layer1, self.layer2, self.layer3, self.layer4,
            self.layer5, self.layer6, self.layer7, self.layer8,
            self.layer9, self.layer10, self.layer11, self.layer12,
            self.layer13,
        )
        features = x
        for stage in conv_stages:
            features = stage(features)

        # Collapse everything but the batch dimension before the linear layers
        flat = features.reshape(features.size(0), -1)
        return self.fc2(self.fc1(self.fc(flat)))


# Utils

In [None]:
from typing import Dict, List

import matplotlib.pyplot as plt


def plot_loss_curves(results: Dict[str, List[float]]):
    """
    Show train/test loss and train/test accuracy curves side by side.

    Parameters
    ----------
    results: Dict[str, List[float]]
        Per-epoch metrics under the keys 'train_loss', 'test_loss',
        'train_acc' and 'test_acc'.
    """
    # One entry per subplot: (title, ((legend label, series), ...))
    panels = (
        ("Loss", (
            ('train_loss', results['train_loss']),
            ('test_loss', results['test_loss']),
        )),
        ("Accuracy", (
            ('train_accuracy', results['train_acc']),
            ('test_accuracy', results['test_acc']),
        )),
    )

    # x axis: one tick per recorded epoch
    epoch_axis = range(len(results['train_loss']))

    plt.figure(figsize=(15, 7))

    for position, (title, curves) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        for label, series in curves:
            plt.plot(epoch_axis, series, label=label)
        plt.title(title)
        plt.xlabel("Epochs")
        plt.legend()

    plt.show()


def get_lr(optimizer):
    """Return the 'lr' of the optimizer's first parameter group, or None if it has no groups."""
    groups = iter(optimizer.param_groups)
    try:
        first_group = next(groups)
    except StopIteration:
        return None
    return first_group['lr']

# Engine

In [None]:
"""
Contains functions for training and testing model.
"""
from typing import List, Tuple, Dict

import torch

def train_step(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               device: torch.device) -> Tuple[float, float]:
    """
    Step through one epoch of training step

    Parameters
    ----------
    model: torch.nn.Module
        A PyTorch model to be trained.
    dataloader: torch.utils.data.DataLoader
        A DataLoader instance for the model to be trained on.
    loss_fn: torch.nn.Module
        A PyTorch loss function to minimize.
    optimizer: torch.optim.Optimizer
        A PyTorch optimizer to help minimize the loss function.
    device: torch.cuda.device
        A target device to compute on (e.g. "cuda" or "cpu").

    Returns
    -------
    Tuple[float, float]
        A tuple of training loss and training accuracy metrics. In the form (train_loss, train_accuracy).
    """
    train_loss, train_accuracy = 0, 0

    model.train()

    for batch, (X, y) in enumerate(data_loader):
        X, y = X.to(device), y.to(device)

        y_pred = model(X)

        # Calculate and append loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item()

        optimizer.zero_grad()

        loss.backward()

        optimizer.step()

        # Calculate and accumulate accuracy metric across all batches
        y_pred_class = torch.argmax(torch.softmax(y_pred, dim=1), dim=1)
        train_accuracy += (y_pred_class == y).sum().item()/len(y_pred)

    train_loss /= len(data_loader)
    train_accuracy /= len(data_loader)
    return train_loss, train_accuracy

def test_step(model: torch.nn.Module,
              data_loader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              device: torch.cuda.device) -> Tuple[float, float]:
    """
    Step through one epoch of training step

    Parameters
    ----------
    model: torch.nn.Module
        A PyTorch model to be tested.
    dataloader: torch.utils.data.DataLoader
        A DataLoader instance for the model to be tested on.
    loss_fn: torch.nn.Module
        A PyTorch loss function to minimize.
    device: torch.cuda.device
        A target device to compute on (e.g. "cuda" or "cpu").

    Returns
    -------
    Tuple[float, float]
        A tuple of test loss and test accuracy metrics. In the form (test_loss, test_accuracy).
    """
    test_loss, test_accuracy = 0, 0

    model.eval()

    with torch.no_grad():
        for X_test, y_test in data_loader:

            X_test, y_test = X_test.to(device), y_test.to(device)

            # forward pass
            test_pred_logits = model(X_test)

            # Calculate and accumulate loss
            loss = loss_fn(test_pred_logits, y_test)
            test_loss += loss.item()

            # Calculate and accumulate accuracy
            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_accuracy += ((test_pred_labels == y_test).sum().item()/len(test_pred_labels))

        test_loss /= len(data_loader)
        test_accuracy /= len(data_loader)
        return test_loss, test_accuracy


# Train Entire

In [None]:
def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module,
          epochs: int,
          max_lr: float,
          device: torch.device,
          grad_clip: float =None) -> Dict[str, List[float]]:
    """
    Passes a target PyTorch models through train_step() and test_step()
    functions for a number of epochs, training and testing the model
    in the same epoch loop.

    Calculates, prints and stores evaluation metrics throughout.

    Parameters
    ----------
    model: torch.nn.Module
        A PyTorch model to be trained and tested.
    train_loader: torch.utils.data.DataLoader
        A DataLoader instance for the model to be trained on.
    test_loader: torch.utils.data.DataLoader
        A DataLoader instance for the model to be tested on.
    optimizer: torch.optim.Optimizer
        A PyTorch optimizer to help minimize the loss function.
    loss_fn: torch.nn.Module
        A PyTorch loss function to calculate loss on both datasets.
    epochs: int
        An integer indicating how many epochs to train for.
    max_lr: float
        A float representing the maximum learning rate for the learing_rate
        scheduler.
    device: torch.device
        A target device to compute on
    grad_clip: float = None
        A float representing the maximum gradient threshold during training
        to stabilize gradient values.

    Returns
    -------
    Dict[str, List]
        A dictionary of training and testing loss as well as training and
        testing accuracy metrics. Each metric has a value in a list for
        each epoch.
        {train_loss: [...],
        train_acc: [...],
        test_loss: [...],
        test_acc: [...],
        lr: [...]}
    """
    torch.cuda.empty_cache()
    history = []

    # Set up one-cycle learning rate scheduler
    sched = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr, epochs=epochs,
                                                steps_per_epoch=len(train_dataloader))

    model.to(device)  # Move the model to the specified device

    # Create empty results dictionary
    results = {"train_loss": [],
        "train_acc": [],
        "test_loss": [],
        "test_acc": [],
        "lrs": []
    }

    for epoch in range(epochs):
        print("Learning Rate:", optimizer.param_groups[0]['lr'])


        train_loss, train_acc = train_step(model=model,
                                           data_loader=train_dataloader,
                                           loss_fn=loss_fn,
                                           optimizer=optimizer,
                                           device=device)
        test_loss, test_acc = test_step(model=model,
                                        data_loader=test_dataloader,
                                        loss_fn=loss_fn,
                                        device=device)

        current_lr = get_lr(optimizer)

        print(
            f"Epoch: {epoch+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
            f"test_loss: {test_loss:.4f} | "
            f"test_acc: {test_acc:.4f} |"
            f"lr: {current_lr:.4f}"
        )

        # Update results dictionary
        results["train_loss"].append(train_loss)
        results["train_acc"].append(train_acc)
        results["test_loss"].append(test_loss)
        results["test_acc"].append(test_acc)
        results["lrs"].append(current_lr)

        # Update learning rate
        sched.step()

    # Return the filled results at the end of the epochs
    return results

# Hyperopt

In [None]:
from hyperopt import hp, fmin, tpe, Trials

def optimize_hyperparameters(data_dir, device, num_evals=50):
    """
    Search training hyperparameters for VGG16 with Hyperopt's TPE algorithm.

    Every trial builds fresh data loaders and a fresh model, trains it with
    train(), and is scored by the negated test accuracy of its final epoch.

    Parameters
    ----------
    data_dir: str
        Directory handed to data_loader() for the dataset files.
    device:
        Device the model is trained on (e.g. "cuda" or "cpu").
    num_evals: int
        Number of trials passed to fmin as max_evals.

    Returns
    -------
    Tuple[int, float, float, float]
        (best_num_epochs, best_max_lr, best_grad_clip, best_weight_decay).
        The sampled batch size is searched over but not returned.
    """
    # Define the search space for hyperparameters
    # (hp.loguniform bounds are given in natural-log space)
    space = {
        'num_epochs': hp.quniform('num_epochs', 10, 50, 1),
        'batch_size': hp.choice('batch_size', [16, 32, 64, 128]),
        'max_lr': hp.loguniform('max_lr', -4, -1),
        'grad_clip': hp.uniform('grad_clip', 0.0, 0.5),
        'weight_decay': hp.loguniform('weight_decay', -6, -2)
    }

    # Define your objective function for Hyperopt
    def objective(params):
        # Create dataloaders from data_setup.py
        train_dataloader, test_dataloader, class_names = data_loader(
            data_dir=data_dir,
            batch_size=params['batch_size']
        )

        # Create and configure the model using the provided hyperparameters
        model = VGG16(num_classes=len(class_names))
        loss_fn = torch.nn.CrossEntropyLoss()
        optimizer = torch.optim.SGD(model.parameters(), lr=params['max_lr'], weight_decay=params['weight_decay'],)

        # Start training from engine.py
        results = train(
            model=model,
            train_dataloader=train_dataloader,
            test_dataloader=test_dataloader,
            loss_fn=loss_fn,
            optimizer=optimizer,
            epochs=int(params['num_epochs']),  # quniform samples floats
            device=device,
            max_lr=params['max_lr'],  # Use the sampled learning rate
            grad_clip=params['grad_clip']
        )

        # Return a value to minimize (e.g., negative test accuracy)
        return -results['test_acc'][-1]

    # Set up Hyperopt for optimization
    trials = Trials()
    best = fmin(fn=objective, space=space, algo=tpe.suggest, max_evals=num_evals, trials=trials)

    # Retrieve the best hyperparameters. The keys of `best` are the labels
    # declared in `space`; the original lookup of 'neurons_per_layer' is not
    # one of them and raised KeyError after all trials had finished.
    best_num_epochs = int(best['num_epochs'])
    best_max_lr = best['max_lr']
    best_grad_clip = best['grad_clip']
    best_weight_decay = best['weight_decay']

    return best_num_epochs, best_max_lr, best_grad_clip, best_weight_decay

if __name__ == '__main__':
    # Dataset location and the device the search trials are trained on
    data_dir = 'data'
    device = "cpu"
    if torch.cuda.is_available():
        device = "cuda"

    # Run the search and unpack the winning configuration
    (best_num_epochs,
     best_max_lr,
     best_grad_clip,
     best_weight_decay) = optimize_hyperparameters(data_dir, device)

    print(f'Best hyperparameters: '
          f'epochs: {best_num_epochs} | '
          f'lr: {best_max_lr} | '
          f'grad_clip: {best_grad_clip} | '
          f'weight_decay: {best_weight_decay}')


Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to data/cifar-10-python.tar.gz
  0%|          | 0/50 [00:01<?, ?trial/s, best loss=?]

  0%|          | 0/170498071 [00:00<?, ?it/s]
[A
  0%|          | 32768/170498071 [00:00<19:39, 144523.34it/s]
[A
  0%|          | 65536/170498071 [00:00<19:23, 146491.55it/s]
[A
  0%|          | 98304/170498071 [00:00<19:18, 147087.66it/s]
[A
  0%|          | 229376/170498071 [00:00<08:53, 319141.80it/s]
[A
  0%|          | 458752/170498071 [00:01<04:59, 567952.47it/s]
[A
  0%|          | 851968/170498071 [00:01<02:23, 1181912.69it/s]
[A
  1%|          | 1048576/170498071 [00:01<02:19, 1216941.76it/s]
[A
  1%|          | 1703936/170498071 [00:01<01:12, 2337996.84it/s]
[A
  1%|1         | 2031616/170498071 [00:01<01:08, 2462486.97it/s]
[A
  2%|1         | 3080192/170498071 [00:01<00:38, 4399123.12it/s]
[A
  2%|2         | 3702784/170498071 [00:01<00:34, 4848236.72it/s]
[A
  3%|3         | 5373952/170498071 [00:01<00:20, 8010824.17it/s]
[A
  4%|4         | 7110656/170498071 [00:02<00:15, 10567413.41it/s]
[A
  5%|4         | 8257536/170498071 [00:02<00:16, 9710679.65it/s] 

Extracting data/cifar-10-python.tar.gz to data
Files already downloaded and verified
Learning Rate:
0.011479155095565408
Epoch: 1 | train_loss: 2.1663 | train_acc: 0.1522 | test_loss: 3.7266 | test_acc: 0.1201 |lr: 0.0115
Learning Rate:
0.011479159672797501
Epoch: 2 | train_loss: 2.1475 | train_acc: 0.1616 | test_loss: 2.1935 | test_acc: 0.1563 |lr: 0.0115
Learning Rate:
0.011479173404493392
Epoch: 3 | train_loss: 2.1492 | train_acc: 0.1604 | test_loss: 2.1506 | test_acc: 0.1639 |lr: 0.0115
Learning Rate:
0.011479196290652138
Epoch: 4 | train_loss: 2.1537 | train_acc: 0.1602 | test_loss: 2.1610 | test_acc: 0.1673 |lr: 0.0115
Learning Rate:
0.011479228331272351
Epoch: 5 | train_loss: 2.1435 | train_acc: 0.1641 | test_loss: 2.3764 | test_acc: 0.1124 |lr: 0.0115
Learning Rate:
0.011479269526351754
Epoch: 6 | train_loss: 2.1530 | train_acc: 0.1605 | test_loss: 2.1352 | test_acc: 0.1650 |lr: 0.0115
Learning Rate:
0.011479319875887684
Epoch: 7 | train_loss: 2.2893 | train_acc: 0.1074 | test_

# Main

In [None]:
"""
Train a classification model for the CIFAR-10 dataset.
"""

import torch

# Setup hyperparameters
NUM_EPOCHS = 12
BATCH_SIZE = 32
MAX_LR = 0.1 # maximum learning rate passed to the learning rate scheduler
GRAD_CLIP = 0.1 # threshold for gradient values
WEIGHT_DECAY = 1e-4 # regularization parameter

if __name__ == '__main__':
    # Setup data directory
    data_dir = 'data'

    # Setup device
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Create dataloaders from data_setup.py
    train_dataloader, test_dataloader, class_names = data_loader(
        data_dir=data_dir,
        batch_size=BATCH_SIZE
    )

    # Create VGG16 model from model_builder.py
    model = VGG16(num_classes=len(class_names))

    # Set loss and optimizer
    loss_fn = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=MAX_LR, weight_decay=WEIGHT_DECAY)

    # Start training from engine.py
    results = train(model=model,
        train_dataloader=train_dataloader,
        test_dataloader=test_dataloader,
        loss_fn=loss_fn,
        optimizer=optimizer,
        epochs=NUM_EPOCHS,
        device=device,
        max_lr=MAX_LR,
        grad_clip=GRAD_CLIP
    )

    # Kept inside the guard: `results` only exists when the script is run
    # directly, so plotting at module level would fail on import.
    plot_loss_curves(results)
