<a href="https://colab.research.google.com/github/sungjuGit/STPython_IntroBayes/blob/main/Kalman_example_python.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
from copy import deepcopy

## Define class Kalman and related functions

In [2]:
class Kalman:
    """
    Defines the Kalman Filter object.
    """
    def __init__(self, A, Q, H, R, m, P):
        """
        Initialize a Kalman filter.

        Parameters can be either matrices/vectors or scalars.
        A: State transition matrix
        Q: Process covariance matrix
        H: Measurement mapping
        R: Measurement covariance matrix
        m: Current estimate mean
        P: State covariance matrix. Current estimate uncertainty
        """
        self.A = A  # State transition matrix
        self.Q = Q  # Process covariance matrix
        self.H = H  # Measurement mapping
        self.R = R  # Measurement covariance matrix
        self.m = m  # Current estimate mean
        self.P = P  # State covariance matrix. Current estimate uncertainty

def K(k_or_P, H=None, R=None):
    """
    Computes the Kalman Gain.

    Can be called with a Kalman object or with P, H, R matrices directly.
    """
    if isinstance(k_or_P, Kalman):
        # Called with a Kalman object
        k = k_or_P
        return K(k.P, k.H, k.R)
    else:
        # Called with P, H, R matrices
        P = k_or_P
        # Note: In numpy, @ is the matrix multiplication operator
        return P @ np.transpose(H) @ np.linalg.inv(H @ P @ np.transpose(H) + R)

def predict(k):
    """
    Predict next state based on the dynamic process model.
    """
    k.m = k.A @ k.m
    k.P = k.A @ k.P @ np.transpose(k.A) + k.Q
    return {"state": k.m, "cov": k.P}

def update(k, y):
    """
    Compute the filtered distribution.
    """
    kalman_gain = K(k.P, k.H, k.R)
    k.m = k.m + kalman_gain @ (y - k.H @ k.m)

    # The identity matrix needs to be sized correctly
    I = np.eye(kalman_gain.shape[0])

    innovation = I - kalman_gain @ k.H
    k.P = innovation @ k.P @ np.transpose(innovation) + kalman_gain @ k.R @ np.transpose(kalman_gain)

    return {"state": k.m, "cov": k.P, "gain": kalman_gain}

def next(k, y):
    """
    Compute a complete Kalman step.
    """
    new_instance = deepcopy(k)
    p = predict(new_instance)
    up = update(new_instance, y)

    return {
        "filter": new_instance,
        "updated": up["state"],
        "predicted": p["state"],
        "cov": up["cov"],
        "gain": up["gain"]
    }

 ## Example for 2nd order linear Kalman filter

In [3]:
## x = [x1 x2 x1dot x2dot x1dotdot x2dotdot]
# measurements are done for x1, x2, x1dotdot, and x2dotdot

# Initial state vector
x_0 = np.zeros((6, 1))

# Initial covariance matrix
Σ_0 = np.eye(6) * 1000

# Time step
Δ_t = 0.1

# State transition matrix
A = np.array([
    [1, 0, Δ_t, 0, 0.5*Δ_t**2, 0],
    [0, 1, 0, Δ_t, 0, 0.5*Δ_t**2],
    [0, 0, 1, 0, Δ_t, 0],
    [0, 0, 0, 1, 0, Δ_t],
    [0, 0, 0, 0, 1, 0],
    [0, 0, 0, 0, 0, 1]
])

# Measurement mapping matrix
H = np.array([
    [1, 0, 0, 0, 0, 0],
    [0, 1, 0, 0, 0, 0],
    [0, 0, 0, 0, 1, 0],
    [0, 0, 0, 0, 0, 1]
])

# Measurement covariance matrix
R = np.array([
    [2, 0, 0, 0],
    [0, 10, 0, 0],
    [0, 0, 0.4, 0],
    [0, 0, 0, 0.4]
])

# Process covariance matrix
Q = np.eye(6) * 0.0001

In [4]:
# Initialize the Kalman filter
kalman_filter = Kalman(A, Q, H, R, x_0, Σ_0)

In [5]:
# Number of measurement data points
n = 2

# Initialize arrays to store results
predicted = np.zeros((6, n))
updated = np.zeros((6, n))
covariances = np.zeros((6, 6, n))
gains = np.zeros((6, 4, n))

In [6]:
# Main loop
for i in range(n):  # Python uses 0-based indexing, unlike Julia's 1-based indexing
    # Current measurement - replace with actual data in each time step
    y_cur = np.array([[1], [1], [1], [1]])

    # Perform a complete Kalman step
    filter_next = next(kalman_filter, y_cur)

    # Store results
    predicted[:, i:i+1] = filter_next["predicted"]
    updated[:, i:i+1] = filter_next["updated"]
    covariances[:, :, i] = filter_next["cov"]
    gains[:, :, i] = filter_next["gain"]

    # Use recursion - update filter for next iteration
    kalman_filter = filter_next["filter"]

In [7]:
predicted

array([[0.        , 1.02285966],
       [0.        , 1.01499403],
       [0.        , 0.29824076],
       [0.        , 0.29746962],
       [0.        , 0.99960213],
       [0.        , 0.99960211]])

In [8]:
updated

array([[0.99803359, 1.00328088],
       [0.99024508, 1.00499898],
       [0.19828055, 0.13553239],
       [0.19750941, 0.24751919],
       [0.99960213, 0.99979614],
       [0.99960211, 0.99979954]])