In [34]:
%pip install pillow numpy rosbags plotly nbformat scipy

Collecting scipy
  Downloading scipy-1.16.3-cp311-cp311-win_amd64.whl.metadata (60 kB)
     ---------------------------------------- 0.0/60.8 kB ? eta -:--:--
     ---------------------------------------- 60.8/60.8 kB 1.6 MB/s eta 0:00:00
Downloading scipy-1.16.3-cp311-cp311-win_amd64.whl (38.7 MB)
   ---------------------------------------- 0.0/38.7 MB ? eta -:--:--
    --------------------------------------- 0.6/38.7 MB 12.9 MB/s eta 0:00:03
   - -------------------------------------- 1.6/38.7 MB 17.0 MB/s eta 0:00:03
   --- ------------------------------------ 3.1/38.7 MB 22.0 MB/s eta 0:00:02
   ----- ---------------------------------- 5.0/38.7 MB 26.9 MB/s eta 0:00:02
   ------ --------------------------------- 6.7/38.7 MB 28.8 MB/s eta 0:00:02
   -------- ------------------------------- 8.4/38.7 MB 30.0 MB/s eta 0:00:02
   ---------- ----------------------------- 10.0/38.7 MB 30.4 MB/s eta 0:00:01
   ----------- ---------------------------- 11.3/38.7 MB 32.7 MB/s eta 0:00:01
   -


[notice] A new release of pip is available: 24.0 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
from rosbags.highlevel import AnyReader
from rosbags.typesys import Stores, get_typestore
from PIL import Image
import numpy as np
from pathlib import Path


typestore = get_typestore(Stores.LATEST)

In [87]:
def luminar_cloud_to_struct(msg):
    """
    Parse Luminar-style PointCloud2 into a structured NumPy array.

    Fields (based on your PointField list):
    - timestamp (uint64 encoded as 8 uint8)
    - x, y, z, reflectance (float32)
    - return_index, last_return_index, sensor_id (uint8)
    - azimuth, elevation, depth (float32)
    - line_index (uint16)
    - frame_index, detector_site_id, scan_checkpoint,
      existence_probability_percent, data_qualifier, blockage_level (uint8)
    """
    point_step = msg.point_step
    n_points = msg.width * msg.height

    dtype = np.dtype({
        'names': [
            'timestamp', 'x', 'y', 'z', 'reflectance',
            'return_index', 'last_return_index', 'sensor_id',
            'azimuth', 'elevation', 'depth',
            'line_index', 'frame_index', 'detector_site_id',
            'scan_checkpoint', 'existence_prob',
            'data_qualifier', 'blockage_level'
        ],
        'formats': [
            '<u8',   # timestamp (8 bytes, little-endian uint64)
            '<f4',   # x
            '<f4',   # y
            '<f4',   # z
            '<f4',   # reflectance
            'u1',    # return_index
            'u1',    # last_return_index
            'u1',    # sensor_id
            '<f4',   # azimuth
            '<f4',   # elevation
            '<f4',   # depth
            '<u2',   # line_index
            'u1',    # frame_index
            'u1',    # detector_site_id
            'u1',    # scan_checkpoint
            'u1',    # existence_probability_percent
            'u1',    # data_qualifier
            'u1',    # blockage_level
        ],
        'offsets': [
            0,   # timestamp
            8,   # x
            12,  # y
            16,  # z
            20,  # reflectance
            24,  # return_index
            25,  # last_return_index
            26,  # sensor_id
            32,  # azimuth
            36,  # elevation
            40,  # depth
            44,  # line_index
            46,  # frame_index
            47,  # detector_site_id
            48,  # scan_checkpoint
            49,  # existence_probability_percent
            50,  # data_qualifier
            51,  # blockage_level
        ],
        'itemsize': point_step,
    })

    arr = np.frombuffer(msg.data, dtype=dtype, count=n_points)

    # Optionally filter obvious junk here (e.g., existence_prob == 0)
    return arr


In [88]:
TWO_PI = 2.0 * np.pi

def compute_ray_id(arr, azimuth_bins=36000):
    """
    Compute a stable ray identifier from line_index and azimuth.
    - azimuth assumed in radians.
    - azimuth_bins: number of bins over 0..2π (e.g. 36000 -> 0.01° per bin)

    Returns: int64 array ray_id
    """
    az = arr['azimuth']
    # Wrap to [0, 2π)
    az_wrapped = np.mod(az, TWO_PI)

    # Quantize azimuth to a bin index
    az_idx = np.floor(az_wrapped * (azimuth_bins / TWO_PI)).astype(np.int64)
    az_idx = np.clip(az_idx, 0, azimuth_bins - 1)

    line_idx = arr['line_index'].astype(np.int64)  # uint16

    # Pack line_index and az_idx into one 64-bit integer:
    # [ line_index (upper 16 bits) | az_idx (lower bits) ]
    ray_id = (line_idx << 32) | az_idx  # 32 bits more than enough for az_idx

    return ray_id


In [None]:
def interpolate_luminar_frames(arr0, arr1, t=0.5, azimuth_bins=36000):
    """
    Interpolate between two Luminar PointCloud2 frames at time fraction t in [0,1].

    Returns:
        xyz_mid: (M, 3) float32
        reflectance_mid: (M,) float32
        timestamp_mid: (M,) uint64 (interpolated)
        meta0_idx: indices into msg0's points for the matched/interpolated subset
        meta1_idx: indices into msg1's points for the matched/interpolated subset
    """
    if len(arr0) == 0 or len(arr1) == 0:
        # Nothing to interpolate
        return (np.empty((0, 3), dtype=np.float32),
                np.empty((0,), dtype=np.float32),
                np.empty((0,), dtype=np.uint64),
                np.empty((0,), dtype=np.int64),
                np.empty((0,), dtype=np.int64))

    # --- Build ray IDs ---
    ray0 = compute_ray_id(arr0, azimuth_bins=azimuth_bins)
    ray1 = compute_ray_id(arr1, azimuth_bins=azimuth_bins)

    # Pack (ray_id, return_index) into one key so we can intersect
    key0 = (ray0 << 3) | arr0['return_index'].astype(np.int64)
    key1 = (ray1 << 3) | arr1['return_index'].astype(np.int64)

    # --- Find matches between frames ---
    # intersect1d gives the common keys and the indices in each array
    common_keys, idx0, idx1 = np.intersect1d(key0, key1, assume_unique=False,
                                             return_indices=True)

    if common_keys.size == 0:
        return (np.empty((0, 3), dtype=np.float32),
                np.empty((0,), dtype=np.float32),
                np.empty((0,), dtype=np.uint64),
                idx0, idx1)

    A0 = arr0[idx0]
    A1 = arr1[idx1]

    # --- Extract fields to interpolate ---
    x0 = A0['x'].astype(np.float32)
    y0 = A0['y'].astype(np.float32)
    z0 = A0['z'].astype(np.float32)

    x1 = A1['x'].astype(np.float32)
    y1 = A1['y'].astype(np.float32)
    z1 = A1['z'].astype(np.float32)

    refl0 = A0['reflectance'].astype(np.float32)
    refl1 = A1['reflectance'].astype(np.float32)

    ts0 = A0['timestamp'].astype(np.uint64)
    ts1 = A1['timestamp'].astype(np.uint64)

    # --- Linear interpolation ---
    t = float(t)
    xyz_mid = np.stack([
        (1.0 - t) * x0 + t * x1,
        (1.0 - t) * y0 + t * y1,
        (1.0 - t) * z0 + t * z1,
    ], axis=-1).astype(np.float32)

    reflectance_mid = ((1.0 - t) * refl0 + t * refl1).astype(np.float32)

    ts_mid = ((1.0 - t) * ts0.astype(np.float64) + t * ts1.astype(np.float64)).astype(np.uint64)

    return xyz_mid, reflectance_mid, ts_mid, idx0, idx1


In [76]:
import plotly.graph_objects as go
import plotly.io as pio


def show_cloud(*xyzs, max_points=200_000):
    """
    Interactive 3D scatter of a point cloud using Plotly.
    Downsamples if there are too many points.
    """
    data = []
    for xyz in xyzs:

        if len(xyz) == 0:
            raise ValueError("Empty point cloud")

        # Downsample for performance if needed
        if len(xyz) > max_points:
            step = len(xyz) // max_points
            xyz = xyz[::step]
        data.append(

        go.Scatter3d(
            x=xyz[:, 0],
            y=xyz[:, 1],
            z=xyz[:, 2],
            mode="markers",
            marker=dict(
                size=1,
                opacity=0.5,
            ),
        )
        )

    fig = go.Figure(
        data=data
    )

    fig.update_layout(
        scene=dict(
            xaxis_title="X",
            yaxis_title="Y",
            zaxis_title="Z",
            aspectmode="data",  # preserves scale
        ),
        margin=dict(l=0, r=0, b=0, t=0),
    )

    fig.show()


In [90]:
# extract all point clouds from rosbag and convert to xyz numpy arrays
with AnyReader([Path("dpt_rosbag_lvms_2024_12\\rosbag2_2024_12_12-18_21_55_12.mcap")], default_typestore=typestore) as reader:
    connections = [c for c in reader.connections if c.topic == "/luminar_front/points/existence_prob_filtered"]
    raw_messages = list(reader.messages(connections=connections))
    clouds = []
    for conn, timestamp, raw_msg in raw_messages:
        pc = reader.deserialize(raw_msg, conn.msgtype)
        arr = luminar_cloud_to_struct(pc)
        clouds.append((timestamp, arr))

In [None]:
c1 = clouds[1][1]
c2 = clouds[2][1]

xyz_mid, refl_mid, ts_mid, idx0, idx1 = interpolate_luminar_frames(
    c1, c2,
    t=0.5,
    azimuth_bins=36000,
)


In [102]:
def make_xyz(arr):
    xyz = np.stack([arr['x'], arr['y'], arr['z']], axis=-1).astype(np.float32)
    # filter y < -20 out
    mask = xyz[:, 1] > -20.0
    # filter x > 50 out
    mask &= xyz[:, 0] < 50.0
    return xyz[mask]


In [103]:
show_cloud(make_xyz(c1), make_xyz(c2), xyz_mid)