In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam, SGD
from torch.utils.data import TensorDataset, DataLoader

import tensorflow.keras.datasets.mnist as MNIST

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score
import warnings

cuda = torch.cuda.is_available()
print("GPU available:", cuda)

torch.manual_seed(1102)
np.random.seed(1102)

#References: https://www.kaggle.com/code/snnaik/aoe-bme-lecture3-partc-mlp-lenetformnist

#### a. Downloads the MNIST dataset using MNIST package. (Make sure the images and labels for both training set and test set are downloaded correctly)
#### b. Using `train_test_split` function, splits training set into a new training set and validation set by a ratio of 20%.
#### c. Prints the number of samples, minimum and maximum intensity, and the shape of the images matrix for all three sets.

In [None]:
(trainingSET, label_train), (testing, label_test) = MNIST.load_data()

#split the training set into new training set and validation set
trainingSET, validation, label_train, label_val = train_test_split(trainingSET, label_train, test_size=0.2)

#print num entries, max and min value, shape
print(trainingSET.shape[0], np.max(trainingSET), np.min(trainingSET), trainingSET.shape)
print(validation.shape[0], np.max(validation), np.min(validation), validation.shape)
print(testing.shape[0], np.max(testing), np.min(testing), testing.shape)

#### Displays 24 first samples pulled from each set (train, validation, and test) and show the true labels as the title per each sample. 
**Note:** Displays all 24 samples pulled from each set in one plot as in four rows and six columns. Thus, we will expect three plots for training, validation, and test set.

In [None]:
sets = [trainingSET, trainingSET, trainingSET]
labels = [label_train, label_val, label_test]
titles = ['Training ', 'Validation', 'Testing']
for s in range(len(sets)): 
    row = 4
    col = 6
    fig = plt.figure(figsize = (12, 15))
    
    plt.suptitle(f"From {titles[s]}")
    for i in range(1, row * col + 1):
        
        IMG = sets[s][i, :, :]
        val = fig.add_subplot(row, col, i)
        val.set_xticks([])
        val.set_yticks([])
        val.title.set_text(str(labels[s][i]))
        plt.imshow(IMG, cmap = 'gray')
        
    plt.show()
    print


#### a. Reformats all samples and creates a `DataLoader` of batch size 100 for each set. 
#### b. Prints the total number of batches in each `DataLoader` using `len()` function


In [None]:
# [your code here]
image_train_torch = torch.from_numpy(trainingSET).type(torch.FloatTensor).view(-1, 1, 28, 28)
label_train_torch = torch.from_numpy(label_train).type(torch.LongTensor)

image_validation_torch = torch.from_numpy(validation).type(torch.FloatTensor).view(-1, 1, 28, 28)
label_validation_torch = torch.from_numpy(label_val).type(torch.LongTensor)

image_test_torch = torch.from_numpy(testing).type(torch.FloatTensor).view(-1, 1, 28, 28)
label_test_torch = torch.from_numpy(label_test).type(torch.LongTensor)

train_data = TensorDataset(image_train_torch, label_train_torch)
train_loader = DataLoader(train_data, batch_size = 100)

print(len(train_loader))

#### </span> Defines a model as the following table:
| Layer Number | Layer Type   | Number of kernels | Kernel size | Activation | Stride | Zero-Padding |
| ------------ | ------------ | ----------------- | ----------- | ---------- | ------ | ------------ |
| 1            | Conv2D       | 32                | 5x5         | ReLU       | 1      | 0            |
| 2            | MaxPooling2D | NA                | 2x2         | NA         | 2      | 0            |
| 3            | Conv2D       | 64                | 3x3         | ReLU       | 1      | 0            |
| 4            | MaxPooling2D | NA                | 2x2         | NA         | 2      | 0            |
| 5            | Conv2D       | 128               | 2x2         | ReLU       | 1      | 0            |
| 6            | MaxPooling2D | NA                | 2x2         | NA         | 2      | 0            |
| 7            | Faltten      | NA                | NA          | NA         | NA     | NA           |
| 8            | FC (Linear)  | 128               | NA          | ReLU       | NA     | NA           |
| 9            | FC (Linear)  | 64                | NA          | Sigmoid    | NA     | NA           |
| 10           | FC (Linear)  | 10                | NA          | SoftMax    | NA     | NA           |
**Note:** **NA** means not applicable.

#### Creates an instance of the model and load it on the GPU by calling the `.cuda()` function.

In [None]:
# Formula to calculate shape as we go through layer by layer = [(X - F + 2P)/S] + 1
# Here,
# X = Width / Height
# F = Kernel size
# P = Padding
# S = Strides (default = 1)

# Our input to the first layer is going to be [batchsize, 1, 32, 32]
# substitute, =[(28 - 5 + 2(0))/1] + 1
#             =[(23)/1] + 1
#             =23 + 1
#             =24


class MLPModel(nn.Module):
    '''Reshape -> FC -> Sigmoid -> FC -> Sigmoid -> FC -> SoftMax -> Cross-Entropy'''
    def __init__(self):
        '''Define model modules.'''
        super(MLPModel, self).__init__()
        self.fc1 = nn.Linear(28 * 28, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)

    def forward(self, x):
        '''Define the model architecture (the sequence to place the model modules).'''
        x = x.view(-1, 28 * 28)
        x = self.fc1(x)
        x = F.sigmoid(x)
        x = self.fc2(x)
        x = F.sigmoid(x)
        x = self.fc3(x)
        return F.log_softmax(x, dim = 1)

our_MLP = MLPModel()

# If GPU available, move the model to GPU.
if cuda:
    our_MLP.cuda()

print(our_MLP)

#### </span> Writes the shape of the output tensor for each layer of the model, knowing the input shape of each image in the MNIST dataset is $28\times28$.
**Note:** Uses the following formula: \
Assuming the input shape is: $(B, Ch_{in}, W_{in}, H_{in})$, and the output shape is $(B, Ch_{out}, W_{out}, H_{out})$ \
1. For the `Conv2D` layers with $N$ number of $K\times K$ kernels, zero-padding $P$, and stride $S$:
$$
W_{out} \text{ or } H_{out} = \lfloor {\frac{W_{in} \text{ or } H_{in} + 2P - K }{S}} + 1 \rfloor
$$
$\lfloor . \rfloor$ means integer floor.

And the number of channels for the output will be $Ch_{out} = N$

2. Similarly for the `Maxpooling` layers of size $K \times K$, zero-padding $P$, and stride $S$:
$$
W_{out} \text{ or } H_{out} = \lfloor {\frac{W_{in} \text{ or } H_{in} + 2P - K }{S}} + 1 \rfloor
$$

The only difference is that the number of channels for the output will be $Ch_{out} = Ch_{in}$

3. For `Linear` (Also known as Fully-Connected) layers, the input is a vector of $(B, D_{in})$. Assuming the layer has $N$ number of neurons, the output is a vector of size $(B, N)$.

**Note:** $B$ stands for the batch size.


Layer 1: W = [(28 - 5 + 2*0)/1] + 1 = 24, 32 Channels

Layer 2: W = [(24 - 2 + 2*0)/2] + 1 = 12, 32 Channels

Layer 3: W = [(12 - 3 + 2*0)/1] + 1 = 10, 64 Channels

Layer 4: W = [(10 - 2 + 2*0)/2] + 1 = 5, 64 Channels

Layer 5: W = [(5 - 2 + 2*0)/1] + 1 = 4, 128 Channels

Layer 6: W = [(4 - 2 + 2*0)/2] + 1 = 2, 128 Channels

Layer 7: D = 512 *Reshape*

Layer 8: Vector: <100, 128>

Layer 9: Vector: <100, 64>

Layer 10: Vector: <100, 10>

#### </span> Defines an SGD optimizer with a learning rate of $10^{-3}$ on the parameters of the model.

In [None]:
# define our optimizer
optimizer = SGD(our_MLP.parameters(), lr = 0.001)

#### </span> Trains the network on a train set for 50 epochs. Uses the "Cross-Entropy" (`F.cross_entropy`) as the loss function. 
#### At the end of each epoch, saves the model, tests the model on the validation set, and keeps the training and validation loss for later. 
**Note:** Loss is calculated for each batch; thus, average loss needed to be collected over all batches as the epoch loss for both training and validation phases.

In [None]:
!mkdir saved_models_MLP

In [None]:
EPOCHS = 50

train_epoch_loss = []
validation_epoch_loss = []

for epoch in range(EPOCHS):
    train_loss = []
    validation_loss = []

    for batch_index, (train_image, train_label) in enumerate(train_loader):
        # If GPU is available, move the data to the GPU for faster computation.
        if cuda:
            #######################################################
            ####################### Train #########################
            #######################################################
            # Set the model to train mode so that the parameters can be updated.
            our_MLP.train()

            train_label_predicted = our_MLP(train_image.cuda())

            # compute the loss
            loss = F.cross_entropy(train_label_predicted, train_label.cuda())
            train_loss.append(loss.cpu().data.item())

            # reset the gradient 
            optimizer.zero_grad()
            # backpropagate the loss
            loss.backward()
            # update the parameters
            optimizer.step()

            #######################################################
            ###################### Validation #####################
            #######################################################
            # Set the model to evaluation mode so that parameters are fixed.
            our_MLP.eval()

            validation_label_predicted = our_MLP(image_validation_torch.cuda())

            loss = F.cross_entropy(validation_label_predicted, label_validation_torch.cuda())
            validation_loss.append(loss.cpu().data.item())
        
        # If GPU is not available.
        else:
            #######################################################
            ####################### Train #########################
            #######################################################
            # Set the model to train mode so that the parameters can be updated.
            our_MLP.train()

            train_label_predicted = our_MLP(train_image)

            # compute the loss
            loss = F.cross_entropy(train_label_predicted, train_label)
            train_loss.append(loss.cpu().data.item())

            # reset the gradient
            optimizer.zero_grad()
            # backpropagate the loss
            loss.backward()
            # update the parameters
            optimizer.step()

            #######################################################
            ###################### Validation #####################
            #######################################################
            # Set the model to evaluation mode so that parameters are fixed.
            our_MLP.eval()

            validation_label_predicted = our_MLP(image_validation_torch)

            loss = F.cross_entropy(validation_label_predicted, label_validation_torch)
            validation_loss.append(loss.cpu().data.item())

    train_epoch_loss.append(np.mean(train_loss))
    validation_epoch_loss.append(np.mean(validation_loss))
    
    # save models
    torch.save(our_MLP.state_dict(), './saved_models_MLP/checkpoint_epoch_%s.pth' % (epoch))

    print("Epoch: {} | train_loss: {} | validation_loss: {}".format(epoch, train_epoch_loss[-1], validation_epoch_loss[-1]))

#### </span> Plots the learning curve and indicates in which epoch the model achieved the lowest validation loss.
**Note:** The learning curve is a plot that shows the training and validation loss for each epoch.

In [None]:
plt.figure(figsize = (12, 8))
plt.plot(train_epoch_loss, '-o', label = 'training loss', markersize = 3)
plt.plot(validation_epoch_loss, '-o', label = 'validation loss', markersize = 3)
plt.legend(loc = 'upper right');


best_epoch = np.argmin(validation_epoch_loss)
print('best epoch: ', best_epoch)


#### a. Loads the weights of the best epoch and tests the model on the test set.
#### b. Tests the model on the first sample of the test set and plots the output probabilities.
#### c. Prints the overall accuracy of the model on the test set. Accuracy formula is $Accuracy=\frac{\text{Number of correct prediction}}{\text{Total Number of samples}}$
#### d. Plots the confusion matrix for the test set.

**Note:** The testing phase is similar to the validation phase. You need to loop through batches of the test loader and keep the true labels and predicted labels for accuracy and confusion matrix.

In [None]:
state_dict = torch.load('./saved_models_MLP/checkpoint_epoch_%s.pth' % (epoch))
print(state_dict.keys())
our_MLP.load_state_dict(state_dict)




In [None]:
def predict_with_pytorch(model, input_data):
    model.eval()
    label_predicted_all = []

    label_predicted_one_hot = model(input_data)
    label_predicted_probability, label_predicted_index = torch.max(label_predicted_one_hot.data, 1)
    
    for current_prediction in label_predicted_index:
        label_predicted_all.append(current_prediction.detach().cpu().numpy().item())

    return label_predicted_all

if cuda:
    test_label_predicted = predict_with_pytorch(our_MLP, image_test_torch.cuda())
else:
    test_label_predicted = predict_with_pytorch(our_MLP, image_test_torch)

In [None]:
def view_classify(image, probabilities, version = "MNIST"):
    ''' Function for viewing an image and it's predicted classes.
    '''
    probabilities = probabilities.data.numpy().squeeze()

    fig, (ax1, ax2) = plt.subplots(figsize = (6, 9), ncols = 2)
    ax1.imshow(image.resize_(1, 28, 28).numpy().squeeze(), cmap = 'gray')
    ax1.set_title('Original Image')
    ax1.axis('off')
    ax2.bar(np.arange(10), probabilities)
    ax2.set_aspect(10)
    ax2.set_xticks(np.arange(10))
        
    ax2.set_title('Class Probability')
    ax2.set_ylim(0, 1.1)

    plt.tight_layout()
   
our_MLP.eval()

sample_test_image = image_test_torch[0, :, :, :][np.newaxis, :, :, :]

if cuda:
    sample_prediction = our_MLP(sample_test_image.cuda())
else:
    sample_prediction = our_MLP(sample_test_image)
    
warnings.filterwarnings('ignore')

if cuda:
    view_classify(sample_test_image, 2 ** sample_prediction.cpu())
else:
    view_classify(sample_test_image, 2 ** sample_prediction)
    

In [None]:
j=0
match = 0

for i in test_label_predicted:
    if i== label_test[j]:
        match = match + 1
    j = j+1
    
    
print("Accuracy:", round((match/len(label_test) * 100),2) , "%")

In [None]:
CM = confusion_matrix(label_test, test_label_predicted)

plt.figure(figsize = (12,10))
sns.heatmap(CM, annot = True, annot_kws = {"size": 10})
plt.ylim([0, 10]);
plt.ylabel('True labels');
plt.xlabel('predicted labels');

#### Final Thoughts:

My model performed better on the MNIST classification compared to the LeNet model. I would presume a larger network of more layers would raise accuracy.
To improve the accuracy of either model, I would suggest using a bigger set of data/sources. Also, I would calculate the batch size and the optimal learning rate.