diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 6979d980f8..6d58d0b4ea 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -51,6 +51,9 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/duration.cpp src/rclcpp/event.cpp src/rclcpp/exceptions/exceptions.cpp + src/rclcpp/experimental/executors/events_executor/events_executor_entities_collector.cpp + src/rclcpp/experimental/executors/events_executor/events_executor.cpp + src/rclcpp/experimental/timers_manager.cpp src/rclcpp/executable_list.cpp src/rclcpp/executor.cpp src/rclcpp/executors.cpp diff --git a/rclcpp/include/rclcpp/executors.hpp b/rclcpp/include/rclcpp/executors.hpp index 36fb0d63cf..f7ff06055f 100644 --- a/rclcpp/include/rclcpp/executors.hpp +++ b/rclcpp/include/rclcpp/executors.hpp @@ -21,6 +21,7 @@ #include "rclcpp/executors/multi_threaded_executor.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" #include "rclcpp/executors/static_single_threaded_executor.hpp" +#include "rclcpp/experimental/executors/events_executor/events_executor.hpp" #include "rclcpp/node.hpp" #include "rclcpp/utilities.hpp" #include "rclcpp/visibility_control.hpp" diff --git a/rclcpp/include/rclcpp/experimental/executors/events_executor/event_waitable.hpp b/rclcpp/include/rclcpp/experimental/executors/events_executor/event_waitable.hpp new file mode 100644 index 0000000000..a9336762e7 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/executors/events_executor/event_waitable.hpp @@ -0,0 +1,70 @@ +// Copyright 2023 iRobot Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENT_WAITABLE_HPP_ +#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENT_WAITABLE_HPP_ + +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace experimental +{ +namespace executors +{ + +/** + * @brief This class provides a wrapper around the waitable object, that is + * meant to be used with the EventsExecutor. + * The waitset related methods are stubbed out as they should not be called. + * This class is abstract as the execute method of rclcpp::Waitable is not implemented. + * Nodes who want to implement a custom EventWaitable, can derive from this class and override + * the execute method. + */ +class EventWaitable : public rclcpp::Waitable +{ +public: + // Constructor + RCLCPP_PUBLIC + EventWaitable() = default; + + // Destructor + RCLCPP_PUBLIC + virtual ~EventWaitable() = default; + + // Stub API: not used by EventsExecutor + RCLCPP_PUBLIC + bool + is_ready(rcl_wait_set_t * wait_set) final + { + (void)wait_set; + throw std::runtime_error("EventWaitable can't be checked if it's ready"); + return false; + } + + // Stub API: not used by EventsExecutor + RCLCPP_PUBLIC + void + add_to_wait_set(rcl_wait_set_t * wait_set) final + { + (void)wait_set; + throw std::runtime_error("EventWaitable can't be added to a wait_set"); + } +}; + +} // namespace executors +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENT_WAITABLE_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp new file mode 100644 index 0000000000..6e79accbd1 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp @@ -0,0 +1,237 @@ +// Copyright 2023 iRobot Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_HPP_ +#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_HPP_ + +#include +#include +#include +#include + +#include "rclcpp/executor.hpp" +#include "rclcpp/experimental/executors/events_executor/events_executor_entities_collector.hpp" +#include "rclcpp/experimental/executors/events_executor/events_executor_event_types.hpp" +#include "rclcpp/experimental/executors/events_executor/events_executor_notify_waitable.hpp" +#include "rclcpp/experimental/executors/events_executor/events_queue.hpp" +#include "rclcpp/experimental/executors/events_executor/simple_events_queue.hpp" +#include "rclcpp/experimental/timers_manager.hpp" +#include "rclcpp/node.hpp" + +namespace rclcpp +{ +namespace experimental +{ +namespace executors +{ + +/// Events executor implementation +/** + * This executor uses an events queue and a timers manager to execute entities from its + * associated nodes and callback groups. + * The RMW listener APIs are used to collect new events. + * + * This executor tries to reduce as much as possible the amount of maintenance operations. + * This allows to use customized `EventsQueue` classes to achieve different goals such + * as very low CPU usage, bounded memory requirement, determinism, etc. + * + * The executor uses a weak ownership model and it locks entities only while executing + * their related events. + * + * To run this executor: + * rclcpp::experimental::executors::EventsExecutor executor; + * executor.add_node(node); + * executor.spin(); + * executor.remove_node(node); + */ +class EventsExecutor : public rclcpp::Executor +{ + friend class EventsExecutorEntitiesCollector; + +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutor) + + /// Default constructor. See the default constructor for Executor. + /** + * \param[in] events_queue The queue used to store events. + * \param[in] execute_timers_separate_thread If true, timers are executed in a separate + * thread. If false, timers are executed in the same thread as all other entities. + * \param[in] options Options used to configure the executor. + */ + RCLCPP_PUBLIC + explicit EventsExecutor( + rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue = std::make_unique< + rclcpp::experimental::executors::SimpleEventsQueue>(), + bool execute_timers_separate_thread = false, + const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()); + + /// Default destructor. + RCLCPP_PUBLIC + virtual ~EventsExecutor(); + + /// Events executor implementation of spin. + /** + * This function will block until work comes in, execute it, and keep blocking. + * It will only be interrupted by a CTRL-C (managed by the global signal handler). + * \throws std::runtime_error when spin() called while already spinning + */ + RCLCPP_PUBLIC + void + spin() override; + + /// Events executor implementation of spin some + /** + * This non-blocking function will execute the timers and events + * that were ready when this API was called, until timeout or no + * more work available. New ready-timers/events arrived while + * executing work, won't be taken into account here. + * + * Example: + * while(condition) { + * spin_some(); + * sleep(); // User should have some sync work or + * // sleep to avoid a 100% CPU usage + * } + */ + RCLCPP_PUBLIC + void + spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0)) override; + + /// Events executor implementation of spin all + /** + * This non-blocking function will execute timers and events + * until timeout or no more work available. If new ready-timers/events + * arrive while executing work available, they will be executed + * as long as the timeout hasn't expired. + * + * Example: + * while(condition) { + * spin_all(); + * sleep(); // User should have some sync work or + * // sleep to avoid a 100% CPU usage + * } + */ + RCLCPP_PUBLIC + void + spin_all(std::chrono::nanoseconds max_duration) override; + + /// Add a node to the executor. + /** + * \sa rclcpp::Executor::add_node + */ + RCLCPP_PUBLIC + void + add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Convenience function which takes Node and forwards NodeBaseInterface. + /** + * \sa rclcpp::EventsExecutor::add_node + */ + RCLCPP_PUBLIC + void + add_node(std::shared_ptr node_ptr, bool notify = true) override; + + /// Remove a node from the executor. + /** + * \sa rclcpp::Executor::remove_node + */ + RCLCPP_PUBLIC + void + remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Convenience function which takes Node and forwards NodeBaseInterface. + /** + * \sa rclcpp::Executor::remove_node + */ + RCLCPP_PUBLIC + void + remove_node(std::shared_ptr node_ptr, bool notify = true) override; + + /// Add a callback group to an executor. + /** + * \sa rclcpp::Executor::add_callback_group + */ + RCLCPP_PUBLIC + void + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Remove callback group from the executor + /** + * \sa rclcpp::Executor::remove_callback_group + */ + RCLCPP_PUBLIC + void + remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + bool notify = true) override; + + RCLCPP_PUBLIC + std::vector + get_all_callback_groups() override; + + /// Get callback groups that belong to executor. + /** + * \sa rclcpp::Executor::get_manually_added_callback_groups() + */ + RCLCPP_PUBLIC + std::vector + get_manually_added_callback_groups() override; + + /// Get callback groups that belong to executor. + /** + * \sa rclcpp::Executor::get_automatically_added_callback_groups_from_nodes() + */ + RCLCPP_PUBLIC + std::vector + get_automatically_added_callback_groups_from_nodes() override; + +protected: + RCLCPP_PUBLIC + void + spin_once_impl(std::chrono::nanoseconds timeout) override; + + RCLCPP_PUBLIC + void + spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive); + +private: + RCLCPP_DISABLE_COPY(EventsExecutor) + + // Execute a single event + RCLCPP_PUBLIC + void + execute_event(const ExecutorEvent & event); + + // Queue where entities can push events + rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_; + + EventsExecutorEntitiesCollector::SharedPtr entities_collector_; + EventsExecutorNotifyWaitable::SharedPtr executor_notifier_; + + // Timers manager + std::shared_ptr timers_manager_; +}; + +} // namespace executors +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor_entities_collector.hpp b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor_entities_collector.hpp new file mode 100644 index 0000000000..8d6aea74ae --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor_entities_collector.hpp @@ -0,0 +1,348 @@ +// Copyright 2023 iRobot Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_ENTITIES_COLLECTOR_HPP_ +#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_ENTITIES_COLLECTOR_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "rcl/guard_condition.h" +#include "rcl/wait.h" + +#include "rclcpp/callback_group.hpp" +#include "rclcpp/experimental/executors/events_executor/events_executor_event_types.hpp" +#include "rclcpp/experimental/timers_manager.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/node_interfaces/node_base_interface.hpp" +#include "rclcpp/visibility_control.hpp" +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace experimental +{ +namespace executors +{ +// forward declaration of EventsExecutor to avoid circular dependency +class EventsExecutor; + +typedef std::map> WeakCallbackGroupsToNodesMap; + +/** + * @brief This class provides a waitable object that is used for managing the + * entities (i.e. nodes and their subscriptions, timers, services, etc) + * added to an EventsExecutor. + * The add/remove node APIs are used when a node is added/removed from + * the associated EventsExecutor and result in setting/unsetting the + * events callbacks and adding timers to the timers manager. + * + * Being this class derived from Waitable, it can be used to wake up an + * executor thread while it's spinning. + * When this occurs, the execute API takes care of handling changes + * in the entities currently used by the executor. + */ +class EventsExecutorEntitiesCollector final + : public rclcpp::Waitable, + public std::enable_shared_from_this +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(EventsExecutorEntitiesCollector) + + // Constructor + RCLCPP_PUBLIC + EventsExecutorEntitiesCollector( + EventsExecutor * executor); + + // Destructor + RCLCPP_PUBLIC + ~EventsExecutorEntitiesCollector() override; + + // Initialize entities collector + RCLCPP_PUBLIC + void init(); + + /// Execute the waitable. + RCLCPP_PUBLIC + void + execute(std::shared_ptr & data) override; + + /// Function to add_handles_to_wait_set and wait for work and + /** + * block until the wait set is ready or until the timeout has been exceeded. + * \throws std::runtime_error if wait set couldn't be cleared or filled. + * \throws any rcl errors from rcl_wait, \see rclcpp::exceptions::throw_from_rcl_error() + */ + RCLCPP_PUBLIC + void + refresh_wait_set(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + + /** + * \throws std::runtime_error if it couldn't add guard condition to wait set + */ + RCLCPP_PUBLIC + void + add_to_wait_set(rcl_wait_set_t * wait_set) override; + + /// Complete all available queued work without blocking. + /** + * This function checks if after the guard condition was triggered + * (or a spurious wakeup happened) we are really ready to execute + * i.e. re-collect entities + */ + RCLCPP_PUBLIC + bool + is_ready(rcl_wait_set_t * wait_set) override; + + RCLCPP_PUBLIC + std::shared_ptr + take_data() override; + + RCLCPP_PUBLIC + std::shared_ptr + take_data_by_entity_id(size_t id) override; + + /// Add a callback group to an executor. + /** + * \see rclcpp::Executor::add_callback_group + */ + RCLCPP_PUBLIC + bool + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /// Add a callback group to an executor. + /** + * \see rclcpp::Executor::add_callback_group + * \return boolean whether the node from the callback group is new + */ + RCLCPP_PUBLIC + bool + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); + + /// Remove a callback group from the executor. + /** + * \see rclcpp::Executor::remove_callback_group + */ + RCLCPP_PUBLIC + bool + remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr); + + /// Remove a callback group from the executor. + /** + * \see rclcpp::Executor::remove_callback_group_from_map + */ + RCLCPP_PUBLIC + bool + remove_callback_group_from_map( + rclcpp::CallbackGroup::SharedPtr group_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); + + /** + * \see rclcpp::Executor::add_node() + * \throw std::runtime_error if node was already added + */ + RCLCPP_PUBLIC + bool + add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /** + * \see rclcpp::Executor::remove_node() + * \throw std::runtime_error if no guard condition is associated with node. + */ + RCLCPP_PUBLIC + bool + remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + RCLCPP_PUBLIC + std::vector + get_all_callback_groups(); + + /// Get callback groups that belong to executor. + /** + * \see rclcpp::Executor::get_manually_added_callback_groups() + */ + RCLCPP_PUBLIC + std::vector + get_manually_added_callback_groups(); + + /// Get callback groups that belong to executor. + /** + * \see rclcpp::Executor::get_automatically_added_callback_groups_from_nodes() + */ + RCLCPP_PUBLIC + std::vector + get_automatically_added_callback_groups_from_nodes(); + + /// + /** + * Get the subscription shared pointer corresponding + * to a subscription identifier + */ + RCLCPP_PUBLIC + rclcpp::SubscriptionBase::SharedPtr + get_subscription(const void * subscription_id); + + /// + /** + * Get the client shared pointer corresponding + * to a client identifier + */ + RCLCPP_PUBLIC + rclcpp::ClientBase::SharedPtr + get_client(const void * client_id); + + /// + /** + * Get the service shared pointer corresponding + * to a service identifier + */ + RCLCPP_PUBLIC + rclcpp::ServiceBase::SharedPtr + get_service(const void * service_id); + + /// + /** + * Get the waitable shared pointer corresponding + * to a waitable identifier + */ + RCLCPP_PUBLIC + rclcpp::Waitable::SharedPtr + get_waitable(const void * waitable_id); + + /// + /** + * Add a weak pointer to a waitable + */ + RCLCPP_PUBLIC + void + add_waitable(rclcpp::Waitable::SharedPtr waitable); + +private: + /// Return true if the node belongs to the collector + /** + * \param[in] node_ptr a node base interface shared pointer + * \param[in] weak_groups_to_nodes map to nodes to lookup + * \return boolean whether a node belongs the collector + */ + bool + has_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const; + + /// Add all callback groups that can be automatically added by any executor + /// and is not already associated with an executor from nodes + /// that are associated with executor + /** + * \see rclcpp::Executor::add_callback_groups_from_nodes_associated_to_executor() + */ + void + add_callback_groups_from_nodes_associated_to_executor(); + + void + callback_group_added_impl( + rclcpp::CallbackGroup::SharedPtr group); + + void + node_added_impl( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node); + + void + callback_group_removed_impl( + rclcpp::CallbackGroup::SharedPtr group); + + void + node_removed_impl( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node); + + void + set_entities_event_callbacks_from_map( + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); + + void + set_callback_group_entities_callbacks(rclcpp::CallbackGroup::SharedPtr group); + + void + unset_callback_group_entities_callbacks(rclcpp::CallbackGroup::SharedPtr group); + + void + set_guard_condition_callback(rclcpp::GuardCondition * guard_condition); + + void + unset_guard_condition_callback(rclcpp::GuardCondition * guard_condition); + + std::function + create_entity_callback(void * exec_entity_id, ExecutorEventType type); + + std::function + create_waitable_callback(void * waitable_id); + + typedef std::map> + WeakCallbackGroupsToGuardConditionsMap; + + typedef std::map> + WeakNodesToGuardConditionsMap; + + /// maps callback groups to guard conditions + WeakCallbackGroupsToGuardConditionsMap weak_groups_to_guard_conditions_; + + WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_; + + // maps callback groups to nodes. + WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_; + // maps callback groups to nodes. + WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_; + + /// List of weak nodes registered in the static executor + std::list weak_nodes_; + + // Maps: entity identifiers to weak pointers from the entities registered in the executor + // so in the case of an event providing and ID, we can retrieve and own the corresponding + // entity while it performs work + std::unordered_map weak_subscriptions_map_; + std::unordered_map weak_services_map_; + std::unordered_map weak_clients_map_; + std::unordered_map weak_waitables_map_; + + /// Executor using this entities collector object + EventsExecutor * associated_executor_ = nullptr; + /// Instance of the timers manager used by the associated executor + rclcpp::experimental::TimersManager::SharedPtr timers_manager_; + + // Mutex to protect vector of new nodes. + std::recursive_mutex reentrant_mutex_; +}; + +} // namespace executors +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_ENTITIES_COLLECTOR_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor_event_types.hpp b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor_event_types.hpp new file mode 100644 index 0000000000..9aaf6a9bb5 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor_event_types.hpp @@ -0,0 +1,46 @@ +// Copyright 2023 iRobot Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_EVENT_TYPES_HPP_ +#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_EVENT_TYPES_HPP_ + +namespace rclcpp +{ +namespace experimental +{ +namespace executors +{ + +enum ExecutorEventType +{ + CLIENT_EVENT, + SUBSCRIPTION_EVENT, + SERVICE_EVENT, + TIMER_EVENT, + WAITABLE_EVENT +}; + +struct ExecutorEvent +{ + const void * exec_entity_id; + int gen_entity_id; + ExecutorEventType type; + size_t num_events; +}; + +} // namespace executors +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_EVENT_TYPES_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor_notify_waitable.hpp b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor_notify_waitable.hpp new file mode 100644 index 0000000000..f9f9b11b06 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor_notify_waitable.hpp @@ -0,0 +1,103 @@ +// Copyright 2023 iRobot Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_ +#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_ + +#include +#include + +#include "rcl/guard_condition.h" +#include "rclcpp/experimental/executors/events_executor/event_waitable.hpp" + +namespace rclcpp +{ +namespace experimental +{ +namespace executors +{ + +/** + * @brief This class provides an EventWaitable that allows to + * wake up an EventsExecutor when a guard condition is notified. + */ +class EventsExecutorNotifyWaitable final : public EventWaitable +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutorNotifyWaitable) + + // Constructor + RCLCPP_PUBLIC + EventsExecutorNotifyWaitable() = default; + + // Destructor + RCLCPP_PUBLIC + virtual ~EventsExecutorNotifyWaitable() = default; + + // The function is a no-op, since we only care of waking up the executor + RCLCPP_PUBLIC + void + execute(std::shared_ptr & data) override + { + (void)data; + } + + RCLCPP_PUBLIC + void + add_guard_condition(rclcpp::GuardCondition * guard_condition) + { + notify_guard_conditions_.push_back(guard_condition); + } + + RCLCPP_PUBLIC + void + set_on_ready_callback(std::function callback) override + { + // The second argument of the callback should identify which guard condition + // triggered the event. However it's not relevant here as we only + // care about waking up the executor, so we pass a 0. + auto gc_callback = [callback](size_t count) { + callback(count, 0); + }; + + for (auto gc : notify_guard_conditions_) { + gc->set_on_trigger_callback(gc_callback); + } + } + + RCLCPP_PUBLIC + std::shared_ptr + take_data() override + { + // This waitable doesn't handle any data + return nullptr; + } + + RCLCPP_PUBLIC + std::shared_ptr + take_data_by_entity_id(size_t id) override + { + (void) id; + return take_data(); + } + +private: + std::list notify_guard_conditions_; +}; + +} // namespace executors +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_NOTIFY_WAITABLE_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/executors/events_executor/events_queue.hpp b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_queue.hpp new file mode 100644 index 0000000000..24282d6027 --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/executors/events_executor/events_queue.hpp @@ -0,0 +1,100 @@ +// Copyright 2023 iRobot Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_QUEUE_HPP_ +#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_QUEUE_HPP_ + +#include + +#include "rclcpp/macros.hpp" +#include "rclcpp/visibility_control.hpp" + +#include "rclcpp/experimental/executors/events_executor/events_executor_event_types.hpp" + +namespace rclcpp +{ +namespace experimental +{ +namespace executors +{ + +/** + * @brief This abstract class can be used to implement different types of queues + * where `ExecutorEvent` can be stored. + * The derived classes should choose which underlying container to use and + * the strategy for pushing and popping events. + * For example a queue implementation may be bounded or unbounded and have + * different pruning strategies. + * Implementations may or may not check the validity of events and decide how to handle + * the situation where an event is not valid anymore (e.g. a subscription history cache overruns) + */ +class EventsQueue +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(EventsQueue) + + RCLCPP_PUBLIC + EventsQueue() = default; + + /** + * @brief Destruct the object. + */ + RCLCPP_PUBLIC + virtual ~EventsQueue() = default; + + /** + * @brief push event into the queue + * @param event The event to push into the queue + */ + RCLCPP_PUBLIC + virtual + void + enqueue(const rclcpp::experimental::executors::ExecutorEvent & event) = 0; + + /** + * @brief Extracts an event from the queue, eventually waiting until timeout + * if none is available. + * @return true if event has been found, false if timeout + */ + RCLCPP_PUBLIC + virtual + bool + dequeue( + rclcpp::experimental::executors::ExecutorEvent & event, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds::max()) = 0; + + /** + * @brief Test whether queue is empty + * @return true if the queue's size is 0, false otherwise. + */ + RCLCPP_PUBLIC + virtual + bool + empty() const = 0; + + /** + * @brief Returns the number of elements in the queue. + * @return the number of elements in the queue. + */ + RCLCPP_PUBLIC + virtual + size_t + size() const = 0; +}; + +} // namespace executors +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_QUEUE_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/executors/events_executor/simple_events_queue.hpp b/rclcpp/include/rclcpp/experimental/executors/events_executor/simple_events_queue.hpp new file mode 100644 index 0000000000..489ecc24bd --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/executors/events_executor/simple_events_queue.hpp @@ -0,0 +1,134 @@ +// Copyright 2023 iRobot Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__SIMPLE_EVENTS_QUEUE_HPP_ +#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__SIMPLE_EVENTS_QUEUE_HPP_ + +#include +#include +#include +#include + +#include "rclcpp/experimental/executors/events_executor/events_queue.hpp" + +namespace rclcpp +{ +namespace experimental +{ +namespace executors +{ + +/** + * @brief This class implements an EventsQueue as a simple wrapper around a std::queue. + * It does not perform any checks about the size of queue, which can grow + * unbounded without being pruned. + * The simplicity of this implementation makes it suitable for optimizing CPU usage. + */ +class SimpleEventsQueue : public EventsQueue +{ +public: + RCLCPP_PUBLIC + ~SimpleEventsQueue() override = default; + + /** + * @brief enqueue event into the queue + * Thread safe + * @param event The event to enqueue into the queue + */ + RCLCPP_PUBLIC + void + enqueue(const rclcpp::experimental::executors::ExecutorEvent & event) override + { + rclcpp::experimental::executors::ExecutorEvent single_event = event; + single_event.num_events = 1; + { + std::unique_lock lock(mutex_); + for (size_t ev = 0; ev < event.num_events; ev++) { + event_queue_.push(single_event); + } + } + events_queue_cv_.notify_one(); + } + + /** + * @brief waits for an event until timeout, gets a single event + * Thread safe + * @return true if event, false if timeout + */ + RCLCPP_PUBLIC + bool + dequeue( + rclcpp::experimental::executors::ExecutorEvent & event, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds::max()) override + { + std::unique_lock lock(mutex_); + + // Initialize to true because it's only needed if we have a valid timeout + bool has_data = true; + if (timeout != std::chrono::nanoseconds::max()) { + has_data = + events_queue_cv_.wait_for(lock, timeout, [this]() {return !event_queue_.empty();}); + } else { + events_queue_cv_.wait(lock, [this]() {return !event_queue_.empty();}); + } + + if (has_data) { + event = event_queue_.front(); + event_queue_.pop(); + return true; + } + + return false; + } + + /** + * @brief Test whether queue is empty + * Thread safe + * @return true if the queue's size is 0, false otherwise. + */ + RCLCPP_PUBLIC + bool + empty() const override + { + std::unique_lock lock(mutex_); + return event_queue_.empty(); + } + + /** + * @brief Returns the number of elements in the queue. + * Thread safe + * @return the number of elements in the queue. + */ + RCLCPP_PUBLIC + size_t + size() const override + { + std::unique_lock lock(mutex_); + return event_queue_.size(); + } + +private: + // The underlying queue implementation + std::queue event_queue_; + // Mutex to protect the insertion/extraction of events in the queue + mutable std::mutex mutex_; + // Variable used to notify when an event is added to the queue + std::condition_variable events_queue_cv_; +}; + +} // namespace executors +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__SIMPLE_EVENTS_QUEUE_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/timers_manager.hpp b/rclcpp/include/rclcpp/experimental/timers_manager.hpp new file mode 100644 index 0000000000..5f3ff84fca --- /dev/null +++ b/rclcpp/include/rclcpp/experimental/timers_manager.hpp @@ -0,0 +1,539 @@ +// Copyright 2023 iRobot Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXPERIMENTAL__TIMERS_MANAGER_HPP_ +#define RCLCPP__EXPERIMENTAL__TIMERS_MANAGER_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/context.hpp" +#include "rclcpp/timer.hpp" + +namespace rclcpp +{ +namespace experimental +{ + +/** + * @brief This class provides a way for storing and executing timer objects. + * It provides APIs to suit the needs of different applications and execution models. + * All public APIs provided by this class are thread-safe. + * + * Timers management + * This class provides APIs to add/remove timers to/from an internal storage. + * It keeps a list of weak pointers from added timers, and locks them only when + * they need to be executed or modified. + * Timers are kept ordered in a binary-heap priority queue. + * Calls to add/remove APIs will temporarily block the execution of the timers and + * will require to reorder the internal priority queue. + * Because of this, they have a not-negligible impact on the performance. + * + * Timers execution + * The most efficient implementation consists in letting a TimersManager object + * to spawn a thread where timers are monitored and periodically executed. + * Besides this, other APIs allow to either execute a single timer or all the + * currently ready ones. + * This class assumes that the `execute_callback()` API of the stored timers is never + * called by other entities, but it can only be called from here. + * If this assumption is not respected, the heap property may be invalidated, + * so timers may be executed out of order, without this object noticing it. + * + */ +class TimersManager +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(TimersManager) + + /** + * @brief Construct a new TimersManager object + * + * @param context custom context to be used. + * Shared ownership of the context is held until destruction. + * @param on_ready_callback The timers on ready callback. if not callable, + * this object will directly execute timers when they are ready. + */ + RCLCPP_PUBLIC + TimersManager( + std::shared_ptr context, + std::function on_ready_callback = nullptr); + + /** + * @brief Destruct the TimersManager object making sure to stop thread and release memory. + */ + RCLCPP_PUBLIC + ~TimersManager(); + + /** + * @brief Adds a new timer to the storage, maintaining weak ownership of it. + * Function is thread safe and it can be called regardless of the state of the timers thread. + * + * @param timer the timer to add. + * @throws std::invalid_argument if timer is a nullptr. + */ + RCLCPP_PUBLIC + void add_timer(rclcpp::TimerBase::SharedPtr timer); + + /** + * @brief Remove a single timer from the object storage. + * Will do nothing if the timer was not being stored here. + * Function is thread safe and it can be called regardless of the state of the timers thread. + * + * @param timer the timer to remove. + */ + RCLCPP_PUBLIC + void remove_timer(rclcpp::TimerBase::SharedPtr timer); + + /** + * @brief Remove all the timers stored in the object. + * Function is thread safe and it can be called regardless of the state of the timers thread. + */ + RCLCPP_PUBLIC + void clear(); + + /** + * @brief Starts a thread that takes care of executing the timers stored in this object. + * Function will throw an error if the timers thread was already running. + */ + RCLCPP_PUBLIC + void start(); + + /** + * @brief Stops the timers thread. + * Will do nothing if the timer thread was not running. + */ + RCLCPP_PUBLIC + void stop(); + + /** + * @brief Executes all the timers currently ready when the function was invoked. + * This function will lock all the stored timers throughout its duration. + * This function is thread safe. + * @throws std::runtime_error if the timers thread was already running. + */ + RCLCPP_PUBLIC + void execute_ready_timers(); + + /** + * @brief Get the number of timers that are currently ready. + * This function is thread safe. + * + * @return size_t number of ready timers. + * @throws std::runtime_error if the timers thread was already running. + */ + RCLCPP_PUBLIC + size_t get_number_ready_timers(); + + /** + * @brief Executes head timer if ready. + * This function is thread safe. + * + * @return true if head timer was ready. + * @throws std::runtime_error if the timers thread was already running. + */ + RCLCPP_PUBLIC + bool execute_head_timer(); + + /** + * @brief Executes timer identified by its ID. + * This function is thread safe. + * + * @param timer_id the timer ID of the timer to execute + */ + RCLCPP_PUBLIC + void execute_ready_timer(const void * timer_id); + + /** + * @brief Get the amount of time before the next timer triggers. + * This function is thread safe. + * + * @return std::chrono::nanoseconds to wait, + * the returned value could be negative if the timer is already expired + * or std::chrono::nanoseconds::max() if there are no timers stored in the object. + * @throws std::runtime_error if the timers thread was already running. + */ + RCLCPP_PUBLIC + std::chrono::nanoseconds get_head_timeout(); + +private: + RCLCPP_DISABLE_COPY(TimersManager) + + using TimerPtr = rclcpp::TimerBase::SharedPtr; + using WeakTimerPtr = rclcpp::TimerBase::WeakPtr; + + // Forward declaration + class TimersHeap; + + /** + * @brief This class allows to store weak pointers to timers in a heap-like data structure. + * The root of the heap is the timer that triggers first. + * Since this class uses weak ownership, it is not guaranteed that it represents a valid heap + * at any point in time as timers could go out of scope, thus invalidating it. + * The "validate_and_lock" API allows to restore the heap property and also returns a locked version + * of the timers heap. + * This class is not thread safe and requires external mutexes to protect its usage. + */ + class WeakTimersHeap + { +public: + /** + * @brief Add a new timer to the heap. After the addition, the heap property is enforced. + * + * @param timer new timer to add. + * @return true if timer has been added, false if it was already there. + */ + bool add_timer(TimerPtr timer) + { + TimersHeap locked_heap = this->validate_and_lock(); + bool added = locked_heap.add_timer(std::move(timer)); + + if (added) { + // Re-create the weak heap with the new timer added + this->store(locked_heap); + } + + return added; + } + + /** + * @brief Remove a timer from the heap. After the removal, the heap property is enforced. + * + * @param timer timer to remove. + * @return true if timer has been removed, false if it was not there. + */ + bool remove_timer(TimerPtr timer) + { + TimersHeap locked_heap = this->validate_and_lock(); + bool removed = locked_heap.remove_timer(std::move(timer)); + + if (removed) { + // Re-create the weak heap with the timer removed + this->store(locked_heap); + } + + return removed; + } + + TimerPtr get_timer(const void * timer_id) + { + for (auto & weak_timer : weak_heap_) { + auto timer = weak_timer.lock(); + if (timer.get() == timer_id) { + return timer; + } + } + return nullptr; + } + + /** + * @brief Returns a const reference to the front element. + */ + const WeakTimerPtr & front() const + { + return weak_heap_.front(); + } + + /** + * @brief Returns whether the heap is empty or not. + */ + bool empty() const + { + return weak_heap_.empty(); + } + + /** + * @brief This function restores the current object as a valid heap + * and it returns a locked version of it. + * Timers that went out of scope are removed from the container. + * It is the only public API to access and manipulate the stored timers. + * + * @return TimersHeap owned timers corresponding to the current object + */ + TimersHeap validate_and_lock() + { + TimersHeap locked_heap; + bool any_timer_destroyed = false; + + for (auto weak_timer : weak_heap_) { + auto timer = weak_timer.lock(); + if (timer) { + // This timer is valid, so add it to the locked heap + // Note: we access friend private `owned_heap_` member field. + locked_heap.owned_heap_.push_back(std::move(timer)); + } else { + // This timer went out of scope, so we don't add it to locked heap + // and we mark the corresponding flag. + // It's not needed to erase it from weak heap, as we are going to re-heapify. + // Note: we can't exit from the loop here, as we need to find all valid timers. + any_timer_destroyed = true; + } + } + + // If a timer has gone out of scope, then the remaining elements do not represent + // a valid heap anymore. We need to re-heapify the timers heap. + if (any_timer_destroyed) { + locked_heap.heapify(); + // Re-create the weak heap now that elements have been heapified again + this->store(locked_heap); + } + + return locked_heap; + } + + /** + * @brief This function allows to recreate the heap of weak pointers + * from an heap of owned pointers. + * It is required to be called after a locked TimersHeap generated from this object + * has been modified in any way (e.g. timers triggered, added, removed). + * + * @param heap timers heap to store as weak pointers + */ + void store(const TimersHeap & heap) + { + weak_heap_.clear(); + // Note: we access friend private `owned_heap_` member field. + for (auto t : heap.owned_heap_) { + weak_heap_.push_back(t); + } + } + + /** + * @brief Remove all timers from the heap. + */ + void clear() + { + weak_heap_.clear(); + } + +private: + std::vector weak_heap_; + }; + + /** + * @brief This class is the equivalent of WeakTimersHeap but with ownership of the timers. + * It can be generated by locking the weak version. + * It provides operations to manipulate the heap. + * This class is not thread safe and requires external mutexes to protect its usage. + */ + class TimersHeap + { +public: + /** + * @brief Try to add a new timer to the heap. + * After the addition, the heap property is preserved. + * @param timer new timer to add. + * @return true if timer has been added, false if it was already there. + */ + bool add_timer(TimerPtr timer) + { + // Nothing to do if the timer is already stored here + auto it = std::find(owned_heap_.begin(), owned_heap_.end(), timer); + if (it != owned_heap_.end()) { + return false; + } + + owned_heap_.push_back(std::move(timer)); + std::push_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater); + + return true; + } + + /** + * @brief Try to remove a timer from the heap. + * After the removal, the heap property is preserved. + * @param timer timer to remove. + * @return true if timer has been removed, false if it was not there. + */ + bool remove_timer(TimerPtr timer) + { + // Nothing to do if the timer is not stored here + auto it = std::find(owned_heap_.begin(), owned_heap_.end(), timer); + if (it == owned_heap_.end()) { + return false; + } + + owned_heap_.erase(it); + this->heapify(); + + return true; + } + + /** + * @brief Returns a reference to the front element. + * @return reference to front element. + */ + TimerPtr & front() + { + return owned_heap_.front(); + } + + /** + * @brief Returns a const reference to the front element. + * @return const reference to front element. + */ + const TimerPtr & front() const + { + return owned_heap_.front(); + } + + /** + * @brief Returns whether the heap is empty or not. + * @return true if the heap is empty. + */ + bool empty() const + { + return owned_heap_.empty(); + } + + /** + * @brief Returns the size of the heap. + * @return the number of valid timers in the heap. + */ + size_t size() const + { + return owned_heap_.size(); + } + + /** + * @brief Get the number of timers that are currently ready. + * @return size_t number of ready timers. + */ + size_t get_number_ready_timers() const + { + size_t ready_timers = 0; + + for (TimerPtr t : owned_heap_) { + if (t->is_ready()) { + ready_timers++; + } + } + + return ready_timers; + } + + /** + * @brief Restore a valid heap after the root value has been replaced (e.g. timer triggered). + */ + void heapify_root() + { + // The following code is a more efficient version than doing + // pop_heap, pop_back, push_back, push_heap + // as it removes the need for the last push_heap + + // Push the modified element (i.e. the current root) at the bottom of the heap + owned_heap_.push_back(owned_heap_[0]); + // Exchange first and last-1 elements and reheapify + std::pop_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater); + // Remove last element + owned_heap_.pop_back(); + } + + /** + * @brief Completely restores the structure to a valid heap + */ + void heapify() + { + std::make_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater); + } + + void clear_callbacks() + { + for (TimerPtr & t : owned_heap_) { + t->clear_on_reset_callback(); + } + } + + /** + * @brief Friend declaration to allow the `validate_and_lock()` function to access the + * underlying heap container + */ + friend TimersHeap WeakTimersHeap::validate_and_lock(); + + /** + * @brief Friend declaration to allow the `store()` function to access the + * underlying heap container + */ + friend void WeakTimersHeap::store(const TimersHeap & heap); + +private: + /** + * @brief Comparison function between timers. + * @return true if `a` triggers after `b`. + */ + static bool timer_greater(TimerPtr a, TimerPtr b) + { + // FIXME! + return a->time_until_trigger() > b->time_until_trigger(); + } + + std::vector owned_heap_; + }; + + /** + * @brief Implements a loop that keeps executing ready timers. + * This function is executed in the timers thread. + */ + void run_timers(); + + /** + * @brief Get the amount of time before the next timer triggers. + * This function is not thread safe, acquire a mutex before calling it. + * + * @return std::chrono::nanoseconds to wait, + * the returned value could be negative if the timer is already expired + * or std::chrono::nanoseconds::max() if the heap is empty. + * This function is not thread safe, acquire the timers_mutex_ before calling it. + */ + std::chrono::nanoseconds get_head_timeout_unsafe(); + + /** + * @brief Executes all the timers currently ready when the function is invoked + * while keeping the heap correctly sorted. + * This function is not thread safe, acquire the timers_mutex_ before calling it. + */ + void execute_ready_timers_unsafe(); + + // Callback to be called when timer is ready + std::function on_ready_callback_ = nullptr; + + // Thread used to run the timers execution task + std::thread timers_thread_; + // Protects access to timers + std::mutex timers_mutex_; + // Protects access to stop() + std::mutex stop_mutex_; + // Notifies the timers thread whenever timers are added/removed + std::condition_variable timers_cv_; + // Flag used as predicate by timers_cv_ that denotes one or more timers being added/removed + bool timers_updated_ {false}; + // Indicates whether the timers thread is currently running or not + std::atomic running_ {false}; + // Parent context used to understand if ROS is still active + std::shared_ptr context_; + // Timers heap storage with weak ownership + WeakTimersHeap weak_timers_heap_; +}; + +} // namespace experimental +} // namespace rclcpp + +#endif // RCLCPP__EXPERIMENTAL__TIMERS_MANAGER_HPP_ diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp new file mode 100644 index 0000000000..48181d92d6 --- /dev/null +++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp @@ -0,0 +1,327 @@ +// Copyright 2023 iRobot Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/experimental/executors/events_executor/events_executor.hpp" + +#include +#include +#include +#include +#include + +#include "rcpputils/scope_exit.hpp" + +#include "rclcpp/exceptions/exceptions.hpp" + +using namespace std::chrono_literals; + +using rclcpp::experimental::executors::EventsExecutor; + +EventsExecutor::EventsExecutor( + rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue, + bool execute_timers_separate_thread, + const rclcpp::ExecutorOptions & options) +: rclcpp::Executor(options) +{ + // Get ownership of the queue used to store events. + if (!events_queue) { + throw std::invalid_argument("events_queue can't be a null pointer"); + } + events_queue_ = std::move(events_queue); + + // Create timers manager + std::function timer_on_ready_cb = nullptr; + if (execute_timers_separate_thread) { + timer_on_ready_cb = [this](const void * timer_id) { + ExecutorEvent event = {timer_id, -1, ExecutorEventType::TIMER_EVENT, 1}; + this->events_queue_->enqueue(event); + }; + } + timers_manager_ = + std::make_shared(context_, timer_on_ready_cb); + + // Create entities collector + entities_collector_ = std::make_shared(this); + entities_collector_->init(); + + // Setup the executor notifier to wake up the executor when some guard conditions are tiggered. + // The added guard conditions are guaranteed to not go out of scope before the executor itself. + executor_notifier_ = std::make_shared(); + executor_notifier_->add_guard_condition(shutdown_guard_condition_.get()); + executor_notifier_->add_guard_condition(&interrupt_guard_condition_); + + entities_collector_->add_waitable(executor_notifier_); +} + +EventsExecutor::~EventsExecutor() +{ + spinning.store(false); +} + +void +EventsExecutor::spin() +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + timers_manager_->start(); + RCPPUTILS_SCOPE_EXIT(timers_manager_->stop(); ); + + while (rclcpp::ok(context_) && spinning.load()) { + // Wait until we get an event + ExecutorEvent event; + bool has_event = events_queue_->dequeue(event); + if (has_event) { + this->execute_event(event); + } + } +} + +void +EventsExecutor::spin_some(std::chrono::nanoseconds max_duration) +{ + return this->spin_some_impl(max_duration, false); +} + +void +EventsExecutor::spin_all(std::chrono::nanoseconds max_duration) +{ + if (max_duration <= 0ns) { + throw std::invalid_argument("max_duration must be positive"); + } + return this->spin_some_impl(max_duration, true); +} + +void +EventsExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive) +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin_some() called while already spinning"); + } + + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + auto start = std::chrono::steady_clock::now(); + + auto max_duration_not_elapsed = [max_duration, start]() { + if (std::chrono::nanoseconds(0) == max_duration) { + // told to spin forever if need be + return true; + } else if (std::chrono::steady_clock::now() - start < max_duration) { + // told to spin only for some maximum amount of time + return true; + } + // spun too long + return false; + }; + + // Get the number of events and timers ready at start + const size_t ready_events_at_start = events_queue_->size(); + size_t executed_events = 0; + const size_t ready_timers_at_start = timers_manager_->get_number_ready_timers(); + size_t executed_timers = 0; + + while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) { + // Execute first ready event from queue if exists + if (exhaustive || (executed_events < ready_events_at_start)) { + bool has_event = !events_queue_->empty(); + + if (has_event) { + ExecutorEvent event; + bool ret = events_queue_->dequeue(event, std::chrono::nanoseconds(0)); + if (ret) { + this->execute_event(event); + executed_events++; + continue; + } + } + } + + // Execute first timer if it is ready + if (exhaustive || (executed_timers < ready_timers_at_start)) { + bool timer_executed = timers_manager_->execute_head_timer(); + if (timer_executed) { + executed_timers++; + continue; + } + } + + // If there's no more work available, exit + break; + } +} + +void +EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout) +{ + // In this context a negative input timeout means no timeout + if (timeout < 0ns) { + timeout = std::chrono::nanoseconds::max(); + } + + // Select the smallest between input timeout and timer timeout + bool is_timer_timeout = false; + auto next_timer_timeout = timers_manager_->get_head_timeout(); + if (next_timer_timeout < timeout) { + timeout = next_timer_timeout; + is_timer_timeout = true; + } + + ExecutorEvent event; + bool has_event = events_queue_->dequeue(event, timeout); + + // If we wake up from the wait with an event, it means that it + // arrived before any of the timers expired. + if (has_event) { + this->execute_event(event); + } else if (is_timer_timeout) { + timers_manager_->execute_head_timer(); + } +} + +void +EventsExecutor::add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) +{ + // This field is unused because we don't have to wake up the executor when a node is added. + (void) notify; + + // Add node to entities collector + entities_collector_->add_node(node_ptr); +} + +void +EventsExecutor::add_node(std::shared_ptr node_ptr, bool notify) +{ + this->add_node(node_ptr->get_node_base_interface(), notify); +} + +void +EventsExecutor::remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) +{ + // This field is unused because we don't have to wake up the executor when a node is removed. + (void)notify; + + // Remove node from entities collector. + // This will result in un-setting all the event callbacks from its entities. + // After this function returns, this executor will not receive any more events associated + // to these entities. + entities_collector_->remove_node(node_ptr); +} + +void +EventsExecutor::remove_node(std::shared_ptr node_ptr, bool notify) +{ + this->remove_node(node_ptr->get_node_base_interface(), notify); +} + +void +EventsExecutor::execute_event(const ExecutorEvent & event) +{ + switch (event.type) { + case ExecutorEventType::CLIENT_EVENT: + { + auto client = entities_collector_->get_client(event.exec_entity_id); + + if (client) { + for (size_t i = 0; i < event.num_events; i++) { + execute_client(client); + } + } + break; + } + case ExecutorEventType::SUBSCRIPTION_EVENT: + { + auto subscription = entities_collector_->get_subscription(event.exec_entity_id); + + if (subscription) { + for (size_t i = 0; i < event.num_events; i++) { + execute_subscription(subscription); + } + } + break; + } + case ExecutorEventType::SERVICE_EVENT: + { + auto service = entities_collector_->get_service(event.exec_entity_id); + + if (service) { + for (size_t i = 0; i < event.num_events; i++) { + execute_service(service); + } + } + break; + } + case ExecutorEventType::TIMER_EVENT: + { + timers_manager_->execute_ready_timer(event.exec_entity_id); + break; + } + case ExecutorEventType::WAITABLE_EVENT: + { + auto waitable = entities_collector_->get_waitable(event.exec_entity_id); + + if (waitable) { + for (size_t i = 0; i < event.num_events; i++) { + auto data = waitable->take_data_by_entity_id(event.gen_entity_id); + waitable->execute(data); + } + } + break; + } + } +} + +void +EventsExecutor::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify) +{ + // This field is unused because we don't have to wake up + // the executor when a callback group is added. + (void)notify; + entities_collector_->add_callback_group(group_ptr, node_ptr); +} + +void +EventsExecutor::remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify) +{ + // This field is unused because we don't have to wake up + // the executor when a callback group is removed. + (void)notify; + entities_collector_->remove_callback_group(group_ptr); +} + +std::vector +EventsExecutor::get_all_callback_groups() +{ + return entities_collector_->get_all_callback_groups(); +} + +std::vector +EventsExecutor::get_manually_added_callback_groups() +{ + return entities_collector_->get_manually_added_callback_groups(); +} + +std::vector +EventsExecutor::get_automatically_added_callback_groups_from_nodes() +{ + return entities_collector_->get_automatically_added_callback_groups_from_nodes(); +} diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor_entities_collector.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor_entities_collector.cpp new file mode 100644 index 0000000000..11109cee14 --- /dev/null +++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor_entities_collector.cpp @@ -0,0 +1,699 @@ +// Copyright 2023 iRobot Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/experimental/executors/events_executor/events_executor_entities_collector.hpp" + +#include +#include +#include +#include +#include +#include + +#include "rclcpp/memory_strategy.hpp" +#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp" +#include "rclcpp/experimental/executors/events_executor/events_executor.hpp" + +using rclcpp::experimental::executors::ExecutorEvent; +using rclcpp::experimental::executors::ExecutorEventType; +using rclcpp::experimental::executors::EventsExecutorEntitiesCollector; + +EventsExecutorEntitiesCollector::EventsExecutorEntitiesCollector( + rclcpp::experimental::executors::EventsExecutor * executor) +{ + if (executor == nullptr) { + throw std::runtime_error("Received nullptr executor in EventsExecutorEntitiesCollector."); + } + + associated_executor_ = executor; + timers_manager_ = associated_executor_->timers_manager_; +} + +void +EventsExecutorEntitiesCollector::init() +{ + std::lock_guard lock(reentrant_mutex_); + // Add the EventsExecutorEntitiesCollector shared_ptr to waitables map + weak_waitables_map_.emplace(this, this->shared_from_this()); +} + +EventsExecutorEntitiesCollector::~EventsExecutorEntitiesCollector() +{ + std::lock_guard lock(reentrant_mutex_); + + // Disassociate all callback groups and thus nodes. + for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) { + auto group = pair.first.lock(); + if (group) { + std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); + has_executor.store(false); + callback_group_removed_impl(group); + } + } + for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) { + auto group = pair.first.lock(); + if (group) { + std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); + has_executor.store(false); + callback_group_removed_impl(group); + } + } + // Disassociate all nodes + for (const auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node) { + std::atomic_bool & has_executor = node->get_associated_with_executor_atomic(); + has_executor.store(false); + node_removed_impl(node); + } + } + + // Unset callback group notify guard condition executor callback + for (const auto & pair : weak_groups_to_guard_conditions_) { + auto group = pair.first.lock(); + if (group) { + auto & group_gc = pair.second; + unset_guard_condition_callback(group_gc); + } + } + + weak_clients_map_.clear(); + weak_services_map_.clear(); + weak_waitables_map_.clear(); + weak_subscriptions_map_.clear(); + weak_nodes_to_guard_conditions_.clear(); + weak_groups_to_guard_conditions_.clear(); + + weak_groups_associated_with_executor_to_nodes_.clear(); + weak_groups_to_nodes_associated_with_executor_.clear(); + weak_nodes_.clear(); +} + +void +EventsExecutorEntitiesCollector::execute(std::shared_ptr & data) +{ + // This function is called when the associated executor is notified that something changed. + // We do not know if an entity has been added or removed so we have to rebuild everything. + (void)data; + + std::lock_guard lock(reentrant_mutex_); + timers_manager_->clear(); + + // If a registered node has a new callback group, register the group. + add_callback_groups_from_nodes_associated_to_executor(); + + // For all groups registered in the executor, set their event callbacks. + set_entities_event_callbacks_from_map(weak_groups_associated_with_executor_to_nodes_); + set_entities_event_callbacks_from_map(weak_groups_to_nodes_associated_with_executor_); +} + +std::shared_ptr +EventsExecutorEntitiesCollector::take_data() +{ + return nullptr; +} + +std::shared_ptr +EventsExecutorEntitiesCollector::take_data_by_entity_id(size_t id) +{ + (void)id; + return nullptr; +} + +void +EventsExecutorEntitiesCollector::add_to_wait_set(rcl_wait_set_t * wait_set) +{ + (void)wait_set; +} + +bool +EventsExecutorEntitiesCollector::is_ready(rcl_wait_set_t * p_wait_set) +{ + (void)p_wait_set; + return false; +} + +bool +EventsExecutorEntitiesCollector::add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + bool is_new_node = false; + // If the node already has an executor + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Node has already been added to an executor."); + } + node_ptr->for_each_callback_group( + [this, node_ptr, &is_new_node](rclcpp::CallbackGroup::SharedPtr group_ptr) + { + if ( + !group_ptr->get_associated_with_executor_atomic().load() && + group_ptr->automatically_add_to_executor_with_node()) + { + is_new_node = (add_callback_group( + group_ptr, + node_ptr, + weak_groups_to_nodes_associated_with_executor_) || + is_new_node); + } + }); + weak_nodes_.push_back(node_ptr); + return is_new_node; +} + +bool +EventsExecutorEntitiesCollector::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) +{ + // If the callback_group already has an executor + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Callback group has already been added to an executor."); + } + bool is_new_node = !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) && + !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_); + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; + auto insert_info = weak_groups_to_nodes.insert( + std::make_pair(weak_group_ptr, node_ptr)); + bool was_inserted = insert_info.second; + if (!was_inserted) { + throw std::runtime_error("Callback group was already added to executor."); + } + + if (is_new_node) { + node_added_impl(node_ptr); + } + + if (node_ptr->get_context()->is_valid()) { + auto callback_group_guard_condition = + group_ptr->get_notify_guard_condition(node_ptr->get_context()); + + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; + weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition.get(); + } + + callback_group_added_impl(group_ptr); + + return is_new_node; +} + +bool +EventsExecutorEntitiesCollector::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + return add_callback_group(group_ptr, node_ptr, weak_groups_associated_with_executor_to_nodes_); +} + +bool +EventsExecutorEntitiesCollector::remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr) +{ + return this->remove_callback_group_from_map( + group_ptr, + weak_groups_associated_with_executor_to_nodes_); +} + +bool +EventsExecutorEntitiesCollector::remove_callback_group_from_map( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) +{ + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr; + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; + auto iter = weak_groups_to_nodes.find(weak_group_ptr); + if (iter != weak_groups_to_nodes.end()) { + node_ptr = iter->second.lock(); + if (node_ptr == nullptr) { + throw std::runtime_error("Node must not be deleted before its callback group(s)."); + } + weak_groups_to_nodes.erase(iter); + callback_group_removed_impl(group_ptr); + } else { + throw std::runtime_error("Callback group needs to be associated with executor."); + } + // If the node was matched and removed, interrupt waiting. + bool node_removed = false; + if (!has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) && + !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_)) + { + node_removed_impl(node_ptr); + node_removed = true; + } + + weak_groups_to_guard_conditions_.erase(weak_group_ptr); + + return node_removed; +} + +bool +EventsExecutorEntitiesCollector::remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + if (!node_ptr->get_associated_with_executor_atomic().load()) { + return false; + } + bool node_found = false; + auto node_it = weak_nodes_.begin(); + while (node_it != weak_nodes_.end()) { + bool matched = (node_it->lock() == node_ptr); + if (matched) { + weak_nodes_.erase(node_it); + node_found = true; + break; + } + ++node_it; + } + if (!node_found) { + return false; + } + std::vector found_group_ptrs; + std::for_each( + weak_groups_to_nodes_associated_with_executor_.begin(), + weak_groups_to_nodes_associated_with_executor_.end(), + [&found_group_ptrs, node_ptr](std::pair key_value_pair) { + auto & weak_node_ptr = key_value_pair.second; + auto shared_node_ptr = weak_node_ptr.lock(); + auto group_ptr = key_value_pair.first.lock(); + if (shared_node_ptr == node_ptr) { + found_group_ptrs.push_back(group_ptr); + } + }); + std::for_each( + found_group_ptrs.begin(), found_group_ptrs.end(), [this] + (rclcpp::CallbackGroup::SharedPtr group_ptr) { + this->remove_callback_group_from_map( + group_ptr, + weak_groups_to_nodes_associated_with_executor_); + }); + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); + return true; +} + +// Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry. +bool +EventsExecutorEntitiesCollector::has_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & + weak_groups_to_nodes) const +{ + return std::find_if( + weak_groups_to_nodes.begin(), + weak_groups_to_nodes.end(), + [&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool { + auto other_ptr = other.second.lock(); + return other_ptr == node_ptr; + }) != weak_groups_to_nodes.end(); +} + +void +EventsExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_executor() +{ + for (const auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node) { + node->for_each_callback_group( + [this, node](rclcpp::CallbackGroup::SharedPtr shared_group_ptr) + { + if (shared_group_ptr->automatically_add_to_executor_with_node() && + !shared_group_ptr->get_associated_with_executor_atomic().load()) + { + add_callback_group( + shared_group_ptr, + node, + weak_groups_to_nodes_associated_with_executor_); + } + }); + } + } +} + +std::vector +EventsExecutorEntitiesCollector::get_all_callback_groups() +{ + std::vector groups; + for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { + groups.push_back(group_node_ptr.first); + } + for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +std::vector +EventsExecutorEntitiesCollector::get_manually_added_callback_groups() +{ + std::vector groups; + for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +std::vector +EventsExecutorEntitiesCollector::get_automatically_added_callback_groups_from_nodes() +{ + std::vector groups; + for (const auto & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +void +EventsExecutorEntitiesCollector::callback_group_added_impl( + rclcpp::CallbackGroup::SharedPtr group) +{ + std::lock_guard lock(reentrant_mutex_); + + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group; + + auto iter = weak_groups_to_guard_conditions_.find(weak_group_ptr); + if (iter != weak_groups_to_guard_conditions_.end()) { + // Set an event callback for the group's notify guard condition, so if new entities are added + // or removed to this node we will receive an event. + set_guard_condition_callback(iter->second); + } + // For all entities in the callback group, set their event callback + set_callback_group_entities_callbacks(group); +} + +void +EventsExecutorEntitiesCollector::node_added_impl( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) +{ + std::lock_guard lock(reentrant_mutex_); + + auto notify_guard_condition = &(node->get_notify_guard_condition()); + // Set an event callback for the node's notify guard condition, so if new entities are added + // or removed to this node we will receive an event. + set_guard_condition_callback(notify_guard_condition); + + // Store node's notify guard condition + weak_nodes_to_guard_conditions_[node] = notify_guard_condition; +} + +void +EventsExecutorEntitiesCollector::callback_group_removed_impl( + rclcpp::CallbackGroup::SharedPtr group) +{ + std::lock_guard lock(reentrant_mutex_); + // For all the entities in the group, unset their callbacks + unset_callback_group_entities_callbacks(group); +} + +void +EventsExecutorEntitiesCollector::node_removed_impl( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) +{ + std::lock_guard lock(reentrant_mutex_); + // Node doesn't have more callback groups associated to the executor. + // Unset the event callback for the node's notify guard condition, to stop + // receiving events if entities are added or removed to this node. + unset_guard_condition_callback(&(node->get_notify_guard_condition())); + + // Remove guard condition from list + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr(node); + weak_nodes_to_guard_conditions_.erase(weak_node_ptr); +} + +void +EventsExecutorEntitiesCollector::set_entities_event_callbacks_from_map( + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) +{ + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + auto node = pair.second.lock(); + if (!node || !group || !group->can_be_taken_from().load()) { + continue; + } + set_callback_group_entities_callbacks(group); + } +} + +void +EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks( + rclcpp::CallbackGroup::SharedPtr group) +{ + // Timers are handled by the timers manager + group->find_timer_ptrs_if( + [this](const rclcpp::TimerBase::SharedPtr & timer) { + if (timer) { + timers_manager_->add_timer(timer); + } + return false; + }); + + // Set callbacks for all other entity types + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { + if (subscription) { + weak_subscriptions_map_.emplace(subscription.get(), subscription); + + subscription->set_on_new_message_callback( + create_entity_callback(subscription.get(), ExecutorEventType::SUBSCRIPTION_EVENT)); + } + return false; + }); + group->find_service_ptrs_if( + [this](const rclcpp::ServiceBase::SharedPtr & service) { + if (service) { + weak_services_map_.emplace(service.get(), service); + + service->set_on_new_request_callback( + create_entity_callback(service.get(), ExecutorEventType::SERVICE_EVENT)); + } + return false; + }); + group->find_client_ptrs_if( + [this](const rclcpp::ClientBase::SharedPtr & client) { + if (client) { + weak_clients_map_.emplace(client.get(), client); + + client->set_on_new_response_callback( + create_entity_callback(client.get(), ExecutorEventType::CLIENT_EVENT)); + } + return false; + }); + group->find_waitable_ptrs_if( + [this](const rclcpp::Waitable::SharedPtr & waitable) { + if (waitable) { + weak_waitables_map_.emplace(waitable.get(), waitable); + + waitable->set_on_ready_callback( + create_waitable_callback(waitable.get())); + } + return false; + }); +} + +void +EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks( + rclcpp::CallbackGroup::SharedPtr group) +{ + auto iter = weak_groups_to_guard_conditions_.find(group); + + if (iter != weak_groups_to_guard_conditions_.end()) { + unset_guard_condition_callback(iter->second); + } + + // Timers are handled by the timers manager + group->find_timer_ptrs_if( + [this](const rclcpp::TimerBase::SharedPtr & timer) { + if (timer) { + timers_manager_->remove_timer(timer); + } + return false; + }); + + // Unset callbacks for all other entity types + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { + if (subscription) { + subscription->clear_on_new_message_callback(); + weak_subscriptions_map_.erase(subscription.get()); + } + return false; + }); + group->find_service_ptrs_if( + [this](const rclcpp::ServiceBase::SharedPtr & service) { + if (service) { + service->clear_on_new_request_callback(); + weak_services_map_.erase(service.get()); + } + return false; + }); + group->find_client_ptrs_if( + [this](const rclcpp::ClientBase::SharedPtr & client) { + if (client) { + client->clear_on_new_response_callback(); + weak_clients_map_.erase(client.get()); + } + return false; + }); + group->find_waitable_ptrs_if( + [this](const rclcpp::Waitable::SharedPtr & waitable) { + if (waitable) { + waitable->clear_on_ready_callback(); + weak_waitables_map_.erase(waitable.get()); + } + return false; + }); +} + +void +EventsExecutorEntitiesCollector::set_guard_condition_callback( + rclcpp::GuardCondition * guard_condition) +{ + auto gc_callback = [this](size_t num_events) { + // Override num events (we don't care more than a single event) + num_events = 1; + int gc_id = -1; + ExecutorEvent event = {this, gc_id, ExecutorEventType::WAITABLE_EVENT, num_events}; + associated_executor_->events_queue_->enqueue(event); + }; + + guard_condition->set_on_trigger_callback(gc_callback); +} + +void +EventsExecutorEntitiesCollector::unset_guard_condition_callback( + rclcpp::GuardCondition * guard_condition) +{ + guard_condition->set_on_trigger_callback(nullptr); +} + +rclcpp::SubscriptionBase::SharedPtr +EventsExecutorEntitiesCollector::get_subscription(const void * subscription_id) +{ + std::lock_guard lock(reentrant_mutex_); + + auto it = weak_subscriptions_map_.find(subscription_id); + + if (it != weak_subscriptions_map_.end()) { + auto subscription_weak_ptr = it->second; + auto subscription_shared_ptr = subscription_weak_ptr.lock(); + + if (subscription_shared_ptr) { + return subscription_shared_ptr; + } + + // The subscription expired, remove from map + weak_subscriptions_map_.erase(it); + } + return nullptr; +} + +rclcpp::ClientBase::SharedPtr +EventsExecutorEntitiesCollector::get_client(const void * client_id) +{ + std::lock_guard lock(reentrant_mutex_); + + auto it = weak_clients_map_.find(client_id); + + if (it != weak_clients_map_.end()) { + auto client_weak_ptr = it->second; + auto client_shared_ptr = client_weak_ptr.lock(); + + if (client_shared_ptr) { + return client_shared_ptr; + } + + // The client expired, remove from map + weak_clients_map_.erase(it); + } + return nullptr; +} + +rclcpp::ServiceBase::SharedPtr +EventsExecutorEntitiesCollector::get_service(const void * service_id) +{ + std::lock_guard lock(reentrant_mutex_); + + auto it = weak_services_map_.find(service_id); + + if (it != weak_services_map_.end()) { + auto service_weak_ptr = it->second; + auto service_shared_ptr = service_weak_ptr.lock(); + + if (service_shared_ptr) { + return service_shared_ptr; + } + + // The service expired, remove from map + weak_services_map_.erase(it); + } + return nullptr; +} + +rclcpp::Waitable::SharedPtr +EventsExecutorEntitiesCollector::get_waitable(const void * waitable_id) +{ + std::lock_guard lock(reentrant_mutex_); + + auto it = weak_waitables_map_.find(waitable_id); + + if (it != weak_waitables_map_.end()) { + auto waitable_weak_ptr = it->second; + auto waitable_shared_ptr = waitable_weak_ptr.lock(); + + if (waitable_shared_ptr) { + return waitable_shared_ptr; + } + + // The waitable expired, remove from map + weak_waitables_map_.erase(it); + } + return nullptr; +} + +void +EventsExecutorEntitiesCollector::add_waitable(rclcpp::Waitable::SharedPtr waitable) +{ + std::lock_guard lock(reentrant_mutex_); + + weak_waitables_map_.emplace(waitable.get(), waitable); + + waitable->set_on_ready_callback( + create_waitable_callback(waitable.get())); +} + +std::function +EventsExecutorEntitiesCollector::create_entity_callback( + void * exec_entity_id, ExecutorEventType event_type) +{ + std::function + callback = [this, exec_entity_id, event_type](size_t num_events) { + ExecutorEvent event = {exec_entity_id, -1, event_type, num_events}; + associated_executor_->events_queue_->enqueue(event); + }; + return callback; +} + +std::function +EventsExecutorEntitiesCollector::create_waitable_callback(void * exec_entity_id) +{ + std::function + callback = [this, exec_entity_id](size_t num_events, int gen_entity_id) { + ExecutorEvent event = + {exec_entity_id, gen_entity_id, ExecutorEventType::WAITABLE_EVENT, num_events}; + associated_executor_->events_queue_->enqueue(event); + }; + return callback; +} diff --git a/rclcpp/src/rclcpp/experimental/timers_manager.cpp b/rclcpp/src/rclcpp/experimental/timers_manager.cpp new file mode 100644 index 0000000000..c3319c5206 --- /dev/null +++ b/rclcpp/src/rclcpp/experimental/timers_manager.cpp @@ -0,0 +1,311 @@ +// Copyright 2023 iRobot Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/experimental/timers_manager.hpp" + +#include + +#include +#include +#include +#include + +using rclcpp::experimental::TimersManager; + +TimersManager::TimersManager( + std::shared_ptr context, + std::function on_ready_callback) +{ + context_ = context; + on_ready_callback_ = on_ready_callback; +} + +TimersManager::~TimersManager() +{ + // Remove all timers + this->clear(); + + // Make sure timers thread is stopped before destroying this object + this->stop(); +} + +void TimersManager::add_timer(rclcpp::TimerBase::SharedPtr timer) +{ + if (!timer) { + throw std::invalid_argument("TimersManager::add_timer() trying to add nullptr timer"); + } + + bool added = false; + { + std::unique_lock lock(timers_mutex_); + added = weak_timers_heap_.add_timer(timer); + timers_updated_ = timers_updated_ || added; + } + + timer->set_on_reset_callback( + [this](size_t arg) { + { + (void)arg; + std::unique_lock lock(timers_mutex_); + timers_updated_ = true; + } + timers_cv_.notify_one(); + }); + + if (added) { + // Notify that a timer has been added + timers_cv_.notify_one(); + } +} + +void TimersManager::start() +{ + // Make sure that the thread is not already running + if (running_.exchange(true)) { + throw std::runtime_error("TimersManager::start() can't start timers thread as already running"); + } + + timers_thread_ = std::thread(&TimersManager::run_timers, this); +} + +void TimersManager::stop() +{ + // Lock stop() function to prevent race condition in destructor + std::unique_lock lock(stop_mutex_); + running_ = false; + + // Notify the timers manager thread to wake up + { + std::unique_lock lock(timers_mutex_); + timers_updated_ = true; + } + timers_cv_.notify_one(); + + // Join timers thread if it's running + if (timers_thread_.joinable()) { + timers_thread_.join(); + } +} + +std::chrono::nanoseconds TimersManager::get_head_timeout() +{ + // Do not allow to interfere with the thread running + if (running_) { + throw std::runtime_error( + "get_head_timeout() can't be used while timers thread is running"); + } + + std::unique_lock lock(timers_mutex_); + return this->get_head_timeout_unsafe(); +} + +size_t TimersManager::get_number_ready_timers() +{ + // Do not allow to interfere with the thread running + if (running_) { + throw std::runtime_error( + "get_number_ready_timers() can't be used while timers thread is running"); + } + + std::unique_lock lock(timers_mutex_); + TimersHeap locked_heap = weak_timers_heap_.validate_and_lock(); + return locked_heap.get_number_ready_timers(); +} + +void TimersManager::execute_ready_timers() +{ + // Do not allow to interfere with the thread running + if (running_) { + throw std::runtime_error( + "execute_ready_timers() can't be used while timers thread is running"); + } + + std::unique_lock lock(timers_mutex_); + this->execute_ready_timers_unsafe(); +} + +bool TimersManager::execute_head_timer() +{ + // Do not allow to interfere with the thread running + if (running_) { + throw std::runtime_error( + "execute_head_timer() can't be used while timers thread is running"); + } + + std::unique_lock lock(timers_mutex_); + + TimersHeap timers_heap = weak_timers_heap_.validate_and_lock(); + + // Nothing to do if we don't have any timer + if (timers_heap.empty()) { + return false; + } + + TimerPtr head_timer = timers_heap.front(); + + const bool timer_ready = head_timer->is_ready(); + if (timer_ready) { + head_timer->call(); + if (on_ready_callback_) { + on_ready_callback_(head_timer.get()); + } else { + head_timer->execute_callback(); + } + timers_heap.heapify_root(); + weak_timers_heap_.store(timers_heap); + } + + return timer_ready; +} + +void TimersManager::execute_ready_timer(const void * timer_id) +{ + TimerPtr ready_timer; + { + std::unique_lock lock(timers_mutex_); + ready_timer = weak_timers_heap_.get_timer(timer_id); + } + if (ready_timer) { + ready_timer->execute_callback(); + } +} + +std::chrono::nanoseconds TimersManager::get_head_timeout_unsafe() +{ + // If we don't have any weak pointer, then we just return maximum timeout + if (weak_timers_heap_.empty()) { + return std::chrono::nanoseconds::max(); + } + // Weak heap is not empty, so try to lock the first element. + // If it is still a valid pointer, it is guaranteed to be the correct head + TimerPtr head_timer = weak_timers_heap_.front().lock(); + + if (!head_timer) { + // The first element has expired, we can't make other assumptions on the heap + // and we need to entirely validate it. + TimersHeap locked_heap = weak_timers_heap_.validate_and_lock(); + // NOTE: the following operations will not modify any element in the heap, so we + // don't have to call `weak_timers_heap_.store(locked_heap)` at the end. + + if (locked_heap.empty()) { + return std::chrono::nanoseconds::max(); + } + head_timer = locked_heap.front(); + } + + return head_timer->time_until_trigger(); +} + +void TimersManager::execute_ready_timers_unsafe() +{ + // We start by locking the timers + TimersHeap locked_heap = weak_timers_heap_.validate_and_lock(); + + // Nothing to do if we don't have any timer + if (locked_heap.empty()) { + return; + } + + // Keep executing timers until they are ready and they were already ready when we started. + // The two checks prevent this function from blocking indefinitely if the + // time required for executing the timers is longer than their period. + + TimerPtr head_timer = locked_heap.front(); + const size_t number_ready_timers = locked_heap.get_number_ready_timers(); + size_t executed_timers = 0; + while (executed_timers < number_ready_timers && head_timer->is_ready()) { + head_timer->call(); + if (on_ready_callback_) { + on_ready_callback_(head_timer.get()); + } else { + head_timer->execute_callback(); + } + + executed_timers++; + // Executing a timer will result in updating its time_until_trigger, so re-heapify + locked_heap.heapify_root(); + // Get new head timer + head_timer = locked_heap.front(); + } + + // After having performed work on the locked heap we reflect the changes to weak one. + // Timers will be already sorted the next time we need them if none went out of scope. + weak_timers_heap_.store(locked_heap); +} + +void TimersManager::run_timers() +{ + while (rclcpp::ok(context_) && running_) { + // Lock mutex + std::unique_lock lock(timers_mutex_); + + std::chrono::nanoseconds time_to_sleep = get_head_timeout_unsafe(); + + // No need to wait if a timer is already available + if (time_to_sleep > std::chrono::nanoseconds::zero()) { + if (time_to_sleep != std::chrono::nanoseconds::max()) { + // Wait until timeout or notification that timers have been updated + timers_cv_.wait_for(lock, time_to_sleep, [this]() {return timers_updated_;}); + } else { + // Wait until notification that timers have been updated + timers_cv_.wait(lock, [this]() {return timers_updated_;}); + } + } + + // Reset timers updated flag + timers_updated_ = false; + + // Execute timers + this->execute_ready_timers_unsafe(); + } + + // Make sure the running flag is set to false when we exit from this function + // to allow restarting the timers thread. + running_ = false; +} + +void TimersManager::clear() +{ + { + // Lock mutex and then clear all data structures + std::unique_lock lock(timers_mutex_); + + TimersHeap locked_heap = weak_timers_heap_.validate_and_lock(); + locked_heap.clear_callbacks(); + + weak_timers_heap_.clear(); + + timers_updated_ = true; + } + + // Notify timers thread such that it can re-compute its timeout + timers_cv_.notify_one(); +} + +void TimersManager::remove_timer(TimerPtr timer) +{ + bool removed = false; + { + std::unique_lock lock(timers_mutex_); + removed = weak_timers_heap_.remove_timer(timer); + + timers_updated_ = timers_updated_ || removed; + } + + if (removed) { + // Notify timers thread such that it can re-compute its timeout + timers_cv_.notify_one(); + timer->clear_on_reset_callback(); + } +} diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index d4da759c02..1ef45d2926 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -618,6 +618,14 @@ if(TARGET test_timer) target_link_libraries(test_timer ${PROJECT_NAME} mimick) endif() +ament_add_gtest(test_timers_manager test_timers_manager.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") +if(TARGET test_timers_manager) + ament_target_dependencies(test_timers_manager + "rcl") + target_link_libraries(test_timers_manager ${PROJECT_NAME} mimick) +endif() + ament_add_gtest(test_time_source test_time_source.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}") if(TARGET test_time_source) @@ -688,6 +696,22 @@ if(TARGET test_static_executor_entities_collector) target_link_libraries(test_static_executor_entities_collector ${PROJECT_NAME} mimick) endif() +ament_add_gtest(test_events_executor executors/test_events_executor.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") +if(TARGET test_events_executor) + ament_target_dependencies(test_events_executor + "test_msgs") + target_link_libraries(test_events_executor ${PROJECT_NAME}) +endif() + +ament_add_gtest(test_events_queue executors/test_events_queue.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") +if(TARGET test_events_queue) + ament_target_dependencies(test_events_queue + "test_msgs") + target_link_libraries(test_events_queue ${PROJECT_NAME}) +endif() + ament_add_gtest(test_guard_condition test_guard_condition.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}") if(TARGET test_guard_condition) diff --git a/rclcpp/test/rclcpp/executors/test_events_executor.cpp b/rclcpp/test/rclcpp/executors/test_events_executor.cpp new file mode 100644 index 0000000000..38ded069ac --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_events_executor.cpp @@ -0,0 +1,499 @@ +// Copyright 2023 iRobot Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rclcpp/experimental/executors/events_executor/events_executor.hpp" + +#include "test_msgs/srv/empty.hpp" +#include "test_msgs/msg/empty.hpp" + +using namespace std::chrono_literals; + +using rclcpp::experimental::executors::EventsExecutor; + +class TestEventsExecutor : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + } + + void TearDown() + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestEventsExecutor, run_clients_servers) +{ + // rmw_connextdds doesn't support events-executor + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + + auto node = std::make_shared("node"); + + bool request_received = false; + bool response_received = false; + auto service = + node->create_service( + "service", + [&request_received]( + const test_msgs::srv::Empty::Request::SharedPtr, + test_msgs::srv::Empty::Response::SharedPtr) + { + request_received = true; + }); + auto client = node->create_client("service"); + + EventsExecutor executor; + executor.add_node(node); + + bool spin_exited = false; + std::thread spinner([&spin_exited, &executor, this]() { + executor.spin(); + spin_exited = true; + }); + + auto request = std::make_shared(); + client->async_send_request( + request, + [&response_received](rclcpp::Client::SharedFuture result_future) { + (void)result_future; + response_received = true; + }); + + // Wait some time for the client-server to be invoked + auto start = std::chrono::steady_clock::now(); + while ( + !response_received && + !spin_exited && + (std::chrono::steady_clock::now() - start < 1s)) + { + std::this_thread::sleep_for(5ms); + } + + executor.cancel(); + spinner.join(); + executor.remove_node(node); + + EXPECT_TRUE(request_received); + EXPECT_TRUE(response_received); + EXPECT_TRUE(spin_exited); +} + +TEST_F(TestEventsExecutor, spin_once_max_duration_timeout) +{ + // rmw_connextdds doesn't support events-executor + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + + auto node = std::make_shared("node"); + + EventsExecutor executor; + executor.add_node(node); + + // Consume previous events so we have a fresh start + executor.spin_all(1s); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10s, + [&]() { + t_runs++; + }); + + // This first spin_once takes care of the waitable event + // generated by the addition of the timer to the node + executor.spin_once(1s); + EXPECT_EQ(0u, t_runs); + + auto start = std::chrono::steady_clock::now(); + + // This second spin_once should take care of the timer, + executor.spin_once(10ms); + + // but doesn't spin the time enough to call the timer callback. + EXPECT_EQ(0u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); +} + +// FIX THIS TEST! The entities collector is being called too many times! +/* +TEST_F(TestEventsExecutor, spin_once_max_duration_timer) +{ + // rmw_connextdds doesn't support events-executor + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + + auto node = std::make_shared("node"); + + EventsExecutor executor; + executor.add_node(node); + + // Consume previous events so we have a fresh start + executor.spin_all(1s); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10ms, + [&]() { + t_runs++; + }); + + // This first spin_once takes care of the waitable event + // generated by the addition of the timer to the node + executor.spin_once(1s); + EXPECT_EQ(0u, t_runs); + + auto start = std::chrono::steady_clock::now(); + + // This second spin_once should take care of the timer + executor.spin_once(11ms); + + EXPECT_EQ(1u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); +} +*/ + +TEST_F(TestEventsExecutor, spin_some_max_duration) +{ + // rmw_connextdds doesn't support events-executor + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10s, + [&]() { + t_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_some(10ms); + + EXPECT_EQ(0u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } + + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10ms, + [&]() { + t_runs++; + }); + + // Sleep some time for the timer to be ready when spin + std::this_thread::sleep_for(10ms); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_some(10s); + + EXPECT_EQ(1u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } +} + +TEST_F(TestEventsExecutor, spin_some_zero_duration) +{ + // rmw_connextdds doesn't support events-executor + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 20ms, + [&]() { + t_runs++; + }); + + // Sleep some time for the timer to be ready when spin + std::this_thread::sleep_for(20ms); + + EventsExecutor executor; + executor.add_node(node); + executor.spin_some(0ms); + + EXPECT_EQ(1u, t_runs); +} + +TEST_F(TestEventsExecutor, spin_all_max_duration) +{ + // rmw_connextdds doesn't support events-executor + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10s, + [&]() { + t_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_all(10ms); + + EXPECT_EQ(0u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } + + { + auto node = std::make_shared("node"); + + size_t t_runs = 0; + auto t = node->create_wall_timer( + 10ms, + [&]() { + t_runs++; + }); + + // Sleep some time for the timer to be ready when spin + std::this_thread::sleep_for(10ms); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + executor.spin_all(10s); + + EXPECT_EQ(1u, t_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 200ms); + } + + EventsExecutor executor; + EXPECT_THROW(executor.spin_all(0ms), std::invalid_argument); + EXPECT_THROW(executor.spin_all(-5ms), std::invalid_argument); +} + +TEST_F(TestEventsExecutor, cancel_while_timers_running) +{ + // rmw_connextdds doesn't support events-executor + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + + auto node = std::make_shared("node"); + + EventsExecutor executor; + executor.add_node(node); + + // Take care of previous events for a fresh start + executor.spin_all(1s); + + size_t t1_runs = 0; + auto t1 = node->create_wall_timer( + 1ms, + [&]() { + t1_runs++; + std::this_thread::sleep_for(50ms); + }); + + size_t t2_runs = 0; + auto t2 = node->create_wall_timer( + 1ms, + [&]() { + t2_runs++; + std::this_thread::sleep_for(50ms); + }); + + + std::thread spinner([&executor, this]() {executor.spin();}); + + std::this_thread::sleep_for(10ms); + // Call cancel while t1 callback is still being executed + executor.cancel(); + spinner.join(); + + // Depending on the latency on the system, t2 may start to execute before cancel is signaled + EXPECT_GE(1u, t1_runs); + EXPECT_GE(1u, t2_runs); +} + +TEST_F(TestEventsExecutor, cancel_while_timers_waiting) +{ + // rmw_connextdds doesn't support events-executor + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + + auto node = std::make_shared("node"); + + size_t t1_runs = 0; + auto t1 = node->create_wall_timer( + 100s, + [&]() { + t1_runs++; + }); + + EventsExecutor executor; + executor.add_node(node); + + auto start = std::chrono::steady_clock::now(); + std::thread spinner([&executor, this]() {executor.spin();}); + + std::this_thread::sleep_for(10ms); + executor.cancel(); + spinner.join(); + + EXPECT_EQ(0u, t1_runs); + EXPECT_TRUE(std::chrono::steady_clock::now() - start < 1s); +} + +TEST_F(TestEventsExecutor, destroy_entities) +{ + // rmw_connextdds doesn't support events-executor + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + + // Create a publisher node and start publishing messages + auto node_pub = std::make_shared("node_pub"); + auto publisher = node_pub->create_publisher("topic", rclcpp::QoS(10)); + auto timer = node_pub->create_wall_timer( + 2ms, [&]() {publisher->publish(std::make_unique());}); + EventsExecutor executor_pub; + executor_pub.add_node(node_pub); + std::thread spinner([&executor_pub, this]() {executor_pub.spin();}); + + // Create a node with two different subscriptions to the topic + auto node_sub = std::make_shared("node_sub"); + size_t callback_count_1 = 0; + auto subscription_1 = + node_sub->create_subscription( + "topic", rclcpp::QoS(10), [&](test_msgs::msg::Empty::ConstSharedPtr) {callback_count_1++;}); + size_t callback_count_2 = 0; + auto subscription_2 = + node_sub->create_subscription( + "topic", rclcpp::QoS(10), [&](test_msgs::msg::Empty::ConstSharedPtr) {callback_count_2++;}); + EventsExecutor executor_sub; + executor_sub.add_node(node_sub); + + // Wait some time while messages are published + std::this_thread::sleep_for(10ms); + + // Destroy one of the two subscriptions + subscription_1.reset(); + + // Let subscriptions executor spin + executor_sub.spin_some(10ms); + + // The callback count of the destroyed subscription remained at 0 + EXPECT_EQ(0u, callback_count_1); + EXPECT_LT(0u, callback_count_2); + + executor_pub.cancel(); + spinner.join(); +} + +/* + Testing construction of a subscriptions with QoS event callback functions. + */ +std::string * g_pub_log_msg; +std::string * g_sub_log_msg; +std::promise * g_log_msgs_promise; +TEST_F(TestEventsExecutor, test_default_incompatible_qos_callbacks) +{ + // rmw_connextdds doesn't support events-executor + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) { + GTEST_SKIP(); + } + + auto node = std::make_shared("node"); + rcutils_logging_output_handler_t original_output_handler = rcutils_logging_get_output_handler(); + + std::string pub_log_msg; + std::string sub_log_msg; + std::promise log_msgs_promise; + g_pub_log_msg = &pub_log_msg; + g_sub_log_msg = &sub_log_msg; + g_log_msgs_promise = &log_msgs_promise; + auto logger_callback = []( + const rcutils_log_location_t * /*location*/, + int /*level*/, const char * /*name*/, rcutils_time_point_value_t /*timestamp*/, + const char * format, va_list * args) -> void { + char buffer[1024]; + vsnprintf(buffer, sizeof(buffer), format, *args); + const std::string msg = buffer; + if (msg.rfind("New subscription discovered on topic '/test_topic'", 0) == 0) { + *g_pub_log_msg = buffer; + } else if (msg.rfind("New publisher discovered on topic '/test_topic'", 0) == 0) { + *g_sub_log_msg = buffer; + } + + if (!g_pub_log_msg->empty() && !g_sub_log_msg->empty()) { + g_log_msgs_promise->set_value(); + } + }; + rcutils_logging_set_output_handler(logger_callback); + + std::shared_future log_msgs_future = log_msgs_promise.get_future(); + + rclcpp::QoS qos_profile_publisher(10); + qos_profile_publisher.durability(RMW_QOS_POLICY_DURABILITY_VOLATILE); + auto publisher = node->create_publisher( + "test_topic", qos_profile_publisher); + + rclcpp::QoS qos_profile_subscription(10); + qos_profile_subscription.durability(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL); + auto subscription = node->create_subscription( + "test_topic", qos_profile_subscription, [&](test_msgs::msg::Empty::ConstSharedPtr) {}); + + EventsExecutor ex; + ex.add_node(node->get_node_base_interface()); + + const auto timeout = std::chrono::seconds(10); + ex.spin_until_future_complete(log_msgs_future, timeout); + + EXPECT_EQ( + "New subscription discovered on topic '/test_topic', requesting incompatible QoS. " + "No messages will be sent to it. Last incompatible policy: DURABILITY_QOS_POLICY", + pub_log_msg); + EXPECT_EQ( + "New publisher discovered on topic '/test_topic', offering incompatible QoS. " + "No messages will be sent to it. Last incompatible policy: DURABILITY_QOS_POLICY", + sub_log_msg); + + rcutils_logging_set_output_handler(original_output_handler); +} diff --git a/rclcpp/test/rclcpp/executors/test_events_queue.cpp b/rclcpp/test/rclcpp/executors/test_events_queue.cpp new file mode 100644 index 0000000000..0bacd36d00 --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_events_queue.cpp @@ -0,0 +1,82 @@ +// Copyright 2023 iRobot Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include "rclcpp/experimental/executors/events_executor/events_executor_event_types.hpp" +#include "rclcpp/experimental/executors/events_executor/simple_events_queue.hpp" + +using namespace std::chrono_literals; + +TEST(TestEventsQueue, SimpleQueueTest) +{ + // Create a SimpleEventsQueue and a local queue + auto simple_queue = std::make_unique(); + rclcpp::experimental::executors::ExecutorEvent event {}; + bool ret = false; + + // Make sure the queue is empty at startup + EXPECT_TRUE(simple_queue->empty()); + EXPECT_EQ(simple_queue->size(), 0u); + + // Push 11 messages + for (uint32_t i = 1; i < 11; i++) { + rclcpp::experimental::executors::ExecutorEvent stub_event {}; + stub_event.num_events = 1; + simple_queue->enqueue(stub_event); + + EXPECT_FALSE(simple_queue->empty()); + EXPECT_EQ(simple_queue->size(), i); + } + + // Pop one message + ret = simple_queue->dequeue(event); + EXPECT_TRUE(ret); + EXPECT_FALSE(simple_queue->empty()); + EXPECT_EQ(simple_queue->size(), 9u); + + // Pop one message + ret = simple_queue->dequeue(event, std::chrono::nanoseconds(0)); + EXPECT_TRUE(ret); + EXPECT_FALSE(simple_queue->empty()); + EXPECT_EQ(simple_queue->size(), 8u); + + while (!simple_queue->empty()) { + ret = simple_queue->dequeue(event); + EXPECT_TRUE(ret); + } + + EXPECT_TRUE(simple_queue->empty()); + EXPECT_EQ(simple_queue->size(), 0u); + + ret = simple_queue->dequeue(event, std::chrono::nanoseconds(0)); + EXPECT_FALSE(ret); + + // Lets push an event into the queue and get it back + rclcpp::experimental::executors::ExecutorEvent push_event = { + simple_queue.get(), + 99, + rclcpp::experimental::executors::ExecutorEventType::SUBSCRIPTION_EVENT, + 1}; + + simple_queue->enqueue(push_event); + ret = simple_queue->dequeue(event); + EXPECT_TRUE(ret); + EXPECT_EQ(push_event.exec_entity_id, event.exec_entity_id); + EXPECT_EQ(push_event.gen_entity_id, event.gen_entity_id); + EXPECT_EQ(push_event.type, event.type); + EXPECT_EQ(push_event.num_events, event.num_events); +} diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 1fa2cbb4dd..1f063d10f7 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -92,7 +92,8 @@ using ExecutorTypes = ::testing::Types< rclcpp::executors::SingleThreadedExecutor, rclcpp::executors::MultiThreadedExecutor, - rclcpp::executors::StaticSingleThreadedExecutor>; + rclcpp::executors::StaticSingleThreadedExecutor, + rclcpp::experimental::executors::EventsExecutor>; class ExecutorTypeNames { @@ -113,6 +114,10 @@ class ExecutorTypeNames return "StaticSingleThreadedExecutor"; } + if (std::is_same()) { + return "EventsExecutor"; + } + return ""; } }; @@ -126,12 +131,22 @@ TYPED_TEST_SUITE(TestExecutors, ExecutorTypes, ExecutorTypeNames); using StandardExecutors = ::testing::Types< rclcpp::executors::SingleThreadedExecutor, - rclcpp::executors::MultiThreadedExecutor>; + rclcpp::executors::MultiThreadedExecutor, + rclcpp::experimental::executors::EventsExecutor>; TYPED_TEST_SUITE(TestExecutorsStable, StandardExecutors, ExecutorTypeNames); // Make sure that executors detach from nodes when destructing -TYPED_TEST(TestExecutors, detachOnDestruction) { +TYPED_TEST(TestExecutors, detachOnDestruction) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + { ExecutorType executor; executor.add_node(this->node); @@ -145,8 +160,17 @@ TYPED_TEST(TestExecutors, detachOnDestruction) { // Make sure that the executor can automatically remove expired nodes correctly // Currently fails for StaticSingleThreadedExecutor so it is being skipped, see: // https://github.com/ros2/rclcpp/issues/1231 -TYPED_TEST(TestExecutorsStable, addTemporaryNode) { +TYPED_TEST(TestExecutorsStable, addTemporaryNode) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + ExecutorType executor; { @@ -164,8 +188,17 @@ TYPED_TEST(TestExecutorsStable, addTemporaryNode) { } // Check executor throws properly if the same node is added a second time -TYPED_TEST(TestExecutors, addNodeTwoExecutors) { +TYPED_TEST(TestExecutors, addNodeTwoExecutors) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + ExecutorType executor1; ExecutorType executor2; EXPECT_NO_THROW(executor1.add_node(this->node)); @@ -174,8 +207,17 @@ TYPED_TEST(TestExecutors, addNodeTwoExecutors) { } // Check simple spin example -TYPED_TEST(TestExecutors, spinWithTimer) { +TYPED_TEST(TestExecutors, spinWithTimer) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + ExecutorType executor; bool timer_completed = false; @@ -196,8 +238,17 @@ TYPED_TEST(TestExecutors, spinWithTimer) { executor.remove_node(this->node, true); } -TYPED_TEST(TestExecutors, spinWhileAlreadySpinning) { +TYPED_TEST(TestExecutors, spinWhileAlreadySpinning) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + ExecutorType executor; executor.add_node(this->node); @@ -222,8 +273,17 @@ TYPED_TEST(TestExecutors, spinWhileAlreadySpinning) { } // Check executor exits immediately if future is complete. -TYPED_TEST(TestExecutors, testSpinUntilFutureComplete) { +TYPED_TEST(TestExecutors, testSpinUntilFutureComplete) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + ExecutorType executor; executor.add_node(this->node); @@ -244,8 +304,17 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureComplete) { } // Same test, but uses a shared future. -TYPED_TEST(TestExecutors, testSpinUntilSharedFutureComplete) { +TYPED_TEST(TestExecutors, testSpinUntilSharedFutureComplete) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + ExecutorType executor; executor.add_node(this->node); @@ -267,8 +336,17 @@ TYPED_TEST(TestExecutors, testSpinUntilSharedFutureComplete) { } // For a longer running future that should require several iterations of spin_once -TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteNoTimeout) { +TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteNoTimeout) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + ExecutorType executor; executor.add_node(this->node); @@ -313,8 +391,17 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteNoTimeout) { } // Check spin_until_future_complete timeout works as expected -TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteWithTimeout) { +TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteWithTimeout) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + ExecutorType executor; executor.add_node(this->node); @@ -380,6 +467,13 @@ class TestWaitable : public rclcpp::Waitable return nullptr; } + std::shared_ptr + take_data_by_entity_id(size_t id) override + { + (void) id; + return nullptr; + } + void execute(std::shared_ptr & data) override { @@ -388,6 +482,21 @@ class TestWaitable : public rclcpp::Waitable std::this_thread::sleep_for(3ms); } + void + set_on_ready_callback(std::function callback) override + { + auto gc_callback = [callback](size_t count) { + callback(count, 0); + }; + gc_.set_on_trigger_callback(gc_callback); + } + + void + clear_on_ready_callback() override + { + gc_.set_on_trigger_callback(nullptr); + } + size_t get_number_of_ready_guard_conditions() override {return 1;} @@ -402,8 +511,17 @@ class TestWaitable : public rclcpp::Waitable rclcpp::GuardCondition gc_; }; -TYPED_TEST(TestExecutors, spinAll) { +TYPED_TEST(TestExecutors, spinAll) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + ExecutorType executor; auto waitable_interfaces = this->node->get_node_waitables_interface(); auto my_waitable = std::make_shared(); @@ -443,8 +561,17 @@ TYPED_TEST(TestExecutors, spinAll) { spinner.join(); } -TYPED_TEST(TestExecutors, spinSome) { +TYPED_TEST(TestExecutors, spinSome) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + ExecutorType executor; auto waitable_interfaces = this->node->get_node_waitables_interface(); auto my_waitable = std::make_shared(); @@ -483,8 +610,17 @@ TYPED_TEST(TestExecutors, spinSome) { } // Check spin_node_until_future_complete with node base pointer -TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodeBasePtr) { +TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodeBasePtr) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + ExecutorType executor; std::promise promise; @@ -498,8 +634,17 @@ TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodeBasePtr) { } // Check spin_node_until_future_complete with node pointer -TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodePtr) { +TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodePtr) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + ExecutorType executor; std::promise promise; @@ -513,8 +658,17 @@ TYPED_TEST(TestExecutors, testSpinNodeUntilFutureCompleteNodePtr) { } // Check spin_until_future_complete can be properly interrupted. -TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteInterrupted) { +TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteInterrupted) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + ExecutorType executor; executor.add_node(this->node); @@ -556,7 +710,8 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteInterrupted) { } // Check spin_until_future_complete with node base pointer (instantiates its own executor) -TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr) { +TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr) +{ rclcpp::init(0, nullptr); { @@ -576,7 +731,8 @@ TEST(TestExecutors, testSpinUntilFutureCompleteNodeBasePtr) { } // Check spin_until_future_complete with node pointer (instantiates its own executor) -TEST(TestExecutors, testSpinUntilFutureCompleteNodePtr) { +TEST(TestExecutors, testSpinUntilFutureCompleteNodePtr) +{ rclcpp::init(0, nullptr); { diff --git a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp index 02fa0b7a94..49131638db 100644 --- a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp +++ b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp @@ -46,11 +46,15 @@ class TestAddCallbackGroupsToExecutor : public ::testing::Test } }; +template +class TestAddCallbackGroupsToExecutorStable : public TestAddCallbackGroupsToExecutor {}; + using ExecutorTypes = ::testing::Types< rclcpp::executors::SingleThreadedExecutor, rclcpp::executors::MultiThreadedExecutor, - rclcpp::executors::StaticSingleThreadedExecutor>; + rclcpp::executors::StaticSingleThreadedExecutor, + rclcpp::experimental::executors::EventsExecutor>; class ExecutorTypeNames { @@ -71,17 +75,38 @@ class ExecutorTypeNames return "StaticSingleThreadedExecutor"; } + if (std::is_same()) { + return "EventsExecutor"; + } + return ""; } }; TYPED_TEST_SUITE(TestAddCallbackGroupsToExecutor, ExecutorTypes, ExecutorTypeNames); +// StaticSingleThreadedExecutor is not included in these tests for now +using StandardExecutors = + ::testing::Types< + rclcpp::executors::SingleThreadedExecutor, + rclcpp::executors::MultiThreadedExecutor, + rclcpp::experimental::executors::EventsExecutor>; +TYPED_TEST_SUITE(TestAddCallbackGroupsToExecutorStable, StandardExecutors, ExecutorTypeNames); + /* * Test adding callback groups. */ -TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups) { +TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + auto node = std::make_shared("my_node", "/ns"); auto timer_callback = []() {}; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( @@ -127,8 +152,17 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups) { /* * Test removing callback groups. */ -TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_groups) { +TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_groups) +{ using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + auto node = std::make_shared("my_node", "/ns"); auto timer_callback = []() {}; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( @@ -158,7 +192,16 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_groups) { */ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_duplicate_callback_groups) { - rclcpp::executors::MultiThreadedExecutor executor; + using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + + ExecutorType executor; auto node = std::make_shared("my_node", "/ns"); auto timer_callback = []() {}; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( @@ -176,7 +219,16 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_duplicate_callback_groups) */ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups_after_add_node_to_executor) { - rclcpp::executors::MultiThreadedExecutor executor; + using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + + ExecutorType executor; auto node = std::make_shared("my_node", "/ns"); executor.add_node(node->get_node_base_interface()); ASSERT_EQ(executor.get_all_callback_groups().size(), 1u); @@ -210,13 +262,22 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups_after_add_node_t */ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_unallowable_callback_groups) { + using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + + ExecutorType executor; auto node = std::make_shared("my_node", "/ns"); auto timer_callback = []() {}; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive, false); rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer( 2s, timer_callback, cb_grp); - rclcpp::executors::MultiThreadedExecutor executor; executor.add_callback_group(cb_grp, node->get_node_base_interface()); ASSERT_EQ(executor.get_all_callback_groups().size(), 1u); @@ -245,14 +306,23 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, add_unallowable_callback_groups) */ TYPED_TEST(TestAddCallbackGroupsToExecutor, one_node_many_callback_groups_many_executors) { + using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + + ExecutorType timer_executor; + ExecutorType sub_executor; auto node = std::make_shared("my_node", "/ns"); auto timer_callback = []() {}; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive, false); rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer( 2s, timer_callback, cb_grp); - rclcpp::executors::MultiThreadedExecutor timer_executor; - rclcpp::executors::MultiThreadedExecutor sub_executor; timer_executor.add_callback_group(cb_grp, node->get_node_base_interface()); const rclcpp::QoS qos(10); auto options = rclcpp::SubscriptionOptions(); @@ -282,14 +352,23 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, one_node_many_callback_groups_many_e * because the executor can't be triggered while a subscriber created, see * https://github.com/ros2/rclcpp/issues/1611 */ -TYPED_TEST(TestAddCallbackGroupsToExecutor, subscriber_triggered_to_receive_message) +TYPED_TEST(TestAddCallbackGroupsToExecutorStable, subscriber_triggered_to_receive_message) { + using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + auto node = std::make_shared("my_node", "/ns"); // create a thread running an executor with a new callback group for a coming subscriber rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive, false); - rclcpp::executors::SingleThreadedExecutor cb_grp_executor; + ExecutorType cb_grp_executor; std::promise received_message_promise; auto received_message_future = received_message_promise.get_future(); @@ -329,7 +408,7 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, subscriber_triggered_to_receive_mess timer_promise.set_value(); }; - rclcpp::executors::SingleThreadedExecutor timer_executor; + ExecutorType timer_executor; timer = node->create_wall_timer(100ms, timer_callback); timer_executor.add_node(node); auto future = timer_promise.get_future(); @@ -346,8 +425,17 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, subscriber_triggered_to_receive_mess * because the executor can't be triggered while a subscriber created, see * https://github.com/ros2/rclcpp/issues/2067 */ -TYPED_TEST(TestAddCallbackGroupsToExecutor, callback_group_create_after_spin) +TYPED_TEST(TestAddCallbackGroupsToExecutorStable, callback_group_create_after_spin) { + using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + auto node = std::make_shared("my_node", "/ns"); // create a publisher to send data @@ -357,7 +445,7 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, callback_group_create_after_spin) publisher->publish(test_msgs::msg::Empty()); // create a thread running an executor - rclcpp::executors::SingleThreadedExecutor executor; + ExecutorType executor; executor.add_node(node); std::promise received_message_promise; auto received_message_future = received_message_promise.get_future(); @@ -392,7 +480,16 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, callback_group_create_after_spin) */ TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_group) { - rclcpp::executors::MultiThreadedExecutor executor; + using ExecutorType = TypeParam; + // rmw_connextdds doesn't support events-executor + if ( + std::is_same() && + std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0) + { + GTEST_SKIP(); + } + + ExecutorType executor; auto node = std::make_shared("my_node", "/ns"); auto timer_callback = []() {}; rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( diff --git a/rclcpp/test/rclcpp/test_timers_manager.cpp b/rclcpp/test/rclcpp/test_timers_manager.cpp new file mode 100644 index 0000000000..03cc53679c --- /dev/null +++ b/rclcpp/test/rclcpp/test_timers_manager.cpp @@ -0,0 +1,426 @@ +// Copyright 2023 iRobot Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rclcpp/contexts/default_context.hpp" +#include "rclcpp/experimental/timers_manager.hpp" + +using namespace std::chrono_literals; + +using rclcpp::experimental::TimersManager; + +using CallbackT = std::function; +using TimerT = rclcpp::WallTimer; + +class TestTimersManager : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + } + + void TearDown() + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestTimersManager, empty_manager) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + EXPECT_EQ(std::chrono::nanoseconds::max(), timers_manager->get_head_timeout()); + EXPECT_FALSE(timers_manager->execute_head_timer()); + EXPECT_NO_THROW(timers_manager->execute_ready_timers()); + EXPECT_NO_THROW(timers_manager->clear()); + EXPECT_NO_THROW(timers_manager->start()); + EXPECT_NO_THROW(timers_manager->stop()); +} + +TEST_F(TestTimersManager, add_run_remove_timer) +{ + size_t t_runs = 0; + auto t = TimerT::make_shared( + 1ms, + [&t_runs]() { + t_runs++; + }, + rclcpp::contexts::get_global_default_context()); + std::weak_ptr t_weak = t; + + // Add the timer to the timers manager + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + timers_manager->add_timer(t); + + // Sleep for more 3 times the timer period + std::this_thread::sleep_for(3ms); + + // The timer is executed only once, even if we slept 3 times the period + timers_manager->execute_ready_timers(); + EXPECT_EQ(1u, t_runs); + + // Remove the timer from the manager + timers_manager->remove_timer(t); + + t.reset(); + // The timer is now not valid anymore + EXPECT_FALSE(t_weak.lock() != nullptr); +} + +TEST_F(TestTimersManager, clear) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + auto t1 = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context()); + std::weak_ptr t1_weak = t1; + auto t2 = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context()); + std::weak_ptr t2_weak = t2; + + timers_manager->add_timer(t1); + timers_manager->add_timer(t2); + + EXPECT_TRUE(t1_weak.lock() != nullptr); + EXPECT_TRUE(t2_weak.lock() != nullptr); + + timers_manager->clear(); + + t1.reset(); + t2.reset(); + + EXPECT_FALSE(t1_weak.lock() != nullptr); + EXPECT_FALSE(t2_weak.lock() != nullptr); +} + +TEST_F(TestTimersManager, remove_not_existing_timer) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + // Try to remove a nullptr timer + EXPECT_NO_THROW(timers_manager->remove_timer(nullptr)); + + auto t = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context()); + timers_manager->add_timer(t); + + // Remove twice the same timer + timers_manager->remove_timer(t); + EXPECT_NO_THROW(timers_manager->remove_timer(t)); +} + +TEST_F(TestTimersManager, timers_thread_exclusive_usage) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + timers_manager->start(); + + EXPECT_THROW(timers_manager->start(), std::exception); + EXPECT_THROW(timers_manager->get_head_timeout(), std::exception); + EXPECT_THROW(timers_manager->execute_ready_timers(), std::exception); + EXPECT_THROW(timers_manager->execute_head_timer(), std::exception); + + timers_manager->stop(); + + EXPECT_NO_THROW(timers_manager->get_head_timeout()); + EXPECT_NO_THROW(timers_manager->execute_ready_timers()); + EXPECT_NO_THROW(timers_manager->execute_head_timer()); +} + +TEST_F(TestTimersManager, add_timer_twice) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + auto t = TimerT::make_shared(1ms, CallbackT(), rclcpp::contexts::get_global_default_context()); + + timers_manager->add_timer(t); + EXPECT_NO_THROW(timers_manager->add_timer(t)); +} + +TEST_F(TestTimersManager, add_nullptr) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + EXPECT_THROW(timers_manager->add_timer(nullptr), std::exception); +} + +TEST_F(TestTimersManager, head_not_ready) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t_runs = 0; + auto t = TimerT::make_shared( + 10s, + [&t_runs]() { + t_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + timers_manager->add_timer(t); + + // Timer will take 10s to get ready, so nothing to execute here + bool ret = timers_manager->execute_head_timer(); + EXPECT_FALSE(ret); + EXPECT_EQ(0u, t_runs); +} + +TEST_F(TestTimersManager, timers_order) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t1_runs = 0; + auto t1 = TimerT::make_shared( + 10ms, + [&t1_runs]() { + t1_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + size_t t2_runs = 0; + auto t2 = TimerT::make_shared( + 30ms, + [&t2_runs]() { + t2_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + size_t t3_runs = 0; + auto t3 = TimerT::make_shared( + 100ms, + [&t3_runs]() { + t3_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + // Add timers in a random order + timers_manager->add_timer(t2); + timers_manager->add_timer(t3); + timers_manager->add_timer(t1); + + std::this_thread::sleep_for(10ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(1u, t1_runs); + EXPECT_EQ(0u, t2_runs); + EXPECT_EQ(0u, t3_runs); + + std::this_thread::sleep_for(30ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(2u, t1_runs); + EXPECT_EQ(1u, t2_runs); + EXPECT_EQ(0u, t3_runs); + + std::this_thread::sleep_for(100ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(3u, t1_runs); + EXPECT_EQ(2u, t2_runs); + EXPECT_EQ(1u, t3_runs); + + timers_manager->remove_timer(t1); + + std::this_thread::sleep_for(30ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(3u, t1_runs); + EXPECT_EQ(3u, t2_runs); + EXPECT_EQ(1u, t3_runs); +} + +TEST_F(TestTimersManager, start_stop_timers_thread) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + auto t = TimerT::make_shared(1ms, []() {}, rclcpp::contexts::get_global_default_context()); + timers_manager->add_timer(t); + + // Calling start multiple times will throw an error + EXPECT_NO_THROW(timers_manager->start()); + EXPECT_THROW(timers_manager->start(), std::exception); + + // Calling stop multiple times does not throw an error + EXPECT_NO_THROW(timers_manager->stop()); + EXPECT_NO_THROW(timers_manager->stop()); +} + +TEST_F(TestTimersManager, timers_thread) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t1_runs = 0; + auto t1 = TimerT::make_shared( + 1ms, + [&t1_runs]() { + t1_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + size_t t2_runs = 0; + auto t2 = TimerT::make_shared( + 1ms, + [&t2_runs]() { + t2_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + // Add timers + timers_manager->add_timer(t1); + timers_manager->add_timer(t2); + + // Run timers thread for a while + timers_manager->start(); + std::this_thread::sleep_for(5ms); + timers_manager->stop(); + + EXPECT_LT(1u, t1_runs); + EXPECT_LT(1u, t2_runs); + EXPECT_EQ(t1_runs, t2_runs); +} + +TEST_F(TestTimersManager, destructor) +{ + size_t t_runs = 0; + auto t = TimerT::make_shared( + 1ms, + [&t_runs]() { + t_runs++; + }, + rclcpp::contexts::get_global_default_context()); + std::weak_ptr t_weak = t; + + // When the timers manager is destroyed, it will stop the thread + // and clear the timers + { + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + timers_manager->add_timer(t); + + timers_manager->start(); + std::this_thread::sleep_for(100ms); + + EXPECT_LT(1u, t_runs); + } + + // The thread is not running anymore, so this value does not increase + size_t runs = t_runs; + std::this_thread::sleep_for(100ms); + EXPECT_EQ(runs, t_runs); + t.reset(); + EXPECT_FALSE(t_weak.lock() != nullptr); +} + +TEST_F(TestTimersManager, add_remove_while_thread_running) +{ + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t1_runs = 0; + auto t1 = TimerT::make_shared( + 1ms, + [&t1_runs]() { + t1_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + size_t t2_runs = 0; + auto t2 = TimerT::make_shared( + 1ms, + [&t2_runs]() { + t2_runs++; + }, + rclcpp::contexts::get_global_default_context()); + + // Add timers + timers_manager->add_timer(t1); + + // Start timers thread + timers_manager->start(); + + // After a while remove t1 and add t2 + std::this_thread::sleep_for(5ms); + timers_manager->remove_timer(t1); + size_t tmp_t1 = t1_runs; + timers_manager->add_timer(t2); + + // Wait some more time and then stop + std::this_thread::sleep_for(5ms); + timers_manager->stop(); + + // t1 has stopped running + EXPECT_EQ(tmp_t1, t1_runs); + // t2 is correctly running + EXPECT_LT(1u, t2_runs); +} + +TEST_F(TestTimersManager, infinite_loop) +{ + // This test makes sure that even if timers have a period shorter than the duration + // of their callback the functions never block indefinitely. + + auto timers_manager = std::make_shared( + rclcpp::contexts::get_global_default_context()); + + size_t t1_runs = 0; + auto t1 = TimerT::make_shared( + 1ms, + [&t1_runs]() { + t1_runs++; + std::this_thread::sleep_for(5ms); + }, + rclcpp::contexts::get_global_default_context()); + + size_t t2_runs = 0; + auto t2 = TimerT::make_shared( + 1ms, + [&t2_runs]() { + t2_runs++; + std::this_thread::sleep_for(5ms); + }, + rclcpp::contexts::get_global_default_context()); + + timers_manager->add_timer(t1); + timers_manager->add_timer(t2); + + // Sleep for enough time to trigger timers + std::this_thread::sleep_for(3ms); + timers_manager->execute_ready_timers(); + EXPECT_EQ(1u, t1_runs); + EXPECT_EQ(1u, t2_runs); + + // Due to the long execution of timer callbacks, timers are already ready + bool ret = timers_manager->execute_head_timer(); + EXPECT_TRUE(ret); + EXPECT_EQ(3u, t1_runs + t2_runs); + + // Start a timers thread + timers_manager->start(); + std::this_thread::sleep_for(10ms); + timers_manager->stop(); + + EXPECT_LT(3u, t1_runs + t2_runs); + EXPECT_LT(1u, t1_runs); + EXPECT_LT(1u, t2_runs); +}