In [None]:
import numpy as np
from pr3_utils import *
from stereo import *
from tqdm import tqdm

In [None]:
dataset = "03"
feature_type = "all"
feature_type = "selected"

time_stamp,features,linear_velocity,angular_velocity,k,b,imu_T_cam = \
    load_data(f"../data/{dataset}.npz")

num_time_stamp = time_stamp.shape[0]
num_features = features.shape[1]

tau = time_stamp[1:] - time_stamp[:-1]
velocity = np.concatenate([linear_velocity, angular_velocity], axis=1)

T_mean = np.load(f"../data/{dataset}_EKF_localization_T_mean.npy")

In [None]:
cam_T_imu = inversePose(imu_T_cam)
fsu = k[0,0]
fsv = k[1,1]
cu  = k[0,2]
cv  = k[1,2]
Ks = np.array([
    [fsu,0,cu,0],
    [0,fsv,cv,0],
    [fsu,0,cu,-fsu*b],
    [0,fsv,cv,0],
])
P = np.array([
    [1,0,0,0],
    [0,1,0,0],
    [0,0,1,0],
])

### mapping

In [None]:
if feature_type == "all":
    M_init = np.load(f"../data/{dataset}_EKF_mapping_M_init_{feature_type}.npy")
else:
    M_init = np.load(f"../data/{dataset}_EKF_mapping_M_init_{feature_type}.npy")
    M_selection_mask = np.load(f"../data/{dataset}_EKF_mapping_M_mask_{feature_type}.npy")
    M_init = M_init[M_selection_mask]
    features = features[:,M_selection_mask,:]

In [None]:
# noise model
V = 5

# log list
log_M_covar_norm_t = []
log_M_covar_norm = []

# init
M_mean = np.copy(M_init)
M_covar_flat = np.eye(3*num_features, 3*num_features, dtype=np.float16)
M_covar = M_covar_flat.reshape(num_features, 3, num_features, 3)

# EKF
for t in tqdm(range(1, num_time_stamp)):
    # update step
    present_mask = get_seeing_mask(features[t,:,:], d_min=3)
    Nt = np.sum(present_mask)
    
    # compose observation
    present_M_mean = M_mean[present_mask, :]
    present_M_mean_homo = np.hstack([present_M_mean, np.ones([Nt,1])])
    ixgrid = np.ix_(present_mask, np.arange(3), present_mask, np.arange(3))
    present_M_covar_flat = M_covar[ixgrid].reshape([Nt*3, Nt*3])

    # build H
    H = np.zeros([Nt, 4, Nt, 3])
    cam_T_world = cam_T_imu @ inversePose(T_mean[t])
    for j in range(Nt):
        # δh(T, m_j) / δm_j
        H[j,:,j,:] = Ks @ \
         projectionJacobian(cam_T_world @ present_M_mean_homo[j,:]) @ \
         cam_T_world@P.T
    H = H.reshape([Nt*4, Nt*3])

    # Kalman gain and innovation 
    K = present_M_covar_flat @ H.T @ np.linalg.inv(H @ present_M_covar_flat @ H.T + V*np.eye(Nt*4))
    innovation = K @ (
        features[t,present_mask,:] - (projection(present_M_mean_homo @ cam_T_world.T)@Ks.T)
    ).reshape(-1)
    
    # update
    M_mean[present_mask, :] = present_M_mean + innovation.reshape(-1, 3)
    M_covar[ixgrid] = (
        (np.eye(Nt*3) - K@H)@present_M_covar_flat
    ).reshape(Nt, 3, Nt, 3)

    # log covariance
    if t%50 == 0:
        log_M_covar_norm_t.append(time_stamp[t])
        log_M_covar_norm.append(np.trace(M_covar_flat))

In [None]:
fig,ax = visualize_trajectory("EKF_localization", T_mean, show_ori=False,)

plot_bound = 200
x_min, x_max = np.min(T_mean[:,0,3]), np.max(T_mean[:,0,3])
y_min, y_max = np.min(T_mean[:,1,3]), np.max(T_mean[:,1,3])
plot_mask = (x_min-plot_bound < M_mean[:,0]) & (M_mean[:,0] < x_max+plot_bound) & \
            (y_min-plot_bound < M_mean[:,1]) & (M_mean[:,1] < y_max+plot_bound)
ax.scatter(M_mean[plot_mask, 0], M_mean[plot_mask, 1], 
           s=0.1, c='C4',label=f"features({feature_type})")
ax.legend()
fig.savefig(f"../img/{dataset}_EKF_mapping_{feature_type}", dpi=300)
plt.plot()

In [None]:
plt.plot(log_M_covar_norm_t, log_M_covar_norm)
plt.scatter(log_M_covar_norm_t, log_M_covar_norm)
plt.xlabel("time stamp")
plt.ylabel("tr(M_covar)")
plt.savefig(f"../img/{dataset}_EKF_mapping_{feature_type}_covar_trace", dpi=300)
plt.show()

In [None]:
from matplotlib.colors import Normalize

sample_step = 400
size = int(np.ceil(num_features/sample_step)*3)
# vmin, vmax = np.min(M_covar), np.max(M_covar)
vmin, vmax = -1, 1
plt.imshow(
    M_covar[::sample_step, :, ::sample_step, :].reshape(size, size), 
    cmap='viridis', origin='lower', norm=Normalize(vmin=vmin, vmax=vmax))
plt.colorbar()
plt.savefig(f"../img/{dataset}_EKF_mapping_{feature_type}_covar_sample", dpi=300)
plt.show()

In [None]:
print(M_covar.nbytes/1024/1024/1024, "GB")