Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add publish by loaned message in GenericPublisher #1856

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions rclcpp/include/rclcpp/generic_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "rclcpp/typesupport_helpers.hpp"
#include "rclcpp/visibility_control.hpp"

#include "rmw/rmw.h"

namespace rclcpp
{

Expand Down Expand Up @@ -116,9 +118,24 @@ class GenericPublisher : public rclcpp::PublisherBase
RCLCPP_PUBLIC
void publish(const rclcpp::SerializedMessage & message);

/**
* Publish a rclcpp::SerializedMessage via loaned message after de-serialization.
*
* \param message a serialized message
* \throws anything rclcpp::exceptions::throw_from_rcl_error can show
*/
RCLCPP_PUBLIC
void publish_as_loaned_msg(const rclcpp::SerializedMessage & message);

private:
// The type support library should stay loaded, so it is stored in the GenericPublisher
std::shared_ptr<rcpputils::SharedLibrary> ts_lib_;

void * borrow_loaned_message();
void deserialize_message(
const rmw_serialized_message_t & serialized_message,
void * deserialized_msg);
void publish_loaned_message(void * loaned_message);
};

} // namespace rclcpp
Expand Down
2 changes: 2 additions & 0 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
uint64_t intra_process_publisher_id_;

rmw_gid_t rmw_gid_;

const rosidl_message_type_support_t type_support_;
};

} // namespace rclcpp
Expand Down
45 changes: 45 additions & 0 deletions rclcpp/src/rclcpp/generic_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,49 @@ void GenericPublisher::publish(const rclcpp::SerializedMessage & message)
}
}

void GenericPublisher::publish_as_loaned_msg(const rclcpp::SerializedMessage & message)
{
auto loaned_message = borrow_loaned_message();
deserialize_message(message.get_rcl_serialized_message(), loaned_message);
publish_loaned_message(loaned_message);
}

void * GenericPublisher::borrow_loaned_message()
{
void * loaned_message = nullptr;
auto return_code = rcl_borrow_loaned_message(
fujitatomoya marked this conversation as resolved.
Show resolved Hide resolved
get_publisher_handle().get(), &type_support_, &loaned_message);

if (return_code != RMW_RET_OK) {
if (return_code == RCL_RET_UNSUPPORTED) {
rclcpp::exceptions::throw_from_rcl_error(
return_code,
"current middleware cannot support loan messages");
} else {
rclcpp::exceptions::throw_from_rcl_error(return_code, "failed to borrow loaned msg");
}
}
return loaned_message;
}

void GenericPublisher::deserialize_message(
const rmw_serialized_message_t & serialized_message,
void * deserialized_msg)
{
auto return_code = rmw_deserialize(&serialized_message, &type_support_, deserialized_msg);
if (return_code != RMW_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(return_code, "failed to deserialize msg");
}
}

void GenericPublisher::publish_loaned_message(void * loaned_message)
{
auto return_code = rcl_publish_loaned_message(
fujitatomoya marked this conversation as resolved.
Show resolved Hide resolved
get_publisher_handle().get(), loaned_message, NULL);

if (return_code != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(return_code, "failed to publish loaned message");
}
}

} // namespace rclcpp
4 changes: 3 additions & 1 deletion rclcpp/src/rclcpp/publisher_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ PublisherBase::PublisherBase(
const rosidl_message_type_support_t & type_support,
const rcl_publisher_options_t & publisher_options)
: rcl_node_handle_(node_base->get_shared_rcl_node_handle()),
intra_process_is_enabled_(false), intra_process_publisher_id_(0)
intra_process_is_enabled_(false),
intra_process_publisher_id_(0),
type_support_(type_support)
{
auto custom_deleter = [node_handle = this->rcl_node_handle_](rcl_publisher_t * rcl_pub)
{
Expand Down
101 changes: 85 additions & 16 deletions rclcpp/test/rclcpp/test_generic_pubsub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,34 +61,37 @@ class RclcppGenericNodeFixture : public Test
publishers_.push_back(publisher);
}

std::vector<std::string> subscribe_raw_messages(
size_t expected_messages_number, const std::string & topic_name, const std::string & type)
template<typename T1, typename T2>
std::vector<T1> subscribe_raw_messages(
size_t expected_recv_msg_count, const std::string & topic_name, const std::string & type)
{
std::vector<std::string> messages;
std::vector<T1> messages;
size_t counter = 0;
auto subscription = node_->create_generic_subscription(
topic_name, type, rclcpp::QoS(1),
[&counter, &messages](std::shared_ptr<rclcpp::SerializedMessage> message) {
test_msgs::msg::Strings string_message;
rclcpp::Serialization<test_msgs::msg::Strings> serializer;
serializer.deserialize_message(message.get(), &string_message);
messages.push_back(string_message.string_value);
[&counter, &messages, this](std::shared_ptr<rclcpp::SerializedMessage> message) {
T2 deserialized_message;
rclcpp::Serialization<T2> serializer;
serializer.deserialize_message(message.get(), &deserialized_message);
messages.push_back(this->get_data_from_msg(deserialized_message));
counter++;
});

while (counter < expected_messages_number) {
while (counter < expected_recv_msg_count) {
rclcpp::spin_some(node_);
}
return messages;
}

rclcpp::SerializedMessage serialize_string_message(const std::string & message)
template<typename T1, typename T2>
rclcpp::SerializedMessage serialize_message(const T1 & data)
{
test_msgs::msg::Strings string_message;
string_message.string_value = message;
rclcpp::Serialization<test_msgs::msg::Strings> ser;
T2 message;
write_message(data, message);

rclcpp::Serialization<T2> ser;
SerializedMessage result;
ser.serialize_message(&string_message, &result);
ser.serialize_message(&message, &result);
return result;
}

Expand All @@ -115,6 +118,27 @@ class RclcppGenericNodeFixture : public Test
std::shared_ptr<rclcpp::Node> node_;
rclcpp::Node::SharedPtr publisher_node_;
std::vector<std::shared_ptr<rclcpp::PublisherBase>> publishers_;

private:
void write_message(const std::string & data, test_msgs::msg::Strings & message)
{
message.string_value = data;
}

void write_message(const int64_t & data, test_msgs::msg::BasicTypes & message)
{
message.int64_value = data;
}

std::string get_data_from_msg(const test_msgs::msg::Strings & message)
{
return message.string_value;
}

int64_t get_data_from_msg(const test_msgs::msg::BasicTypes & message)
{
return message.int64_value;
}
};


Expand All @@ -130,7 +154,7 @@ TEST_F(RclcppGenericNodeFixture, publisher_and_subscriber_work)

auto subscriber_future_ = std::async(
std::launch::async, [this, topic_name, type] {
return subscribe_raw_messages(1, topic_name, type);
return subscribe_raw_messages<std::string, test_msgs::msg::Strings>(1, topic_name, type);
});

// TODO(karsten1987): Port 'wait_for_sub' to rclcpp
Expand All @@ -147,14 +171,59 @@ TEST_F(RclcppGenericNodeFixture, publisher_and_subscriber_work)
ASSERT_TRUE(success);

for (const auto & message : test_messages) {
publisher->publish(serialize_string_message(message));
publisher->publish(serialize_message<std::string, test_msgs::msg::Strings>(message));
}

auto subscribed_messages = subscriber_future_.get();
EXPECT_THAT(subscribed_messages, SizeIs(Not(0)));
EXPECT_THAT(subscribed_messages[0], StrEq("Hello World"));
}

TEST_F(RclcppGenericNodeFixture, publish_loaned_msg_work)
{
// We currently publish more messages because they can get lost
std::vector<int64_t> test_messages = {100, 100};
std::string topic_name = "/int64_topic";
std::string type = "test_msgs/msg/BasicTypes";

auto publisher = node_->create_generic_publisher(topic_name, type, rclcpp::QoS(1));

if (publisher->can_loan_messages()) {
auto subscriber_future_ = std::async(
std::launch::deferred, [this, topic_name, type] {
return subscribe_raw_messages<int64_t, test_msgs::msg::BasicTypes>(
1, topic_name, type);
});

auto allocator = node_->get_node_options().allocator();
auto success = false;
auto ret = rcl_wait_for_subscribers(
node_->get_node_base_interface()->get_rcl_node_handle(),
&allocator,
topic_name.c_str(),
1u,
static_cast<rcutils_duration_value_t>(2e9),
&success);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
ASSERT_TRUE(success);

for (const auto & message : test_messages) {
publisher->publish_as_loaned_msg(
serialize_message<int64_t, test_msgs::msg::BasicTypes>(message));
}

auto subscribed_messages = subscriber_future_.get();
EXPECT_THAT(subscribed_messages, SizeIs(Not(0)));
EXPECT_EQ(subscribed_messages[0], test_messages[0]);
} else {
ASSERT_THROW(
{
publisher->publish_as_loaned_msg(
serialize_message<int64_t, test_msgs::msg::BasicTypes>(test_messages[0]));
}, rclcpp::exceptions::RCLError);
}
}

TEST_F(RclcppGenericNodeFixture, generic_subscription_uses_qos)
{
// If the GenericSubscription does not use the provided QoS profile,
Expand Down