Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support to publish as loaned message #981

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Expand Up @@ -105,6 +105,12 @@ def add_arguments(self, parser, cli_name): # noqa: D102
"Note that this option is valid only if the publisher\'s QOS profile is "
'RELIABLE.',
metavar='TIMEOUT')
parser.add_argument(
'--disable-loan-message', action='store_true', default=False,
help='Disable to publish as loaned message. '
'By default, if loaned message can be used, messages are published as loaned '
'message. It can help to reduce the number of data copies, so there is a greater '
'benefit for sending big data.')

def main(self, *, args): # noqa: D102
qos_profile_overrides = {} # Specify a valid default
Expand Down Expand Up @@ -144,6 +150,7 @@ def main(self, *, args): # noqa: D102
play_options.start_paused = args.start_paused
play_options.start_offset = args.start_offset
play_options.wait_acked_timeout = args.wait_for_all_acked
play_options.disable_loan_message = args.disable_loan_message

player = Player()
player.play(storage_options, play_options)
1 change: 1 addition & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Expand Up @@ -248,6 +248,7 @@ PYBIND11_MODULE(_transport, m) {
&PlayOptions::getStartOffset,
&PlayOptions::setStartOffset)
.def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout)
.def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message)
;

py::class_<RecordOptions>(m, "RecordOptions")
Expand Down
Expand Up @@ -66,6 +66,9 @@ struct PlayOptions
// Timeout for waiting for all published messages to be acknowledged.
// Negative value means that published messages do not need to be acknowledged.
int64_t wait_acked_timeout = -1;

// Disable to publish as loaned message
bool disable_loan_message = false;
};

} // namespace rosbag2_transport
Expand Down
37 changes: 35 additions & 2 deletions rosbag2_transport/include/rosbag2_transport/player.hpp
Expand Up @@ -16,11 +16,13 @@
#define ROSBAG2_TRANSPORT__PLAYER_HPP_

#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "keyboard_handler/keyboard_handler.hpp"
Expand Down Expand Up @@ -56,7 +58,6 @@ class Reader;

namespace rosbag2_transport
{

class Player : public rclcpp::Node
{
public:
Expand Down Expand Up @@ -153,11 +154,43 @@ class Player : public rclcpp::Node
void seek(rcutils_time_point_value_t time_point);

protected:
class PlayerPublisher final
{
public:
explicit PlayerPublisher(
std::shared_ptr<rclcpp::GenericPublisher> pub,
bool disable_loan_message)
: publisher_(std::move(pub))
{
using std::placeholders::_1;
if (disable_loan_message || !publisher_->can_loan_messages()) {
publish_func_ = std::bind(&rclcpp::GenericPublisher::publish, publisher_, _1);
} else {
publish_func_ = std::bind(&rclcpp::GenericPublisher::publish_as_loaned_msg, publisher_, _1);
}
}

~PlayerPublisher() {}

void publish(const rclcpp::SerializedMessage & message)
{
publish_func_(message);
}

std::shared_ptr<rclcpp::GenericPublisher> generic_publisher()
{
return publisher_;
}

private:
std::shared_ptr<rclcpp::GenericPublisher> publisher_;
std::function<void(const rclcpp::SerializedMessage &)> publish_func_;
};
bool is_ready_to_play_from_queue_{false};
std::mutex ready_to_play_from_queue_mutex_;
std::condition_variable ready_to_play_from_queue_cv_;
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr clock_publisher_;
std::unordered_map<std::string, std::shared_ptr<rclcpp::GenericPublisher>> publishers_;
std::unordered_map<std::string, std::shared_ptr<PlayerPublisher>> publishers_;

private:
rosbag2_storage::SerializedBagMessageSharedPtr * peek_next_message_from_queue();
Expand Down
21 changes: 15 additions & 6 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Expand Up @@ -238,7 +238,7 @@ void Player::play()
}
for (auto pub : publishers_) {
try {
if (!pub.second->wait_for_all_acked(timeout)) {
if (!pub.second->generic_publisher()->wait_for_all_acked(timeout)) {
RCLCPP_ERROR(
get_logger(),
"Timed out while waiting for all published messages to be acknowledged for topic %s",
Expand Down Expand Up @@ -521,9 +521,12 @@ void Player::prepare_publishers()
topic, topic_qos_profile_overrides_,
get_logger());
try {
publishers_.insert(
std::make_pair(
topic.name, create_generic_publisher(topic.name, topic.type, topic_qos)));
std::shared_ptr<rclcpp::GenericPublisher> pub =
create_generic_publisher(topic.name, topic.type, topic_qos);
std::shared_ptr<Player::PlayerPublisher> player_pub =
std::make_shared<Player::PlayerPublisher>(
std::move(pub), play_options_.disable_loan_message);
publishers_.insert(std::make_pair(topic.name, player_pub));
if (play_options_.wait_acked_timeout >= 0 &&
topic_qos.reliability() == rclcpp::ReliabilityPolicy::BestEffort)
{
Expand Down Expand Up @@ -556,8 +559,14 @@ bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr mess
bool message_published = false;
auto publisher_iter = publishers_.find(message->topic_name);
if (publisher_iter != publishers_.end()) {
publisher_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data));
message_published = true;
try {
publisher_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data));
message_published = true;
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
get_logger(), "Failed to publish message on '" << message->topic_name <<
"' topic. \nError: %s" << e.what());
}
}
return message_published;
}
Expand Down
4 changes: 3 additions & 1 deletion rosbag2_transport/test/rosbag2_transport/mock_player.hpp
Expand Up @@ -35,7 +35,9 @@ class MockPlayer : public rosbag2_transport::Player
{
std::vector<rclcpp::PublisherBase *> pub_list;
for (const auto & publisher : publishers_) {
pub_list.push_back(static_cast<rclcpp::PublisherBase *>(publisher.second.get()));
pub_list.push_back(
static_cast<rclcpp::PublisherBase *>(
publisher.second->generic_publisher().get()));
}
return pub_list;
}
Expand Down