# OpenMLSys Sec 3.2 Multi-layer Perceptron

## Prepare Data

In [2]:
import os
import shutil
import numpy as np
import struct
import zipfile
from kaggle.api.kaggle_api_extended import KaggleApi

def download_and_preprocess_mnist(output_dir="mnist_data", seed=None):
    """Download MNIST from Kaggle, reshuffle it, and save a 70/30 train/test split.

    The four raw IDX files of the ``hojjatk/mnist-dataset`` Kaggle dataset are
    downloaded and unzipped into ``output_dir``. The original train and test
    sets are merged, shuffled, and re-split 70% / 30%. The result is written to
    ``<output_dir>/train`` and ``<output_dir>/test`` as ``images.npy`` and
    ``labels.npy``.

    Args:
        output_dir: Directory receiving both the raw download and the
            processed ``train/`` and ``test/`` sub-directories.
        seed: Optional integer seed for the shuffle. With the default ``None``
            the global NumPy random state is used (the historical behavior);
            passing an integer makes the split reproducible.

    Raises:
        ValueError: If one of the IDX files has an unrecognized magic number.

    Note:
        Requires whatever credentials ``KaggleApi.authenticate()`` expects.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Initialize Kaggle API
    api = KaggleApi()
    api.authenticate()

    # Download MNIST dataset (unzipped directly into output_dir)
    print("Downloading MNIST dataset...")
    api.dataset_download_files("hojjatk/mnist-dataset", path=output_dir, unzip=True)

    # Names of the extracted IDX files, resolved relative to output_dir
    filenames = {
        "train_images": "train-images.idx3-ubyte",
        "train_labels": "train-labels.idx1-ubyte",
        "test_images": "t10k-images.idx3-ubyte",
        "test_labels": "t10k-labels.idx1-ubyte"
    }
    files = {key: os.path.join(output_dir, name) for key, name in filenames.items()}

    def read_idx(filename):
        """Parse one IDX file into a uint8 array (images: (num, rows, cols); labels: 1-D)."""
        with open(filename, 'rb') as f:
            # Header: big-endian magic number followed by the item count
            magic, num = struct.unpack(">II", f.read(8))
            if magic == 2051:  # Image file
                rows, cols = struct.unpack(">II", f.read(8))
                return np.frombuffer(f.read(), dtype=np.uint8).reshape(num, rows, cols)
            if magic == 2049:  # Label file
                return np.frombuffer(f.read(), dtype=np.uint8)
        # Fail loudly instead of returning None, which would only surface later
        # as an obscure error when the arrays are concatenated.
        raise ValueError(f"{filename}: unrecognized IDX magic number {magic}")

    # Read dataset
    train_images = read_idx(files["train_images"])
    train_labels = read_idx(files["train_labels"])
    test_images = read_idx(files["test_images"])
    test_labels = read_idx(files["test_labels"])

    # Merge train and test sets before splitting
    all_images = np.concatenate([train_images, test_images], axis=0)
    all_labels = np.concatenate([train_labels, test_labels], axis=0)

    # Shuffle dataset before splitting; the same permutation is applied to
    # images and labels so pairs stay aligned.
    indices = np.arange(len(all_images))
    rng = np.random if seed is None else np.random.RandomState(seed)
    rng.shuffle(indices)

    all_images = all_images[indices]
    all_labels = all_labels[indices]

    # Split into train (70%) and test (30%)
    split_idx = int(0.7 * len(all_images))
    train_images, test_images = all_images[:split_idx], all_images[split_idx:]
    train_labels, test_labels = all_labels[:split_idx], all_labels[split_idx:]

    # Create train/ and test/ directories
    train_dir = os.path.join(output_dir, "train")
    test_dir = os.path.join(output_dir, "test")
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)

    # Save datasets as .npy
    np.save(os.path.join(train_dir, "images.npy"), train_images)
    np.save(os.path.join(train_dir, "labels.npy"), train_labels)
    np.save(os.path.join(test_dir, "images.npy"), test_images)
    np.save(os.path.join(test_dir, "labels.npy"), test_labels)

    print("MNIST dataset has been processed and saved in:")
    print(f" - Training data: {train_dir}")
    print(f" - Testing data: {test_dir}")
# Execute the download / preprocessing pipeline into the default output folder
download_and_preprocess_mnist(output_dir="mnist_data")


Downloading MNIST dataset...
Dataset URL: https://www.kaggle.com/datasets/hojjatk/mnist-dataset
MNIST dataset has been processed and saved in:
 - Training data: mnist_data/train
 - Testing data: mnist_data/test


In [12]:
import os

# Report where the notebook kernel is running from
cwd = os.getcwd()
print(f"Current working directory: {cwd}")

# Collect the entries of the mnist_data folder located under the working directory
files = os.listdir(f"{cwd}/mnist_data")

# NOTE: the header below says "current directory", but the entries printed
# are those of the mnist_data/ sub-folder.
print("Files and directories in the current directory:")
for entry in files:
    print(entry)


Current working directory: /home/yliang/ml
Files and directories in the current directory:
train-labels-idx1-ubyte
t10k-images-idx3-ubyte
train-images-idx3-ubyte
t10k-images.idx3-ubyte
t10k-labels.idx1-ubyte
train-labels.idx1-ubyte
test
train-images.idx3-ubyte
t10k-labels-idx1-ubyte
train


Now we start using the preprocessed MNIST dataset saved under `./mnist_data/` (the `train/` and `test/` folders created above).

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

import numpy as np

## Data Pre-processing

In [5]:
class MNISTDataset(Dataset):
    """Map-style dataset backed by a pair of ``.npy`` files (images and labels)."""

    def __init__(self, images_path, labels_path, transform=None):
        """
        images_path: path to the .npy file containing MNIST images
        labels_path: path to the .npy file containing MNIST labels
        transform: optional callable applied to each image in __getitem__
                   (e.g. a torchvision transform pipeline)

        Raises ValueError if the two files hold a different number of samples.
        """
        self.images = np.load(images_path)   # shape (N, 28, 28)
        # Labels are stored as int64 so that batches collate to int64 tensors,
        # the dtype documented for class-index targets of nn.CrossEntropyLoss.
        self.labels = np.load(labels_path).astype(np.int64)   # shape (N,)
        self.transform = transform

        if len(self.images) != len(self.labels):
            raise ValueError(
                f"images/labels length mismatch: "
                f"{len(self.images)} images vs {len(self.labels)} labels"
            )

    def __len__(self):
        """Number of samples in the dataset."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for ``idx``; the image is transformed if a transform was given."""
        image = self.images[idx]
        label = self.labels[idx]

        # The image is a NumPy array here. Any conversion (to PIL, to tensor, ...)
        # is the responsibility of the transform; without one the raw array is returned.
        if self.transform:
            image = self.transform(image)

        return image, label


# Preprocessing pipeline applied to every sample:
#   NumPy array -> PIL image -> resized to 32x32 (optional, for demonstration) -> tensor
transform = transforms.Compose(
    [
        transforms.ToPILImage(),
        transforms.Resize((32, 32)),
        transforms.ToTensor(),  # rescales pixel values from [0, 255] to [0, 1.0]
    ]
)

# Wrap the saved .npy splits in Dataset objects that share the same transform
train_dataset = MNISTDataset(
    "mnist_data/train/images.npy",
    "mnist_data/train/labels.npy",
    transform,
)
test_dataset = MNISTDataset(
    "mnist_data/test/images.npy",
    "mnist_data/test/labels.npy",
    transform,
)

# Mini-batch iterators: training batches are reshuffled, test batches keep file order
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)

print("DataLoaders created.")
print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of testing samples: {len(test_dataset)}")

DataLoaders created.
Number of training samples: 49000
Number of testing samples: 21000


In [7]:
class MLP(nn.Module):
    """Fully connected classifier with two hidden ReLU layers.

    With the default arguments the network is 1024 -> 256 -> 128 -> 10,
    i.e. it expects flattened 32x32 single-channel images and emits 10 logits.

    Args:
        in_features: Number of values per sample after flattening.
        hidden_sizes: Widths of the two hidden layers, as a 2-element sequence.
        num_classes: Number of output logits.

    Raises:
        ValueError: If ``hidden_sizes`` does not contain exactly two entries.
    """

    def __init__(self, in_features=32 * 32, hidden_sizes=(256, 128), num_classes=10):
        super().__init__()
        if len(hidden_sizes) != 2:
            raise ValueError(
                f"hidden_sizes must contain exactly 2 entries, got {len(hidden_sizes)}"
            )
        first_hidden, second_hidden = hidden_sizes

        # Attribute names fc1/fc2/fc3 are kept so existing state-dict keys stay valid.
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(in_features, first_hidden)
        self.fc2 = nn.Linear(first_hidden, second_hidden)
        self.fc3 = nn.Linear(second_hidden, num_classes)

    def forward(self, x):
        """Map a batch of inputs to raw class logits of shape [B, num_classes]."""
        x = self.flatten(x)     # Flatten all non-batch dims, e.g. [B, 1, 32, 32] -> [B, 1024]
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)         # Output logits (no softmax applied here)
        return x

# Build the network with its default layer sizes and print its layer summary
model = MLP()
print("Model architecture:", model, sep="\n")

Model architecture:
MLP(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (fc1): Linear(in_features=1024, out_features=256, bias=True)
  (fc2): Linear(in_features=256, out_features=128, bias=True)
  (fc3): Linear(in_features=128, out_features=10, bias=True)
)


## Loss Function and Optimizer

In [8]:
# Adam (learning rate 1e-3) updates every parameter exposed by `model`
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

# Multi-class classification objective, computed directly from the model's logits
criterion = nn.CrossEntropyLoss()

print("Loss function and optimizer are set.")

Loss function and optimizer are set.


## Training

In [9]:
num_epochs = 5  # number of full passes over the training set

for epoch in range(num_epochs):
    model.train()  # switch the model to training mode

    batch_losses = []
    for batch_images, batch_labels in train_loader:
        # batch_images: [batch_size, 1, 32, 32], batch_labels: [batch_size]

        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # Forward pass and loss for this mini-batch
        logits = model(batch_images)
        batch_loss = criterion(logits, batch_labels)

        # Backpropagate, then apply the parameter update
        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.item())

    # Mean of the per-batch losses over the epoch
    avg_loss = sum(batch_losses) / len(train_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

Epoch [1/5], Loss: 0.3131
Epoch [2/5], Loss: 0.1246
Epoch [3/5], Loss: 0.0818
Epoch [4/5], Loss: 0.0639
Epoch [5/5], Loss: 0.0490


## Testing

In [10]:
model.eval()  # switch the model to evaluation mode
correct, total = 0, 0

# Gradients are not needed for evaluation
with torch.no_grad():
    for images, labels in test_loader:
        # Predicted class = index of the largest logit in each row
        predicted = model(images).argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f"Test Accuracy: {accuracy:.2f}%")

Test Accuracy: 97.17%
