From 1857c4c753bd5fa40ab76228b17cd0f8ff61d22d Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 29 Mar 2022 13:50:58 +0800 Subject: [PATCH 1/4] support to publish as loaned message Signed-off-by: Barry Xu --- .../include/rosbag2_transport/player.hpp | 34 ++++++++++++++++++- .../src/rosbag2_transport/player.cpp | 10 +++--- .../test/rosbag2_transport/mock_player.hpp | 4 ++- 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index de9f6bf7a5..e80479abad 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -16,11 +16,13 @@ #define ROSBAG2_TRANSPORT__PLAYER_HPP_ #include +#include #include #include #include #include #include +#include #include #include "keyboard_handler/keyboard_handler.hpp" @@ -56,6 +58,36 @@ class Reader; namespace rosbag2_transport { +class DataPublisher final +{ +public: + explicit DataPublisher(std::shared_ptr pub) + : publisher_(std::move(pub)) + { + using std::placeholders::_1; + if (publisher_->can_loan_messages()) { + publish_func_ = std::bind(&rclcpp::GenericPublisher::publish_as_loaned_msg, publisher_, _1); + } else { + publish_func_ = std::bind(&rclcpp::GenericPublisher::publish, publisher_, _1); + } + } + + ~DataPublisher() {} + + void publish(const rclcpp::SerializedMessage & message) + { + publish_func_(message); + } + + std::shared_ptr generic_publisher() + { + return publisher_; + } + +private: + std::shared_ptr publisher_; + std::function publish_func_; +}; class Player : public rclcpp::Node { @@ -157,7 +189,7 @@ class Player : public rclcpp::Node std::mutex ready_to_play_from_queue_mutex_; std::condition_variable ready_to_play_from_queue_cv_; rclcpp::Publisher::SharedPtr clock_publisher_; - std::unordered_map> publishers_; + std::unordered_map> publishers_; private: rosbag2_storage::SerializedBagMessageSharedPtr * peek_next_message_from_queue(); diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index c6308b996a..d1212111ec 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -238,7 +238,7 @@ void Player::play() } for (auto pub : publishers_) { try { - if (!pub.second->wait_for_all_acked(timeout)) { + if (!pub.second->generic_publisher()->wait_for_all_acked(timeout)) { RCLCPP_ERROR( get_logger(), "Timed out while waiting for all published messages to be acknowledged for topic %s", @@ -521,9 +521,11 @@ void Player::prepare_publishers() topic, topic_qos_profile_overrides_, get_logger()); try { - publishers_.insert( - std::make_pair( - topic.name, create_generic_publisher(topic.name, topic.type, topic_qos))); + std::shared_ptr pub = + create_generic_publisher(topic.name, topic.type, topic_qos); + std::shared_ptr data_pub = + std::make_shared(std::move(pub)); + publishers_.insert(std::make_pair(topic.name, data_pub)); if (play_options_.wait_acked_timeout >= 0 && topic_qos.reliability() == rclcpp::ReliabilityPolicy::BestEffort) { diff --git a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp index aff44fd8ef..ff0404c622 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp @@ -35,7 +35,9 @@ class MockPlayer : public rosbag2_transport::Player { std::vector pub_list; for (const auto & publisher : publishers_) { - pub_list.push_back(static_cast(publisher.second.get())); + pub_list.push_back( + static_cast( + publisher.second->generic_publisher().get())); } return pub_list; } From d3f44d91662b53db3ad366dbd5f7b386fcd83cf2 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Wed, 30 Mar 2022 16:34:21 +0800 Subject: [PATCH 2/4] Add option to enable loan message Signed-off-by: Barry Xu --- ros2bag/ros2bag/verb/play.py | 6 ++++++ rosbag2_py/src/rosbag2_py/_transport.cpp | 1 + .../include/rosbag2_transport/play_options.hpp | 3 +++ rosbag2_transport/include/rosbag2_transport/player.hpp | 4 ++-- rosbag2_transport/src/rosbag2_transport/player.cpp | 2 +- 5 files changed, 13 insertions(+), 3 deletions(-) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index d82f1a2a7b..f1f0feb84a 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -105,6 +105,11 @@ def add_arguments(self, parser, cli_name): # noqa: D102 "Note that this option is valid only if the publisher\'s QOS profile is " 'RELIABLE.', metavar='TIMEOUT') + parser.add_argument( + '--enable-loan-message', action='store_true', default=False, + help='Enable to publish as loaned message if loaned message can be used. ' + 'It can help to reduce the number of data copies, so there is a greater benefit ' + 'for sending big data.') def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default @@ -144,6 +149,7 @@ def main(self, *, args): # noqa: D102 play_options.start_paused = args.start_paused play_options.start_offset = args.start_offset play_options.wait_acked_timeout = args.wait_for_all_acked + play_options.enable_loan_message = args.enable_loan_message player = Player() player.play(storage_options, play_options) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index c11ca23929..3a1b147cc7 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -248,6 +248,7 @@ PYBIND11_MODULE(_transport, m) { &PlayOptions::getStartOffset, &PlayOptions::setStartOffset) .def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout) + .def_readwrite("enable_loan_message", &PlayOptions::enable_loan_message) ; py::class_(m, "RecordOptions") diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 27e33b9561..4b8903ff0f 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -66,6 +66,9 @@ struct PlayOptions // Timeout for waiting for all published messages to be acknowledged. // Negative value means that published messages do not need to be acknowledged. int64_t wait_acked_timeout = -1; + + // Enable to publish as loaned message + bool enable_loan_message = false; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index e80479abad..c2d5262f30 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -61,11 +61,11 @@ namespace rosbag2_transport class DataPublisher final { public: - explicit DataPublisher(std::shared_ptr pub) + explicit DataPublisher(std::shared_ptr pub, bool enable_loan_message) : publisher_(std::move(pub)) { using std::placeholders::_1; - if (publisher_->can_loan_messages()) { + if (enable_loan_message && publisher_->can_loan_messages()) { publish_func_ = std::bind(&rclcpp::GenericPublisher::publish_as_loaned_msg, publisher_, _1); } else { publish_func_ = std::bind(&rclcpp::GenericPublisher::publish, publisher_, _1); diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index d1212111ec..575c4c932f 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -524,7 +524,7 @@ void Player::prepare_publishers() std::shared_ptr pub = create_generic_publisher(topic.name, topic.type, topic_qos); std::shared_ptr data_pub = - std::make_shared(std::move(pub)); + std::make_shared(std::move(pub), play_options_.enable_loan_message); publishers_.insert(std::make_pair(topic.name, data_pub)); if (play_options_.wait_acked_timeout >= 0 && topic_qos.reliability() == rclcpp::ReliabilityPolicy::BestEffort) From 7f4e3fed53f7493d0fb7ef089e84916924450902 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Fri, 1 Apr 2022 11:13:14 +0800 Subject: [PATCH 3/4] Use disable-loan-message instead of enable-loan-message Signed-off-by: Barry Xu --- ros2bag/ros2bag/verb/play.py | 11 ++++++----- rosbag2_py/src/rosbag2_py/_transport.cpp | 2 +- .../include/rosbag2_transport/play_options.hpp | 4 ++-- .../include/rosbag2_transport/player.hpp | 8 ++++---- rosbag2_transport/src/rosbag2_transport/player.cpp | 2 +- 5 files changed, 14 insertions(+), 13 deletions(-) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index f1f0feb84a..92e01bcbb1 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -106,10 +106,11 @@ def add_arguments(self, parser, cli_name): # noqa: D102 'RELIABLE.', metavar='TIMEOUT') parser.add_argument( - '--enable-loan-message', action='store_true', default=False, - help='Enable to publish as loaned message if loaned message can be used. ' - 'It can help to reduce the number of data copies, so there is a greater benefit ' - 'for sending big data.') + '--disable-loan-message', action='store_true', default=False, + help='Disable to publish as loaned message. ' + 'By default, if loaned message can be used, messages are published as loaned ' + 'message. It can help to reduce the number of data copies, so there is a greater ' + 'benefit for sending big data.') def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default @@ -149,7 +150,7 @@ def main(self, *, args): # noqa: D102 play_options.start_paused = args.start_paused play_options.start_offset = args.start_offset play_options.wait_acked_timeout = args.wait_for_all_acked - play_options.enable_loan_message = args.enable_loan_message + play_options.disable_loan_message = args.disable_loan_message player = Player() player.play(storage_options, play_options) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 3a1b147cc7..ba514e97af 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -248,7 +248,7 @@ PYBIND11_MODULE(_transport, m) { &PlayOptions::getStartOffset, &PlayOptions::setStartOffset) .def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout) - .def_readwrite("enable_loan_message", &PlayOptions::enable_loan_message) + .def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message) ; py::class_(m, "RecordOptions") diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 4b8903ff0f..2e8c012c5b 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -67,8 +67,8 @@ struct PlayOptions // Negative value means that published messages do not need to be acknowledged. int64_t wait_acked_timeout = -1; - // Enable to publish as loaned message - bool enable_loan_message = false; + // Disable to publish as loaned message + bool disable_loan_message = false; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index c2d5262f30..3bf2f28cdc 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -61,14 +61,14 @@ namespace rosbag2_transport class DataPublisher final { public: - explicit DataPublisher(std::shared_ptr pub, bool enable_loan_message) + explicit DataPublisher(std::shared_ptr pub, bool disable_loan_message) : publisher_(std::move(pub)) { using std::placeholders::_1; - if (enable_loan_message && publisher_->can_loan_messages()) { - publish_func_ = std::bind(&rclcpp::GenericPublisher::publish_as_loaned_msg, publisher_, _1); - } else { + if (disable_loan_message || !publisher_->can_loan_messages()) { publish_func_ = std::bind(&rclcpp::GenericPublisher::publish, publisher_, _1); + } else { + publish_func_ = std::bind(&rclcpp::GenericPublisher::publish_as_loaned_msg, publisher_, _1); } } diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 575c4c932f..a0a6dc26c1 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -524,7 +524,7 @@ void Player::prepare_publishers() std::shared_ptr pub = create_generic_publisher(topic.name, topic.type, topic_qos); std::shared_ptr data_pub = - std::make_shared(std::move(pub), play_options_.enable_loan_message); + std::make_shared(std::move(pub), play_options_.disable_loan_message); publishers_.insert(std::make_pair(topic.name, data_pub)); if (play_options_.wait_acked_timeout >= 0 && topic_qos.reliability() == rclcpp::ReliabilityPolicy::BestEffort) From a121b2fa99821e5e0dcf3ba9dfe1b5864a972f42 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Mon, 4 Apr 2022 14:54:05 +0800 Subject: [PATCH 4/4] Change to inner class and return code instead of exception Signed-off-by: Barry Xu --- .../include/rosbag2_transport/player.hpp | 65 ++++++++++--------- .../src/rosbag2_transport/player.cpp | 17 +++-- 2 files changed, 45 insertions(+), 37 deletions(-) diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 3bf2f28cdc..5ddb6418f9 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -58,37 +58,6 @@ class Reader; namespace rosbag2_transport { -class DataPublisher final -{ -public: - explicit DataPublisher(std::shared_ptr pub, bool disable_loan_message) - : publisher_(std::move(pub)) - { - using std::placeholders::_1; - if (disable_loan_message || !publisher_->can_loan_messages()) { - publish_func_ = std::bind(&rclcpp::GenericPublisher::publish, publisher_, _1); - } else { - publish_func_ = std::bind(&rclcpp::GenericPublisher::publish_as_loaned_msg, publisher_, _1); - } - } - - ~DataPublisher() {} - - void publish(const rclcpp::SerializedMessage & message) - { - publish_func_(message); - } - - std::shared_ptr generic_publisher() - { - return publisher_; - } - -private: - std::shared_ptr publisher_; - std::function publish_func_; -}; - class Player : public rclcpp::Node { public: @@ -185,11 +154,43 @@ class Player : public rclcpp::Node void seek(rcutils_time_point_value_t time_point); protected: + class PlayerPublisher final + { +public: + explicit PlayerPublisher( + std::shared_ptr pub, + bool disable_loan_message) + : publisher_(std::move(pub)) + { + using std::placeholders::_1; + if (disable_loan_message || !publisher_->can_loan_messages()) { + publish_func_ = std::bind(&rclcpp::GenericPublisher::publish, publisher_, _1); + } else { + publish_func_ = std::bind(&rclcpp::GenericPublisher::publish_as_loaned_msg, publisher_, _1); + } + } + + ~PlayerPublisher() {} + + void publish(const rclcpp::SerializedMessage & message) + { + publish_func_(message); + } + + std::shared_ptr generic_publisher() + { + return publisher_; + } + +private: + std::shared_ptr publisher_; + std::function publish_func_; + }; bool is_ready_to_play_from_queue_{false}; std::mutex ready_to_play_from_queue_mutex_; std::condition_variable ready_to_play_from_queue_cv_; rclcpp::Publisher::SharedPtr clock_publisher_; - std::unordered_map> publishers_; + std::unordered_map> publishers_; private: rosbag2_storage::SerializedBagMessageSharedPtr * peek_next_message_from_queue(); diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index a0a6dc26c1..f92ec168b6 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -523,9 +523,10 @@ void Player::prepare_publishers() try { std::shared_ptr pub = create_generic_publisher(topic.name, topic.type, topic_qos); - std::shared_ptr data_pub = - std::make_shared(std::move(pub), play_options_.disable_loan_message); - publishers_.insert(std::make_pair(topic.name, data_pub)); + std::shared_ptr player_pub = + std::make_shared( + std::move(pub), play_options_.disable_loan_message); + publishers_.insert(std::make_pair(topic.name, player_pub)); if (play_options_.wait_acked_timeout >= 0 && topic_qos.reliability() == rclcpp::ReliabilityPolicy::BestEffort) { @@ -558,8 +559,14 @@ bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr mess bool message_published = false; auto publisher_iter = publishers_.find(message->topic_name); if (publisher_iter != publishers_.end()) { - publisher_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data)); - message_published = true; + try { + publisher_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data)); + message_published = true; + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + get_logger(), "Failed to publish message on '" << message->topic_name << + "' topic. \nError: %s" << e.what()); + } } return message_published; }