diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index d82f1a2a7b..92e01bcbb1 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -105,6 +105,12 @@ def add_arguments(self, parser, cli_name): # noqa: D102 "Note that this option is valid only if the publisher\'s QOS profile is " 'RELIABLE.', metavar='TIMEOUT') + parser.add_argument( + '--disable-loan-message', action='store_true', default=False, + help='Disable to publish as loaned message. ' + 'By default, if loaned message can be used, messages are published as loaned ' + 'message. It can help to reduce the number of data copies, so there is a greater ' + 'benefit for sending big data.') def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default @@ -144,6 +150,7 @@ def main(self, *, args): # noqa: D102 play_options.start_paused = args.start_paused play_options.start_offset = args.start_offset play_options.wait_acked_timeout = args.wait_for_all_acked + play_options.disable_loan_message = args.disable_loan_message player = Player() player.play(storage_options, play_options) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index c11ca23929..ba514e97af 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -248,6 +248,7 @@ PYBIND11_MODULE(_transport, m) { &PlayOptions::getStartOffset, &PlayOptions::setStartOffset) .def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout) + .def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message) ; py::class_(m, "RecordOptions") diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 27e33b9561..2e8c012c5b 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -66,6 +66,9 @@ struct PlayOptions // Timeout for waiting for all published messages to be acknowledged. // Negative value means that published messages do not need to be acknowledged. int64_t wait_acked_timeout = -1; + + // Disable to publish as loaned message + bool disable_loan_message = false; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index de9f6bf7a5..5ddb6418f9 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -16,11 +16,13 @@ #define ROSBAG2_TRANSPORT__PLAYER_HPP_ #include +#include #include #include #include #include #include +#include #include #include "keyboard_handler/keyboard_handler.hpp" @@ -56,7 +58,6 @@ class Reader; namespace rosbag2_transport { - class Player : public rclcpp::Node { public: @@ -153,11 +154,43 @@ class Player : public rclcpp::Node void seek(rcutils_time_point_value_t time_point); protected: + class PlayerPublisher final + { +public: + explicit PlayerPublisher( + std::shared_ptr pub, + bool disable_loan_message) + : publisher_(std::move(pub)) + { + using std::placeholders::_1; + if (disable_loan_message || !publisher_->can_loan_messages()) { + publish_func_ = std::bind(&rclcpp::GenericPublisher::publish, publisher_, _1); + } else { + publish_func_ = std::bind(&rclcpp::GenericPublisher::publish_as_loaned_msg, publisher_, _1); + } + } + + ~PlayerPublisher() {} + + void publish(const rclcpp::SerializedMessage & message) + { + publish_func_(message); + } + + std::shared_ptr generic_publisher() + { + return publisher_; + } + +private: + std::shared_ptr publisher_; + std::function publish_func_; + }; bool is_ready_to_play_from_queue_{false}; std::mutex ready_to_play_from_queue_mutex_; std::condition_variable ready_to_play_from_queue_cv_; rclcpp::Publisher::SharedPtr clock_publisher_; - std::unordered_map> publishers_; + std::unordered_map> publishers_; private: rosbag2_storage::SerializedBagMessageSharedPtr * peek_next_message_from_queue(); diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index c6308b996a..f92ec168b6 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -238,7 +238,7 @@ void Player::play() } for (auto pub : publishers_) { try { - if (!pub.second->wait_for_all_acked(timeout)) { + if (!pub.second->generic_publisher()->wait_for_all_acked(timeout)) { RCLCPP_ERROR( get_logger(), "Timed out while waiting for all published messages to be acknowledged for topic %s", @@ -521,9 +521,12 @@ void Player::prepare_publishers() topic, topic_qos_profile_overrides_, get_logger()); try { - publishers_.insert( - std::make_pair( - topic.name, create_generic_publisher(topic.name, topic.type, topic_qos))); + std::shared_ptr pub = + create_generic_publisher(topic.name, topic.type, topic_qos); + std::shared_ptr player_pub = + std::make_shared( + std::move(pub), play_options_.disable_loan_message); + publishers_.insert(std::make_pair(topic.name, player_pub)); if (play_options_.wait_acked_timeout >= 0 && topic_qos.reliability() == rclcpp::ReliabilityPolicy::BestEffort) { @@ -556,8 +559,14 @@ bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr mess bool message_published = false; auto publisher_iter = publishers_.find(message->topic_name); if (publisher_iter != publishers_.end()) { - publisher_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data)); - message_published = true; + try { + publisher_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data)); + message_published = true; + } catch (const std::exception & e) { + RCLCPP_ERROR_STREAM( + get_logger(), "Failed to publish message on '" << message->topic_name << + "' topic. \nError: %s" << e.what()); + } } return message_published; } diff --git a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp index aff44fd8ef..ff0404c622 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_player.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_player.hpp @@ -35,7 +35,9 @@ class MockPlayer : public rosbag2_transport::Player { std::vector pub_list; for (const auto & publisher : publishers_) { - pub_list.push_back(static_cast(publisher.second.get())); + pub_list.push_back( + static_cast( + publisher.second->generic_publisher().get())); } return pub_list; }