In [7]:
import rclpy
import os
import cv2
from rosbag2_py import SequentialReader, StorageOptions, ConverterOptions
from cv_bridge import CvBridge
from sensor_msgs.msg import CompressedImage, PointCloud2
from rclpy.serialization import deserialize_message
import numpy as np

def read_rosbag2(bag_file):

    # Create a SequentialReader instance
    storage_options = StorageOptions(uri=bag_file)
    converter_options = ConverterOptions(input_serialization_format="cdr", output_serialization_format="cdr")
    reader = SequentialReader()
    reader.open(storage_options, converter_options)
    bridge=CvBridge()
    data_dict = {}

    pcl2ls = PointCloudToLaserScan()
    # Read and print messages
    while reader.has_next():
        topic, msg, t = reader.read_next()
        t = int(t/1e09)
        
        data_dict[t] = {}
        if topic == '/camera/color/image_raw/compressed':
            m = deserialize_message(msg, CompressedImage)
            cv_image=bridge.compressed_imgmsg_to_cv2(m, desired_encoding='bgr8')
            # cv2.imshow("Compressed Image"+ , cv_image)
            # cv2.waitKey(2)
            data_dic[t]['image'] = cv_image
            break
            
        if topic == '/velodyne_points':
            m = deserialize_message(msg, PointCloud2)
            ls = pcl2ls.point_cloud_callback(m)
            data_dict[t]['laser_scan'] = ls
            break


if __name__ == "__main__":
    read_rosbag2("/home/nigitha/ros2_ws_rnd/src/Data/hall_01")

[INFO] [1724829035.644594046] [rosbag2_storage]: Opened database '/home/nigitha/ros2_ws_rnd/src/Data/hall_01/hall_01.db3' for READ_ONLY.


NameError: name 'PointCloudToLaserScan' is not defined

In [None]:
pip install numpy==1.21.6

Collecting numpy==1.21.6
[0m  Downloading numpy-1.21.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.1 kB)
Downloading numpy-1.21.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (15.9 MB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m15.9/15.9 MB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m[31m7.1 MB/s[0m eta [36m0:00:01[0m
[?25hInstalling collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 2.1.0
    Uninstalling numpy-2.1.0:
      Successfully uninstalled numpy-2.1.0
Successfully installed numpy-1.21.6
Note: you may need to restart the kernel to use updated packages.


In [None]:
import rclpy
from sensor_msgs.msg import PointCloud2, LaserScan
import numpy as np
import sensor_msgs_py.point_cloud2 as point_cloud2
class PointCloudToLaserScan():
    def __init__(self):
        self.angle_min =  -np.pi / 2
        self.angle_max = np.pi / 2
        self.angle_increment = 0.01
        self.range_min = 0.0
        self.range_max = 10.0
        self.scan_time = 0.1
        self.min_height = -0.1
        self.max_height = 0.1

    
    def point_cloud_callback(self, msg):
        points = list(point_cloud2.read_points(msg, field_names=("x", "y", "z"), skip_nans=True))
        
        ranges = np.full(int((self.angle_max - self.angle_min) / self.angle_increment), np.inf)
        
        for point in points:
            x, y, z = point
            
            if z < self.min_height or z > self.max_height:
                continue
            
            range_val = np.sqrt(x**2 + y**2)
            if range_val < self.range_min or range_val > self.range_max:
                continue
            
            angle = np.arctan2(y, x)
            if angle < self.angle_min or angle > self.angle_max:
                continue
            
            bin_index = int((angle - self.angle_min) / self.angle_increment)
            if 0 <= bin_index < len(ranges):
                ranges[bin_index] = min(ranges[bin_index], range_val)
        
        scan = LaserScan()
        scan.header = msg.header
        scan.angle_min = self.angle_min
        scan.angle_max = self.angle_max
        scan.angle_increment = self.angle_increment
        scan.time_increment = 0.0
        scan.scan_time = self.scan_time
        scan.range_min = self.range_min
        scan.range_max = self.range_max
        scan.ranges = ranges.tolist()
        return scan
