diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 1389ad8aeb..3c271b20c6 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -245,6 +245,18 @@ if(BUILD_TESTING) "test_msgs" ) target_link_libraries(test_loaned_message ${PROJECT_NAME}) + ament_add_gtest(test_intra_process_communication test/test_intra_process_communication.cpp + TIMEOUT 120) + if(TARGET test_intra_process_communication) + ament_target_dependencies(test_intra_process_communication + "rcl" + "rcl_interfaces" + "rmw" + "rosidl_typesupport_cpp" + "test_msgs" + ) + endif() + target_link_libraries(test_intra_process_communication ${PROJECT_NAME}) ament_add_gtest(test_node test/test_node.cpp TIMEOUT 240) if(TARGET test_node) diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index 811c18b69f..c4ac38ece0 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -40,9 +40,10 @@ template< typename PublisherT = rclcpp::Publisher, typename NodeT> std::shared_ptr -create_publisher( +create_generic_publisher( NodeT & node, const std::string & topic_name, + const rosidl_message_type_support_t & type_support, const rclcpp::QoS & qos, const rclcpp::PublisherOptionsWithAllocator & options = ( rclcpp::PublisherOptionsWithAllocator() @@ -56,7 +57,7 @@ create_publisher( // Create the publisher. auto pub = node_topics->create_publisher( topic_name, - rclcpp::create_publisher_factory(options), + rclcpp::create_publisher_factory(options, type_support), qos ); @@ -66,6 +67,28 @@ create_publisher( return std::dynamic_pointer_cast(pub); } +template< + typename MessageT, + typename AllocatorT = std::allocator, + typename PublisherT = rclcpp::Publisher, + typename NodeT> +std::shared_ptr +create_publisher( + NodeT & node, + const std::string & topic_name, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options = ( + rclcpp::PublisherOptionsWithAllocator() + ) +) +{ + const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle(); + + return create_generic_publisher( + node, topic_name, type_support, + qos, options); +} + } // namespace rclcpp #endif // RCLCPP__CREATE_PUBLISHER_HPP_ diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index 9248254047..7ccc77d9e4 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -48,9 +48,10 @@ template< >, typename NodeT> typename std::shared_ptr -create_subscription( +create_generic_subscription( NodeT && node, const std::string & topic_name, + const rosidl_message_type_support_t & type_support, const rclcpp::QoS & qos, CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options = ( @@ -67,7 +68,8 @@ create_subscription( auto factory = rclcpp::create_subscription_factory( std::forward(callback), options, - msg_mem_strat + msg_mem_strat, + type_support ); auto sub = node_topics->create_subscription(topic_name, factory, qos); @@ -76,6 +78,42 @@ create_subscription( return std::dynamic_pointer_cast(sub); } +template< + typename MessageT, + typename CallbackT, + typename AllocatorT = std::allocator, + typename CallbackMessageT = + typename rclcpp::subscription_traits::has_message_type::type, + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, + AllocatorT + >, + typename NodeT> +typename std::shared_ptr +create_subscription( + NodeT && node, + const std::string & topic_name, + const rclcpp::QoS & qos, + CallbackT && callback, + const rclcpp::SubscriptionOptionsWithAllocator & options = ( + rclcpp::SubscriptionOptionsWithAllocator() + ), + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = ( + MessageMemoryStrategyT::create_default() + ) +) +{ + const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle(); + + return + create_generic_subscription< + MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT, MessageMemoryStrategyT + >( + std::forward(node), topic_name, type_support, qos, std::forward( + callback), options, msg_mem_strat); +} + } // namespace rclcpp #endif // RCLCPP__CREATE_SUBSCRIPTION_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index d671de701e..04508af46e 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -110,11 +110,14 @@ class IntraProcessManager * In addition this generates a unique intra process id for the subscription. * * \param subscription the SubscriptionIntraProcess to register. + * \param is_serialized true if the buffer expects serialized messages * \return an unsigned 64-bit integer which is the subscription's unique id. */ RCLCPP_PUBLIC uint64_t - add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription); + add_subscription( + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription, + const bool is_serialized = false); /// Unregister a subscription using the subscription's unique id. /** @@ -134,11 +137,12 @@ class IntraProcessManager * In addition this generates a unique intra process id for the publisher. * * \param publisher publisher to be registered with the manager. + * \param is_serialized true if the buffer expects serialized messages * \return an unsigned 64-bit integer which is the publisher's unique id. */ RCLCPP_PUBLIC uint64_t - add_publisher(rclcpp::PublisherBase::SharedPtr publisher); + add_publisher(rclcpp::PublisherBase::SharedPtr publisher, const bool is_serialized = false); /// Unregister a publisher using the publisher's unique id. /** @@ -311,6 +315,7 @@ class IntraProcessManager rmw_qos_profile_t qos; const char * topic_name; bool use_take_shared_method; + bool is_serialized; }; struct PublisherInfo @@ -320,6 +325,7 @@ class IntraProcessManager rclcpp::PublisherBase::WeakPtr publisher; rmw_qos_profile_t qos; const char * topic_name; + bool is_serialized; }; struct SplittedSubscriptions diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 618db3cac1..0172b0d036 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -28,6 +28,8 @@ #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/create_intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" #include "tracetools/tracetools.h" @@ -37,6 +39,8 @@ namespace rclcpp namespace experimental { +class SerializedMessage; + template< typename MessageT, typename Alloc = std::allocator, @@ -51,6 +55,10 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase using MessageAlloc = typename MessageAllocTraits::allocator_type; using ConstMessageSharedPtr = std::shared_ptr; using MessageUniquePtr = std::unique_ptr; + using CallbackMessageAllocTraits = allocator::AllocRebind; + using CallbackMessageAlloc = typename CallbackMessageAllocTraits::allocator_type; + using CallbackMessageUniquePtr = std::unique_ptr; + using CallbackMessageSharedPtr = std::shared_ptr; using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer< MessageT, @@ -64,11 +72,15 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase rclcpp::Context::SharedPtr context, const std::string & topic_name, rmw_qos_profile_t qos_profile, - rclcpp::IntraProcessBufferType buffer_type) + rclcpp::IntraProcessBufferType buffer_type, + std::shared_ptr serializer) : SubscriptionIntraProcessBase(topic_name, qos_profile), - any_callback_(callback) + any_callback_(callback), serializer_(serializer) { - if (!std::is_same::value) { + if (!std::is_same::value && + !std::is_same::value && + !std::is_same::value) + { throw std::runtime_error("SubscriptionIntraProcess wrong callback type"); } @@ -142,18 +154,47 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase (void)ret; } + // convert from ROS2 message to rcl_serialized_message_t (serilizatino needed) template - typename std::enable_if::value, void>::type + typename std::enable_if< + std::is_same::value && + !std::is_same::value, + void>::type execute_impl() { - throw std::runtime_error("Subscription intra-process can't handle serialized messages"); + if (nullptr == serializer_) { + throw std::runtime_error("Subscription intra-process can't handle serialized messages"); + } + + rmw_message_info_t msg_info = {}; + msg_info.from_intra_process = true; + + ConstMessageSharedPtr msg = buffer_->consume_shared(); + auto serialized_msg = + serializer_->serialize_message(reinterpret_cast(msg.get())); + + if (nullptr == serialized_msg) { + throw std::runtime_error("Subscription intra-process could not serialize message"); + } + + if (any_callback_.use_take_shared_method()) { + any_callback_.dispatch_intra_process(serialized_msg, msg_info); + } else { + throw std::runtime_error( + "Subscription intra-process for serialized " + "messages does not support unique pointers."); + } } + // forward from ROS2 message to ROS2 message (same type) template - typename std::enable_if::value, void>::type + typename std::enable_if< + !std::is_same::value && + !std::is_same::value, + void>::type execute_impl() { - rmw_message_info_t msg_info; + rmw_message_info_t msg_info = {}; msg_info.publisher_gid = {0, {0}}; msg_info.from_intra_process = true; @@ -166,8 +207,80 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } } + // forward from rcl_serialized_message_t to SerializationMessage (no conversion needed) + template + typename std::enable_if< + std::is_same::value && + std::is_same::value, + void>::type + execute_impl() + { + rmw_message_info_t msg_info = {}; + msg_info.from_intra_process = true; + + if (any_callback_.use_take_shared_method()) { + ConstMessageSharedPtr msg = buffer_->consume_shared(); + if (msg == nullptr) { + throw std::runtime_error("Subscription intra-process could not get serialized message"); + } + any_callback_.dispatch_intra_process(msg, msg_info); + } else { + throw std::runtime_error( + "Subscription intra-process for serialized " + "messages does not support unique pointers."); + } + } + + // convert from rcl_serialized_message_t to ROS2 message (deserialization needed) + template + typename std::enable_if< + !std::is_same::value && + std::is_same::value, + void>::type + execute_impl() + { + if (nullptr == serializer_) { + throw std::runtime_error("Subscription intra-process can't handle unserialized messages"); + } + + ConstMessageSharedPtr serialized_container = buffer_->consume_shared(); + if (nullptr == serialized_container) { + throw std::runtime_error("Subscription intra-process could not get serialized message"); + } + + rmw_message_info_t msg_info = {}; + msg_info.from_intra_process = true; + + if (any_callback_.use_take_shared_method()) { + CallbackMessageSharedPtr msg = construct_unique(); + serializer_->deserialize_message( + serialized_container.get(), + reinterpret_cast(msg.get())); + any_callback_.dispatch_intra_process(msg, msg_info); + } else { + CallbackMessageUniquePtr msg = construct_unique(); + serializer_->deserialize_message( + serialized_container.get(), + reinterpret_cast(msg.get())); + any_callback_.dispatch_intra_process(std::move(msg), msg_info); + } + } + + CallbackMessageUniquePtr construct_unique() + { + CallbackMessageUniquePtr unique_msg; + auto ptr = CallbackMessageAllocTraits::allocate(*message_allocator_.get(), 1); + CallbackMessageAllocTraits::construct(*message_allocator_.get(), ptr); + unique_msg = CallbackMessageUniquePtr(ptr); + + return unique_msg; + } + AnySubscriptionCallback any_callback_; BufferUniquePtr buffer_; + std::shared_ptr serializer_; + std::shared_ptr message_allocator_ = + std::make_shared(); }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index aa614e3a0c..fd53e727d1 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -56,6 +56,12 @@ class Publisher : public PublisherBase using MessageDeleter = allocator::Deleter; using MessageUniquePtr = std::unique_ptr; using MessageSharedPtr = std::shared_ptr; + using SerializedMessageAllocatorTraits = + allocator::AllocRebind; + using SerializedMessageAllocator = typename SerializedMessageAllocatorTraits::allocator_type; + using SerializedMessageDeleter = allocator::Deleter; RCLCPP_SMART_PTR_DEFINITIONS(Publisher) @@ -70,37 +76,28 @@ class Publisher : public PublisherBase *rosidl_typesupport_cpp::get_message_type_support_handle(), options.template to_rcl_publisher_options(qos)), options_(options), - message_allocator_(new MessageAllocator(*options.get_allocator().get())) + message_allocator_(new MessageAllocator(*options.get_allocator().get())), + message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get())) { - allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); + init_setup(); + } - if (options_.event_callbacks.deadline_callback) { - this->add_event_handler( - options_.event_callbacks.deadline_callback, - RCL_PUBLISHER_OFFERED_DEADLINE_MISSED); - } - if (options_.event_callbacks.liveliness_callback) { - this->add_event_handler( - options_.event_callbacks.liveliness_callback, - RCL_PUBLISHER_LIVELINESS_LOST); - } - if (options_.event_callbacks.incompatible_qos_callback) { - this->add_event_handler( - options_.event_callbacks.incompatible_qos_callback, - RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS); - } else if (options_.use_default_callbacks) { - // Register default callback when not specified - try { - this->add_event_handler( - [this](QOSOfferedIncompatibleQoSInfo & info) { - this->default_incompatible_qos_callback(info); - }, - RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS); - } catch (UnsupportedEventTypeException & /*exc*/) { - // pass - } - } - // Setup continues in the post construction method, post_init_setup(). + Publisher( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options, + const rosidl_message_type_support_t & type_support) + : PublisherBase( + node_base, + topic, + type_support, + options.template to_rcl_publisher_options(qos)), + options_(options), + message_allocator_(new MessageAllocator(*options.get_allocator().get())), + message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get())) + { + init_setup(); } /// Called post construction, so that construction may continue after shared_from_this() works. @@ -135,8 +132,11 @@ class Publisher : public PublisherBase "intraprocess communication allowed only with volatile durability"); } uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this()); + uint64_t intra_process_publisher_id_serialized = ipm->add_publisher( + this->shared_from_this(), true); this->setup_intra_process( intra_process_publisher_id, + intra_process_publisher_id_serialized, ipm); } } @@ -173,6 +173,11 @@ class Publisher : public PublisherBase virtual void publish(std::unique_ptr msg) { + if (std::is_same::value) { + this->template publish(std::move(msg)); + return; + } + if (!intra_process_is_enabled_) { this->do_inter_process_publish(*msg); return; @@ -187,34 +192,41 @@ class Publisher : public PublisherBase get_subscription_count() > get_intra_process_subscription_count(); if (inter_process_publish_needed) { - auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg)); + auto shared_msg = this->do_intra_process_publish_and_return_shared( + std::move(msg), message_allocator_); this->do_inter_process_publish(*shared_msg); } else { - this->do_intra_process_publish(std::move(msg)); + this->do_intra_process_publish(std::move(msg), message_allocator_); } } virtual void publish(const MessageT & msg) { - // Avoid allocating when not using intra process. - if (!intra_process_is_enabled_) { - // In this case we're not using intra process. - return this->do_inter_process_publish(msg); - } - // Otherwise we have to allocate memory in a unique_ptr and pass it along. - // As the message is not const, a copy should be made. - // A shared_ptr could also be constructed here. - auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1); - MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg); - MessageUniquePtr unique_msg(ptr, message_deleter_); - this->publish(std::move(unique_msg)); + this->do_publish_message(msg); } - void + template + typename std::enable_if::value>::type publish(const rcl_serialized_message_t & serialized_msg) { - return this->do_serialized_publish(&serialized_msg); + this->do_publish_message(serialized_msg); + } + + /// Publish a serialized message. Non specialized version to prevent compiling errors. + template + void publish(std::unique_ptr serialized_msg) + { + (void)serialized_msg; + throw std::runtime_error( + "publishing unique_ptr with custom deleter only supported for serialized messages"); + } + + /// Publish a serialized message. + template + void publish(std::unique_ptr serialized_msg) + { + this->do_serialized_publish(*serialized_msg); } /// Publish an instance of a LoanedMessage. @@ -259,6 +271,69 @@ class Publisher : public PublisherBase } protected: + void init_setup() + { + allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); + + if (options_.event_callbacks.deadline_callback) { + this->add_event_handler( + options_.event_callbacks.deadline_callback, + RCL_PUBLISHER_OFFERED_DEADLINE_MISSED); + } + if (options_.event_callbacks.liveliness_callback) { + this->add_event_handler( + options_.event_callbacks.liveliness_callback, + RCL_PUBLISHER_LIVELINESS_LOST); + } + if (options_.event_callbacks.incompatible_qos_callback) { + this->add_event_handler( + options_.event_callbacks.incompatible_qos_callback, + RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS); + } else if (options_.use_default_callbacks) { + // Register default callback when not specified + try { + this->add_event_handler( + [this](QOSOfferedIncompatibleQoSInfo & info) { + this->default_incompatible_qos_callback(info); + }, + RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS); + } catch (UnsupportedEventTypeException & /*exc*/) { + RCLCPP_WARN_ONCE( + rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())), + "This rmw implementation does not support ON_OFFERED_INCOMPATIBLE_QOS " + "events, you will not be notified when Publishers offer an incompatible " + "QoS profile to Subscriptions on the same topic."); + } + } + // Setup continues in the post construction method, post_init_setup(). + } + + template + typename std::enable_if::value>::type + do_publish_message(const T & msg) + { + // Avoid allocating when not using intra process. + if (!intra_process_is_enabled_) { + // In this case we're not using intra process. + return this->do_inter_process_publish(msg); + } + // Otherwise we have to allocate memory in a unique_ptr and pass it along. + // As the message is not const, a copy should be made. + // A shared_ptr could also be constructed here. + auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1); + MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg); + MessageUniquePtr unique_msg(ptr, message_deleter_); + this->publish(std::move(unique_msg)); + } + + template + typename std::enable_if::value>::type + do_publish_message(const T & msg) + { + // Kept for backwards compatibility. Copies compelete memory! + this->publish(std::make_unique(msg)); + } + void do_inter_process_publish(const MessageT & msg) { @@ -280,15 +355,24 @@ class Publisher : public PublisherBase } void - do_serialized_publish(const rcl_serialized_message_t * serialized_msg) + do_serialized_publish(rcl_serialized_message_t serialized_msg) { - if (intra_process_is_enabled_) { - // TODO(Karsten1987): support serialized message passed by intraprocess - throw std::runtime_error("storing serialized messages in intra process is not supported yet"); + bool inter_process_publish_needed = + get_subscription_count() > get_intra_process_subscription_count(); + + if (inter_process_publish_needed) { + // declare here to avoid deletion before returning method + auto status = rcl_publish_serialized_message(&publisher_handle_, &serialized_msg, nullptr); + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message"); + } } - auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr); - if (RCL_RET_OK != status) { - rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message"); + + auto msg = std::make_unique( + std::move(serialized_msg)); + + if (intra_process_is_enabled_) { + do_intra_process_publish(std::move(msg), message_allocator_serialized_); } } @@ -312,8 +396,11 @@ class Publisher : public PublisherBase } } + template void - do_intra_process_publish(std::unique_ptr msg) + do_intra_process_publish( + std::unique_ptr msg, + std::shared_ptr & message_allocator) { auto ipm = weak_ipm_.lock(); if (!ipm) { @@ -324,14 +411,21 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - ipm->template do_intra_process_publish( - intra_process_publisher_id_, + const uint64_t intra_process_publisher_id = std::is_same::value ? + intra_process_publisher_id_serialized_ : intra_process_publisher_id_; + + ipm->template do_intra_process_publish( + intra_process_publisher_id, std::move(msg), - message_allocator_); + message_allocator); } - std::shared_ptr - do_intra_process_publish_and_return_shared(std::unique_ptr msg) + template + std::shared_ptr + do_intra_process_publish_and_return_shared( + std::unique_ptr msg, + std::shared_ptr & message_allocator) { auto ipm = weak_ipm_.lock(); if (!ipm) { @@ -342,10 +436,14 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - return ipm->template do_intra_process_publish_and_return_shared( - intra_process_publisher_id_, + const uint64_t intra_process_publisher_id = std::is_same::value ? + intra_process_publisher_id_serialized_ : intra_process_publisher_id_; + + return ipm->template do_intra_process_publish_and_return_shared( + intra_process_publisher_id, std::move(msg), - message_allocator_); + message_allocator); } /// Copy of original options passed during construction. @@ -356,6 +454,7 @@ class Publisher : public PublisherBase const rclcpp::PublisherOptionsWithAllocator options_; std::shared_ptr message_allocator_; + std::shared_ptr message_allocator_serialized_; MessageDeleter message_deleter_; }; diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index 9627acaa4a..4f678ec0bb 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -191,6 +191,7 @@ class PublisherBase : public std::enable_shared_from_this void setup_intra_process( uint64_t intra_process_publisher_id, + uint64_t intra_process_publisher_id_serialized, IntraProcessManagerSharedPtr ipm); protected: @@ -222,6 +223,7 @@ class PublisherBase : public std::enable_shared_from_this bool intra_process_is_enabled_; IntraProcessManagerWeakPtr weak_ipm_; uint64_t intra_process_publisher_id_; + uint64_t intra_process_publisher_id_serialized_; rmw_gid_t rmw_gid_; }; diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index 87def3cc17..f287d8e49d 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -63,17 +63,21 @@ struct PublisherFactory /// Return a PublisherFactory with functions setup for creating a PublisherT. template PublisherFactory -create_publisher_factory(const rclcpp::PublisherOptionsWithAllocator & options) +create_publisher_factory( + const rclcpp::PublisherOptionsWithAllocator & options, + const rosidl_message_type_support_t & type_support) { PublisherFactory factory { // factory function that creates a MessageT specific PublisherT - [options]( + [options, type_support]( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic_name, const rclcpp::QoS & qos ) -> std::shared_ptr { - auto publisher = std::make_shared(node_base, topic_name, qos, options); + auto publisher = std::make_shared( + node_base, topic_name, qos, options, + type_support); // This is used for setting up things like intra process comms which // require this->shared_from_this() which cannot be called from // the constructor. diff --git a/rclcpp/include/rclcpp/serialization.hpp b/rclcpp/include/rclcpp/serialization.hpp new file mode 100644 index 0000000000..ddc122cb43 --- /dev/null +++ b/rclcpp/include/rclcpp/serialization.hpp @@ -0,0 +1,123 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__SERIALIZATION_HPP_ +#define RCLCPP__SERIALIZATION_HPP_ + +#include + +#include +#include + +#include "rcl/error_handling.h" + +namespace rclcpp +{ + +/// Interface to (de)serialize a message +class SerializationBase +{ +public: + virtual ~SerializationBase() = default; + + /// Serialize a ROS2 message to a serialized stream + /** + * \param[in] message The ROS2 message which is read and serialized by rmw. + */ + virtual std::shared_ptr serialize_message(const void * message) = 0; + + /// Deserialize a serialized stream to a ROS message + /** + * \param[in] serialized_message The serialized message to be converted to ROS2 by rmw. + * \param[out] message The deserialized ROS2 message. + */ + virtual void deserialize_message( + const rcl_serialized_message_t * serialized_message, + void * msg) = 0; +}; + +/// Default implementation to (de)serialize a message by using rmw_(de)serialize +class Serialization : public SerializationBase +{ +public: + Serialization( + const rosidl_message_type_support_t & type_support, + const rcutils_allocator_t allocator = rcutils_get_default_allocator()) + : type_support_(type_support), rcutils_allocator_(allocator) + {} + + std::shared_ptr serialize_message(const void * message) override + { + auto serialized_message = new rcl_serialized_message_t; + *serialized_message = rmw_get_zero_initialized_serialized_message(); + const auto ret = rmw_serialized_message_init(serialized_message, 0, &rcutils_allocator_); + if (ret != RCUTILS_RET_OK) { + throw std::runtime_error( + "Error allocating resources for serialized message: " + + std::string(rcutils_get_error_string().str)); + } + + if (nullptr == message) { + delete serialized_message; + throw std::runtime_error("Message is nullpointer while serialization."); + } + + const auto error = rmw_serialize( + message, + &type_support_, + serialized_message); + if (error != RCL_RET_OK) { + delete serialized_message; + throw std::runtime_error("Failed to serialize."); + } + + auto shared_serialized_msg = std::shared_ptr( + serialized_message, + [](rcl_serialized_message_t * msg) { + auto fini_ret = rmw_serialized_message_fini(msg); + delete msg; + if (fini_ret != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Failed to destroy serialized message: %s", rcl_get_error_string().str); + } + }); + + return shared_serialized_msg; + } + + void deserialize_message(const rcl_serialized_message_t * serialized_message, void * msg) override + { + if (nullptr == serialized_message || + serialized_message->buffer_capacity == 0 || + serialized_message->buffer_length == 0 || + !serialized_message->buffer) + { + throw std::runtime_error("Failed to deserialize nullptr serialized message."); + } + + const auto ret = rmw_deserialize(serialized_message, &type_support_, msg); + if (ret != RMW_RET_OK) { + throw std::runtime_error("Failed to deserialize serialized message."); + } + } + +private: + rosidl_message_type_support_t type_support_; + rcutils_allocator_t rcutils_allocator_; +}; + +} // namespace rclcpp + +#endif // RCLCPP__SERIALIZATION_HPP_ diff --git a/rclcpp/include/rclcpp/serialized_message.hpp b/rclcpp/include/rclcpp/serialized_message.hpp new file mode 100644 index 0000000000..8e4dbf837f --- /dev/null +++ b/rclcpp/include/rclcpp/serialized_message.hpp @@ -0,0 +1,80 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__SERIALIZED_MESSAGE_HPP_ +#define RCLCPP__SERIALIZED_MESSAGE_HPP_ + +#include + +#include + +#include "rcutils/logging_macros.h" + +#include "rmw/serialized_message.h" + +namespace rclcpp +{ + +/// Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks +class SerializedMessage : public rcl_serialized_message_t +{ +public: + SerializedMessage() + : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) + {} + + explicit SerializedMessage(const SerializedMessage & serialized_message) + : SerializedMessage(static_cast(serialized_message)) + {} + + explicit SerializedMessage(const rcl_serialized_message_t & serialized_message) + : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) + { + const auto ret = rmw_serialized_message_init( + this, serialized_message.buffer_length, + &serialized_message.allocator); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + // do not call memcpy if the pointer is "static" + if (buffer != serialized_message.buffer) { + std::memcpy(buffer, serialized_message.buffer, serialized_message.buffer_length); + } + buffer_length = serialized_message.buffer_length; + } + + explicit SerializedMessage(rcl_serialized_message_t && msg) + : rcl_serialized_message_t(msg) + { + // reset buffer to prevent double free + msg = rmw_get_zero_initialized_serialized_message(); + } + + ~SerializedMessage() + { + if (nullptr != buffer) { + const auto fini_ret = rmw_serialized_message_fini(this); + if (fini_ret != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Failed to destroy serialized message: %s", rcl_get_error_string().str); + } + } + } +}; + +} // namespace rclcpp + +#endif // RCLCPP__SERIALIZED_MESSAGE_HPP_ diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 2b5172dc1b..9568027e9e 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -41,6 +41,7 @@ #include "rclcpp/message_info.hpp" #include "rclcpp/message_memory_strategy.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/subscription_base.hpp" #include "rclcpp/subscription_options.hpp" #include "rclcpp/subscription_traits.hpp" @@ -155,29 +156,84 @@ class Subscription : public SubscriptionBase "intraprocess communication allowed only with volatile durability"); } - // First create a SubscriptionIntraProcess which will be given to the intra-process manager. - auto context = node_base->get_context(); - using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess< - CallbackMessageT, - AllocatorT, - typename MessageUniquePtr::deleter_type>; - auto subscription_intra_process = std::make_shared( - callback, - options.get_allocator(), - context, - this->get_topic_name(), // important to get like this, as it has the fully-qualified name - qos_profile, - resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback)); - TRACEPOINT( - rclcpp_subscription_init, - (const void *)get_subscription_handle().get(), - (const void *)subscription_intra_process.get()); - - // Add it to the intra process manager. using rclcpp::experimental::IntraProcessManager; + uint64_t intra_process_subscription_id; + uint64_t intra_process_subscription_id_serialized; + + auto context = node_base->get_context(); auto ipm = context->get_sub_context(); - uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process); - this->setup_intra_process(intra_process_subscription_id, ipm); + + { + // First create a SubscriptionIntraProcess which will be given to the intra-process manager. + auto subscription_intra_process = std::make_shared< + rclcpp::experimental::SubscriptionIntraProcess< + CallbackMessageT, + AllocatorT, + typename MessageUniquePtr::deleter_type + >>( + callback, + options.get_allocator(), + context, + this->get_topic_name(), // important to get it by the fully-qualified name + qos.get_rmw_qos_profile(), + resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback), + std::make_shared( + type_support_handle, + options.template to_rcl_subscription_options(qos).allocator) + ); + TRACEPOINT( + rclcpp_subscription_init, + (const void *)get_subscription_handle().get(), + (const void *)subscription_intra_process.get()); + + // Add it to the intra process manager. + intra_process_subscription_id = ipm->add_subscription(subscription_intra_process); + } + + { + using SerializedMessageAllocatorTraits = + allocator::AllocRebind; + using SerializedMessageAllocator = + typename SerializedMessageAllocatorTraits::allocator_type; + using SerializedMessageDeleter = allocator::Deleter; + using SerializedMessageUniquePtr = + std::unique_ptr; + + // First create a SubscriptionIntraProcess which will be given to the intra-process manager. + auto subscription_intra_process = std::make_shared< + rclcpp::experimental::SubscriptionIntraProcess< + rclcpp::SerializedMessage, + AllocatorT, + typename SerializedMessageUniquePtr::deleter_type, + CallbackMessageT + >>( + callback, + options.get_allocator(), + context, + this->get_topic_name(), // important to get it by the fully-qualified name + qos.get_rmw_qos_profile(), + resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback), + std::make_shared( + type_support_handle, + options.template to_rcl_subscription_options(qos).allocator) + ); + TRACEPOINT( + rclcpp_subscription_init, + (const void *)get_subscription_handle().get(), + (const void *)subscription_intra_process.get()); + + // Add it to the intra process manager. + intra_process_subscription_id_serialized = ipm->add_subscription( + subscription_intra_process, + true); + } + + this->setup_intra_process( + {intra_process_subscription_id, + intra_process_subscription_id_serialized}, ipm); } TRACEPOINT( diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 7ee8c624d1..707c0b419a 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -228,13 +228,13 @@ class SubscriptionBase : public std::enable_shared_from_this RCLCPP_PUBLIC void setup_intra_process( - uint64_t intra_process_subscription_id, + const std::vector & intra_process_subscription_ids, IntraProcessManagerWeakPtr weak_ipm); /// Return the waitable for intra-process, or nullptr if intra-process is not setup. RCLCPP_PUBLIC - rclcpp::Waitable::SharedPtr - get_intra_process_waitable() const; + std::vector + get_intra_process_waitables() const; /// Exchange state of whether or not a part of the subscription is used by a wait set. /** @@ -286,7 +286,7 @@ class SubscriptionBase : public std::enable_shared_from_this bool use_intra_process_; IntraProcessManagerWeakPtr weak_ipm_; - uint64_t intra_process_subscription_id_; + std::vector intra_process_subscription_ids_; private: RCLCPP_DISABLE_COPY(SubscriptionBase) diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index a0f265c803..67d2272a99 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -78,7 +78,8 @@ SubscriptionFactory create_subscription_factory( CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options, - typename MessageMemoryStrategyT::SharedPtr msg_mem_strat) + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat, + const rosidl_message_type_support_t & type_support) { auto allocator = options.get_allocator(); @@ -88,7 +89,7 @@ create_subscription_factory( SubscriptionFactory factory { // factory function that creates a MessageT specific SubscriptionT - [options, msg_mem_strat, any_subscription_callback]( + [options, msg_mem_strat, any_subscription_callback, type_support]( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic_name, const rclcpp::QoS & qos @@ -99,7 +100,7 @@ create_subscription_factory( auto sub = Subscription::make_shared( node_base, - *rosidl_typesupport_cpp::get_message_type_support_handle(), + type_support, topic_name, qos, any_subscription_callback, diff --git a/rclcpp/include/rclcpp/wait_set_template.hpp b/rclcpp/include/rclcpp/wait_set_template.hpp index 899d461ed3..96e67f0a2b 100644 --- a/rclcpp/include/rclcpp/wait_set_template.hpp +++ b/rclcpp/include/rclcpp/wait_set_template.hpp @@ -165,18 +165,19 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli } if (mask.include_intra_process_waitable) { auto local_subscription = inner_subscription; - auto waitable = inner_subscription->get_intra_process_waitable(); - if (nullptr != waitable) { - bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state( - waitable.get(), - true); - if (already_in_use) { - throw std::runtime_error( - "subscription intra-process waitable already associated with a wait set"); + for (auto waitable : inner_subscription->get_intra_process_waitables()) { + if (nullptr != waitable) { + bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state( + waitable.get(), + true); + if (already_in_use) { + throw std::runtime_error( + "subscription intra-process waitable already associated with a wait set"); + } + this->storage_add_waitable( + std::move(waitable), + std::move(local_subscription)); } - this->storage_add_waitable( - std::move(inner_subscription->get_intra_process_waitable()), - std::move(local_subscription)); } } }); @@ -230,11 +231,12 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli } } if (mask.include_intra_process_waitable) { - auto local_waitable = inner_subscription->get_intra_process_waitable(); - inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false); - if (nullptr != local_waitable) { - // This is the case when intra process is disabled for the subscription. - this->storage_remove_waitable(std::move(local_waitable)); + for (auto local_waitable : inner_subscription->get_intra_process_waitables()) { + inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false); + if (nullptr != local_waitable) { + // This is the case when intra process is disabled for the subscription. + this->storage_remove_waitable(std::move(local_waitable)); + } } } }); diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 0b9c9d6670..3b0b4d1e19 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -32,7 +32,9 @@ IntraProcessManager::~IntraProcessManager() {} uint64_t -IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) +IntraProcessManager::add_publisher( + rclcpp::PublisherBase::SharedPtr publisher, + const bool is_serialized) { std::unique_lock lock(mutex_); @@ -41,6 +43,7 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) publishers_[id].publisher = publisher; publishers_[id].topic_name = publisher->get_topic_name(); publishers_[id].qos = publisher->get_actual_qos().get_rmw_qos_profile(); + publishers_[id].is_serialized = is_serialized; // Initialize the subscriptions storage for this publisher. pub_to_subs_[id] = SplittedSubscriptions(); @@ -56,7 +59,9 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) } uint64_t -IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription) +IntraProcessManager::add_subscription( + SubscriptionIntraProcessBase::SharedPtr subscription, + const bool is_serialized) { std::unique_lock lock(mutex_); @@ -66,6 +71,7 @@ IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr su subscriptions_[id].topic_name = subscription->get_topic_name(); subscriptions_[id].qos = subscription->get_actual_qos(); subscriptions_[id].use_take_shared_method = subscription->use_take_shared_method(); + subscriptions_[id].is_serialized = is_serialized; // adds the subscription id to all the matchable publishers for (auto & pair : publishers_) { @@ -220,6 +226,13 @@ IntraProcessManager::can_communicate( return false; } + // a publisher and a subscription with different content type can't communicate + // if is_serialized is true, the expected message typ is rcl_serialized_message_t + // otherwise the templated ROS2 message type + if (sub_info.is_serialized != pub_info.is_serialized) { + return false; + } + return true; } diff --git a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp index 11de4a3278..9cb84d3cce 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp @@ -99,10 +99,12 @@ NodeTopics::add_subscription( callback_group->add_waitable(subscription_event); } - auto intra_process_waitable = subscription->get_intra_process_waitable(); - if (nullptr != intra_process_waitable) { - // Add to the callback group to be notified about intra-process msgs. - callback_group->add_waitable(intra_process_waitable); + const auto intra_process_waitables = subscription->get_intra_process_waitables(); + for (auto & intra_process_waitable : intra_process_waitables) { + if (nullptr != intra_process_waitable) { + // Add to the callback group to be notified about intra-process msgs. + callback_group->add_waitable(intra_process_waitable); + } } // Notify the executor that a new subscription was created using the parent Node. diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index c982a95d33..0f2258881a 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -44,7 +44,8 @@ PublisherBase::PublisherBase( const rosidl_message_type_support_t & type_support, const rcl_publisher_options_t & publisher_options) : rcl_node_handle_(node_base->get_shared_rcl_node_handle()), - intra_process_is_enabled_(false), intra_process_publisher_id_(0) + intra_process_is_enabled_(false), intra_process_publisher_id_(0), + intra_process_publisher_id_serialized_(0) { rcl_ret_t ret = rcl_publisher_init( &publisher_handle_, @@ -105,6 +106,7 @@ PublisherBase::~PublisherBase() return; } ipm->remove_publisher(intra_process_publisher_id_); + ipm->remove_publisher(intra_process_publisher_id_serialized_); } const char * @@ -238,9 +240,11 @@ PublisherBase::operator==(const rmw_gid_t * gid) const void PublisherBase::setup_intra_process( uint64_t intra_process_publisher_id, + uint64_t intra_process_publisher_id_serialized, IntraProcessManagerSharedPtr ipm) { intra_process_publisher_id_ = intra_process_publisher_id; + intra_process_publisher_id_serialized_ = intra_process_publisher_id_serialized; weak_ipm_ = ipm; intra_process_is_enabled_ = true; } diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 2af068e32e..1d24adf71a 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -39,7 +39,6 @@ SubscriptionBase::SubscriptionBase( : node_base_(node_base), node_handle_(node_base_->get_shared_rcl_node_handle()), use_intra_process_(false), - intra_process_subscription_id_(0), type_support_(type_support_handle), is_serialized_(is_serialized) { @@ -93,7 +92,9 @@ SubscriptionBase::~SubscriptionBase() "Intra process manager died before than a subscription."); return; } - ipm->remove_subscription(intra_process_subscription_id_); + for (auto intra_process_subscription_id : intra_process_subscription_ids_) { + ipm->remove_subscription(intra_process_subscription_id); + } } const char * @@ -204,10 +205,10 @@ SubscriptionBase::get_publisher_count() const void SubscriptionBase::setup_intra_process( - uint64_t intra_process_subscription_id, + const std::vector & intra_process_subscription_ids, IntraProcessManagerWeakPtr weak_ipm) { - intra_process_subscription_id_ = intra_process_subscription_id; + intra_process_subscription_ids_ = intra_process_subscription_ids; weak_ipm_ = weak_ipm; use_intra_process_ = true; } @@ -218,23 +219,29 @@ SubscriptionBase::can_loan_messages() const return rcl_subscription_can_loan_messages(subscription_handle_.get()); } -rclcpp::Waitable::SharedPtr -SubscriptionBase::get_intra_process_waitable() const +std::vector +SubscriptionBase::get_intra_process_waitables() const { // If not using intra process, shortcut to nullptr. if (!use_intra_process_) { - return nullptr; + return std::vector(); } // Get the intra process manager. auto ipm = weak_ipm_.lock(); if (!ipm) { throw std::runtime_error( - "SubscriptionBase::get_intra_process_waitable() called " + "SubscriptionBase::get_intra_process_waitables() called " "after destruction of intra process manager"); } - // Use the id to retrieve the subscription intra-process from the intra-process manager. - return ipm->get_subscription_intra_process(intra_process_subscription_id_); + std::vector waitables(intra_process_subscription_ids_.size()); + + for (auto i = 0u; i < intra_process_subscription_ids_.size(); ++i) { + // Use the id to retrieve the subscription intra-process from the intra-process manager. + waitables[i] = ipm->get_subscription_intra_process(intra_process_subscription_ids_[i]); + } + + return waitables; } void @@ -274,8 +281,10 @@ SubscriptionBase::exchange_in_use_by_wait_set_state( if (this == pointer_to_subscription_part) { return subscription_in_use_by_wait_set_.exchange(in_use_state); } - if (get_intra_process_waitable().get() == pointer_to_subscription_part) { - return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state); + for (auto & waitable : get_intra_process_waitables()) { + if (waitable.get() == pointer_to_subscription_part) { + return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state); + } } for (const auto & qos_event : event_handlers_) { if (qos_event.get() == pointer_to_subscription_part) { diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp new file mode 100644 index 0000000000..9a40be728d --- /dev/null +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -0,0 +1,367 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include +#include +#include +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/publisher.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "rclcpp/serialized_message.hpp" + +int32_t & get_test_allocation_counter() +{ + static int32_t counter = 0; + return counter; +} + +void * custom_allocate(size_t size, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + ++get_test_allocation_counter(); + auto r = m_allocator.allocate(size, state); + return r; +} + +void * custom_zero_allocate(size_t number_of_elements, size_t size_of_element, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + ++get_test_allocation_counter(); + auto r = m_allocator.zero_allocate(number_of_elements, size_of_element, state); + return r; +} + +void * custom_reallocate(void * pointer, size_t size, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + if (pointer == nullptr) { + ++get_test_allocation_counter(); + } + + auto r = m_allocator.reallocate(pointer, size, state); + return r; +} + +void custom_deallocate(void * pointer, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + --get_test_allocation_counter(); + m_allocator.deallocate(pointer, state); +} + +rcl_serialized_message_t make_serialized_string_msg( + const std::shared_ptr & stringMsg) +{ + auto m_allocator = rcutils_get_default_allocator(); + + // add custom (de)allocator to count the references to the object + m_allocator.allocate = &custom_allocate; + m_allocator.deallocate = &custom_deallocate; + m_allocator.reallocate = &custom_reallocate; + m_allocator.zero_allocate = &custom_zero_allocate; + + rcl_serialized_message_t msg = rmw_get_zero_initialized_serialized_message(); + auto ret = rmw_serialized_message_init(&msg, 0, &m_allocator); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + static auto type = + rosidl_typesupport_cpp::get_message_type_support_handle + (); + auto error = rmw_serialize(stringMsg.get(), type, &msg); + if (error != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "test_intra_process_communication", + "Something went wrong preparing the serialized message"); + } + + return msg; +} + +/** + * Parameterized test. + * The first param are the NodeOptions used to create the nodes. + * The second param are the expected intraprocess count results. + */ +struct TestParameters +{ + rclcpp::NodeOptions node_options[2]; + uint64_t intraprocess_count_results[2]; + size_t runs; + std::string description; +}; + +std::ostream & operator<<(std::ostream & out, const TestParameters & params) +{ + out << params.description; + return out; +} + +class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam +{ +public: + static void SetUpTestCase() + { + if (!rclcpp::ok()) { + rclcpp::init(0, nullptr); + } + } + +protected: + static std::chrono::milliseconds offset; +}; + +std::chrono::milliseconds TestPublisherSubscriptionSerialized::offset = std::chrono::milliseconds( + 250); +std::array counts; + +void OnMessageSerialized(const std::shared_ptr msg) +{ + EXPECT_NE(msg->buffer, nullptr); + EXPECT_GT(msg->buffer_capacity, 0u); + + ++counts[0]; +} + +void OnMessageConst(std::shared_ptr msg) +{ + EXPECT_EQ(msg->string_value.back(), '9'); + + ++counts[1]; +} + +void OnMessageUniquePtr(std::unique_ptr msg) +{ + EXPECT_EQ(msg->string_value.back(), '9'); + + ++counts[1]; +} + +void OnMessage(std::shared_ptr msg) +{ + EXPECT_EQ(msg->string_value.back(), '9'); + + ++counts[1]; +} + +TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) +{ + get_test_allocation_counter() = 0; + + TestParameters parameters = GetParam(); + { + rclcpp::Node::SharedPtr node = std::make_shared( + "my_node", + "/ns", + parameters.node_options[0]); + auto publisher = node->create_publisher("/topic", 10); + + auto sub_shared = node->create_subscription( + "/topic", 10, + &OnMessage); + auto sub_unique = node->create_subscription( + "/topic", 10, + &OnMessageUniquePtr); + auto sub_const_shared = node->create_subscription( + "/topic", 10, + &OnMessageConst); + auto sub_serialized = node->create_subscription( + "/topic", 10, + &OnMessageSerialized); + + rclcpp::sleep_for(offset); + + counts.fill(0); + auto stringMsg = get_messages_strings()[3]; + + for (size_t i = 0; i < parameters.runs; i++) { + auto msg0 = make_serialized_string_msg(stringMsg); + + std::unique_ptr stringMsgU( + new test_msgs::msg::Strings( + *stringMsg)); + + publisher->publish(std::make_unique(msg0)); + publisher->publish(*stringMsg); + publisher->publish(std::move(stringMsgU)); + } + for (uint32_t i = 0; i < 3; ++i) { + rclcpp::spin_some(node); + rclcpp::sleep_for(offset); + } + + rclcpp::spin_some(node); + } + + if (parameters.runs == 1) { + EXPECT_EQ(counts[0], 3u); + EXPECT_EQ(counts[1], 9u); + } + + EXPECT_EQ(get_test_allocation_counter(), 0); +} + +TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic) +{ + get_test_allocation_counter() = 0; + + TestParameters parameters = GetParam(); + { + rclcpp::Node::SharedPtr node = std::make_shared( + "my_node", + "/ns", + parameters.node_options[0]); + auto publisher = rclcpp::create_generic_publisher( + node, + "/topic", + *rosidl_typesupport_cpp::get_message_type_support_handle(), + rclcpp::QoS(10)); + + auto sub_gen_serialized = rclcpp::create_generic_subscription( + node, + "/topic", + *rosidl_typesupport_cpp::get_message_type_support_handle(), + rclcpp::QoS(10), + &OnMessageSerialized); + + auto sub_shared = node->create_subscription( + "/topic", 10, + &OnMessage); + auto sub_unique = node->create_subscription( + "/topic", 10, + &OnMessageUniquePtr); + auto sub_const_shared = node->create_subscription( + "/topic", 10, + &OnMessageConst); + auto sub_serialized = node->create_subscription( + "/topic", 10, + &OnMessageSerialized); + + rclcpp::sleep_for(offset); + + counts.fill(0); + auto stringMsg = get_messages_strings()[3]; + + for (size_t i = 0; i < parameters.runs; i++) { + auto msg0 = make_serialized_string_msg(stringMsg); + + publisher->publish(std::make_unique(msg0)); + } + rclcpp::spin_some(node); + rclcpp::sleep_for(offset); + + rclcpp::spin_some(node); + } + + if (parameters.runs == 1) { + EXPECT_EQ(counts[0], 2u); + EXPECT_EQ(counts[1], 3u); + } + + EXPECT_EQ(get_test_allocation_counter(), 0); +} + +auto get_new_context() +{ + auto context = rclcpp::Context::make_shared(); + context->init(0, nullptr); + return context; +} + +std::vector parameters = { + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions in the same topic, both using intraprocess comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(true) + }, + {1u, 2u}, + 1, + "two_subscriptions_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions, one using intra-process comm and the other not using it. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(false) + }, + {1u, 1u}, + 1, + "two_subscriptions_one_intraprocess_one_not" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both using intra-process. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) + }, + {1u, 1u}, + 1, + "two_subscriptions_in_two_contexts_with_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both of them not using intra-process comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(false), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) + }, + {0u, 0u}, + 1, + "two_subscriptions_in_two_contexts_without_intraprocess_comm" + } +}; + +std::vector setRuns(const std::vector & in, const size_t runs) +{ + std::vector out = in; + for (auto & p : out) { + p.runs = runs; + } + return out; +} + +INSTANTIATE_TEST_CASE_P( + TestWithDifferentNodeOptions, TestPublisherSubscriptionSerialized, + ::testing::ValuesIn(parameters), + ::testing::PrintToStringParamName()); + +INSTANTIATE_TEST_CASE_P( + TestWithDifferentNodeOptions1000Runs, TestPublisherSubscriptionSerialized, + ::testing::ValuesIn(setRuns(parameters, 1000)), + ::testing::PrintToStringParamName()); diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp index 2f94dc78c0..68bf091596 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp @@ -68,6 +68,18 @@ class LifecyclePublisher : public LifecyclePublisherInterface, { } + LifecyclePublisher( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options, + const rosidl_message_type_support_t & type_support) + : rclcpp::Publisher(node_base, topic, qos, options, type_support), + enabled_(false), + logger_(rclcpp::get_logger("LifecyclePublisher")) + { + } + ~LifecyclePublisher() {} /// LifecyclePublisher publish function @@ -110,6 +122,30 @@ class LifecyclePublisher : public LifecyclePublisherInterface, rclcpp::Publisher::publish(msg); } + /// Publish a serialized message. Non specialized version to prevent compiling errors. + template + void publish(std::unique_ptr serialized_msg) + { + (void)serialized_msg; + throw std::runtime_error( + "publishing unique_ptr with custom deleter only supported for serialized messages"); + } + + /// Publish a serialized message. + template + void publish(std::unique_ptr serialized_msg) + { + if (!enabled_) { + RCLCPP_WARN( + logger_, + "Trying to publish message on the topic '%s', but the publisher is not activated", + this->get_topic_name()); + + return; + } + this->do_serialized_publish(*serialized_msg); + } + virtual void on_activate() {