In [None]:
import os
import numpy as np
from utils.extract_topics import McapTimeWindowExtractor

from scipy.spatial.transform import Rotation
from typing import List, Dict, Optional

In [2]:
extractor = McapTimeWindowExtractor(
    mcap_path="dataset/bag2_forsyth_street_all.mcap",
    camera_topic="/cam_sync/cam0/image_raw/compressed",
    lidar_topic="/ouster/points",
    imu_topic="/vectornav/imu_uncompensated",
    start_time=10.0,    # Start at 10 seconds
    duration=5.0        # Extract 5 seconds worth
)
extractor.extract()

Reading MCAP: dataset/bag2_forsyth_street_all.mcap
Camera topic: /cam_sync/cam0/image_raw/compressed
LiDAR topic:  /ouster/points
IMU topic:    /vectornav/imu_uncompensated

Extracting time window:
  Start: +10.0s from bag start
  Duration: 5.0s


Extracting: 1150msg [00:07, 156.60msg/s]


  Extraction complete:
  Camera frames: 100
  LiDAR scans:   50
  IMU samples:   1000
  Actual time span: 4.94s





In [None]:
class IMUPreintegrator:
    def __init__(self,
                 max_time_window: float = 0.1,
                 gravity: np.ndarray = np.array([0, 0, 9.81]),
                 use_bias_correction: bool = False,
                 acc_bias: np.ndarray = np.zeros(3),
                 gyro_bias: np.ndarray = np.zeros(3),
                 verbose: bool = False):
        
        self.max_time_window = max_time_window
        self.gravity = gravity
        self.use_bias_correction = use_bias_correction
        self.acc_bias = acc_bias
        self.gyro_bias = gyro_bias
        self.verbose = verbose
        
        self.delta_R = None
        self.delta_v = None
        self.delta_p = None
        self.delta_t = None
        
    def preintegrate(self, 
                     imu_measurements: List[Dict],
                     t_start: float,
                     t_end: float) -> Dict:
        
        if len(imu_measurements) == 0:
            raise ValueError("No IMU measurements provided")
        
        if abs(t_end - t_start) > self.max_time_window:
            if self.verbose:
                print(f"Warning: time window {t_end - t_start:.4f}s exceeds max {self.max_time_window}s")
        
        self.delta_t = t_end - t_start
        
        self.delta_R = Rotation.identity()
        self.delta_v = np.zeros(3)
        self.delta_p = np.zeros(3)
        
        if self.verbose:
            print(f"Preintegrating {len(imu_measurements)} IMU measurements")
            print(f"Time window: {self.delta_t*1000:.2f} ms")
        
        prev_time = t_start
        
        for i, imu in enumerate(imu_measurements):
            t_imu = imu['timestamp_sec']
            
            dt = t_imu - prev_time
            
            if dt <= 0:
                continue
            
            acc = imu['linear_acceleration'].copy()
            gyro = imu['angular_velocity'].copy()
            
            if self.use_bias_correction:
                acc -= self.acc_bias
                gyro -= self.gyro_bias
            
            self._integrate_step(acc, gyro, dt)
            
            prev_time = t_imu
        
        if self.verbose:
            print(f"Final rotation: {self._rotation_to_euler_deg()}")
            print(f"Final position: {self.delta_p}")
            print(f"Final velocity: {self.delta_v}")
        
        return self.get_result()
    
    def _integrate_step(self, acc: np.ndarray, gyro: np.ndarray, dt: float):
        
        delta_angle = gyro * dt
        delta_rotation = Rotation.from_rotvec(delta_angle)
        
        acc_rotated = self.delta_R.apply(acc)
        
        acc_global = acc_rotated - self.gravity
        
        self.delta_p += self.delta_v * dt + 0.5 * acc_global * dt**2
        
        self.delta_v += acc_global * dt
        
        self.delta_R = self.delta_R * delta_rotation
    
    def get_result(self) -> Dict:
        return {
            'delta_R': self.delta_R.as_matrix(),
            'delta_v': self.delta_v,
            'delta_p': self.delta_p,
            'delta_t': self.delta_t,
            'rotation_euler': self._rotation_to_euler_deg()
        }
    
    def get_transform_matrix(self) -> np.ndarray:
        T = np.eye(4)
        T[:3, :3] = self.delta_R.as_matrix()
        T[:3, 3] = self.delta_p
        return T
    
    def _rotation_to_euler_deg(self) -> np.ndarray:
        return self.delta_R.as_euler('xyz', degrees=True)
    
    def reset(self):
        self.delta_R = None
        self.delta_v = None
        self.delta_p = None
        self.delta_t = None

In [None]:
if len(extractor.imu_measurements) > 1:
    imu_dt = (extractor.imu_measurements[-1]['timestamp_sec'] - 
                extractor.imu_measurements[0]['timestamp_sec'])
    imu_freq = len(extractor.imu_measurements) / imu_dt
    print(f"IMU frequency: {imu_freq:.1f} Hz")

print("TESTING IMU PREINTEGRATION")

t_camera = extractor.camera_frames[0]['timestamp_sec']

lidar_times = np.array([scan['timestamp_sec'] for scan in extractor.lidar_scans])
closest_lidar_idx = np.argmin(np.abs(lidar_times - t_camera))
t_lidar = extractor.lidar_scans[closest_lidar_idx]['timestamp_sec']

print(f"Camera timestamp: {t_camera:.6f}")
print(f"LiDAR timestamp:  {t_lidar:.6f}")
print(f"Time offset:      {(t_lidar - t_camera)*1000:.2f} ms")

imu_times = np.array([imu['timestamp_sec'] for imu in extractor.imu_measurements])
imu_mask = (imu_times >= min(t_camera, t_lidar)) & (imu_times <= max(t_camera, t_lidar))
imu_indices = np.where(imu_mask)[0]
imu_between = [extractor.imu_measurements[i] for i in imu_indices]

print(f"IMU samples between: {len(imu_between)}")

if len(imu_between) < 2:
    print("ERROR: Not enough IMU samples between camera and lidar!")
else:
    integrator = IMUPreintegrator(
        max_time_window=0.2,
        gravity=np.array([0, 0, 9.81]),
        verbose=True
    )
    
    result = integrator.preintegrate(
        imu_measurements=imu_between,
        t_start=min(t_camera, t_lidar),
        t_end=max(t_camera, t_lidar)
    )
    

    print("PREINTEGRATION RESULT")

    print(f"Delta time: {result['delta_t']*1000:.2f} ms")
    print(f"\nRotation (euler xyz): {result['rotation_euler']} degrees")
    print(f"\nPosition change:\n{result['delta_p']}")
    print(f"\nVelocity change:\n{result['delta_v']}")
    print(f"\nRotation matrix:\n{result['delta_R']}")
    
    T = integrator.get_transform_matrix()
    print(f"\n4x4 Transform matrix:\n{T}")