In [None]:
import experiment_utils
from experiment_utils import config

# Global settings on the shared `experiment_utils.config` object. They are set before
# any plotting code queries the config (e.g. `config.tueplots_bundle()` below).
config.experiment_name = "0001_cpu_stationary_2d"
config.target = "jmlr"
# NOTE(review): debug mode is left enabled — confirm this is intended for final runs
config.debug_mode = True

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import probnum as pn

import linpde_gp
from linpde_gp.typing import RandomProcessLike

# Apply the plot style bundle provided by the experiment config to all figures
plt.rcParams.update(config.tueplots_bundle())

## Problem Definition

In [None]:
import cpu

# 2D boundary value problem defined in the local `cpu` module
bvp = cpu.bvp_2D

## Plotting

In [None]:
%matplotlib inline

import matplotlib.axes
from mpl_toolkits.axes_grid1 import make_axes_locatable
import matplotlib.cm
import matplotlib.colors


class BeliefPlotter:
    """Plots for a 2D boundary value problem and for beliefs about its solution.

    Every plot evaluates its inputs on one fixed ``(100, 50)`` uniform grid over the
    domain of the boundary value problem. Heatmaps are drawn with ``origin="lower"``
    so that the image rows line up with the y coordinates implied by ``extent``.
    """

    def __init__(self, bvp: linpde_gp.problems.pde.BoundaryValueProblem):
        """Store the boundary value problem and build the evaluation grid.

        Parameters
        ----------
        bvp
            Boundary value problem whose ``domain`` provides the plotting grid and
            whose ``pde.diffop`` is used by the ``plot_pred_belief_*`` methods.
        """
        self._bvp = bvp

        # Throughout this class the first grid axis is used as x, the second as y and
        # the last axis holds the (x, y) coordinates.
        self._plt_grid = self._bvp.domain.uniform_grid((100, 50))

    @staticmethod
    def _imshow_on_domain(ax: matplotlib.axes.Axes, values: np.ndarray, **imshow_kwargs):
        """Draw values given on the plotting grid as an image in domain coordinates.

        The values are transposed so that rows correspond to y, and ``origin="lower"``
        places the first row at the bottom of ``extent``. With matplotlib's default
        ``origin="upper"`` the image would be flipped vertically relative to the axis.
        """
        return ax.imshow(
            values.T,
            origin="lower",
            extent=[0.0, cpu.width, 0.0, cpu.height],
            **imshow_kwargs,
        )

    def plot_geometry(
        self,
        ax: matplotlib.axes.Axes,
        q_dot_V: RandomProcessLike | None = None,
    ):
        """Draw the schematic from ``cpu.plot_schematic`` and, optionally, the mean of
        ``q_dot_V`` as a heatmap on the same axes.

        Parameters
        ----------
        ax
            Axes to draw on.
        q_dot_V
            Optional process or function converted via ``asrandproc``. Its mean on the
            plotting grid is shown with a colormap centered at zero.
        """
        cpu.plot_schematic(ax)

        if q_dot_V is not None:
            q_dot_V = linpde_gp.randprocs.asrandproc(q_dot_V)

            self._imshow_on_domain(
                ax,
                q_dot_V.mean(self._plt_grid),
                cmap="coolwarm",
                norm=matplotlib.colors.TwoSlopeNorm(0.0),
            )

    def plot_rhs(
        self,
        ax: matplotlib.axes.Axes,
        q_dot_V: RandomProcessLike,
    ):
        """Plot the mean of ``q_dot_V`` as a surface.

        Parameters
        ----------
        ax
            Axes to draw on. Must be a 3D axes, since ``plot_surface`` and ``zaxis``
            are used.
        q_dot_V
            Process or function converted via ``asrandproc``.
        """
        q_dot_V = linpde_gp.randprocs.asrandproc(q_dot_V)

        ax.plot_surface(
            self._plt_grid[..., 0],
            self._plt_grid[..., 1],
            q_dot_V.mean(self._plt_grid),
            cmap="coolwarm",
            norm=matplotlib.colors.TwoSlopeNorm(0.0),
        )

        cpu.adjust_xaxis(ax)
        cpu.adjust_yaxis(ax)
        cpu.adjust_q_dot_V_axis(ax.zaxis)

    def plot_rhs_heatmap(
        self,
        ax: matplotlib.axes.Axes,
        q_dot_V: RandomProcessLike,
        colorbar: bool = True,
    ):
        """Plot the mean of ``q_dot_V`` as a heatmap.

        Parameters
        ----------
        ax
            Axes to draw on.
        q_dot_V
            Process or function converted via ``asrandproc``, as in the other
            ``q_dot_V`` plots of this class.
        colorbar
            Whether to attach a horizontal colorbar above the axes.
        """
        # Convert and take the mean like `plot_geometry` and `plot_rhs` do. Calling a
        # random process directly does not yield a plain array for `imshow`.
        q_dot_V = linpde_gp.randprocs.asrandproc(q_dot_V)

        cpu.adjust_xaxis(ax)
        cpu.adjust_yaxis(ax)

        q_dot_V_im = self._imshow_on_domain(
            ax,
            q_dot_V.mean(self._plt_grid),
            cmap="coolwarm",
            norm=matplotlib.colors.TwoSlopeNorm(0.0),
            aspect="auto",
        )

        if colorbar:
            q_dot_V_im_cm = _add_top_colorbar(ax, q_dot_V_im)
            cpu.adjust_q_dot_V_axis(q_dot_V_im_cm.xaxis)

    def plot_q_dot_V_A(
        self,
        ax: matplotlib.axes.Axes,
        q_dot_V: RandomProcessLike,
        q_dot_A_north: RandomProcessLike,
        q_dot_A_south: RandomProcessLike,
        q_dot_A_east: RandomProcessLike,
        q_dot_A_west: RandomProcessLike,
    ):
        """Placeholder that is not implemented yet; currently draws nothing."""
        pass

    def plot_belief_3D(
        self,
        ax: matplotlib.axes.Axes,
        u: pn.randprocs.GaussianProcess,
        X_pde: np.ndarray | None = None,
        X_nbc_NESW: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray] | None = None,
        X_dts: np.ndarray | None = None,
        u_dts: np.ndarray | None = None,
        u_dts_noise: pn.randvars.RandomVariable | None = None,
    ):
        """Plot the belief ``u`` in 3D together with the data it was conditioned on.

        Parameters
        ----------
        ax
            3D axes to draw on.
        u
            Belief to plot via its ``plot`` method on the plotting grid.
        X_pde
            Optional PDE collocation points, marked on the floor of the plot.
        X_nbc_NESW
            Optional boundary collocation points (one array per edge), marked on the
            floor of the plot.
        X_dts
            Optional measurement locations, marked on the floor of the plot.
        u_dts
            Measured values at ``X_dts``.
        u_dts_noise
            Measurement noise; its ``std`` determines the error bars.

        Notes
        -----
        Measurements are drawn with error bars of 1.96 standard deviations. They are
        only drawn if both ``u_dts`` and ``u_dts_noise`` are given.
        """
        u.plot(
            ax,
            self._plt_grid,
            cmap="inferno",
            slice_xs=self._plt_grid[:, 0, 0],
            slice_ys=cpu.core_centers_ys[1],
            slice_color="#AAAAAA",
            slice_alpha=0.5,
            slice_padding=0.5,
            slice_cred_int_color="C0",
            slice_cred_int_alpha=0.5,
            slice_num_samples=7,
            slice_samples_rng=np.random.default_rng(3456),
            slice_samples_linewidth=1.0,
            antialiased=False,
        )

        cpu.adjust_xaxis(ax)
        cpu.adjust_yaxis(ax)
        cpu.adjust_tempaxis(ax.zaxis)

        if X_pde is not None:
            ax.scatter(
                X_pde[..., 0],
                X_pde[..., 1],
                zs=ax.get_zlim()[0],
                zdir="z",
                marker=".",
                color="C3",
                label=r"$\bm{X}_\text{PDE}$",
            )

        if X_nbc_NESW is not None:
            for idx, X_nbc_dir in enumerate(X_nbc_NESW):
                ax.scatter(
                    X_nbc_dir[..., 0],
                    X_nbc_dir[..., 1],
                    zs=ax.get_zlim()[0],
                    zdir="z",
                    marker=".",
                    color="C5",
                    # Only label the first edge so the legend has a single entry
                    label=r"$\bm{X}_\text{NBC}$" if idx == 0 else None,
                )

        if X_dts is not None:
            X_dts = np.asarray(X_dts).reshape((-1, 2))

            # Mark the measurement locations on the floor of the plot
            ax.scatter(
                X_dts[:, 0],
                X_dts[:, 1],
                zs=ax.get_zlim()[0],
                zdir="z",
                marker=".",
                color="C4",
            )

            # Values and noise are both needed for the error bars. Without them only
            # the locations are shown instead of failing on a `None` argument.
            if u_dts is not None and u_dts_noise is not None:
                self._plot_measurements_3D(ax, u, X_dts, u_dts, u_dts_noise)

    @staticmethod
    def _plot_measurements_3D(ax, u, X_dts, u_dts, u_dts_noise):
        """Draw measurements with error bars, clipped at the mean surface of ``u``.

        The same half-width (1.96 standard deviations) is used both for deciding
        whether a bar is visible and for its length, so bar lengths are never negative.
        """
        u_dts = np.asarray(u_dts).reshape(-1)
        cred_int = np.broadcast_to(
            1.96 * np.asarray(u_dts_noise.std).reshape(-1),
            u_dts.shape,
        )
        u_mean = np.asarray(u.mean(X_dts)).reshape(-1)

        above = u_dts >= u_mean
        # Measurement lies below the mean surface, but its upper bar reaches the surface
        below = ~above & (u_dts + cred_int >= u_mean)

        # Skip empty groups: there is nothing to draw for them
        if np.any(above):
            ax.errorbar(
                X_dts[above, 0],
                X_dts[above, 1],
                u_dts[above],
                zerr=[
                    # Cut the lower bar off at the mean surface
                    np.minimum(cred_int[above], u_dts[above] - u_mean[above]),
                    cred_int[above],
                ],
                fmt=".",
                color="C4",
                label=r"$(\bm{X}_\text{DTS}, \bm{y}_\text{DTS})$",
            )

        if np.any(below):
            # Anchor an invisible marker on the mean surface and only draw the part of
            # the upper bar that extends beyond the surface.
            ax.errorbar(
                X_dts[below, 0],
                X_dts[below, 1],
                u_mean[below],
                zerr=[
                    np.zeros(np.count_nonzero(below)),
                    cred_int[below] - (u_mean[below] - u_dts[below]),
                ],
                fmt=".",
                color="C4",
                markersize=0,
            )

    def plot_pred_belief_3D(
        self,
        ax: matplotlib.axes.Axes,
        u: pn.randprocs.GaussianProcess,
    ):
        """Plot the mean of the PDE's differential operator applied to ``u`` as a surface.

        Parameters
        ----------
        ax
            3D axes to draw on.
        u
            Belief that the differential operator ``bvp.pde.diffop`` is applied to.
        """
        # Differential Operator Image Belief
        Du = self._bvp.pde.diffop(u)

        ax.plot_surface(
            self._plt_grid[..., 0],
            self._plt_grid[..., 1],
            Du.mean(self._plt_grid),
            cmap="coolwarm",
            norm=matplotlib.colors.TwoSlopeNorm(0.0),
        )

        cpu.adjust_xaxis(ax)
        cpu.adjust_yaxis(ax)
        cpu.adjust_q_dot_V_axis(ax.zaxis)

    def plot_belief_heatmap(
        self,
        axs: np.ndarray,
        u: pn.randprocs.GaussianProcess,
    ):
        """Plot mean and 1.96 standard deviations of ``u`` as two heatmaps.

        Parameters
        ----------
        axs
            Two axes; ``axs[0]`` receives the mean and ``axs[1]`` the scaled standard
            deviation. Each gets a horizontal colorbar on top.
        u
            Belief to plot.
        """
        u_mean_im = self._imshow_on_domain(
            axs[0],
            u.mean(self._plt_grid),
            cmap="inferno",
        )

        u_mean_im_cm = _add_top_colorbar(axs[0], u_mean_im)

        cpu.adjust_xaxis(axs[0])
        cpu.adjust_yaxis(axs[0])
        cpu.adjust_tempaxis(u_mean_im_cm.xaxis)

        u_cred_im = self._imshow_on_domain(
            axs[1],
            1.96 * u.std(self._plt_grid),
            cmap="inferno",
        )

        u_cred_im_cm = _add_top_colorbar(axs[1], u_cred_im)

        cpu.adjust_xaxis(axs[1])
        cpu.adjust_yaxis(axs[1])
        cpu.adjust_tempaxis(u_cred_im_cm.xaxis)

    def plot_pred_belief_heatmap(
        self,
        axs: np.ndarray,
        u: pn.randprocs.GaussianProcess,
    ):
        """Plot mean and 1.96 standard deviations of the differential operator applied
        to ``u`` as two heatmaps.

        Parameters
        ----------
        axs
            Two axes; ``axs[0]`` receives the mean (colormap centered at zero) and
            ``axs[1]`` the scaled standard deviation. Each gets a colorbar on top.
        u
            Belief that the differential operator ``bvp.pde.diffop`` is applied to.
        """
        # Differential Operator Image Belief
        Du = self._bvp.pde.diffop(u)

        Du_mean_im = self._imshow_on_domain(
            axs[0],
            Du.mean(self._plt_grid),
            cmap="coolwarm",
            norm=matplotlib.colors.TwoSlopeNorm(0.0),
        )

        Du_mean_im_cm = _add_top_colorbar(axs[0], Du_mean_im)

        cpu.adjust_xaxis(axs[0])
        cpu.adjust_yaxis(axs[0])
        cpu.adjust_q_dot_V_axis(Du_mean_im_cm.xaxis)

        Du_cred_im = self._imshow_on_domain(
            axs[1],
            1.96 * Du.std(self._plt_grid),
            cmap="inferno",
        )

        Du_cred_im_cm = _add_top_colorbar(axs[1], Du_cred_im)

        cpu.adjust_xaxis(axs[1])
        cpu.adjust_yaxis(axs[1])
        cpu.adjust_q_dot_V_axis(Du_cred_im_cm.xaxis)

def _add_top_colorbar(
    ax: matplotlib.axes.Axes,
    mappable: matplotlib.cm.ScalarMappable,
) -> matplotlib.axes.Axes:
    """Attach a horizontal colorbar for ``mappable`` above ``ax``.

    Parameters
    ----------
    ax
        Axes the colorbar is attached to. A new axes taking 10% of its height is
        appended on top.
    mappable
        The image (or other scalar mappable) described by the colorbar.

    Returns
    -------
    The axes containing the colorbar, with ticks and label placed on its top side.
    """
    cax = make_axes_locatable(ax).append_axes(
        "top",
        size="10%",
        pad=0.1,
    )

    # Create the colorbar on the figure that owns `ax` instead of going through
    # pyplot's notion of the current figure, which may be a different one.
    ax.figure.colorbar(
        mappable,
        cax=cax,
        orientation="horizontal",
    )

    # This must be placed after the colorbar has been drawn
    cax.xaxis.tick_top()
    cax.xaxis.set_label_position("top")

    return cax

# Plotter instance shared by all figure cells below
plotter = BeliefPlotter(bvp)

## Simplified Model

## Prior

In [None]:
# Solution
# Gaussian process prior over u: constant mean 59.0 and a tensor product of two
# Matern covariance functions with nu=2.5 (one per input dimension, lengthscales set
# to the width and height from `cpu`), scaled by 3.0 ** 2.
u = pn.randprocs.GaussianProcess(
    mean=linpde_gp.functions.Constant(input_shape=(2,), value=59.0),
    cov=3.0 ** 2 * linpde_gp.randprocs.covfuncs.TensorProduct(
        linpde_gp.randprocs.covfuncs.Matern((), nu=2.5, lengthscales=cpu.width),
        linpde_gp.randprocs.covfuncs.Matern((), nu=2.5, lengthscales=cpu.height),
    ),
)

# Volumetric (Interior) Heat Sources and Sinks
# Right-hand side of the PDE, wrapped as a random process
q_dot_V = linpde_gp.randprocs.asrandproc(bvp.pde.rhs)

# Boundary Heat Flux
# NOTE(review): later cells evaluate this on a 1D coordinate built from boundary
# positions plus width/height offsets — confirm the parametrization in `cpu`
q_dot_A = linpde_gp.randprocs.asrandproc(cpu.q_dot_A_2D)

### Visualize Problem Geometry

In [None]:
# Schematic of the geometry with the mean of the interior heat sources on the same axes
plotter.plot_geometry(plt.gca(), q_dot_V)

In [None]:
# Mean of the interior heat sources as a 3D surface
fig, ax = plt.subplots(subplot_kw=dict(projection="3d"))
plotter.plot_rhs(ax=ax, q_dot_V=q_dot_V)

In [None]:
# Prior belief over the solution
fig, ax = plt.subplots(subplot_kw=dict(projection="3d", computed_zorder=False))
plotter.plot_belief_3D(ax=ax, u=u)

In [None]:
# Prior belief pushed through the PDE's differential operator
fig, ax = plt.subplots(subplot_kw=dict(projection="3d", computed_zorder=False))
plotter.plot_pred_belief_3D(ax=ax, u=u)

In [None]:
# Prior belief: mean (left) and scaled standard deviation (right)
fig, axs = plt.subplots(nrows=1, ncols=2)
plotter.plot_belief_heatmap(axs=axs, u=u)

In [None]:
# Prior belief pushed through the differential operator: mean (left) and scaled
# standard deviation (right)
fig, axs = plt.subplots(nrows=1, ncols=2)
plotter.plot_pred_belief_heatmap(axs=axs, u=u)

## Conditioning on the PDE

In [None]:
# Number of PDE collocation points per axis
N_pde = 15

# Uniform N_pde x N_pde grid of collocation points, requested with an inset of 3% of
# the width/height.
# NOTE(review): presumably the inset keeps the points off the boundary — confirm
# against `uniform_grid`
X_pde = bvp.domain.uniform_grid(
    (N_pde, N_pde),
    inset=(0.03 * cpu.width, 0.03 * cpu.height)
)

In [None]:
# Condition the prior on the PDE at the collocation points: zero observations of the
# differential operator applied to u, with the additive term b = -q_dot_V(X_pde).
# NOTE(review): assumes the observation model is Y = L[u](X) + b — confirm against
# `condition_on_observations`
u_cond_pde = u.condition_on_observations(
    Y=np.zeros_like(X_pde, shape=X_pde.shape[:-1]),
    L=bvp.pde.diffop,
    X=X_pde,
    b=-q_dot_V(X_pde),
)

In [None]:
# Belief after conditioning on the PDE, with the collocation points marked
fig, ax = plt.subplots(subplot_kw=dict(projection="3d", computed_zorder=False))
plotter.plot_belief_3D(ax=ax, u=u_cond_pde, X_pde=X_pde)

In [None]:
# PDE-conditioned belief pushed through the differential operator
fig, ax = plt.subplots(subplot_kw=dict(projection="3d", computed_zorder=False))
plotter.plot_pred_belief_3D(ax=ax, u=u_cond_pde)

In [None]:
# PDE-conditioned belief: mean (left) and scaled standard deviation (right)
fig, axs = plt.subplots(nrows=1, ncols=2)
plotter.plot_belief_heatmap(axs=axs, u=u_cond_pde)

In [None]:
# PDE-conditioned belief pushed through the differential operator: mean (left) and
# scaled standard deviation (right)
fig, axs = plt.subplots(nrows=1, ncols=2)
plotter.plot_pred_belief_heatmap(axs=axs, u=u_cond_pde)

## Conditioning on Neumann Boundary Conditions

In [None]:
# Number of boundary collocation points: N_nbc[0] on the north and south edges,
# N_nbc[1] on the east and west edges
N_nbc = (10, 8)

In [None]:
# North Boundary
# Collocation points on boundary part 3 of the domain
X_nbc_north = bvp.domain.boundary[3].uniform_grid(N_nbc[0])

# Condition on zero observations of -kappa times the directional derivative along
# (0, -1), with the additive term b = -q_dot_A evaluated at the x coordinates.
# NOTE(review): q_dot_A appears to take a 1D coordinate that starts at 0 on this
# edge — confirm against `cpu.q_dot_A_2D`
u_cond_pde_nbc = u_cond_pde.condition_on_observations(
    Y=np.zeros_like(X_nbc_north[..., 0]),
    X=X_nbc_north,
    L=-cpu.kappa * linpde_gp.linfuncops.diffops.DirectionalDerivative([0.0, -1.0]),
    b=-q_dot_A(X_nbc_north[..., 0])
)

In [None]:
# East Boundary
# Collocation points on boundary part 1 of the domain
X_nbc_east = bvp.domain.boundary[1].uniform_grid(N_nbc[1])

# Continue from the belief conditioned on the north edge. The directional derivative
# is taken along (-1, 0), and q_dot_A is evaluated at the y coordinates offset by the
# width.
u_cond_pde_nbc = u_cond_pde_nbc.condition_on_observations(
    Y=np.zeros_like(X_nbc_east[..., 1]),
    X=X_nbc_east,
    L=-cpu.kappa * linpde_gp.linfuncops.diffops.DirectionalDerivative([-1.0, 0.0]),
    b=-q_dot_A(cpu.width + X_nbc_east[..., 1])
)

In [None]:
# South Boundary
# Collocation points on boundary part 2 of the domain
X_nbc_south = bvp.domain.boundary[2].uniform_grid(N_nbc[0])

# Continue from the belief conditioned on the north and east edges. The directional
# derivative is taken along (0, 1), and q_dot_A is evaluated at the x coordinates
# offset by width + height.
u_cond_pde_nbc = u_cond_pde_nbc.condition_on_observations(
    Y=np.zeros_like(X_nbc_south[..., 0]),
    X=X_nbc_south,
    L=-cpu.kappa * linpde_gp.linfuncops.diffops.DirectionalDerivative([0.0, 1.0]),
    b=-q_dot_A(cpu.width + cpu.height + X_nbc_south[..., 0])
)

In [None]:
# West Boundary
# Collocation points on boundary part 0 of the domain
X_nbc_west = bvp.domain.boundary[0].uniform_grid(N_nbc[1])

# The north, east and south cells take the directional derivative along the normal
# pointing into the domain ((0, -1), (-1, 0) and (0, 1), respectively). On the west
# edge that direction is (1, 0). Reusing the east edge's (-1, 0) would flip the sign
# of the flux condition here.
u_cond_pde_nbc = u_cond_pde_nbc.condition_on_observations(
    Y=np.zeros_like(X_nbc_west[..., 1]),
    X=X_nbc_west,
    L=-cpu.kappa * linpde_gp.linfuncops.diffops.DirectionalDerivative([1.0, 0.0]),
    b=-q_dot_A(cpu.width + cpu.height + cpu.width + X_nbc_west[..., 1])
)

In [None]:
# Belief after conditioning on the PDE and the boundary conditions, with all
# collocation points marked
fig, ax = plt.subplots(subplot_kw=dict(projection="3d", computed_zorder=False))
plotter.plot_belief_3D(
    ax=ax,
    u=u_cond_pde_nbc,
    X_pde=X_pde,
    X_nbc_NESW=(X_nbc_north, X_nbc_east, X_nbc_south, X_nbc_west),
)

In [None]:
# Belief after PDE and boundary conditioning: mean (left) and scaled standard
# deviation (right) on a shared y axis
fig, axs = plt.subplots(nrows=1, ncols=2, sharey=True)
plotter.plot_belief_heatmap(axs=axs, u=u_cond_pde_nbc)

## Conditioning on Measurements

In [None]:
# Measured values and their noise model, both provided by the `cpu` module
y_dts = cpu.y_dts_2D
noise_dts = cpu.noise_dts_2D

In [None]:
# Condition on the measurements taken at `cpu.core_centers`. No operator is passed,
# and the measurement noise enters as the additive term b.
# NOTE(review): assumes direct observations of u are the default when L is omitted —
# confirm against `condition_on_observations`
u_cond_pde_nbc_dts = u_cond_pde_nbc.condition_on_observations(
    y_dts,
    X=cpu.core_centers,
    b=noise_dts,
)

In [None]:
# Use the style bundle requested with nrows=2, rel_width=0.5 for this figure only
with plt.rc_context(config.tueplots_bundle(nrows=2, rel_width=0.5)):
    fig, ax = plt.subplots(
        subplot_kw={
            "projection": "3d",
            "computed_zorder": False,
        }
    )

    # Final belief together with all collocation points and the measurements
    plotter.plot_belief_3D(
        ax,
        u=u_cond_pde_nbc_dts,
        X_pde=X_pde,
        X_nbc_NESW=(X_nbc_north, X_nbc_east, X_nbc_south, X_nbc_west),
        X_dts=cpu.core_centers,
        u_dts=y_dts,
        u_dts_noise=noise_dts,
    )

    # Single-row legend centered below the plot
    fig.legend(loc="lower center", ncol=3, bbox_to_anchor=(0.55, -0.01))

    # Save inside the rc context so the temporary style is still active
    experiment_utils.savefig("01_simplified_03_u_cond_pde_nbc_dts_3d")

In [None]:
# Final belief: mean (left) and scaled standard deviation (right) on a shared y axis
fig, axs = plt.subplots(nrows=1, ncols=2, sharey=True)
plotter.plot_belief_heatmap(axs=axs, u=u_cond_pde_nbc_dts)