In [1]:
import numpy as np
import numpy.linalg as npla

np.set_printoptions(6, suppress=True)
import matplotlib.pyplot as plt

%matplotlib inline
# %matplotlib widget
plt.rcParams.update(
    {
        "text.usetex": True,
        "font.family": "serif",
        "font.serif": ["Times"],
        "font.size": 14,
    }
)

from pylgmath import se3op, Transformation

from pysteam.evaluable.se3 import SE3StateVar
from pysteam.evaluable.vspace import VSpaceStateVar
from pysteam.evaluable.stereo import (
    HomoPointStateVar,
    ComposeLandmarkEvaluator,
    HomoPointErrorEvaluator,
)

from pysteam.evaluable.sonar import (
    ComposeSonarLandmarkEvaluator,
    SonarErrorEvaluator
)

from pysteam.problem import (
    OptimizationProblem,
    L2LossFunc,
    StaticNoiseModel,
    WeightedLeastSquareCostTerm,
)
from pysteam.solver import GaussNewtonSolver, Covariance
from pysteam.trajectory import Time
from pysteam.trajectory.const_vel import Interface as ConstVelTrajectory



In [2]:
def plot_frame(ax, T_ri=np.eye(4), length=1, **kwargs):
    axes = T_ri @ np.concatenate((np.eye(3) * length, np.ones((1, 3))), axis=0)
    ax.plot(*zip(T_ri[:3, 3], axes[:3, 0]), color="r", **kwargs)
    ax.plot(*zip(T_ri[:3, 3], axes[:3, 1]), color="g", **kwargs)
    ax.plot(*zip(T_ri[:3, 3], axes[:3, 2]), color="b", **kwargs)


def get_cov_ellipsoid(ax, mu, cov, nstd=3, **kwargs):
    assert mu.shape == (3,) and cov.shape == (3, 3)

    # Find and sort eigenvalues to correspond to the covariance matrix
    eigvals, eigvecs = np.linalg.eigh(cov)
    idx = np.sum(cov, axis=0).argsort()
    eigvals_temp = eigvals[idx]
    idx = eigvals_temp.argsort()
    eigvals = eigvals[idx]
    eigvecs = eigvecs[:, idx]

    # Set of all spherical angles to draw our ellipsoid
    n_points = 100
    theta = np.linspace(0, 2 * np.pi, n_points)
    phi = np.linspace(0, np.pi, n_points)

    # Width, height and depth of ellipsoid
    rx, ry, rz = nstd * np.sqrt(eigvals)

    # Get the xyz points for plotting
    # Cartesian coordinates that correspond to the spherical angles:
    X = rx * np.outer(np.cos(theta), np.sin(phi))
    Y = ry * np.outer(np.sin(theta), np.sin(phi))
    Z = rz * np.outer(np.ones_like(theta), np.cos(phi))

    # Rotate ellipsoid for off axis alignment
    old_shape = X.shape
    # Flatten to vectorise rotation
    X, Y, Z = X.flatten(), Y.flatten(), Z.flatten()
    X, Y, Z = np.matmul(eigvecs, np.array([X, Y, Z]))
    X, Y, Z = X.reshape(old_shape), Y.reshape(old_shape), Z.reshape(old_shape)

    # Add in offsets for the mean
    X = X + mu[0]
    Y = Y + mu[1]
    Z = Z + mu[2]

    return ax.plot_wireframe(X, Y, Z, **kwargs)

In [3]:
## setup
# number of node states along trajectory (make as large as you like)
K = 10
# total arclength
L = 10

# initialize the node states (pose and velocity)
t_list = [] # time list
gt_w_ir_inr_list = [] # ground truth velocities
gt_T_ri_list = [] # ground truth poses
w_ir_inr_list = [] # observed velocities
T_ri_list = [] # observed poses
for i in range(K):
    # time (time along the trajectory from start to end)
    t_list.append(i * L / (K - 1))

    # ground truth trajectory
    if i == 0:
        gt_w_ir_inr_list.append(np.array([[-1.0, 0.0, 0.0, 0.0, 0.0, 0.0]]).T)
        gt_T_ri_list.append(np.eye(4))
        gt_T_ri_list[-1][2, 3] = 1.
    else:
        # Fixed velocities along the whole trajectory
        gt_w_ir_inr_list.append(np.array([[-1.0, 0.0, 0.0, 0.0, 0.1, 0.3]]).T)
        # Compute GT pose by applying the velocity change to the previous pose (i.e., no noise)
        gt_T_ri_list.append(
            se3op.vec2tran(L / (K - 1) * gt_w_ir_inr_list[i - 1]) @ gt_T_ri_list[i - 1]
        )

    # initial trajectory (will be later optimized)
    w_ir_inr_list.append(np.array([[-1.0, 0.0, 0.0, 0.0, 0.0, 0.0]]).T)
    T_ri_list.append(
        np.array(
            [
                [1.0, 0.0, 0.0, -t_list[-1]],
                [0.0, 1.0, 0.0, 0.0],
                [0.0, 0.0, 1.0, 0.0],
                [0.0, 0.0, 0.0, 1.0],
            ]
        )
    )
T_ri_list[0][2, 3] = 1.

# landmarks

# true GT landmarks (distance, theta)
gt_p_list = [
    np.array([[1.4142, 0.785]]).T,

]

Jland = len(gt_p_list)

# initial landmarks (will be later optimized; homogenious coordinates)
p_list = [np.array([[1.0, 1.0, 0.0, 1.0]]).T for _ in range(Jland)]

# set up the problem
t_knot_list = [Time(t) for t in t_list]
w_ir_inr_var_list = [VSpaceStateVar(w_ir_inr) for w_ir_inr in w_ir_inr_list]
T_ri_var_list = [SE3StateVar(Transformation(T_ba=T_ri)) for T_ri in T_ri_list]
p_var_list = [HomoPointStateVar(p) for p in p_list]

'''
Spectral density?
'''
# initialize the trajectory in pysteam
qcd = 0.01 * np.array([0.1, 0.001, 0.001, 0.01, 0.01, 0.01])
trajectory = ConstVelTrajectory(qcd=qcd)
for (t, T_ri, w_ir_inr) in zip(t_knot_list, T_ri_var_list, w_ir_inr_var_list):
    trajectory.add_knot(time=t, T_k0=T_ri, w_0k_ink=w_ir_inr)

meas_cost_terms = []
noise_model = StaticNoiseModel(0.01 * np.eye(3))
loss_func = L2LossFunc()
for k in range(K):
    ''' observe landmark evey 5th time step since we have 5 landmarks? '''
    j = k % Jland
    y = gt_p_list[j]
    '''
    Given the robot pose and the landmark location in robot frame, map it to world frame. 
    Next, measure error between GT and estimate. 
    '''
    error_func = SonarErrorEvaluator(
        ComposeSonarLandmarkEvaluator(T_ri_var_list[k], p_var_list[j]), y
    )
    meas_cost_terms.append(
        WeightedLeastSquareCostTerm(error_func, noise_model, loss_func)
    )

w_ir_inr_var_list[0].locked = True
T_ri_var_list[0].locked = True


opt_prob = OptimizationProblem()
opt_prob.add_state_var(*w_ir_inr_var_list, *T_ri_var_list, *p_var_list)
opt_prob.add_cost_term(*trajectory.get_prior_cost_terms())
opt_prob.add_cost_term(*meas_cost_terms)

solver = GaussNewtonSolver(opt_prob, verbose=True)
solver.optimize()

Forward
T_rw:
 [[ 1.  0.  0.  0.]
 [ 0.  1.  0.  0.]
 [ 0.  0.  1. -1.]
 [ 0.  0.  0.  1.]]
P_w:
 [[1.]
 [1.]
 [0.]
 [1.]]
P_r:
 [[ 1.]
 [ 1.]
 [-1.]
 [ 1.]]
Landmark Est:
 [[1.732051]
 [0.955317]
 [2.186276]]
Landmark GT:
 [[1.4142]
 [0.785 ]
 [1.57  ]]
Error: -1.1044434611586702


ValueError: matmul: Input operand 1 does not have enough dimensions (has 0, gufunc core with signature (n?,k),(k,m?)->(n?,m?) requires 1)

In [None]:
# query estimated trajectory
tq_list = np.linspace(0.0, L, 100)
w_ir_inr_q_list = [
    trajectory.get_velocity_interpolator(Time(tq)).evaluate() for tq in tq_list
]
T_ri_q_list = [trajectory.get_pose_interpolator(Time(tq)).evaluate() for tq in tq_list]
rq = np.stack([T_ri_q.inverse().matrix()[:3, 3] for T_ri_q in T_ri_q_list]).T

In [None]:
fig = plt.figure()

# ground truth
ax = fig.add_subplot(1, 2, 1, projection="3d")
for gt_p in gt_p_list:
    ax.scatter(*gt_p[:3, 0], color="r")
for gt_T_ri in gt_T_ri_list:
    plot_frame(ax, npla.inv(gt_T_ri), length=0.2)

gt_r_list = [npla.inv(gt_T_ri)[:3, 3] for gt_T_ri in gt_T_ri_list]
gt_p_list = [gt_p[:3, 0] for gt_p in gt_p_list]
for k in range(K):
    ax.plot(*np.stack((gt_r_list[k], gt_p_list[k % Jland])).T, color="b", alpha=0.5)

# estimated
ax = fig.add_subplot(1, 2, 2, projection="3d")
for p in p_var_list:
    ax.scatter(*p.evaluate()[:3, 0], color="r")
for T_ri_var in T_ri_var_list:
    plot_frame(ax, npla.inv(T_ri_var.evaluate().matrix()), length=0.2)

ax.plot(*rq)

r_list = [npla.inv(T_ri_var.evaluate().matrix())[:3, 3] for T_ri_var in T_ri_var_list]
p_list = [p.evaluate()[:3, 0] for p in p_var_list]
for k in range(K):
    ax.plot(*np.stack((r_list[k], p_list[k % Jland])).T, color="b", alpha=0.5)

covariance = Covariance(opt_prob)

for T_ri_var in T_ri_var_list[1:]:
    T_ir = npla.inv(T_ri_var.evaluate().matrix())
    C_ir = T_ir[:3, :3]
    r_cov = C_ir @ covariance.query(T_ri_var)[:3, :3] @ C_ir.T
    get_cov_ellipsoid(ax, T_ir[:3, 3], r_cov, nstd=3.0, alpha=0.1)
    # print(f"{r_cov[0, 0]:.6f}, {r_cov[0, 1]:.6f}, {r_cov[0, 2]:.6f}, {r_cov[1, 1]:.6f}, {r_cov[1, 2]:.6f}, {r_cov[2, 2]:.6f}")

p_cov_list = [covariance.query(p_var) for p_var in p_var_list]
for k in range(K):
    get_cov_ellipsoid(ax, p_list[k % Jland], p_cov_list[k % Jland], nstd=3.0, alpha=0.1)
# for p_cov in p_cov_list:
#   print(f"{p_cov[0, 0]:.6f}, {p_cov[0, 1]:.6f}, {p_cov[0, 2]:.6f}, {p_cov[1, 1]:.6f}, {p_cov[1, 2]:.6f}, {p_cov[2, 2]:.6f}")

In [None]:
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1, projection="3d")

Kq = 4
tq_list = np.linspace(t_list[Kq], t_list[Kq + 1], 11)
for tq in tq_list:
    T_ir = npla.inv(trajectory.get_pose_interpolator(Time(tq)).evaluate().matrix())
    C_ir = T_ir[:3, :3]
    r_cov = C_ir @ trajectory.get_covariance(covariance, Time(tq))[:3, :3] @ C_ir.T
    plot_frame(ax, T_ir, length=0.1)
    get_cov_ellipsoid(ax, T_ir[:3, 3], r_cov, nstd=1.0, alpha=0.1)
    print(
        f"{tq}, {r_cov[0, 0]:.6f}, {r_cov[0, 1]:.6f}, {r_cov[0, 2]:.6f}, {r_cov[1, 1]:.6f}, {r_cov[1, 2]:.6f}, {r_cov[2, 2]:.6f}"
    )