<h1 style="font-size:40px;"><center>Exercise 0:<br>Testing your installed environment
</center></h1>

# Introduction

This is a very short notebook that tests some of the things you'll be using for the computer exercises.

## Last but not least
Have fun!

---

# Importing some packages

In the cell below, we import the libraries needed for the exercises.

Run the cell by selecting it and pressing "CTRL Enter".

In [None]:
import torch
# Device on which the model and the tensors are placed; defaults to the CPU.
device = 'cpu'
# Uncomment this to use CUDA acceleration if available
# device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"PyTorch: Using {device} device")
# The floating point data type can be changed here; it is used for the
# network layers and for the tensors built from the NumPy data.
dtype_torch = torch.float32

from torch.utils.data import DataLoader, TensorDataset
from torch import nn, Tensor
from collections import OrderedDict
import torchmetrics

import matplotlib.pyplot as plt
import numpy as np

# Generate and display some data
This cell defines the two synthetic datasets (a 2D classification problem and a 1D regression problem) and plots examples of them.
Run the cell by selecting it and pressing "CTRL Enter".

In [None]:
def syn2(N):
    """
    Generate data for a two-class classification problem in 2D.

    The first N // 2 samples are the positive class (target 1), drawn from a
    unit-variance normal distribution centred at (0.8, 0.8). The remaining
    samples are the negative class (target 0), centred at (-0.8, -0.8).

    Args:
        N (int): Total number of samples.

    Returns:
        x (np.ndarray): Inputs with shape (N, 2).
        d (np.ndarray): Targets with shape (N, 1).
    """
    n_pos = N // 2
    n_neg = N - n_pos

    # Draw the positive cluster first, then the negative one
    pos = 0.8 + np.random.normal(size=(n_pos, 2))
    neg = -.8 + np.random.normal(size=(n_neg, 2))

    x = np.concatenate([pos, neg], axis=0)
    d = np.concatenate([np.ones((n_pos, 1)), np.zeros((n_neg, 1))], axis=0)
    return x, d

def regr1(N, periods=2, damp=False, v=0):
    """
    Generate data for a 1D regression problem: a (damped) cosine with noise.

    The inputs are N equally spaced points from 0 to 2*periods*pi, so the
    cosine completes `periods` full periods over the data.

    Args:
        N (int): Number of samples.
        periods (float, optional): Number of cosine periods covered. Defaults to 2.
        damp (bool, optional): Multiply the cosine by exp(-0.05*x). Defaults to False.
        v (float, optional): Noise level, relative to the standard deviation
            of the noise-free signal. Defaults to 0 (no noise).

    Returns:
        x (np.ndarray): Inputs with shape (N, 1).
        d (np.ndarray): Targets with shape (N, 1).
    """
    # With a single sample there is no spacing to compute; without this guard
    # N == 1 would divide by zero.
    dx = 2*periods*np.pi / (N-1) if N > 1 else 0.0
    x = np.arange(N) * dx

    d = np.cos(x)
    if damp:
        d = d * np.exp(-x*0.05)

    # The noise is always drawn (even for v == 0) so that the random number
    # stream is consumed the same way regardless of the noise level.
    std_signal = np.std(d)
    d = d + v * std_signal * np.random.normal(size=N)

    return x[:, None], d[:, None]

def standard(x):
    """
    Compute the per-column mean and standard deviation of a data set.

    Args:
        x (np.ndarray): Data with samples along the first axis.

    Returns:
        mean, stddev: Statistics taken across the samples (axis 0).
    """
    mean = np.mean(x, axis=0)
    stddev = np.std(x, axis=0)
    return mean, stddev

# seed = 0 means random, seed > 0 means fixed
seed = 0
if seed:
    np.random.seed(seed)

# Classification: two normally distributed clusters, coloured by target
x, d = syn2(100)
plt.figure()
plt.scatter(x[:,0], x[:,1], c=d)

# Regression, two periods, no noise
x, d = regr1(100, 2, False, 0)
plt.figure()
plt.scatter(x, d)

# Regression, three periods, exponential damping, some noise
x, d = regr1(100, 3, True, 0.2)
plt.figure()
plt.scatter(x, d)

# Render the figures (also avoids echoing the repr of the last scatter call)
plt.show()

# ANN test
     
Run the cell by selecting it and pressing "CTRL Enter".

In [None]:
%%time

class Network(nn.Module):
    """
    A multilayer perceptron built from fully connected layers.

    The layers are stored in `mlp_stack` under the names layer1, act1,
    layer2, act2, ..., followed by the output layer layerN (and actN if an
    output activation is used).
    """

    def __init__(self, *, inputs=1, outputs=1, nodes=[4], activation=nn.Tanh, out_activation=None):
        """
        Build the layer stack.

        Args:
            inputs (int, optional): Number of input nodes.
            outputs (int, optional): Number of output nodes.
            nodes (list, optional): Sizes of the hidden layers, in order.
            activation: Class of the hidden activation function, or None for
                linear hidden layers. Defaults to nn.Tanh.
            out_activation (optional): Class of the output activation
                function, or None for a linear output.
        """
        super().__init__()

        # Widths of consecutive layers: the input followed by each hidden layer
        widths = [inputs, *nodes]

        layers = OrderedDict()
        for idx, (n_in, n_out) in enumerate(zip(widths[:-1], widths[1:]), start=1):
            layers[f"layer{idx}"] = nn.Linear(n_in, n_out, dtype=dtype_torch)
            if activation is not None:
                layers[f"act{idx}"] = activation()

        # Output layer, fed by the last hidden layer (or directly by the input)
        layers["layerN"] = nn.Linear(widths[-1], outputs, dtype=dtype_torch)
        if out_activation is not None:
            layers["actN"] = out_activation()

        self.mlp_stack = nn.Sequential(layers)

    def forward(self, x : Tensor):
        "Pass the input through the layer stack and return the output"
        return self.mlp_stack(x)

    def predict(self, input_data):
        """
        Evaluate the network on a set of input data.

        The network is switched to evaluation mode and no gradients are
        recorded.

        Args:
            input_data (np.ndarray): Input data

        Returns:
            pred (np.ndarray): Predicted output.
        """
        self.eval()
        with torch.no_grad():
            batch = torch.tensor(input_data, dtype=dtype_torch, device=device)
            output = self(batch)
        return output.cpu().numpy()

    def __str__(self):
        "Module summary followed by the sizes of the trainable parameters"
        trainable = [(name, param.numel()) for name, param in
                     self.mlp_stack.named_parameters() if param.requires_grad]
        total = sum(count for _, count in trainable)
        lines = ["Named parameters:"]
        lines.extend(f"{name}: {count}" for name, count in trainable)
        summary = super().__str__()
        return summary + f"\nTrainable parameters: {total}\n" + "\n  ".join(lines) + "\n"

def train_epoch(*, model : Network, dataloader : DataLoader,
                loss_fn, optimizer : torch.optim.Optimizer):
    """
    Train a model for a single epoch.

    Args:
        model (Network): The network.
        dataloader (DataLoader): Batch DataLoader with training data.
        loss_fn (Loss): Loss function, e.g. nn.MSELoss. It is assumed to
            return the mean loss over the batch.
        optimizer (Optimizer): The optimizer used to update the network.

    Returns:
        train_loss (float): Mean training error over all samples processed
            during the epoch (NaN if the dataloader produced no samples).
    """
    model.train()
    train_loss = 0
    n_samples = 0
    for X, y in dataloader:
        X, y = X.to(device), y.to(device)   # Move data to GPU if necessary
        optimizer.zero_grad()   # Reset the gradients

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)
        # Weight the batch mean by the batch size so that a smaller last
        # batch does not get too much influence on the epoch mean
        train_loss += loss.item() * len(X)
        n_samples += len(X)

        # Backpropagation
        loss.backward()
        optimizer.step()

    # Normalise by the samples actually seen: the dataloader may skip samples
    # (e.g. drop_last=True or a sampler), so len(dataloader.dataset) can differ.
    if n_samples == 0:
        return float("nan")
    return train_loss / n_samples

def test(*, model : Network, dataloader : DataLoader, loss_fn, metrics=[]):
    """
    Test a model on a set of data.

    The model is put in evaluation mode and no gradients are recorded.

    Args:
        model (Network): The network.
        dataloader (DataLoader): DataLoader with data to test.
        loss_fn (Loss): Loss function, e.g. nn.MSELoss. It is assumed to
            return the mean loss over the batch.
        metrics (iterable): Additional metrics from torchmetrics. Each one is
            updated with the predictions and targets of every batch; they
            are neither reset nor computed here.

    Returns:
        loss (float): Mean error over all samples processed (NaN if the
            dataloader produced no samples).
    """
    model.eval()
    loss = 0
    n_samples = 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            loss += loss_fn(pred, y).item() * len(X)
            n_samples += len(X)
            for m in metrics:
                m.update(pred, y)

    # Normalise by the samples actually seen: the dataloader may skip samples
    # (e.g. drop_last=True or a sampler), so len(dataloader.dataset) can differ.
    if n_samples == 0:
        return float("nan")
    return loss / n_samples


def _is_scalar_metric(value):
    "True if a metric value is a single real number that can be formatted/plotted"
    # torch.isreal works elementwise, so it may only be used as a condition
    # once we know the tensor holds exactly one (zero-dimensional) value.
    return value.dim() == 0 and bool(torch.isreal(value))


def train_loop(*, model : Network, train_dataloader : DataLoader,
               val_dataloader : DataLoader = None, loss_fn,
               optimizer : torch.optim.Optimizer, epochs : int,
               print_every:int = 100, metrics=None, print_final=True):
    """
    Train and optionally test a model.

    Args:
        model (Network): The network.
        train_dataloader (DataLoader): Training data.
        val_dataloader (DataLoader, optional): Validation data. Without it,
            no validation loss or metrics are recorded.
        loss_fn (Loss): Loss function, e.g. nn.MSELoss.
        optimizer (Optimizer): An optimizer from torch.optim.
        epochs (int): Number of epochs to train for.
        print_every (int, optional): Print loss every so many epochs. Defaults to 100.
            With 0 only the last epoch is printed; negative values disable
            the progress output.
        metrics (dict(name: metric), optional): Record/print these additional metrics.
        print_final(bool, optional): Print final metrics. Defaults to True.
            Nothing is printed if no validation results were recorded.

    Returns:
        train_losses (list(float)): Training loss during each epoch.
        val_losses (list(float)): Validation loss after each epoch.
        metrics_res (dict(name: list(float))): Values of metrics after each epoch.
    """
    train_losses = []
    val_losses = []
    val_loss = np.nan   # Printed as "nan" when there is no validation data

    # Move metrics to CPU/GPU and prepare for their output
    metrics = {name: m.to(device) for name, m in (metrics or {}).items()}
    metrics_res = {name: [] for name in metrics.keys()}

    for t in range(epochs):
        train_loss = train_epoch(model=model, dataloader=train_dataloader,
                           loss_fn=loss_fn, optimizer=optimizer)
        train_losses.append(train_loss)
        if val_dataloader is not None:
            # The metrics accumulate over batches, so start each epoch afresh
            for m in metrics.values():
                m.reset()
            val_loss = test(dataloader=val_dataloader, model=model,
                            loss_fn=loss_fn, metrics=metrics.values())
            val_losses.append(val_loss)
            for name, m in metrics.items():
                metrics_res[name].append(m.compute().cpu())
        if (print_every > 0 and t % print_every == 0) or (
                print_every >= 0 and t + 1 == epochs):
            # Metric histories stay empty without validation data; skip those
            extras = [f" {n} {v[-1]:<7f}" if _is_scalar_metric(v[-1])
                      else f" {n} {v[-1]}"
                      for n, v in metrics_res.items() if v]
            print(f"Epoch {t+1:<7d} train {train_loss:<7f} "
                  f" validation {val_loss:<7f}", "".join(extras))

    # The summary reports validation results, so it needs at least one
    # validation pass (i.e. validation data and epochs > 0).
    if print_final and val_losses:
        print("\n** Validation metrics after training **\n"
              f"Loss {val_losses[-1]:<7g}")
        for n, v in metrics_res.items():
            if _is_scalar_metric(v[-1]):
                print(f"{n} {v[-1]:<7g}")
            else:
                # Non-scalar metrics (e.g. matrices) are printed in full
                print(f"{n}:")
                print(v[-1])
        print()
    return train_losses, val_losses, metrics_res

def plot_training(train_loss, val_loss, metrics_res={}):
    """
    Plot the training history.

    Args:
        train_loss (list(float)): Training loss for each epoch.
        val_loss (list(float)): Validation loss for each epoch.
        metrics_res (dict(name: list), optional): Metric values for each
            epoch. Only metrics with scalar, real values are plotted;
            empty histories and non-scalar metrics are skipped.
    """
    plt.figure()
    plt.ylabel('Loss / Metric')
    plt.xlabel('Epoch')
    plt.plot(train_loss, label="Training loss")
    plt.plot(val_loss, label="Validation loss")
    for name, res in metrics_res.items():
        if len(res) == 0:
            # No values recorded (e.g. training without validation data)
            continue
        # torch.isreal works elementwise, so first make sure that the metric
        # is a single value; anything else cannot be drawn as one curve.
        first = torch.as_tensor(res[0])
        if first.dim() == 0 and bool(torch.isreal(first)):
            plt.plot(res, label=name)
    plt.legend(loc='best')
    plt.show()

# Generate training data: 50 samples, 2 periods, no damping, no noise
x_trn, d_trn = regr1(50, 2, 0, 0.0)

# Standardization of inputs (zero mean and unit standard deviation)
mu, std = standard(x_trn)
x_trn = (x_trn - mu) / std

# Define the network, cost function and training settings
model_ex1 = Network(
    inputs=1,            # number of input nodes
    outputs=1,           # number of output nodes
    nodes=[4],           # number of nodes in hidden layer
    activation=nn.Tanh,  # activation function in hidden layer
    out_activation=None  # activation function in output layer (if not linear)
    ).to(device)         # move the model to the selected device (GPU or CPU)

# Optimization parameters
opt_method = torch.optim.SGD  # minimization method
learning_rate = 0.05          # learning rate
loss_fn = nn.MSELoss()        # loss function, MSE
number_epochs = 4000
minibatch_size = 50           # equals the number of samples: one batch per epoch

# Additional metrics to print
metrics = {'MSE': torchmetrics.MeanSquaredError()}

# Set up the optimizer
optimizer = opt_method(model_ex1.parameters(), lr=learning_rate)

# Print a summary of the model
print(model_ex1)

# Turn the training data into a dataset with Tensors on the GPU or CPU
dset_trn = TensorDataset(torch.tensor(x_trn, device=device, dtype=dtype_torch),
                         torch.tensor(d_trn, device=device, dtype=dtype_torch))

# Create a batch loader for the training data
dl_trn = DataLoader(dset_trn, batch_size=minibatch_size)

# Train the network and print the progress
train_loss, val_loss, metrics_res = train_loop(
    model=model_ex1,
    train_dataloader=dl_trn,
    val_dataloader=dl_trn, # Test with the training data
    loss_fn=loss_fn,
    metrics=metrics,
    optimizer=optimizer,
    print_every=100,
    epochs=number_epochs)

# Plot the training history
plot_training(train_loss, val_loss, metrics_res)

# Predict output on the training data (already standardized above)
d_pred = model_ex1.predict(x_trn)

# Plot the result
plt.figure()
plt.ylabel('Prediction / Target')
plt.xlabel('Input')
plt.scatter(x_trn, d_trn, label='Target')
plt.scatter(x_trn, d_pred, label='Prediction')
plt.title('Prediction vs Target')
plt.legend(loc='best')
plt.show()