From 98f610c114b8cc7bda2ba3a3d1d486767ee293e5 Mon Sep 17 00:00:00 2001 From: ivanpauno Date: Tue, 30 Apr 2019 16:05:53 -0300 Subject: [PATCH] New IntraProcessManager capable of storing shared_ptr (#690) * Changed mapped_ring_buffer class to store both shared_ptr or unique_ptr Signed-off-by: ivanpauno * Changed the IPM store and take methods Signed-off-by: ivanpauno * Changed publish methods to take advantage of the new IPM Signed-off-by: ivanpauno * Change how subscriptions handle intraprocess messages Signed-off-by: ivanpauno * Modified publish method signatures Signed-off-by: ivanpauno * Renamed 'publisher.cpp' and 'subscription.cpp' to 'publisher_base.cpp' and 'subscription_base.cpp' Signed-off-by: ivanpauno * Updated lifecycle_publisher publish methods Signed-off-by: ivanpauno --- rclcpp/CMakeLists.txt | 4 +- .../rclcpp/any_subscription_callback.hpp | 38 ++- .../include/rclcpp/intra_process_manager.hpp | 92 +++-- .../rclcpp/intra_process_manager_impl.hpp | 4 +- rclcpp/include/rclcpp/mapped_ring_buffer.hpp | 157 ++++++--- rclcpp/include/rclcpp/publisher.hpp | 323 ++++++------------ rclcpp/include/rclcpp/publisher_base.hpp | 195 +++++++++++ rclcpp/include/rclcpp/publisher_factory.hpp | 65 ---- rclcpp/include/rclcpp/subscription.hpp | 272 +++++---------- rclcpp/include/rclcpp/subscription_base.hpp | 165 +++++++++ .../include/rclcpp/subscription_factory.hpp | 58 ---- rclcpp/src/rclcpp/intra_process_manager.cpp | 15 + .../rclcpp/node_interfaces/node_topics.cpp | 13 +- .../{publisher.cpp => publisher_base.cpp} | 14 +- ...subscription.cpp => subscription_base.cpp} | 33 +- rclcpp/test/test_intra_process_manager.cpp | 125 ++++--- rclcpp/test/test_mapped_ring_buffer.cpp | 243 ++++++++++--- .../rclcpp_lifecycle/lifecycle_publisher.hpp | 42 +-- 18 files changed, 1093 insertions(+), 765 deletions(-) create mode 100644 rclcpp/include/rclcpp/publisher_base.hpp create mode 100644 rclcpp/include/rclcpp/subscription_base.hpp rename rclcpp/src/rclcpp/{publisher.cpp => publisher_base.cpp} (97%) rename rclcpp/src/rclcpp/{subscription.cpp => subscription_base.cpp} (78%) diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 0de4deb495..b4e42dd2b1 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -64,10 +64,10 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/parameter_events_filter.cpp src/rclcpp/parameter_map.cpp src/rclcpp/parameter_service.cpp - src/rclcpp/publisher.cpp + src/rclcpp/publisher_base.cpp src/rclcpp/service.cpp src/rclcpp/signal_handler.cpp - src/rclcpp/subscription.cpp + src/rclcpp/subscription_base.cpp src/rclcpp/time.cpp src/rclcpp/time_source.cpp src/rclcpp/timer.cpp diff --git a/rclcpp/include/rclcpp/any_subscription_callback.hpp b/rclcpp/include/rclcpp/any_subscription_callback.hpp index 095f3b9dc2..5b6db7f025 100644 --- a/rclcpp/include/rclcpp/any_subscription_callback.hpp +++ b/rclcpp/include/rclcpp/any_subscription_callback.hpp @@ -36,6 +36,7 @@ class AnySubscriptionCallback using MessageAllocTraits = allocator::AllocRebind; using MessageAlloc = typename MessageAllocTraits::allocator_type; using MessageDeleter = allocator::Deleter; + using ConstMessageSharedPtr = std::shared_ptr; using MessageUniquePtr = std::unique_ptr; using SharedPtrCallback = std::function)>; @@ -154,7 +155,6 @@ class AnySubscriptionCallback void dispatch( std::shared_ptr message, const rmw_message_info_t & message_info) { - (void)message_info; if (shared_ptr_callback_) { shared_ptr_callback_(message); } else if (shared_ptr_with_info_callback_) { @@ -177,30 +177,50 @@ class AnySubscriptionCallback } void dispatch_intra_process( - MessageUniquePtr & message, const rmw_message_info_t & message_info) + ConstMessageSharedPtr message, const rmw_message_info_t & message_info) + { + if (const_shared_ptr_callback_) { + const_shared_ptr_callback_(message); + } else if (const_shared_ptr_with_info_callback_) { + const_shared_ptr_with_info_callback_(message, message_info); + } else { + if (unique_ptr_callback_ || unique_ptr_with_info_callback_ || + shared_ptr_callback_ || shared_ptr_with_info_callback_) + { + throw std::runtime_error("unexpected dispatch_intra_process const shared " + "message call with no const shared_ptr callback"); + } else { + throw std::runtime_error("unexpected message without any callback set"); + } + } + } + + void dispatch_intra_process( + MessageUniquePtr message, const rmw_message_info_t & message_info) { - (void)message_info; if (shared_ptr_callback_) { typename std::shared_ptr shared_message = std::move(message); shared_ptr_callback_(shared_message); } else if (shared_ptr_with_info_callback_) { typename std::shared_ptr shared_message = std::move(message); shared_ptr_with_info_callback_(shared_message, message_info); - } else if (const_shared_ptr_callback_) { - typename std::shared_ptr const_shared_message = std::move(message); - const_shared_ptr_callback_(const_shared_message); - } else if (const_shared_ptr_with_info_callback_) { - typename std::shared_ptr const_shared_message = std::move(message); - const_shared_ptr_with_info_callback_(const_shared_message, message_info); } else if (unique_ptr_callback_) { unique_ptr_callback_(std::move(message)); } else if (unique_ptr_with_info_callback_) { unique_ptr_with_info_callback_(std::move(message), message_info); + } else if (const_shared_ptr_callback_ || const_shared_ptr_with_info_callback_) { + throw std::runtime_error("unexpected dispatch_intra_process unique message call" + " with const shared_ptr callback"); } else { throw std::runtime_error("unexpected message without any callback set"); } } + bool use_take_shared_method() + { + return const_shared_ptr_callback_ || const_shared_ptr_with_info_callback_; + } + private: std::shared_ptr message_allocator_; MessageDeleter message_deleter_; diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index 6c044a4da6..5d3abed3f6 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -25,14 +25,15 @@ #include #include #include +#include #include #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/intra_process_manager_impl.hpp" #include "rclcpp/mapped_ring_buffer.hpp" #include "rclcpp/macros.hpp" -#include "rclcpp/publisher.hpp" -#include "rclcpp/subscription.hpp" +#include "rclcpp/publisher_base.hpp" +#include "rclcpp/subscription_base.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp @@ -184,21 +185,11 @@ class IntraProcessManager * \param buffer_size if 0 (default) a size is calculated based on the QoS. * \return an unsigned 64-bit integer which is the publisher's unique id. */ - template + RCLCPP_PUBLIC uint64_t add_publisher( - typename Publisher::SharedPtr publisher, - size_t buffer_size = 0) - { - auto id = IntraProcessManager::get_next_unique_id(); - size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size(); - auto mrb = mapped_ring_buffer::MappedRingBuffer< - MessageT, - typename Publisher::MessageAlloc - >::make_shared(size, publisher->get_allocator()); - impl_->add_publisher(id, publisher, mrb, size); - return id; - } + rclcpp::PublisherBase::SharedPtr publisher, + size_t buffer_size = 0); /// Unregister a publisher using the publisher's unique id. /** @@ -241,13 +232,41 @@ class IntraProcessManager * \param message the message that is being stored. * \return the message sequence number. */ + template< + typename MessageT, typename Alloc = std::allocator> + uint64_t + store_intra_process_message( + uint64_t intra_process_publisher_id, + std::shared_ptr message) + { + using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; + using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer; + uint64_t message_seq = 0; + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id( + intra_process_publisher_id, message_seq); + typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); + if (!typed_buffer) { + throw std::runtime_error("Typecast failed due to incorrect message type"); + } + + // Insert the message into the ring buffer using the message_seq to identify it. + bool did_replace = typed_buffer->push_and_replace(message_seq, message); + // TODO(wjwwood): do something when a message was displaced. log debug? + (void)did_replace; // Avoid unused variable warning. + + impl_->store_intra_process_message(intra_process_publisher_id, message_seq); + + // Return the message sequence which is sent to the subscription. + return message_seq; + } + template< typename MessageT, typename Alloc = std::allocator, typename Deleter = std::default_delete> uint64_t store_intra_process_message( uint64_t intra_process_publisher_id, - std::unique_ptr & message) + std::unique_ptr message) { using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer; @@ -260,7 +279,7 @@ class IntraProcessManager } // Insert the message into the ring buffer using the message_seq to identify it. - bool did_replace = typed_buffer->push_and_replace(message_seq, message); + bool did_replace = typed_buffer->push_and_replace(message_seq, std::move(message)); // TODO(wjwwood): do something when a message was displaced. log debug? (void)did_replace; // Avoid unused variable warning. @@ -334,10 +353,45 @@ class IntraProcessManager // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. if (target_subs_size) { // There are more subscriptions to serve, return a copy. - typed_buffer->get_copy_at_key(message_sequence_number, message); + typed_buffer->get(message_sequence_number, message); + } else { + // This is the last one to be returned, transfer ownership. + typed_buffer->pop(message_sequence_number, message); + } + } + + template< + typename MessageT, typename Alloc = std::allocator> + void + take_intra_process_message( + uint64_t intra_process_publisher_id, + uint64_t message_sequence_number, + uint64_t requesting_subscriptions_intra_process_id, + std::shared_ptr & message) + { + using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; + using TypedMRB = mapped_ring_buffer::MappedRingBuffer; + message = nullptr; + + size_t target_subs_size = 0; + std::lock_guard lock(take_mutex_); + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message( + intra_process_publisher_id, + message_sequence_number, + requesting_subscriptions_intra_process_id, + target_subs_size + ); + typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); + if (!typed_buffer) { + return; + } + // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. + if (target_subs_size) { + // There are more subscriptions to serve, return a copy. + typed_buffer->get(message_sequence_number, message); } else { // This is the last one to be returned, transfer ownership. - typed_buffer->pop_at_key(message_sequence_number, message); + typed_buffer->pop(message_sequence_number, message); } } diff --git a/rclcpp/include/rclcpp/intra_process_manager_impl.hpp b/rclcpp/include/rclcpp/intra_process_manager_impl.hpp index 89784c63d1..ab29af7b92 100644 --- a/rclcpp/include/rclcpp/intra_process_manager_impl.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager_impl.hpp @@ -34,8 +34,8 @@ #include "rclcpp/macros.hpp" #include "rclcpp/mapped_ring_buffer.hpp" -#include "rclcpp/publisher.hpp" -#include "rclcpp/subscription.hpp" +#include "rclcpp/publisher_base.hpp" +#include "rclcpp/subscription_base.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp diff --git a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp index 69ce8b9264..c8fdf6493d 100644 --- a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp +++ b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -38,7 +39,7 @@ class RCLCPP_PUBLIC MappedRingBufferBase RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBufferBase) }; -/// Ring buffer container of unique_ptr's of T, which can be accessed by a key. +/// Ring buffer container of shared_ptr's or unique_ptr's of T, which can be accessed by a key. /** * T must be a CopyConstructable and CopyAssignable. * This class can be used in a container by using the base class MappedRingBufferBase. @@ -64,6 +65,7 @@ class MappedRingBuffer : public MappedRingBufferBase using ElemAlloc = typename ElemAllocTraits::allocator_type; using ElemDeleter = allocator::Deleter; + using ConstElemSharedPtr = std::shared_ptr; using ElemUniquePtr = std::unique_ptr; /// Constructor. @@ -101,32 +103,72 @@ class MappedRingBuffer : public MappedRingBufferBase * \param value if the key is found, the value is stored in this parameter */ void - get_copy_at_key(uint64_t key, ElemUniquePtr & value) + get(uint64_t key, ElemUniquePtr & value) { std::lock_guard lock(data_mutex_); auto it = get_iterator_of_key(key); value = nullptr; if (it != elements_.end() && it->in_use) { - auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1); - ElemAllocTraits::construct(*allocator_.get(), ptr, *it->value); - value = ElemUniquePtr(ptr); + if (it->unique_value) { + ElemDeleter deleter = it->unique_value.get_deleter(); + auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1); + ElemAllocTraits::construct(*allocator_.get(), ptr, *it->unique_value); + value = ElemUniquePtr(ptr, deleter); + } else if (it->shared_value) { + ElemDeleter * deleter = std::get_deleter(it->shared_value); + auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1); + ElemAllocTraits::construct(*allocator_.get(), ptr, *it->shared_value); + if (deleter) { + value = ElemUniquePtr(ptr, *deleter); + } else { + value = ElemUniquePtr(ptr); + } + } else { + throw std::runtime_error("Unexpected empty MappedRingBuffer element."); + } } } - /// Return ownership of the value stored in the ring buffer, leaving a copy. + /// Share ownership of the value stored in the ring buffer at the given key. /** - * The key is matched if an element in the ring bufer has a matching key. - * This method will allocate in order to store a copy. + * The key is matched if an element in the ring buffer has a matching key. * * The key is not guaranteed to be unique, see the class docs for more. * - * The ownership of the currently stored object is returned, but a copy is - * made and stored in its place. - * This means that multiple calls to this function for a particular element - * will result in returning the copied and stored object not the original. - * This also means that later calls to pop_at_key will not return the - * originally stored object, since it was returned by the first call to this - * method. + * The contents of value before the method is called are discarded. + * + * \param key the key associated with the stored value + * \param value if the key is found, the value is stored in this parameter + */ + void + get(uint64_t key, ConstElemSharedPtr & value) + { + std::lock_guard lock(data_mutex_); + auto it = get_iterator_of_key(key); + value.reset(); + if (it != elements_.end() && it->in_use) { + if (!it->shared_value) { + // The stored unique_ptr is upgraded to a shared_ptr here. + // All the remaining get and pop calls done with unique_ptr + // signature will receive a copy. + if (!it->unique_value) { + throw std::runtime_error("Unexpected empty MappedRingBuffer element."); + } + it->shared_value = std::move(it->unique_value); + } + value = it->shared_value; + } + } + + /// Give the ownership of the stored value to the caller if possible, or copy and release. + /** + * The key is matched if an element in the ring buffer has a matching key. + * This method may allocate in order to return a copy. + * + * If the stored value is a shared_ptr, it is not possible to downgrade it to a unique_ptr. + * In that case, a copy is returned and the stored value is released. + * + * The key is not guaranteed to be unique, see the class docs for more. * * The contents of value before the method is called are discarded. * @@ -134,24 +176,32 @@ class MappedRingBuffer : public MappedRingBufferBase * \param value if the key is found, the value is stored in this parameter */ void - get_ownership_at_key(uint64_t key, ElemUniquePtr & value) + pop(uint64_t key, ElemUniquePtr & value) { std::lock_guard lock(data_mutex_); auto it = get_iterator_of_key(key); value = nullptr; if (it != elements_.end() && it->in_use) { - // Make a copy. - auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1); - ElemAllocTraits::construct(*allocator_.get(), ptr, *it->value); - auto copy = ElemUniquePtr(ptr); - // Return the original. - value.swap(it->value); - // Store the copy. - it->value.swap(copy); + if (it->unique_value) { + value = std::move(it->unique_value); + } else if (it->shared_value) { + auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1); + ElemAllocTraits::construct(*allocator_.get(), ptr, *it->shared_value); + auto deleter = std::get_deleter(it->shared_value); + if (deleter) { + value = ElemUniquePtr(ptr, *deleter); + } else { + value = ElemUniquePtr(ptr); + } + it->shared_value.reset(); + } else { + throw std::runtime_error("Unexpected empty MappedRingBuffer element."); + } + it->in_use = false; } } - /// Return ownership of the value stored in the ring buffer at the given key. + /// Give the ownership of the stored value to the caller, at the given key. /** * The key is matched if an element in the ring buffer has a matching key. * @@ -163,13 +213,18 @@ class MappedRingBuffer : public MappedRingBufferBase * \param value if the key is found, the value is stored in this parameter */ void - pop_at_key(uint64_t key, ElemUniquePtr & value) + pop(uint64_t key, ConstElemSharedPtr & value) { std::lock_guard lock(data_mutex_); auto it = get_iterator_of_key(key); - value = nullptr; if (it != elements_.end() && it->in_use) { - value.swap(it->value); + if (it->shared_value) { + value = std::move(it->shared_value); + } else if (it->unique_value) { + value = std::move(it->unique_value); + } else { + throw std::runtime_error("Unexpected empty MappedRingBuffer element."); + } it->in_use = false; } } @@ -180,29 +235,44 @@ class MappedRingBuffer : public MappedRingBufferBase * It is up to the user to ensure the key is unique. * This method should not allocate memory. * - * After insertion, if a pair was replaced, then value will contain ownership - * of that displaced value. Otherwise it will be a nullptr. + * After insertion the value will be a nullptr. + * If a pair were replaced, its smart pointer is reset. * * \param key the key associated with the value to be stored * \param value the value to store, and optionally the value displaced */ bool - push_and_replace(uint64_t key, ElemUniquePtr & value) + push_and_replace(uint64_t key, ConstElemSharedPtr value) { std::lock_guard lock(data_mutex_); bool did_replace = elements_[head_].in_use; - elements_[head_].key = key; - elements_[head_].value.swap(value); - elements_[head_].in_use = true; + Element & element = elements_[head_]; + element.key = key; + element.unique_value.reset(); + element.shared_value.reset(); + element.shared_value = value; + element.in_use = true; head_ = (head_ + 1) % elements_.size(); return did_replace; } + /// Insert a key-value pair, displacing an existing pair if necessary. + /** + * See `bool push_and_replace(uint64_t key, const ConstElemSharedPtr & value)`. + */ bool - push_and_replace(uint64_t key, ElemUniquePtr && value) + push_and_replace(uint64_t key, ElemUniquePtr value) { - ElemUniquePtr temp = std::move(value); - return push_and_replace(key, temp); + std::lock_guard lock(data_mutex_); + bool did_replace = elements_[head_].in_use; + Element & element = elements_[head_]; + element.key = key; + element.unique_value.reset(); + element.shared_value.reset(); + element.unique_value = std::move(value); + element.in_use = true; + head_ = (head_ + 1) % elements_.size(); + return did_replace; } /// Return true if the key is found in the ring buffer, otherwise false. @@ -216,27 +286,28 @@ class MappedRingBuffer : public MappedRingBufferBase private: RCLCPP_DISABLE_COPY(MappedRingBuffer) - struct element + struct Element { uint64_t key; - ElemUniquePtr value; + ElemUniquePtr unique_value; + ConstElemSharedPtr shared_value; bool in_use; }; - using VectorAlloc = typename std::allocator_traits::template rebind_alloc; + using VectorAlloc = typename std::allocator_traits::template rebind_alloc; - typename std::vector::iterator + typename std::vector::iterator get_iterator_of_key(uint64_t key) { auto it = std::find_if( elements_.begin(), elements_.end(), - [key](element & e) -> bool { + [key](Element & e) -> bool { return e.key == key && e.in_use; }); return it; } - std::vector elements_; + std::vector elements_; size_t head_; std::shared_ptr allocator_; std::mutex data_mutex_; diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 3bc5814112..6c921fbf68 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -23,6 +23,7 @@ #include #include #include +#include #include "rcl/error_handling.h" #include "rcl/publisher.h" @@ -31,167 +32,16 @@ #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" +#include "rclcpp/intra_process_manager.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" +#include "rclcpp/publisher_base.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp { -// Forward declaration is used for friend statement. -namespace node_interfaces -{ -class NodeTopicsInterface; -} - -namespace intra_process_manager -{ -/** - * NOTE(ivanpauno): IntraProcessManager is forward declared here, avoiding a circular inclusion between intra_process_manager.hpp and publisher.hpp. - * SharedPtr and WeakPtr of the IntraProcessManager are defined again here, to avoid a warning for accessing a member of a forward declared class. - */ -class IntraProcessManager; -} - -class PublisherBase -{ - friend ::rclcpp::node_interfaces::NodeTopicsInterface; - -public: - RCLCPP_SMART_PTR_DEFINITIONS(PublisherBase) - - /// Default constructor. - /** - * Typically, a publisher is not created through this method, but instead is created through a - * call to `Node::create_publisher`. - * \param[in] node_base A pointer to the NodeBaseInterface for the parent node. - * \param[in] topic The topic that this publisher publishes on. - * \param[in] type_support The type support structure for the type to be published. - * \param[in] publisher_options QoS settings for this publisher. - */ - RCLCPP_PUBLIC - PublisherBase( - rclcpp::node_interfaces::NodeBaseInterface * node_base, - const std::string & topic, - const rosidl_message_type_support_t & type_support, - const rcl_publisher_options_t & publisher_options); - - RCLCPP_PUBLIC - virtual ~PublisherBase(); - - /// Get the topic that this publisher publishes on. - /** \return The topic name. */ - RCLCPP_PUBLIC - const char * - get_topic_name() const; - - /// Get the queue size for this publisher. - /** \return The queue size. */ - RCLCPP_PUBLIC - size_t - get_queue_size() const; - - /// Get the global identifier for this publisher (used in rmw and by DDS). - /** \return The gid. */ - RCLCPP_PUBLIC - const rmw_gid_t & - get_gid() const; - - /// Get the global identifier for this publisher used by intra-process communication. - /** \return The intra-process gid. */ - RCLCPP_PUBLIC - const rmw_gid_t & - get_intra_process_gid() const; - - /// Get the rcl publisher handle. - /** \return The rcl publisher handle. */ - RCLCPP_PUBLIC - rcl_publisher_t * - get_publisher_handle(); - - /// Get the rcl publisher handle. - /** \return The rcl publisher handle. */ - RCLCPP_PUBLIC - const rcl_publisher_t * - get_publisher_handle() const; - - /// Get subscription count - /** \return The number of subscriptions. */ - RCLCPP_PUBLIC - size_t - get_subscription_count() const; - - /// Get intraprocess subscription count - /** \return The number of intraprocess subscriptions. */ - RCLCPP_PUBLIC - size_t - get_intra_process_subscription_count() const; - - /// Get the actual QoS settings, after the defaults have been determined. - /** - * The actual configuration applied when using RMW_QOS_POLICY_*_SYSTEM_DEFAULT - * can only be resolved after the creation of the publisher, and it - * depends on the underlying rmw implementation. - * If the underlying setting in use can't be represented in ROS terms, - * it will be set to RMW_QOS_POLICY_*_UNKNOWN. - * May throw runtime_error when an unexpected error occurs. - * \return The actual qos settings. - */ - RCLCPP_PUBLIC - rmw_qos_profile_t - get_actual_qos() const; - - /// Compare this publisher to a gid. - /** - * Note that this function calls the next function. - * \param[in] gid Reference to a gid. - * \return True if the publisher's gid matches the input. - */ - RCLCPP_PUBLIC - bool - operator==(const rmw_gid_t & gid) const; - - /// Compare this publisher to a pointer gid. - /** - * A wrapper for comparing this publisher's gid to the input using rmw_compare_gids_equal. - * \param[in] gid A pointer to a gid. - * \return True if this publisher's gid matches the input. - */ - RCLCPP_PUBLIC - bool - operator==(const rmw_gid_t * gid) const; - - using StoreMessageCallbackT = std::function; - using IntraProcessManagerSharedPtr = - std::shared_ptr; - - /// Implementation utility function used to setup intra process publishing after creation. - RCLCPP_PUBLIC - void - setup_intra_process( - uint64_t intra_process_publisher_id, - StoreMessageCallbackT store_callback, - IntraProcessManagerSharedPtr ipm, - const rcl_publisher_options_t & intra_process_options); - -protected: - std::shared_ptr rcl_node_handle_; - - rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher(); - rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher(); - - using IntraProcessManagerWeakPtr = - std::weak_ptr; - bool intra_process_is_enabled_; - IntraProcessManagerWeakPtr weak_ipm_; - uint64_t intra_process_publisher_id_; - StoreMessageCallbackT store_intra_process_message_; - - rmw_gid_t rmw_gid_; - rmw_gid_t intra_process_rmw_gid_; -}; - /// A publisher publishes messages of any type to a topic. template> class Publisher : public PublisherBase @@ -201,6 +51,7 @@ class Publisher : public PublisherBase using MessageAlloc = typename MessageAllocTraits::allocator_type; using MessageDeleter = allocator::Deleter; using MessageUniquePtr = std::unique_ptr; + using MessageSharedPtr = std::shared_ptr; RCLCPP_SMART_PTR_DEFINITIONS(Publisher) @@ -222,6 +73,15 @@ class Publisher : public PublisherBase virtual ~Publisher() {} + mapped_ring_buffer::MappedRingBufferBase::SharedPtr + make_mapped_ring_buffer(size_t size) const override + { + return mapped_ring_buffer::MappedRingBuffer< + MessageT, + typename Publisher::MessageAlloc + >::make_shared(size, this->get_allocator()); + } + /// Send a message to the topic for this publisher. /** * This function is templated on the input message type, MessageT. @@ -230,97 +90,56 @@ class Publisher : public PublisherBase virtual void publish(std::unique_ptr & msg) { - bool inter_process_subscriptions_exist = - get_subscription_count() > get_intra_process_subscription_count(); - if (!intra_process_is_enabled_ || inter_process_subscriptions_exist) { + if (!intra_process_is_enabled_) { this->do_inter_process_publish(msg.get()); + msg.reset(); + return; } - if (store_intra_process_message_) { - // Take the pointer from the unique_msg, release it and pass as a void * - // to the ipm. The ipm should then capture it again as a unique_ptr of - // the correct type. - // TODO(wjwwood): - // investigate how to transfer the custom deleter (if there is one) - // from the incoming unique_ptr through to the ipm's unique_ptr. - // See: http://stackoverflow.com/questions/11002641/dynamic-casting-for-unique-ptr - MessageT * msg_ptr = msg.get(); - msg.release(); - uint64_t message_seq = - store_intra_process_message_(intra_process_publisher_id_, msg_ptr, typeid(MessageT)); - rcl_interfaces::msg::IntraProcessMessage ipm; - ipm.publisher_id = intra_process_publisher_id_; - ipm.message_sequence = message_seq; - auto status = rcl_publish(&intra_process_publisher_handle_, &ipm); - if (RCL_RET_PUBLISHER_INVALID == status) { - rcl_reset_error(); // next call will reset error message if not context - if (rcl_publisher_is_valid_except_context(&intra_process_publisher_handle_)) { - rcl_context_t * context = rcl_publisher_get_context(&intra_process_publisher_handle_); - if (nullptr != context && !rcl_context_is_valid(context)) { - // publisher is invalid due to context being shutdown - return; - } - } - } - if (RCL_RET_OK != status) { - rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message"); - } + // If an interprocess subscription exist, then the unique_ptr is promoted + // to a shared_ptr and published. + // This allows doing the intraprocess publish first and then doing the + // interprocess publish, resulting in lower publish-to-subscribe latency. + // It's not possible to do that with an unique_ptr, + // as do_intra_process_publish takes the ownership of the message. + uint64_t message_seq; + bool inter_process_publish_needed = + get_subscription_count() > get_intra_process_subscription_count(); + MessageSharedPtr shared_msg; + if (inter_process_publish_needed) { + shared_msg = std::move(msg); + message_seq = + store_intra_process_message(intra_process_publisher_id_, shared_msg); } else { - // Always destroy the message, even if we don't consume it, for consistency. - msg.reset(); + message_seq = + store_intra_process_message(intra_process_publisher_id_, std::move(msg)); } - } - - virtual void - publish(const std::shared_ptr & msg) - { - // Avoid allocating when not using intra process. - if (!store_intra_process_message_) { - // In this case we're not using intra process. - return this->do_inter_process_publish(msg.get()); + this->do_intra_process_publish(message_seq); + if (inter_process_publish_needed) { + this->do_inter_process_publish(shared_msg.get()); } - // Otherwise we have to allocate memory in a unique_ptr and pass it along. - // TODO(wjwwood): - // The intra process manager should probably also be able to store - // shared_ptr's and do the "smart" thing based on other intra process - // subscriptions. For now call the other publish(). - auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); - MessageAllocTraits::construct(*message_allocator_.get(), ptr, *msg.get()); - MessageUniquePtr unique_msg(ptr, message_deleter_); - return this->publish(unique_msg); } virtual void - publish(std::shared_ptr msg) + publish(const std::shared_ptr & msg) { - // Avoid allocating when not using intra process. - if (!store_intra_process_message_) { - // In this case we're not using intra process. - return this->do_inter_process_publish(msg.get()); - } - // Otherwise we have to allocate memory in a unique_ptr and pass it along. - // TODO(wjwwood): - // The intra process manager should probably also be able to store - // shared_ptr's and do the "smart" thing based on other intra process - // subscriptions. For now call the other publish(). - auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); - MessageAllocTraits::construct(*message_allocator_.get(), ptr, *msg.get()); - MessageUniquePtr unique_msg(ptr, message_deleter_); - return this->publish(unique_msg); + publish(*msg); } virtual void publish(const MessageT & msg) { // Avoid allocating when not using intra process. - if (!store_intra_process_message_) { + if (!intra_process_is_enabled_) { // In this case we're not using intra process. return this->do_inter_process_publish(&msg); } // Otherwise we have to allocate memory in a unique_ptr and pass it along. + // As the message is not const, a copy should be made. + // A shared_ptr could also be constructed here. auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); MessageAllocTraits::construct(*message_allocator_.get(), ptr, msg); MessageUniquePtr unique_msg(ptr, message_deleter_); - return this->publish(unique_msg); + this->publish(unique_msg); } virtual void @@ -335,7 +154,7 @@ class Publisher : public PublisherBase void publish(const rcl_serialized_message_t * serialized_msg) { - if (store_intra_process_message_) { + if (intra_process_is_enabled_) { // TODO(Karsten1987): support serialized message passed by intraprocess throw std::runtime_error("storing serialized messages in intra process is not supported yet"); } @@ -376,6 +195,64 @@ class Publisher : public PublisherBase } } + void + do_intra_process_publish(uint64_t message_seq) + { + rcl_interfaces::msg::IntraProcessMessage ipm; + ipm.publisher_id = intra_process_publisher_id_; + ipm.message_sequence = message_seq; + auto status = rcl_publish(&intra_process_publisher_handle_, &ipm); + if (RCL_RET_PUBLISHER_INVALID == status) { + rcl_reset_error(); // next call will reset error message if not context + if (rcl_publisher_is_valid_except_context(&intra_process_publisher_handle_)) { + rcl_context_t * context = rcl_publisher_get_context(&intra_process_publisher_handle_); + if (nullptr != context && !rcl_context_is_valid(context)) { + // publisher is invalid due to context being shutdown + return; + } + } + } + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message"); + } + } + + uint64_t + store_intra_process_message( + uint64_t publisher_id, + std::shared_ptr msg) + { + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "intra process publish called after destruction of intra process manager"); + } + if (!msg) { + throw std::runtime_error("cannot publisher msg which is a null pointer"); + } + uint64_t message_seq = + ipm->template store_intra_process_message(publisher_id, msg); + return message_seq; + } + + uint64_t + store_intra_process_message( + uint64_t publisher_id, + std::unique_ptr msg) + { + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "intra process publish called after destruction of intra process manager"); + } + if (!msg) { + throw std::runtime_error("cannot publisher msg which is a null pointer"); + } + uint64_t message_seq = + ipm->template store_intra_process_message(publisher_id, std::move(msg)); + return message_seq; + } + std::shared_ptr message_allocator_; MessageDeleter message_deleter_; diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp new file mode 100644 index 0000000000..b836ccd90e --- /dev/null +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -0,0 +1,195 @@ +// Copyright 2014 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__PUBLISHER_BASE_HPP_ +#define RCLCPP__PUBLISHER_BASE_HPP_ + +#include +#include + +#include +#include +#include +#include +#include + +#include "rcl/publisher.h" + +#include "rclcpp/macros.hpp" +#include "rclcpp/mapped_ring_buffer.hpp" +#include "rclcpp/type_support_decl.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +// Forward declaration is used for friend statement. +namespace node_interfaces +{ +class NodeBaseInterface; +class NodeTopicsInterface; +} + +namespace intra_process_manager +{ +/** + * IntraProcessManager is forward declared here, avoiding a circular inclusion between + * `intra_process_manager.hpp` and `publisher_base.hpp`. + */ +class IntraProcessManager; +} + +class PublisherBase +{ + friend ::rclcpp::node_interfaces::NodeTopicsInterface; + +public: + RCLCPP_SMART_PTR_DEFINITIONS(PublisherBase) + + /// Default constructor. + /** + * Typically, a publisher is not created through this method, but instead is created through a + * call to `Node::create_publisher`. + * \param[in] node_base A pointer to the NodeBaseInterface for the parent node. + * \param[in] topic The topic that this publisher publishes on. + * \param[in] type_support The type support structure for the type to be published. + * \param[in] publisher_options QoS settings for this publisher. + */ + RCLCPP_PUBLIC + PublisherBase( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic, + const rosidl_message_type_support_t & type_support, + const rcl_publisher_options_t & publisher_options); + + RCLCPP_PUBLIC + virtual ~PublisherBase(); + + /// Get the topic that this publisher publishes on. + /** \return The topic name. */ + RCLCPP_PUBLIC + const char * + get_topic_name() const; + + /// Get the queue size for this publisher. + /** \return The queue size. */ + RCLCPP_PUBLIC + size_t + get_queue_size() const; + + /// Get the global identifier for this publisher (used in rmw and by DDS). + /** \return The gid. */ + RCLCPP_PUBLIC + const rmw_gid_t & + get_gid() const; + + /// Get the global identifier for this publisher used by intra-process communication. + /** \return The intra-process gid. */ + RCLCPP_PUBLIC + const rmw_gid_t & + get_intra_process_gid() const; + + /// Get the rcl publisher handle. + /** \return The rcl publisher handle. */ + RCLCPP_PUBLIC + rcl_publisher_t * + get_publisher_handle(); + + /// Get the rcl publisher handle. + /** \return The rcl publisher handle. */ + RCLCPP_PUBLIC + const rcl_publisher_t * + get_publisher_handle() const; + + /// Get subscription count + /** \return The number of subscriptions. */ + RCLCPP_PUBLIC + size_t + get_subscription_count() const; + + /// Get intraprocess subscription count + /** \return The number of intraprocess subscriptions. */ + RCLCPP_PUBLIC + size_t + get_intra_process_subscription_count() const; + + /// Get the actual QoS settings, after the defaults have been determined. + /** + * The actual configuration applied when using RMW_QOS_POLICY_*_SYSTEM_DEFAULT + * can only be resolved after the creation of the publisher, and it + * depends on the underlying rmw implementation. + * If the underlying setting in use can't be represented in ROS terms, + * it will be set to RMW_QOS_POLICY_*_UNKNOWN. + * May throw runtime_error when an unexpected error occurs. + * \return The actual qos settings. + */ + RCLCPP_PUBLIC + rmw_qos_profile_t + get_actual_qos() const; + + /// Compare this publisher to a gid. + /** + * Note that this function calls the next function. + * \param[in] gid Reference to a gid. + * \return True if the publisher's gid matches the input. + */ + RCLCPP_PUBLIC + bool + operator==(const rmw_gid_t & gid) const; + + /// Compare this publisher to a pointer gid. + /** + * A wrapper for comparing this publisher's gid to the input using rmw_compare_gids_equal. + * \param[in] gid A pointer to a gid. + * \return True if this publisher's gid matches the input. + */ + RCLCPP_PUBLIC + bool + operator==(const rmw_gid_t * gid) const; + + using IntraProcessManagerSharedPtr = + std::shared_ptr; + + /// Implementation utility function that creates a typed mapped ring buffer. + RCLCPP_PUBLIC + mapped_ring_buffer::MappedRingBufferBase::SharedPtr + virtual make_mapped_ring_buffer(size_t size) const; + + /// Implementation utility function used to setup intra process publishing after creation. + RCLCPP_PUBLIC + void + setup_intra_process( + uint64_t intra_process_publisher_id, + IntraProcessManagerSharedPtr ipm, + const rcl_publisher_options_t & intra_process_options); + +protected: + std::shared_ptr rcl_node_handle_; + + rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher(); + rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher(); + + using IntraProcessManagerWeakPtr = + std::weak_ptr; + bool intra_process_is_enabled_; + IntraProcessManagerWeakPtr weak_ipm_; + uint64_t intra_process_publisher_id_; + + rmw_gid_t rmw_gid_; + rmw_gid_t intra_process_rmw_gid_; +}; + +} // namespace rclcpp + +#endif // RCLCPP__PUBLISHER_BASE_HPP_ diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index 4044d1a6cc..aadb138dcd 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -52,24 +52,6 @@ struct PublisherFactory rcl_publisher_options_t & publisher_options)>; PublisherFactoryFunction create_typed_publisher; - - // Adds the PublisherBase to the intraprocess manager with the correctly - // templated call to IntraProcessManager::store_intra_process_message. - using AddPublisherToIntraProcessManagerFunction = std::function< - uint64_t( - rclcpp::intra_process_manager::IntraProcessManager * ipm, - rclcpp::PublisherBase::SharedPtr publisher)>; - - AddPublisherToIntraProcessManagerFunction add_publisher_to_intra_process_manager; - - // Creates the callback function which is called on each - // PublisherT::publish() and which handles the intra process transmission of - // the message being published. - using SharedPublishCallbackFactoryFunction = std::function< - rclcpp::PublisherBase::StoreMessageCallbackT( - rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm)>; - - SharedPublishCallbackFactoryFunction create_shared_publish_callback; }; /// Return a PublisherFactory with functions setup for creating a PublisherT. @@ -92,53 +74,6 @@ create_publisher_factory(std::shared_ptr allocator) return std::make_shared(node_base, topic_name, publisher_options, message_alloc); }; - // function to add a publisher to the intra process manager - factory.add_publisher_to_intra_process_manager = - []( - rclcpp::intra_process_manager::IntraProcessManager * ipm, - rclcpp::PublisherBase::SharedPtr publisher) -> uint64_t - { - return ipm->add_publisher(std::dynamic_pointer_cast(publisher)); - }; - - // function to create a shared publish callback std::function - using StoreMessageCallbackT = rclcpp::PublisherBase::StoreMessageCallbackT; - factory.create_shared_publish_callback = - [](rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm) -> StoreMessageCallbackT - { - rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = ipm; - - // this function is called on each call to publish() and handles storing - // of the published message in the intra process manager - auto shared_publish_callback = - [weak_ipm](uint64_t publisher_id, void * msg, const std::type_info & type_info) -> uint64_t - { - auto ipm = weak_ipm.lock(); - if (!ipm) { - // TODO(wjwwood): should this just return silently? Or maybe return with a warning? - throw std::runtime_error( - "intra process publish called after destruction of intra process manager"); - } - if (!msg) { - throw std::runtime_error("cannot publisher msg which is a null pointer"); - } - auto & message_type_info = typeid(MessageT); - if (message_type_info != type_info) { - throw std::runtime_error( - std::string("published type '") + type_info.name() + - "' is incompatible from the publisher type '" + message_type_info.name() + "'"); - } - MessageT * typed_message_ptr = static_cast(msg); - using MessageDeleter = typename Publisher::MessageDeleter; - std::unique_ptr unique_msg(typed_message_ptr); - uint64_t message_seq = - ipm->store_intra_process_message(publisher_id, unique_msg); - return message_seq; - }; - - return shared_publish_callback; - }; - // return the factory now that it is populated return factory; } diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 2a2a3e4eef..bda8db693f 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -23,6 +23,8 @@ #include #include #include +#include + #include "rcl/error_handling.h" #include "rcl/subscription.h" @@ -32,8 +34,11 @@ #include "rclcpp/any_subscription_callback.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" +#include "rclcpp/intra_process_manager.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/message_memory_strategy.hpp" +#include "rclcpp/subscription_base.hpp" #include "rclcpp/subscription_traits.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" @@ -46,122 +51,6 @@ namespace node_interfaces class NodeTopicsInterface; } // namespace node_interfaces -namespace intra_process_manager -{ -/** - * NOTE(ivanpauno): IntraProcessManager is forward declared here, avoiding a circular inclusion between intra_process_manager.hpp and publisher.hpp. - * SharedPtr and WeakPtr of the IntraProcessManager are defined again here, to avoid a warning for accessing a member of a forward declared class. - */ -class IntraProcessManager; -} - -/// Virtual base class for subscriptions. This pattern allows us to iterate over different template -/// specializations of Subscription, among other things. -class SubscriptionBase -{ -public: - RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase) - - /// Default constructor. - /** - * \param[in] node_handle The rcl representation of the node that owns this subscription. - * \param[in] type_support_handle rosidl type support struct, for the Message type of the topic. - * \param[in] topic_name Name of the topic to subscribe to. - * \param[in] subscription_options options for the subscription. - * \param[in] is_serialized is true if the message will be delivered still serialized - */ - RCLCPP_PUBLIC - SubscriptionBase( - std::shared_ptr node_handle, - const rosidl_message_type_support_t & type_support_handle, - const std::string & topic_name, - const rcl_subscription_options_t & subscription_options, - bool is_serialized = false); - - /// Default destructor. - RCLCPP_PUBLIC - virtual ~SubscriptionBase(); - - /// Get the topic that this subscription is subscribed on. - RCLCPP_PUBLIC - const char * - get_topic_name() const; - - RCLCPP_PUBLIC - std::shared_ptr - get_subscription_handle(); - - RCLCPP_PUBLIC - const std::shared_ptr - get_subscription_handle() const; - - RCLCPP_PUBLIC - virtual const std::shared_ptr - get_intra_process_subscription_handle() const; - - /// Borrow a new message. - /** \return Shared pointer to the fresh message. */ - virtual std::shared_ptr - create_message() = 0; - - /// Borrow a new serialized message - /** \return Shared pointer to a rcl_message_serialized_t. */ - virtual std::shared_ptr - create_serialized_message() = 0; - - /// Check if we need to handle the message, and execute the callback if we do. - /** - * \param[in] message Shared pointer to the message to handle. - * \param[in] message_info Metadata associated with this message. - */ - virtual void - handle_message(std::shared_ptr & message, const rmw_message_info_t & message_info) = 0; - - /// Return the message borrowed in create_message. - /** \param[in] message Shared pointer to the returned message. */ - virtual void - return_message(std::shared_ptr & message) = 0; - - /// Return the message borrowed in create_serialized_message. - /** \param[in] message Shared pointer to the returned message. */ - virtual void - return_serialized_message(std::shared_ptr & message) = 0; - - virtual void - handle_intra_process_message( - rcl_interfaces::msg::IntraProcessMessage & ipm, - const rmw_message_info_t & message_info) = 0; - - const rosidl_message_type_support_t & - get_message_type_support_handle() const; - - bool - is_serialized() const; - - /// Get matching publisher count - /** \return The number of publishers on this topic. */ - RCLCPP_PUBLIC - size_t - get_publisher_count() const; - -protected: - std::shared_ptr intra_process_subscription_handle_; - std::shared_ptr subscription_handle_; - std::shared_ptr node_handle_; - - using IntraProcessManagerWeakPtr = - std::weak_ptr; - bool use_intra_process_; - IntraProcessManagerWeakPtr weak_ipm_; - uint64_t intra_process_subscription_id_; - -private: - RCLCPP_DISABLE_COPY(SubscriptionBase) - - rosidl_message_type_support_t type_support_; - bool is_serialized_; -}; - /// Subscription implementation, templated on the type of message this subscription receives. template< typename CallbackMessageT, @@ -174,6 +63,7 @@ class Subscription : public SubscriptionBase using MessageAllocTraits = allocator::AllocRebind; using MessageAlloc = typename MessageAllocTraits::allocator_type; using MessageDeleter = allocator::Deleter; + using ConstMessageSharedPtr = std::shared_ptr; using MessageUniquePtr = std::unique_ptr; RCLCPP_SMART_PTR_DEFINITIONS(Subscription) @@ -205,9 +95,7 @@ class Subscription : public SubscriptionBase subscription_options, rclcpp::subscription_traits::is_serialized_subscription_argument::value), any_callback_(callback), - message_memory_strategy_(memory_strategy), - get_intra_process_message_callback_(nullptr), - matches_any_intra_process_publishers_(nullptr) + message_memory_strategy_(memory_strategy) {} /// Support dynamically setting the message memory strategy. @@ -238,12 +126,10 @@ class Subscription : public SubscriptionBase void handle_message(std::shared_ptr & message, const rmw_message_info_t & message_info) { - if (matches_any_intra_process_publishers_) { - if (matches_any_intra_process_publishers_(&message_info.publisher_gid)) { - // In this case, the message will be delivered via intra process and - // we should ignore this copy of the message. - return; - } + if (matches_any_intra_process_publishers(&message_info.publisher_gid)) { + // In this case, the message will be delivered via intra process and + // we should ignore this copy of the message. + return; } auto typed_message = std::static_pointer_cast(message); any_callback_.dispatch(typed_message, message_info); @@ -266,89 +152,109 @@ class Subscription : public SubscriptionBase rcl_interfaces::msg::IntraProcessMessage & ipm, const rmw_message_info_t & message_info) { - if (!get_intra_process_message_callback_) { + if (!use_intra_process_) { // throw std::runtime_error( // "handle_intra_process_message called before setup_intra_process"); // TODO(wjwwood): for now, this could mean that intra process was just not enabled. // However, this can only really happen if this node has it disabled, but the other doesn't. return; } - MessageUniquePtr msg; - get_intra_process_message_callback_( - ipm.publisher_id, - ipm.message_sequence, - intra_process_subscription_id_, - msg); - if (!msg) { - // This either occurred because the publisher no longer exists or the - // message requested is no longer being stored. - // TODO(wjwwood): should we notify someone of this? log error, log warning? - return; - } - any_callback_.dispatch_intra_process(msg, message_info); - } - - using GetMessageCallbackType = - std::function; - using MatchesAnyPublishersCallbackType = std::function; - - /// Implemenation detail. - // TODO(ivanpauno): This can be moved to the base class. No reason to be here. - // Also get_intra_process_message_callback_ and matches_any_intra_process_publishers_. - void setup_intra_process( - uint64_t intra_process_subscription_id, - GetMessageCallbackType get_message_callback, - MatchesAnyPublishersCallbackType matches_any_publisher_callback, - IntraProcessManagerWeakPtr weak_ipm, - const rcl_subscription_options_t & intra_process_options) - { - std::string intra_process_topic_name = std::string(get_topic_name()) + "/_intra"; - rcl_ret_t ret = rcl_subscription_init( - intra_process_subscription_handle_.get(), - node_handle_.get(), - rclcpp::type_support::get_intra_process_message_msg_type_support(), - intra_process_topic_name.c_str(), - &intra_process_options); - if (ret != RCL_RET_OK) { - if (ret == RCL_RET_TOPIC_NAME_INVALID) { - auto rcl_node_handle = node_handle_.get(); - // this will throw on any validation problem - rcl_reset_error(); - expand_topic_or_service_name( - intra_process_topic_name, - rcl_node_get_name(rcl_node_handle), - rcl_node_get_namespace(rcl_node_handle)); + if (any_callback_.use_take_shared_method()) { + ConstMessageSharedPtr msg; + take_intra_process_message( + ipm.publisher_id, + ipm.message_sequence, + intra_process_subscription_id_, + msg); + if (!msg) { + // This either occurred because the publisher no longer exists or the + // message requested is no longer being stored. + RCLCPP_WARN(get_logger("rclcpp"), + "Intra process message not longer being stored when trying to handle it"); + return; } - - rclcpp::exceptions::throw_from_rcl_error(ret, "could not create intra process subscription"); + any_callback_.dispatch_intra_process(msg, message_info); + } else { + MessageUniquePtr msg; + take_intra_process_message( + ipm.publisher_id, + ipm.message_sequence, + intra_process_subscription_id_, + msg); + if (!msg) { + // This either occurred because the publisher no longer exists or the + // message requested is no longer being stored. + RCLCPP_WARN(get_logger("rclcpp"), + "Intra process message not longer being stored when trying to handle it"); + return; + } + any_callback_.dispatch_intra_process(std::move(msg), message_info); } - - intra_process_subscription_id_ = intra_process_subscription_id; - get_intra_process_message_callback_ = get_message_callback; - matches_any_intra_process_publishers_ = matches_any_publisher_callback; - weak_ipm_ = weak_ipm; - use_intra_process_ = true; } /// Implemenation detail. const std::shared_ptr get_intra_process_subscription_handle() const { - if (!get_intra_process_message_callback_) { + if (!use_intra_process_) { return nullptr; } return intra_process_subscription_handle_; } private: + void + take_intra_process_message( + uint64_t publisher_id, + uint64_t message_sequence, + uint64_t subscription_id, + MessageUniquePtr & message) + { + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "intra process take called after destruction of intra process manager"); + } + ipm->template take_intra_process_message( + publisher_id, message_sequence, subscription_id, message); + } + + void + take_intra_process_message( + uint64_t publisher_id, + uint64_t message_sequence, + uint64_t subscription_id, + ConstMessageSharedPtr & message) + { + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "intra process take called after destruction of intra process manager"); + } + ipm->template take_intra_process_message( + publisher_id, message_sequence, subscription_id, message); + } + + bool + matches_any_intra_process_publishers(const rmw_gid_t * sender_gid) + { + if (!use_intra_process_) { + return false; + } + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "intra process publisher check called " + "after destruction of intra process manager"); + } + return ipm->matches_any_publishers(sender_gid); + } + RCLCPP_DISABLE_COPY(Subscription) AnySubscriptionCallback any_callback_; typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; - - GetMessageCallbackType get_intra_process_message_callback_; - MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp new file mode 100644 index 0000000000..73b0d48c38 --- /dev/null +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -0,0 +1,165 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__SUBSCRIPTION_BASE_HPP_ +#define RCLCPP__SUBSCRIPTION_BASE_HPP_ + +#include +#include + +#include "rcl/subscription.h" + +#include "rcl_interfaces/msg/intra_process_message.hpp" + +#include "rmw/rmw.h" + +#include "rclcpp/any_subscription_callback.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/type_support_decl.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +namespace node_interfaces +{ +class NodeTopicsInterface; +} // namespace node_interfaces + +namespace intra_process_manager +{ +/** + * IntraProcessManager is forward declared here, avoiding a circular inclusion between + * `intra_process_manager.hpp` and `subscription_base.hpp`. + */ +class IntraProcessManager; +} + +/// Virtual base class for subscriptions. This pattern allows us to iterate over different template +/// specializations of Subscription, among other things. +class SubscriptionBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase) + + /// Default constructor. + /** + * \param[in] node_handle The rcl representation of the node that owns this subscription. + * \param[in] type_support_handle rosidl type support struct, for the Message type of the topic. + * \param[in] topic_name Name of the topic to subscribe to. + * \param[in] subscription_options options for the subscription. + * \param[in] is_serialized is true if the message will be delivered still serialized + */ + RCLCPP_PUBLIC + SubscriptionBase( + std::shared_ptr node_handle, + const rosidl_message_type_support_t & type_support_handle, + const std::string & topic_name, + const rcl_subscription_options_t & subscription_options, + bool is_serialized = false); + + /// Default destructor. + RCLCPP_PUBLIC + virtual ~SubscriptionBase(); + + /// Get the topic that this subscription is subscribed on. + RCLCPP_PUBLIC + const char * + get_topic_name() const; + + RCLCPP_PUBLIC + std::shared_ptr + get_subscription_handle(); + + RCLCPP_PUBLIC + const std::shared_ptr + get_subscription_handle() const; + + RCLCPP_PUBLIC + virtual const std::shared_ptr + get_intra_process_subscription_handle() const; + + /// Borrow a new message. + /** \return Shared pointer to the fresh message. */ + virtual std::shared_ptr + create_message() = 0; + + /// Borrow a new serialized message + /** \return Shared pointer to a rcl_message_serialized_t. */ + virtual std::shared_ptr + create_serialized_message() = 0; + + /// Check if we need to handle the message, and execute the callback if we do. + /** + * \param[in] message Shared pointer to the message to handle. + * \param[in] message_info Metadata associated with this message. + */ + virtual void + handle_message(std::shared_ptr & message, const rmw_message_info_t & message_info) = 0; + + /// Return the message borrowed in create_message. + /** \param[in] message Shared pointer to the returned message. */ + virtual void + return_message(std::shared_ptr & message) = 0; + + /// Return the message borrowed in create_serialized_message. + /** \param[in] message Shared pointer to the returned message. */ + virtual void + return_serialized_message(std::shared_ptr & message) = 0; + + virtual void + handle_intra_process_message( + rcl_interfaces::msg::IntraProcessMessage & ipm, + const rmw_message_info_t & message_info) = 0; + + const rosidl_message_type_support_t & + get_message_type_support_handle() const; + + bool + is_serialized() const; + + /// Get matching publisher count. + /** \return The number of publishers on this topic. */ + RCLCPP_PUBLIC + size_t + get_publisher_count() const; + + using IntraProcessManagerWeakPtr = + std::weak_ptr; + + /// Implemenation detail. + void setup_intra_process( + uint64_t intra_process_subscription_id, + IntraProcessManagerWeakPtr weak_ipm, + const rcl_subscription_options_t & intra_process_options); + +protected: + std::shared_ptr intra_process_subscription_handle_; + std::shared_ptr subscription_handle_; + std::shared_ptr node_handle_; + + bool use_intra_process_; + IntraProcessManagerWeakPtr weak_ipm_; + uint64_t intra_process_subscription_id_; + +private: + RCLCPP_DISABLE_COPY(SubscriptionBase) + + rosidl_message_type_support_t type_support_; + bool is_serialized_; +}; + +} // namespace rclcpp + +#endif // RCLCPP__SUBSCRIPTION_BASE_HPP_ diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index e2bdec7ef2..b479483244 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -114,64 +114,6 @@ create_subscription_factory( return sub_base_ptr; }; - // function that will setup intra process communications for the subscription - factory.setup_intra_process = - [message_alloc]( - rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm, - rclcpp::SubscriptionBase::SharedPtr subscription, - const rcl_subscription_options_t & subscription_options) - { - rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = ipm; - uint64_t intra_process_subscription_id = ipm->add_subscription(subscription); - - auto intra_process_options = rcl_subscription_get_default_options(); - intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator( - *message_alloc.get()); - intra_process_options.qos = subscription_options.qos; - intra_process_options.ignore_local_publications = false; - - // function that will be called to take a MessageT from the intra process manager - auto take_intra_process_message_func = - [weak_ipm]( - uint64_t publisher_id, - uint64_t message_sequence, - uint64_t subscription_id, - typename rclcpp::Subscription::MessageUniquePtr & message) - { - auto ipm = weak_ipm.lock(); - if (!ipm) { - // TODO(wjwwood): should this just return silently? Or return with a logged warning? - throw std::runtime_error( - "intra process take called after destruction of intra process manager"); - } - ipm->take_intra_process_message( - publisher_id, message_sequence, subscription_id, message); - }; - - // function that is called to see if the publisher id matches any local publishers - auto matches_any_publisher_func = - [weak_ipm](const rmw_gid_t * sender_gid) -> bool - { - auto ipm = weak_ipm.lock(); - if (!ipm) { - throw std::runtime_error( - "intra process publisher check called " - "after destruction of intra process manager"); - } - return ipm->matches_any_publishers(sender_gid); - }; - - auto typed_sub_ptr = std::dynamic_pointer_cast(subscription); - typed_sub_ptr->setup_intra_process( - intra_process_subscription_id, - take_intra_process_message_func, - matches_any_publisher_func, - weak_ipm, - intra_process_options - ); - }; - // end definition of factory function to setup intra process - // return the factory now that it is populated return factory; } diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 19ac424b9d..56e30ab3c0 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -29,6 +29,21 @@ IntraProcessManager::IntraProcessManager( IntraProcessManager::~IntraProcessManager() {} +uint64_t +IntraProcessManager::add_publisher( + rclcpp::PublisherBase::SharedPtr publisher, + size_t buffer_size) +{ + auto id = IntraProcessManager::get_next_unique_id(); + size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size(); + auto mrb = publisher->make_mapped_ring_buffer(size); + impl_->add_publisher(id, publisher, mrb, size); + if (!mrb) { + throw std::runtime_error("failed to create a mapped ring buffer"); + } + return id; +} + uint64_t IntraProcessManager::add_subscription( rclcpp::SubscriptionBase::SharedPtr subscription) diff --git a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp index b380e3c2ec..bbb54b2b4a 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp @@ -47,13 +47,9 @@ NodeTopics::create_publisher( // Get the intra process manager instance for this context. auto ipm = context->get_sub_context(); // Register the publisher with the intra process manager. - uint64_t intra_process_publisher_id = - publisher_factory.add_publisher_to_intra_process_manager(ipm.get(), publisher); - // Create a function to be called when publisher to do the intra process publish. - auto shared_publish_callback = publisher_factory.create_shared_publish_callback(ipm); + uint64_t intra_process_publisher_id = ipm->add_publisher(publisher); publisher->setup_intra_process( intra_process_publisher_id, - shared_publish_callback, ipm, publisher_options); } @@ -93,10 +89,11 @@ NodeTopics::create_subscription( // Setup intra process publishing if requested. if (use_intra_process) { auto context = node_base_->get_context(); - auto intra_process_manager = + auto ipm = context->get_sub_context(); - subscription_factory.setup_intra_process( - intra_process_manager, subscription, subscription_options); + uint64_t intra_process_subscription_id = ipm->add_subscription(subscription); + subscription_options.ignore_local_publications = false; + subscription->setup_intra_process(intra_process_subscription_id, ipm, subscription_options); } // Return the completed subscription. diff --git a/rclcpp/src/rclcpp/publisher.cpp b/rclcpp/src/rclcpp/publisher_base.cpp similarity index 97% rename from rclcpp/src/rclcpp/publisher.cpp rename to rclcpp/src/rclcpp/publisher_base.cpp index a580bb56d0..09997bac4e 100644 --- a/rclcpp/src/rclcpp/publisher.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "rclcpp/publisher.hpp" +#include "rclcpp/publisher_base.hpp" #include #include @@ -43,8 +43,7 @@ PublisherBase::PublisherBase( const rosidl_message_type_support_t & type_support, const rcl_publisher_options_t & publisher_options) : rcl_node_handle_(node_base->get_shared_rcl_node_handle()), - intra_process_is_enabled_(false), intra_process_publisher_id_(0), - store_intra_process_message_(nullptr) + intra_process_is_enabled_(false), intra_process_publisher_id_(0) { rcl_ret_t ret = rcl_publisher_init( &publisher_handle_, @@ -235,10 +234,16 @@ PublisherBase::operator==(const rmw_gid_t * gid) const return result; } +rclcpp::mapped_ring_buffer::MappedRingBufferBase::SharedPtr +PublisherBase::make_mapped_ring_buffer(size_t size) const +{ + (void)size; + return nullptr; +} + void PublisherBase::setup_intra_process( uint64_t intra_process_publisher_id, - StoreMessageCallbackT store_callback, IntraProcessManagerSharedPtr ipm, const rcl_publisher_options_t & intra_process_options) { @@ -275,7 +280,6 @@ PublisherBase::setup_intra_process( } intra_process_publisher_id_ = intra_process_publisher_id; - store_intra_process_message_ = store_callback; weak_ipm_ = ipm; intra_process_is_enabled_ = true; diff --git a/rclcpp/src/rclcpp/subscription.cpp b/rclcpp/src/rclcpp/subscription_base.cpp similarity index 78% rename from rclcpp/src/rclcpp/subscription.cpp rename to rclcpp/src/rclcpp/subscription_base.cpp index ed26acaf17..23ed7e5e0f 100644 --- a/rclcpp/src/rclcpp/subscription.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "rclcpp/subscription.hpp" +#include "rclcpp/subscription_base.hpp" #include #include @@ -147,3 +147,34 @@ SubscriptionBase::get_publisher_count() const } return inter_process_publisher_count; } + +void SubscriptionBase::setup_intra_process( + uint64_t intra_process_subscription_id, + IntraProcessManagerWeakPtr weak_ipm, + const rcl_subscription_options_t & intra_process_options) +{ + std::string intra_process_topic_name = std::string(get_topic_name()) + "/_intra"; + rcl_ret_t ret = rcl_subscription_init( + intra_process_subscription_handle_.get(), + node_handle_.get(), + rclcpp::type_support::get_intra_process_message_msg_type_support(), + intra_process_topic_name.c_str(), + &intra_process_options); + if (ret != RCL_RET_OK) { + if (ret == RCL_RET_TOPIC_NAME_INVALID) { + auto rcl_node_handle = node_handle_.get(); + // this will throw on any validation problem + rcl_reset_error(); + expand_topic_or_service_name( + intra_process_topic_name, + rcl_node_get_name(rcl_node_handle), + rcl_node_get_namespace(rcl_node_handle)); + } + + rclcpp::exceptions::throw_from_rcl_error(ret, "could not create intra process subscription"); + } + + intra_process_subscription_id_ = intra_process_subscription_id; + weak_ipm_ = weak_ipm; + use_intra_process_ = true; +} diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index 5128ece832..e446aa1679 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -14,10 +14,13 @@ #include #include +#include +#define RCLCPP_BUILDING_LIBRARY 1 #include "gtest/gtest.h" #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/mapped_ring_buffer.hpp" #include "rmw/types.h" // Mock up publisher and subscription base to avoid needing an rmw impl. @@ -50,6 +53,14 @@ class PublisherBase return false; } + virtual + mapped_ring_buffer::MappedRingBufferBase::SharedPtr + make_mapped_ring_buffer(size_t size) const + { + (void)size; + return nullptr; + } + std::string mock_topic_name; size_t mock_queue_size; }; @@ -71,6 +82,15 @@ class Publisher : public PublisherBase allocator_ = std::make_shared(); } + mapped_ring_buffer::MappedRingBufferBase::SharedPtr + make_mapped_ring_buffer(size_t size) const override + { + return mapped_ring_buffer::MappedRingBuffer< + T, + typename Publisher::MessageAlloc + >::make_shared(size, allocator_); + } + std::shared_ptr get_allocator() { return allocator_; @@ -109,10 +129,9 @@ class SubscriptionBase } // namespace mock } // namespace rclcpp -// Prevent rclcpp/publisher.hpp and rclcpp/subscription.hpp from being imported. -#define RCLCPP__PUBLISHER_HPP_ -#define RCLCPP__SUBSCRIPTION_HPP_ -#define RCLCPP_BUILDING_LIBRARY 1 +// Prevent rclcpp/publisher_base.hpp and rclcpp/subscription.hpp from being imported. +#define RCLCPP__PUBLISHER_BASE_HPP_ +#define RCLCPP__SUBSCRIPTION_BASE_HPP_ // Force ipm to use our mock publisher class. #define Publisher mock::Publisher #define PublisherBase mock::PublisherBase @@ -155,10 +174,8 @@ TEST(TestIntraProcessManager, nominal) { s1->mock_topic_name = "nominal1"; s1->mock_queue_size = 10; - auto p1_id = - ipm.add_publisher>(p1); - auto p2_id = - ipm.add_publisher>(p2); + auto p1_id = ipm.add_publisher(p1); + auto p2_id = ipm.add_publisher(p2); auto s1_id = ipm.add_subscription(s1); auto ipm_msg = std::make_shared(); @@ -169,14 +186,14 @@ TEST(TestIntraProcessManager, nominal) { ); auto p1_m1_original_address = unique_msg.get(); - auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); ipm_msg->message_sequence = 43; ipm_msg->publisher_id = 43; unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); - auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg); + auto p2_m1_id = ipm.store_intra_process_message(p2_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); ipm.take_intra_process_message(p2_id, p2_m1_id, s1_id, unique_msg); @@ -198,26 +215,22 @@ TEST(TestIntraProcessManager, nominal) { ipm_msg->publisher_id = 44; unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); - ipm.store_intra_process_message(p1_id, unique_msg); + ipm.store_intra_process_message(p1_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); ipm_msg->message_sequence = 45; ipm_msg->publisher_id = 45; unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); - ipm.store_intra_process_message(p1_id, unique_msg); + ipm.store_intra_process_message(p1_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); ipm_msg->message_sequence = 46; ipm_msg->publisher_id = 46; unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); - ipm.store_intra_process_message(p1_id, unique_msg); - ASSERT_NE(nullptr, unique_msg); - if (unique_msg) { - EXPECT_EQ(44ul, unique_msg->message_sequence); - EXPECT_EQ(44ul, unique_msg->publisher_id); - } + ipm.store_intra_process_message(p1_id, std::move(unique_msg)); + ASSERT_EQ(nullptr, unique_msg); } /* @@ -240,8 +253,7 @@ TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) { s1->mock_topic_name = "nominal1"; s1->mock_queue_size = 10; - auto p1_id = - ipm.add_publisher>(p1); + auto p1_id = ipm.add_publisher(p1); auto s1_id = ipm.add_subscription(s1); auto ipm_msg = std::make_shared(); @@ -251,7 +263,7 @@ TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) { new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) ); - auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); ipm.remove_publisher(p1_id); @@ -290,8 +302,7 @@ TEST(TestIntraProcessManager, removed_subscription_affects_take) { s3->mock_topic_name = "nominal1"; s3->mock_queue_size = 10; - auto p1_id = - ipm.add_publisher>(p1); + auto p1_id = ipm.add_publisher(p1); auto s1_id = ipm.add_subscription(s1); auto s2_id = ipm.add_subscription(s2); auto s3_id = ipm.add_subscription(s3); @@ -304,7 +315,7 @@ TEST(TestIntraProcessManager, removed_subscription_affects_take) { ); auto original_message_pointer = unique_msg.get(); - auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); @@ -361,8 +372,7 @@ TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) { s3->mock_topic_name = "nominal1"; s3->mock_queue_size = 10; - auto p1_id = - ipm.add_publisher>(p1); + auto p1_id = ipm.add_publisher(p1); auto s1_id = ipm.add_subscription(s1); auto s2_id = ipm.add_subscription(s2); auto s3_id = ipm.add_subscription(s3); @@ -375,7 +385,7 @@ TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) { ); auto original_message_pointer = unique_msg.get(); - auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); ipm.take_intra_process_message(p1_id, p1_m1_id, s1_id, unique_msg); @@ -437,12 +447,9 @@ TEST(TestIntraProcessManager, multiple_publishers_one_subscription) { s1->mock_topic_name = "nominal1"; s1->mock_queue_size = 10; - auto p1_id = - ipm.add_publisher>(p1); - auto p2_id = - ipm.add_publisher>(p2); - auto p3_id = - ipm.add_publisher>(p3); + auto p1_id = ipm.add_publisher(p1); + auto p2_id = ipm.add_publisher(p2); + auto p3_id = ipm.add_publisher(p3); auto s1_id = ipm.add_subscription(s1); auto ipm_msg = std::make_shared(); @@ -454,7 +461,7 @@ TEST(TestIntraProcessManager, multiple_publishers_one_subscription) { ); auto original_message_pointer1 = unique_msg.get(); - auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); // Second publish @@ -463,7 +470,7 @@ TEST(TestIntraProcessManager, multiple_publishers_one_subscription) { unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); auto original_message_pointer2 = unique_msg.get(); - auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg); + auto p2_m1_id = ipm.store_intra_process_message(p2_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); // Third publish @@ -472,7 +479,7 @@ TEST(TestIntraProcessManager, multiple_publishers_one_subscription) { unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); auto original_message_pointer3 = unique_msg.get(); - auto p3_m1_id = ipm.store_intra_process_message(p3_id, unique_msg); + auto p3_m1_id = ipm.store_intra_process_message(p3_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); // First take @@ -545,12 +552,9 @@ TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) { s3->mock_topic_name = "nominal1"; s3->mock_queue_size = 10; - auto p1_id = - ipm.add_publisher>(p1); - auto p2_id = - ipm.add_publisher>(p2); - auto p3_id = - ipm.add_publisher>(p3); + auto p1_id = ipm.add_publisher(p1); + auto p2_id = ipm.add_publisher(p2); + auto p3_id = ipm.add_publisher(p3); auto s1_id = ipm.add_subscription(s1); auto s2_id = ipm.add_subscription(s2); auto s3_id = ipm.add_subscription(s3); @@ -564,7 +568,7 @@ TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) { ); auto original_message_pointer1 = unique_msg.get(); - auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); // Second publish @@ -573,7 +577,7 @@ TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) { unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); auto original_message_pointer2 = unique_msg.get(); - auto p2_m1_id = ipm.store_intra_process_message(p2_id, unique_msg); + auto p2_m1_id = ipm.store_intra_process_message(p2_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); // Third publish @@ -582,7 +586,7 @@ TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) { unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); auto original_message_pointer3 = unique_msg.get(); - auto p3_m1_id = ipm.store_intra_process_message(p3_id, unique_msg); + auto p3_m1_id = ipm.store_intra_process_message(p3_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); // First take @@ -692,8 +696,7 @@ TEST(TestIntraProcessManager, ring_buffer_displacement) { s1->mock_topic_name = "nominal1"; s1->mock_queue_size = 10; - auto p1_id = - ipm.add_publisher>(p1); + auto p1_id = ipm.add_publisher(p1); auto s1_id = ipm.add_subscription(s1); auto ipm_msg = std::make_shared(); @@ -703,8 +706,7 @@ TEST(TestIntraProcessManager, ring_buffer_displacement) { new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) ); - auto original_message_pointer1 = unique_msg.get(); - auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); ipm_msg->message_sequence = 43; @@ -712,7 +714,7 @@ TEST(TestIntraProcessManager, ring_buffer_displacement) { unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); auto original_message_pointer2 = unique_msg.get(); - auto p1_m2_id = ipm.store_intra_process_message(p1_id, unique_msg); + auto p1_m2_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); ipm.take_intra_process_message(p1_id, p1_m2_id, s1_id, unique_msg); @@ -728,14 +730,8 @@ TEST(TestIntraProcessManager, ring_buffer_displacement) { ipm_msg->publisher_id = 44; unique_msg.reset(new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg)); - ipm.store_intra_process_message(p1_id, unique_msg); - EXPECT_NE(nullptr, unique_msg); // Should return the thing in the ring buffer it displaced. - if (unique_msg) { - // This should have been the first published message. - EXPECT_EQ(42ul, unique_msg->message_sequence); - EXPECT_EQ(42ul, unique_msg->publisher_id); - EXPECT_EQ(original_message_pointer1, unique_msg.get()); - } + ipm.store_intra_process_message(p1_id, std::move(unique_msg)); + EXPECT_EQ(nullptr, unique_msg); unique_msg.reset(); // Since it just got displaced it should no longer be there to take. @@ -759,8 +755,7 @@ TEST(TestIntraProcessManager, subscription_creation_race_condition) { p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 2; - auto p1_id = - ipm.add_publisher>(p1); + auto p1_id = ipm.add_publisher(p1); auto ipm_msg = std::make_shared(); ipm_msg->message_sequence = 42; @@ -769,7 +764,7 @@ TEST(TestIntraProcessManager, subscription_creation_race_condition) { new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) ); - auto p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + auto p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); auto s1 = std::make_shared(); @@ -808,7 +803,7 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_take) { p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 2; - p1_id = ipm.add_publisher>(p1); + p1_id = ipm.add_publisher(p1); auto ipm_msg = std::make_shared(); ipm_msg->message_sequence = 42; @@ -817,7 +812,7 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_take) { new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) ); - p1_m1_id = ipm.store_intra_process_message(p1_id, unique_msg); + p1_m1_id = ipm.store_intra_process_message(p1_id, std::move(unique_msg)); ASSERT_EQ(nullptr, unique_msg); // Explicitly remove publisher from ipm (emulate's publisher's destructor). @@ -847,7 +842,7 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_store) { p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 2; - p1_id = ipm.add_publisher>(p1); + p1_id = ipm.add_publisher(p1); } auto ipm_msg = std::make_shared(); @@ -857,6 +852,6 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_store) { new rcl_interfaces::msg::IntraProcessMessage(*ipm_msg) ); - EXPECT_THROW(ipm.store_intra_process_message(p1_id, unique_msg), std::runtime_error); + EXPECT_THROW(ipm.store_intra_process_message(p1_id, std::move(unique_msg)), std::runtime_error); ASSERT_EQ(nullptr, unique_msg); } diff --git a/rclcpp/test/test_mapped_ring_buffer.cpp b/rclcpp/test/test_mapped_ring_buffer.cpp index 528c312778..5625804d13 100644 --- a/rclcpp/test/test_mapped_ring_buffer.cpp +++ b/rclcpp/test/test_mapped_ring_buffer.cpp @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include +#include -#define RCLCPP_BUILDING_LIBRARY 1 // Prevent including unavailable symbols -#include +#include "gtest/gtest.h" -#include +#define RCLCPP_BUILDING_LIBRARY 1 // Prevent including unavailable symbols +#include "rclcpp/mapped_ring_buffer.hpp" /* Tests get_copy and pop on an empty mrb. @@ -28,60 +29,90 @@ TEST(TestMappedRingBuffer, empty) { // Getting or popping an empty buffer should result in a nullptr. rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(1); - std::unique_ptr actual; - mrb.get_copy_at_key(1, actual); - EXPECT_EQ(nullptr, actual); + std::unique_ptr unique; + mrb.get(1, unique); + EXPECT_EQ(nullptr, unique); + + mrb.pop(1, unique); + EXPECT_EQ(nullptr, unique); + + std::shared_ptr shared; + mrb.get(1, shared); + EXPECT_EQ(nullptr, shared); - mrb.pop_at_key(1, actual); + mrb.pop(1, shared); + EXPECT_EQ(nullptr, shared); +} + +/* + Tests push_and_replace with a temporary object, and using + get and pop methods with shared_ptr signature. + */ +TEST(TestMappedRingBuffer, temporary_l_value_with_shared_get_pop) { + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + // Pass in value with temporary object + mrb.push_and_replace(1, std::shared_ptr(new char('a'))); + + std::shared_ptr actual; + mrb.get(1, actual); + EXPECT_EQ('a', *actual); + + mrb.pop(1, actual); + EXPECT_EQ('a', *actual); + + mrb.get(1, actual); EXPECT_EQ(nullptr, actual); } /* - Tests push_and_replace with a temporary object. + Tests push_and_replace with a temporary object, and using + get and pop methods with unique_ptr signature. */ -TEST(TestMappedRingBuffer, temporary_l_value) { +TEST(TestMappedRingBuffer, temporary_l_value_with_unique_get_pop) { rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); // Pass in value with temporary object - mrb.push_and_replace(1, std::unique_ptr(new char('a'))); + mrb.push_and_replace(1, std::shared_ptr(new char('a'))); std::unique_ptr actual; - mrb.get_copy_at_key(1, actual); + mrb.get(1, actual); EXPECT_EQ('a', *actual); - mrb.pop_at_key(1, actual); + mrb.pop(1, actual); EXPECT_EQ('a', *actual); - mrb.get_copy_at_key(1, actual); + mrb.get(1, actual); EXPECT_EQ(nullptr, actual); } /* Tests normal usage of the mrb. + Using shared push_and_replace, get and pop methods. */ -TEST(TestMappedRingBuffer, nominal) { +TEST(TestMappedRingBuffer, nominal_push_shared_get_pop_shared) { rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); - std::unique_ptr expected(new char('a')); - // Store expected value's address for later comparison. - char * expected_orig = expected.get(); + std::shared_ptr expected(new char('a')); EXPECT_FALSE(mrb.push_and_replace(1, expected)); + EXPECT_EQ(2, expected.use_count()); - std::unique_ptr actual; - mrb.get_copy_at_key(1, actual); + std::shared_ptr actual; + mrb.get(1, actual); EXPECT_NE(nullptr, actual); if (actual) { EXPECT_EQ('a', *actual); } - EXPECT_NE(expected_orig, actual.get()); + EXPECT_EQ(expected, actual); + EXPECT_EQ(3, actual.use_count()); - mrb.pop_at_key(1, actual); - EXPECT_NE(nullptr, actual); + mrb.pop(1, actual); + EXPECT_EQ(expected, actual); if (actual) { EXPECT_EQ('a', *actual); } - EXPECT_EQ(expected_orig, actual.get()); + expected.reset(); + EXPECT_TRUE(actual.unique()); - mrb.get_copy_at_key(1, actual); + mrb.get(1, actual); EXPECT_EQ(nullptr, actual); expected.reset(new char('a')); @@ -93,16 +124,16 @@ TEST(TestMappedRingBuffer, nominal) { expected.reset(new char('c')); EXPECT_TRUE(mrb.push_and_replace(3, expected)); - mrb.get_copy_at_key(1, actual); + mrb.get(1, actual); EXPECT_EQ(nullptr, actual); - mrb.get_copy_at_key(2, actual); + mrb.get(2, actual); EXPECT_NE(nullptr, actual); if (actual) { EXPECT_EQ('b', *actual); } - mrb.get_copy_at_key(3, actual); + mrb.get(3, actual); EXPECT_NE(nullptr, actual); if (actual) { EXPECT_EQ('c', *actual); @@ -110,40 +141,167 @@ TEST(TestMappedRingBuffer, nominal) { } /* - Tests get_ownership on a normal mrb. + Tests normal usage of the mrb. + Using shared push_and_replace, unique get and pop methods. */ -TEST(TestMappedRingBuffer, get_ownership) { +TEST(TestMappedRingBuffer, nominal_push_shared_get_pop_unique) { rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); - std::unique_ptr expected(new char('a')); - // Store expected value's address for later comparison. - char * expected_orig = expected.get(); + std::shared_ptr expected(new char('a')); + const char * expected_orig = expected.get(); EXPECT_FALSE(mrb.push_and_replace(1, expected)); + EXPECT_EQ(2, expected.use_count()); std::unique_ptr actual; - mrb.get_copy_at_key(1, actual); + mrb.get(1, actual); EXPECT_NE(nullptr, actual); if (actual) { EXPECT_EQ('a', *actual); } EXPECT_NE(expected_orig, actual.get()); + mrb.pop(1, actual); + EXPECT_NE(expected_orig, actual.get()); + if (actual) { + EXPECT_EQ('a', *actual); + } + mrb.get(1, actual); + EXPECT_EQ(nullptr, actual); + + EXPECT_FALSE(mrb.push_and_replace(1, expected)); + expected.reset(); + mrb.pop(1, actual); + EXPECT_NE(expected_orig, actual.get()); + if (actual) { + EXPECT_EQ('a', *actual); + } + mrb.get(1, actual); + EXPECT_EQ(nullptr, actual); + + expected.reset(new char('a')); + EXPECT_FALSE(mrb.push_and_replace(1, expected)); + + expected.reset(new char('b')); + EXPECT_FALSE(mrb.push_and_replace(2, expected)); + + expected.reset(new char('c')); + EXPECT_TRUE(mrb.push_and_replace(3, expected)); + + mrb.get(1, actual); + EXPECT_EQ(nullptr, actual); + + mrb.get(2, actual); + EXPECT_NE(nullptr, actual); + if (actual) { + EXPECT_EQ('b', *actual); + } + + mrb.get(3, actual); + EXPECT_NE(nullptr, actual); + if (actual) { + EXPECT_EQ('c', *actual); + } +} + +/* + Tests normal usage of the mrb. + Using unique push_and_replace, get and pop methods. + */ +TEST(TestMappedRingBuffer, nominal_push_unique_get_pop_unique) { + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + std::unique_ptr expected(new char('a')); + const char * expected_orig = expected.get(); + + EXPECT_FALSE(mrb.push_and_replace(1, std::move(expected))); - mrb.get_ownership_at_key(1, actual); + std::unique_ptr actual; + mrb.get(1, actual); EXPECT_NE(nullptr, actual); if (actual) { EXPECT_EQ('a', *actual); } + EXPECT_NE(expected_orig, actual.get()); + mrb.pop(1, actual); + if (actual) { + EXPECT_EQ('a', *actual); + } EXPECT_EQ(expected_orig, actual.get()); + mrb.get(1, actual); + EXPECT_EQ(nullptr, actual); + + expected.reset(new char('a')); + EXPECT_FALSE(mrb.push_and_replace(1, std::move(expected))); + + expected.reset(new char('b')); + EXPECT_FALSE(mrb.push_and_replace(2, std::move(expected))); - mrb.pop_at_key(1, actual); + expected.reset(new char('c')); + EXPECT_TRUE(mrb.push_and_replace(3, std::move(expected))); + + mrb.get(1, actual); + EXPECT_EQ(nullptr, actual); + + mrb.get(2, actual); EXPECT_NE(nullptr, actual); if (actual) { - EXPECT_EQ('a', *actual); // The value should be the same. + EXPECT_EQ('b', *actual); } - EXPECT_NE(expected_orig, actual.get()); // Even though we pop'ed, we didn't get the original. - mrb.get_copy_at_key(1, actual); + mrb.get(3, actual); + EXPECT_NE(nullptr, actual); + if (actual) { + EXPECT_EQ('c', *actual); + } +} + +/* + Tests normal usage of the mrb. + Using unique push_and_replace, shared get and pop methods. + */ +TEST(TestMappedRingBuffer, nominal_push_unique_get_pop_shared) { + rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); + std::unique_ptr expected(new char('a')); + const char * expected_orig = expected.get(); + + EXPECT_FALSE(mrb.push_and_replace(1, std::move(expected))); + + std::shared_ptr actual; + mrb.get(1, actual); + EXPECT_NE(nullptr, actual); + if (actual) { + EXPECT_EQ('a', *actual); + } + EXPECT_EQ(expected_orig, actual.get()); + mrb.pop(1, actual); + if (actual) { + EXPECT_EQ('a', *actual); + } + EXPECT_EQ(expected_orig, actual.get()); + mrb.get(1, actual); + EXPECT_EQ(nullptr, actual); + + expected.reset(new char('a')); + EXPECT_FALSE(mrb.push_and_replace(1, std::move(expected))); + + expected.reset(new char('b')); + EXPECT_FALSE(mrb.push_and_replace(2, std::move(expected))); + + expected.reset(new char('c')); + EXPECT_TRUE(mrb.push_and_replace(3, std::move(expected))); + + mrb.get(1, actual); EXPECT_EQ(nullptr, actual); + + mrb.get(2, actual); + EXPECT_NE(nullptr, actual); + if (actual) { + EXPECT_EQ('b', *actual); + } + + mrb.get(3, actual); + EXPECT_NE(nullptr, actual); + if (actual) { + EXPECT_EQ('c', *actual); + } } /* @@ -152,22 +310,23 @@ TEST(TestMappedRingBuffer, get_ownership) { TEST(TestMappedRingBuffer, non_unique_keys) { rclcpp::mapped_ring_buffer::MappedRingBuffer mrb(2); - std::unique_ptr input(new char('a')); + std::shared_ptr input(new char('a')); mrb.push_and_replace(1, input); input.reset(new char('b')); // Different value, same key. mrb.push_and_replace(1, input); + input.reset(); std::unique_ptr actual; - mrb.pop_at_key(1, actual); + mrb.pop(1, actual); EXPECT_NE(nullptr, actual); if (actual) { EXPECT_EQ('a', *actual); } actual = nullptr; - mrb.pop_at_key(1, actual); + mrb.pop(1, actual); EXPECT_NE(nullptr, actual); if (actual) { EXPECT_EQ('b', *actual); diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp index 867714e600..1a1a3edb5d 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp @@ -97,7 +97,7 @@ class LifecyclePublisher : public LifecyclePublisherInterface, * to the actual rclcpp Publisher base class */ virtual void - publish(const std::shared_ptr & msg) + publish(const std::shared_ptr & msg) { if (!enabled_) { RCLCPP_WARN(logger_, @@ -106,26 +106,7 @@ class LifecyclePublisher : public LifecyclePublisherInterface, return; } - rclcpp::Publisher::publish(msg); - } - - /// LifecyclePublisher publish function - /** - * The publish function checks whether the communication - * was enabled or disabled and forwards the message - * to the actual rclcpp Publisher base class - */ - virtual void - publish(std::shared_ptr msg) - { - if (!enabled_) { - RCLCPP_WARN(logger_, - "Trying to publish message on the topic '%s', but the publisher is not activated", - this->get_topic_name()); - - return; - } - rclcpp::Publisher::publish(msg); + rclcpp::Publisher::publish(*msg); } /// LifecyclePublisher publish function @@ -156,25 +137,6 @@ class LifecyclePublisher : public LifecyclePublisherInterface, this->publish(*msg); } - /// LifecyclePublisher publish function - /** - * The publish function checks whether the communication - * was enabled or disabled and forwards the message - * to the actual rclcpp Publisher base class - */ - virtual void - publish(std::shared_ptr & msg) - { - if (!enabled_) { - RCLCPP_WARN(logger_, - "Trying to publish message on the topic '%s', but the publisher is not activated", - this->get_topic_name()); - - return; - } - rclcpp::Publisher::publish(msg); - } - virtual void on_activate() {