In [1097]:
import timeit
import numpy as np
import pandas as pd
from scipy import integrate
from pykalman import KalmanFilter

In [482]:
import plotly.express as px

In [827]:
from plotly.offline import plot, iplot, init_notebook_mode
# Make plotly work with Jupyter notebook
init_notebook_mode(connected=True)

### State Extrapolation Equation
- Using the state extrapolation equation, we can predict the next system state, based on the knowledge of the current state. It extrapolates state vector from the present (time step n ) to the future (time step n+1).


- x^n+1,n=Fx^n,n+Gu^n,n+wn
- Where:
- x^n+1,n	is a predicted system state vector at time step n+1
- x^n,n	is an estimated system state vector at time step n
- u^n,n	is a control variable or input variable - a measurable (deterministic) input to the system
- wn	is a process noise or disturbance - an unmeasurable input that affects the state
- F	is a state transition matrix
- G	is a control matrix or input transition matrix (mapping control to state variables)


In [908]:
def state_extrapolation(cur_state_vec, control_vec, transition_mat, control_mat):
    
    """
    Function extraploates state_vec from time step t to time step t + 1, cur_state_vec(t,t), next_state_vec(t+1, t)
    Variable Dimensions:
    cur_state_vec: n * 1
    transition_mat: n * n 
    control_mat: n * m
    control_vec: m * 1
    noise_vec: n * 1
    """
    
#     print(transition_mat, transition_mat.shape)
#     print(cur_state_vec, cur_state_vec.shape)
#     print(control_mat, control_mat.shape)
#     print(control_vec, control_vec.shape)
    next_state_vec = np.dot(transition_mat, cur_state_vec) + np.dot(control_mat, control_vec)
    return next_state_vec

### Solve for F & G (Helper Functions)

#### Given your dynamic input with your state vec and your control vec, determine what F & G are, functionality currently manually
THE STATE SPACE REPRESENTATION
You might be wandering, why the state space representation must be in the following form:

x˙(t)=Ax(t)+Bu(t)
y(t)=Cx(t)+Du(t)

### Covariance Extrapolation Update

Pn+1,n=FPn,nFT+Q
Where:
Pn,n	is an estimate uncertainty (covariance) matrix of the current sate
Pn+1,n	is a predicted estimate uncertainty (covariance) matrix for the next state
F	is a state transition matrix that we've derived in "Modeling linear dynamic systems" section
B	is a input matrix
Q	is a process noise matrix


In [805]:
def cov_extrapolation(cur_cov_mat, transition_mat, process_noise_mat):
    next_cov_mat = np.linalg.multi_dot([transition_mat, cur_cov_mat, transition_mat.transpose()]) + process_noise_mat
    return next_cov_mat

In [300]:
def calc_process_noise_mat(noise_var, control_mat, time_dependence, continuous=False, control_mat_func=None, delta_t=0):
    
    if type(noise_var) != np.ndarray and not time_dependence:
        noise_var = np.full((control_mat.shape[0],1),noise_var)
    elif type(noise_var) != np.ndarray:
        noise_var = np.full((control_mat.shape[0], control_mat.shape[0]), noise_var)
        
    if not time_dependence:
        process_noise_mat = np.diag(noise_var)
    else:
        # discrete
        if not continuous:
            process_noise_mat = np.multiply(np.dot(control_mat, control_mat.transpose()))
        else:
            dim = noise_vec.shape[0]
            process_noise_mat = np.eye(dim)
#             print(dim)
            for x in range(dim):
                for y in range(dim):
                    process_noise_mat[x][y] = noise_integral(delta_t, x, y, control_mat_func, delta_t)
            process_noise_mat = np.multiply(noise_var, process_noise_mat)
            
    return process_noise_mat

def integrand(t, dim_x, dim_y):
    control_mat = np.array([t**2/2,t, t**3/3])
    control_mat_t = np.transpose(np.matrix(control_mat))
    return np.array(np.multiply(control_mat, control_mat_t))[dim_x][dim_y]

def noise_integral(t, dim_x, dim_y, control_mat_func, delta_t):
    return integrate.quad(control_mat_func, 0, delta_t, args=(dim_x, dim_y))[0]

In [286]:
calc_process_noise_mat(
    noise_vec=np.array([0,1,2]),
    control_mat=np.array([[0,1],[1,2]]),
    time_dependence=True,
    continuous=True,
    control_mat_func=integrand,
    delta_t=5)

array([[104.16666667,  52.08333333, 289.35185185],
       [ 52.08333333,  27.77777778, 138.88888889],
       [289.35185185, 138.88888889, 826.71957672]])

### Measurement Equation
zn=Hxn+vn
Where:
zn	is a measurement vector
xn	is a true system state (hidden state)
vn	a random noise vector
H	is an observation matrix


In [233]:
def measure_update(cur_state_vec, observation_mat, measure_noise_vec):
    """
    Dimension:
    State Vec n * 1
    Observation Matrix z * n
    Noise Vec z * 1
    """
    measure_vec = np.dot(observation_mat, cur_state_vec) + measure_noise_vec
    return measure_vec

### State Update Equation
x^n,n=x^n,n−1+Kn(zn−Hx^n,n−1)
where:
x^n,n	is a estimated system state vector at time step n
x^n,n−1	is a predicted system state vector at time step n−1
Kn	is a Kalman Gain
zn	is a measurement
H	is an observation matrix



x	state vector	nx×1
z	output vector	nz×1
H	observation matrix	nz×nx
K	Kalman gain	nx×nz

In [364]:
def state_update(prev_next_state_vec, kalman_gain_mat, measure_vec, observation_mat):
    
    """
    Dimension
    Prev next state vec n * 1
    Measure vc z * 1
    Observation Z * X
    Kalman Gain X * Z
    """
    next_state_vec = prev_next_state_vec + np.dot(kalman_gain_mat, measure_vec - np.dot(observation_mat, prev_next_state_vec))
    return next_state_vec

### Cov Update

Pn,n=(I−KnH)Pn,n−1(I−KnH)T+KnRnKTn
where:
Pn,n	is an estimate uncertainty (covariance) matrix of the current sate
Pn,n−1	is a prior estimate uncertainty (covariance) matrix of the current sate (predicted at the previous state)
Kn	is a Kalman Gain
H	is an observation matrix
Rn	is a Measurement Uncertainty (measurement noise covariance matrix)

In [376]:
def cov_update(prev_next_cov_mat, kalman_gain_mat, observation_mat, measure_noise_mat):
    """
    Kalman Gain X * Z
    Observation Z * X
    Identity X * X
    Previous Cov Mat  X * X
    Measure Noise Z * 1
    
    """
    
#     measure_noise_mat = np.cov(np.dot(np.matrix(measure_noise_vec).transpose(), np.matrix(measure_noise_vec)))
    
    identity_mat = np.eye(kalman_gain_mat.shape[0])
    
    next_cov_mat = prev_next_cov_mat
    
    first_part = np.subtract(identity_mat, np.dot(kalman_gain_mat, observation_mat))
    
    
    next_cov_mat = np.linalg.multi_dot([first_part, prev_next_cov_mat, first_part.transpose()]) + \
                   np.linalg.multi_dot([kalman_gain_mat, measure_noise_mat, kalman_gain_mat.transpose()])
    
    return next_cov_mat


def cov_update_simplified(prev_next_cov_mat, kalman_gain_mat, observation_mat, measure_noise_mat):
    
    identity_mat = np.eye(kalman_gain_mat.shape[0])
    next_cov_mat = np.dot(
                        np.subtract(identity_mat, np.dot(kalman_gain_mat, observation_mat)),
                        prev_next_cov_mat
    )
    return next_cov_mat

### Kalman Gain Equation


Kn=Pn,n−1HT(HPn,n−1HT+Rn)−1
where:
Kn	is the Kalman Gain
Pn,n−1	is a prior estimate uncertainty (covariance) matrix of the current sate (predicted at the previous state)
H	is an observation matrix
Rn	is a Measurement Uncertainty (measurement noise covariance matrix)

In [925]:
def calc_kalman_gain(prev_next_cov_mat, observation_mat, measure_noise_mat):
    """
    observation_mat Z * X
    prev_next_cov_mat  X * X
    measure_noise_mat Z * Z
    
    """
#     print('observation', observation_mat.shape, observation_mat, observation_mat.transpose().shape)
#     print('covariance', prev_next_cov_mat.shape, prev_next_cov_mat)
#     print('measure', measure_noise_mat, measure_noise_mat.shape)
    
    
    kalman_gain_mat = np.linalg.multi_dot([
        prev_next_cov_mat,
        observation_mat.transpose(),
        np.linalg.inv(
            np.linalg.multi_dot([observation_mat, prev_next_cov_mat, observation_mat.transpose()]) + measure_noise_mat
        )]
    )
    
#     print(prev_next_cov_mat)
#     print(np.linalg.multi_dot([observation_mat, prev_next_cov_mat, observation_mat.transpose()]) + measure_noise_mat)
    return kalman_gain_mat

In [289]:
class KalmanFilter:
    
    
    def __init__(self, init_state_vec, init_state_cov, transition_mat, observation_mat, control_mat):
        pass
    
    
    def predict(self, cur_state_vec, control_vec, noise_vec, transition_mat, control_mat, process_noise_mat):
        
        pred_next_state_vec = state_extrapolation(cur_state_vec, control_vec, process_noise_vec, transition_mat, control_mat)
        pred_next_cov_mat = cov_extrapolation(cur_cov_mat, transition_mat, process_noise_mat)
        
    def update(self, prev_next_cov_mat, prev_next_state_vec, observation_mat, measure_vec, measure_noise_mat):
        
        kalman_gain_mat = calc_kalman_gain(prev_next_cov_mat, observation_mat, measure_noise_mat)
        cur_state_vec = state_update(prev_next_state_vec, kalman_gain_mat, measure_vec, observation_mat)
        cur_state_cov_mat = cov_update(prev_next_cov_mat, kalman_gain_mat, observation_mat, measure_noise_mat)


###  Plan
- Understanding of Multi-Dimensional Kalman Filter, Review KalmanFilter.Net
- Process Noise Matrix Q & Measurement Noise Matrix R Determination (Time-Variant/Independent/Discrete/Continuous)
- Test on One Dimensional Data (Weight of Gold) (Done) Compare with pyKalman
- Test on Multi-Dimensional Data (One Dimension Moving), Compare with pyKalman
- Test on Multi-Dimensional Data (With Velocity & Acceleration Changing), Compare with pyKalman
- Test on Financial Data, Compare with pyKalman
- Test on Cointegrated Pair
- Merge all 5 equations into a Class Object
- Pandas compatibility
- Transition & Control Matrix Auxiliary Helpers (Derivatives Calculation)
- Vectorized Implementation (if possible)

- Assume distribution for the noise (noise is normally distributed)
- Model (Covariance Matrix is unknown, Collect Data (Fitted Covariance Matrix))
- Standard Error of Observation (0.05, that is the standard deviation of the error)

### Initialize Covariance Matrix is ERROR Covariance (High if you think your initial estimate is inaccurate)

In [None]:
### Noise Vector Variance Independent or given variance, multiply by control matrix (From Experiment)
### Measurement Uncertainty


In [None]:
"""

All noises are assumed as Gaussian models in KF and EKF. To determine the values for the measurement noise you need to collect a sample data sets from the sensors that you are using. For each data set you can calculate the variance value put it to the corresponding position of the measurement noise covariance matrix.

Usually process noise covariances are experimental values. You can use a trial and error method to find the values which reduce the error.


"""

### Weighting Gold (Comparing results with pyKalman)
- Results are consistent

In [812]:
measure_var = 100
init_state_vec = np.asarray(0)
init_state_cov = np.matrix(500)
transition_mat = np.matrix(1)
control_mat = np.matrix(0)
control_vec = np.zeros(1)
observation_mat = np.matrix(1)
measure_noise_mat = np.matrix(measure_var)
process_noise_mat = np.matrix(0.00)
true_value = 1000

In [821]:
kf = KalmanFilter(n_dim_obs=1, 
                  n_dim_state=1, 
                  initial_state_mean=init_state_vec,
                  initial_state_covariance=init_state_cov,
                  transition_matrices=transition_mat,
                  observation_matrices=observation_mat,
                  observation_covariance=measure_noise_mat,
                  transition_covariance=process_noise_mat)

In [909]:
prev_next_state_vec = state_extrapolation(init_state_vec, control_vec, transition_mat, control_mat)
prev_next_cov_mat = cov_extrapolation(init_state_cov, transition_mat, process_noise_mat)

kalman_gain_list = []
cur_state_vec_list = []
cur_state_cov_list = []
measurements = measure_var * np.random.randn(2000) + true_value
for measure in measurements:
    measure_vec = np.asarray(measure)
    kalman_gain_mat = calc_kalman_gain(prev_next_cov_mat, observation_mat, measure_noise_mat)
    kalman_gain_list.append(kalman_gain_mat.item(0))

    cur_state_vec = state_update(prev_next_state_vec, kalman_gain_mat, measure_vec, observation_mat)
    cur_state_cov_mat = cov_update(prev_next_cov_mat, kalman_gain_mat, observation_mat, measure_noise_mat)
    
    cur_state_vec_list.append(cur_state_vec.item(0))
    cur_state_cov_list.append(cur_state_cov_mat.item(0))
    
    prev_next_state_vec = state_extrapolation(cur_state_vec, control_vec, transition_mat, control_mat)
    prev_next_cov_mat = cov_extrapolation(cur_state_cov_mat, transition_mat, process_noise_mat)

    

prev_next_cov_mat = init_state_cov
prev_next_state_vec = init_state_vec

cur_state_vec_list_2 = []
cur_state_cov_list_2 = []
    
# for measure in measurements:
#     measure_vec = np.asarray(measure)
#     prev_next_state_vec, prev_next_cov_mat = kf.filter_update(prev_next_state_vec, prev_next_cov_mat,
#                     observation=measure_vec,
#                     observation_matrix=observation_mat)
    
    
# #     print(prev_next_state_vec)
# #     print(type(prev_next_state_vec.item(0)))
#     cur_state_vec_list_2.append(prev_next_state_vec.item(0))
#     cur_state_cov_list_2.append(prev_next_cov_mat.item(0))

#     print(observation_mat.shape, observation_mat, observation_mat.transpose())
#     print(prev_next_cov_mat.shape, prev_next_cov_mat)
#     print(measure_noise_mat, measure_noise_mat.shape)

# def calc_kalman_gain(prev_next_cov_mat, observation_mat, measure_noise_mat):
#     """
#     observation_mat Z * X
#     prev_next_cov_mat  X * X
#     measure_noise_mat Z * Z
    
#     """
#     print('observation', observation_mat.shape, observation_mat, observation_mat.transpose())
#     print('covariance', prev_next_cov_mat.shape, prev_next_cov_mat)
#     print('measure', measure_noise_mat, measure_noise_mat.shape)
    
    
#     kalman_gain_mat = np.linalg.multi_dot([
#         prev_next_cov_mat,
#         observation_mat.transpose(),
#         np.linalg.inv(
#             np.linalg.multi_dot([observation_mat, prev_next_cov_mat, observation_mat.transpose()]) + measure_noise_mat
#         )]
#     )
    
# #     print(prev_next_cov_mat)
# #     print(np.linalg.multi_dot([observation_mat, prev_next_cov_mat, observation_mat.transpose()]) + measure_noise_mat)
#     return kalman_gain_mat
    
    
res_df = pd.DataFrame({'KALMAN_GAIN': kalman_gain_list, 'CUR_STATE': cur_state_vec_list, 'CUR_STATE_STANDARD': cur_state_vec_list_2, 'CUR_COV_STANDARD': cur_state_cov_list_2, 'CUR_COV': cur_state_cov_list, 'TRUE_VALUE': true_value, 'MEASUREMENT': measurements})

observation (2, 1) [[0]
 [1]] [[0 1]]
covariance (2, 2) [[501.   0.]
 [  0.   0.]]
measure [[1]] (1, 1)


ValueError: shapes (2,2) and (1,2) not aligned: 2 (dim 1) != 1 (dim 0)

In [825]:
px.line(pd.melt(
    res_df, value_vars=['CUR_STATE', 'CUR_STATE_STANDARD']), y='value', color='variable',
       title='PREDICTION ITERATION')

In [826]:
px.line(pd.melt(
    res_df, value_vars=['CUR_COV', 'CUR_COV_STANDARD']), y='value', color='variable',
       title='KALMAN GAIN ITERATION')

### Constant Velocity Aircraft in One Dimension (Comparing results with pyKalman)
- State is Position
- Control is Velocity


- PyKalman merges control and transition into one matrices, so had to maneuver
- Vec, Cov shape in pyKalman is different
- Results are identical
- pyKalman disadvantage (Cannot set error in the control matrix straightforwardly)


- Assumption is good initial estiate(low covariance, close to true value initial estimate)
- Large Measurement Error

In [1146]:
measure_var = 10
init_state_vec = np.array([7,10])#.reshape(-1,1)
init_state_cov = np.matrix([[250,0],[0,0]]) #Assume I am highly confident of velocity, but not position
transition_mat = np.matrix([[1,1],[0,1]])
control_mat = np.matrix([1,0])#.reshape(-1,1)
control_vec = np.array([10]) # Velocity is constant at 10
observation_mat = np.matrix([1,0])#.reshape(-1,1) # Can only observe position
measure_noise_mat = np.matrix(measure_var)
process_noise_mat = np.matrix([[1,0],[0,0]])

In [1147]:
kf = KalmanFilter(n_dim_obs=1, 
                  n_dim_state=2, 
                  initial_state_mean=init_state_vec,
                  initial_state_covariance=init_state_cov,
                  transition_matrices=transition_mat,
                  observation_matrices=observation_mat,
                  observation_covariance=measure_noise_mat,
                  transition_covariance=process_noise_mat)

In [1148]:
 # Can only observe position
init_state_vec = init_state_vec.reshape(-1,1)
control_mat = control_mat.reshape(-1,1)
transition_mat = np.matrix(np.eye(2))

In [1158]:
num_moves = 100

In [1159]:
start = timeit.default_timer()
prev_next_state_vec = state_extrapolation(init_state_vec, control_vec, transition_mat, control_mat)
prev_next_cov_mat = cov_extrapolation(init_state_cov, transition_mat, process_noise_mat)

kalman_gain_list = []
cur_state_vec_list = []
cur_state_cov_list = []
measurements = np.arange(0, num_moves, 1) * 10 + 50 + measure_var * np.random.randn(num_moves)
true_value = np.arange(0, num_moves, 1) * 10 + 50

for measure in measurements:
    measure_vec = np.asarray(measure)
    kalman_gain_mat = calc_kalman_gain(prev_next_cov_mat, observation_mat, measure_noise_mat)
    kalman_gain_list.append(kalman_gain_mat.item(0))

    cur_state_vec = state_update(prev_next_state_vec, kalman_gain_mat, measure_vec, observation_mat)
    cur_state_cov_mat = cov_update(prev_next_cov_mat, kalman_gain_mat, observation_mat, measure_noise_mat)
    
    cur_state_vec_list.append(cur_state_vec.item(0))
    cur_state_cov_list.append(cur_state_cov_mat.item(0))
    
    prev_next_state_vec = state_extrapolation(cur_state_vec, control_vec, transition_mat, control_mat)
    prev_next_cov_mat = cov_extrapolation(cur_state_cov_mat, transition_mat, process_noise_mat)

    
end = timeit.default_timer()
print(end - start)
start = timeit.default_timer()

prev_next_cov_mat = init_state_cov
prev_next_state_vec = init_state_vec

cur_state_vec_list_2 = []
cur_state_cov_list_2 = []
    
for measure in measurements:
    measure_vec = np.asarray(measure)
    prev_next_state_vec, prev_next_cov_mat = kf.filter_update(prev_next_state_vec, prev_next_cov_mat,
                    observation=measure_vec,
                    observation_matrix=observation_mat)
    cur_state_vec_list_2.append(prev_next_state_vec.item(0))
    cur_state_cov_list_2.append(prev_next_cov_mat.item(0))
    
end = timeit.default_timer()   
print(end - start)

    
res_df = pd.DataFrame({'KALMAN_GAIN': kalman_gain_list, 
                       'CUR_STATE': cur_state_vec_list, 
                       'CUR_STATE_STANDARD': cur_state_vec_list_2, 
                       'CUR_COV_STANDARD': cur_state_cov_list_2, 
                       'CUR_COV': cur_state_cov_list, 
                       'TRUE_VALUE': true_value, 
                       'MEASUREMENT': measurements})

0.01087564101908356
0.06614747701678425


In [1160]:
res_df['MEASUREMENT_ERROR'] = res_df['MEASUREMENT'] - res_df['TRUE_VALUE']
res_df['KALMAN_ERROR'] = res_df['CUR_STATE'] - res_df['TRUE_VALUE']

In [1161]:
px.line(pd.melt(
    res_df, value_vars=['CUR_STATE', 'CUR_STATE_STANDARD', 'TRUE_VALUE', 'MEASUREMENT']), y='value', color='variable',
       title='PREDICTION ITERATION')

In [1162]:
px.line(pd.melt(
    res_df, value_vars=['CUR_COV', 'CUR_COV_STANDARD']), y='value', color='variable',
       title='COV ITERATION')

In [1163]:
px.line(pd.melt(
    res_df, value_vars=['KALMAN_GAIN']), y='value', color='variable',
       title='KALMAN GAIN ITERATION')

In [1164]:
px.line(pd.melt(
    res_df, value_vars=['MEASUREMENT_ERROR', 'KALMAN_ERROR']), y='value', color='variable',
       title='PREDICTION ITERATION')

In [1165]:
res_df['MEASUREMENT_ERROR'].mean(), res_df['MEASUREMENT_ERROR'].std()

(0.09567149963863635, 9.658235674913794)

In [1166]:
res_df['KALMAN_ERROR'].mean(), res_df['KALMAN_ERROR'].std()

(0.14428446719794508, 3.507830554480268)

### Constant Acceleration Aircraft in One Dimension (Comparing results with pyKalman)

### Changing  Aircraft in One Dimension (Comparing results with pyKalman)