diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 6979d980f8..324a53c518 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -54,6 +54,9 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/executable_list.cpp src/rclcpp/executor.cpp src/rclcpp/executors.cpp + src/rclcpp/executors/executor_notify_waitable.cpp + src/rclcpp/executors/executor_entities_collector.cpp + src/rclcpp/executors/executor_entities_collection.cpp src/rclcpp/executors/multi_threaded_executor.cpp src/rclcpp/executors/single_threaded_executor.cpp src/rclcpp/executors/static_executor_entities_collector.cpp diff --git a/rclcpp/include/rclcpp/executors/executor_entities_collection.hpp b/rclcpp/include/rclcpp/executors/executor_entities_collection.hpp new file mode 100644 index 0000000000..b1aa47a085 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/executor_entities_collection.hpp @@ -0,0 +1,126 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_ +#define RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_ + +#include +#include +#include + +#include "rcpputils/thread_safety_annotations.hpp" + +#include +#include +#include +#include +#include +#include +#include + +namespace rclcpp +{ +namespace executors +{ + +struct ExecutorEntitiesCollection +{ + struct TimerEntry { + rclcpp::TimerBase::WeakPtr timer; + rclcpp::CallbackGroup::WeakPtr callback_group; + }; + using TimerCollection = std::unordered_map; + + struct SubscriptionEntry{ + rclcpp::SubscriptionBase::WeakPtr subscription; + rclcpp::CallbackGroup::WeakPtr callback_group; + }; + using SubscriptionCollection = std::unordered_map; + + struct ClientEntry { + rclcpp::ClientBase::WeakPtr client; + rclcpp::CallbackGroup::WeakPtr callback_group; + }; + using ClientCollection = std::unordered_map; + + struct ServiceEntry { + rclcpp::ServiceBase::WeakPtr service; + rclcpp::CallbackGroup::WeakPtr callback_group; + }; + using ServiceCollection = std::unordered_map; + + struct WaitableEntry { + rclcpp::Waitable::WeakPtr waitable; + rclcpp::CallbackGroup::WeakPtr callback_group; + }; + using WaitableCollection = std::unordered_map; + + using GuardConditionCollection = std::unordered_map; + + TimerCollection timers; + SubscriptionCollection subscriptions; + ClientCollection clients; + ServiceCollection services; + GuardConditionCollection guard_conditions; + WaitableCollection waitables; + + void clear(); + + using TimerUpdatedFunc = std::function; + void update_timers(const ExecutorEntitiesCollection::TimerCollection & other, + TimerUpdatedFunc timer_added, + TimerUpdatedFunc timer_removed); + + using SubscriptionUpdatedFunc = std::function; + void update_subscriptions(const ExecutorEntitiesCollection::SubscriptionCollection & other, + SubscriptionUpdatedFunc subscription_added, + SubscriptionUpdatedFunc subscription_removed); + + using ClientUpdatedFunc = std::function; + void update_clients(const ExecutorEntitiesCollection::ClientCollection & other, + ClientUpdatedFunc client_added, + ClientUpdatedFunc client_removed); + + using ServiceUpdatedFunc = std::function; + void update_services(const ExecutorEntitiesCollection::ServiceCollection & other, + ServiceUpdatedFunc service_added, + ServiceUpdatedFunc service_removed); + + using GuardConditionUpdatedFunc = std::function; + void update_guard_conditions(const ExecutorEntitiesCollection::GuardConditionCollection & other, + GuardConditionUpdatedFunc guard_condition_added, + GuardConditionUpdatedFunc guard_condition_removed); + + using WaitableUpdatedFunc = std::function; + void update_waitables(const ExecutorEntitiesCollection::WaitableCollection & other, + WaitableUpdatedFunc waitable_added, + WaitableUpdatedFunc waitable_removed); + }; + +void +build_entities_collection( + const std::vector & callback_groups, + ExecutorEntitiesCollection & collection); + +std::deque +ready_executables( + const ExecutorEntitiesCollection & collection, + rclcpp::WaitResult & wait_result +); + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_ diff --git a/rclcpp/include/rclcpp/executors/executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/executor_entities_collector.hpp new file mode 100644 index 0000000000..f26366022c --- /dev/null +++ b/rclcpp/include/rclcpp/executors/executor_entities_collector.hpp @@ -0,0 +1,149 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTOR_HPP_ +#define RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTOR_HPP_ + +#include +#include +#include +#include + +#include "rcpputils/thread_safety_annotations.hpp" + +#include +#include +#include +#include +#include +#include +#include + +namespace rclcpp +{ +namespace executors +{ + +class ExecutorEntitiesCollector +{ +public: + RCLCPP_PUBLIC + ExecutorEntitiesCollector(); + + RCLCPP_PUBLIC + ~ExecutorEntitiesCollector(); + + /// Add a node to the entity collector + /** + * \param[in] node_ptr a shared pointer that points to a node base interface + * \throw std::runtime_error if the node is associated with an executor + */ + RCLCPP_PUBLIC + void + add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /// Remove a node from the entity collector + /** + * \param[in] node_ptr a shared pointer that points to a node base interface + * \throw std::runtime_error if the node is associated with an executor + * \throw std::runtime_error if the node is associated with this executor + */ + RCLCPP_PUBLIC + void + remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + RCLCPP_PUBLIC + bool + has_node(const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /// Add a callback group to the entity collector + /** + * \param[in] group_ptr a shared pointer that points to a callback group + * \throw std::runtime_error if the callback_group is associated with an executor + */ + RCLCPP_PUBLIC + void + add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr); + + /// Remove a callback group from the entity collector + /** + * \param[in] group_ptr a shared pointer that points to a callback group + * \throw std::runtime_error if the callback_group is not associated with an executor + * \throw std::runtime_error if the callback_group is not associated with this executor + */ + RCLCPP_PUBLIC + void + remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr); + + RCLCPP_PUBLIC + std::vector + get_all_callback_groups(); + + RCLCPP_PUBLIC + std::vector + get_manually_added_callback_groups(); + + RCLCPP_PUBLIC + std::vector + get_automatically_added_callback_groups(); + + RCLCPP_PUBLIC + void + update_collections(); + + RCLCPP_PUBLIC + std::shared_ptr + get_executor_notify_waitable(); + +protected: + using CallbackGroupCollection = std::set< + rclcpp::CallbackGroup::WeakPtr, + std::owner_less>; + + RCLCPP_PUBLIC + void + add_callback_group_to_collection(rclcpp::CallbackGroup::SharedPtr group_ptr, + CallbackGroupCollection & collection) RCPPUTILS_TSA_REQUIRES(mutex_); + + RCLCPP_PUBLIC + void + remove_callback_group_from_collection(rclcpp::CallbackGroup::SharedPtr group_ptr, + CallbackGroupCollection & collection) RCPPUTILS_TSA_REQUIRES(mutex_); + + RCLCPP_PUBLIC + void + add_automatically_associated_callback_groups() RCPPUTILS_TSA_REQUIRES(mutex_); + + RCLCPP_PUBLIC + void + prune_invalid_nodes_and_groups() RCPPUTILS_TSA_REQUIRES(mutex_); + + mutable std::mutex mutex_; + + CallbackGroupCollection + manually_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + + CallbackGroupCollection + automatically_added_groups_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + + /// nodes that are associated with the executor + std::list + weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_); + + std::shared_ptr notify_waitable_ RCPPUTILS_TSA_GUARDED_BY(mutex_); +}; +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTOR_HPP_ diff --git a/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp b/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp new file mode 100644 index 0000000000..474e1521f1 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp @@ -0,0 +1,73 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EXECUTOR_NOTIFY_WAITABLE_HPP_ +#define RCLCPP__EXECUTORS__EXECUTOR_NOTIFY_WAITABLE_HPP_ + +#include +#include + +#include "rclcpp/guard_condition.hpp" +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace executors +{ + +class ExecutorNotifyWaitable: rclcpp::Waitable +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(ExecutorNotifyWaitable) + + // Constructor + RCLCPP_PUBLIC + ExecutorNotifyWaitable() = default; + + // Destructor + RCLCPP_PUBLIC + ~ExecutorNotifyWaitable() override = default; + + RCLCPP_PUBLIC + void + add_to_wait_set(rcl_wait_set_t * wait_set) override; + + RCLCPP_PUBLIC + bool + is_ready(rcl_wait_set_t * wait_set) override; + + RCLCPP_PUBLIC + void + execute(std::shared_ptr & data) override; + + RCLCPP_PUBLIC + std::shared_ptr + take_data() override; + + RCLCPP_PUBLIC + void + add_guard_condition(rclcpp::GuardCondition * guard_condition); + + RCLCPP_PUBLIC + void + remove_guard_condition(rclcpp::GuardCondition * guard_condition); + +private: + std::list notify_guard_conditions_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EXECUTOR_NOTIFY_WAITABLE_HPP_ diff --git a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp new file mode 100644 index 0000000000..4963a0439b --- /dev/null +++ b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp @@ -0,0 +1,357 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/executors/executor_entities_collection.hpp" + +namespace rclcpp +{ +namespace executors +{ + +void ExecutorEntitiesCollection::clear() +{ + subscriptions.clear(); + timers.clear(); + guard_conditions.clear(); + clients.clear(); + services.clear(); + waitables.clear(); +} + +void ExecutorEntitiesCollection::update_timers( + const ExecutorEntitiesCollection::TimerCollection & other, + TimerUpdatedFunc timer_added, + TimerUpdatedFunc timer_removed) +{ + for (auto it = this->timers.begin(); it != this->timers.end();) + { + if (other.count(it->first) == 0) { + auto timer = it->second.timer.lock(); + if (timer) + timer_removed(timer); + it = this->timers.erase(it); + } else { + ++it; + } + } + for (auto it = other.begin(); it != other.end(); ++it) + { + if (this->timers.count(it->first) == 0) { + auto timer = it->second.timer.lock(); + if (timer) + timer_added(timer); + this->timers.insert(*it); + } + } +} + +void ExecutorEntitiesCollection::update_subscriptions( + const ExecutorEntitiesCollection::SubscriptionCollection & other, + SubscriptionUpdatedFunc subscription_added, + SubscriptionUpdatedFunc subscription_removed) +{ + for (auto it = this->subscriptions.begin(); it != this->subscriptions.end();) + { + if (other.count(it->first) == 0) { + auto subscription = it->second.subscription.lock(); + if (subscription) + { + subscription_removed(subscription); + } + it = this->subscriptions.erase(it); + } else { + ++it; + } + } + for (auto it = other.begin(); it != other.end(); ++it) + { + if (this->subscriptions.count(it->first) == 0) { + auto subscription = it->second.subscription.lock(); + if (subscription) + { + subscription_added(subscription); + } + this->subscriptions.insert(*it); + } + } +} + +void ExecutorEntitiesCollection::update_clients( + const ExecutorEntitiesCollection::ClientCollection & other, + ClientUpdatedFunc client_added, + ClientUpdatedFunc client_removed) +{ + for (auto it = this->clients.begin(); it != this->clients.end();) + { + if (other.count(it->first) == 0) { + auto client = it->second.client.lock(); + if (client) + client_removed(client); + it = this->clients.erase(it); + } else { + ++it; + } + } + for (auto it = other.begin(); it != other.end(); ++it) + { + if (this->clients.count(it->first) == 0) { + auto client = it->second.client.lock(); + if (client) + client_added(client); + this->clients.insert(*it); + } + } +} + +void ExecutorEntitiesCollection::update_services( + const ExecutorEntitiesCollection::ServiceCollection & other, + ServiceUpdatedFunc service_added, + ServiceUpdatedFunc service_removed) +{ + for (auto it = this->services.begin(); it != this->services.end();) + { + if (other.count(it->first) == 0) { + auto service = it->second.service.lock(); + if (service) + service_removed(service); + it = this->services.erase(it); + } else { + ++it; + } + } + for (auto it = other.begin(); it != other.end(); ++it) + { + if (this->services.count(it->first) == 0) { + auto service = it->second.service.lock(); + if (service) + service_added(service); + this->services.insert(*it); + } + } +} + +void ExecutorEntitiesCollection::update_guard_conditions( + const ExecutorEntitiesCollection::GuardConditionCollection & other, + GuardConditionUpdatedFunc guard_condition_added, + GuardConditionUpdatedFunc guard_condition_removed) +{ + for (auto it = this->guard_conditions.begin(); it != this->guard_conditions.end();) + { + if (other.count(it->first) == 0) { + auto guard_condition = it->second.lock(); + if (guard_condition) + guard_condition_removed(guard_condition); + it = this->guard_conditions.erase(it); + } else { + ++it; + } + } + for (auto it = other.begin(); it != other.end(); ++it) + { + if (this->guard_conditions.count(it->first) == 0) { + auto guard_condition = it->second.lock(); + if (guard_condition) + guard_condition_added(guard_condition); + this->guard_conditions.insert(*it); + } + } +} + +void ExecutorEntitiesCollection::update_waitables( + const ExecutorEntitiesCollection::WaitableCollection & other, + WaitableUpdatedFunc waitable_added, + WaitableUpdatedFunc waitable_removed) +{ + for (auto it = this->waitables.begin(); it != this->waitables.end();) + { + if (other.count(it->first) == 0) { + auto waitable = it->second.waitable.lock(); + if (waitable) + waitable_removed(waitable); + it = this->waitables.erase(it); + } else { + ++it; + } + } + for (auto it = other.begin(); it != other.end(); ++it) + { + if (this->waitables.count(it->first) == 0) { + auto waitable = it->second.waitable.lock(); + if (waitable) + waitable_added(waitable); + this->waitables.insert({it->first, it->second}); + } + } +} + + +void +build_entities_collection( + const std::vector & callback_groups, + ExecutorEntitiesCollection & collection) +{ + collection.clear(); + + for (auto weak_group_ptr : callback_groups) + { + auto group_ptr = weak_group_ptr.lock(); + if (!group_ptr) + { + continue; + } + + group_ptr->collect_all_ptrs( + [&collection, weak_group_ptr](const rclcpp::SubscriptionBase::SharedPtr & subscription) { + collection.subscriptions.insert({ + subscription->get_subscription_handle().get(), + {subscription, weak_group_ptr} + }); + }, + [&collection, weak_group_ptr](const rclcpp::ServiceBase::SharedPtr & service) { + collection.services.insert({ + service->get_service_handle().get(), + {service, weak_group_ptr} + }); + }, + [&collection, weak_group_ptr](const rclcpp::ClientBase::SharedPtr & client) { + collection.clients.insert({ + client->get_client_handle().get(), + {client, weak_group_ptr} + }); + }, + [&collection, weak_group_ptr](const rclcpp::TimerBase::SharedPtr & timer) { + collection.timers.insert({ + timer->get_timer_handle().get(), + {timer, weak_group_ptr} + }); + }, + [&collection, weak_group_ptr](const rclcpp::Waitable::SharedPtr & waitable) { + collection.waitables.insert({ + waitable.get(), + {waitable, weak_group_ptr} + }); + } + ); + } +} + +std::deque +ready_executables( + const ExecutorEntitiesCollection & collection, + rclcpp::WaitResult & wait_result +) +{ + std::deque ret; + + if (wait_result.kind() != rclcpp::WaitResultKind::Ready) + { + return ret; + } + + auto rcl_wait_set = wait_result.get_wait_set().get_rcl_wait_set(); + + for (size_t ii = 0; ii < rcl_wait_set.size_of_timers; ++ii) { + if (rcl_wait_set.timers[ii]) { + auto timer_iter = collection.timers.find(rcl_wait_set.timers[ii]); + if (timer_iter != collection.timers.end()) + { + auto timer = timer_iter->second.timer.lock(); + auto group = timer_iter->second.callback_group.lock(); + if (!timer || !group || !group->can_be_taken_from().load()) + continue; + + if (!timer->call()) + continue; + + rclcpp::AnyExecutable exec; + exec.timer = timer; + exec.callback_group = group; + ret.push_back(exec); + } + } + } + + for (size_t ii = 0; ii < rcl_wait_set.size_of_subscriptions; ++ii) { + if (rcl_wait_set.subscriptions[ii]) { + auto subscription_iter = collection.subscriptions.find(rcl_wait_set.subscriptions[ii]); + if (subscription_iter != collection.subscriptions.end()) + { + auto subscription = subscription_iter->second.subscription.lock(); + auto group = subscription_iter->second.callback_group.lock(); + if (!subscription || !group || !group->can_be_taken_from().load()) + continue; + + rclcpp::AnyExecutable exec; + exec.subscription = subscription; + exec.callback_group = group; + ret.push_back(exec); + } + } + } + + for (size_t ii = 0; ii < rcl_wait_set.size_of_services; ++ii) { + if (rcl_wait_set.services[ii]) { + auto service_iter = collection.services.find(rcl_wait_set.services[ii]); + if (service_iter != collection.services.end()) + { + auto service = service_iter->second.service.lock(); + auto group = service_iter->second.callback_group.lock(); + if (!service || !group || !group->can_be_taken_from().load()) + continue; + + rclcpp::AnyExecutable exec; + exec.service = service; + exec.callback_group = group; + ret.push_back(exec); + } + } + } + + for (size_t ii = 0; ii < rcl_wait_set.size_of_clients; ++ii) { + if (rcl_wait_set.clients[ii]) { + auto client_iter = collection.clients.find(rcl_wait_set.clients[ii]); + if (client_iter != collection.clients.end()) + { + auto client = client_iter->second.client.lock(); + auto group = client_iter->second.callback_group.lock(); + if (!client || !group || !group->can_be_taken_from().load()) + continue; + + rclcpp::AnyExecutable exec; + exec.client = client; + exec.callback_group = group; + ret.push_back(exec); + } + } + } + + for (auto & [handle, entry] : collection.waitables) + { + auto waitable = entry.waitable.lock(); + if (waitable && waitable->is_ready(&rcl_wait_set)) { + auto group = entry.callback_group.lock(); + if (!group || !group->can_be_taken_from().load()) + continue; + + rclcpp::AnyExecutable exec; + exec.waitable = waitable; + exec.callback_group = group; + ret.push_back(exec); + } + } + return ret; +} + +} // namespace executors +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/executor_entities_collector.cpp new file mode 100644 index 0000000000..5e4ef9e6b1 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/executor_entities_collector.cpp @@ -0,0 +1,283 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "rclcpp/executors/executor_entities_collector.hpp" +#include "rclcpp/executors/executor_notify_waitable.hpp" + +namespace rclcpp +{ +namespace executors +{ + +ExecutorEntitiesCollector::ExecutorEntitiesCollector(): + notify_waitable_(std::make_shared()) +{ +} + +ExecutorEntitiesCollector::~ExecutorEntitiesCollector() +{ + std::lock_guard guard{mutex_}; + + // Disassociate each node from this executor collector + std::for_each( + weak_nodes_.begin(), weak_nodes_.end(), [] + (rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr) { + auto shared_node_ptr = weak_node_ptr.lock(); + if (shared_node_ptr) { + std::atomic_bool & has_executor = shared_node_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); + } + }); + weak_nodes_.clear(); + + // Disassociate each automatically-added callback group from this executor collector + std::for_each( + automatically_added_groups_.begin(), automatically_added_groups_.end(), [] + (rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + auto group_ptr = weak_group_ptr.lock(); + if (group_ptr) { + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); + } + }); + automatically_added_groups_.clear(); + + // Disassociate each manually-added callback group from this executor collector + std::for_each( + manually_added_groups_.begin(), manually_added_groups_.end(), [] + (rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + auto group_ptr = weak_group_ptr.lock(); + if (group_ptr) { + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); + } + }); + manually_added_groups_.clear(); +} + +void +ExecutorEntitiesCollector::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + std::lock_guard guard{mutex_}; + // If the node already has an executor + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error( + std::string("Node '") + node_ptr->get_fully_qualified_name() + + "' has already been added to an executor."); + } + weak_nodes_.push_back(node_ptr); + this->notify_waitable_->add_guard_condition(&node_ptr->get_notify_guard_condition()); + this->add_automatically_associated_callback_groups(); +} + +void +ExecutorEntitiesCollector::remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + std::lock_guard guard{mutex_}; + if (!node_ptr->get_associated_with_executor_atomic().load()) { + throw std::runtime_error("Node needs to be associated with an executor."); + } + + bool found_node = false; + auto node_it = weak_nodes_.begin(); + while (node_it != weak_nodes_.end()) { + bool matched = (node_it->lock() == node_ptr); + if (matched) { + found_node = true; + node_it = weak_nodes_.erase(node_it); + } else { + ++node_it; + } + } + if (!found_node) { + throw std::runtime_error("Node needs to be associated with this executor."); + } + + for (auto group_it = automatically_added_groups_.begin(); + group_it != automatically_added_groups_.end();) { + auto group_ptr = group_it->lock(); + if (node_ptr->callback_group_in_node(group_ptr)){ + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); + group_it = automatically_added_groups_.erase(group_it); + } else { + group_it++; + } + } + + this->notify_waitable_->remove_guard_condition(&node_ptr->get_notify_guard_condition()); + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); +} + +void +ExecutorEntitiesCollector::add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr) +{ + std::lock_guard guard{mutex_}; + this->add_callback_group_to_collection(group_ptr, manually_added_groups_); +} + +void +ExecutorEntitiesCollector::remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr) +{ + std::lock_guard guard{mutex_}; + this->remove_callback_group_from_collection(group_ptr, manually_added_groups_); +} + +std::vector +ExecutorEntitiesCollector::get_all_callback_groups() +{ + std::vector groups; + std::lock_guard guard{mutex_}; + + this->update_collections(); + + for (const auto & group_ptr : manually_added_groups_) { + groups.push_back(group_ptr); + } + for (auto const & group_ptr : automatically_added_groups_) { + groups.push_back(group_ptr); + } + return groups; +} + +std::vector +ExecutorEntitiesCollector::get_manually_added_callback_groups() +{ + std::vector groups; + std::lock_guard guard{mutex_}; + for (const auto & group_ptr : manually_added_groups_) { + groups.push_back(group_ptr); + } + return groups; +} + +std::vector +ExecutorEntitiesCollector::get_automatically_added_callback_groups() +{ + std::vector groups; + std::lock_guard guard{mutex_}; + for (auto const & group_ptr : automatically_added_groups_) { + groups.push_back(group_ptr); + } + return groups; +} + +void +ExecutorEntitiesCollector::update_collections(){ + this->add_automatically_associated_callback_groups(); + this->prune_invalid_nodes_and_groups(); +} + +std::shared_ptr +ExecutorEntitiesCollector::get_executor_notify_waitable() +{ + return this->notify_waitable_; +} + +void +ExecutorEntitiesCollector::add_callback_group_to_collection( + rclcpp::CallbackGroup::SharedPtr group_ptr, + CallbackGroupCollection & collection) +{ + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Callback group has already been added to an executor."); + } + auto iter = collection.insert(group_ptr); + if (iter.second == false) { + throw std::runtime_error("Callback group has already been added to this executor."); + } + this->notify_waitable_->add_guard_condition(group_ptr->get_notify_guard_condition().get()); +} + +void +ExecutorEntitiesCollector::remove_callback_group_from_collection( + rclcpp::CallbackGroup::SharedPtr group_ptr, + CallbackGroupCollection & collection) +{ + rclcpp::CallbackGroup::WeakPtr weak_group_ptr(group_ptr); + auto iter = collection.find(weak_group_ptr); + if (iter != collection.end()) { + // Check that the group hasn't been orphaned from it's respective node + if (!group_ptr->has_valid_node()) { + throw std::runtime_error("Node must not be deleted before its callback group(s)."); + } + + // Disassociate the callback group with the executor + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); + collection.erase(iter); + this->notify_waitable_->remove_guard_condition(group_ptr->get_notify_guard_condition().get()); + } else { + throw std::runtime_error("Callback group needs to be associated with executor."); + } +} + +void +ExecutorEntitiesCollector::add_automatically_associated_callback_groups() +{ + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node) { + node->for_each_callback_group( + [this, node](rclcpp::CallbackGroup::SharedPtr group_ptr) + { + if (!group_ptr->get_associated_with_executor_atomic().load() && + group_ptr->automatically_add_to_executor_with_node()) + { + this->add_callback_group_to_collection(group_ptr, this->automatically_added_groups_); + } + }); + } + } +} + +void +ExecutorEntitiesCollector::prune_invalid_nodes_and_groups() +{ + for (auto node_it = weak_nodes_.begin(); + node_it != weak_nodes_.end();) + { + if (node_it->expired()) { + node_it = weak_nodes_.erase(node_it); + } else { + node_it++; + } + } + + for (auto group_it = automatically_added_groups_.begin(); + group_it != automatically_added_groups_.end();) { + if (group_it->expired()) { + group_it = automatically_added_groups_.erase(group_it); + } else { + group_it++; + } + } + for (auto group_it = manually_added_groups_.begin(); + group_it != manually_added_groups_.end();) { + if (group_it->expired()) { + group_it = manually_added_groups_.erase(group_it); + } else { + group_it++; + } + } +} + +} // namespace executors +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp new file mode 100644 index 0000000000..ae9f7767ea --- /dev/null +++ b/rclcpp/src/rclcpp/executors/executor_notify_waitable.cpp @@ -0,0 +1,88 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/executors/executor_notify_waitable.hpp" +#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp" + +namespace rclcpp +{ +namespace executors +{ + +void +ExecutorNotifyWaitable::add_to_wait_set(rcl_wait_set_t * wait_set) +{ + for (auto guard_condition : this->notify_guard_conditions_) { + detail::add_guard_condition_to_rcl_wait_set(*wait_set, *guard_condition); + } +} + +bool +ExecutorNotifyWaitable::is_ready(rcl_wait_set_t * wait_set) +{ + for (size_t ii = 0; ii < wait_set->size_of_guard_conditions; ++ii) + { + auto rcl_guard_condition = wait_set->guard_conditions[ii]; + + if (nullptr == rcl_guard_condition) { + continue; + } + + for (auto guard_condition : this->notify_guard_conditions_) { + if (&guard_condition->get_rcl_guard_condition() == rcl_guard_condition) { + return true; + } + } + } + + return false; +} + +void +ExecutorNotifyWaitable::execute(std::shared_ptr & data) +{ + (void) data; +} + +std::shared_ptr +ExecutorNotifyWaitable::take_data() +{ + return nullptr; +} + +void +ExecutorNotifyWaitable::add_guard_condition(rclcpp::GuardCondition * guard_condition) +{ + for (const auto & existing_guard_condition : notify_guard_conditions_) + { + if (existing_guard_condition == guard_condition) { + return; + } + } + notify_guard_conditions_.push_back(guard_condition); +} + +void +ExecutorNotifyWaitable::remove_guard_condition(rclcpp::GuardCondition * guard_condition) +{ + for (auto it = notify_guard_conditions_.begin(); it != notify_guard_conditions_.end(); ++it) { + if (*it == guard_condition) { + notify_guard_conditions_.erase(it); + break; + } + } +} + +} // namespace executors +} // namespace rclcpp