diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 96351f7528..3bd558de6e 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -59,6 +59,9 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/executable_list.cpp src/rclcpp/executor.cpp src/rclcpp/executors.cpp + src/rclcpp/executors/executor_entities_collection.cpp + src/rclcpp/executors/executor_entities_collector.cpp + src/rclcpp/executors/executor_notify_waitable.cpp src/rclcpp/executors/multi_threaded_executor.cpp src/rclcpp/executors/single_threaded_executor.cpp src/rclcpp/executors/static_executor_entities_collector.cpp diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index 7d03edf343..97579fcf8c 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -93,11 +93,54 @@ class CallbackGroup * determines whether a callback group is automatically added to an executor * with the node with which it is associated. */ + [[deprecated("Use CallbackGroup constructor with context function argument")]] RCLCPP_PUBLIC explicit CallbackGroup( CallbackGroupType group_type, bool automatically_add_to_executor_with_node = true); + /// Constructor for CallbackGroup. + /** + * Callback Groups have a type, either 'Mutually Exclusive' or 'Reentrant' + * and when creating one the type must be specified. + * + * Callbacks in Reentrant Callback Groups must be able to: + * - run at the same time as themselves (reentrant) + * - run at the same time as other callbacks in their group + * - run at the same time as other callbacks in other groups + * + * Callbacks in Mutually Exclusive Callback Groups: + * - will not be run multiple times simultaneously (non-reentrant) + * - will not be run at the same time as other callbacks in their group + * - but must run at the same time as callbacks in other groups + * + * Additionally, callback groups have a property which determines whether or + * not they are added to an executor with their associated node automatically. + * When creating a callback group the automatically_add_to_executor_with_node + * argument determines this behavior, and if true it will cause the newly + * created callback group to be added to an executor with the node when the + * Executor::add_node method is used. + * If false, this callback group will not be added automatically and would + * have to be added to an executor manually using the + * Executor::add_callback_group method. + * + * Whether the node was added to the executor before creating the callback + * group, or after, is irrelevant; the callback group will be automatically + * added to the executor in either case. + * + * \param[in] group_type The type of the callback group. + * \param[in] get_node_context Lambda to retrieve the node context when + * checking that the creating node is valid and using the guard condition. + * \param[in] automatically_add_to_executor_with_node A boolean that + * determines whether a callback group is automatically added to an executor + * with the node with which it is associated. + */ + RCLCPP_PUBLIC + explicit CallbackGroup( + CallbackGroupType group_type, + std::function get_node_context, + bool automatically_add_to_executor_with_node = true); + /// Default destructor. RCLCPP_PUBLIC ~CallbackGroup(); @@ -178,11 +221,24 @@ class CallbackGroup bool automatically_add_to_executor_with_node() const; - /// Defer creating the notify guard condition and return it. + /// Retrieve the guard condition used to signal changes to this callback group. + /** + * \param[in] context_ptr context to use when creating the guard condition + * \return guard condition if it is valid, otherwise nullptr. + */ + [[deprecated("Use get_notify_guard_condition() without arguments")]] RCLCPP_PUBLIC rclcpp::GuardCondition::SharedPtr get_notify_guard_condition(const rclcpp::Context::SharedPtr context_ptr); + /// Retrieve the guard condition used to signal changes to this callback group. + /** + * \return guard condition if it is valid, otherwise nullptr. + */ + RCLCPP_PUBLIC + rclcpp::GuardCondition::SharedPtr + get_notify_guard_condition(); + /// Trigger the notify guard condition. RCLCPP_PUBLIC void @@ -234,6 +290,8 @@ class CallbackGroup std::shared_ptr notify_guard_condition_ = nullptr; std::recursive_mutex notify_guard_condition_mutex_; + std::function get_context_; + private: template typename TypeT::SharedPtr _find_ptrs_if_impl( diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 94a8488557..3e654faa54 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -637,8 +637,9 @@ class Executor std::atomic_bool spinning; /// Guard condition for signaling the rmw layer to wake up for special events. - rclcpp::GuardCondition interrupt_guard_condition_; + std::shared_ptr interrupt_guard_condition_; + /// Guard condition for signaling the rmw layer to wake up for system shutdown. std::shared_ptr shutdown_guard_condition_; /// Wait set for managing entities that the rmw layer waits on. diff --git a/rclcpp/include/rclcpp/executors/executor_entities_collection.hpp b/rclcpp/include/rclcpp/executors/executor_entities_collection.hpp new file mode 100644 index 0000000000..98a92ccdd8 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/executor_entities_collection.hpp @@ -0,0 +1,212 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_ +#define RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace rclcpp +{ +namespace executors +{ + +/// Structure to represent a single entity's entry in a collection +template +struct CollectionEntry +{ + /// Weak pointer to entity type + using EntityWeakPtr = typename EntityValueType::WeakPtr; + /// Shared pointer to entity type + using EntitySharedPtr = typename EntityValueType::SharedPtr; + + /// The entity + EntityWeakPtr entity; + + /// If relevant, the entity's corresponding callback_group + rclcpp::CallbackGroup::WeakPtr callback_group; +}; + +/// Update a collection based on another collection +/* + * Iterates update_from and update_to to see which entities have been added/removed between + * the two collections. + * + * For each new entry (in update_from, but not in update_to), + * add the entity and fire the on_added callback + * For each removed entry (in update_to, but not in update_from), + * remove the entity and fire the on_removed callback. + * + * \param[in] update_from The collection representing the next iteration's state + * \param[inout] update_to The collection representing the current iteration's state + * \param[in] on_added Callback fired when a new entity is detected + * \param[in] on_removed Callback fired when an entity is removed + */ +template +void update_entities( + const CollectionType & update_from, + CollectionType & update_to, + std::function on_added, + std::function on_removed +) +{ + for (auto it = update_to.begin(); it != update_to.end(); ) { + if (update_from.count(it->first) == 0) { + auto entity = it->second.entity.lock(); + if (entity) { + on_removed(entity); + } + it = update_to.erase(it); + } else { + ++it; + } + } + for (auto it = update_from.begin(); it != update_from.end(); ++it) { + if (update_to.count(it->first) == 0) { + auto entity = it->second.entity.lock(); + if (entity) { + on_added(entity); + } + update_to.insert(*it); + } + } +} + +/// A collection of entities, indexed by their corresponding handles +template +class EntityCollection + : public std::unordered_map> +{ +public: + /// Key type of the map + using Key = const EntityKeyType *; + + /// Weak pointer to entity type + using EntityWeakPtr = typename EntityValueType::WeakPtr; + + /// Shared pointer to entity type + using EntitySharedPtr = typename EntityValueType::SharedPtr; + + /// Update this collection based on the contents of another collection + /** + * Update the internal state of this collection, firing callbacks when entities have been + * added or removed. + * + * \param[in] other Collection to compare to + * \param[in] on_added Callback for when entities have been added + * \param[in] on_removed Callback for when entities have been removed + */ + void update( + const EntityCollection & other, + std::function on_added, + std::function on_removed) + { + update_entities(other, *this, on_added, on_removed); + } +}; + +/// Represent the total set of entities for a single executor +/** + * This allows the entities to be stored from ExecutorEntitiesCollector. + * The structure also makes in convenient to re-evaluate when entities have been added or removed. + */ +struct ExecutorEntitiesCollection +{ + /// Collection type for timer entities + using TimerCollection = EntityCollection; + + /// Collection type for subscription entities + using SubscriptionCollection = EntityCollection; + + /// Collection type for client entities + using ClientCollection = EntityCollection; + + /// Collection type for service entities + using ServiceCollection = EntityCollection; + + /// Collection type for waitable entities + using WaitableCollection = EntityCollection; + + /// Collection type for guard condition entities + using GuardConditionCollection = EntityCollection; + + /// Collection of timers currently in use by the executor. + TimerCollection timers; + + /// Collection of subscriptions currently in use by the executor. + SubscriptionCollection subscriptions; + + /// Collection of clients currently in use by the executor. + ClientCollection clients; + + /// Collection of services currently in use by the executor. + ServiceCollection services; + + /// Collection of guard conditions currently in use by the executor. + GuardConditionCollection guard_conditions; + + /// Collection of waitables currently in use by the executor. + WaitableCollection waitables; + + /// Check if the entities collection is empty + /** + * \return true if all member collections are empty, false otherwise + */ + bool empty() const; + + /// Clear the entities collection + void clear(); +}; + +/// Build an entities collection from callback groups +/** + * Iterates a list of callback groups and adds entities from each valid group + * + * \param[in] callback_groups List of callback groups to check for entities + * \param[inout] colletion Entities collection to populate with found entities + */ +void +build_entities_collection( + const std::vector & callback_groups, + ExecutorEntitiesCollection & collection); + +/// Build a queue of executables ready to be executed +/** + * Iterates a list of entities and adds them to a queue if they are ready. + * + * \param[in] collection Collection of entities corresponding to the current wait set. + * \param[in] wait_result Result of rclcpp::WaitSet::wait corresponding to the collection. + * \return A queue of executables that have been marked ready by the waitset. + */ +std::deque +ready_executables( + const ExecutorEntitiesCollection & collection, + rclcpp::WaitResult & wait_result +); + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_ diff --git a/rclcpp/include/rclcpp/executors/executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/executor_entities_collector.hpp new file mode 100644 index 0000000000..ad9bc84fad --- /dev/null +++ b/rclcpp/include/rclcpp/executors/executor_entities_collector.hpp @@ -0,0 +1,270 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTOR_HPP_ +#define RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTOR_HPP_ + +#include +#include +#include +#include +#include + +#include "rcpputils/thread_safety_annotations.hpp" + +#include +#include +#include +#include +#include +#include +#include + +namespace rclcpp +{ +namespace executors +{ + +/// Class to monitor a set of nodes and callback groups for changes in entity membership +/** + * This is to be used with an executor to track the membership of various nodes, groups, + * and entities (timers, subscriptions, clients, services, etc) and report status to the + * executor. + * + * In general, users will add either nodes or callback groups to an executor. + * Each node may have callback groups that are automatically associated with executors, + * or callback groups that must be manually associated with an executor. + * + * This object tracks both types of callback groups as well as nodes that have been + * previously added to the executor. + * When a new callback group is added/removed or new entities are added/removed, the + * corresponding node or callback group will signal this to the executor so that the + * entity collection may be rebuilt according to that executor's implementation. + * + */ +class ExecutorEntitiesCollector +{ +public: + /// Constructor + /** + * \param[in] notify_waitable Waitable that is used to signal to the executor + * when nodes or callback groups have been added or removed. + */ + RCLCPP_PUBLIC + explicit ExecutorEntitiesCollector( + std::shared_ptr notify_waitable); + + /// Destructor + RCLCPP_PUBLIC + ~ExecutorEntitiesCollector(); + + /// Indicate if the entities collector has pending additions or removals. + /** + * \return true if there are pending additions or removals + */ + bool has_pending() const; + + /// Add a node to the entity collector + /** + * \param[in] node_ptr a shared pointer that points to a node base interface + * \throw std::runtime_error if the node is associated with an executor + */ + RCLCPP_PUBLIC + void + add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /// Remove a node from the entity collector + /** + * \param[in] node_ptr a shared pointer that points to a node base interface + * \throw std::runtime_error if the node is associated with an executor + * \throw std::runtime_error if the node is associated with this executor + */ + RCLCPP_PUBLIC + void + remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /// Add a callback group to the entity collector + /** + * \param[in] group_ptr a shared pointer that points to a callback group + * \throw std::runtime_error if the callback_group is associated with an executor + */ + RCLCPP_PUBLIC + void + add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr); + + /// Remove a callback group from the entity collector + /** + * \param[in] group_ptr a shared pointer that points to a callback group + * \throw std::runtime_error if the callback_group is not associated with an executor + * \throw std::runtime_error if the callback_group is not associated with this executor + */ + RCLCPP_PUBLIC + void + remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr); + + /// Get all callback groups known to this entity collector + /** + * This will include manually added and automatically added (node associated) groups + * \return vector of all callback groups + */ + RCLCPP_PUBLIC + std::vector + get_all_callback_groups() const; + + /// Get manually-added callback groups known to this entity collector + /** + * This will include callback groups that have been added via add_callback_group + * \return vector of manually-added callback groups + */ + RCLCPP_PUBLIC + std::vector + get_manually_added_callback_groups() const; + + /// Get automatically-added callback groups known to this entity collector + /** + * This will include callback groups that are associated with nodes added via add_node + * \return vector of automatically-added callback groups + */ + RCLCPP_PUBLIC + std::vector + get_automatically_added_callback_groups() const; + + /// Update the underlying collections + /** + * This will prune nodes and callback groups that are no longer valid as well + * as add new callback groups from any associated nodes. + */ + RCLCPP_PUBLIC + void + update_collections(); + +protected: + using NodeCollection = std::set< + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr, + std::owner_less>; + + using CallbackGroupCollection = std::set< + rclcpp::CallbackGroup::WeakPtr, + std::owner_less>; + + using WeakNodesToGuardConditionsMap = std::map< + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr, + rclcpp::GuardCondition::WeakPtr, + std::owner_less>; + + using WeakGroupsToGuardConditionsMap = std::map< + rclcpp::CallbackGroup::WeakPtr, + rclcpp::GuardCondition::WeakPtr, + std::owner_less>; + + /// Implementation of removing a node from the collector. + /** + * This will disassociate the node from the collector and remove any + * automatically-added callback groups + * + * This takes and returns an iterator so it may be used as: + * + * it = remove_weak_node(it); + * + * \param[in] weak_node iterator to the weak node to be removed + * \return Valid updated iterator in the same collection + */ + RCLCPP_PUBLIC + NodeCollection::iterator + remove_weak_node(NodeCollection::iterator weak_node) RCPPUTILS_TSA_REQUIRES(mutex_); + + /// Implementation of removing a callback group from the collector. + /** + * This will disassociate the callback group from the collector + * + * This takes and returns an iterator so it may be used as: + * + * it = remove_weak_callback_group(it); + * + * \param[in] weak_group_it iterator to the weak group to be removed + * \param[in] collection the collection to remove the group from + * (manually or automatically added) + * \return Valid updated iterator in the same collection + */ + RCLCPP_PUBLIC + CallbackGroupCollection::iterator + remove_weak_callback_group( + CallbackGroupCollection::iterator weak_group_it, + CallbackGroupCollection & collection) RCPPUTILS_TSA_REQUIRES(mutex_); + + /// Implementation of adding a callback group + /** + * \param[in] group_ptr the group to add + * \param[in] collection the collection to add the group to + */ + RCLCPP_PUBLIC + void + add_callback_group_to_collection( + rclcpp::CallbackGroup::SharedPtr group_ptr, + CallbackGroupCollection & collection) RCPPUTILS_TSA_REQUIRES(mutex_); + + /// Iterate over queued added/remove nodes and callback_groups + RCLCPP_PUBLIC + void + process_queues() RCPPUTILS_TSA_REQUIRES(mutex_); + + /// Check a collection of nodes and add any new callback_groups that + /// are set to be automatically associated via the node. + RCLCPP_PUBLIC + void + add_automatically_associated_callback_groups( + const NodeCollection & nodes_to_check) RCPPUTILS_TSA_REQUIRES(mutex_); + + /// Check all nodes and group for expired weak pointers and remove them. + RCLCPP_PUBLIC + void + prune_invalid_nodes_and_groups() RCPPUTILS_TSA_REQUIRES(mutex_); + + /// mutex to protect collections and pending queues + mutable std::mutex mutex_; + + /// Callback groups that were added via `add_callback_group` + CallbackGroupCollection manually_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + + /// Callback groups that were added by their association with added nodes + CallbackGroupCollection automatically_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + + /// nodes that are associated with the executor + NodeCollection weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + + /// Track guard conditions associated with added nodes + WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + + /// Track guard conditions associated with added callback groups + WeakGroupsToGuardConditionsMap weak_groups_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + + /// nodes that have been added since the last update. + NodeCollection pending_added_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + + /// nodes that have been removed since the last update. + NodeCollection pending_removed_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + + /// callback groups that have been added since the last update. + CallbackGroupCollection pending_manually_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + + /// callback groups that have been removed since the last update. + CallbackGroupCollection pending_manually_removed_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + + /// Waitable to add guard conditions to + std::shared_ptr notify_waitable_; +}; +} // namespace executors +} // namespace rclcpp +// +#endif // RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTOR_HPP_ diff --git a/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp b/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp new file mode 100644 index 0000000000..88158952d9 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp @@ -0,0 +1,129 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EXECUTOR_NOTIFY_WAITABLE_HPP_ +#define RCLCPP__EXECUTORS__EXECUTOR_NOTIFY_WAITABLE_HPP_ + +#include +#include +#include +#include + +#include "rclcpp/guard_condition.hpp" +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace executors +{ + +/// Maintain a collection of guard conditions from associated nodes and callback groups +/// to signal to the executor when associated entities have changed. +class ExecutorNotifyWaitable : public rclcpp::Waitable +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(ExecutorNotifyWaitable) + + // Constructor + /** + * \param[in] on_execute_callback Callback to execute when one of the conditions + * of this waitable has signaled the wait_set. + */ + RCLCPP_PUBLIC + explicit ExecutorNotifyWaitable(std::function on_execute_callback = {}); + + // Destructor + RCLCPP_PUBLIC + ~ExecutorNotifyWaitable() override = default; + + RCLCPP_PUBLIC + ExecutorNotifyWaitable(const ExecutorNotifyWaitable & other); + + + RCLCPP_PUBLIC + ExecutorNotifyWaitable & operator=(const ExecutorNotifyWaitable & other); + + /// Add conditions to the wait set + /** + * \param[inout] wait_set structure that conditions will be added to + */ + RCLCPP_PUBLIC + void + add_to_wait_set(rcl_wait_set_t * wait_set) override; + + /// Check conditions against the wait set + /** + * \param[inout] wait_set structure that internal elements will be checked against. + * \return true if this waitable is ready to be executed, false otherwise. + */ + RCLCPP_PUBLIC + bool + is_ready(rcl_wait_set_t * wait_set) override; + + /// Perform work associated with the waitable. + /** + * This will call the callback provided in the constructor. + * \param[in] data Data to be use for the execute, if available, else nullptr. + */ + RCLCPP_PUBLIC + void + execute(std::shared_ptr & data) override; + + /// Retrieve data to be used in the next execute call. + /** + * \return If available, data to be used, otherwise nullptr + */ + RCLCPP_PUBLIC + std::shared_ptr + take_data() override; + + /// Add a guard condition to be waited on. + /** + * \param[in] guard_condition The guard condition to add. + */ + RCLCPP_PUBLIC + void + add_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition); + + /// Remove a guard condition from being waited on. + /** + * \param[in] guard_condition The guard condition to remove. + */ + RCLCPP_PUBLIC + void + remove_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition); + + /// Get the number of ready guard_conditions + /** + * \return The number of guard_conditions associated with the Waitable. + */ + RCLCPP_PUBLIC + size_t + get_number_of_ready_guard_conditions() override; + +private: + /// Callback to run when waitable executes + std::function execute_callback_; + + std::mutex guard_condition_mutex_; + + /// The collection of guard conditions to be waited on. + std::set> notify_guard_conditions_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EXECUTOR_NOTIFY_WAITABLE_HPP_ diff --git a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp index a6c84e4aa0..6173a08d50 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp @@ -121,10 +121,19 @@ class NodeBase : public NodeBaseInterface, public std::enable_shared_from_this notify_guard_condition_; bool notify_guard_condition_is_valid_; }; diff --git a/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp b/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp index fd4f64b22b..e5a3198275 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp @@ -148,13 +148,33 @@ class NodeBaseInterface /** * For example, this should be notified when a publisher is added or removed. * - * \return the GuardCondition if it is valid, else thow runtime error + * \return the GuardCondition if it is valid, else throw runtime error */ RCLCPP_PUBLIC virtual rclcpp::GuardCondition & get_notify_guard_condition() = 0; + /// Return a guard condition that should be notified when the internal node state changes. + /** + * For example, this should be notified when a publisher is added or removed. + * + * \return the GuardCondition if it is valid, else nullptr + */ + RCLCPP_PUBLIC + virtual + rclcpp::GuardCondition::SharedPtr + get_shared_notify_guard_condition() = 0; + + /// Trigger the guard condition that notifies of internal node state changes. + /** + * For example, this should be notified when a publisher is added or removed. + */ + RCLCPP_PUBLIC + virtual + void + trigger_notify_guard_condition() = 0; + /// Return the default preference for using intra process communication. RCLCPP_PUBLIC virtual diff --git a/rclcpp/src/rclcpp/callback_group.cpp b/rclcpp/src/rclcpp/callback_group.cpp index 734c781a69..753a441332 100644 --- a/rclcpp/src/rclcpp/callback_group.cpp +++ b/rclcpp/src/rclcpp/callback_group.cpp @@ -31,10 +31,12 @@ using rclcpp::CallbackGroupType; CallbackGroup::CallbackGroup( CallbackGroupType group_type, + std::function get_context, bool automatically_add_to_executor_with_node) : type_(group_type), associated_with_executor_(false), can_be_taken_from_(true), - automatically_add_to_executor_with_node_(automatically_add_to_executor_with_node) + automatically_add_to_executor_with_node_(automatically_add_to_executor_with_node), + get_context_(get_context) {} CallbackGroup::~CallbackGroup() @@ -111,6 +113,7 @@ CallbackGroup::automatically_add_to_executor_with_node() const return automatically_add_to_executor_with_node_; } +// \TODO(mjcarroll) Deprecated, remove on tock rclcpp::GuardCondition::SharedPtr CallbackGroup::get_notify_guard_condition(const rclcpp::Context::SharedPtr context_ptr) { @@ -129,6 +132,29 @@ CallbackGroup::get_notify_guard_condition(const rclcpp::Context::SharedPtr conte return notify_guard_condition_; } +rclcpp::GuardCondition::SharedPtr +CallbackGroup::get_notify_guard_condition() +{ + std::lock_guard lock(notify_guard_condition_mutex_); + if (!this->get_context_) { + throw std::runtime_error("Callback group was created without context and not passed context"); + } + auto context_ptr = this->get_context_(); + if (context_ptr && context_ptr->is_valid()) { + if (notify_guard_condition_ && context_ptr != notify_guard_condition_->get_context()) { + if (associated_with_executor_) { + trigger_notify_guard_condition(); + } + notify_guard_condition_ = nullptr; + } + if (!notify_guard_condition_) { + notify_guard_condition_ = std::make_shared(context_ptr); + } + return notify_guard_condition_; + } + return nullptr; +} + void CallbackGroup::trigger_notify_guard_condition() { diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 45059bb953..cb8d90c533 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -39,14 +39,11 @@ using namespace std::chrono_literals; using rclcpp::exceptions::throw_from_rcl_error; -using rclcpp::AnyExecutable; using rclcpp::Executor; -using rclcpp::ExecutorOptions; -using rclcpp::FutureReturnCode; Executor::Executor(const rclcpp::ExecutorOptions & options) : spinning(false), - interrupt_guard_condition_(options.context), + interrupt_guard_condition_(std::make_shared(options.context)), shutdown_guard_condition_(std::make_shared(options.context)), memory_strategy_(options.memory_strategy) { @@ -66,7 +63,7 @@ Executor::Executor(const rclcpp::ExecutorOptions & options) memory_strategy_->add_guard_condition(*shutdown_guard_condition_.get()); // Put the executor's guard condition in - memory_strategy_->add_guard_condition(interrupt_guard_condition_); + memory_strategy_->add_guard_condition(*interrupt_guard_condition_.get()); rcl_allocator_t allocator = memory_strategy_->get_allocator(); rcl_ret_t ret = rcl_wait_set_init( @@ -128,7 +125,7 @@ Executor::~Executor() } // Remove and release the sigint guard condition memory_strategy_->remove_guard_condition(shutdown_guard_condition_.get()); - memory_strategy_->remove_guard_condition(&interrupt_guard_condition_); + memory_strategy_->remove_guard_condition(interrupt_guard_condition_.get()); // Remove shutdown callback handle registered to Context if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_)) { @@ -223,8 +220,7 @@ Executor::add_callback_group_to_map( weak_groups_to_nodes_.insert(std::make_pair(weak_group_ptr, node_ptr)); if (node_ptr->get_context()->is_valid()) { - auto callback_group_guard_condition = - group_ptr->get_notify_guard_condition(node_ptr->get_context()); + auto callback_group_guard_condition = group_ptr->get_notify_guard_condition(); weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition.get(); // Add the callback_group's notify condition to the guard condition handles memory_strategy_->add_guard_condition(*callback_group_guard_condition); @@ -233,7 +229,7 @@ Executor::add_callback_group_to_map( if (notify) { // Interrupt waiting to handle new node try { - interrupt_guard_condition_.trigger(); + interrupt_guard_condition_->trigger(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( std::string( @@ -281,10 +277,10 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt } }); - const auto & gc = node_ptr->get_notify_guard_condition(); - weak_nodes_to_guard_conditions_[node_ptr] = &gc; + const auto gc = node_ptr->get_shared_notify_guard_condition(); + weak_nodes_to_guard_conditions_[node_ptr] = gc.get(); // Add the node's notify condition to the guard condition handles - memory_strategy_->add_guard_condition(gc); + memory_strategy_->add_guard_condition(*gc); weak_nodes_.push_back(node_ptr); } @@ -321,7 +317,7 @@ Executor::remove_callback_group_from_map( if (notify) { try { - interrupt_guard_condition_.trigger(); + interrupt_guard_condition_->trigger(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( std::string( @@ -389,7 +385,7 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node } } - memory_strategy_->remove_guard_condition(&node_ptr->get_notify_guard_condition()); + memory_strategy_->remove_guard_condition(node_ptr->get_shared_notify_guard_condition().get()); weak_nodes_to_guard_conditions_.erase(node_ptr); std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); @@ -502,7 +498,7 @@ Executor::cancel() { spinning.store(false); try { - interrupt_guard_condition_.trigger(); + interrupt_guard_condition_->trigger(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( std::string("Failed to trigger guard condition in cancel: ") + ex.what()); @@ -551,7 +547,7 @@ Executor::execute_any_executable(AnyExecutable & any_exec) // Wake the wait, because it may need to be recalculated or work that // was previously blocked is now available. try { - interrupt_guard_condition_.trigger(); + interrupt_guard_condition_->trigger(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( std::string( diff --git a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp new file mode 100644 index 0000000000..567b28014e --- /dev/null +++ b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp @@ -0,0 +1,203 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/executors/executor_entities_collection.hpp" + +namespace rclcpp +{ +namespace executors +{ +bool ExecutorEntitiesCollection::empty() const +{ + return subscriptions.empty() && + timers.empty() && + guard_conditions.empty() && + clients.empty() && + services.empty() && + waitables.empty(); +} + +void ExecutorEntitiesCollection::clear() +{ + subscriptions.clear(); + timers.clear(); + guard_conditions.clear(); + clients.clear(); + services.clear(); + waitables.clear(); +} + + +void +build_entities_collection( + const std::vector & callback_groups, + ExecutorEntitiesCollection & collection) +{ + collection.clear(); + + for (auto weak_group_ptr : callback_groups) { + auto group_ptr = weak_group_ptr.lock(); + if (!group_ptr) { + continue; + } + + if (group_ptr->can_be_taken_from().load()) { + group_ptr->collect_all_ptrs( + [&collection, weak_group_ptr](const rclcpp::SubscriptionBase::SharedPtr & subscription) { + collection.subscriptions.insert( + { + subscription->get_subscription_handle().get(), + {subscription, weak_group_ptr} + }); + }, + [&collection, weak_group_ptr](const rclcpp::ServiceBase::SharedPtr & service) { + collection.services.insert( + { + service->get_service_handle().get(), + {service, weak_group_ptr} + }); + }, + [&collection, weak_group_ptr](const rclcpp::ClientBase::SharedPtr & client) { + collection.clients.insert( + { + client->get_client_handle().get(), + {client, weak_group_ptr} + }); + }, + [&collection, weak_group_ptr](const rclcpp::TimerBase::SharedPtr & timer) { + collection.timers.insert( + { + timer->get_timer_handle().get(), + {timer, weak_group_ptr} + }); + }, + [&collection, weak_group_ptr](const rclcpp::Waitable::SharedPtr & waitable) { + collection.waitables.insert( + { + waitable.get(), + {waitable, weak_group_ptr} + }); + } + ); + } + } +} + +template +void check_ready( + EntityCollectionType & collection, + std::deque & executables, + size_t size_of_waited_entities, + typename EntityCollectionType::Key * waited_entities, + std::function fill_executable) +{ + for (size_t ii = 0; ii < size_of_waited_entities; ++ii) { + if (!waited_entities[ii]) {continue;} + auto entity_iter = collection.find(waited_entities[ii]); + if (entity_iter != collection.end()) { + auto entity = entity_iter->second.entity.lock(); + if (!entity) { + continue; + } + + auto callback_group = entity_iter->second.callback_group.lock(); + if (callback_group && !callback_group->can_be_taken_from().load()) { + continue; + } + rclcpp::AnyExecutable exec; + + exec.callback_group = callback_group; + if (fill_executable(exec, entity)) { + executables.push_back(exec); + } + } + } +} + +std::deque +ready_executables( + const ExecutorEntitiesCollection & collection, + rclcpp::WaitResult & wait_result +) +{ + std::deque ret; + + if (wait_result.kind() != rclcpp::WaitResultKind::Ready) { + return ret; + } + auto rcl_wait_set = wait_result.get_wait_set().get_rcl_wait_set(); + check_ready( + collection.timers, + ret, + rcl_wait_set.size_of_timers, + rcl_wait_set.timers, + [](rclcpp::AnyExecutable & exec, auto timer) { + if (!timer->call()) { + return false; + } + exec.timer = timer; + return true; + }); + + check_ready( + collection.subscriptions, + ret, + rcl_wait_set.size_of_subscriptions, + rcl_wait_set.subscriptions, + [](rclcpp::AnyExecutable & exec, auto subscription) { + exec.subscription = subscription; + return true; + }); + + + check_ready( + collection.services, + ret, + rcl_wait_set.size_of_services, + rcl_wait_set.services, + [](rclcpp::AnyExecutable & exec, auto service) { + exec.service = service; + return true; + }); + + check_ready( + collection.clients, + ret, + rcl_wait_set.size_of_clients, + rcl_wait_set.clients, + [](rclcpp::AnyExecutable & exec, auto client) { + exec.client = client; + return true; + }); + + for (auto & [handle, entry] : collection.waitables) { + auto waitable = entry.entity.lock(); + if (waitable && waitable->is_ready(&rcl_wait_set)) { + auto group = entry.callback_group.lock(); + if (group && !group->can_be_taken_from().load()) { + continue; + } + + rclcpp::AnyExecutable exec; + exec.waitable = waitable; + exec.callback_group = group; + ret.push_back(exec); + } + } + return ret; +} + +} // namespace executors +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/executor_entities_collector.cpp new file mode 100644 index 0000000000..84ada64925 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/executor_entities_collector.cpp @@ -0,0 +1,416 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "rclcpp/executors/executor_entities_collector.hpp" +#include "rclcpp/executors/executor_notify_waitable.hpp" +#include "rclcpp/node_interfaces/node_base_interface.hpp" + +namespace rclcpp +{ +namespace executors +{ + +ExecutorEntitiesCollector::ExecutorEntitiesCollector( + std::shared_ptr notify_waitable) +: notify_waitable_(notify_waitable) +{ +} + +ExecutorEntitiesCollector::~ExecutorEntitiesCollector() +{ + for (auto weak_node_it = weak_nodes_.begin(); weak_node_it != weak_nodes_.end(); ) { + weak_node_it = remove_weak_node(weak_node_it); + } + + for (auto weak_group_it = automatically_added_groups_.begin(); + weak_group_it != automatically_added_groups_.end(); ) + { + weak_group_it = remove_weak_callback_group(weak_group_it, automatically_added_groups_); + } + + for (auto weak_group_it = manually_added_groups_.begin(); + weak_group_it != manually_added_groups_.end(); ) + { + weak_group_it = remove_weak_callback_group(weak_group_it, manually_added_groups_); + } + + for (auto weak_node_ptr : pending_added_nodes_) { + auto node_ptr = weak_node_ptr.lock(); + if (node_ptr) { + node_ptr->get_associated_with_executor_atomic().store(false); + } + } + pending_added_nodes_.clear(); + pending_removed_nodes_.clear(); + + for (auto weak_group_ptr : pending_manually_added_groups_) { + auto group_ptr = weak_group_ptr.lock(); + if (group_ptr) { + group_ptr->get_associated_with_executor_atomic().store(false); + } + } + pending_manually_added_groups_.clear(); + pending_manually_removed_groups_.clear(); +} + +bool +ExecutorEntitiesCollector::has_pending() const +{ + std::lock_guard lock(mutex_); + return pending_manually_added_groups_.size() != 0 || + pending_manually_removed_groups_.size() != 0 || + pending_added_nodes_.size() != 0 || + pending_removed_nodes_.size() != 0; +} + +void +ExecutorEntitiesCollector::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + // If the node already has an executor + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error( + std::string("Node '") + node_ptr->get_fully_qualified_name() + + "' has already been added to an executor."); + } + + std::lock_guard lock(mutex_); + bool associated = weak_nodes_.count(node_ptr) != 0; + bool add_queued = pending_added_nodes_.count(node_ptr) != 0; + bool remove_queued = pending_removed_nodes_.count(node_ptr) != 0; + + if ((associated || add_queued) && !remove_queued) { + throw std::runtime_error( + std::string("Node '") + node_ptr->get_fully_qualified_name() + + "' has already been added to this executor."); + } + + this->pending_added_nodes_.insert(node_ptr); +} + +void +ExecutorEntitiesCollector::remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + if (!node_ptr->get_associated_with_executor_atomic().load()) { + throw std::runtime_error( + std::string("Node '") + node_ptr->get_fully_qualified_name() + + "' needs to be associated with an executor."); + } + + std::lock_guard lock(mutex_); + bool associated = weak_nodes_.count(node_ptr) != 0; + bool add_queued = pending_added_nodes_.count(node_ptr) != 0; + bool remove_queued = pending_removed_nodes_.count(node_ptr) != 0; + + if (!(associated || add_queued) || remove_queued) { + throw std::runtime_error( + std::string("Node '") + node_ptr->get_fully_qualified_name() + + "' needs to be associated with this executor."); + } + + this->pending_removed_nodes_.insert(node_ptr); +} + +void +ExecutorEntitiesCollector::add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr) +{ + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Callback group has already been added to an executor."); + } + + std::lock_guard lock(mutex_); + bool associated = manually_added_groups_.count(group_ptr) != 0; + bool add_queued = pending_manually_added_groups_.count(group_ptr) != 0; + bool remove_queued = pending_manually_removed_groups_.count(group_ptr) != 0; + + if ((associated || add_queued) && !remove_queued) { + throw std::runtime_error("Callback group has already been added to this executor."); + } + + this->pending_manually_added_groups_.insert(group_ptr); +} + +void +ExecutorEntitiesCollector::remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr) +{ + if (!group_ptr->get_associated_with_executor_atomic().load()) { + throw std::runtime_error("Callback group needs to be associated with an executor."); + } + /** + * TODO(mjcarroll): The callback groups, being created by a node, should never outlive + * the node. Since we haven't historically enforced this, turning this on may cause + * previously-functional code to fail. + * Consider re-enablng this check (along with corresponding CallbackGroup::has_valid_node), + * when we can guarantee node/group lifetimes. + if (!group_ptr->has_valid_node()) { + throw std::runtime_error("Node must not be deleted before its callback group(s)."); + } + */ + + auto weak_group_ptr = rclcpp::CallbackGroup::WeakPtr(group_ptr); + std::lock_guard lock(mutex_); + bool associated = manually_added_groups_.count(group_ptr) != 0; + bool add_queued = pending_manually_added_groups_.count(group_ptr) != 0; + bool remove_queued = pending_manually_removed_groups_.count(group_ptr) != 0; + + if (!(associated || add_queued) || remove_queued) { + throw std::runtime_error("Callback group needs to be associated with this executor."); + } + + this->pending_manually_removed_groups_.insert(group_ptr); +} + +std::vector +ExecutorEntitiesCollector::get_all_callback_groups() const +{ + std::vector groups; + std::lock_guard lock(mutex_); + for (const auto & group_ptr : manually_added_groups_) { + groups.push_back(group_ptr); + } + for (auto const & group_ptr : automatically_added_groups_) { + groups.push_back(group_ptr); + } + return groups; +} + +std::vector +ExecutorEntitiesCollector::get_manually_added_callback_groups() const +{ + std::vector groups; + std::lock_guard lock(mutex_); + for (const auto & group_ptr : manually_added_groups_) { + groups.push_back(group_ptr); + } + return groups; +} + +std::vector +ExecutorEntitiesCollector::get_automatically_added_callback_groups() const +{ + std::vector groups; + std::lock_guard lock(mutex_); + for (auto const & group_ptr : automatically_added_groups_) { + groups.push_back(group_ptr); + } + return groups; +} + +void +ExecutorEntitiesCollector::update_collections() +{ + std::lock_guard lock(mutex_); + this->process_queues(); + this->add_automatically_associated_callback_groups(this->weak_nodes_); + this->prune_invalid_nodes_and_groups(); +} + +ExecutorEntitiesCollector::NodeCollection::iterator +ExecutorEntitiesCollector::remove_weak_node(NodeCollection::iterator weak_node) +{ + // Disassociate the guard condition from the executor notify waitable + auto guard_condition_it = weak_nodes_to_guard_conditions_.find(*weak_node); + if (guard_condition_it != weak_nodes_to_guard_conditions_.end()) { + this->notify_waitable_->remove_guard_condition(guard_condition_it->second); + weak_nodes_to_guard_conditions_.erase(guard_condition_it); + } + + // Mark the node as disassociated (if the node is still valid) + auto node_ptr = weak_node->lock(); + if (node_ptr) { + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); + } + + // Remove the node from tracked nodes + return weak_nodes_.erase(weak_node); +} + +ExecutorEntitiesCollector::CallbackGroupCollection::iterator +ExecutorEntitiesCollector::remove_weak_callback_group( + CallbackGroupCollection::iterator weak_group_it, + CallbackGroupCollection & collection +) +{ + // Disassociate the guard condition from the executor notify waitable + auto guard_condition_it = weak_groups_to_guard_conditions_.find(*weak_group_it); + if (guard_condition_it != weak_groups_to_guard_conditions_.end()) { + this->notify_waitable_->remove_guard_condition(guard_condition_it->second); + weak_groups_to_guard_conditions_.erase(guard_condition_it); + } + + // Mark the node as disassociated (if the group is still valid) + auto group_ptr = weak_group_it->lock(); + if (group_ptr) { + /** + * TODO(mjcarroll): The callback groups, being created by a node, should never outlive + * the node. Since we haven't historically enforced this, turning this on may cause + * previously-functional code to fail. + * Consider re-enablng this check (along with corresponding CallbackGroup::has_valid_node), + * when we can guarantee node/group lifetimes. + if (!group_ptr->has_valid_node()) { + throw std::runtime_error("Node must not be deleted before its callback group(s)."); + } + */ + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); + } + + // Remove the node from tracked nodes + return collection.erase(weak_group_it); +} + +void +ExecutorEntitiesCollector::add_callback_group_to_collection( + rclcpp::CallbackGroup::SharedPtr group_ptr, + CallbackGroupCollection & collection) +{ + auto iter = collection.insert(group_ptr); + if (iter.second == false) { + throw std::runtime_error("Callback group has already been added to this executor."); + } + + // Store node guard condition in map and add it to the notify waitable + auto group_guard_condition = group_ptr->get_notify_guard_condition(); + weak_groups_to_guard_conditions_.insert({group_ptr, group_guard_condition}); + this->notify_waitable_->add_guard_condition(group_guard_condition); +} + +void +ExecutorEntitiesCollector::process_queues() +{ + for (auto weak_node_ptr : pending_added_nodes_) { + auto node_ptr = weak_node_ptr.lock(); + if (!node_ptr) { + continue; + } + weak_nodes_.insert(weak_node_ptr); + this->add_automatically_associated_callback_groups({weak_node_ptr}); + + // Store node guard condition in map and add it to the notify waitable + auto node_guard_condition = node_ptr->get_shared_notify_guard_condition(); + weak_nodes_to_guard_conditions_.insert({weak_node_ptr, node_guard_condition}); + this->notify_waitable_->add_guard_condition(node_guard_condition); + } + pending_added_nodes_.clear(); + + for (auto weak_node_ptr : pending_removed_nodes_) { + auto node_it = weak_nodes_.find(weak_node_ptr); + if (node_it != weak_nodes_.end()) { + remove_weak_node(node_it); + } else { + throw std::runtime_error("Node needs to be associated with this executor."); + } + + auto node_ptr = weak_node_ptr.lock(); + if (node_ptr) { + for (auto group_it = automatically_added_groups_.begin(); + group_it != automatically_added_groups_.end(); ) + { + auto group_ptr = group_it->lock(); + if (node_ptr->callback_group_in_node(group_ptr)) { + group_it = remove_weak_callback_group(group_it, automatically_added_groups_); + } else { + ++group_it; + } + } + } + } + pending_removed_nodes_.clear(); + + for (auto weak_group_ptr : pending_manually_added_groups_) { + auto group_ptr = weak_group_ptr.lock(); + if (group_ptr) { + this->add_callback_group_to_collection(group_ptr, manually_added_groups_); + } + } + pending_manually_added_groups_.clear(); + + for (auto weak_group_ptr : pending_manually_removed_groups_) { + auto group_ptr = weak_group_ptr.lock(); + if (group_ptr) { + auto group_it = manually_added_groups_.find(group_ptr); + if (group_it != manually_added_groups_.end()) { + remove_weak_callback_group(group_it, manually_added_groups_); + } else { + throw std::runtime_error( + "Attempting to remove a callback group not added to this executor."); + } + } + } + pending_manually_removed_groups_.clear(); +} + +void +ExecutorEntitiesCollector::add_automatically_associated_callback_groups( + const NodeCollection & nodes_to_check) +{ + for (auto & weak_node : nodes_to_check) { + auto node = weak_node.lock(); + if (node) { + node->for_each_callback_group( + [this, node](rclcpp::CallbackGroup::SharedPtr group_ptr) + { + if (!group_ptr->get_associated_with_executor_atomic().load() && + group_ptr->automatically_add_to_executor_with_node()) + { + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Callback group has already been added to an executor."); + } + this->add_callback_group_to_collection(group_ptr, this->automatically_added_groups_); + } + }); + } + } +} + +void +ExecutorEntitiesCollector::prune_invalid_nodes_and_groups() +{ + for (auto node_it = weak_nodes_.begin(); + node_it != weak_nodes_.end(); ) + { + if (node_it->expired()) { + node_it = remove_weak_node(node_it); + } else { + node_it++; + } + } + for (auto group_it = automatically_added_groups_.begin(); + group_it != automatically_added_groups_.end(); ) + { + if (group_it->expired()) { + group_it = remove_weak_callback_group(group_it, automatically_added_groups_); + } else { + group_it++; + } + } + for (auto group_it = manually_added_groups_.begin(); + group_it != manually_added_groups_.end(); ) + { + if (group_it->expired()) { + group_it = remove_weak_callback_group(group_it, manually_added_groups_); + } else { + group_it++; + } + } +} + +} // namespace executors +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp new file mode 100644 index 0000000000..c0ad8a25a4 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp @@ -0,0 +1,129 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/executors/executor_notify_waitable.hpp" + +namespace rclcpp +{ +namespace executors +{ + +ExecutorNotifyWaitable::ExecutorNotifyWaitable(std::function on_execute_callback) +: execute_callback_(on_execute_callback) +{ +} + +ExecutorNotifyWaitable::ExecutorNotifyWaitable(const ExecutorNotifyWaitable & other) +: ExecutorNotifyWaitable(other.execute_callback_) +{ + this->notify_guard_conditions_ = other.notify_guard_conditions_; +} + +ExecutorNotifyWaitable & ExecutorNotifyWaitable::operator=(const ExecutorNotifyWaitable & other) +{ + if (this != &other) { + this->execute_callback_ = other.execute_callback_; + this->notify_guard_conditions_ = other.notify_guard_conditions_; + } + return *this; +} + +void +ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t * wait_set) +{ + std::lock_guard lock(guard_condition_mutex_); + + for (auto weak_guard_condition : this->notify_guard_conditions_) { + auto guard_condition = weak_guard_condition.lock(); + if (guard_condition) { + auto rcl_guard_condition = &guard_condition->get_rcl_guard_condition(); + + rcl_ret_t ret = rcl_wait_set_add_guard_condition( + wait_set, + rcl_guard_condition, NULL); + + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to add guard condition to wait set"); + } + } + } +} + +bool +ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set) +{ + std::lock_guard lock(guard_condition_mutex_); + + bool any_ready = false; + for (size_t ii = 0; ii < wait_set->size_of_guard_conditions; ++ii) { + auto rcl_guard_condition = wait_set->guard_conditions[ii]; + + if (nullptr == rcl_guard_condition) { + continue; + } + for (auto weak_guard_condition : this->notify_guard_conditions_) { + auto guard_condition = weak_guard_condition.lock(); + if (guard_condition && &guard_condition->get_rcl_guard_condition() == rcl_guard_condition) { + any_ready = true; + } + } + } + return any_ready; +} + +void +ExecutorNotifyWaitable::execute(std::shared_ptr & data) +{ + (void) data; + this->execute_callback_(); +} + +std::shared_ptr +ExecutorNotifyWaitable::take_data() +{ + return nullptr; +} + +void +ExecutorNotifyWaitable::add_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition) +{ + std::lock_guard lock(guard_condition_mutex_); + auto guard_condition = weak_guard_condition.lock(); + if (guard_condition && notify_guard_conditions_.count(weak_guard_condition) == 0) { + notify_guard_conditions_.insert(weak_guard_condition); + } +} + +void +ExecutorNotifyWaitable::remove_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition) +{ + std::lock_guard lock(guard_condition_mutex_); + if (notify_guard_conditions_.count(guard_condition) != 0) { + notify_guard_conditions_.erase(guard_condition); + } +} + +size_t +ExecutorNotifyWaitable::get_number_of_ready_guard_conditions() +{ + std::lock_guard lock(guard_condition_mutex_); + return notify_guard_conditions_.size(); +} + +} // namespace executors +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp index bf50e062f3..6fd0b56a85 100644 --- a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp +++ b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp @@ -109,8 +109,8 @@ StaticExecutorEntitiesCollector::execute(std::shared_ptr & data) std::lock_guard guard{new_nodes_mutex_}; for (const auto & weak_node : new_nodes_) { if (auto node_ptr = weak_node.lock()) { - const auto & gc = node_ptr->get_notify_guard_condition(); - weak_nodes_to_guard_conditions_[node_ptr] = &gc; + weak_nodes_to_guard_conditions_[node_ptr] = + node_ptr->get_shared_notify_guard_condition().get(); } } new_nodes_.clear(); diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index 209fcde556..3c14b37b45 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -139,7 +139,7 @@ StaticSingleThreadedExecutor::add_callback_group( bool is_new_node = entities_collector_->add_callback_group(group_ptr, node_ptr); if (is_new_node && notify) { // Interrupt waiting to handle new node - interrupt_guard_condition_.trigger(); + interrupt_guard_condition_->trigger(); } } @@ -150,7 +150,7 @@ StaticSingleThreadedExecutor::add_node( bool is_new_node = entities_collector_->add_node(node_ptr); if (is_new_node && notify) { // Interrupt waiting to handle new node - interrupt_guard_condition_.trigger(); + interrupt_guard_condition_->trigger(); } } @@ -167,7 +167,7 @@ StaticSingleThreadedExecutor::remove_callback_group( bool node_removed = entities_collector_->remove_callback_group(group_ptr); // If the node was matched and removed, interrupt waiting if (node_removed && notify) { - interrupt_guard_condition_.trigger(); + interrupt_guard_condition_->trigger(); } } @@ -181,7 +181,7 @@ StaticSingleThreadedExecutor::remove_node( } // If the node was matched and removed, interrupt waiting if (notify) { - interrupt_guard_condition_.trigger(); + interrupt_guard_condition_->trigger(); } } diff --git a/rclcpp/src/rclcpp/node_interfaces/node_base.cpp b/rclcpp/src/rclcpp/node_interfaces/node_base.cpp index 89d8acf6fa..6544d69214 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_base.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_base.cpp @@ -45,7 +45,7 @@ NodeBase::NodeBase( node_handle_(nullptr), default_callback_group_(default_callback_group), associated_with_executor_(false), - notify_guard_condition_(context), + notify_guard_condition_(std::make_shared(context)), notify_guard_condition_is_valid_(false) { // Create the rcl node and store it in a shared_ptr with a custom destructor. @@ -132,8 +132,10 @@ NodeBase::NodeBase( // Create the default callback group, if needed. if (nullptr == default_callback_group_) { using rclcpp::CallbackGroupType; + // Default callback group is mutually exclusive and automatically associated with + // any executors that this node is added to. default_callback_group_ = - NodeBase::create_callback_group(CallbackGroupType::MutuallyExclusive); + NodeBase::create_callback_group(CallbackGroupType::MutuallyExclusive, true); } // Indicate the notify_guard_condition is now valid. @@ -202,11 +204,27 @@ NodeBase::create_callback_group( rclcpp::CallbackGroupType group_type, bool automatically_add_to_executor_with_node) { + auto weak_context = this->get_context()->weak_from_this(); + auto get_node_context = [weak_context]() -> rclcpp::Context::SharedPtr { + return weak_context.lock(); + }; + auto group = std::make_shared( group_type, + get_node_context, automatically_add_to_executor_with_node); std::lock_guard lock(callback_groups_mutex_); callback_groups_.push_back(group); + + // This guard condition is generally used to signal to this node's executor that a callback + // group has been added that should be considered for new entities. + // If this is creating the default callback group, then the notify guard condition won't be + // ready or needed yet, as the node is not done being constructed and therefore cannot be added. + // If the callback group is not automatically associated with this node's executors, then + // triggering the guard condition is also unnecessary, it will be manually added to an exector. + if (notify_guard_condition_is_valid_ && automatically_add_to_executor_with_node) { + this->trigger_notify_guard_condition(); + } return group; } @@ -253,9 +271,29 @@ NodeBase::get_notify_guard_condition() if (!notify_guard_condition_is_valid_) { throw std::runtime_error("failed to get notify guard condition because it is invalid"); } + return *notify_guard_condition_; +} + +rclcpp::GuardCondition::SharedPtr +NodeBase::get_shared_notify_guard_condition() +{ + std::lock_guard notify_condition_lock(notify_guard_condition_mutex_); + if (!notify_guard_condition_is_valid_) { + return nullptr; + } return notify_guard_condition_; } +void +NodeBase::trigger_notify_guard_condition() +{ + std::lock_guard notify_condition_lock(notify_guard_condition_mutex_); + if (!notify_guard_condition_is_valid_) { + throw std::runtime_error("failed to trigger notify guard condition because it is invalid"); + } + notify_guard_condition_->trigger(); +} + bool NodeBase::get_use_intra_process_default() const { diff --git a/rclcpp/src/rclcpp/node_interfaces/node_graph.cpp b/rclcpp/src/rclcpp/node_interfaces/node_graph.cpp index 6f6ed13be7..1228703cb6 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_graph.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_graph.cpp @@ -533,9 +533,8 @@ NodeGraph::notify_graph_change() } } graph_cv_.notify_all(); - auto & node_gc = node_base_->get_notify_guard_condition(); try { - node_gc.trigger(); + node_base_->trigger_notify_guard_condition(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( std::string("failed to notify wait set on graph change: ") + ex.what()); diff --git a/rclcpp/src/rclcpp/node_interfaces/node_services.cpp b/rclcpp/src/rclcpp/node_interfaces/node_services.cpp index 2f1afd3224..e9c4a5266e 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_services.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_services.cpp @@ -42,9 +42,8 @@ NodeServices::add_service( group->add_service(service_base_ptr); // Notify the executor that a new service was created using the parent Node. - auto & node_gc = node_base_->get_notify_guard_condition(); try { - node_gc.trigger(); + node_base_->trigger_notify_guard_condition(); group->trigger_notify_guard_condition(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( @@ -69,9 +68,8 @@ NodeServices::add_client( group->add_client(client_base_ptr); // Notify the executor that a new client was created using the parent Node. - auto & node_gc = node_base_->get_notify_guard_condition(); try { - node_gc.trigger(); + node_base_->trigger_notify_guard_condition(); group->trigger_notify_guard_condition(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( diff --git a/rclcpp/src/rclcpp/node_interfaces/node_timers.cpp b/rclcpp/src/rclcpp/node_interfaces/node_timers.cpp index d2e821a9e6..96097def22 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_timers.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_timers.cpp @@ -42,9 +42,8 @@ NodeTimers::add_timer( } callback_group->add_timer(timer); - auto & node_gc = node_base_->get_notify_guard_condition(); try { - node_gc.trigger(); + node_base_->trigger_notify_guard_condition(); callback_group->trigger_notify_guard_condition(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( diff --git a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp index 167a35f35d..659129d62c 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp @@ -70,9 +70,8 @@ NodeTopics::add_publisher( } // Notify the executor that a new publisher was created using the parent Node. - auto & node_gc = node_base_->get_notify_guard_condition(); try { - node_gc.trigger(); + node_base_->trigger_notify_guard_condition(); callback_group->trigger_notify_guard_condition(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( @@ -119,9 +118,8 @@ NodeTopics::add_subscription( } // Notify the executor that a new subscription was created using the parent Node. - auto & node_gc = node_base_->get_notify_guard_condition(); try { - node_gc.trigger(); + node_base_->trigger_notify_guard_condition(); callback_group->trigger_notify_guard_condition(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( diff --git a/rclcpp/src/rclcpp/node_interfaces/node_waitables.cpp b/rclcpp/src/rclcpp/node_interfaces/node_waitables.cpp index 1d1fe2ce59..02a9de82b0 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_waitables.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_waitables.cpp @@ -42,9 +42,8 @@ NodeWaitables::add_waitable( group->add_waitable(waitable_ptr); // Notify the executor that a new waitable was created using the parent Node. - auto & node_gc = node_base_->get_notify_guard_condition(); try { - node_gc.trigger(); + node_base_->trigger_notify_guard_condition(); group->trigger_notify_guard_condition(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index d4da759c02..ddb219b558 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -688,6 +688,24 @@ if(TARGET test_static_executor_entities_collector) target_link_libraries(test_static_executor_entities_collector ${PROJECT_NAME} mimick) endif() +ament_add_gtest(test_entities_collector executors/test_entities_collector.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}" TIMEOUT 120) +if(TARGET test_entities_collector) + ament_target_dependencies(test_entities_collector + "rcl" + "test_msgs") + target_link_libraries(test_entities_collector ${PROJECT_NAME} mimick) +endif() + +ament_add_gtest(test_executor_notify_waitable executors/test_executor_notify_waitable.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}" TIMEOUT 120) +if(TARGET test_executor_notify_waitable) + ament_target_dependencies(test_executor_notify_waitable + "rcl" + "test_msgs") + target_link_libraries(test_executor_notify_waitable ${PROJECT_NAME} mimick) +endif() + ament_add_gtest(test_guard_condition test_guard_condition.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}") if(TARGET test_guard_condition) diff --git a/rclcpp/test/rclcpp/executors/test_entities_collector.cpp b/rclcpp/test/rclcpp/executors/test_entities_collector.cpp new file mode 100644 index 0000000000..930dc68aff --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_entities_collector.cpp @@ -0,0 +1,320 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "rclcpp/executors/executor_notify_waitable.hpp" +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/executors/executor_entities_collector.hpp" + +#include "../../utils/rclcpp_gtest_macros.hpp" + +class TestExecutorEntitiesCollector : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + + notify_waitable = std::make_shared(); + entities_collector = std::make_shared( + notify_waitable); + } + + void TearDown() + { + rclcpp::shutdown(); + } + + std::shared_ptr notify_waitable; + std::shared_ptr entities_collector; +}; + +TEST_F(TestExecutorEntitiesCollector, add_remove_node) { + auto node1 = std::make_shared("node1", "ns"); + + // Add a node + EXPECT_NO_THROW(entities_collector->add_node(node1->get_node_base_interface())); + EXPECT_NO_THROW(entities_collector->update_collections()); + + // Remove a node + EXPECT_NO_THROW(entities_collector->remove_node(node1->get_node_base_interface())); + EXPECT_NO_THROW(entities_collector->update_collections()); +} + +TEST_F(TestExecutorEntitiesCollector, add_node_twice) { + auto node1 = std::make_shared("node1", "ns"); + + EXPECT_NO_THROW(entities_collector->add_node(node1->get_node_base_interface())); + + RCLCPP_EXPECT_THROW_EQ( + entities_collector->add_node(node1->get_node_base_interface()), + std::runtime_error("Node '/ns/node1' has already been added to an executor.")); + + EXPECT_NO_THROW(entities_collector->update_collections()); +} + +TEST_F(TestExecutorEntitiesCollector, add_associated_node) { + auto node1 = std::make_shared("node1", "ns"); + + // Simulate node being associated somewhere else + auto & has_executor = node1->get_node_base_interface()->get_associated_with_executor_atomic(); + has_executor.store(true); + + // Add an already-associated node + RCLCPP_EXPECT_THROW_EQ( + entities_collector->remove_node(node1->get_node_base_interface()), + std::runtime_error("Node '/ns/node1' needs to be associated with this executor.")); + + has_executor.store(false); +} + +TEST_F(TestExecutorEntitiesCollector, remove_unassociated_node) { + auto node1 = std::make_shared("node1", "ns"); + + // Add an already-associated node + RCLCPP_EXPECT_THROW_EQ( + entities_collector->remove_node(node1->get_node_base_interface()), + std::runtime_error("Node '/ns/node1' needs to be associated with an executor.")); + + // Simulate node being associated somewhere else + auto & has_executor = node1->get_node_base_interface()->get_associated_with_executor_atomic(); + has_executor.store(true); + + // Add an already-associated node + RCLCPP_EXPECT_THROW_EQ( + entities_collector->remove_node(node1->get_node_base_interface()), + std::runtime_error("Node '/ns/node1' needs to be associated with this executor.")); +} + +TEST_F(TestExecutorEntitiesCollector, add_remove_node_before_update) { + auto notify_waitable = std::make_shared(); + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(notify_waitable); + + auto node1 = std::make_shared("node1", "ns"); + auto node2 = std::make_shared("node2", "ns"); + + // Add and remove nodes without running updatenode + EXPECT_NO_THROW(entities_collector.add_node(node1->get_node_base_interface())); + EXPECT_NO_THROW(entities_collector.add_node(node2->get_node_base_interface())); + EXPECT_NO_THROW(entities_collector.remove_node(node1->get_node_base_interface())); + EXPECT_NO_THROW(entities_collector.remove_node(node2->get_node_base_interface())); + EXPECT_NO_THROW(entities_collector.update_collections()); +} + +TEST_F(TestExecutorEntitiesCollector, add_callback_group) { + auto notify_waitable = std::make_shared(); + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(notify_waitable); + + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + // Add a callback group and update + entities_collector.add_callback_group(cb_group); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + entities_collector.update_collections(); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + // Remove callback group and update + entities_collector.remove_callback_group(cb_group); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + entities_collector.update_collections(); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); +} + +TEST_F(TestExecutorEntitiesCollector, add_node_default_callback_group) { + auto notify_waitable = std::make_shared(); + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(notify_waitable); + + auto node = std::make_shared("node1", "ns"); + entities_collector.add_node(node->get_node_base_interface()); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + entities_collector.update_collections(); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 1u); +} + +TEST_F(TestExecutorEntitiesCollector, add_callback_group_after_add_node) { + auto notify_waitable = std::make_shared(); + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(notify_waitable); + + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector.add_node(node->get_node_base_interface()); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + entities_collector.update_collections(); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 2u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 2u); + + RCLCPP_EXPECT_THROW_EQ( + entities_collector.add_callback_group(cb_group), + std::runtime_error("Callback group has already been added to an executor.")); +} + +TEST_F(TestExecutorEntitiesCollector, add_callback_group_twice) { + auto notify_waitable = std::make_shared(); + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(notify_waitable); + + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector.add_callback_group(cb_group); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + entities_collector.update_collections(); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + cb_group->get_associated_with_executor_atomic().exchange(false); + RCLCPP_EXPECT_THROW_EQ( + entities_collector.add_callback_group(cb_group), + std::runtime_error("Callback group has already been added to this executor.")); +} + +TEST_F(TestExecutorEntitiesCollector, remove_callback_group_after_node) { + auto notify_waitable = std::make_shared(); + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(notify_waitable); + + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector.add_callback_group(cb_group); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + entities_collector.update_collections(); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + node.reset(); + + /** + * TODO(mjcarroll): Assert this when we are enforcing that nodes must be destroyed + * after their created callback groups. + RCLCPP_EXPECT_THROW_EQ( + entities_collector.remove_callback_group(cb_group), + std::runtime_error("Node must not be deleted before its callback group(s).")); + */ + EXPECT_NO_THROW(entities_collector.update_collections()); +} + +TEST_F(TestExecutorEntitiesCollector, remove_callback_group_after_node2) { + auto notify_waitable = std::make_shared(); + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(notify_waitable); + + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector.add_callback_group(cb_group); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 0u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + entities_collector.update_collections(); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + EXPECT_NO_THROW(entities_collector.remove_callback_group(cb_group)); + + node.reset(); + + /** + * TODO(mjcarroll): Assert this when we are enforcing that nodes must be destroyed + * after their created callback groups. + RCLCPP_EXPECT_THROW_EQ( + entities_collector.remove_callback_group(cb_group), + std::runtime_error("Node must not be deleted before its callback group(s).")); + */ + EXPECT_NO_THROW(entities_collector.update_collections()); +} + +TEST_F(TestExecutorEntitiesCollector, remove_callback_group_twice) { + auto notify_waitable = std::make_shared(); + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(notify_waitable); + + auto node = std::make_shared("node1", "ns"); + rclcpp::CallbackGroup::SharedPtr cb_group = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + entities_collector.add_callback_group(cb_group); + entities_collector.update_collections(); + + ASSERT_EQ(entities_collector.get_all_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_manually_added_callback_groups().size(), 1u); + ASSERT_EQ(entities_collector.get_automatically_added_callback_groups().size(), 0u); + + entities_collector.remove_callback_group(cb_group); + entities_collector.update_collections(); + + RCLCPP_EXPECT_THROW_EQ( + entities_collector.remove_callback_group(cb_group), + std::runtime_error("Callback group needs to be associated with an executor.")); +} + +TEST_F(TestExecutorEntitiesCollector, remove_node_opposite_order) { + auto notify_waitable = std::make_shared(); + auto entities_collector = rclcpp::executors::ExecutorEntitiesCollector(notify_waitable); + + auto node1 = std::make_shared("node1", "ns"); + EXPECT_NO_THROW(entities_collector.add_node(node1->get_node_base_interface())); + + auto node2 = std::make_shared("node2", "ns"); + EXPECT_NO_THROW(entities_collector.add_node(node2->get_node_base_interface())); + + EXPECT_NO_THROW(entities_collector.remove_node(node2->get_node_base_interface())); +} diff --git a/rclcpp/test/rclcpp/executors/test_executor_notify_waitable.cpp b/rclcpp/test/rclcpp/executors/test_executor_notify_waitable.cpp new file mode 100644 index 0000000000..ab7f730a2e --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_executor_notify_waitable.cpp @@ -0,0 +1,97 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rclcpp/executors/single_threaded_executor.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "rcpputils/scope_exit.hpp" + +#include "rclcpp/executors/executor_notify_waitable.hpp" + +#include "../../utils/rclcpp_gtest_macros.hpp" + + +class TestExecutorNotifyWaitable : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + } + + void TearDown() + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestExecutorNotifyWaitable, construct_destruct) { + { + auto waitable = std::make_shared(); + waitable.reset(); + } + { + auto on_execute_callback = []() {}; + auto waitable = + std::make_shared(on_execute_callback); + waitable.reset(); + } +} + +TEST_F(TestExecutorNotifyWaitable, add_remove_guard_conditions) { + auto on_execute_callback = []() {}; + auto waitable = + std::make_shared(on_execute_callback); + + auto node = std::make_shared("my_node", "/ns"); + auto notify_guard_condition = + + node->get_node_base_interface()->get_shared_notify_guard_condition(); + EXPECT_NO_THROW(waitable->add_guard_condition(notify_guard_condition)); + EXPECT_NO_THROW(waitable->remove_guard_condition(notify_guard_condition)); +} + +TEST_F(TestExecutorNotifyWaitable, wait) { + int on_execute_calls = 0; + auto on_execute_callback = [&on_execute_calls]() {on_execute_calls++;}; + + auto waitable = + std::make_shared(on_execute_callback); + + auto node = std::make_shared("my_node", "/ns"); + auto notify_guard_condition = + node->get_node_base_interface()->get_shared_notify_guard_condition(); + EXPECT_NO_THROW(waitable->add_guard_condition(notify_guard_condition)); + + auto default_cbg = node->get_node_base_interface()->get_default_callback_group(); + ASSERT_NE(nullptr, default_cbg->get_notify_guard_condition()); + + auto waitables = node->get_node_waitables_interface(); + waitables->add_waitable(std::static_pointer_cast(waitable), default_cbg); + + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(node); + executor.spin_all(std::chrono::seconds(1)); + EXPECT_EQ(1u, on_execute_calls); + + // on_execute_callback doesn't change if the topology doesn't change + executor.spin_all(std::chrono::seconds(1)); + EXPECT_EQ(1u, on_execute_calls); +}