In [1]:
# Import some stuff

import os
import sys
import glob
import numpy as np
import pandas as pd
import math
# from pathlib import Path

from evo.core import metrics
from evo.core.trajectory import PoseTrajectory3D

In [2]:
# Download NTUVIRAL Groundtruth,
!rm -rf ntuviral_gt
!git clone https://github.com/ntu-aris/ntuviral_gt

# Set the ground truth path here
gtgen_res_path = 'ntuviral_gt'

Cloning into 'ntuviral_gt'...
remote: Enumerating objects: 38, done.[K
remote: Counting objects: 100% (38/38), done.[K
remote: Compressing objects: 100% (20/20), done.[K
remote: Total 38 (delta 0), reused 38 (delta 0), pack-reused 0[K
Unpacking objects: 100% (38/38), 4.78 MiB | 8.65 MiB/s, done.


In [3]:
# Or set the path to a different path
# gtgen_res_path = '/home/tmn/DATASETS/NTUVIRAL/ntuviral_gt'

In [4]:
# Downdload sample fast-lio2 estimate
!rm -rf fastlio2_sample
!git clone https://github.com/ntu-aris/fastlio2_sample

# Set the path to the logs of your slam estimate
slam_est_path = 'fastlio2_sample'

# Set the directory where results are exported
output_dir = slam_est_path + '/analysis'
os.makedirs(output_dir, exist_ok=True)

Cloning into 'fastlio2_sample'...
remote: Enumerating objects: 38, done.[K
remote: Counting objects: 100% (38/38), done.[K
remote: Compressing objects: 100% (20/20), done.[K
remote: Total 38 (delta 0), reused 38 (delta 0), pack-reused 0[K
Unpacking objects: 100% (38/38), 7.19 MiB | 12.25 MiB/s, done.


In [5]:
# Conversion between quat and rotm
from evo.core.transformations import quaternion_matrix as quat2rotm_

def quat2rotm(q):
    return quat2rotm_(q)[0:3, 0:3]

# region Customizable for each method ---------------------

# Offset from body center to the prism
t_B_prism = np.array([-0.293656, -0.012288, -0.273095]).reshape((3,1))

# endregion Customizable for each method -----------------

algo_name = 'fastlio2'

# Minimum completeness to judge ate
min_completeness = 90.0

In [6]:
# Search for the sequence name
gndtr_logs = glob.glob(gtgen_res_path + '/**/ground_truth.csv', recursive=True)
gndtr_logs = sorted(gndtr_logs)
gndtr_df   = pd.DataFrame([str(x) for x in gndtr_logs], columns=['fullpath'])

# Check for the sequence
def decode_gndtr_sequence_name(x):
    sequence = x.split('/')[-2]
    return sequence

gndtr_df['sequence'] = gndtr_df['fullpath'].apply(lambda x : decode_gndtr_sequence_name(x))

In [7]:
# Create the ground truth array for each sequence
def load_csv(log):
    
    # Open the log
    data = np.loadtxt(log, delimiter=',', skiprows=1)

    # Pose stamp
    t    = data[:, 2]/1e9
    xyz  = data[:, 3:6]
    quat = data[:, 6:10]

    pose_stamp = np.empty((data.shape[0], 8))
    pose_stamp[:, 0]   = t
    pose_stamp[:, 1:4] = xyz
    pose_stamp[:, 4:8] = quat
    
    # Create the spline
    return pose_stamp

gndtr_df['pose_stamped'] = gndtr_df['fullpath'].apply(lambda x : load_csv(x))

In [8]:
# Search for the estimates
slam_est_logs = glob.glob(slam_est_path + '/**/odometry.csv', recursive=True)
slam_est_logs = sorted(slam_est_logs)
est_df = pd.DataFrame([str(x) for x in slam_est_logs if 'result_' in str(x)], columns=['fullpath'])

# Decode the sequence
def decode_est_sequence_name(x):
    dirname = os.path.dirname(x)
    seqname = dirname.split('/')[-1].replace('result_', '')
    return seqname

est_df['sequence'] = est_df['fullpath'].apply(lambda x : decode_est_sequence_name(x))

In [9]:
def getGTMinTime(x):
    print(x)
    return gndtr_df[gndtr_df['sequence'] == x]['pose_stamped'].iloc[0][0, 0]

def getGTMaxTime(x):
    print(x)
    return gndtr_df[gndtr_df['sequence'] == x]['pose_stamped'].iloc[0][-1, 0]

est_df['t_min'] = est_df['sequence'].apply( lambda x : getGTMinTime(x) )
est_df['t_max'] = est_df['sequence'].apply( lambda x : getGTMaxTime(x) )

eee_01
eee_02
eee_03
nya_01
nya_02
nya_03
rtp_01
rtp_02
rtp_03
sbs_01
sbs_02
sbs_03
spms_01
spms_02
spms_03
tnp_01
tnp_02
tnp_03
eee_01
eee_02
eee_03
nya_01
nya_02
nya_03
rtp_01
rtp_02
rtp_03
sbs_01
sbs_02
sbs_03
spms_01
spms_02
spms_03
tnp_01
tnp_02
tnp_03


In [10]:
# Load the estimate data
est_df['data_est'] = est_df['fullpath'].apply(lambda x : np.loadtxt(x, delimiter=',', skiprows=1))

def extract_est_data(data, t_min, t_max, path = None):
    
    # print(t_min, 'to', t_max)
    t = data[:, 0]/1.0e9
    P = data[:, 3:6]
    Q = data[:, [9, 6, 7, 8]]
    idx_intime = [ idx for idx in range(0, len(t)) if t[idx] >= t_min and t[idx] <= t_max ]

    t = t[idx_intime]
    P = P[idx_intime, :]
    Q = Q[idx_intime, :]

    # Add the pose offset
    for n in range(0, len(P)):
        P[n, :] = P[n, :] + (quat2rotm(Q[n, :]).dot(t_B_prism)).transpose()

    return (t, P, Q)

est_df['t_est'], est_df['P_est'], est_df['Q_est'] = zip(*map(extract_est_data, est_df['data_est'], est_df['t_min'], est_df['t_max'], est_df['fullpath']))

In [11]:
# Sample the ground truth pose against the estimate.

from warnings import warn
from evo.core.trajectory import PoseTrajectory3D
from evo.core.transformations import quaternion_slerp as qslerp

def lerp(a, b, t):
    return (1.0-t)*a + t*b

# We need to do linear interpolation, ourselves...
def cv_interp(traj, ref_timestamps, *, extrapolate_past_end=False):
    "Compute points along trajectory *est* at *timestamps* or trajectory *ref*."
    # Accept trajectories 
    if hasattr(ref_timestamps, 'timestamps'):
        ref_timestamps = ref_timestamps.timestamps

    ref_timestamps = np.array(ref_timestamps, copy=True)
    
    est_tran = traj.positions_xyz
    est_quat = traj.orientations_quat_wxyz
    
    # Index of closest next estimated timestamp for each pose in *traj*.
    # Must be >= 1 since we look at the previous pose.
    i = 1

    for j, t_ref in enumerate(ref_timestamps):
        while i < traj.num_poses and traj.timestamps[i] <= t_ref:
            i += 1
        if not extrapolate_past_end and traj.num_poses <= i:
            warn('reference trajectory ends after estimated, cut short')
            break
        t_prev = traj.timestamps[i-1]
        t_next = traj.timestamps[i]
        td     = (t_ref - t_prev) / (t_next - t_prev)
        quat_j = qslerp(est_quat[i-1], est_quat[i], td)
        tran_j = lerp(est_tran[i-1], est_tran[i], td)
        yield np.r_[t_ref, tran_j, quat_j]

def trajectory_interpolation(traj, timestamps):
    poses = np.array(list(cv_interp(traj, timestamps)))
    times = poses[:, 0]
    trans = poses[:, 1:4]
    quats = poses[:, 4:8]
    return PoseTrajectory3D(trans, quats, times, meta=traj.meta)

# Make the associable trajectory
def make_traj(x):
    
    t_gt = gndtr_df[gndtr_df['sequence'] == x['sequence']]['pose_stamped'].iloc[0][:, 0]    
    valid = []
    for n in range(0, len(x['t_est'])):
        sample_time = x['t_est'][n]
        min_diff = np.min(np.abs(t_gt - sample_time))
        if min_diff < 0.05:
            valid.append(n)
    
    t_valid = x['t_est'][valid]
    P_valid = x['P_est'][valid, :]
    Q_valid = x['Q_est'][valid, :]
    
    return PoseTrajectory3D(P_valid, Q_valid, t_valid)
                            
est_df['traj_est'] = est_df.apply(lambda x : make_traj(x), axis=1)

def makeGTTraj(x):
    pose_stamped = gndtr_df[gndtr_df['sequence'] == x['sequence']]['pose_stamped'].iloc[0]
    gtr_traj = PoseTrajectory3D(pose_stamped[:, 1:4], pose_stamped[:, 4:8], pose_stamped[:, 0])
    return trajectory_interpolation(gtr_traj, x['traj_est'].timestamps)

est_df['traj_gtrest'] = est_df.apply(lambda x : makeGTTraj(x), axis=1)

est_df['P_gtrest'] = est_df['traj_gtrest'].apply(lambda x : x.positions_xyz)
est_df['Q_gtrest'] = est_df['traj_gtrest'].apply(lambda x : x.orientations_quat_wxyz[:, [3, 0, 1, 2]])
est_df['t_gtrest'] = est_df['traj_gtrest'].apply(lambda x : x.timestamps)


  data /= math.sqrt(numpy.dot(data, data))


In [12]:
# Align the traj
for i in est_df.index:
    est_df.at[i, 'traj_est'].align(est_df.at[i, 'traj_gtrest'])

In [13]:
# Calculate the ATE
def calculate_metric(traj_gtr, traj_est):
    metric = metrics.APE(pose_relation=metrics.PoseRelation.translation_part)
    metric.process_data((traj_gtr, traj_est))
    return float(metric.get_result(ref_name='reference', est_name='estimate').stats['rmse'])

est_df['ate_est'] = est_df.apply(lambda x : calculate_metric(x['traj_gtrest'], x['traj_est']), axis=1)
est_df['completeness_est'] = est_df.apply(lambda x : round((x['t_est'][-1] -  x['t_min']) / (x['t_max'] - x['t_min'])*100, 0), axis=1)

In [17]:
# Extracting the data to generate table of all sequences
ate_df = est_df.loc[:, ['sequence', 'ate_est', 'completeness_est', 'fullpath']]
ate_df['algo'] = algo_name

def check_if_complete(x):
    if x['ate_est'] < 20 and x['completeness_est'] < min_completeness:
        return float('inf')
    else:
        return x['ate_est']

ate_df['ate_est'] = ate_df.apply(lambda x : check_if_complete(x), axis=1)
def make_ate_txt(odom):
        return f'{round(odom, 3):.03f}'

ate_df['report'] = ate_df.apply(lambda x : make_ate_txt(x['ate_est']), axis=1)
# Show the result
ate_df

Unnamed: 0,sequence,ate_est,completeness_est,fullpath,algo,report
0,eee_01,0.068978,100.0,fastlio2_sample/result_eee_01/odometry.csv,fastlio2,0.069
1,eee_02,0.068572,100.0,fastlio2_sample/result_eee_02/odometry.csv,fastlio2,0.069
2,eee_03,0.110743,100.0,fastlio2_sample/result_eee_03/odometry.csv,fastlio2,0.111
3,nya_01,0.053233,100.0,fastlio2_sample/result_nya_01/odometry.csv,fastlio2,0.053
4,nya_02,0.089855,100.0,fastlio2_sample/result_nya_02/odometry.csv,fastlio2,0.09
5,nya_03,0.108449,100.0,fastlio2_sample/result_nya_03/odometry.csv,fastlio2,0.108
6,rtp_01,0.125244,100.0,fastlio2_sample/result_rtp_01/odometry.csv,fastlio2,0.125
7,rtp_02,0.131418,100.0,fastlio2_sample/result_rtp_02/odometry.csv,fastlio2,0.131
8,rtp_03,0.136501,100.0,fastlio2_sample/result_rtp_03/odometry.csv,fastlio2,0.137
9,sbs_01,0.08632,100.0,fastlio2_sample/result_sbs_01/odometry.csv,fastlio2,0.086


In [None]:
# Making a latex export
latex_df = ate_df[['sequence', 'report', 'completeness_est']]

# Writing the report
print(latex_df.to_latex(index_names=['#'], header=['Seq', 'ATE', '%']))