# Kriging from Scratch

Kriging, or Gaussian process regression, uses Gaussian processes to predict values at unobserved locations by computing a weighted average of known values.

A Gaussian process gives us a probability distribution over possible *functions*. It's a stochastic process in which every set of random variables is governed by a mutivariate Gaussian distribution. Why is this a useful way to model phenomena? In short, [lots of things are Gaussian](https://en.wikipedia.org/wiki/Central_limit_theorem) or can be well approximated as Gaussian, and lots quantities you might want to compute can be derived straightforwardly and in closed form when things are Gaussian.

Kriging is especially powerful because it yields not just a single "best fit", but a distribution of functions that fit to the observations. This makes for a more robust approach, with which we can quantify predictive uncertainty and better inform downstream decision-making.

We want to approximate a function $f: \mathbb{R}^d \rightarrow \mathbb{R}$ as best we can, using only a set of observations at points $\{x_1, x_2, \ldots, x_n\}$ with corresponding values $\{y_1, y_2, \ldots, y_n\}$. Kriging models this function as the realization of a Gaussian process
$$f(x) \sim \mathcal{GP} \left( m(x), k(x,x') \right)$$
where $m(x)$ is the mean function (typically set to zero or a constant), and $k(x,x')$ is the all-important kernel, or covariance function, which defines the similarity between points.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.interpolate import CubicSpline
from scipy.stats import norm

random_seed = 19846
rng = np.random.default_rng(random_seed)

Let's use a simple example. Suppose we want to model the function $f(x) = \sin (\frac{1}{2}\pi x) - \sin (\pi x)$.

In [None]:
def signal_model(x):
    return np.sin(0.5*np.pi*x) - np.sin(np.pi*x)

xt = np.linspace(0, 5, 200)
f_x = signal_model(xt)

sns.set_style("whitegrid", {"font.family":"serif"})
fig, ax = plt.subplots()
sns.lineplot(x=xt, y=f_x, color="black", ax=ax)
ax.set_ylim(-3,3)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_title(r'$f(x) = \sin (\frac{1}{2}\pi x) - \sin (\pi x)$', fontsize=10);

Of course, we don't know the true function, we have only a collection of samples.

In [None]:
n_samples = 10
xt = np.linspace(0, 5, 200)
f_x = signal_model(xt)
x = rng.uniform(0, 5, n_samples)
y = signal_model(x)

fig, ax = plt.subplots()
sns.lineplot(x=xt, y=f_x, color='black', linestyle='--', alpha=0.25, ax=ax, label='Ground truth');
sns.scatterplot(x=x, y=y, color='red', ax=ax, zorder=3, label='Observations');
ax.legend(loc='upper right')
ax.set_ylim(-3,3)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_title(r'$f(x) = \sin (\frac{1}{2}\pi x) - \sin (\pi x)$', fontsize=10);

What's the best way to estimate the function based on this handful of observations? For kriging, a necessary step is picking a kernel with which to define the Gaussian process. The selection of a kernel, or covariance function, is key. Through the kernel we specify a covariance matrix that completely specifies the distribution of functions. (This is because Gaussian processes are completely defined by their second-order statistics. Define the covariance function, define the behavior.)

Selecting a kernel encodes prior knowledge. Different kernels induce different relationships between points: do you believe the function you're trying to model is smooth, noisy, linear, periodic, etc.? Poor kernel selection usually results in a poor estimate of the true function.

A common kernel is the squared exponential, or [radial basis function kernel](https://en.wikipedia.org/wiki/Radial_basis_function_kernel):
$$k(x, x') = \exp \left\{ -\frac{1}{2} (x - x')^2 \right\}$$

With the kernel selected we can sample functions from the Gaussian process.

In [None]:
def rbf_kernel(x1, x2=None, sigma_f=1, l=1):
    """Squared exponential kernel function."""
    if x2 is None:
        x2 = x1
    sq_dist = (x1 - x2.T)**2
    kernel = sigma_f * np.exp(-sq_dist / (2 * l**2))
    return kernel

n_instances = 25

K = rbf_kernel(np.expand_dims(xt,1), sigma_f=2, l=0.5) # Covariance matrix
m = np.zeros(200)

f_prior = rng.multivariate_normal(m, K, n_instances)
f_prior.shape

fig, ax = plt.subplots()
for i in range(n_instances):
    sns.lineplot(x=xt, y=f_prior[i,:], color='black', alpha=0.25, ax=ax);
ax.set_title('Sample Functions from Prior Distribution', fontsize=10)

Here are some other popoular kernels for comparison.

In [None]:
def matern_3over2_kernel(x1, x2=None, sigma_f=1, l=1):
    if x2 is None:
        x2 = x1
    dist = np.abs(x1 - x2.T)
    sqrt3 = np.sqrt(3)
    return sigma_f**2 * (1 + (sqrt3*dist)/l) * np.exp(-sqrt3*dist/l)

def brownian_kernel(x1, x2=None):
    if x2 is None:
        x2 = x1
    return np.minimum(x1, x2.T)

def linear_kernel(x1, x2=None, sigma_m=1, sigma_b=1, c=0):
    if x2 is None:
        x2 = x1
    return sigma_b**2 + sigma_m**2 * np.dot(x1 - c, x2.T - c)

K_matern = matern_3over2_kernel(np.expand_dims(xt,1))
K_brownian = brownian_kernel(np.expand_dims(xt,1))
K_linear = linear_kernel(np.expand_dims(xt,1), sigma_b=4)

f_matern = rng.multivariate_normal(m, K_matern, n_instances)
f_brownian = rng.multivariate_normal(m, K_brownian, n_instances)
f_linear = rng.multivariate_normal(m, K_linear, n_instances)

fig, axs = plt.subplots(2,2)
axs = axs.flatten()
for i in range(n_instances):
    sns.lineplot(x=xt, y=f_prior[i,:], color='black', alpha=0.2, ax=axs[0]);
    axs[0].set_ylim(-3,3)
    axs[0].set_title("RBF Kernel", fontsize=9)
for i in range(n_instances):
    sns.lineplot(x=xt, y=f_matern[i,:], color='black', alpha=0.2, ax=axs[1]);
    axs[0].set_ylim(-3,3)
    axs[1].set_title(r"Matérn Kernel", fontsize=9)
for i in range(n_instances):
    sns.lineplot(x=xt, y=f_brownian[i,:], color='black', alpha=0.2, ax=axs[2]);
    axs[0].set_ylim(-3,3)
    axs[2].set_title("Brownian Kernel", fontsize=9)
for i in range(n_instances):
    sns.lineplot(x=xt, y=f_linear[i,:], color='black', alpha=0.2, ax=axs[3]);
    axs[0].set_ylim(-3,3)
    axs[3].set_title("Linear Kernel", fontsize=9)

To make a prediction at a new point $x_*$, we compute
$$\hat{f}(x_*) = m(x_*) + k(x_*, X) \left[ K(X, X) + \sigma^2_n I \right]^{-1} (y - m(X))$$
$$\text{Var}\left( \hat{f}(x_*) \right) = k(x_*,x_*) - k(x_*,X) \left[ K(X,X) + \sigma^2_n I \right]^{-1} k(X, x)$$
where $X$ is the vector of observations $[x_1, x_2, \ldots, x_n]^\top$, $K$ the covariance matrix of observations as generated by the kernel $k$, and $\sigma^2_n$ the noise variance (we'll get to noise shortly).

Let's return to our example.

In [None]:
def rbf_kernel(x1, x2=None, sigma_f=1, l=1):
    # A more numerically stable implementation
    if x2 is None:
        x2 = x1
    if x1.ndim == 1:
        x1 = np.expand_dims(x1,1)
        x2 = np.expand_dims(x2,1)
    x1_norm = np.sum(x1**2, axis=1).reshape(-1, 1)
    x2_norm = np.sum(x2**2, axis=1).reshape(1, -1)
    sq_dist = x1_norm + x2_norm - 2.0 * np.dot(x1, x2.T)
    sq_dist = np.maximum(sq_dist, 0.0)
    kernel = sigma_f**2 * np.exp(-0.5 * sq_dist / l**2)
    return kernel

def gpr(x_train, y_train, x_test, kernel_func, sigma_n=0.0, **kernel_params):
    """Gaussian process regression"""
    K = kernel_func(x_train, x_train, **kernel_params)
    K = K + (sigma_n**2 + 1e-6) * np.eye(len(x_train))
    K_s = kernel_func(x_train, x_test, **kernel_params)
    K_ss = kernel_func(x_test, x_test, **kernel_params)

    try:
        # Try standard Cholesky approach first
        L = np.linalg.cholesky(K)
        alpha = np.linalg.solve(L.T, np.linalg.solve(L, y_train))
        mu = K_s.T @ alpha
        v = np.linalg.solve(L, K_s)
        sigma = np.sqrt(np.diag(K_ss - v.T @ v))
    except np.linalg.LinAlgError:
        # If Cholesky fails, use a slower but more stable approach
        print("Warning: Matrix not positive semi-definite, using more robust method")
        eigenvalues, eigenvectors = np.linalg.eigh(K)
        eigenvalues = np.maximum(eigenvalues, 1e-6)
        K_inv = eigenvectors @ np.diag(1.0/eigenvalues) @ eigenvectors.T
        mu = K_s.T @ K_inv @ y_train
        var = np.diag(K_ss - K_s.T @ K_inv @ K_s)
        sigma = np.sqrt(np.maximum(0, var))

    return mu, sigma

n_samples = 10
xt = np.linspace(0, 5, 200)
f_x = signal_model(xt)

mu, sigma = gpr(x, y, xt, rbf_kernel, sigma_f=2, l=0.5) # GPR prediction

x_sorted_idx = np.argsort(x)
x_sorted = x[x_sorted_idx]
y_sorted = y[x_sorted_idx]
cs = CubicSpline(x_sorted, y_sorted)
spline_pred = cs(xt) # Cubic spline prediction, for comparison

fig, ax = plt.subplots()
sns.lineplot(x=xt, y=f_x, color='black', linestyle='--', alpha=0.8, ax=ax, label=r'f(x)');
sns.scatterplot(x=x, y=y, color='red', ax=ax, label='Observations', zorder=3);
sns.lineplot(x=xt, y=mu, color='blue', ax=ax, alpha=0.8, label=r'GPR Prediction');
ax.fill_between(xt, mu - 2*sigma, mu + 2*sigma, color='blue', alpha=0.2, label='95% Confidence')
sns.lineplot(x=xt, y=spline_pred, color='orange', ax=ax, alpha=0.8, label='Spline Prediction')
ax.set_ylim(-3,3)
ax.set_title('Gaussian Process Regression', fontsize=10)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.legend(loc='upper right')

Pretty good for only observing 10 points, no? Not only does it improve on simpler interpolation methods, but it has a lovely representation of uncertainty. But we start to see the real magic when we add noise.

Suppose now instead we observe *noisy* samples of the function we're trying to model, $y = f(x) + \epsilon$, where $\epsilon \sim \mathcal{N}(0, \sigma_n^2)$.

In [None]:
n_samples = 20
sigma_n = 0.25 # Now with noise!
x = rng.uniform(0, 5, n_samples)
y = signal_model(x) + rng.normal(0.0, sigma_n, n_samples)

mu, sigma = gpr(x, y, xt, rbf_kernel, sigma_n=sigma_n, sigma_f=2, l=0.5)

x_sorted_idx = np.argsort(x)
x_sorted = x[x_sorted_idx]
y_sorted = y[x_sorted_idx]
cs = CubicSpline(x_sorted, y_sorted)
spline_pred = cs(xt)

fig, ax = plt.subplots()
sns.lineplot(x=xt, y=f_x, color='black', linestyle='--', alpha=0.8, ax=ax, label=r'f(x)');
sns.scatterplot(x=x, y=y, color='red', ax=ax, label='Noisy Observations', zorder=3);
sns.lineplot(x=xt, y=mu, color='blue', ax=ax, alpha=0.8, label=r'GPR Prediction');
ax.fill_between(xt, mu - 2*sigma, mu + 2*sigma, color='blue', alpha=0.2, label='95% Confidence')
sns.lineplot(x=xt, y=spline_pred, color='orange', ax=ax, alpha=0.8, label='Spline Prediction')
ax.set_ylim(-3,3)
ax.set_title('Gaussian Process Regression', fontsize=10)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.legend(loc='upper right')

And not only is kriging robust to noise, but those uncertainty estimates give us a way to hunt for the best *new* samples to improve our predictions.

In [None]:
def get_expected_improvement(mu, sigma, f_best, xi=0.1, epsilon=1e-6):
    """
    Expected Improvement acquisition function
    
    Args:
        mu: Mean predictions
        sigma: Standard deviation of predictions
        f_best: Best observed value so far
        xi: Exploration-exploitation trade-off parameter
        epsilon: Minimum sigma value for numerical stability
    """
    imp = mu - f_best - xi
    sigma = np.maximum(sigma, epsilon)
    Z = imp / sigma
    expected_imp = imp * norm.cdf(Z) + sigma * norm.pdf(Z)
    return expected_imp

def plot_gpr_step(x_train, y_train, iteration, sigma_n=0.0, n_iterations=10):
    """Helper function to plot Bayesian optimization process"""
    x_test = np.linspace(0, 5, 200)
    y_true = signal_model(x_test)
    mu, sigma = gpr(x_train, y_train, x_test, rbf_kernel, sigma_n=sigma_n, sigma_f=1.0, l=0.5)

    # Calculate expected improvement
    f_best = np.max(y_train)
    expected_imp = get_expected_improvement(mu, sigma, f_best)

    # Find point with maximum expected improvement
    next_idx = np.argmax(expected_imp)
    x_next = x_test[next_idx]
    if sigma_n > 0.0:
        y_next = y_true[next_idx] + rng.normal(0.0, sigma_n)
    else:
        y_next = y_true[next_idx]

    fig, ax = plt.subplots()
    sns.lineplot(x=x_test, y=y_true, color='black', linestyle='--', alpha=0.8, ax=ax, label=r'f(x)');
    ax.scatter(x=x_train, y=y_train, color='red', label='Observations', zorder=3);
    sns.lineplot(x=x_test, y=mu, color='blue', alpha=0.8, ax=ax, label=r'f(x)');
    ax.fill_between(x_test, mu - 2*sigma, mu + 2*sigma, color='blue', alpha=0.2, label='95% Confidence')
    if iteration < n_iterations:
        ax.scatter(x=x_next, y=y_next, color='red', edgecolor='black', linewidth=2, label='Next Sample', zorder=3);
    ax.legend(loc='upper right')
    ax.set_title(f"GPR - Iteration {iteration}/{n_iterations}", fontsize=10)
    plt.tight_layout()
    return fig, x_next

n_iterations = 10
sigma_n = 0.0
x_init = np.array([1.0]) # Initial observations
y_init = signal_model(x_init) + rng.normal(0.0, sigma_n)
x_obs = x_init.copy() # Observation set
y_obs = y_init.copy()
figures = []

for i in range(1, n_iterations+1):
    figs, x_next = plot_gpr_step(x_obs, y_obs, i, sigma_n=sigma_n, n_iterations=n_iterations)
    figures.append(fig)
    if i < n_iterations:
        y_next = signal_model(x_next)
        x_obs = np.concatenate((x_obs, [x_next]))
        y_obs = np.concatenate((y_obs, [y_next]))