Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create common structures for executors to use #2143

Merged
merged 25 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2bf88de
Deprecate callback_group call taking context
mjcarroll Mar 29, 2023
9099635
Add base executor objects that can be used by implementors
mjcarroll Mar 29, 2023
2426056
Template common operations
mjcarroll Mar 29, 2023
173ffd6
Address reviewer feedback:
mjcarroll Mar 29, 2023
a524bf0
Lint
mjcarroll Mar 29, 2023
89f2106
Address reviewer feedback and fix templates
mjcarroll Mar 30, 2023
9695eaa
Merge branch 'rolling' into mjcarroll/executor_structures
mjcarroll Mar 30, 2023
e173e5a
Lint and docs
mjcarroll Mar 30, 2023
653d1a3
Make executor own the notify waitable
mjcarroll Mar 31, 2023
a6c4c1b
Add pending queue to collector, remove from waitable
mjcarroll Apr 3, 2023
9dd48ce
Change interrupt guard condition to shared_ptr
mjcarroll Apr 3, 2023
6267741
Lint and docs
mjcarroll Apr 3, 2023
1b1a915
Don't exchange atomic twice
mjcarroll Apr 3, 2023
0a9c9a6
Fix add_node and add more tests
mjcarroll Apr 3, 2023
0ae0bea
Make get_notify_guard_condition follow API tick-tock
mjcarroll Apr 3, 2023
87f41bf
Improve callback group tick-tocking
mjcarroll Apr 3, 2023
5809328
Don't lock twice
mjcarroll Apr 3, 2023
debe396
Address reviewer feedback
mjcarroll Apr 4, 2023
c4b6589
Add thread safety annotations and make locks consistent
mjcarroll Apr 4, 2023
cd7aaba
Address reviewer feedback
mjcarroll Apr 11, 2023
6379f0c
Remove the "add_valid_node" API
mjcarroll Apr 12, 2023
855c64d
Only notify if the trigger condition is valid
mjcarroll Apr 12, 2023
d9a9206
Only trigger if valid and needed
mjcarroll Apr 13, 2023
838d1ae
Merge branch 'rolling' into mjcarroll/executor_structures
mjcarroll Apr 13, 2023
ab3bbf4
Merge branch 'rolling' into mjcarroll/executor_structures
mjcarroll Apr 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/executable_list.cpp
src/rclcpp/executor.cpp
src/rclcpp/executors.cpp
src/rclcpp/executors/executor_entities_collection.cpp
src/rclcpp/executors/executor_entities_collector.cpp
src/rclcpp/executors/executor_notify_waitable.cpp
src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/executors/static_executor_entities_collector.cpp
Expand Down
60 changes: 59 additions & 1 deletion rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,54 @@ class CallbackGroup
* determines whether a callback group is automatically added to an executor
* with the node with which it is associated.
*/
[[deprecated("Use CallbackGroup constructor with context function argument")]]
RCLCPP_PUBLIC
explicit CallbackGroup(
CallbackGroupType group_type,
bool automatically_add_to_executor_with_node = true);

/// Constructor for CallbackGroup.
/**
* Callback Groups have a type, either 'Mutually Exclusive' or 'Reentrant'
* and when creating one the type must be specified.
*
* Callbacks in Reentrant Callback Groups must be able to:
* - run at the same time as themselves (reentrant)
* - run at the same time as other callbacks in their group
* - run at the same time as other callbacks in other groups
*
* Callbacks in Mutually Exclusive Callback Groups:
* - will not be run multiple times simultaneously (non-reentrant)
* - will not be run at the same time as other callbacks in their group
* - but must run at the same time as callbacks in other groups
*
* Additionally, callback groups have a property which determines whether or
* not they are added to an executor with their associated node automatically.
* When creating a callback group the automatically_add_to_executor_with_node
* argument determines this behavior, and if true it will cause the newly
* created callback group to be added to an executor with the node when the
* Executor::add_node method is used.
* If false, this callback group will not be added automatically and would
* have to be added to an executor manually using the
* Executor::add_callback_group method.
*
* Whether the node was added to the executor before creating the callback
* group, or after, is irrelevant; the callback group will be automatically
* added to the executor in either case.
*
* \param[in] group_type The type of the callback group.
* \param[in] get_node_context Lambda to retrieve the node context when
* checking that the creating node is valid and using the guard condition.
* \param[in] automatically_add_to_executor_with_node A boolean that
* determines whether a callback group is automatically added to an executor
* with the node with which it is associated.
*/
RCLCPP_PUBLIC
explicit CallbackGroup(
CallbackGroupType group_type,
std::function<rclcpp::Context::SharedPtr(void)> get_node_context,
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
bool automatically_add_to_executor_with_node = true);

/// Default destructor.
RCLCPP_PUBLIC
~CallbackGroup();
Expand Down Expand Up @@ -178,11 +221,24 @@ class CallbackGroup
bool
automatically_add_to_executor_with_node() const;

/// Defer creating the notify guard condition and return it.
/// Retrieve the guard condition used to signal changes to this callback group.
/**
* \param[in] context_ptr context to use when creating the guard condition
* \return guard condition if it is valid, otherwise nullptr.
*/
[[deprecated("Use get_notify_guard_condition() without arguments")]]
RCLCPP_PUBLIC
rclcpp::GuardCondition::SharedPtr
get_notify_guard_condition(const rclcpp::Context::SharedPtr context_ptr);

/// Retrieve the guard condition used to signal changes to this callback group.
/**
* \return guard condition if it is valid, otherwise nullptr.
*/
RCLCPP_PUBLIC
rclcpp::GuardCondition::SharedPtr
get_notify_guard_condition();

/// Trigger the notify guard condition.
RCLCPP_PUBLIC
void
Expand Down Expand Up @@ -234,6 +290,8 @@ class CallbackGroup
std::shared_ptr<rclcpp::GuardCondition> notify_guard_condition_ = nullptr;
std::recursive_mutex notify_guard_condition_mutex_;

std::function<rclcpp::Context::SharedPtr(void)> get_context_;

private:
template<typename TypeT, typename Function>
typename TypeT::SharedPtr _find_ptrs_if_impl(
Expand Down
3 changes: 2 additions & 1 deletion rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,8 +637,9 @@ class Executor
std::atomic_bool spinning;

/// Guard condition for signaling the rmw layer to wake up for special events.
rclcpp::GuardCondition interrupt_guard_condition_;
std::shared_ptr<rclcpp::GuardCondition> interrupt_guard_condition_;

/// Guard condition for signaling the rmw layer to wake up for system shutdown.
std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;

/// Wait set for managing entities that the rmw layer waits on.
Expand Down
212 changes: 212 additions & 0 deletions rclcpp/include/rclcpp/executors/executor_entities_collection.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// Copyright 2023 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_
#define RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_

#include <deque>
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
#include <functional>
#include <unordered_map>
#include <vector>

#include <rclcpp/any_executable.hpp>
#include <rclcpp/node_interfaces/node_base.hpp>
#include <rclcpp/callback_group.hpp>
#include <rclcpp/executors/executor_notify_waitable.hpp>
#include <rclcpp/visibility_control.hpp>
#include <rclcpp/wait_result.hpp>
#include <rclcpp/wait_set.hpp>

namespace rclcpp
{
namespace executors
{

/// Structure to represent a single entity's entry in a collection
template<typename EntityValueType>
struct CollectionEntry
{
/// Weak pointer to entity type
using EntityWeakPtr = typename EntityValueType::WeakPtr;
/// Shared pointer to entity type
using EntitySharedPtr = typename EntityValueType::SharedPtr;

/// The entity
EntityWeakPtr entity;

/// If relevant, the entity's corresponding callback_group
rclcpp::CallbackGroup::WeakPtr callback_group;
};

/// Update a collection based on another collection
/*
* Iterates update_from and update_to to see which entities have been added/removed between
* the two collections.
*
* For each new entry (in update_from, but not in update_to),
* add the entity and fire the on_added callback
* For each removed entry (in update_to, but not in update_from),
* remove the entity and fire the on_removed callback.
*
* \param[in] update_from The collection representing the next iteration's state
* \param[inout] update_to The collection representing the current iteration's state
* \param[in] on_added Callback fired when a new entity is detected
* \param[in] on_removed Callback fired when an entity is removed
*/
template<typename CollectionType>
void update_entities(
const CollectionType & update_from,
CollectionType & update_to,
std::function<void(const typename CollectionType::EntitySharedPtr &)> on_added,
std::function<void(const typename CollectionType::EntitySharedPtr &)> on_removed
)
{
for (auto it = update_to.begin(); it != update_to.end(); ) {
if (update_from.count(it->first) == 0) {
auto entity = it->second.entity.lock();
if (entity) {
on_removed(entity);
}
it = update_to.erase(it);
} else {
++it;
}
}
for (auto it = update_from.begin(); it != update_from.end(); ++it) {
if (update_to.count(it->first) == 0) {
auto entity = it->second.entity.lock();
if (entity) {
on_added(entity);
}
update_to.insert(*it);
}
}
}

/// A collection of entities, indexed by their corresponding handles
template<typename EntityKeyType, typename EntityValueType>
class EntityCollection
: public std::unordered_map<const EntityKeyType *, CollectionEntry<EntityValueType>>
{
public:
/// Key type of the map
using Key = const EntityKeyType *;

/// Weak pointer to entity type
using EntityWeakPtr = typename EntityValueType::WeakPtr;

/// Shared pointer to entity type
using EntitySharedPtr = typename EntityValueType::SharedPtr;

/// Update this collection based on the contents of another collection
/**
* Update the internal state of this collection, firing callbacks when entities have been
* added or removed.
*
* \param[in] other Collection to compare to
* \param[in] on_added Callback for when entities have been added
* \param[in] on_removed Callback for when entities have been removed
*/
void update(
const EntityCollection<EntityKeyType, EntityValueType> & other,
std::function<void(const EntitySharedPtr &)> on_added,
std::function<void(const EntitySharedPtr &)> on_removed)
{
update_entities(other, *this, on_added, on_removed);
}
};

/// Represent the total set of entities for a single executor
/**
* This allows the entities to be stored from ExecutorEntitiesCollector.
* The structure also makes in convenient to re-evaluate when entities have been added or removed.
*/
struct ExecutorEntitiesCollection
{
/// Collection type for timer entities
using TimerCollection = EntityCollection<rcl_timer_t, rclcpp::TimerBase>;

/// Collection type for subscription entities
using SubscriptionCollection = EntityCollection<rcl_subscription_t, rclcpp::SubscriptionBase>;

/// Collection type for client entities
using ClientCollection = EntityCollection<rcl_client_t, rclcpp::ClientBase>;

/// Collection type for service entities
using ServiceCollection = EntityCollection<rcl_service_t, rclcpp::ServiceBase>;

/// Collection type for waitable entities
using WaitableCollection = EntityCollection<rclcpp::Waitable, rclcpp::Waitable>;

/// Collection type for guard condition entities
using GuardConditionCollection = EntityCollection<rcl_guard_condition_t, rclcpp::GuardCondition>;

/// Collection of timers currently in use by the executor.
TimerCollection timers;

/// Collection of subscriptions currently in use by the executor.
SubscriptionCollection subscriptions;

/// Collection of clients currently in use by the executor.
ClientCollection clients;

/// Collection of services currently in use by the executor.
ServiceCollection services;

/// Collection of guard conditions currently in use by the executor.
GuardConditionCollection guard_conditions;

/// Collection of waitables currently in use by the executor.
WaitableCollection waitables;

/// Check if the entities collection is empty
/**
* \return true if all member collections are empty, false otherwise
*/
bool empty() const;

/// Clear the entities collection
void clear();
};

/// Build an entities collection from callback groups
/**
* Iterates a list of callback groups and adds entities from each valid group
*
* \param[in] callback_groups List of callback groups to check for entities
* \param[inout] colletion Entities collection to populate with found entities
*/
void
build_entities_collection(
const std::vector<rclcpp::CallbackGroup::WeakPtr> & callback_groups,
ExecutorEntitiesCollection & collection);

/// Build a queue of executables ready to be executed
/**
* Iterates a list of entities and adds them to a queue if they are ready.
*
* \param[in] collection Collection of entities corresponding to the current wait set.
* \param[in] wait_result Result of rclcpp::WaitSet::wait corresponding to the collection.
* \return A queue of executables that have been marked ready by the waitset.
*/
std::deque<rclcpp::AnyExecutable>
ready_executables(
const ExecutorEntitiesCollection & collection,
rclcpp::WaitResult<rclcpp::WaitSet> & wait_result
);

} // namespace executors
} // namespace rclcpp

#endif // RCLCPP__EXECUTORS__EXECUTOR_ENTITIES_COLLECTION_HPP_