From 9b73845f890b6686f376d48868e218980e24af10 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 20 Jan 2020 07:14:04 +0100 Subject: [PATCH 01/45] added "SerializationBase" and "Serialization" to convert a ROS2 message to rcl_serialized_message_t and vice versa Signed-off-by: Joshua Hampp --- .../rclcpp/experimental/serialization.hpp | 112 ++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 rclcpp/include/rclcpp/experimental/serialization.hpp diff --git a/rclcpp/include/rclcpp/experimental/serialization.hpp b/rclcpp/include/rclcpp/experimental/serialization.hpp new file mode 100644 index 0000000000..78231d6e54 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/serialization.hpp @@ -0,0 +1,112 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__SERIALIZATION_HPP_ +#define RCLCPP__EXPERIMENTAL__SERIALIZATION_HPP_ + +#include + +#include +#include + +#include "rcl/error_handling.h" + +namespace rclcpp +{ +namespace experimental +{ + +/// Interface to (de)serialize a message +class SerializationBase +{ +public: + virtual ~SerializationBase() {} + + virtual std::shared_ptr serialize_message(const void * message) = 0; + + virtual void deserialize_message( + const rcl_serialized_message_t & serialized_message, + void * msg) = 0; +}; + +/// Default implementation to (de)serialize a message by using rmw_(de)serialize +class Serialization : public SerializationBase +{ +public: + Serialization( + const rosidl_message_type_support_t & type_support, + const rcutils_allocator_t allocator = rcutils_get_default_allocator()) + : type_support_(type_support), rcutils_allocator_(allocator) + {} + + std::shared_ptr serialize_message(const void * message) override + { + auto serialized_message = new rcl_serialized_message_t; + *serialized_message = rmw_get_zero_initialized_serialized_message(); + const auto ret = rmw_serialized_message_init(serialized_message, 0, &rcutils_allocator_); + if (ret != RCUTILS_RET_OK) { + throw std::runtime_error( + "Error allocating resources for serialized message: " + + std::string(rcutils_get_error_string().str)); + } + + if (message) { + const auto error = rmw_serialize( + message, + &type_support_, + serialized_message); + if (error != RCL_RET_OK) { + throw std::runtime_error("Failed to serialize."); + } + } + + auto shared_serialized_msg = std::shared_ptr( + serialized_message, + [](rcl_serialized_message_t * msg) { + auto fini_ret = rmw_serialized_message_fini(msg); + delete msg; + if (fini_ret != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "failed to destroy serialized message: %s", rcl_get_error_string().str); + } + }); + + return shared_serialized_msg; + } + + void deserialize_message(const rcl_serialized_message_t & serialized_message, void * msg) override + { + if (serialized_message.buffer_capacity == 0 || + serialized_message.buffer_length == 0 || + !serialized_message.buffer) + { + throw std::runtime_error("Failed to deserialize nullptr serialized message."); + } + + const auto ret = rmw_deserialize(&serialized_message, &type_support_, msg); + if (ret != RMW_RET_OK) { + throw std::runtime_error("Failed to deserialize serialized message."); + } + } + +private: + rosidl_message_type_support_t type_support_; + rcutils_allocator_t rcutils_allocator_; +}; + +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__SERIALIZATION_HPP_ From c8162ba8617d0194a50b15a86583b0d568dea4b8 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 20 Jan 2020 07:16:08 +0100 Subject: [PATCH 02/45] added "SerializedContainer" as container of rcl_serialized_message_t, for addind a deleter to ease up memory handling further features: * copy constructor (allowing static memory allocation, e.g. if for static memory allocation in device driver) * destructor Signed-off-by: Joshua Hampp --- .../experimental/serialized_container.hpp | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 rclcpp/include/rclcpp/experimental/serialized_container.hpp diff --git a/rclcpp/include/rclcpp/experimental/serialized_container.hpp b/rclcpp/include/rclcpp/experimental/serialized_container.hpp new file mode 100644 index 0000000000..5aba11d566 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/serialized_container.hpp @@ -0,0 +1,75 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__SERIALIZED_CONTAINER_HPP_ +#define RCLCPP__EXPERIMENTAL__SERIALIZED_CONTAINER_HPP_ + +#include + +#include "rcutils/logging_macros.h" + +#include "rmw/serialized_message.h" + +namespace rclcpp +{ +namespace experimental +{ + +/// Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks +class SerializedContainer : public rcl_serialized_message_t +{ +public: + SerializedContainer() + : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) + {} + + explicit SerializedContainer(const SerializedContainer & sc) + : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) + { + const auto ret = rmw_serialized_message_init(this, sc.buffer_length, &sc.allocator); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + // do not call memcpy if the pointer is "static" + if (buffer != sc.buffer) { + std::memcpy(buffer, sc.buffer, sc.buffer_length); + } + buffer_length = sc.buffer_length; + } + + explicit SerializedContainer(rcl_serialized_message_t && msg) + : rcl_serialized_message_t(msg) + { + // reset buffer to prevent double free + msg = rmw_get_zero_initialized_serialized_message(); + } + + ~SerializedContainer() + { + if (buffer != nullptr) { + const auto fini_ret = rmw_serialized_message_fini(this); + if (fini_ret != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "failed to destroy serialized message: %s", rcl_get_error_string().str); + } + } + } +}; + +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__SERIALIZED_CONTAINER_HPP_ From f3aab4f6c5017bafa5e974b32a079628e09297b2 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 20 Jan 2020 07:30:00 +0100 Subject: [PATCH 03/45] extended "IntraProcessManager" for serialized messages: * added flag for serialized communication * check for flag in "can_communicate" Signed-off-by: Joshua Hampp --- .../rclcpp/experimental/intra_process_manager.hpp | 10 ++++++++-- rclcpp/src/rclcpp/intra_process_manager.cpp | 15 +++++++++++++-- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index d671de701e..04508af46e 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -110,11 +110,14 @@ class IntraProcessManager * In addition this generates a unique intra process id for the subscription. * * \param subscription the SubscriptionIntraProcess to register. + * \param is_serialized true if the buffer expects serialized messages * \return an unsigned 64-bit integer which is the subscription's unique id. */ RCLCPP_PUBLIC uint64_t - add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription); + add_subscription( + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription, + const bool is_serialized = false); /// Unregister a subscription using the subscription's unique id. /** @@ -134,11 +137,12 @@ class IntraProcessManager * In addition this generates a unique intra process id for the publisher. * * \param publisher publisher to be registered with the manager. + * \param is_serialized true if the buffer expects serialized messages * \return an unsigned 64-bit integer which is the publisher's unique id. */ RCLCPP_PUBLIC uint64_t - add_publisher(rclcpp::PublisherBase::SharedPtr publisher); + add_publisher(rclcpp::PublisherBase::SharedPtr publisher, const bool is_serialized = false); /// Unregister a publisher using the publisher's unique id. /** @@ -311,6 +315,7 @@ class IntraProcessManager rmw_qos_profile_t qos; const char * topic_name; bool use_take_shared_method; + bool is_serialized; }; struct PublisherInfo @@ -320,6 +325,7 @@ class IntraProcessManager rclcpp::PublisherBase::WeakPtr publisher; rmw_qos_profile_t qos; const char * topic_name; + bool is_serialized; }; struct SplittedSubscriptions diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 0b9c9d6670..06ab36d49a 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -32,7 +32,9 @@ IntraProcessManager::~IntraProcessManager() {} uint64_t -IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) +IntraProcessManager::add_publisher( + rclcpp::PublisherBase::SharedPtr publisher, + const bool is_serialized) { std::unique_lock lock(mutex_); @@ -41,6 +43,7 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) publishers_[id].publisher = publisher; publishers_[id].topic_name = publisher->get_topic_name(); publishers_[id].qos = publisher->get_actual_qos().get_rmw_qos_profile(); + publishers_[id].is_serialized = is_serialized; // Initialize the subscriptions storage for this publisher. pub_to_subs_[id] = SplittedSubscriptions(); @@ -56,7 +59,9 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) } uint64_t -IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription) +IntraProcessManager::add_subscription( + SubscriptionIntraProcessBase::SharedPtr subscription, + const bool is_serialized) { std::unique_lock lock(mutex_); @@ -66,6 +71,7 @@ IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr su subscriptions_[id].topic_name = subscription->get_topic_name(); subscriptions_[id].qos = subscription->get_actual_qos(); subscriptions_[id].use_take_shared_method = subscription->use_take_shared_method(); + subscriptions_[id].is_serialized = is_serialized; // adds the subscription id to all the matchable publishers for (auto & pair : publishers_) { @@ -220,6 +226,11 @@ IntraProcessManager::can_communicate( return false; } + // a publisher and a subscription with different content type can't communicate + if (sub_info.is_serialized != pub_info.is_serialized) { + return false; + } + return true; } From 9f1c10f1c41285c67af59aa9af01374c3f86c673 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 20 Jan 2020 07:32:48 +0100 Subject: [PATCH 04/45] extended "SubscriptionIntraProcess" for serialized communication by: * distinguish between content type and callback type * use "SerializedContainer" for serialized messages for memory deletion * added specialized methods for combinations of (un)serialized content/callback * automatically (de)serialize messages * allowed communication types are now (MessageT==CallbackMessageT || MessageT==SerializedContainer || CallbackMessageT==rcl_serialized_message_t) Signed-off-by: Joshua Hampp --- .../subscription_intra_process.hpp | 116 +++++++++++++++++- 1 file changed, 110 insertions(+), 6 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 618db3cac1..d6a6772eba 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -27,6 +27,7 @@ #include "rclcpp/any_subscription_callback.hpp" #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/create_intra_process_buffer.hpp" +#include "rclcpp/experimental/serialization.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" @@ -37,6 +38,8 @@ namespace rclcpp namespace experimental { +class SerializedContainer; + template< typename MessageT, typename Alloc = std::allocator, @@ -51,6 +54,10 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase using MessageAlloc = typename MessageAllocTraits::allocator_type; using ConstMessageSharedPtr = std::shared_ptr; using MessageUniquePtr = std::unique_ptr; + using CallbackMessageAllocTraits = allocator::AllocRebind; + using CallbackMessageAlloc = typename CallbackMessageAllocTraits::allocator_type; + using CallbackMessageUniquePtr = std::unique_ptr; + using CallbackMessageSharedPtr = std::shared_ptr; using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer< MessageT, @@ -64,11 +71,15 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase rclcpp::Context::SharedPtr context, const std::string & topic_name, rmw_qos_profile_t qos_profile, - rclcpp::IntraProcessBufferType buffer_type) + rclcpp::IntraProcessBufferType buffer_type, + std::shared_ptr serializer) : SubscriptionIntraProcessBase(topic_name, qos_profile), - any_callback_(callback) + any_callback_(callback), serializer_(serializer) { - if (!std::is_same::value) { + if (!std::is_same::value && + !std::is_same::value && + !std::is_same::value) + { throw std::runtime_error("SubscriptionIntraProcess wrong callback type"); } @@ -143,14 +154,40 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } template - typename std::enable_if::value, void>::type + typename std::enable_if< + std::is_same::value && + !std::is_same::value, + void>::type execute_impl() { - throw std::runtime_error("Subscription intra-process can't handle serialized messages"); + if (serializer_) { + rmw_message_info_t msg_info; + msg_info.from_intra_process = true; + + ConstMessageSharedPtr msg = buffer_->consume_shared(); + auto serialized_msg = + serializer_->serialize_message(reinterpret_cast(msg.get())); + + if (serialized_msg == nullptr) { + throw std::runtime_error("Subscription intra-process could not serialize message"); + } + + if (any_callback_.use_take_shared_method()) { + any_callback_.dispatch_intra_process(serialized_msg, msg_info); + } else { + throw std::runtime_error("Subscription intra-process for serialized " + "messages does not support unique pointers."); + } + } else { + throw std::runtime_error("Subscription intra-process can't handle serialized messages"); + } } template - typename std::enable_if::value, void>::type + typename std::enable_if< + !std::is_same::value && + !std::is_same::value, + void>::type execute_impl() { rmw_message_info_t msg_info; @@ -166,8 +203,75 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } } + template + typename std::enable_if< + std::is_same::value && + std::is_same::value, + void>::type + execute_impl() + { + rmw_message_info_t msg_info; + msg_info.from_intra_process = true; + + if (any_callback_.use_take_shared_method()) { + ConstMessageSharedPtr msg = buffer_->consume_shared(); + if (msg == nullptr) { + throw std::runtime_error("Subscription intra-process could not get serialized message"); + } + any_callback_.dispatch_intra_process(msg, msg_info); + } else { + throw std::runtime_error("Subscription intra-process for serialized " + "messages does not support unique pointers."); + } + } + + template + typename std::enable_if< + !std::is_same::value && + std::is_same::value, + void>::type + execute_impl() + { + if (serializer_) { + ConstMessageSharedPtr serialized_container = buffer_->consume_shared(); + if (serialized_container == nullptr) { + throw std::runtime_error("Subscription intra-process could not get serialized message"); + } + + rmw_message_info_t msg_info; + msg_info.from_intra_process = true; + + if (any_callback_.use_take_shared_method()) { + CallbackMessageSharedPtr msg = construct_unique(); + serializer_->deserialize_message(*serialized_container, + reinterpret_cast(msg.get())); + any_callback_.dispatch_intra_process(msg, msg_info); + } else { + CallbackMessageUniquePtr msg = construct_unique(); + serializer_->deserialize_message(*serialized_container, + reinterpret_cast(msg.get())); + any_callback_.dispatch_intra_process(std::move(msg), msg_info); + } + } else { + throw std::runtime_error("Subscription intra-process can't handle unserialized messages"); + } + } + + CallbackMessageUniquePtr construct_unique() + { + CallbackMessageUniquePtr unique_msg; + auto ptr = CallbackMessageAllocTraits::allocate(*message_allocator_.get(), 1); + CallbackMessageAllocTraits::construct(*message_allocator_.get(), ptr); + unique_msg = CallbackMessageUniquePtr(ptr); + + return unique_msg; + } + AnySubscriptionCallback any_callback_; BufferUniquePtr buffer_; + std::shared_ptr serializer_; + std::shared_ptr message_allocator_ = + std::make_shared(); }; } // namespace experimental From a300c2f4aed5f69cc99298211daa4291f52abd66 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 20 Jan 2020 07:35:00 +0100 Subject: [PATCH 05/45] added secon communication channel for intra process communication for serialized communication in publisher base and subscriber base (adapted waitables) Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/publisher_base.hpp | 2 ++ rclcpp/include/rclcpp/subscription_base.hpp | 8 +++--- .../rclcpp/node_interfaces/node_topics.cpp | 10 +++++--- rclcpp/src/rclcpp/publisher_base.cpp | 6 ++++- rclcpp/src/rclcpp/subscription_base.cpp | 25 ++++++++++++------- 5 files changed, 33 insertions(+), 18 deletions(-) diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index 9627acaa4a..4f678ec0bb 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -191,6 +191,7 @@ class PublisherBase : public std::enable_shared_from_this void setup_intra_process( uint64_t intra_process_publisher_id, + uint64_t intra_process_publisher_id_serialized, IntraProcessManagerSharedPtr ipm); protected: @@ -222,6 +223,7 @@ class PublisherBase : public std::enable_shared_from_this bool intra_process_is_enabled_; IntraProcessManagerWeakPtr weak_ipm_; uint64_t intra_process_publisher_id_; + uint64_t intra_process_publisher_id_serialized_; rmw_gid_t rmw_gid_; }; diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 7ee8c624d1..707c0b419a 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -228,13 +228,13 @@ class SubscriptionBase : public std::enable_shared_from_this RCLCPP_PUBLIC void setup_intra_process( - uint64_t intra_process_subscription_id, + const std::vector & intra_process_subscription_ids, IntraProcessManagerWeakPtr weak_ipm); /// Return the waitable for intra-process, or nullptr if intra-process is not setup. RCLCPP_PUBLIC - rclcpp::Waitable::SharedPtr - get_intra_process_waitable() const; + std::vector + get_intra_process_waitables() const; /// Exchange state of whether or not a part of the subscription is used by a wait set. /** @@ -286,7 +286,7 @@ class SubscriptionBase : public std::enable_shared_from_this bool use_intra_process_; IntraProcessManagerWeakPtr weak_ipm_; - uint64_t intra_process_subscription_id_; + std::vector intra_process_subscription_ids_; private: RCLCPP_DISABLE_COPY(SubscriptionBase) diff --git a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp index 11de4a3278..7135fbadea 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp @@ -99,10 +99,12 @@ NodeTopics::add_subscription( callback_group->add_waitable(subscription_event); } - auto intra_process_waitable = subscription->get_intra_process_waitable(); - if (nullptr != intra_process_waitable) { - // Add to the callback group to be notified about intra-process msgs. - callback_group->add_waitable(intra_process_waitable); + auto intra_process_waitables = subscription->get_intra_process_waitables(); + for (auto & intra_process_waitable : intra_process_waitables) { + if (nullptr != intra_process_waitable) { + // Add to the callback group to be notified about intra-process msgs. + callback_group->add_waitable(intra_process_waitable); + } } // Notify the executor that a new subscription was created using the parent Node. diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index c982a95d33..0f2258881a 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -44,7 +44,8 @@ PublisherBase::PublisherBase( const rosidl_message_type_support_t & type_support, const rcl_publisher_options_t & publisher_options) : rcl_node_handle_(node_base->get_shared_rcl_node_handle()), - intra_process_is_enabled_(false), intra_process_publisher_id_(0) + intra_process_is_enabled_(false), intra_process_publisher_id_(0), + intra_process_publisher_id_serialized_(0) { rcl_ret_t ret = rcl_publisher_init( &publisher_handle_, @@ -105,6 +106,7 @@ PublisherBase::~PublisherBase() return; } ipm->remove_publisher(intra_process_publisher_id_); + ipm->remove_publisher(intra_process_publisher_id_serialized_); } const char * @@ -238,9 +240,11 @@ PublisherBase::operator==(const rmw_gid_t * gid) const void PublisherBase::setup_intra_process( uint64_t intra_process_publisher_id, + uint64_t intra_process_publisher_id_serialized, IntraProcessManagerSharedPtr ipm) { intra_process_publisher_id_ = intra_process_publisher_id; + intra_process_publisher_id_serialized_ = intra_process_publisher_id_serialized; weak_ipm_ = ipm; intra_process_is_enabled_ = true; } diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 2af068e32e..9336154892 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -39,7 +39,6 @@ SubscriptionBase::SubscriptionBase( : node_base_(node_base), node_handle_(node_base_->get_shared_rcl_node_handle()), use_intra_process_(false), - intra_process_subscription_id_(0), type_support_(type_support_handle), is_serialized_(is_serialized) { @@ -93,7 +92,9 @@ SubscriptionBase::~SubscriptionBase() "Intra process manager died before than a subscription."); return; } - ipm->remove_subscription(intra_process_subscription_id_); + for (auto intra_process_subscription_id : intra_process_subscription_ids_) { + ipm->remove_subscription(intra_process_subscription_id); + } } const char * @@ -204,10 +205,10 @@ SubscriptionBase::get_publisher_count() const void SubscriptionBase::setup_intra_process( - uint64_t intra_process_subscription_id, + const std::vector & intra_process_subscription_ids, IntraProcessManagerWeakPtr weak_ipm) { - intra_process_subscription_id_ = intra_process_subscription_id; + intra_process_subscription_ids_ = intra_process_subscription_ids; weak_ipm_ = weak_ipm; use_intra_process_ = true; } @@ -218,12 +219,12 @@ SubscriptionBase::can_loan_messages() const return rcl_subscription_can_loan_messages(subscription_handle_.get()); } -rclcpp::Waitable::SharedPtr -SubscriptionBase::get_intra_process_waitable() const +std::vector +SubscriptionBase::get_intra_process_waitables() const { // If not using intra process, shortcut to nullptr. if (!use_intra_process_) { - return nullptr; + return std::vector(); } // Get the intra process manager. auto ipm = weak_ipm_.lock(); @@ -233,8 +234,14 @@ SubscriptionBase::get_intra_process_waitable() const "after destruction of intra process manager"); } - // Use the id to retrieve the subscription intra-process from the intra-process manager. - return ipm->get_subscription_intra_process(intra_process_subscription_id_); + std::vector waitables(intra_process_subscription_ids_.size()); + + for (size_t i = 0; i < intra_process_subscription_ids_.size(); ++i) { + // Use the id to retrieve the subscription intra-process from the intra-process manager. + waitables[i] = ipm->get_subscription_intra_process(intra_process_subscription_ids_[i]); + } + + return waitables; } void From 86bd3a3c80d58b0bc295b436835483927b833be8 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 20 Jan 2020 07:40:06 +0100 Subject: [PATCH 06/45] extended "Publisher" for serialized messages: * implemented second communication channel for serialized intra process messages extended "Publisher" for serialized messages: * pass message type by argument * added constructor for backwards compatibility (moved common code in separate methods "init_setup") * added allocator for serialized messages Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/publisher.hpp | 92 ++++++++++++++++++++--------- 1 file changed, 63 insertions(+), 29 deletions(-) diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index aa614e3a0c..053cb5862b 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -56,6 +56,12 @@ class Publisher : public PublisherBase using MessageDeleter = allocator::Deleter; using MessageUniquePtr = std::unique_ptr; using MessageSharedPtr = std::shared_ptr; + using SerializedMessageAllocatorTraits = + allocator::AllocRebind; + using SerializedMessageAllocator = typename SerializedMessageAllocatorTraits::allocator_type; + using SerializedMessageDeleter = allocator::Deleter; RCLCPP_SMART_PTR_DEFINITIONS(Publisher) @@ -70,37 +76,28 @@ class Publisher : public PublisherBase *rosidl_typesupport_cpp::get_message_type_support_handle(), options.template to_rcl_publisher_options(qos)), options_(options), - message_allocator_(new MessageAllocator(*options.get_allocator().get())) + message_allocator_(new MessageAllocator(*options.get_allocator().get())), + message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get())) { - allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); + init_setup(); + } - if (options_.event_callbacks.deadline_callback) { - this->add_event_handler( - options_.event_callbacks.deadline_callback, - RCL_PUBLISHER_OFFERED_DEADLINE_MISSED); - } - if (options_.event_callbacks.liveliness_callback) { - this->add_event_handler( - options_.event_callbacks.liveliness_callback, - RCL_PUBLISHER_LIVELINESS_LOST); - } - if (options_.event_callbacks.incompatible_qos_callback) { - this->add_event_handler( - options_.event_callbacks.incompatible_qos_callback, - RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS); - } else if (options_.use_default_callbacks) { - // Register default callback when not specified - try { - this->add_event_handler( - [this](QOSOfferedIncompatibleQoSInfo & info) { - this->default_incompatible_qos_callback(info); - }, - RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS); - } catch (UnsupportedEventTypeException & /*exc*/) { - // pass - } - } - // Setup continues in the post construction method, post_init_setup(). + Publisher( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options, + const rosidl_message_type_support_t & type_support) + : PublisherBase( + node_base, + topic, + type_support, + options.template to_rcl_publisher_options(qos)), + options_(options), + message_allocator_(new MessageAllocator(*options.get_allocator().get())), + message_allocator_serialized_(new SerializedMessageAllocator(*options.get_allocator().get())) + { + init_setup(); } /// Called post construction, so that construction may continue after shared_from_this() works. @@ -259,6 +256,43 @@ class Publisher : public PublisherBase } protected: + void init_setup() + { + allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); + + if (options_.event_callbacks.deadline_callback) { + this->add_event_handler( + options_.event_callbacks.deadline_callback, + RCL_PUBLISHER_OFFERED_DEADLINE_MISSED); + } + if (options_.event_callbacks.liveliness_callback) { + this->add_event_handler( + options_.event_callbacks.liveliness_callback, + RCL_PUBLISHER_LIVELINESS_LOST); + } + if (options_.event_callbacks.incompatible_qos_callback) { + this->add_event_handler( + options_.event_callbacks.incompatible_qos_callback, + RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS); + } else if (options_.use_default_callbacks) { + // Register default callback when not specified + try { + this->add_event_handler( + [this](QOSOfferedIncompatibleQoSInfo & info) { + this->default_incompatible_qos_callback(info); + }, + RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS); + } catch (UnsupportedEventTypeException & /*exc*/) { + RCLCPP_WARN_ONCE( + rclcpp::get_logger(rcl_node_get_logger_name(rcl_node_handle_.get())), + "This rmw implementation does not support ON_OFFERED_INCOMPATIBLE_QOS " + "events, you will not be notified when Publishers offer an incompatible " + "QoS profile to Subscriptions on the same topic."); + } + } + // Setup continues in the post construction method, post_init_setup(). + } + void do_inter_process_publish(const MessageT & msg) { From 9d7ef528854934c8be9ef6b79bcead80864e8eb9 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 20 Jan 2020 07:43:02 +0100 Subject: [PATCH 07/45] extended "Publisher" for serialized messages: * implemented second communication channel for serialized intra process messages * extend publish to handle "rcl_serialized_message_t" * changed behaviour of serialized message publishing by taking ownership of message (for deletion) Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/publisher.hpp | 123 ++++++++++++++++++++-------- 1 file changed, 90 insertions(+), 33 deletions(-) diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 053cb5862b..3371227537 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -132,8 +132,11 @@ class Publisher : public PublisherBase "intraprocess communication allowed only with volatile durability"); } uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this()); + uint64_t intra_process_publisher_id_serialized = ipm->add_publisher( + this->shared_from_this(), true); this->setup_intra_process( intra_process_publisher_id, + intra_process_publisher_id_serialized, ipm); } } @@ -170,6 +173,11 @@ class Publisher : public PublisherBase virtual void publish(std::unique_ptr msg) { + if (std::is_same::value) { + this->template publish(std::move(msg)); + return; + } + if (!intra_process_is_enabled_) { this->do_inter_process_publish(*msg); return; @@ -184,34 +192,33 @@ class Publisher : public PublisherBase get_subscription_count() > get_intra_process_subscription_count(); if (inter_process_publish_needed) { - auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg)); + auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move( + msg), message_allocator_); this->do_inter_process_publish(*shared_msg); } else { - this->do_intra_process_publish(std::move(msg)); + this->do_intra_process_publish(std::move(msg), message_allocator_); } } virtual void publish(const MessageT & msg) { - // Avoid allocating when not using intra process. - if (!intra_process_is_enabled_) { - // In this case we're not using intra process. - return this->do_inter_process_publish(msg); - } - // Otherwise we have to allocate memory in a unique_ptr and pass it along. - // As the message is not const, a copy should be made. - // A shared_ptr could also be constructed here. - auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1); - MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg); - MessageUniquePtr unique_msg(ptr, message_deleter_); - this->publish(std::move(unique_msg)); + this->do_publish_message(msg); } - void - publish(const rcl_serialized_message_t & serialized_msg) + /// Publish a serialized message. Non specialized version to prevent comipiling errors. + template + void publish(std::unique_ptr serialized_msg) + { + (void)serialized_msg; + throw std::runtime_error("not supported"); + } + + /// Publish a serialized message. + template + void publish(std::unique_ptr serialized_msg) { - return this->do_serialized_publish(&serialized_msg); + this->do_serialized_publish(serialized_msg.get()); } /// Publish an instance of a LoanedMessage. @@ -293,6 +300,33 @@ class Publisher : public PublisherBase // Setup continues in the post construction method, post_init_setup(). } + template + typename std::enable_if::value>::type + do_publish_message(const T & msg) + { + // Avoid allocating when not using intra process. + if (!intra_process_is_enabled_) { + // In this case we're not using intra process. + return this->do_inter_process_publish(msg); + } + // Otherwise we have to allocate memory in a unique_ptr and pass it along. + // As the message is not const, a copy should be made. + // A shared_ptr could also be constructed here. + auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1); + MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg); + MessageUniquePtr unique_msg(ptr, message_deleter_); + this->publish(std::move(unique_msg)); + } + + template + typename std::enable_if::value>::type + do_publish_message(const MessageT & msg) + { + (void)msg; + throw std::runtime_error( + "publishing serialized messages is only supported for unique pointers"); + } + void do_inter_process_publish(const MessageT & msg) { @@ -314,15 +348,24 @@ class Publisher : public PublisherBase } void - do_serialized_publish(const rcl_serialized_message_t * serialized_msg) + do_serialized_publish(rcl_serialized_message_t * serialized_msg) { - if (intra_process_is_enabled_) { - // TODO(Karsten1987): support serialized message passed by intraprocess - throw std::runtime_error("storing serialized messages in intra process is not supported yet"); + bool inter_process_publish_needed = + get_subscription_count() > get_intra_process_subscription_count(); + + if (inter_process_publish_needed) { + // declare here to avoid deletion before returning method + auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr); + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message"); + } } - auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr); - if (RCL_RET_OK != status) { - rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message"); + + auto msg = std::make_unique( + std::move(*serialized_msg)); + + if (intra_process_is_enabled_) { + do_intra_process_publish(std::move(msg), message_allocator_serialized_); } } @@ -346,8 +389,11 @@ class Publisher : public PublisherBase } } + template void - do_intra_process_publish(std::unique_ptr msg) + do_intra_process_publish( + std::unique_ptr msg, + std::shared_ptr & message_allocator) { auto ipm = weak_ipm_.lock(); if (!ipm) { @@ -358,14 +404,21 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - ipm->template do_intra_process_publish( - intra_process_publisher_id_, + const uint64_t intra_process_publisher_id = std::is_same::value ? + intra_process_publisher_id_serialized_ : intra_process_publisher_id_; + + ipm->template do_intra_process_publish( + intra_process_publisher_id, std::move(msg), - message_allocator_); + message_allocator); } - std::shared_ptr - do_intra_process_publish_and_return_shared(std::unique_ptr msg) + template + std::shared_ptr + do_intra_process_publish_and_return_shared( + std::unique_ptr msg, + std::shared_ptr & message_allocator) { auto ipm = weak_ipm_.lock(); if (!ipm) { @@ -376,10 +429,14 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - return ipm->template do_intra_process_publish_and_return_shared( - intra_process_publisher_id_, + const uint64_t intra_process_publisher_id = std::is_same::value ? + intra_process_publisher_id_serialized_ : intra_process_publisher_id_; + + return ipm->template do_intra_process_publish_and_return_shared( + intra_process_publisher_id, std::move(msg), - message_allocator_); + message_allocator); } /// Copy of original options passed during construction. From c8522b3cd60cab3cb374fc62b32ae50557a7adb2 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 20 Jan 2020 07:43:25 +0100 Subject: [PATCH 08/45] updated "LifcecylePublisher" for serialized messages Signed-off-by: Joshua Hampp --- .../rclcpp_lifecycle/lifecycle_publisher.hpp | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp index 2f94dc78c0..52fd17101c 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp @@ -61,8 +61,9 @@ class LifecyclePublisher : public LifecyclePublisherInterface, rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rclcpp::QoS & qos, - const rclcpp::PublisherOptionsWithAllocator & options) - : rclcpp::Publisher(node_base, topic, qos, options), + const rclcpp::PublisherOptionsWithAllocator & options, + const rosidl_message_type_support_t & type_support) + : rclcpp::Publisher(node_base, topic, qos, options, type_support), enabled_(false), logger_(rclcpp::get_logger("LifecyclePublisher")) { @@ -110,6 +111,29 @@ class LifecyclePublisher : public LifecyclePublisherInterface, rclcpp::Publisher::publish(msg); } + /// Publish a serialized message. Non specialized version to prevent compiling errors. + template + void publish(std::unique_ptr serialized_msg) + { + (void)serialized_msg; + throw std::runtime_error("not supported"); + } + + /// Publish a serialized message. + template + void publish(std::unique_ptr serialized_msg) + { + if (!enabled_) { + RCLCPP_WARN( + logger_, + "Trying to publish message on the topic '%s', but the publisher is not activated", + this->get_topic_name()); + + return; + } + this->do_serialized_publish(serialized_msg.get()); + } + virtual void on_activate() { From 29fea926af093cee530bfa121eba178655129496 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 20 Jan 2020 07:46:37 +0100 Subject: [PATCH 09/45] extended "create_publisher" and "create_publisher_factory" to pass message type * backwards compatible * allows creation of publisher with type "rcl_serialized_message_t" Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/create_publisher.hpp | 24 ++++++++++++++++++++- rclcpp/include/rclcpp/publisher_factory.hpp | 9 +++++--- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index 811c18b69f..649e0bc8b1 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -43,6 +43,7 @@ std::shared_ptr create_publisher( NodeT & node, const std::string & topic_name, + const rosidl_message_type_support_t & type_support, const rclcpp::QoS & qos, const rclcpp::PublisherOptionsWithAllocator & options = ( rclcpp::PublisherOptionsWithAllocator() @@ -56,7 +57,7 @@ create_publisher( // Create the publisher. auto pub = node_topics->create_publisher( topic_name, - rclcpp::create_publisher_factory(options), + rclcpp::create_publisher_factory(options, type_support), qos ); @@ -66,6 +67,27 @@ create_publisher( return std::dynamic_pointer_cast(pub); } +template< + typename MessageT, + typename AllocatorT = std::allocator, + typename PublisherT = rclcpp::Publisher, + typename NodeT> +std::shared_ptr +create_publisher( + NodeT & node, + const std::string & topic_name, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options = ( + rclcpp::PublisherOptionsWithAllocator() + ) +) +{ + const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle(); + + return create_publisher(node, topic_name, type_support, + qos, options); +} + } // namespace rclcpp #endif // RCLCPP__CREATE_PUBLISHER_HPP_ diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index 87def3cc17..7f64567318 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -63,17 +63,20 @@ struct PublisherFactory /// Return a PublisherFactory with functions setup for creating a PublisherT. template PublisherFactory -create_publisher_factory(const rclcpp::PublisherOptionsWithAllocator & options) +create_publisher_factory( + const rclcpp::PublisherOptionsWithAllocator & options, + const rosidl_message_type_support_t & type_support) { PublisherFactory factory { // factory function that creates a MessageT specific PublisherT - [options]( + [options, type_support]( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic_name, const rclcpp::QoS & qos ) -> std::shared_ptr { - auto publisher = std::make_shared(node_base, topic_name, qos, options); + auto publisher = std::make_shared(node_base, topic_name, qos, options, + type_support); // This is used for setting up things like intra process comms which // require this->shared_from_this() which cannot be called from // the constructor. From f9ef2c2fd56df7ed7f9d3b502514ed8a386449be Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 20 Jan 2020 07:47:37 +0100 Subject: [PATCH 10/45] extended "Subscription" for serialized messages: * implemented second communication channel for serialized intra process messages Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/subscription.hpp | 93 ++++++++++++++++++++------ 1 file changed, 72 insertions(+), 21 deletions(-) diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 2b5172dc1b..8b3a676411 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -35,6 +35,7 @@ #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" +#include "rclcpp/experimental/serialized_container.hpp" #include "rclcpp/experimental/subscription_intra_process.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" @@ -155,29 +156,79 @@ class Subscription : public SubscriptionBase "intraprocess communication allowed only with volatile durability"); } - // First create a SubscriptionIntraProcess which will be given to the intra-process manager. - auto context = node_base->get_context(); - using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess< - CallbackMessageT, - AllocatorT, - typename MessageUniquePtr::deleter_type>; - auto subscription_intra_process = std::make_shared( - callback, - options.get_allocator(), - context, - this->get_topic_name(), // important to get like this, as it has the fully-qualified name - qos_profile, - resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback)); - TRACEPOINT( - rclcpp_subscription_init, - (const void *)get_subscription_handle().get(), - (const void *)subscription_intra_process.get()); - - // Add it to the intra process manager. using rclcpp::experimental::IntraProcessManager; + uint64_t intra_process_subscription_id, intra_process_subscription_id_serialized; + + auto context = node_base->get_context(); auto ipm = context->get_sub_context(); - uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process); - this->setup_intra_process(intra_process_subscription_id, ipm); + + { + // First create a SubscriptionIntraProcess which will be given to the intra-process manager. + auto subscription_intra_process = std::make_shared< + rclcpp::experimental::SubscriptionIntraProcess< + CallbackMessageT, + AllocatorT, + typename MessageUniquePtr::deleter_type + >>( + callback, + options.get_allocator(), + context, + this->get_topic_name(), // important to get it by the fully-qualified name + qos.get_rmw_qos_profile(), + resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback), + std::make_shared(type_support_handle, + options.template to_rcl_subscription_options(qos).allocator) + ); + TRACEPOINT( + rclcpp_subscription_init, + (const void *)get_subscription_handle().get(), + (const void *)subscription_intra_process.get()); + + // Add it to the intra process manager. + intra_process_subscription_id = ipm->add_subscription(subscription_intra_process); + } + + { + using SerializedMessageAllocatorTraits = + allocator::AllocRebind; + using SerializedMessageAllocator = + typename SerializedMessageAllocatorTraits::allocator_type; + using SerializedMessageDeleter = allocator::Deleter; + using SerializedMessageUniquePtr = + std::unique_ptr; + + // First create a SubscriptionIntraProcess which will be given to the intra-process manager. + auto subscription_intra_process = std::make_shared< + rclcpp::experimental::SubscriptionIntraProcess< + rclcpp::experimental::SerializedContainer, + AllocatorT, + typename SerializedMessageUniquePtr::deleter_type, + CallbackMessageT + >>( + callback, + options.get_allocator(), + context, + this->get_topic_name(), // important to get it by the fully-qualified name + qos.get_rmw_qos_profile(), + resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback), + std::make_shared(type_support_handle, + options.template to_rcl_subscription_options(qos).allocator) + ); + TRACEPOINT( + rclcpp_subscription_init, + (const void *)get_subscription_handle().get(), + (const void *)subscription_intra_process.get()); + + // Add it to the intra process manager. + intra_process_subscription_id_serialized = ipm->add_subscription(subscription_intra_process, + true); + } + + this->setup_intra_process({intra_process_subscription_id, + intra_process_subscription_id_serialized}, ipm); } TRACEPOINT( From 56b520eb95784469ad64166e4484026ed194c516 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 20 Jan 2020 07:47:58 +0100 Subject: [PATCH 11/45] extended "create_subscription" and "create_subscriptioncreate_publisher_factory" to pass message type * backwards compatible * allows creation of publisher with type "rcl_serialized_message_t" Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/create_subscription.hpp | 38 ++++++++++++++++++- .../include/rclcpp/subscription_factory.hpp | 7 ++-- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index 9248254047..56fd37b3c3 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -51,6 +51,7 @@ typename std::shared_ptr create_subscription( NodeT && node, const std::string & topic_name, + const rosidl_message_type_support_t & type_support, const rclcpp::QoS & qos, CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options = ( @@ -67,7 +68,8 @@ create_subscription( auto factory = rclcpp::create_subscription_factory( std::forward(callback), options, - msg_mem_strat + msg_mem_strat, + type_support ); auto sub = node_topics->create_subscription(topic_name, factory, qos); @@ -76,6 +78,40 @@ create_subscription( return std::dynamic_pointer_cast(sub); } +template< + typename MessageT, + typename CallbackT, + typename AllocatorT = std::allocator, + typename CallbackMessageT = + typename rclcpp::subscription_traits::has_message_type::type, + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, + AllocatorT + >, + typename NodeT> +typename std::shared_ptr +create_subscription( + NodeT && node, + const std::string & topic_name, + const rclcpp::QoS & qos, + CallbackT && callback, + const rclcpp::SubscriptionOptionsWithAllocator & options = ( + rclcpp::SubscriptionOptionsWithAllocator() + ), + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = ( + MessageMemoryStrategyT::create_default() + ) +) +{ + const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle(); + + return create_subscription(std::forward( + node), topic_name, type_support, qos, std::forward( + callback), options, msg_mem_strat); +} + } // namespace rclcpp #endif // RCLCPP__CREATE_SUBSCRIPTION_HPP_ diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index a0f265c803..67d2272a99 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -78,7 +78,8 @@ SubscriptionFactory create_subscription_factory( CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options, - typename MessageMemoryStrategyT::SharedPtr msg_mem_strat) + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat, + const rosidl_message_type_support_t & type_support) { auto allocator = options.get_allocator(); @@ -88,7 +89,7 @@ create_subscription_factory( SubscriptionFactory factory { // factory function that creates a MessageT specific SubscriptionT - [options, msg_mem_strat, any_subscription_callback]( + [options, msg_mem_strat, any_subscription_callback, type_support]( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic_name, const rclcpp::QoS & qos @@ -99,7 +100,7 @@ create_subscription_factory( auto sub = Subscription::make_shared( node_base, - *rosidl_typesupport_cpp::get_message_type_support_handle(), + type_support, topic_name, qos, any_subscription_callback, From 02921427f7c7645781165299aebfa26fa4c6f580 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 20 Jan 2020 07:49:00 +0100 Subject: [PATCH 12/45] added unit test for serialized intra process communication * test memory (de)allocation * test communication channels between serialized and unserialized content Signed-off-by: Joshua Hampp --- rclcpp/CMakeLists.txt | 13 + .../test/test_intra_process_communication.cpp | 336 ++++++++++++++++++ 2 files changed, 349 insertions(+) create mode 100644 rclcpp/test/test_intra_process_communication.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 1389ad8aeb..40fa190408 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -245,6 +245,19 @@ if(BUILD_TESTING) "test_msgs" ) target_link_libraries(test_loaned_message ${PROJECT_NAME}) + ament_add_gtest(test_intra_process_communication test/test_intra_process_communication.cpp + TIMEOUT 120) + if(TARGET test_intra_process_communication) + ament_target_dependencies(test_intra_process_communication + "rcl" + "rcl_interfaces" + "rmw" + "rosidl_generator_cpp" + "rosidl_typesupport_cpp" + "test_msgs" + ) + endif() + target_link_libraries(test_intra_process_communication ${PROJECT_NAME}) ament_add_gtest(test_node test/test_node.cpp TIMEOUT 240) if(TARGET test_node) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp new file mode 100644 index 0000000000..f8f4174429 --- /dev/null +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -0,0 +1,336 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/publisher.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "rclcpp/experimental/serialized_container.hpp" + +int32_t & get_test_allocation_counter() +{ + static int32_t counter = 0; + return counter; +} + +void * custom_allocate(size_t size, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + ++get_test_allocation_counter(); + auto r = m_allocator.allocate(size, state); + return r; +} + +void custom_deallocate(void * pointer, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + --get_test_allocation_counter(); + m_allocator.deallocate(pointer, state); +} + +rcutils_uint8_array_t make_serialized_string_msg( + const std::shared_ptr & stringMsg) +{ + auto m_allocator = rcutils_get_default_allocator(); + + // add custom (de)allocator to count the references to the object + m_allocator.allocate = &custom_allocate; + m_allocator.deallocate = &custom_deallocate; + + ++get_test_allocation_counter(); + + rcl_serialized_message_t msg = rmw_get_zero_initialized_serialized_message(); + auto ret = rmw_serialized_message_init(&msg, 0, &m_allocator); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + static auto type = + rosidl_typesupport_cpp::get_message_type_support_handle + (); + auto error = rmw_serialize(stringMsg.get(), type, &msg); + if (error != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED("test_intra_process_communication", + "Something went wrong preparing the serialized message"); + } + + return msg; +} + +/** + * Parameterized test. + * The first param are the NodeOptions used to create the nodes. + * The second param are the expected intraprocess count results. + */ +struct TestParameters +{ + rclcpp::NodeOptions node_options[2]; + uint64_t intraprocess_count_results[2]; + size_t runs; + std::string description; +}; + +std::ostream & operator<<(std::ostream & out, const TestParameters & params) +{ + out << params.description; + return out; +} + +class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam +{ +public: + static void SetUpTestCase() + { + if (!rclcpp::is_initialized()) { + rclcpp::init(0, nullptr); + } + } + +protected: + static std::chrono::milliseconds offset; +}; + +std::chrono::milliseconds TestPublisherSubscriptionSerialized::offset = std::chrono::milliseconds( + 2000); +std::array counts; + +void OnMessageSerialized(const std::shared_ptr msg) +{ + EXPECT_NE(msg->buffer, nullptr); + EXPECT_GT(msg->buffer_capacity, 0u); + + ++counts[0]; +} + +void OnMessageConst(std::shared_ptr msg) +{ + EXPECT_EQ(msg->string_value.back(), '9'); + + ++counts[1]; +} + +void OnMessageUniquePtr(std::unique_ptr msg) +{ + EXPECT_EQ(msg->string_value.back(), '9'); + + ++counts[1]; +} + +void OnMessage(std::shared_ptr msg) +{ + EXPECT_EQ(msg->string_value.back(), '9'); + + ++counts[1]; +} + +TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) +{ + get_test_allocation_counter() = 0; + + TestParameters parameters = GetParam(); + { + rclcpp::Node::SharedPtr node = std::make_shared( + "my_node", + "/ns", + parameters.node_options[0]); + auto publisher = node->create_publisher("/topic", 10); + + auto sub_shared = node->create_subscription("/topic", 10, + &OnMessage); + auto sub_unique = node->create_subscription("/topic", 10, + &OnMessageUniquePtr); + auto sub_const_shared = node->create_subscription("/topic", 10, + &OnMessageConst); + auto sub_serialized = node->create_subscription("/topic", 10, + &OnMessageSerialized); + + rclcpp::sleep_for(offset); + + counts.fill(0); + auto stringMsg = get_messages_strings()[3]; + + for (size_t i = 0; i < parameters.runs; i++) { + auto msg0 = make_serialized_string_msg(stringMsg); + + std::unique_ptr stringMsgU( + new test_msgs::msg::Strings( + *stringMsg)); + + publisher->publish(std::make_unique(msg0)); + publisher->publish(*stringMsg); + publisher->publish(std::move(stringMsgU)); + } + rclcpp::spin_some(node); + rclcpp::sleep_for(offset); + + rclcpp::spin_some(node); + } + + if (parameters.runs == 1) { + EXPECT_EQ(counts[0], 3u); + EXPECT_EQ(counts[1], 9u); + } + + EXPECT_LE(get_test_allocation_counter(), 30); +} + +TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic) +{ + get_test_allocation_counter() = 0; + + TestParameters parameters = GetParam(); + { + rclcpp::Node::SharedPtr node = std::make_shared( + "my_node", + "/ns", + parameters.node_options[0]); + auto publisher = rclcpp::create_publisher( + node, + "/topic", + *rosidl_typesupport_cpp::get_message_type_support_handle(), + rclcpp::QoS(10)); + + auto sub_gen_serialized = rclcpp::create_subscription( + node, + "/topic", + *rosidl_typesupport_cpp::get_message_type_support_handle(), + rclcpp::QoS(10), + &OnMessageSerialized); + + auto sub_shared = node->create_subscription("/topic", 10, + &OnMessage); + auto sub_unique = node->create_subscription("/topic", 10, + &OnMessageUniquePtr); + auto sub_const_shared = node->create_subscription("/topic", 10, + &OnMessageConst); + auto sub_serialized = node->create_subscription("/topic", 10, + &OnMessageSerialized); + + rclcpp::sleep_for(offset); + + counts.fill(0); + auto stringMsg = get_messages_strings()[3]; + + for (size_t i = 0; i < parameters.runs; i++) { + auto msg0 = make_serialized_string_msg(stringMsg); + + publisher->publish(std::make_unique(msg0)); + } + rclcpp::spin_some(node); + rclcpp::sleep_for(offset); + + rclcpp::spin_some(node); + } + + if (parameters.runs == 1) { + EXPECT_EQ(counts[0], 2u); + EXPECT_EQ(counts[1], 3u); + } + + EXPECT_LE(get_test_allocation_counter(), 30); +} + +auto get_new_context() +{ + auto context = rclcpp::Context::make_shared(); + context->init(0, nullptr); + return context; +} + +std::vector parameters = { + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions in the same topic, both using intraprocess comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(true) + }, + {1u, 2u}, + 1, + "two_subscriptions_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions, one using intra-process comm and the other not using it. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(false) + }, + {1u, 1u}, + 1, + "two_subscriptions_one_intraprocess_one_not" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both using intra-process. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) + }, + {1u, 1u}, + 1, + "two_subscriptions_in_two_contexts_with_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both of them not using intra-process comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(false), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) + }, + {0u, 0u}, + 1, + "two_subscriptions_in_two_contexts_without_intraprocess_comm" + } +}; + +std::vector setRuns(const std::vector & in, const size_t runs) +{ + std::vector out = in; + for (auto & p : out) { + p.runs = runs; + } + return out; +} + +INSTANTIATE_TEST_CASE_P( + TestWithDifferentNodeOptions, TestPublisherSubscriptionSerialized, + ::testing::ValuesIn(parameters), + ::testing::PrintToStringParamName()); + +INSTANTIATE_TEST_CASE_P( + TestWithDifferentNodeOptions1000Runs, TestPublisherSubscriptionSerialized, + ::testing::ValuesIn(setRuns(parameters, 1000)), + ::testing::PrintToStringParamName()); From 5faf8f40e79ea019631f5999506a6798cfd29285 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 27 Jan 2020 07:58:36 +0100 Subject: [PATCH 13/45] changed include notation Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/experimental/serialization.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/experimental/serialization.hpp b/rclcpp/include/rclcpp/experimental/serialization.hpp index 78231d6e54..1bb230ce60 100644 --- a/rclcpp/include/rclcpp/experimental/serialization.hpp +++ b/rclcpp/include/rclcpp/experimental/serialization.hpp @@ -15,7 +15,7 @@ #ifndef RCLCPP__EXPERIMENTAL__SERIALIZATION_HPP_ #define RCLCPP__EXPERIMENTAL__SERIALIZATION_HPP_ -#include +#include "rmw/rmw.h" #include #include From a3db0069b60963c82a84956394bbf60148aa0c25 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 27 Jan 2020 07:58:48 +0100 Subject: [PATCH 14/45] beautified error output Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/experimental/serialization.hpp | 2 +- rclcpp/include/rclcpp/experimental/serialized_container.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/serialization.hpp b/rclcpp/include/rclcpp/experimental/serialization.hpp index 1bb230ce60..120b2d20f9 100644 --- a/rclcpp/include/rclcpp/experimental/serialization.hpp +++ b/rclcpp/include/rclcpp/experimental/serialization.hpp @@ -79,7 +79,7 @@ class Serialization : public SerializationBase if (fini_ret != RCL_RET_OK) { RCUTILS_LOG_ERROR_NAMED( "rclcpp", - "failed to destroy serialized message: %s", rcl_get_error_string().str); + "Failed to destroy serialized message: %s", rcl_get_error_string().str); } }); diff --git a/rclcpp/include/rclcpp/experimental/serialized_container.hpp b/rclcpp/include/rclcpp/experimental/serialized_container.hpp index 5aba11d566..a53e62bfd9 100644 --- a/rclcpp/include/rclcpp/experimental/serialized_container.hpp +++ b/rclcpp/include/rclcpp/experimental/serialized_container.hpp @@ -63,7 +63,7 @@ class SerializedContainer : public rcl_serialized_message_t if (fini_ret != RCL_RET_OK) { RCUTILS_LOG_ERROR_NAMED( "rclcpp", - "failed to destroy serialized message: %s", rcl_get_error_string().str); + "Failed to destroy serialized message: %s", rcl_get_error_string().str); } } } From d7a84eaa66664cef30558612ae7a9034acedb7a4 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Mon, 27 Jan 2020 15:33:33 +0100 Subject: [PATCH 15/45] changed timing Signed-off-by: Joshua Hampp --- rclcpp/test/test_intra_process_communication.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index f8f4174429..9c4c60963e 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -115,7 +115,7 @@ class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam counts; void OnMessageSerialized(const std::shared_ptr msg) @@ -184,8 +184,10 @@ TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) publisher->publish(*stringMsg); publisher->publish(std::move(stringMsgU)); } - rclcpp::spin_some(node); - rclcpp::sleep_for(offset); + for (uint32_t i = 0; i < 3; ++i) { + rclcpp::spin_some(node); + rclcpp::sleep_for(offset); + } rclcpp::spin_some(node); } From af14ee76bedf35e9afd8615a9c4a83c62cfb8347 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Tue, 28 Jan 2020 15:08:07 +0100 Subject: [PATCH 16/45] * fixed code style * updated error messages * added const modifier Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/publisher.hpp | 5 +++-- rclcpp/include/rclcpp/subscription.hpp | 3 ++- rclcpp/src/rclcpp/node_interfaces/node_topics.cpp | 2 +- .../include/rclcpp_lifecycle/lifecycle_publisher.hpp | 3 ++- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 3371227537..8fa9aebc78 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -206,12 +206,13 @@ class Publisher : public PublisherBase this->do_publish_message(msg); } - /// Publish a serialized message. Non specialized version to prevent comipiling errors. + /// Publish a serialized message. Non specialized version to prevent compiling errors. template void publish(std::unique_ptr serialized_msg) { (void)serialized_msg; - throw std::runtime_error("not supported"); + throw std::runtime_error( + "publishing unique_ptr with custom deleter only supported for serialized messages"); } /// Publish a serialized message. diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 8b3a676411..9f6284ed65 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -157,7 +157,8 @@ class Subscription : public SubscriptionBase } using rclcpp::experimental::IntraProcessManager; - uint64_t intra_process_subscription_id, intra_process_subscription_id_serialized; + uint64_t intra_process_subscription_id; + uint64_t intra_process_subscription_id_serialized; auto context = node_base->get_context(); auto ipm = context->get_sub_context(); diff --git a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp index 7135fbadea..9cb84d3cce 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp @@ -99,7 +99,7 @@ NodeTopics::add_subscription( callback_group->add_waitable(subscription_event); } - auto intra_process_waitables = subscription->get_intra_process_waitables(); + const auto intra_process_waitables = subscription->get_intra_process_waitables(); for (auto & intra_process_waitable : intra_process_waitables) { if (nullptr != intra_process_waitable) { // Add to the callback group to be notified about intra-process msgs. diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp index 52fd17101c..c9f5ea5976 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp @@ -116,7 +116,8 @@ class LifecyclePublisher : public LifecyclePublisherInterface, void publish(std::unique_ptr serialized_msg) { (void)serialized_msg; - throw std::runtime_error("not supported"); + throw std::runtime_error( + "publishing unique_ptr with custom deleter only supported for serialized messages"); } /// Publish a serialized message. From 0c83e4981d6b43b369529290d65decd97a2cee03 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Tue, 28 Jan 2020 15:09:15 +0100 Subject: [PATCH 17/45] updated test_intra_process_communication: * fixed allocation counter * updated data type naming Signed-off-by: Joshua Hampp --- .../test/test_intra_process_communication.cpp | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index 9c4c60963e..c2377e58d3 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -44,6 +44,27 @@ void * custom_allocate(size_t size, void * state) return r; } +void * custom_zero_allocate(size_t number_of_elements, size_t size_of_element, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + ++get_test_allocation_counter(); + auto r = m_allocator.zero_allocate(number_of_elements, size_of_element, state); + return r; +} + +void * custom_reallocate(void *pointer, size_t size, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + if (pointer == nullptr) { + ++get_test_allocation_counter(); + } + + auto r = m_allocator.reallocate(pointer, size, state); + return r; +} + void custom_deallocate(void * pointer, void * state) { static auto m_allocator = rcutils_get_default_allocator(); @@ -52,7 +73,7 @@ void custom_deallocate(void * pointer, void * state) m_allocator.deallocate(pointer, state); } -rcutils_uint8_array_t make_serialized_string_msg( +rcl_serialized_message_t make_serialized_string_msg( const std::shared_ptr & stringMsg) { auto m_allocator = rcutils_get_default_allocator(); @@ -60,8 +81,8 @@ rcutils_uint8_array_t make_serialized_string_msg( // add custom (de)allocator to count the references to the object m_allocator.allocate = &custom_allocate; m_allocator.deallocate = &custom_deallocate; - - ++get_test_allocation_counter(); + m_allocator.reallocate = &custom_reallocate; + m_allocator.zero_allocate = &custom_zero_allocate; rcl_serialized_message_t msg = rmw_get_zero_initialized_serialized_message(); auto ret = rmw_serialized_message_init(&msg, 0, &m_allocator); @@ -197,7 +218,7 @@ TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) EXPECT_EQ(counts[1], 9u); } - EXPECT_LE(get_test_allocation_counter(), 30); + EXPECT_EQ(get_test_allocation_counter(), 0); } TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic) @@ -253,7 +274,7 @@ TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic) EXPECT_EQ(counts[1], 3u); } - EXPECT_LE(get_test_allocation_counter(), 30); + EXPECT_EQ(get_test_allocation_counter(), 0); } auto get_new_context() From bc228ee0328defdcd6aa9ab5ddecccdffcfc455b Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Tue, 28 Jan 2020 15:31:10 +0100 Subject: [PATCH 18/45] uncrustified file Signed-off-by: Joshua Hampp --- rclcpp/test/test_intra_process_communication.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index c2377e58d3..a7d01fecd3 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -53,7 +53,7 @@ void * custom_zero_allocate(size_t number_of_elements, size_t size_of_element, v return r; } -void * custom_reallocate(void *pointer, size_t size, void * state) +void * custom_reallocate(void * pointer, size_t size, void * state) { static auto m_allocator = rcutils_get_default_allocator(); From 4c80594a6f7d2ed4594bb30736e01216d7f6cb42 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Thu, 30 Jan 2020 07:58:21 +0100 Subject: [PATCH 19/45] fixed code style Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/experimental/serialization.hpp | 2 +- rclcpp/include/rclcpp/publisher.hpp | 2 +- .../include/rclcpp_lifecycle/lifecycle_publisher.hpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/serialization.hpp b/rclcpp/include/rclcpp/experimental/serialization.hpp index 120b2d20f9..76f8ce21b7 100644 --- a/rclcpp/include/rclcpp/experimental/serialization.hpp +++ b/rclcpp/include/rclcpp/experimental/serialization.hpp @@ -15,7 +15,7 @@ #ifndef RCLCPP__EXPERIMENTAL__SERIALIZATION_HPP_ #define RCLCPP__EXPERIMENTAL__SERIALIZATION_HPP_ -#include "rmw/rmw.h" +#include #include #include diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 8fa9aebc78..72d659d1a3 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -212,7 +212,7 @@ class Publisher : public PublisherBase { (void)serialized_msg; throw std::runtime_error( - "publishing unique_ptr with custom deleter only supported for serialized messages"); + "publishing unique_ptr with custom deleter only supported for serialized messages"); } /// Publish a serialized message. diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp index c9f5ea5976..960ae91c04 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp @@ -117,7 +117,7 @@ class LifecyclePublisher : public LifecyclePublisherInterface, { (void)serialized_msg; throw std::runtime_error( - "publishing unique_ptr with custom deleter only supported for serialized messages"); + "publishing unique_ptr with custom deleter only supported for serialized messages"); } /// Publish a serialized message. From 2ddbe32469801e85cbcce8e4a9abf6648539ba73 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 10:56:34 +0200 Subject: [PATCH 20/45] added missing member from rebase Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/publisher.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 72d659d1a3..ab0e6c15fe 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -448,6 +448,7 @@ class Publisher : public PublisherBase const rclcpp::PublisherOptionsWithAllocator options_; std::shared_ptr message_allocator_; + std::shared_ptr message_allocator_serialized_; MessageDeleter message_deleter_; }; From 4e1e74462476320a6ebbecbf8927d6466098db8a Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 11:20:08 +0200 Subject: [PATCH 21/45] removed unnecessary dependency Signed-off-by: Joshua Hampp --- rclcpp/CMakeLists.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 40fa190408..3c271b20c6 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -252,7 +252,6 @@ if(BUILD_TESTING) "rcl" "rcl_interfaces" "rmw" - "rosidl_generator_cpp" "rosidl_typesupport_cpp" "test_msgs" ) From d8f1da9c888db1d2a42292b24148a8d4273a9d2c Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 11:20:46 +0200 Subject: [PATCH 22/45] adapted to multi waitables for ipm Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/wait_set_template.hpp | 34 +++++++++++---------- rclcpp/src/rclcpp/subscription_base.cpp | 8 +++-- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/rclcpp/include/rclcpp/wait_set_template.hpp b/rclcpp/include/rclcpp/wait_set_template.hpp index 899d461ed3..39ac5f63df 100644 --- a/rclcpp/include/rclcpp/wait_set_template.hpp +++ b/rclcpp/include/rclcpp/wait_set_template.hpp @@ -165,18 +165,19 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli } if (mask.include_intra_process_waitable) { auto local_subscription = inner_subscription; - auto waitable = inner_subscription->get_intra_process_waitable(); - if (nullptr != waitable) { - bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state( - waitable.get(), - true); - if (already_in_use) { - throw std::runtime_error( - "subscription intra-process waitable already associated with a wait set"); + for(auto waitable : inner_subscription->get_intra_process_waitables()) { + if (nullptr != waitable) { + bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state( + waitable.get(), + true); + if (already_in_use) { + throw std::runtime_error( + "subscription intra-process waitable already associated with a wait set"); + } + this->storage_add_waitable( + std::move(waitable), + std::move(local_subscription)); } - this->storage_add_waitable( - std::move(inner_subscription->get_intra_process_waitable()), - std::move(local_subscription)); } } }); @@ -230,11 +231,12 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli } } if (mask.include_intra_process_waitable) { - auto local_waitable = inner_subscription->get_intra_process_waitable(); - inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false); - if (nullptr != local_waitable) { - // This is the case when intra process is disabled for the subscription. - this->storage_remove_waitable(std::move(local_waitable)); + for(auto local_waitable : inner_subscription->get_intra_process_waitables()) { + inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false); + if (nullptr != local_waitable) { + // This is the case when intra process is disabled for the subscription. + this->storage_remove_waitable(std::move(local_waitable)); + } } } }); diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 9336154892..600ecd344f 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -230,7 +230,7 @@ SubscriptionBase::get_intra_process_waitables() const auto ipm = weak_ipm_.lock(); if (!ipm) { throw std::runtime_error( - "SubscriptionBase::get_intra_process_waitable() called " + "SubscriptionBase::get_intra_process_waitables() called " "after destruction of intra process manager"); } @@ -281,8 +281,10 @@ SubscriptionBase::exchange_in_use_by_wait_set_state( if (this == pointer_to_subscription_part) { return subscription_in_use_by_wait_set_.exchange(in_use_state); } - if (get_intra_process_waitable().get() == pointer_to_subscription_part) { - return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state); + for(auto & waitable : get_intra_process_waitables()) { + if (waitable.get() == pointer_to_subscription_part) { + return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state); + } } for (const auto & qos_event : event_handlers_) { if (qos_event.get() == pointer_to_subscription_part) { From d74c4edac10e2082962cfb2ad9d0bf965c457f92 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 11:21:00 +0200 Subject: [PATCH 23/45] removed unnecessary include Signed-off-by: Joshua Hampp --- rclcpp/test/test_intra_process_communication.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index a7d01fecd3..da9bc5eca9 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -14,7 +14,6 @@ #include -#include #include #include From d4536b30360d95a6b3abe963e71f67dc3816938b Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 11:21:11 +0200 Subject: [PATCH 24/45] updated to rclcpp::ok Signed-off-by: Joshua Hampp --- rclcpp/test/test_intra_process_communication.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index da9bc5eca9..28706d1a2e 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -125,7 +125,7 @@ class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam Date: Fri, 17 Apr 2020 11:42:38 +0200 Subject: [PATCH 25/45] Update rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp Co-Authored-By: Karsten Knese Signed-off-by: Joshua Hampp --- .../include/rclcpp/experimental/subscription_intra_process.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index d6a6772eba..8a82717718 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -215,7 +215,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase if (any_callback_.use_take_shared_method()) { ConstMessageSharedPtr msg = buffer_->consume_shared(); - if (msg == nullptr) { + if (nullptr == msg) { throw std::runtime_error("Subscription intra-process could not get serialized message"); } any_callback_.dispatch_intra_process(msg, msg_info); From 657d9a0be4f3475c3de78bc8b8b9f541979bd0b9 Mon Sep 17 00:00:00 2001 From: DensoADAS <46967124+DensoADAS@users.noreply.github.com> Date: Fri, 17 Apr 2020 11:42:11 +0200 Subject: [PATCH 26/45] Update rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp Co-Authored-By: Karsten Knese Signed-off-by: Joshua Hampp --- .../include/rclcpp/experimental/subscription_intra_process.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 8a82717718..486c5d15f8 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -210,7 +210,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase void>::type execute_impl() { - rmw_message_info_t msg_info; + rmw_message_info_t msg_info {}; msg_info.from_intra_process = true; if (any_callback_.use_take_shared_method()) { From 9b65b7cefdad79380892b805e9755ee64303ea21 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 12:11:40 +0200 Subject: [PATCH 27/45] * default deleter * initiazlize variables Co-Authored-By: Karsten Knese Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/experimental/serialization.hpp | 2 +- .../rclcpp/experimental/subscription_intra_process.hpp | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/serialization.hpp b/rclcpp/include/rclcpp/experimental/serialization.hpp index 76f8ce21b7..4fbdf2255b 100644 --- a/rclcpp/include/rclcpp/experimental/serialization.hpp +++ b/rclcpp/include/rclcpp/experimental/serialization.hpp @@ -31,7 +31,7 @@ namespace experimental class SerializationBase { public: - virtual ~SerializationBase() {} + virtual ~SerializationBase() = default; virtual std::shared_ptr serialize_message(const void * message) = 0; diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 486c5d15f8..a98c7a2c66 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -161,7 +161,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase execute_impl() { if (serializer_) { - rmw_message_info_t msg_info; + rmw_message_info_t msg_info = {}; msg_info.from_intra_process = true; ConstMessageSharedPtr msg = buffer_->consume_shared(); @@ -190,7 +190,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase void>::type execute_impl() { - rmw_message_info_t msg_info; + rmw_message_info_t msg_info = {}; msg_info.publisher_gid = {0, {0}}; msg_info.from_intra_process = true; @@ -238,7 +238,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase throw std::runtime_error("Subscription intra-process could not get serialized message"); } - rmw_message_info_t msg_info; + rmw_message_info_t msg_info = {}; msg_info.from_intra_process = true; if (any_callback_.use_take_shared_method()) { From 18181f5d8d1d00aa9680fd458fd7ad809a3b1cba Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 12:24:38 +0200 Subject: [PATCH 28/45] changes due to review of PR Signed-off-by: Joshua Hampp --- .../rclcpp/experimental/serialization.hpp | 41 +++++--- ...d_container.hpp => serialized_message.hpp} | 10 +- .../subscription_intra_process.hpp | 94 ++++++++++--------- rclcpp/include/rclcpp/publisher.hpp | 25 +++-- rclcpp/include/rclcpp/subscription.hpp | 10 +- .../test/test_intra_process_communication.cpp | 2 +- .../rclcpp_lifecycle/lifecycle_publisher.hpp | 16 +++- 7 files changed, 118 insertions(+), 80 deletions(-) rename rclcpp/include/rclcpp/experimental/{serialized_container.hpp => serialized_message.hpp} (89%) diff --git a/rclcpp/include/rclcpp/experimental/serialization.hpp b/rclcpp/include/rclcpp/experimental/serialization.hpp index 4fbdf2255b..db6a79474e 100644 --- a/rclcpp/include/rclcpp/experimental/serialization.hpp +++ b/rclcpp/include/rclcpp/experimental/serialization.hpp @@ -33,10 +33,19 @@ class SerializationBase public: virtual ~SerializationBase() = default; + /// Serialize a ROS2 message to a serialized stream + /** + * \param[in] message The ROS2 message which is read and serialized by rmw. + */ virtual std::shared_ptr serialize_message(const void * message) = 0; + /// Deserialize a serialized stream to a ROS message + /** + * \param[in] serialized_message The serialized message to be converted to ROS2 by rmw. + * \param[out] message The deserialized ROS2 message. + */ virtual void deserialize_message( - const rcl_serialized_message_t & serialized_message, + const rcl_serialized_message_t * serialized_message, void * msg) = 0; }; @@ -61,14 +70,17 @@ class Serialization : public SerializationBase std::string(rcutils_get_error_string().str)); } - if (message) { - const auto error = rmw_serialize( - message, - &type_support_, - serialized_message); - if (error != RCL_RET_OK) { - throw std::runtime_error("Failed to serialize."); - } + if (nullptr == message) { + throw std::runtime_error("Message is nullpointer while serialization."); + } + + const auto error = rmw_serialize( + message, + &type_support_, + serialized_message); + if (error != RCL_RET_OK) { + delete serialized_message; + throw std::runtime_error("Failed to serialize."); } auto shared_serialized_msg = std::shared_ptr( @@ -86,16 +98,17 @@ class Serialization : public SerializationBase return shared_serialized_msg; } - void deserialize_message(const rcl_serialized_message_t & serialized_message, void * msg) override + void deserialize_message(const rcl_serialized_message_t * serialized_message, void * msg) override { - if (serialized_message.buffer_capacity == 0 || - serialized_message.buffer_length == 0 || - !serialized_message.buffer) + if (nullptr == serialized_message || + serialized_message->buffer_capacity == 0 || + serialized_message->buffer_length == 0 || + !serialized_message->buffer) { throw std::runtime_error("Failed to deserialize nullptr serialized message."); } - const auto ret = rmw_deserialize(&serialized_message, &type_support_, msg); + const auto ret = rmw_deserialize(serialized_message, &type_support_, msg); if (ret != RMW_RET_OK) { throw std::runtime_error("Failed to deserialize serialized message."); } diff --git a/rclcpp/include/rclcpp/experimental/serialized_container.hpp b/rclcpp/include/rclcpp/experimental/serialized_message.hpp similarity index 89% rename from rclcpp/include/rclcpp/experimental/serialized_container.hpp rename to rclcpp/include/rclcpp/experimental/serialized_message.hpp index a53e62bfd9..1ff6121709 100644 --- a/rclcpp/include/rclcpp/experimental/serialized_container.hpp +++ b/rclcpp/include/rclcpp/experimental/serialized_message.hpp @@ -27,14 +27,14 @@ namespace experimental { /// Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks -class SerializedContainer : public rcl_serialized_message_t +class SerializedMessage : public rcl_serialized_message_t { public: - SerializedContainer() + SerializedMessage() : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) {} - explicit SerializedContainer(const SerializedContainer & sc) + explicit SerializedMessage(const SerializedMessage & sc) : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) { const auto ret = rmw_serialized_message_init(this, sc.buffer_length, &sc.allocator); @@ -49,14 +49,14 @@ class SerializedContainer : public rcl_serialized_message_t buffer_length = sc.buffer_length; } - explicit SerializedContainer(rcl_serialized_message_t && msg) + explicit SerializedMessage(rcl_serialized_message_t && msg) : rcl_serialized_message_t(msg) { // reset buffer to prevent double free msg = rmw_get_zero_initialized_serialized_message(); } - ~SerializedContainer() + ~SerializedMessage() { if (buffer != nullptr) { const auto fini_ret = rmw_serialized_message_fini(this); diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index a98c7a2c66..5cd7e34bd2 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -38,7 +38,7 @@ namespace rclcpp namespace experimental { -class SerializedContainer; +class SerializedMessage; template< typename MessageT, @@ -77,7 +77,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase any_callback_(callback), serializer_(serializer) { if (!std::is_same::value && - !std::is_same::value && + !std::is_same::value && !std::is_same::value) { throw std::runtime_error("SubscriptionIntraProcess wrong callback type"); @@ -153,44 +153,46 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase (void)ret; } + // convert from ROS2 message to rcl_serialized_message_t (serilizatino needed) template typename std::enable_if< std::is_same::value && - !std::is_same::value, + !std::is_same::value, void>::type execute_impl() { - if (serializer_) { - rmw_message_info_t msg_info = {}; - msg_info.from_intra_process = true; + if (nullptr == serializer_) { + throw std::runtime_error("Subscription intra-process can't handle serialized messages"); + } - ConstMessageSharedPtr msg = buffer_->consume_shared(); - auto serialized_msg = - serializer_->serialize_message(reinterpret_cast(msg.get())); + rmw_message_info_t msg_info; + msg_info.from_intra_process = true; - if (serialized_msg == nullptr) { - throw std::runtime_error("Subscription intra-process could not serialize message"); - } + ConstMessageSharedPtr msg = buffer_->consume_shared(); + auto serialized_msg = + serializer_->serialize_message(reinterpret_cast(msg.get())); - if (any_callback_.use_take_shared_method()) { - any_callback_.dispatch_intra_process(serialized_msg, msg_info); - } else { - throw std::runtime_error("Subscription intra-process for serialized " - "messages does not support unique pointers."); - } + if (nullptr == serialized_msg) { + throw std::runtime_error("Subscription intra-process could not serialize message"); + } + + if (any_callback_.use_take_shared_method()) { + any_callback_.dispatch_intra_process(serialized_msg, msg_info); } else { - throw std::runtime_error("Subscription intra-process can't handle serialized messages"); + throw std::runtime_error("Subscription intra-process for serialized " + "messages does not support unique pointers."); } } + // forward from ROS2 message to ROS2 message (same type) template typename std::enable_if< !std::is_same::value && - !std::is_same::value, + !std::is_same::value, void>::type execute_impl() { - rmw_message_info_t msg_info = {}; + rmw_message_info_t msg_info; msg_info.publisher_gid = {0, {0}}; msg_info.from_intra_process = true; @@ -203,19 +205,20 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } } + // forward from rcl_serialized_message_t to SerializationMessage (no conversion needed) template typename std::enable_if< std::is_same::value && - std::is_same::value, + std::is_same::value, void>::type execute_impl() { - rmw_message_info_t msg_info {}; + rmw_message_info_t msg_info; msg_info.from_intra_process = true; if (any_callback_.use_take_shared_method()) { ConstMessageSharedPtr msg = buffer_->consume_shared(); - if (nullptr == msg) { + if (msg == nullptr) { throw std::runtime_error("Subscription intra-process could not get serialized message"); } any_callback_.dispatch_intra_process(msg, msg_info); @@ -225,35 +228,36 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } } + // convert from rcl_serialized_message_t to ROS2 message (deserialization needed) template typename std::enable_if< !std::is_same::value && - std::is_same::value, + std::is_same::value, void>::type execute_impl() { - if (serializer_) { - ConstMessageSharedPtr serialized_container = buffer_->consume_shared(); - if (serialized_container == nullptr) { - throw std::runtime_error("Subscription intra-process could not get serialized message"); - } + if (nullptr == serializer_) { + throw std::runtime_error("Subscription intra-process can't handle unserialized messages"); + } - rmw_message_info_t msg_info = {}; - msg_info.from_intra_process = true; - - if (any_callback_.use_take_shared_method()) { - CallbackMessageSharedPtr msg = construct_unique(); - serializer_->deserialize_message(*serialized_container, - reinterpret_cast(msg.get())); - any_callback_.dispatch_intra_process(msg, msg_info); - } else { - CallbackMessageUniquePtr msg = construct_unique(); - serializer_->deserialize_message(*serialized_container, - reinterpret_cast(msg.get())); - any_callback_.dispatch_intra_process(std::move(msg), msg_info); - } + ConstMessageSharedPtr serialized_container = buffer_->consume_shared(); + if (nullptr == serialized_container) { + throw std::runtime_error("Subscription intra-process could not get serialized message"); + } + + rmw_message_info_t msg_info; + msg_info.from_intra_process = true; + + if (any_callback_.use_take_shared_method()) { + CallbackMessageSharedPtr msg = construct_unique(); + serializer_->deserialize_message(serialized_container.get(), + reinterpret_cast(msg.get())); + any_callback_.dispatch_intra_process(msg, msg_info); } else { - throw std::runtime_error("Subscription intra-process can't handle unserialized messages"); + CallbackMessageUniquePtr msg = construct_unique(); + serializer_->deserialize_message(serialized_container.get(), + reinterpret_cast(msg.get())); + any_callback_.dispatch_intra_process(std::move(msg), msg_info); } } diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index ab0e6c15fe..4651bb26b0 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -57,11 +57,11 @@ class Publisher : public PublisherBase using MessageUniquePtr = std::unique_ptr; using MessageSharedPtr = std::shared_ptr; using SerializedMessageAllocatorTraits = - allocator::AllocRebind; using SerializedMessageAllocator = typename SerializedMessageAllocatorTraits::allocator_type; using SerializedMessageDeleter = allocator::Deleter; + rclcpp::experimental::SerializedMessage>; RCLCPP_SMART_PTR_DEFINITIONS(Publisher) @@ -206,6 +206,13 @@ class Publisher : public PublisherBase this->do_publish_message(msg); } + void + publish(const rcl_serialized_message_t & serialized_msg) + { + // Kept for backwards compatibility. Copies compelete memory! + this->publish(std::make_unique(serialized_msg)); + } + /// Publish a serialized message. Non specialized version to prevent compiling errors. template void publish(std::unique_ptr serialized_msg) @@ -219,7 +226,7 @@ class Publisher : public PublisherBase template void publish(std::unique_ptr serialized_msg) { - this->do_serialized_publish(serialized_msg.get()); + this->do_serialized_publish(*serialized_msg); } /// Publish an instance of a LoanedMessage. @@ -349,21 +356,21 @@ class Publisher : public PublisherBase } void - do_serialized_publish(rcl_serialized_message_t * serialized_msg) + do_serialized_publish(rcl_serialized_message_t serialized_msg) { bool inter_process_publish_needed = get_subscription_count() > get_intra_process_subscription_count(); if (inter_process_publish_needed) { // declare here to avoid deletion before returning method - auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr); + auto status = rcl_publish_serialized_message(&publisher_handle_, &serialized_msg, nullptr); if (RCL_RET_OK != status) { rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message"); } } - auto msg = std::make_unique( - std::move(*serialized_msg)); + auto msg = std::make_unique( + std::move(serialized_msg)); if (intra_process_is_enabled_) { do_intra_process_publish(std::move(msg), message_allocator_serialized_); @@ -406,7 +413,7 @@ class Publisher : public PublisherBase } const uint64_t intra_process_publisher_id = std::is_same::value ? + rclcpp::experimental::SerializedMessage>::value ? intra_process_publisher_id_serialized_ : intra_process_publisher_id_; ipm->template do_intra_process_publish( @@ -431,7 +438,7 @@ class Publisher : public PublisherBase } const uint64_t intra_process_publisher_id = std::is_same::value ? + rclcpp::experimental::SerializedMessage>::value ? intra_process_publisher_id_serialized_ : intra_process_publisher_id_; return ipm->template do_intra_process_publish_and_return_shared( diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 9f6284ed65..d8d91b755f 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -35,7 +35,7 @@ #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" -#include "rclcpp/experimental/serialized_container.hpp" +#include "rclcpp/experimental/serialized_message.hpp" #include "rclcpp/experimental/subscription_intra_process.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" @@ -191,20 +191,20 @@ class Subscription : public SubscriptionBase { using SerializedMessageAllocatorTraits = - allocator::AllocRebind; using SerializedMessageAllocator = typename SerializedMessageAllocatorTraits::allocator_type; using SerializedMessageDeleter = allocator::Deleter; + rclcpp::experimental::SerializedMessage>; using SerializedMessageUniquePtr = - std::unique_ptr; // First create a SubscriptionIntraProcess which will be given to the intra-process manager. auto subscription_intra_process = std::make_shared< rclcpp::experimental::SubscriptionIntraProcess< - rclcpp::experimental::SerializedContainer, + rclcpp::experimental::SerializedMessage, AllocatorT, typename SerializedMessageUniquePtr::deleter_type, CallbackMessageT diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index 28706d1a2e..b6a5e30f0d 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -26,7 +26,7 @@ #include "rclcpp/publisher.hpp" #include "rclcpp/rclcpp.hpp" -#include "rclcpp/experimental/serialized_container.hpp" +#include "rclcpp/experimental/serialized_message.hpp" int32_t & get_test_allocation_counter() { diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp index 960ae91c04..e01b6628bf 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp @@ -110,6 +110,20 @@ class LifecyclePublisher : public LifecyclePublisherInterface, } rclcpp::Publisher::publish(msg); } + + void + publish(const rcl_serialized_message_t & serialized_msg) + { + if (!enabled_) { + RCLCPP_WARN( + logger_, + "Trying to publish message on the topic '%s', but the publisher is not activated", + this->get_topic_name()); + + return; + } + rclcpp::Publisher::publish(msg); + } /// Publish a serialized message. Non specialized version to prevent compiling errors. template @@ -132,7 +146,7 @@ class LifecyclePublisher : public LifecyclePublisherInterface, return; } - this->do_serialized_publish(serialized_msg.get()); + this->do_serialized_publish(*serialized_msg); } virtual void From 4c7506f5aad63ac791af567f33a1d12282e826c7 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 12:26:51 +0200 Subject: [PATCH 29/45] uncrustify Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/create_publisher.hpp | 5 ++- rclcpp/include/rclcpp/create_subscription.hpp | 7 +-- .../subscription_intra_process.hpp | 9 ++-- rclcpp/include/rclcpp/publisher_factory.hpp | 5 ++- rclcpp/include/rclcpp/subscription.hpp | 18 +++++--- rclcpp/include/rclcpp/wait_set_template.hpp | 4 +- rclcpp/src/rclcpp/subscription_base.cpp | 2 +- .../test/test_intra_process_communication.cpp | 43 +++++++++++-------- 8 files changed, 56 insertions(+), 37 deletions(-) diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index 649e0bc8b1..89536d0e98 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -84,8 +84,9 @@ create_publisher( { const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle(); - return create_publisher(node, topic_name, type_support, - qos, options); + return create_publisher( + node, topic_name, type_support, + qos, options); } } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index 56fd37b3c3..fb9ffea744 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -107,9 +107,10 @@ create_subscription( const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle(); return create_subscription(std::forward( - node), topic_name, type_support, qos, std::forward( - callback), options, msg_mem_strat); + MessageMemoryStrategyT>( + std::forward( + node), topic_name, type_support, qos, std::forward( + callback), options, msg_mem_strat); } } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 5cd7e34bd2..4fb1ddf989 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -223,7 +223,8 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } any_callback_.dispatch_intra_process(msg, msg_info); } else { - throw std::runtime_error("Subscription intra-process for serialized " + throw std::runtime_error( + "Subscription intra-process for serialized " "messages does not support unique pointers."); } } @@ -250,12 +251,14 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase if (any_callback_.use_take_shared_method()) { CallbackMessageSharedPtr msg = construct_unique(); - serializer_->deserialize_message(serialized_container.get(), + serializer_->deserialize_message( + serialized_container.get(), reinterpret_cast(msg.get())); any_callback_.dispatch_intra_process(msg, msg_info); } else { CallbackMessageUniquePtr msg = construct_unique(); - serializer_->deserialize_message(serialized_container.get(), + serializer_->deserialize_message( + serialized_container.get(), reinterpret_cast(msg.get())); any_callback_.dispatch_intra_process(std::move(msg), msg_info); } diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index 7f64567318..f287d8e49d 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -75,8 +75,9 @@ create_publisher_factory( const rclcpp::QoS & qos ) -> std::shared_ptr { - auto publisher = std::make_shared(node_base, topic_name, qos, options, - type_support); + auto publisher = std::make_shared( + node_base, topic_name, qos, options, + type_support); // This is used for setting up things like intra process comms which // require this->shared_from_this() which cannot be called from // the constructor. diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index d8d91b755f..52c4416050 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -177,8 +177,9 @@ class Subscription : public SubscriptionBase this->get_topic_name(), // important to get it by the fully-qualified name qos.get_rmw_qos_profile(), resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback), - std::make_shared(type_support_handle, - options.template to_rcl_subscription_options(qos).allocator) + std::make_shared( + type_support_handle, + options.template to_rcl_subscription_options(qos).allocator) ); TRACEPOINT( rclcpp_subscription_init, @@ -215,8 +216,9 @@ class Subscription : public SubscriptionBase this->get_topic_name(), // important to get it by the fully-qualified name qos.get_rmw_qos_profile(), resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback), - std::make_shared(type_support_handle, - options.template to_rcl_subscription_options(qos).allocator) + std::make_shared( + type_support_handle, + options.template to_rcl_subscription_options(qos).allocator) ); TRACEPOINT( rclcpp_subscription_init, @@ -224,11 +226,13 @@ class Subscription : public SubscriptionBase (const void *)subscription_intra_process.get()); // Add it to the intra process manager. - intra_process_subscription_id_serialized = ipm->add_subscription(subscription_intra_process, - true); + intra_process_subscription_id_serialized = ipm->add_subscription( + subscription_intra_process, + true); } - this->setup_intra_process({intra_process_subscription_id, + this->setup_intra_process( + {intra_process_subscription_id, intra_process_subscription_id_serialized}, ipm); } diff --git a/rclcpp/include/rclcpp/wait_set_template.hpp b/rclcpp/include/rclcpp/wait_set_template.hpp index 39ac5f63df..96e67f0a2b 100644 --- a/rclcpp/include/rclcpp/wait_set_template.hpp +++ b/rclcpp/include/rclcpp/wait_set_template.hpp @@ -165,7 +165,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli } if (mask.include_intra_process_waitable) { auto local_subscription = inner_subscription; - for(auto waitable : inner_subscription->get_intra_process_waitables()) { + for (auto waitable : inner_subscription->get_intra_process_waitables()) { if (nullptr != waitable) { bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state( waitable.get(), @@ -231,7 +231,7 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli } } if (mask.include_intra_process_waitable) { - for(auto local_waitable : inner_subscription->get_intra_process_waitables()) { + for (auto local_waitable : inner_subscription->get_intra_process_waitables()) { inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false); if (nullptr != local_waitable) { // This is the case when intra process is disabled for the subscription. diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 600ecd344f..1f6316c0c8 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -281,7 +281,7 @@ SubscriptionBase::exchange_in_use_by_wait_set_state( if (this == pointer_to_subscription_part) { return subscription_in_use_by_wait_set_.exchange(in_use_state); } - for(auto & waitable : get_intra_process_waitables()) { + for (auto & waitable : get_intra_process_waitables()) { if (waitable.get() == pointer_to_subscription_part) { return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state); } diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index b6a5e30f0d..a2cccc7e52 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -94,7 +94,8 @@ rcl_serialized_message_t make_serialized_string_msg( (); auto error = rmw_serialize(stringMsg.get(), type, &msg); if (error != RMW_RET_OK) { - RCUTILS_LOG_ERROR_NAMED("test_intra_process_communication", + RCUTILS_LOG_ERROR_NAMED( + "test_intra_process_communication", "Something went wrong preparing the serialized message"); } @@ -179,14 +180,18 @@ TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) parameters.node_options[0]); auto publisher = node->create_publisher("/topic", 10); - auto sub_shared = node->create_subscription("/topic", 10, - &OnMessage); - auto sub_unique = node->create_subscription("/topic", 10, - &OnMessageUniquePtr); - auto sub_const_shared = node->create_subscription("/topic", 10, - &OnMessageConst); - auto sub_serialized = node->create_subscription("/topic", 10, - &OnMessageSerialized); + auto sub_shared = node->create_subscription( + "/topic", 10, + &OnMessage); + auto sub_unique = node->create_subscription( + "/topic", 10, + &OnMessageUniquePtr); + auto sub_const_shared = node->create_subscription( + "/topic", 10, + &OnMessageConst); + auto sub_serialized = node->create_subscription( + "/topic", 10, + &OnMessageSerialized); rclcpp::sleep_for(offset); @@ -243,14 +248,18 @@ TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic) rclcpp::QoS(10), &OnMessageSerialized); - auto sub_shared = node->create_subscription("/topic", 10, - &OnMessage); - auto sub_unique = node->create_subscription("/topic", 10, - &OnMessageUniquePtr); - auto sub_const_shared = node->create_subscription("/topic", 10, - &OnMessageConst); - auto sub_serialized = node->create_subscription("/topic", 10, - &OnMessageSerialized); + auto sub_shared = node->create_subscription( + "/topic", 10, + &OnMessage); + auto sub_unique = node->create_subscription( + "/topic", 10, + &OnMessageUniquePtr); + auto sub_const_shared = node->create_subscription( + "/topic", 10, + &OnMessageConst); + auto sub_serialized = node->create_subscription( + "/topic", 10, + &OnMessageSerialized); rclcpp::sleep_for(offset); From 2cefdca2972f2daeb2f46728287e1e2223ecdd5b Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 12:27:27 +0200 Subject: [PATCH 30/45] added backwards compatibility for publishing serialized messages Signed-off-by: Joshua Hampp --- .../rclcpp/experimental/serialized_message.hpp | 15 +++++++++++++++ rclcpp/include/rclcpp/publisher.hpp | 17 +++++------------ .../rclcpp_lifecycle/lifecycle_publisher.hpp | 14 -------------- 3 files changed, 20 insertions(+), 26 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/serialized_message.hpp b/rclcpp/include/rclcpp/experimental/serialized_message.hpp index 1ff6121709..d7a27d598f 100644 --- a/rclcpp/include/rclcpp/experimental/serialized_message.hpp +++ b/rclcpp/include/rclcpp/experimental/serialized_message.hpp @@ -49,6 +49,21 @@ class SerializedMessage : public rcl_serialized_message_t buffer_length = sc.buffer_length; } + explicit SerializedMessage(const rcl_serialized_message_t & sc) + : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) + { + const auto ret = rmw_serialized_message_init(this, sc.buffer_length, &sc.allocator); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + // do not call memcpy if the pointer is "static" + if (buffer != sc.buffer) { + std::memcpy(buffer, sc.buffer, sc.buffer_length); + } + buffer_length = sc.buffer_length; + } + explicit SerializedMessage(rcl_serialized_message_t && msg) : rcl_serialized_message_t(msg) { diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 4651bb26b0..09aff0018a 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -192,8 +192,9 @@ class Publisher : public PublisherBase get_subscription_count() > get_intra_process_subscription_count(); if (inter_process_publish_needed) { - auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move( - msg), message_allocator_); + auto shared_msg = this->do_intra_process_publish_and_return_shared( + std::move( + msg), message_allocator_); this->do_inter_process_publish(*shared_msg); } else { this->do_intra_process_publish(std::move(msg), message_allocator_); @@ -206,13 +207,6 @@ class Publisher : public PublisherBase this->do_publish_message(msg); } - void - publish(const rcl_serialized_message_t & serialized_msg) - { - // Kept for backwards compatibility. Copies compelete memory! - this->publish(std::make_unique(serialized_msg)); - } - /// Publish a serialized message. Non specialized version to prevent compiling errors. template void publish(std::unique_ptr serialized_msg) @@ -330,9 +324,8 @@ class Publisher : public PublisherBase typename std::enable_if::value>::type do_publish_message(const MessageT & msg) { - (void)msg; - throw std::runtime_error( - "publishing serialized messages is only supported for unique pointers"); + // Kept for backwards compatibility. Copies compelete memory! + this->publish(std::make_unique(msg)); } void diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp index e01b6628bf..4eb3c81a0c 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp @@ -110,20 +110,6 @@ class LifecyclePublisher : public LifecyclePublisherInterface, } rclcpp::Publisher::publish(msg); } - - void - publish(const rcl_serialized_message_t & serialized_msg) - { - if (!enabled_) { - RCLCPP_WARN( - logger_, - "Trying to publish message on the topic '%s', but the publisher is not activated", - this->get_topic_name()); - - return; - } - rclcpp::Publisher::publish(msg); - } /// Publish a serialized message. Non specialized version to prevent compiling errors. template From 2147026fbf7b1656eb83d110d2e3a44ae6283ea6 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 12:27:41 +0200 Subject: [PATCH 31/45] initializing variables Signed-off-by: Joshua Hampp --- .../experimental/subscription_intra_process.hpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 4fb1ddf989..e44f27c9b4 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -165,7 +165,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase throw std::runtime_error("Subscription intra-process can't handle serialized messages"); } - rmw_message_info_t msg_info; + rmw_message_info_t msg_info = {}; msg_info.from_intra_process = true; ConstMessageSharedPtr msg = buffer_->consume_shared(); @@ -179,7 +179,8 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase if (any_callback_.use_take_shared_method()) { any_callback_.dispatch_intra_process(serialized_msg, msg_info); } else { - throw std::runtime_error("Subscription intra-process for serialized " + throw std::runtime_error( + "Subscription intra-process for serialized " "messages does not support unique pointers."); } } @@ -192,7 +193,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase void>::type execute_impl() { - rmw_message_info_t msg_info; + rmw_message_info_t msg_info = {}; msg_info.publisher_gid = {0, {0}}; msg_info.from_intra_process = true; @@ -213,7 +214,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase void>::type execute_impl() { - rmw_message_info_t msg_info; + rmw_message_info_t msg_info = {}; msg_info.from_intra_process = true; if (any_callback_.use_take_shared_method()) { @@ -246,7 +247,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase throw std::runtime_error("Subscription intra-process could not get serialized message"); } - rmw_message_info_t msg_info; + rmw_message_info_t msg_info = {}; msg_info.from_intra_process = true; if (any_callback_.use_take_shared_method()) { From 9b436fa7770495d45626a1da7ad832cf68586955 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20K=C3=B6nig?= Date: Fri, 17 Apr 2020 15:38:58 +0200 Subject: [PATCH 32/45] fix include guards Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/experimental/serialized_message.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/serialized_message.hpp b/rclcpp/include/rclcpp/experimental/serialized_message.hpp index d7a27d598f..0202eb97eb 100644 --- a/rclcpp/include/rclcpp/experimental/serialized_message.hpp +++ b/rclcpp/include/rclcpp/experimental/serialized_message.hpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP__EXPERIMENTAL__SERIALIZED_CONTAINER_HPP_ -#define RCLCPP__EXPERIMENTAL__SERIALIZED_CONTAINER_HPP_ +#ifndef RCLCPP__EXPERIMENTAL__SERIALIZED_MESSAGE_HPP_ +#define RCLCPP__EXPERIMENTAL__SERIALIZED_MESSAGE_HPP_ #include @@ -87,4 +87,4 @@ class SerializedMessage : public rcl_serialized_message_t } // namespace experimental } // namespace rclcpp -#endif // RCLCPP__EXPERIMENTAL__SERIALIZED_CONTAINER_HPP_ +#endif // RCLCPP__EXPERIMENTAL__SERIALIZED_MESSAGE_HPP_ From a2fc8bd867c2d530ce405e88cacfdef2f7a22150 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 19:16:37 +0200 Subject: [PATCH 33/45] enabled publishing of rcl_serialized_message_t Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/publisher.hpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 09aff0018a..3261f80df1 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -207,6 +207,13 @@ class Publisher : public PublisherBase this->do_publish_message(msg); } + template + typename std::enable_if::value>::type + publish(const rcl_serialized_message_t & serialized_msg) + { + this->do_publish_message(serialized_msg); + } + /// Publish a serialized message. Non specialized version to prevent compiling errors. template void publish(std::unique_ptr serialized_msg) @@ -322,7 +329,7 @@ class Publisher : public PublisherBase template typename std::enable_if::value>::type - do_publish_message(const MessageT & msg) + do_publish_message(const T & msg) { // Kept for backwards compatibility. Copies compelete memory! this->publish(std::make_unique(msg)); From b482164b9767dac456bd1658d23547e2bb13eace Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 20:56:06 +0200 Subject: [PATCH 34/45] moved serialization and serialized message out of "experimental" Signed-off-by: Joshua Hampp --- .../experimental/subscription_intra_process.hpp | 13 +++++++------ rclcpp/include/rclcpp/publisher.hpp | 12 ++++++------ .../rclcpp/{experimental => }/serialization.hpp | 9 +++------ .../{experimental => }/serialized_message.hpp | 11 +++++------ rclcpp/include/rclcpp/subscription.hpp | 14 +++++++------- rclcpp/test/test_intra_process_communication.cpp | 2 +- 6 files changed, 29 insertions(+), 32 deletions(-) rename rclcpp/include/rclcpp/{experimental => }/serialization.hpp (94%) rename rclcpp/include/rclcpp/{experimental => }/serialized_message.hpp (92%) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index e44f27c9b4..0172b0d036 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -27,8 +27,9 @@ #include "rclcpp/any_subscription_callback.hpp" #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/create_intra_process_buffer.hpp" -#include "rclcpp/experimental/serialization.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" #include "tracetools/tracetools.h" @@ -77,7 +78,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase any_callback_(callback), serializer_(serializer) { if (!std::is_same::value && - !std::is_same::value && + !std::is_same::value && !std::is_same::value) { throw std::runtime_error("SubscriptionIntraProcess wrong callback type"); @@ -157,7 +158,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase template typename std::enable_if< std::is_same::value && - !std::is_same::value, + !std::is_same::value, void>::type execute_impl() { @@ -189,7 +190,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase template typename std::enable_if< !std::is_same::value && - !std::is_same::value, + !std::is_same::value, void>::type execute_impl() { @@ -210,7 +211,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase template typename std::enable_if< std::is_same::value && - std::is_same::value, + std::is_same::value, void>::type execute_impl() { @@ -234,7 +235,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase template typename std::enable_if< !std::is_same::value && - std::is_same::value, + std::is_same::value, void>::type execute_impl() { diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 3261f80df1..bb40e26f2f 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -57,11 +57,11 @@ class Publisher : public PublisherBase using MessageUniquePtr = std::unique_ptr; using MessageSharedPtr = std::shared_ptr; using SerializedMessageAllocatorTraits = - allocator::AllocRebind; using SerializedMessageAllocator = typename SerializedMessageAllocatorTraits::allocator_type; using SerializedMessageDeleter = allocator::Deleter; + rclcpp::SerializedMessage>; RCLCPP_SMART_PTR_DEFINITIONS(Publisher) @@ -332,7 +332,7 @@ class Publisher : public PublisherBase do_publish_message(const T & msg) { // Kept for backwards compatibility. Copies compelete memory! - this->publish(std::make_unique(msg)); + this->publish(std::make_unique(msg)); } void @@ -369,7 +369,7 @@ class Publisher : public PublisherBase } } - auto msg = std::make_unique( + auto msg = std::make_unique( std::move(serialized_msg)); if (intra_process_is_enabled_) { @@ -413,7 +413,7 @@ class Publisher : public PublisherBase } const uint64_t intra_process_publisher_id = std::is_same::value ? + rclcpp::SerializedMessage>::value ? intra_process_publisher_id_serialized_ : intra_process_publisher_id_; ipm->template do_intra_process_publish( @@ -438,7 +438,7 @@ class Publisher : public PublisherBase } const uint64_t intra_process_publisher_id = std::is_same::value ? + rclcpp::SerializedMessage>::value ? intra_process_publisher_id_serialized_ : intra_process_publisher_id_; return ipm->template do_intra_process_publish_and_return_shared( diff --git a/rclcpp/include/rclcpp/experimental/serialization.hpp b/rclcpp/include/rclcpp/serialization.hpp similarity index 94% rename from rclcpp/include/rclcpp/experimental/serialization.hpp rename to rclcpp/include/rclcpp/serialization.hpp index db6a79474e..ebff8853da 100644 --- a/rclcpp/include/rclcpp/experimental/serialization.hpp +++ b/rclcpp/include/rclcpp/serialization.hpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP__EXPERIMENTAL__SERIALIZATION_HPP_ -#define RCLCPP__EXPERIMENTAL__SERIALIZATION_HPP_ +#ifndef RCLCPP__SERIALIZATION_HPP_ +#define RCLCPP__SERIALIZATION_HPP_ #include @@ -24,8 +24,6 @@ namespace rclcpp { -namespace experimental -{ /// Interface to (de)serialize a message class SerializationBase @@ -119,7 +117,6 @@ class Serialization : public SerializationBase rcutils_allocator_t rcutils_allocator_; }; -} // namespace experimental } // namespace rclcpp -#endif // RCLCPP__EXPERIMENTAL__SERIALIZATION_HPP_ +#endif // RCLCPP__SERIALIZATION_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/serialized_message.hpp b/rclcpp/include/rclcpp/serialized_message.hpp similarity index 92% rename from rclcpp/include/rclcpp/experimental/serialized_message.hpp rename to rclcpp/include/rclcpp/serialized_message.hpp index 0202eb97eb..162a2e8f46 100644 --- a/rclcpp/include/rclcpp/experimental/serialized_message.hpp +++ b/rclcpp/include/rclcpp/serialized_message.hpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP__EXPERIMENTAL__SERIALIZED_MESSAGE_HPP_ -#define RCLCPP__EXPERIMENTAL__SERIALIZED_MESSAGE_HPP_ +#ifndef RCLCPP__SERIALIZED_MESSAGE_HPP_ +#define RCLCPP__SERIALIZED_MESSAGE_HPP_ #include @@ -21,10 +21,10 @@ #include "rmw/serialized_message.h" +#include + namespace rclcpp { -namespace experimental -{ /// Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks class SerializedMessage : public rcl_serialized_message_t @@ -84,7 +84,6 @@ class SerializedMessage : public rcl_serialized_message_t } }; -} // namespace experimental } // namespace rclcpp -#endif // RCLCPP__EXPERIMENTAL__SERIALIZED_MESSAGE_HPP_ +#endif // RCLCPP__SERIALIZED_MESSAGE_HPP_ diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 52c4416050..9568027e9e 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -35,13 +35,13 @@ #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" -#include "rclcpp/experimental/serialized_message.hpp" #include "rclcpp/experimental/subscription_intra_process.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/message_info.hpp" #include "rclcpp/message_memory_strategy.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/subscription_base.hpp" #include "rclcpp/subscription_options.hpp" #include "rclcpp/subscription_traits.hpp" @@ -177,7 +177,7 @@ class Subscription : public SubscriptionBase this->get_topic_name(), // important to get it by the fully-qualified name qos.get_rmw_qos_profile(), resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback), - std::make_shared( + std::make_shared( type_support_handle, options.template to_rcl_subscription_options(qos).allocator) ); @@ -192,20 +192,20 @@ class Subscription : public SubscriptionBase { using SerializedMessageAllocatorTraits = - allocator::AllocRebind; using SerializedMessageAllocator = typename SerializedMessageAllocatorTraits::allocator_type; using SerializedMessageDeleter = allocator::Deleter; + rclcpp::SerializedMessage>; using SerializedMessageUniquePtr = - std::unique_ptr; // First create a SubscriptionIntraProcess which will be given to the intra-process manager. auto subscription_intra_process = std::make_shared< rclcpp::experimental::SubscriptionIntraProcess< - rclcpp::experimental::SerializedMessage, + rclcpp::SerializedMessage, AllocatorT, typename SerializedMessageUniquePtr::deleter_type, CallbackMessageT @@ -216,7 +216,7 @@ class Subscription : public SubscriptionBase this->get_topic_name(), // important to get it by the fully-qualified name qos.get_rmw_qos_profile(), resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback), - std::make_shared( + std::make_shared( type_support_handle, options.template to_rcl_subscription_options(qos).allocator) ); diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index a2cccc7e52..f50c544c2e 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -26,7 +26,7 @@ #include "rclcpp/publisher.hpp" #include "rclcpp/rclcpp.hpp" -#include "rclcpp/experimental/serialized_message.hpp" +#include "rclcpp/serialized_message.hpp" int32_t & get_test_allocation_counter() { From 93dc82699b0b434a44b4cafe96e9c19d277a6973 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 20:56:24 +0200 Subject: [PATCH 35/45] added original constructor to LifcyclePublisher Signed-off-by: Joshua Hampp --- .../include/rclcpp_lifecycle/lifecycle_publisher.hpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp index 4eb3c81a0c..68bf091596 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp @@ -57,6 +57,17 @@ class LifecyclePublisher : public LifecyclePublisherInterface, using MessageDeleter = rclcpp::allocator::Deleter; using MessageUniquePtr = std::unique_ptr; + LifecyclePublisher( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options) + : rclcpp::Publisher(node_base, topic, qos, options), + enabled_(false), + logger_(rclcpp::get_logger("LifecyclePublisher")) + { + } + LifecyclePublisher( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, From 35fd4908f850cb0b9558bfa75f40ab0cfe038b39 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 20:56:46 +0200 Subject: [PATCH 36/45] fixed year in header Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/serialization.hpp | 2 +- rclcpp/include/rclcpp/serialized_message.hpp | 2 +- rclcpp/test/test_intra_process_communication.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rclcpp/include/rclcpp/serialization.hpp b/rclcpp/include/rclcpp/serialization.hpp index ebff8853da..2d7bae3f7a 100644 --- a/rclcpp/include/rclcpp/serialization.hpp +++ b/rclcpp/include/rclcpp/serialization.hpp @@ -1,4 +1,4 @@ -// Copyright 2019 Open Source Robotics Foundation, Inc. +// Copyright 2020 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/rclcpp/include/rclcpp/serialized_message.hpp b/rclcpp/include/rclcpp/serialized_message.hpp index 162a2e8f46..d89bccd9ab 100644 --- a/rclcpp/include/rclcpp/serialized_message.hpp +++ b/rclcpp/include/rclcpp/serialized_message.hpp @@ -1,4 +1,4 @@ -// Copyright 2019 Open Source Robotics Foundation, Inc. +// Copyright 2020 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index f50c544c2e..cc702da0e8 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -1,4 +1,4 @@ -// Copyright 2019 Open Source Robotics Foundation, Inc. +// Copyright 2020 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From 5472d363b174e4346900545bd99432f7f03a983e Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 20:56:55 +0200 Subject: [PATCH 37/45] added comment Signed-off-by: Joshua Hampp --- rclcpp/src/rclcpp/intra_process_manager.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 06ab36d49a..3b0b4d1e19 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -227,6 +227,8 @@ IntraProcessManager::can_communicate( } // a publisher and a subscription with different content type can't communicate + // if is_serialized is true, the expected message typ is rcl_serialized_message_t + // otherwise the templated ROS2 message type if (sub_info.is_serialized != pub_info.is_serialized) { return false; } From ab73a622976af770936b8606f5eb7ccc361d51d1 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 20:57:07 +0200 Subject: [PATCH 38/45] updated loop signature Signed-off-by: Joshua Hampp --- rclcpp/src/rclcpp/subscription_base.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 1f6316c0c8..1d24adf71a 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -236,7 +236,7 @@ SubscriptionBase::get_intra_process_waitables() const std::vector waitables(intra_process_subscription_ids_.size()); - for (size_t i = 0; i < intra_process_subscription_ids_.size(); ++i) { + for (auto i = 0u; i < intra_process_subscription_ids_.size(); ++i) { // Use the id to retrieve the subscription intra-process from the intra-process manager. waitables[i] = ipm->get_subscription_intra_process(intra_process_subscription_ids_[i]); } From 4fa5f090ed05eea20e81b7e3d66a9d895f6d4557 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 20:57:18 +0200 Subject: [PATCH 39/45] fixed memory loss Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/serialization.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/rclcpp/include/rclcpp/serialization.hpp b/rclcpp/include/rclcpp/serialization.hpp index 2d7bae3f7a..ddc122cb43 100644 --- a/rclcpp/include/rclcpp/serialization.hpp +++ b/rclcpp/include/rclcpp/serialization.hpp @@ -69,6 +69,7 @@ class Serialization : public SerializationBase } if (nullptr == message) { + delete serialized_message; throw std::runtime_error("Message is nullpointer while serialization."); } From ff7a81f5dd5a78c9092ff07a49d9007790a429aa Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 20:57:38 +0200 Subject: [PATCH 40/45] reuse constructor Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/serialized_message.hpp | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/rclcpp/include/rclcpp/serialized_message.hpp b/rclcpp/include/rclcpp/serialized_message.hpp index d89bccd9ab..96b75f3724 100644 --- a/rclcpp/include/rclcpp/serialized_message.hpp +++ b/rclcpp/include/rclcpp/serialized_message.hpp @@ -34,20 +34,9 @@ class SerializedMessage : public rcl_serialized_message_t : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) {} - explicit SerializedMessage(const SerializedMessage & sc) - : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) - { - const auto ret = rmw_serialized_message_init(this, sc.buffer_length, &sc.allocator); - if (ret != RCL_RET_OK) { - rclcpp::exceptions::throw_from_rcl_error(ret); - } - - // do not call memcpy if the pointer is "static" - if (buffer != sc.buffer) { - std::memcpy(buffer, sc.buffer, sc.buffer_length); - } - buffer_length = sc.buffer_length; - } + explicit SerializedMessage(const SerializedMessage & serialized_message) + : SerializedMessage(static_cast(serialized_message)) + {} explicit SerializedMessage(const rcl_serialized_message_t & sc) : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) From 408343fb968cd6b196b0846195c0d30dcc90be46 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 20:57:52 +0200 Subject: [PATCH 41/45] renamed variable Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/serialized_message.hpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/rclcpp/include/rclcpp/serialized_message.hpp b/rclcpp/include/rclcpp/serialized_message.hpp index 96b75f3724..173f643938 100644 --- a/rclcpp/include/rclcpp/serialized_message.hpp +++ b/rclcpp/include/rclcpp/serialized_message.hpp @@ -38,19 +38,21 @@ class SerializedMessage : public rcl_serialized_message_t : SerializedMessage(static_cast(serialized_message)) {} - explicit SerializedMessage(const rcl_serialized_message_t & sc) + explicit SerializedMessage(const rcl_serialized_message_t & serialized_message) : rcl_serialized_message_t(rmw_get_zero_initialized_serialized_message()) { - const auto ret = rmw_serialized_message_init(this, sc.buffer_length, &sc.allocator); + const auto ret = rmw_serialized_message_init( + this, serialized_message.buffer_length, + &serialized_message.allocator); if (ret != RCL_RET_OK) { rclcpp::exceptions::throw_from_rcl_error(ret); } // do not call memcpy if the pointer is "static" - if (buffer != sc.buffer) { - std::memcpy(buffer, sc.buffer, sc.buffer_length); + if (buffer != serialized_message.buffer) { + std::memcpy(buffer, serialized_message.buffer, serialized_message.buffer_length); } - buffer_length = sc.buffer_length; + buffer_length = serialized_message.buffer_length; } explicit SerializedMessage(rcl_serialized_message_t && msg) From 511476c44539d473b49937c7e2313ff4be0074cf Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 20:58:00 +0200 Subject: [PATCH 42/45] fixed check Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/serialized_message.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/serialized_message.hpp b/rclcpp/include/rclcpp/serialized_message.hpp index 173f643938..65a3fc5e0b 100644 --- a/rclcpp/include/rclcpp/serialized_message.hpp +++ b/rclcpp/include/rclcpp/serialized_message.hpp @@ -64,7 +64,7 @@ class SerializedMessage : public rcl_serialized_message_t ~SerializedMessage() { - if (buffer != nullptr) { + if (nullptr != buffer) { const auto fini_ret = rmw_serialized_message_fini(this); if (fini_ret != RCL_RET_OK) { RCUTILS_LOG_ERROR_NAMED( From 1f8226a6f3bdd5a5140e27c1998f7241e3a81d99 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 20:58:08 +0200 Subject: [PATCH 43/45] line break Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/publisher.hpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index bb40e26f2f..fd53e727d1 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -193,8 +193,7 @@ class Publisher : public PublisherBase if (inter_process_publish_needed) { auto shared_msg = this->do_intra_process_publish_and_return_shared( - std::move( - msg), message_allocator_); + std::move(msg), message_allocator_); this->do_inter_process_publish(*shared_msg); } else { this->do_intra_process_publish(std::move(msg), message_allocator_); From 241164204d5fe9fb87a885c25cc3d48c49aef06c Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Fri, 17 Apr 2020 21:20:51 +0200 Subject: [PATCH 44/45] fixed include order Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/serialized_message.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rclcpp/include/rclcpp/serialized_message.hpp b/rclcpp/include/rclcpp/serialized_message.hpp index 65a3fc5e0b..8e4dbf837f 100644 --- a/rclcpp/include/rclcpp/serialized_message.hpp +++ b/rclcpp/include/rclcpp/serialized_message.hpp @@ -15,14 +15,14 @@ #ifndef RCLCPP__SERIALIZED_MESSAGE_HPP_ #define RCLCPP__SERIALIZED_MESSAGE_HPP_ +#include + #include #include "rcutils/logging_macros.h" #include "rmw/serialized_message.h" -#include - namespace rclcpp { From 43a8446f988e385b81920d9359825577bc0c8e00 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Sun, 19 Apr 2020 06:13:22 +0200 Subject: [PATCH 45/45] renamed overloaded functions: * create_publisher to create_generic_publisher * create_subscription to create_generic_subscription Signed-off-by: Joshua Hampp --- rclcpp/include/rclcpp/create_publisher.hpp | 4 ++-- rclcpp/include/rclcpp/create_subscription.hpp | 11 ++++++----- rclcpp/test/test_intra_process_communication.cpp | 4 ++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index 89536d0e98..c4ac38ece0 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -40,7 +40,7 @@ template< typename PublisherT = rclcpp::Publisher, typename NodeT> std::shared_ptr -create_publisher( +create_generic_publisher( NodeT & node, const std::string & topic_name, const rosidl_message_type_support_t & type_support, @@ -84,7 +84,7 @@ create_publisher( { const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle(); - return create_publisher( + return create_generic_publisher( node, topic_name, type_support, qos, options); } diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index fb9ffea744..7ccc77d9e4 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -48,7 +48,7 @@ template< >, typename NodeT> typename std::shared_ptr -create_subscription( +create_generic_subscription( NodeT && node, const std::string & topic_name, const rosidl_message_type_support_t & type_support, @@ -106,10 +106,11 @@ create_subscription( { const auto type_support = *rosidl_typesupport_cpp::get_message_type_support_handle(); - return create_subscription( - std::forward( - node), topic_name, type_support, qos, std::forward( + return + create_generic_subscription< + MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT, MessageMemoryStrategyT + >( + std::forward(node), topic_name, type_support, qos, std::forward( callback), options, msg_mem_strat); } diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp index cc702da0e8..9a40be728d 100644 --- a/rclcpp/test/test_intra_process_communication.cpp +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -235,13 +235,13 @@ TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic) "my_node", "/ns", parameters.node_options[0]); - auto publisher = rclcpp::create_publisher( + auto publisher = rclcpp::create_generic_publisher( node, "/topic", *rosidl_typesupport_cpp::get_message_type_support_handle(), rclcpp::QoS(10)); - auto sub_gen_serialized = rclcpp::create_subscription( + auto sub_gen_serialized = rclcpp::create_generic_subscription( node, "/topic", *rosidl_typesupport_cpp::get_message_type_support_handle(),