Skip to content

Commit

Permalink
improve code
Browse files Browse the repository at this point in the history
Signed-off-by: Shi <Qiyao.Shi@streetscooter.com>
  • Loading branch information
Shi authored and Shi committed Feb 18, 2020
1 parent f822db7 commit 455511a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 38 deletions.
2 changes: 0 additions & 2 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ find_package(rcl REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rcutils REQUIRED)
find_package(rosbag2 REQUIRED)
find_package(sensor_msgs REQUIRED)
find_package(rmw REQUIRED)
find_package(shared_queues_vendor REQUIRED)
find_package(rosidl_typesupport_introspection_cpp)
Expand All @@ -51,7 +50,6 @@ ament_target_dependencies(${PROJECT_NAME}
rosidl_generator_cpp
rmw
shared_queues_vendor
sensor_msgs
rosidl_typesupport_introspection_cpp
)

Expand Down
52 changes: 19 additions & 33 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,29 +139,24 @@ unsigned long alignment(unsigned long data_size, unsigned long last_data_size, u
}

//deal with string
void Player::deal_with_string(const uint8_t *dds_buffer)
void Player::deal_with_string(const uint8_t *dds_buffer, bool is_wstring)
{
uint32_t length;
size_t string_header = sizeof(uint32_t);//string header 4 Bytes

unsigned long one_offset = alignment(string_header, last_data_size, current_position);
current_position = current_position + one_offset;
memcpy(&length, (dds_buffer + current_position + 4), string_header);
last_data_size = sizeof(char);
current_position += (string_header + length);
}

//deal with wstring
void Player::deal_with_wstring(const uint8_t *dds_buffer)
{
uint32_t length;
size_t string_header = sizeof(uint32_t);//wstring header 4 Bytes

unsigned long one_offset = alignment(string_header, last_data_size, current_position);
current_position = current_position + one_offset;
memcpy(&length, (dds_buffer + current_position + 4), string_header);
last_data_size = sizeof(uint32_t);
current_position += (string_header + length * sizeof(uint32_t));
if(!is_wstring)
{
last_data_size = sizeof(char);
current_position += (string_header + length);
}
else {
last_data_size = sizeof(uint32_t);
current_position += (string_header + length * sizeof(uint32_t));
}
}

//find out the real position of header in fastrtps serialized data
Expand All @@ -175,6 +170,7 @@ void Player::calculate_position_with_align(const uint8_t * dds_buffer_ptr, const
bool is_wstring = false;
bool is_ros_msg_type = false;
const rosidl_typesupport_introspection_cpp::MessageMembers * sub_members;
//reference:https://github.com/ros2/rmw_fastrtps/blob/9438cca2a6a21b2436684607fea8a78624363f80/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.hpp#L525
switch (message_member[i].type_id_) {
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_BOOL:
data_size = sizeof(bool);
Expand Down Expand Up @@ -215,14 +211,14 @@ void Player::calculate_position_with_align(const uint8_t * dds_buffer_ptr, const
data_size = 8;
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_STRING:
deal_with_string(dds_buffer_ptr);
deal_with_string(dds_buffer_ptr, is_wstring);
is_string = true;
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_WCHAR:
data_size = sizeof(uint16_t);
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_WSTRING:
deal_with_wstring(dds_buffer_ptr);
deal_with_string(dds_buffer_ptr, is_wstring);
is_wstring = true;
break;
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_MESSAGE:
Expand Down Expand Up @@ -251,14 +247,14 @@ void Player::calculate_position_with_align(const uint8_t * dds_buffer_ptr, const
else if (is_string && !is_wstring && !is_ros_msg_type && message_member[i].is_array_)
{
for (uint j = 0;j < message_member[i].array_size_ - 1; j++) {
deal_with_string(dds_buffer_ptr);
deal_with_string(dds_buffer_ptr, false);
}
}
//array of wstring
else if (is_wstring && !is_string && !is_ros_msg_type && message_member[i].is_array_)
{
for (uint j = 0;j < message_member[i].array_size_ - 1; j++) {
deal_with_wstring(dds_buffer_ptr);
deal_with_string(dds_buffer_ptr, true);
}
}
else if (is_ros_msg_type && !is_string && !is_wstring && message_member[i].is_array_)
Expand All @@ -277,13 +273,14 @@ void Player::play_messages_until_queue_empty()
while (message_queue_.try_dequeue(message) && rclcpp::ok()) {
std::this_thread::sleep_until(start_time_ + message.time_since_start);

if (topics_ts_map_.find(message.message->topic_name) != topics_ts_map_.end() ) {
auto it = topics_ts_map_.find(message.message->topic_name);
if (it != topics_ts_map_.end() ) {
builtin_interfaces::msg::Time ros_time_to_set;

std::chrono::time_point<std::chrono::high_resolution_clock> time_to_be_set = start_time_ + message.time_since_start;

auto offset_index = topics_ts_map_[message.message->topic_name].stop_index;
const rosidl_typesupport_introspection_cpp::MessageMember * msg_ptr = topics_ts_map_[message.message->topic_name].msg_member_ptr;
auto offset_index = it->second.stop_index;
const rosidl_typesupport_introspection_cpp::MessageMember * msg_ptr = it->second.msg_member_ptr;

std::chrono::duration_cast<std::chrono::nanoseconds>(time_to_be_set.time_since_epoch());

Expand All @@ -305,11 +302,6 @@ void Player::play_messages_until_queue_empty()
last_data_size = ULONG_MAX;
}

// if(options.clock_type == "current")
// {
// prepare_clock(message);
// }

if (rclcpp::ok()) {
publishers_[message.message->topic_name]->publish(message.message->serialized_data);
}
Expand All @@ -331,12 +323,6 @@ void Player::prepare_topic_ts_map()
for (unsigned int i = 0;i < msg_member_count;i++) {
if(strcmp(msg_member_ptr[i].name_, "header") == 0)
{
//caculate header time stamper offset (int32----time.sec)
// current_position = calculate_position_with_align(msg_member_ptr, i);
// size_t header_time_sec_size = sizeof (int32_t);
// unsigned long one_offset = header_time_sec_size > last_data_size ? (header_time_sec_size - current_position % header_time_sec_size) : 0;
// unsigned long real_offset = one_offset + current_position;
//topics_ts_map_.insert(std::make_pair(topic.name, msg_member_ptr[i].offset_));
header_support_struct header_support;
header_support.stop_index = i;
header_support.msg_member_ptr = msg_member_ptr;
Expand Down
4 changes: 1 addition & 3 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,10 @@ class Player
void wait_for_filled_queue(const PlayOptions & options) const;
void play_messages_from_queue();
void play_messages_until_queue_empty();
void prepare_clock(ReplayableMessage & message);
void prepare_publishers();
void prepare_topic_ts_map();
void calculate_position_with_align(const uint8_t * dds_buffer, const rosidl_typesupport_introspection_cpp::MessageMember *message_member, unsigned long stop_index);
void deal_with_string(const uint8_t * dds_buffer);
void deal_with_wstring(const uint8_t * dds_buffer);
void deal_with_string(const uint8_t * dds_buffer, bool is_wstring);

static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;
Expand Down

0 comments on commit 455511a

Please sign in to comment.