<center><img src='https://drive.google.com/uc?id=1_utx_ZGclmCwNttSe40kYA6VHzNocdET' height="60"></center>

AI TECH - Akademia Innowacyjnych Zastosowań Technologii Cyfrowych. Program Operacyjny Polska Cyfrowa na lata 2014-2020
<hr>

<center><img src='https://drive.google.com/uc?id=1BXZ0u3562N_MqCLcekI-Ens77Kk4LpPm'></center>

<center>
Projekt współfinansowany ze środków Unii Europejskiej w ramach Europejskiego Funduszu Rozwoju Regionalnego
Program Operacyjny Polska Cyfrowa na lata 2014-2020,
Oś Priorytetowa nr 3 "Cyfrowe kompetencje społeczeństwa" Działanie  nr 3.2 "Innowacyjne rozwiązania na rzecz aktywizacji cyfrowej"
Tytuł projektu:  „Akademia Innowacyjnych Zastosowań Technologii Cyfrowych (AI Tech)”
    </center>

Code based on https://github.com/pytorch/examples/blob/master/mnist/main.py

In this exercise, we are using high-level abstractions from torch.nn like nn.Linear.
Note: during the next lab session we will go one level deeper and implement more things
with bare hands.

Tasks:

 1. Read the code.

 2. Check that the given implementation reaches 95% test accuracy for architecture input-128-128-10 after few epochs.

 3. Add the option to use SGD with momentum instead of ADAM.

 4. Experiment with different learning rates. Use the provided TrainingVisualizer
 to plot the learning curves and gradient-to-weight ratios. Compare visualizations
 for different learning rates for both ADAM and SGD with momentum.

 5. Parameterize the constructor by a list of sizes of hidden layers of the MLP.
 Note that this requires creating a list of layers as an attribute of the Net class,
 and one can't use a standard Python list containing nn.Modules (why?).
 Check torch.nn.ModuleList.

If you run this notebook locally then you may need to install some packages.
It may be achieved by adding the following code cell to the notebook and running it:
```
!pip install torch torchvision plotly ipywidgets
```
This notebook can also utilize Colab GPU. However, remember to kill your GPU session after classes as otherwise, you may use all your free GPU time for this week.

In [None]:
import math
import sys

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import plotly.graph_objects as go
from torch import Tensor
from torchvision import datasets, transforms

# Allow custom (non-ipywidget) widgets.
if "google.colab" in sys.modules:
    from google.colab import output

    output.enable_custom_widget_manager()

# For reproducibility.
torch.manual_seed(1)

## Tensors
In ML, *tensor* is just a short name for an n-dimensional array.

In other areas like physics, it might mean a multilinear operation that can be represented, after a choice of basis, by an ndarray; this is where we would see all the extra stuff (like covariance/contravariance), like for example on the Wikipedia article for "tensor". For us, it's usually just a bunch of numbers.

torch.Tensor is can be thought of as a wrapper around numpy ndarray, but:
* the interface is slightly different (like `np.expand_dims(x, dim=n-1)` vs `torch.unsqueeze(x, axis=-1)`);
* the data can be stored in GPU memory (or others);
* it can be part of a computation graph: a computed Tensor has pointers to other Tensors it was computed from, and to functions computing partial derivatives.

All the operations defined on for a torch.Tensor (`@`, `.arctan`, `.unsqueeze`, and a hundred others) can also run on a GPU and

In [None]:
x_np = np.array([[0, 1, 2], [3, 4, 5]])
x = torch.tensor([[0, 1, 2], [3, 4, 5]])  # Also: torch.rand, ones, zeros, rand_like, ...
display(x_np, x)

In [None]:
torch.from_numpy(x_np).equal(x), np.array_equal(x_np, x.numpy())

In [None]:
x.shape, x.dtype, x.device, x.requires_grad

Most operations have an in-place variant with the same name, but with an underscore.
This forgets the original value, making the computation of gradients impossible, so most of the time we don't use in-place operations.

In [None]:
x.add_(2)
display(x)

## Autograd
Operations on torch tensors remember the computation graph.
This allows to compute the gradient of e.g. a loss node over all other nodes with loss.backward().

In [None]:
x = torch.tensor([[0., 1., 2.], [3., 4., 5.]], requires_grad=True)
y = torch.sigmoid(x)
loss = (((y - 1) ** 2).mean())
print("x", x.grad_fn, ", y", y.grad_fn, ", loss", loss.grad_fn)
print(loss.grad_fn.next_functions)  # type: ignore
print(loss.grad_fn.next_functions[0][0].next_functions)  # type: ignore

The method Tensor.backward():
* for each node with `requires_grad`: computes the gradients from each .grad_fn and propagates it using the chain rule.
* for each leaf node (`requires_grad` with no predecessors that `requires_grad`): accumulates the gradient into the tensor’s `.grad` attribute.



In [None]:
loss.backward()
x.grad

We say *accumulates* not *stores*, because if .grad already exists, `backward()` adds to it, instead of replacing it.

In [None]:
loss2 = ((x * 0).mean())
loss2.backward()
x.grad

During (for example) evaluation, you don't need to store the computational graph, so you can disable it:

In [None]:
with torch.no_grad():
    y = x * x

print(y.grad_fn)

In some situations you may need more fine-grained control.
You can detach a tensor from the graph, to treat it like a constant tensor.

In [None]:
y = x * x
print(y.grad_fn, y.requires_grad)
y = y.detach()  # Not in-place, returns a new tensor with the same data.
print(y.grad_fn, y.requires_grad)

## Modules
A torch **module** (not *model*) is essentially just an object with a `forward()` method, usually `forward(x: Tensor) -> Tensor`.
A network (model) is built as a large module that contains and calls smaller modules, which themselves contain even smaller modules.

Basically any callable could be a module, but subclassing `nn.Module` adds features like:
* automatically registering and enumerating submodules and trainable parameters,
* serializing and storing parameters into files,
* adding hooks in the middle of existing models.

In [None]:
class MyLinear(nn.Module):
    """
    This is essentially the same as nn.Linear.

    Compare: https://github.com/pytorch/pytorch/blob/v2.9.0/torch/nn/modules/linear.py#L53
    """
    def __init__(self, in_features: int, out_features: int) -> None:
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features

        self.weight = nn.Parameter(torch.empty((out_features, in_features)))
        self.bias = nn.Parameter(torch.empty(out_features))

        self.reset_parameters()

    def reset_parameters(self) -> None:
        # The initialization used by PyTorch by default, including non-zero biases.
        bound = 1 / math.sqrt(self.in_features)
        nn.init.uniform_(self.weight, -bound, bound)
        nn.init.uniform_(self.bias, -bound, bound)

    def forward(self, input: Tensor) -> Tensor:
        """
        Input shape: (B*, in_features).
        Output shape: (B*, out_features).
        """
        return input @ self.weight.T + self.bias


We define the module behaviour in the **`forward()`** method, not in the special `__call__()`.<br>
However, we then use the module directly as a callable, `module()`.<br>
This will call `module.forward()`, but can also calls hooks, if any are defined.

In [None]:
module = MyLinear(2, 3)
x = torch.rand((7, 5, 2))
y = module(x)
y.shape

The class **nn.Parameter** just wraps a tensor to indicate that this is a trainable parameter, and changes the default `requires_grad`.<br>
When an nn.Parameter is assigned to a field of an nn.Module, it automatically registers it,
so that all parameters of this modules and its submodules can be easily accessed.

In [None]:
for i, (name, param) in enumerate(module.named_parameters()):
    print(f'{i}. "{name}":\t{param}\n')

## A basic Net

In [None]:
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(784, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 10)

    def forward(self, x: Tensor) -> Tensor:
        """
        Input: shape (B, 28, 28).
        Output: class probabilities (after softmax), shape (B, 10).
        """
        x = torch.flatten(x, start_dim=1)  # (B, 728)
        x = self.fc1(x)  # (B, 128)
        x = F.relu(x)
        x = self.fc2(x)  # (B, 128)
        x = F.relu(x)
        x = self.fc3(x)  # (B, 10)
        output = F.log_softmax(x, dim=1)
        return output

Assigning to a nn.Module to a field of nn.Module automatically registers it as a submodule.<br>
You can iterate over all submodules (recursively) with `.named_modules()` or over direct children only with `.named_children()`.

In [None]:
net = Net()
for i, (name, submodule) in enumerate(net.named_modules()):
    print(f'{i}. "{name}":\t{submodule}')

The `.train()` and `.eval()` methods set the `training` flag of a module and all its submodules.<br>
This changes the behaviour of certain modules like BatchNorm and Dropout (see next labs).

Forgetting to switch the training mode is a common bug that can cause weird behaviour.

In [None]:
net.train()
print(net.fc1.training)
net.eval()
print(net.fc1.training)

Note that operations on a module (`.train()`, `.eval()`, `.to(dtype)`, `.to(device)`) modify it in-place.<br>
Operations on a tensor without a trailing underscore (including `.to(dtype)`, `.to(device)`) return a new tensor and leave the original unchanged


## Train and test code

In [None]:
# @title <small>Helper class: TrainingVisualizer</small>

class TrainingVisualizer:
    def __init__(self, log_interval: int = 10):
        self.log_interval = log_interval
        self.train_loss_fig = self.init_line_plot(
            title="Training loss", xaxis_title="Step"
        )
        self.grad_to_weight_fig = self.init_line_plot(
            title="Gradient standard deviation to weight standard deviation ratio at 1st layer",
            xaxis_title="Step",
            yaxis_title="Gradient to weight ratio (log scale)",
            yaxis_type="log",
        )
        self.test_acc_fig = self.init_line_plot(
            title="Test accuracy", x=[], xaxis_title="Epoch", mode="lines+markers"
        )

        # Parameters related to current tracked model and its training
        self.first_linear_layer = None
        self.lr = None
        self.trace_idx = -1

    def init_line_plot(
        self,
        title: str,
        x=None,
        xaxis_title: str | None = None,
        yaxis_title: str | None = None,
        yaxis_type: str = "linear",
        mode: str = "lines",
    ) -> go.FigureWidget:
        fig = go.Figure()
        fig.update_layout(
            title=title,
            title_x=0.5,
            xaxis_title=xaxis_title,
            yaxis_title=yaxis_title,
            height=400,
            width=1500,
            margin=dict(b=10, t=60),
        )
        fig.update_yaxes(type=yaxis_type)
        # We cannot add new traces dynamically because Colab has a problem with widgets
        # from plotly (traces added dynamically are rendered twice).
        # As an ugly workaround we create a lot of empty traces and update them later
        # with actual data. Empty traces are not plotted.
        for _ in range(25):
            fig.add_trace(go.Scatter(x=x, y=[], showlegend=True, mode=mode))

        fig_widget = go.FigureWidget(fig)
        display(fig_widget)
        return fig_widget

    def track_model(
        self, model: torch.nn.Module, optimizer: torch.optim.Optimizer, lr: float
    ) -> None:
        """
        Start tracking training metrics for a new model.
        """

        for field in model.__dict__["_modules"].values():
            if isinstance(field, nn.Linear):
                self.first_linear_layer = field
                break
            elif isinstance(field, nn.ModuleList):
                self.first_linear_layer = field[0]
                break

        self.lr = lr
        self.trace_idx += 1

        optim_name = type(optimizer).__name__
        self.train_loss_fig.data[self.trace_idx].name = f"{optim_name}, {lr}"
        self.grad_to_weight_fig.data[self.trace_idx].name = f"{optim_name}, {lr}"
        self.test_acc_fig.data[self.trace_idx].name = f"{optim_name}, {lr}"

    def plot_gradients_and_loss(self, batch_idx: int, loss: float) -> None:
        if batch_idx % self.log_interval == 0:
            self.train_loss_fig.data[self.trace_idx].y += (loss,)

            layer = self.first_linear_layer
            grad_to_weight_ratio = (
                self.lr * layer.weight.grad.std() / layer.weight.std()
            ).item()

            self.grad_to_weight_fig.data[self.trace_idx].y += (grad_to_weight_ratio,)

    def plot_accuracy(self, epoch: int, accuracy: float) -> None:
        self.test_acc_fig.data[self.trace_idx].x += (epoch,)
        self.test_acc_fig.data[self.trace_idx].y += (accuracy,)

In [None]:
def train_epoch(
    model: torch.nn.Module,
    device: torch.device,
    train_loader: torch.utils.data.DataLoader,
    optimizer: torch.optim.Optimizer,
    epoch: int,
    log_interval: int,
    visualizer: TrainingVisualizer,
    verbose: bool = False,
) -> None:
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # Move data to the device (e.g. GPU #0) where model parameters are.
        data, target = data.to(device), target.to(device)

        # Since `backward()` will accumuate gradients, remember to reset them.
        optimizer.zero_grad()

        # Forward pass.
        output = model(data)  # Logs of class probabilites, shape (B, 10)
        loss = F.nll_loss(output, target)  # Shape (1,)

        # Backward pass.
        loss.backward()

        visualizer.plot_gradients_and_loss(batch_idx, loss.item())

        # For each parameter, updates the tensor's value using its .grad.
        optimizer.step()

        if batch_idx % log_interval == 0:
            if verbose:
                done, total = batch_idx * len(data), len(train_loader.dataset)
                print(
                    f"Train Epoch: {epoch} [{done}/{total} images ({done / total:.0%})]\t"
                    + f"Loss: {loss.item():.6f}"
                )

In [None]:
def test(
    model: torch.nn.Module,
    device: torch.device,
    test_loader: torch.utils.data.DataLoader,
    epoch: int,
    visualizer: TrainingVisualizer,
    verbose: bool = False,
) -> None:
    model.eval()
    test_loss = 0
    n_correct = 0
    n_total = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # Accumulate sum of dataitem losses and only divide by n_total at the end.
            test_loss += F.nll_loss(output, target, reduction="sum").item()
            pred = output.argmax(dim=1)
            n_correct += (pred == target).sum().item()
            n_total += len(target)

    test_loss /= n_total

    if verbose:
        print(f"\nTest loss: {test_loss:.4f}, Accuracy: {n_correct}/{n_total} ({n_correct/n_total:.0%})\n")
    visualizer.plot_accuracy(epoch, 100.0 * n_correct / n_total)

## Hyperparameters

In [None]:
# Training uses more memory than test (due to gradient computations),
# so we can set test_batch_size to be larger.
epochs = 5
batch_size = 256
test_batch_size = 1000
lr = 1e-2
log_interval = 10

# Use CUDA if a CUDA GPU is available, otherwise we'll run on CPU.
# You could also use "mps" for Apple Silicon, and some others,
# though support for less common operations may vary.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

## Dataset and dataloader
Any sequence of dataitems (and even some iterables) can be used as a ***dataset***.<br>
A dataitem can be for example an (input image, output label) pair.<br>
Torch datasets usually allow passing a transformation: it is then applied whenever you retrieve an item with `dataset[i]`.

A ***dataloader*** manages parallel loading (it can create worker threads that call `dataset[i]` in parallel), batching, shuffling, sometimes moving data between devices.

In [None]:
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(mean=(0.1307,), std=(0.3081,))]
)
train_dataset = datasets.MNIST("../data", train=True, download=True, transform=transform)
test_dataset = datasets.MNIST("../data", train=False, transform=transform)
len(train_dataset), len(test_dataset)

In [None]:
kwargs = {"num_workers": 1, "pin_memory": True} if use_cuda else {}
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, **kwargs)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=test_batch_size, **kwargs)

## Running it all

In [None]:
model = Net().to(device)

optimizer = optim.Adam(model.parameters(), lr=lr)

visualizer = TrainingVisualizer(log_interval=log_interval)
visualizer.track_model(model, optimizer, lr)

for epoch in range(1, epochs + 1):
    train_epoch(
        model,
        device,
        train_loader,
        optimizer,
        epoch,
        log_interval,
        visualizer,
        verbose=True,
    )
    test(model, device, test_loader, epoch, visualizer, verbose=True)