![hslu_logo.png](img/hslu_logo.png)

## Week 02

<hr style="border:1px solid black">

# Exercise: Multi-Layer Perceptron with PyTorch
---
---
This exercise illustrates a first classification problem using a multi-layer perceptron, i.e. a feed-forward network built from a set of fully connected layers. The input data can be chosen to be either MNIST or FashionMNIST.

### Import necessary packages

In [None]:
import torch
import torchvision
import numpy as np
import matplotlib.pyplot as plt

from utils import read_data, plot_img, plot_tiles, plot_error, plot_cost

#### Download dataset (MNIST or FashionMNIST)

In [None]:
#data is saved in storage_path (use the same path for the different SW folders)
training_data, test_data, labels_map = read_data(data_type='FashionMNIST', storage_path='../SW01/data')

#to access the images
print(f'training data shape {training_data.data.shape} and type {training_data.data.dtype} ')
print(f'test data shape {test_data.data.shape} and type {test_data.data.dtype} ')

#to access the labels (the messages name the labels, not the image data)
print(f'training labels shape {training_data.targets.shape} and type {training_data.targets.dtype} ')
print(f'test labels shape {test_data.targets.shape} and type {test_data.targets.dtype} ')
print(f'categories: {labels_map}')

#### Organization of data

In [None]:
#direct access to the images: shape and dtype of the data tensor
images = training_data.data
print('data shape and type:', images.shape, images.dtype, sep='\n')

#direct access to the labels: distinct categories, then shape and dtype
targets = training_data.targets
print('\ncategory labels with shape and type:',
      torch.unique(targets), targets.shape, targets.dtype, sep='\n')

#### Plot sample images

In [None]:
#show the first training image in a small figure
first_image = training_data.data[0]
plot_img(first_image, figure_size=[2, 2])

In [None]:
#tile plot of the training images with all categories mixed
#NOTE(review): 10 and 5 are presumably the tile grid dimensions - confirm in utils.plot_tiles
all_images = training_data.data
plot_tiles(all_images, 10, 5)

In [None]:
#tile plot restricted to the images of one single category
label = 3
is_selected = training_data.targets == label
plot_tiles(training_data.data[is_selected], 10, 5)

#### MiniBatch class
PyTorch has its own data loader routine for mini-batches, but the present version is more efficient for our toy model.

In [None]:
class MiniBatches:
    """
    splits a data set into randomly shuffled mini batches

    obtains x- and y-data in the constructor and returns one batch with each call to next();
    the sample order is drawn once per instance with torch.randperm
    """
    def __init__(self, X, Y, batch_size):
        """
        constructor

        Arguments:
        X -- input data with one sample per row (at least two dimensions)
        Y -- labels, indexed along the first dimension in the same sample order as X
        batch_size -- size of batch (0 means one single batch)
        """
        self.X = X
        self.Y = Y
        m = X.shape[0]
        self.indices = torch.randperm(m)
        self.n = X.shape[1]

        if not batch_size:
            self.batch_size = m
            self.mb = 1
        else:
            self.batch_size = batch_size
            #ceiling division: the samples left over after the full batches form a
            #last (smaller) batch instead of being dropped; this also yields one
            #batch (instead of none) if batch_size is larger than the data set
            self.mb = int(-(-m // self.batch_size))

        #index of the batch returned by the following call to next()
        self.ib = 0

    def number_of_batches(self):
        """
        returns the number of batches needed to run once through all samples
        """
        return self.mb

    def next(self):
        """
        returns the next batch as dictionary with keys 'X_batch' and 'Y_batch'

        the last batch is smaller than batch_size if the number of samples is not
        a multiple of batch_size
        """
        #slicing past the end of the index tensor simply returns the remaining indices
        it = self.indices[self.ib * self.batch_size:(self.ib + 1) * self.batch_size]
        X_batch = self.X[it, :]
        Y_batch = self.Y[it]
        self.ib += 1

        return {'X_batch': X_batch, 'Y_batch': Y_batch}

#### Define the neural network

In [None]:
class MultiLayerPerceptron:
    """
    MLP class handling the model and doing all propagation and back propagation steps

    the network is a torch.nn.Sequential which holds one single linear layer as long
    as no further layers are added in the marked section of the constructor.
    The model returns raw class scores: no softmax layer is part of the model, the
    cost is calculated with torch.nn.CrossEntropyLoss on these scores
    """
    def __init__(self, num_input, num_hidden, num_output):
        """
        constructor

        Arguments:
        num_input -- number of input features
        num_hidden -- number of neurons of the hidden layer (not used as long as
                      no hidden layer is added in the marked section below)
        num_output -- number of output classes
        """
        self.model = torch.nn.Sequential(
            torch.nn.Linear(num_input, num_output)
            ### START YOUR CODE ###*
            #torch.nn....,
            #torch.nn...
            ### END YOUR CODE ###*
        )

        self.cost_fn = torch.nn.CrossEntropyLoss(reduction='mean')

        #used to save results: one row [train cost, train error, val cost, val error]
        #is appended with each call to save_training_data
        self.result_data = torch.tensor([])

    def propagate(self, X):
        """
        calculates the model output for X based on the current parameters
        and stores it in self.Y_pred
        """
        self.Y_pred = self.model(X)

    def back_propagate(self, cost):
        """
        calculates the gradients of cost with respect to all model parameters
        this function must be performed AFTER the corresponding propagate step
        """
        #gradients are accumulated by backward(), so reset them first
        self.model.zero_grad()

        cost.backward()

    def calc_cost(self, Y):
        """
        calculates the cross entropy cost (mean over the samples) between the
        output of the last propagate step and the labels Y
        """
        return self.cost_fn(self.Y_pred, Y)

    def gradient_descend(self, alpha):
        """
        does the gradient descend based on results from last back_prop step with learning rate alpha
        """
        #the update itself must not be tracked by autograd
        with torch.no_grad():
            for param in self.model.parameters():
                #parameters which did not take part in the last back_prop step have no gradient
                if param.grad is not None:
                    param -= alpha * param.grad

    def calc_error(self, Y):
        """
        returns the fraction of samples of the last propagate step whose
        predicted class (largest output) differs from the label in Y
        """
        m = Y.shape[0]

        Y_pred_argmax = torch.argmax(self.Y_pred, dim=1)
        error = torch.sum(Y != Y_pred_argmax) / m

        return error

    def save_training_data(self, data):
        """
        save training and validation curves

        appends one row [train cost, train error, val cost, val error] to
        self.result_data; self.Y_pred is overwritten by the validation output
        """
        #pure evaluation: no autograd graph is needed for the full data sets
        with torch.no_grad():
            #determine the train loss and error
            self.propagate(data['X_train'])
            cost_train = self.calc_cost(data['Y_train'])
            error_train = self.calc_error(data['Y_train'])
            #calculate validation loss and error
            self.propagate(data['X_val'])
            cost_val = self.calc_cost(data['Y_val'])
            error_val = self.calc_error(data['Y_val'])

        #save the results
        res = torch.tensor([[cost_train.item(), error_train.item(), cost_val.item(), error_val.item()]])
        self.result_data = torch.cat((self.result_data, res), 0)

    def optimize(self, data, epochs, alpha, batch_size=16, debug=0):
        """
        performs epochs number of gradient descend epochs and appends result to output array

        Arguments:
        data -- dictionary with data (keys 'X_train', 'Y_train', 'X_val', 'Y_val')
        epochs -- number of epochs
        alpha -- learning rate
        batch_size -- size of batch (0: use full training set)
        debug -- 0 (default): no output; k > 0: print the saved results every k-th epoch
        """
        for i0 in range(0, epochs):
            #save the data at the beginning of the epoch
            self.save_training_data(data)
            #create batches for each epoch (new random order per epoch)
            batches = MiniBatches(data['X_train'], data['Y_train'], batch_size)
            #loop over batches
            for ib in range(batches.number_of_batches()):
                batch = batches.next()
                #do prediction
                self.propagate(batch['X_batch'])
                #determine the loss
                cost = self.calc_cost(batch['Y_batch'])
                #determine the gradients
                self.back_propagate(cost)
                #do the correction step
                self.gradient_descend(alpha)

            #the printed values are the ones saved at the beginning of this epoch
            if debug and i0 % debug == 0:
                print('step %r, train cost %1.3f, train error %1.3f, val cost %1.3f, val error %1.3f' % \
                     (i0, self.result_data[-1,0], self.result_data[-1,1], self.result_data[-1,2], self.result_data[-1,3]))

        #save final performance
        self.save_training_data(data)
            

In [None]:
#normalize data: original data is overwritten
data_min = torch.min(training_data.data)
data_max = torch.max(training_data.data)
print('original min-max values are: %r, %r and type %r' % (data_min.item(), data_max.item(), data_min.dtype))

#0: rescale so that the training data covers [0, 1]
#any other value: rescale so that the training data covers [-1, 1]
normalize_tpye = 0

#training and test set are both scaled with the min/max of the training set
for dataset in (training_data, test_data):
    if normalize_tpye == 0:
        dataset.data = (dataset.data.float() - data_min) / (data_max - data_min)
    else:
        dataset.data = 2*(dataset.data.float() - data_min) / (data_max - data_min) - 1

#check the value range of the training data after the scaling
data_min = torch.min(training_data.data)
data_max = torch.max(training_data.data)
print('now min-max values are: %r, %r and type %r' % (data_min.item(), data_max.item(), data_min.dtype))


#### Define X and Y values and do optimization

In [None]:
#flatten(1) keeps the sample dimension and merges all others: n x 784
data_X, data_X_test = training_data.data.flatten(1), test_data.data.flatten(1)
#the integer labels can be used as they are
data_Y, data_Y_test = training_data.targets, test_data.targets

#the test set serves as validation set during the optimization
data = {
    'X_train': data_X,
    'Y_train': data_Y,
    'X_val': data_X_test,
    'Y_val': data_Y_test,
}

#layer sizes: one input per pixel, one output per distinct label
num_input = data_X.size(1)
num_output = torch.unique(data_Y).numel()
num_hidden = 100

mlp = MultiLayerPerceptron(num_input, num_hidden, num_output)

print(mlp.model)

mlp.optimize(data, epochs=20, alpha=0.04, batch_size=1, debug=2)

#plot the results (alternative ranges used for fig.11 - fig.13:
#cost [1e-1, 2.5], error [.2e-1, 1])
plot_cost(mlp, y_range=[2e-1, 2.5])
plot_error(mlp, y_range=[1e-1, 1])

In [None]:
#predicted class = index of the largest model output per sample
y_pred = mlp.model(data_X).argmax(dim=1)

#select a number of false classifications (rows x cols) to plot
num_sel = 9

#mask of all samples whose prediction differs from the label
is_wrong = y_pred != data_Y
plot_tiles(training_data.data[is_wrong], num_sel, num_sel, figure_size=[6, 6])

#print the (wrong) predictions in chunks of num_sel, one chunk per line
wrong_pred = y_pred[is_wrong]
for row in range(num_sel):
    print(wrong_pred[row*num_sel:(row + 1)*num_sel].numpy())

#### Test accuracy

In [None]:
#input is flattened to n x 784
data_X = test_data.data.flatten(1)
#labels are directly supported by pytorch
data_Y = test_data.targets

#pure prediction: no autograd graph is needed
with torch.no_grad():
    y_pred = torch.argmax(mlp.model(data_X),1)

#accuracy = fraction of correctly classified test samples
#(the comparison with != would give the error rate instead)
test_acc = torch.sum(data_Y == y_pred)/data_Y.shape[0]
print(test_acc.numpy())

In [None]:
#number of misclassified samples and total number of samples
num_wrong = (data_Y != y_pred).sum()
print(num_wrong, len(data_Y))

In [None]:
#shape of the label tensor (displayed as cell output)
data_Y.size()