In [None]:
[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/nepslor/B5203E-TSAF/blob/main/W6/kalman_filter_exercise.ipynb)

# Kalman filter forecasting
In this exercise we will try to forecast the position of a polecart pendolum using a simplified linear model for the system's dynamics.
Called $x$ the position, $\theta$ the angle of the pendulum, the full equations of motion for the cartpole are:
$$
\begin{align}
x_{t+1} &= x_{t}+\dot{x} dt +\frac{1}{2}\ddot{x} t^2\\
\theta_{t+1} &= \theta_{t}+\dot{\theta} dt +\frac{1}{2}\ddot{\theta} t^2\\
\end{align}
$$

Unfortunately, the expressions for the cartesian and angular accelerations are non-linear function of the state:
$$
\begin{aligned}
\ddot{x} &= \frac{m L \dot{\theta}^2 \sin(\theta) + m g \sin(\theta) \cos(\theta) + u(t) - d \dot{x}}{M + m \sin^2(\theta)}\\
\ddot{\theta} &= \frac{-m L \dot{\theta}^2 \sin(\theta) \cos(\theta) - (M + m) g \sin(\theta) - \cos(\theta) u(t) + d \cos(\theta) \dot{x}}{L (M + m \sin^2(\theta))}
\end{aligned}
$$
where $m$ and  $g$ the mass of the pendulum and the gravitational acceleration, and **$u$** an **external force acting on the cartpole**.

We can approximate the dynamics of the cartpole with a linear system:

$$
\begin{bmatrix}
x(k+1) \\
\dot{x}(k+1) \\
\theta(k+1) \\
\dot{\theta}(k+1)
\end{bmatrix} =
\begin{bmatrix}
1 & dt & 0 & 0 \\
0 & 1 & 0 & 0 \\
0 & 0 & 1 & dt \\
0 & 0 & 0 & 1
\end{bmatrix}
\begin{bmatrix}
x(k) \\
\dot{x}(k) \\
\theta(k) \\
\dot{\theta}(k)
\end{bmatrix}
$$


❓ What's the interpretation of the linear system above?


# Kalman filter for foreacasting
In this exercise we'll try to write a Kalman filter from scratch and use it to forecast the motion of the cart pole in the next 10 steps.
We report here the main prediction and update formula of the Kalman filter:

**state prediction**
$$\begin{align}
\hat{x}_{k|k-1} &= A_k \hat{x}_{k-1|k-1} + B_k u_{k-1}\\
P_{k|k-1} &= A_k P_{k-1|k-1} A_k^T + Q_k
\end{align}$$


**state update**
$$\begin{align}
\tilde{y}_k &= y_k - C_k \hat{x}_{k|k-1}\\
S_k &= C_k P_{k|k-1} C_k^T + R_k\\
K_k &= P_{k|k-1} C_k^T S_k^{-1}\\
\hat{x}_{k|k} &= \hat{x}_{k|k-1} + K_k \tilde{y}_k\\
P_{k|k} &= (I - K_k C_k) P_{k|k-1}
\end{align}$$

### Forecasting with Kalman filter
At each step, the Kalman filter first predicts the next state and its covariance using the system dynamics (prediction step). Then, it updates this prediction based on the new observation, adjusting both the state estimate and its covariance (update step).
After processing the current observation, we can use the updated state estimate to forecast future states over a specified horizon by repeatedly applying the prediction step without incorporating new observations. In this way, we'll use the most recent state estimate to predict the next 10 steps of the system's evolution, propagating both the mean and the state covariance forward in time.


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from scipy.integrate import solve_ivp
from scipy.interpolate import interp1d
from functools import partial
from IPython.display import HTML

np.random.seed(197)

The following block defines the nonlinear equation of the true system and simulate them over 500 steps. We use `solve_ivp` from `scipy.integrate` to solve the ODEs, as we did in the Takens' theorem exercise.

In [None]:
# Pendulum-cart dynamics as provided by user
def pendcart_ode(t, state, ctrl, m=1., M=5., L=2., g=9.81, d=1.):
    x, x_dot, theta, theta_dot = state
    ctrl_t = ctrl(t)
    s, c = np.sin(theta), np.cos(theta)

    x_dot_dot = (m * L * theta_dot ** 2 * s + m * g * s * c + ctrl_t - d * x_dot) / (M + m * s ** 2)
    theta_dot_dot = (-m * L * theta_dot ** 2 * s * c - (M + m) * g * s - c * ctrl_t + d * c * x_dot) / (L * (M + m * s ** 2))

    return [x_dot, x_dot_dot, theta_dot, theta_dot_dot]

# Simulation helper function
def simulate_pendcart(x0, ctrl, t, **kwargs):
    ctrl_func = interp1d(t, ctrl, kind='linear', fill_value="extrapolate")
    sol = solve_ivp(partial(pendcart_ode, ctrl=ctrl_func, **kwargs), [t[0], t[-1]], x0, t_eval=t)
    return sol.y.T

# Parameters
m  = 1.0   # pole mass
M  = 5.0   # cart mass
L  = 2.0   # pole length
g  = 9.81  # gravity
d  = 1.0   # cart viscous damping

# Define simulation parameters
n_osc = 10
t_end = 20
t = np.linspace(0, t_end, 600)
dt = t[1] - t[0]
initial_state = [0, 0, np.pi / 8, 0]
control_signal = 3* np.sin(n_osc * t * 2 * np.pi /t_end) +  15 * np.random.normal(0, 2, size=t.shape)

# Simulate system dynamics
states = simulate_pendcart(initial_state, control_signal, t, m=m, M=M, L=L, g=g, d=d)


The following block just define an animation

In [None]:
#@title Animation
# Create animation
fig, ax = plt.subplots(figsize=(10, 5), layout='constrained')
ax.set_aspect('equal')
xmin= np.min(states[:300,0])-1
xmax= np.max(states[:300,0])+1
ax.set_xlim(xmin, xmax)
ax.set_ylim(-3, 1)
ax.grid()

cart_width, cart_height = 0.4, 0.2
cart_patch = plt.Rectangle((0, 0), cart_width, cart_height, fc='black')
line, = ax.plot([], [], 'o-', lw=2, markersize=8)
ax.add_patch(cart_patch)

def init():
    cart_patch.set_xy((-cart_width / 2, -cart_height / 2))
    line.set_data([], [])
    return cart_patch, line

def animate(i):
    cart_x = states[i, 0]
    theta = states[i, 2]

    cart_patch.set_xy((cart_x - cart_width / 2, -cart_height / 2))
    pend_x = cart_x + 2 * np.sin(theta)
    pend_y = -2 * np.cos(theta)

    line.set_data([cart_x, pend_x], [0, pend_y])
    return cart_patch, line

ani = animation.FuncAnimation(fig, animate, frames=100, interval=40, init_func=init, blit=True);
plt.close(fig)

# Display animation in Colab
HTML(ani.to_jshtml())

# ❓ KF implementation
Try to complete the `kalman_predict` and  `kalman_update` functions using the formula above

In [None]:

def kalman_predict(x, P, A, Q, B, u):
    x_pred = ???
    P_pred = ???
    return x_pred, P_pred


def kalman_update(x_pred, P_pred, y, C, R):
    ???
    return x_updated, P_updated

def kalman(x, P, A, Q, y, C, R):
    x_pred, P_pred = kalman_predict(x, P, A, Q)
    x_updated, P_updated = kalman_update(x_pred, P_pred, y, C, R)
    return x_updated, P_updated



# ❓Define the system
Try to define matrix $A$ and $C$ for our system.

In [None]:

# Simulate system dynamics
states = simulate_pendcart(initial_state, control_signal, t)

# Kalman Filter parameters
A = ???
B = np.atleast_2d(0)
C = np.eye(4)


We initialize the state covariance to the identity matrix. As you will see, the Kalman filter will continuously update the belief on P as new observations are processed.

In [None]:
Q = 0.01 * np.eye(4)
R = 0.05 * np.eye(4)
P = np.eye(4)


The following code runs the `kalman_forecast` function at each step. This function calls the KF you designed and runs it for 10 steps, producing an estimatino of the evolution of the system.

❓ Don't we need a forecast of the external force to predict the next states?

In [None]:
def kalman_forecast(x, P, A, B, Q, C,  steps=10):
    forecasts = []
    x_forecast, P_forecast = x.copy(), P.copy()
    state_covariances = []
    for _ in range(steps):
        x_forecast, P_forecast = kalman_predict(x_forecast, P_forecast, A, Q, B, 0)
        forecasts.append(C @ x_forecast)
        state_covariances.append(P_forecast)
    return np.array(forecasts), state_covariances

# Prepare Kalman filter forecasts
forecasts = []
prediction_steps = 20
x = np.copy(states[0])
forecast_state_cov = []
for i in range(len(t)-prediction_steps):
    y = np.copy(states[i])
    x_pred, P_pred = kalman_predict(x, P, A, Q, B, control_signal[i])
    x, P = kalman_update(x_pred, P_pred, y, C, R)
    forecast, state_covariances = kalman_forecast(x, P, A, B, Q, C, steps=prediction_steps)
    forecasts.append(forecast)
    forecast_state_cov.append(state_covariances)

In [None]:
import numpy as np
from matplotlib.patches import Ellipse
from functools import partial

def ani(forecasts, forecast_state_cov, n_frames=300):
    fig, ax = plt.subplots(figsize=(10, 5), layout='constrained')
    ax.set_aspect('equal')
    ax.set_xlim(xmin, xmax)
    ax.set_ylim(-3, 1)
    ax.grid()

    cart_width, cart_height = 0.4, 0.2
    cart_patch = plt.Rectangle((0, 0), cart_width, cart_height, fc='black')
    line, = ax.plot([], [], 'o-', lw=2, markersize=8)
    pred_line, = ax.plot([], [], 'r--', lw=1.5)
    ax.add_patch(cart_patch)
    gt_line, = ax.plot([], [], 'g-', lw=1.5)

    def init():
        cart_patch.set_xy((-cart_width / 2, -cart_height / 2))
        line.set_data([], [])
        pred_line.set_data([], [])
        gt_line.set_data([], [])
        return cart_patch, line, pred_line, gt_line


    def get_pendulum_tip_and_cov(x, theta, P):
        J = np.array([
            [1, 0, 2 * np.cos(theta), 0],
            [0, 0, 2 * np.sin(theta), 0]
        ])
        cov_tip = J @ P[:4, :4] @ J.T
        pend_x = x + 2 * np.sin(theta)
        pend_y = -2 * np.cos(theta)
        return pend_x, pend_y, cov_tip

    def animate(i, forecasts, forecast_state_cov):
        cart_x = states[i, 0]
        theta = states[i, 2]
        cart_patch.set_xy((cart_x - cart_width / 2, -cart_height / 2))
        pend_x, pend_y = cart_x + 2 * np.sin(theta), -2 * np.cos(theta)
        line.set_data([cart_x, pend_x], [0, pend_y])

        # Remove old ellipses
        [e.remove() for e in getattr(ax, "_ellipses", [])]
        ax._ellipses = []

        if i < len(forecasts):
            pred_cart_x = forecasts[i][:, 0]
            pred_theta = forecasts[i][:, 2]
            covs = forecast_state_cov[i]
            ci = 1.64

            for j in range(len(pred_cart_x)):
                px, py, cov_tip = get_pendulum_tip_and_cov(pred_cart_x[j], pred_theta[j], covs[j])
                vals, vecs = np.linalg.eigh(cov_tip)
                order = vals.argsort()[::-1]
                vals, vecs = vals[order], vecs[:, order]
                width, height = 2 * ci * np.sqrt(vals)
                angle = np.degrees(np.arctan2(*vecs[:,0][::-1]))
                ellipse = Ellipse((px, py), width, height, angle=angle, alpha=0.2, color='orange')
                ax.add_patch(ellipse)
                ax._ellipses.append(ellipse)

            pred_line.set_data(
                [get_pendulum_tip_and_cov(pred_cart_x[j], pred_theta[j], covs[j])[0] for j in range(len(pred_cart_x))],
                [get_pendulum_tip_and_cov(pred_cart_x[j], pred_theta[j], covs[j])[1] for j in range(len(pred_cart_x))]
            )
            # Ground truth for the same horizon
            gt_cart_x = states[i:i+len(pred_cart_x), 0]
            gt_theta = states[i:i+len(pred_cart_x), 2]
            gt_pend_x = gt_cart_x + 2 * np.sin(gt_theta)
            gt_pend_y = -2 * np.cos(gt_theta)
            gt_line.set_data(gt_pend_x, gt_pend_y)
        else:
            gt_line.set_data([], [])

        return cart_patch, line, pred_line
    ani = animation.FuncAnimation(fig, partial(animate, forecasts=forecasts, forecast_state_cov=forecast_state_cov), frames=n_frames, interval=40, init_func=init, blit=True)
    plt.close(fig)
    return HTML(ani.to_jshtml())
ani(forecasts, forecast_state_cov)

### Discrete small-angle cart–pole (high-rate) with input as **augmented state**

We use a purely **discrete** model with sampling time \(dt\), assuming small angles and high frequency.
State:
$$
x_k \equiv \begin{bmatrix} x_k \\ \dot{x}_k \\ \theta_k \\ \dot{\theta}_k \end{bmatrix},
\qquad \text{and we embed } u_k \text{ as a state so } u_{k+1}=u_k.
$$

Forward-Euler (high-rate) update with light couplings:
$$
\begin{aligned}
x_{k+1}        &= x_k + dt\,\dot{x}_k,\\
\dot{x}_{k+1}  &= \dot{x}_k
                 + dt\!\left(-\frac{d}{M}\,\dot{x}_k
                               + \frac{m g}{M}\,\theta_k
                               + \frac{1}{M}\,u_k\right),\\
\theta_{k+1}   &= \theta_k + dt\,\dot{\theta}_k,\\
\dot{\theta}_{k+1}
               &= \dot{\theta}_k
                 + dt\!\left(\frac{d}{L M}\,\dot{x}_k
                              - \frac{(M+m) g}{L M}\,\theta_k
                              - \frac{1}{L M}\,u_k\right),\\
u_{k+1}        &= u_k.
\end{aligned}
$$

Compactly, with augmented state $z_k \equiv \begin{bmatrix} x_k \\ u_k \end{bmatrix}$:
$$
\begin{aligned}
z_{k+1} &= A_{\text{aug}}\, z_k + w_k,\\[4pt]
A_{\text{aug}} &=
\begin{bmatrix}
A & B\\
0 & 1
\end{bmatrix},
\quad
A =
\begin{bmatrix}
1 & dt & 0 & 0\\
0 & 1-\tfrac{d}{M}dt & \tfrac{m g}{M}dt & 0\\
0 & 0 & 1 & dt\\
0 & \tfrac{d}{L M}dt & -\tfrac{(M+m) g}{L M}dt & 1
\end{bmatrix},
\quad
B =
\begin{bmatrix}
0\\[4pt]
\tfrac{dt}{M}\\[4pt]
0\\[4pt]
-\tfrac{dt}{L M}
\end{bmatrix}.
\end{aligned}
$$

**Measurements.** We observe \(x\) and \(\theta\) **or** the full state; here we keep full state from your setup:
$$
y_k = C_{\text{aug}} z_k + v_k,\qquad
C_{\text{aug}} = \begin{bmatrix} I_4 & 0 \end{bmatrix}.
$$

This lets us use your existing `kalman_predict`, `kalman_update`, and `kalman_forecast` **unchanged** by working on the
augmented system $(A_{\text{aug}}, C_{\text{aug}})$, with $Q_{\text{aug}}$ and $R$ sized accordingly.


In [None]:
# Small-angle discrete dynamics (forward Euler), matches your earlier symbols
A = ???

B = np.array([[0.0], [dt/M], [0.0], [-dt/(L*M)]], dtype=float)

# Your original measurement setup (full state)
C = np.eye(4)
Q = 0.01 * np.eye(4)
R = 0.05 * np.eye(4)
P = np.eye(4)

# Filtering + open-loop forecasting
prediction_steps = 10
forecasts = []
forecast_state_cov = []
x = states[0].copy()

for i in range(len(t) - prediction_steps):
    y = states[i]                        # measurement (from your sim)
    u_k = np.array([control_signal[i]])  # shape (1,) for B @ u

    # 1) predict WITH control
    x_pred, P_pred = kalman_predict(x, P, A, Q, B=B, u=u_k)

    # 2) update
    x, P = kalman_update(x_pred, P_pred, y, C, R)

    # 3) multi-step forecast open-loop (NO control)
    forecast, state_covariances = kalman_forecast(x, P, A, B, Q, C, steps=prediction_steps)
    forecast_state_cov.append(state_covariances)
    forecasts.append(forecast)

forecasts = np.array(forecasts)  # shape: (len(t)-prediction_steps, prediction_steps, 4)

In [None]:
ani(forecasts, forecast_state_cov)