In [None]:
%matplotlib inline


MNIST Handwritten Digit Recognition in PyTorch

Sources:

- [MNIST nextjournal.com/gkoehler/pytorch-mnist](https://nextjournal.com/gkoehler/pytorch-mnist)
- [MNIST github/pytorch/examples](https://github.com/pytorch/examples/tree/master/mnist)
- [MNIST kaggle](https://www.kaggle.com/sdelecourt/cnn-with-pytorch-for-mnist)

Convert to jupyter:
```
sphx_glr_python_to_jupyter.py  dl_cnn_mnist_pytorch.py
```



In [None]:
import os
import numpy as np
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

Define the hyperparameters we'll be using for the experiment.

Here the number of epochs defines how many times we'll
loop over the complete training dataset, while learning_rate and momentum are hyperparameters
for the optimizer we'll be using later on.



In [None]:
import tempfile

# Work from a scratch directory under the system temp dir so that the dataset
# download ("data/") and the checkpoints ("models/") end up in one known place.
WD = os.path.join(tempfile.gettempdir(), "dl_cnn_mnist_pytorch")
os.makedirs(WD, exist_ok=True)
os.chdir(WD)
print("Working dir is:", os.getcwd())
os.makedirs("data", exist_ok=True)
os.makedirs("models", exist_ok=True)

# Hyperparameters of the experiment.
n_epochs = 2              # number of passes over the training set
batch_size_train = 64
batch_size_test = 1000
learning_rate = 0.01      # SGD step size
momentum = 0.5            # SGD momentum
log_interval = 10         # print/checkpoint every `log_interval` batches
random_seed = 1
no_cuda = True            # set to False to use a GPU when one is available

# Apply the seed: it was previously defined but never used, so weight
# initialisation and DataLoader shuffling differed from run to run.
torch.manual_seed(random_seed)

use_cuda = not no_cuda and torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

Load dataset



In [None]:
def load_mnist(batch_size_train, batch_size_test, data_dir='data'):
    """Build shuffled DataLoaders for the MNIST train and test splits.

    Parameters
    ----------
    batch_size_train : int
        Batch size of the training loader.
    batch_size_test : int
        Batch size of the test loader.
    data_dir : str, optional
        Root directory passed to ``datasets.MNIST`` (default ``'data'``,
        the value that used to be hard-coded).

    Returns
    -------
    (train_loader, test_loader) : tuple of torch.utils.data.DataLoader
    """
    # Single preprocessing pipeline shared by both splits (it used to be
    # written out twice).
    # NOTE(review): 0.1307 / 0.3081 are presumably the MNIST pixel mean and
    # standard deviation -- confirm.
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])

    train_loader = torch.utils.data.DataLoader(
        datasets.MNIST(data_dir, train=True, download=True, transform=transform),
        batch_size=batch_size_train, shuffle=True)
    # download=True here as well, so the test split no longer depends on the
    # training split having been requested first.
    # The test loader is shuffled too (kept from the original code), so the
    # example batch drawn from it is a random sample of the test set.
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST(data_dir, train=False, download=True, transform=transform),
        batch_size=batch_size_test, shuffle=True)
    return train_loader, test_loader

train_loader, test_loader = load_mnist(batch_size_train, batch_size_test)
# Shape of one sample: the dataset tensor's shape without its leading (sample) axis.
data_shape = train_loader.dataset.data.shape[1:]
# Number of input features per sample; cast to a plain Python int because
# np.prod returns a NumPy scalar and D_in is used as a layer size / view() dim.
D_in = int(np.prod(data_shape))
# Number of distinct target labels in the training set.
D_out = len(train_loader.dataset.targets.unique())

Now let's take a look at some examples. We'll use the test_loader for this.



In [None]:
# Grab the first batch produced by the test loader to inspect a few samples.
batch_idx, example_batch = next(enumerate(test_loader))
example_data, example_targets = example_batch
print(example_data.shape, example_targets.shape)

So one test data batch is a tensor of shape `torch.Size([1000, 1, 28, 28])`. This means we have 1000 examples of 28x28 pixels in grayscale
(i.e. no rgb channels, hence the one). We can plot some of them using matplotlib.



In [None]:
import matplotlib.pyplot as plt

def show_data_label_prediction(data, y_true, y_pred=None, shape=(2, 3)):
    """Plot a grid of images, each titled with its true and predicted label.

    Parameters
    ----------
    data : indexable
        Images; ``data[i][0]`` is what gets drawn for sample ``i``.
    y_true : sequence
        True label of each sample.
    y_pred : sequence, optional
        Predicted label of each sample; shown as ``None`` when omitted.
    shape : tuple of int
        ``(n_rows, n_cols)`` of the subplot grid.

    Returns
    -------
    fig : the matplotlib figure that was created.
    """
    y_pred = [None] * len(y_true) if y_pred is None else y_pred
    n_rows, n_cols = shape
    # Never index past the available samples: when fewer samples than grid
    # cells are supplied, the remaining cells are simply left empty.
    n_panels = min(n_rows * n_cols, len(y_true))

    fig = plt.figure()
    for i in range(n_panels):
        plt.subplot(n_rows, n_cols, i + 1)
        plt.imshow(data[i][0], cmap='gray', interpolation='none')
        plt.title("True: {} Pred: {}".format(y_true[i], y_pred[i]))
        plt.xticks([])
        plt.yticks([])

    # Adjust the layout once, after every panel exists, rather than per panel.
    if n_panels > 0:
        plt.tight_layout()

    return fig

# Preview the first six examples with their true labels only (no predictions yet).
show_data_label_prediction(example_data, example_targets, shape=(2, 3))

Softmax Classifier (Multinomial Logistic Regression)




In [None]:
class TwoLayerMLP(nn.Module):
    """Two stacked linear layers followed by a log-softmax output.

    No activation is applied between the two linear layers.

    Parameters
    ----------
    d_in : int
        Number of input features; inputs are flattened to ``(-1, d_in)``.
    d_hidden : int
        Output size of the first linear layer.
    d_out : int
        Output size of the second linear layer (number of classes).
    """

    def __init__(self, d_in, d_hidden, d_out):
        super().__init__()
        # Remembered so that forward() can flatten its input.
        self.d_in = d_in

        self.linear1 = nn.Linear(in_features=d_in, out_features=d_hidden)
        self.linear2 = nn.Linear(in_features=d_hidden, out_features=d_out)

    def forward(self, X):
        """Return per-class log-probabilities, one row per flattened sample."""
        flat = X.view(-1, self.d_in)
        hidden = self.linear1(flat)
        logits = self.linear2(hidden)
        return F.log_softmax(logits, dim=1)

MLP

For MNIST, D_in=784, (784+1)*250 + (250+1)*100 + (100+1)*10 = 222 360 parameters to train



In [None]:
class MLP(nn.Module):
    """Three linear layers (D_in -> 250 -> 100 -> D_out) with ReLU in between.

    The input and output sizes are read from the module-level ``D_in`` and
    ``D_out``; the output is a log-softmax over the last layer.
    """

    def __init__(self):
        super().__init__()
        self.linear1 = nn.Linear(D_in, 250)
        self.linear2 = nn.Linear(250, 100)
        self.linear3 = nn.Linear(100, D_out)

    def forward(self, X):
        """Return per-class log-probabilities, one row per flattened sample."""
        flat = X.view(-1, D_in)
        hidden1 = F.relu(self.linear1(flat))
        hidden2 = F.relu(self.linear2(hidden1))
        logits = self.linear3(hidden2)
        return F.log_softmax(logits, dim=1)

class MLPDropOut(nn.Module):
    """Three linear layers (d_in -> 50 -> 50 -> d_out) with ReLU and dropout.

    Dropout with probability 0.2 follows each of the two hidden layers; the
    output is a log-softmax over the last layer.

    Parameters
    ----------
    d_in : int, optional
        Number of input features. Defaults to the module-level ``D_in``.
    d_out : int, optional
        Number of output classes. Defaults to the module-level ``D_out``.
    """

    def __init__(self, d_in=None, d_out=None):
        # Was `super(Net, self)`: `Net` is not defined anywhere, so building
        # the model raised a NameError.
        super().__init__()
        # Fall back to the notebook-level sizes when none are given, which
        # keeps the original zero-argument construction working.
        self.d_in = D_in if d_in is None else d_in
        d_out = D_out if d_out is None else d_out

        self.fc1 = nn.Linear(self.d_in, 50)
        self.fc1_drop = nn.Dropout(0.2)
        self.fc2 = nn.Linear(50, 50)
        self.fc2_drop = nn.Dropout(0.2)
        self.fc3 = nn.Linear(50, d_out)

    def forward(self, x):
        """Return per-class log-probabilities, one row per flattened sample."""
        x = x.view(-1, self.d_in)
        x = F.relu(self.fc1(x))
        x = self.fc1_drop(x)
        x = F.relu(self.fc2(x))
        x = self.fc2_drop(x)
        return F.log_softmax(self.fc3(x), dim=1)

#mlp = MLP()
#print(mlp)

CNN Models
We'll use two 2-D convolutional layers followed by two fully-connected (or linear) layers. As activation function
we'll choose rectified linear units (ReLUs in short)



In [None]:
class ConvNet(nn.Module):
    """Two 5x5 convolutions (1 -> 20 -> 50 channels) and two linear layers.

    Each convolution is followed by ReLU and 2x2 max-pooling. The flattened
    feature size is hard-coded to 4*4*50, so the network expects inputs whose
    spatial size reduces to 4x4 after the two conv/pool stages (28x28 does:
    28 -> 24 -> 12 -> 8 -> 4). The output is a log-softmax over 10 classes.
    """

    def __init__(self):
        # Was `super(Net1, self)`: `Net1` is not defined anywhere, so building
        # the model raised a NameError.
        super().__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4*4*50, 500)
        self.fc2 = nn.Linear(500, 10)

    def forward(self, x):
        """Return per-class log-probabilities, one row per input image."""
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        # Flatten the 50 feature maps of size 4x4 for the linear layers.
        x = x.view(-1, 4*4*50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

CNN with regularization provided by two dropout layers.



In [None]:
class ConvNetDropOut(nn.Module):
    """Two 5x5 convolutions (1 -> 10 -> 20 channels) with dropout, two linear layers.

    Channel-wise dropout (``nn.Dropout2d``) follows the second convolution and
    element-wise dropout follows the first linear layer. The flattened feature
    size is hard-coded to 320 (= 20 * 4 * 4, which is what 28x28 inputs give).
    The output is a log-softmax over 10 classes.
    """

    def __init__(self):
        # Was `super(Net2, self)`: `Net2` is not defined anywhere, so building
        # the model raised a NameError.
        super().__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        """Return per-class log-probabilities, one row per input image."""
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        # Functional dropout must be told explicitly whether we are training.
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        # Explicit class dimension: relying on the implicit `dim` is deprecated
        # and inconsistent with the other models in this notebook.
        return F.log_softmax(x, dim=1)

Training the Model

First we want to make sure our network is in training mode.
Then we iterate over all training data once per epoch.
Loading the individual batches is handled by the DataLoader.
First we need to manually set the gradients to zero using `optimizer.zero_grad()` since PyTorch by default
accumulates gradients.
We then produce the output of our network (forward pass) and compute a negative log-likelihood loss between the
output and the ground truth label.

With the backward() call we then collect a new set of gradients, which we propagate back into each of the network's
parameters using optimizer.step().
For more detailed information about the inner workings of PyTorch's automatic gradient system,
see the official docs for autograd (highly recommended).

We'll also keep track of the progress with some printouts. In order to create a nice training curve later on
we also create two lists for saving training and testing losses.
On the x-axis we want to display the number of training examples the network has seen during training.

Neural network modules as well as optimizers have the ability to save and load their internal state using
`.state_dict()`. With this we can continue training from previously saved state dicts if needed - we'd just need
to call `.load_state_dict(state_dict)`.



In [None]:
def train(model, train_loader, optimizer, epoch, device, log_interval=10):
    """Run one pass over ``train_loader``, updating ``model`` with ``optimizer``.

    Every ``log_interval`` batches a progress line is printed and the state
    dicts of the model and the optimizer are written under ``models/``.

    Parameters
    ----------
    model : nn.Module whose output is fed to ``F.nll_loss``.
    train_loader : iterable of ``(data, target)`` batches with a ``dataset``.
    optimizer : optimizer updating the parameters of ``model``.
    epoch : value shown in the progress line (not used otherwise).
    device : device the batches are moved to.
    log_interval : int, number of batches between two log/checkpoint events.

    Returns
    -------
    (model, train_losses, train_counter) where ``train_losses`` holds the loss
    of each batch and ``train_counter`` the number of samples in each batch.
    """
    model.train()
    model_name = model.__class__.__name__
    optim_name = optimizer.__class__.__name__
    train_losses, train_counter = [], []

    for batch_idx, (data, target) in enumerate(train_loader):
        data = data.to(device)
        target = target.to(device)

        # Gradients accumulate by default, so clear them before each step.
        optimizer.zero_grad()
        loss = F.nll_loss(model(data), target)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        train_losses.append(batch_loss)
        train_counter.append(data.shape[0])

        if batch_idx % log_interval != 0:
            continue

        n_seen = batch_idx * len(data)
        n_total = len(train_loader.dataset)
        percent = 100. * batch_idx / len(train_loader)
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, n_seen, n_total, percent, batch_loss))

        # Checkpoint both states so that training can be resumed later.
        torch.save(model.state_dict(), 'models/mod-%s.pth' % model_name)
        torch.save(optimizer.state_dict(), 'models/mod-%s_opt-%s.pth' % (model_name, optim_name))

    return model, train_losses, train_counter

Test loop. Here we sum up the test loss and keep track of correctly classified digits to compute the accuracy of
the network.
Using the context manager no_grad() we can avoid storing the computations done producing the output of our network
in the computation graph.

In [None]:
def test(model, test_loader, device):
    model.eval()
    test_loss = 0
    correct = 0
    output, pred, target = list(), list(), list()
    with torch.no_grad():
        for data, target_ in test_loader:
            # batch_idx, (data, target) = next(enumerate(test_loader))
            # print(target_.shape)
            data, target_ = data.to(device), target_.to(device) # target.shape == 1000
            output_ = model(data) # output.shape == (1000, 10)
            test_loss += F.nll_loss(output_, target_, reduction='sum').item() # sum up batch loss
            pred_ = output_.argmax(dim=1) # get the index of the max log-probability
            correct += pred_.eq(target_.view_as(pred_)).sum().item() # view_as(other): View this tensor as the same size as other
            output.append(output_)
            pred.append(pred_)
            target.append(target_)

    output = torch.cat(output)
    pred = torch.cat(pred)
    target = torch.cat(target)
    assert pred.eq(target.view_as(pred)).sum().item() == correct

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
    return pred, output, target, test_loss

In [None]:
Initialize the network and the optimizer.


In [None]:
# Move the network to the target device *before* building the optimizer, so
# that the optimizer is constructed on the parameters it will actually update.
# train() and test() move each batch to `device`, so the model must live there too.
model = TwoLayerMLP(D_in, 50, D_out).to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)

Time to run the training! We'll manually add a test() call before we loop over n_epochs to evaluate our model with
randomly initialized parameters.



In [None]:
# Baseline: evaluate the randomly initialised network before any training.
pred, output, target, test_loss = test(model, test_loader, device)
# Convert to a Python number and round to one decimal, matching the accuracy
# line printed by the training loop later on.
print("Test accuracy = {:.1f}%".format((target == pred).sum().item() * 100. / len(target)))

Train one epoch



In [None]:
# First epoch. Pass the configured log_interval explicitly (as the later
# training loop does) instead of silently relying on train()'s default.
model, train_losses, train_counter = train(model, train_loader, optimizer, 1, device, log_interval)
pred, output, target, test_loss = test(model, test_loader, device)

Evaluating the Model's Performance



In [None]:
# Convert to a Python number and round to one decimal, matching the accuracy
# line printed by the training loop later on.
print("Test accuracy = {:.1f}%".format((target == pred).sum().item() * 100. / len(target)))
# One test-loss point per completed epoch: each entry of test_counter is the
# number of training samples seen during that epoch (cumulated for the x-axis).
test_counter, test_losses = [len(train_loader.dataset)], [test_loss]

fig = plt.figure()
plt.plot(np.cumsum(train_counter), train_losses, '-b',
         np.cumsum(test_counter), test_losses, "or")
plt.legend(['Train Loss', 'Test Loss'], loc='upper right')
plt.xlabel('number of training examples seen')
plt.ylabel('negative log likelihood loss')

Let's again look at a few examples as we did earlier and compare the model's output.



In [None]:
# Inference only: no need to track gradients for the example batch.
with torch.no_grad():
    output = model(example_data)
# Predicted class = index of the largest log-probability of each row.
y_pred = output.argmax(dim=1)

show_data_label_prediction(example_data, example_targets, y_pred, shape=(3, 4))

Look at some misclassified images



In [None]:
# Boolean mask over the example batch: True where the prediction is wrong.
errors = example_targets != y_pred
n_errors = errors.sum().item()
print("Nb errors = {}, (rate = {:.2f}%)".format(n_errors, 100 * n_errors / len(errors)))

# Positions of the misclassified samples, used to select them below.
err_idx = np.where(errors)
show_data_label_prediction(
    data=example_data[err_idx],
    y_true=example_targets[err_idx],
    y_pred=y_pred[err_idx],
    shape=(3, 4),
)

Reload model



In [None]:
# Rebuild a fresh network and optimizer, then restore both from the
# checkpoints written by train(). The optimizer is built exactly once (the
# original created it twice in a row) and only after the model has been
# placed on the target device.
model = TwoLayerMLP(D_in, 50, D_out).to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)

model.load_state_dict(torch.load('models/mod-%s.pth' % model.__class__.__name__))
optimizer.load_state_dict(torch.load('models/mod-%s_opt-%s.pth' % (model.__class__.__name__, optimizer.__class__.__name__)))

Continue training from checkpoints



In [None]:
# Epoch 1 was run above; carry on with the remaining epochs and extend the
# loss histories that feed the training curve.
for epoch in range(2, n_epochs + 1):
    model, epoch_losses, epoch_counts = train(
        model, train_loader, optimizer, epoch, device, log_interval)
    train_losses.extend(epoch_losses)
    train_counter.extend(epoch_counts)

    pred, output, target, test_loss = test(model, test_loader, device)
    test_counter.append(len(train_loader.dataset))
    test_losses.append(test_loss)

    n_correct = (target == pred).sum().item()
    print("Test accuracy = {:.1f}%".format(n_correct * 100. / len(target)))


# Training curve: per-batch training loss, one test-loss point per epoch.
fig, ax = plt.subplots()
ax.plot(np.cumsum(train_counter), train_losses, color='blue')
ax.plot(np.cumsum(test_counter), test_losses, "or")
ax.legend(['Train Loss', 'Test Loss'], loc='upper right')
ax.set_xlabel('number of training examples seen')
ax.set_ylabel('negative log likelihood loss')

Visualize coefficients map



In [None]:
# Shapes of all parameters of the network (weights and biases of both layers).
print([p.shape for p in model.parameters()])

# The first parameter is linear1.weight, of shape (d_hidden, D_in) = (50, 784),
# which cannot be reshaped into one 28x28 map per class (the original
# reshape(10, 1, 28, 28) raised for this model). TwoLayerMLP applies no
# non-linearity between linear1 and linear2, so the network is equivalent to a
# single linear map whose weight matrix is linear2.weight @ linear1.weight,
# of shape (D_out, D_in): one row of coefficients per class.
with torch.no_grad():
    w = (model.linear2.weight @ model.linear1.weight).cpu().numpy()

# (D_out, D_in) => D_out x 1 x image shape, the layout expected by the plot helper.
ima = w.reshape(D_out, 1, *data_shape)
show_data_label_prediction(data=ima, y_true=np.arange(D_out), y_pred=None, shape=(2, 5))