In [5]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets
import torchvision.transforms as transforms
import torchvision

# MNIST digits ship as 28x28 images; every sample is resized to 32x32 and
# converted to a tensor before it reaches the model.
_mnist_options = dict(
    root="data",
    download=True,
    transform=transforms.Compose([
        transforms.Resize(32),
        transforms.ToTensor(),
    ]),
)

# Both splits share the exact same storage location and preprocessing.
training_data = datasets.MNIST(train=True, **_mnist_options)
test_data = datasets.MNIST(train=False, **_mnist_options)

# Only the training split is shuffled; both loaders yield batches of 64.
train_dataloader = DataLoader(dataset=training_data, batch_size=64, shuffle=True)
test_dataloader = DataLoader(dataset=test_data, batch_size=64)

# Run on the GPU when CUDA is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [8]:
# Base channel width of the convolutional stack; Net sizes its layers as
# multiples of this value.
num_features = 32
# NOTE(review): not referenced by the visible training cell, which sets its own
# `epochs = 15` — confirm which value is intended.
num_epochs = 10
# Learning rate handed to the SGD optimizer.
lr = 0.001
# NOTE(review): the DataLoaders are built with a literal batch_size=64 rather
# than this variable — keep the two in sync if either changes.
batch_size = 64

In [103]:
# overcomplicated model below, just a fun toy to play with

class Net(nn.Module):
    """Convolutional classifier for single-channel 32x32 images.

    Six 5x5 convolutions reduce a ``1 x 32 x 32`` input to a
    ``(16 * features) x 1 x 1`` feature map, which is flattened and fed through
    a three-layer MLP that emits one logit per class. The flatten step relies
    on the spatial size having collapsed to 1x1, so inputs are assumed to be
    32x32.

    Args:
        features: Base channel width; the convolutions output 1x, 2x, 4x, 8x,
            8x and 16x this many channels. ``None`` (the default) reads the
            notebook-level ``num_features`` setting at construction time, which
            is what the zero-argument ``Net()`` call has always done.
        num_classes: Number of output logits (10 by default).
    """

    def __init__(self, features=None, num_classes=10):
        super().__init__()
        if features is None:
            # Backward-compatible default: fall back to the notebook setting.
            features = num_features
        f = features
        # The layer order (and therefore every index inside `self.main`) is
        # kept exactly as before so printed summaries and state_dict keys match.
        self.main = nn.Sequential(
            # input: 1 x 32 x 32
            # NOTE(review): no BatchNorm follows this convolution, so
            # bias=False leaves it without any additive term; kept unchanged to
            # preserve the architecture.
            nn.Conv2d(1, f, kernel_size=5, stride=1, padding=2, bias=False),
            nn.ReLU(True),
            # -> f x 32 x 32
            nn.Conv2d(f, f * 2, kernel_size=5, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(f * 2),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.25),
            # -> 2f x 30 x 30
            nn.Conv2d(f * 2, f * 4, kernel_size=5, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(f * 4),
            nn.LeakyReLU(0.2, inplace=True),
            # -> 4f x 16 x 16
            nn.Conv2d(f * 4, f * 8, kernel_size=5, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(f * 8),
            nn.LeakyReLU(0.2, inplace=True),
            # -> 8f x 12 x 12
            nn.Conv2d(f * 8, f * 8, kernel_size=5, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(f * 8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.25),
            # -> 8f x 5 x 5
            nn.Conv2d(f * 8, f * 16, kernel_size=5, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(f * 16),
            nn.LeakyReLU(0.2, inplace=True),
            # -> 16f x 1 x 1, flattened to a vector of length 16f for the MLP head
            nn.Flatten(),
            nn.Linear(f * 16, 256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.25),
            nn.Linear(256, 128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(128, num_classes)
        )

    def forward(self, input):
        """Return class logits of shape ``(batch, num_classes)`` for ``input``."""
        return self.main(input)

In [109]:
## Simpler LeNet-style model below: quicker to train, but worse performance

# import torch
# import torch.nn as nn
# import torch.nn.functional as F

# class Net(nn.Module):
#     def __init__(self):
#         super(Net, self).__init__()
#         # 1 input image channel, 10 output channels, 5x5 square convolution
#         # kernel
#         self.conv1 = nn.Conv2d(1, 10, 5, padding=2)
#         self.conv2 = nn.Conv2d(10, 25, 5)
#         # an affine operation: y = Wx + b
#         self.fc1 = nn.Linear(25 * 5 * 5, 120)
#         self.fc2 = nn.Linear(120, 84)
#         self.fc3 = nn.Linear(84, 10)

#     def forward(self, x):
#         # Max pooling over a (2, 2) window
#         x = F.max_pool2d(F.relu(self.conv1(x)), 2)
#         # If the size is a square, you can specify with a single number
#         x = F.max_pool2d(F.relu(self.conv2(x)), 2)
#         x = torch.flatten(x, 1) # flatten all dimensions except the batch dimension
#         x = F.relu(self.fc1(x))
#         x = F.relu(self.fc2(x))
#         x = self.fc3(x)
#         return x

In [104]:
# Build the classifier, place its weights on the selected device, and show the
# layer-by-layer summary.
net = Net()
net = net.to(device)
print(net)

Net(
  (main): Sequential(
    (0): Conv2d(1, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2), bias=False)
    (1): ReLU(inplace=True)
    (2): Conv2d(32, 64, kernel_size=(5, 5), stride=(1, 1), padding=(1, 1), bias=False)
    (3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (4): LeakyReLU(negative_slope=0.2, inplace=True)
    (5): Dropout(p=0.25, inplace=False)
    (6): Conv2d(64, 128, kernel_size=(5, 5), stride=(2, 2), padding=(3, 3), bias=False)
    (7): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): LeakyReLU(negative_slope=0.2, inplace=True)
    (9): Conv2d(128, 256, kernel_size=(5, 5), stride=(1, 1), bias=False)
    (10): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): LeakyReLU(negative_slope=0.2, inplace=True)
    (12): Conv2d(256, 256, kernel_size=(5, 5), stride=(2, 2), padding=(1, 1), bias=False)
    (13): BatchNorm2d(256, eps=1e-05, momentum=0.1,

In [106]:
def train_loop(dataloader, model, loss_fn, optimizer):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches that also exposes
            a ``dataset`` attribute; its length is only used for progress output.
        model: Module being optimized. It is switched to training mode.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar loss tensor.
        optimizer: Optimizer that updates ``model``'s parameters.

    Returns:
        The mean of the per-batch losses as a float, or NaN when the
        dataloader produced no batches.
    """
    size = len(dataloader.dataset)

    # Batches are moved to wherever the model's weights live, so the function
    # does not depend on notebook state. A model without parameters falls back
    # to the notebook-level `device`.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else device

    # Training mode matters whenever the model contains dropout or batch
    # normalization layers: they behave differently in train and eval mode.
    model.train()

    total_loss = 0.0
    num_batches = 0
    for batch, (X, y) in enumerate(dataloader):
        X = X.to(target)
        y = y.to(target)

        # Compute prediction and loss
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        # Read the scalar once; it is reused for the running total and the log.
        batch_loss = loss.item()
        total_loss += batch_loss
        num_batches += 1

        if batch % 100 == 0 and batch > 0:
            current = (batch + 1) * len(X)
            print(f"loss: {batch_loss:>7f}  [{current:>5d}/{size:>5d}]")

    if num_batches == 0:
        # Nothing was trained on, so an average loss is undefined.
        return float("nan")
    return total_loss / num_batches


def test_loop(dataloader, model, loss_fn):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True
    with torch.no_grad():
        for X, y in dataloader:
            X = X.to(device)
            y = y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [105]:
# Import only the scheduler that is used; a wildcard import would pull every
# public name of the module into the notebook namespace and hide where
# `StepLR` comes from.
from torch.optim.lr_scheduler import StepLR

# SGD with momentum over all of the network's parameters.
optimizer = torch.optim.SGD(net.parameters(), lr=lr, momentum=0.9)
# Multiply the learning rate by 0.9 after every 2 calls to `scheduler.step()`.
scheduler = StepLR(optimizer, step_size=2, gamma=0.9)
# Cross-entropy over the raw logits produced by the network.
criterion = nn.CrossEntropyLoss()

In [107]:
# Full training schedule: each epoch runs one training pass, one evaluation
# pass on the test split, and then advances the learning-rate scheduler.
epochs = 15
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")

    train_loss = train_loop(
        train_dataloader,
        net,
        criterion,
        optimizer,
    )
    print(f"\nTrain loss: {train_loss:.3f}")

    test_loop(test_dataloader, net, criterion)

    # The scheduler is stepped once per epoch, after that epoch's optimizer updates.
    scheduler.step()
print("Done!")

Epoch 1
-------------------------------
loss: 1.512834  [ 6464/60000]
loss: 0.748784  [12864/60000]
loss: 0.281845  [19264/60000]
loss: 0.220910  [25664/60000]
loss: 0.227667  [32064/60000]
loss: 0.050484  [38464/60000]
loss: 0.182676  [44864/60000]
loss: 0.060664  [51264/60000]
loss: 0.103765  [57664/60000]

Train loss: 0.474
Test Error: 
 Accuracy: 98.4%, Avg loss: 0.056675 

Epoch 2
-------------------------------
loss: 0.103534  [ 6464/60000]
loss: 0.174568  [12864/60000]
loss: 0.071310  [19264/60000]
loss: 0.059385  [25664/60000]
loss: 0.122034  [32064/60000]
loss: 0.085418  [38464/60000]
loss: 0.035791  [44864/60000]
loss: 0.032629  [51264/60000]
loss: 0.160789  [57664/60000]

Train loss: 0.068
Test Error: 
 Accuracy: 98.9%, Avg loss: 0.033002 

Epoch 3
-------------------------------
loss: 0.150817  [ 6464/60000]
loss: 0.008508  [12864/60000]
loss: 0.024912  [19264/60000]
loss: 0.047525  [25664/60000]
loss: 0.005829  [32064/60000]
loss: 0.089259  [38464/60000]
loss: 0.012038  [4

In [108]:
## Final evaluation

# Count exact matches over the whole test split with the network in
# evaluation mode (dropout disabled, batch-norm using its running statistics).
net.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_dataloader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = net(images)
        # Index of the largest logit per sample. Autograd is already disabled
        # here, so the legacy `.data` accessor is not needed.
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Correct vs Total: {correct} / {total}")

Correct vs Total: 9948 / 10000
