# Practical Session: Probabilistic Linear Solvers and Their Applications
Author: Marvin Pförtner

In this tutorial we will implement a **matrix-free** **probabilistic linear solver** (PLS) from scratch and apply it to linear systems arising in two different applications: Gaussian process regression and the numerical solution of a linear PDE.

## Setup

Clone the GitHub repository

`https://github.com/marvinpfoertner/2024-pn-spring-school`

To participate in this tutorial, you will need a fresh Python 3.11 virtual environment (or a conda/miniconda installation).
Activate the environment, and run `pip install -r requirements.txt` in the repository to install all necessary requirements.

If you use conda, you can simply run `conda env create -f environment.yml` in the repository.
This will create a new conda env named `pn-school-linalg` with the correct Python version and all necessary requirements installed automatically.

In either case, run `jupyter lab` and navigate to the tutorial notebook.

If everything is installed correctly, the following two cells should run without errors (the first one might produce a `[KeOps] Warning : ...`).

In [None]:
from matplotlib import pyplot as plt
import numpy as np
import scipy.linalg

import probnum as pn
import linpde_gp

from numpy.typing import ArrayLike
from probnum.typing import LinearOperatorLike

In [None]:
%config InlineBackend.figure_formats = ['svg']
%matplotlib inline

FIGSIZE = (8, 4)
plt.rcParams['figure.figsize'] = FIGSIZE
plt.rcParams["figure.autolayout"] = True
plt.rcParams["text.usetex"] = True

## Matrix-Free Linear Algebra

It is well-known that solving a linear system $A x = b$ with $A \in \mathbb{R}^{n \times n}$ has a worst-case time complexity of $\mathcal{O}(n^3)$.
This is often used to argue that the cost of solving such problems scales prohibitively with $n$.
However, in practice, the quadratic memory cost of storing a (dense) $n \times n$ matrix in memory is arguably much worse.

Luckily, for many problems, so-called **matrix-free** methods tackle both of these problems at the same time.
In contrast to classical linear solvers, they only require access to matrix-vector products with the system matrix, i.e. $v \mapsto A v$.
Many system matrices $A$ arising in practical applications admit efficient computation of the maps $v \mapsto A v$ without incurring $\mathcal{O}(n^2)$ memory cost.
We call the such maps $A[\cdot] : \mathbb{R}^n \to \mathbb{R}^n, v \to A v$ **matrix-free linear operators**.

**Examples of Matrix-Free Linear Operators**:
- sparse matrices (e.g. banded matrices) in $\mathcal{O}(\operatorname{nnz}(A))$ time and space
- diagonal matrices $A = \operatorname{diag}(a)$, since $$A[v] = a \odot v$$ in $\mathcal{O}(n)$ time and space
- symbolic matrices (e.g. kernel Gram matrices) $A_{ij} = F(x_i, x_j)$, since $$(A[v])_i = \sum_{j = 1}^n F(x_i, x_j) v_j$$ in $\mathcal{O}(n^2)$ time and $\mathcal{O}(n)$ space
- low-rank matrices $A = U V^T$ with $U, V \in \mathbb{R}^{n \times r}$ and $r \ll d$, since $$A[v] = U[\underbrace{(V^\top[v])}_{\in \mathbb{R}^n}]$$
- Toeplitz matrices $A = \operatorname{toep}(a)$ with $a \in \mathbb{R}^n$, since $$A[v] = \operatorname{rfft}^{-1}(\operatorname{rfft}(a) \odot \operatorname{rfft}(v))$$
in $\mathcal{O}(n \log n)$ time and $\mathcal{O}(n)$ space
- recursive algebraic combinations of matrix-free linear operators
    - linear combinations $A = \beta B + \gamma C$, since $A[v] = \beta B[v] + \gamma C[v]$
    - composition $A = B C$, since $A[v] = B[C[v]]$
    - Kronecker (and Khatri–Rao) products $A = B \otimes C$ with $B \in \mathbb{R}^{n_B \times n_B}, C \in \mathbb{R}^{n_C \times n_C}$ and $n_B, n_C \in \mathcal{O}(\sqrt{n})$, since $$A[\operatorname{vec}(M)] = \operatorname{vec}(C[B[M^\top]^\top])$$ in $\mathcal{O}(n \sqrt{n})$ time and $\mathcal{O}(n)$ space.
    - block matrices $$A = \begin{pmatrix} B & C \\ D & E \end{pmatrix},$$ since $$A \begin{pmatrix} u \\ v\end{pmatrix} = \begin{pmatrix} B[u] + C[v] \\ D[u] + E[v] \end{pmatrix}$$

The subpackage [`probnum.linops`](https://probnum.readthedocs.io/en/latest/api/linops.html) of the `probnum` Python library implements a small computer algebra system for working with matrix-free linear operators.

<div class="alert alert-block alert-info">

**Task 1:** Implement a [`probnum.linops.LinearOperator`](https://probnum.readthedocs.io/en/latest/api/automod/probnum.linops.LinearOperator.html#probnum.linops.LinearOperator) for representing a symmetric tridiagonal matrix
\begin{equation*}
A =
\begin{pmatrix}
    a_1    & b_1    & 0      & 0      & \cdots & 0      \\
    b_1    & a_2    & b_2    & 0      & \ddots & \vdots \\
    0      & b_2    & a_3    & b_3    & \ddots & 0 \\
    0      & 0      & b_3    & a_4    & \ddots & 0 \\
    \vdots & \ddots & \ddots & \ddots & \ddots   & b_{n - 1} \\
    0      & \cdots & 0      & 0      &  b_{n - 1} & a_n
\end{pmatrix}
\end{equation*}
in $\mathcal{O}(n)$ space.
The scaffolding is already given below, you just have to implement the method `_matmul` realizing $x \mapsto A x$ and the method `_solve` realizing $x \mapsto A^{-1} v$.

*Hint:* For the `_solve` method, the function `scipy.linalg.solve_banded` could come in handy.

</div>

In [None]:
class SymmetricTridiagonalMatrix(pn.linops.LinearOperator):
    def __init__(self, diag: ArrayLike, offdiag: ArrayLike):
        self._diag = np.asarray(diag)
        self._offdiag = np.asarray(offdiag)

        if self._diag.ndim != 1:
            raise ValueError("`diag` must be a 1D array")

        if self._offdiag.shape != (self._diag.size - 1,):
            raise ValueError("`offdiag` must have shape `(diag.size - 1,)`.")

        super().__init__(
            shape=(self._diag.size, self._diag.size),
            dtype=np.promote_types(self._diag.dtype, self._offdiag.dtype),
        )

        self.is_symmetric = True

        # SOLUTION
        self._diags_padded = np.zeros((3, self._diag.size), dtype=self.dtype)
        self._diags_padded[0, 1:] = self._offdiag
        self._diags_padded[1, :] = self._diag
        self._diags_padded[2, :-1] = self._offdiag
        # SOLUTION END

    def _matmul(self, x: np.ndarray) -> np.ndarray:
        """(Vectorized) matrix-matrix product `y = self @ x`.

        Parameters
        ----------
        x
            Stack of matrices of shape `(D1, ..., Dk, N, L)`, where `self.shape == (N, N)`

        Returns
        -------
        Stack `y` of matrices of shape `(D1, ..., Dk, N, L)`, where

        ..code::
            y[i1, ..., ik, :, :] == self @ x[i1, ..., ik, :, :]
        """
        # SOLUTION
        res = self._diag[:, None] * x
        res[1:] += self._offdiag[:, None] * x[..., :-1, :]
        res[:-1] += self._offdiag[:, None] * x[..., 1:, :]
        return res
        # SOLUTION END

    @pn.linops.LinearOperator.broadcast_matmat(method=True)
    def _solve(self, b: np.ndarray) -> np.ndarray:
        """Solves the linear system `self @ x = b`.

        Parameters
        ----------
        b
            Matrix of shape `(N, L)`, where `self.shape == (N, N)`.

        Returns
        -------
        Solution `x` to the linear system of shape `(N, L)`, i.e.

        ..code::
            y[i1, ..., ik, :, :] == A @ x[i1, ..., ik, :, :]
        """
        # SOLUTION
        return scipy.linalg.solve_banded((1, 1), self._diags_padded, b)
        # SOLUTION END

Below you will find some code to test your implementation.

In [None]:
n = 5

diag = np.arange(1, n + 1, dtype=np.double)
offdiag = 2 * np.arange(1, n, dtype=np.double)

A = SymmetricTridiagonalMatrix(diag, offdiag)

To test your implementation of `_matmul`, we can call `LinearOperator.todense`.
Its default implementation simply applies the linear operator to a unit matrix, i.e. $A[I_n]$, which produces a dense representation of the matrix.
If the following cell produces the matrix
\begin{equation*}
\begin{pmatrix}
1 & 2 & 0 & 0 & 0 \\
2 & 2 & 4 & 0 & 0 \\
0 & 4 & 3 & 6 & 0 \\
0 & 0 & 6 & 4 & 8 \\
0 & 0 & 0 & 8 & 5 \\
\end{pmatrix}
\end{equation*}
for $d = 5$, then your code is likely correct.

In [None]:
A.todense()

The implementation of `_solve` is likely correct if `A.inv() @ A` is approximately equal to the identity matrix.

In [None]:
with np.printoptions(precision=5):
    print((A.inv() @ A).todense())

## Probabilistic Linear Solvers

We will now use our matrix-free linear operators to implement a matrix-free probabilistic linear solver (PLS).
In the lectures you have seen that a (solution-based) PLS posits a prior $\mu_0(x) = \mathcal{N}(x; x_0, \Sigma_0)$ over the unknown solution of the linear system $A x = b$, and then proceeds to iteratively compute the conditional distributions
$$\mu_i(x) := \mu_{i - 1}(x \mid s_i^\top A x = s_i^\top b) = \mu_0(x \mid s_j^\top A x = s_j^\top b\quad \forall j \in \{1, \dotsc, i\})$$
for $i < n$.
The so-called actions $s_i \in \mathbb{R}^n$ are generated by a policy, which depends on $\mu_{i - 1}$.
Due to the closure of Gaussian probability measures under conditioning on affine constraints, the conditional distributions $\mu_i$ are also Gaussian
$$\mu_i(x) = \mathcal{N}(x; x_i, \Sigma_i)$$
whose moments admit the recursion
\begin{align*}
x_i    & = x_{i - 1} + \frac{\alpha_i}{\eta_i} d_i \\
\Sigma_i & = \Sigma_{i - 1} - \frac{1}{\eta_i} d_i d_i^\top
\end{align*}
with
\begin{align*}
r_{i - 1} & = b - A[x_{i - 1}] \\
\alpha_i  & = s_i^\top r_{i - 1} \\
d_i       & = \Sigma_{i - 1} A^\top [s_i] \\
\eta_i    & = s_i^\top A d_i.
\end{align*}
Partially resolving the recursion for $\Sigma_i$, we find that
$$\Sigma_i = \Sigma_0 - D_i D_i^\top,$$
where $D_0 = (\ ) \in \mathbb{R}^{n \times 0}$ and $D_i = \begin{pmatrix} D_{i - 1} & \frac{1}{\sqrt{\eta_i}} d_i \end{pmatrix} \in \mathbb{R}^{n \times i}$.
In practice, it is often helpful to implement the recursion in terms of the downdate matrices $D_i$, instead of the full covariance matrix.
In this case, we also need that
$$d_i = (\Sigma_0 - D_i D_i^\top) A^\top[s_i].$$

In this tutorial, we will assume $A$ to be symmetric, positive-definite and we will write specialized code for a specific choice of prior covariance, namely $\Sigma_0 = A^{-1}$.
Combined with the policy $\operatorname{Policy}(\mu_{i - 1}) = b - A x_{i - 1} = r_{i - 1}$, the estimates $x_i$ of the solution coincide with the iterates of the method of conjugate gradients (CG).

<div class="alert alert-box alert-info">

**Task 2:**
Implement a probabilistic linear solver with hard-coded prior $\mathcal{N}(x_0, A^{-1})$ for the linear system $A x = b$.
The scaffolding is already given in the cell below.
You just have to implement the inner loop and the CG policy.
The solver should return $(x_i, D_i)$ instead of $(x_i, \Sigma_i)$.
Note that, under the prior with $\Sigma_0 = A^{-1}$, the expression for $d_i$ simplifies to
\begin{equation*}
    d_i = (I - D_i D_i^\top A) s_i.
\end{equation*}
Pay close attention to keeping the algorithm matrix-free and use the associativity of matrix multiplication to your advantage.

</div>

In [None]:
class CGPolicy:
    def __call__(self, *, i: int, x: np.ndarray, D: np.ndarray, r: np.ndarray, **kwargs):
        # SOLUTION
        return r
        # SOLUTION END

class RandomPolicy:
    def __init__(self, rng: np.random.Generator) -> None:
        self._rng = rng

    def __call__(self, *, r: np.ndarray, **kwargs) -> None:
        return self._rng.normal(size=r.shape)

class CoordinatePolicy:
    def __call__(self, *, i: int, r: np.ndarray, **kwargs):
        s = np.zeros_like(r)
        s[i - 1] = 1
        return s

class EigenvectorPolicy:
    def __init__(self, A: ArrayLike, desc: bool = True):
        _, eigvecs = np.linalg.eigh(A)

        self._eigvecs = eigvecs[:, ::-1] if desc else eigvecs

    def __call__(self, *, i: int, **kwargs):
        return self._eigvecs[:, i]

In [None]:
def problinsolve(
    A: pn.linops.LinearOperatorLike,
    b: ArrayLike,
    x0: ArrayLike | None = None,
    policy: callable = CGPolicy(),
    abstol: float = 1e-10,
    reltol: float = 1e-12,
    maxiter: int | None = None,
):
    A = pn.linops.aslinop(A)
    b = np.asarray(b)

    if x0 is None:
        x0 = np.zeros(A.shape[1])

    x0 = np.asarray(x0, dtype=np.promote_types(A.dtype, b.dtype))

    # Initial Residual
    r0 = b - A @ x0

    # Initial Belief
    x = x0
    D = np.zeros((A.shape[1], 0), dtype=x.dtype, order="F")

    r = r0

    # Stopping Criteria
    tol = max(abstol, reltol * np.linalg.norm(b, 2))

    if maxiter is None:
        maxiter = x.size

    i = 0

    while i < maxiter and np.linalg.norm(r, 2) > tol:
        s = policy(i=i, x=x, D=D, r=r)

        # SOLUTION
        alpha = s @ r
        As = A @ s
        d = s - D @ (D.T @ As)
        eta = As @ d

        x = x + (alpha / eta) * d
        D = np.concatenate((D, np.sqrt(1 / eta) * d[:, None]), axis=1)
        # END SOLUTION

        i = i + 1
        r = b - A @ x

    return x, D

To test your implementation, we use `probnum` to generate random linear systems with spd system matrix.

In [None]:
from probnum.problems.zoo.linalg import random_linear_system, random_spd_matrix

n = 2

linsys = random_linear_system(
    np.random.default_rng(243),
    random_spd_matrix,
    dim=n,
)

linsys

In [None]:
x, D = problinsolve(
    linsys.A,
    linsys.b,
    # maxiter=n / 2,
)

np.testing.assert_allclose(x, linsys.solution)

We can also take a look at the full covariance matrix.
For $i \approx d$, it should be close to zero.

In [None]:
sigma = pn.linops.aslinop(linsys.A).inv() - D @ D.T
sigma.todense()

The matrix $D_i D_i^\top$ is a low-rank approximation of $A^{-1}$.

In [None]:
(D @ D.T) @ linsys.A

## Application: IterGP

Now that we have a working implementation of a probabilistic linear solver, we will apply it to the linear system arising in Gaussian process (GP) regression.
This strategy is the core idea behind IterGP and was already introduced in the lecture.

In GP regression, we posit a Gaussian process prior $f \sim \mathcal{GP}(m, k)$ over an unknown function and we assume that we measure the function at a set $X = (x_i)_{i = 1}^n \in \mathbb{X}^n$ of inputs according to the measurement model $f(X) + \epsilon$, where $\epsilon \sim \mathcal{N}(0, \lambda^2 I_n)$ with $\epsilon \perp\!\!\perp f$.
We then measure $y \in \mathbb{R}^n$.

In [None]:
mean = pn.functions.Zero(())
cov = pn.randprocs.covfuncs.Matern((), 1.5)

prior = pn.randprocs.GaussianProcess(mean, cov)

In [None]:
domain = (-1, 1)

n = 10

# Observation model
noise_std = 0.2
epsilon = pn.randvars.Normal(
    np.zeros(n),
    noise_std ** 2 * pn.linops.Identity(n),
)

# Generate data by sampling from the prior and the observation model
rng = np.random.default_rng(324)
X = rng.uniform(*domain, n)
y = prior(X).sample(rng) + epsilon.sample(rng)

In [None]:
xs_plot = np.linspace(*domain, 100)

prior.plot(
    plt.gca(),
    xs_plot,
    rng=np.random.default_rng(42),
    num_samples=10,
    label=r"Prior $f$",
)

plt.errorbar(
    X,
    y,
    yerr=1.96 * noise_std,
    fmt="+",
    capsize=5,
    label=r"Data $(X, y)$",
)

plt.legend()

To incorporate the measurements into our model, we compute the conditional process $f \mid f(X) + \epsilon = y \sim \mathcal{GP}(m^y, k^y)$, where
\begin{align*}
    m^y(x)        & = m(x) + k(x, X) w\\
    k^y(x_1, x_2) & = k(x_1, x_2) - k(x_1, X) \hat{K}^{-1} k(X, x_2)
\end{align*}
with $w = \hat{K}^{-1} (y - m(X))$ and $\hat{K} = k(X, X) + \lambda^2 I_n$.

<div class="alert alert-box alert-info">

**Task 3:**
Implement the expressions for the conditional mean and the conditional covariance function in the `ConditionalGaussianProcess` class below.
Use a Cholesky decomposition to multiply with $\hat{K}^{-1}$.

*Hints:*
- the methods `self._Khat.solve(..)` and `self._Khat.inv()` use the Cholesky decomposition to multiply with the inverse and to construct the inverse in a lazy fashion, respectively.
- the method `Covariance._kxX` might come in handy for implementing the broadcasting in $k(x_1, X) \hat{K}^{-1} k(X, x_2)$ correctly

</div>

In [None]:
class ConditionalGaussianProcess(pn.randprocs.GaussianProcess):
    def __init__(
        self,
        prior: pn.randprocs.GaussianProcess,
        X: ArrayLike,
        y: ArrayLike,
        noise: pn.randvars.Normal,
    ) -> None:
        self._prior = prior

        self._X = np.asarray(X)
        self._y = np.asarray(y)
        self._noise = noise

        self._Khat = self._prior.cov.linop(self._X) + pn.linops.aslinop(self._noise.cov)
        self._Khat.is_symmetric = True

        # SOLUTION
        self._w = self._Khat.solve(self._y - (self._prior.mean(X) + self._noise.mean))
        # END SOLUTION

        super().__init__(
            mean=ConditionalGaussianProcess.Mean(prior.mean, prior.cov, self._X, self._w),
            cov=ConditionalGaussianProcess.CovarianceFunction(prior.cov, self._X, self._Khat),
        )

    class Mean(pn.functions.Function):
        def __init__(
            self,
            prior_mean: pn.functions.Function,
            prior_cov: pn.randprocs.covfuncs.CovarianceFunction,
            X: np.ndarray,
            w: np.ndarray,
        ) -> None:
            self._prior_mean = prior_mean
            self._prior_cov = prior_cov

            self._X = X

            self._w = w

            super().__init__(
                input_shape=prior_mean.input_shape,
                output_shape=prior_mean.output_shape,
            )

        def _evaluate(self, x: np.ndarray) -> np.ndarray:
            """Evaluate the conditional mean function at location `x`.

            Parameters
            ----------
            x
                Single input of shape `()` or a batch of inputs of shape `(N,)`.

            Returns
            -------
            Posterior mean evaluated at the given input(s).
            Output shape should be `()` for a single input or `(N,)` for a batch of inputs.
            """
            # SOLUTION
            return self._prior_mean(x) + self._prior_cov.linop(x, self._X) @ self._w
            # END SOLUTION

    class CovarianceFunction(pn.randprocs.covfuncs.CovarianceFunction):
        def __init__(
            self,
            prior_cov: pn.randprocs.covfuncs.CovarianceFunction,
            X: np.ndarray,
            Khat: pn.linops.LinearOperator,
        ):
            self._prior_cov = prior_cov

            self._X = X

            self._Khat = Khat

            super().__init__(
                input_shape=prior_cov.input_shape,
                output_shape_0=prior_cov.output_shape_0,
                output_shape_1=prior_cov.output_shape_1,
            )

        def _evaluate(self, x0: np.ndarray, x1: np.ndarray | None) -> np.ndarray:
            """Evaluates the conditional covariance function between locations `x0` and `x1`.

            Parameters
            ----------
            x0
                (Batch of) input(s) with shape `batch_shape_0`.
            x1
                (Batch of) input(s) with shape `batch_shape_1`.
                If this is `None` it is assumed that `x1 == x0`.

            Returns
            -------
            Covariance between function values at `x0` and `x1` as an array with shape
            `np.broadcast_shapes(batch_shape_0, batch_shape_1)`.
            This function has to broadcast correctly according to the rules of NumPy broadcasting.
            For example, if `x0.shape == (100, 1)` and `x1.shape == (1, 100)`, then
            `self._evaluate(x0, x1).shape == (100, 100)` and
            `self._evaluate(x0, x1)[i, j] = self._evaluate(x0[i, 0], x1[j, 0])`.
            """
            # SOLUTION
            k_x0_X = self._kxX(x0)
            k_x1_X = self._kxX(x1) if x1 is not None else k_x0_X

            return self._prior_cov(x0, x1) - (k_x0_X[..., None, :] @ self._Khat.inv() @ k_x1_X[..., :, None])[..., 0, 0]
            # END SOLUTION

        def _kxX(self, x: np.ndarray) -> np.ndarray:
            """Computes test-train cross-covariances according to the prior covariance function.

            Parameters
            ----------
            x
                (Batch of) input(s) with shape `batch_shape`.

            Returns
            -------
            Prior test-train cross-covariances with shape `batch_shape + (n,)`, where `n == X.shape[0]`.
            """
            return self._prior_cov(np.expand_dims(x, axis=x.ndim - self.input_ndim_0), self._X)

In [None]:
posterior_cho = ConditionalGaussianProcess(
    prior,
    X,
    y,
    epsilon,
)

xs_plot = np.linspace(*domain, 100)

posterior_cho.plot(
    plt.gca(),
    xs_plot,
    rng=np.random.default_rng(42),
    num_samples=10,
    label=r"Posterior $f \mid f(X) + \epsilon = y$",
)

plt.errorbar(
    X,
    y,
    yerr=1.96 * noise_std,
    fmt="+",
    capsize=5,
    label=r"Data $(X, y)$",
)

plt.legend()

The Cholesky decomposition used above has a time complexity of $\frac{1}{3} n^3$ (to leading order) and a memory complexity of $\mathcal{O}(n^2)$.
This quickly becomes prohibitive for large datasets.

To address this, the lecture introduced IterGP, a GP approximation method based on a matrix-free probabilistic linear solver.
In essence, in the following, we will use our PLS to solve the system $\hat{K} w = y - m(X)$.
This yields an estimate $w_i$ of the solution and a downdate matrix $D_i$.
As seen in the lecture, we can use these two quantities to implement the approximate conditional mean and covariance functions:
\begin{align*}
    m^y_i(x) & = m(x) + k(x, X) w_i \\
    k^y_i(x_1, x_2) & = k(x_1, x_2) - k(x_1, X) D_i D_i^\top k(X, x_2).
\end{align*}

<div class="alert alert-box alert-info">

**Task 4:**
Implement the expressions for the conditional mean $m^y_i$ and the conditional covariance function $k^y_i$ in the `IterGPConditionalGaussianProcess` class below.
Again, use the associativity of matrix products to your advantage.

</div>

In [None]:
class IterGPConditionalGaussianProcess(pn.randprocs.GaussianProcess):
    def __init__(
        self,
        prior: pn.randprocs.GaussianProcess,
        X: ArrayLike,
        y: ArrayLike,
        noise: pn.randvars.Normal,
        **problinsolve_kwargs,
    ) -> None:
        self._prior = prior

        self._X = np.asarray(X)
        self._y = np.asarray(y)
        self._noise = noise

        self._gram = self._prior.cov.linop(self._X) + pn.linops.aslinop(self._noise.cov)

        self._w, self._D = problinsolve(
            self._gram,
            self._y - (self._prior.mean(X) + self._noise.mean),
            **problinsolve_kwargs,
        )

        super().__init__(
            mean=IterGPConditionalGaussianProcess.Mean(prior.mean, prior.cov, self._X, self._w),
            cov=IterGPConditionalGaussianProcess.CovarianceFunction(prior.cov, self._X, self._D),
        )

    class Mean(pn.functions.Function):
        def __init__(
            self,
            prior_mean: pn.functions.Function,
            prior_cov: pn.randprocs.covfuncs.CovarianceFunction,
            X: np.ndarray,
            w: np.ndarray,
        ) -> None:
            self._prior_mean = prior_mean
            self._prior_cov = prior_cov

            self._X = X

            self._w = w

            super().__init__(
                input_shape=prior_mean.input_shape,
                output_shape=prior_mean.output_shape,
            )

        def _evaluate(self, x: np.ndarray) -> np.ndarray:
            """Evaluate the IterGP conditional mean function at location `x`.

            Parameters
            ----------
            x
                Single input of shape `()` or a batch of inputs of shape `(N,)`.

            Returns
            -------
            Posterior mean evaluated at the given input(s).
            Output shape should be `()` for a single input or `(N,)` for a batch of inputs.
            """
            # SOLUTION
            return self._prior_mean(x) + self._prior_cov.linop(x, self._X) @ self._w
            # END SOLUTION

    class CovarianceFunction(pn.randprocs.covfuncs.CovarianceFunction):
        def __init__(
            self,
            prior_cov: pn.randprocs.covfuncs.CovarianceFunction,
            X: np.ndarray,
            D: np.ndarray,
        ):
            self._prior_cov = prior_cov

            self._X = X

            self._D = D

            super().__init__(
                input_shape=prior_cov.input_shape,
                output_shape_0=prior_cov.output_shape_0,
                output_shape_1=prior_cov.output_shape_1,
            )

        def _evaluate(self, x0: np.ndarray, x1: np.ndarray | None) -> np.ndarray:
            """Evaluates the IterGP conditional covariance function between locations `x0` and `x1`.

            Parameters
            ----------
            x0
                (Batch of) input(s) with shape `batch_shape_0`.
            x1
                (Batch of) input(s) with shape `batch_shape_1`.
                If this is `None` it is assumed that `x1 == x0`.

            Returns
            -------
            Covariance between function values at `x0` and `x1` as an array with shape
            `np.broadcast_shapes(batch_shape_0, batch_shape_1)`.
            This function has to broadcast correctly according to the rules of NumPy broadcasting.
            For example, if `x0.shape == (100, 1)` and `x1.shape == (1, 100)`, then
            `self._evaluate(x0, x1).shape == (100, 100)` and
            `self._evaluate(x0, x1)[i, j] = self._evaluate(x0[i, 0], x1[j, 0])`.
            """
            # SOLUTION
            k_x0_X_D = self._kxX(x0) @ self._D
            k_x1_X_D = self._kxX(x1) @ self._D if x1 is not None else k_x0_X_D

            return self._prior_cov(x0, x1) - (k_x0_X_D[..., None, :] @ k_x1_X_D[..., :, None])[..., 0, 0]
            # END SOLUTION

        def _kxX(self, x: np.ndarray) -> np.ndarray:
            """Computes test-train cross-covariances according to the prior covariance function.

            Parameters
            ----------
            x
                (Batch of) input(s) with shape `batch_shape`.

            Returns
            -------
            Prior test-train cross-covariances with shape `batch_shape + (n,)`, where `n == X.shape[0]`.
            """
            return self._prior_cov(np.expand_dims(x, axis=x.ndim - self.input_ndim_0), self._X)

In [None]:
posterior_itergp = IterGPConditionalGaussianProcess(
    prior,
    X,
    y,
    epsilon,
    maxiter=4,
)

In [None]:
fig, ax = plt.subplots()

posterior_itergp.plot(
    ax,
    xs_plot,
    rng=np.random.default_rng(42),
    num_samples=10,
    label="IterGP Posterior",
)

posterior_cho.plot(
    ax,
    xs_plot,
    label="Exact Posterior",
    color="C3",
)

plt.errorbar(
    X,
    y,
    yerr=1.96 * noise_std,
    fmt="+",
    capsize=5,
    label=r"Data $(X, y)$",
)

plt.legend()

In [None]:
import ipywidgets

%matplotlib widget

fig, ax = plt.subplots(num="IterGP")

def interact(policy: str, maxiter: int):
    if policy == "CG":
        policy = CGPolicy()
    elif policy == "Coordinate":
        policy = CoordinatePolicy()
    elif policy == "Random":
        policy = RandomPolicy(np.random.default_rng(89745))
    else:
        raise IndexError("Unknown policy")

    posterior_itergp = IterGPConditionalGaussianProcess(
        prior,
        X,
        y,
        epsilon,
        policy=policy,
        maxiter=maxiter,
    )

    ax.cla()
    posterior_itergp.plot(
        ax,
        xs_plot,
        rng=np.random.default_rng(42),
        num_samples=10,
        label="IterGP Posterior",
    )

    posterior_cho.plot(
        ax,
        xs_plot,
        label="Exact Posterior",
        color="C3",
    )

    plt.errorbar(
        X,
        y,
        yerr=1.96 * noise_std,
        fmt="+",
        capsize=5,
        label=r"Data $(X, y)$",
    )

    ax.legend()

ipywidgets.interactive(
    interact,
    policy=ipywidgets.ToggleButtons(
        options=['CG', 'Coordinate', 'Random'],
        description='Policy',
    ),
    maxiter=ipywidgets.IntSlider(
        value=int(0.5 * n),
        min=0,
        max=n,
        description="maxiter (%)",
    ),
)

In [None]:
%matplotlib inline

## Application: Galerkin Methods for Linear PDEs

Consider the linear partial differential equation (PDE)
$$-\Delta u = f$$
with $u \colon \overline{\mathbb{D}} \to \mathbb{R}$ and $\mathbb{D} \subset \mathbb{R}^d$ bounded and open, subject to so-called Dirichlet boundary conditions
$$u(x) = g(x)$$
on $x \in \partial \mathbb{D}$, where $f$ and $g$ are known.

In general, this boundary value problem can not be solved in closed-form, so we have to use a numerical method to approximate it's solution.
To this end, we will focus on the **Galerkin methods**, a particularly popular family of numerical methods for solving PDEs.

<hr/>

In the following, I will give a very terse, intuitive overview over Galerkin methods.

<div class="alert alert-box alert-warning">

**Disclaimer:**
Since the main focus of this tutorial is on linear algebra and not on PDEs, the following overview is very crude.
For the sake of brevity, I'm glossing over many important theoretical concepts (e.g. Sobolev spaces).
More complete introductions to these methods and the related theoretical concepts can be found in

- L.C. Evans. *Partial Differential Equations: Second Edition*. AMS, 2010.
- C.A.J. Fletcher. *Computational Galerkin Methods*. Springer, 1984.
- H.P. Langtangen and K.-A. Mardal. *Introduction to Numerical Methods for Variational Problems*. Springer, 2019.

Probabilistic numerical versions of (Petrov-)Galerkin methods and more general methods of weighted residuals are introduced in

M. Pförtner, I. Steinwart, P. Hennig, and J. Wenger. Physics-Informed Gaussian Process Regression Generalizes Linear PDE Solvers. 2022.

</div>

The first step in applying Galerkin methods is to convert the PDE into its so-called **weak** or **variational formulation**.
To this end, let $v: \mathbb{D} \to \mathbb{R}$ be a so-called **test function** with $v \vert_{\partial \mathbb{D}} = 0$.
Integrating $v$ against both sides of the PDE yields
\begin{equation*}
    - \int_{\mathbb{D}} v(x) \Delta u(x)\;\mathrm{d}x
    = \int_{\mathbb{D}} v(x) f(x)\;\mathrm{d}x.
\end{equation*}
If this equation holds for a sufficiently large set of test functions $v$, then it is equivalent to the original PDE.
Applying Green's first identity (think: multivariate integration by parts) to the left-hand site of the equation, we obtain
\begin{equation*}
    \underbrace{\int_{\mathbb{D}} \langle \nabla v(x), \nabla u(x) \rangle\;\mathrm{d}x}_{=: B[u, v]}
    = \underbrace{\int_{\mathbb{D}} v(x) f(x)\;\mathrm{d}x}_{=: l[v]},
\end{equation*}
where the boundary integral vanishes due to the fact that $v \vert_{\partial \mathbb{D}} = 0$.
Note that the expression on the left-hand side makes sense even if $u$ is once differentiable almost everywhere.
This equation, i.e.
\begin{equation*}
    B[u, v] = l[v] \qquad \qquad \forall v
\end{equation*}
is the so-called weak formulation of the original PDE.

The weak formulation constitutes a system of infinitely many equations in infinitely many unknowns.
To render the problem tractable, Galerkin methods proceed by only requiring it to hold for finitely many test functions $\{ \psi_i \}_{i = 1}^n$ and by making the "ansatz"
\begin{equation*}
  u(x) = \sum_{j = 1}^n w_j \phi_j(x)
\end{equation*}
where the so-called **trial functions** $\{\phi_i\}_{j = 1}^n$ are linearly independent and fulfill the boundary conditions.
In other words, we approximate the solution of the PDE in an $n$-dimensional subspace of the infinite-dimensional solution space.
If $\psi_i$ and $\phi_j$ are locally supported, then the resuting approximation scheme is referred to as a **finite element method** (FEM).
Since $B$ is bilinear, this amounts to
\begin{equation*}
    \sum_{j = 1}^n \underbrace{B[\phi_j, \psi_i]}_{=: A_{ij}} w_j = \underbrace{l[\psi_i]}_{=: b_i} \qquad \qquad i = 1, \dotsc, n,
\end{equation*}
or, equivalently, we need to solve the linear system $A w = b$.

<hr/>

For simplicity, we will set $d = 1$ and $\mathbb{D} = [-1, 1]$, as well as $f(x) = c$ and $g(x) = 0$.
Then the BVP amounts to solving
\begin{align*}
    -u''(x) & = c \\
    u(-1)   & = 0 \\
    u(1)    & = 0
\end{align*}
for $u$.
In this case, we have $u(x) = \frac{c}{2}(1 - x^2)$.

In [None]:
domain = (-1, 1)
xs_plot = np.linspace(*domain, 500)

c = 2

def u_star(x: ArrayLike):
    return c / 2 * (1 - x ** 2)

For solving the PDE with a Galerkin method, we choose so-called linear Lagrange elements as test and trial functions, i.e.
$$
\phi_i(x) 
= \psi_i(x)
= \begin{cases}
    \frac{x - x_{i - 1}}{x_i - x_{i - 1}} & \text{if}\ x_{i - 1} \le x \le x_i, \\
    \frac{x_{i + 1} - x}{x_{i + 1} - x_i} & \text{if}\ x_i < x \le x_{i + 1}, \\
    0 & \text{otherwise}.
\end{cases}
$$
on a grid $-1 = x_0 < x_1 < \dotsb < x_n < x_{n + 1} = 1$ which we choose to be regular, i.e. $x_i = (-1) + 2 \frac{i}{n + 1}$.

In [None]:
class UnivariateLinearLagrangeElements(pn.functions.Function):
    def __init__(self, grid: ArrayLike):
        self._grid = np.asarray(grid)

        if self._grid.ndim != 1 or self._grid.size < 3:
            raise ValueError("`grid` must be a 1D array with at least 3 elements")

        self._num_elements = self._grid.size - 2

        super().__init__(
            input_shape=(),
            output_shape=(self._num_elements,),
        )

    def _evaluate(self, x: np.ndarray) -> np.ndarray:
        return np.maximum(
            0,
            np.where(
                x[..., None] < self._grid[1:-1],
                (x[..., None] - self._grid[:-2]) / (self._grid[1:-1] - self._grid[:-2]),
                (self._grid[2:] - x[..., None]) / (self._grid[2:] - self._grid[1:-1]),
            )
        )

In [None]:
n = 5
xs_grid = np.linspace(*domain, n + 2)
phis = UnivariateLinearLagrangeElements(xs_grid)

plt.plot(
    xs_plot,
    phis(xs_plot),
    label=[rf"$\phi_{{{i}}}$" for i in range(n)],
)

plt.legend()

The $\phi_j$ are piecewise linear and locally supported.
Moreover, their linear span amounts to the set of piecewise linear functions that interpolate the coefficients $w_i$ on the grid nodes.

In [None]:
plt.plot(
    xs_plot,
    phis(xs_plot) * u_star(xs_grid[1:-1]),
    label=[rf"$w_{{{i}}} \phi_{{{i}}}$" for i in range(n)],
)

plt.plot(
    xs_plot,
    phis(xs_plot) @ u_star(xs_grid[1:-1]),
    label=r"$\sum_{j = 1}^n w_j \phi_j$",
    color="black",
)

plt.legend()

Computing the entries of the stiffness matrix $A = (B[\phi_j, \psi_i])_{i,j = 1}^n$ constructed by the Galerkin method yields
$$
A =
\frac{1}{\delta x} \cdot
\begin{pmatrix}
 2     & -1     &  0     & \cdots &  0     \\
-1     &  2     & -1     & \ddots & \vdots \\
 0     & -1     &  2     & \ddots &  0     \\
\vdots & \ddots & \ddots & \ddots & -1     \\
 0     & \cdots &  0     & -1     &  2
\end{pmatrix}
$$
i.e. a tridiagonal Toeplitz matrix.
For the force vector $b = (l[\psi_i])_{i = 1}^n$, we obtain $b_i = c \delta_x$.

In [None]:
delta_x = (domain[1] - domain[0]) / (n + 1)

A_fem = SymmetricTridiagonalMatrix(
    np.full(n, 2 / delta_x),
    np.full(n - 1, -1 / delta_x),
)

b_fem = np.full(n, c * delta_x)

We will now solve the resulting linear system using our probabilistic linear solver.

In [None]:
w_fem, D_fem = problinsolve(
    A_fem,
    b_fem,
)

w_fem

<div class="alert alert-box alert-info">

**Task 5:**
The output of the PLS for the linear system $A x = b$ can be interpreted as the random variable $\mathrm{x} \sim \mathcal{N}(x_i, \Sigma_i)$.
Implement a function `pls_randvar` that transforms the current return value of the PLS function into a [`probnum.randvars.Normal`](https://probnum.readthedocs.io/en/v0.1.6/automod/probnum.randvars.Normal.html) object.
The scaffolding is given below.

*Hint:* `LinearOperator.inv()` could come in handy for performance and numerical stability.

</div>

In [None]:
def pls_randvar(x: np.ndarray, D: np.ndarray, A: LinearOperatorLike) -> pn.randvars.Normal:
    A = pn.linops.aslinop(A)
    D = pn.linops.aslinop(D)

    # SOLUTION
    Sigma = A.inv() - D @ D.T
    Sigma.is_symmetric = True


    return pn.randvars.Normal(x, Sigma)
    # END SOLUTION

In [None]:
w_fem_randvar = pls_randvar(
    w_fem,
    D_fem,
    A_fem,
)

Recall that $$u(x) = \sum_{j = 1}^n w_j \phi_j(x).$$
Since our PLS gives us acces to $w \in \mathbb{R}^n$ in the form of a random variable $\mathrm{w} \sim \mathcal{N}(w_i, \Sigma_i)$, we have
$$u = \phi(\cdot)^\top \mathrm{w} \sim \mathcal{GP}(\phi(\cdot)^\top w_i, \phi(\cdot)^\top \Sigma_i \phi(\bullet)),$$
i.e. the pushforward of our belief about the solution $w$ through the trial functions is a parametric GP that encodes our belief about the PDE solution.

In [None]:
u_fem = linpde_gp.randprocs.ParametricGaussianProcess(
    weights=w_fem_randvar,
    feature_fn=phis,
)

In [None]:
u_fem.plot(
    plt.gca(),
    xs_plot,
    rng=np.random.default_rng(2345),
    num_samples=10,
    label=r"$\phi(\cdot)^{\top} \mathrm{w}$",
)

plt.plot(
    xs_plot,
    u_star(xs_plot),
    label=r"$u^\star$",
)

plt.legend()

In [None]:
import ipywidgets

%matplotlib widget

fig, ax = plt.subplots(num="FEM")

def interact(policy: str, maxiter: int):
    if policy == "CG":
        policy = CGPolicy()
    elif policy == "Coordinate":
        policy = CoordinatePolicy()
    elif policy == "Random":
        policy = RandomPolicy(np.random.default_rng(89745))
    elif policy == "Eigenvector (desc)":
        policy = EigenvectorPolicy(A_fem.todense(), desc=True)
    elif policy == "Eigenvector (asc)":
        policy = EigenvectorPolicy(A_fem.todense(), desc=False)
    else:
        raise IndexError("Unknown policy")

    u_fem = linpde_gp.randprocs.ParametricGaussianProcess(
        weights=pls_randvar(
            *problinsolve(
                A_fem,
                b_fem,
                policy=policy,
                maxiter=maxiter,
            ),
            A_fem,
        ),
        feature_fn=phis,
    )

    ax.cla()
    u_fem.plot(
        ax,
        xs_plot,
        rng=np.random.default_rng(2345),
        num_samples=10,
        label=r"$\phi(\cdot)^{\top} \mathrm{w}$",
    )

    ax.plot(
        xs_plot,
        u_star(xs_plot),
        label=r"$u^\star$",
    )

    plt.legend()

    ax.legend()

ipywidgets.interactive(
    interact,
    policy=ipywidgets.ToggleButtons(
        options=['CG', 'Coordinate', 'Random', 'Eigenvector (desc)', 'Eigenvector (asc)'],
        description='Policy',
    ),
    maxiter=ipywidgets.IntSlider(
        value=int(0.5 * n),
        min=0,
        max=n,
        description="maxiter",
    ),
)

In [None]:
%matplotlib inline

In [None]:
plt.plot(np.linalg.eigvalsh(A_fem.todense()))