Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: replace rosbag time stamper with sim time(current time) #300

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def add_arguments(self, parser, cli_name): # noqa: D102
parser.add_argument(
'-s', '--storage', default='sqlite3',
help='storage identifier to be used, defaults to "sqlite3"')
parser.add_argument(
'-c', '--current', action='store_true',
help='determine if current system time will be set in Header time stamper')
parser.add_argument(
'-r', '--read-ahead-queue-size', type=int, default=1000,
help='size of message queue rosbag tries to hold in memory to help deterministic '
Expand All @@ -46,5 +49,6 @@ def main(self, *, args): # noqa: D102
rosbag2_transport_py.play(
uri=bag_file,
storage_id=args.storage,
use_current_time=args.current,
node_prefix=NODE_NAME_PREFIX,
read_ahead_queue_size=args.read_ahead_queue_size)
3 changes: 3 additions & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ find_package(ament_cmake_ros REQUIRED)
find_package(rcl REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rcutils REQUIRED)
find_package(rosidl_typesupport_introspection_cpp REQUIRED)
find_package(rmw REQUIRED)
find_package(rosbag2_compression REQUIRED)
find_package(rosbag2_cpp REQUIRED)
Expand All @@ -46,6 +47,8 @@ ament_target_dependencies(${PROJECT_NAME}
rcl
rclcpp
rcutils
rosidl_generator_cpp
rosidl_typesupport_introspection_cpp
rmw
rosbag2_compression
rosbag2_cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct PlayOptions
public:
size_t read_ahead_queue_size;
std::string node_prefix = "";
bool use_current_time = false;
};

} // namespace rosbag2_transport
Expand Down
191 changes: 191 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ void Player::play(const PlayOptions & options)
{
prepare_publishers();

if(options.use_current_time)
{
prepare_topic_ts_map();
}

storage_loading_future_ = std::async(
std::launch::async,
[this, options]() {load_storage_content(options);});
Expand Down Expand Up @@ -132,17 +137,203 @@ void Player::play_messages_from_queue()
} while (!is_storage_completely_loaded() && rclcpp::ok());
}

unsigned long alignment(unsigned long data_size, unsigned long last_data_size, unsigned long current_position)
{
return data_size > last_data_size ? (data_size - current_position % data_size) & (data_size-1):0;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do this vendor specific thing here?

how can we use the dds vendor implementation instead of using copied code in here

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this alignment related to time? Just trying to keep the set of changes semantically separated.

Copy link

@kikass13 kikass13 Feb 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in our version of this feature, we are changing the timestamp inside the serialized message directly, that way we achieve a "simulated" time while publishing messages via the player class. We therefore need to be able to find out exactly where to put the header.stamp, which depends on a dynamic alignment inside the dds serialization process based on underlying message types.
This is related to issue #299

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the offset in typesupport does not provides actual offset in fastrtps serialized data. (It provides the offset in machine memory, using the alignment of machine) In order to figure out the actual offset of "Header", we must calculate the offset according to Fastrtps alignment rule.

}

void Player::deal_with_string(const uint8_t *dds_buffer, bool is_wstring)
{
uint32_t length;
size_t string_header = sizeof(uint32_t);

unsigned long one_offset = alignment(string_header, last_data_size_, current_position_);
current_position_ = current_position_ + one_offset;
memcpy(&length, (dds_buffer + current_position_ + 4), string_header);

if(!is_wstring)
{
last_data_size_ = sizeof(char);
current_position_ += (string_header + length);
}
else {
last_data_size_ = sizeof(uint32_t);
current_position_ += (string_header + length * sizeof(uint32_t));
}
}

void Player::calculate_position_with_align(const uint8_t * dds_buffer_ptr, const rosidl_typesupport_introspection_cpp::MessageMember *message_member, unsigned long stop_index)
scoopySHI marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change should actually land in rosidl_typesupport_introspection_cpp rather than in rosbag2.
We've worked on a similar change for the serialized bag message, which resulted in here: ros2/rosidl#416

So I am very hesitant to bring back this logic

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can confirm that this is probably not the right place for a lot of this logic. the point is that we don't really know if this PR is even necessary. There is currently no way (as far as we know) of achieving what #299 needs without compromising the relatively clean rosbag functionality. So this is just the first shot we have for this problem so any suggestion is helpful :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our initial thought was also to generate the specific message type buffer solely from the rosidl_typesupport_introspection_cpp. Then deserialize the fastrtps data in the buffer, replace header time stamper, serialize it and publish. Considering the processing speed we stick to the plan above using memcpy.
I totally agree that it is worth some discussion about where and how this feature should be implemented. This PR is just our try to solve the issue and it works. Any advise or suggestion is helpful for us :)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Karsten1987 the ros2/rosidl#416 you mentioned does not solve the problem. We do not know the type at hand (we cannot static cast something unknown into something unknown) ...
std_msgs__msg__String msg = *static_cast<std_msgs__msg__String *>(message_memory);
in your solution does not help when dealing with fully unknown type deserialization. This could only work if we switch(type_string) case 1, case 2, case 3 all possible message types - which is not possible due to the existence of custom messages (which are not known by default).

This works well for subscribers (where I give the expected Template type of the deserialization) ... but it should not work in places where i deserialize something random and look for fields.

Am I missing something here? Thanks :)

{
unsigned long one_offset = 0;
unsigned long data_size = 0;

for (unsigned int i = 0; i < stop_index; i++) {
bool is_string = false;
bool is_wstring = false;
bool is_ros_msg_type = false;
const rosidl_typesupport_introspection_cpp::MessageMembers * sub_members;
switch (message_member[i].type_id_) {
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BOOL:
data_size = sizeof(bool);
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BYTE:
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT8:
data_size = sizeof (uint8_t);
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_CHAR:
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT8:
data_size = sizeof (char);
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_FLOAT:
data_size = sizeof (float);
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_FLOAT64:
data_size = sizeof (uint64_t);
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT16:
data_size = sizeof (int16_t);
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT16:
data_size = sizeof (uint16_t);
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT32:
data_size = sizeof (int32_t);
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT32:
data_size = sizeof (uint32_t);
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_INT64:
data_size = sizeof (int64_t);
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_UINT64:
data_size = sizeof (uint64_t);
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_LONG_DOUBLE:
data_size = 8;
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_STRING:
deal_with_string(dds_buffer_ptr, is_wstring);
is_string = true;
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_WCHAR:
data_size = sizeof(uint16_t);
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_WSTRING:
deal_with_string(dds_buffer_ptr, is_wstring);
is_wstring = true;
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_MESSAGE:
sub_members = static_cast<const rosidl_typesupport_introspection_cpp::MessageMembers *>(message_member[i].members_->data);
calculate_position_with_align(dds_buffer_ptr, sub_members->members_, sub_members->member_count_);
is_ros_msg_type = true;
break;
}
//standard element
if(!is_string && !is_wstring && !is_ros_msg_type && !message_member[i].is_array_)
scoopySHI marked this conversation as resolved.
Show resolved Hide resolved
{
one_offset = alignment(data_size, last_data_size_, current_position_);
current_position_ += (one_offset + data_size);
last_data_size_ = data_size;
}
//standard array
else if(!is_string && !is_wstring && !is_ros_msg_type && message_member[i].is_array_)
{
for (uint j = 0;j < message_member[i].array_size_; j++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The iterating index should be the same type as array_size_

one_offset = alignment(data_size, last_data_size_, current_position_);
current_position_ += (one_offset + data_size);
last_data_size_ = data_size;
}
}
//array of string
else if (is_string && !is_wstring && !is_ros_msg_type && message_member[i].is_array_)
{
for (uint j = 0;j < message_member[i].array_size_ - 1; j++) {
deal_with_string(dds_buffer_ptr, false);
}
}
//array of wstring
else if (is_wstring && !is_string && !is_ros_msg_type && message_member[i].is_array_)
{
for (uint j = 0;j < message_member[i].array_size_ - 1; j++) {
deal_with_string(dds_buffer_ptr, true);
}
}
else if (is_ros_msg_type && !is_string && !is_wstring && message_member[i].is_array_)
{
for (uint j = 0;j < message_member[i].array_size_ - 1; j++) {
calculate_position_with_align(dds_buffer_ptr, sub_members->members_, sub_members->member_count_);
}
}
}

}

void Player::play_messages_until_queue_empty()
{
ReplayableMessage message;
while (message_queue_.try_dequeue(message) && rclcpp::ok()) {
std::this_thread::sleep_until(start_time_ + message.time_since_start);

auto it = topics_ts_map_.find(message.message->topic_name);
if (it != topics_ts_map_.end() ) {
builtin_interfaces::msg::Time ros_time_to_set;

std::chrono::time_point<std::chrono::high_resolution_clock> time_to_be_set = start_time_ + message.time_since_start;

auto offset_index = it->second.stop_index;
const rosidl_typesupport_introspection_cpp::MessageMember * msg_ptr = it->second.msg_member_ptr;

std::chrono::duration_cast<std::chrono::nanoseconds>(time_to_be_set.time_since_epoch());

ros_time_to_set.sec = static_cast<int32_t>(floor(time_to_be_set.time_since_epoch().count()/1e9));
ros_time_to_set.nanosec = static_cast<uint32_t>(round(time_to_be_set.time_since_epoch().count() - ros_time_to_set.sec*1e9));

//memcpy
uint8_t * buffer_temp = message.message->serialized_data->buffer;
dds_buffer_ptr_ = message.message->serialized_data->buffer;
calculate_position_with_align(dds_buffer_ptr_, msg_ptr, offset_index);
size_t header_time_sec_size = sizeof (int32_t);
unsigned long last_offset = alignment(header_time_sec_size, last_data_size_, current_position_);
current_position_ += last_offset;
buffer_temp = buffer_temp + current_position_ + 4;

memcpy(buffer_temp, &ros_time_to_set, sizeof(builtin_interfaces::msg::Time));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Use memcpy_s I believe Windows will complain about not using it.


current_position_ = 0;
last_data_size_ = ULONG_MAX;
}

if (rclcpp::ok()) {
publishers_[message.message->topic_name]->publish(message.message->serialized_data);
}
}
}


void Player::prepare_topic_ts_map()
{
auto topics = reader_->get_all_topics_and_types();

for (const auto & topic : topics){
auto type_support = rosbag2::get_typesupport(topic.type, "rosidl_typesupport_introspection_cpp");
auto msg_members = static_cast<const rosidl_typesupport_introspection_cpp::MessageMembers *>(type_support->data);

const rosidl_typesupport_introspection_cpp::MessageMember *msg_member_ptr = msg_members->members_;
auto msg_member_count = msg_members->member_count_;
for (unsigned int i = 0;i < msg_member_count;i++) {
if(strcmp(msg_member_ptr[i].name_, "header") == 0)
{
HeaderSupportStruct header_support;
header_support.stop_index = i;
header_support.msg_member_ptr = msg_member_ptr;
topics_ts_map_.insert(std::make_pair(topic.name, header_support));
break;
}
}
}

}

void Player::prepare_publishers()
{
auto topics = reader_->get_all_topics_and_types();
Expand Down
23 changes: 23 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
#define ROSBAG2_TRANSPORT__PLAYER_HPP_

#include <chrono>
#include <climits>
#include <cmath>
#include <cstring>
#include <future>
#include <memory>
#include <queue>
Expand All @@ -25,6 +28,13 @@
#include "moodycamel/readerwriterqueue.h"

#include "rosbag2_transport/play_options.hpp"
#include "rosbag2_node.hpp"


#include "rosidl_typesupport_cpp/message_type_support.hpp"
#include "rosidl_typesupport_introspection_cpp/field_types.hpp"
#include "rosidl_typesupport_introspection_cpp/message_introspection.hpp"
#include "rosidl_typesupport_introspection_c/message_introspection.h"

#include "replayable_message.hpp"

Expand All @@ -38,6 +48,11 @@ class Reader;
namespace rosbag2_transport
{

typedef struct{
int stop_index;
const rosidl_typesupport_introspection_cpp::MessageMember * msg_member_ptr;
}HeaderSupportStruct;

class GenericPublisher;
class Rosbag2Node;

Expand All @@ -58,8 +73,14 @@ class Player
void play_messages_from_queue();
void play_messages_until_queue_empty();
void prepare_publishers();
void prepare_topic_ts_map();
void calculate_position_with_align(const uint8_t * dds_buffer, const rosidl_typesupport_introspection_cpp::MessageMember *message_member, unsigned long stop_index);
void deal_with_string(const uint8_t * dds_buffer, bool is_wstring);

static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
size_t last_data_size_ = ULONG_MAX;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A more C++ way of doing this would be to use numeric_limits::max()

unsigned long current_position_ = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I believe current_position_ should be std::ptrdiff_t since it is used in pointer math.

uint8_t *dds_buffer_ptr_;
static const std::chrono::milliseconds queue_read_wait_period_;

std::shared_ptr<rosbag2_cpp::Reader> reader_;
Expand All @@ -68,8 +89,10 @@ class Player
mutable std::future<void> storage_loading_future_;
std::shared_ptr<Rosbag2Node> rosbag2_transport_;
std::unordered_map<std::string, std::shared_ptr<GenericPublisher>> publishers_;
std::unordered_map<std::string, header_support_struct> topics_ts_map_;
};

} // namespace rosbag2_transport


#endif // ROSBAG2_TRANSPORT__PLAYER_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -144,19 +144,21 @@ rosbag2_transport_play(PyObject * Py_UNUSED(self), PyObject * args, PyObject * k
"uri",
"storage_id",
"node_prefix",
"use_current_time",
"read_ahead_queue_size",
nullptr
};

char * uri;
char * storage_id;
char * node_prefix;
bool use_current_time = false;
size_t read_ahead_queue_size;
if (!PyArg_ParseTupleAndKeywords(
args, kwargs, "sss|k", const_cast<char **>(kwlist),
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sss|bk", const_cast<char **>(kwlist),
&uri,
&storage_id,
&node_prefix,
&use_current_time,
&read_ahead_queue_size))
{
return nullptr;
Expand All @@ -167,6 +169,7 @@ rosbag2_transport_play(PyObject * Py_UNUSED(self), PyObject * args, PyObject * k

play_options.node_prefix = std::string(node_prefix);
play_options.read_ahead_queue_size = read_ahead_queue_size;
play_options.use_current_time = use_current_time;

rosbag2_storage::MetadataIo metadata_io{};
rosbag2_storage::BagMetadata metadata{};
Expand Down