# KMNIST Classifier Utils

## 1. Imports
These imports are required for the rest of the notebook to run.

In [2]:
!pip install pycm livelossplot
%pylab inline

Populating the interactive namespace from numpy and matplotlib


### CUDA setup

In [3]:
from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.decomposition import PCA
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.naive_bayes import GaussianNB

from scipy.stats import mode

import numpy as np 
import pandas as pd

from livelossplot import PlotLosses
from pycm import *

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset 
from torch.utils.data import TensorDataset, DataLoader
import torchvision.transforms as transforms
from torchvision.transforms import Compose, ToTensor, Normalize, RandomRotation, ToPILImage, RandomAffine, Resize, RandomChoice, RandomRotation, RandomHorizontalFlip
from torchvision.datasets import KMNIST


def set_seed(seed):
    """
    Seed every random number generator used by this notebook and switch off
    the cuDNN code paths that can introduce run-to-run differences.

    Input: seed - integer seed shared by all generators

    Returns: True once all generators have been seeded
    """
    # This file never imports the standard-library `random` module; under
    # `%pylab inline` the bare name `random` resolves to `numpy.random`, so the
    # Python generator has to be imported explicitly to actually be seeded.
    import random as py_random

    py_random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    # Turn OFF the cuDNN auto-tuner: benchmarking may select a different
    # convolution algorithm on each run.
    torch.backends.cudnn.benchmark = False
    # cuDNN is disabled altogether so convolutions use PyTorch's native kernels.
    torch.backends.cudnn.enabled = False

    return True

# Choose the compute device once; every training/evaluation helper below reads `device`.
_cuda_ready = torch.cuda.device_count() > 0 and torch.cuda.is_available()
device = 'cuda' if _cuda_ready else 'cpu'
print("Cuda installed! Running on GPU!" if _cuda_ready else "No GPU available!")

Cuda installed! Running on GPU!


### Mounting Google drive

In [4]:
# Mount Google Drive into the Colab runtime. The data, results and model paths
# used further down all live under "/content/gdrive/My Drive/".
from google.colab import drive
drive.mount('/content/gdrive/')

Drive already mounted at /content/gdrive/; to attempt to forcibly remount, call drive.mount("/content/gdrive/", force_remount=True).


### Load the KMNIST data from Google Drive

In [0]:
# Each .npy file is read from the mounted drive and immediately wrapped in a
# torch tensor; both the numpy and the tensor versions stay available.

# Training images
np_kmnist_data = np.load("/content/gdrive/My Drive/kmnist_data/kmnist-train-imgs.npy")
kmnist_data = torch.tensor(np_kmnist_data)

# Training labels
np_kmnist_labels = np.load("/content/gdrive/My Drive/kmnist_data/kmnist-train-labels.npy")
kmnist_labels = torch.tensor(np_kmnist_labels)

# Test images
np_kmnist_test = np.load("/content/gdrive/My Drive/kmnist_data/kmnist-test-imgs.npy")
kmnist_test = torch.tensor(np_kmnist_test)

## 2. Mean, Standard deviation and Manual Normalisation

In [0]:
def get_mean_std(train, val):
    """
    Computes the global mean and standard deviation of a training and a
    validation set, each divided by 255.

    Input: train - a torch.Tensor object
           val - a torch.Tensor object

    Returns: train_mean, val_mean, train_std, val_std - the scaled statistics,
             in that order
    """
    def _scaled_stats(values):
        # Statistics are taken over every element, then divided by 255
        arr = np.asarray(values)
        return arr.mean() / 255, arr.std() / 255

    train_mean, train_std = _scaled_stats(train)
    val_mean, val_std = _scaled_stats(val)

    return train_mean, val_mean, train_std, val_std


def apply_normalization(X):
    '''
    Standardises a tensor with its own global statistics: the mean and standard
    deviation are taken over every element of X (not per image), then
    (X - mean) / std is returned.

    Input: X - A torch.Tensor object. Integer tensors are converted to float32
               first, because mean/std are only defined for floating point
               tensors.

    Returns: a new floating point torch.Tensor; the tensor passed in is left
             untouched.
    '''
    if not X.is_floating_point():
        X = X.float()

    mean = X.mean()
    std = X.std()

    # Out-of-place arithmetic so the caller's data is never overwritten
    return (X - mean) / std

## 3. Data Augmentation Setup

### CustomTensorDataSet

In [0]:
class CustomImageTensorDataset(Dataset):
    """
    Dataset over an image tensor and a label tensor, with an optional
    per-sample transform.
    """

    def __init__(self, data, targets, transform=None):
        """
        Args:
            data (Tensor): A tensor containing the data e.g. images
            targets (Tensor): A tensor containing all the labels
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        self.data, self.targets, self.transform = data, targets, transform

    def __len__(self):
        # One sample per entry along the first axis of the data tensor
        return len(self.data)

    def __getitem__(self, idx):
        raw = self.data[idx]
        label = self.targets[idx]

        # Reshape to (1, 28, 28), cast to float and divide by 255
        image = raw.view(1, 28, 28).float() / 255.
        if self.transform:
            image = self.transform(image)

        return image, label

### Transformations

In [0]:
def transformed(mean, std, choice=0):
    """
    Builds a torchvision transform pipeline: ToPILImage, an optional
    augmentation step selected by `choice`, ToTensor, then Normalize.

    Input: mean - float, mean passed to Normalize
           std - float, standard deviation passed to Normalize
           choice - selects the augmentation step:
                    1 - no augmentation
                    2 - one of rotation / horizontal flip / translation / shear,
                        picked by RandomChoice
                    any other value (default 0) - a single RandomAffine combining
                        rotation, translation and shear

    Returns: a torchvision Compose object
    """
    if choice == 1:
        augmentation = []
    elif choice == 2:
        augmentation = [
            RandomChoice([
                RandomRotation(10),
                RandomHorizontalFlip(1.0),
                RandomAffine(degrees=0, translate=(0.1, 0.1), shear=0.),
                RandomAffine(degrees=0, translate=(0., 0.), shear=10.)
            ])
        ]
    else:
        augmentation = [RandomAffine(degrees=10., translate=(0.1, 0.1), shear=10.)]

    steps = [ToPILImage()] + augmentation + [ToTensor(), Normalize(mean=[mean], std=[std])]
    return Compose(steps)


# a random rotation transform to be used on a split training dataset
# Normalisation constants shared by the four augmenting pipelines below
_KMNIST_MEAN, _KMNIST_STD = 0.1919, 0.3483


def _augment_then_normalise(augmentation):
    """Wraps one PIL-level augmentation as ToPILImage -> augmentation -> ToTensor -> Normalize."""
    return Compose([
        ToPILImage(),
        augmentation,
        ToTensor(),
        Normalize(mean=[_KMNIST_MEAN], std=[_KMNIST_STD])
    ])


# a random rotation transform to be used on a split training dataset
train__rotate = _augment_then_normalise(RandomRotation(10))

# a random affine transform (includes a random rotation, translation and shear) to be used on a split training dataset
train__random = _augment_then_normalise(
    RandomAffine(degrees=10., translate=(0.1, 0.1), shear=10.)
)

# a random rotation transform to be used on the full kmnist dataset
full_train__rotate = _augment_then_normalise(RandomRotation(10))

# a random affine transform to be used on the full kmnist dataset
full_train__random = _augment_then_normalise(
    RandomAffine(degrees=10., translate=(0.1, 0.1), shear=10.)
)

# a normalization transform to be used on a split validation dataset
# NOTE(review): std here is 0.3486 while the training pipelines use 0.3483 - confirm this is intended
validation_test_transform = Compose([
    Normalize(mean=[0.1919], std=[0.3486])
])

## 4.  K-fold Cross Validation

In [0]:
def k_split(X, y, splits=5):
    """
    Splits the training set into stratified folds for hyperparameter
    optimisation.

    Input: X - The training set, a torch.Tensor object
           y - The training labels, a torch.Tensor object
           splits - default = 5, number of folds, Integer

    Returns (splits == 1): X_train, X_val, y_train, y_val - a single stratified
             90/10 shuffle split (random_state=42), returned as plain tensors
    Returns (otherwise): trains, vals, train_labels, val_labels - four lists
             with one entry per fold, produced by StratifiedKFold without
             shuffling
    """
    if splits == 1:
        # Plain train/validation split, no k-fold
        splitter = StratifiedShuffleSplit(n_splits=1, test_size=0.1, random_state=42)
        train_idx, validation_idx = next(iter(splitter.split(X, y)))
        return X[train_idx], X[validation_idx], y[train_idx], y[validation_idx]

    folds = list(StratifiedKFold(n_splits=splits, random_state=None).split(X, y))

    trains = [X[train_index] for train_index, _ in folds]
    vals = [X[val_index] for _, val_index in folds]
    train_labels = [y[train_index] for train_index, _ in folds]
    val_labels = [y[val_index] for _, val_index in folds]

    return trains, vals, train_labels, val_labels


def k_fold_optimisation(k_folds, momentums=None, weight_decays=None, lrs=None, epochs=None, optims=None, model_fn=None):
    """
    Performs k-fold cross validation over one list of candidate hyperparameter
    values and returns the candidate with the best average validation accuracy.

    Exactly one candidate list is used: the first of momentums, weight_decays,
    lrs, epochs, optims (in that order) that is given and whose length equals
    k_folds. Every candidate is trained on every fold.

    Input: k_folds - Number of folds (must be at least 2), Integer
           momentums - List of momentum values to compare, List of floats
           weight_decays - List of weight decay values to compare, List of floats
           lrs - List of learning rate values to compare, List of floats
           epochs - List of epoch counts to compare, List of integers
           optims - List of optimiser names to compare, List of strings
           model_fn - callable taking no arguments and returning a new, untrained
                      model; called once per (fold, candidate) so that no
                      training run starts from previously trained weights

    Returns: (best_value, best_accuracy) - the winning candidate and its
             validation accuracy averaged over the folds. Returns None (after
             printing an error) when no list is given, k_folds < 2 or model_fn
             is missing.

    Raises: AssertionError if no given list has exactly k_folds entries
    """
    # Candidate lists paired with the key train_model_params expects in `params`
    candidates_by_key = [
        ('momentum', momentums),
        ('weight_decay', weight_decays),
        ('lr', lrs),
        ('n_epochs', epochs),
        ('optim', optims),
    ]

    # Check that we have chosen at least one parameter to test
    if all(values is None for _, values in candidates_by_key):
        print("Error: No parameter list!")
        return

    if k_folds < 2:
        print("Error: Must have more than 1 fold")
        return

    # Whatever parameter list is chosen must have as many entries as there are folds
    selected = next(
        ((key, values) for key, values in candidates_by_key
         if values is not None and len(values) == k_folds),
        None,
    )
    if selected is None:
        raise AssertionError("The chosen parameter list must contain exactly k_folds values")
    param_key, values = selected

    if model_fn is None:
        print("Error: No model factory! Pass model_fn, a callable that returns a new untrained model")
        return

    # Split using k-fold
    trains, valids, tr_labels, val_labels = k_split(kmnist_data, kmnist_labels, splits=k_folds)

    # Running sum of the best validation accuracy reached by each candidate
    sum_accs = np.zeros(k_folds)

    # Train each training set + test with validation set
    for i in range(len(trains)):
        train_mean, val_mean, train_std, val_std = get_mean_std(trains[i], valids[i])

        k_train = CustomImageTensorDataset(trains[i], tr_labels[i].long(),
                                           transform=transformed(train_mean, train_std))
        # choice=1 only normalises: validation images must not be randomly augmented
        k_validate = CustomImageTensorDataset(valids[i], val_labels[i].long(),
                                              transform=transformed(val_mean, val_std, choice=1))

        # Train with the different parameters, each time from a fresh model
        for j, value in enumerate(values):
            _, _, _, valid_accs, _ = train_model_params(model_fn(), k_train, k_validate,
                                                        params={param_key: value})
            sum_accs[j] += max(valid_accs)

    # Average validation accuracy of each candidate over the folds
    mean_accs = sum_accs / k_folds

    # Index of the best candidate
    indx_of_max = int(np.argmax(mean_accs))

    return values[indx_of_max], float(mean_accs[indx_of_max])


## 5. Model Training Functions


### Parameters kept constant
We decided to keep some hyperparameters constant in order to reduce the number of parameters to optimise

In [0]:
# Seed handed to set_seed() at the start of every training function
seed = 42
# Batch size of the (shuffled) training and augmentation DataLoaders
batch_size = 64
# Batch size of the validation/test DataLoaders
test_batch_size = 1000

### Train, Validate and Evaluate

In [0]:
def train(model, optimizer, criterion, data_loader):
    """
    Trains a neural network for one pass over data_loader with the selected
    optimizer and loss function.

    Input: model - a torch.nn class, can be a custom class. Ex. LeNet5
           optimizer - a pytorch optim function. Ex. torch.optim.SGD
           criterion - a torch.nn class. Ex. nn.CrossEntropyLoss()
           data_loader - a pytorch DataLoader yielding (images, labels) batches

    Returns: the loss and the accuracy averaged over len(data_loader.dataset).
             The loss is a detached tensor (use .item()/float() to read it).
    """
    model.train()
    train_loss, train_accuracy = 0, 0
    for X, y in data_loader:
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()
        a2 = model(X.view(-1, 1, 28, 28))
        loss = criterion(a2, y)
        loss.backward()
        optimizer.step()

        # Accumulate detached values only: summing `loss` itself would keep the
        # autograd history of every batch alive until the end of the epoch.
        train_loss += loss.detach()*X.size(0)
        y_pred = F.log_softmax(a2.detach(), dim=1).max(1)[1]
        train_accuracy += accuracy_score(y.cpu().numpy(), y_pred.cpu().numpy())*X.size(0)

    return train_loss/len(data_loader.dataset), train_accuracy/len(data_loader.dataset)


def validate(model, criterion, data_loader):
    """
    Evaluates a neural network on data_loader with the selected loss function,
    without computing gradients.

    Input: model - a torch.nn class, can be a custom class. Ex. LeNet5
           criterion - a torch.nn class. Ex. nn.CrossEntropyLoss()
           data_loader - a pytorch DataLoader yielding (images, labels) batches

    Returns: the loss and the accuracy averaged over len(data_loader.dataset)
    """
    model.eval()
    n_samples = len(data_loader.dataset)
    validation_loss = 0.
    validation_accuracy = 0.

    for images, labels in data_loader:
        with torch.no_grad():
            images, labels = images.to(device), labels.to(device)
            outputs = model(images.view(-1, 1, 28, 28))
            batch_loss = criterion(outputs, labels)
            # Weight each batch by its size so the final division gives a per-sample average
            validation_loss += batch_loss*images.size(0)
            _, predicted = F.log_softmax(outputs, dim=1).max(1)
            batch_accuracy = accuracy_score(labels.cpu().numpy(), predicted.cpu().numpy())
            validation_accuracy += batch_accuracy*images.size(0)

    return validation_loss/n_samples, validation_accuracy/n_samples


def evaluate(model, X_test):
    """
    Labels every item of X_test with the class that has the largest
    log-softmax score in the network output.

    Input: model - a torch.nn class, can be a custom class. Ex. LeNet5
           X_test - a pytorch.Tensor object; it is iterated over and each item
                    is reshaped to (-1, 1, 28, 28) before the forward pass

    Returns: a numpy array with the predicted labels of all items, concatenated
    """
    model.eval()

    def _predict(item):
        # Forward pass without gradient tracking; the result is moved back to the CPU
        with torch.no_grad():
            item = item.to(device)
            outputs = model(item.view(-1, 1, 28, 28))
            _, labels = F.log_softmax(outputs, dim=1).max(1)
            return labels.cpu().numpy()

    return np.concatenate([_predict(item) for item in X_test], 0)


### Training functions

In [0]:
def train_model(model, trainset, validset, lr=1e-2, momentum=0.5, weight_decay=0.0, n_epochs=None):
    """
    Trains a model with SGD for a number of epochs, validating after every
    epoch and plotting losses/accuracies with livelossplot.

    Input: model - a torch.nn class, can be a custom class
           trainset - a pytorch Dataset initialized with a training data Tensor
           validset - a pytorch Dataset initialized with validation/test data Tensor
           lr - learning rate used by model, float
           momentum - a parameter specified in the SGD optimizer, float
           weight_decay - a regularization parameter specified in the SGD optimizer, float
           n_epochs - number of epochs, integer. When left as None, a
                      notebook-level `n_epochs` variable is used if one exists,
                      otherwise 30.

    Returns: model - a torch.nn class
    """
    if n_epochs is None:
        # Earlier versions read an undeclared global `n_epochs`; keep honouring
        # it when the notebook defines one, and fall back to 30 otherwise.
        n_epochs = globals().get('n_epochs', 30)

    set_seed(seed)
    model = model.to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)

    criterion = nn.CrossEntropyLoss()

    train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=0)
    validation_loader = DataLoader(validset, batch_size=test_batch_size, shuffle=False, num_workers=0)

    liveloss = PlotLosses()

    for epoch in range(n_epochs):
        train_loss, train_accuracy = train(model, optimizer, criterion, train_loader)
        validation_loss, validation_accuracy = validate(model, criterion, validation_loader)

        # float() accepts 0-d tensors, numpy scalars and plain Python floats alike
        logs = {
            'log loss': float(train_loss),
            'accuracy': float(train_accuracy),
            'val_log loss': float(validation_loss),
            'val_accuracy': float(validation_accuracy),
        }

        liveloss.update(logs)
        liveloss.draw()

    return model



def _resolve_training_params(params):
    """Merges user supplied hyperparameters over the defaults used by train_model_params."""
    resolved = {'momentum': 0.5, 'weight_decay': 0.0, 'lr': 1e-2, 'n_epochs': 30, 'optim': 'SGD'}
    for key, value in (params or {}).items():
        # Only None means "not set": a legitimate 0 / 0.0 must not be replaced by the default
        if key in resolved and value is not None:
            resolved[key] = value
    return resolved


def train_model_params(model, trainset, validset, full=False, plot=True, params=None):
    """
    Trains a model for a number of epochs with hyperparameters taken from a
    dict, recording the training and validation metrics of every epoch.

    Input: model - a torch.nn class, can be a custom class
           trainset - a pytorch Dataset initialized with a training data Tensor
           validset - a pytorch Dataset initialized with validation/test data Tensor
           full - if True, no per-epoch validation is run; validset is evaluated
                  once after the last epoch and the result is printed
           plot - if True, will display a livelossplot of the training and validation losses/accuracies. Else, prints the losses and accuracies
           params - dict of parameter values {momentum, weight_decay, lr, n_epochs, optim}. Keys that are missing or set to None
                    fall back to the defaults (momentum 0.5, weight_decay 0.0, lr 1e-2, n_epochs 30, optim 'SGD').
                    optim == 'Adam' selects torch.optim.Adam; any other value uses SGD.

    Returns: model - a torch.nn class
             train_accs - list of per-epoch training accuracies, as returned by train()
             train_losses - list of per-epoch training losses, as returned by train()
             valid_accs - list of per-epoch validation accuracies (empty when full is True)
             valid_losses - list of per-epoch validation losses (empty when full is True)
    """
    settings = _resolve_training_params(params)
    momentum = settings['momentum']
    weight_decay = settings['weight_decay']
    lr = settings['lr']
    n_epochs = settings['n_epochs']
    optim = settings['optim']

    print("Set parameters:", "momentum:", momentum, "weight decay:", weight_decay, "learning rate:", lr, "number of epochs:", n_epochs, "optimiser:", optim, "plot:", plot)

    set_seed(seed)
    model = model.to(device)

    if optim == 'Adam': # Adam optimiser
        optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
        print("Adam Optimiser used")
    else:
        optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)

    criterion = nn.CrossEntropyLoss()

    train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=0)
    validation_loader = DataLoader(validset, batch_size=test_batch_size, shuffle=False, num_workers=0)

    # The live plot is only created when it is going to be used
    liveloss = PlotLosses() if plot else None

    # Save the training acc and losses
    train_accs, train_losses = [], []

    # Save the validation acc and losses
    valid_accs, valid_losses = [], []

    for epoch in range(n_epochs):
        print("epoch: ", epoch)
        train_loss, train_accuracy = train(model, optimizer, criterion, train_loader)

        train_accs.append(train_accuracy)
        train_losses.append(train_loss)

        # float() accepts 0-d tensors, numpy scalars and plain Python floats alike
        logs = {'log loss': float(train_loss), 'accuracy': float(train_accuracy)}

        if full:
            # No validation during a full run: plot zeros as placeholders
            logs['val_log loss'] = 0
            logs['val_accuracy'] = 0
        else:
            validation_loss, validation_accuracy = validate(model, criterion, validation_loader)

            valid_accs.append(validation_accuracy)
            valid_losses.append(validation_loss)

            logs['val_log loss'] = float(validation_loss)
            logs['val_accuracy'] = float(validation_accuracy)

        if plot:
            liveloss.update(logs)
            liveloss.draw()
        else:
            print("Training loss: ", logs['log loss'])
            print("Training acc: ", logs['accuracy'])
            if not full:
                print("Val loss: ", logs['val_log loss'])
                print("Val acc: ", logs['val_accuracy'])

    if full:
        test_loss, test_accuracy = validate(model, criterion, validation_loader)
        print("Avg. Test Loss: %1.3f" % float(test_loss), " Avg. Test Accuracy: %1.3f" % float(test_accuracy))
        print("")

    return model, train_accs, train_losses, valid_accs, valid_losses



def train_model_augmented(model, train_dataset, validation_dataset, aug_dataset, lr=1e-2, momentum=0.5, weight_decay=0., n_epochs=30, plot=True, augs=1):
    """
    Trains a model with SGD; every epoch consists of one pass over the original
    training set followed by `augs` passes over the augmented set, then one
    validation pass.

    Input: model - a torch.nn class, can be a custom class. Ex. LeNet5
           train_dataset - a pytorch Dataset initialized with a training data Tensor
           validation_dataset - a pytorch Dataset initialized with validation/test data Tensor
           aug_dataset - a pytorch Dataset providing the augmented training samples
           lr - learning rate used by model, float
           momentum - a parameter specified in the SGD optimizer, float
           weight_decay - a regularization parameter specified in the SGD optimizer, float
           n_epochs - the total number of epoch cycles to train the model, integer
           plot - if True, will display a livelossplot of the training and validation losses/accuracies
                  after every epoch. Else, prints the losses and accuracies every 5th epoch and on the last one
           augs - number of extra augmented trainings per epoch cycle

    Returns: model - a torch.nn class
    """
    set_seed(seed)
    model = model.to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)
    criterion = nn.CrossEntropyLoss()

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    validation_loader = DataLoader(validation_dataset, batch_size=test_batch_size, shuffle=False, num_workers=0)
    aug_loader = DataLoader(aug_dataset, batch_size=batch_size, shuffle=True, num_workers=0)

    liveloss = PlotLosses() if plot else None

    for epoch in range(n_epochs):
        # train on original dataset
        train_loss, train_accuracy = train(model, optimizer, criterion, train_loader)
        # train on augmented dataset; the reported training metrics are those of
        # the last pass that ran (the original pass when augs == 0)
        for _ in range(augs):
            train_loss, train_accuracy = train(model, optimizer, criterion, aug_loader)

        # Validate and report exactly once per epoch, whatever the value of `augs`
        validation_loss, validation_accuracy = validate(model, criterion, validation_loader)

        if plot:
            liveloss.update({
                'log loss': float(train_loss),
                'accuracy': float(train_accuracy),
                'val_log loss': float(validation_loss),
                'val_accuracy': float(validation_accuracy),
            })
            liveloss.draw()
        elif epoch % 5 == 0 or epoch == (n_epochs - 1):
            print("Training loss: ", float(train_loss))
            print("Val loss: ", float(validation_loss))
            print("Training acc: ", float(train_accuracy))
            print("Val acc: ", float(validation_accuracy))

    return model



## 6. Save Predictions as CSV file

In [0]:
def save_predictions(model, X_test, name=None):
    """
    Normalises the test set, predicts a label for every image with the model
    and saves the predictions as a csv file with the columns ID and Category.

    Input: model - a torch.nn class, can be a custom class. Ex. LeNet, AlexNet
           X_test - testset, a pytorch.Tensor object (integer or float); it is
                    not modified
           name - file name of the csv, a string; defaults to
                  "kmnist_classifier.csv"
    """
    # Work on a float copy: the raw image tensor may be an integer type, and the
    # caller's test set must not be overwritten by the normalisation.
    X_test = apply_normalization(X_test.clone().float())
    y_pred1 = evaluate(model, X_test)

    ID = np.arange(0, len(y_pred1))
    dataframe1 = pd.DataFrame({'ID': ID, 'Category': y_pred1})

    # Saves to CSV file
    if not name:
      name = "kmnist_classifier.csv"
    path = F"/content/gdrive/My Drive/KMNIST_ENTROPY/results/{name}"
    dataframe1.to_csv(path, index=False, sep=',')
    
    
def save_predictions_ns(y_preds, name=None):
    """
    Writes already computed predictions to a csv file with the columns ID
    (0 .. len(y_preds) - 1) and Category.

    Input: y_preds - numpy array of predicted labels
           name - file name of the csv, a string; defaults to
                  "kmnist_pca_classifier.csv"
    """
    table = pd.DataFrame({'ID': np.arange(0, len(y_preds)), 'Category': y_preds})

    # Fall back to the default file name when none (or an empty one) is given
    name = name or "kmnist_pca_classifier.csv"
    destination = F"/content/gdrive/My Drive/KMNIST_ENTROPY/results/{name}"

    table.to_csv(destination, index=False, sep=',')

## 7. Save Model

In [0]:
def save_model(model, name):
    """
    Saves the state dict of the model to the models folder on Google Drive.

    Input: model - a torch.nn class
           name - file name, a string
                  (Ex. 'AlexNet_kmnist_classifier_random_ep60_drop0_wd1e3.pt')
    """
    destination = F"/content/gdrive/My Drive/KMNIST_ENTROPY/models/{name}"
    weights = model.state_dict()
    torch.save(weights, destination)
  

## 8. Load Model 

In [0]:
  def load_model(model, name):
    """
    Loads a saved state dict from the models folder on Google Drive into the
    given model.

    Input: model - a torch.nn class with the same architecture as the saved one
           name - file name, a string

    Returns: model - the same model, on `device`, with the stored weights
    """
    path = F"/content/gdrive/My Drive/KMNIST_ENTROPY/models/{name}"

    model = model.to(device)
    # map_location remaps the stored tensors onto the current device, so weights
    # saved on a GPU can also be loaded on a CPU-only runtime
    model.load_state_dict(torch.load(path, map_location=device))

    return model

In [0]:
# Printed when this final cell runs, i.e. once every utility above has been defined
print("Utils.ipynb has finished downloading")