# 04 - NCLT Sensor Data Exploration

Explore IMU, GPS, odometry, and gyro data from the NCLT sensors addon dataset.

In [None]:
import sys
sys.path.insert(0, '..')

import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

from src.datasets.sensor_loader import SessionSensorManager, SensorSynchronizer
from src.utils.imu_utils import (
    parse_imu_csv,
    integrate_gyroscope,
    imu_preintegration,
    gravity_alignment,
    compute_bias,
)
from src.utils.gps_utils import (
    lla_to_enu,
    parse_gps_csv,
    gps_to_pose,
    filter_gps_by_fix,
    haversine_distance,
)
from src.utils.io_utils import load_config

## 1. Load Sensor Data

In [None]:
config = load_config('../configs/dataset_config.yaml')
sensors_config = config['nclt']['sensors']
session_name = '2012-01-08'

# Resolve sensor data path: Kaggle -> local -> mock
kaggle_path = Path(config['nclt']['sensors_kaggle_path']) / session_name
local_path = Path(config['nclt']['sensors_local_path']) / session_name
mock_path = Path('../data/nclt_mock') / session_name

if kaggle_path.exists():
    sensor_dir = kaggle_path
    print(f'Using Kaggle sensor data: {sensor_dir}')
elif local_path.exists():
    sensor_dir = local_path
    print(f'Using local sensor data: {sensor_dir}')
elif mock_path.exists():
    sensor_dir = mock_path
    print(f'Using mock sensor data: {sensor_dir}')
else:
    raise FileNotFoundError(
        'Sensor data not found. Attach nclt-sensors-addon on Kaggle, '
        'download locally, or generate mock data.'
    )

manager = SessionSensorManager(sensor_dir, sensors_config)
print(f'\nSession: {session_name}')
print(f'Directory: {sensor_dir}')

# Check which sensors are available
sensor_names = ['imu', 'gps', 'odometry', 'kvh', 'ground_truth']
for name in sensor_names:
    loader = getattr(manager, name)
    status = 'available' if loader is not None else 'not found'
    print(f'  {name:15s}: {status}')

## 2. IMU Analysis

In [None]:
# Load IMU data
imu_csv = sensor_dir / sensors_config['imu']['filename']
imu_data = parse_imu_csv(imu_csv)

print(f'IMU samples: {len(imu_data["timestamps"])}')
print(f'Time span: {(imu_data["timestamps"][-1] - imu_data["timestamps"][0]) * 1e-6:.1f} s')

# Convert timestamps to seconds from start
t_imu = (imu_data['timestamps'] - imu_data['timestamps'][0]) * 1e-6

fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)

# Accelerometer
labels_accel = ['accel_x', 'accel_y', 'accel_z']
for i, label in enumerate(labels_accel):
    axes[0].plot(t_imu, imu_data['accelerometer'][:, i], linewidth=0.5, label=label)
axes[0].set_ylabel('Acceleration (m/s^2)')
axes[0].set_title('Accelerometer')
axes[0].legend(loc='upper right')
axes[0].grid(True, alpha=0.3)

# Gyroscope
labels_gyro = ['rot_x', 'rot_y', 'rot_z']
for i, label in enumerate(labels_gyro):
    axes[1].plot(t_imu, imu_data['gyroscope'][:, i], linewidth=0.5, label=label)
axes[1].set_ylabel('Angular velocity (rad/s)')
axes[1].set_title('Gyroscope')
axes[1].legend(loc='upper right')
axes[1].grid(True, alpha=0.3)

# Magnetometer
labels_mag = ['mag_x', 'mag_y', 'mag_z']
for i, label in enumerate(labels_mag):
    axes[2].plot(t_imu, imu_data['magnetometer'][:, i], linewidth=0.5, label=label)
axes[2].set_ylabel('Magnetic field (Gauss)')
axes[2].set_title('Magnetometer')
axes[2].set_xlabel('Time (s)')
axes[2].legend(loc='upper right')
axes[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Compute gyroscope bias from initial static period
gyro_bias = compute_bias(
    imu_data['timestamps'],
    imu_data['gyroscope'],
    static_duration_us=5_000_000,
)
print(f'Gyroscope bias (rad/s): [{gyro_bias[0]:.6f}, {gyro_bias[1]:.6f}, {gyro_bias[2]:.6f}]')
print(f'Gyroscope bias (deg/s): [{np.degrees(gyro_bias[0]):.4f}, {np.degrees(gyro_bias[1]):.4f}, {np.degrees(gyro_bias[2]):.4f}]')

# Gravity alignment
R_align = gravity_alignment(imu_data['accelerometer'], num_samples=100)
print(f'\nGravity alignment rotation matrix:')
print(R_align)

# Check: apply alignment to mean accelerometer reading
mean_accel = np.mean(imu_data['accelerometer'][:100], axis=0)
aligned_accel = R_align @ mean_accel
print(f'\nMean accel (body frame):   [{mean_accel[0]:.4f}, {mean_accel[1]:.4f}, {mean_accel[2]:.4f}] m/s^2')
print(f'Mean accel (aligned):      [{aligned_accel[0]:.4f}, {aligned_accel[1]:.4f}, {aligned_accel[2]:.4f}] m/s^2')
print(f'Expected (gravity-aligned): [0.0000, 0.0000, -9.8100] m/s^2')

## 3. GPS Trajectory

In [None]:
# Parse GPS data
gps_csv = sensor_dir / sensors_config['gps']['filename']
gps_data = parse_gps_csv(gps_csv)

print(f'GPS records: {len(gps_data["timestamps"])}')
print(f'Time span: {(gps_data["timestamps"][-1] - gps_data["timestamps"][0]) * 1e-6:.1f} s')
print(f'Fix modes present: {np.unique(gps_data["mode"])}')
print(f'Lat range: [{gps_data["latitude"].min():.6f}, {gps_data["latitude"].max():.6f}]')
print(f'Lon range: [{gps_data["longitude"].min():.6f}, {gps_data["longitude"].max():.6f}]')

# Filter by fix quality (keep mode >= 1)
ts_filt, lat_filt, lon_filt, alt_filt = filter_gps_by_fix(
    gps_data['timestamps'],
    gps_data['mode'],
    gps_data['latitude'],
    gps_data['longitude'],
    gps_data['altitude'],
    min_mode=1,
)
print(f'After fix filter: {len(ts_filt)} / {len(gps_data["timestamps"])} records')

# Convert to ENU
enu = lla_to_enu(lat_filt, lon_filt, alt_filt)

# Build mode array for filtered points (for color-coding)
mode_mask = gps_data['mode'] >= 1
mode_filt = gps_data['mode'][mode_mask]

# Plot GPS trajectory
fig, ax = plt.subplots(1, 1, figsize=(10, 10))

scatter = ax.scatter(
    enu[:, 0], enu[:, 1],
    c=mode_filt, cmap='viridis', s=8, alpha=0.7,
)
cbar = plt.colorbar(scatter, ax=ax, label='Fix mode')
cbar.set_ticks(np.unique(mode_filt))

ax.plot(enu[0, 0], enu[0, 1], 'go', markersize=10, label='Start')
ax.plot(enu[-1, 0], enu[-1, 1], 'rs', markersize=10, label='End')

ax.set_xlabel('East (m)')
ax.set_ylabel('North (m)')
ax.set_title(f'GPS Trajectory (ENU) - {session_name}')
ax.set_aspect('equal')
ax.legend()
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Compute total distance
total_dist = haversine_distance(
    gps_data['latitude'][0], gps_data['longitude'][0],
    gps_data['latitude'][-1], gps_data['longitude'][-1],
)
print(f'Start-to-end distance: {total_dist:.1f} m')

## 4. Odometry vs Ground Truth

In [None]:
# Load odometry and ground truth
odom_loader = manager.odometry
gt_loader = manager.ground_truth

if odom_loader is None or gt_loader is None:
    print('Odometry or ground truth data not available.')
else:
    odom_data = odom_loader.load()
    gt_data = gt_loader.load()

    print(f'Odometry: {len(odom_data)} samples')
    print(f'Ground truth: {len(gt_data)} samples')

    # Extract x, y positions
    odom_x = odom_data.values[:, odom_data.columns.index('x')]
    odom_y = odom_data.values[:, odom_data.columns.index('y')]

    gt_x = gt_data.values[:, gt_data.columns.index('x')]
    gt_y = gt_data.values[:, gt_data.columns.index('y')]

    # Plot both trajectories
    fig, ax = plt.subplots(1, 1, figsize=(10, 10))

    ax.plot(gt_x, gt_y, linewidth=1.0, label='Ground Truth', color='tab:blue')
    ax.plot(odom_x, odom_y, linewidth=1.0, label='Odometry', color='tab:orange', alpha=0.8)

    ax.plot(gt_x[0], gt_y[0], 'go', markersize=10, label='Start')
    ax.plot(gt_x[-1], gt_y[-1], 'rs', markersize=10, label='GT End')
    ax.plot(odom_x[-1], odom_y[-1], 'r^', markersize=10, label='Odom End')

    ax.set_xlabel('X (m)')
    ax.set_ylabel('Y (m)')
    ax.set_title(f'Odometry vs Ground Truth - {session_name}')
    ax.set_aspect('equal')
    ax.legend()
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # Compute endpoint drift
    # Find the closest ground truth timestamp to the last odometry timestamp
    gt_end_val = gt_loader.query(int(odom_data.timestamps[-1]), window_us=1_000_000)
    if gt_end_val is not None:
        gt_end_x = gt_end_val[gt_data.columns.index('x')]
        gt_end_y = gt_end_val[gt_data.columns.index('y')]
        drift = np.sqrt((odom_x[-1] - gt_end_x) ** 2 + (odom_y[-1] - gt_end_y) ** 2)
        total_path = np.sum(np.sqrt(np.diff(gt_x) ** 2 + np.diff(gt_y) ** 2))
        print(f'Endpoint drift: {drift:.3f} m')
        print(f'Total path length (GT): {total_path:.1f} m')
        print(f'Drift rate: {drift / total_path * 100:.2f} %')
    else:
        drift = np.sqrt((odom_x[-1] - gt_x[-1]) ** 2 + (odom_y[-1] - gt_y[-1]) ** 2)
        print(f'Endpoint drift (approx): {drift:.3f} m')

## 5. IMU Preintegration

In [None]:
# Run IMU preintegration on first 1000 samples
n_samples = min(1000, len(imu_data['timestamps']))

preint = imu_preintegration(
    timestamps=imu_data['timestamps'][:n_samples],
    accel=imu_data['accelerometer'][:n_samples],
    gyro=imu_data['gyroscope'][:n_samples] - gyro_bias,  # bias-corrected
)

positions = preint['positions']
velocities = preint['velocities']
t_preint = (imu_data['timestamps'][:n_samples] - imu_data['timestamps'][0]) * 1e-6

print(f'Preintegrated {n_samples} IMU samples ({t_preint[-1]:.2f} s)')
print(f'Final position: [{positions[-1, 0]:.3f}, {positions[-1, 1]:.3f}, {positions[-1, 2]:.3f}] m')
print(f'Final velocity: [{velocities[-1, 0]:.3f}, {velocities[-1, 1]:.3f}, {velocities[-1, 2]:.3f}] m/s')

# Plot integrated position trajectory
fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# 2D trajectory (X-Y)
axes[0].plot(positions[:, 0], positions[:, 1], linewidth=0.8, label='IMU preintegration')

# Compare with ground truth if available
if gt_loader is not None:
    gt_data_loaded = gt_loader.load()
    # Find ground truth within the same time window
    t_start = int(imu_data['timestamps'][0])
    t_end = int(imu_data['timestamps'][n_samples - 1])
    gt_slice = gt_loader.slice(t_start, t_end)
    if len(gt_slice) > 0:
        gt_xi = gt_slice.values[:, gt_slice.columns.index('x')]
        gt_yi = gt_slice.values[:, gt_slice.columns.index('y')]
        # Shift GT to start at origin for comparison
        axes[0].plot(
            gt_xi - gt_xi[0], gt_yi - gt_yi[0],
            linewidth=1.0, linestyle='--', label='Ground Truth',
        )

axes[0].plot(positions[0, 0], positions[0, 1], 'go', markersize=8, label='Start')
axes[0].plot(positions[-1, 0], positions[-1, 1], 'rs', markersize=8, label='End')
axes[0].set_xlabel('X (m)')
axes[0].set_ylabel('Y (m)')
axes[0].set_title('IMU Preintegrated Trajectory (X-Y)')
axes[0].set_aspect('equal')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Position components over time
for i, label in enumerate(['x', 'y', 'z']):
    axes[1].plot(t_preint, positions[:, i], linewidth=0.8, label=label)
axes[1].set_xlabel('Time (s)')
axes[1].set_ylabel('Position (m)')
axes[1].set_title('IMU Preintegrated Position Components')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 6. Sensor Synchronization

In [None]:
# Build a synchronizer using all available sensors
try:
    sync = manager.get_synchronizer()
    print(f'Synchronizer: {sync}')
except ValueError as e:
    print(f'Could not create synchronizer: {e}')
    sync = None

if sync is not None and gt_loader is not None:
    gt_data_loaded = gt_loader.load()
    # Use a subset of ground truth timestamps as reference
    n_query = min(200, len(gt_data_loaded))
    query_times = gt_data_loaded.timestamps[:n_query]

    # Get synchronized readings
    sample_synced = sync.get_synchronized(int(query_times[0]))
    print(f'\nSample synced data at t={query_times[0]}:')
    for name, values in sample_synced.items():
        if values is not None:
            print(f'  {name:15s}: {values[:3]}...' if len(values) > 3 else f'  {name:15s}: {values}')
        else:
            print(f'  {name:15s}: None (no data within window)')

    # Compute time alignment errors for each sensor
    # For each query timestamp, find the nearest sensor timestamp and measure the delta
    fig, ax = plt.subplots(1, 1, figsize=(12, 5))

    sensor_loaders = {
        'imu': manager.imu,
        'gps': manager.gps,
        'odometry': manager.odometry,
        'kvh': manager.kvh,
    }

    for sensor_name, loader in sensor_loaders.items():
        if loader is None:
            continue
        sensor_data = loader.load()
        deltas_ms = []
        for qt in query_times:
            qt_int = int(qt)
            # Find nearest timestamp
            idx = np.searchsorted(sensor_data.timestamps, qt_int)
            candidates = []
            if idx > 0:
                candidates.append(abs(int(sensor_data.timestamps[idx - 1]) - qt_int))
            if idx < len(sensor_data.timestamps):
                candidates.append(abs(int(sensor_data.timestamps[idx]) - qt_int))
            if candidates:
                deltas_ms.append(min(candidates) / 1000.0)  # us -> ms
        if deltas_ms:
            ax.hist(deltas_ms, bins=30, alpha=0.6, label=sensor_name)

    ax.set_xlabel('Time delta to nearest sample (ms)')
    ax.set_ylabel('Count')
    ax.set_title('Sensor Time Alignment (relative to ground truth timestamps)')
    ax.legend()
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

## 7. Heading Comparison

In [None]:
kvh_loader = manager.kvh
gt_loader = manager.ground_truth

if kvh_loader is None and gt_loader is None:
    print('KVH and ground truth data not available - cannot compare headings.')
else:
    fig, ax = plt.subplots(1, 1, figsize=(14, 5))

    # KVH fiber optic gyro heading
    if kvh_loader is not None:
        kvh_data = kvh_loader.load()
        t_kvh = (kvh_data.timestamps - kvh_data.timestamps[0]) * 1e-6
        kvh_heading = kvh_data.values[:, kvh_data.columns.index('heading')]
        # Subsample for readability if too many points
        step_kvh = max(1, len(t_kvh) // 5000)
        ax.plot(
            t_kvh[::step_kvh], np.degrees(kvh_heading[::step_kvh]),
            linewidth=0.8, label='KVH gyro heading', alpha=0.8,
        )

    # Ground truth yaw
    if gt_loader is not None:
        gt_data_loaded = gt_loader.load()
        # Use KVH t0 as reference if available, else GT t0
        t0_ref = kvh_data.timestamps[0] if kvh_loader is not None else gt_data_loaded.timestamps[0]
        t_gt = (gt_data_loaded.timestamps - t0_ref) * 1e-6
        gt_yaw = gt_data_loaded.values[:, gt_data_loaded.columns.index('yaw')]
        ax.plot(
            t_gt, np.degrees(gt_yaw),
            linewidth=1.0, linestyle='--', label='Ground truth yaw', alpha=0.8,
        )

    # GPS track heading
    gps_loader = manager.gps
    if gps_loader is not None:
        gps_data_loaded = gps_loader.load()
        t0_ref = kvh_data.timestamps[0] if kvh_loader is not None else gps_data_loaded.timestamps[0]
        t_gps = (gps_data_loaded.timestamps - t0_ref) * 1e-6
        # GPS track heading is in degrees (column: track)
        gps_track = gps_data_loaded.values[:, gps_data_loaded.columns.index('track')]
        ax.plot(
            t_gps, gps_track,
            'o', markersize=3, label='GPS track heading', alpha=0.6,
        )

    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Heading (degrees)')
    ax.set_title(f'Heading Comparison - {session_name}')
    ax.legend()
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()