In [1]:
import lidar
import rosbag
from dask.distributed import Client
client = Client()  # start distributed scheduler locally.  Launch dashboard

Failed to load Python extension for LZ4 support. LZ4 compression will not be available.


First, convert the bag file to a Parquet file.

In [6]:
bagfile = "/workspaces/lidar/tests/testdata/big.bag"
bagfile_bag = rosbag.Bag(bagfile)

In [2]:
#!/usr/bin/env python
import pandas as pd
import numpy as np
import itertools
from typing import Optional, List
import rosbag
from tqdm import tqdm
import sensor_msgs.point_cloud2 as pc2
from dask import delayed
import dask.dataframe as dd


# Lookup table from the integer ``datatype`` code of a point field (as read
# from ``message.message.fields`` in ``dataframe_from_message``) to the numpy
# dtype used for the matching dataframe column.
# NOTE(review): codes 1..8 presumably follow the sensor_msgs/PointField
# constants (INT8 .. FLOAT64) - confirm against the message definition.
_DTYPE_NAMES_BY_CODE = (
    "int8",
    "uint8",
    "int16",
    "uint16",
    "int32",
    "uint32",
    "float32",
    "float64",
)
PANDAS_TYPEMAPPING = {
    code: np.dtype(dtype_name)
    for code, dtype_name in enumerate(_DTYPE_NAMES_BY_CODE, start=1)
}



def read_bag(
    bag: rosbag.Bag,
    start_frame_number: Optional[int] = 0,
    end_frame_number: Optional[int] = None,
    keep_zeros: bool = False,
    topic: str = "/os1_cloud_node/points",
) -> List:
    """Convert a range of messages of one topic into dask delayed dataframes.

    Args:
        bag: Bag to read the messages from.
        start_frame_number: Position (within ``topic``) of the first message
            to convert. ``None`` is treated like 0.
        end_frame_number: Position of the first message that is no longer
            converted (exclusive upper bound). ``None`` reads until the topic
            is exhausted.
        keep_zeros: Passed through to ``dataframe_from_message``.
        topic: Topic whose messages are read.

    Returns:
        A list with one ``dask.delayed`` object per converted message, in the
        order ``bag.read_messages`` yields them. If the topic holds fewer
        messages than requested, the list is simply shorter.
    """
    if start_frame_number is None:
        start_frame_number = 0
    messages = bag.read_messages(topics=[topic])
    # islice takes None as "until exhausted", so no message count is needed,
    # and it stops cleanly when the bag ends before end_frame_number.
    selected_messages = itertools.islice(
        messages, start_frame_number, end_frame_number
    )
    if end_frame_number is None:
        expected_frames = None  # unknown up front; tqdm then shows a plain counter
    else:
        expected_frames = max(end_frame_number - start_frame_number, 0)
    result_list = []
    for message in tqdm(selected_messages, total=expected_frames):
        # NOTE(review): dataframe_from_message() runs eagerly here; delayed()
        # only wraps the finished dataframe. Deferring the conversion with
        # delayed(dataframe_from_message)(message, keep_zeros) would require
        # the bag messages to be serializable for the dask workers - confirm
        # before changing.
        frame = delayed(dataframe_from_message(message, keep_zeros))
        result_list.append(frame)
    return result_list


def dataframe_from_message(
    message: rosbag.bag.BagMessage, keep_zeros: bool = False
) -> pd.DataFrame:
    """Turn one point cloud bag message into a typed pandas dataframe.

    Column names and dtypes are taken from ``message.message.fields``; the
    field ``datatype`` codes are translated with ``PANDAS_TYPEMAPPING``.

    Args:
        message: Bag message whose ``message`` attribute is handed to
            ``pc2.read_points``.
        keep_zeros: If True, all rows are returned unchanged. If False, rows
            are filtered (see below), the pre-filter row position is stored in
            a ``uint32`` column ``original_id`` and the index is renumbered
            from 0.

    Returns:
        The dataframe of the message, one row per point that was read.
    """
    fields = message.message.fields
    columnnames = [item.name for item in fields]
    type_dict = {item.name: PANDAS_TYPEMAPPING[item.datatype] for item in fields}
    # Building the frame from the list of rows (instead of a 2-D numpy array)
    # also works for a message without any points: an empty list still yields
    # an empty frame with all columns.
    points = list(pc2.read_points(message.message))
    frame_df = pd.DataFrame(points, columns=columnnames).astype(type_dict)
    if not keep_zeros:
        # NOTE(review): this drops every row in which ANY of x, y, z is exactly
        # zero, not only rows where all three are zero - confirm that is intended.
        nonzero = (
            (frame_df["x"] != 0.0) & (frame_df["y"] != 0.0) & (frame_df["z"] != 0.0)
        )
        # assign() returns a new frame, so the new column is not written onto a
        # slice of the unfiltered frame (avoids pandas' SettingWithCopyWarning).
        frame_df = (
            frame_df.loc[nonzero]
            .assign(original_id=lambda kept: kept.index)
            .astype({"original_id": "uint32"})
            .reset_index(drop=True)
        )
    return frame_df


  

In [30]:
lazy_dataframes = read_bag(bagfile_bag, 0, 200, False, "/os1_cloud_node/points")

100%|██████████| 200/200 [01:29<00:00,  2.22it/s]


In [9]:
test = dd.from_delayed(lazy_dataframes)

In [10]:
test.x.max().compute()

1.252312421798706

In [11]:
test.tail()

Unnamed: 0,x,y,z,intensity,t,reflectivity,ring,noise,range,original_id
111817,0.805429,0.045549,-0.115905,79.0,99926580,5,47,24,815,131055
111818,0.79422,-0.013478,-0.158536,108.0,99926580,7,53,39,810,131061
111819,0.80339,0.015773,-0.168396,101.0,99926580,7,54,58,821,131062
111820,0.762607,0.014853,-0.189751,69.0,99926580,4,58,37,786,131066
111821,0.776013,0.043899,-0.201706,64.0,99926580,4,59,32,803,131067


In [12]:
test.head()

Unnamed: 0,x,y,z,intensity,t,reflectivity,ring,noise,range,original_id
0,0.796484,-0.04322,0.236579,50.0,0,3,0,44,832,0
1,0.807739,0.046913,0.214302,134.0,0,9,3,42,837,3
2,0.808805,0.016405,0.19007,153.0,0,10,6,37,831,6
3,0.818806,-0.043872,0.177631,65.0,0,4,8,42,839,8
4,0.821585,-0.01381,0.169493,136.0,0,9,9,25,839,9


In [13]:
test

Unnamed: 0_level_0,x,y,z,intensity,t,reflectivity,ring,noise,range,original_id
npartitions=200,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
,float32,float32,float32,float32,uint32,uint16,uint8,uint16,uint32,uint32
,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...


In [14]:
lazy_dataframes[0].compute()

Unnamed: 0,x,y,z,intensity,t,reflectivity,ring,noise,range,original_id
0,0.796484,-0.043220,0.236579,50.0,0,3,0,44,832,0
1,0.807739,0.046913,0.214302,134.0,0,9,3,42,837,3
2,0.808805,0.016405,0.190070,153.0,0,10,6,37,831,6
3,0.818806,-0.043872,0.177631,65.0,0,4,8,42,839,8
4,0.821585,-0.013810,0.169493,136.0,0,9,9,25,839,9
...,...,...,...,...,...,...,...,...,...,...
112182,0.801167,0.016010,-0.137276,107.0,99939680,7,50,43,813,131058
112183,0.814400,0.046128,-0.147871,86.0,99939680,6,51,56,829,131059
112184,0.809908,-0.013744,-0.161668,107.0,99939680,7,53,39,826,131061
112185,0.795561,0.015620,-0.166755,86.0,99939680,6,54,57,813,131062


This is very promising! I could just keep the lazy dataframes (`lazy_dataframes`) and convert them to a frame only when needed!

What about the metadata: how can I use that?

In [17]:
test.to_parquet("/workspaces/export_test")

In [3]:
test2 = dd.read_parquet("/workspaces/export_test")

In [4]:
test2.x.max().compute()

1.252312421798706

Writing the metadata as a JSON file into the same folder:

In [21]:
meta = {"oring_file": "sepp", "timestamps": [234234.234234, 234234.22234]}

In [22]:
import json

In [24]:
# json.dump() serializes into a file object and requires it as its second
# argument (`fp`); called like this it raises the TypeError shown below.
# The following cell passes an opened file instead.
json.dump(meta)

TypeError: dump() missing 1 required positional argument: 'fp'

In [25]:
with open("/workspaces/export_test/data_file.json", "w") as write_file:
    json.dump(meta, write_file)


In [26]:
with open("/workspaces/export_test/data_file.json", "r") as read_file:
    data = json.load(read_file)


In [28]:
# "oring_file" is the key written into `meta` above; there is no key "t".
data["oring_file"]

'sepp'

In [6]:
type(test2)

dask.dataframe.core.DataFrame

In [10]:
test2.describe().compute()

Unnamed: 0,x,y,z,intensity,t,reflectivity,ring,noise,range,original_id
count,22405810.0,22405810.0,22405810.0,22405810.0,22405810.0,22405810.0,22405810.0,22405810.0,22405810.0,22405810.0
mean,-1.395355,-0.1555028,0.1420729,109.8774,49397940.0,298.1485,31.5414,44.84404,2951.956,64785.94
std,3.188425,2.228653,0.7776557,132.3687,27723680.0,731.4155,18.37981,58.30621,3002.505,36345.0
min,-34.5218,-15.38729,-0.665405,8.0,0.0,1.0,0.0,0.0,665.0,0.0
25%,-1.821742,-1.5286,-0.2560951,47.0,25745800.0,7.0,16.0,30.0,1212.0,33750.75
50%,0.02228281,0.02162177,-0.004399885,71.0,50354340.0,21.0,32.0,39.0,1956.0,66066.0
75%,0.7839448,1.622289,0.2577102,129.0,74855910.0,115.0,48.0,51.0,3280.0,98234.5
max,1.252312,5.981112,6.153335,2566.0,99984610.0,19740.0,63.0,2071.0,35017.0,131070.0


In [24]:
test2.shape

(Delayed('int-3bde0fd9-9481-440e-8069-0e92b3cb828e'), 10)

In [25]:
len(test2)

22405807

In [28]:
test2

Unnamed: 0_level_0,x,y,z,intensity,t,reflectivity,ring,noise,range,original_id
npartitions=200,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
,float32,float32,float32,float32,uint32,uint16,uint8,uint16,uint32,uint32
,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...


In [29]:
test2.npartitions

200

In [31]:
test3 = test2.get_partition(0)

In [32]:
type(test3)

dask.dataframe.core.DataFrame

In [37]:
test3.compute()

Unnamed: 0,x,y,z,intensity,t,reflectivity,ring,noise,range,original_id
0,0.796484,-0.043220,0.236579,50.0,0,3,0,44,832,0
1,0.807739,0.046913,0.214302,134.0,0,9,3,42,837,3
2,0.808805,0.016405,0.190070,153.0,0,10,6,37,831,6
3,0.818806,-0.043872,0.177631,65.0,0,4,8,42,839,8
4,0.821585,-0.013810,0.169493,136.0,0,9,9,25,839,9
...,...,...,...,...,...,...,...,...,...,...
112182,0.801167,0.016010,-0.137276,107.0,99939680,7,50,43,813,131058
112183,0.814400,0.046128,-0.147871,86.0,99939680,6,51,56,829,131059
112184,0.809908,-0.013744,-0.161668,107.0,99939680,7,53,39,826,131061
112185,0.795561,0.015620,-0.166755,86.0,99939680,6,54,57,813,131062


In [38]:
pd.Timestamp('2017-01-01T12')

Timestamp('2017-01-01 12:00:00')

In [39]:
type(pd.Timestamp('2017-01-01T12'))

pandas._libs.tslibs.timestamps.Timestamp

In [40]:
import datetime

In [45]:
type(datetime.datetime(2021,10,10))

datetime.datetime

In [48]:
datetime.datetime.now()

datetime.datetime(2021, 3, 12, 12, 53, 23, 276731)

In [56]:
# to_delayed() returns one delayed object per partition; keep the first 10.
test3 = test2.to_delayed()[:10]

In [58]:
test4 = dd.from_delayed(test3)

In [59]:
test4.npartitions

10