Environment: pytorch

# <font color='purple'>Convolutional Neural Network
In this notebook we train a convolutional neural network to classify images into one of the 10 classes

In [4]:
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision
import torchvision.transforms as transforms

import optuna
from optuna.trial import TrialState

In [2]:
my_seed = 42

## <font color = 'blue'>Import Training Data

In [3]:
my_mean = torch.load('mean.pt')
my_std = torch.load('std_dev.pt')

def get_CIFAR(train):
    """
    Function to download and transform CIFAR dataset
    :param train: Boolean value. If True, return training dataset. If False return test dataset.
    :param mean: sequence of means for each channel, to be used for normalisation
    :param std_dev: sequence of std deviations for each channel, to be used for normalisation
    :return: Dataset
    """
    my_transform = transforms.Compose([transforms.ToTensor(),
                                       transforms.Normalize(my_mean, my_std)])

    my_cifar = torchvision.datasets.CIFAR10(root='./data', train=train, download=True, transform=my_transform)

    return my_cifar

In [4]:
trainset = get_CIFAR(train = True)

Files already downloaded and verified


## <font color = 'blue'>Train Model

**Tune hyperparameters**

In [5]:
def set_parameters(trial, my_optimizer):
    """
    Set parameters for neural network, optimisation algorithm etc.
    :param trial: Optuna trial object
    :param my_optimizer: optimizer to use - SGD / SGD_classical / SGD_nesterov / Adam

    :return: dictionary of parameters:
            - n_conv_layers: number of convolution layers in neural network
            - out_ch_conv{i}: number of output channels in convolution layer i
            - kernel_conv{i}_even: kernel width in convolution layer i - even option
            - kernel_conv{i}_odd:                                      - odd option

            - n_linear_layers: number of linear layers in neural network
            - n_units_lin{i}: number of units in linear layer i
            - dropout_lin{i}: dropout probability for linear layer i

            - lr: learning rate
            - batch_size: batch size
            - n_epochs = number of epochs (i.e. number of passes through training data during optimisation)
    """
    trial.suggest_int("n_conv_layers", 2, 2)

    for i in range(trial.params['n_conv_layers']):
        trial.suggest_int(f'out_ch_conv{i}', 1, 50)
        trial.suggest_categorical(f'kernel_conv{i}_even', [2, 4, 6])
        trial.suggest_categorical(f'kernel_conv{i}_odd', [3, 5, 7])

    trial.suggest_int("n_linear_layers", 1, 3)

    for i in range(trial.params['n_linear_layers']):
        trial.suggest_int(f'n_units_lin{i}', 1, 200)
        trial.suggest_float(f"dropout_lin{i}", 0.1, 0.9)

    trial.suggest_float("lr", 1e-5, 1e-1, log=True)
    trial.suggest_int("batch_size", 10, 10)
    trial.suggest_int("n_epochs", 5, 5)

    trial.suggest_categorical("optimizer", [my_optimizer])
    if (my_optimizer=='SGD_classical') | (my_optimizer=='SGD_nesterov'):
        trial.suggest_float("momentum", 0.6, 0.999)
    elif my_optimizer=='Adam':
        trial.suggest_float("beta1", 0.6, 0.999)
        trial.suggest_float("beta2", 0.8, 0.999)

    my_params = trial.params

    return my_params

In [6]:
def define_model(my_params):
    """Defines convolutional neural network based on set parameters
    :param my_params: dictionary of parameters (see set_parameters() for full list)
    """

    layers = []

    # Define Convolution Layers
    in_ch = 3  # number of input channels = no. of channels in feature matrix = 3 (RGB)
    img_width = 32 # number of px along length & width of feature matrix
    for i in range(my_params['n_conv_layers']):
        # convolution layer
        out_ch = my_params[f'out_ch_conv{i}']  # number of output channels for this layer
        # for even image width use odd kernel width so that resulting img width is divisible by 2 during pooling
        if (img_width % 2) == 0:
            kernel_size = my_params[f'kernel_conv{i}_odd']
        else:
            kernel_size = my_params[f'kernel_conv{i}_even']
        layers.append(nn.Conv2d(in_ch, out_ch, kernel_size))

        layers.append(nn.ReLU())  # activation function
        layers.append(nn.MaxPool2d(2,2))  # pooling layer

        in_ch = out_ch  # no. of input channels for next layer = no. of output channels from this layer
        img_width = int((img_width-(kernel_size-1))/2)

    layers.append(nn.Flatten(start_dim=1))  # flatten all dimensions except batch

    # Define Linear Layers
    in_features = in_ch * img_width * img_width
    for i in range(my_params['n_linear_layers']):
        # linear layer
        out_features = my_params[f'n_units_lin{i}']
        layers.append(nn.Linear(in_features, out_features))

        layers.append(nn.ReLU())  # activation function

        #drop-out regularisation
        p = my_params[f"dropout_lin{i}"]
        layers.append(nn.Dropout(p))

        in_features = out_features  # no. of inputs for next layer = no. of outputs of this layer

    layers.append(nn.Linear(in_features, 10))  # output layer

    return nn.Sequential(*layers)

In [7]:
def get_train_val_dataloader(training_dataset, my_batchsize, my_seed = None):
    """
    Function to split training data into training and validation subsets and format as dataloaders
    Model performance on validation set will be used for hyperparameter tuning.

    :param training_dataset: full set of training data, in pytorch Dataset format
    :param my_batchsize: batch size for pytorch DataLoader
    :param my_seed: optional seed to be used for train test split random state

    :return: tuple of pytorch DataLoaders - train_dataloader, val_dataloader
    """

    # separate into training & validation datasets
    total_len = len(training_dataset.data)
    train, val = torch.utils.data.random_split(dataset=training_dataset, lengths=[int(0.8*total_len), int(0.2*total_len)])

    #format as pytorch dataloader
    train_dataloader = DataLoader(train, batch_size=my_batchsize, shuffle=True)
    val_dataloader = DataLoader(val, batch_size=my_batchsize)

    return train_dataloader, val_dataloader

In [8]:
def count_correct(predictions, y):
    """
    Counts number of correct predictions in a batch
    :param predictions: 1D tensor with predictions
    :param y: 1D tensor with true classes
    :return: number of correct predictions (pred==y)
    """
    predictions = predictions.numpy()
    y = y.numpy()

    n_correct = (predictions == y).sum()

    return n_correct

In [9]:
def objective(trial, my_optimizer):
    """
    Objective for Optuna to optimise
    :param trial: Optuna trial object
    :param optimizer_name: optimizer to use
                            - SGD: SGD without momentum
                            - SGD_classical: SGD with classical momentum
                            - SGD_nesterov: SGD with nesterov momentum
                            - Adam
    :return: accuracy - fraction of correctly labelled validation points. This is what Optuna seeks to maximise
    """

    #set parameters
    my_params = set_parameters(trial, my_optimizer)

    # Instantiate model
    model = define_model(my_params)

    # Instantiate optimizer
    lr = my_params['lr']
    if my_optimizer == 'SGD':
        optimizer = getattr(optim, "SGD")(model.parameters(), lr=lr)
    elif my_optimizer == 'SGD_classical':
        momentum = my_params['momentum']
        optimizer = getattr(optim, "SGD")(model.parameters(), lr=lr, momentum=momentum)
    elif my_optimizer == 'SGD_nesterov':
        momentum = my_params['momentum']
        optimizer = getattr(optim, "SGD")(model.parameters(), lr=lr, momentum=momentum,
                                                   nesterov=True)
    elif my_optimizer == 'Adam':
        beta1 = my_params['beta1']
        beta2 = my_params['beta2']
        optimizer = getattr(optim, "Adam")(model.parameters(), lr=lr, betas=(beta1, beta2))
    else:
        raise ValueError("optimizer_name must be 'SGD' / 'SGD_classical' / 'SGD_nesterov' / 'Adam'")

    # get data
    train_dataloader, val_dataloader = get_train_val_dataloader(training_dataset=trainset,
                                                                          my_batchsize=my_params['batch_size'])

    # train model
    for epoch in range(my_params['n_epochs']):

        #train
        model.train()
        for batch, (X, y) in enumerate(train_dataloader):
            # X and y are tensors. X.size() = (batch_size,n_features), y.size()=(batch_size,)
            # set datatype for compatibility with nn.
            X = X.float()
            y = y.long()

            # calculate model output and resulting loss
            model_output = model(X)  # tensor. size=(batch_size x n_classes)
            loss_fn = nn.CrossEntropyLoss() # instantiate loss function
            loss = loss_fn(model_output, y)

            # Backpropagation to update model weights
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # validate. We do this at each epoch to facilitate pruning:
        # i.e. early termination of trials which are clearly not going to be optimum
        model.eval()
        correct = 0
        with torch.no_grad():
            for batch, (X, y) in enumerate(val_dataloader):
                X = X.float()
                y = y.long()

                # calculate model output and total number of correct predictions for this batch
                model_output = model(X)
                pred = torch.argmax(model_output, dim=1)  # prediction = class with highest output value
                correct += count_correct(pred, y)

        accuracy = correct / len(val_dataloader.dataset)

        # report accuracy to allow Optuna to decide whether to prune this trial
        trial.report(accuracy, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return accuracy  # return final validation accuracy after all epochs (unless pruned)

Optimisation algorithm is selected manually from:
- "SGD": SGD without momentum
- "SGD_classical": SGD with classical momentum
- "SGD_nesterov": SGD with nesterov momentum
- "Adam"

In [10]:
my_optim = input("Select optimisation algorithm (SGD/ SGD_classical, SGD_nesterov, Adam): ")

Select optimisation algorithm (SGD/ SGD_classical, SGD_nesterov, Adam): SGD_nesterov


In [11]:
# instantiate optuna study
study = optuna.create_study(direction="maximize", sampler=optuna.samplers.TPESampler())
# Optimise hyperparameters will try {n_trials} param combinations or till {timeout} seconds is hit
study.optimize(lambda trial: objective(trial, my_optim), n_trials=100)

[32m[I 2022-12-06 14:35:41,723][0m A new study created in memory with name: no-name-bab891ab-1b12-432b-a62c-103e9253f286[0m
[32m[I 2022-12-06 14:38:23,692][0m Trial 0 finished with value: 0.3281 and parameters: {'n_conv_layers': 2, 'out_ch_conv0': 9, 'kernel_conv0_even': 6, 'kernel_conv0_odd': 7, 'out_ch_conv1': 24, 'kernel_conv1_even': 4, 'kernel_conv1_odd': 3, 'n_linear_layers': 3, 'n_units_lin0': 105, 'dropout_lin0': 0.7532499858263603, 'n_units_lin1': 104, 'dropout_lin1': 0.11838041596539997, 'n_units_lin2': 53, 'dropout_lin2': 0.7335057596935689, 'lr': 0.003954673847025026, 'batch_size': 10, 'n_epochs': 5, 'optimizer': 'SGD_nesterov', 'momentum': 0.7886969989376063}. Best is trial 0 with value: 0.3281.[0m
[32m[I 2022-12-06 14:40:49,256][0m Trial 1 finished with value: 0.6201 and parameters: {'n_conv_layers': 2, 'out_ch_conv0': 13, 'kernel_conv0_even': 2, 'kernel_conv0_odd': 7, 'out_ch_conv1': 34, 'kernel_conv1_even': 6, 'kernel_conv1_odd': 7, 'n_linear_layers': 1, 'n_units

[32m[I 2022-12-06 15:38:11,053][0m Trial 29 pruned. [0m
[32m[I 2022-12-06 15:38:54,164][0m Trial 30 pruned. [0m
[32m[I 2022-12-06 15:43:51,232][0m Trial 31 finished with value: 0.6586 and parameters: {'n_conv_layers': 2, 'out_ch_conv0': 49, 'kernel_conv0_even': 2, 'kernel_conv0_odd': 3, 'out_ch_conv1': 20, 'kernel_conv1_even': 6, 'kernel_conv1_odd': 3, 'n_linear_layers': 1, 'n_units_lin0': 93, 'dropout_lin0': 0.18427834760558845, 'lr': 0.005952288879299075, 'batch_size': 10, 'n_epochs': 5, 'optimizer': 'SGD_nesterov', 'momentum': 0.6490634484539481}. Best is trial 16 with value: 0.6875.[0m
[32m[I 2022-12-06 15:48:23,962][0m Trial 32 pruned. [0m
[32m[I 2022-12-06 15:53:08,411][0m Trial 33 finished with value: 0.6824 and parameters: {'n_conv_layers': 2, 'out_ch_conv0': 44, 'kernel_conv0_even': 2, 'kernel_conv0_odd': 3, 'out_ch_conv1': 31, 'kernel_conv1_even': 6, 'kernel_conv1_odd': 3, 'n_linear_layers': 1, 'n_units_lin0': 97, 'dropout_lin0': 0.198053644754176, 'lr': 0.00460

[32m[I 2022-12-06 17:04:02,365][0m Trial 75 pruned. [0m
[32m[I 2022-12-06 17:05:02,940][0m Trial 76 pruned. [0m
[32m[I 2022-12-06 17:10:35,988][0m Trial 77 finished with value: 0.6904 and parameters: {'n_conv_layers': 2, 'out_ch_conv0': 45, 'kernel_conv0_even': 2, 'kernel_conv0_odd': 3, 'out_ch_conv1': 44, 'kernel_conv1_even': 6, 'kernel_conv1_odd': 3, 'n_linear_layers': 1, 'n_units_lin0': 154, 'dropout_lin0': 0.17705477709707426, 'lr': 0.00427669587129408, 'batch_size': 10, 'n_epochs': 5, 'optimizer': 'SGD_nesterov', 'momentum': 0.6999135583225274}. Best is trial 77 with value: 0.6904.[0m
[32m[I 2022-12-06 17:11:38,289][0m Trial 78 pruned. [0m
[32m[I 2022-12-06 17:12:33,002][0m Trial 79 pruned. [0m
[32m[I 2022-12-06 17:13:12,708][0m Trial 80 pruned. [0m
[32m[I 2022-12-06 17:18:46,577][0m Trial 81 finished with value: 0.7056 and parameters: {'n_conv_layers': 2, 'out_ch_conv0': 46, 'kernel_conv0_even': 2, 'kernel_conv0_odd': 3, 'out_ch_conv1': 50, 'kernel_conv1_even'

In [12]:
#display study results
pruned_trials = study.get_trials(deepcopy=False, states=[TrialState.PRUNED])
complete_trials = study.get_trials(deepcopy=False, states=[TrialState.COMPLETE])

print("Study statistics: ")
print("  Number of finished trials: ", len(study.trials))
print("  Number of pruned trials: ", len(pruned_trials))
print("  Number of complete trials: ", len(complete_trials))

print("Best trial:")
best_trial = study.best_trial

print("  Validation Accuracy: ", best_trial.value)

print("  Params: ")
for key, value in best_trial.params.items():
    print(f"    {key}: {value}")

Study statistics: 
  Number of finished trials:  100
  Number of pruned trials:  64
  Number of complete trials:  36
Best trial:
  Validation Accuracy:  0.7056
  Params: 
    n_conv_layers: 2
    out_ch_conv0: 46
    kernel_conv0_even: 2
    kernel_conv0_odd: 3
    out_ch_conv1: 50
    kernel_conv1_even: 6
    kernel_conv1_odd: 3
    n_linear_layers: 1
    n_units_lin0: 122
    dropout_lin0: 0.18699223741707605
    lr: 0.006360806468362518
    batch_size: 10
    n_epochs: 5
    optimizer: SGD_nesterov
    momentum: 0.6606857381635803


**Train final model using tuned hyperparameters**

In [13]:
def train_final_model(my_params):
    """
    Train final model using tuned hyperparameters from best Optuna trial
    :param my_params: dictionary of parameters from Optuna trial object that had best validation accuracy

    :return: model
    """

    # Instantiate model
    model = define_model(my_params)

    # Instantiate optimizer
    my_optimizer = my_params['optimizer']
    lr = my_params['lr']
    if my_optimizer == 'SGD':
        optimizer = getattr(optim, "SGD")(model.parameters(), lr=lr)
    elif my_optimizer == 'SGD_classical':
        momentum = my_params['momentum']
        optimizer = getattr(optim, "SGD")(model.parameters(), lr=lr, momentum=momentum)
    elif my_optimizer == 'SGD_nesterov':
        momentum = my_params['momentum']
        optimizer = getattr(optim, "SGD")(model.parameters(), lr=lr, momentum=momentum,
                                          nesterov=True)
    elif my_optimizer == 'Adam':
        beta1 = my_params['beta1']
        beta2 = my_params['beta2']
        optimizer = getattr(optim, "Adam")(model.parameters(), lr=lr, betas=(beta1, beta2))
    else:
        raise ValueError("optimizer_name must be 'SGD' / 'SGD_classical' / 'SGD_nesterov' / 'Adam'")

    # get data
    train_dataloader = DataLoader(dataset=trainset, batch_size=my_params['batch_size'])

    # train model
    for epoch in range(my_params['n_epochs']):
        model.train()
        for batch, (X, y) in enumerate(train_dataloader):
            # set datatype for compatibility with nn.
            X = X.float()
            y = y.long()

            # calculate model output and resulting loss
            model_output = model(X)  # tensor. size=(batch_size x n_classes)
            loss_fn = nn.CrossEntropyLoss()  # instantiate loss function
            loss = loss_fn(model_output, y)

            # Backpropagation to update model weights
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    return model

In [14]:
best_params = best_trial.params
final_model = train_final_model(best_params)

In [16]:
# EVALUATE FINAL TRAINING ACCURACY
def predict_and_evaluate(model, my_dataset):
    """
    Function to run trained and tuned model on provided dataframe to obtain predictions and evaluate
    accuracy

    :param model: trained model
    :param my_dataset: dataset including features and target/label

    :return: accuracy
    """
    my_dataloader = DataLoader(my_dataset, batch_size=10, shuffle=False)

    model.eval()
    correct = 0
    with torch.no_grad():
        for batch, (X, y) in enumerate(my_dataloader):
            X = X.float()
            y = y.long()

            # calculate model output and total number of correct predictions for this batch
            model_output = model(X)
            pred = torch.argmax(model_output, dim=1)  # prediction = class with highest output value
            correct += count_correct(pred, y)

    accuracy = correct / len(my_dataloader.dataset)

    return accuracy


train_acc = predict_and_evaluate(final_model, trainset)
print(f"  Final Training Accuracy: {train_acc}")

  Final Training Accuracy: 0.79506


## <font color='blue'>Test

In [18]:
testset = get_CIFAR(train = False)
test_acc = predict_and_evaluate(final_model, testset)
print(f"  Test Accuracy: {test_acc}")

Files already downloaded and verified
  Test Accuracy: 0.7116


## <font color='blue'>Save Final Model

In [19]:
torch.save(final_model, 'cnn_' + best_params['optimizer'] + '.pth')

## <font color = 'blue'> Past results

In [5]:
print('my_seed=42')
my_results = {'cnn_SGD_nesterov':0.7116}

myresults_df = pd.DataFrame.from_dict(my_results, 'index').reset_index().rename({'index':'model',0:'accuracy'},axis=1)
myresults_df

my_seed=42


Unnamed: 0,model,accuracy
0,cnn_SGD_nesterov,0.7116
