diff --git a/rclcpp/include/rclcpp/executors/event_waitable.hpp b/rclcpp/include/rclcpp/executors/event_waitable.hpp new file mode 100644 index 0000000000..895bf13f50 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/event_waitable.hpp @@ -0,0 +1,75 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EVENT_WAITABLE_HPP_ +#define RCLCPP__EXECUTORS__EVENT_WAITABLE_HPP_ + +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace executors +{ + +/** + * @brief This class provides a wrapper around the waitable object, that is + * meant to be used with the EventsExecutor. + * The waitset related methods are stubbed out as they should not be called. + * Nodes who want to implement a custom EventWaitable, can derive from this class and override + * the execute function. + */ +class EventWaitable : public rclcpp::Waitable +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventWaitable) + + // Constructor + RCLCPP_PUBLIC + EventWaitable() = default; + + // Destructor + RCLCPP_PUBLIC + virtual ~EventWaitable() = default; + + // Executing an EventWaitable is a no-op. + // Derive from this class to implement execute function. + RCLCPP_PUBLIC + virtual void + execute() = 0; + + // Stub API: not used by EventsExecutor + RCLCPP_PUBLIC + bool + is_ready(rcl_wait_set_t * wait_set) final + { + (void)wait_set; + throw std::runtime_error("EventWaitable can't be checked if it's ready"); + return false; + } + + // Stub API: not used by EventsExecutor + RCLCPP_PUBLIC + bool + add_to_wait_set(rcl_wait_set_t * wait_set) final + { + (void)wait_set; + throw std::runtime_error("EventWaitable can't be added to a wait_set"); + return false; + } +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EVENT_WAITABLE_HPP_ diff --git a/rclcpp/include/rclcpp/executors/events_executor.hpp b/rclcpp/include/rclcpp/executors/events_executor.hpp index d784b6842e..ead774011b 100644 --- a/rclcpp/include/rclcpp/executors/events_executor.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor.hpp @@ -18,12 +18,13 @@ #include #include #include -#include +#include #include "rcutils/executor_event_types.h" #include "rclcpp/executor.hpp" #include "rclcpp/executors/events_executor_entities_collector.hpp" +#include "rclcpp/executors/events_executor_notify_waitable.hpp" #include "rclcpp/executors/timers_manager.hpp" #include "rclcpp/node.hpp" @@ -57,7 +58,7 @@ class EventsExecutor : public rclcpp::Executor /// Default destrcutor. RCLCPP_PUBLIC - virtual ~EventsExecutor(); + virtual ~EventsExecutor() = default; /// Events executor implementation of spin. /** @@ -231,8 +232,6 @@ class EventsExecutor : public rclcpp::Executor // facilitate the removal of events from expired entities. using EventQueue = std::deque; - EventsExecutorEntitiesCollector::SharedPtr entities_collector_; - /// Extract and execute events from the queue until it is empty RCLCPP_PUBLIC void @@ -247,6 +246,8 @@ class EventsExecutor : public rclcpp::Executor EventQueue event_queue_; EventQueue local_event_queue_; + EventsExecutorEntitiesCollector::SharedPtr entities_collector_; + EventsExecutorNotifyWaitable::SharedPtr executor_notifier_; // Mutex to protect the insertion of events in the queue std::mutex push_mutex_; // Mutex to protect the execution of events diff --git a/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp index f0053e167c..a3ffe6237c 100644 --- a/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp @@ -16,11 +16,13 @@ #define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_ENTITIES_COLLECTOR_HPP_ #include +#include #include +#include +#include "rclcpp/executors/event_waitable.hpp" #include "rclcpp/executors/timers_manager.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" -#include "rclcpp/waitable.hpp" namespace rclcpp { @@ -46,7 +48,7 @@ class EventsExecutor; * When this occurs, the execute API takes care of handling changes * in the entities currently used by the executor. */ -class EventsExecutorEntitiesCollector final : public rclcpp::Waitable +class EventsExecutorEntitiesCollector final : public EventWaitable { public: RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutorEntitiesCollector) @@ -79,24 +81,6 @@ class EventsExecutorEntitiesCollector final : public rclcpp::Waitable remove_node( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); - // Stub API: not used by EventsExecutor - RCLCPP_PUBLIC - bool - is_ready(rcl_wait_set_t * wait_set) override - { - (void)wait_set; - return false; - } - - // Stub API: not used by EventsExecutor - RCLCPP_PUBLIC - bool - add_to_wait_set(rcl_wait_set_t * wait_set) override - { - (void)wait_set; - return false; - } - /// Add a callback group to the entities collector /** * \see rclcpp::Executor::add_callback_group @@ -165,6 +149,12 @@ class EventsExecutorEntitiesCollector final : public rclcpp::Waitable void unset_callback_group_entities_callbacks(rclcpp::CallbackGroup::SharedPtr group); + void + set_guard_condition_callback(const rcl_guard_condition_t * guard_condition); + + void + unset_guard_condition_callback(const rcl_guard_condition_t * guard_condition); + /// Return true if the node belongs to the collector /** * \param[in] group_ptr a node base interface shared pointer diff --git a/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp b/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp new file mode 100644 index 0000000000..bf4b675065 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp @@ -0,0 +1,90 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_ +#define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_ + +#include + +#include "rcl/guard_condition.h" +#include "rclcpp/executors/event_waitable.hpp" + +namespace rclcpp +{ +namespace executors +{ + +/** + * @brief This class provides an EventWaitable that allows to + * wake up an EventsExecutor when a guard condition is notified. + */ +class EventsExecutorNotifyWaitable final : public EventWaitable +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutorNotifyWaitable) + + // Constructor + RCLCPP_PUBLIC + EventsExecutorNotifyWaitable() = default; + + // Destructor + RCLCPP_PUBLIC + virtual ~EventsExecutorNotifyWaitable() + { + if (on_destruction_callback_) { + on_destruction_callback_(this); + } + } + + // The function is a no-op, since we only care of waking up the executor + RCLCPP_PUBLIC + void + execute() override + {} + + RCLCPP_PUBLIC + void + add_guard_condition(rcl_guard_condition_t * guard_condition) + { + notify_guard_conditions_.push_back(guard_condition); + } + + RCLCPP_PUBLIC + void + set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + ExecutorEventCallback executor_callback) const override + { + for (auto gc : notify_guard_conditions_) { + rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback( + executor, + executor_callback, + this, + gc, + false); + + if (RCL_RET_OK != ret) { + throw std::runtime_error("Couldn't set guard condition callback"); + } + } + } + +private: + std::list notify_guard_conditions_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 8d3a5dc2f5..9d5af14b8d 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -43,7 +43,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable : topic_name_(topic_name), qos_profile_(qos_profile) {} - virtual ~SubscriptionIntraProcessBase() = default; + virtual ~SubscriptionIntraProcessBase(); RCLCPP_PUBLIC size_t diff --git a/rclcpp/include/rclcpp/qos_event.hpp b/rclcpp/include/rclcpp/qos_event.hpp index 7aadf5a224..8ff2cfc2fa 100644 --- a/rclcpp/include/rclcpp/qos_event.hpp +++ b/rclcpp/include/rclcpp/qos_event.hpp @@ -105,6 +105,13 @@ class QOSEventHandlerBase : public Waitable bool is_ready(rcl_wait_set_t * wait_set) override; + /// Set EventsExecutor's callback + RCLCPP_PUBLIC + void + set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + ExecutorEventCallback executor_callback) const override; + protected: rcl_event_t event_handle_; size_t wait_set_event_index_; @@ -153,25 +160,6 @@ class QOSEventHandler : public QOSEventHandlerBase event_callback_(callback_info); } - /// Set EventsExecutor's callback - RCLCPP_PUBLIC - void - set_events_executor_callback( - const rclcpp::executors::EventsExecutor * executor, - ExecutorEventCallback executor_callback) const override - { - rcl_ret_t ret = rcl_event_set_events_executor_callback( - executor, - executor_callback, - this, - &event_handle_, - false /* Discard previous events */); - - if (RCL_RET_OK != ret) { - throw std::runtime_error("Couldn't set EventsExecutor's callback to event"); - } - } - private: using EventCallbackInfoT = typename std::remove_reference::template argument_type<0>>::type; diff --git a/rclcpp/include/rclcpp/waitable.hpp b/rclcpp/include/rclcpp/waitable.hpp index a9f30c68cb..3bc6f84616 100644 --- a/rclcpp/include/rclcpp/waitable.hpp +++ b/rclcpp/include/rclcpp/waitable.hpp @@ -37,7 +37,7 @@ class Waitable RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Waitable) RCLCPP_PUBLIC - virtual ~Waitable(); + virtual ~Waitable() = default; /// Get the number of ready subscriptions /** @@ -182,8 +182,10 @@ class Waitable void set_on_destruction_callback(std::function on_destruction_callback); -private: +protected: std::function on_destruction_callback_; + +private: std::atomic in_use_by_wait_set_{false}; }; // class Waitable diff --git a/rclcpp/src/rclcpp/executors/events_executor.cpp b/rclcpp/src/rclcpp/executors/events_executor.cpp index 7086fef9f4..4afab2395a 100644 --- a/rclcpp/src/rclcpp/executors/events_executor.cpp +++ b/rclcpp/src/rclcpp/executors/events_executor.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include "rclcpp/exceptions/exceptions.hpp" #include "rclcpp/executors/events_executor.hpp" @@ -31,35 +32,22 @@ EventsExecutor::EventsExecutor( timers_manager_ = std::make_shared(context_); entities_collector_ = std::make_shared(this, timers_manager_); - rcl_ret_t ret; - - // Set the global ctrl-c guard condition callback - ret = rcl_guard_condition_set_events_executor_callback( - this, - &EventsExecutor::push_event, - entities_collector_.get(), - options.context->get_interrupt_guard_condition(&wait_set_), - false /* Discard previous events */); - - if (ret != RCL_RET_OK) { - throw std::runtime_error("Couldn't set ctrl-c guard condition callback"); - } - - // Set the executor interrupt guard condition callback - ret = rcl_guard_condition_set_events_executor_callback( - this, - &EventsExecutor::push_event, - entities_collector_.get(), - &interrupt_guard_condition_, - false /* Discard previous events */); - - if (ret != RCL_RET_OK) { - throw std::runtime_error("Couldn't set interrupt guard condition callback"); - } + // This API uses the wait_set only as a token to identify different executors. + auto context_interrupt_gc = options.context->get_interrupt_guard_condition(&wait_set_); + + // Setup the executor notifier to wake up the executor when some guard conditions are tiggered. + // The added guard conditions are guaranteed to not go out of scope before the executor itself. + executor_notifier_ = std::make_shared(); + executor_notifier_->add_guard_condition(context_interrupt_gc); + executor_notifier_->add_guard_condition(&interrupt_guard_condition_); + executor_notifier_->set_events_executor_callback(this, &EventsExecutor::push_event); + executor_notifier_->set_on_destruction_callback( + std::bind( + &EventsExecutor::remove_entity, + this, + std::placeholders::_1)); } -EventsExecutor::~EventsExecutor() {} - void EventsExecutor::spin() { @@ -345,7 +333,7 @@ EventsExecutor::remove_callback_group( // This field is unused because we don't have to wake up // the executor when a callback group is removed. (void)notify; - entities_collector_->remove_callback_group(group_ptr); + entities_collector_->remove_callback_group(group_ptr); } std::vector @@ -364,4 +352,4 @@ std::vector EventsExecutor::get_automatically_added_callback_groups_from_nodes() { return entities_collector_->get_automatically_added_callback_groups_from_nodes(); -} \ No newline at end of file +} diff --git a/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp index 08d4904a98..41d2befa0d 100644 --- a/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp +++ b/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp @@ -13,6 +13,8 @@ // limitations under the License. #include +#include +#include #include "rclcpp/executors/events_executor.hpp" #include "rclcpp/executors/events_executor_entities_collector.hpp" @@ -61,12 +63,7 @@ EventsExecutorEntitiesCollector::~EventsExecutorEntitiesCollector() auto node = pair.first.lock(); if (node) { auto node_gc = pair.second; - rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback( - nullptr, nullptr, nullptr, - node_gc, - false); - - (void)ret; // Can't throw on destructors + unset_guard_condition_callback(node_gc); } } @@ -89,8 +86,7 @@ EventsExecutorEntitiesCollector::add_node( } // Get node callback groups, add them to weak_groups_to_nodes_associated_with_executor_ - for (const auto & weak_group : node_ptr->get_callback_groups()) - { + for (const auto & weak_group : node_ptr->get_callback_groups()) { auto group_ptr = weak_group.lock(); if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() && group_ptr->automatically_add_to_executor_with_node()) @@ -130,16 +126,7 @@ EventsExecutorEntitiesCollector::add_callback_group( if (is_new_node) { // Set an event callback for the node's notify guard condition, so if new entities are added // or removed to this node we will receive an event. - rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback( - associated_executor_, - &EventsExecutor::push_event, - this, - node_ptr->get_notify_guard_condition(), - false /* Discard previous events */); - - if (ret != RCL_RET_OK) { - throw std::runtime_error("Couldn't set node guard condition callback"); - } + set_guard_condition_callback(node_ptr->get_notify_guard_condition()); // Store node's notify guard condition rclcpp::node_interfaces::NodeBaseInterface::WeakPtr node_weak_ptr(node_ptr); @@ -359,7 +346,7 @@ EventsExecutorEntitiesCollector::remove_callback_group_from_map( // Look for the group to remove in the map auto iter = weak_groups_to_nodes.find(weak_group_ptr); - if (iter != weak_groups_to_nodes.end()){ + if (iter != weak_groups_to_nodes.end()) { // Group found, get its associated node. node_ptr = iter->second.lock(); if (node_ptr == nullptr) { @@ -376,18 +363,11 @@ EventsExecutorEntitiesCollector::remove_callback_group_from_map( !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) && !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_); - if(!node_has_associated_callback_groups) { + if (!node_has_associated_callback_groups) { // Node doesn't have more callback groups associated to the executor. // Unset the event callback for the node's notify guard condition, to stop // receiving events if entities are added or removed to this node. - rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback( - nullptr, nullptr, nullptr, - node_ptr->get_notify_guard_condition(), - false); - - if (ret != RCL_RET_OK) { - throw std::runtime_error("Couldn't set node guard condition callback"); - } + unset_guard_condition_callback(node_ptr->get_notify_guard_condition()); // Remove guard condition from list rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr(node_ptr); @@ -497,3 +477,35 @@ EventsExecutorEntitiesCollector::get_automatically_added_callback_groups_from_no } return groups; } + +void +EventsExecutorEntitiesCollector::set_guard_condition_callback( + const rcl_guard_condition_t * guard_condition) +{ + rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback( + associated_executor_, + &EventsExecutor::push_event, + this, + guard_condition, + false /* Discard previous events */); + + if (ret != RCL_RET_OK) { + throw std::runtime_error("Couldn't set guard condition event callback"); + } +} + +void +EventsExecutorEntitiesCollector::unset_guard_condition_callback( + const rcl_guard_condition_t * guard_condition) +{ + rcl_ret_t ret = rcl_guard_condition_set_events_executor_callback( + nullptr, + nullptr, + nullptr, + guard_condition, + false /* Discard previous events */); + + if (ret != RCL_RET_OK) { + throw std::runtime_error("Couldn't unset guard condition event callback"); + } +} diff --git a/rclcpp/src/rclcpp/qos_event.cpp b/rclcpp/src/rclcpp/qos_event.cpp index 8af3918c48..c4837c09a9 100644 --- a/rclcpp/src/rclcpp/qos_event.cpp +++ b/rclcpp/src/rclcpp/qos_event.cpp @@ -35,6 +35,10 @@ UnsupportedEventTypeException::UnsupportedEventTypeException( QOSEventHandlerBase::~QOSEventHandlerBase() { + if (on_destruction_callback_) { + on_destruction_callback_(this); + } + if (rcl_event_fini(&event_handle_) != RCL_RET_OK) { RCUTILS_LOG_ERROR_NAMED( "rclcpp", @@ -68,4 +72,21 @@ QOSEventHandlerBase::is_ready(rcl_wait_set_t * wait_set) return wait_set->events[wait_set_event_index_] == &event_handle_; } +void +QOSEventHandlerBase::set_events_executor_callback( + const rclcpp::executors::EventsExecutor * executor, + ExecutorEventCallback executor_callback) const +{ + rcl_ret_t ret = rcl_event_set_events_executor_callback( + executor, + executor_callback, + this, + &event_handle_, + false /* Discard previous events */); + + if (RCL_RET_OK != ret) { + throw std::runtime_error("Couldn't set EventsExecutor's callback in QOSEventHandlerBase"); + } +} + } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp index 754573e716..56d91f3283 100644 --- a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp +++ b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp @@ -12,12 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - #include "rclcpp/experimental/subscription_intra_process_base.hpp" using rclcpp::experimental::SubscriptionIntraProcessBase; +SubscriptionIntraProcessBase::~SubscriptionIntraProcessBase() +{ + if (on_destruction_callback_) { + on_destruction_callback_(this); + } +} + bool SubscriptionIntraProcessBase::add_to_wait_set(rcl_wait_set_t * wait_set) { @@ -52,6 +57,6 @@ SubscriptionIntraProcessBase::set_events_executor_callback( true /*Use previous events*/); if (RCL_RET_OK != ret) { - throw std::runtime_error(std::string("Couldn't set guard condition callback")); + throw std::runtime_error("Couldn't set guard condition callback"); } } diff --git a/rclcpp/src/rclcpp/waitable.cpp b/rclcpp/src/rclcpp/waitable.cpp index 919af579d5..c1f4d4c66c 100644 --- a/rclcpp/src/rclcpp/waitable.cpp +++ b/rclcpp/src/rclcpp/waitable.cpp @@ -18,13 +18,6 @@ using rclcpp::Waitable; -Waitable::~Waitable() -{ - if (on_destruction_callback_) { - on_destruction_callback_(this); - } -} - size_t Waitable::get_number_of_ready_subscriptions() { diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index e65064f696..809f2ea2a7 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -416,6 +416,13 @@ class TestWaitable : public rclcpp::Waitable } } + ~TestWaitable() + { + if (on_destruction_callback_) { + on_destruction_callback_(this); + } + } + bool add_to_wait_set(rcl_wait_set_t * wait_set) override {