In [None]:
try:
    import bayes_nanospace2025
    print("Already installed")
except ImportError:
    %pip install -q "bayes_nanospace2025 @ git+https://github.com/Mads-PeterVC/nanospace2025.git" # Install from GitHub. 
    print("Installed bayes_nanospace2025 from GitHub")

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist

from scipy.linalg import cho_solve, cho_factor
from bayes_nanospace2025 import plot_gp_prediction, YourCodeHere

## Gaussian Processes

### Agenda

In this notebook we will explore Gaussian Processes; 

The notebook 4 main sections: 

1. The Gaussian process prior
2. The Gaussian process Posterior
3. Wrapping things into a versatile GP class.
4. Using GP model and choosing hyperparameters

#### Exercises

There are small exercises dispersed throughout the notebook, they are indicated by a `YourCodeHere`-function that will raise an error if it has not been replaced by your code. 
As an example, if you encouter something like the cell below

In [None]:
a = 1
b = 1
c = YourCodeHere("Add a and b together")
assert c == 2, "The sum of a and b should be 2"

You should replace it with 

In [None]:
a = 1
b = 1
c = a + b
assert c == 2, "The sum of a and b should be 2"

### Gaussian Process Prior

We will start by seeing how to draw samples from a GP prior, where 

$$
y \sim \mathcal{N}(0, K(X, X))
$$

So we will need to decide on a domain for $X$ and implement a kernel function $K$.
We will start using the radial basis function (RBF) kernel, 

$$
K(x, x^*) = \exp{\left(-\frac{|x-x^*|^2}{2\lambda^2}\right)}
$$
With $|x|$ being the euclidean distance of $x$. 

In the cell below, implement the RBF kernel 

<details>
  <summary><strong><span style="color: lightgreen; font-size: 18px;">💡 Hint (Click arrow to expand) </span></strong></summary>
  <p>With numpy array operations you can calculate K for all distances in one call. E.g. np.exp(dists) applies exponential function to all elements in dists.</p>
</details>

In [None]:
def RBF(x1, x2, length_scale=1.0):
    """
    Radial Basis Function (RBF) kernel.
    """
    dists = cdist(
        x1, x2, "sqeuclidean"
    )  # This computes the squared Euclidean distance between all pairs of points in x1 and x2.
    K = YourCodeHere("Implement the RBF kernel.")  # Your code here.
    return K

X = np.linspace(0, 1, 10).reshape(-1, 1)  # 100 points from 0 to 10
assert RBF(X, X).shape == (10, 10), "The kernel matrix should be of shape (10, 10)"

Now that we have a kernel we can calculate the covariancee between some set of points $X$. 

In the cell below call the RBF function and it will plot the covariance matrix. 

Try it for different values of the `length_scale`-parameter. 

In [None]:
X = np.linspace(0, 1, 100).reshape(-1, 1)  # Create a grid of points
K = YourCodeHere("Compute the covariance matrix using the RBF kernel.")  # Your code here.

fig, ax = plt.subplots(figsize=(4, 4))
im = ax.matshow(K, origin="upper", extent=[0, 1, 1, 0], cmap="Purples")

Now we're ready to sample some functions to do so we will

- Define a set of points $X$. 
- Calculate the covariance between those points using our kernel function. 
- Decide on a prior mean, which can just be $\mu(X) = 0$.
- Sample from a multivariate normal distribution with the mean and covariance we've defined.

The last step is handled by the `np.random.multivariate_normal` function which can be called like so: `np.random.multivariate_normal(mean, covariance, num_samples)`.

In [None]:
# Define set of points
X = YourCodeHere("Create a set of points of shape (n, 1) where n is the number of points.")  # Your code here.

# Calculate the covariance between those points using our kernel function
K = YourCodeHere("Calculate the covariance matrix using the RBF kernel.")  # Your code here.

# Define a prior mean
mean = np.zeros(X.shape[0])  # Mean function, zero everywhere

# Sample from a multivariate normal distribution with the mean and covariance we've defined
n_samples = 10
sample = YourCodeHere("Sample from a multivariate normal distribution with the mean and covariance defined above.")  # Your code here.

fig, ax = plt.subplots(figsize=(4, 4))
l1 = ax.plot(X, sample.T, color="gray")
ax.plot(X, mean, color="black", lw=2, label="Mean")
ax.set_xlim(X.min(), X.max())

Thats it - these are samples from the Gaussian Process prior! Now let's try changing the kernel hyperparameters and see how that influences the sampled functions.

In [None]:
length_scales = [0.01, 0.1, 0.25, 0.5, 1.0]  # Different length scales to explore
X = np.linspace(0, 1, 100).reshape(-1, 1)  # Create a grid of points
mean = np.zeros(X.shape[0])  # Mean function, zero everywhere
n_samples = 10  # Number of samples to draw

fig, axes = plt.subplots(1, 5, figsize=(5 * 3, 3), sharey=True, layout="constrained")
for ax, length_scale in zip(axes, length_scales):
    samples = np.random.multivariate_normal(mean, RBF(X, X, length_scale), n_samples)
    ax.plot(X, samples.T, color="gray")
    ax.set_title(f"Length Scale = {length_scale}")

<div class="alert alert-block alert-success"><b>Takeaway:</b> The types of functions we get are clearly controlled by the kernel and it's hyperparameters. </div>

There are many choices of kernels, such as dot product kernel
$$
K(x, x^*) = (x \cdot x^*)
$$

Periodic kernel
$$
K(x, x^*) = \exp{\left( -\frac{2}{\lambda^2}\sin^2\left( \frac{\pi |x-x^*|}{p} \right) \right)}
$$

In the cell below finish implementing the periodic kernel.

<details>
    <summary><strong><span style="color: lightgreen; font-size: 18px;">💡 Hint - Dot product </span></strong></summary>
    <p> This is easist if you compute the outer-product between x1 and x2 - e.g. with np.outer </p>
  </details>

<br>

<details>
    <summary><strong><span style="color: lightgreen; font-size: 18px;">💡 Hint - Periodic </span></strong></summary>
    <p> Again using array operations this can all be vectorized, so using np.sin(x) computes the sine of all values in x. </p>
  </details>


In [None]:
def dot(x1, x2):
    """
    Dot product between two vectors.
    """
    YourCodeHere("Compute the dot product between two vectors, make sure it returns a matrix of shape (n, m) where n is the number of points in x1 and m is the number of points in x2.")  # Your code here.


def periodic(x1, x2, length_scale=1.0, period=1.0):
    """
    Periodic kernel.
    """
    dists = cdist(x1, x2, "euclidean")  # This computes the Euclidean distance between all pairs of points in x1 and x2.
    K = YourCodeHere("Implement the periodic kernel.")  # Your code here.
    return K

X1 = np.linspace(0, 1, 10).reshape(-1, 1)
X2 = np.linspace(0, 1, 15).reshape(-1, 1)

assert dot(X1, X2).shape == (10, 15), "The dot product should return a matrix of shape (n, m) where n is the number of points in X1 and m is the number of points in X2."
assert periodic(X1, X2, length_scale=0.1, period=0.5).shape == (10, 15), "The periodic kernel should return a matrix of shape (n, m) where n is the number of points in X1 and m is the number of points in X2."

A nice property of kernel functions is that they can be used as building blocks of more complex kernels just by adding and multiplying them together - kernels are **composable**.

For example, we can create a polynomial kernel as the product of two dot product kernels. 

<details>
    <summary><strong><span style="color: lightgreen; font-size: 18px;">💡 Hint - Lambda functions </span></strong></summary>
    <p> See how the kernels in the kernels list are defined using lambda functions, you can do the same 
    and just multiply or add kernels together. </p>
  </details>

In [None]:
kernels = [
    lambda x1, x2: RBF(x1, x2, length_scale=0.1),
    lambda x1, x2: dot(x1, x2),
    lambda x1, x2: periodic(x1, x2, length_scale=0.5, period=np.pi / 8),
]

# Try to make a quadratic kernel as the product of two dot product kernels
quadratic_kernel = YourCodeHere("Implement the quadratic kernel as the product of two dot product kernels in a lambda function .. lambda x1, x2: <your code here>")  # Your code here.
kernels.append(quadratic_kernel)

# Can also make a more complex kernel by adding two kernels together
complex_kernel = lambda x1, x2: RBF(x1, x2, length_scale=0.1) * periodic(x1, x2, length_scale=0.5, period=np.pi / 8) + dot(x1, x2) 
kernels.append(complex_kernel)


length_scales = [0.01, 0.1, 0.25, 0.5, 1.0]  # Different length scales to explore
X = np.linspace(0, 1, 100).reshape(-1, 1)  # Create a grid of points
mean = np.zeros(X.shape[0])  # Mean function, zero everywhere
n_samples = 3  # Number of samples to draw

ncols = len(kernels)

labels = ['RBF', 'Dot Product', 'Periodic', 'Quadratic', 'Complex']

fig, axes = plt.subplots(1, ncols, figsize=(ncols * 3, 3), sharey=True, layout="constrained")
for ax, kernel, label in zip(axes, kernels, labels):
    covariance = kernel(X, X)
    samples = np.random.multivariate_normal(mean, covariance, n_samples)
    ax.plot(X, samples.T)
    ax.set_title(label)

#### Gaussian Process Prior for functions of multiple variables.

So far we've looked only at functions of a single variable, but there's really nothing special about functions of multiple variables - we just need our kernel to describe the covariance 
between sets of multiple variables. 

We've had that our "index set" $X$ is something like this 

$$
X = [x_1, x_2, x_3, ..., x_N]
$$

For more dimensions we just need to redefine our set as 

$$
X = [(x_1, y_1), (x_2, y_2), (x_3, y_3), ..., (x_N, y_N)]
$$

The cell below defines a 2D index set and calculates the RBF kernel.

In [None]:
# Define a set of points in 2D
x = np.linspace(0, 1, 50)
y = np.linspace(0, 1, 50)
x_mesh, y_mesh = np.meshgrid(x, y)
X = np.vstack([x_mesh.ravel(), y_mesh.ravel()]).T

print(f"Shape of X: {X.shape}")  # Should be (2500, 2) for a 50x50 grid

# Calculate the covariance between those points using our kernel function
K = YourCodeHere("Calculate the covariance K(X, X) using one of our kernels - this is no different from the 1D case!")  # Your code here.
fig, ax = plt.subplots(figsize=(4, 4))
im = ax.matshow(K, origin="upper", cmap="Purples")

In [None]:
mean = np.zeros(X.shape[0])  # Mean function, zero everywhere
n_samples = 3  # Number of samples to draw
samples = YourCodeHere("Sample from a multivariate normal distribution with the mean and covariance defined above.")  # Your code here.

fig, axes = plt.subplots(1, 3, figsize=(3 * 3, 3), sharey=True, layout="constrained")

for ax, sample in zip(axes, samples):
    ax.contour(x_mesh, y_mesh, sample.reshape(x_mesh.shape), colors="black")
    ax.contourf(x_mesh, y_mesh, sample.reshape(x_mesh.shape), cmap="viridis")

<div class="alert alert-block alert-success"> <b>Takeaway:</b> GP's generalize naturally to multidimensional functions with everything handled by the kernel. </div>

### Gaussian Process Posterior

In order to use the GP for regression we would like to not just draw random functions, so we condition the GP on the observations. 
Our observations are pairs of coordinates and function values $(X, \mathbf{y})$.

This leads to following equations for the posterior mean and posterior covariance
$$
\mu(X^*) = K(X^*, X)[K(X, X)+\sigma_n^2I]^{-1}\mathbf{y}
$$ 

$$
\Sigma(X^*, X^*) = K(X^*, X^*) - K(X^*, X)[K(X, X) + \sigma^2_n I]^{-1}K(X, X^*)
$$

Where $X^*$ are where we want to query the GP and $\sigma_n^2$ is our assumed noise on the observations.

Both the posterior mean and covariance involve inverting the covariance matrix of the observations

$$
[K(X, X) + \sigma^2_n I]^{-1}
$$

Some care needs to be taken to do this in a stable and accurate way. 

---

Typically the most numerically stable and efficient way of doing so for the types of matrices encountered with GP's is Cholesky decomposition. 
With Cholesky decomposition a matrix $A$ is decomposed like so 

$$
A = LL^*
$$

The matrix $L$ can then be used to efficiently solve for the inverse of $A$ and for computing the determinant of $A$. The core routines of this 
are implemented in SciPy

- `scipy.linalg.cho_factor`: Performs the factorization to get the $L$-matrix.
- `cho_solve`: Solves the linear system of $A x = B$ given the $L$-matrix of $A$. We set $B = I$ to find the inverse.

---

In the cell below we will start by computing the posterior mean.


In [None]:
# Define some observations
X = np.array([0.3, 0.5, 0.7]).reshape(-1, 1)
y = np.array([-0.5, 0.5, 0]).reshape(-1, 1)

# Define the noise level
noise = 0.001
length_scale = 0.1

In [None]:
# Calculate the covariance matrix for the observations
K_obs = RBF(X, X, length_scale=length_scale) # Compute the kernel matrix for the observations.
K_obs += noise * np.eye(len(X))  # Add noise to the diagonal to include observation noise.

# Use Cholesky decomposition to compute the inverse of the covariance matrix
L, _ = cho_factor(K_obs, True)  # Cholesky factorization
K_obs_inv = cho_solve((L, True), np.eye(len(X)))  # Inverse of the covariance matrix

# Evaluate the posterior mean
X_query = np.linspace(0, 1, 100).reshape(-1, 1) # Points where we want to make predictions
K_query = RBF(X_query, X, length_scale=length_scale) # Covariance of query points with observations, make sure to use the same length scale as in K_obs.

print(f"{K_query.shape = } - Should be (100, 3)")
print(f"{K_obs_inv.shape = } - Should be (3, 3)")
print(f"{y.shape = }         - Should be (3, 1)")

Now we're ready to compute the posterior mean as the dot product of the computed matrices

$$
\begin{equation}
\begin{aligned}
\mu(X^*) &= K_{query} \cdot K_{obs}^{-1} \cdot \mathbf{y} \\
&= K_{\mathrm{query}} \cdot K_{\mathrm{obs\_inv}} \cdot \mathbf{y}
\end{aligned}
\end{equation}
$$ 

<details>
    <summary><strong><span style="color: lightgreen; font-size: 18px;">💡 Hint: Dot products </span></strong></summary>
    <p> In NumPy the dot product can be calculated using either np.dot(a, b) or a @ b. </p>
  </details>

In [None]:
mu_posterior = YourCodeHere("Calculate the posterior mean using the covariance between query points and observations, the inverse of the covariance matrix of observations, and the observed values.")  # Your code here.
mu_posterior = mu_posterior.flatten()  # Flatten the mean to make it a 1D array for plotting

In [None]:
fig, ax = plt.subplots(figsize=(4, 4))

ax.plot(X_query, mu_posterior, color="blue", label="Posterior Mean")
ax.scatter(X, y, color="red", label="Observations")
ax.legend()

If you've done it right you should see that the posterior mean closely matches the observations.

If that's the case, we can move onto the posterior covariance, we will also calculate posterior standard deviation 

$$
\sigma(X^*_i) = \Sigma(X^*_i, X^*_i)
$$

$$
\mathrm{std}(X^*_i) = \sqrt{\sigma(X^*_i)}
$$

Or in other words the standard deviations are the square root of the diagonal elements. Again, the posterior covariance is given by

$$
\begin{equation}
\begin{aligned}
\Sigma(X^*, X^*) &= K(X^*, X^*) - K(X^*, X)[K(X, X) + \sigma^2_n I]^{-1}K(X, X^*) \\
&= K_\mathrm{qq} - K_\mathrm{query} \cdot K_{\mathrm{obs\_inv}} \cdot K_\mathrm{query}^T
\end{aligned}
\end{equation}
$$

<details>
    <summary><strong><span style="color: lightgreen; font-size: 18px;">💡 Hint </span></strong></summary>
    <p> This is again a dot product between the computed matrices, so use np.dot or @ to compute the second term. </p>
  </details>

In [None]:
K_qq = RBF(X_query, X_query, length_scale=length_scale) # The covariance matrix for the query points, again same length scale as in K_obs.
cov_posterior = YourCodeHere("Compute the covariance of the posterior distribution.")  # Your code here.

var_posterior = np.diag(cov_posterior)  # Variance of the posterior distribution
std_posterior = np.sqrt(var_posterior)  # Standard deviation of the posterior

Now that we have everything lets just check the shapes and confirm that what we have is 

- The posterior mean $\mu_\mathrm{posterior}$ as an array with as many elements $N_\mathrm{query}$ as were in $X_\mathrm{query}$. 
- The posterior covariance as a matrix of shape $N_\mathrm{query} \times N_\mathrm{query}$.
- Posterior variance and standard deviation each with $N_\mathrm{query}$ elements. 

In [None]:
print(f"{mu_posterior.shape = } - Should be (100,)")
print(f"{cov_posterior.shape = } - Should be (100, 100)")
print(f"{var_posterior.shape = } - Should be (100,)")
print(f"{std_posterior.shape = } - Should be (100,)")

In [None]:
# With the posterior mean and covariance we can draw samples from the posterior distribution
n_samples = 10  # Number of samples to draw
samples = np.random.multivariate_normal(mu_posterior, cov_posterior, n_samples, check_valid="ignore")

In [None]:
fig, ax = plt.subplots(figsize=(7, 4))

ax.scatter(X, y, color="red", label="Observations", zorder=5)
ax.plot(X_query, mu_posterior, color="blue", label="Posterior Mean")
ax.fill_between(
    X_query.flatten(),
    mu_posterior.flatten() + 2 * std_posterior.flatten(),
    mu_posterior.flatten() - 2 * std_posterior.flatten(),
    color="blue",
    alpha=0.2,
    label="Posterior +/- 2 std",
)

l = ax.plot(X_query, samples.T, color="gray", alpha=0.5)
ax.legend()
ax.set_xlim(X_query.min(), X_query.max());

<div class="alert alert-block alert-success"> <b>Takeaway:</b> You've now implemented the predictive equations of a GP! </div>

#### Log marginal likelihood.

The last thing we want to be able to calculate is the **log marginal likelihood** a measure of how well the Gaussian Process with given hyperparameters explains the observed data, balancing data fit and model complexity.

$$
\begin{equation}
\begin{aligned}
\mathrm{LML} = \log p(y | X ) =& -\frac{1}{2} \mathbf{y}^T(K(X, X) + \sigma^2_n I)^{-1}\mathbf{y} \\
& -\frac{1}{2} \log |K(X, X) + \sigma^2_n I| \\
& - \frac{n}{2} \log 2\pi
\end{aligned}
\end{equation}
$$

Where $|...|$ in the second term is the determinant of the matrix. The three terms can be interpreted as 

1. Data fit - how well do the function in the distribution fit the data?
2. Model complexity - how complex is the model? 
3. A normalization constant. 

The first and the second term thus strike some balance that helps avoid overfitting. 

The first term includes the matrix inversion that've already seen how to do with Cholesky decomposition. 
The second term requires calculating the determinant of the same matrix, to ensure numerical stability a little bit of thought 
is required here. 

----

However, it turns out that the determinant can be calculated from the Cholesky factorization $L$. 
Again for a matrix $A$ the Cholesky decompositions is defined through 
$$
A = LL^*
$$
The determinant of $A$ can be stated as 
$$
|A| = \prod_i^n L_{ii}^2
$$
And therefore the log-determinant is 
$$
\log |A| = 2 \sum_i^n \log L_{ii}
$$
We thus get the log-determinant as the sum of the log of the diagonal elements of $L$ - which offers better numerical accuracy and stability compared to 
other methods in my experience. 

----

The LML is typically used to choose hyperparameters, so we will write a function that computes the LML given some data and the GP hyperparameters.

<details>
    <summary><strong><span style="color: lightgreen; font-size: 18px;">💡 Hint 1st LML term: </span></strong></summary>
    <p> The first term is a dot, one way of calculating it is: -0.5 * y.T @ K_inv @ y </p>
</details>

<br>

<details>
    <summary><strong><span style="color: lightgreen; font-size: 18px;">💡 Hint 2nd LML term: </span></strong></summary>
    <p> The second term is the sum of the logarithm of the diagional of L: -np.sum(np.log(np.diag(L))) </p>
</details>

<br>

<details>
    <summary><strong><span style="color: lightgreen; font-size: 18px;">💡 Hint 3rd LML term: </span></strong></summary>
    <p> This is a constant given by -n / 2 * np.log(2 * np.pi) </p>
</details>


In [None]:
def lml(X, y, length_scale=1.0, noise=0.001, amplitude=1.0):
    """
    Log Marginal Likelihood (LML) for Gaussian Process Regression.
    """
    n = X.shape[0]

    K = amplitude * RBF(X, X, length_scale)  # Covariance matrix
    K += noise * np.eye(n)  # Add noise to the diagonal

    L, _ = cho_factor(K, True)  # Cholesky factorization
    K_inv = cho_solve((L, True), np.eye(n))  # Solve for K_inv

    lml_value = 0
    lml_value + = YourCodeHere("Compute the first term of the log marginal likelihood.")  # Your code here.
    lml_value + = YourCodeHere("Compute the second term of the log marginal likelihood.")  # Your code here.
    lml_value + = YourCodeHere("Compute the third term of the log marginal likelihood.")  # Your code here.

    return lml_value.flatten()[0]  # Return as a scalar

In [None]:
X = np.array([0.3, 0.5, 0.7]).reshape(-1, 1)
y = np.array([-0.5, 0.5, 0]).reshape(-1, 1)

lml(X, y, length_scale=0.1, noise=0.001)  # Should give -3.0315

In [None]:
length_scales = [0.1, 0.25, 0.5, 1.0]
noise = 0.001
X = np.array([0.3, 0.5, 0.7]).reshape(-1, 1)
y = np.array([-0.5, 0.5, 0]).reshape(-1, 1)


fig, axes = plt.subplots(1, 4, figsize=(4 * 4, 4), sharey=True, layout="constrained")

for ax, ls in zip(axes, length_scales):
    lml_value = lml(X, y, length_scale=ls, noise=noise)

    K_obs = RBF(X, X, length_scale=ls) # Compute the kernel matrix for the observations.
    K_obs += noise * np.eye(len(X))  # Add noise to the diagonal to include observation noise.

    # Use Cholesky decomposition to compute the inverse of the covariance matrix
    L, _ = cho_factor(K_obs, True)  # Cholesky factorization
    K_obs_inv = cho_solve((L, True), np.eye(len(X)))  # Inverse of the covariance matrix

    # Evaluate the posterior mean
    X_query = np.linspace(0, 1, 100).reshape(-1, 1) # Points where we want to make predictions
    K_query = RBF(X_query, X, length_scale=ls) # Covariance of query points with observations, make sure to use the same length scale as in K_obs.

    mu_posterior = K_query @ K_obs_inv @ y 
    mu_posterior = mu_posterior.flatten()

    K_qq = RBF(X_query, X_query, length_scale=ls) # The covariance matrix for the query points, again same length scale as in K_obs.
    cov_posterior = K_qq - K_query @ K_obs_inv @ K_query.T # Posterior covariance

    var_posterior = np.diag(cov_posterior)  # Variance of the posterior distribution
    std_posterior = np.sqrt(var_posterior)  # Standard deviation of the posterior

    # With the posterior mean and covariance we can draw samples from the posterior distribution
    n_samples = 10  # Number of samples to draw
    samples = np.random.multivariate_normal(mu_posterior, cov_posterior, n_samples, check_valid="ignore")

    ax.scatter(X, y, color="red", label="Observations", zorder=5)
    ax.plot(X_query, mu_posterior, color="blue", label="Posterior Mean")
    ax.fill_between(
        X_query.flatten(),
        mu_posterior.flatten() + 2 * std_posterior.flatten(),
        mu_posterior.flatten() - 2 * std_posterior.flatten(),
        color="blue",
        alpha=0.2,
        label="Posterior +/- 2 std",
    )

    l = ax.plot(X_query, samples.T, color="gray", alpha=0.5)
    ax.legend()
    ax.set_title(f"Length Scale = {ls}, LML = {lml_value:.4f}")
    ax.set_xlim(X_query.min(), X_query.max());
# We can now define a Gaussian Process model that uses the RBF kernel and allows us to condition on data.

<div class="alert alert-block alert-success"> <b>Takeaway:</b> We can use the LML to judge if we are describing the data 
well - we want a large LML as that indicates the data is likely under the model. Notice that in the above figure 
we get an LML value of about -3 with a length-scale of 0.1, and it decreases as the length-scale increases - until 
at a length-scale of 1.0 our GP posterior doesn't even get close to the data-points. </div>

### Optional: Creating a GPR Model.

<div class="alert alert-block alert-info"> <b>Note:</b> You can skip this part and use my implementation instead. But it is good practice to refactor code after having explored it in a less structured way. </div>

So far we've seen all the elements required to construct and utilize a Gaussian process
- The kernel.
- The prior mean and covariance.
- The posterior mean and covariance.
- Sampling from the prior or the posterior.
- Calculating the log marginal likelihood.

However, it is not particularly structured, reusable or versatile. It's typically quite useful to wrap 
all these elements in a "Model"-object, similar to a `torch.Module`, such that when using it we don't need to think 
about carrying the right matrices around. 

This is really a software design thing and there are a number of ways to do so each with benefits for certain use cases.
However, it is typically much easier to understand, use and extend an algorithm if it is written in a concise way.

#### Kernel class

I've implemented a simple `Kernel`-base class that's imported below. 

It really only handles two things; 
- Allows composition of kernels through addition and multiplication.
- Defines a consistent API. 

To implement a kernel with this, two methods have to be defined

- `__init__`: Set the kernel hyperparameters. 
- `__call__`: Compute the kernel given two arrays.

In the cell below implement the RBF kernel by completing the `__call__` method.

In [None]:
from bayes_nanospace2025.tutorial.kernels import Kernel


class RadialBasis(Kernel):
    """
    Radial Basis Function (RBF) kernel.
    """

    def __init__(self, length_scale=1.0):
        super().__init__()
        self.length_scale = length_scale

    def __call__(self, x1, x2=None):
        if x2 is None:
            x2 = x1
        dists = cdist(x1, x2, "sqeuclidean")
        YourCodeHere("Implement the RBF kernel - which you've already done once.")  # Your code here.

With composable kernels we can include noise as a kernel.

There are some implementation subtleties with this, we need the implementation to 
behave like so: 

- $K(X, X)$: Noise added to the diagonal.
- $K(X, Y)$: No noise added. 

We accomplish this by defining that $K(X) = K(X, X)$ - so noise is only added if only one 
input set is provided. 

<details>
    <summary><strong><span style="color: lightgreen; font-size: 18px;">💡 Hint - Identity matrix </span></strong></summary>
    <p> To create an identity matrix use np.eye(n) to get an n by n matrix. This can then be multiplied by the value we want on the diagonal. </p>
  </details>

In [None]:
class Noise(Kernel):
    """
    Noise kernel.
    """

    def __init__(self, noise_level=1e-3):
        super().__init__()
        self.noise_level = noise_level

    def __call__(self, x1, x2=None):
        if x2 is not None:
            return np.zeros((x1.shape[0], x2.shape[0]))
        else:
            K = YourCodeHere("Return a diagonal matrix with noise_level on the diagonal for the noise kernel.")  # Your code here.
            return K

We can check that this behaves as expected;

In [None]:
kernel = RadialBasis(length_scale=0.1) + Noise(noise_level=0.001)

X = np.linspace(0, 1, 10).reshape(-1, 1)  # Create a grid of points

Kx = kernel(X)  # Covariance matrix with itself -> Noise added.
Kxy = kernel(X, X)  # Covariance matrix for two inputs -> No noise added.

print(f"{Kx.max() = } - Should be 1.001")
print(f"{Kxy.max() = } - Should be 1.000")

Finally we will add a `Constant` kernel that just acts as an amplitude.

$$
K(X, X) = \sigma_f
$$

In [None]:
class Constant(Kernel):
    """
    Constant kernel.
    """

    def __init__(self, amplitude=1.0):
        super().__init__()
        self.amplitude = amplitude

    def __call__(self, x1, x2=None):
        return self.amplitude 

Again we can do a little bit of sanity checking

In [None]:
X = np.linspace(0, 1, 10).reshape(-1, 1)  # Create a grid of points
kernel = Constant(amplitude=2.0) * RadialBasis(length_scale=0.1) + Noise(noise_level=0.001)

K = kernel(X, X)
print(f"{K.max() = } - Should be 2.0")
print(f"{K.min() = } - Should be very small.")


Now we have enough kernels to continue, but if you want to implement e.g. the periodic or the dot product kernel as 
one the classes feel free to do so.

#### GP Class

Now we can implement a `GaussianProcess`-class. 

To keep things neat I've defined two dataclasses 

- `PosteriorState`: To contain all the information needed to calculate properties of the posterior. 
- `PredictionResult`: To contain predicted properties, e.g. the mean, variance and covariance at the given query points.

Additionally, this implementation a couple of minor differences compared to what we've seen before. 
Firstly, the prior mean can be set - this changes the equation for the posterior mean: 

$$
\mu(X^*) = K(X^*, X)[K(X, X)+\sigma_n^2I]^{-1}(\mathbf{y}-\mu_\mathrm{prior}(X)) + \mu_\mathrm{prior}(X^*)
$$

Which is equal to what we've used so far, if $\mu_\mathrm{prior} = 0$, this can be useful to e.g. set 
the prior mean to the mean of the observed data reducing the magnitude of what the GP has to fit. 

Secondly, we have now included data noise in the kernel, and therefore we do not need to explicitly add it 
when calculating the inverse

$$
[K(X, X)+\sigma_n^2I]^{-1} \rightarrow [K(X, X)+\sigma_\mathrm{jitter}I]^{-1}
$$

However, for numerical stability a very small diagonal noise term is still added - this is called a *jitter*.

In [None]:
from dataclasses import dataclass
from typing import Callable


@dataclass
class PosteriorState:
    X_train: np.ndarray  # Training inputs
    y_train: np.ndarray  # Training outputs
    K_inv: np.ndarray  # Inverse of the covariance matrix for training inputs
    alpha: np.ndarray  # Vector for computing the posterior mean
    cholesky_factor: np.ndarray # Cholesky L-matrix for the covariance matrix.


@dataclass
class PredictionResult:
    X_query: np.ndarray  # Query points where predictions are made
    mean: np.ndarray  # Posterior mean at query points
    variance: np.ndarray  # Posterior variance at query points
    covariance: np.ndarray  # Posterior covariance matrix at query points

In [None]:

class GaussianProcess:
    def __init__(self, kernel: Kernel, prior_mean: Callable | float | None = None, jitter: float = 1e-9):
        self.kernel = kernel
        self.jitter = jitter
        self.prior_mean = prior_mean if prior_mean is not None else 0
        self.state = None  # This will hold the posterior state after conditioning on observed data

    def condition(self, X_obs, y_obs) -> None:
        """
        Condition the Gaussian Process on observed data.
        """
        K_obs = self.kernel(X_obs)  # Covariance matrix for the observations
        K_obs += self.jitter * np.eye(len(X_obs))  # Add jitter to the diagonal to ensure numerical stability

        # Use Cholesky decomposition to compute the inverse of the covariance matrix
        L, _ = cho_factor(K_obs, lower=True)  # Cholesky factorization
        K_obs_inv = cho_solve((L, True), np.eye(len(X_obs)))  # Inverse of the covariance matrix

        # If prior_mean is callable, evaluate it at X_obs; otherwise, use a constant prior mean
        prior_mean = self.prior_mean(X_obs) if callable(self.prior_mean) else np.full((len(X_obs), 1), self.prior_mean)
        y = y_obs - prior_mean  # Adjust observed outputs by subtracting the prior mean

        # Compute alpha for efficient posterior mean computation
        alpha = K_obs_inv @ y

        self.state = PosteriorState(X_train=X_obs, y_train=y, K_inv=K_obs_inv, alpha=alpha, cholesky_factor=L)

    def predict(self, X_query: np.ndarray) -> PredictionResult:
        """
        Predict the mean and variance at new query points.
        """
        state = self.state

        # Don't worry about this line if it seems confusing, it just computes the prior mean for the query points.
        prior_mean = (
            self.prior_mean(X_query) if callable(self.prior_mean) else np.full((len(X_query), 1), self.prior_mean)
        ).flatten()

        if state is None:
            K_qq = self.kernel(X_query)  # Prior covariance for the query points.
            return PredictionResult(
                X_query=X_query,
                mean=prior_mean,
                variance=np.diag(K_qq),
                covariance=K_qq,
            )

        # Predicting the posterior mean
        K_qt = YourCodeHere("Compute the covariance between query points and training points.")  # Your code here.

        # K_query: (n_query, n_train) - alpha: (n_train, 1) - prior_mean: (n_query, 1)
        mu_posterior = YourCodeHere("Compute the posterior mean at the query points, its a dot product of the covariance between query points and training points with the alpha vector.")  # Your code here.
        mu_posterior = mu_posterior.flatten()  # Flatten the mean to make it a 1D array.
        mu_posterior += prior_mean  # Add the prior mean to the posterior mean

        # Predicting the posterior covariance
        K_qq = YourCodeHere("Covariance matrix for the query points (n_query, n_query).")  # Your code here.
        cov_posterior = YourCodeHere("Compute the posterior covariance matrix - (n_query, n_query).")  # Your code here.

        # Compute the posterior variance as the diagonal of the covariance matrix
        var_posterior = np.diag(cov_posterior).copy() # (n_query, )
        var_posterior[var_posterior < 0] = 0  # Ensure non-negative variance

        return PredictionResult(X_query=X_query, mean=mu_posterior, variance=var_posterior, covariance=cov_posterior)

    def sample(self, X_query: np.ndarray, n_samples: int = 1, cov_jitter: float = 1e-6) -> np.ndarray:
        """
        Sample from the posterior distribution at query points.
        """
        prediction = self.predict(X_query)
        samples = np.random.multivariate_normal(
            prediction.mean.flatten(),
            prediction.covariance + np.eye(prediction.covariance.shape[0]) * cov_jitter,
            n_samples,
            check_valid="ignore",
        )
        return samples

    def log_marginal_likelihood(self) -> float:
        """
        Compute the log marginal likelihood of the observed data.
        """
        y = self.state.y_train
        K_inv = self.state.K_inv
        L = self.state.cholesky_factor

        lml = -0.5 * y.T @ K_inv @ y - np.sum(np.log(np.diag(L))) - len(y) / 2 * np.log(2 * np.pi)
        return lml.flatten()[0]

Now we can try using the model, the cell below just checks some output shapes to ensure everything works as
intended.

In [None]:
X_obs = np.array([0.3, 0.5, 0.7]).reshape(-1, 1)
y_obs = np.array([-0.5, 1, 0]).reshape(-1, 1)
X_query = np.linspace(0, 1, 100).reshape(-1, 1)

# Construct GP
kernel = Constant(1.0) * RadialBasis(length_scale=0.1) + Noise(noise_level=1e-6)
gp = GaussianProcess(kernel)

# First we can check the prior mean and covariance at the query points
prior_prediction = gp.predict(X_query)  # Predict the prior mean and variance at query points

print(f"{prior_prediction.mean.shape = }")  # Should be (100,)
print(f"{prior_prediction.covariance.shape = }")  # Should be (100, 100)
assert prior_prediction.mean.shape == (100,), "Prior mean should be of shape (100,)"
assert prior_prediction.covariance.shape == (100, 100), "Prior covariance should be of shape (100, 100)"

# Then the posterior mean and covariance after conditioning on the observed data
gp.condition(X_obs, y_obs)  # Condition the GP with observed data
posterior_prediction = gp.predict(X_query)  # Predict the prior mean and variance at query points

print(f"{posterior_prediction.mean.shape = }")  # Should be (100,)
print(f"{posterior_prediction.covariance.shape = }")  # Should be (100, 100)
assert posterior_prediction.mean.shape == (100,), "Posterior mean should be of shape (100,)"
assert posterior_prediction.covariance.shape == (100, 100), "Posterior covariance should be of shape (100, 100)"

Now we're ready to try our GP model and plot the results.

The `plot_gp_prediction`-function is just a simple little plotting utility function, you can see its definition using a notebook magic `plot_gp_prediction??` in a code cell. 

In [None]:
# Construct GP
kernel = Constant(1.0) * RadialBasis(length_scale=0.1) + Noise(noise_level=1e-6)
prior_mean = lambda x: 2* np.ones_like(x)  # Constant prior mean function - you can try changing this.
gp = GaussianProcess(kernel=kernel, prior_mean=prior_mean)

# Make some data
X_obs = np.array([0.3, 0.5, 0.7]).reshape(-1, 1)
y_obs = np.array([-0.5, 1, 0]).reshape(-1, 1)
X_query = np.linspace(0, 1, 100).reshape(-1, 1)

# Plotting 
fig, axes = plt.subplots(1, 2, figsize=(2 * 4, 4), sharey=True)
for ax, condition in zip(axes, [False, True]):
    if condition:
        gp.condition(X_obs, y_obs)  # Condition the GP with observed data
        ax.scatter(X_obs, y_obs, color="red", label="Observations")

        lml = gp.log_marginal_likelihood()
        ax.set_title("Conditioned GP - LML: {:.2f}".format(lml))
    else:
        ax.set_title("Unconditioned GP")

    # Query the GP for predictions
    prediction = gp.predict(X_query)
    plot_gp_prediction(ax, prediction)

    samples = gp.sample(X_query, n_samples=10)
    for sample in samples:
        ax.plot(X_query, sample, color="gray", alpha=0.5, zorder=-1)

### Using GP model.

<div class="alert alert-block alert-info"> <b>Note:</b> If you skipped the previous section you can import and use 
my version of the `GaussianProcess`-class by uncommenting the import block below.</div>

In [None]:
# Uncomment the following line to import the GaussianProcess class and related kernels
# Keep commented if you want to use your own version of the GaussianProcess class.
# from bayes_nanospace2025 import GaussianProcess, RadialBasis, Noise, Constant

We will test our GP implementation on the function defined in the next cell.

In [None]:
def f(x):
    """
    A simple function to model.
    """
    return np.sin(2 * np.pi * x) + 0.1 * np.random.randn(*x.shape)

# Generate some training data
np.random.seed(42)  # For reproducibility
n_data = 25
X_obs = np.random.uniform(-5, 5, size=(n_data, 1))
y_obs = f(X_obs)

X_query = np.linspace(-5, 5, 1000).reshape(-1, 1)

Now we can setup the `GaussianProcess` model, condition it on the data and plot its predictions.

In [None]:
# Setup GP & predict
kernel = Constant(1) * RadialBasis(length_scale=0.5) + Noise(noise_level=1e-6)
gp = GaussianProcess(kernel=kernel, prior_mean=0)
gp.condition(X_obs, y_obs)
prediction = gp.predict(X_query)

# Plotting 
fig, ax = plt.subplots(figsize=(6, 4))
plot_gp_prediction(ax, prediction)
ax.scatter(X_obs, y_obs, color="red", label="Observations")

With our nicer implementation we can compute the marginal loglikelihood landscape by creating a grid over kernel parameters and plot the landscape.

In [None]:
# Plot the lml landscape
noise_levels = np.geomspace(0.001, 0.75, 50)
length_scales = np.geomspace(0.01, 5.0, 50)
lenth_mesh, noise_mesh = np.meshgrid(length_scales, noise_levels)
lml_values = np.zeros((len(noise_levels), len(length_scales)))
for i, noise in enumerate(noise_levels):
    for j, length_scale in enumerate(length_scales):
        kernel = Constant(1) * RadialBasis(length_scale=length_scale) + Noise(noise_level=noise)
        gp = GaussianProcess(kernel=kernel, prior_mean=0)
        gp.condition(X_obs, y_obs)
        lml_values[i, j] = gp.log_marginal_likelihood()

lml_values = np.clip(lml_values, a_min=-50, a_max=None)  # Clip values for better visualization

In [None]:
fig, ax = plt.subplots(figsize=(6, 4))

# Make the contour plot
ax.contour(length_scales, noise_levels, lml_values, levels=20, colors="black", linestyles='-')
c = ax.contourf(length_scales, noise_levels, lml_values, levels=20, cmap="Purples")

# Plot the maximum log marginal likelihood
index_0 = np.unravel_index(np.argmax(lml_values), lml_values.shape)
index_1 = np.unravel_index(np.argmax(lml_values - 1000 * (length_scales < 1)), lml_values.shape)

for max_index in [index_0, index_1]:
    ax.plot(length_scales[max_index[1]], noise_levels[max_index[0]], "ro", markersize=10, label="Max LML")


ax.set_xscale("log")
ax.set_yscale("log")
ax.set_xlabel("Length Scale")
ax.set_ylabel("Noise Level")
ax.set_title("Log Marginal Likelihood Landscape")
fig.colorbar(c, ax=ax, label="Log Marginal Likelihood")

We can now make a GP with the optimal hyperparameter values

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(8, 4))

for ax, max_index in zip(axes, [index_0, index_1]):
    length_scale = length_scales[max_index[1]]
    noise_level = noise_levels[max_index[0]]

    kernel = Constant(1) * RadialBasis(length_scale=length_scale) + Noise(noise_level=noise_level)
    gp = GaussianProcess(kernel=kernel, prior_mean=0)
    gp.condition(X_obs, y_obs)
    prediction = gp.predict(X_query)

    ax.scatter(X_obs, y_obs, color="red", label="Observations")
    plot_gp_prediction(ax, prediction)
    ax.set_title(f"Length Scale: {length_scale:.2f}, Noise Level: {noise_level:.2f}")

<div class="alert alert-block alert-success"> <b>Takeaway:</b> The log marginal likelihood landscape proposes two hypothesis for our data; one that is quickly varying with low variance that closely fits the data and one that is slowly varying with high variance. </div>

A grid search is not particularly efficient, so normally gradient-based optimization is employed. 
The analytical gradients of the LML wrt. kernel hyperparameters can be derived but it's tedious - 
so we will stick to a gradient-free optimization procedure. 

In [None]:
def optimize_hyperparameters(initial_guess, X_obs, y_obs):
    from scipy.optimize import minimize

    def calculate_lml(length_scale, noise_level, amplitude):
        kernel = Constant(amplitude) * RadialBasis(length_scale=length_scale) + Noise(noise_level=noise_level)
        gp = GaussianProcess(kernel=kernel, prior_mean=0)
        gp.condition(X_obs, y_obs)
        return gp.log_marginal_likelihood()
    
    bounds = [(1e-6, 5.0), (1e-3, 2.0), (0.1, 10)]  # Bounds for length_scale and noise_level
    result = minimize(
        lambda x: -calculate_lml(x[0], x[1], x[2]),  # We minimize the negative log marginal likelihood
        initial_guess,
        bounds=bounds,
    )
    print(f"Optimal Length Scale: {result.x[0]:.4f}, Optimal Noise Level: {result.x[1]:.4f}, Optimal Amplitude: {result.x[2]:.4f}")
    

    return result.x  # Return the optimal hyperparameters

In [None]:
set_1 = optimize_hyperparameters([0.1, 0.1, 1.0], X_obs, y_obs) # Initial guess for length_scale, noise_level, and amplitude
set_2 = optimize_hyperparameters([2, 0.5, 1.0], X_obs, y_obs) # Another initial guess

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(8, 4))

for ax, x in zip(axes, [set_1, set_2]):
    kernel = Constant(x[2]) * RadialBasis(length_scale=x[0]) + Noise(noise_level=x[1])
    gp = GaussianProcess(kernel=kernel, prior_mean=0)
    gp.condition(X_obs, y_obs)

    predictions = gp.predict(X_query)

    plot_gp_prediction(ax, predictions)
    ax.scatter(X_obs, y_obs, color="red", label="Observations")

<div class="alert alert-block alert-success"> <b>Takeaway:</b> By optimizing the kernel parameters we can also 
find the two ways of describing the data that we saw before. </div>

### Conclusion

We've now seen how Gaussian Processes can be used for regression.

- GP's are distributions over functions. 
- The type of functions are controlled through the prior mean and coviariance/kernel.
- Knowledge about the data we want to model can be encoded through the choice of kernel. 
- Conditioning on observed data makes the functions in the distribution fit the observations. 
- The posterior mean is average of all the functions in the distribution after conditioning. 
- The posterior covariance, variance and standard deviation gives us the ability to evaluate the models confidence about its predictions.
- The marginal log likelihood lets us evaluate the model's ability to describe the data, allowing us to pick appropriate hyperparameters.


Ideally I've left you with the impression that they are versatile models. 

I hope you will try to apply a GP to your own research. If you're working with relatively 
small datasets (<2000ish observations) then GP's are ideal and will provide you a very good baseline model 
that you can use directly or use as a benchmark for other models. 

### Resources 

I've listed a few resources that you might find useful

| Resource | Description| 
| -------- | --------------- |
| [Gaussian Processes for Machine Learning](https://gaussianprocess.org/gpml/chapters/RW.pdf) | Defacto best book on GPs for machine learning, covers many topics in detail | 
| [GPyTorch](https://gpytorch.ai/) | PyTorch implementations of various types of GPR's |
| [GPJax](https://docs.jaxgaussianprocesses.com/) | Jax implementations of various types of GPR's |
| [Scikit-learn GP](https://scikit-learn.org/stable/modules/gaussian_process.html) | Python implementation of GP regression and classification. Good starting point for comparison with more sophisticated implementations. |