# MNIST Classification with CNNs
### UCR EE/CS 228 Introduction to Deep Learning, Instructor: Yinglun Zhu (yinglunz.com)

Part of the code is borrowed from the PyTorch Tutorial https://pytorch.org/tutorials/beginner/basics/quickstart_tutorial.html

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

In [None]:
# Mini-batch size shared by the training and test loaders.
batch_size = 64

# MNIST training split, stored under ./data (download requested if needed).
# ToTensor() is applied to every image when it is read.
train_data = datasets.MNIST(root="data", train=True, download=True, transform=ToTensor())

# MNIST test split, prepared the same way.
test_data = datasets.MNIST(root="data", train=False, download=True, transform=ToTensor())

# Create data loaders; only the training loader shuffles its samples.
train_dataloader = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(dataset=test_data, batch_size=batch_size, shuffle=False)

# Pick the compute device: the CUDA GPU when one is available, otherwise the CPU.
# (Only CUDA is probed here; no MPS check is made.)
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device.")

In [None]:
# Define MLP model
class MLP(nn.Module):
    """Multi-layer perceptron: flatten -> Linear(784, 512) -> ReLU -> Linear(512, 10)."""

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        # Layers are created in network order so parameter initialisation
        # consumes the random number generator in the same sequence.
        hidden = nn.Linear(28 * 28, 512)
        head = nn.Linear(512, 10)
        self.linear_relu_stack = nn.Sequential(hidden, nn.ReLU(), head)

    def forward(self, x):
        """Flatten each sample and return the 10 raw (un-normalised) class scores."""
        return self.linear_relu_stack(self.flatten(x))

# Build the MLP, move it to the selected device, and print its layer layout.
model = MLP()
model = model.to(device)
print(model)

In [None]:
# Define LeNet
class LeNet(nn.Module):
    """LeNet-style CNN: two conv + max-pool stages followed by three linear layers.

    The layer sizes are fixed for single-channel 28x28 inputs (e.g. MNIST):
    the conv/pool stack must produce 16 feature maps of 5x5 so that the
    flattened vector has the 400 features ``fc1`` expects.
    """

    def __init__(self):
        super().__init__()

        # Convolutional layers
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5, stride=1, padding=2)  # 28x28 -> 28x28
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5, stride=1)  # 14x14 -> 10x10

        # Fully connected layers
        self.fc1 = nn.Linear(in_features=16*5*5, out_features=120)  # 16*5*5 = 400
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=10)  # 10 output classes (e.g., for MNIST)

    def forward(self, x):
        """Return raw class scores (logits) for a batch of 1x28x28 images.

        Raises:
            ValueError: if the conv/pool stack does not yield 16x5x5 feature
                maps, i.e. the input images are not 28x28.
        """
        # Convolution + Activation + Pooling
        x = F.relu(self.conv1(x))  # 28x28 -> 28x28
        x = F.max_pool2d(x, kernel_size=2, stride=2)  # 28x28 -> 14x14

        x = F.relu(self.conv2(x))  # 14x14 -> 10x10
        x = F.max_pool2d(x, kernel_size=2, stride=2)  # 10x10 -> 5x5

        # Guard the flatten below: view(-1, 400) would otherwise silently
        # regroup elements across samples whenever the total element count
        # happens to be a multiple of 400 (e.g. 25 images of 32x32).
        if tuple(x.shape[-3:]) != (16, 5, 5):
            raise ValueError(
                f"LeNet expects 28x28 inputs (16x5x5 feature maps before the "
                f"linear layers), got feature maps of shape {tuple(x.shape[-3:])}"
            )

        # Flatten the output for the fully connected layers
        x = x.view(-1, 16*5*5)  # Flatten to (batch_size, 400)

        # Fully connected layers + Activation
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))

        # Output layer
        x = self.fc3(x)  # No activation (e.g., softmax should be applied in the loss function if needed)
        return x

# Instantiate the model on the selected device (as is done for the MLP), so it
# sits on the same device that train()/test() move every batch to.
model = LeNet().to(device)

# Print the model summary
print(model)

In [None]:
# Loss: cross-entropy between the model outputs and the integer class labels.
# Optimizer: plain SGD over the current model's parameters with learning rate 1e-3.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(), lr=0.001)

In [None]:
def print_effective_lr(optimizer):
    """Print a learning-rate summary based on the optimizer's first parameter.

    Looks only at the first parameter of the first parameter group:

    * no gradient on that parameter -> prints a notice and returns;
    * Adam -> prints ``lr / (rms(grad) + eps)``, where ``rms(grad)`` is the
      root-mean-square of that parameter's current gradient;
    * SGD -> prints the group's configured learning rate;
    * anything else -> prints that the optimizer is not supported.
    """
    param_group = optimizer.param_groups[0]
    first_param = param_group['params'][0]
    grad = first_param.grad

    # Nothing to report until backward() has populated the gradient.
    if grad is None:
        print("Gradient not available for the first parameter.")
        return

    if isinstance(optimizer, torch.optim.Adam):
        grad_rms = grad.pow(2).mean().sqrt().item()
        effective_lr = param_group['lr'] / (grad_rms + param_group['eps'])
        print(f"Effective LR (Adam) for the first parameter: {effective_lr:.6f}")
        return

    if isinstance(optimizer, torch.optim.SGD):
        print(f"Learning Rate (SGD): {param_group['lr']:.6f}")
        return

    print("Optimizer not supported for effective LR printing.")

# Training function
def train(dataloader, model, loss_fn, optimizer):
    """Run one pass over ``dataloader``, updating ``model`` after every batch.

    Each batch is moved to the module-level ``device``, the loss is
    back-propagated, and the optimizer takes a step before its gradients are
    cleared. The learning-rate summary is printed once, for the first batch,
    and the batch loss is printed every 200 batches.
    """
    size = len(dataloader.dataset)
    model.train()

    for step, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass and loss for this batch
        batch_loss = loss_fn(model(inputs), targets)

        # Backward pass
        batch_loss.backward()

        # Gradients exist now and have not been cleared yet, so this is the
        # point where the learning-rate summary can read them.
        if step == 0:
            print_effective_lr(optimizer)

        optimizer.step()
        optimizer.zero_grad()

        # Progress report every 200 batches
        if step % 200 != 0:
            continue
        loss = batch_loss.item()
        current = (step + 1) * len(inputs)
        print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [None]:
# Test function
def test(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and print accuracy and average loss.

    Accuracy is the number of correct predictions divided by the dataset size;
    the reported loss is the sum of the per-batch losses divided by the number
    of batches. Batches are moved to the module-level ``device`` and no
    gradients are tracked.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()

    test_loss = 0
    correct = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            logits = model(inputs)
            test_loss = test_loss + loss_fn(logits, targets).item()
            # A prediction is the index of the largest score along dim 1.
            matches = logits.argmax(1) == targets
            correct = correct + matches.type(torch.float).sum().item()

    test_loss = test_loss / num_batches
    correct = correct / size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
# Train the MLP and evaluate it on the test set after every epoch
torch.manual_seed(42)  # seed the RNG before the model is built
model = MLP().to(device)

# Specify loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
# Alternative optimizers to experiment with:
# optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
# optimizer = torch.optim.SGD(model.parameters(), lr=1e-2, momentum=0.9)
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=1e-3,
    betas=(0.9, 0.999),
    eps=1e-8,
    weight_decay=0,
)

epochs = 5
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done!")

In [None]:
# Train LeNet and evaluate it on the test set after every epoch
torch.manual_seed(42)  # seed the RNG before the model is built
model = LeNet().to(device)

# Specify loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
# Alternative optimizers to experiment with:
# optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
# optimizer = torch.optim.SGD(model.parameters(), lr=1e-2, momentum=0.9)
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=1e-3,
    betas=(0.9, 0.999),
    eps=1e-8,
    weight_decay=0,
)

epochs = 5
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done!")