In [2]:
%load_ext autoreload
%autoreload 2

In [12]:
import os.path as osp
import struct
from datetime import datetime
import numpy as np
import matplotlib.pyplot as plt
from estimate_qc import get_tran_wnoj, get_q_wnoj

In [9]:
#TODO: get applanix positions, velocities, covariances
root = '/workspace/data1/boreas-2021-09-02-11-42/'
SBET_RATE = 200.0
skip = 1
dt = 1.0 / SBET_RATE
dim = 6
C_enu_ned = np.array([[0, 1, 0],[1, 0, 0],[0, 0, -1]], dtype=np.float64)
DEG_TO_RAD = np.pi / 180
ARCSECOND_TO_DEG = 1.0 / 3600.0
changeovertime = 1627387200
# Periods during which daylight savings is in effect in Toronto
dst = [(1583650800, 1604210399), (1615705200, 1636264799), (1647154800, 1667714399), (1678604400, 1699163999), (1710054000, 1730613599)]
# Glen Shields LLA reference (first position of first sequence)
lla0 = np.array([0.7641426285531766, -1.3869452891985545, 153.67129211034626], dtype=np.float64)
R_w = np.diag([5.48885234e-06, 2.38789649e-05, 3.34471102e-06])
R_a = np.diag([0.03792177, 0.01259208, 0.00849714])

In [None]:
def get_applanix_data_at_time(time, gps_times, gps_lines):

    idx = bisect_left(gps_times, time)
    if idx >= len(gps_times):
        idx = len(gps_times) - 1
    d = abs(gps_times[idx] - time)
    if gps_times[idx] < time and idx < len(gps_times) - 1:
        if abs(gps_times[idx + 1] - time) < d:
            idx += 1
    elif gps_times[idx] > time and idx > 0:
        if abs(gps_times[idx - 1] - time) < d:
            idx -= 1

    closest = idx

    def _parse(gps_line):
        return [float(x) for x in gps_line.split(',')]

    gt_time = gps_times[closest]

    def _interpolate(lower, upper, t):
        assert(len(lower) == len(upper))
        tlow = lower[0]
        tupp = upper[0]
        assert(tlow < t and t < tupp)
        delta = tupp - tlow
        if delta == 0:
            return lower
        ratio = (t - tlow) / delta
        out = []
        for low, upp in zip(lower, upper):
            out.append(low + (upp - low) * ratio)
        out[0] = t
        return out

    line = _parse(gps_lines[closest])
    if gt_time < time:
        if closest == len(gps_lines) - 1:
            return line
        line_lower = line
        line_upper = _parse(gps_lines[closest + 1])
        return _interpolate(line_lower, line_upper, time)
    elif gt_time > time:
        if closest == 0:
            return line
        line_lower = _parse(gps_lines[closest - 1])
        line_upper = line
        return _interpolate(line_lower, line_upper, time)
    elif gt_time == time:
        return line

# Adjust orientation and translation from T_enu_applanix --> T_enu_sensor
def adjust_pose_to_sensor(line, T_applanix_sensor):
    T_enu_applanix = np.identity(4, dtype=np.float64)
    T_enu_applanix[0:3, 0:3] = yawPitchRollToRot(line[9], line[8], line[7])  # we read C_enu_applanix
    T_enu_applanix[0, 3] = line[1]
    T_enu_applanix[1, 3] = line[2]
    T_enu_applanix[2, 3] = line[3]
    T_enu_sensor = np.matmul(T_enu_applanix, T_applanix_sensor)
    line[1] = T_enu_sensor[0, 3]
    line[2] = T_enu_sensor[1, 3]
    line[3] = T_enu_sensor[2, 3]
    y, p, r = rotToYawPitchRoll(T_enu_sensor[:3, :3])  # we write y, p, r of C_enu_sensor
    line[7] = r
    line[8] = p
    line[9] = y

    adT_sensor_applanix = Transformation(T_ba=T_applanix_sensor).inverse().adjoint()
    varpi_applanix = np.zeros(6, dtype=np.float64)
    vbar = np.array([line[4], line[5], line[6]], dtype=np.float64)  # in enu
    varpi_applanix[:3] = np.matmul(T_enu_applanix[:3, :3].T, vbar.reshape(3, 1)).squeeze()  # rotate from enu to applanix
    varpi_applanix[3:] = np.array([line[12], line[11], line[10]], dtype=np.float64)     # angular already in applanix
    varpi_sensor = np.matmul(adT_sensor_applanix, varpi_applanix.reshape(6, 1)).squeeze()
    line[4], line[5], line[6] = np.matmul(T_enu_sensor[:3, :3], varpi_sensor[:3].reshape(3, 1)).squeeze()   # rotate back to enu
    line[12], line[11], line[10] = varpi_sensor[3:]     # angular should already be in sensor frame
    return line

def rotToYawPitchRoll(C, eps = 1e-15):
    i = 2
    j = 1
    k = 0
    c_y = np.sqrt(C[i, i]**2 + C[j, i]**2)
    if c_y > eps:
        r = np.arctan2(C[j, i], C[i, i])
        p = np.arctan2(-C[k, i], c_y)
        y = np.arctan2(C[k, j], C[k, k])
    else:
        r = 0
        p = np.arctan2(-C[k, i], c_y)
        y = np.arctan2(-C[j, k], C[j, j])

    return y, p, r

In [15]:
def get_enu_covariance(data, smr_data):
    lla = np.array([data[1], data[2], data[3]], dtype=np.float64)
    T_nedref_ned = RelLLAtoNED(lla, lla0)
    T_enu_ned = np.eye(4, dtype=np.float64)
    T_enu_ned[:3, :3] = C_enu_ned


    # what we get from sbet orientation (position of pos w.r.t. ned is zero)
    # note: pos frame is applanix frame here
    heading_wander = data[9]
    alpha = data[10]  # Wander angle
    
    heading_ned = heading_wander - alpha
    roll_ned = data[7]
    pitch_ned = data[8]

    def llarph_to_T_pos_enuref(h, p, r, delta_n = 0, delta_e = 0, delta_d = 0):
        T_ned_pos = np.eye(4, dtype=np.float64)
        # applanix uses different rotation conventions from our lab.
        T_ned_pos[:3, :3] = posOrientToRot(h, p, r)
        # what we need to compute is T_nedref_pos
        T_nedref_pos = np.matmul(T_nedref_ned, T_ned_pos)
        T_nedref_pos[0, 3] += delta_n
        T_nedref_pos[1, 3] += delta_e
        T_nedref_pos[2, 3] += delta_d
        # convert to ENU
        T_enu_ned = np.eye(4, dtype=np.float64)
        T_enu_ned[:3, :3] = C_enu_ned
        T_enuref_pos = np.matmul(T_enu_ned, T_nedref_pos)
        return get_inverse_tf(T_enuref_pos)

    Tbar = T_pos_enuref = llarph_to_T_pos_enuref(heading_ned, pitch_ned, roll_ned)

    T_enuref_ned = T_enu_ned @ T_nedref_ned
    C_enuref_ned = T_enuref_ned[:3, :3]
    R_pos_ned = np.diag([smr_data[1]**2, smr_data[2]**2, smr_data[3]**2])
    R_vel_ned = np.diag([smr_data[4]**2, smr_data[5]**2, smr_data[6]**2])
    R_pos_enu = C_enuref_ned @ R_pos_ned @ C_enuref_ned.T
    R_vel_enu = C_enuref_ned @ R_vel_ned @ C_enuref_ned.T
    
    return T_pos_enuref, C_enuref_ned, R_pos_enu, R_vel_enu

## Interface for Interpolating SMR Data at the time that we want

In [None]:
gps_file = osp.join(root, 'applanix', 'gps_post_process.csv')
with open(gps_file, 'r') as gf:
    header = gf.readline()
    gps_lines = gf.readlines()
    gps_times = []
    for line in gps_lines:
        gps_times.append(float(line.split(',')[0]))

In [None]:
with open(osp.join(root, 'applanix', 'smrmsg.out'), 'rb') as f:
    smr_fc = f.read()
smr_size = 10 * 8  # size of each line of smrmsg.out in bytes
smr_data_all = []
smr_data_times = []
for i in range(len(smr_fc) // smr_size):
    smr_data_all.append(struct.unpack("d" * 10, smr_fc[i * smr_size: (i + 1) * smr_size]))
    smr_data_times.append(smr_data_all[-1][0])

def get_smr_data_at_time(time, gps_times, gps_lines):
    idx = np.searchsorted(gps_times, time)
    if idx >= len(gps_times):
        idx = len(gps_times) - 1
    d = abs(gps_times[idx] - time)
    if gps_times[idx] < time and idx < len(gps_times) - 1:
        if abs(gps_times[idx + 1] - time) < d:
            idx += 1
    elif gps_times[idx] > time and idx > 0:
        if abs(gps_times[idx - 1] - time) < d:
            idx -= 1

    closest = idx
    gt_time = gps_times[closest]

    def _interpolate(lower, upper, t):
        assert(len(lower) == len(upper))
        tlow = lower[0]
        tupp = upper[0]
        assert(tlow < t and t < tupp)
        delta = tupp - tlow
        if delta == 0:
            return lower
        ratio = (t - tlow) / delta
        out = []
        for low, upp in zip(lower, upper):
            out.append(low + (upp - low) * ratio)
        out[0] = t
        return out

    line = gps_lines[closest]
    if gt_time < time:
        if closest == len(gps_lines) - 1:
            return line
        line_lower = line
        line_upper = gps_lines[closest + 1]
        return _interpolate(line_lower, line_upper, time)
    elif gt_time > time:
        if closest == 0:
            return line
        line_lower = gps_lines[closest - 1]
        line_upper = line
        return _interpolate(line_lower, line_upper, time)
    elif gt_time == time:
        return line

## Extract Pose and Covariances from SBET and SMRMSG

In [None]:
with open(osp.join(root, 'applanix', 'sbet.out'), 'rb') as f:
    sbet_fc = f.read()
with open(osp.join(root, 'applanix', 'ros_and_gps_time.csv')) as f:
    lines = f.readlines()
    start_time = float(lines[1].split(',')[1]) - 20
    end_time = float(lines[-1].split(',')[1]) + 20
print('start_time_gps: {}'.format(start_time))
print('end_time_gps: {}'.format(end_time))
dt = datetime.fromtimestamp(start_time)
weekday = dt.isoweekday()
if weekday == 7:
    weekday = 0
g2 = weekday * 24 * 3600 + dt.hour * 3600 + dt.minute * 60 + dt.second + dt.microsecond * 1e-6
start_week = round(start_time - g2)
gpstime0 = struct.unpack("d", sbet_fc[:8])[0]
print('gpstime0: {}'.format(gpstime0))
# get timezone offset:
# Toronto time is GMT-4 or GMT-5 depending on time of year
time_zone_offset = 5 * 3600
for period in dst:
    if period[0] < start_time and start_time < period[1]:
        time_zone_offset = 4 * 3600

time_zone_offset = 0

if 'boreas-2021-01-19-15-08' in root:
    time_zone_offset = 4 * 3600  # hardcode timezone offset for this sequence only

print('START WEEK: {} TIME ZONE OFFSET: {}'.format(start_week, time_zone_offset))
start_gps = start_time + time_zone_offset - start_week
end_gps = end_time + time_zone_offset - start_week
if start_time > changeovertime:
    start_gps += 18  # UTC --> GPS
    end_gps += 18
print('start_gps: {} end_gps: {}'.format(start_gps, end_gps))

sbet_size = 17 * 8
# smr_size = 10 * 8  # size of each line of smrmsg.out in bytes

sbet_times = []
sbet_data_all = []

poses = []
positions = []
vels = []
position_covs = []
vel_covs = []

for i in list(range(len(sbet_fc) // sbet_size))[::skip]:
    # SBET Data:
    data = struct.unpack("d" * 17, sbet_fc[i * sbet_size: (i + 1) * sbet_size])
    if data[0] < start_gps or data[0] > end_gps:
        continue
    sbet_times.append(data[0])
    sbet_data_all.append(data)
    # convert lat, lng to Easting, Northing in UTM
    latitude = data[1]
    longitude = data[2]
    lla = np.array([data[1], data[2], data[3]], dtype=np.float64)
    T_nedref_ned = RelLLAtoNED(lla, lla0)

    # what we get from sbet orientation (position of pos w.r.t. ned is zero)
    # note: pos frame is applanix frame here
    # heading_wander = data[9]
    alpha = data[10]  # Wander angle
    # heading_ned = heading_wander - alpha
    # roll_ned = data[7]
    # pitch_ned = data[8]

    smr_data = get_smr_data_at_time(data[0], smr_data_times, smr_data_all)
    T_pos_enuref, C_enuref_ned, R_pos_enu, R_vel_enu = get_enu_covariance(data, smr_data)
    # T_pos_enuref, Sigma = get_T_v_i_covariance_from_nedrph_covariance(data, smr_data)
    T_enuref_pos = get_inverse_tf(T_pos_enuref)
    

    v_x = data[4]
    v_y = data[5]
    v_n = v_x * np.cos(alpha) - v_y * np.sin(alpha)   # Velocity in north direction
    v_e = -v_x * np.sin(alpha) - v_y * np.cos(alpha)  # Velocity in east direction
    
    # rotate velocity from local ned to reference ned
    v_nedref = np.matmul(T_nedref_ned[:3, :3], np.array([v_n, v_e, -data[6]], np.float64))
    v_e = v_nedref[1]
    v_n = v_nedref[0]
    v_up = -v_nedref[2]

    

    # v = T_pos_enuref[:3, :3] @ np.array([v_e, v_n, v_up]).reshape(3, 1)
    # a = np.array([data[11], data[12], data[13]]).reshape(3, 1)
    # w = np.array([data[14], data[15], data[16]]).reshape(3, 1)
    # wa = (w - w_prev) * SBET_RATE / skip  # finite difference approximation

    # v_rms = np.array([smr_data[5], smr_data[4], smr_data[6]]).reshape(3, 1)
    # v_cov = np.diag(v_rms.squeeze()**2)
    # C = T_pos_enuref[:3, :3]
    # v_cov = C @ v_cov @ C.T

    poses.append(T_pos_enuref)
    positions.append(T_enuref_pos[:3, 3].squeeze())
    vels.append(np.array([v_e, v_n, v_up]))
    position_covs.append(R_pos_enu)
    vel_covs.append(R_vel_enu)
    

In [None]:
poses = np.array(poses)
positions = np.array(positions)
vels = np.array(vels)
position_covs = np.array(position_covs)
vel_covs = np.array(vel_covs)

## Get Raw IMU Measurements

In [16]:
with open(osp.join(root, 'applanix', 'imu_raw.csv'), 'r') as f:
    h = f.readline()
    imu_raw_lines = f.readlines()

imu_lines = []
for line in imu_raw_lines:
    line = [float(x) for x in line.rstrip().split(',')]
    imu_lines.append(line)

imu_data_raw = np.array(imu_lines)
imu_data_raw_times = imu_data_raw[::skip, 0]
imu_data_raw = imu_data_raw[::skip, 1:]
assert imu_data_raw.shape[0] == poses.shape[1]
imu_raw = np.copy(imu_data_raw)

NameError: name 'osp' is not defined

In [None]:
## TODO: Jacobians of accelerometer error wrt x, S, b
## TODO: check the analytical Jacobians against numerical

In [13]:
Ak = get_tran_wnoj(dt, dim=3)
qcd = np.ones(3)
Qk = get_q_wnoj(dt, qcd)

# TODO: build system of equations, solve for delta_x, delta_b, delta_S, iterate to convergence.