Skip to content

Commit

Permalink
Create common structures for executors to use (ros2#2143)
Browse files Browse the repository at this point in the history
* Deprecate callback_group call taking context

* Add base executor objects that can be used by implementors

* Template common operations

* Add callback to EntitiesCollector constructor
* Make function to check automatically added callback groups take a list

* Make executor own the notify waitable

* Add pending queue to collector, remove from waitable

Also change node's get_guard_condition to return shared_ptr

* Change interrupt guard condition to shared_ptr

Check if guard condition is valid before adding it to the waitable

* Make get_notify_guard_condition follow API tick-tock

* Improve callback group tick-tocking

* Add thread safety annotations and make locks consistent

* Remove the "add_valid_node" API

* Only notify if the trigger condition is valid

* Only trigger if valid and needed

Signed-off-by: Michael Carroll <michael@openrobotics.org>
  • Loading branch information
mjcarroll authored and Alberto Soragna committed May 3, 2023
1 parent bf455b2 commit 442c426
Show file tree
Hide file tree
Showing 24 changed files with 1,986 additions and 45 deletions.
3 changes: 3 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/executable_list.cpp
src/rclcpp/executor.cpp
src/rclcpp/executors.cpp
src/rclcpp/executors/executor_entities_collection.cpp
src/rclcpp/executors/executor_entities_collector.cpp
src/rclcpp/executors/executor_notify_waitable.cpp
src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/executors/static_executor_entities_collector.cpp
Expand Down
60 changes: 59 additions & 1 deletion rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,54 @@ class CallbackGroup
* determines whether a callback group is automatically added to an executor
* with the node with which it is associated.
*/
[[deprecated("Use CallbackGroup constructor with context function argument")]]
RCLCPP_PUBLIC
explicit CallbackGroup(
CallbackGroupType group_type,
bool automatically_add_to_executor_with_node = true);

/// Constructor for CallbackGroup.
/**
* Callback Groups have a type, either 'Mutually Exclusive' or 'Reentrant'
* and when creating one the type must be specified.
*
* Callbacks in Reentrant Callback Groups must be able to:
* - run at the same time as themselves (reentrant)
* - run at the same time as other callbacks in their group
* - run at the same time as other callbacks in other groups
*
* Callbacks in Mutually Exclusive Callback Groups:
* - will not be run multiple times simultaneously (non-reentrant)
* - will not be run at the same time as other callbacks in their group
* - but must run at the same time as callbacks in other groups
*
* Additionally, callback groups have a property which determines whether or
* not they are added to an executor with their associated node automatically.
* When creating a callback group the automatically_add_to_executor_with_node
* argument determines this behavior, and if true it will cause the newly
* created callback group to be added to an executor with the node when the
* Executor::add_node method is used.
* If false, this callback group will not be added automatically and would
* have to be added to an executor manually using the
* Executor::add_callback_group method.
*
* Whether the node was added to the executor before creating the callback
* group, or after, is irrelevant; the callback group will be automatically
* added to the executor in either case.
*
* \param[in] group_type The type of the callback group.
* \param[in] get_node_context Lambda to retrieve the node context when
* checking that the creating node is valid and using the guard condition.
* \param[in] automatically_add_to_executor_with_node A boolean that
* determines whether a callback group is automatically added to an executor
* with the node with which it is associated.
*/
RCLCPP_PUBLIC
explicit CallbackGroup(
CallbackGroupType group_type,
std::function<rclcpp::Context::SharedPtr(void)> get_node_context,
bool automatically_add_to_executor_with_node = true);

/// Default destructor.
RCLCPP_PUBLIC
~CallbackGroup();
Expand Down Expand Up @@ -178,11 +221,24 @@ class CallbackGroup
bool
automatically_add_to_executor_with_node() const;

/// Defer creating the notify guard condition and return it.
/// Retrieve the guard condition used to signal changes to this callback group.
/**
* \param[in] context_ptr context to use when creating the guard condition
* \return guard condition if it is valid, otherwise nullptr.
*/
[[deprecated("Use get_notify_guard_condition() without arguments")]]
RCLCPP_PUBLIC
rclcpp::GuardCondition::SharedPtr
get_notify_guard_condition(const rclcpp::Context::SharedPtr context_ptr);

/// Retrieve the guard condition used to signal changes to this callback group.
/**
* \return guard condition if it is valid, otherwise nullptr.
*/
RCLCPP_PUBLIC
rclcpp::GuardCondition::SharedPtr
get_notify_guard_condition();

/// Trigger the notify guard condition.
RCLCPP_PUBLIC
void
Expand Down Expand Up @@ -234,6 +290,8 @@ class CallbackGroup
std::shared_ptr<rclcpp::GuardCondition> notify_guard_condition_ = nullptr;
std::recursive_mutex notify_guard_condition_mutex_;

std::function<rclcpp::Context::SharedPtr(void)> get_context_;

private:
template<typename TypeT, typename Function>
typename TypeT::SharedPtr _find_ptrs_if_impl(
Expand Down
3 changes: 2 additions & 1 deletion rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,9 @@ class Executor
std::atomic_bool spinning;

/// Guard condition for signaling the rmw layer to wake up for special events.
rclcpp::GuardCondition interrupt_guard_condition_;
std::shared_ptr<rclcpp::GuardCondition> interrupt_guard_condition_;

/// Guard condition for signaling the rmw layer to wake up for system shutdown.
std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;

/// Wait set for managing entities that the rmw layer waits on.
Expand Down
212 changes: 212 additions & 0 deletions rclcpp/include/rclcpp/executors/executor_entities_collection.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// Copyright 2023 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_
#define RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_

#include <deque>
#include <functional>
#include <unordered_map>
#include <vector>

#include <rclcpp/any_executable.hpp>
#include <rclcpp/node_interfaces/node_base.hpp>
#include <rclcpp/callback_group.hpp>
#include <rclcpp/executors/executor_notify_waitable.hpp>
#include <rclcpp/visibility_control.hpp>
#include <rclcpp/wait_result.hpp>
#include <rclcpp/wait_set.hpp>

namespace rclcpp
{
namespace executors
{

/// Structure to represent a single entity's entry in a collection
template<typename EntityValueType>
struct CollectionEntry
{
/// Weak pointer to entity type
using EntityWeakPtr = typename EntityValueType::WeakPtr;
/// Shared pointer to entity type
using EntitySharedPtr = typename EntityValueType::SharedPtr;

/// The entity
EntityWeakPtr entity;

/// If relevant, the entity's corresponding callback_group
rclcpp::CallbackGroup::WeakPtr callback_group;
};

/// Update a collection based on another collection
/*
* Iterates update_from and update_to to see which entities have been added/removed between
* the two collections.
*
* For each new entry (in update_from, but not in update_to),
* add the entity and fire the on_added callback
* For each removed entry (in update_to, but not in update_from),
* remove the entity and fire the on_removed callback.
*
* \param[in] update_from The collection representing the next iteration's state
* \param[inout] update_to The collection representing the current iteration's state
* \param[in] on_added Callback fired when a new entity is detected
* \param[in] on_removed Callback fired when an entity is removed
*/
template<typename CollectionType>
void update_entities(
const CollectionType & update_from,
CollectionType & update_to,
std::function<void(const typename CollectionType::EntitySharedPtr &)> on_added,
std::function<void(const typename CollectionType::EntitySharedPtr &)> on_removed
)
{
for (auto it = update_to.begin(); it != update_to.end(); ) {
if (update_from.count(it->first) == 0) {
auto entity = it->second.entity.lock();
if (entity) {
on_removed(entity);
}
it = update_to.erase(it);
} else {
++it;
}
}
for (auto it = update_from.begin(); it != update_from.end(); ++it) {
if (update_to.count(it->first) == 0) {
auto entity = it->second.entity.lock();
if (entity) {
on_added(entity);
}
update_to.insert(*it);
}
}
}

/// A collection of entities, indexed by their corresponding handles
template<typename EntityKeyType, typename EntityValueType>
class EntityCollection
: public std::unordered_map<const EntityKeyType *, CollectionEntry<EntityValueType>>
{
public:
/// Key type of the map
using Key = const EntityKeyType *;

/// Weak pointer to entity type
using EntityWeakPtr = typename EntityValueType::WeakPtr;

/// Shared pointer to entity type
using EntitySharedPtr = typename EntityValueType::SharedPtr;

/// Update this collection based on the contents of another collection
/**
* Update the internal state of this collection, firing callbacks when entities have been
* added or removed.
*
* \param[in] other Collection to compare to
* \param[in] on_added Callback for when entities have been added
* \param[in] on_removed Callback for when entities have been removed
*/
void update(
const EntityCollection<EntityKeyType, EntityValueType> & other,
std::function<void(const EntitySharedPtr &)> on_added,
std::function<void(const EntitySharedPtr &)> on_removed)
{
update_entities(other, *this, on_added, on_removed);
}
};

/// Represent the total set of entities for a single executor
/**
* This allows the entities to be stored from ExecutorEntitiesCollector.
* The structure also makes in convenient to re-evaluate when entities have been added or removed.
*/
struct ExecutorEntitiesCollection
{
/// Collection type for timer entities
using TimerCollection = EntityCollection<rcl_timer_t, rclcpp::TimerBase>;

/// Collection type for subscription entities
using SubscriptionCollection = EntityCollection<rcl_subscription_t, rclcpp::SubscriptionBase>;

/// Collection type for client entities
using ClientCollection = EntityCollection<rcl_client_t, rclcpp::ClientBase>;

/// Collection type for service entities
using ServiceCollection = EntityCollection<rcl_service_t, rclcpp::ServiceBase>;

/// Collection type for waitable entities
using WaitableCollection = EntityCollection<rclcpp::Waitable, rclcpp::Waitable>;

/// Collection type for guard condition entities
using GuardConditionCollection = EntityCollection<rcl_guard_condition_t, rclcpp::GuardCondition>;

/// Collection of timers currently in use by the executor.
TimerCollection timers;

/// Collection of subscriptions currently in use by the executor.
SubscriptionCollection subscriptions;

/// Collection of clients currently in use by the executor.
ClientCollection clients;

/// Collection of services currently in use by the executor.
ServiceCollection services;

/// Collection of guard conditions currently in use by the executor.
GuardConditionCollection guard_conditions;

/// Collection of waitables currently in use by the executor.
WaitableCollection waitables;

/// Check if the entities collection is empty
/**
* \return true if all member collections are empty, false otherwise
*/
bool empty() const;

/// Clear the entities collection
void clear();
};

/// Build an entities collection from callback groups
/**
* Iterates a list of callback groups and adds entities from each valid group
*
* \param[in] callback_groups List of callback groups to check for entities
* \param[inout] colletion Entities collection to populate with found entities
*/
void
build_entities_collection(
const std::vector<rclcpp::CallbackGroup::WeakPtr> & callback_groups,
ExecutorEntitiesCollection & collection);

/// Build a queue of executables ready to be executed
/**
* Iterates a list of entities and adds them to a queue if they are ready.
*
* \param[in] collection Collection of entities corresponding to the current wait set.
* \param[in] wait_result Result of rclcpp::WaitSet::wait corresponding to the collection.
* \return A queue of executables that have been marked ready by the waitset.
*/
std::deque<rclcpp::AnyExecutable>
ready_executables(
const ExecutorEntitiesCollection & collection,
rclcpp::WaitResult<rclcpp::WaitSet> & wait_result
);

} // namespace executors
} // namespace rclcpp

#endif // RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_
Loading

0 comments on commit 442c426

Please sign in to comment.