In [1]:
from torch.optim import SGD
import torch
import torchvision
from torch.nn import CrossEntropyLoss
import numpy as np
from torch.utils.data import DataLoader, TensorDataset

In [2]:
# Set to False to force CPU execution even when a GPU is present.
use_cuda = True

# Use CUDA only when it is both requested and available; otherwise use the CPU.
if use_cuda and torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Data

The routine below downloads the data, then reshapes and normalizes it:

In [3]:
def get_mnist(normalize=True):
    """Load the MNIST train/test splits with pixel data converted to float32.

    Both splits are read from the ``data`` directory; the training split is
    requested with ``download=True``. When ``normalize`` is true, every pixel
    position is standardised with the mean and standard deviation of that
    position over the *training* images, and the same statistics are applied
    to the test images. Positions whose training standard deviation is zero
    are only mean-centred, never divided.

    Args:
        normalize: Whether to standardise the pixel values.

    Returns:
        A ``(train, test)`` pair of ``torchvision.datasets.MNIST`` objects
        whose ``data`` attribute has been replaced by the float32 tensor.
    """
    to_tensor = torchvision.transforms.ToTensor()
    train = torchvision.datasets.MNIST('data', download=True, train=True, transform=to_tensor)
    test = torchvision.datasets.MNIST('data', train=False, transform=to_tensor)

    xtrain = train.data.to(torch.float32)
    xtest = test.data.to(torch.float32)

    if normalize:
        # Flatten each image so that statistics are taken per pixel position.
        flat_train = xtrain.view(-1, 28 * 28)
        flat_test = xtest.view(-1, 28 * 28)
        mu, std = flat_train.mean(0), flat_train.std(0)
        # Constant pixels (std == 0) are skipped to avoid dividing by zero.
        varying = std > 0

        for flat in (flat_train, flat_test):
            flat -= mu
            flat[:, varying] /= std[varying]

        # Restore the image layout.
        xtrain = flat_train.view(-1, 28, 28)
        xtest = flat_test.view(-1, 28, 28)

    train.data = xtrain
    test.data = xtest

    return train, test

In [4]:
# Build the train/test splits with per-pixel standardisation enabled.
train_dataset, test_dataset = get_mnist(normalize=True)

In [5]:
# Sanity check: for each split, take the mean over dim 0 (per-position mean
# across samples) and then average those values into a single number.
train_dataset.data.mean(0).mean().item(), test_dataset.data.mean(0).mean().item()

(-7.328089801639237e-10, 0.002495675580576062)

# Model

In [6]:
import torch.nn as nn
import torch.nn.functional as F

# https://github.com/floydhub/mnist/blob/master/ConvNet.py
class Net(nn.Module):
    """Small convolutional classifier producing 10 log-probabilities per sample.

    Pipeline: Conv(5x5) -> ReLU -> MaxPool(2) -> Conv(5x5) -> ReLU ->
    MaxPool(2) -> FC -> ReLU -> FC -> log-softmax.

    The flatten step assumes 4x4 feature maps with 50 channels after the
    second pooling stage, which is what a 28x28 single-channel input yields.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=20, kernel_size=5, stride=1)
        self.conv2 = nn.Conv2d(in_channels=20, out_channels=50, kernel_size=5, stride=1)
        self.fc1 = nn.Linear(in_features=4 * 4 * 50, out_features=500)
        self.fc2 = nn.Linear(in_features=500, out_features=10)

    def forward(self, x):
        """Map a batch of images to per-class log-probabilities (dim 1)."""
        # Two conv stages, each followed by ReLU and 2x2 max pooling.
        features = F.max_pool2d(F.relu(self.conv1(x)), kernel_size=2, stride=2)
        features = F.max_pool2d(F.relu(self.conv2(features)), kernel_size=2, stride=2)
        # Flatten to one vector per sample for the fully connected head.
        flat = features.view(-1, 4 * 4 * 50)
        hidden = F.relu(self.fc1(flat))
        logits = self.fc2(hidden)
        return F.log_softmax(logits, dim=1)

In [7]:
# Instantiate the network and move its parameters to the selected device.
net = Net().to(device)

# Training

In [8]:
def accuracy(yhat, y):
    """Fraction of samples whose highest-scoring class equals the label.

    Args:
        yhat: Score tensor; the predicted class is the argmax along dim 1.
        y: Tensor of target class indices, compared element-wise with the
            predictions.

    Returns:
        The mean hit rate as a Python float.
    """
    hits = yhat.argmax(dim=1) == y
    return hits.double().mean().item()

In [9]:
from typing import Iterable, Callable

from torch.nn import Module
from torch.optim import Optimizer
from time import time

def train_epoch(model: Module, dataset: Iterable, optim: Optimizer, loss_fun: Module, metric_fun: Callable, device):
    """Run one optimisation pass over ``dataset``.

    Args:
        model: Network being trained; called as ``model(x)``.
        dataset: Iterable yielding ``(x, y)`` batches; both are moved to
            ``device`` before use.
        optim: Optimizer whose gradients are reset before, and stepped after,
            every batch.
        loss_fun: Called as ``loss_fun(yhat, y)``; must return a tensor that
            supports ``backward()`` and ``item()``.
        metric_fun: Called as ``metric_fun(yhat, y)`` on the predictions made
            *before* the parameter update of that batch.
        device: Target device for the batches.

    Returns:
        ``(losses, metrics)``: two lists with one entry per batch.
    """
    batch_losses, batch_metrics = [], []

    for inputs, targets in dataset:
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Clear gradients left over from the previous batch.
        optim.zero_grad()

        outputs = model(inputs)
        # Record the metric before the weights change for this batch.
        batch_metrics.append(metric_fun(outputs, targets))

        batch_loss = loss_fun(outputs, targets)
        batch_loss.backward()
        optim.step()

        batch_losses.append(batch_loss.item())

    return batch_losses, batch_metrics


def training(model: Module, dataset: Iterable, optim: Optimizer, loss_fun: Module, metric_fun: Callable, epochs: int, device):
    """Train ``model`` for ``epochs`` passes over ``dataset`` and report progress.

    The model is switched to training mode once, then ``train_epoch`` is
    called per epoch. After each epoch the mean batch loss and mean batch
    metric are printed; the total wall-clock time is printed at the end.

    Args:
        model: Network to train.
        dataset: Iterable of ``(x, y)`` batches, re-iterated every epoch.
        optim: Optimizer updating the model parameters.
        loss_fun: Loss callable forwarded to ``train_epoch``.
        metric_fun: Metric callable forwarded to ``train_epoch``.
        epochs: Number of passes over ``dataset``.
        device: Device the batches are moved to.

    Returns:
        ``(losses_epoch, metrics_epoch)``: one list per epoch, each holding
        the per-batch values returned by ``train_epoch``.
    """
    model.train()
    history_loss, history_metric = [], []

    started = time()
    for epoch in range(epochs):
        batch_losses, batch_metrics = train_epoch(model, dataset, optim, loss_fun, metric_fun, device)
        history_loss.append(batch_losses)
        history_metric.append(batch_metrics)
        print(f'epoch {epoch}\tloss = {np.mean(batch_losses)}\tacc = {np.mean(batch_metrics)}')

    elapsed = time() - started
    print('training took', elapsed, 's')

    return history_loss, history_metric

In [10]:
# Dataset preparation
# Give the images an explicit channel axis: (N, H, W) -> (N, 1, H, W).
# reshape() targets the final 4-D shape directly, so re-running this cell is a
# no-op; inserting the axis by indexing would add one more singleton axis on
# every execution and break the Conv2d input.
train_dataset.data = train_dataset.data.reshape(-1, 1, *train_dataset.data.shape[-2:])
test_dataset.data = test_dataset.data.reshape(-1, 1, *test_dataset.data.shape[-2:])

batch_size = 64
train_loader = DataLoader(TensorDataset(train_dataset.data, train_dataset.targets), batch_size=batch_size)

In [11]:
# Plain SGD over all network parameters with a learning rate of 0.01.
opt = SGD(net.parameters(), lr=0.01)

In [12]:
# Train for 5 epochs; the result holds one list of per-batch values per epoch.
losses, accs = training(net, train_loader, opt, CrossEntropyLoss(), accuracy, epochs=5, device=device)

epoch 0	loss = 0.6013755448289645	acc = 0.8408515458422174
epoch 1	loss = 0.148831311306521	acc = 0.9561400586353944
epoch 2	loss = 0.09917276037106318	acc = 0.9707322761194029
epoch 3	loss = 0.07691708515792378	acc = 0.9769956023454158
epoch 4	loss = 0.06368229625811642	acc = 0.9810934168443497
training took 7.350011825561523 s


# Test

In [13]:
def test(model: Module, dataset: Iterable, loss_fun: Module, metric_fun: Callable, device):
    losses = []
    metrics = []
    model.eval()
    with torch.no_grad():
        for x, y in dataset:
            x, y = x.to(device), y.to(device)
            yhat = model(x)
            loss = loss_fun(yhat, y)
            losses.append(loss.item())
            metrics.append(metric_fun(yhat, y))
            
    print(f'Avg loss = {np.mean(losses)}\tAvg acc = {np.mean(metrics)}')
    return losses, metrics

In [14]:
# Evaluate on the test split, using the same batch size as for training.
batch_size = 64
test_loader = DataLoader(TensorDataset(test_dataset.data, test_dataset.targets), batch_size=batch_size)
# NOTE: this rebinds `losses`, which held the training losses until now.
losses, metrics = test(net, test_loader, CrossEntropyLoss(), accuracy, device)

Avg loss = 0.05828293676934484	Avg acc = 0.9804936305732485
