# MNIST + PyTorch

**MNIST dataset** je jedním z nejstarších klasických datasetů v oblasti umělé inteligence.

**PyTorch** je nástroj pro vytváření a trénování neuronových sítí.

**Tenzory** jsou vícerozměrné "matice": objekty plné čísel, které lze indexovat ve více dimenzích. Vektory i matice jsou také tenzory, jednorozměrné a dvourozměrné. Tenzory jsou základní stavební kámen umělé inteligence.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import random
from datetime import datetime

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torchvision import datasets, transforms
import torch.nn.functional as F

# Seznámení se s datasetem

In [None]:
transform = transforms.ToTensor()
dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)

In [None]:
x, y = dataset[0]

In [None]:
x

In [None]:
x.type()

In [None]:
y

In [None]:
k = random.randint(0, 59_999)
x, y = dataset[k]
for i in range(28):
  for j in range(28):
    s = ' ' if x[0, i, j] < 0.2 else ('.' if x[0, i, j] < 0.7 else 'x')
    print(s, end='')
  print()
print(f'Target = {y}')

In [None]:
rows, cols = 3, 4
fig, axes = plt.subplots(rows, cols, figsize=(8, 5))
for i, ax in enumerate(axes.flat):
  x, y = dataset[i]
  ax.imshow(x[0], cmap='gray')
  ax.set_title(y)
  ax.axis('off')
plt.tight_layout()

## K zamyšlení
- Pokud byste měli počítač naučit, jak rozpoznávat tyto cifry, jak byste to vůbec dělali?
- A když ho to naučíte, jakým způsobem to můžeme ohodnotit? Abychom mohli říct, jestli se to naučil dobře nebo ne?

## Neuronová síť

In [None]:
# Structure of the network
n_input = 28 * 28   # number of neurons in the input layer (~ number of pixels of images)
n_hidden_1 = 16     # number of neurons in the first hidden layer
n_hidden_2 = 16     # number of neurons in the second hidden layer
n_output = 10       # number of neurons in the output layer (~ number of different digits)

In [None]:
# Load the MNIST dataset
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform,
                               download=True)
test_dataset = datasets.MNIST(root='./data', train=False, transform=transform,
                              download=True)

# Use tensor dataset instead - much faster
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
train_tensor_dataset = TensorDataset((train_dataset.data / 255.).to(device), train_dataset.targets.to(device))
test_tensor_dataset = TensorDataset((test_dataset.data / 255.).to(device), test_dataset.targets.to(device))

# Create data loaders
batch_size = 64     # size of batch
train_loader = DataLoader(train_tensor_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_tensor_dataset, batch_size=batch_size, shuffle=False)

### Definice modelu

In [None]:
# Definition of the neural network
class SmallNetwork(nn.Module):
  def __init__(self):
    super(SmallNetwork, self).__init__()
    self.fc1 = nn.Linear(28 * 28, 16)
    self.fc2 = nn.Linear(16, 16)
    self.fc3 = nn.Linear(16, 10)

  def forward(self, x):
    x = x.view(x.size(0), -1)
    x = torch.sigmoid(self.fc1(x))
    x = torch.sigmoid(self.fc2(x))
    x = self.fc3(x)
    return x

In [None]:
# Sigmoid = activation function
x = torch.linspace(-6, 6, 201)
plt.plot(x, torch.sigmoid(x))
plt.grid()

In [None]:
model = SmallNetwork()

In [None]:
x, y = dataset[0]

In [None]:
model(x)

In [None]:
torch.argmax(model(x))

In [None]:
def plot_prediction(dataset, model, rows=3, cols=4, random=False):
  fig, axes = plt.subplots(rows, cols, figsize=(8, 5))
  for i, ax in enumerate(axes.flat):
    if random:
      i = np.random.randint(len(dataset))
    x, y = dataset[i]
    pred = torch.argmax(model(x))
    ax.imshow(x[0], cmap='gray')
    ax.set_title(f'label = {y}, pred = {pred}')
    ax.axis('off')
  plt.tight_layout()

In [None]:
plot_prediction(dataset, model, random=True)

In [None]:
def prediction_table(dataset, model):
  _, predicted = torch.max(model(dataset.data.float() / 255.0), 1)
  df = pd.DataFrame({'True': dataset.targets.cpu(),
                     'Prediction': predicted.cpu()}, dtype=int)
  df['Count'] = 1
  pivoted = df.groupby(['True', 'Prediction'])['Count'].count().reset_index() \
    .pivot_table(index='True', columns='Prediction', values='Count', fill_value=0)
  for i in range(10):
    if i not in pivoted.columns:
      pivoted[i] = 0
  return pivoted.sort_index(axis=1)

In [None]:
prediction_table(test_dataset, model)

### Trénink

In [None]:
def train_network(model, train_loader, test_loader, num_epochs=20):
  cross_entropy_loss = nn.CrossEntropyLoss()
  optimizer = optim.SGD(model.parameters(), lr=0.01)

  train_losses = []
  test_losses = []
  train_accuracies = []
  test_accuracies = []

  # Loss and accuracy before the training.
  model.eval()
  test_loss = 0.0
  test_correct = 0
  with torch.no_grad():
    for images, labels in test_loader:
      outputs = model(images)
      loss = cross_entropy_loss(outputs, labels)
      test_loss += loss.item() * images.size(0)
      _, predicted = torch.max(outputs.data, 1)
      test_correct += (predicted == labels).sum().item()

  test_loss = test_loss / len(test_loader.dataset)
  test_losses.append(test_loss)
  test_accuracy = test_correct / len(test_loader.dataset)
  test_accuracies.append(test_accuracy)

  train_losses.append(np.nan)
  train_accuracies.append(np.nan)

  t0 = datetime.now()
  print(f"Starting training at {t0} on device {next(model.parameters()).device}."
        f" Test loss {test_loss:.4f}, accuracy {100 * test_accuracy:.2g}%.")
  print()

  for epoch in range(num_epochs):
    train_loss = 0.0
    train_correct = 0
    model.train(True)
    for images, labels in train_loader:
      outputs = model(images)
      loss = cross_entropy_loss(outputs, labels)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      train_loss += loss.item() * images.size(0)
      _, predicted = torch.max(outputs.data, 1)
      train_correct += (predicted == labels).sum().item()

    model.eval()
    test_loss = 0.0
    test_correct = 0
    with torch.no_grad():
      for images, labels in test_loader:
        outputs = model(images)
        loss = cross_entropy_loss(outputs, labels)
        test_loss += loss.item() * images.size(0)
        _, predicted = torch.max(outputs.data, 1)
        test_correct += (predicted == labels).sum().item()

    train_loss = train_loss / len(train_loader.dataset)
    train_losses.append(train_loss)
    train_accuracy = train_correct / len(train_loader.dataset)
    train_accuracies.append(train_accuracy)

    test_loss = test_loss / len(test_loader.dataset)
    test_losses.append(test_loss)
    test_accuracy = test_correct / len(test_loader.dataset)
    test_accuracies.append(test_accuracy)

    print(f"Elapsed {datetime.now() - t0}s, epoch {epoch + 1} / {num_epochs}.",
          f"Training loss {train_loss:.4f}, accuracy {100 * train_accuracy:.1f}%.",
          f"Test loss {test_loss:.4f}, accuracy {100 * test_accuracy:.1f}%.")

  print()
  print(f"Finished in {datetime.now() - t0}s.")
  return train_losses, train_accuracies, test_losses, test_accuracies

In [None]:
train_loss, train_acc, test_loss, test_acc = train_network(
    model, train_loader, test_loader, num_epochs=150)

### Výsledky

In [None]:
plt.plot(train_loss, 'g', label='Train loss')
plt.plot(test_loss, 'r', label='Test loss')
plt.title('Loss')
plt.grid()
plt.legend()

In [None]:
plt.plot(train_acc, 'g', label='Train accuracy')
plt.plot(test_acc, 'r', label='Test accuracy')
plt.title('Accuracy')
plt.grid()
plt.legend()

In [None]:
plot_prediction(test_dataset, model)

In [None]:
prediction_table(test_dataset, model)

## Matematický bonus

Celá neuronová síť je jen maticové násobení, podobně jako v našem příkladu na papíře. Stačí k tomu přidat aktivační funkci a biases a síť učit pomocí derivací.

In [None]:
# initialization - this is identical neural network as the one above
def initialize_params():
  return {'W1': np.random.randn(n_hidden_1, n_input) * np.sqrt(1 / n_input),
          'b1': np.zeros((n_hidden_1, 1)),
          'W2': np.random.randn(n_hidden_2, n_hidden_1) * np.sqrt(1. / n_hidden_1),
          'b2': np.zeros((n_hidden_2, 1)),
          'W3': np.random.randn(n_output, n_hidden_2) * np.sqrt(1. / n_hidden_2),
          'b3': np.zeros((n_output, 1))}

In [None]:
# And we need to define some helper function:

# sigmoid
def sigmoid(z):
    return 1 / (1 + np.exp(-z))

# one-hot encoding
def one_hot(arr):
    res = np.zeros((arr.size, arr.max() + 1))
    res[np.arange(arr.size), arr] = 1
    return res

# softmax (is really a soft version of maximum)
def softmax(z):
    return np.exp(z) / np.sum(np.exp(z), axis=0)

# loss function - the cost we try to minimize
def loss_mean_squared_errors(Y, Y_hat):
    return np.sum((Y - Y_hat) ** 2) / Y.shape[1]

def accuracy(Y, Y_hat):
    Y_idx = np.argmax(Y, axis=0)
    Y_hat_idx = np.argmax(Y_hat, axis=0)
    return np.sum(Y_idx == Y_hat_idx) / len(Y_idx)

In [None]:
def feed_forward(X, params):
    """
    feed forward:
    inputs: params: a dictionary contains all the weights and biases
    return: cache: a dictionary contains all the fully connected units and activations
    """
    cache = {}

    # Z1 = W1.dot(x) + b1
    cache['Z1'] = params['W1'] @ X + params['b1']
    # A1 = sigmoid(Z1)
    cache['A1'] = sigmoid(cache['Z1'])

    # Z2 = W2.dot(A1) + b2
    cache['Z2'] = params['W2'] @ cache['A1'] + params['b2']
    # A2 = sigmoid(Z2)
    cache['A2'] = sigmoid(cache['Z2'])

    # Z3 = W3.dot(A2) + b3
    cache['Z3'] = params['W3'] @ cache['A2'] + params['b3']
    # A3 = sigmoid(Z3)
    # cache['A3'] = softmax(cache['Z3'])
    cache['A3'] = sigmoid(cache['Z3'])

    return cache

In [None]:
def back_propagate(X, Y, params, cache, m_batch):
    """
    back propagation
    inputs:
        params: a dictionary contains all the weights and biases
        cache: a dictionary contains all the fully connected units and activations
    return:
        grads: a dictionary contains the gradients of corresponding weights and biases
    """
    # error at last layer - mean square errors
    dA3 = 2 * (cache['A3'] - Y)
    dZ3 = dA3 * cache['A3'] * (1 - cache['A3'])

    # or softmax with cross-entropy
    # dZ3 = cache['A3'] - Y

    # gradients at last layer
    dW3 = (1 / m_batch) * np.matmul(dZ3, cache['A2'].T)
    db3 = (1 / m_batch) * np.sum(dZ3, axis=1, keepdims=True)

    # back propagate through the second layer
    dA2 = np.matmul(params['W3'].T, dZ3)
    dZ2 = dA2 * cache['A2'] * (1 - cache['A2'])

    # gradients the second last layer
    dW2 = (1. / m_batch) * np.matmul(dZ2, cache['A1'].T)
    db2 = (1. / m_batch) * np.sum(dZ2, axis=1, keepdims=True)

    # back propagate through the first layer
    dA1 = np.matmul(params['W2'].T, dZ2)
    dZ1 = dA1 * cache['A1'] * (1 - cache['A1'])

    # gradients at the first layer
    dW1 = (1. / m_batch) * np.matmul(dZ1, X.T)
    db1 = (1. / m_batch) * np.sum(dZ1, axis=1, keepdims=True)

    grads = {'dW1': dW1, 'db1': db1, 'dW2': dW2, 'db2': db2, 'dW3': dW3, 'db3': db3}

    return grads

In [None]:
def train_params(params, X_train, Y_train, X_test, Y_test, num_epochs=20, lr=1):

  train_losses = []
  test_losses = []

  train_accuracies = []
  test_accuracies = []

  t0 = datetime.now()
  print(f"Starting manual training at {t0}.")
  print()

  for i in range(num_epochs):

      # shuffle training set
      permutation = np.random.permutation(X_train.shape[1])
      X_train_shuffled = X_train[:, permutation]
      Y_train_shuffled = Y_train[:, permutation]

      batches = (X_train_shuffled.shape[0] - 1) // batch_size + 1

      for j in range(batches):

          # get mini-batch
          begin = j * batch_size
          end = min(begin + batch_size, X_train.shape[1] - 1)
          X = X_train_shuffled[:, begin:end]
          Y = Y_train_shuffled[:, begin:end]
          m_batch = end - begin

          # forward and backward
          cache = feed_forward(X, params)
          grads = back_propagate(X, Y, params, cache, m_batch)

          # gradient descent
          params['W1'] = params['W1'] - lr * grads['dW1']
          params['b1'] = params['b1'] - lr * grads['db1']
          params['W2'] = params['W2'] - lr * grads['dW2']
          params['b2'] = params['b2'] - lr * grads['db2']
          params['W3'] = params['W3'] - lr * grads['dW3']
          params['b3'] = params['b3'] - lr * grads['db3']

      # forward pass on training set
      cache = feed_forward(X_train, params)
      train_loss = loss_mean_squared_errors(Y_train, cache['A3'])
      train_losses.append(train_loss)
      train_accuracy = accuracy(Y_train, cache['A3'])
      train_accuracies.append(train_accuracy)

      # forward pass on test set
      cache = feed_forward(X_test, params)
      test_loss = loss_mean_squared_errors(Y_test, cache['A3'])
      test_losses.append(test_loss)
      test_accuracy = accuracy(Y_test, cache['A3'])
      test_accuracies.append(test_accuracy)

      print(f"Elapsed {datetime.now() - t0}s, epoch {i + 1} / {num_epochs}.",
            f"Training loss {train_loss:.4f}, accuracy {100 * train_accuracy:.1f}%.",
            f"Test loss {test_loss:.4f}, accuracy {100 * test_accuracy:.1f}%.")

  print()
  print(f"Finished in {datetime.now() - t0}s.")
  return train_losses, train_accuracies, test_losses, test_accuracies

In [None]:
X_train, Y_train = (t.numpy() for t in train_tensor_dataset.tensors)
X_test, Y_test = (t.numpy() for t in test_tensor_dataset.tensors)

X_train = X_train.reshape(-1, 28 * 28).T
X_test = X_test.reshape(-1, 28 * 28).T

Y_train = one_hot(Y_train).T
Y_test = one_hot(Y_test).T

In [None]:
params = initialize_params()

In [None]:
train_loss, train_acc, test_loss, test_acc = train_params(
    params, X_train, Y_train, X_test, Y_test, num_epochs=250)

In [None]:
plt.plot(train_loss, 'g', label='Train loss')
plt.plot(test_loss, 'r', label='Test loss')
plt.title('Loss')
plt.grid()
plt.legend()

In [None]:
plt.plot(train_acc, 'g', label='Train accuracy')
plt.plot(test_acc, 'r', label='Test accuracy')
plt.title('Accuracy')
plt.grid()
plt.legend()

In [None]:
params

In [None]:
pred_test = np.argmax(feed_forward(X_test, params)['A3'], axis=0)
fig, axes = plt.subplots(rows, cols, figsize=(8, 5))
for i, ax in enumerate(axes.flat):
  if random:
    i = np.random.randint(X_test.shape[1])
  x, y = X_test[:, i], Y_test[:, i]
  pred = pred_test[i]
  ax.imshow(x.reshape(28, 28), cmap='gray')
  ax.set_title(f'label = {np.argmax(y)}, pred = {pred}')
  ax.axis('off')
plt.tight_layout()

In [None]:
# what the network is actually predicting?
pred_test = np.argmax(feed_forward(X_test, params)['A3'], axis=0)
true_test = np.argmax(Y_test, axis=0)

df = pd.DataFrame({'True': true_test, 'Prediction': pred_test})
df['Count'] = 1
pivoted = df.groupby(['True', 'Prediction'])['Count'].count().reset_index() \
    .pivot_table(index='True', columns='Prediction', values='Count', fill_value=0)
for i in range(10):
    if i not in pivoted.columns:
        pivoted[i] = 0
pivoted = pivoted[range(10)]
pivoted