From 2a6edf18bb998377be38063e0fbd7a5c185b26f0 Mon Sep 17 00:00:00 2001 From: Alberto Soragna Date: Wed, 16 Jun 2021 07:59:13 +0100 Subject: [PATCH] cleanup intra-process-manager Signed-off-by: Alberto Soragna --- .../create_intra_process_buffer.hpp | 5 +- .../experimental/intra_process_manager.hpp | 31 +++------- .../subscription_intra_process.hpp | 3 +- .../subscription_intra_process_base.hpp | 9 ++- .../subscription_intra_process_buffer.hpp | 3 +- rclcpp/include/rclcpp/publisher.hpp | 8 +-- rclcpp/include/rclcpp/subscription.hpp | 10 ++-- rclcpp/src/rclcpp/intra_process_manager.cpp | 60 +++++++++---------- .../subscription_intra_process_base.cpp | 2 +- .../rclcpp/test_intra_process_manager.cpp | 43 ++++++------- 10 files changed, 77 insertions(+), 97 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/create_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/create_intra_process_buffer.hpp index ff57a849c3..1d232d14fe 100644 --- a/rclcpp/include/rclcpp/experimental/create_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/create_intra_process_buffer.hpp @@ -24,6 +24,7 @@ #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/buffers/ring_buffer_implementation.hpp" #include "rclcpp/intra_process_buffer_type.hpp" +#include "rclcpp/qos.hpp" namespace rclcpp { @@ -37,13 +38,13 @@ template< typename rclcpp::experimental::buffers::IntraProcessBuffer::UniquePtr create_intra_process_buffer( IntraProcessBufferType buffer_type, - rmw_qos_profile_t qos, + const rclcpp::QoS & qos, std::shared_ptr allocator) { using MessageSharedPtr = std::shared_ptr; using MessageUniquePtr = std::unique_ptr; - size_t buffer_size = qos.depth; + size_t buffer_size = qos.depth(); using rclcpp::experimental::buffers::IntraProcessBuffer; typename IntraProcessBuffer::UniquePtr buffer; diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 4d968ba7d8..a927a81856 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -305,25 +305,6 @@ class IntraProcessManager get_subscription_intra_process(uint64_t intra_process_subscription_id); private: - struct SubscriptionInfo - { - SubscriptionInfo() = default; - - rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr subscription; - rmw_qos_profile_t qos; - const char * topic_name; - bool use_take_shared_method; - }; - - struct PublisherInfo - { - PublisherInfo() = default; - - rclcpp::PublisherBase::WeakPtr publisher; - rmw_qos_profile_t qos; - const char * topic_name; - }; - struct SplittedSubscriptions { std::vector take_shared_subscriptions; @@ -331,10 +312,10 @@ class IntraProcessManager }; using SubscriptionMap = - std::unordered_map; + std::unordered_map; using PublisherMap = - std::unordered_map; + std::unordered_map; using PublisherToSubscriptionIdsMap = std::unordered_map; @@ -350,7 +331,9 @@ class IntraProcessManager RCLCPP_PUBLIC bool - can_communicate(PublisherInfo pub_info, SubscriptionInfo sub_info) const; + can_communicate( + rclcpp::PublisherBase::SharedPtr pub, + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const; template< typename MessageT, @@ -366,7 +349,7 @@ class IntraProcessManager if (subscription_it == subscriptions_.end()) { throw std::runtime_error("subscription has unexpectedly gone out of scope"); } - auto subscription_base = subscription_it->second.subscription.lock(); + auto subscription_base = subscription_it->second.lock(); if (subscription_base) { auto subscription = std::dynamic_pointer_cast< rclcpp::experimental::SubscriptionIntraProcessBuffer @@ -404,7 +387,7 @@ class IntraProcessManager if (subscription_it == subscriptions_.end()) { throw std::runtime_error("subscription has unexpectedly gone out of scope"); } - auto subscription_base = subscription_it->second.subscription.lock(); + auto subscription_base = subscription_it->second.lock(); if (subscription_base) { auto subscription = std::dynamic_pointer_cast< rclcpp::experimental::SubscriptionIntraProcessBuffer diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 92ee8fde78..f174cd5c07 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -31,6 +31,7 @@ #include "rclcpp/experimental/create_intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" #include "rclcpp/experimental/subscription_intra_process_buffer.hpp" +#include "rclcpp/qos.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" #include "tracetools/tracetools.h" @@ -72,7 +73,7 @@ class SubscriptionIntraProcess std::shared_ptr allocator, rclcpp::Context::SharedPtr context, const std::string & topic_name, - rmw_qos_profile_t qos_profile, + const rclcpp::QoS & qos_profile, rclcpp::IntraProcessBufferType buffer_type) : SubscriptionIntraProcessBuffer( allocator, diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index c41317c705..ae57521999 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -25,6 +25,7 @@ #include "rcl/error_handling.h" +#include "rclcpp/qos.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" @@ -39,7 +40,9 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase) RCLCPP_PUBLIC - SubscriptionIntraProcessBase(const std::string & topic_name, rmw_qos_profile_t qos_profile) + SubscriptionIntraProcessBase( + const std::string & topic_name, + const rclcpp::QoS & qos_profile) : topic_name_(topic_name), qos_profile_(qos_profile) {} @@ -71,7 +74,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable get_topic_name() const; RCLCPP_PUBLIC - rmw_qos_profile_t + QoS get_actual_qos() const; protected: @@ -83,7 +86,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable trigger_guard_condition() = 0; std::string topic_name_; - rmw_qos_profile_t qos_profile_; + QoS qos_profile_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp index b58c2853ca..05a830633c 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp @@ -30,6 +30,7 @@ #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/create_intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" +#include "rclcpp/qos.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" #include "tracetools/tracetools.h" @@ -64,7 +65,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase std::shared_ptr allocator, rclcpp::Context::SharedPtr context, const std::string & topic_name, - rmw_qos_profile_t qos_profile, + const rclcpp::QoS & qos_profile, rclcpp::IntraProcessBufferType buffer_type) : SubscriptionIntraProcessBase(topic_name, qos_profile) { diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index c7f77420c4..346a87f3f1 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -186,15 +186,15 @@ class Publisher : public PublisherBase // Get the intra process manager instance for this context. auto ipm = context->get_sub_context(); // Register the publisher with the intra process manager. - if (qos.get_rmw_qos_profile().history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) { + if (qos.history() != rclcpp::HistoryPolicy::KeepLast) { throw std::invalid_argument( - "intraprocess communication is not allowed with keep all history qos policy"); + "intraprocess communication allowed only with keep last history qos policy"); } - if (qos.get_rmw_qos_profile().depth == 0) { + if (qos.depth() == 0) { throw std::invalid_argument( "intraprocess communication is not allowed with a zero qos history depth value"); } - if (qos.get_rmw_qos_profile().durability != RMW_QOS_POLICY_DURABILITY_VOLATILE) { + if (qos.durability() != rclcpp::DurabilityPolicy::Volatile) { throw std::invalid_argument( "intraprocess communication allowed only with volatile durability"); } diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index dd73bffa46..0b42549978 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -183,16 +183,16 @@ class Subscription : public SubscriptionBase using rclcpp::detail::resolve_intra_process_buffer_type; // Check if the QoS is compatible with intra-process. - rmw_qos_profile_t qos_profile = get_actual_qos().get_rmw_qos_profile(); - if (qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) { + auto qos_profile = get_actual_qos(); + if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) { throw std::invalid_argument( - "intraprocess communication is not allowed with keep all history qos policy"); + "intraprocess communication allowed only with keep last history qos policy"); } - if (qos_profile.depth == 0) { + if (qos_profile.depth() == 0) { throw std::invalid_argument( "intraprocess communication is not allowed with 0 depth qos policy"); } - if (qos_profile.durability != RMW_QOS_POLICY_DURABILITY_VOLATILE) { + if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) { throw std::invalid_argument( "intraprocess communication allowed only with volatile durability"); } diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 339f7de9a9..efce4afeaf 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -36,23 +36,26 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) { std::unique_lock lock(mutex_); - auto id = IntraProcessManager::get_next_unique_id(); + uint64_t pub_id = IntraProcessManager::get_next_unique_id(); - publishers_[id].publisher = publisher; - publishers_[id].topic_name = publisher->get_topic_name(); - publishers_[id].qos = publisher->get_actual_qos().get_rmw_qos_profile(); + publishers_[pub_id] = publisher; // Initialize the subscriptions storage for this publisher. - pub_to_subs_[id] = SplittedSubscriptions(); + pub_to_subs_[pub_id] = SplittedSubscriptions(); // create an entry for the publisher id and populate with already existing subscriptions for (auto & pair : subscriptions_) { - if (can_communicate(publishers_[id], pair.second)) { - insert_sub_id_for_pub(pair.first, id, pair.second.use_take_shared_method); + auto subscription = pair.second.lock(); + if (!subscription) { + continue; + } + if (can_communicate(publisher, subscription)) { + uint64_t sub_id = pair.first; + insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method()); } } - return id; + return pub_id; } uint64_t @@ -60,21 +63,23 @@ IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr su { std::unique_lock lock(mutex_); - auto id = IntraProcessManager::get_next_unique_id(); + uint64_t sub_id = IntraProcessManager::get_next_unique_id(); - subscriptions_[id].subscription = subscription; - subscriptions_[id].topic_name = subscription->get_topic_name(); - subscriptions_[id].qos = subscription->get_actual_qos(); - subscriptions_[id].use_take_shared_method = subscription->use_take_shared_method(); + subscriptions_[sub_id] = subscription; // adds the subscription id to all the matchable publishers for (auto & pair : publishers_) { - if (can_communicate(pair.second, subscriptions_[id])) { - insert_sub_id_for_pub(id, pair.first, subscriptions_[id].use_take_shared_method); + auto publisher = pair.second.lock(); + if (!publisher) { + continue; + } + if (can_communicate(publisher, subscription)) { + uint64_t pub_id = pair.first; + insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method()); } } - return id; + return sub_id; } void @@ -116,7 +121,7 @@ IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const std::shared_lock lock(mutex_); for (auto & publisher_pair : publishers_) { - auto publisher = publisher_pair.second.publisher.lock(); + auto publisher = publisher_pair.second.lock(); if (!publisher) { continue; } @@ -157,7 +162,7 @@ IntraProcessManager::get_subscription_intra_process(uint64_t intra_process_subsc if (subscription_it == subscriptions_.end()) { return nullptr; } else { - auto subscription = subscription_it->second.subscription.lock(); + auto subscription = subscription_it->second.lock(); if (subscription) { return subscription; } else { @@ -204,25 +209,16 @@ IntraProcessManager::insert_sub_id_for_pub( bool IntraProcessManager::can_communicate( - PublisherInfo pub_info, - SubscriptionInfo sub_info) const + rclcpp::PublisherBase::SharedPtr pub, + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const { // publisher and subscription must be on the same topic - if (strcmp(pub_info.topic_name, sub_info.topic_name) != 0) { - return false; - } - - // TODO(alsora): the following checks for qos compatibility should be provided by the RMW - // a reliable subscription can't be connected with a best effort publisher - if ( - sub_info.qos.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE && - pub_info.qos.reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) - { + if (strcmp(pub->get_topic_name(), sub->get_topic_name()) != 0) { return false; } - // a publisher and a subscription with different durability can't communicate - if (sub_info.qos.durability != pub_info.qos.durability) { + auto check_result = rclcpp::qos_check_compatible(pub->get_actual_qos(), sub->get_actual_qos()); + if (check_result.compatibility == rclcpp::QoSCompatibility::Error) { return false; } diff --git a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp index 3b951f34de..2738161675 100644 --- a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp +++ b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp @@ -31,7 +31,7 @@ SubscriptionIntraProcessBase::get_topic_name() const return topic_name_.c_str(); } -rmw_qos_profile_t +rclcpp::QoS SubscriptionIntraProcessBase::get_actual_qos() const { return qos_profile_; diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index 00350edeb0..59437aa560 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -52,8 +52,8 @@ class PublisherBase public: RCLCPP_SMART_PTR_DEFINITIONS(PublisherBase) - PublisherBase() - : qos(rclcpp::QoS(10)), + explicit PublisherBase(rclcpp::QoS qos = rclcpp::QoS(10)) + : qos_profile(qos), topic_name("topic") {} @@ -76,7 +76,7 @@ class PublisherBase rclcpp::QoS get_actual_qos() const { - return qos; + return qos_profile; } bool @@ -93,7 +93,7 @@ class PublisherBase return false; } - rclcpp::QoS qos; + rclcpp::QoS qos_profile; std::string topic_name; uint64_t intra_process_publisher_id_; IntraProcessManagerWeakPtr weak_ipm_; @@ -111,9 +111,9 @@ class Publisher : public PublisherBase RCLCPP_SMART_PTR_DEFINITIONS(Publisher) - Publisher() + explicit Publisher(rclcpp::QoS qos = rclcpp::QoS(10)) + : PublisherBase(qos) { - qos = rclcpp::QoS(10); auto allocator = std::make_shared(); message_allocator_ = std::make_shared(*allocator.get()); } @@ -194,8 +194,8 @@ class SubscriptionIntraProcessBase public: RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase) - SubscriptionIntraProcessBase() - : qos_profile(rmw_qos_profile_default), topic_name("topic") + explicit SubscriptionIntraProcessBase(rclcpp::QoS qos = rclcpp::QoS(10)) + : qos_profile(qos), topic_name("topic") {} virtual ~SubscriptionIntraProcessBase() {} @@ -203,7 +203,7 @@ class SubscriptionIntraProcessBase virtual bool use_take_shared_method() const = 0; - rmw_qos_profile_t + QoS get_actual_qos() { return qos_profile; @@ -215,7 +215,7 @@ class SubscriptionIntraProcessBase return topic_name; } - rmw_qos_profile_t qos_profile; + rclcpp::QoS qos_profile; const char * topic_name; }; @@ -228,8 +228,8 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase public: RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcessBuffer) - SubscriptionIntraProcessBuffer() - : take_shared_method(false) + explicit SubscriptionIntraProcessBuffer(rclcpp::QoS qos) + : SubscriptionIntraProcessBase(qos), take_shared_method(false) { buffer = std::make_unique>(); } @@ -278,8 +278,8 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< public: RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcess) - SubscriptionIntraProcess() - : SubscriptionIntraProcessBuffer() + explicit SubscriptionIntraProcess(rclcpp::QoS qos = rclcpp::QoS(10)) + : SubscriptionIntraProcessBuffer(qos) { } }; @@ -359,15 +359,12 @@ TEST(TestIntraProcessManager, add_pub_sub) { auto ipm = std::make_shared(); - auto p1 = std::make_shared(); - p1->qos.get_rmw_qos_profile().reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + auto p1 = std::make_shared(rclcpp::QoS(10).best_effort()); - auto p2 = std::make_shared(); - p2->qos.get_rmw_qos_profile().reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + auto p2 = std::make_shared(rclcpp::QoS(10).best_effort()); p2->topic_name = "different_topic_name"; - auto s1 = std::make_shared(); - s1->qos_profile.reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + auto s1 = std::make_shared(rclcpp::QoS(10).best_effort()); auto p1_id = ipm->add_publisher(p1); auto p2_id = ipm->add_publisher(p2); @@ -383,11 +380,9 @@ TEST(TestIntraProcessManager, add_pub_sub) { ASSERT_EQ(0u, p2_subs); ASSERT_EQ(0u, non_existing_pub_subs); - auto p3 = std::make_shared(); - p3->qos.get_rmw_qos_profile().reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + auto p3 = std::make_shared(rclcpp::QoS(10).reliable()); - auto s2 = std::make_shared(); - s2->qos_profile.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + auto s2 = std::make_shared(rclcpp::QoS(10).reliable()); auto s2_id = ipm->add_subscription(s2); auto p3_id = ipm->add_publisher(p3);