### Dependencies

You can run the following cell to install all the libraries we'll be using throughout this notebook.

In [None]:
%pip install numpy sympy scipy pandas plotly ipywidgets scikit-learn tqdm

# Controlling an Overheating Motor with Gaussian Process Regression

## 1. Introduction to the Motor

In this assignment, we will implement a bang-bang controller to regulate the activity of a motor, such that it works as fast as possible without overheating. As usual,
before we go into the details of the controller, let's first understand the motor and how it operates.

### 1.1 Motor Dynamics

The motor we are working with is a simple DC motor. The motor is controlled by an electric current $u(t)$, which determines the angular velocity of the motor $\omega(t)$. 
The motor is subject to a load torque $T_L(t)$, which is the some kind of external resistance applied to the motor. The motor dynamics are described by the following 
differential equation:

$$
J \frac{d\omega(t)}{dt} = K_T u(t) - T_L(t) - B \omega(t),
$$

where:
- $J$ is the moment of inertia of the motor,
- $B$ is the friction coefficient of the motor, and
- $K_T$ is the torque constant of the motor.

#### Exercise 1.1.1

Implement the `MotorDynamics` interface that has a method `__call__(t)`, which returns the angular velocity of a motor at time `t`. 
The angular velocity should be such that it satisfies the differential equation above. An implementation of the `MotorDynamics` interface will
need the following parameters:
- `J`: the moment of inertia $J$,
- `B`: the friction coefficient $B$,
- `K_T`: the torque constant $K_T$,
- `u`: a function that takes a time `t` and returns the current $u(t)$,
- `T_L`: a function that takes a time `t` and returns the current $T_L(t)$, and
- `omega_0`: the initial angular velocity of the motor.

Once this is done, the `animate` method on the `Motor` class will animate the motor's state over time.

<details>
    <summary> Hint (a big one!) </summary>

> #### Solving with the Euler Method
>
> You can solve the differential equation above in many ways, but a simple numerical way to do it is by using the **Euler method**. The idea is the following:
> 1. **Discretize** the continuous time differential equation into a **discrete time difference equation**. In this case, we can assume a zero-order hold on $\omega(t)$.
>     $$
>     \frac{d\omega(t)}{dt} \approx \frac{\omega[k+1] - \omega[k]}{\Delta t}, \quad \text{where } \Delta t \text{ is the time step.}
>     $$
> 2. **Substitute** it into the differential equation to get the difference equation:
>     $$
>     \omega[k+1] = \omega[k] + \frac{\Delta t}{J} (K_T u[k] - T_L[k] - B \omega[k]), \quad \text{for } k = 0, 1, 2, \ldots
>     $$
> 3. Implement the difference equation in the `__call__` method. It would make sense to **store the current angular velocity** as an attribute,
>    so that you can use it to calculate the next angular velocity efficiently.  
> &nbsp;

</details>

<details>
    <summary> Hint (another one!) </summary>

> #### Solving with Integrating Factors
>
> You can solve the above differential equation analytically by using the method of integrating factors. The result will be the following integral:
> $$
> \omega(t) = e^{-at} \left( \omega_0 + \frac{1}{J} \int_0^t e^{a\tau} g(\tau) d\tau \right), \quad \text{where } a = \frac{B}{J}, \, g(\tau) = K_T u(\tau) - T_L(\tau).
> $$
>
> Although, for performance reasons, you can use the following version of the equation:
> $$
> \omega(t) = e^{-at} \left( \omega_0 + \frac{1}{J} \left[\int_0^{t'} e^{a\tau} g(\tau) d\tau + \int_{t'}^t e^{a\tau} g(\tau) d\tau\right] \right), \quad \text{where } t' \text{ is the last time step for which } \omega(t') \text{ is known}.
> $$
>
> Reuse the result of the integral and this version will be faster, since the simulation will need to calculate the angular acceleration for successive values of $t$. 
> You can use `scipy.integrate.quad` to calculate the value of the integral.  
>
> This method should generally be more accurate than the Euler method, but it really depends on how you perform the numerical integration.  
> &nbsp;

</details>

In [2]:
from typing import Protocol


class MotorDynamics(Protocol):
    def __call__(self, t: float, /) -> float:
        """Return the angular acceleration at time t."""
        ...

    def reset(self) -> None:
        """Reset the motor dynamics."""
        ...


class TemperatureModel(Protocol):
    def __call__(self, t: float, /) -> float:
        """Return the temperature of the motor winding at time t."""
        ...

    def reset(self) -> None:
        """Reset the temperature model."""
        ...


class ControlSignal(Protocol):
    def __call__(self, t: float, /) -> float:
        """Return the control signal at time t."""
        ...


class Load(Protocol):
    def __call__(self, t: float, /) -> float:
        """Return the load at time t."""
        ...


# The following are implementations of the above protocols, although not the best ones. Nevertheless, they still come in handy.
# Go further to see where you need to implement the protocols.


class NoMotorDynamics:
    def __call__(self, t: float) -> float:
        return 0.0

    def reset(self) -> None:
        pass


class NoTemperatureModel:
    def __call__(self, t: float) -> float:
        return 0.0

    def reset(self) -> None:
        pass


class NoControlSignal:
    def __call__(self, t: float) -> float:
        return 0.0


class NoLoad:
    def __call__(self, t: float) -> float:
        return 0.0

In [None]:
from typing import Final, Callable
from dataclasses import dataclass, KW_ONLY, field

from scipy.integrate import quad

import numpy as np


# {% if not exercise['1.1.1'].solution %}
# You can create an implementation of the `MotorDynamics` protocol like this:


@dataclass
class EulerMotorDynamics:  # {# type: ignore #}
    _: KW_ONLY  # Keyword-only arguments are good, so you don't mix up your arguments
    J: Final[float]
    B: Final[float]
    K_T: Final[float]
    u: Final[ControlSignal]
    T_L: Final[Load]
    omega_0: Final[float] = 0.0

    # You can add more attributes here if you need to store additional state.

    def __call__(self, t: float) -> float:
        # TODO: Implement me!
        raise NotImplementedError("Not implemented!")

    # More methods would go here if you need them.

    def reset(self) -> None:
        # TODO: If you do use mutable state, you should reset it here.
        pass


# {% else %}
# TODO: Review me!


@dataclass
class EulerMotorDynamics:
    _: KW_ONLY  # Keyword-only arguments are good, so you don't mix up your arguments
    J: Final[float]
    B: Final[float]
    K_T: Final[float]
    u: Final[ControlSignal]
    T_L: Final[Load]
    omega_0: Final[float] = 0.0

    t_last: float = field(init=False)
    omega_last: float = field(init=False)

    def __call__(self, t: float) -> float:
        assert (
            t >= self.t_last
        ), "The specified time point must be greater than or equal to the last one."

        dt = t - self.t_last
        omega = self.omega_last + dt * self.current_angular_acceleration()

        # Don't forget to update the t and omega to be the most recent values
        self.t_last = t
        self.omega_last = omega

        return omega

    def current_angular_acceleration(self) -> float:
        t_last, omega_last = self.t_last, self.omega_last

        return (
            self.u(t_last) * self.K_T - self.T_L(t_last) - self.B * omega_last
        ) / self.J

    def reset(self) -> None:
        self.t_last = 0
        self.omega_last = self.omega_0


# The following solves the same problem using the integrating factors approach.
# The generic solver is implemented separately, since it also comes in handy later on.


@dataclass
class IntegratingFactorsSolver:
    _: KW_ONLY
    a: Final[float]
    g: Final[Callable[[float], float]]
    y_0: Final[float]

    t_last: float = field(init=False)
    y_last: float = field(init=False)

    def __call__(self, t: float) -> float:
        assert (
            t >= self.t_last
        ), "The specified time point must be greater than or equal to the last one."

        integral = (
            self.integral_last
            + quad(lambda tau: np.exp(self.a * tau) * self.g(tau), self.t_last, t)[0]
        )
        y = np.exp(-self.a * t) * (self.y_0 + integral)

        # Don't forget to update the t and omega to be the most recent values
        self.t_last = t
        self.integral_last = integral

        return y

    def reset(self) -> None:
        self.t_last = 0
        self.integral_last = 0


class IntegratingMotorDynamics:
    solver: Final[IntegratingFactorsSolver]

    def __init__(
        self,
        *,
        J: float,
        B: float,
        K_T: float,
        u: ControlSignal,
        T_L: Load,
        omega_0: float = 0.0,
    ) -> None:
        self.solver = IntegratingFactorsSolver(
            a=B / J,
            g=lambda t: (K_T * u(t) - T_L(t)) / J,
            y_0=omega_0,
        )

    def __call__(self, t: float) -> float:
        return self.solver(t)

    def reset(self) -> None:
        self.solver.reset()


# {% endif %}

Once you implement your motor dynamics above, make sure to create it in the function below.

In [None]:
def motor_dynamics(
    *, J: float, B: float, K_T: float, u: ControlSignal, T_L: Load, omega_0: float = 0.0
) -> MotorDynamics:
    # {% if exercise['1.1.1'].solution %}
    # TODO: Review me!
    return IntegratingMotorDynamics(J=J, B=B, K_T=K_T, u=u, T_L=T_L, omega_0=omega_0)
    # {% else %}
    # TODO: Implement me!
    raise NotImplementedError("Not implemented!")
    # {% endif %}

In [62]:
from typing import Sequence, TypeAlias, ClassVar

import pandas as pd

from numpy.typing import NDArray
from plotly.subplots import make_subplots
from plotly.graph_objects import Figure, Frame, Scatter

Vector: TypeAlias = NDArray[np.floating]


def padded_range_for(
    data: Vector | tuple[float, float], padding: float = 0.1
) -> tuple[float, float]:
    if len(data) == 0:
        lower, upper = 0, 1
    else:
        lower, upper = min(data), max(data)
        lower, upper = min(0, lower), max(1, upper)

    delta = upper - lower

    return lower - padding * delta, upper + padding * delta


def draw_motor_housing() -> Scatter:
    theta = np.linspace(0, 2 * np.pi, 100)

    return Scatter(
        x=np.cos(theta),
        y=np.sin(theta),
        mode="lines",
        line=dict(color="blue"),
        name="Motor Housing",
        legendgroup="Motor",
    )


@dataclass(frozen=True)
class TemperatureThresholds:
    upper: float
    lower: float


@dataclass(frozen=True)
class SimulationResults:
    t_f: float
    _: KW_ONLY
    t: Vector
    theta: Vector
    omega: Vector
    T: Vector
    T_predicted: Vector
    u: Vector
    T_L: Vector

    t_range: tuple[float, float]
    omega_range: tuple[float, float]
    T_range: tuple[float, float]
    u_range: tuple[float, float]
    T_L_range: tuple[float, float]

    show_predicted: bool
    thresholds: TemperatureThresholds | None

    MOTOR_HOUSING_TRACE: ClassVar[Scatter] = draw_motor_housing()

    @staticmethod
    def create(
        t_f: float,
        *,
        t: Vector,
        theta: Vector,
        omega: Vector,
        T: Vector,
        T_predicted: Vector,
        u: Vector,
        T_L: Vector,
        thresholds: TemperatureThresholds | None,
    ) -> "SimulationResults":
        return SimulationResults(
            t_f,
            t=t,
            theta=theta,
            omega=omega,
            T=T,
            T_predicted=T_predicted,
            u=u,
            T_L=T_L,
            t_range=padded_range_for((0, t_f)),
            omega_range=padded_range_for(omega),
            T_range=padded_range_for(T),
            u_range=padded_range_for(u),
            T_L_range=padded_range_for(T_L),
            show_predicted=bool(np.any(T_predicted)),
            thresholds=thresholds,
        )

    def until(self, step: int) -> "SimulationResults":
        return SimulationResults(
            self.t_f,
            t=self.t[:step],
            theta=self.theta[:step],
            omega=self.omega[:step],
            T=self.T[:step],
            T_predicted=self.T_predicted[:step],
            u=self.u[:step],
            T_L=self.T_L[:step],
            t_range=self.t_range,
            omega_range=self.omega_range,
            T_range=self.T_range,
            u_range=self.u_range,
            T_L_range=self.T_L_range,
            show_predicted=self.show_predicted,
            thresholds=self.thresholds,
        )

    def draw(self) -> Figure:
        figure = self.create_layout()

        self.draw_motor_on(figure)
        self.draw_angular_velocity_on(figure)
        self.draw_temperature_on(figure)
        self.draw_control_signal_on(figure)
        self.draw_load_on(figure)

        return figure

    def save_measurements_to(self, path: str, /, *, noise: float = 0.1) -> None:
        # We save the temperature measurements to a CSV file and simulate gaussian noise
        # with a standard deviation of `noise`. We also add some other random data to make it
        # more confusing, just line in real life.

        # Set seed for reproducibility
        np.random.seed(42)

        pd.DataFrame(
            {
                "Motor Housing Vibration Amplitude (mm)": np.abs(
                    np.random.normal(0, 0.5, len(self.t))
                )
                + np.abs(np.sin(self.t)),
                "Motor Winding Temperature (C)": self.T
                + np.random.normal(0, noise, len(self.T)),
                "Chroniton Displacement Current (A)": np.random.normal(
                    0, 0.1, len(self.t)
                ),
                "Elapsed Time (s)": self.t,
            }
        ).to_csv(path, index=False)

    def __repr__(self) -> str:
        return ""

    @property
    def theta_f(self) -> float:
        return self.theta[-1]

    def create_layout(self) -> Figure:
        return make_subplots(
            rows=2,
            cols=3,
            column_widths=[0.4, 0.3, 0.3],
            row_heights=[0.5, 0.5],
            horizontal_spacing=0.1,
            subplot_titles=(
                "Motor State",
                "Angular Velocity",
                "Temperature",
                "Control Signal",
                "Load",
            ),
            specs=[
                [{"rowspan": 2, "type": "xy"}, {"type": "xy"}, {"type": "xy"}],
                [None, {"type": "xy"}, {"type": "xy"}],
            ],
        ).update_layout(height=750)

    def draw_motor_on(self, figure: Figure) -> None:
        figure.add_trace(self.MOTOR_HOUSING_TRACE, row=1, col=1)
        figure.add_trace(
            Scatter(
                x=[0, np.cos(self.theta_f)],
                y=[0, np.sin(self.theta_f)],
                mode="lines+markers",
                marker=dict(size=10),
                line=dict(width=2, color="red"),
                name="Motor Indicator",
                legendgroup="Motor",
            ),
            row=1,
            col=1,
        )

        figure.update_xaxes(range=[-1.5, 1.5], row=1, col=1)

        # Ensure the aspect ratio is square
        figure.update_layout(
            xaxis=dict(scaleanchor="y", scaleratio=1),
            yaxis=dict(scaleanchor="x", scaleratio=1),
        )

    def draw_angular_velocity_on(self, figure: Figure) -> None:
        figure.add_trace(
            Scatter(x=self.t, y=self.omega, mode="lines", name="Angular Velocity"),
            row=1,
            col=2,
        )

        figure.update_xaxes(title_text="Time (s)", row=1, col=2, range=self.t_range)
        figure.update_yaxes(
            title_text="Angular Velocity (rad/s)", row=1, col=2, range=self.omega_range
        )

    def draw_temperature_on(self, figure: Figure) -> None:
        figure.add_trace(
            Scatter(x=self.t, y=self.T, mode="lines", name="Temperature"),
            row=1,
            col=3,
        )

        if self.show_predicted:
            figure.add_trace(
                Scatter(
                    x=self.t,
                    y=self.T_predicted,
                    mode="lines",
                    name="Predicted Temperature",
                    line=dict(dash="dash"),
                ),
                row=1,
                col=3,
            )

        if self.thresholds is not None:
            figure.add_hline(
                y=self.thresholds.upper,
                line=dict(color="red", width=1, dash="dash"),
                annotation_text="Overheating Threshold",
                annotation_position="top right",
                annotation_font=dict(size=8),
                row=1,
                col=3,
            )

            figure.add_hline(
                y=self.thresholds.lower,
                line=dict(color="green", width=1, dash="dash"),
                annotation_text="Cooling Threshold",
                annotation_position="bottom right",
                annotation_font=dict(size=8),
                row=1,
                col=3,
            )

        figure.update_xaxes(title_text="Time (s)", row=1, col=3, range=self.t_range)
        figure.update_yaxes(
            title_text="Temperature (°C)", row=1, col=3, range=self.T_range
        )

    def draw_control_signal_on(self, figure: Figure) -> None:
        figure.add_trace(
            Scatter(x=self.t, y=self.u, mode="lines", name="Control Signal"),
            row=2,
            col=2,
        )

        figure.update_xaxes(title_text="Time (s)", row=2, col=2, range=self.t_range)
        figure.update_yaxes(
            title_text="Control Signal (A)", row=2, col=2, range=self.u_range
        )

    def draw_load_on(self, figure: Figure) -> None:
        figure.add_trace(
            Scatter(x=self.t, y=self.T_L, mode="lines", name="Load"), row=2, col=3
        )

        figure.update_xaxes(title_text="Time (s)", row=2, col=3, range=self.t_range)
        figure.update_yaxes(title_text="Load (N)", row=2, col=3, range=self.T_L_range)

In [5]:
from typing import Any
from dataclasses import replace

from tqdm import tqdm


@dataclass(frozen=True)
class Motor:
    name: str
    _: KW_ONLY
    omega: MotorDynamics = NoMotorDynamics()
    T: TemperatureModel = NoTemperatureModel()
    u: ControlSignal = NoControlSignal()
    T_L: Load = NoLoad()

    T_predicted: TemperatureModel = NoTemperatureModel()

    def with_dynamics(self, omega: MotorDynamics) -> "Motor":
        return replace(self, omega=omega)

    def with_temperature(self, T: TemperatureModel) -> "Motor":
        return replace(self, T=T)

    def with_control_signal(self, u: ControlSignal) -> "Motor":
        return replace(self, u=u)

    def with_load(self, T_L: Load) -> "Motor":
        return replace(self, T_L=T_L)

    def with_predicted_temperature(self, T_predicted: TemperatureModel) -> "Motor":
        return replace(self, T_predicted=T_predicted)

    def animate(
        self,
        *,
        t_f: float,
        steps: int = 100,
        thresholds: TemperatureThresholds | None = None,
    ) -> SimulationResults:
        results = self.simulate(t_f=t_f, steps=steps, thresholds=thresholds)
        frames = self.create_frames(results, steps)
        starting_frame = frames[0]

        Figure(
            data=starting_frame.data,
            layout=starting_frame.layout,
            frames=[Frame(data=frame.data) for frame in frames],
        ).update_layout(
            title=f"Animated {self.name}",
            updatemenus=[self._animation_configuration()],
        ).show()

        return results

    def simulate(
        self, *, t_f: float, steps: int, thresholds: TemperatureThresholds | None = None
    ) -> SimulationResults:
        self.omega.reset()
        self.T.reset()
        self.T_predicted.reset()

        t = np.linspace(0, t_f, steps)
        theta = np.zeros(steps)
        omega = np.zeros(steps)
        T = np.zeros(steps)
        u = np.zeros(steps)
        T_L = np.zeros(steps)
        T_predicted = np.zeros(steps)

        # We do the first step manually
        omega[0] = self.omega(0)
        T[0] = self.T(0)
        u[0] = self.u(0)
        T_L[0] = self.T_L(0)
        T_predicted[0] = self.T_predicted(0)

        for i, t_i in tqdm(
            enumerate(t[1:], start=1),
            desc=f"Simulating {self.name}",
            total=steps,
            initial=1,
            unit=" time step",
        ):
            dt = t_i - t[i - 1]
            theta[i] = theta[i - 1] + omega[i - 1] * dt
            omega[i] = self.omega(t_i)
            T[i] = self.T(t_i)
            u[i] = self.u(t_i)
            T_L[i] = self.T_L(t_i)
            T_predicted[i] = self.T_predicted(t_i)

        return SimulationResults.create(
            t_f,
            t=t,
            omega=omega,
            T=T,
            T_predicted=T_predicted,
            u=u,
            T_L=T_L,
            theta=theta,
            thresholds=thresholds,
        )

    def create_frames(self, results: SimulationResults, steps: int) -> list[Figure]:
        return [results.until(i).draw() for i in range(1, steps + 1)]

    @staticmethod
    def _animation_configuration() -> dict[str, Any]:
        return dict(
            type="buttons",
            buttons=[
                dict(
                    label="Play",
                    method="animate",
                    args=[
                        None,
                        {
                            "frame": {"duration": 10, "redraw": False},
                            "fromcurrent": True,
                            "transition": {"duration": 0},
                        },
                    ],
                )
            ],
        )

Let's check out what the motor looks like when it's running.

In [None]:
# No control signal, no load
u: ControlSignal = lambda _: 0.0
T_L: Load = lambda _: 0.0

morton = (
    Motor(name="Morton")
    .with_control_signal(u)
    .with_load(T_L)
    .with_dynamics(motor_dynamics(J=1.0, B=0.1, K_T=1.0, u=u, T_L=T_L))
)

morton.animate(t_f=10.0, steps=100)

In [None]:
# Constant control signal, but no load
u: ControlSignal = lambda _: 1.0
T_L: Load = lambda _: 0.0

motorola = (
    Motor("Motorola")
    .with_control_signal(u)
    .with_load(T_L)
    .with_dynamics(motor_dynamics(J=1.0, B=0.1, K_T=1.0, u=u, T_L=T_L))
)

motorola.animate(t_f=10.0, steps=100)

### 1.2. Motor Temperature

Now that our motor is up and running, you may have noticed that the temperature is always stuck at zero. This is because we aren't really monitoring
the temperature of the motor winding during the simulation in any way. Let's fix that by adding a temperature "sensor" to the motor.

When we create the controller later on, it won't know the actual temperature of the winding though. Gaussian Process Regression will have to predict it, 
but we will be able to verify the predictions by comparing them to the actual temperature values.

<details>
  <summary>Temperature (you can skip this)</summary>

The following describes how we simulate a "temperature sensor" for the motor winding. You can skip this if you're not interested in the details.

To model the temperature dynamics of the motor winding, we will use the following differential equation:
  
$$
C_t \frac{dT(t)}{dt} = u^2(t)R - k(T(t) - T_{amb})
$$

where:
- $C_t$ is the thermal capacity of the motor winding,
- $R$ is the resistance of the motor winding,
- $k$ is the heat transfer coefficient of the motor, and
- $T_{amb}$ is the ambient temperature.

To keep the computation times reasonably low, we will solve this differential equation using integrating factors:

$$
T(t) = e^{-at} \left( T_0 + \frac{1}{C_t} \left[\int_0^{t'} e^{a\tau} g(\tau) d\tau + \int_{t'}^t e^{a\tau} g(\tau) d\tau\right] \right), \\
\text{where } a = \frac{k}{C_t}, \, g(\tau) = u^2(\tau)R + kT_{amb}, \text{ and } t' \text{ is the last time step for which } T(t') \text{ is known}.
$$

</details>

In [8]:
class TemperatureSensor:
    solver: Final[IntegratingFactorsSolver]

    def __init__(
        self,
        *,
        C_t: float,
        R: float,
        k: float,
        T_ambient: float,
        u: ControlSignal,
        T_0: float,
    ) -> None:
        self.solver = IntegratingFactorsSolver(
            a=k / C_t,
            g=lambda t: (u(t) ** 2 * R + k * T_ambient) / C_t,
            y_0=T_0,
        )

    def __call__(self, t: float) -> float:
        return self.solver(t)

    def reset(self) -> None:
        self.solver.reset()

    @staticmethod
    def for_motor(
        motor: Motor,
        *,
        C_t: float = 40,  # J/°C
        R: float = 2,  # Ω
        k: float = 0.5,  # W/°C
        T_ambient: float = 25,  # °C
        T_0: float = 25,  # °C
    ) -> "TemperatureSensor":
        return TemperatureSensor(
            C_t=C_t, R=R, k=k, T_ambient=T_ambient, u=motor.u, T_0=T_0
        )

In [None]:
# Constant control signal and load, that turns off after 150 seconds
u: ControlSignal = lambda t: 1.0 if t < 150.0 else 0.0
T_L: Load = lambda t: 0.5 if t < 150.0 else 0.0

morton = (
    Motor("Morton")
    .with_control_signal(u)
    .with_load(T_L)
    .with_dynamics(motor_dynamics(J=1.0, B=0.1, K_T=1.0, u=u, T_L=T_L))
)

# Let's also see what the temperature looks like.
morton = morton.with_temperature(TemperatureSensor.for_motor(morton))

# The simulation is also longer
morton.animate(t_f=300.0, steps=200).save_measurements_to("motor_temperature_data.csv")

### 1.3. Motor Temperature Data

Time for the fun part! A colleague of yours has collected some temperature measurements for motors. They ran the motor at full power, then turned it off,
and recorded the changes in temperature over time for the whole process. This data is stored in the file `motor_temperature_data.csv`.

#### Exercise 1.3.1

Load the data from the file `motor_temperature_data.csv` and understand its structure. 

- What type of data is it? 
- How many *important* columns are there? 
- What do they represent?

Once you've done that, the next logical thing to do would be to visualize the data. You'll find a cool plotting function below to help you with that.

You can use a convenient library called `pandas` to load the data. Let your web searching power guide you through the rest of the process.

<details>
    <summary> Hint </summary>

> &nbsp;  
> Pandas has a `read_csv` function exactly for such situations. You can also use it in conjunction with the `head` method to get a quick look at the data.  
> &nbsp;

</details>

In [10]:
def plot_temperature(*, t: Vector, T: Vector) -> Figure:
    return Figure(
        data=[
            Scatter(
                x=t,
                y=T,
                mode="markers",
                name="Temperature",
                marker=dict(size=5, symbol="cross"),
            )
        ],
        layout=dict(
            title="Motor Winding Temperature",
            xaxis=dict(title="Time (s)"),
            yaxis=dict(title="Temperature (°C)"),
        ),
    )

In [None]:
# {% if not exercise['1.3.1'].solution %}
# TODO: Implement me!
print("Oops! Looks like you forgot to implement this cell.")
# {% else %}
# TODO: Review me!

# Let's read in the file first.
temperature_data = pd.read_csv("motor_temperature_data.csv")

# Now let's check out the first few rows.
temperature_data.head()
# {% endif %}

In [12]:
def extract_data_from(temperature_data: pd.DataFrame) -> tuple[Vector, Vector]:
    # {% if exercise['1.3.1'].solution %}
    # TODO: Review me!
    # Extract the temperature data.
    t = np.array(temperature_data["Elapsed Time (s)"])
    T = np.array(temperature_data["Motor Winding Temperature (C)"])

    return t, T
    # {% else %}
    # TODO: Implement me!
    raise NotImplementedError("Not implemented!")
    # {% endif %}

In [None]:
# {% if exercise['1.3.1'].solution %}
# TODO: Review me!
# Extract the temperature data.
t, T = extract_data_from(temperature_data)

# Let's plot the temperature data.
plot_temperature(t=t, T=T).show()
# {% endif %}

### 1.4. Reflection

After looking at the data, consider the following questions:
- What parts of the data could be useful for predicting the motor winding temperature?
- Are there any distinct sections in the data that could provide more information?
- Does the data look noisy? If so, how could you deal with the noise?

Well, looks like that's the data you have to work with. While you think about what you can do with all those scattered points on your screen, 
let's get a quick refresher on how Gaussian Process Regression works.

## 2. Gaussian Process Regression

### 2.1. The Basics

Unlike your regular linear regression, Gaussian Process Regression (GPR) does not directly assume a particular form for the underlying function relating
the inputs to the outputs. Consequently, it doesn't estimate any parameters of the underlying function, since we don't know of any parameters to begin 
with. It does however, assume that adjacent points in the input space have some kind of relationship. This relationship is captured by a covariance 
function, also known as the kernel function, which is the core of GPR (pun intended).

### 2.2. The Kernel Function

The kernel of a GPR model represents the prior belief about the function we are trying to estimate. It measures the similarity between two points,
($x_1$ and $x_2$), and determines how much a known data point at $x_1$ can influence the prediction at $x_2$ (or vice versa). 

The choice of this function largely determines the "shape" of the predictions and there are many options to choose from, but the kernel typically 
has the following properties:

- If the points are close to each other, the kernel returns a large value, indicating that their values are highly correlated.
- If the points are far apart, the kernel returns a small value, meaning their values are weakly correlated or uncorrelated.

Throughout this exercise, we will represent it via the `Kernel` interface defined below:

In [14]:
class Kernel(Protocol):
    def __call__(self, x_1: float, x_2: float) -> float:
        """Computes the value of the kernel function for the two input points x_1 and x_2."""
        ...


#### Exercise 2.2.1

Let's start with a common kernel function called the **Radial Basis Function (RBF)**. It is defined as:
$$ k(x_1,x_2):= \sigma^2*\exp(-\dfrac{(x_1-x_2)^2}{2l^2})$$

where:
- $\sigma^2$ is the signal variance, which controls the vertical variation or amplitude of the function, and
- $l > 0$ (sometimes written as $\lambda$) is the length-scale, which controls how quickly the correlation between two points decreases as their distance increases.

Implement the `__call__` method of the `RBF` class, which takes two inputs $x_1$ and $x_2$ and returns the value of the RBF kernel function at 
those points. Note that *sigma* and *l* are provided as instance attributes. 

In [15]:
@dataclass(frozen=True)
class RBFKernel:
    _: KW_ONLY
    sigma: float = 1.0
    l: float = 1.0

    def __call__(self, x_1: float, x_2: float) -> float:
        # {% if exercise['2.2.1'].solution %}
        # TODO: Review me!
        return self.sigma**2 * np.exp(-((x_1 - x_2) ** 2) / (2 * self.l**2))
        # {% else %}
        # TODO: Implement me!
        raise NotImplementedError("Not implemented!")
        # {% endif %}

    def __str__(self) -> str:
        # This just prints the kernel in a nice way.
        return f"RBF(\u03c3={self.sigma:.3f}, l={self.l:.3f})"

In [None]:
import plotly
import plotly.graph_objects as go

from IPython.display import display, HTML
from ipywidgets import interact, widgets

Matrix: TypeAlias = NDArray[np.floating]


class OnButtonClick(Protocol):
    def __call__(self, figure: Figure) -> None:
        """Updates the figure when the button is clicked."""
        ...


class OnParameterUpdate(Protocol):
    def __call__(self, figure: Figure, *args: Any, **kwargs: Any) -> None:
        """Returns the new y values for the figure."""
        ...


@dataclass(frozen=True)
class Button:
    name: str
    _: KW_ONLY
    on_click: OnButtonClick


@dataclass(frozen=True)
class Slider:
    name: str
    _: KW_ONLY
    min: float
    max: float
    step: float
    default: float
    description: str = ""


def enable_plotly_latex() -> None:
    plotly.offline.init_notebook_mode()
    display(
        HTML(
            '<script type="text/javascript" async src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js?config=TeX-MML-AM_SVG"></script>'
        )
    )


def set_layout_for(
    figure: Figure, *, title: str = "", x_title: str = "Input", y_title: str = "Output"
) -> Figure:
    figure.update_layout(
        height=600,
        title=title,
        xaxis_title=x_title,
        yaxis_title=y_title,
        legend=dict(yanchor="top", y=0.9, xanchor="right", x=0.95),
    )

    return figure


def line_scatter(
    x: Vector,
    y: Vector,
    *,
    name: str = "Prediction",
    color: str = "blue",
    dash: str = "solid",
    visible: bool = True,
    showlegend: bool = True,
) -> Scatter:
    return Scatter(
        x=x,
        y=y,
        name=name,
        visible=visible,
        showlegend=showlegend,
        line=dict(color=color, width=2, dash=dash),
    )


def dot_scatter(
    x: Vector,
    y: Vector,
    *,
    name: str = "Observed points",
    color: str = "red",
    visible: bool = True,
    showlegend: bool = True,
) -> Scatter:
    return Scatter(
        x=x,
        y=y,
        name=name,
        visible=visible,
        showlegend=showlegend,
        mode="markers",
        marker=dict(color=color, size=8),
    )


def uncertainty_area_scatter(
    x: Vector,
    y_upper: Vector,
    y_lower: Vector,
    *,
    name: str = "Mean +/- Standard Deviation",
    visible: bool = True,
) -> Scatter:
    # We plot the upper bound first, which corresponds to the x values
    # Then we plot the lower bound, but in reverse order, so that the area
    # between the two lines is filled.
    return Scatter(
        x=np.concatenate((x, x[::-1])),
        y=np.concatenate((y_upper, y_lower[::-1])),
        name=name,
        visible=visible,
        showlegend=True,
        fill="toself",
        fillcolor="rgba(189,195,199,0.5)",
        line=dict(color="rgba(200,200,200,0)"),
        hoverinfo="skip",
    )


def label(
    x: float,
    y: float,
    text: str,
    *,
    position: str = "top right",
    font_size: int = 16,
    color: str = "black",
) -> Scatter:
    return Scatter(
        x=[x],
        y=[y],
        mode="text+markers",
        text=[text],
        textposition=position,
        textfont=dict(size=font_size, color=color),
        showlegend=False,
    )


def interactive_figure(
    data: list[Scatter],
    *,
    title: str,
    x_title: str,
    y_title: str,
    buttons: list[Button] = [],
    sliders: list[Slider] = [],
    on_update: OnParameterUpdate | None = None,
) -> Figure:
    figure = set_layout_for(
        go.FigureWidget(data=data), title=title, x_title=x_title, y_title=y_title
    )

    @interact(
        **{
            param.name: widgets.FloatSlider(
                value=param.default,
                min=param.min,
                max=param.max,
                step=param.step,
                description=param.description or param.name,
                style={"description_width": "200px"},
                layout=widgets.Layout(width="35%"),
            )
            for param in sliders
        }
    )
    def update(**kwargs: Any) -> None:
        assert (
            not sliders or on_update is not None
        ), "You forgot to specify an update function (on_update)."

        if on_update is not None:
            with figure.batch_update():
                on_update(figure, **kwargs)

    def click_handler_for(button: Button) -> Callable:
        def click_handler(_) -> None:
            with figure.batch_update():
                button.on_click(figure)

        return click_handler

    button_components: list[widgets.Button] = []

    for button in buttons:
        component = widgets.Button(description=button.name)
        component.on_click(click_handler_for(button))

        button_components.append(component)

    return widgets.VBox([figure, *button_components])


enable_plotly_latex()

Let's see what this kernel looks like. Run the cell below for an interactive visualization.

In [None]:
@dataclass(frozen=True)
class CovarianceCalculator:
    x_values: Vector

    def __call__(self, figure: Figure, *, l: float, sigma: float, x_2: float) -> None:
        figure.data[0].y = self.calculate(l, sigma, x_2)

        # Update the label for x_2
        line = label(x=x_2, y=self.variance(l, sigma, x_2), text="$x_2$")
        figure.data[1].x = line.x
        figure.data[1].y = line.y

    def calculate(self, l: float, sigma: float, x_2: float) -> Vector:
        kernel = RBFKernel(l=l, sigma=sigma)
        return np.array([kernel(x, x_2) for x in self.x_values])

    def variance(self, l: float, sigma: float, x_2: float) -> float:
        kernel = RBFKernel(l=l, sigma=sigma)
        return kernel(x_2, x_2)


x_values = np.arange(-10, 10, 0.1)
calculate_covariance = CovarianceCalculator(x_values)

interactive_figure(
    data=[
        line_scatter(
            x=x_values,
            y=np.zeros_like(x_values),
            name="Covariance",
        ),
        label(x=0, y=0, text="$x_2$"),
    ],
    title="Radial Basis Function Kernel",
    x_title="$x_1$",
    y_title="$RBF(x_1, x_2)$",
    sliders=[
        Slider(
            "l",
            min=0.1,
            max=3,
            step=0.1,
            default=1.0,
            description=r"Length scale (l)",
        ),
        Slider(
            "sigma",
            min=0.1,
            max=2,
            step=0.01,
            default=1.0,
            description=r"Signal variance (sigma)",
        ),
        Slider(
            "x_2",
            min=-10,
            max=10,
            step=0.1,
            default=0.0,
            description=r"Second input (x_2)",
        ),
    ],
    on_update=calculate_covariance,
)

### 2.3. The Covariance Matrix

Hopefully your kernels looks nice and smooth, but it's not enough for a GPR model. The next ingredient is data. You would need to have some labeled
data points (*training data*) with which you can update your prior belief about the function distribution, and the first step is to calculate the 
**covariance matrix**.

Let's say you are given $n$ training points $x_1, x_2, \ldots, x_m \in \mathbb{R}^n$ with values $y_1, y_2, \ldots, y_m \in \mathbb{R}$. The covariance
matrix $K'$ is a square matrix of size $m \times m$ where the $(i, j)$-th element is given by the kernel function evaluated at $x_i$ and $x_j$:

$$
K' = \begin{bmatrix}
k(x_1, x_1) & k(x_1, x_2) & \ldots & k(x_1, x_m) \\
k(x_2, x_1) & k(x_2, x_2) & \ldots & k(x_2, x_m) \\
\vdots & \vdots & \ddots & \vdots \\
k(x_m, x_1) & k(x_m, x_2) & \ldots & k(x_m, x_m)
\end{bmatrix}
$$

You can also define a more general covariance matrix $K(\mathbf{x^{(1)}}, \mathbf{x^{(2)}})$ between two vectors of points $\mathbf{x^{(1)}}$ and $\mathbf{x^{(2)}}$:

$$
K(\mathbf{x^{(1)}}, \mathbf{x^{(2)}}) = \begin{bmatrix}
k(x_1^{(1)}, x_1^{(2)}) & k(x_1^{(1)}, x_2^{(2)}) & \ldots & k(x_1^{(1)}, x_m^{(2)}) \\
k(x_2^{(1)}, x_1^{(2)}) & k(x_2^{(1)}, x_2^{(2)}) & \ldots & k(x_2^{(1)}, x_m^{(2)}) \\
\vdots & \vdots & \ddots & \vdots \\
k(x_m^{(1)}, x_1^{(2)}) & k(x_m^{(1)}, x_2^{(2)}) & \ldots & k(x_m^{(1)}, x_m^{(2)})
\end{bmatrix}, \quad \text{so that } K' = K(\mathbf{x}, \mathbf{x}).
$$

These matrices can be used to update our function distribution based on the training data. Later on, we will also use them to make predictions at new points.


#### Exercise 2.3.1

Implement the `covariance_matrix` function given below. It takes two vectors of training points `x_1` and `x_2`, and the kernel function. It should
return the covariance matrix $K(\mathbf{x^{(1)}}, \mathbf{x^{(2)}})$ as defined above.

In [18]:
import sympy as sp
from sympy import latex
from IPython.display import Math


@dataclass(frozen=True)
class FormattingContext:
    _: KW_ONLY
    m: int
    n: int
    start_limit: int
    end_limit: int
    precision: int
    hide_small_values: bool
    v_dots: sp.Symbol
    c_dots: sp.Symbol
    d_dots: sp.Symbol

    @property
    def limit(self) -> int:
        return self.start_limit + self.end_limit

    @staticmethod
    def create_for(
        matrix: Matrix,
        *,
        start_limit: int,
        end_limit: int,
        precision: int,
        hide_small_values: bool,
    ) -> "FormattingContext":
        return FormattingContext(
            m=matrix.shape[0],
            n=matrix.shape[1],
            start_limit=start_limit,
            end_limit=end_limit,
            precision=precision,
            hide_small_values=hide_small_values,
            v_dots=sp.symbols(r"\vdots"),
            c_dots=sp.symbols(r"\cdots"),
            d_dots=sp.symbols(r"\ddots"),
        )


def truncate(matrix: Matrix, *, context: FormattingContext) -> sp.Matrix:
    m, n = matrix.shape
    start_limit = context.start_limit
    end_limit = context.end_limit

    if m <= context.limit and n <= context.limit:
        return sp.Matrix(matrix)

    rows = list(matrix[:start_limit])
    if m > context.limit:
        rows.extend(matrix[-end_limit:])

    result = sp.Matrix(rows)

    if n > context.limit:
        result = result.extract(
            [i for i in range(result.rows)],
            list(range(start_limit)) + list(range(n - end_limit, n)),
        )

    return result


def extend_horizontally(matrix: sp.Matrix, *, context: FormattingContext) -> sp.Matrix:
    m = context.m
    start_limit, limit = context.start_limit, context.limit
    v_dots = context.v_dots

    if m > limit:
        columns = matrix.shape[1]
        matrix = matrix.row_insert(
            start_limit, sp.Matrix([[v_dots for _ in range(columns)]])
        )

    return matrix


def extend_vertically(matrix: sp.Matrix, *, context: FormattingContext) -> sp.Matrix:
    n = context.n
    start_limit, limit = context.start_limit, context.limit
    c_dots = context.c_dots

    if n > limit:
        rows = matrix.shape[0]
        matrix = matrix.col_insert(
            start_limit, sp.Matrix([c_dots for _ in range(rows)])
        )

    return matrix


def extend_diagonally(matrix: sp.Matrix, *, context: FormattingContext) -> sp.Matrix:
    m, n = context.m, context.n
    start_limit, limit = context.start_limit, context.limit
    d_dots = context.d_dots

    if m > limit and n > limit:
        matrix[start_limit, start_limit] = d_dots

    return matrix


def format_numbers(matrix: sp.Matrix, *, context: FormattingContext) -> sp.Matrix:
    precision = context.precision

    def format_entry(entry: sp.Expr) -> sp.Symbol | float | str:
        if isinstance(entry, sp.Symbol):
            return entry

        number = float(entry)
        if abs(number) < 1e-3 and number != 0:
            return 0 if context.hide_small_values else f"{number:.{precision}e}"
        else:
            return f"{number:.{precision}f}"

    return sp.Matrix(
        [[format_entry(value) for value in row] for row in matrix.tolist()]
    )


def pretty(
    matrix: Matrix,
    *,
    start_limit: int = 5,
    end_limit: int = 5,
    precision: int = 3,
    hide_small_values: bool = True,
) -> None:
    context = FormattingContext.create_for(
        matrix,
        start_limit=start_limit,
        end_limit=end_limit,
        precision=precision,
        hide_small_values=hide_small_values,
    )

    symbolic = truncate(matrix, context=context)
    symbolic = extend_horizontally(symbolic, context=context)
    symbolic = extend_vertically(symbolic, context=context)
    symbolic = extend_diagonally(symbolic, context=context)
    symbolic = format_numbers(symbolic, context=context)

    display(Math(latex(symbolic)))

In [19]:
def covariance_matrix(x_1: Vector, x_2: Vector, kernel: Kernel) -> Matrix:
    # {% if exercise['2.3.1'].solution %}
    # TODO: Review me!
    return np.array([[kernel(a, b) for a in x_1] for b in x_2])
    # {% else %}
    # TODO: Implement me!
    raise NotImplementedError("Not implemented!")
    # {% endif %}

Try calculating the covariance matrix for some random points using the RBF kernel you implemented earlier. You can use the `pretty` 
function to print the matrix in a readable format.

In [None]:
x = np.linspace(-5, 5, 100)  # This can be both your x_1 and x_2

# {% if not exercise['2.3.1'].solution %}
# TODO: Implement me!
print("Oops! Looks like you forgot to implement this cell.")
# {% else %}
# TODO: Review me!
kernel = RBFKernel(sigma=1.0, l=1.0)
K = covariance_matrix(x, x, kernel)
pretty(K)
# {% endif %}

## 2.4. Predictions

To actually incorporate the training data into the model, we need to update some parameters of our expected function distribution. The result of this
update is the **posterior distribution**. We are assuming a multivariate Gaussian distribution for the function values, so the corresponding parameters
are the mean $\mu$ and the covariance $\Sigma$ of the distribution.

The posterior distribution is given by:

$$
\mathcal{N}(\mu=K_*K^{-1}y, \Sigma=K_{**} - K_*K^{-1}K_*^T)
$$

where:
- $K$ is the covariance matrix between all pairs of training points,
- $K_*$ is the covariance matrix between the training points and the test points,
- $K_{**}$ is the covariance matrix between the test points themselves, and
- $y$ is the vector of training values.

To account for the noise that is most likely present in our data, we also add a small value $\sigma_e^2$ to the diagonal of $K'$. 
This value is called the **noise variance** and it represents the uncertainty in the data. This final covariance matrix $K$ is then:

$$
K = K' + \sigma_e^2 \mathbb{I}_m, \quad \text{where } \mathbb{I}_m \text{ is the identity matrix of size } m \times m.
$$

You **don't** need to do this for $K_*$ and $K_{**}$.

#### Exercise 2.4.1

First, implement the `K` property of the `GPR` class. It should return the covariance matrix $K$ of the training data. 

Then, implement the `predict` method of the `GPR` class below. The argument `x` is a vector of the test points, and the method should return:
- the mean $\mu$,
- the covariance matrix $\Sigma$, and
- the variance $\sigma^2$ of the posterior distribution at the test points.

Note that, the training data and a **general calculator** for covariance matrices $K(\mathbf{x^{(1)}}, \mathbf{x^{(2)}})$ are provided as 
instance attributes of the `GPR` class. It may also be a good idea to avoid recalculating the covariance matrix $K$ every time you make a 
prediction.

**Important**: Actually inverting the matrix $K$ is not recommended, as it is computationally expensive, as well as numerically unstable. Instead,
whenever you need to calculate $K^{-1} ...$, use the provided `solve(A, B)` function, which solves the linear system $AX = B$ for $X$. It also works
for multiple right-hand sides (i.e. $B$ can be a matrix).

Tip: `@` or `np.dot` can be used for matrix multiplication.

<details>
    <summary> Hint </summary>

> &nbsp;  
> The recipe for the prediction process is as follows:
> 
> 1. Calculate the covariance matrix $K_* = K(\mathbf{x_{\text{train}}}, \mathbf{x})$ between the training points and the test points.
> 2. Calculate the covariance matrix $K_{**} = K(\mathbf{x}, \mathbf{x})$ between the test points themselves.
> 3. Calculate the mean of the posterior distribution $\mu = K_* K^{-1} y$, where $y$ is the vector of training values.
> 4. Calculate the covariance of the posterior distribution $\Sigma = K_{**} - K_* K^{-1} K_*^T$.
> 5. The variance of the posterior distribution is just the diagonal of the covariance matrix $\Sigma$.  
> &nbsp;

</details>

<details>
    <summary> Note </summary>

> &nbsp;  
> In practical implementations of GPR you will face several numerical problems. For example, numeric approximations made during 
> matrix operations may lead to a non-invertible/non-positive-semi-definite covariance matrix $\Sigma$. This is, however, not
> a valid covariance matrix for a Gaussian distribution, which makes further calculations impossible. To circumvent this issue,
> we later add a small value to the diagonal of the covariance matrix to ensure it is positive semi-definite.  
> &nbsp;

</details>

In [21]:
from scipy.sparse import csc_matrix
from scipy.sparse.linalg import cg, spilu, LinearOperator


def incomplete_cholesky_preconditioner(A: Matrix) -> LinearOperator:
    """Compute the Incomplete Cholesky preconditioner for matrix A."""
    A_csc = csc_matrix(A)
    ilu = spilu(A_csc, drop_tol=1e-5)

    M = LinearOperator(A.shape, ilu.solve)
    return M


def solve(
    A: Matrix,
    B: Vector | Matrix,
    /,
    *,
    tolerance: float = 1e-10,
    max_iterations: int | None = None,
) -> Vector | Matrix:
    """Solves AX = B for X using the Conjugate Gradient method, where A is symmetric positive definite."""
    if not isinstance(B, np.ndarray):
        B = np.array(B)

    if A.shape[0] == 1:
        return np.array([(B / A).squeeze()])

    if B.ndim == 1:
        B = B.reshape(-1, 1)

    N, nrhs = B.shape
    solutions: list[Vector] = []

    max_iterations = max_iterations or N
    M = incomplete_cholesky_preconditioner(A)

    for i in range(nrhs):
        x, info = cg(A, B[:, i], M=M, rtol=tolerance, maxiter=max_iterations)
        if info > 0:
            # print(f"Warning: CG did not converge for column {i}. Error code: {info}")
            pass  # You can uncomment the above line if you want to see the warning

        solutions.append(x)

    return np.array(solutions).squeeze().T

Some useful classes/interfaces are provided below to help you with the implementation.

In [22]:
class CovarianceMatrixCalculator(Protocol):
    def __call__(self, x_1: Vector, x_2: Vector) -> Matrix:
        """Calculates the covariance matrix for the given input vectors x_1 and x_2."""
        ...


@dataclass(frozen=True)
class TrainingData:
    x: Vector
    y: Vector

    def take(self, count: int) -> "TrainingData":
        """Returns new training data with only the first `count` data points."""
        return TrainingData(x=self.x[:count], y=self.y[:count])

    @property
    def m(self) -> int:
        """Returns the number of training examples."""
        return len(self.y)


@dataclass(frozen=True)
class PredictionResults:  # This should be the output of the predict method
    _: KW_ONLY
    mean: Vector
    covariance: Matrix
    variance: Vector

In [23]:
@dataclass(frozen=True)
class OneDimensionalCovarianceMatrixCalculator:
    kernel: Kernel

    def __call__(self, x_1: Vector, x_2: Vector) -> Matrix:
        return covariance_matrix(x_1, x_2, self.kernel)

    def __str__(self) -> str:
        return str(self.kernel)

In [24]:
@dataclass
class GPR:
    training: TrainingData
    covariance_matrix: CovarianceMatrixCalculator
    sigma_noise: float

    # {% if exercise['2.4.1'].solution %}
    _K: Matrix | None = None  # We can cache the covariance matrix like this
    # {% else %}
    # Did you know you can use instance attributes to store computation results?
    # {% endif %}

    @property
    def K(self) -> Matrix:
        # You can use this as the noise term in the covariance matrix
        noise = max(self.sigma_noise, 3e-7) * np.identity(self.training.m)

        # {% if exercise['2.4.1'].solution %}
        # TODO: Review me!
        if self._K is None:
            self._K = self.covariance_matrix(self.training.x, self.training.x) + noise

        return self._K
        # {% else %}
        # TODO: Implement me!
        # {% endif %}

    def predict(self, x: Vector) -> PredictionResults:
        # {% if exercise['2.4.1'].solution %}
        # TODO: Review me!
        # We first calculate the covariance between the training points and the new points
        K_star = self.covariance_matrix(self.training.x, x)

        # Then we calculate the covariance between the new points
        K_double_star = self.covariance_matrix(x, x)

        # Next, we calculate the mean and covariance of the prediction
        mean = K_star @ solve(self.K, self.training.y)
        covariance = K_double_star - K_star @ solve(self.K, K_star.T)

        # Finally, we add a value larger than machine epsilon to ensure the covariance is SPD
        covariance = covariance + 3e-5 * np.ones(np.shape(covariance)[0])

        return PredictionResults(
            mean=mean, covariance=covariance, variance=np.diag(covariance)
        )
        # {% else %}
        # TODO: Implement me!

        # Add a value larger than machine epsilon to ensure the covariance is SPD, like this:
        # covariance = covariance + 3e-5 * np.ones(np.shape(covariance)[0])

        raise NotImplementedError("Not implemented!")
        # {% endif %}

    @staticmethod
    def create(
        training_data: TrainingData, kernel: Kernel, *, sigma_noise: float = 0
    ) -> "GPR":
        """Creates a new Gaussian Process Regression model from the given training data and kernel."""
        return GPR(
            training_data,
            OneDimensionalCovarianceMatrixCalculator(kernel),
            sigma_noise=sigma_noise,
        )

### 2.5. Visualizing the GPR

The typical way to visualize the predictions of a GPR model is to plot the mean of the posterior distribution, along with the standard deviation. Let's
do exactly that.

#### Exercise 2.5.1

Use the `plot_GPR` function provided below, along with the sample training and test data, to visualize the predictions of the GPR model.

In [25]:
def create_traces_for(
    model: GPR,
    x: Vector,
    *,
    show_mean: bool = True,
    show_samples: bool = True,
    show_variance: bool = True,
) -> list[Scatter]:
    result = model.predict(x)
    mean = result.mean
    standard_deviation = np.sqrt(result.variance)

    data: list[Scatter] = []

    for i in range(1, 4):
        data.append(
            uncertainty_area_scatter(
                x=x,
                y_lower=mean - i * standard_deviation,
                y_upper=mean + i * standard_deviation,
                name=f"Mean +/- {i} * Standard Deviation",
                visible=show_variance,
            )
        )

    data.append(line_scatter(x=x, y=mean, visible=show_mean))
    data.append(
        dot_scatter(x=model.training.x, y=model.training.y, visible=show_samples)
    )

    return data


def plot_GPR(
    model: GPR,
    x: Vector,
    *,
    additional_traces: Sequence[Scatter] = (),
    show_mean: bool = True,
    show_samples: bool = True,
    show_variance: bool = True,
) -> None:
    data = create_traces_for(
        model,
        x,
        show_mean=show_mean,
        show_samples=show_samples,
        show_variance=show_variance,
    )

    set_layout_for(
        Figure(data=[*data, *additional_traces]),
        title=f"GPR with kernel {model.covariance_matrix} and {model.sigma_noise if model.sigma_noise else 'no'} noise",
        x_title="x",
        y_title="f(x)",
    ).show()

Here's your sample data:

In [26]:
# arbitrary training values for demonstration purposes
training_data = TrainingData(
    x=np.array([0, 0.3, 1, 3.1, 4.7]),
    y=np.array([1, 0, 1.4, 1, -0.9]),
)

# these are our test points
x = np.arange(-1, 10, 0.1)

You can use `GPR.create` to create a GPR model with a given kernel and training data.

In [None]:
# {% if not exercise['2.5.1'].solution %}
# TODO: Implement me!
print("Oops! Looks like you forgot to implement this cell.")
# {% else %}
# TODO: Review me!
model = GPR.create(training_data, RBFKernel())
plot_GPR(model, x)
# {% endif %}

### 2.6. Reflection

As you can see, a cool feature of the GPR is that it is sort of a probability distribution over functions - that means can literally "draw" random functions
from the distribution.

- What do you think a random function drawn from the GPR distribution looks like?
- Does the function always pass through the training points?
- How do the hyperparameters of the kernel function affect the predictions?

Once you're done thinking, you can try it by running the next cell and pressing the "Add random drawing" button multiple times. 

Now compare it with the grey standard deviation areas. The "density" of the functions will correspond to the greyness of these 
areas, but be careful! **Not every function of the distribution must be contained in the visualized uncertainty areas.**

In [None]:
@dataclass(frozen=True)
class RandomDrawingGenerator:
    result: PredictionResults
    x: Vector

    def __call__(self, figure: Figure) -> None:
        figure.add_trace(
            line_scatter(
                x=self.x,
                y=np.random.multivariate_normal(
                    self.result.mean, self.result.covariance
                ),
                name="random function",
                showlegend=False,
            )
        )


def interactive_random_functions_figure(
    training: TrainingData, kernel: Kernel, x: Vector
) -> Figure:
    model = GPR.create(training, kernel)
    draw_random_function = RandomDrawingGenerator(model.predict(x), x)

    return interactive_figure(
        data=create_traces_for(model, x, show_mean=False),
        title=f"Random functions of the Gaussian process with {kernel}",
        x_title="x",
        y_title="f(x)",
        buttons=[Button("Add random function", on_click=draw_random_function)],
    )


interactive_random_functions_figure(training_data, RBFKernel(), x)

### 2.7. (Bonus) Different Kernel Functions

The RBF kernel is just one of many kernel functions that can be used in GPR. Here are a few more:
 
- **Linear Kernel**: $k_{\text{Lin}}(x, x') = \sigma_b^2 + \sigma_v^2 (x - c)(x' - c)$
- **Periodic Kernel**: $k_{\text{Per}}(x, x') = \sigma^2 \exp\left(-\dfrac{2\sin^2(\pi|x - x'|/p)}{l^2}\right)$
- **Ration Quadratic Kernel**: $k_{\text{RQ}}(x, x') = \sigma^2 \left(1 + \dfrac{(x - x')^2}{2\alpha l^2}\right)^{-\alpha}$

#### Exercise 2.7.1

Implement the periodic kernel below and see what impact it has on the predictions. You can implement the other kernels as well if you're 
feeling adventurous.

In [None]:
@dataclass(frozen=True)
class PeriodicKernel:
    sigma: float = 0.4
    length: float = 1

    def __call__(self, x_1: float, x_2: float) -> float:
        # {% if exercise['2.7.1'].solution %}
        # TODO: Review me!
        return float(
            self.sigma**2
            * np.exp(-2 * np.sin((np.pi * np.abs(x_1 - x_2) / 2)) ** 2 / self.length**2)
        )
        # {% else %}
        # TODO: Implement me!
        raise NotImplementedError("Not implemented!")
        # {% endif %}


interactive_random_functions_figure(training_data, PeriodicKernel(), x)

### 2.8. Reflection

Okay, now think about this:
- What would happen if the points in the training data were very close to each other?
- What would happen if the points in the training data were very far apart?
- What if we had more training data points? what about less?

Now that we've seen what a big difference the choice of the kernel can make, we demonstrate on how the predicted results change, as we iteratively add more observed points to the GPR. Click the "Add data point" button to see the results change each time.

In [None]:
@dataclass
class TrainingDataUpdater:
    training_data: TrainingData
    kernel: Kernel
    x: Vector
    point_count: int = 1  # Does not work with 1 point for some reason

    def add(self, figure: Figure) -> None:
        self.point_count = min(self.point_count + 1, len(self.training_data.x))
        self.update(figure)

    def remove(self, figure: Figure) -> None:
        self.point_count = max(1, self.point_count - 1)
        self.update(figure)

    def update(self, figure: Figure) -> None:
        traces = self.traces()

        # This is very hacky, but it works for now.
        figure.data[3].y = traces[-2].y  # mean
        figure.data[4].x = traces[-1].x  # new x values
        figure.data[4].y = traces[-1].y  # new y values

        for i in range(1, 4):
            figure.data[i - 1].y = traces[i - 1].y  # uncertainty areas

    def traces(self) -> list[Scatter]:
        training_data = self.training_data.take(self.point_count)
        model = GPR.create(training_data, self.kernel)
        return create_traces_for(model, self.x)


update_data_points = TrainingDataUpdater(training_data, RBFKernel(), x)

interactive_figure(
    data=update_data_points.traces(),
    title="GPR with varying number of training points",
    x_title="x",
    y_title="f(x)",
    buttons=[
        Button("Add data point", on_click=update_data_points.add),
        Button("Remove data point", on_click=update_data_points.remove),
    ],
)

## 2.9 Visualization of effect of free parameters

### 2.9 Reflection
Awesome! Now that we can make predictions with our GPR, lets visualize how the GPR changes if we change the free parameters, i.e. length-scale, variance and white noise. As you will see, these parameters have a huge impact on the predicted function and we'll optimize for them in the last exercise task.

Ask yourself the following: What impact on the predicted function does each one of the hyperparameters have?

In [None]:
@dataclass(frozen=True)
class HyperparameterUpdater:
    training_data: TrainingData
    x: Vector

    def __call__(self, figure: Figure, sigma: float, l: float, noise: float) -> None:
        data = self.plot(sigma, l, noise)

        for i in range(len(data)):
            figure.data[i].y = data[i].y

    def plot(self, sigma: float, l: float, noise: float) -> list[Scatter]:
        kernel = RBFKernel(sigma=sigma, l=l)
        model = GPR.create(self.training_data, kernel, sigma_noise=noise)
        return create_traces_for(model, self.x)


update_hyperparameters = HyperparameterUpdater(training_data, x)

interactive_figure(
    data=update_hyperparameters.plot(1, 0.5, 0.1),
    title="GPR with varying hyperparameters and noise",
    x_title="x",
    y_title="f(x)",
    sliders=[
        Slider(
            "sigma",
            min=0.01,
            max=3,
            step=0.01,
            default=1,
            description=r"Signal variance (sigma)",
        ),
        Slider(
            "l",
            min=0.01,
            max=3,
            step=0.01,
            default=0.5,
            description=r"Length scale (l)",
        ),
        Slider(
            "noise",
            min=0,
            max=0.5,
            step=0.025,
            default=0.1,
            description=r"Noise",
        ),
    ],
    on_update=update_hyperparameters,
)

Pretty impressive how many function 'shapes' we can generate with a GPR!

## 2.10. Optimization of Hyperparameters

There's one more thing we can do to improve the predictions of our GPR model: optimize the hyperparameters of the kernel function. A
typical approach is to maximize the log-likelihood of the training data given the hyperparameters. The log-likelihood is given by:

$$
\log p(y | X, \theta) = -\frac{1}{2} y^T K^{-1} y - \frac{1}{2} \log |K| - \frac{m}{2} \log 2\pi
$$

where:
- $y$ is the vector of training values,
- $X$ is the matrix of training points,
- $K$ is the covariance matrix between the training points,
- $m$ is the number of training points, and
- $\theta$ is the vector of hyperparameters.

#### Exercise 2.10.1

Implement the `optimize_hyperparameters` function below, which takes the following arguments:
- `create_model`: a function that creates a GPR model from the given hyperparameters,
- `theta_0`: the initial guess for the hyperparameters, and
- `bounds`: the bounds for the hyperparameters.

The function should return the optimized hyperparameters as a `Vector` of the same size as `theta_0`. You can use the `K` property of the GPR
class you implemented earlier to calculate the covariance matrix.

<details>
    <summary> Hint </summary>

> &nbsp;  
> The optimization problem can be solved using the `minimize` function from the `scipy.optimize` module. You can use the `L-BFGS-B` method
> for the optimization. Pay attention that the `minimize` function minimizes the objective function, and not maximizes it.  
> &nbsp;

</details>

In [32]:
from scipy.optimize import minimize


Bounds: TypeAlias = tuple[float | None, float | None]


class ModelCreator(Protocol):
    def __call__(self, theta: Vector) -> GPR:
        """Creates a GPR model with the given hyperparameters."""
        ...


def optimize_hyperparameters(
    create_model: ModelCreator,
    theta_0: Vector,
    bounds: list[Bounds],
) -> Vector:
    # {% if exercise['2.10.1'].solution %}
    # TODO: Review me!
    def objective(theta: Vector) -> float:
        model = create_model(theta)
        y = model.training.y
        m = model.training.m

        L = (
            y.T @ solve(model.K, y)
            + np.log(np.linalg.det(model.K))
            + m * np.log(2 * np.pi)
        ) / 2

        return L

    return minimize(
        lambda x: objective(x), x0=theta_0, bounds=bounds, method="L-BFGS-B"
    ).x
    # {% else %}
    # TODO: Implement me!
    raise NotImplementedError("Not implemented!")
    # {% endif %}

Let's see the difference in the predictions before and after optimizing the hyperparameters.

In [None]:
@dataclass(frozen=True)
class GPRWithRBFKernelCreator:  # This is our ModelCreator
    training_data: TrainingData
    sigma_noise: float

    def __call__(self, theta: Vector) -> GPR:
        sigma, l = theta
        return GPR.create(
            self.training_data,
            RBFKernel(sigma=sigma, l=l),
            sigma_noise=self.sigma_noise,
        )


# Some arbitrary function that we want to approximate
def mollifier(x: Vector) -> Vector:
    return 5.0 * np.exp(-((x - 3) ** 2) / (2 * 2**2))


# Arbitrary data points, we also add some noise to the actual function values
sigma_noise = 0.25
x_training = np.array([-0.75, 0.25, 2.9, 5.0, 5.5, 6.0, 10.0])
y_training = mollifier(x_training) + np.random.normal(0, sigma_noise, len(x_training))
training_data = TrainingData(x=x_training, y=y_training)

# We create a range of x values for testing/plotting
x = np.linspace(-1, 11, 100)
y = mollifier(x)
theta_0 = np.array([1.0, 1.0])

# Let's plot the actual function
actual_function_trace = line_scatter(
    x=x, y=y, name="Actual function", color="green", dash="dash"
)

model_creator = GPRWithRBFKernelCreator(training_data, sigma_noise)

# We first plot the GPR with the initial hyperparameters
print("First, with the initial hyperparameters:")
plot_GPR(model_creator(theta_0), x=x, additional_traces=[actual_function_trace])

# Now we optimize the hyperparameters
theta = optimize_hyperparameters(
    model_creator, theta_0, bounds=[(0.01, None), (0.01, None)]
)

# Finally, we plot the GPR with the optimized hyperparameters
print("Then, with the optimized hyperparameters:")
plot_GPR(model_creator(theta), x=x, additional_traces=[actual_function_trace])

### 2.11. Multi-dimensional Inputs

So far, we've only been working with one-dimensional inputs. However, GPR can be extended to multiple dimensions. Everything stays
pretty much the same, except that the kernel function is now a product of the one-dimensional kernels for each dimension.

$$
k_{\text{multi}}(x, x') = \prod_{i=1}^n k_i(x_i, x_i')
$$

where:
- $x = (x_1, x_2, \ldots, x_n)$ and $x' = (x_1', x_2', \ldots, x_n')$ are the input vectors,
- $k_i$ is the kernel function for the $i$-th dimension, and
- $n$ is the number of dimensions.

#### Exercise 2.11.1

Implement the `MultiDimensionalKernel` class below, which takes a list of one-dimensional kernel functions and returns their 
combined kernel function.

In [34]:
class MultiDimensionalKernel:
    kernels: list[Kernel]

    def __init__(self, *kernels: Kernel) -> None:
        self.kernels = list(kernels)

    def __call__(self, x_1: Vector, x_2: Vector) -> float:
        # {% if exercise['2.11.1'].solution %}
        # TODO: Review me!
        result = 1

        for kernel, x_1_i, x_2_i in zip(self.kernels, x_1, x_2):
            result *= kernel(x_1_i, x_2_i)

        return result
        # {% else %}
        # TODO: Implement me!
        raise NotImplementedError("Not implemented!")
        # {% endif %}

    def __str__(self) -> str:
        return " * ".join(str(kernel) for kernel in self.kernels)

#### Exercise 2.11.2

Another detail to consider is that the way we calculate the covariance matrix changes. Instead of having $\mathbf{x^{(1)}}$ and $\mathbf{x^{(2)}}$,
as vectors of points, we now have matrices $\mathbf{X^{(1)}}$ and $\mathbf{X^{(2)}}$. Each row of these matrices represents a point in the
corresponding n-dimensional space.

Implement the `multi_dimensional_covariance_matrix` function below, which takes two matrices of training points $\mathbf{X^{(1)}}$ and $\mathbf{X^{(2)}}$,
and the multi-dimensional kernel function. It should return the covariance matrix $K(\mathbf{X^{(1)}}, \mathbf{X^{(2)}})$.

<details>
    <summary> Hint </summary>

> &nbsp;  
> This task is much simpler than it sounds. You need to do the exact same thing as before, but instead of calculating the kernel function
> for each pair of points, you calculate it for each pair of rows. The kernel function can already handle pairs of vectors as inputs.
>
> If you implemented the one-dimensional covariance matrix correctly, it could directly work for the multi-dimensional case as well.  
> &nbsp;

</details>

In [35]:
def multi_dimensional_covariance_matrix(
    x_1: Matrix, x_2: Matrix, kernel: MultiDimensionalKernel
) -> Matrix:
    # {% if exercise['2.11.2'].solution %}
    # TODO: Review me!
    return np.array([[kernel(a, b) for a in x_1] for b in x_2])
    # {% else %}
    # TODO: Implement me!
    raise NotImplementedError("Not implemented!")
    # {% endif %}

In [36]:
@dataclass(frozen=True)
class MultiDimensionalCovarianceMatrixCalculator:
    kernel: MultiDimensionalKernel

    def __call__(self, x_1: Matrix, x_2: Matrix) -> Matrix:
        return multi_dimensional_covariance_matrix(x_1, x_2, self.kernel)

    def __str__(self) -> str:
        return str(self.kernel)

In [37]:
def set_3d_layout_for(
    figure: go.Figure,
    *,
    title: str,
    x_title: str = "X",
    y_title: str = "Y",
    z_title: str = "Z",
) -> go.Figure:
    figure.update_layout(
        height=800,
        title=title,
        scene=dict(
            xaxis_title=x_title,
            yaxis_title=y_title,
            zaxis_title=z_title,
        ),
        legend=dict(yanchor="top", y=1.05, xanchor="right", x=1.05),
    )
    return figure


def surface_scatter(
    x: Matrix,
    y: Matrix,
    z: Matrix,
    *,
    name: str = "Prediction",
    color_scale: str = "Plasma",
    opacity: float = 1.0,
    color: str | None = None,
    visible: bool = True,
    showlegend: bool = True,
    legend_only: bool = False,
) -> go.Surface:
    actual_colorscale = color_scale if color is None else [[0, color], [1, color]]

    return go.Surface(
        x=x,
        y=y,
        z=z,
        name=name,
        visible="legendonly" if legend_only else visible,
        showlegend=showlegend,
        colorscale=actual_colorscale,
        opacity=opacity,
        showscale=False,
    )


def dot_scatter_3d(
    x: Vector,
    y: Vector,
    z: Vector,
    *,
    name: str = "Observed points",
    color: str = "red",
    visible: bool = True,
    showlegend: bool = True,
) -> go.Scatter3d:
    return go.Scatter3d(
        x=x,
        y=y,
        z=z,
        name=name,
        visible=visible,
        showlegend=showlegend,
        mode="markers",
        marker=dict(color=color, size=5),
    )


def uncertainty_volume(
    x: Matrix,
    y: Matrix,
    z_lower: Matrix,
    z_upper: Matrix,
    *,
    name: str = "Uncertainty Volume",
    opacity: float = 0.3,
    visible: bool = True,
    showlegend: bool = True,
) -> list[go.Surface]:
    lower_surface = go.Surface(
        x=x,
        y=y,
        z=z_lower,
        name=name + " (lower bound)",
        visible=visible,
        showlegend=showlegend,
        legendgroup=name,
        colorscale="Blues",
        opacity=opacity,
        showscale=False,
    )
    upper_surface = go.Surface(
        x=x,
        y=y,
        z=z_upper,
        name=name + " (upper bound)",
        visible=visible,
        showlegend=showlegend,
        legendgroup=name,
        colorscale="Reds",
        opacity=opacity,
        showscale=False,
    )
    return [lower_surface, upper_surface]

In [38]:
def input_grid(
    x_range: tuple[float, float], y_range: tuple[float, float], m: int
) -> Matrix:
    points_per_dimension = int(np.sqrt(m))
    x_0, x_f = x_range
    y_0, y_f = y_range
    delta_x = (x_f - x_0) / points_per_dimension
    delta_y = (y_f - y_0) / points_per_dimension

    x = np.arange(x_0, x_f + delta_x, delta_x)
    y = np.arange(y_0, y_f + delta_y, delta_y)
    X, Y = np.meshgrid(x, y)

    # Flatten and stack the grid to form the matrix
    return np.column_stack([X.ravel(), Y.ravel()])


def input_to_grid(x: Matrix) -> tuple[Matrix, Matrix]:
    # Separate x and y coordinates
    x_coordinates = x[:, 0]
    y_coordinates = x[:, 1]

    # Find unique values
    x_unique = np.unique(x_coordinates)
    y_unique = np.unique(y_coordinates)

    # Create meshgrid
    X, Y = np.meshgrid(x_unique, y_unique)

    return X, Y


def output_to_grid(*, x: Vector, y: Vector, z: Vector, X: Matrix, Y: Matrix) -> Matrix:
    # Initialize Z with NaN values
    Z = np.full(X.shape, np.nan)

    # Fill in Z values where we have data
    for x_k, y_k, z_k in zip(x, y, z):
        i = np.where(X[0, :] == x_k)[0][0]
        j = np.where(Y[:, 0] == y_k)[0][0]
        Z[j, i] = z_k

    return Z


def create_surface(
    x: Matrix,
    f: Callable[[Matrix], Vector],
    *,
    name: str,
    color: str,
    opacity: float,
    legend_only: bool = False,
) -> go.Surface:
    X, Y = input_to_grid(x)
    Z = f(x).reshape(X.shape)

    return surface_scatter(
        x=X, y=Y, z=Z, name=name, color=color, opacity=opacity, legend_only=legend_only
    )


def create_traces_for_3d(
    model: GPR,
    x: Matrix,
    *,
    show_mean: bool = True,
    show_samples: bool = True,
    show_variance: bool = True,
) -> list[go.Surface]:
    X, Y = input_to_grid(x)

    result = model.predict(x)
    mean = result.mean
    standard_deviation = np.sqrt(result.variance)

    data: list[go.Surface] = []

    if show_variance:
        for i in range(1, 4):
            lower_surface, upper_surface = uncertainty_volume(
                x=X,
                y=Y,
                z_lower=(mean - i * standard_deviation).reshape(X.shape),
                z_upper=(mean + i * standard_deviation).reshape(X.shape),
                name=f"Mean +/- {i} * Standard Deviation",
                opacity=0.3 / i,
            )
            data.extend([lower_surface, upper_surface])

    if show_mean:
        data.append(
            surface_scatter(x=X, y=Y, z=mean.reshape(X.shape), name="Mean Prediction")
        )

    if show_samples:
        data.append(
            dot_scatter_3d(
                x=model.training.x[:, 0],
                y=model.training.x[:, 1],
                z=model.training.y,
                name="Training Data",
            )
        )

    return data


def plot_GPR_3d(
    model: GPR,
    x: Matrix,
    *,
    additional_traces: Sequence[go.Surface] = (),
    show_mean: bool = True,
    show_samples: bool = True,
    show_variance: bool = True,
) -> None:
    data = create_traces_for_3d(
        model,
        x,
        show_mean=show_mean,
        show_samples=show_samples,
        show_variance=show_variance,
    )

    fig = go.Figure(data=[*data, *additional_traces])

    set_3d_layout_for(
        fig,
        title=f"GPR with kernel {model.covariance_matrix} and {model.sigma_noise if model.sigma_noise else 'no'} noise",
        x_title="x",
        y_title="y",
        z_title="f(x,y)",
    ).show()

In [None]:
@dataclass(frozen=True)
class MultiDimensionalGPRCreator:  # This ModelCreator works with multi-dimensional kernels
    training_data: TrainingData
    sigma_noise: float

    def __call__(self, theta: Vector) -> GPR:
        # The theta vector looks like this: [sigma_1, l_1, sigma_2, l_2, ...]
        # The `theta.reshape(-1, 2)` basically pairs up successive elements in the theta vector.
        kernel = MultiDimensionalKernel(
            *(RBFKernel(sigma=sigma, l=l) for sigma, l in theta.reshape(-1, 2))
        )

        return GPR(
            self.training_data,
            MultiDimensionalCovarianceMatrixCalculator(kernel),
            sigma_noise=self.sigma_noise,
        )


def mollifier_3d(x: Matrix) -> Vector:
    c_x = 3
    c_y = 4
    w_x = 2
    w_y = 3

    return (
        5.0
        * np.exp(-((x[:, 0] - c_x) ** 2 / (2 * w_x**2)))
        * np.exp(-((x[:, 1] - c_y) ** 2 / (2 * w_y**2)))
    )


def noise_3d(x: Matrix) -> Vector:
    return np.random.normal(0, sigma_noise, len(x))


# Again, we first take some arbitrary data points and add noise
sigma_noise = 0.25
x_training = np.array(
    [
        [-0.75, 2.6, 2.9, 5.0, 5.5, 6.0, 10.0],
        [1.0, 0.5, 3.0, 4.0, 5.0, 6.0, 7.0],
    ],
).T
y_training = mollifier_3d(x_training) + noise_3d(x_training)
training_data = TrainingData(x=x_training, y=y_training)

x = input_grid((-1, 11), (0, 8), 100)
actual_surface = create_surface(
    x,
    mollifier_3d,
    name="Actual function",
    color="green",
    opacity=0.25,
    legend_only=True,
)

theta_0 = np.array([2.4, 2.84, 2.4, 2.84])
model_creator = MultiDimensionalGPRCreator(training_data, sigma_noise)

# We first plot the GPR with the initial hyperparameters
print("First, with the initial hyperparameters:")
plot_GPR_3d(model_creator(theta_0), x, additional_traces=[actual_surface])

# Now we optimize the hyperparameters
theta = optimize_hyperparameters(
    model_creator,
    theta_0,
    bounds=[(0.01, None), (0.01, None), (0.01, None), (0.01, None)],
)

# Finally, we plot the GPR with the optimized hyperparameters
print("Then, with the optimized hyperparameters:")
plot_GPR_3d(model_creator(theta), x, additional_traces=[actual_surface])

### 2.12. Reflection

What do you think:
- Is the multi-dimensional GPR able to predict the function better after optimizing the hyperparameters? What's the difference?
- What would happen if we had more data points in the multi-dimensional case?
- What if we used a different kernel function?

## 3. GPR for Motor Temperature Prediction

### 3.1. Preparing the GPR Model

Back to our original problem! We have the motor temperature data, and we want to predict the temperature of the motor winding using GPR.

You've probably generated some ideas on how we can use the data available to us to predict the motor winding temperature, but here's
one way to do it:

1. **Preprocess the Data**: The data is noisy, so we need to clean it up a bit. We can use a moving average filter to smooth out the data.
2. **Extract the Features**: We can extract some parts of the data that we think are important for predicting the motor winding temperature.
    We suggest using the following features:
    - The starting temperature of the motor winding,
    - The time the motor has been running, and
    - The phase of the motor (on or off).  
    
    These features will comprise the 3-dimensional input vector for the GPR model. Our output data will be the temperature difference of the motor
    winding. Throughout this process, we will also want to normalize the data to make it easier for the GPR model to learn the patterns.
3. **Split the Data**: We can split the data into training and test sets. We can use the first 75% of the data for training and the rest for testing.
4. **Train the GPR Model**: We can use the extracted features and the final temperature data to train (& tune the hyperparameters of) the GPR model.
5. **Test the GPR Model**: We can use the test data to evaluate the performance of the GPR model.
6. **Predict the Motor Winding Temperature**: Finally, we can use the trained GPR model to predict the temperature of the motor winding at any given 
    time. The controller can then use this prediction to regulate the motor activity.

#### Exercise 3.1.1

Implement the `denoise` function given below, which takes two vectors `x` and `y`, representing the input and output data, and applies a 
moving average filter to the output data. The function should return both the new input and output vectors. 

When applying the filter, you may decide to discard the first and last few points of the output data, so make sure to adjust the input 
data accordingly.

<details>
    <summary> Hint </summary>

> &nbsp;  
> You can use the `np.convolve` function of numpy to apply the filter. Set the `mode` parameter to `'valid'` to discard the first and last few points.  
> &nbsp;

</details>

In [40]:
def denoise(
    x: Vector,
    y: Vector,
    *,
    window_size: int,
) -> tuple[Vector, Vector]:
    # {% if exercise['3.1.1'].solution %}
    # TODO: Review me!
    x_denoised = x[window_size // 2 : -(window_size // 2)]
    y_denoised = np.convolve(y, np.ones(window_size) / window_size, mode="valid")

    return x_denoised, y_denoised
    # {% else %}
    # TODO: Implement me!
    raise NotImplementedError("Not implemented!")
    # {% endif %}

Applying our denoising function to the motor temperature data, we can now visualize the cleaned data.

In [None]:
window_size = 15
t_denoised, T_denoised = denoise(t, T, window_size=window_size)

set_layout_for(
    Figure(
        data=[
            dot_scatter(x=t, y=T, name="Original Noisy data"),
            line_scatter(x=t_denoised, y=T_denoised, name="Denoised data"),
        ]
    ),
    title=f"Denoised temperature data (window size = {window_size})",
    x_title="Time",
    y_title="Temperature",
).show()

#### Exercise 3.1.2

Implement the `extract_features` function below, which takes the cleaned input data and extracts the following feature vectors **for pairs of
data points**:

$$
\mathbf{x} = \left[ x_1, x_2, x_3 \right]
$$

where:
- $x_1$ is the initial temperature of the motor winding,
- $x_2$ is the time the motor has been running, and
- $x_3$ is the phase of the motor (on or off).

Our label $y$ will be the temperature difference of the motor winding.

The function should return a tuple `(X, y)` where `X` is the matrix of features and `y` is the output data. In `X`, each row represents a
single data point, and each column represents a feature.

You may be wondering how you should choose the data point pairs to extract the features from. We suggest constructing the pairs in the following way:
1. Let's say our time points are indexed as $t^{(1)}, t^{(2)}, t^{(3)}, ..., t^{(m)}$.
2. Take a subsequence of every 3rd point, starting from the first index, like $t^{(1)}, t^{(4)}, t^{(7)}, ... $.
3. Repeat step 2 for different starting indices until every data point falls in exactly one subsequence.
4. Repeat steps 2 and 3, but this time take every 5th point. Repeat it for every 7th point as well.
5. Adjacent points in each subsequence will now be valid pairs.

**Make sure data point pairs are such that the motor has the same phase in both data points.**

In [42]:
# {% if exercise['3.1.2'].solution %}
@dataclass(frozen=True)
class DataForPhase:
    t: Vector
    T: Vector


@dataclass(frozen=True)
class DataForPhases:
    on: DataForPhase
    off: DataForPhase


@dataclass(frozen=True)
class DataSequences:
    t: list[Vector]
    T: list[Vector]
    on_sequence_lengths: list[int]
    off_sequence_lengths: list[int]

    def initial_temperatures(self) -> Vector:
        # Since the running times are for calculated from subsequent pairs of time points,
        # we just need to remove the last element from each temperature sequence.
        return np.concatenate([sequence[:-1] for sequence in self.T])

    def running_times(self) -> Vector:
        return np.concatenate([np.diff(sequence) for sequence in self.t])

    def temperature_differences(self) -> Vector:
        return np.concatenate([np.diff(sequence) for sequence in self.T])

    def motor_phases(self) -> Vector:
        # We encode the motor phase (on = 1, off = 0). We also shouldn't forget the -1,
        # since we are dropping one element from each sequence when calculating the running times.
        phase_on = [np.ones(length - 1) for length in self.on_sequence_lengths]
        phase_off = [np.zeros(length - 1) for length in self.off_sequence_lengths]

        return np.concatenate([*phase_on, *phase_off])


def split_into_phases(t: Vector, T: Vector) -> DataForPhases:
    # This can be done by splitting the data right in the middle
    # Generally the easiest way is to just look at the data.
    t_mid = t[len(t) // 2]
    t_on, t_off = t[t < t_mid], t[t >= t_mid]
    T_on, T_off = T[t < t_mid], T[t >= t_mid]

    return DataForPhases(
        on=DataForPhase(t=t_on, T=T_on), off=DataForPhase(t=t_off, T=T_off)
    )


def create_sequences(data: DataForPhases) -> DataSequences:
    # The way we do it is kind of arbitrary, but it's a good starting point.
    def sequences_from(x: Vector) -> list[Vector]:
        return [x[i::j] for j in [3, 5, 7] for i in range(j)]

    t_on_sequences, t_off_sequences = (
        sequences_from(data.on.t),
        sequences_from(data.off.t),
    )
    T_on_sequences, T_off_sequences = (
        sequences_from(data.on.T),
        sequences_from(data.off.T),
    )

    return DataSequences(
        t=[*t_on_sequences, *t_off_sequences],
        T=[*T_on_sequences, *T_off_sequences],
        on_sequence_lengths=[len(sequence) for sequence in t_on_sequences],
        off_sequence_lengths=[len(sequence) for sequence in t_off_sequences],
    )


# {% endif %}


def extract_features(t: Vector, T: Vector) -> tuple[Matrix, Vector]:
    # {% if exercise['3.2.1'].solution %}
    # TODO: Review me!
    # We first start by splitting the data into two phases - motor on and motor off.
    data = split_into_phases(t, T)

    # Next, we extract sequences of data for each phase.
    data_sequences = create_sequences(data)

    # We can now extract the interesting features and labels.
    x_1 = data_sequences.initial_temperatures()
    x_2 = data_sequences.running_times()
    x_3 = data_sequences.motor_phases()

    X = np.column_stack([x_1, x_2, x_3])
    y = data_sequences.temperature_differences()

    return X, y
    # {% else %}
    # TODO: Implement me!
    raise NotImplementedError("Not implemented!")
    # {% endif %}

In [43]:
def subplots(figures: list[Figure], figure_height: int = 300) -> Figure:
    n = len(figures)

    figure = make_subplots(rows=n, cols=1, shared_xaxes=True, vertical_spacing=0.05)

    for i, sub_figure in enumerate(figures, 1):
        for trace in sub_figure.data:
            figure.add_trace(trace, row=i, col=1)

        # Update y-axis title for each subplot
        figure.update_yaxes(title_text=sub_figure.layout.yaxis.title.text, row=i, col=1)

    # Update the overall figure layout
    figure.update_layout(
        height=figure_height * n,
    )

    # Update x-axis title (only for the bottom subplot)
    figure.update_xaxes(title_text=figures[-1].layout.xaxis.title.text, row=n, col=1)

    return figure


def plot_features(X: Matrix, y: Vector) -> Figure:
    indices = np.arange(len(X)) * 1.0

    initial_temperature_figure = set_layout_for(
        Figure(data=[line_scatter(x=indices, y=X[:, 0], name="Initial Temperature")]),
        x_title="Data Point Index",
        y_title=r"$\text{Temperature (} x_1 \text{)}$",
    )

    time_difference_figure = set_layout_for(
        Figure(
            data=[
                line_scatter(
                    x=indices, y=X[:, 1], name="Time Difference", color="orange"
                )
            ]
        ),
        x_title="Data Point Index",
        y_title=r"$\text{Time (} x_2 \text{)}$",
    )

    phase_figure = set_layout_for(
        Figure(
            data=[line_scatter(x=indices, y=X[:, 2], name="Motor Phase", color="green")]
        ),
        x_title="Data Point Index",
        y_title=r"$\text{On/Off (} x_3 \text{)}$",
    )

    temperature_difference_figure = set_layout_for(
        Figure(
            data=[dot_scatter(x=indices, y=y, name="Final Temperature", color="red")]
        ),
        x_title="Data Point Index",
        y_title=r"$\text{Temperature (} y \text{)}$",
    )

    subplots(
        [
            initial_temperature_figure,
            time_difference_figure,
            phase_figure,
            temperature_difference_figure,
        ]
    ).show()


def zero_plane(x: Vector, y: Vector) -> go.Surface:
    X, Y = input_to_grid(np.column_stack([x, y]))
    Z = np.zeros(X.shape)

    return surface_scatter(X, Y, Z, name="Zero Plane", color="black", opacity=0.1)


def plot_features_3d(
    X: Matrix,
    y: Vector,
    *,
    additional_traces: list[go.Scatter3d | go.Surface] = [],
) -> None:
    on_indices = X[:, 2] == 1

    T_0 = X[:, 0]
    d_t = X[:, 1]
    d_T = y

    set_3d_layout_for(
        go.Figure(
            data=[
                dot_scatter_3d(
                    x=T_0[on_indices],
                    y=d_t[on_indices],
                    z=d_T[on_indices],
                    color="red",
                    name="Motor On",
                ),
                dot_scatter_3d(
                    x=T_0[~on_indices],
                    y=d_t[~on_indices],
                    z=d_T[~on_indices],
                    color="blue",
                    name="Motor Off",
                ),
                *additional_traces,
                zero_plane(T_0, d_t),
            ]
        ),
        title="Temperature Change vs. Initial Temperature and Running Time",
        x_title="Initial Temperature",
        y_title="Running Time",
        z_title="Temperature Change",
    ).show()

Okay, let's see what our features look like:

- $x_1$ should look like scaled down sections of the original data stacked next to each other.
- $x_2$ is time differences between neighboring points and should have 3 distinct sections for each phase.  
- $x_3$ should be 1 for the first half and 0 for the second.
- $y$ should have sections similar to $x_2$, but this time it's temperature differences which slowly decrease within the same section.

In [None]:
X, y = extract_features(t_denoised, T_denoised)
plot_features(X, y)

We can also take a look at it in 3D.

In [None]:
plot_features_3d(X, y)

#### Exercise 3.1.3

Implement the normalization methods of the `Normalizer` class below, which take the features extracted from the previous exercise and 
normalize/denormalize them. Specifically:

- `normalize_temperature` should normalize the starting temperature of the motor winding to the range $[0, 1]$.
- `normalize_time` should normalize the running times of the motor to the range $[0, 1]$.
- `normalize_temperature_difference` should normalize the temperature differences to the range $[-1, 1]$.
- `denormalize_temperature` should denormalize the predicted temperature differences back to the original range.

The statistical properties of the data (e.g. mean and standard deviation), should be calculated the first time normalization happens. All future
normalizations/denormalizations should use the statistical properties of the data provided for the first time.

<details>
    <summary> Hint </summary>

> &nbsp;  
> You can use the `MinMaxScaler` and `MaxAbsScaler` from the `sklearn.preprocessing` module to normalize the data. For denormalization, you can 
> use the `inverse_transform` method of the scaler. You can use the `fit` method of the scaler to calculate and set the statistical properties
> of a particular vector of data. These properties will be used every time you call `transform` afterwards.
> &nbsp;

</details>

In [46]:
from sklearn.preprocessing import MinMaxScaler, MaxAbsScaler


# {% if exercise['3.1.3'].solution %}
Scaler: TypeAlias = MinMaxScaler | MaxAbsScaler
ScalerCreator: TypeAlias = Callable[[], Scaler]
# {% endif %}


@dataclass
class Normalizer:
    # {% if exercise['3.1.3'].solution %}
    # TODO: Review me!
    temperature_scaler: Scaler | None = None
    time_scaler: Scaler | None = None
    temperature_difference_scaler: Scaler | None = None
    # {% endif %}

    def normalize_temperature(self, T: Vector) -> Vector:
        # {% if exercise['3.1.3'].solution %}
        # TODO: Review me!
        self.temperature_scaler, result = self.transform_with(
            self.temperature_scaler, T, MinMaxScaler
        )
        return result
        # {% else %}
        # TODO: Implement me!
        raise NotImplementedError("Not implemented!")
        # {% endif %}

    def normalize_time(self, t: Vector) -> Vector:
        # {% if exercise['3.1.3'].solution %}
        # TODO: Review me!
        self.time_scaler, result = self.transform_with(
            self.time_scaler, t, MinMaxScaler
        )
        return result
        # {% else %}
        # TODO: Implement me!
        raise NotImplementedError("Not implemented!")
        # {% endif %}

    def normalize_temperature_difference(self, d_T: Vector) -> Vector:
        # {% if exercise['3.1.3'].solution %}
        # TODO: Review me!
        self.temperature_difference_scaler, result = self.transform_with(
            self.temperature_difference_scaler, d_T, MaxAbsScaler
        )
        return result
        # {% else %}
        # TODO: Implement me!
        raise NotImplementedError("Not implemented!")
        # {% endif %}

    def denormalize_temperature_difference(self, T: Vector) -> Vector:
        # {% if exercise['3.1.3'].solution %}
        # TODO: Review me!
        assert (
            self.temperature_difference_scaler is not None
        ), "Cannot denormalize data without normalizing some data first."

        return self.temperature_difference_scaler.inverse_transform(
            T.reshape(-1, 1)
        ).flatten()
        # {% else %}
        # TODO: Implement me!
        raise NotImplementedError("Not implemented!")
        # {% endif %}

    # {% if exercise['3.1.3'].solution %}

    def transform_with(
        self, scaler: Scaler | None, data: Vector, create_scaler: ScalerCreator
    ) -> tuple[Scaler, Vector]:
        reshaped_data = data.reshape(-1, 1)

        if scaler is None:
            scaler = create_scaler()
            scaler.fit(reshaped_data)

        return scaler, scaler.transform(reshaped_data).flatten()

    # {% endif %}

    def normalize(self, X: Matrix, y: Vector) -> tuple[Matrix, Vector]:
        X = self.normalize_input(X)
        y = self.normalize_temperature_difference(y)

        return X, y

    def normalize_input(self, X: Matrix) -> Vector:
        return np.column_stack(
            [
                self.normalize_temperature(X[:, 0]),
                self.normalize_time(X[:, 1]),
                X[:, 2],
            ]
        )

    def denormalize(self, y: Vector) -> Vector:
        return self.denormalize_temperature_difference(y)

Our normalized data now looks like this (notice the scales have changed):

In [None]:
normalizer = Normalizer()
X_normalized, y_normalized = normalizer.normalize(X, y)
plot_features(X_normalized, y_normalized)

In [None]:
plot_features_3d(X_normalized, y_normalized)

#### Exercise 3.1.4

Now we can shuffle and split the data into training and test sets. Implement the `split_data` function below, which takes the normalized features
`X` and the output data `y`, shuffles them, and splits them into training and test sets. The function should return the training and test sets
for both the features and the output data.

The training set can be the first 75% of the data, and the test set can be the rest. A random seed is provided which you can use for reproducibility.

<details>
    <summary> Hint </summary>

> &nbsp;  
> You can use the `shuffle` and `train_test_split` functions from the `sklearn.model_selection` module to shuffle and split the data.  
> &nbsp;

</details>

In [49]:
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split


@dataclass(frozen=True)
class DataSet:
    X: Matrix
    y: Vector


@dataclass(frozen=True)
class DataSets:
    training: DataSet
    test: DataSet

    def training_data(self) -> TrainingData:
        return TrainingData(x=self.training.X, y=self.training.y)


def split_data(X: Matrix, y: Vector, *, seed: int = 42) -> DataSets:
    # {% if exercise['3.1.4'].solution %}
    # TODO: Review me!
    # We first shuffle the data
    X, y = shuffle(X, y, random_state=seed)  # type: ignore

    # Then we split the data into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.25, random_state=seed
    )

    return DataSets(
        training=DataSet(X=X_train, y=y_train),
        test=DataSet(X=X_test, y=y_test),
    )
    # {% else %}
    # TODO: Implement me!
    raise NotImplementedError("Not implemented!")
    # {% endif %}

Let's make sure we didn't mess up the data splitting.

In [None]:
data_sets = split_data(X_normalized, y_normalized)

print("This is what the training data looks like:")
plot_features_3d(data_sets.training.X, data_sets.training.y)

print("This is what the test data looks like:")
plot_features_3d(data_sets.test.X, data_sets.test.y)

#### Exercise 3.1.5

Now create a multi-dimensional GPR model using the `MultiDimensionalGPRCreator` class and the training data. You can use the RBF kernel for each
dimension. Make sure to also tune the hyperparameters of the kernel using the `optimize_hyperparameters` function.

In [None]:
# {% if exercise['3.1.5'].solution %}
# TODO: Review me!
sigma_noise = 0.25
model_creator = MultiDimensionalGPRCreator(data_sets.training_data(), sigma_noise)

# We have three features, so we need six hyperparameters (sigma, l for each feature)
theta_0 = np.array([1.0, 1.0, 1.0, 1.0, 1.0, 1.0])
theta = optimize_hyperparameters(
    model_creator,
    theta_0,
    bounds=[(0.01, None)] * 6,
)

model = model_creator(theta)
# {% else %}
# TODO: Implement me!
print("Oops! Looks like you forgot to implement this cell.")
# {% endif %}

#### Exercise 3.1.6

Evaluate the performance of the GPR model on the test data. Implement the `evaluate` function below, which takes the GPR model and the test data,
and returns the mean squared error (MSE) of the predictions.

In [52]:
def evaluate(model: GPR, X: Matrix, y: Vector) -> float:
    # {% if exercise['3.1.6'].solution %}
    # TODO: Review me!
    result = model.predict(X)

    return float(np.mean((result.mean - y) ** 2))
    # {% else %}
    # TODO: Implement me!
    raise NotImplementedError("Not implemented!")
    # {% endif %}

In [None]:
print(
    f"MSE on the training data: {evaluate(model, data_sets.training.X, data_sets.training.y):.4f}"
)
print(
    f"MSE on the test data: {evaluate(model, data_sets.test.X, data_sets.test.y):.4f}"
)

In [None]:
plot_features_3d(
    data_sets.test.X,
    data_sets.test.y,
    additional_traces=[
        dot_scatter_3d(
            x=data_sets.test.X[:, 0],
            y=data_sets.test.X[:, 1],
            z=model.predict(data_sets.test.X).mean,
            name="Prediction",
            color="green",
        )
    ],
)

#### Exercise 3.1.7

Finally, we can use our GPR model to predict the temperature of the motor winding at any given time. Implement the `predict` method of the
`GPRTemperaturePredictor` class below, which takes a time `t`, a boolean parameter `on` (whether the motor is on or off) and returns the predicted
temperature of the motor winding at the given time.

In addition, store the most recent time and temperature of the motor winding in the `T_last` and `t_last` attributes of the class. You will need these
to accurately predict the temperature of the next time step. In addition, keep even older time and temperature values in the `T_rollback` and `t_rollback`
attributes. These will be used to roll back and re-predict the temperature in case the controller changes its decision about the appropriate motor phase.

In [55]:
@dataclass
class GPRTemperaturePredictor:
    normalizer: Normalizer
    model: GPR
    T_0: float

    T_last: float = 0.0
    t_last: float = 0.0

    T_rollback: float = 0.0
    t_rollback: float = 0.0

    def predict(self, t: float, on: bool) -> float:
        # {% if exercise['3.1.7'].solution %}
        # TODO: Review me!
        x = np.array([[self.T_last, t - self.t_last, float(on)]])

        # We first normalize the data
        x_normalized = self.normalizer.normalize_input(x)

        # We then predict the temperature difference
        y_normalized = self.model.predict(x_normalized).mean

        # Finally, we denormalize the prediction
        y = self.normalizer.denormalize(y_normalized)[0]

        # We back up the last prediction
        self.T_rollback = self.T_last
        self.t_rollback = self.t_last

        # We store the new prediction
        self.T_last = y + self.T_last
        self.t_last = t

        return y + self.T_last
        # {% else %}
        # TODO: Implement me!
        raise NotImplementedError("Not implemented!")
        # {% endif %}

    def rollback(self) -> None:
        self.T_last = self.T_rollback
        self.t_last = self.t_rollback

    def reset(self) -> None:
        self.T_last = self.T_0
        self.t_last = 0.0

Let's see what happens if we turn the motor on for some time and then turn it off.

In [56]:
t_original, T_original = extract_data_from(temperature_data)

In [None]:
predictor = GPRTemperaturePredictor(normalizer, model, T_0=25.0)
predictor.reset()

t = np.linspace(0, 300, 50)
T = np.array([predictor.predict(t_i, on=(t_i < 150)) for t_i in t])

set_layout_for(
    Figure(
        data=[
            line_scatter(x=t, y=T, name="Predicted Temperature"),
            dot_scatter(x=t_original, y=T_original, name="Original Temperature"),
        ]
    ),
    title="Predicted temperature of the motor",
    x_title="Time",
    y_title="Temperature",
).show()

If all went well, the predicted temperature plot should be very close to the original data.

### 3.2. Creating the Controller

Now that we have a model that can predict the temperature of the motor winding, we can use it to create a controller that regulates the motor activity.

#### Exercise 3.2.1

Implement the `BangBangController` class below, which takes a `TemperaturePredictor` and temperature `thresholds`. The controller's `control` method
takes a time `t` and returns `True` if the motor should be turned on at that time, and `False` otherwise.

The controller logic can be as follows:
- If the predicted temperature is above (overheating) `thresholds.upper`, the motor must be turned off.
- If a motor was recently turned off, and the predicted temperature is below (cool down) `thresholds.lower`, the motor should be turned on again.

Since the `TemperaturePredictor` relies on previous time steps to make predictions, make sure to use the `rollback` method of the `TemperaturePredictor`
to update the temperature prediction whenever the controller changes its decision.

For example, if you predict the temperature at time $t$ for the motor being on, and then decide it's better to turn it off (also at time $t$), you should
roll back the prediction and re-predict the temperature at time $t$ for the motor being off.

In [58]:
class TemperaturePredictor(Protocol):
    def predict(self, t: float, on: bool) -> float:
        """Predicts the temperature of the motor at time `t`, considering if the motor is on or off."""
        ...

    def rollback(self) -> None:
        """Rolls back the most recent prediction."""
        ...

    def reset(self) -> None:
        """Resets the predictor to its initial state."""
        ...

In [59]:
@dataclass(frozen=True)
class ControlDecision:
    on: bool
    t_start: float
    t_end: float


@dataclass
class BangBangController:
    predictor: TemperaturePredictor
    thresholds: TemperatureThresholds
    cache: list[ControlDecision] = field(default_factory=list)

    # {% if exercise['3.2.1'].solution %}
    on: bool = False
    # {% endif %}

    def control(self, t: float) -> bool:
        # {% if exercise['3.2.1'].solution %}
        # TODO: Review me!
        temperature = self.predictor.predict(t, on=self.on)

        if temperature > self.thresholds.upper and self.on:
            self.on = False
            self.predictor.rollback()
            self.predictor.predict(t, on=self.on)
        elif temperature < self.thresholds.lower and not self.on:
            self.on = True
            self.predictor.rollback()
            self.predictor.predict(t, on=self.on)

        return self.on
        # {% else %}
        # TODO: Implement me!
        raise NotImplementedError("Not implemented!")
        # {% endif %}

    def in_cache(self, t: float) -> bool:
        return any(decision.t_start <= t <= decision.t_end for decision in self.cache)

    def from_cache(self, t: float) -> ControlDecision:
        return next(
            decision
            for decision in self.cache
            if decision.t_start <= t <= decision.t_end
        )

    def add_to_cache(self, on: bool, t_end: float) -> None:
        if len(self.cache) == 0:
            t_start = 0.0
        else:
            t_start = self.cache[-1].t_end

        self.cache.append(ControlDecision(on=on, t_start=t_start, t_end=t_end))

    def __call__(self, t: float) -> float:
        if self.in_cache(t):
            return self.from_cache(t).on

        on = self.control(t)
        self.add_to_cache(on, t)

        return 1.0 if on else 0.0

The final test! Let's see if our controller can regulate the motor activity based on the predicted temperature.

In [60]:
@dataclass(frozen=True)
class TemperaturePredictionsCache:
    predictor: TemperaturePredictor
    cache: dict[float, float] = field(default_factory=dict)

    def predict(self, t: float, on: bool) -> float:
        self.cache[t] = self.predictor.predict(t, on)
        return self.cache[t]

    def rollback(self) -> None:
        self.predictor.rollback()

    def reset(self) -> None:
        self.predictor.reset()
        self.cache.clear()

    def __call__(self, t: float) -> float:
        assert t in self.cache, f"Temperature at time {t} has not been predicted yet."
        return self.cache[t]

In [None]:
# We cache the predictions to avoid recalculating them when visualizing the controller.
predictions_cache = TemperaturePredictionsCache(predictor)
thresholds = TemperatureThresholds(lower=27, upper=28)
controller = BangBangController(predictions_cache, thresholds)

# Bang-bang controller with GPR predictor.
u: ControlSignal = controller
T_L: Load = lambda _: 0  # Doesn't matter for temperature prediction

morton = (
    Motor("Morton")
    .with_control_signal(u)
    .with_load(T_L)
    .with_dynamics(motor_dynamics(J=1.0, B=0.1, K_T=1.0, u=u, T_L=T_L))
    .with_predicted_temperature(predictions_cache)
)

# Let's not forget to check what the temperature really looks like.
morton = morton.with_temperature(TemperatureSensor.for_motor(morton))

# The simulation is even longer
morton.animate(t_f=600.0, steps=100, thresholds=thresholds)

### 3.3. Reflection

- How well did the GPR model predict the temperature of the motor winding? Did the motor overheat or cool down too much?
- What would happen if the simulation went on for a longer time? Would the predictions be more accurate?
- Do the size of the time steps affect the accuracy of the predictions? How?

Well done! You've successfully created a controller that can regulate the motor activity based on the predicted temperature of the motor winding. It's
pretty cool to see that it actually works, right? Although, it's probably not the best way to design such a controller, you at least hopefully learned a
lot about Gaussian Process Regression and *some* of its applications.