In [24]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.utils import make_grid
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt

Set device and reproducibility

In [46]:
torch.manual_seed(101)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Create a transformer instance: transform to tensor

In [26]:
transform = transforms.ToTensor()

Import the MNIST Dataset and divide into train and test data

In [27]:
train_data = datasets.MNIST(root = "train_images/MNIST", train=True, download=True, transform=transform)
test_data = datasets.MNIST(root = "train_images/MNIST", train=False, download=True, transform=transform)

Create the dataloaders with a small batch size (for CNN)

In [31]:
train_loader = DataLoader(dataset=train_data, batch_size=10, shuffle=True)
test_loader = DataLoader(dataset=test_data, batch_size=10, shuffle=False)

### Parameters of a convolutional layer
Example:
*conv1 = nn.Conv2d(1,6,3,1)* means:

- 1, for one input channel (grayscale)
- 6, output channels for feature extractions, the feature maps, the filters that the CNN will figure out
- 3, kernel size for a 3 by 3 filter
- 1, for the horizontal and vertical stride of the convolution

### Simple training step with convolutional layers and polling layers
- Input
- Convolutional layer
- Pooling layer
- Flattening

In [48]:
# define the convolutional layer
conv1 = nn.Conv2d(1,6,3,1)

for i, (X_train, Y_train) in enumerate(train_data):
    break

print("Shape of X_train:", X_train.shape)

# add an extra dimension to X_train to make the convolution (1 for batch)
x = X_train.view(1,1,28,28)

# perform a convolution 
out = conv1(x)
print("Shape after the convolution:", out.shape)

# perform a pooling
max_p = F.max_pool2d(out,2,2)
print("Shape after the pooling", max_p.shape)

# How to feed this output of max_p to a fully connected input?
x = max_p.view(-1, 6*13*13)
print("Shape after the flattening", x.shape)

Shape of X_train: torch.Size([1, 28, 28])
Shape after the convolution: torch.Size([1, 6, 26, 26])
Shape after the pooling torch.Size([1, 6, 13, 13])
Shape after the flattening torch.Size([1, 1014])


### CNN Model
Architecture of the neural network:

CNN -> POL -> CNN -> POL -> FLAT -> FCL1 -> FCL2 -> FCL3

In [63]:
class ConvolutionalNN(nn.Module):
    def __init__(self, filter1, filter2):
        super().__init__()
        self.conv1 = nn.Conv2d(1, filter1, 3, 1)  # Example: You can change this to any number of filters
        self.conv2 = nn.Conv2d(6, filter2, 3, 1) # Example: You can change this to any number of filters
        self.pool = nn.MaxPool2d(2, 2)
        
        # Initialize fully connected layers with placeholder
        self.fc1 = None
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

        # Dynamically compute the input size for the first fully connected layer
        self._initialize_fc1()

    def _initialize_fc1(self):
        # Create a dummy tensor with the same size as an MNIST input
        dummy_input = torch.zeros(1, 1, 28, 28)

        # Pass the dummy input through the conv and pooling layers to calculate the flattened size
        dummy_output = self.pool(F.relu(self.conv1(dummy_input)))
        dummy_output = self.pool(F.relu(self.conv2(dummy_output)))
        
        # Calculate the flattened size
        flattened_size = dummy_output.numel()  # Returns the total number of elements in the tensor
        
        # Now that we know the flattened size, we can initialize the first fully connected layer
        self.fc1 = nn.Linear(flattened_size, 120)

    def forward(self, X):
        X = F.relu(self.conv1(X))
        X = self.pool(X)  
        X = F.relu(self.conv2(X))
        X = self.pool(X)  
        X = X.view(X.size(0), -1)  # Flattening
        X = F.relu(self.fc1(X))
        X = F.relu(self.fc2(X))
        X = self.fc3(X)
        return F.log_softmax(X, dim=1)

In [64]:
model = ConvolutionalNN(filter1=6, filter2=16).to(device)
print(model)

ConvolutionalNN(
  (conv1): Conv2d(1, 6, kernel_size=(3, 3), stride=(1, 1))
  (conv2): Conv2d(6, 16, kernel_size=(3, 3), stride=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc2): Linear(in_features=120, out_features=84, bias=True)
  (fc3): Linear(in_features=84, out_features=10, bias=True)
  (fc1): Linear(in_features=400, out_features=120, bias=True)
)


In [65]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [66]:
import time
start_time = time.time()

epochs = 5
train_loss = []
test_losses = []
train_correct = []
test_correct = []

for epoch in range(epochs):
    trn_corr = 0
    tst_corr = 0
    
    # train 
    for b, (X_trn, Y_trn) in enumerate(train_loader):
        X_trn, Y_trn = X_trn.to(device), Y_trn.to(device)  # Ensure inputs and targets are on the same device
        b += 1
        
        y_pred = model(X_trn)  # Fixed typo: use X_trn instead of X_train
        loss = criterion(y_pred, Y_trn)
        
        predicted = torch.max(y_pred.data, 1)[1]
        batch_corr = (predicted == Y_trn).sum()  # Corrected from y_pred to Y_trn
        trn_corr += batch_corr
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if b % 600 == 0:
            print(f'epoch: {i:2}  batch: {b:4} [{10*b:6}/60000]  loss: {loss.item():10.8f}  \
accuracy: {trn_corr.item()*100/(10*b):7.3f}%')
    train_loss.append(loss.item())
    train_correct.append(trn_corr)  # Use trn_corr instead of batch_corr for the entire epoch
        
    # test
    with torch.no_grad():
        for b, (X_test, Y_test) in enumerate(test_loader):
            X_test, Y_test = X_test.to(device), Y_test.to(device)  # Ensure test inputs and targets are on the same device
            y_val = model(X_test)
            predicted = torch.max(y_val.data, 1)[1]
            tst_corr += (predicted == Y_test).sum()
        
    loss = criterion(y_val, Y_test)
    test_losses.append(loss.item())
    test_correct.append(tst_corr)
        
current_time = time.time()
total = current_time - start_time
print(f'\nDuration: {time.time() - start_time:.0f} seconds')

epoch:  0  batch:  600 [  6000/60000]  loss: 0.50230491  accuracy:  73.650%
epoch:  0  batch: 1200 [ 12000/60000]  loss: 0.32116914  accuracy:  83.242%
epoch:  0  batch: 1800 [ 18000/60000]  loss: 0.38754264  accuracy:  87.172%
epoch:  0  batch: 2400 [ 24000/60000]  loss: 0.35554084  accuracy:  89.242%
epoch:  0  batch: 3000 [ 30000/60000]  loss: 0.08998176  accuracy:  90.667%
epoch:  0  batch: 3600 [ 36000/60000]  loss: 0.08671220  accuracy:  91.642%
epoch:  0  batch: 4200 [ 42000/60000]  loss: 0.52407354  accuracy:  92.362%
epoch:  0  batch: 4800 [ 48000/60000]  loss: 0.41157693  accuracy:  92.902%
epoch:  0  batch: 5400 [ 54000/60000]  loss: 0.16420719  accuracy:  93.381%
epoch:  0  batch: 6000 [ 60000/60000]  loss: 0.00142492  accuracy:  93.795%
epoch:  0  batch:  600 [  6000/60000]  loss: 0.00271948  accuracy:  97.783%
epoch:  0  batch: 1200 [ 12000/60000]  loss: 0.04339140  accuracy:  97.542%
epoch:  0  batch: 1800 [ 18000/60000]  loss: 0.00172344  accuracy:  97.656%
epoch:  0  b