In [19]:
#!/usr/bin/env python3
"""
Extract point cloud and paths from ROS2 bag files (using only the last message from each topic),
export to a GLB file, and display it in an IPython Notebook.

The point cloud is simulated as a set of small boxes (with adjustable size)
colored in grayscale based on their z height.
Path1 is colored red and path2 is colored blue.
If the point cloud is too large, it will be subsampled to avoid overloading the system.
Ground points (with z below a threshold) are removed for a cleaner point cloud.
"""

import numpy as np
import trimesh
import rosbag2_py
from rclpy.serialization import deserialize_message  # Updated import for ROS2 Humble
from sensor_msgs.msg import PointCloud2
from nav_msgs.msg import Path
import sensor_msgs_py.point_cloud2 as pc2

# For displaying in an IPython Notebook
from IPython.display import HTML, display

def extract_point_cloud(bag_path, topic):
    """Extract the point cloud data (x, y, z) from only the last message in the bag file for the given topic."""
    reader = rosbag2_py.SequentialReader()
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id='sqlite3')
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader.open(storage_options, converter_options)

    last_data = None
    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name == topic:
            last_data = data
    if last_data is None:
        return np.array([])

    msg = deserialize_message(last_data, PointCloud2)
    points = []
    for p in pc2.read_points(msg, skip_nans=True, field_names=("x", "y", "z")):
        points.append(p)
    return np.array(points)

def extract_path(bag_path, topic):
    """Extract the path (as a set of poses) from only the last message from the bag file for the given topic."""
    reader = rosbag2_py.SequentialReader()
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id='sqlite3')
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader.open(storage_options, converter_options)

    last_data = None
    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name == topic:
            last_data = data
    if last_data is None:
        return np.array([])

    msg = deserialize_message(last_data, Path)
    path_points = []
    for pose_stamped in msg.poses:
        pos = pose_stamped.pose.position
        path_points.append([pos.x, pos.y, pos.z])
    return np.array(path_points)

def create_path_cylinders(path_points, radius=0.05):
    """
    Create a single mesh that represents the path as a series of cylinders
    connecting consecutive waypoints.
    """
    cylinders = []
    for i in range(len(path_points) - 1):
        start = path_points[i]
        end = path_points[i + 1]
        vec = end - start
        length = np.linalg.norm(vec)
        if length == 0:
            continue
        # Create a cylinder along the Z axis (centered at origin)
        cyl = trimesh.creation.cylinder(radius=radius, height=length, sections=12)
        # Align the cylinder with the vector from start to end.
        z_axis = np.array([0, 0, 1])
        direction = vec / length
        rotation, _ = trimesh.geometry.align_vectors(z_axis, direction, return_angle=True)
        T = trimesh.transformations.translation_matrix((start + end) / 2)
        transform = trimesh.transformations.concatenate_matrices(T, rotation)
        cyl.apply_transform(transform)
        cylinders.append(cyl)
    if cylinders:
        return trimesh.util.concatenate(cylinders)
    else:
        return None

def create_point_cloud_mesh(points, colors, point_size=0.02):
    """
    Create a mesh representing the point cloud by instancing a small box
    at each point.
    
    Args:
      points (Nx3 array): XYZ coordinates.
      colors (Nx4 array): RGBA colors for each point (values in 0-255).
      point_size (float): Edge length of the box representing each point.
    
    Returns:
      A trimesh.Trimesh object containing all the box instances.
    """
    # Create a box template centered at the origin.
    box = trimesh.creation.box(extents=[point_size, point_size, point_size])
    meshes = []
    for pt, color in zip(points, colors):
        instance = box.copy()
        instance.visual.face_colors = color
        instance.apply_translation(pt)
        meshes.append(instance)
    return trimesh.util.concatenate(meshes)

def display_glb(glb_filename):
    """Display the GLB file in an IPython Notebook using <model-viewer>."""
    html_code = f"""
    <script type="module" src="https://unpkg.com/@google/model-viewer/dist/model-viewer.min.js"></script>
    <model-viewer src="{glb_filename}" alt="3D model" auto-rotate camera-controls style="width: 800px; height: 600px;"></model-viewer>
    """
    display(HTML(html_code))

def main():
    # Define bag file paths and corresponding topics
    pc_bag    = "/media/raghuram/Untitled/data/bags/gq_full_map_pc"
    pc_topic  = "/quadrotor/nvblox_node/static_occupancy"
    path1_bag = "/media/raghuram/Untitled/paths_waypoints/gq_path"
    path1_topic = "/quadrotor/des_path"
    path2_bag = "/media/raghuram/Untitled/bags/ros2_bags/gq_map_ros2"
    path2_topic = "/visualizer/trajectory_path"

    # Adjustable parameters
    point_size = 0.3
    max_points = 50000  # Limit the number of point cloud boxes to process
    ground_threshold = 0.3  # Remove points with z <= ground_threshold

    print("Extracting point cloud from:", pc_bag)
    points = extract_point_cloud(pc_bag, pc_topic)
    print(f"Extracted {points.shape[0]} points from the last message.")

    # Convert structured array to a regular (N,3) float64 array if needed.
    if points.size > 0 and points.dtype.names is not None:
        points = np.stack((points['x'], points['y'], points['z']), axis=-1).astype(np.float64)

    # Subsample the points if there are too many.
    if points.shape[0] > max_points:
        indices = np.random.choice(points.shape[0], max_points, replace=False)
        points = points[indices]
        print(f"Subsampled to {points.shape[0]} points.")
    
    # Remove ground points (those with z value below the threshold)
    if points.size > 0:
        points = points[points[:,2] > ground_threshold]
        print(f"After ground removal, {points.shape[0]} points remain.")

    print("Extracting path1 from:", path1_bag)
    path1_points = extract_path(path1_bag, path1_topic)
    print(f"Extracted {path1_points.shape[0]} waypoints for path1 from the last message.")

    print("Extracting path2 from:", path2_bag)
    path2_points = extract_path(path2_bag, path2_topic)
    print(f"Extracted {path2_points.shape[0]} waypoints for path2 from the last message.")

    # Build a trimesh scene to hold all geometry.
    scene = trimesh.Scene()

    # Create and add the point cloud mesh with grayscale intensity based on z height.
    if points.size > 0:
        z = points[:, 2]
        z_min = np.min(z)
        z_max = np.max(z)
        if np.abs(z_max - z_min) < 1e-6:
            normalized = np.zeros_like(z)
        else:
            normalized = (z - z_min) / (z_max - z_min)
        # Compute grayscale colors; adjust the range if needed.
        grayscale = (normalized * 255).astype(np.uint8)
        colors = np.stack((grayscale, grayscale, grayscale, np.full_like(grayscale, 255)), axis=1)
        # Create a mesh from points using small boxes.
        point_cloud_mesh = create_point_cloud_mesh(points, colors, point_size=point_size)
        scene.add_geometry(point_cloud_mesh, node_name='point_cloud')
    
    # Create and add path geometries if available.
    if path1_points.shape[0] > 1:
        path1_mesh = create_path_cylinders(path1_points, radius=0.3)
        if path1_mesh is not None:
            # Color path1 in red (RGBA)
            path1_mesh.visual.face_colors = [255, 0, 0, 255]
            scene.add_geometry(path1_mesh, node_name='path1')
    if path2_points.shape[0] > 1:
        path2_mesh = create_path_cylinders(path2_points, radius=0.3)
        if path2_mesh is not None:
            # Color path2 in light blue (RGBA)
            path2_mesh.visual.face_colors = [0, 0, 230, 255]
            scene.add_geometry(path2_mesh, node_name='path2')
    
    # Export the scene to a GLB (glTF binary) file.
    output_file = "/media/raghuram/Untitled/glb/output_gq.glb"
    scene.export(output_file)
    print(f"Scene exported successfully to {output_file}")

    # Display the GLB in the IPython Notebook.
    display_glb(output_file)#!/usr/bin/env python3
"""
Extract point cloud and paths from ROS2 bag files (using only the last message from each topic),
export to a GLB file, and display it in an IPython Notebook.

The point cloud is simulated as a set of small boxes (with adjustable size)
colored in grayscale based on their z height.
Path1 is colored red and path2 is colored blue.
If the point cloud is too large, it will be subsampled to avoid overloading the system.
Ground points (with z below a threshold) are removed for a cleaner point cloud.
"""

import numpy as np
import trimesh
import rosbag2_py
from rclpy.serialization import deserialize_message  # Updated import for ROS2 Humble
from sensor_msgs.msg import PointCloud2
from nav_msgs.msg import Path
import sensor_msgs_py.point_cloud2 as pc2

# For displaying in an IPython Notebook
from IPython.display import HTML, display

def extract_point_cloud(bag_path, topic):
    """Extract the point cloud data (x, y, z) from only the last message in the bag file for the given topic."""
    reader = rosbag2_py.SequentialReader()
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id='sqlite3')
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader.open(storage_options, converter_options)

    last_data = None
    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name == topic:
            last_data = data
    if last_data is None:
        return np.array([])

    msg = deserialize_message(last_data, PointCloud2)
    points = []
    for p in pc2.read_points(msg, skip_nans=True, field_names=("x", "y", "z")):
        points.append(p)
    return np.array(points)

def extract_path(bag_path, topic):
    """Extract the path (as a set of poses) from only the last message from the bag file for the given topic."""
    reader = rosbag2_py.SequentialReader()
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id='sqlite3')
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader.open(storage_options, converter_options)

    last_data = None
    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name == topic:
            last_data = data
    if last_data is None:
        return np.array([])

    msg = deserialize_message(last_data, Path)
    path_points = []
    for pose_stamped in msg.poses:
        pos = pose_stamped.pose.position
        path_points.append([pos.x, pos.y, pos.z])
    return np.array(path_points)

def create_path_cylinders(path_points, radius=0.05):
    """
    Create a single mesh that represents the path as a series of cylinders
    connecting consecutive waypoints.
    """
    cylinders = []
    for i in range(len(path_points) - 1):
        start = path_points[i]
        end = path_points[i + 1]
        vec = end - start
        length = np.linalg.norm(vec)
        if length == 0:
            continue
        # Create a cylinder along the Z axis (centered at origin)
        cyl = trimesh.creation.cylinder(radius=radius, height=length, sections=12)
        # Align the cylinder with the vector from start to end.
        z_axis = np.array([0, 0, 1])
        direction = vec / length
        rotation, _ = trimesh.geometry.align_vectors(z_axis, direction, return_angle=True)
        T = trimesh.transformations.translation_matrix((start + end) / 2)
        transform = trimesh.transformations.concatenate_matrices(T, rotation)
        cyl.apply_transform(transform)
        cylinders.append(cyl)
    if cylinders:
        return trimesh.util.concatenate(cylinders)
    else:
        return None

def create_point_cloud_mesh(points, colors, point_size=0.02):
    """
    Create a mesh representing the point cloud by instancing a small box
    at each point.
    
    Args:
      points (Nx3 array): XYZ coordinates.
      colors (Nx4 array): RGBA colors for each point (values in 0-255).
      point_size (float): Edge length of the box representing each point.
    
    Returns:
      A trimesh.Trimesh object containing all the box instances.
    """
    # Create a box template centered at the origin.
    box = trimesh.creation.box(extents=[point_size, point_size, point_size])
    meshes = []
    for pt, color in zip(points, colors):
        instance = box.copy()
        instance.visual.face_colors = color
        instance.apply_translation(pt)
        meshes.append(instance)
    return trimesh.util.concatenate(meshes)

def display_glb(glb_filename):
    """Display the GLB file in an IPython Notebook using <model-viewer>."""
    html_code = f"""
    <script type="module" src="https://unpkg.com/@google/model-viewer/dist/model-viewer.min.js"></script>
    <model-viewer src="{glb_filename}" alt="3D model" auto-rotate camera-controls style="width: 800px; height: 600px;"></model-viewer>
    """
    display(HTML(html_code))

def main():
    # Define bag file paths and corresponding topics
    pc_bag    = "/media/raghuram/Untitled/data/bags/gq_full_map_pc"
    pc_topic  = "/quadrotor/nvblox_node/static_occupancy"
    path1_bag = "/media/raghuram/Untitled/paths_waypoints/gq_path"
    path1_topic = "/quadrotor/des_path"
    path2_bag = "/media/raghuram/Untitled/bags/ros2_bags/gq_map_ros2"
    path2_topic = "/visualizer/trajectory_path"

    # Adjustable parameters
    point_size = 0.5
    max_points = 40000  # Limit the number of point cloud boxes to process
    ground_threshold = 0.3  # Remove points with z <= ground_threshold

    print("Extracting point cloud from:", pc_bag)
    points = extract_point_cloud(pc_bag, pc_topic)
    print(f"Extracted {points.shape[0]} points from the last message.")

    # Convert structured array to a regular (N,3) float64 array if needed.
    if points.size > 0 and points.dtype.names is not None:
        points = np.stack((points['x'], points['y'], points['z']), axis=-1).astype(np.float64)

    # Subsample the points if there are too many.
    if points.shape[0] > max_points:
        indices = np.random.choice(points.shape[0], max_points, replace=False)
        points = points[indices]
        print(f"Subsampled to {points.shape[0]} points.")
    
    # Remove ground points (those with z value below the threshold)
    if points.size > 0:
        points = points[points[:,2] > ground_threshold]
        print(f"After ground removal, {points.shape[0]} points remain.")

    print("Extracting path1 from:", path1_bag)
    path1_points = extract_path(path1_bag, path1_topic)
    print(f"Extracted {path1_points.shape[0]} waypoints for path1 from the last message.")

    print("Extracting path2 from:", path2_bag)
    path2_points = extract_path(path2_bag, path2_topic)
    print(f"Extracted {path2_points.shape[0]} waypoints for path2 from the last message.")

    # Build a trimesh scene to hold all geometry.
    scene = trimesh.Scene()

    # Create and add the point cloud mesh with grayscale intensity based on z height.
    if points.size > 0:
        z = points[:, 2]
        z_min = np.min(z)
        z_max = np.max(z)
        if np.abs(z_max - z_min) < 1e-6:
            normalized = np.zeros_like(z)
        else:
            normalized = (z - z_min) / (z_max - z_min)
        # Compute grayscale colors; adjust the range if needed.
        grayscale = (normalized * 255).astype(np.uint8)
        colors = np.stack((grayscale, grayscale, grayscale, np.full_like(grayscale, 255)), axis=1)
        # Create a mesh from points using small boxes.
        point_cloud_mesh = create_point_cloud_mesh(points, colors, point_size=point_size)
        scene.add_geometry(point_cloud_mesh, node_name='point_cloud')
    
    # Create and add path geometries if available.
    if path1_points.shape[0] > 1:
        path1_mesh = create_path_cylinders(path1_points, radius=0.3)
        if path1_mesh is not None:
            # Color path1 in red (RGBA)
            path1_mesh.visual.face_colors = [255, 0, 0, 255]
            scene.add_geometry(path1_mesh, node_name='path1')
    if path2_points.shape[0] > 1:
        path2_mesh = create_path_cylinders(path2_points, radius=0.3)
        if path2_mesh is not None:
            # Color path2 in light blue (RGBA)
            path2_mesh.visual.face_colors = [0, 0, 230, 255]
            scene.add_geometry(path2_mesh, node_name='path2')
    
    # Export the scene to a GLB (glTF binary) file.
    output_file = "/media/raghuram/Untitled/glb/output_gq.glb"
    scene.export(output_file)
    print(f"Scene exported successfully to {output_file}")

    # Display the GLB in the IPython Notebook.
    display_glb(output_file)

if __name__ == "__main__":
    main()


if __name__ == "__main__":
    main()


Extracting point cloud from: /media/raghuram/Untitled/data/bags/gq_full_map_pc


[INFO] [1744044875.115794737] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/data/bags/gq_full_map_pc/rosbag2_2025_03_10-17_58_24_0.db3' for READ_ONLY.


Extracted 74846 points from the last message.
Subsampled to 40000 points.
After ground removal, 12596 points remain.
Extracting path1 from: /media/raghuram/Untitled/paths_waypoints/gq_path
Extracted 5600 waypoints for path1 from the last message.
Extracting path2 from: /media/raghuram/Untitled/bags/ros2_bags/gq_map_ros2


[INFO] [1744044875.424500571] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/paths_waypoints/gq_path/rosbag2_2025_04_03-17_42_58_0.db3' for READ_ONLY.
[INFO] [1744044875.613108270] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/bags/ros2_bags/gq_map_ros2/gq_map_ros2.db3' for READ_ONLY.


Extracted 3882 waypoints for path2 from the last message.
Scene exported successfully to /media/raghuram/Untitled/glb/output_gq.glb


Extracting point cloud from: /media/raghuram/Untitled/data/bags/gq_full_map_pc


[INFO] [1744044881.044354960] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/data/bags/gq_full_map_pc/rosbag2_2025_03_10-17_58_24_0.db3' for READ_ONLY.


Extracted 74846 points from the last message.
Subsampled to 40000 points.
After ground removal, 12607 points remain.
Extracting path1 from: /media/raghuram/Untitled/paths_waypoints/gq_path
Extracted 5600 waypoints for path1 from the last message.
Extracting path2 from: /media/raghuram/Untitled/bags/ros2_bags/gq_map_ros2


[INFO] [1744044881.310932981] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/paths_waypoints/gq_path/rosbag2_2025_04_03-17_42_58_0.db3' for READ_ONLY.
[INFO] [1744044881.485539019] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/bags/ros2_bags/gq_map_ros2/gq_map_ros2.db3' for READ_ONLY.


Extracted 3882 waypoints for path2 from the last message.
Scene exported successfully to /media/raghuram/Untitled/glb/output_gq.glb


In [16]:
#!/usr/bin/env python3
"""
Extract point cloud and paths from ROS2 bag files (using only the last message from each topic),
export to a GLB file, and display it in an IPython Notebook.

The point cloud is simulated as a set of small boxes (with adjustable size)
colored in grayscale based on their z height.
Path1 is colored red and path2 is colored light blue.
If the point cloud is too large, it will be subsampled to avoid overloading the system.
"""

import numpy as np
import trimesh
import rosbag2_py
from rclpy.serialization import deserialize_message  # Updated import for ROS2 Humble
from sensor_msgs.msg import PointCloud2
from nav_msgs.msg import Path
import sensor_msgs_py.point_cloud2 as pc2

# For displaying in an IPython Notebook
from IPython.display import HTML, display

def extract_point_cloud(bag_path, topic):
    """Extract the point cloud data (x, y, z) from only the last message in the bag file for the given topic."""
    reader = rosbag2_py.SequentialReader()
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id='sqlite3')
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader.open(storage_options, converter_options)
    
    last_data = None
    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name == topic:
            last_data = data
    if last_data is None:
        return np.array([])
    
    msg = deserialize_message(last_data, PointCloud2)
    points = []
    for p in pc2.read_points(msg, skip_nans=True, field_names=("x", "y", "z")):
        points.append(p)
    return np.array(points)

def extract_path(bag_path, topic):
    """Extract the path (as a set of poses) from only the last message in the bag file for the given topic."""
    reader = rosbag2_py.SequentialReader()
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id='sqlite3')
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader.open(storage_options, converter_options)
    
    last_data = None
    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name == topic:
            last_data = data
    if last_data is None:
        return np.array([])
    
    msg = deserialize_message(last_data, Path)
    path_points = []
    for pose_stamped in msg.poses:
        pos = pose_stamped.pose.position
        path_points.append([pos.x, pos.y, pos.z])
    return np.array(path_points)

def create_path_cylinders(path_points, radius=0.05):
    """
    Create a single mesh that represents the path as a series of cylinders
    connecting consecutive waypoints.
    """
    cylinders = []
    for i in range(len(path_points) - 1):
        start = path_points[i]
        end = path_points[i + 1]
        vec = end - start
        length = np.linalg.norm(vec)
        if length == 0:
            continue
        # Create a cylinder along the Z axis (centered at origin)
        cyl = trimesh.creation.cylinder(radius=radius, height=length, sections=12)
        # Align the cylinder with the vector from start to end.
        z_axis = np.array([0, 0, 1])
        direction = vec / length
        # Get rotation matrix that aligns z_axis with our direction.
        rotation, _ = trimesh.geometry.align_vectors(z_axis, direction, return_angle=True)
        # Compute translation so that the cylinder is centered between start and end.
        T = trimesh.transformations.translation_matrix((start + end) / 2)
        # Combine rotation and translation.
        transform = trimesh.transformations.concatenate_matrices(T, rotation)
        cyl.apply_transform(transform)
        cylinders.append(cyl)
    if cylinders:
        return trimesh.util.concatenate(cylinders)
    else:
        return None

def create_point_cloud_mesh(points, colors, point_size=0.02):
    """
    Create a mesh representing the point cloud by instancing a small box at each point.
    
    Args:
      points (Nx3 array): XYZ coordinates.
      colors (Nx4 array): RGBA colors for each point (values in 0-255).
      point_size (float): Edge length of the box representing each point.
    
    Returns:
      A trimesh.Trimesh object containing all the box instances.
    """
    # Create a box template.
    # The box is centered at the origin.
    box = trimesh.creation.box(extents=[point_size, point_size, point_size])
    meshes = []
    for pt, color in zip(points, colors):
        instance = box.copy()
        instance.visual.face_colors = color
        instance.apply_translation(pt)
        meshes.append(instance)
    return trimesh.util.concatenate(meshes)

def display_glb(glb_filename):
    """Display the GLB file in an IPython Notebook using <model-viewer>."""
    html_code = f"""
    <script type="module" src="https://unpkg.com/@google/model-viewer/dist/model-viewer.min.js"></script>
    <model-viewer src="{glb_filename}" alt="3D model" auto-rotate camera-controls style="width: 800px; height: 600px;"></model-viewer>
    """
    display(HTML(html_code))

def main():
    # Define bag file paths and corresponding topics
    pc_bag    = "/media/raghuram/Untitled/data/bags/minco_map"
    pc_topic  = "/voxel_map"
    path1_bag = "/media/raghuram/Untitled/paths_waypoints/minco_path_bag"
    path1_topic = "/quadrotor/des_path"
    path2_bag = "/media/raghuram/Untitled/bags/ros2_bags/minco_map_ros2"
    path2_topic = "/visualizer/trajectory_path"

    # Adjustable parameters
    point_size = 0.5  # Edge length of each box
    max_points = 50000  # Limit the number of point cloud boxes to process

    print("Extracting point cloud from:", pc_bag)
    points = extract_point_cloud(pc_bag, pc_topic)
    print(f"Extracted {points.shape[0]} points from the last message.")

    # Convert structured array to a regular (N,3) float64 array if needed.
    if points.size > 0 and points.dtype.names is not None:
        points = np.stack((points['x'], points['y'], points['z']), axis=-1).astype(np.float64)

    # Subsample the points if there are too many
    if points.shape[0] > max_points:
        indices = np.random.choice(points.shape[0], max_points, replace=False)
        points = points[indices]
        print(f"Subsampled to {points.shape[0]} points.")

    print("Extracting path1 from:", path1_bag)
    path1_points = extract_path(path1_bag, path1_topic)
    print(f"Extracted {path1_points.shape[0]} waypoints for path1 from the last message.")

    print("Extracting path2 from:", path2_bag)
    path2_points = extract_path(path2_bag, path2_topic)
    print(f"Extracted {path2_points.shape[0]} waypoints for path2 from the last message.")

    # Build a trimesh scene to hold all geometry.
    scene = trimesh.Scene()

    # Create and add the point cloud mesh with grayscale intensity based on z height.
    if points.size > 0:
        z = points[:, 2]
        z_min = np.min(z)
        z_max = np.max(z)
        if np.abs(z_max - z_min) < 1e-6:
            normalized = np.zeros_like(z)
        else:
            normalized = (z - z_min) / (z_max - z_min)
        # Compute grayscale colors: same value for R, G, and B.
        grayscale = (normalized * 255 - 15).astype(np.uint8)
        colors = np.stack((grayscale, grayscale, grayscale, np.full_like(grayscale, 255)), axis=1)
        # Create a mesh from points using small boxes.
        point_cloud_mesh = create_point_cloud_mesh(points, colors, point_size=point_size)
        scene.add_geometry(point_cloud_mesh, node_name='point_cloud')
    
    # Create and add path geometries if available.
    if path1_points.shape[0] > 1:
        path1_mesh = create_path_cylinders(path1_points, radius=0.3)
        if path1_mesh is not None:
            # Color path1 in red (RGBA)
            path1_mesh.visual.face_colors = [255, 0, 0, 255]
            scene.add_geometry(path1_mesh, node_name='path1')
    if path2_points.shape[0] > 1:
        path2_mesh = create_path_cylinders(path2_points, radius=0.3)
        if path2_mesh is not None:
            # Color path2 in light blue (RGBA)
            path2_mesh.visual.face_colors = [0, 0, 255, 255]
            scene.add_geometry(path2_mesh, node_name='path2')

    # Export the scene to a GLB (glTF binary) file.
    output_file = "/media/raghuram/Untitled/glb/output_minco.glb"
    scene.export(output_file)
    print(f"Scene exported successfully to {output_file}")

    # Display the GLB in the IPython Notebook.
    display_glb(output_file)

if __name__ == "__main__":
    main()


Extracting point cloud from: /media/raghuram/Untitled/data/bags/minco_map


[INFO] [1744044693.638679210] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/data/bags/minco_map/minco_map.db3' for READ_ONLY.


Extracted 239999 points from the last message.
Subsampled to 50000 points.
Extracting path1 from: /media/raghuram/Untitled/paths_waypoints/minco_path_bag
Extracted 5580 waypoints for path1 from the last message.
Extracting path2 from: /media/raghuram/Untitled/bags/ros2_bags/minco_map_ros2


[INFO] [1744044694.077043453] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/paths_waypoints/minco_path_bag/rosbag2_2025_04_02-15_11_18_0.db3' for READ_ONLY.
[INFO] [1744044694.208561771] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/bags/ros2_bags/minco_map_ros2/minco_map_ros2.db3' for READ_ONLY.


Extracted 3450 waypoints for path2 from the last message.
Scene exported successfully to /media/raghuram/Untitled/glb/output_minco.glb


In [21]:
#!/usr/bin/env python3
"""
Extract point cloud and paths from ROS2 bag files (using only the last message from each topic),
export to a GLB file, and display it in an IPython Notebook.

The point cloud is simulated as a set of small spheres (with adjustable size)
colored in grayscale based on their z height.
Path1 is colored red and path2 is colored blue.
If the point cloud is too large, it will be subsampled to avoid overloading the system.

Additionally, we remove points above a certain z-height to 'cut off the roof'.
"""

import numpy as np
import trimesh
import rosbag2_py
from rclpy.serialization import deserialize_message  # Updated import for ROS2 Humble
from sensor_msgs.msg import PointCloud2
from nav_msgs.msg import Path
import sensor_msgs_py.point_cloud2 as pc2

# For displaying in an IPython Notebook
from IPython.display import HTML, display

def extract_point_cloud(bag_path, topic):
    """Extract the point cloud data (x, y, z) from only the last message in the bag file for the given topic."""
    reader = rosbag2_py.SequentialReader()
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id='sqlite3')
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader.open(storage_options, converter_options)
    
    last_data = None
    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name == topic:
            last_data = data
    if last_data is None:
        return np.array([])
    
    msg = deserialize_message(last_data, PointCloud2)
    points = []
    for p in pc2.read_points(msg, skip_nans=True, field_names=("x", "y", "z")):
        points.append(p)
    return np.array(points)

def extract_path(bag_path, topic):
    """Extract the path (as a set of poses) from only the last message in the bag file for the given topic."""
    reader = rosbag2_py.SequentialReader()
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id='sqlite3')
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader.open(storage_options, converter_options)
    
    last_data = None
    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name == topic:
            last_data = data
    if last_data is None:
        return np.array([])
    
    msg = deserialize_message(last_data, Path)
    path_points = []
    for pose_stamped in msg.poses:
        pos = pose_stamped.pose.position
        path_points.append([pos.x, pos.y, pos.z])
    return np.array(path_points)

def create_path_cylinders(path_points, radius=0.05):
    """
    Create a single mesh that represents the path as a series of cylinders
    connecting consecutive waypoints.
    """
    cylinders = []
    for i in range(len(path_points) - 1):
        start = path_points[i]
        end = path_points[i + 1]
        vec = end - start
        length = np.linalg.norm(vec)
        if length == 0:
            continue
        # Create a cylinder along the Z axis (centered at origin)
        cyl = trimesh.creation.cylinder(radius=radius, height=length, sections=12)
        # Align the cylinder with the vector from start to end.
        z_axis = np.array([0, 0, 1])
        direction = vec / length
        # Get rotation matrix that aligns z_axis with our direction.
        rotation, _ = trimesh.geometry.align_vectors(z_axis, direction, return_angle=True)
        # Compute translation so that the cylinder is centered between start and end.
        T = trimesh.transformations.translation_matrix((start + end) / 2)
        # Combine rotation and translation.
        transform = trimesh.transformations.concatenate_matrices(T, rotation)
        cyl.apply_transform(transform)
        cylinders.append(cyl)
    if cylinders:
        return trimesh.util.concatenate(cylinders)
    else:
        return None

def create_point_cloud_mesh(points, colors, point_size=0.02):
    """
    Create a mesh representing the point cloud by instancing a small sphere
    at each point.
    
    Args:
      points (Nx3 array): XYZ coordinates.
      colors (Nx4 array): RGBA colors for each point (values in 0-255).
      point_size (float): Radius of the sphere representing each point.
    
    Returns:
      A trimesh.Trimesh object containing all the sphere instances.
    """
    # Create a sphere template.
    box = trimesh.creation.box(extents=[point_size, point_size, point_size])
    meshes = []
    for pt, color in zip(points, colors):
        instance = box.copy()
        instance.visual.face_colors = color
        instance.apply_translation(pt)
        meshes.append(instance)
    return trimesh.util.concatenate(meshes)

def display_glb(glb_filename):
    """Display the GLB file in an IPython Notebook using <model-viewer>."""
    html_code = f"""
    <script type="module" src="https://unpkg.com/@google/model-viewer/dist/model-viewer.min.js"></script>
    <model-viewer src="{glb_filename}" alt="3D model" auto-rotate camera-controls style="width: 800px; height: 600px;"></model-viewer>
    """
    display(HTML(html_code))

def main():
    # Define bag file paths and corresponding topics
    pc_bag    = "/media/raghuram/Untitled/data/bags/hospital_map"
    pc_topic  = "/occupancy_map/inflated_voxel_map"
    path1_bag = "/media/raghuram/Untitled/paths_waypoints/hospital_path_bag"
    path1_topic = "/quadrotor/des_path"
    path2_bag = "/media/raghuram/Untitled/bags/ros2_bags/hospital_map_ros2"
    path2_topic = "/visualizer/trajectory_path"

    # Adjustable parameters
    point_size = 0.5
    max_points = 50000  # Limit the number of point cloud spheres to process
    
    # This is the z-threshold above which we'll remove points ("cut off the roof").
    # Adjust this value as needed.
    max_z_threshold = 1.5

    print("Extracting point cloud from:", pc_bag)
    points = extract_point_cloud(pc_bag, pc_topic)
    print(f"Extracted {points.shape[0]} points from the last message.")

    # Convert structured array to a regular (N,3) float64 array if needed.
    if points.size > 0 and points.dtype.names is not None:
        points = np.stack((points['x'], points['y'], points['z']), axis=-1).astype(np.float64)

    # Remove any points above the specified z threshold
    if points.size > 0:
        before_filter_count = points.shape[0]
        points = points[points[:, 2] < max_z_threshold]
        after_filter_count = points.shape[0]
        print(f"Removed {before_filter_count - after_filter_count} points above z={max_z_threshold}.")
        print(f"{after_filter_count} points remain.")

    # Subsample the points if there are too many
    if points.shape[0] > max_points:
        indices = np.random.choice(points.shape[0], max_points, replace=False)
        points = points[indices]#!/usr/bin/env python3

        print(f"Subsampled to {points.shape[0]} points.")

    print("Extracting path1 from:", path1_bag)
    path1_points = extract_path(path1_bag, path1_topic)
    print(f"Extracted {path1_points.shape[0]} waypoints for path1 from the last message.")

    print("Extracting path2 from:", path2_bag)
    path2_points = extract_path(path2_bag, path2_topic)
    print(f"Extracted {path2_points.shape[0]} waypoints for path2 from the last message.")

    # Build a trimesh scene to hold all geometry.
    scene = trimesh.Scene()

    # Create and add the point cloud mesh with grayscale intensity based on z height.
    if points.size > 0:
        z = points[:, 2]
        z_min = np.min(z)
        z_max = np.max(z)
        if np.abs(z_max - z_min) < 1e-6:
            normalized = np.zeros_like(z)
        else:
            normalized = (z - z_min) / (z_max - z_min)
        # Compute grayscale colors: same value for R, G, and B.
        grayscale = (normalized * 255).astype(np.uint8)
        colors = np.stack((grayscale, grayscale, grayscale, np.full_like(grayscale, 255)), axis=1)
        # Create a mesh from points using small spheres.
        point_cloud_mesh = create_point_cloud_mesh(points, colors, point_size=point_size)
        scene.add_geometry(point_cloud_mesh, node_name='point_cloud')
    
    # Create and add path geometries if available.
    if path1_points.shape[0] > 1:
        path1_mesh = create_path_cylinders(path1_points, radius=0.3)
        if path1_mesh is not None:
            # Color path1 in red (RGBA)
            path1_mesh.visual.face_colors = [255, 0, 0, 255]
            scene.add_geometry(path1_mesh, node_name='path1')
    if path2_points.shape[0] > 1:
        path2_mesh = create_path_cylinders(path2_points, radius=0.3)
        if path2_mesh is not None:
            # Color path2 in blue (RGBA)
            path2_mesh.visual.face_colors = [0, 0, 255, 255]
            scene.add_geometry(path2_mesh, node_name='path2')

    # Export the scene to a GLB (glTF binary) file.
    output_file = "/media/raghuram/Untitled/glb/output_hospital.glb"
    scene.export(output_file)
    print(f"Scene exported successfully to {output_file}")

    # Display the GLB in the IPython Notebook.
    display_glb(output_file)

if __name__ == "__main__":
    main()


Extracting point cloud from: /media/raghuram/Untitled/data/bags/hospital_map


[INFO] [1744045511.836170528] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/data/bags/hospital_map/hospital_map.db3' for READ_ONLY.


Extracted 733204 points from the last message.
Removed 347265 points above z=1.5.
385939 points remain.
Subsampled to 50000 points.
Extracting path1 from: /media/raghuram/Untitled/paths_waypoints/hospital_path_bag
Extracted 3199 waypoints for path1 from the last message.
Extracting path2 from: /media/raghuram/Untitled/bags/ros2_bags/hospital_map_ros2
Extracted 2814 waypoints for path2 from the last message.


[INFO] [1744045513.659841219] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/paths_waypoints/hospital_path_bag/rosbag2_2025_04_02-14_15_48_0.db3' for READ_ONLY.
[INFO] [1744045513.750315505] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/bags/ros2_bags/hospital_map_ros2/hospital_map_ros2.db3' for READ_ONLY.


Scene exported successfully to /media/raghuram/Untitled/glb/output_hospital.glb


In [22]:
#!/usr/bin/env python3
"""
Extract point cloud and paths from ROS2 bag files (using only the last message from each topic),
export to a GLB file, and display it in an IPython Notebook.

The point cloud is simulated as a set of small spheres (with adjustable size)
colored in grayscale based on their z height.
Path1 is colored red and path2 is colored blue.
If the point cloud is too large, it will be subsampled to avoid overloading the system.
"""

import numpy as np
import trimesh
import rosbag2_py
from rclpy.serialization import deserialize_message  # Updated import for ROS2 Humble
from sensor_msgs.msg import PointCloud2
from nav_msgs.msg import Path
import sensor_msgs_py.point_cloud2 as pc2

# For displaying in an IPython Notebook
from IPython.display import HTML, display

def extract_point_cloud(bag_path, topic):
    """Extract the point cloud data (x, y, z) from only the last message in the bag file for the given topic."""
    reader = rosbag2_py.SequentialReader()
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id='sqlite3')
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader.open(storage_options, converter_options)
    
    last_data = None
    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name == topic:
            last_data = data
    if last_data is None:
        return np.array([])
    
    msg = deserialize_message(last_data, PointCloud2)
    points = []
    for p in pc2.read_points(msg, skip_nans=True, field_names=("x", "y", "z")):
        points.append(p)
    return np.array(points)

def extract_path(bag_path, topic):
    """Extract the path (as a set of poses) from only the last message in the bag file for the given topic."""
    reader = rosbag2_py.SequentialReader()
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id='sqlite3')
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader.open(storage_options, converter_options)
    
    last_data = None
    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name == topic:
            last_data = data
    if last_data is None:
        return np.array([])
    
    msg = deserialize_message(last_data, Path)
    path_points = []
    for pose_stamped in msg.poses:
        pos = pose_stamped.pose.position
        path_points.append([pos.x, pos.y, pos.z])
    return np.array(path_points)

def create_path_cylinders(path_points, radius=0.05):
    """
    Create a single mesh that represents the path as a series of cylinders
    connecting consecutive waypoints.
    """
    cylinders = []
    for i in range(len(path_points) - 1):
        start = path_points[i]
        end = path_points[i + 1]
        vec = end - start
        length = np.linalg.norm(vec)
        if length == 0:
            continue
        # Create a cylinder along the Z axis (centered at origin)
        cyl = trimesh.creation.cylinder(radius=radius, height=length, sections=12)
        # Align the cylinder with the vector from start to end.
        z_axis = np.array([0, 0, 1])
        direction = vec / length
        # Get rotation matrix that aligns z_axis with our direction.
        rotation, _ = trimesh.geometry.align_vectors(z_axis, direction, return_angle=True)
        # Compute translation so that the cylinder is centered between start and end.
        T = trimesh.transformations.translation_matrix((start + end) / 2)
        # Combine rotation and translation.
        transform = trimesh.transformations.concatenate_matrices(T, rotation)
        cyl.apply_transform(transform)
        cylinders.append(cyl)
    if cylinders:
        return trimesh.util.concatenate(cylinders)
    else:
        return None

def create_point_cloud_mesh(points, colors, point_size=0.02):
    """
    Create a mesh representing the point cloud by instancing a small sphere
    at each point.
    
    Args:
      points (Nx3 array): XYZ coordinates.
      colors (Nx4 array): RGBA colors for each point (values in 0-255).
      point_size (float): Radius of the sphere representing each point.
    
    Returns:
      A trimesh.Trimesh object containing all the sphere instances.
    """
    # Create a sphere template.
    box = trimesh.creation.box(extents=[point_size, point_size, point_size])
    meshes = []
    for pt, color in zip(points, colors):
        instance = box.copy()
        instance.visual.face_colors = color
        instance.apply_translation(pt)
        meshes.append(instance)
    return trimesh.util.concatenate(meshes)

def display_glb(glb_filename):
    """Display the GLB file in an IPython Notebook using <model-viewer>."""
    html_code = f"""
    <script type="module" src="https://unpkg.com/@google/model-viewer/dist/model-viewer.min.js"></script>
    <model-viewer src="{glb_filename}" alt="3D model" auto-rotate camera-controls style="width: 800px; height: 600px;"></model-viewer>
    """
    display(HTML(html_code))

def main():
    # Define bag file paths and corresponding topics
    pc_bag    = "/media/raghuram/Untitled/data/bags/zju_forest_map"
    pc_topic  = "/map_generator/global_cloud"
    path1_bag = "/media/raghuram/Untitled/paths_waypoints/forest_path_bag"
    path1_topic = "/quadrotor/des_path"
    path2_bag = "/media/raghuram/Untitled/bags/ros2_bags/zhu_forrest_ros2"
    path2_topic = "/visualizer/trajectory_path"

    # Adjustable parameters
    point_size = 0.5
    max_points = 50000  # Limit the number of point cloud spheres to process

    print("Extracting point cloud from:", pc_bag)
    points = extract_point_cloud(pc_bag, pc_topic)
    print(f"Extracted {points.shape[0]} points from the last message.")

    # Convert structured array to a regular (N,3) float64 array if needed.
    if points.size > 0 and points.dtype.names is not None:
        points = np.stack((points['x'], points['y'], points['z']), axis=-1).astype(np.float64)

    # Subsample the points if there are too many
    if points.shape[0] > max_points:
        indices = np.random.choice(points.shape[0], max_points, replace=False)
        points = points[indices]
        print(f"Subsampled to {points.shape[0]} points.")

    print("Extracting path1 from:", path1_bag)
    path1_points = extract_path(path1_bag, path1_topic)
    print(f"Extracted {path1_points.shape[0]} waypoints for path1 from the last message.")

    print("Extracting path2 from:", path2_bag)
    path2_points = extract_path(path2_bag, path2_topic)
    print(f"Extracted {path2_points.shape[0]} waypoints for path2 from the last message.")

    # Build a trimesh scene to hold all geometry.
    scene = trimesh.Scene()

    # Create and add the point cloud mesh with grayscale intensity based on z height.
    if points.size > 0:
        z = points[:, 2]
        z_min = np.min(z)
        z_max = np.max(z)
        if np.abs(z_max - z_min) < 1e-6:
            normalized = np.zeros_like(z)
        else:
            normalized = (z - z_min) / (z_max - z_min)
        # Compute grayscale colors: same value for R, G, and B.
        grayscale = (normalized * 255 - 25).astype(np.uint8)
        colors = np.stack((grayscale, grayscale, grayscale, np.full_like(grayscale, 255)), axis=1)
        # Create a mesh from points using small spheres.
        point_cloud_mesh = create_point_cloud_mesh(points, colors, point_size=point_size)
        scene.add_geometry(point_cloud_mesh, node_name='point_cloud')
    
    # Create and add path geometries if available.
    if path1_points.shape[0] > 1:
        path1_mesh = create_path_cylinders(path1_points, radius=0.3)
        if path1_mesh is not None:
            # Color path1 in red (RGBA)
            path1_mesh.visual.face_colors = [255, 0, 0, 255]
            scene.add_geometry(path1_mesh, node_name='path1')
    if path2_points.shape[0] > 1:
        path2_mesh = create_path_cylinders(path2_points, radius=0.3)
        if path2_mesh is not None:
            # Color path2 in light blue (RGBA)
            path2_mesh.visual.face_colors = [0, 0, 255, 255]
            scene.add_geometry(path2_mesh, node_name='path2')

    
    # Export the scene to a GLB (glTF binary) file.
    output_file = "/media/raghuram/Untitled/glb/output_forest.glb"
    scene.export(output_file)
    print(f"Scene exported successfully to {output_file}")

    # Display the GLB in the IPython Notebook.
    display_glb(output_file)

if __name__ == "__main__":
    main()


Extracting point cloud from: /media/raghuram/Untitled/data/bags/zju_forest_map


[INFO] [1744045738.456496299] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/data/bags/zju_forest_map/zju_forest_map.db3' for READ_ONLY.


Extracted 297443 points from the last message.
Subsampled to 50000 points.
Extracting path1 from: /media/raghuram/Untitled/paths_waypoints/forest_path_bag
Extracted 3302 waypoints for path1 from the last message.
Extracting path2 from: /media/raghuram/Untitled/bags/ros2_bags/zhu_forrest_ros2
Extracted 2336 waypoints for path2 from the last message.


[INFO] [1744045739.005875005] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/paths_waypoints/forest_path_bag/rosbag2_2025_04_02-14_34_49_0.db3' for READ_ONLY.
[INFO] [1744045739.083482287] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/bags/ros2_bags/zhu_forrest_ros2/zhu_forrest_ros2.db3' for READ_ONLY.


Scene exported successfully to /media/raghuram/Untitled/glb/output_forest.glb


In [24]:
#!/usr/bin/env python3
"""
Extract point cloud and paths from ROS2 bag files (using only the last message from each topic),
export to a GLB file, and display it in an IPython Notebook.

The point cloud is simulated as a set of small spheres (with adjustable size)
colored in grayscale based on their z height.
Path1 is colored red and path2 is colored blue.
If the point cloud is too large, it will be subsampled to avoid overloading the system.
"""

import numpy as np
import trimesh
import rosbag2_py
from rclpy.serialization import deserialize_message  # Updated import for ROS2 Humble
from sensor_msgs.msg import PointCloud2
from nav_msgs.msg import Path
import sensor_msgs_py.point_cloud2 as pc2

# For displaying in an IPython Notebook
from IPython.display import HTML, display

def extract_point_cloud(bag_path, topic):
    """Extract the point cloud data (x, y, z) from only the last message in the bag file for the given topic."""
    reader = rosbag2_py.SequentialReader()
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id='sqlite3')
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader.open(storage_options, converter_options)
    
    last_data = None
    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name == topic:
            last_data = data
    if last_data is None:
        return np.array([])
    
    msg = deserialize_message(last_data, PointCloud2)
    points = []
    for p in pc2.read_points(msg, skip_nans=True, field_names=("x", "y", "z")):
        points.append(p)
    return np.array(points)

def extract_path(bag_path, topic):
    """Extract the path (as a set of poses) from only the last message in the bag file for the given topic."""
    reader = rosbag2_py.SequentialReader()
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id='sqlite3')
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader.open(storage_options, converter_options)
    
    last_data = None
    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name == topic:
            last_data = data
    if last_data is None:
        return np.array([])
    
    msg = deserialize_message(last_data, Path)
    path_points = []
    for pose_stamped in msg.poses:
        pos = pose_stamped.pose.position
        path_points.append([pos.x, pos.y, pos.z])
    return np.array(path_points)

def create_path_cylinders(path_points, radius=0.05):
    """
    Create a single mesh that represents the path as a series of cylinders
    connecting consecutive waypoints.
    """
    cylinders = []
    for i in range(len(path_points) - 1):
        start = path_points[i]
        end = path_points[i + 1]
        vec = end - start
        length = np.linalg.norm(vec)
        if length == 0:
            continue
        # Create a cylinder along the Z axis (centered at origin)
        cyl = trimesh.creation.cylinder(radius=radius, height=length, sections=12)
        # Align the cylinder with the vector from start to end.
        z_axis = np.array([0, 0, 1])
        direction = vec / length
        # Get rotation matrix that aligns z_axis with our direction.
        rotation, _ = trimesh.geometry.align_vectors(z_axis, direction, return_angle=True)
        # Compute translation so that the cylinder is centered between start and end.
        T = trimesh.transformations.translation_matrix((start + end) / 2)
        # Combine rotation and translation.
        transform = trimesh.transformations.concatenate_matrices(T, rotation)
        cyl.apply_transform(transform)
        cylinders.append(cyl)
    if cylinders:
        return trimesh.util.concatenate(cylinders)
    else:
        return None

def create_point_cloud_mesh(points, colors, point_size=0.02):
    """
    Create a mesh representing the point cloud by instancing a small sphere
    at each point.
    
    Args:
      points (Nx3 array): XYZ coordinates.
      colors (Nx4 array): RGBA colors for each point (values in 0-255).
      point_size (float): Radius of the sphere representing each point.
    
    Returns:
      A trimesh.Trimesh object containing all the sphere instances.
    """
    # Create a sphere template.
    box = trimesh.creation.box(extents=[point_size, point_size, point_size])
    meshes = []
    for pt, color in zip(points, colors):
        instance = box.copy()
        instance.visual.face_colors = color
        instance.apply_translation(pt)
        meshes.append(instance)
    return trimesh.util.concatenate(meshes)

def display_glb(glb_filename):
    """Display the GLB file in an IPython Notebook using <model-viewer>."""
    html_code = f"""
    <script type="module" src="https://unpkg.com/@google/model-viewer/dist/model-viewer.min.js"></script>
    <model-viewer src="{glb_filename}" alt="3D model" auto-rotate camera-controls style="width: 800px; height: 600px;"></model-viewer>
    """
    display(HTML(html_code))

def main():
    # Define bag file paths and corresponding topics
    pc_bag    = "/media/raghuram/Untitled/data/bags/bspline_default_map"
    pc_topic  = "/occupancy_map/voxel_map"
    path1_bag = "/media/raghuram/Untitled/paths_waypoints/cmu_pRH_Bf"
    path1_topic = "/quadrotor/des_path"
    path2_bag = "/media/raghuram/Untitled/bags/ros2_bags/cmu_map_ros2"
    path2_topic = "/visualizer/trajectory_path"

    # Adjustable parameters
    point_size = 0.2
    max_points = 100000  # Limit the number of point cloud spheres to process

    print("Extracting point cloud from:", pc_bag)
    points = extract_point_cloud(pc_bag, pc_topic)
    print(f"Extracted {points.shape[0]} points from the last message.")

    # Convert structured array to a regular (N,3) float64 array if needed.
    if points.size > 0 and points.dtype.names is not None:
        points = np.stack((points['x'], points['y'], points['z']), axis=-1).astype(np.float64)

    # Subsample the points if there are too many
    if points.shape[0] > max_points:
        indices = np.random.choice(points.shape[0], max_points, replace=False)
        points = points[indices]
        print(f"Subsampled to {points.shape[0]} points.")

    print("Extracting path1 from:", path1_bag)
    path1_points = extract_path(path1_bag, path1_topic)
    print(f"Extracted {path1_points.shape[0]} waypoints for path1 from the last message.")

    print("Extracting path2 from:", path2_bag)
    path2_points = extract_path(path2_bag, path2_topic)
    print(f"Extracted {path2_points.shape[0]} waypoints for path2 from the last message.")

    # Build a trimesh scene to hold all geometry.
    scene = trimesh.Scene()

    # Create and add the point cloud mesh with grayscale intensity based on z height.
    if points.size > 0:
        z = points[:, 2]
        z_min = np.min(z)
        z_max = np.max(z)
        if np.abs(z_max - z_min) < 1e-6:
            normalized = np.zeros_like(z)
        else:
            normalized = (z - z_min) / (z_max - z_min)
        # Compute grayscale colors: same value for R, G, and B.
        grayscale = (normalized * 255 + 0).astype(np.uint8)
        colors = np.stack((grayscale, grayscale, grayscale, np.full_like(grayscale, 255)), axis=1)
        # Create a mesh from points using small spheres.
        point_cloud_mesh = create_point_cloud_mesh(points, colors, point_size=point_size)
        scene.add_geometry(point_cloud_mesh, node_name='point_cloud')
    
    # Create and add path geometries if available.
    if path1_points.shape[0] > 1:
        path1_mesh = create_path_cylinders(path1_points, radius=0.3)
        if path1_mesh is not None:
            # Color path1 in red (RGBA)
            path1_mesh.visual.face_colors = [255, 0, 0, 255]
            scene.add_geometry(path1_mesh, node_name='path1')
    if path2_points.shape[0] > 1:
        path2_mesh = create_path_cylinders(path2_points, radius=0.3)
        if path2_mesh is not None:
            # Color path2 in light blue (RGBA)
            path2_mesh.visual.face_colors = [0, 0, 255, 255]
            scene.add_geometry(path2_mesh, node_name='path2')

    
    # Export the scene to a GLB (glTF binary) file.
    output_file = "/media/raghuram/Untitled/glb/output_cmu.glb"
    scene.export(output_file)
    print(f"Scene exported successfully to {output_file}")

    # Display the GLB in the IPython Notebook.
    display_glb(output_file)

if __name__ == "__main__":
    main()


Extracting point cloud from: /media/raghuram/Untitled/data/bags/bspline_default_map
Extracted 102844 points from the last message.


[INFO] [1744045774.814707750] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/data/bags/bspline_default_map/bspline_default_map.db3' for READ_ONLY.


Subsampled to 100000 points.
Extracting path1 from: /media/raghuram/Untitled/paths_waypoints/cmu_pRH_Bf
Extracted 1313 waypoints for path1 from the last message.
Extracting path2 from: /media/raghuram/Untitled/bags/ros2_bags/cmu_map_ros2
Extracted 1339 waypoints for path2 from the last message.


[INFO] [1744045775.029374406] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/paths_waypoints/cmu_pRH_Bf/rosbag2_2025_04_02-14_05_42_0.db3' for READ_ONLY.
[INFO] [1744045775.072062950] [rosbag2_storage]: Opened database '/media/raghuram/Untitled/bags/ros2_bags/cmu_map_ros2/cmu_map_ros2.db3' for READ_ONLY.


Scene exported successfully to /media/raghuram/Untitled/glb/output_cmu.glb


In [None]:
#!/usr/bin/env python3
"""
Extract point cloud and paths from ROS2 bag files (using only the last message from each topic),
export to a GLB file, and display it in an IPython Notebook.

The point cloud is simulated as a set of small spheres (with adjustable size)
colored in grayscale based on their z height.
Path1 is colored red and path2 is colored blue.
If the point cloud is too large, it will be subsampled to avoid overloading the system.
"""

import numpy as np
import trimesh
import rosbag2_py
from rclpy.serialization import deserialize_message  # Updated import for ROS2 Humble
from sensor_msgs.msg import PointCloud2
from nav_msgs.msg import Path
import sensor_msgs_py.point_cloud2 as pc2

# For displaying in an IPython Notebook
from IPython.display import HTML, display

def extract_point_cloud(bag_path, topic):
    """Extract the point cloud data (x, y, z) from only the last message in the bag file for the given topic."""
    reader = rosbag2_py.SequentialReader()
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id='sqlite3')
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader.open(storage_options, converter_options)
    
    last_data = None
    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name == topic:
            last_data = data
    if last_data is None:
        return np.array([])
    
    msg = deserialize_message(last_data, PointCloud2)
    points = []
    for p in pc2.read_points(msg, skip_nans=True, field_names=("x", "y", "z")):
        points.append(p)
    return np.array(points)

def extract_path(bag_path, topic):
    """Extract the path (as a set of poses) from only the last message in the bag file for the given topic."""
    reader = rosbag2_py.SequentialReader()
    storage_options = rosbag2_py.StorageOptions(uri=bag_path, storage_id='sqlite3')
    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format='cdr',
        output_serialization_format='cdr'
    )
    reader.open(storage_options, converter_options)
    
    last_data = None
    while reader.has_next():
        topic_name, data, t = reader.read_next()
        if topic_name == topic:
            last_data = data
    if last_data is None:
        return np.array([])
    
    msg = deserialize_message(last_data, Path)
    path_points = []
    for pose_stamped in msg.poses:
        pos = pose_stamped.pose.position
        path_points.append([pos.x, pos.y, pos.z])
    return np.array(path_points)

def create_path_cylinders(path_points, radius=0.05):
    """
    Create a single mesh that represents the path as a series of cylinders
    connecting consecutive waypoints.
    """
    cylinders = []
    for i in range(len(path_points) - 1):
        start = path_points[i]
        end = path_points[i + 1]
        vec = end - start
        length = np.linalg.norm(vec)
        if length == 0:
            continue
        # Create a cylinder along the Z axis (centered at origin)
        cyl = trimesh.creation.cylinder(radius=radius, height=length, sections=12)
        # Align the cylinder with the vector from start to end.
        z_axis = np.array([0, 0, 1])
        direction = vec / length
        # Get rotation matrix that aligns z_axis with our direction.
        rotation, _ = trimesh.geometry.align_vectors(z_axis, direction, return_angle=True)
        # Compute translation so that the cylinder is centered between start and end.
        T = trimesh.transformations.translation_matrix((start + end) / 2)
        # Combine rotation and translation.
        transform = trimesh.transformations.concatenate_matrices(T, rotation)
        cyl.apply_transform(transform)
        cylinders.append(cyl)
    if cylinders:
        return trimesh.util.concatenate(cylinders)
    else:
        return None

def create_point_cloud_mesh(points, colors, point_size=0.02):
    """
    Create a mesh representing the point cloud by instancing a small sphere
    at each point.
    
    Args:
      points (Nx3 array): XYZ coordinates.
      colors (Nx4 array): RGBA colors for each point (values in 0-255).
      point_size (float): Radius of the sphere representing each point.
    
    Returns:
      A trimesh.Trimesh object containing all the sphere instances.
    """
    # Create a sphere template.
    box = trimesh.creation.box(extents=[point_size, point_size, point_size])
    meshes = []
    for pt, color in zip(points, colors):
        instance = box.copy()
        instance.visual.face_colors = color
        instance.apply_translation(pt)
        meshes.append(instance)
    return trimesh.util.concatenate(meshes)

def display_glb(glb_filename):
    """Display the GLB file in an IPython Notebook using <model-viewer>."""
    html_code = f"""
    <script type="module" src="https://unpkg.com/@google/model-viewer/dist/model-viewer.min.js"></script>
    <model-viewer src="{glb_filename}" alt="3D model" auto-rotate camera-controls style="width: 800px; height: 600px;"></model-viewer>
    """
    display(HTML(html_code))

def main():
    # Define bag file paths and corresponding topics
    pc_bag    = "/media/raghuram/Untitled/bags/ros2_bags/map_VIO_indoors"
    pc_topic  = "/race16/visual_pc"
    path1_bag = "/media/raghuram/Untitled/bags/ros2_bags/map_VIO_indoors"
    path1_topic = "/race16/quadrotor_path"
    # path2_bag = "/media/raghuram/Untitled/bags/ros2_bags/cmu_map_ros2"
    # path2_topic = "/visualizer/trajectory_path"

    # Adjustable parameters
    point_size = 0.5
    max_points = 100000  # Limit the number of point cloud spheres to process

    print("Extracting point cloud from:", pc_bag)
    points = extract_point_cloud(pc_bag, pc_topic)
    print(f"Extracted {points.shape[0]} points from the last message.")

    # Convert structured array to a regular (N,3) float64 array if needed.
    if points.size > 0 and points.dtype.names is not None:
        points = np.stack((points['x'], points['y'], points['z']), axis=-1).astype(np.float64)

    # Subsample the points if there are too many
    if points.shape[0] > max_points:
        indices = np.random.choice(points.shape[0], max_points, replace=False)
        points = points[indices]
        print(f"Subsampled to {points.shape[0]} points.")

    print("Extracting path1 from:", path1_bag)
    path1_points = extract_path(path1_bag, path1_topic)
    print(f"Extracted {path1_points.shape[0]} waypoints for path1 from the last message.")

    print("Extracting path2 from:", path2_bag)
    path2_points = extract_path(path2_bag, path2_topic)
    print(f"Extracted {path2_points.shape[0]} waypoints for path2 from the last message.")

    # Build a trimesh scene to hold all geometry.
    scene = trimesh.Scene()

    # Create and add the point cloud mesh with grayscale intensity based on z height.
    if points.size > 0:
        z = points[:, 2]
        z_min = np.min(z)
        z_max = np.max(z)
        if np.abs(z_max - z_min) < 1e-6:
            normalized = np.zeros_like(z)
        else:
            normalized = (z - z_min) / (z_max - z_min)
        # Compute grayscale colors: same value for R, G, and B.
        grayscale = (normalized * 255 + 0).astype(np.uint8)
        colors = np.stack((grayscale, grayscale, grayscale, np.full_like(grayscale, 255)), axis=1)
        # Create a mesh from points using small spheres.
        point_cloud_mesh = create_point_cloud_mesh(points, colors, point_size=point_size)
        scene.add_geometry(point_cloud_mesh, node_name='point_cloud')
    
    # Create and add path geometries if available.
    if path1_points.shape[0] > 1:
        path1_mesh = create_path_cylinders(path1_points, radius=0.3)
        if path1_mesh is not None:
            # Color path1 in red (RGBA)
            path1_mesh.visual.face_colors = [255, 0, 0, 255]
            scene.add_geometry(path1_mesh, node_name='path1')
    # if path2_points.shape[0] > 1:
    #     path2_mesh = create_path_cylinders(path2_points, radius=0.3)
    #     if path2_mesh is not None:
    #         # Color path2 in light blue (RGBA)
    #         path2_mesh.visual.face_colors = [0, 0, 255, 255]
    #         scene.add_geometry(path2_mesh, node_name='path2')

    
    # Export the scene to a GLB (glTF binary) file.
    output_file = "/media/raghuram/Untitled/glb/output_vio_1.glb"
    scene.export(output_file)
    print(f"Scene exported successfully to {output_file}")

    # Display the GLB in the IPython Notebook.
    display_glb(output_file)

if __name__ == "__main__":
    main()
