<center><img src='https://drive.google.com/uc?id=1_utx_ZGclmCwNttSe40kYA6VHzNocdET' height="60"></center>

AI TECH - Akademia Innowacyjnych Zastosowań Technologii Cyfrowych. Program Operacyjny Polska Cyfrowa na lata 2014-2020
<hr>

<center><img src='https://drive.google.com/uc?id=1BXZ0u3562N_MqCLcekI-Ens77Kk4LpPm'></center>

<center>
Projekt współfinansowany ze środków Unii Europejskiej w ramach Europejskiego Funduszu Rozwoju Regionalnego
Program Operacyjny Polska Cyfrowa na lata 2014-2020,
Oś Priorytetowa nr 3 "Cyfrowe kompetencje społeczeństwa" Działanie  nr 3.2 "Innowacyjne rozwiązania na rzecz aktywizacji cyfrowej"
Tytuł projektu:  „Akademia Innowacyjnych Zastosowań Technologii Cyfrowych (AI Tech)”
    </center>

In this lab, you will implement some of the techniques discussed in the lecture.

Below you are given a solution to the previous scenario. It has two serious drawbacks:
 * The output predictions do not sum up to one (i.e. the output is not a probability distribution), even though the images always contain exactly one digit.
 * It uses MSE coupled with output sigmoid, which can lead to saturation and slow convergence.

**Task 0.** Implement a numerically stable version of softmax.

**Task 1.** Use softmax instead of coordinate-wise sigmoid and use log-loss instead of MSE. Test to see if this improves convergence. Hint: When implementing backprop it might be easier to consider these two functions as a single block, rather than compute the gradient over the softmax values.

**Task 2.** Implement L2 regularization and add momentum to the SGD algorithm. Play with different amounts of regularization and momentum. See if this improves accuracy/convergence.

**Task 3 (optional).** Implement Adagrad or AdamW (currently popular in LLM training), dropout, and some simple data augmentations (e.g. tiny rotations/shifts, etc.). Again, test to see how these changes improve accuracy/convergence.

**Task 4.** Try adding extra layers to the network. Again, test how the changes you introduced affect accuracy/convergence. As a start, you can try this architecture: [784,100,30,10]

The provided model evaluation code (`evaluate_model`) may take some time to complete. During implementation, you can change the number of evaluated models to 1 and reduce the number of tested learning rates, and epochs.  


In [None]:
!pip install tqdm pandas
!wget --no-verbose -O mnist.npz https://s3.amazonaws.com/img-datasets/mnist.npz

In [None]:
import random
import json
from pathlib import Path
from typing import Any, Callable, Sequence

import numpy as np
import pandas as pd
from IPython.display import display
from numpy.typing import NDArray
from torchvision import datasets, transforms
from tqdm import tqdm

FloatNDArray = NDArray[np.float64]

np.random.seed(42)

In [None]:
def load_mnist(
    path: Path = Path("mnist.npz")
) -> tuple[FloatNDArray, FloatNDArray, FloatNDArray, FloatNDArray]:
    """
    Load the MNIST dataset (grayscale 28 x 28 images of hand-written digits).

    Returns tuple of:
    - x_train: shape (N_train, H * W), grayscale values 0..1.
    - y_train: shape (N_train, 10), one-hot-encoded label, dtype float64.
    - x_test: shape (N_test, H * W), grayscale values 0..1.
    - y_train: shape (N_test, 10), one-hot-encoded label, dtype float64.

    More: https://en.wikipedia.org/wiki/MNIST_database
    """
    with np.load(path) as f:
        x_train, _y_train = f["x_train"], f["y_train"]
        x_test, _y_test = f["x_test"], f["y_test"]

    H = W = 28
    N_train = len(x_train)
    N_test = len(x_test)
    assert x_train.shape == (N_train, H, W) and _y_train.shape == (N_train,)
    assert x_test.shape == (N_test, H, W) and _y_test.shape == (N_test,)

    x_train = x_train.reshape(N_train, H * W) / 255.0
    x_test = x_test.reshape(N_test, H * W) / 255.0

    y_train = np.zeros((N_train, 10), dtype=np.float64)
    y_train[np.arange(N_train), _y_train] = 1

    y_test = np.zeros((N_test, 10))
    y_test[np.arange(N_test), _y_test] = 1

    return x_train, y_train, x_test, y_test

x_train, y_train, x_test, y_test = load_mnist()

In [None]:
def sigmoid(z: FloatNDArray) -> FloatNDArray:
    return 1.0 / (1.0 + np.exp(-z))


def sigmoid_prime(z: FloatNDArray) -> FloatNDArray:
    """Derivative of the sigmoid function."""
    return sigmoid(z) * (1 - sigmoid(z))

## Warm-Up
Implement a numerically stable version of softmax.  

In general, softmax is defined as  
$$\text{softmax}(x_1, x_2, \ldots, x_n) = (\frac{e^{x_1}}{\sum_i{e^{x_i}}}, \frac{e^{x_2}}{\sum_i{e^{x_i}}}, \ldots, \frac{e^{x_n}}{\sum_i{e^{x_i}}})$$  
However, taking $e^{1000000}$ can result in NaN.  
Can you implement softmax so that the highest power to which e will be risen will be at most $0$ and the predictions will be mathematically equivalent?  

Hint: <sub><sub><sub>sǝnlɐʌ llɐ ɯoɹɟ ʇᴉ ʇɔɐɹʇqns  puɐ ᴉ‾x ʇsǝƃɹɐl ǝɥʇ ǝʞɐʇ</sub></sub></sub>

In [None]:
def unstable_softmax(x: FloatNDArray, axis: int = -1) -> FloatNDArray:
    e = np.exp(x)
    return e / np.sum(e, axis=axis, keepdims=True)


def stable_softmax(x: FloatNDArray, axis: int = -1) -> FloatNDArray:
    ## TODO
    ###{
    ###}


### TESTS ###
def _test_one(x: FloatNDArray, y: FloatNDArray) -> None:
    r = stable_softmax(x)
    assert r.shape == y.shape, f"Expected shape {y.shape}, got {r.shape=}"
    assert np.isclose(np.ones(x.shape[0]), r.sum(axis=-1), atol=1e-5, rtol=0).all()
    assert np.isclose(y, r, atol=1e-5, rtol=0).all()

def test_stable_softmax() -> None:
    x1 = np.random.rand(100, 32).astype(np.float64)
    _test_one(x1, unstable_softmax(x1))

    x2 = np.ones((10, 10, 32), dtype=np.float64) * 1e6
    _test_one(x2, np.ones_like(x2) / x2.shape[-1])

    print("OK")

test_stable_softmax()
### TESTS END ###

## ModelResults


In [None]:
class ModelResults:
    """Just a helper class for gathering results in a nice table. Feel free to ignore."""
    def __init__(self):
        # Map from model name to map from lr to list of test accuracies.
        self.results = dict[str, dict[float, list[float]]]()

    def clear(self, model_name: str | None = None) -> None:
        """Forget results for a given model (defaults to all models)."""
        if model_name:
            if model_name in self.results:
                del self.results[model_name]
        else:
            self.results = {}

    def add_result(self, model_name: str, learning_rate: float, accuracy: float) -> None:
        if model_name not in self.results:
            self.results[model_name] = {}
        if learning_rate not in self.results[model_name]:
            self.results[model_name][learning_rate] = []
        self.results[model_name][learning_rate].append(accuracy)

    def display_results(self) -> None:
        data = list[dict[str, Any]]()
        for model_name, model_results in self.results.items():
            for lr, accuracies in model_results.items():
                mean_accuracy = np.mean(accuracies)
                accuracy_summary = f"{mean_accuracy:2.1%} ± {np.std(accuracies) * 100:.1f} p.p."
                data.append({
                    "model": model_name,
                    "lr": lr,
                    "mean_accuracy": mean_accuracy,
                    "accuracy": accuracy_summary
                })

        df = pd.DataFrame(data).sort_values("mean_accuracy", ascending=False)
        del df["mean_accuracy"]
        display(df.style.format({"lr": "{:.1g}"}).hide())

    def evaluate_model(
        self,
        model_name: str,
        model_constructor: Callable[[Sequence[int]], Any],
        layers: Sequence[int] = (784, 30, 10),
        learning_rates: Sequence[float] = (1.0, 10.0, 100.0),
        n_trainings: int = 3,
        **kwargs: Any
    ) -> None:
        # Automatic model name with parameters.
        if kwargs:
            if tuple(layers) != (784, 30, 10):
                model_name += "[" + ",".join(str(n) for n in layers) + "]"

            model_name += "("
            for k, v in kwargs.items():
                if isinstance(v, (float,  np.floating)):
                    model_name += f"{k}={v:.1g},"
                else:
                    model_name += f"{k}={v},"
            model_name = model_name[:-1]
            model_name += ")"

        # Train for each learning rate, n_trainings times.
        for lr in learning_rates:
            print(f"Checking {n_trainings} random trainings with with lr = {lr}")
            for i in range(n_trainings):
                network = model_constructor(layers, **kwargs)
                accuracy = network.train(
                    (x_train, y_train),
                    epochs=10,
                    mini_batch_size=100,
                    learning_rate=lr,
                    test_data=(x_test, y_test),
                )
                self.add_result(model_name, lr, float(accuracy))


model_results = ModelResults()

## Baseline
The solution to the previous lab: an MLP network with MSE loss on sigmoid outputs, trained with plain SGD (batched).

In [None]:
class Network:
    def __init__(self, sizes: Sequence[int] = (784, 30, 10)):
        """
        Args:
        - sizes: sequence of layer widths [N^0, ... , N^last]
          These are lengths of activation vectors, where:
          - N^0 is input size: H * W = 28 * 28 = 784.
          - N^last is the number of classes into which we can classify each input: 10.
        """
        self.sizes = list(sizes)

        # List of len(sizes) - 1 vectors of shape (N^1), (N^2), ..., (N^last).
        self.biases = [np.random.randn(n) for n in sizes[1:]]

        # List of len(sizes) - 1 matrices of shape (N^i, N^{i-1}).
        # Weights are indexed by target node first.
        self.weights = [
            np.random.randn(n_out, n_in) / np.sqrt(n_in)
            for n_in, n_out in zip(sizes[:-1], sizes[1:], strict=True)
        ]

        self.num_layers = len(self.weights)   # = len(sizes) - 1


    def feedforward(self, x: FloatNDArray) -> FloatNDArray:
        """
        Run the network on a batch of cases of shape (B, N^0), values 0..1.

        Returns last layer activations, shape (B, N^last), values 0..1.
        """
        g = x
        for w, b in zip(self.weights, self.biases, strict=True):
            # Shapes (B, N^{i-1}) @ (N^{i-1}, N^i) + (N^i,)  ==  (B, N^i)
            g = sigmoid(g @ w.T + b)
        return g

    def learning_step(self, x_mini_batch: FloatNDArray, y_mini_batch: FloatNDArray, learning_rate: float) -> None:
        """
        Update network parameters with a single mini-batch step of backpropagation and gradient descent.

        Args:
        - x_mini_batch: shape (B, N^0) where B is mini_batch_size.
        - y_mini_batch: shape (B, N^last).
        - learning_rate.
        """
        grads_w, grads_b = self.backprop(x_mini_batch, y_mini_batch)

        # Gradient descent step.
        self.weights = [
            w - learning_rate * grad_w
            for w, grad_w in zip(self.weights, grads_w, strict=True)
        ]
        self.biases = [
            b - learning_rate * grad_b
            for b, grad_b in zip(self.biases, grads_b, strict=True)
        ]

    def backprop(
        self, x: FloatNDArray, y: FloatNDArray
    ) -> tuple[list[FloatNDArray], list[FloatNDArray]]:
        """
        Backpropagation for a mini-batch (vectorized).

        Args:
        - x: input, shape (B, N^0)
        - y: target label (one-hot encoded), shape (B, N^last)

        Returns (grads_w, grads_b), where:
        - grads_w: list of gradients over weights (shape (N^i, N^{i-1})), for each layer.
        - grads_b: list of gradients over biases (shape (N^i)), for each layer.
        """
        B, N0 = x.shape
        assert N0 == self.sizes[0]

        ### Copy from previous labs ###


    def cost_derivative(self, a: FloatNDArray, y: FloatNDArray) -> FloatNDArray:
        """
        Gradient of loss (MSE) over output activations.

        Args:
        - a: output activations, shape (B, N^last).
        - y: target values (one-hot encoded labels), shape (B, N^last).

        Returns gradients, shape (B, N^last).
        """
        assert a.shape == y.shape, f"Shape mismatch: {a.shape=} but {y.shape=}"
        B, N_last = a.shape
        return (2 / (B * N_last)) * (a - y.astype(np.float64))

    def evaluate(self, x_test_data: FloatNDArray, y_test_data: FloatNDArray) -> np.float64:
        """
        Compute accuracy: the ratio of correct answers for test_data.

        Args:
        - x_test_data: shape (B, N^0).
        - y_test_data: shape (B, N^last).
        """
        predictions = np.argmax(self.feedforward(x_test_data), axis=1)
        targets = np.argmax(y_test_data, axis=1)
        return np.mean(predictions == targets)

    def train(
        self,
        training_data: tuple[FloatNDArray, FloatNDArray],
        test_data: tuple[FloatNDArray, FloatNDArray] | None = None,
        epochs: int = 2,
        mini_batch_size: int = 100,
        learning_rate: float = 1.0
    ) -> np.float64:
        x_train, y_train = training_data
        progress_bar = tqdm(range(epochs), desc="Epoch")
        for epoch in progress_bar:
            for i in range(x_train.shape[0] // mini_batch_size):
                i_begin = i * mini_batch_size
                i_end = (i + 1) * mini_batch_size
                self.learning_step(x_train[i_begin:i_end], y_train[i_begin:i_end], learning_rate)
            if test_data:
                x_test, y_test = test_data
                accuracy = self.evaluate(x_test, y_test)
                progress_bar.set_postfix_str(f"Test accuracy: {accuracy * 100:.2f} %")

        if test_data:
            x_test, y_test = test_data
            return self.evaluate(x_test, y_test)
        else:
            return np.float64(-1)

model_results.evaluate_model(model_name="Baseline", model_constructor=Network, n_trainings=3)
model_results.display_results()

## Task 1: softmax & cross-entropy loss
Use softmax instead of coordinate-wise sigmoid and use negative-log-loss instead of MSE. Test to see if this improves convergence.   

Hints:
* When implementing backprop it's easier to consider these two functions as a single block, skipping the computation of the gradient over the softmax values, and going directly to gradients over logits (last pre-activations).
* Softmax is only used after the last layer; previous layers (and their grad computations) can be unchanged.
* Remember to update the forward pass in both places.
* Loss for a mini-batch is the mean of losses for each dataitem in it, by convention.


In [None]:
class Task1(Network):
    def __init__(self, sizes: Sequence[int]):
        super().__init__(sizes=sizes)

    def feedforward(self, x: FloatNDArray) -> FloatNDArray:
        ## TODO
        ###{
        ###}
        return g

    def backprop(
        self, x: FloatNDArray, y: FloatNDArray
    ) -> tuple[list[FloatNDArray], list[FloatNDArray]]:
        B, N0 = x.shape
        assert N0 == self.sizes[0]

        # Forward pass.
        # Activations (including input) of shapes (B, N^0), (B, N^1), ..., (B, N^last).
        gs: list[FloatNDArray] = [x]

        ## TODO
        ###{

        ###}

        # Backward pass.
        ## TODO
        ###{
        grads_w = []  # Shapes (N^last, N^{last-1}), ..., (N^1, N^0).
        grads_b = []  # Shapes (N^last,), ..., (N^1,).

        grads_w.reverse()  # Now shapes (N^1, N^0), ..., (N^last, N^{last-1}).
        grads_b.reverse()  # Now shapes (N^1,) ..., (N^last,).
        ###}

        for grad_b, b in zip(grads_b, self.biases, strict=True):
            assert grad_b.shape == b.shape, f"Shape mismatch: {grad_b.shape=} but {b.shape=}"
        for grad_w, w in zip(grads_w, self.weights, strict=True):
            assert grad_w.shape == w.shape, f"Shape mismatch: {grad_w.shape=} but {w.shape=}"

        return grads_w, grads_b


model_results.evaluate_model(model_name="SoftMax", model_constructor=Task1, n_trainings=3)
model_results.display_results()

## Task 2: L2-regularization and momentum
Implement L2-regularization and add momentum to the SGD algorithm. Play with different amounts of regularization and momentum. See if this improves accuracy/convergence.  
A few notes:
* do not regularize the biases
* you can see an example pseudocode here [pytorch.org/docs/stable/generated/torch.optim.SGD.html](https://pytorch.org/docs/stable/generated/torch.optim.SGD.html)

In [None]:
class Task2(Network):
    def __init__(
        self, sizes: Sequence[int], l2_factor: float = 1e-5, momentum: float = 0.2
    ):
        super().__init__(sizes=sizes)
        ## TODO
        ####{
        ###}

    def learning_step(self, x_mini_batch: FloatNDArray, y_mini_batch: FloatNDArray, learning_rate: float) -> None:
        """
        Update parameters with one mini-batch step of backprop and gradient descent (with momentum and L2-regularization).

        Args:
        - x_mini_batch: shape (B, N^0) where B is mini_batch_size.
        - y_mini_batch: shape (B, N^last).
        - learning_rate.
        """
        ## TODO
        ###{
        ###}

model_results.evaluate_model(
    model_name=f"L2&Momentum",
    model_constructor=Task2,
    learning_rates=[10.0],
    n_trainings=1,
    l2_factor=1e-5,
    momentum=0.2
)
model_results.display_results()

In [None]:
class Task1And2(Task2, Task1):
    # A somewhat hacky but short way to mix Task1 and Task2.
    # You could also just replace the superclass of Task2 to be Task1.
    pass

model_results.evaluate_model(
    model_name=f"Softmax&L2&Momentum",
    model_constructor=Task1And2,
    learning_rates=[2.0],
    n_trainings=3,
    l2_factor=1e-6,
    momentum=0.1
)
model_results.display_results()

## Task 3 (optional)
Implement more variations of SGD:
* AdamW (probably the most popular choice) or Adagrad,
* dropout
* some simple data augmentations (e.g. tiny rotations/shifts etc.).

Again, test to see how these changes improve accuracy/convergence.  

Quick reminders:
* for AdamW, check the official [PyTorch documentation](https://pytorch.org/docs/stable/generated/torch.optim.AdamW.html)'s pseudocode or the original paper: [Decoupled Weight Decay Regularization](https://arxiv.org/abs/1711.05101).
* for AdaGrad, check the Appendix of this notebook.
* for dropout: during training only, zero-out each activation in the considered layer with probability $p$, and multiplying other activations by $\frac{1}{1-p}$.

In [None]:
# Place for remaining parts of task 3

## Task 4
Try adding extra layers to the network. Again, test how the changes you introduced affect accuracy/convergence and what learning rates work.

As a start, you can try this slightly larger architecture: [784,100,30,10]  


In [None]:
## TODO

# Appendix

## Adagrad (simplified version)

Let $p_1, \ldots, p_n$ be all parameters in our model (weights and biases).  
For parameter $p_i$ we maintain a variable $G_i$ (can be set to $0$ initially).
Let $\mathcal{L}$ be our loss without L2.   
We update $G_i$ and $p_i$ each training step as follows:  
$$
G_i = G_i +  \left(\frac{\partial \mathcal{L}}{\partial p_i}\right)^2\\
p_i = p_i - \frac{\eta}{\sqrt{\left(G_i + \epsilon\right)}}\frac{\partial \mathcal{L}}{\partial p_i}
$$