In [1]:
import sys
sys.path.insert(1, '../')
from FrenetFDA.iek_filter_smoother_Frenet_state import IEKFilterSmootherFrenetState
import FrenetFDA.visualization as visu
import numpy as np
from scipy.integrate import cumtrapz, solve_ivp
from sklearn.gaussian_process.kernels import Matern
import plotly.graph_objs as go
import plotly.express as px
from plotly.subplots import make_subplots
layout = go.Layout(plot_bgcolor='rgba(0,0,0,0)')
import plotly.io as pio
pio.renderers.default = 'browser'

In [11]:
def generate_mu(theta, grid, mu0):
    ode_mu = lambda s,mu: (np.matmul(mu.reshape(4,4),np.array([[0,-theta(s)[0], 0, 1], [theta(s)[0], 0, -theta(s)[1], 0], [0, theta(s)[1], 0, 0], [0, 0, 0, 0]]))).flatten()
    sol_mu = solve_ivp(ode_mu, t_span=(0,1), y0=mu0.flatten(), t_eval=grid, method='Radau')
    mu = sol_mu.y.reshape(4,4,len(grid))
    mu = np.moveaxis(mu, -1, 0)
    mu_Q = mu[:,:3,:3]
    mu_X = mu[:,:3,3] 
    return mu, mu_Q, mu_X

def generate_Y(X, gamma):
    W = gamma**2*np.eye(3)
    Y = X[1:] + np.random.multivariate_normal(np.zeros(3), W, size=(len(X)-1))
    return Y

color_list = px.colors.qualitative.Plotly
def plot_res_Z(kf, Q_true, Q_mu, X_true, X_mu, Y, title=''):

    fig = make_subplots(rows=2, cols=2, subplot_titles=("Tangent Vector", "Normal Vector", "Binormal Vector", "X"), specs=[[{"type": "scene"}, {"type": "scene"}],
           [{"type": "scene"}, {"type": "scene"}]],)
    # T 
    fig.add_trace(go.Scatter3d(x=Q_true[:,:,0][:,0],y=Q_true[:,:,0][:,1],z=Q_true[:,:,0][:,2],name='True',mode='lines',line=dict(width=3,  color=color_list[0])), row=1, col=1)
    fig.add_trace(go.Scatter3d(x=Q_mu[:,:,0][:,0],y=Q_mu[:,:,0][:,1],z=Q_mu[:,:,0][:,2],name='Init',mode='lines',line=dict(width=3,  color=color_list[1])), row=1, col=1)
    fig.add_trace(go.Scatter3d(x=kf.track_Q[:,:,0][:,0],y=kf.track_Q[:,:,0][:,1],z=kf.track_Q[:,:,0][:,2],name='Track',mode='lines',line=dict(width=3,  color=color_list[2])), row=1, col=1)
    fig.add_trace(go.Scatter3d(x=kf.smooth_Q[:,:,0][:,0],y=kf.smooth_Q[:,:,0][:,1],z=kf.smooth_Q[:,:,0][:,2],name='Smooth',mode='lines',line=dict(width=3,  color=color_list[3])), row=1, col=1)
    # N 
    fig.add_trace(go.Scatter3d(x=Q_true[:,:,1][:,0],y=Q_true[:,:,1][:,1],z=Q_true[:,:,1][:,2],name='True',mode='lines',line=dict(width=3,  color=color_list[0])), row=1, col=2)
    fig.add_trace(go.Scatter3d(x=Q_mu[:,:,1][:,0],y=Q_mu[:,:,1][:,1],z=Q_mu[:,:,1][:,2],name='Init',mode='lines',line=dict(width=3, color=color_list[1])), row=1, col=2)
    fig.add_trace(go.Scatter3d(x=kf.track_Q[:,:,1][:,0],y=kf.track_Q[:,:,1][:,1],z=kf.track_Q[:,:,1][:,2],name='Track',mode='lines',line=dict(width=3, color=color_list[2])), row=1, col=2)
    fig.add_trace(go.Scatter3d(x=kf.smooth_Q[:,:,1][:,0],y=kf.smooth_Q[:,:,1][:,1],z=kf.smooth_Q[:,:,1][:,2],name='Smooth',mode='lines',line=dict(width=3,  color=color_list[3])), row=1, col=2)
    # B
    fig.add_trace(go.Scatter3d(x=Q_true[:,:,2][:,0],y=Q_true[:,:,2][:,1],z=Q_true[:,:,2][:,2],name='True',mode='lines',line=dict(width=3, color=color_list[0])), row=2, col=1)
    fig.add_trace(go.Scatter3d(x=Q_mu[:,:,2][:,0],y=Q_mu[:,:,2][:,1],z=Q_mu[:,:,2][:,2],name='Init',mode='lines',line=dict(width=3, color=color_list[1])), row=2, col=1)
    fig.add_trace(go.Scatter3d(x=kf.track_Q[:,:,2][:,0],y=kf.track_Q[:,:,2][:,1],z=kf.track_Q[:,:,2][:,2],name='Track',mode='lines',line=dict(width=3, color=color_list[2])), row=2, col=1)
    fig.add_trace(go.Scatter3d(x=kf.smooth_Q[:,:,2][:,0],y=kf.smooth_Q[:,:,2][:,1],z=kf.smooth_Q[:,:,2][:,2],name='Smooth',mode='lines',line=dict(width=3, color=color_list[3])), row=2, col=1)
    # X 
    fig.add_trace(go.Scatter3d(x=X_true[:,0],y=X_true[:,1],z=X_true[:,2],name='True',mode='lines',line=dict(width=3,  color=color_list[0])), row=2, col=2)
    fig.add_trace(go.Scatter3d(x=X_mu[:,0],y=X_mu[:,1],z=X_mu[:,2],name='Init',mode='lines',line=dict(width=3, color=color_list[1])), row=2, col=2)
    fig.add_trace(go.Scatter3d(x=kf.track_X[:,0],y=kf.track_X[:,1],z=kf.track_X[:,2],name='Track',mode='lines',line=dict(width=3, color=color_list[2])), row=2, col=2)
    fig.add_trace(go.Scatter3d(x=kf.smooth_X[:,0],y=kf.smooth_X[:,1],z=kf.smooth_X[:,2],name='Smooth',mode='lines',line=dict(width=3, color=color_list[3])), row=2, col=2)
    fig.add_trace(go.Scatter3d(x=Y[:,0],y=Y[:,1],z=Y[:,2],name='Observations',mode='lines',line=dict(width=3, color=color_list[4])), row=2, col=2)

    fig.update_xaxes(showline=True, showgrid=False, linewidth=1, linecolor='black')
    fig.update_yaxes(showline=True, showgrid=False, linewidth=1, linecolor='black')
    fig.update_layout(title_text=title)
    fig.show()

N = 200
grid = np.linspace(0,1,N+1)


""" __________ Definition of exact function Theta ___________ """

curv = lambda s : 2*np.cos(2*np.pi*s) + 5
tors = lambda s : 2*np.sin(2*np.pi*s) + 1

def theta(s):
    if isinstance(s, int) or isinstance(s, float):
        return np.array([curv(s), tors(s)])
    elif isinstance(s, np.ndarray):
        return np.vstack((curv(s), tors(s))).T
    else:
        raise ValueError('Variable is not a float, a int or a NumPy array.')
    
def warping(s,a):
    if np.abs(a)<1e-15:
        return s
    else:
        return (np.exp(a*s) - 1)/(np.exp(a) - 1)
    
warp_grid = warping(grid, 2)
    
""" __________ Definition of states and noisy observations __________ """

mu0 = np.eye(4)
mu_Z, mu_Q, mu_X = generate_mu(theta, warp_grid, mu0)

gamma = 0.001
W = gamma**2*np.eye(3)
Y = generate_Y(mu_X, gamma)

In [3]:
visu.plot_3D([mu_X, Y], ['X', 'Y'])

In [8]:
### Kernel representation of true theta
    
nu = 1.5
l = 1.0
kernel = 1.0 * Matern(length_scale=l, nu=nu)
v = (grid[1:] + grid[:-1])/2
rho = np.array([1,0,0])
V = np.expand_dims(v, 1)
y = np.vstack((curv(v),tors(v))).T.reshape((len(v)*2,1))
K_VV = np.kron(kernel(V,V),np.eye(2))
true_coefs = np.linalg.solve(K_VV,y)

def separable_kernel(x,y):
    return np.kron(kernel(x,y),np.eye(2))

def theta_kern(s):
    if isinstance(s, int) or isinstance(s, float):
            return np.squeeze(np.kron(kernel(v[:,np.newaxis], np.array([s])[:,np.newaxis]),np.eye(2)).T @ true_coefs)
    elif isinstance(s, np.ndarray):
        return np.reshape(np.kron(kernel(v[:,np.newaxis], s[:,np.newaxis]),np.eye(2)).T @ true_coefs, (-1,2))
    else:
        raise ValueError('Variable is not a float, a int or a NumPy array.')
    
sigma_theta = 2
val_noisy = theta_kern(v) + np.random.multivariate_normal(np.zeros(2), sigma_theta**2*np.eye(2), size=len(v))

# Kernel representation 
lbda = 0.3
coefs_noisy = np.linalg.solve((K_VV+lbda*np.eye(len(v)*2)),val_noisy.reshape((len(v)*2,1)))
coefs_noisy = np.reshape(coefs_noisy,(-1,2))
coefs_noisy = np.reshape(coefs_noisy, (-1,))
theta_noisy_arr = np.reshape(K_VV.T @ coefs_noisy, (-1,2))

def theta_kern_noisy(s):
    if isinstance(s, int) or isinstance(s, float):
            return np.squeeze(np.kron(kernel(v[:,np.newaxis], np.array([s])[:,np.newaxis]),np.eye(2)).T @ coefs_noisy)
    elif isinstance(s, np.ndarray):
        return np.reshape(np.kron(kernel(v[:,np.newaxis], s[:,np.newaxis]),np.eye(2)).T @ coefs_noisy, (-1,2))
    else:
        raise ValueError('Variable is not a float, a int or a NumPy array.')
    
def a_theta_kern_noisy(s):
    if isinstance(s, int) or isinstance(s, float):
        return np.array([theta_kern_noisy(s)[1], 0, theta_kern_noisy(s)[0]])
    elif isinstance(s, np.ndarray):
        return np.vstack((theta_kern_noisy(s)[:,1], np.zeros(N), theta_kern_noisy(s)[:,0])).T
    else:
        raise ValueError('Variable is not a float, a int or a NumPy array.')
    
def omega_theta_kern_noisy(s):
    if isinstance(s, int) or isinstance(s, float):
        return np.hstack((a_theta_kern_noisy(s),rho))
    elif isinstance(s, np.ndarray):
        return np.hstack((a_theta_kern_noisy(s), np.stack((np.tile(rho, (len(a_theta_kern_noisy(s)),1))))))
    else:
        raise ValueError('Variable is not a float, a int or a NumPy array.')

In [16]:
visu.plot_array_2D_names(v, [curv(v), val_noisy[:,0], theta_kern_noisy(v)[:,0]], ['exact_function', 'noisy initial guess', 'kernel representation initial guess'])
visu.plot_array_2D_names(v, [tors(v), val_noisy[:,1], theta_kern_noisy(v)[:,1]], ['exact_function', 'noisy initial guess', 'kernel representation initial guess'])

In [12]:
sigma = 0.02
Sigma = lambda s: sigma**2*np.eye(2)

P0 = (0.0001**2)*np.eye(6)

kf = IEKFilterSmootherFrenetState(3, W, Sigma, theta_kern_noisy, Z0=mu0, P0=P0, K_pts=5) 
kf.smoothing(warp_grid,Y)

In [13]:
mu_Z_noisy, mu_Q_noisy, mu_X_noisy = generate_mu(theta_kern_noisy, warp_grid, mu0)
plot_res_Z(kf, mu_Q, mu_Q_noisy, mu_X, mu_X_noisy, Y, title='test tracking from wrong a_theta')

In [6]:
print(kf.L.shape)

(6, 2)


In [14]:
kf.L @ Sigma(0) @ kf.L.T

array([[0.0004, 0.    , 0.    , 0.    , 0.    , 0.    ],
       [0.    , 0.    , 0.    , 0.    , 0.    , 0.    ],
       [0.    , 0.    , 0.0004, 0.    , 0.    , 0.    ],
       [0.    , 0.    , 0.    , 0.    , 0.    , 0.    ],
       [0.    , 0.    , 0.    , 0.    , 0.    , 0.    ],
       [0.    , 0.    , 0.    , 0.    , 0.    , 0.    ]])