# Assignment 5: Neuroevolution: Neural Architecture Search (NAS)

In [1]:
import os
import torch
import random
import numpy as np
from sklearn import datasets
import matplotlib.pyplot as plt
from sklearn.datasets import load_digits
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F

In [2]:
class Digits(Dataset):
    """Fixed train/val/test partition of the scikit-learn digits data."""

    def __init__(self, mode='train', transforms=None):
        # Row ranges assigned to the named splits; any other ``mode`` value
        # selects the remaining rows (used as the test split).
        named_splits = {
            'train': slice(None, 1000),
            'val': slice(1000, 1350),
        }
        rows = named_splits.get(mode, slice(1350, None))

        bunch = load_digits()
        # Features are stored as float32; targets keep their original dtype.
        self.data = bunch.data[rows].astype(np.float32)
        self.targets = bunch.target[rows]
        self.transforms = transforms

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        features, label = self.data[idx], self.targets[idx]
        # The optional transform is applied to the features only.
        if self.transforms:
            features = self.transforms(features)
        return (features, label)

In [3]:
class Reshape(nn.Module):
    """Reshape a batch so that every sample takes the shape ``size``."""

    def __init__(self, size):
        super().__init__()
        self.size = size  # per-sample target shape (a list or tuple)

    def forward(self, x):
        n_features = x.shape[1]
        # The second dimension must hold exactly as many values as ``size``.
        assert n_features == np.prod(self.size)
        batch = x.shape[0]
        return x.view(batch, *self.size)

# Collapses every dimension after the batch dimension into a single one,
# turning an input tensor into a (batch, features) matrix.
class Flatten(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        batch = x.size(0)
        return x.view(batch, -1)

In [4]:
class ClassifierNeuralNet(nn.Module):
    """Pair a classification network with an NLL loss and a label predictor."""

    def __init__(self, classnet):
        super().__init__()
        # Sequential module with the layers and activations of the classifier.
        self.classnet = classnet
        # Per-sample negative log-likelihood; it expects log-softmax inputs.
        self.nll = nn.NLLLoss(reduction='none')

    def classify(self, x):
        """Return, for each row of ``x``, the index of the highest output."""
        scores = self.classnet(x)
        return torch.max(scores, dim=1)[1]

    def forward(self, x, y, reduction='avg'):
        """Return the NLL of targets ``y`` for inputs ``x``.

        The per-sample losses are summed when ``reduction == 'sum'`` and
        averaged for any other value.
        """
        per_sample = self.nll(self.classnet(x), y.long())
        return per_sample.sum() if reduction == 'sum' else per_sample.mean()

In [5]:
def evaluation(test_loader, model_best=None, epoch=None):
    """Compute the average NLL and classification error over a data loader.

    Args:
        test_loader: iterable yielding ``(batch, targets)`` pairs.
        model_best: model to evaluate (required despite the ``None``
            default). It must provide ``eval()``,
            ``forward(x, y, reduction='sum')`` and ``classify(x)``.
        epoch: when ``None`` the result is printed as the final performance;
            otherwise it is printed only for epochs divisible by 10.

    Returns:
        Tuple ``(nll, error)``, both averaged over the number of examples.

    Raises:
        ZeroDivisionError: if ``test_loader`` yields no examples.
    """
    model_best.eval()  # set the model to the evaluation mode
    loss_test = 0.
    loss_error = 0.
    N = 0.
    # Nothing is optimised here, so disable autograd: no graph is built for
    # the forward passes, which saves memory and time without changing the
    # reported numbers.
    with torch.no_grad():
        for test_batch, test_targets in test_loader:
            # loss (nll), summed over the batch
            loss_test_batch = model_best.forward(test_batch, test_targets, reduction='sum')
            loss_test = loss_test + loss_test_batch.item()
            # classification error: number of wrongly predicted labels
            y_pred = model_best.classify(test_batch)
            loss_error = loss_error + (y_pred != test_targets).sum().item()
            # the number of examples
            N = N + test_batch.shape[0]
    # divide by the number of examples
    loss_test = loss_test / N
    loss_error = loss_error / N

    # Print the performance
    if epoch is None:
        print(f'-> FINAL PERFORMANCE: nll={loss_test}, ce={loss_error}')
    else:
        if epoch % 10 == 0:
            print(f'Epoch: {epoch}, val nll={loss_test}, val ce={loss_error}')

    return loss_test, loss_error

def training(max_patience, num_epochs, model, optimizer, training_loader, val_loader):
    """Train ``model`` with early stopping on the validation NLL.

    Args:
        max_patience: number of consecutive epochs without a new best
            validation NLL that is tolerated; training stops once this
            number is exceeded.
        num_epochs: maximum number of epochs.
        model: module whose ``forward(batch, targets)`` returns the loss.
        optimizer: optimizer updating the parameters of ``model``.
        training_loader: iterable of ``(batch, targets)`` training pairs.
        val_loader: iterable of ``(batch, targets)`` validation pairs.

    Returns:
        Tuple ``(nll_val, error_val)`` of NumPy arrays holding the validation
        NLL and classification error recorded after every completed epoch.

    Note:
        No checkpoint of the best epoch is kept: on return the model holds
        the weights of the last completed epoch.
    """
    nll_val = []
    error_val = []
    # Best validation NLL seen so far; the first epoch always improves on it.
    best_nll = float('inf')
    patience = 0

    # Main training loop
    for e in range(num_epochs):
        model.train()  # set the model to the training mode
        for batch, targets in training_loader:
            # forward pass (loss function for given inputs and labels)
            loss = model.forward(batch, targets)
            # gradients accumulate by default, so reset them first
            optimizer.zero_grad()
            # A fresh graph is built for every batch, so it does not need to
            # be retained after the backward pass.
            loss.backward()
            optimizer.step()

        # Validation: evaluate the model on the validation data
        loss_e, error_e = evaluation(val_loader, model_best=model, epoch=e)
        nll_val.append(loss_e)  # save for plotting
        error_val.append(error_e)  # save for plotting

        # Early stopping: remember the best validation NLL and count the
        # epochs elapsed since it last improved.
        if loss_e < best_nll:
            best_nll = loss_e
            patience = 0
        else:
            patience = patience + 1

        if patience > max_patience:
            break

    # Return nll and classification error.
    nll_val = np.asarray(nll_val)
    error_val = np.asarray(error_val)
    return nll_val, error_val

In [6]:
class NAS():
    """Random-search neural architecture search over small CNN classifiers.

    ``params`` is a dict of candidate choices with the keys ``'fltr'``
    (numbers of convolution filters), ``'ker_size'`` (kernel sizes),
    ``'activation'`` and ``'pooling'`` (module instances).
    """

    def __init__(self, params):
        self.params = params

    def network(self):
        """Sample one random classifier for flattened 8x8 inputs and 10 classes.

        The chosen activation and pooling objects are used as-is (not
        copied), so the same instance can appear in several networks.

        Returns:
            An ``nn.Sequential`` ending in a log-softmax over 10 classes.
        """
        fltr = random.choice(self.params['fltr'])
        ker_size = random.choice(self.params['ker_size'])
        # Half the kernel size: 1 for a 3x3 kernel and 2 for a 5x5 kernel,
        # which keeps the 8x8 spatial size for odd kernels.
        pad = ker_size // 2
        pooling = random.choice(self.params['pooling'])
        activation = random.choice(self.params['activation'])
        l_neuron = random.randrange(10, 110, 10)

        features = [Reshape((1, 8, 8)),
                    nn.Conv2d(1, fltr, ker_size, 1, pad),
                    activation,
                    pooling,
                    Flatten()]
        # Probe the feature extractor once with a dummy input to learn how
        # many values reach the first linear layer.
        with torch.no_grad():
            flat_size = nn.Sequential(*features)(torch.zeros(1, 64)).shape[1]

        return nn.Sequential(*features,
                             nn.Linear(flat_size, l_neuron),
                             activation,
                             nn.Linear(l_neuron, 10),
                             nn.LogSoftmax(dim=1))

    def create_population(self, no_of_childs):
        """Return a list of ``no_of_childs`` independently sampled networks."""
        return [self.network() for _ in range(no_of_childs)]

    def count_parameters(self, model):
        """Return the number of trainable scalar parameters of ``model``."""
        return sum(p.numel() for p in model.parameters() if p.requires_grad)

    def _objective_values(self, population, class_errors):
        """Return the objective of every network, in population order."""
        weights = [self.count_parameters(child) for child in population]
        max_weight = max(weights)
        # Avoid dividing by zero when no network has trainable parameters.
        scale = max_weight if max_weight > 0 else 1
        return [class_errors[i] + 0.01 * (weight / scale)
                for i, weight in enumerate(weights)]

    def objective(self, population, class_errors):
        """Return a dict mapping each objective value to a network index.

        The objective is the classification error plus 0.01 times the
        parameter count relative to the largest network in the population
        (lower is better). Networks with exactly equal objectives share one
        key, so only the last of them is kept in the dict.
        """
        values = self._objective_values(population, class_errors)
        return {value: i for i, value in enumerate(values)}

    def find_best_model(self, population, class_errors):
        """Print the network with the lowest objective and return its index.

        Ties are resolved in favour of the earliest network.
        """
        values = self._objective_values(population, class_errors)
        best_model_index = min(range(len(values)), key=values.__getitem__)
        best_model_objective = values[best_model_index]
        print(f'\n Best Model with objecive {best_model_objective:.4f}: \n {population[best_model_index]}')
        return best_model_index

    def train(self, population, num_epochs, training_loader, val_loader, test_loader, lr, wd, max_patience):
        """Train every network and return their classification errors.

        Each network is wrapped in a ``ClassifierNeuralNet``, trained with
        Adamax (learning rate ``lr``, weight decay ``wd``) and then evaluated
        on ``test_loader``.

        Returns:
            List with one classification error per network, in population
            order.
        """
        # NOTE(review): the candidates are scored on the test loader, so the
        # error of the selected model is optimistically biased; consider
        # scoring on val_loader instead.
        class_errors = []
        for i, child in enumerate(population):
            print('Training Child: ', i)
            model = ClassifierNeuralNet(child)
            optimizer = torch.optim.Adamax(
                [p for p in model.parameters() if p.requires_grad],
                lr=lr, weight_decay=wd)
            training(max_patience=max_patience,
                     num_epochs=num_epochs,
                     model=model,
                     optimizer=optimizer,
                     training_loader=training_loader,
                     val_loader=val_loader)
            _, test_error = evaluation(test_loader=test_loader, model_best=model)
            class_errors.append(test_error)
        return class_errors

In [7]:
# Build the three dataset splits.
train_data, val_data, test_data = (
    Digits(mode=split) for split in ('train', 'val', 'test')
)

# Wrap each split in a loader; only the training loader shuffles its data.
training_loader = DataLoader(train_data, shuffle=True, batch_size=64)
val_loader = DataLoader(val_data, shuffle=False, batch_size=64)
test_loader = DataLoader(test_data, shuffle=False, batch_size=64)

In [8]:
if __name__ == '__main__':
    # Optimisation settings shared by every candidate network.
    lr = 1e-3  # learning rate
    wd = 1e-5  # weight decay
    num_epochs = 11  # maximum number of training epochs per candidate
    # Early-stopping tolerance in epochs. It exceeds num_epochs, so early
    # stopping cannot trigger with these settings.
    max_patience = 100
    no_of_childs = 5  # number of candidate networks in the population

    # Choices the architecture sampler draws from.
    params = dict(
        fltr=[8, 16, 32],
        ker_size=[3, 5],
        activation=[nn.ReLU(), nn.Sigmoid(), nn.Tanh(), nn.Softplus(), nn.ELU()],
        pooling=[nn.MaxPool2d(2), nn.MaxPool2d(1), nn.AvgPool2d(2), nn.AvgPool2d(1)],
    )

    # Sample the population, train every candidate, then report the winner.
    networks = NAS(params)
    population = networks.create_population(no_of_childs)
    class_errors = networks.train(population, num_epochs, training_loader,
                                  val_loader, test_loader, lr, wd, max_patience)

    networks.find_best_model(population, class_errors)

Training Child:  0
Epoch: 0, val nll=0.3875938960484096, val ce=0.08571428571428572
Epoch: 10, val nll=0.10159126520156861, val ce=0.02857142857142857
-> FINAL PERFORMANCE: nll=0.2944671366305426, ce=0.07158836689038031
Training Child:  1
Epoch: 0, val nll=0.8345775822230748, val ce=0.1
Epoch: 10, val nll=0.16879947389875138, val ce=0.03428571428571429
-> FINAL PERFORMANCE: nll=0.2937076811822469, ce=0.07606263982102908
Training Child:  2
Epoch: 0, val nll=0.7155780247279576, val ce=0.11142857142857143
Epoch: 10, val nll=0.11437022890363421, val ce=0.022857142857142857
-> FINAL PERFORMANCE: nll=0.2814940243492724, ce=0.0894854586129754
Training Child:  3
Epoch: 0, val nll=1.5503225490025112, val ce=0.3142857142857143
Epoch: 10, val nll=0.18459584508623395, val ce=0.05142857142857143
-> FINAL PERFORMANCE: nll=0.39454610129064094, ce=0.12080536912751678
Training Child:  4
Epoch: 0, val nll=0.9477011162894112, val ce=0.10857142857142857
Epoch: 10, val nll=0.1252859262057713, val ce=0.0257

Short Summary:

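1) It randomly generates `no_of_childs` networks by sampling from the given hyperparameter choices.

2) It then trains each network on the training set and evaluates it on the test set.

3) It then computes the objective for each network: the classification error plus 0.01 × (the network's parameter count divided by the largest parameter count in the population).

4) Based on the objective, it selects the best model, i.e. the one with the lowest objective value.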
