## Laboratory work 8
Tracking and forecasting in conditions of measurement gaps

Team: Abramov Semen, Belikov Ilia, Nikolay Zherdev

In [61]:
import pandas as pd
import numpy as np
from math import pi
from sklearn.metrics import mean_squared_error
from tqdm import tqdm_notebook as tqdm

In [62]:
from plotly import __version__
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly.graph_objs as go

init_notebook_mode(connected=True)

In [63]:
def generate_randaccel_trajectory(X0=5, V0=0, var=10, size=300, T=0.1, bias=0.2):
    V = np.zeros(size)
    V[0] = V0
    
    a = np.random.normal(loc=bias, scale=np.sqrt(var), size=size)
    
    V[1:] = a[:-1]*T 
    V = np.cumsum(V)
    
    X = np.zeros(size)
    X[0] = X0
    X[1:] = V[:-1]*T + a[:-1]*(T**2)/2
    return np.cumsum(X)

In [64]:
def measure(true_traj, var=0.1, gap_prob=0):
    gaps = np.random.uniform(size=true_traj.shape)
    msr = np.add(true_traj, np.random.normal(scale=np.sqrt(var), size=true_traj.shape))
    msr[gaps < gap_prob] = None
    return msr

In [65]:
def kalman(msr, S0=[2.0, 0], T=1, P0=[[10000.0, 0], [0, 10000.0]], p_sigma=0.2**2, m_sigma=20**2, K=None, q=0):
    size = (len(msr), 2) # size of array
    
    G = np.array([(T**2)/2,T]).T
    Q = G.dot(G.T)*p_sigma
    R = m_sigma
    P = np.zeros((size[0], 2, 2))
    P_ = np.zeros((size[0], 2, 2))
    F = np.array([[1, T], [0, 1]])
    H = np.array([1, 0])
    
    #if K not none don't update K
    update_k = False
    if K is None:
        K = np.zeros(size)
        update_k = True
    else:
        K = np.full(fill_value=K, shape=size)
    
    # allocate space for arrays
    X=np.zeros(size)
    X_=np.zeros(size)
    
    # intial guesses
    X_[0] = S0
    P_[0] = P0
    
    for k in range(1, size[0]):
        # prediction
        X[k] = F.dot(X_[k-1]) + G*q
        P[k] = F.dot(P_[k-1].dot(F.T)) + Q
        # filtration
        if update_k:
            K[k] = P[k].dot(H.T)/(H.dot(P[k].dot(H.T)) + R)
        if not np.isnan(msr[k]):
            X_[k] = X[k] + K[k]*(msr[k] - H.dot(X[k]))
        else:
            X_[k] = X[k]
        P_[k] = (np.eye(2) - np.outer(K[k], H)).dot(P[k])
        
    # X predictions, X filtrations, K, P predictions, P filtrations, extrapolation to 7 steps ahead
    return X[:,0], X_[:,0], K, np.sqrt(P[:,0,0]), np.sqrt(P_[:,0,0]), np.linalg.matrix_power(F,7).dot(X_.T)[0]

In [66]:
tr = generate_randaccel_trajectory(X0=5, V0=1, T=1, size=200, var=0.2**2)

In [67]:
msr = measure(true_traj=tr, var=20**2, gap_prob=0.2)

In [78]:
data = [
    go.Scatter(
        y=tr,
        name='true trajectory'
    ),
    go.Scatter(
        y=msr,
        name='data'
    ),
    go.Scatter(
        y=kalman(msr)[0],
        name='kalman'
    ),
]

layout= go.Layout(
    title= 'Kalman filtration of gapped data',
    xaxis= dict(
        title= 'Step',
    ),
    yaxis=dict(
        title= 'X',
    ),
    showlegend= True
)
fig= go.Figure(data=data, layout=layout)
iplot(fig)

In [91]:
M = 500
points = 800

In [92]:
def experiment(M=500, points=200, P0=[[10000.0, 0], [0, 10000.0]], S0=[2, 0], acc_var=0.2**2, p_sigma=0.2**2, 
               _K=None, bias=0, q=0, gap_prob=0):
    rms_X7_error = np.zeros(points)
    rms_f_error = np.zeros(points)
    true_error = np.zeros(points)

    for i in tqdm(range(M)):
        tr = generate_randaccel_trajectory(X0=5, V0=1, T=1, size=points, var=acc_var, bias=bias)
        msr = measure(true_traj=tr, var=20**2, gap_prob=gap_prob)
        X_predicted, X_filtered, K, P_predicted, P_filtered, X7 = kalman(msr, P0=P0, S0=S0, p_sigma=p_sigma, K=_K, q=q)
        rms_X7_error = rms_X7_error + ((X7 - tr)**2)/M
        rms_f_error = rms_f_error + ((X_filtered - tr)**2)/M
        true_error = true_error + ((measure(true_traj=tr, var=20**2) - tr)**2)/M
        
    return np.sqrt(rms_f_error), np.sqrt(rms_X7_error), np.sqrt(true_error)

In [93]:
rms_f_error, rms_X7_error, true_error = experiment(gap_prob=0.2)

HBox(children=(IntProgress(value=0, max=500), HTML(value='')))




In [94]:
data = [
    go.Scatter(
        y=rms_f_error,
        name='rms error of filtering'
    ),
    go.Scatter(
        y=rms_X7_error,
        name='rms error of prediction'
    ),
    go.Scatter(
        y=true_error,
        name='measurement error'
    ),
]

layout= go.Layout(
    title= 'Mean error of estimation by step',
    xaxis= dict(
        title= 'Step',
    ),
    yaxis=dict(
        title= 'X',
    ),
    showlegend= True
)
fig= go.Figure(data=data, layout=layout)
iplot(fig)

In [95]:
rms_f_error, rms_X7_error, true_error = experiment(gap_prob=0.5)

HBox(children=(IntProgress(value=0, max=500), HTML(value='')))




In [96]:
data = [
    go.Scatter(
        y=rms_f_error,
        name='rms error of filtering'
    ),
    go.Scatter(
        y=rms_X7_error,
        name='rms error of prediction'
    ),
    go.Scatter(
        y=true_error,
        name='measurement error'
    ),
]

layout= go.Layout(
    title= 'Mean error of estimation by step',
    xaxis= dict(
        title= 'Step',
    ),
    yaxis=dict(
        title= 'X',
    ),
    showlegend= True
)
fig= go.Figure(data=data, layout=layout)
iplot(fig)

In [107]:
rms_f_error, rms_X7_error, true_error = experiment(gap_prob=0.7)

HBox(children=(IntProgress(value=0, max=500), HTML(value='')))




In [108]:
data = [
    go.Scatter(
        y=rms_f_error,
        name='rms error of filtering'
    ),
    go.Scatter(
        y=rms_X7_error,
        name='rms error of prediction'
    ),
    go.Scatter(
        y=true_error,
        name='measurement error'
    ),
]

layout= go.Layout(
    title= 'Mean error of estimation by step',
    xaxis= dict(
        title= 'Step',
    ),
    yaxis=dict(
        title= 'X',
    ),
    showlegend= True
)
fig= go.Figure(data=data, layout=layout)
iplot(fig)

### Conclusion: 
Kalman filter shows great performance on gapped data. We can acheive error equal to true measurement error even with 70% of missing data. Thus, one can use this filter for data recovery particulary in case of motion with random acceleration, which corresponds many cases in motion tracking. In case of less than 50% of missing data one can get much better estimation of true trajectory and missed points