In [None]:
import numpy as np
import matplotlib.pyplot as plt
import json
import scipy
from typing import List, Any, Dict, Union
import transforms3d as t3d
from scipy import signal
import math

def vectorize_to_np(record_list: List[Dict[str, Any]], keys: List[str]) -> Dict[str, np.ndarray]:
    """Vectorizing record

    Args:
        record_list (List[Dict[str, Any]]): List of records, each record is a bundled dictionary
        keys (List[str]): keys to extract from records

    Returns:
        Dict[str, np.ndarray]: A dictionary in which keys are desired and values are numpy arrays
    """
    assert len(keys) > 0
    assert len(record_list) > 0
    res = {}
    for key in keys:
        res[key] = np.expand_dims(np.array([record[key] for record in record_list]),axis=-1)

    # Verify length
    _length: int = len(res[keys[0]])
    for key in keys:
        if _length != len(res[key]):
            raise ValueError("Not every attribute has the same length")

    return res

def rpy_to_pose_mat_np(rpy_data: np.ndarray) -> np.ndarray:
    """Convert roll-pitch-yaw data to transform matrix

    Args:
        ryp_data (np.ndarray): 2-D matrix
        [[r0,p0,y0],[r1,p1,y1],...]

    Returns:
        np.ndarray: [description]
    """
    length = rpy_data.shape[0]
    pose_mat = np.empty(shape=(length, 3, 3))
    for idx in range(length):
        pose_mat[idx] = t3d.euler.euler2mat(*rpy_data[idx])
    return pose_mat


In [None]:
record_name = './data/2021-09-22/2021-09-22-20:48:13_cu.usbserial-13110_IMU_record.json' # path-like string
c = 1. / 180. * np.pi # deg->rad conversion

# use json module to load
record_list = json.load(open(record_name))
# record_list = record_list[100:]
x = vectorize_to_np(record_list,['accel_x','accel_y','accel_z','pitch','roll','yaw', 'time'])

accel = np.hstack([x['accel_x'],x['accel_y'],x['accel_z']])
rpy = np.hstack([x['roll'] * c,x['pitch'] * c,x['yaw'] * c])
pose_mat = rpy_to_pose_mat_np(rpy)

timestamp = x['time']

fig = plt.figure(figsize=(32,8))

ax = fig.add_subplot(141, projection='3d')
ax.scatter(accel[:,0], accel[:,1], accel[:,2], c=timestamp)
ax.set_xlabel('X', fontdict={'size': 15, 'color': 'red'})
ax.set_ylabel('Y', fontdict={'size': 15, 'color': 'red'})
ax.set_zlabel('Z', fontdict={'size': 15, 'color': 'red'})


ax = fig.add_subplot(142)
ax.scatter(timestamp,accel[:,0])
ax.set_title('ACCEL-X')


ax = fig.add_subplot(143)
ax.scatter(timestamp,accel[:,1])
ax.set_title('ACCEL-Y')


ax = fig.add_subplot(144)
ax.scatter(timestamp,accel[:,2])
ax.set_title('ACCEL-Z')

In [None]:
def measurement_update(sensor_var, p_cov_check, y_k, p_check, v_check, q_check):
    # 3.1 Compute Kalman Gain
    # K_k = P_k * H_k.T * inv( H_k * P_k * H_k.T + R_k )
    try:
        temp = matmul(h_jac, matmul(p_cov_check, h_jac.T)) + sensor_var * np.eye(3)
        inv = np.linalg.inv(temp)
        # print("temp: ", temp.shape, "sensor_var: ", sensor_var)
        K = matmul(p_cov_check,
                   matmul(h_jac.T, inv))  # np.linalg.inv(matmul(h_jac, matmul(p_cov_check, h_jac.T)) + sensor_var )))

    except np.linalg.LinAlgError as err:
        if 'Singular matrix' in str(err):
            raise "A singular matrix "

    # 3.2 Compute error state
    # print("y_k size: ", y_k.shape, "h_jac size: ", h_jac.shape, "p_check size: ", p_check.shape, "P_CHECK: ", p_check)
    error_state = y_k - p_check  # matmul(h_jac[:3, :3], p_check)

    # 3.3 Correct predicted state
    p_hat = p_check + matmul(K, error_state)[:3]
    v_hat = v_check + matmul(K, error_state)[3:6]
    # print("error_state ", error_state.shape, "K: ", K.shape, "q_check: ", q_check.shape)
    q_hat = Quaternion(axis_angle=matmul(K, error_state)[6:]).quat_mult_right(q_check)  # 注意更新q的方式，是通过四元数更新的

    # 3.4 Compute corrected covariance
    p_cov_hat = matmul(np.eye(9) - matmul(K, h_jac), p_cov_check)

    return p_hat, v_hat, q_hat, p_cov_hat

In [None]:
################################################################################################
# Now that everything is set up, we can start taking in the sensor data and creating estimates
# for our state in a loop.
################################################################################################
for k in range(1, imu_f.data.shape[0]):  # start at 1 b/c we have initial prediction from gt
    # print("SETP : ", k)
    delta_t = imu_f.t[k] - imu_f.t[k - 1]  # 两个数据之间的时间间隔

    # 1. Update state with IMU inputs
    q_cov = delta_t**2 * np.diag([var_imu_f, var_imu_f, var_imu_f, var_imu_w, var_imu_w, var_imu_w])  # 输入（IMU）的协方差矩阵

    Cns = Quaternion(*q_est[k - 1]).to_mat()  # 四元数转旋转矩阵
    p_check = p_est[k - 1] + delta_t * v_est[k - 1] + (delta_t**2) * 0.5 * (matmul(Cns, imu_f.data[k - 1]) + g)
    v_check = v_est[k - 1] + delta_t * (matmul(Cns, imu_f.data[k - 1]) + g)
    q_check = Quaternion(euler=delta_t * imu_w.data[k - 1]).quat_mult_right(q_est[k - 1])
    # 1. Update state with IMU inputs
    ################ CORRECTION STEP #####################
    # 1.1 Linearize the motion model and compute Jacobians
    f_jac = np.eye(9)
    i_1 = np.eye(3) * delta_t
    f = imu_f.data[k - 1].reshape((3, 1))  # 3*1
    # i_2 = -skew_symmetric( np.dot(C_ns,f).reshape((3,1)) )*delta_t
    i_2 = matmul(Cns, -skew_symmetric(f)) * delta_t
    f_jac[:3, 3:6] = i_1
    f_jac[3:6, 6:] = i_2
    m_jac = np.eye(3)

    # 2. Propagate uncertainty
    p_cov_check = f_jac @ p_cov[k - 1] @ f_jac.T + l_jac @ q_cov @ l_jac.T  # 9*9

    # 3. Check availability of GNSS and LIDAR measurements
    # 如果两个观测都不可用，那么直接使用运动模型的预测
    p_hat, v_hat, q_hat, p_cov_hat = p_check, v_check, q_check, p_cov_check
    p_est[k], v_est[k], q_est[k], p_cov[k] = p_check, v_check, q_check, p_cov_check
    for i in range(len(gnss.t)):
        if (abs(gnss.t[i] - imu_f.t[k]) < 0.01):
            p_hat, v_hat, q_hat, p_cov_hat = measurement_update(sensor_var=var_gnss,
                                                                p_cov_check=p_cov_hat,
                                                                y_k=gnss.data[i],
                                                                p_check=p_hat,
                                                                v_check=v_hat,
                                                                q_check=q_hat)

    for i in range(len(lidar.t)):
        if (abs(lidar.t[i] - imu_f.t[k]) < 0.01):
            p_hat, v_hat, q_hat, p_cov_hat = measurement_update(sensor_var=var_lidar,
                                                                p_cov_check=p_cov_hat,
                                                                y_k=lidar.data[i],
                                                                p_check=p_hat,
                                                                v_check=v_hat,
                                                                q_check=q_hat)

    p_est[k] = p_hat
    v_est[k] = v_hat
    q_est[k] = q_hat  # 4*1
    p_cov[k] = p_cov_hat