# MNIST Digits Classification - Simple PyTorch🔥CNN 

Subject: Building a CNN classifier with PyTorch to classify handwritten digits.

Data: MNIST (handwritten digits) via torchvision

Procedure:
- Previewing images from dataset with pyplot's imshow()
- Neural network with torch.nn.Module, torch.nn.Sequential, torch.nn.Conv2d, torch.nn.ReLU, torch.nn.MaxPool2d, torch.nn.Dropout, torch.nn.Linear, and torch.nn.init.xavier_uniform_
- Visualizing model with torchviz' make_dot()
- Training with nn.CrossEntropyLoss and torch.optim.Adam optimizer
- Visualizing loss, accuracy, and F1-score with pyplot
- Visualization of the CNN Layers with pyplot's imshow()
- Good results

Others:
- Compatible with Google Colab and Kaggle as runtime
- CUDA support

Sources used:
- https://machinelearningknowledge.ai/pytorch-conv2d-explained-with-examples/
- Probably some more, but this is an old Notebook and I forgot the sources. Please let me know if I copied your code and I will mention it.

## Bootstrap and Imports

In [None]:
# Environment bootstrap: pick the compute device, then set NUM_EPOCHS and
# BASE_PATH (and install torchviz where a branch does so) per runtime.
import os
import torch
# use the GPU when CUDA is available, otherwise fall back to the CPU
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Running on {DEVICE}')

# running in google colab
# NOTE(review): get_ipython() is only defined inside IPython/Jupyter kernels,
# so this cell cannot run in a plain Python interpreter.
if 'google.colab' in str(get_ipython()):
    NUM_EPOCHS = 50
    !pip install torchviz
    # data is read from the mounted Google Drive
    BASE_PATH = './drive/MyDrive/Colab/data/'
    from google.colab import drive
    drive.mount('/content/drive')

# running interactively in kaggle
# NOTE(review): detection relies on where the kernel connection file lives —
# confirm this still matches current Kaggle sessions.
elif get_ipython().config.IPKernelApp.connection_file.startswith('/root/.local/share'):
    NUM_EPOCHS = 5
    BASE_PATH = '/kaggle/input/'
    !pip install torchviz
    
# running as background job in kaggle
# NOTE(review): 'SHLVL' is set by most shells, so this branch presumably also
# matches other shell-launched environments — verify.
elif 'SHLVL' in os.environ:
    NUM_EPOCHS = 50
    BASE_PATH = '/kaggle/input/'
    !pip install torchviz

# any other environment: only two epochs, data expected below ../data/
# (torchviz is not installed by this branch)
else:
    BASE_PATH = '../data/'
    NUM_EPOCHS = 2

In [None]:
import random
from tqdm.auto import tqdm
import numpy as np
from collections.abc import Callable
import locale
locale.setlocale(locale.LC_ALL, locale='')  # for thousands separator via ... print(f'{value:n}')"
import math

from IPython.display import HTML, Image
import time
import matplotlib.animation as animation
import matplotlib.pyplot as plt
plt.style.use('ggplot')
import pandas as pd
import torch
from torch import nn
from torchvision import transforms
from torchvision import datasets
from torchvision.utils import make_grid
from torchvision import utils
from torch.utils.data import DataLoader
from torch.nn.modules.loss import _Loss
from torchviz import make_dot
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
import sklearn.metrics

# Seed every random number generator the notebook draws from so that runs are
# repeatable: Python's `random`, NumPy's global generator (used by pandas'
# DataFrame.sample() when no random_state is passed) and PyTorch.
my_seed = 123
random.seed(my_seed)
np.random.seed(my_seed)
torch.manual_seed(my_seed)

## Dataset

## Load Data into Pandas

In [None]:
# Locate the "digit-recognizer" CSV files below BASE_PATH and load both.
path_train = f'{BASE_PATH}digit-recognizer/train.csv'
path_test = f'{BASE_PATH}digit-recognizer/test.csv'

df_train_source = pd.read_csv(path_train)
df_test = pd.read_csv(path_test)

# quick look: column names first, then (rows, columns) of each frame
for frame in (df_train_source, df_test):
    print(frame.columns)

for frame in (df_train_source, df_test):
    print(frame.shape)

# only the training file carries the target column
assert 'label' in df_train_source.columns
assert 'label' not in df_test.columns

In [None]:
# split train into train (85 %) and validation (15 %)
# Shuffle with an explicit random_state: DataFrame.sample() ignores the seeds
# given to random.seed() / torch.manual_seed(), so without it the split (and
# therefore every reported validation metric) changes from run to run.
df_train_randomized = df_train_source.sample(frac=1, random_state=my_seed)
NUM_VAL = int(len(df_train_randomized) * 0.15)

# Slice at an explicit position instead of [:-NUM_VAL]: a negative-zero slice
# ([:-0]) would silently produce an empty training set for tiny inputs.
split_at = len(df_train_randomized) - NUM_VAL
df_train_with_label = df_train_randomized[:split_at]  # (35700, 785)
df_val_with_label = df_train_randomized[split_at:]  # (6300, 785)

In [None]:
# Separate the pixel columns from the target. The target is selected by its
# column name 'label' rather than by position, so a different column order in
# the CSV cannot silently turn a pixel column into the label.
df_train = df_train_with_label.drop(columns='label')  # (35700, 784)
df_val = df_val_with_label.drop(columns='label')  # (6300, 784)

ser_y_train = df_train_with_label['label']  # (35700,)
ser_y_val = df_val_with_label['label']  # (6300,)

In [None]:
# all three feature frames must expose the same columns in the same order
assert (df_train.columns == df_val.columns).all()
assert (df_train.columns == df_test.columns).all()

# no nan treatment required
assert not df_train.isnull().to_numpy().any()

## Tensorize

In [None]:
# from flat 784 (0..255) to 28*28 pixels with 1 normalized channel (0.0..1.0)
def reshape(df: pd.DataFrame) -> np.ndarray:  # df: (n, 784), all int64
    """Turn rows of flat pixel values into normalized 28x28 images.

    Args:
        df: frame in which every row holds the 784 pixel values of one image.

    Returns:
        float32 array of shape (n, 28, 28) with every value divided by 255,
        i.e. raw pixels in 0..255 end up in 0.0..1.0.

    Raises:
        ValueError: if the number of values is not a multiple of 28 * 28.
    """
    pixels = df.to_numpy(dtype=np.float32)  # (n, 784), float32
    images = pixels.reshape(-1, 28, 28)  # (n, 28, 28)
    # the pixels have 256 values (0.0..255.0), therefore we normalize to (0.0..1.0)
    return images / 255.0
    
    
# normalized float32 image arrays of shape (n, 28, 28) for train (35700),
# validation (6300) and test (28000)
arr_train, arr_val, arr_test = (
    reshape(frame) for frame in (df_train, df_val, df_test)
)

In [None]:
# tensorize: copy the arrays into tensors that live on DEVICE
train = torch.tensor(arr_train, device=DEVICE)  # torch.float32
val = torch.tensor(arr_val, device=DEVICE)
test = torch.tensor(arr_test, device=DEVICE)

# labels, one integer class per image
y_train = torch.tensor(ser_y_train.to_numpy(), device=DEVICE)  # [35700], torch.int64
y_val = torch.tensor(ser_y_val.to_numpy(), device=DEVICE)  # [6300], torch.int64

## Dataset and DataLoader

In [None]:
# pair every training image with its label so both are served together
dataset_train = TensorDataset(train, y_train)

In [None]:
# mini-batches of BATCH_SIZE samples, reshuffled on every pass over the data
BATCH_SIZE = 128
train_loader = DataLoader(dataset_train, batch_size=BATCH_SIZE, shuffle=True)

# Preview Images

In [None]:
# preview first ten images
def print_ten_numbers(x: torch.Tensor,
                      y: torch.Tensor):
    """Draw up to the first ten images of a batch in one row, titled by label.

    Args:
        x: images; ``x[i]`` is squeezed, moved to the CPU and drawn in gray.
        y: labels; ``y[i].item()`` becomes the title of image ``i``.
    """
    # a batch may hold fewer than ten samples (e.g. the last batch of an
    # epoch); never index past its end
    num_images = min(10, len(x), len(y))

    fig = plt.figure(figsize=(25, 10))  # (width, height) in inches
    for i in range(num_images):
        ax = fig.add_subplot(1,  # nrows
                             10,  # ncols (fixed, so image size does not depend on the batch)
                             i + 1,  # index (1-based)
                             xticks=[],
                             yticks=[])
        image = x[i]  # [28, 28]
        label = y[i].item()  # int
        ax.imshow(X=image.squeeze().cpu(),
                  cmap='gray')
        ax.set_title(f"{label}")

# fetch a single (shuffled) batch from the loader and preview its first images
x_batch, y_batch = next(iter(train_loader))   # x_batch: [128, 28, 28] torch.float32, y_batch: [128] torch.int64
print_ten_numbers(x=x_batch, y=y_batch)

# CNN Classifier

In [None]:
class CNNClassifier(torch.nn.Module):
    """CNN for 28x28 single-channel images with 10 output classes.

    Three convolution stages (Conv2d -> ReLU -> MaxPool2d -> Dropout) are
    followed by two fully connected layers.

    Input:  [batch_size, 28, 28] (no channel dimension)
    Output: [batch_size, 10] unnormalized class scores (logits)
    """
    # https://machinelearningknowledge.ai/pytorch-conv2d-explained-with-examples/

    def __init__(self, dropout_probability=0.3):
        """Build all layers.

        Args:
            dropout_probability: probability of zeroing an element in the
                Dropout layer that closes each convolution stage.
        """
        super().__init__()

        # add dummy input channel: [batch_size, 28, 28] -> [batch_size, 1, 28, 28]
        self.unflatten = torch.nn.Unflatten(dim=1, unflattened_size=(1, 28))

        # (batch_size, 1, 28, 28) -> (batch_size, 32, 14, 14)
        self.layer1 = self._conv_stage(1, 32, dropout_probability)
        # (batch_size, 32, 14, 14) -> (batch_size, 64, 7, 7)
        self.layer2 = self._conv_stage(32, 64, dropout_probability)
        # (batch_size, 64, 7, 7) -> (batch_size, 128, 4, 4)
        # pooling is padded here because 7 is odd: floor((7 + 2 - 2) / 2) + 1 = 4
        self.layer3 = self._conv_stage(64, 128, dropout_probability,
                                       pool_padding=1)

        # [batch_size, 128, 4, 4] -> [batch_size, 2048]
        self.flatten = torch.nn.Flatten()  # for feed-forward

        # [batch_size, 2048] --> [batch_size, 625]
        self.fc1 = torch.nn.Linear(in_features=4 * 4 * 128,
                                   out_features=625,
                                   bias=True)

        # [batch_size, 625] --> [batch_size, 10]
        self.fc2 = torch.nn.Linear(in_features=625,
                                   out_features=10,
                                   bias=True)

        # initialize weights (seems to make no difference)
        torch.nn.init.xavier_uniform_(self.fc1.weight)
        torch.nn.init.xavier_uniform_(self.fc2.weight)

    @staticmethod
    def _conv_stage(in_channels: int,
                    out_channels: int,
                    dropout_probability: float,
                    pool_padding: int = 0) -> torch.nn.Sequential:
        """Return one Conv2d -> ReLU -> MaxPool2d -> Dropout stage."""
        return torch.nn.Sequential(
            # 3x3 kernel, stride 1, padding 1: height and width are preserved
            torch.nn.Conv2d(in_channels=in_channels,
                            out_channels=out_channels,
                            kernel_size=3,
                            stride=1,
                            padding=1),
            # (element-wise)
            torch.nn.ReLU(),
            # 2x2 window with stride 2: height and width are halved
            torch.nn.MaxPool2d(kernel_size=2,
                               stride=2,
                               padding=pool_padding),
            # (element-wise)
            torch.nn.Dropout(p=dropout_probability),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:  # x: [batch_size, 28, 28]
        """Compute the class logits [batch_size, 10] for a batch of images."""

        # CNN
        x_unflattened = self.unflatten(x)  # [batch_size, 1, 28, 28]
        output_layer_1 = self.layer1(x_unflattened)  # [batch_size, 32, 14, 14]
        output_layer_2 = self.layer2(output_layer_1)  # [batch_size, 64, 7, 7]
        output_layer_3 = self.layer3(output_layer_2)  # [batch_size, 128, 4, 4]
        flattened = self.flatten(output_layer_3)  # flattened to [batch_size, 2048]

        # FC
        # The ReLU between the two linear layers is essential: without a
        # non-linearity fc2(fc1(x)) is just a single affine map and the hidden
        # layer of 625 units adds no expressive power.
        hidden = torch.relu(self.fc1(flattened))  # [batch_size, 625]
        return self.fc2(hidden)  # [batch_size, 10]

In [None]:
# visualize the classifier
# a throw-away, untrained instance is enough to trace the computation graph
c_temp = CNNClassifier().to(DEVICE)
# to visualize with torchviz, we need some input that can pass through the model's forward() method.
predictions = c_temp(x_batch)
# last expression of the cell: the notebook renders the returned graph
make_dot(predictions)

## Training

In [None]:
LEARNING_RATE = 0.001

# model, loss and optimizer for the actual training run
classifier = CNNClassifier().to(DEVICE)
loss_fn = nn.CrossEntropyLoss()  # expects raw logits and integer class labels
optimizer = torch.optim.Adam(classifier.parameters(), lr=LEARNING_RATE)

# reduce learning rate when the monitored value (passed to scheduler.step())
# stops decreasing.
# `verbose` is deliberately not passed: the argument is deprecated in PyTorch's
# LR schedulers. The learning rate currently in effect can always be read from
# optimizer.param_groups[0]["lr"].
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min')

In [None]:
def compute_metrics(classifier: torch.nn.Module,
                    loss_fn: Callable,
                    x: torch.Tensor,
                    y: torch.Tensor
                   ) -> tuple[float, float, float]:
    """Evaluate a classifier on one set of samples.

    The caller decides about train/eval mode; this function does not switch it.

    Args:
        classifier: model mapping ``x`` to logits with the classes in dim 1.
        loss_fn: callable ``loss_fn(logits, y)`` returning a scalar tensor.
        x: model input.
        y: true class indices, one per sample.

    Returns:
        ``(loss, accuracy, f1_score)`` as plain Python floats.
    """
    # metrics never need gradients; guarding here keeps the call cheap even
    # when the caller forgot to disable autograd
    with torch.no_grad():
        y_pred_logits = classifier(x)
        loss = loss_fn(y_pred_logits, y).item()

        y_pred = y_pred_logits.argmax(dim=1)
        # .float() keeps the tensor on its device (the legacy
        # .type(torch.FloatTensor) cast copied it to the CPU first)
        accuracy = (y_pred == y).float().mean().item()

    # NOTE(review): for single-label multi-class targets the micro-averaged F1
    # equals the accuracy; use average='macro' if a distinct signal is wanted.
    f1_score = sklearn.metrics.f1_score(y_true=y.cpu(),
                                        y_pred=y_pred.cpu(),
                                        average='micro')  # multi-class problem

    return loss, accuracy, f1_score

In [None]:
# one row of metrics per epoch; float dtype keeps the columns numeric
# (without it the pre-allocated frame has dtype object)
df_metrics = pd.DataFrame(columns=['loss_train', 'accuracy_train', 'f1_train',
                                   'loss_val', 'accuracy_val', 'f1_val'],
                          index=range(NUM_EPOCHS),
                          dtype=float)

for epoch in tqdm(range(NUM_EPOCHS)):

    # switch to training mode once per epoch (the evaluation at the end of the
    # previous epoch left the model in evaluation mode)
    classifier.train()

    for x_train_batch, y_train_batch in train_loader:
        # x_train_batch: [batch_size, 28, 28] torch.float32
        # y_train_batch: [batch_size] torch.int64

        x_train_batch = x_train_batch.to(DEVICE)
        y_train_batch = y_train_batch.to(DEVICE)

        # clear existing gradients from previous batch
        optimizer.zero_grad()

        pred_train_batch = classifier(x_train_batch)  # [batch_size, 10]
        loss = loss_fn(pred_train_batch,
                       y_train_batch)  # [], .item() is e.g. 2.291177988052368

        # compute gradients (backpropagation), then apply gradients
        loss.backward()
        optimizer.step()

    # after each epoch, switch to evaluation mode, then evaluate without computing gradients
    classifier.eval()
    with torch.no_grad():
        loss_train, accuracy_train, f1_score_train = compute_metrics(classifier, loss_fn, train, y_train)
        # val: [6300, 28, 28], torch.float32
        loss_val, accuracy_val, f1_score_val = compute_metrics(classifier, loss_fn, val, y_val)

        df_metrics.iloc[epoch] = [loss_train, accuracy_train, f1_score_train,
                                  loss_val, accuracy_val, f1_score_val]

    # let the scheduler watch the validation loss
    scheduler.step(loss_val)
    print(f'Accuracy Validation after epoch {epoch}: {accuracy_val :.4f}  '
          f'(Train: {accuracy_train :.4f}) '
          f'LR = {optimizer.param_groups[0]["lr"]}\n')

In [None]:
# learning rate of the first parameter group after training (displayed by the notebook)
optimizer.param_groups[0]["lr"]

# Evaluation

In [None]:
# (epochs, metrics) followed by the full per-epoch metrics table
print(df_metrics.shape)
df_metrics

In [None]:
epochs = range(NUM_EPOCHS)

# 2 x 2 grid with a shared x axis; only three of the four panels are needed
fig, ((ax1, ax2), (ax3, ax_unused)) = plt.subplots(nrows=2,
                                                   ncols=2,
                                                   figsize=(15, 5),
                                                   sharex=True)

# (panel, prefix of the df_metrics columns, text used for y-label and legend)
panels = [(ax1, 'loss', 'Loss'),
          (ax2, 'accuracy', 'Accuracy'),
          (ax3, 'f1', 'F1-Score')]

# plot and label the training and val curves of every metric
for ax, metric, title in panels:
    ax.plot(epochs, df_metrics[f'{metric}_train'], label=f'Training {title}')
    ax.plot(epochs, df_metrics[f'{metric}_val'], label=f'val {title}')
    ax.set_ylabel(title)
    ax.legend(loc='best')

# hide the empty fourth panel instead of drawing a blank frame
ax_unused.set_visible(False)

# one tick per epoch; because the x axis is shared this applies to all panels
ax3.set_xticks(np.arange(0,
                         NUM_EPOCHS))
ax3.set_xlabel('Epochs')

# ax2 is now the lowest visible panel of its column: sharex hid its tick
# labels, so switch them back on and label the axis
ax2.tick_params(labelbottom=True)
ax2.set_xlabel('Epochs')

plt.suptitle('Training and Validation Metrics')

plt.show()

## False Predictions

In [None]:
# let's have a look at some of the misclassified images from the validation dataset
classifier.eval()
with torch.no_grad():
    y_pred_logits = classifier(val)
    y_pred = y_pred_logits.argmax(dim=1)
    # [6300] with either 1.0 or 0.0; moved to the CPU explicitly because
    # pandas cannot read tensors that live on the GPU
    correct = (y_pred == y_val).float().cpu()

# hand pandas a NumPy array rather than a tensor so the Series is a plain
# float Series regardless of the pandas version
ser_correct = pd.Series(correct.numpy())
print(ser_correct.value_counts())

In [None]:
# positions (within the validation set) of every wrong prediction
bad_indexes = ser_correct.index[ser_correct == 0.0].to_list()

# images, true labels and predicted labels of exactly those samples
# (e.g. val_misclassified: [69, 28, 28])
val_misclassified, y_val_misclassified, y_pred_misclassified = (
    tensor[bad_indexes] for tensor in (val, y_val, y_pred)
)

In [None]:
# grid of misclassified validation images, ten per row
num_misclassified = len(val_misclassified)
# at least one row, so the figure can be created even without any mistakes
num_rows = max(1, math.ceil(num_misclassified / 10))

# squeeze=False: `axes` is always 2-D, also when a single row is enough
# (otherwise ten or fewer misclassified images would break the indexing)
fig, axes = plt.subplots(nrows=num_rows,  # axes: np.array of shape (num_rows, 10)
                         ncols=10,
                         figsize=(15, 15),
                         squeeze=False,
                        )

# axes.flat walks the grid row by row, i.e. cell i is axes[i // 10, i % 10]
for i, ax in enumerate(axes.flat):
    if i >= num_misclassified:
        # left-over cells of the last row: hide instead of drawing empty frames
        ax.set_visible(False)
        continue

    image = val_misclassified[i]
    y = y_val_misclassified[i].item()
    pred = y_pred_misclassified[i].item()

    ax.imshow(X=image.squeeze(dim=0).cpu(),
              cmap='gray')
    ax.set_xticks([])
    ax.set_yticks([])
    # title: true label / predicted label
    ax.set_title(f"✓{y} / ⚠ {pred}")

# Submission

In [None]:
# Predict the labels of the test set.
# Switch to evaluation mode explicitly: with dropout still active the
# submitted predictions would be randomly degraded, and this cell must not
# depend on an earlier cell having left the model in evaluation mode.
classifier.eval()
with torch.no_grad():
    y_pred_logits = classifier(test)  # [28000, 10], torch.float32
    y_pred = y_pred_logits.argmax(dim=1)  # [28000], torch.int64

    predicted_labels = y_pred.cpu().numpy()  # np.array (28000,), int64

# raw predictions and how often each digit was predicted
print(predicted_labels)
print(pd.Series(predicted_labels).value_counts())

In [None]:
# submission table: 1-based image id next to the predicted digit
df_pred = pd.DataFrame({'ImageId': range(1, len(test)+1),
                        'Label': predicted_labels})
# last expression of the cell: the notebook shows the predicted-digit counts
df_pred['Label'].value_counts()

In [None]:
# write the submission file into the working directory, without the index column
df_pred.to_csv('submission.csv', index=False)

# Visualization of CNN Layers

## Image and Prediction

In [None]:
# position of the validation image used for all layer visualizations below
n_example = 50

# let's look at an example image from the validation dataset:
x_example = val[n_example].detach()  # [28, 28]
# imshow needs the data on the CPU
plt.imshow(x_example.cpu(), cmap='gray')

In [None]:
# predicted logits
classifier.eval()
with torch.no_grad():
    # the model expects a batch, so add a batch dimension of size 1
    single_image_batch = x_example.unsqueeze(dim=0).to(DEVICE)
    predicted_logits = classifier(single_image_batch)  # [1, 10]

# one line per digit class: "<class>: <logit rounded to 2 decimals>"
for i, logit in enumerate(predicted_logits[0, :10].tolist()):
    print(f'{i}: {round(logit, 2) :5}')

## Outputs of CNN Layer 1

In [None]:
# from [28, 28] to [batch_size, channel, 28, 28], i.e. [1, 1, 28, 28]
x_unsqueezed = x_example[None, None]

# run only the first convolution stage on the example image
classifier.eval()
with torch.no_grad():
    output_layer_1 = classifier.layer1(x_unsqueezed.to(DEVICE)).detach()  # [1, 32, 14, 14]

# 4 x 10 grid; the 32 feature maps fill the first 32 cells
fig = plt.figure(figsize=(25, 10))  # (width, height) in inches
for position, feature_map in enumerate(output_layer_1[0][:32], start=1):
    # feature_map: (14, 14); position is the 1-based cell index
    ax = fig.add_subplot(4, 10, position, xticks=[], yticks=[])
    ax.imshow(X=feature_map.cpu(), cmap='gray')

### Show the outputs of CNN Layer 2

In [None]:
# feed the stage-1 feature maps through the second convolution stage
classifier.eval()
with torch.no_grad():
    output_layer_2 = classifier.layer2(output_layer_1.to(DEVICE)).detach()  # [1, 64, 7, 7]

# 4 x 20 grid; the 64 feature maps fill the first 64 cells
fig = plt.figure(figsize=(25, 5))  # (width, height) in inches
for position, feature_map in enumerate(output_layer_2[0][:64], start=1):
    # feature_map: (7, 7); position is the 1-based cell index
    ax = fig.add_subplot(4, 20, position, xticks=[], yticks=[])
    ax.imshow(X=feature_map.cpu(), cmap='gray')

### Show the outputs of CNN Layer 3

In [None]:
# feed the stage-2 feature maps through the third convolution stage
classifier.eval()
with torch.no_grad():
    output_layer_3 = classifier.layer3(output_layer_2.to(DEVICE)).detach()  # [1, 128, 4, 4]

print(output_layer_3.shape)

# 5 x 30 grid; the 128 feature maps fill the first 128 cells
fig = plt.figure(figsize=(25, 5))  # (width, height) in inches
for position, feature_map in enumerate(output_layer_3[0][:128], start=1):
    # feature_map: (4, 4); position is the 1-based cell index
    ax = fig.add_subplot(5, 30, position, xticks=[], yticks=[])
    ax.imshow(X=feature_map.cpu(), cmap='gray')