<h1>MLTS Exercise 06 - Kalman Filter</h1>

The goal of this exercise is to implement the Kalman filtering for a simple object tracking application.  Kalman filter (KF) is an optimal estimation problem where we have linear and Gaussian systems. These two properties are central to the KF, since they allow us to estimate the integrals and other computations analytically. 

The system includes prediction and observation models. In a tracking object problem, the prediction model can be defined by e.g. physical rules that give the relation between position of the object and its acceleration during time. Observations can be obtained by a type of a sensor, e.g. GPS, that can indicate the position of the object. It is easy to imagine that for a real-world application, we have to take into account noisy models. In KF, we model the noises for both models with Gaussian density (distribution). We are interested to combine these two models (sources of information), in order to optimally estimate the object position.

KF is a probabilistic method. We have to estimate several probability densities such as predictive and filtering density. The good news is that, since we assumed the system is Gaussian and linear, all these densities become a Gaussian density. One can define a Gaussian density by estimating its first two moments namely mean and variance.  Hence, the computation of, e.g. filtering density at a certain point, boils down to the computation of its mean and variance.

The KF algorithm is recursive. We compute the required densities (predictive and filtering density) once at a time for the whole trajectory of an object.

This was a brief intuition behind the KF. For finding the exact explanation or formulation, please refer to your lecture slides or to the book of “Machine learning: a probabilistic perspective” by Kevin Murphy.

## Task: Implement the prediction and filtering step of a kalman filter

Important variables are explained with comments

### Important formulas

#### 1. Prediction

* $\bar z_t = F_t z_{t-1}  [+ B_t u_t ]$   -> **B_t and u_t are left out in this example!**
* $\bar \Sigma_t = F_t \Sigma_{t-1} F_t^T + R_t$

#### 2. Filtering

* $K_t = \bar \Sigma_t H_t^T (H_t \bar \Sigma_{t}H_t^T + Q_t)^{-1}$

* $z_t = \bar z_t + K_t (y_t - H_t \bar z_t)$

* $\Sigma_t = (1 - K_t H_t) \bar \Sigma_t$

#### Some hints

* Initialize the priori prediction and priori covariance: $\bar z_0$, $\bar \Sigma_0$ with the variables initMu and initCov

* Start with the filtering step in the for loop after doing the initialization out of the for loop

* You should end up with 5 lines of code in the for loop :-)

* posterioriPred = $z_t$, posterioriErrorCov = $\Sigma_t$

In [1]:
%matplotlib widget
import numpy as np
from matplotlib import pyplot as plt
import ipywidgets as widgets

In [2]:
def kalman_filter(observation: np.array, 
                  A: np.array,
                  H: np.array,
                  Q: np.array,
                  R: np.array,
                  initMu: np.array,
                  initCov: np.array) -> tuple[np.array, np.arange]:
    """Kalman Filter Function

    Args:
        observation (np.array): observations
        A (np.array): state relation matrix
        H (np.array): state to observation relation matrix
        Q (np.array): measurement noise covariance matrix
        R (np.array): process noise covariance matrix
        initMu (np.array): initial estimate of a posteriori expectation
        initCov (np.array): initial estimate of a posteriori covariance matrix

    Returns:
        tuple[np.array, np.arange]: Posterior prediction, posterior covariance
    """
    numObservations = observation.shape[0]

    # Initialize empty matrices for: priorPred, posteriorPred, priorErrorCov, posteriorErrorCov, kalmanGain
    # Important: Use the correct shapes
    dim = A.shape[0]

    prioriPred = np.empty((numObservations + 1, dim))
    posterioriPred = np.empty((numObservations, dim))
    prioriErrorCov = np.empty((numObservations + 1, dim, dim))
    posterioriErrorCov = np.empty((numObservations, dim, dim))
    kalmanGain = np.empty((numObservations, dim, observation.shape[1]))

    # Initialize prior prediction and prior covariance
    prioriPred[0] = initMu
    prioriErrorCov[0] = initCov
    
    for k in range(0, numObservations):
        # compute predictive and filtering densities recursively for all observations
        # which involves computation of mean and covariance for predictive and filtering denities
        # Tipp: You can simplify matrix multiplications using the @ sign

        # Measurement step (correction)
        tmp = H @ prioriErrorCov[k] @ np.transpose(H) + Q
        kalmanGain[k] = prioriErrorCov[k] @ np.transpose(H) @ np.linalg.pinv(tmp)
        posterioriPred[k] = prioriPred[k] + kalmanGain[k] @ (observation[k] - H @ prioriPred[k])
        posterioriErrorCov[k] = (np.eye(dim) - kalmanGain[k] @ H) @ prioriErrorCov[k]

        # Time update (prediction)
        prioriPred[k + 1] = A @ posterioriPred[k]
        prioriErrorCov[k + 1] = A @ posterioriErrorCov[k] @ np.transpose(A) + R

    return posterioriPred, prioriErrorCov

In [25]:
@widgets.interact(
    numMeasurements=widgets.IntSlider(min=1, max=20, step=1, value=10, description='Number of measurements over the distance: '),
    distanceStartEnd=widgets.IntSlider(min=20, max=200, step=1, value=100, description='Distance tracking object travels: '),
    varObservations=widgets.Dropdown(options=[0.01, 0.001, 0.0001], value=0.001, description='Variance in x and y direction:'),
    processNoiseScaling=widgets.Dropdown(options=[0.01, 0.001, 0.0001], value=0.001, description='Scaling for process noise model in filter:'),
    observationNoiseScaling=widgets.Dropdown(options=[1., 0.1, 0.01], value=0.1, description='Scaling for observation noise model in filter:')
)
def plot(numMeasurements=10,
         distanceStartEnd=100,
         varObservations=0.001,
         processNoiseScaling=0.001,
         observationNoiseScaling=0.1):

    # initial values of prior prediction and covariance
    config = {'initMu': np.array([0, 10, 1, 0]),  # mean of first state; state at time 1
              'initCov': np.eye(4) * 0.1,  # covariance of first state; state at time 1
              'starting_point': 10
             }
    
    # create observations
    samplingPeriod = distanceStartEnd / numMeasurements
    observation = np.empty((numMeasurements, 2))

    for i in range(0, numMeasurements):
        observation[i] = [0 + (i * samplingPeriod), config['starting_point']] \
                         + np.random.multivariate_normal([0, 0], [[varObservations, 0], [0, varObservations]], (1, 1))
        
    # definining transition, observation and noise matrices A, H, Q, R
    A = np.eye(4)  # state relation matrix
    A[0][2] = samplingPeriod
    A[1][3] = samplingPeriod
    H = np.array([[1, 0, 0, 0], [0, 1, 0, 0]])  # state to observation relation matrix
    R = processNoiseScaling * np.eye(4)  # process noise covariance matrix
    Q = observationNoiseScaling * np.eye(2)  # measurement noise covariance matrix

    # do kalman filtering step
    mean, cov = kalman_filter(observation, A, H, Q, R, config['initMu'], config['initCov'])

    # plot
    plt.close('all')
    fig = plt.figure(figsize=(8, 5))
    ax = fig.gca()
    plt.xlabel("X")
    plt.ylabel("Y")
    plt.ylim([9.9, 10.1])
    plt.scatter(observation[:, 0], observation[:, 1], marker="x", c="blue", label="Observations")
    plt.plot(observation[:, 0], observation[:, 1], c="blue")
    plt.plot(mean[:, 0], mean[:, 1], c="red")
    plt.scatter(mean[:, 0], mean[:, 1], c="red", label="Kalman")
    plt.plot(np.arange(0, distanceStartEnd), np.ones((distanceStartEnd)) * config['initMu'][1], c="black", label="real")
    plt.legend()
    plt.show()

interactive(children=(IntSlider(value=10, description='Number of measurements over the distance: ', max=20, mi…