In [1]:
from pathlib import Path
from rosbags.highlevel import AnyReader
from sensor_msgs.msg import PointCloud2,  PointField
import struct
import numpy as np
import math 
import open3d as o3d 

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


# Editing ROS2 bag
saving only the topics you need to ROS2 data.db3
### works only with standard ros messages 
find the list of supported messages [here](https://ternaris.gitlab.io/rosbags/topics/typesys.html#included-message-types): 

In [None]:
from __future__ import annotations

from typing import TYPE_CHECKING, cast

from rosbags.interfaces import ConnectionExtRosbag2
from rosbags.rosbag2 import Reader, Writer

if TYPE_CHECKING:
    from pathlib import Path


def save_topics(src: Path, dst: Path, topics: list) -> None:
    """Remove topic from rosbag2.

    Args:
        src: Source path.
        dst: Destination path.
        topic: list of topics to save
    """
    
    with Reader(src) as reader, Writer(dst) as writer:
        conn_map = {}
        for conn in reader.connections:
            if conn.topic in topics:
                ext = cast(ConnectionExtRosbag2, conn.ext)
                conn_map[conn.id] = writer.add_connection(
                    conn.topic,
                    conn.msgtype,
                    serialization_format = ext.serialization_format,
                    offered_qos_profiles = ext.offered_qos_profiles,
                )
            else: 
                continue

        rconns = [reader.connections[x-1] for x in conn_map]
        for conn, timestamp, data in reader.messages(connections=rconns):
            writer.write(conn_map[conn.id], timestamp, data)

In [None]:
topics = ['/velodyne_points']
# Change the directories to your data directories
save_topics('/home/r/records', '/home/r/records/bag',topics)


# Saving the map from the LOAM results

In [2]:
_DATATYPES = {}
_DATATYPES[PointField.INT8]    = ('b', 1)
_DATATYPES[PointField.UINT8]   = ('B', 1)
_DATATYPES[PointField.INT16]   = ('h', 2)
_DATATYPES[PointField.UINT16]  = ('H', 2)
_DATATYPES[PointField.INT32]   = ('i', 4)
_DATATYPES[PointField.UINT32]  = ('I', 4)
_DATATYPES[PointField.FLOAT32] = ('f', 4)
_DATATYPES[PointField.FLOAT64] = ('d', 8)

In [3]:
def read_points(cloud, field_names=None, skip_nans=False, uvs=[]):

    fmt = _get_struct_fmt(cloud.is_bigendian, cloud.fields, field_names)
    width, height, point_step, row_step, data, isnan = cloud.width, cloud.height, cloud.point_step, cloud.row_step, cloud.data, math.isnan
    unpack_from = struct.Struct(fmt).unpack_from

    if skip_nans:
        if uvs:
            for u, v in uvs:
                p = unpack_from(data, (row_step * v) + (point_step * u))
                has_nan = False
                for pv in p:
                    if isnan(pv):
                        has_nan = True
                        break
                if not has_nan:
                    yield p
        else:
            for v in range(height):
                offset = row_step * v
                for u in range(width):
                    p = unpack_from(data, offset)[:3]
                    has_nan = False
                    for pv in p:
                        if isnan(pv):
                            has_nan = True
                            break
                    if not has_nan:
                        yield p
                    offset += point_step
    else:
        if uvs:
            for u, v in uvs:
                yield unpack_from(data, (row_step * v) + (point_step * u))
        else:
            for v in range(height):
                offset = row_step * v
                for u in range(width):
                    yield unpack_from(data, offset)[:3]
                    offset += point_step

def _get_struct_fmt(is_bigendian, fields, field_names=None):
    fmt = '>' if is_bigendian else '<'

    offset = 0
    for field in (f for f in sorted(fields, key=lambda f: f.offset) if field_names is None or f.name in field_names):
        if offset < field.offset:
            fmt += 'x' * (field.offset - offset)
            offset = field.offset
        if field.datatype not in _DATATYPES:
            print('Skipping unknown PointField datatype [%d]' % field.datatype, file=sys.stderr)
        else:
            datatype_fmt, datatype_length = _DATATYPES[field.datatype]
            fmt    += field.count * datatype_fmt
            offset += field.count * datatype_length

    return fmt

**Play the rosbag and extract the last point cloud in the topic ```/laser_cloud_map```**

In [4]:
scan = o3d.geometry.PointCloud()
# Change the path here to you data path
with AnyReader([Path('/home/r/Desktop/map/bag.bag')]) as reader:
    # topic and msgtype information is available on .connections list
    for connection in reader.connections:
        print(connection.topic, connection.msgtype)
    for connection, timestamp, rawdata in reader.messages():
        if connection.topic == '/laser_cloud_map':
            msg = reader.deserialize(rawdata, connection.msgtype)
            points_as_array = np.array(list(read_points(msg, skip_nans=True)))
            scan.points = o3d.utility.Vector3dVector(points_as_array)

/aft_mapped_to_init nav_msgs/msg/Odometry
/velodyne_cloud_registered sensor_msgs/msg/PointCloud2
/laser_cloud_map sensor_msgs/msg/PointCloud2


**Visualiz the Map and save it as .pcd file**

In [5]:
o3d.io.write_point_cloud("map.pcd", scan)
o3d.visualization.draw_geometries([scan])