# Trajectory Interpolation Example

Example that uses a simple constant-velocity prior over a trajectory that has some fake GPS-like measurements and with interpolation.


In [1]:
import numpy as np
import numpy.linalg as npla
np.set_printoptions(6, suppress=True)

import matplotlib.pyplot as plt
%matplotlib inline
# %matplotlib widget

from pylgmath import so3op, se3op, Transformation
from pysteam.evaluable.se3 import SE3StateVar
from pysteam.evaluable.vspace import VSpaceStateVar
from pysteam.evaluable.p2p import P2PErrorEvaluator
from pysteam.trajectory import Time
from pysteam.trajectory.const_vel import Interface as TrajectoryInterface
from pysteam.problem import OptimizationProblem, StaticNoiseModel, L2LossFunc, WeightedLeastSquareCostTerm
from pysteam.solver import GaussNewtonSolver, DoglegGaussNewtonSolver, LevMarqGaussNewtonSolver
import pysteam

Options:


In [12]:
num_states = 70  # total number of states
dt = 1  # time step

# Create some fake measurements (can consider as gps measurements lat-long-alt being converted to xyz)
fake_meas = np.empty((num_states, 4, 1))
# fake_meas[:, 0, 0] = np.linspace(0, 1, num_states)
# fake_meas[:, 1, 0] = np.sin(fake_meas[:, 0, 0] * 4 * np.pi) + np.random.normal(scale=0.05, size=fake_meas[:, 1, 0].shape)
fake_meas = np.linspace(0, 1, num_states) + np.random.normal(scale=0.05, size=num_states)
fake_meas = np.sin(fake_meas[:, 0, 0] * 4 * np.pi) + np.random.normal(scale=0.05, size=fake_meas[:, 0, 0].shape)
fake_meas[:, 2, 0] = 0  # z to zero so the problem is 2D
fake_meas[:, 3, 0] = 1.0  # homogeneous coordinate


# plt.figure()
# plt.plot(fake_meas[:, 0], fake_meas[:, 1], '.', label='measured values')
# plt.legend()
# plt.show()

# states with initial conditions and associated timestamps
# T_ba = T_k0 where 0 can be some global frame (e.g. UTM) and k is the vehicle/robot frame at time k
states = [(dt * i, Transformation(C_ba=np.eye(3), r_ba_in_a=fake_meas[i, :3]), np.zeros((6, 1))) for i in range(num_states)]

# wrap states with corresponding steam state variables (no copying!)
state_vars = [(t, SE3StateVar(T_vi), VSpaceStateVar(w_iv_inv)) for t, T_vi, w_iv_inv in states]
# state_vars[0][1].locked = True  # lock first pose
# for i in state_vars:
#     i[1].locked = True  # lock pose
#     i[2].locked = True  # lock velocity

qcd = 1000000*np.ones(6)
traj = TrajectoryInterface(qcd=qcd)
for t, T_vi, w_iv_inv in state_vars:
    traj.add_knot(time=Time(t), T_k0=T_vi, w_0k_ink=w_iv_inv)

cost_terms = []
# use a shared L2 loss function and noise model for all cost terms
noise_matrix = np.linalg.inv(np.array([[1,0,0],[0,1,0],[0,0,1]]))
loss_func = L2LossFunc()
noise_model = StaticNoiseModel(np.eye(3), "information")
for i in range(num_states):
    error_func = P2PErrorEvaluator(T_rq=state_vars[i][1], reference=np.array([[0, 0, 0, 1]]).T, query=fake_meas[i])
    cost_terms.append(WeightedLeastSquareCostTerm(error_func, noise_model, loss_func))

opt_prob = OptimizationProblem()
opt_prob.add_state_var(*[v for state_var in state_vars for v in state_var[1:]])
opt_prob.add_cost_term(*traj.get_prior_cost_terms())
opt_prob.add_cost_term(*cost_terms)

#solver = DoglegGaussNewtonSolver(opt_prob, verbose=True)
solver = LevMarqGaussNewtonSolver(opt_prob, verbose=True)
solver.optimize()

Begin Optimization
------------------
Number of States:  100
Number of Cost Terms:  99
Initial Cost:  1.3725058034053054e-06
Iteration:    1  -  Cost:     0.0000  -  TR Shrink:  0.000  -  AvP Ratio:  1.000
Termination Cause:  CONVERGED ABSOLUTE CHANGE


In [13]:
%matplotlib notebook
T_0k = np.array([npla.inv(x.matrix()) for _, x, _ in states])
T_k0_interp = [
    traj.get_pose_interpolator(Time(t)).evaluate().matrix() for t in np.linspace(-1*dt,(num_states+1)*dt,100)
]
T_0k_interp = np.array([npla.inv(x) for x in T_k0_interp])
plt.plot(T_0k[:, 0, 3], T_0k[:, 1, 3], '+', label="measured (smoothed)")
plt.plot(fake_meas[:, 0], fake_meas[:, 1], '.', label='measured values')
plt.plot(T_0k_interp[:, 0, 3], T_0k_interp[:, 1, 3], '-', label="inter/extra-polated")
plt.legend(loc="upper right")
plt.show()

  plt.switch_backend(backend)
  plt.switch_backend(backend)


<IPython.core.display.Javascript object>

In [19]:
covariance = pysteam.solver.Covariance(opt_prob)
T_k0_interp1 = [
    traj.get_covariance(covariance, Time(i)) for t in np.linspace(-1*dt,(num_states+1)*dt,100)
]

AssertionError: 

In [20]:
import scripts.groundtruth_utils as theodo_g
import importlib
theodo_g = importlib.reload(theodo_g)
%matplotlib notebook

plt.plot(T_0k[:, 0, 3], T_0k[:, 1, 3], '.', label="measured (smoothed)", color="black")

covariance = pysteam.solver.Covariance(opt_prob)
x_cov = np.array([covariance.query(x) for _,x,_ in state_vars])

plt.plot([np.linalg.inv(x[1].matrix())[0,3] for x in states], [np.linalg.inv(x[1].matrix())[1,3] for x in states])
for x,c in zip(states, x_cov):
    theodo_g.plot_ellipse(plt.gca(), np.linalg.inv(x[1].matrix())[0:2, 3].flatten(), np.linalg.inv(c[:2,:2]), n_std=.1, color="blue")

# plt.plot(T_0k_interp[:, 0, 3], T_0k_interp[:, 1, 3], '-', label="inter/extra-polated")
# for i,j in zip(T_0k_interp, T_k0_interp1):
#     theodo_g.plot_ellipse(plt.gca(), i[0:2, 3].flatten(), j[:2,:2], n_std=.1, color="green")

plt.legend()
plt.show()

  plt.switch_backend(backend)
  plt.switch_backend(backend)


<IPython.core.display.Javascript object>

In [99]:
theodo_g = importlib.reload(theodo_g)
Time_RTS = np.linspace(0,10,50)
Time_sensor = np.linspace(0,10,300)

X = np.linspace(0, 1, len(Time_RTS)) + np.random.normal(scale=0.005, size=len(Time_RTS))
Y = np.sin(X * 4 * np.pi) + np.random.normal(scale=0.005, size=X.shape)
Z = np.ones(len(Time_RTS))

MC_data = []
for i in range(0, len(Time_RTS)):
    #MC_data.append([Time_RTS[i], np.array([X[i], Y[i], Z[i], 1]), np.linalg.inv(0.001*np.identity(3))])
    MC_data.append([Time_RTS[i], np.array([X[i], Y[i], Z[i], 1]), 0.001*np.identity(3)])

MC_interpolated = theodo_g.STEAM_interpolation_with_covariance(Time_RTS, Time_sensor, MC_data)

Begin Optimization
------------------
Number of States:  100
Number of Cost Terms:  99
Initial Cost:  1475.587532422841
Iteration:    1  -  Cost:    30.0153  -  TR Shrink:  0.000  -  AvP Ratio:  1.000
Iteration:    2  -  Cost:    29.9930  -  TR Shrink:  0.000  -  AvP Ratio:  1.222
Iteration:    3  -  Cost:    29.9880  -  TR Shrink:  0.000  -  AvP Ratio:  1.726
Iteration:    4  -  Cost:    29.9853  -  TR Shrink:  0.000  -  AvP Ratio:  1.790
Termination Cause:  CONVERGED RELATIVE CHANGE
Interpolation MC done !


In [100]:
theodo_g = importlib.reload(theodo_g)
%matplotlib notebook
plt.figure()
for i in MC_data:
    #theodo_g.plot_ellipse(plt.gca(), i[1][0:2], np.linalg.inv(i[2][0:2,0:2]), n_std=1, color="red")
    theodo_g.plot_ellipse(plt.gca(), i[1][0:2], i[2][0:2,0:2], n_std=1, color="red")
    plt.scatter(i[1][0], i[1][1], s=2, color='red')
for j in MC_interpolated[0:-1]:
    #theodo_g.plot_ellipse(plt.gca(), j[1][0:2], np.linalg.inv(j[2][0:2,0:2]), n_std=1, color="green")
    theodo_g.plot_ellipse(plt.gca(), j[1][0:2], j[2][0:2,0:2], n_std=1, color="green")
    plt.scatter(j[1][0], j[1][1], s=2, color='green')
plt.show()

  plt.switch_backend(backend)
  plt.switch_backend(backend)


<IPython.core.display.Javascript object>

In [51]:
print(MC_data[0])

[0.0, array([-0.041652, -0.471373,  1.      ,  1.      ]), array([[0.0001, 0.    , 0.    ],
       [0.    , 0.0001, 0.    ],
       [0.    , 0.    , 0.0001]])]
