# Kalman Filter Walkthrough

## Model

**Transition Model**

$$
\mathbf{z}_{t} = \mathbf{A}_t \mathbf{z}_{t-1} + \boldsymbol{\epsilon}_t, \qquad \boldsymbol{\epsilon}_t \sim \mathcal{N}(\mathbf{0}, \mathbf{Q}_t)
$$

We can also write this as a distribution:

$$
p(\mathbf{z}_{t}|\mathbf{z}_{t-1}) = \mathcal{N}(\mathbf{z}_t|\mathbf{A}_t\mathbf{z}_{t-1}, \mathbf{Q}_t)
$$

---
**Observation Model**

$$
\mathbf{x}_t = \mathbf{C}_t\mathbf{z}_t + \boldsymbol{\delta}_t, \qquad \boldsymbol{\delta}_t \sim \mathcal{N}(\mathbf{0}, \mathbf{R}_t)
$$

We can also write this as a distribution:

$$
p(\mathbf{x}_t|\mathbf{z}_t) = \mathcal{N}(\mathbf{x}_t|\mathbf{C}_t\mathbf{z}_t, \mathbf{R}_t)
$$
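
To make the two model equations concrete, here is a minimal sketch that samples a trajectory from them. It assumes time-invariant matrices and additive zero-mean Gaussian noise with covariances `Q` and `R`. The function name `simulate_lgssm` and the constant-velocity toy model are illustrative choices, not part of the original notebook.

```python
import numpy as np

def simulate_lgssm(A, Q, C, R, z0, n_steps, seed=0):
    """Sample latent states and observations from a linear Gaussian model."""
    rng = np.random.default_rng(seed)
    n_z, n_x = A.shape[0], C.shape[0]
    zs = np.zeros((n_steps, n_z))
    xs = np.zeros((n_steps, n_x))
    z = z0
    for t in range(n_steps):
        # transition: z_t = A z_{t-1} + noise with covariance Q
        z = A @ z + rng.multivariate_normal(np.zeros(n_z), Q)
        # observation: x_t = C z_t + noise with covariance R
        x = C @ z + rng.multivariate_normal(np.zeros(n_x), R)
        zs[t], xs[t] = z, x
    return zs, xs

# hypothetical toy model: state = (position, velocity), only position is observed
A = np.array([[1.0, 1.0], [0.0, 1.0]])
Q = 0.01 * np.eye(2)
C = np.array([[1.0, 0.0]])
R = np.array([[0.25]])
zs, xs = simulate_lgssm(A, Q, C, R, z0=np.zeros(2), n_steps=50)
```

The returned arrays have one row per time step, so `xs` can be fed directly to a filtering loop.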


In [5]:
import numpy as np
from typing import NamedTuple
from dataclasses import dataclass

### Distribution

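Kalman filter parameters and their shapes:

* $A_t \in \mathbb{R}^{N_\mathbf{z} \times N_\mathbf{z}}$ (transition matrix)
* $Q_t$ = `N_z x N_z` (transition noise covariance)
* $C_t$ = `N_x x N_z` (measurement matrix)
* $R_t$ = `N_x x N_x` (measurement noise covariance)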
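
A quick shape check can catch mismatched matrices early. The sketch below assumes the usual convention that `Q` is the covariance of the latent (transition) noise and `R` is the covariance of the observation noise. The helper `check_shapes` is hypothetical and only for illustration.

```python
import numpy as np

def check_shapes(A, Q, C, R):
    """Return (N_z, N_x) after verifying that the four matrices are compatible."""
    n_z, n_x = A.shape[0], C.shape[0]
    assert A.shape == (n_z, n_z), "A must be N_z x N_z"
    assert Q.shape == (n_z, n_z), "Q must be N_z x N_z"
    assert C.shape == (n_x, n_z), "C must be N_x x N_z"
    assert R.shape == (n_x, n_x), "R must be N_x x N_x"
    # covariance matrices should be symmetric
    assert np.allclose(Q, Q.T) and np.allclose(R, R.T)
    return n_z, n_x

# made-up example: 2 latent dimensions, 1 observed dimension
A = np.array([[1.0, 1.0], [0.0, 1.0]])
Q = 0.01 * np.eye(2)
C = np.array([[1.0, 0.0]])
R = np.array([[0.25]])
n_z, n_x = check_shapes(A, Q, C, R)
```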

In [26]:
@dataclass
class KalmanDist:
    transition_matrix : np.ndarray
    transition_noise : np.ndarray
    measurement_matrix : np.ndarray
    measurement_noise : np.ndarray
    
    def predictive_mean(self, mean: np.ndarray) -> np.ndarray:
        # propagate the mean through the transition (not the measurement) matrix
        return predictive_mean(mean, self.transition_matrix)
    
    def predictive_cov(self, cov: np.ndarray) -> np.ndarray:
        return predictive_cov(cov, self.transition_matrix, self.transition_noise)

### State

In [11]:
class State(NamedTuple):
    mean : np.ndarray
    cov : np.ndarray
    

### Transition Model

### Observation Model

### Predict Step

$$
\begin{aligned}
\boldsymbol{\mu}_{t|t-1} &= \mathbf{A}_t\boldsymbol{\mu}_{t-1} \\
\boldsymbol{\Sigma}_{t|t-1} &= \mathbf{A}_t\boldsymbol{\Sigma}_{t-1}\mathbf{A}_t^\top + \mathbf{Q}_t
\end{aligned}
$$

where:
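* the output of `predict_step` is the predictive distribution $p(\mathbf{z}_t|\mathbf{x}_{1:t-1}) = \mathcal{N}(\mathbf{z}_t|\boldsymbol{\mu}_{t|t-1}, \boldsymbol{\Sigma}_{t|t-1})$, stored as a `State`.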

In [36]:
def predict_step(state: State, dist: KalmanDist) -> State:
    """Prediction step in Kalman filter eqns"""
    # predictive mean
    mean = dist.predictive_mean(state.mean)
    
    # predictive covariance
    cov = dist.predictive_cov(state.cov)
    
    return State(mean=mean, cov=cov)

def predictive_mean(mean: np.ndarray, transition: np.ndarray) -> np.ndarray:
    """predictive mean in predict step"""
    return transition @ mean

def predictive_cov(cov: np.ndarray, transition: np.ndarray, noise: np.ndarray) -> np.ndarray:
    """predictive covariance in predict step"""
    return transition @ cov @ transition.T + noise
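
As a small worked instance of the predict equations, the sketch below pushes a made-up mean and covariance through one step. It is written without the classes above so it runs on its own; all numbers are illustrative.

```python
import numpy as np

# hypothetical constant-velocity model: state = (position, velocity)
A = np.array([[1.0, 1.0], [0.0, 1.0]])
Q = 0.01 * np.eye(2)

# previous posterior: at position 0 with velocity 1, unit uncertainty
mu_prev = np.array([0.0, 1.0])
cov_prev = np.eye(2)

# predictive mean: A mu
mu_pred = A @ mu_prev
# predictive covariance: A Sigma A^T + Q
cov_pred = A @ cov_prev @ A.T + Q

print(mu_pred)   # [1. 1.]
print(cov_pred)  # [[2.01 1.  ] [1.   1.01]]
```

The position variance grows from 1 to 2.01 because the velocity uncertainty leaks into the position over one step, plus the added transition noise.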

### Update Step

This is the measurement step, which we can compute using Bayes' rule:

$$
p(\mathbf{z}_t|\mathbf{x}_t,\mathbf{x}_{1:t-1}) \propto p(\mathbf{x}_t|\mathbf{z}_t)p(\mathbf{z}_t|\mathbf{x}_{1:t-1})
$$

This quantity is given by:

$$
\begin{aligned}
p(\mathbf{z}_t|\mathbf{x}_{1:t}) &= \mathcal{N}(\mathbf{z}_t|\boldsymbol{\mu}_t, \boldsymbol{\Sigma}_t)
\end{aligned}
$$

This is the distribution for the update step. Its mean and covariance are given by:

$$
\begin{aligned}
\boldsymbol{\mu}_t &= \boldsymbol{\mu}_{t|t-1} + \mathbf{K}_t\mathbf{r}_t \\
\boldsymbol{\Sigma}_t &= \left( \mathbf{I} - \mathbf{K}_t\mathbf{C}_t \right)\boldsymbol{\Sigma}_{t|t-1}
\end{aligned}
$$

where:
* $K_t$ - Kalman Gain Matrix
* $r_t$ - innovation/residual

---
**Residual**

This quantity is the difference between the actual observations and our predicted observations:

$$
\begin{aligned}
\mathbf{r}_t &= \mathbf{x}_t - \hat{\mathbf x}_t \\
\hat{\mathbf x}_t &= \mathbf{C}_t {\boldsymbol \mu}_{t|t-1}
\end{aligned}
$$

where:

* $C_t$ - measurement matrix
* ${\boldsymbol \mu}_{t|t-1}$ - mean from the predict step

In [35]:

def residual(obs: np.ndarray, state: State, dist: KalmanDist) -> np.ndarray:
    """Innovation: actual observation minus predicted observation."""
    # unroll variables
    C = dist.measurement_matrix
    
    # predictive mean
    µ_pred = dist.predictive_mean(state.mean)
    
    # predicted observation
    obs_pred = C @ µ_pred
    
    # residual
    res = obs - obs_pred
    
    return res

---
**Kalman Gain Matrix**

$$
\begin{aligned}
\mathbf{K}_t &= \boldsymbol{\Sigma}_{t|t-1}\mathbf{C}_t^\top \mathbf{S}_t^{-1} \\
\mathbf{S}_t &= \mathbf{C}_t \boldsymbol{\Sigma}_{t|t-1}\mathbf{C}_t^\top + \mathbf{R}_t
\end{aligned}
$$

We can also use the matrix inversion lemma to rewrite the Kalman gain matrix:

$$
\begin{aligned}
\mathbf{K}_t &= \boldsymbol{\Sigma}_{t|t-1} \mathbf{C}^\top \left( \mathbf{C}\boldsymbol{\Sigma}_{t|t-1} \mathbf{C}^\top + \mathbf{R} \right)^{-1} \\
&= \left( \boldsymbol{\Sigma}_{t|t-1}^{-1} + \mathbf{C}^\top \mathbf{R}^{-1}\mathbf{C}\right)^{-1}\mathbf{C}^\top \mathbf{R}^{-1}
\end{aligned}
$$
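
The two forms of the gain can be checked numerically. The sketch below draws random symmetric positive-definite matrices (an assumption made only so that all inverses exist) and compares the covariance form with the information form, which uses $\mathbf{R}^{-1}$ inside the bracket. The helper `random_spd` is hypothetical.

```python
import numpy as np

rng = np.random.default_rng(0)
n_z, n_x = 3, 2

def random_spd(n):
    """Random symmetric positive-definite matrix (illustrative helper)."""
    M = rng.normal(size=(n, n))
    return M @ M.T + n * np.eye(n)

P = random_spd(n_z)               # stands in for the predictive covariance
R = random_spd(n_x)               # measurement noise covariance
C = rng.normal(size=(n_x, n_z))   # measurement matrix

# covariance form: P C^T (C P C^T + R)^{-1}
K_cov = P @ C.T @ np.linalg.inv(C @ P @ C.T + R)

# information form: (P^{-1} + C^T R^{-1} C)^{-1} C^T R^{-1}
R_inv = np.linalg.inv(R)
K_info = np.linalg.inv(np.linalg.inv(P) + C.T @ R_inv @ C) @ C.T @ R_inv

assert np.allclose(K_cov, K_info)
```

The covariance form inverts an `N_x x N_x` matrix and the information form an `N_z x N_z` one, so which is cheaper depends on the relative sizes of the observation and latent dimensions.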

In [33]:
def kalman_gain_matrix(state: State, dist: KalmanDist) -> np.ndarray:
    """Kalman gain computed from the previous posterior `state`."""
    # unroll variables
    C = dist.measurement_matrix
    R = dist.measurement_noise
    𝚺 = state.cov
    
    # predictive covariance
    𝚺 = dist.predictive_cov(𝚺)
    
    # innovation covariance
    S = C @ 𝚺 @ C.T + R
    
    # kalman gain
    K = 𝚺 @ C.T @ np.linalg.inv(S)
    
    return K

---
**Update Step**

$$
\begin{aligned}
p(\mathbf{z}_t|\mathbf{x}_{1:t}) &= \mathcal{N}(\mathbf{z}_t|\boldsymbol{\mu}_t, \boldsymbol{\Sigma}_t) \\
\boldsymbol{\mu}_t &= \boldsymbol{\mu}_{t|t-1} + \mathbf{K}_t\mathbf{r}_t \\
\boldsymbol{\Sigma}_t &= \left( \mathbf{I} - \mathbf{K}_t\mathbf{C}_t \right)\boldsymbol{\Sigma}_{t|t-1}
\end{aligned}
$$

where:
* $K_t$ - *Kalman Gain Matrix*
* $r_t$ - *innovation/residual*

In [37]:
def update_step(
    obs: np.ndarray,
    state: State, 
    dist: KalmanDist,
) -> State:
    
    # unroll variables
    C = dist.measurement_matrix
    I = np.eye(C.shape[1])  # identity in latent space (N_z x N_z), since K @ C is N_z x N_z
    µ = state.mean
    𝚺 = state.cov
    
    # predict step
    µ = dist.predictive_mean(µ)
    𝚺 = dist.predictive_cov(𝚺)
    
    # kalman gain
    K = kalman_gain_matrix(state, dist)
    
    # innovation/residual
    r = residual(obs, state, dist)
    
    # update step
    µ = µ + K @ r
    𝚺 = (I - K @ C) @ 𝚺
    
    return State(mean=µ, cov=𝚺)
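
Chaining predict and update over a sequence of observations gives the full filter. The following is a self-contained sketch under stated assumptions: time-invariant matrices, plain arrays instead of the `State` and `KalmanDist` containers, and `np.linalg.solve` in place of an explicit inverse. The name `kalman_filter` and its signature are hypothetical.

```python
import numpy as np

def kalman_filter(obs, A, Q, C, R, mu0, cov0):
    """Run predict + update for each row of `obs`; return filtered means and covariances."""
    I = np.eye(A.shape[0])
    mu, cov = mu0, cov0
    means, covs = [], []
    for x in obs:
        # predict step
        mu_pred = A @ mu
        cov_pred = A @ cov @ A.T + Q
        # innovation covariance and gain, K = cov_pred C^T S^{-1}
        S = C @ cov_pred @ C.T + R
        K = np.linalg.solve(S, C @ cov_pred).T
        # residual between actual and predicted observation
        r = x - C @ mu_pred
        # update step
        mu = mu_pred + K @ r
        cov = (I - K @ C) @ cov_pred
        means.append(mu)
        covs.append(cov)
    return np.stack(means), np.stack(covs)

# toy check: a static scalar state observed five times with value 1
obs = np.ones((5, 1))
means, covs = kalman_filter(
    obs, A=np.eye(1), Q=np.zeros((1, 1)), C=np.eye(1), R=np.eye(1),
    mu0=np.zeros(1), cov0=np.eye(1),
)
```

In this toy case the filtered mean after `n` observations is `n / (n + 1)` and the variance is `1 / (n + 1)`, which matches averaging the prior with the data.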

## Posterior Predictive

This is the one-step-ahead predictive density for the observations: it predicts the observation at the next time step from all previous observations. It is given by:

$$
p(\mathbf{x}_t|\mathbf{x}_{1:t-1}) = \int \mathcal{N}(\mathbf{x}_t|\mathbf{Cz}_t, \mathbf{R})\mathcal{N}(\mathbf{z}_t|\boldsymbol{\mu}_{t|t-1},\boldsymbol{\Sigma}_{t|t-1})d\mathbf{z}_t
$$

In [None]:
def forecast():
    return None
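
Because both factors in the integral are Gaussian, it has the closed form $\mathcal{N}(\mathbf{x}_t|\mathbf{C}\boldsymbol{\mu}_{t|t-1}, \mathbf{C}\boldsymbol{\Sigma}_{t|t-1}\mathbf{C}^\top + \mathbf{R})$. The sketch below computes those two moments. It is one guess at what the `forecast` stub could compute, not the author's implementation; the name `posterior_predictive` and the numbers are illustrative.

```python
import numpy as np

def posterior_predictive(mu_pred, cov_pred, C, R):
    """Mean and covariance of p(x_t | x_{1:t-1}) given the predictive state moments."""
    x_mean = C @ mu_pred
    x_cov = C @ cov_pred @ C.T + R
    return x_mean, x_cov

# made-up predictive moments for a 2-d state with a 1-d observation
mu_pred = np.array([1.0, 1.0])
cov_pred = np.array([[2.01, 1.0], [1.0, 1.01]])
C = np.array([[1.0, 0.0]])
R = np.array([[0.25]])

x_mean, x_cov = posterior_predictive(mu_pred, cov_pred, C, R)
print(x_mean)  # [1.]
print(x_cov)   # [[2.26]]
```

The predictive covariance is the innovation covariance $\mathbf{S}_t$ from the Kalman gain, so it can be reused rather than recomputed.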

## Smoothing Algorithm

This is a message-passing algorithm that propagates backward in time (right to left), from $t=T$ to $t=1$, after all observations have been collected.

$$
\begin{aligned}
p(\mathbf{z}_t|\mathbf{x}_{1:T}) &= \mathcal{N}(\mathbf{z}_t|\boldsymbol{\mu}_{t|T},\boldsymbol{\Sigma}_{t|T})
\end{aligned}
$$

This is given by these equations:

$$
\begin{aligned}
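\boldsymbol{\mu}_{t|T} &= \boldsymbol{\mu}_{t|t} + \mathbf{J}_t\left(\boldsymbol{\mu}_{t+1|T} - \boldsymbol{\mu}_{t+1|t}\right) \\
\boldsymbol{\Sigma}_{t|T} &= \boldsymbol{\Sigma}_{t|t} + \mathbf{J}_t\left(\boldsymbol{\Sigma}_{t+1|T} - \boldsymbol{\Sigma}_{t+1|t}\right)\mathbf{J}_t^\top \\
\mathbf{J}_t &= \boldsymbol{\Sigma}_{t|t}\mathbf{A}_{t+1}^\top\boldsymbol{\Sigma}_{t+1|t}^{-1}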
\end{aligned}
$$


In [None]:
def rts_smoother(state, dist):
    return None
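
A minimal sketch of the backward pass, using the standard Rauch-Tung-Striebel recursion with smoother gain $\mathbf{J}_t = \boldsymbol{\Sigma}_{t|t}\mathbf{A}^\top\boldsymbol{\Sigma}_{t+1|t}^{-1}$. It assumes time-invariant `A` and `Q` and takes the filtered means and covariances as stacked arrays, which differs from the `rts_smoother(state, dist)` stub above. The name `rts_smooth` is hypothetical.

```python
import numpy as np

def rts_smooth(filt_means, filt_covs, A, Q):
    """Backward pass: turn filtered moments (T, N_z) and (T, N_z, N_z) into smoothed ones."""
    T = filt_means.shape[0]
    sm_means = filt_means.copy()
    sm_covs = filt_covs.copy()
    # the last smoothed estimate equals the last filtered estimate
    for t in range(T - 2, -1, -1):
        # one-step prediction from the filtered estimate at time t
        mu_pred = A @ filt_means[t]
        cov_pred = A @ filt_covs[t] @ A.T + Q
        # smoother gain J_t = Sigma_{t|t} A^T Sigma_{t+1|t}^{-1}
        J = np.linalg.solve(cov_pred, A @ filt_covs[t]).T
        # correct the filtered moments with information from the future
        sm_means[t] = filt_means[t] + J @ (sm_means[t + 1] - mu_pred)
        sm_covs[t] = filt_covs[t] + J @ (sm_covs[t + 1] - cov_pred) @ J.T
    return sm_means, sm_covs
```

For a static state with no transition noise, every smoothed estimate collapses to the final filtered one, which makes a convenient sanity check.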