# **NextGen2024**: Deep Learning Workshop 1 

Within these workshops, we will dive into the learning problem, and explore what control we have as practitioners. Understanding the balance and trade-offs between different variables can provide rich insight into the design and function of modern deep learning systems and directions.

We will explore this within the context of some simple models and toy datasets before moving onto explore these within the context of more modern recent architectures during our next workshop.

Through these workshops I hope to convey a better understanding of the fundamental trade-offs in the design and implementation of deep learning systems.

In [None]:
import torch
import random
import numpy as np
import torch.nn as nn
import matplotlib.pyplot as plt
from torch.utils.data import random_split

from typing import *
from torch import Tensor

# seed training for deterministic results
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
torch.cuda.manual_seed(0)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

## [Section 1] Function Approximation

Informally a function is just a system of outputs, numbers in $x$, numbers out $y=f(x)$. If you know the function you can always calculate the correct output $y$ for a given input $x$.

<img src="Linear.png" alt="Computer Vision" width="300"/>

We can formulate nearly any task as a function. Consider object classification in computer vision, given a set of pixel values $x \in \mathbb{R}^{H \times W \times C}$ as input, we want to output a set of probabilities representing the likelihood a class exists in an image $y \in \mathbb{R}^{N_{c}}$. Say we have some target function $f$ which can perfectly perform this task, then for any image we can calculate which classes exists in that image..

<img src="ObjectClassificationDetection.jpg" alt="Computer Vision" width="600"/>

In the case of a simple linear relationship this function is more obvious $f(x) = mx + c$. However in the case of object classification, the function $f$ that maps the input pixels to the output classes probabilities is a little more difficult. So what happens if we don't know the function?

Fundamentally this is what machine learning and deep-learning is about. Given some observations $x$ and $y=f(x)$ from a data generating process, how can we learning the function $f$ that represents or atleast approximates this process.

Let's begin by exploring a simple single dimensional perceptron...

In [None]:
def target_function(x):
    """ Data generating process.
    """
    return 2 * x + 3


class Perceptron(nn.Module):
    """ Perceptron model

    y = Wx + B

    """
    def __init__(self, use_activation=False):
        super(Perceptron, self).__init__()

        # Create linear layer
        # https://pytorch.org/docs/stable/generated/torch.nn.Linear.html
        self.fc = nn.Linear(1, 1)
        self.fc.weight = nn.Parameter(torch.tensor([[1.0]])) # TODO: Modify these values to fit the target function.
        self.fc.bias = nn.Parameter(torch.tensor([0.0]))

        # Activation Function
        # https://pytorch.org/docs/stable/generated/torch.nn.ReLU.html
        self.activation = nn.ReLU() if use_activation else None

    def forward(self, x):
        x = self.activation(self.fc(x)) if self.activation is not None else self.fc(x)
        return x


# Define the inputs x
x = torch.linspace(-10, 10, 100)

# Use the target function to generate the outputs
y = target_function(x)

# Create the model and get the predicted values
with torch.no_grad(): 
    y_p = Perceptron(use_activation=False)(x.unsqueeze(-1)) # TODO: See what happens if you use the activation function.
    
# Plot the results
fig = plt.figure(figsize=(6,6))
ax = fig.add_subplot(111)
ax.scatter(x, y, alpha=0.75, label="Target f(x)")
ax.scatter(x, y_p, alpha=0.75, label="Predicted f_approx(x)")
ax.grid(True, alpha=0.25)
ax.legend(loc="best")
ax.set_xlabel("x")
ax.set_ylabel("y")

We can see that with a single perceptron we can approximate a linear relationship, atleast when we're not using any activation function. However, when we have a slighlty more complex data generating process say $f(x) = 0.5x^{2} - 20$, we end up needing to use more perceptrons - this is akin to using multiple piecewise functions to approximate $f$. In this scenario using non-linearity is essential to be able to approximate this non-linear function.

See if you can hand pick the values of the two perceptrons to approximate the function $f$.

In [None]:
def target_function(x):
    """ Data generating process.
    """
    return 0.5 * x**2 - 20


class TwoPerceptron(nn.Module):
    """ Perceptron model

    y = ReLU(W1*x + B2) + ReLU(W2*x + B2)

    """
    def __init__(self):
        super(TwoPerceptron, self).__init__()

        # Create Single Perceptron
        self.fc1 = nn.Linear(1, 1)
        self.fc1.weight = nn.Parameter(torch.tensor([[...]])) # TODO: Fill in these values e.g. 1.0
        self.fc1.bias = nn.Parameter(torch.tensor([...]))

        # Create Single Perceptron
        self.fc2 = nn.Linear(1, 1)
        self.fc2.weight = nn.Parameter(torch.tensor([[...]]))
        self.fc2.bias = nn.Parameter(torch.tensor([...]))

        # Activation Function
        self.activation = nn.ReLU()


    def forward(self, x):
        x = self.activation(self.fc1(x)) + self.activation(self.fc2(x)) - ... # TODO: Add in some offset
        return x


# Define the inputs x
x = torch.linspace(-10, 10, 100)

# Define the outputs y
y = target_function(x)

# Create the model and get the predicted values
with torch.no_grad():
    y_p = TwoPerceptron()(x.unsqueeze(-1))

# Plot the results
fig = plt.figure(figsize=(6,6))
ax = fig.add_subplot(111)
ax.scatter(x, y, alpha=0.75, label="Target f(x)")
ax.scatter(x, y_p, alpha=0.75, label="Predicted f_approx(x)")
ax.grid(True, alpha=0.25)
ax.legend(loc="best")
ax.set_xlabel("x")
ax.set_ylabel("y")

Using just two perceptrons we can see that we do a pretty poor job of approximating $f$, if we increase the number of perceptrons we can better approximate $f$, however it becomes increasingly more prohibitive to tune as the number of parameters increases.

We could also consider stacking layers of these perceptrons to create a multi-layer perceptron, this would let us model increasingly complex and non-linear functions. Multi-layer perceptrons are provably universal function approximators [Multilayer feedforward networks are universal approximators](https://www.sciencedirect.com/science/article/pii/0893608089900208?via%3Dihub), that is they can approximate any function to any degree of precision within a bounded domain.

---
### [Section 1A] Classification
We can use neural networks to approximate functions, lets see how we can apply this in a classification example.

Consider two clusters sitting on the $xy$-plane, each of the points $P = (x, y)$ is associated with a cluster. We want to map each point $P$ to a cluster.  We can do this by mapping each input 2-D point $(x,y)$ to an output 1-D number $z$. Lets say we want this output number to be negative ($z<0$) if the cluster is orange and positive ($z>0$) if the cluster is blue.

<img src="Clusters.png" alt="Clusters" width="600"/>

We can do this by re-defining our perceptron model from before to use 2 inputs instead of 1. Where as before our perceptron was representing a line $y = F(x)$, it's now representing a plane $z = F(x, y)$.

In [None]:
# Create the Clusters
mu = torch.zeros((50))
std = torch.ones((50))
c1_x = torch.normal(mu, std) + 5
c1_y = torch.normal(mu, std) + 5
c1_z = torch.zeros((50))
c2_x = torch.normal(mu, std) - 5
c2_y = torch.normal(mu, std) - 5
c2_z = torch.zeros((50))

# Create a simple perceptron model
""" #TODO: We've re-defined the perceptron to use 2-inputs.
"""
class Perceptron(nn.Module):
    def __init__(self):
        super(Perceptron, self).__init__()
        self.fc = nn.Linear(2, 1) # NOTE: Change here from 1 input to 2.
        self.fc.weight = nn.Parameter(torch.tensor([[..., ...]])) # TODO: Set some values here to model a plane $f(x,y) = ax + by + c
        self.fc.bias = nn.Parameter(torch.tensor([0.0]))

    def forward(self, x):
        return self.fc(x)

# Evaluate the perceptron on a grid
plane_xy = torch.zeros(100, 100, 3)
model_results = torch.zeros(100, 100, 3)
x_1 = torch.linspace(-10, 10, 100)
x_2 = torch.linspace(-10, 10, 100)
model = Perceptron()
for idx, x1 in enumerate(x_1):
    for jdx, x2 in enumerate(x_2):
        model_results[idx,jdx,0] = x1
        model_results[idx,jdx,1] = x2
        with torch.no_grad(): model_results[idx,jdx,2] = model(torch.tensor([x1, x2]))
        plane_xy[idx, jdx, 0] = x1
        plane_xy[idx, jdx, 1] = x2
        plane_xy[idx, jdx, 2] = 0
plane_xy = plane_xy.reshape(100*100,3)   
model_results = model_results.reshape(100*100, 3)

# Evaluate the perceptron for the cluster points
proj_c1_z = torch.zeros(50)
proj_c2_z = torch.zeros(50)
for idx in range(50):
    # Cluster 1/2
    with torch.no_grad(): proj_c1_z[idx] = model(torch.tensor([c1_x[0], c1_y[0]]))
    with torch.no_grad(): proj_c2_z[idx] = model(torch.tensor([c2_x[0], c2_y[0]]))

# Plot the results
fig = plt.figure(figsize=(8,8))
ax = fig.add_subplot(111, projection="3d")
ax.scatter(plane_xy[:,0], plane_xy[:,1], plane_xy[:,2], color="tab:gray", alpha=0.01, label="XY-plane")
ax.scatter(c1_x, c1_y, c1_z, color="tab:blue", alpha=0.75, label="Cluster 1")
ax.scatter(c2_x, c2_y, c2_z, color="tab:orange", alpha=0.75, label="Cluster 2")
ax.grid(True, alpha=0.25)
ax.legend(loc="best")
ax.set_xlabel("x")
ax.set_ylabel("y")
ax.set_zlabel("z")
ax.set_xlim(-10, 10)
ax.set_ylim(-10, 10)

# Plot the results
fig = plt.figure(figsize=(8,8))
ax = fig.add_subplot(111, projection="3d")
ax.scatter(plane_xy[:,0], plane_xy[:,1], plane_xy[:,2], color="tab:gray", alpha=0.01, label="XY-plane")
ax.scatter(model_results[:,0], model_results[:,1], model_results[:,2], color="tab:olive", alpha=0.05, label="Model")
ax.scatter(c1_x, c1_y, proj_c1_z, color="tab:blue", alpha=0.75, label="Projected Cluster 1")
ax.scatter(c2_x, c2_y, proj_c2_z, color="tab:orange", alpha=0.75, label="Projected Cluster 2")
ax.scatter(c1_x, c1_y, c1_z, color="tab:gray", alpha=0.75)
ax.scatter(c2_x, c2_y, c2_z, color="tab:gray", alpha=0.75)
ax.grid(True, alpha=0.25)
ax.legend(loc="best")
ax.set_xlabel("x")
ax.set_ylabel("y")
ax.set_zlabel("z")
ax.set_xlim(-10, 10)
ax.set_ylim(-10, 10)

We've used our Perceptron model to calculate the height $z$ for each poihnt $P = (x, y)$. We can see that the orange cluster has negative z-values and the blue cluster has positive z-values. Let's say our decision boundary is $z=0$, anything with $z<0$ will belong to the orange cluster and anything with $z>0$ with belong to the blue cluster. Within this example, we have mapped a 2-D input to a 1-D output, this makes it much easier to draw a decision boundary for our classification problem.

<img src="Clusters.png" alt="Clusters" width="600"/>

Within deep-learning we generally deal with much higher dimensional and more complex inputs and models, but the general idea remains the same, we want our model to learn to transform some input space $\mathcal{R}^{in}$ into some useful output space $\mathcal{R}^{out}$. But the problem now requires some more complex and abstract function for our network to learn to approximate.

Okay, that sounds a bit more useful, why don't we use multi-layer perceptrons for everything?

---
---
## [Section 2] The Learning Problem

In reality learning to approximate a function is a balance between the model, dataset and optimization process.


**The Hypothesis Space**: Lets refer to the set of possible models obtained from the training process as the hypothesis space $\mathcal{H}$. We'll also denote the set of possible solutions as the solution space $\mathcal{S}$.

<img src="Slide1.JPG" alt="Slide 1" width="600"/>


In machine learning, the hypothesis space $\mathcal{H}$ is typically determine by of our choice of model $\mathcal{H}_{\theta}$, data $\mathcal{H}_{D}$, and optimization $\mathcal{H}_{o}$ spaces.
- A model can generally never truly represent the lower dimensional rules governing a system.
- A dataset can generally never truly fully represent in a complexity balanced manner the distribution of interest.
- A optimization process may never find the true global minima in the error landscape.

<img src="Slide2.JPG" alt="Slide 2" width="600"/>


**Approximation Error**: The approximation error $\epsilon_{app}$ is the error between our target function $f$ the best possible model $h^{*} \in \mathcal{H}$. This error represents the *BEST* performance we can get out of our hypothesis space.

**Generalization Error**: The generalization error $\epsilon_{gen}$ is the error between our target function $f$ and our trained model $h \in \mathcal{H}$. This error represents the *ACTUAL* performance obtained as a result of the learning process.

**Estimation Error**: The estimation error $\epsilon_{est}$ is the difference in error between our trained model $h$ and the optimal model $h^{*}$.

<img src="Slide3.JPG" alt="Slide 3" width="600"/>

Within this workshop we will explore how the **model** and **dataset** interact and affect the outcome of the learning problem.

---
---
## [Section 3] Spiral Classification

Let's say we have collected some data from some physical system that produces a set of spirals, and we're interested in classifying the different spiral arms generated.

<img src="TwoSpirals.jpg" alt="Spiral" width="300"/>

Thankfully it so happens that the data generated follows a fairly simple target function $r(\theta) = \alpha + \beta * \theta + \mathcal{N}(\mu,\sigma)$, in reality with deep-learning related tasks we rarely know or can even represent this target function.

Let us see what data we've managed to collect first...

#### **Data**

We'll generate some data following this rule, and visualize the results.

In [None]:
""" Lets define our data generating process!
"""
def generateSpiralDataset(
        spiral_num_samples: int,
        spiral_noise: Optional[float] = 1., 
        spiral_revolutions: Optional[float] = 1,
    ):
        """ Lets use two simple archimedes spirals going in opposite directions using the
        following rule: radius = alpha * theta

        Returns:
            Tensor of shape [N * 3] where the columns contain [x-value, y-value, label] 

        """
        # Theta 
        theta = 2 * np.pi * spiral_revolutions * np.sqrt(np.random.rand(spiral_num_samples))

        # Spiral A
        radius_A = 2 * theta + np.pi / 2 # radius increases linearly with theta - anti-clockwise
        x_A = radius_A * np.cos(theta) # x-component
        y_A = radius_A * np.sin(theta) # y-component
        spiral_A_data = np.array([x_A, y_A]).T # pack them together: column 0 = x-values, column 1 = y-values
        spiral_A_data += spiral_noise * np.random.randn(spiral_num_samples,2) # add some noise
        spiral_A_lbls = np.zeros((spiral_num_samples,1)) # spiral A = 0
        spiral_A = np.append(spiral_A_data, spiral_A_lbls, axis=1)
            
        # Spiral B
        radius_B = -2 * theta - np.pi / 2 # clockwise
        x_B = radius_B * np.cos(theta)
        y_B = radius_B * np.sin(theta)
        spiral_B_data = np.array([x_B, y_B]).T
        spiral_B_data += spiral_noise * np.random.randn(spiral_num_samples,2) 
        spiral_B_lbls = np.ones((spiral_num_samples,1)) # spiral B = 1
        spiral_B = np.append(spiral_B_data, spiral_B_lbls, axis=1)

        # Put the data into one big array
        spirals = np.append(spiral_A, spiral_B, axis=0)
        spirals = torch.from_numpy(spirals).to(dtype=torch.float32)

        return spirals

# Generate the spiral dataset
data = generateSpiralDataset(
    spiral_num_samples = 100, 
    spiral_noise = 0.5, 
    spiral_revolutions = 1.0
)

In [None]:
""" Lets plot the dataset, if possible using a color for each spiral...
""" 
# Begin by separating the dataset into the different spirals for plotting...
spiral_A_x = data[data[:,2] == 0][:,0]
spiral_A_y = data[data[:,2] == 0][:,1]
spiral_B_x = data[data[:,2] == 1][:,0]
spiral_B_y = data[data[:,2] == 1][:,1]

# Plot the results
fig = plt.figure(figsize=(6,6))
ax = fig.add_subplot(111)
ax.scatter(spiral_A_x, spiral_A_y, alpha=0.75, label="Spiral A")
ax.scatter(spiral_B_x, spiral_B_y, alpha=0.75, label="Spiral B")
ax.set_title("Spiral Dataset")
ax.grid(True, alpha=0.25)
ax.legend(loc="best")

### **Dataset**

Looks good!

We decide to split up our dataset into training and testing sets, and create a dataset to draw samples from during training.

In [None]:
""" Split up our data into training and testing!
"""
class SpiralDataset(torch.utils.data.Dataset):
    def __init__(self, data: Tensor, batch_size: Optional[int] = None):
        self.data = data
        self.batch_size = batch_size if batch_size is not None else self.data.shape[0]

    def __len__(self) -> int:
        return self.data.shape[0] // self.batch_size

    def __iter__(self) -> Tuple[Tensor, Tensor]:
        """ Yield data from the dataset in batches.
        """ 
        n_samples = self.__len__()
        for idx in range(n_samples):
            # Extract data for current batch 
            sample = self.data[idx*self.batch_size:(idx+1)*self.batch_size]

            # Define inputs: x/y-values
            x = sample[:,0:-1]

            # Define targets: labels
            y = sample[:,-1].unsqueeze(-1)

            yield x, y


# Create a function for splitting the data and creating the datasets
def split_and_create_datasets(data: Tensor, training_fraction: Optional[float] = 0.7) -> Tuple[SpiralDataset, SpiralDataset]:
    # Define the number of samples to use during training vs. testing
    n_train = int(training_fraction * data.shape[0])
    n_test = data.shape[0] - n_train

    # Split the dataset into training/testing set
    train_idxs, valid_idxs = tuple([s.indices for s in random_split(range(data.shape[0]), [n_train, n_test])])
    train_data = data[train_idxs]
    test_data = data[valid_idxs]

    # Create datasets
    train_dataset = SpiralDataset(train_data)
    test_dataset = SpiralDataset(test_data)

    return train_dataset, test_dataset


# Split data and create datasets
train_dataset, test_dataset = split_and_create_datasets(data, training_fraction=0.7)

In [None]:
""" Lets plot the results
"""
# Create a function for plotting the spirals
def plot_spirals_splits(train_data, test_data):
    fig, ax = plt.subplots(figsize=(6,6))
    ax.scatter(train_data[train_data[:,2] == 0,0], train_data[train_data[:,2] == 0,1], alpha=0.15, color="tab:blue", label="Spiral A: Train")
    ax.scatter(train_data[train_data[:,2] == 1,0], train_data[train_data[:,2] == 1,1], alpha=0.15, color="tab:orange", label="Spiral B: Train")
    ax.scatter(test_data[test_data[:,2] == 0,0], test_data[test_data[:,2] == 0,1], color="tab:blue", label="Spiral A: Test")
    ax.scatter(test_data[test_data[:,2] == 1,0], test_data[test_data[:,2] == 1,1], color="tab:orange", label="Spiral B: Test")
    ax.grid(True, alpha=0.25)
    ax.set_title(f"Spiral Dataset")
    ax.legend(loc="best")


# Plot the spirals
plot_spirals_splits(train_dataset.data, test_dataset.data)

### **Model**

We decide to take a stab at modelling this dataset using a multi-layer perceptron, let's consider what we're trying to do...

We decide that for each point $P=(x,y)$ we want to calculate the height $z$, where the height $z$ can be used to represent the class of each spiral. Lets say we want $z<0.5$ to correspond to Spiral $A$ and $z>0.5$ to correspond to Spiral $B$.

<img src="2DSurface.png" alt="Surface" width="600"/>

By using multiple-layers of perceptrons we're hoping to approximate not just a plane as before but a more complex and non-linear 2D surface, through projecting and re-projecting into higher dimensional manifolds within the hidden layers, however the fundamental idea remains the same.

In [None]:
""" Lets define the architecture of our model!
"""
class Perceptron(nn.Module):
    """ Create a single-layer perceptron with ReLU activation function.
    """
    def __init__(self, 
        input_dim: int, 
        output_dim: int
    ):
        super(Perceptron, self).__init__()

        # Create linear layer
        # https://pytorch.org/docs/stable/generated/torch.nn.Linear.html
        self.fc = nn.Linear(input_dim, output_dim)

        # Create non-linear activation function
        # https://pytorch.org/docs/stable/generated/torch.nn.ReLU.html
        self.act = nn.ReLU()

    def forward(self, x):
        x = self.act(self.fc(x))
        return x


class MultiLayerPerceptron(nn.Module):
    """ Create a multi-layer perceptron by stacking multiple layers of perceptrons.
    """
    def __init__(self, 
        input_dim: Optional[int] = 2, 
        output_dim: Optional[int] = 1, 
        hidden_layers: Optional[int] = 0, 
        hidden_dim: Optional[int] = 4
    ):
        super(MultiLayerPerceptron, self).__init__()

        # Create input layer
        modules = [Perceptron(input_dim, hidden_dim)]

        # Create hidden layers
        for _ in range(hidden_layers):
            modules.append(Perceptron(hidden_dim, hidden_dim))

        # Create output layer
        modules.append(Perceptron(hidden_dim, output_dim))

        # Create the model
        self.layers = nn.ModuleList(modules)

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

### **Training**

We have our **dataset** and **model**, lets train up our spiral classifier.

In [None]:
""" Next we're going to create the model, optimizer, loss function we want to train!
"""
# device
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# model
model = MultiLayerPerceptron(input_dim=2, output_dim=1, hidden_layers=1, hidden_dim=4).to(device)

# loss function
loss_fn = torch.nn.MSELoss() # there are better loss functions but we'll use this for now

# optimizer
opt = torch.optim.Adam(model.parameters(), lr=1e-3)

In [None]:
""" Lets train our model!
"""
from tqdm import tqdm


def train_epoch(dataset: Callable, model: Callable, loss_fn: Callable, opt: Callable):
  """ Performs a training epoch.
  """
  losses = []
  for idx, (x, y) in enumerate(dataset):
    # move data to device
    x = x.to(device)
    y = y.to(device)

    # forwards pass
    y_p = model(x)

    # compute the loss
    l = loss_fn(y_p, y)

    # performance the backwards pass (compute gradients)
    l.backward()

    # perform step of the optimizer and clear gradients
    opt.step()
    opt.zero_grad()

    # append loss
    losses.append(l.item())

  return losses, model


def test_epoch(dataset: Callable, model: Callable, loss_fn: Callable):
  """ Performs a testing epoch.
  """
  losses = []
  with torch.no_grad():
    for idx, (x, y) in enumerate(dataset):
      # move data to device
      x = x.to(device)
      y = y.to(device)

      # forwards pass
      y_p = model(x)

      # compute the loss
      l = loss_fn(y_p, y)

      # append loss
      losses.append(l.item())
    
  return losses


# Let's run the training loop!
num_epochs = 1000
train_losses = []
valid_losses = {}
train_loss, valid_loss = .0, .0
with tqdm(range(num_epochs)) as pbar:
  for epoch in pbar:
    # Training
    pbar.set_description(f"[TRAIN] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")
    _losses, model = train_epoch(train_dataset, model, loss_fn, opt)
    train_losses += _losses
    train_loss = sum(_losses) / len(_losses)
    pbar.set_description(f"[TRAIN] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")

    # Validation
    pbar.set_description(f"[VALID] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")
    _losses = test_epoch(test_dataset, model, loss_fn)
    valid_losses[epoch*len(train_dataset)] = _losses[0]
    valid_loss = sum(_losses) / len(_losses)
    pbar.set_description(f"[VALID] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")

In [None]:
""" Lets plot the training performance using the losses
"""
# Plot losses from training
def plot_training_performance(train_losses, valid_losses):
  fig = plt.figure(figsize=(16,4))
  ax = fig.add_subplot(111)
  ax.plot(train_losses, linestyle="-", color="tab:blue", label="Train", alpha=0.75)
  ax.plot(valid_losses.keys(), valid_losses.values(), linestyle="-", color="tab:orange", label="Valid", alpha=0.75)
  ax.set_xlim(left=0, right=len(train_losses))
  ax.set_ylim(bottom=0, top=1)
  ax.set_xlabel("Iterations")
  ax.set_ylabel("Loss")
  ax.set_title("Training")
  ax.legend(loc="best")
  ax.grid(True, alpha=0.25)


# Plot 
plot_training_performance(train_losses, valid_losses)

In [None]:
""" Lets also check out how the model is making decisions over the input space.
"""
def plot_model_input_space(model, train_dataset, test_dataset, x_min, x_max, x_res: Optional[int] = 120):
    # Create grid points
    X1 = np.linspace(x_min, x_max, x_res).astype(np.float32)
    X2 = np.linspace(x_min, x_max, x_res).astype(np.float32)

    # Re-order the results 
    bg = np.zeros((x_res, x_res))
    for idx, x1 in enumerate(X1):
        for jdx, x2 in enumerate(X2):
            # evaluate model at point
            point = torch.tensor([[x1, x2]])
            with torch.no_grad():
                y_p = model(point.to(device)).cpu()
            
            # store result at point
            bg[jdx, idx] = y_p.item()

    # Clip values
    bg[bg < 0] = .0
    bg[bg > 1] = 1.

    # Define normalization bounds 
    from matplotlib.colors import Normalize
    norm = Normalize(vmin=0, vmax=1)

    # Lets create a custom colorm
    from matplotlib.colors import LinearSegmentedColormap
    colors = ["tab:orange", "white", "tab:blue"]
    cmap = LinearSegmentedColormap.from_list("custom_cmap", colors)

    # Map the values to the colour map
    bg_colors = cmap(bg)[::-1,::-1]

    # Plot the results
    from matplotlib.cm import ScalarMappable
    fig = plt.figure(figsize=(8,8))
    ax = fig.add_subplot(111)
    ax.imshow(bg_colors, extent=[x_min, x_max, x_min, x_max], origin="lower", aspect="auto", alpha=0.25)
    fig.colorbar(ScalarMappable(norm=norm, cmap=cmap), ax=ax, label='Value')

    # Lets also overlay the training and testing data
    ax.scatter(train_dataset.data[train_dataset.data[:,2] == 0,0], train_dataset.data[train_dataset.data[:,2] == 0,1], alpha=0.15, color="tab:blue", label="Spiral A: Train")
    ax.scatter(train_dataset.data[train_dataset.data[:,2] == 1,0], train_dataset.data[train_dataset.data[:,2] == 1,1], alpha=0.15, color="tab:orange", label="Spiral B: Train")
    ax.scatter(test_dataset.data[test_dataset.data[:,2] == 0,0], test_dataset.data[test_dataset.data[:,2] == 0,1], color="tab:blue", label="Spiral A: Test")
    ax.scatter(test_dataset.data[test_dataset.data[:,2] == 1,0], test_dataset.data[test_dataset.data[:,2] == 1,1], color="tab:orange", label="Spiral B: Test")

    # Other figure stuff
    ax.set_title("Network Outputs")
    ax.set_xlim(left=x_min, right=x_max)
    ax.set_ylim(bottom=x_min, top=x_max)
    ax.set_xlabel("x_1")
    ax.set_ylabel("x_2")
    ax.legend(loc="best")
    ax.grid(True, alpha=0.25)


# Plot the values over the input space
plot_model_input_space(model, train_dataset, test_dataset, x_min=-15, x_max=15)

---
### [Section 3A] **Model Bias-Complexity Trade-off**

As we increase the size and complexity of our hypothesis class $\mathcal{H}$ (i.e. creating a larger and higher capacity model), the approximation error $\epsilon_{app}$ generally *decreases* as our best possible model becomes more capable of fitting the target function $f$. 

However, the estimation error $\epsilon_{est}$ **may** also *increase* as the model becomes more capable of over-fitting the data distribution seen during training and not learning the lower dimensional rules governing the data generation process.

<img src="Slide5.JPG" alt="Slide 5" width="600"/>
<img src="Slide6.JPG" alt="Slide 6" width="600"/>

Our learning problem has a fixed complexity, we need to find the balance between under-fitting and over-fitting the training distribution.

In [None]:
# Define the model, loss and optimizer
model = MultiLayerPerceptron(
  input_dim = 2,
  output_dim = 1,
  hidden_layers = 2,
  hidden_dim = 8
).to(device)
loss_fn = torch.nn.MSELoss()
opt = torch.optim.Adam(model.parameters(), lr=1e-3)

# Let's run the training loop!
num_epochs = 1000
train_losses = []
valid_losses = {}
train_loss, valid_loss = .0, .0
with tqdm(range(num_epochs)) as pbar:
  for epoch in pbar:
    # Training
    pbar.set_description(f"[TRAIN] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")
    _losses, model = train_epoch(train_dataset, model, loss_fn, opt)
    train_losses += _losses
    train_loss = sum(_losses) / len(_losses)
    pbar.set_description(f"[TRAIN] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")

    # Validation
    pbar.set_description(f"[VALID] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")
    _losses = test_epoch(test_dataset, model, loss_fn)
    valid_losses[epoch*len(train_dataset)] = _losses[0]
    valid_loss = sum(_losses) / len(_losses)
    pbar.set_description(f"[VALID] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")

# Plot the training performance
plot_training_performance(train_losses, valid_losses)

# Plot the values over the input space
plot_model_input_space(model, train_dataset, test_dataset, x_min=-15, x_max=15)

---
### [Section 3B] **Dataset Bias-Complexity Trade-off**

However, we also have to consider the other aspects of the learning problem, the complexity of the distribution also plays a role in defining the hypothesis class $\mathcal{H}$. As we increase the size and complexity of the distribution space $\mathcal{H}_{D}$ we generally expect our approximation error $\epsilon_{app}$ to increase for a fixed model. The flip-side of this is also generally true.

Peferably, we would collect a distribution that is fully representative of the rules which govern the data generation process, however this is generally not feasible. We typically like to make sure we have a *good enough* representation with respect to the distribution we expect to see during deployment.

Say we decide to apply our model to another similar problem! Hey this binary star system has spirals, let's assume the spirals are the same as before but just extend for longer.

<img src="CelestialSpiral.jpg" alt="Spiral" width="300"/>

Let's generate this dataset and see how well our previous model fits this data.

In [None]:
# Generate the spiral data : but longer...
data = generateSpiralDataset(
    spiral_num_samples = 400, 
    spiral_noise = 0.5, 
    spiral_revolutions = 2.0
)

# Create the datasets
train_dataset, test_dataset = split_and_create_datasets(data, training_fraction=0.7)

# Plot the datasets
plot_spirals_splits(train_dataset.data, test_dataset.data)

# Plot the values over the input space
plot_model_input_space(model, train_dataset, test_dataset, x_min=-30, x_max=30)

We can clearly see that the model has failed to generalize to our target function $f$, and has simply just over-fit the distribution seen during training. 

However, our model learned this problem before, potentially we just need to re-train the model...

In [None]:
# Define the model, loss and optimizer
model = MultiLayerPerceptron(
    input_dim = 2,
    output_dim = 1,
    hidden_layers = 2,
    hidden_dim = 8
).to(device) #
loss_fn = torch.nn.MSELoss()
opt = torch.optim.Adam(model.parameters(), lr=1e-3)

# Let's run the training loop!
num_epochs = 1000
train_losses = []
valid_losses = {}
train_loss, valid_loss = .0, .0
with tqdm(range(num_epochs)) as pbar:
  for epoch in pbar:
    # Training
    pbar.set_description(f"[TRAIN] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")
    _losses, model = train_epoch(train_dataset, model, loss_fn, opt)
    train_losses += _losses
    train_loss = sum(_losses) / len(_losses)
    pbar.set_description(f"[TRAIN] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")

    # Validation
    pbar.set_description(f"[VALID] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")
    _losses = test_epoch(test_dataset, model, loss_fn)
    valid_losses[epoch*len(train_dataset)] = _losses[0]
    valid_loss = sum(_losses) / len(_losses)
    pbar.set_description(f"[VALID] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")

# Plot the training performance
plot_training_performance(train_losses, valid_losses)

# Plot the values over the input space
plot_model_input_space(model, train_dataset, test_dataset, x_min=-30, x_max=30)

Within this new spiral dataset we have significantly increased the size and complexity of the distribution $\mathcal{H}_{D}$, our previous model hypothesis space $\mathcal{H}_{\theta}$ was "well-balanced" for the complexity of that distribution. However, with the new learning problem our models representational capacity seems to be ill-suited to this task. 

As with before, we can increase the size and complexity of our model hypothesis space $\mathcal{H}_{\theta}$ by increasing the model size to potentially counter-act this.

In [None]:
# Define the model, loss and optimizer
model = MultiLayerPerceptron(
    input_dim = 2,
    output_dim = 1,
    hidden_layers = 4,
    hidden_dim = 8
).to(device) #
loss_fn = torch.nn.MSELoss()
opt = torch.optim.Adam(model.parameters(), lr=1e-3)

# Let's run the training loop!
num_epochs = 1000
train_losses = []
valid_losses = {}
train_loss, valid_loss = .0, .0
with tqdm(range(num_epochs)) as pbar:
  for epoch in pbar:
    # Training
    pbar.set_description(f"[TRAIN] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")
    _losses, model = train_epoch(train_dataset, model, loss_fn, opt)
    train_losses += _losses
    train_loss = sum(_losses) / len(_losses)
    pbar.set_description(f"[TRAIN] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")

    # Validation
    pbar.set_description(f"[VALID] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")
    _losses = test_epoch(test_dataset, model, loss_fn)
    valid_losses[epoch*len(train_dataset)] = _losses[0]
    valid_loss = sum(_losses) / len(_losses)
    pbar.set_description(f"[VALID] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")

# Plot the training performance
plot_training_performance(train_losses, valid_losses)

# Plot the values over the input space
plot_model_input_space(model, train_dataset, test_dataset, x_min=-30, x_max=30)

Sometimes, we are lucky enough to be able to simply leveraging scaling laws to increase our model hypothesis space $\mathcal{H}_{\theta}$ to better capture this more complex data distribution. However this isn't always the case, sometimes certain architectures can only take us so far before we run into other limitations or issues inherent in their design... 

Understanding how the inductive biases of a model interacts with a given dataset and learning problem can provide rich insights into better ways to design models or modify the learning problem to better suit our desired outcome.

Trying to figure out a good balance between these two variables can take quite a bit of trial and error and is mostly a heuristic process based on experience and best practices. We can look at the evolution of NLP from Hidden Markov Models to LSTMS to Transformers and see this iterative development to design models to better capture and learn from larger and more complex datasets. Even just simply understanding where you sit in terms of under-fitting or over-fitting can be quite informative, there's a plethora of research into LLM scaling laws such as [Scaling Laws for Neural Language Models](https://arxiv.org/abs/2001.08361) (this paper formed the basis for OpenAI to explore scaling GPT-3 to 175B parameters) and [Training Compute-Optimal Large Language Models](https://arxiv.org/abs/2203.15556v1) are quite good in this regard to see that understanding this fundamental trade-off is an essential aspect in designing performant models.

---
### [Section 3C] **Hacky Prior Knowledge**

We also note that if we look at the models predictions outside the domain it's clear it's still just over-fitting the training distribution and not learning the underlying data generation process.

Within the context of our toy dataset, no matter how we balance the previous factors, we will always have some approximation $\epsilon_{app}$ and thus generalization $\epsilon_{gen}$ error. For a model to learn our target function $f$ it need to be capable of representing the rules governing the data generating process $r(\theta) = \alpha + \beta * \theta + \mathcal{N}(\mu,\sigma)$. Whilst as a universal function approximator an infinitely wide MLP would be able to capture this solution, however, in-reality with finite width and depth networks we can only learn almost anything.

Generally in designing and applying machine learning, having an understanding and intuition of the underlying symmetries of a given problem can aid in incorporating different inductive priors into the model for a given task. Building in priors can act to constraint the hypothesis space $\mathcal{H}$ allowing you to do more with less.

In [None]:
# Generate the spiral data..
data = generateSpiralDataset(
    spiral_num_samples = 400, 
    spiral_noise = 0.5, 
    spiral_revolutions = 2.0
)

""" Say we had some intuition that the data has a relationship with the radius at a given point... 
"""
x = data[:,0]
y = data[:,1]
lbls = data[:,2]
radius = torch.sqrt(x**2 + y**2)
theta = torch.atan2(y, x)
new_data = torch.stack([radius, theta, x, y, lbls], dim=1)

# Create the datasets
cartesian_train_dataset, cartesian_test_dataset = split_and_create_datasets(data, training_fraction=0.7)
new_train_dataset, new_test_dataset = split_and_create_datasets(new_data, training_fraction=0.7)

# Plot the datasets
plot_spirals_splits(cartesian_train_dataset.data, cartesian_test_dataset.data)

# Increase the size of the model until you see if you get slightly better performance...
model = MultiLayerPerceptron(
    input_dim = 4,
    output_dim = 1,
    hidden_layers = 4,
    hidden_dim = 8
).to(device) #
loss_fn = torch.nn.MSELoss()
opt = torch.optim.Adam(model.parameters(), lr=1e-3)

# Let's run the training loop!
num_epochs = 2000
train_losses = []
valid_losses = {}
train_loss, valid_loss = .0, .0
with tqdm(range(num_epochs)) as pbar:
  for epoch in pbar:
    # Training
    pbar.set_description(f"[TRAIN] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")
    _losses, model = train_epoch(new_train_dataset, model, loss_fn, opt)
    train_losses += _losses
    train_loss = sum(_losses) / len(_losses)
    pbar.set_description(f"[TRAIN] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")

    # Validation
    pbar.set_description(f"[VALID] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")
    _losses = test_epoch(new_test_dataset, model, loss_fn)
    valid_losses[epoch*len(train_dataset)] = _losses[0]
    valid_loss = sum(_losses) / len(_losses)
    pbar.set_description(f"[VALID] Epoch: {epoch}/{num_epochs} | train loss: {train_loss:.3f} | valid loss: {valid_loss:.3f}")

# Plot the training performance
plot_training_performance(train_losses, valid_losses)


def plot_model_input_space(model, train_dataset, test_dataset, x_min, x_max, x_res: Optional[int] = 120):
    # Create grid points
    X = np.linspace(x_min, x_max, x_res).astype(np.float32)
    Y = np.linspace(x_min, x_max, x_res).astype(np.float32)

    # Re-order the results 
    import math
    bg = np.zeros((x_res, x_res))
    for idx, x in enumerate(X):
        for jdx, y in enumerate(Y):
            # evaluate model at point
            radius = math.sqrt(x**2 + y**2)
            theta = math.atan2(y, x)
            point = torch.tensor([[radius, theta, x, y]]) # NOTE: Add in the new input features

            with torch.no_grad():
                y_p = model(point.to(device)).cpu()
            
            # store result at point
            bg[jdx, idx] = y_p.item()

    # Clip values
    bg[bg < 0] = .0
    bg[bg > 1] = 1.

    # Define normalization bounds 
    from matplotlib.colors import Normalize
    norm = Normalize(vmin=0, vmax=1)

    # Lets create a custom colorm
    from matplotlib.colors import LinearSegmentedColormap
    colors = ["tab:orange", "white", "tab:blue"]
    cmap = LinearSegmentedColormap.from_list("custom_cmap", colors)

    # Map the values to the colour map
    bg_colors = cmap(bg)[::-1,::-1]

    # Plot the results
    from matplotlib.cm import ScalarMappable
    fig = plt.figure(figsize=(8,8))
    ax = fig.add_subplot(111)
    ax.imshow(bg_colors, extent=[x_min, x_max, x_min, x_max], origin="lower", aspect="auto", alpha=0.25)
    fig.colorbar(ScalarMappable(norm=norm, cmap=cmap), ax=ax, label='Value')

    # Lets also overlay the training and testing data
    ax.scatter(train_dataset.data[train_dataset.data[:,2] == 0,0], train_dataset.data[train_dataset.data[:,2] == 0,1], alpha=0.15, color="tab:blue", label="Spiral A: Train")
    ax.scatter(train_dataset.data[train_dataset.data[:,2] == 1,0], train_dataset.data[train_dataset.data[:,2] == 1,1], alpha=0.15, color="tab:orange", label="Spiral B: Train")
    ax.scatter(test_dataset.data[test_dataset.data[:,2] == 0,0], test_dataset.data[test_dataset.data[:,2] == 0,1], color="tab:blue", label="Spiral A: Test")
    ax.scatter(test_dataset.data[test_dataset.data[:,2] == 1,0], test_dataset.data[test_dataset.data[:,2] == 1,1], color="tab:orange", label="Spiral B: Test")

    # Other figure stuff
    ax.set_title("Network Outputs")
    ax.set_xlim(left=x_min, right=x_max)
    ax.set_ylim(bottom=x_min, top=x_max)
    ax.set_xlabel("x_1")
    ax.set_ylabel("x_2")
    ax.legend(loc="best")
    ax.grid(True, alpha=0.25)


# Plot the values over the input space
plot_model_input_space(model, cartesian_train_dataset, cartesian_test_dataset, x_min=-30, x_max=30)

We note that we didn't do anything particularly sophisticated or even what would generally be considered good practice, but through the "magic" of deep-learning it paid off.

## [Section 4] Building an Informed Model

Here's another example where considering the data can inform the model architecture, here we have a dataset which generates data following the rule $f(x_{1}, x_{2}) = x_{1} + x_{2}$.

See if you can design a model which can learn to represent the underlying data generating process perfectly.

In [None]:
# create a dataset class
class SumDataset(torch.utils.data.Dataset):
  """ Dataset

  Simple dataset which generates data following the rule: 
    y = x_1 + x_2

  Where: 
    x_1 in [low, high] e.g. [0,10]
    x_2 in [low, high] e.g. [0,10]

  """
  def __init__(self, low: int, high: int, batch_size: int):
    self.low = low
    self.high = high
    self.batch_size = batch_size

  def __iter__(self):
    while True:
      yield self.get_batch()

  def get_batch(self):
    # define some random numbers between [low,high]
    x_1 = torch.randint(low=self.low, high=self.high, size=(self.batch_size,1), dtype=torch.float32)
    x_2 = torch.randint(low=self.low, high=self.high, size=(self.batch_size,1), dtype=torch.float32)
    x = torch.cat((x_1, x_2), dim=1)

    # define the target value following our rule
    y = x_1 + x_2

    return x, y

In [None]:
# create a MLP class
class YourModel(nn.Module):
  """ Multi-layer perceptron
  """
  def __init__(self, ...):
    super(YourModel, self).__init__()

    # Create model layers
    modules = []
    modules.append(...)
    
    # Create the model
    self.model = nn.Sequential(*modules)

  def forward(self, x):
    y_p = self.model(x)
    return y_p

In [None]:
# device
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# model
model = YourModel(...).to(device)

# loss function
loss = torch.nn.MSELoss()

# optimizer
opt = torch.optim.Adam(model.parameters(), lr=1e-3)

# dataset
dataset_train = SumDataset(low=0, high=10, batch_size=16)

In [None]:
from tqdm import tqdm

# outputs
losses = []

#
n_epochs = 20000

# training loop
with tqdm(dataset_train, total=n_epochs) as pbar:
  for idx, (x, y) in enumerate(pbar):
    # move data to device
    x = x.to(device)
    y = y.to(device)

    # forwards pass
    y_p = model(x)

    # compute the loss
    l = loss(y_p, y)

    # performance the backwards pass (compute gradients)
    l.backward()

    # perform step of the optimizer and clear gradients
    opt.step()
    opt.zero_grad()

    # update description
    pbar.set_description(f"[TRAIN] epoch: {idx}/{n_epochs} | loss: {l.item():.4f}")
    t_losses.append(l.item())

    # stopping criterion
    if idx >= n_epochs: break

In [None]:
import matplotlib.pyplot as plt

# plot training losses
fig = plt.figure(figsize=(16,8))
ax = fig.add_subplot(111)
ax.semilogy(t_losses, linestyle="-", color="tab:blue", label="training", alpha=0.75)
ax.set_xlim(left=0, right=len(t_losses))
ax.set_xlabel("Iterations")
ax.set_ylabel("Loss")
ax.set_title("Training Loss")
ax.legend(loc="best")
ax.grid(True, alpha=0.25)

In [None]:
# investigate the model parameters
for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"{name}: {param.data.shape} / {param.data}")