Skip to content

Commit

Permalink
add events-executor and timers-manager in rclcpp (ros2#2155)
Browse files Browse the repository at this point in the history
* add events-executor and timers-manager in rclcpp

fix check for execute_timers_separate_thread; ignore on_ready_callback if asked to execute a timer;  reduce usage of void pointers

* have the executor notify waitable fiddle with guard condition callbacks only if necessary

Signed-off-by: Alberto Soragna <alberto.soragna@gmail.com>
  • Loading branch information
alsora authored and Alberto Soragna committed Apr 29, 2023
1 parent f7215ee commit 84482ca
Show file tree
Hide file tree
Showing 17 changed files with 3,377 additions and 39 deletions.
2 changes: 2 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/executors/static_executor_entities_collector.cpp
src/rclcpp/executors/static_single_threaded_executor.cpp
src/rclcpp/expand_topic_or_service_name.cpp
src/rclcpp/experimental/executors/events_executor/events_executor.cpp
src/rclcpp/experimental/timers_manager.cpp
src/rclcpp/future_return_code.cpp
src/rclcpp/generic_publisher.cpp
src/rclcpp/generic_subscription.cpp
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/executors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "rclcpp/executors/multi_threaded_executor.hpp"
#include "rclcpp/executors/single_threaded_executor.hpp"
#include "rclcpp/executors/static_single_threaded_executor.hpp"
#include "rclcpp/experimental/executors/events_executor/events_executor.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
Expand Down
33 changes: 31 additions & 2 deletions rclcpp/include/rclcpp/executors/executor_notify_waitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,25 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
std::shared_ptr<void>
take_data() override;

/// Take the data from an entity ID so that it can be consumed with `execute`.
/**
* \param[in] id ID of the entity to take data from.
* \return If available, data to be used, otherwise nullptr
* \sa rclcpp::Waitable::take_data_by_entity_id
*/
RCLCPP_PUBLIC
std::shared_ptr<void>
take_data_by_entity_id(size_t id) override;

/// Set a callback to be called whenever the waitable becomes ready.
/**
* \param[in] callback callback to set
* \sa rclcpp::Waitable::set_on_ready_callback
*/
RCLCPP_PUBLIC
void
set_on_ready_callback(std::function<void(size_t, int)> callback) override;

/// Add a guard condition to be waited on.
/**
* \param[in] guard_condition The guard condition to add.
Expand All @@ -96,13 +115,21 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable
void
add_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition);

/// Unset any callback registered via set_on_ready_callback.
/**
* \sa rclcpp::Waitable::clear_on_ready_callback
*/
RCLCPP_PUBLIC
void
clear_on_ready_callback() override;

/// Remove a guard condition from being waited on.
/**
* \param[in] guard_condition The guard condition to remove.
* \param[in] weak_guard_condition The guard condition to remove.
*/
RCLCPP_PUBLIC
void
remove_guard_condition(rclcpp::GuardCondition::WeakPtr guard_condition);
remove_guard_condition(rclcpp::GuardCondition::WeakPtr weak_guard_condition);

/// Get the number of ready guard_conditions
/**
Expand All @@ -118,6 +145,8 @@ class ExecutorNotifyWaitable : public rclcpp::Waitable

std::mutex guard_condition_mutex_;

std::function<void(size_t)> on_ready_callback_;

/// The collection of guard conditions to be waited on.
std::set<rclcpp::GuardCondition::WeakPtr,
std::owner_less<rclcpp::GuardCondition::WeakPtr>> notify_guard_conditions_;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
// Copyright 2023 iRobot Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_HPP_
#define RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_HPP_

#include <atomic>
#include <chrono>
#include <memory>
#include <vector>

#include "rclcpp/executor.hpp"
#include "rclcpp/executors/executor_entities_collection.hpp"
#include "rclcpp/executors/executor_entities_collector.hpp"
#include "rclcpp/experimental/executors/events_executor/events_executor_event_types.hpp"
#include "rclcpp/experimental/executors/events_executor/events_queue.hpp"
#include "rclcpp/experimental/executors/events_executor/simple_events_queue.hpp"
#include "rclcpp/experimental/timers_manager.hpp"
#include "rclcpp/node.hpp"

namespace rclcpp
{
namespace experimental
{
namespace executors
{

/// Events executor implementation
/**
* This executor uses an events queue and a timers manager to execute entities from its
* associated nodes and callback groups.
* ROS 2 entities allow to set callback functions that are invoked when the entity is triggered
* or has work to do. The events-executor sets these callbacks such that they push an
* event into its queue.
*
* This executor tries to reduce as much as possible the amount of maintenance operations.
* This allows to use customized `EventsQueue` classes to achieve different goals such
* as very low CPU usage, bounded memory requirement, determinism, etc.
*
* The executor uses a weak ownership model and it locks entities only while executing
* their related events.
*
* To run this executor:
* rclcpp::experimental::executors::EventsExecutor executor;
* executor.add_node(node);
* executor.spin();
* executor.remove_node(node);
*/
class EventsExecutor : public rclcpp::Executor
{
friend class EventsExecutorEntitiesCollector;

public:
RCLCPP_SMART_PTR_DEFINITIONS(EventsExecutor)

/// Default constructor. See the default constructor for Executor.
/**
* \param[in] events_queue The queue used to store events.
* \param[in] execute_timers_separate_thread If true, timers are executed in a separate
* thread. If false, timers are executed in the same thread as all other entities.
* \param[in] options Options used to configure the executor.
*/
RCLCPP_PUBLIC
explicit EventsExecutor(
rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue = std::make_unique<
rclcpp::experimental::executors::SimpleEventsQueue>(),
bool execute_timers_separate_thread = false,
const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions());

/// Default destructor.
RCLCPP_PUBLIC
virtual ~EventsExecutor();

/// Events executor implementation of spin.
/**
* This function will block until work comes in, execute it, and keep blocking.
* It will only be interrupted by a CTRL-C (managed by the global signal handler).
* \throws std::runtime_error when spin() called while already spinning
*/
RCLCPP_PUBLIC
void
spin() override;

/// Events executor implementation of spin some
/**
* This non-blocking function will execute the timers and events
* that were ready when this API was called, until timeout or no
* more work available. New ready-timers/events arrived while
* executing work, won't be taken into account here.
*
* Example:
* while(condition) {
* spin_some();
* sleep(); // User should have some sync work or
* // sleep to avoid a 100% CPU usage
* }
*/
RCLCPP_PUBLIC
void
spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0)) override;

/// Events executor implementation of spin all
/**
* This non-blocking function will execute timers and events
* until timeout or no more work available. If new ready-timers/events
* arrive while executing work available, they will be executed
* as long as the timeout hasn't expired.
*
* Example:
* while(condition) {
* spin_all();
* sleep(); // User should have some sync work or
* // sleep to avoid a 100% CPU usage
* }
*/
RCLCPP_PUBLIC
void
spin_all(std::chrono::nanoseconds max_duration) override;

/// Add a node to the executor.
/**
* \sa rclcpp::Executor::add_node
*/
RCLCPP_PUBLIC
void
add_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify = true) override;

/// Convenience function which takes Node and forwards NodeBaseInterface.
/**
* \sa rclcpp::EventsExecutor::add_node
*/
RCLCPP_PUBLIC
void
add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;

/// Remove a node from the executor.
/**
* \sa rclcpp::Executor::remove_node
*/
RCLCPP_PUBLIC
void
remove_node(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify = true) override;

/// Convenience function which takes Node and forwards NodeBaseInterface.
/**
* \sa rclcpp::Executor::remove_node
*/
RCLCPP_PUBLIC
void
remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true) override;

/// Add a callback group to an executor.
/**
* \sa rclcpp::Executor::add_callback_group
*/
RCLCPP_PUBLIC
void
add_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
bool notify = true) override;

/// Remove callback group from the executor
/**
* \sa rclcpp::Executor::remove_callback_group
*/
RCLCPP_PUBLIC
void
remove_callback_group(
rclcpp::CallbackGroup::SharedPtr group_ptr,
bool notify = true) override;

/// Get callback groups that belong to executor.
/**
* \sa rclcpp::Executor::get_all_callback_groups()
*/
RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_all_callback_groups() override;

/// Get callback groups that belong to executor.
/**
* \sa rclcpp::Executor::get_manually_added_callback_groups()
*/
RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_manually_added_callback_groups() override;

/// Get callback groups that belong to executor.
/**
* \sa rclcpp::Executor::get_automatically_added_callback_groups_from_nodes()
*/
RCLCPP_PUBLIC
std::vector<rclcpp::CallbackGroup::WeakPtr>
get_automatically_added_callback_groups_from_nodes() override;

protected:
/// Internal implementation of spin_once
RCLCPP_PUBLIC
void
spin_once_impl(std::chrono::nanoseconds timeout) override;

/// Internal implementation of spin_some
RCLCPP_PUBLIC
void
spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive);

private:
RCLCPP_DISABLE_COPY(EventsExecutor)

/// Execute a provided executor event if its associated entities are available
void
execute_event(const ExecutorEvent & event);

/// Collect entities from callback groups and refresh the current collection with them
void
refresh_current_collection_from_callback_groups();

/// Refresh the current collection using the provided new_collection
void
refresh_current_collection(const rclcpp::executors::ExecutorEntitiesCollection & new_collection);

/// Create a listener callback function for the provided entity
std::function<void(size_t)>
create_entity_callback(void * entity_key, ExecutorEventType type);

/// Create a listener callback function for the provided waitable entity
std::function<void(size_t, int)>
create_waitable_callback(const rclcpp::Waitable * waitable_id);

/// Searches for the provided entity_id in the collection and returns the entity if valid
template<typename CollectionType>
typename CollectionType::EntitySharedPtr
retrieve_entity(typename CollectionType::Key entity_id, CollectionType & collection)
{
// Check if the entity_id is in the collection
auto it = collection.find(entity_id);
if (it == collection.end()) {
return nullptr;
}

// Check if the entity associated with the entity_id is valid
// and remove it from the collection if it isn't
auto entity = it->second.entity.lock();
if (!entity) {
collection.erase(it);
}

// Return the retrieved entity (this can be a nullptr if the entity was not valid)
return entity;
}

/// Queue where entities can push events
rclcpp::experimental::executors::EventsQueue::UniquePtr events_queue_;

std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollector> entities_collector_;
std::shared_ptr<rclcpp::executors::ExecutorEntitiesCollection> current_entities_collection_;
std::shared_ptr<rclcpp::executors::ExecutorNotifyWaitable> notify_waitable_;

/// Flag used to reduce the number of unnecessary waitable events
std::atomic<bool> notify_waitable_event_pushed_ {false};

/// Timers manager used to track and/or execute associated timers
std::shared_ptr<rclcpp::experimental::TimersManager> timers_manager_;
};

} // namespace executors
} // namespace experimental
} // namespace rclcpp

#endif // RCLCPP__EXPERIMENTAL__EXECUTORS__EVENTS_EXECUTOR__EVENTS_EXECUTOR_HPP_
Loading

0 comments on commit 84482ca

Please sign in to comment.