In [None]:
%pip install filterpy

In [None]:
import os
import matplotlib.pyplot as plt
import numpy as np
import sys

sys.path.append(os.path.abspath("../.."))

import filterpy.common
import filterpy.kalman
import simprob.kalman

## Multivariate Kalman Filters

Comparing to examples from:

https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python/blob/master/06-Multivariate-Kalman-Filters.ipynb

### Kalman Filter predictions without observations

(cells 14-17)

In [None]:
start_x = np.array([10.0, 4.5])
start_P = np.diag([500, 49])

print("filterpy:")
x = start_x
P = start_P
F = np.array([[1, 0.1], [0, 1]])
for _ in range(5):
    x, P = filterpy.kalman.predict(x=x, P=P, F=F, Q=0)
    print("x =", x)
print(P)

print()
print("simprob:")
for r in simprob.simulate(
    simprob.kalman.MultivariateNormal(start_x, start_P),
    [
        simprob.Iteration(
            simprob.kalman.KalmanTransition(
                F, simprob.kalman.MultivariateNormal.delta(np.zeros(2))
            )
        )
    ]
    * 5,
):
    print("x =", r.mean)
print(r.covar)

### Complete Kalman Filter with observations

(cells 4 & 27-31)

In [None]:
def compute_dog_data(z_var, process_var, count=1, dt=1.0):
    "returns track, measurements 1D ndarrays"
    x, vel = 0.0, 1.0
    z_std = np.sqrt(z_var)
    p_std = np.sqrt(process_var)
    xs, zs = [], []
    for _ in range(count):
        v = vel + (np.random.randn() * p_std)
        x += v * dt
        xs.append(x)
        zs.append(x + np.random.randn() * z_std)
    return np.array(xs), np.array(zs)


def pos_vel_filter(x, P, R, Q=0.0, dt=1.0):
    """Returns a KalmanFilter which implements a
    constant velocity model for a state [x dx].T
    """

    kf = filterpy.kalman.KalmanFilter(dim_x=2, dim_z=1)
    kf.x = np.array([x[0], x[1]])  # location and velocity
    kf.F = np.array([[1.0, dt], [0.0, 1.0]])  # state transition matrix
    kf.H = np.array([[1.0, 0]])  # Measurement function
    kf.R *= R  # measurement uncertainty
    if np.isscalar(P):
        kf.P *= P  # covariance matrix
    else:
        kf.P[:] = P  # [:] makes deep copy
    if np.isscalar(Q):
        kf.Q = filterpy.common.Q_discrete_white_noise(dim=2, dt=dt, var=Q)
    else:
        kf.Q[:] = Q
    return kf


plt.title("filterpy and simprob produce same results")


dt = 0.1
count = 50
R = 10
Q = 0.01
track, zs = compute_dog_data(R, Q, count)
plt.plot(track, label="True state")
plt.scatter(np.arange(len(zs)), zs, label="Observations")

x0 = (0.0, 0.0)
P = np.diag([500.0, 49.0])
kf = pos_vel_filter(x0, R=R, P=P, Q=Q, dt=dt)
init_Q = kf.Q

xs, cov = [], []
for z in zs:
    kf.predict()
    kf.update(z)
    xs.append(kf.x)
    cov.append(kf.P)

xs, cov = np.array(xs), np.array(cov)
plt.plot(xs[:, 0], label="Kalman (filterpy)")

res = list(
    simprob.simulate(
        simprob.kalman.MultivariateNormal(np.array(x0), P),
        [
            simprob.Iteration(
                simprob.kalman.KalmanTransition(
                    kf.F, simprob.kalman.MultivariateNormal(np.zeros(2), init_Q)
                ),
                simprob.kalman.MultivariateNormal(o[None], kf.R),
            )
            for o in zs
        ],
    )
)[1:]
plt.plot([r.mean[0] for r in res], "--", label="Kalman (simprob)")

plt.legend()
(abs((xs - [r.mean for r in res])) < 1e-10).all()