From 6a37e9695fe0dc1f8b1018078725d86cef78aef8 Mon Sep 17 00:00:00 2001 From: Pedro Pena Date: Mon, 20 Jul 2020 15:57:38 -0400 Subject: [PATCH] enabled static executor and added add callback group feature --- rclcpp/CMakeLists.txt | 2 +- .../static_executor_entities_collector.hpp | 11 +++++++ .../static_single_threaded_executor.hpp | 7 +++++ .../static_executor_entities_collector.cpp | 16 +++++++++- .../static_single_threaded_executor.cpp | 31 +++++++++++++++++++ 5 files changed, 65 insertions(+), 2 deletions(-) diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 24d0a1d930..26d7e69362 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -46,7 +46,7 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/duration.cpp src/rclcpp/event.cpp src/rclcpp/exceptions/exceptions.cpp -# src/rclcpp/executable_list.cpp + src/rclcpp/executable_list.cpp src/rclcpp/executor.cpp src/rclcpp/executors.cpp src/rclcpp/expand_topic_or_service_name.cpp diff --git a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp index 0df25fde9d..5364b9e4e5 100644 --- a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp +++ b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp @@ -103,6 +103,12 @@ class StaticExecutorEntitiesCollector final size_t get_number_of_ready_guard_conditions() override; + RCLCPP_PUBLIC + void + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + /** * \sa rclcpp::Executor::add_node() * \throw std::runtime_error if node was already added @@ -223,6 +229,11 @@ class StaticExecutorEntitiesCollector final /// Memory strategy: an interface for handling user-defined memory allocation strategies. rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; + typedef std::map> WeakCallbackGroupsToNodesMap; + WeakCallbackGroupsToNodesMap weak_groups_to_nodes_; + /// List of weak nodes registered in the static executor std::list weak_nodes_; diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp index c78259ca75..5b36d360f2 100644 --- a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -78,6 +78,13 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor void spin() override; + RCLCPP_PUBLIC + void + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify) override; + /// Add a node to the executor. /** * An executor can have zero or more nodes which provide work during `spin` functions. diff --git a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp index 5461e665ab..be73ddd7c9 100644 --- a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp +++ b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp @@ -77,7 +77,7 @@ void StaticExecutorEntitiesCollector::fill_memory_strategy() { memory_strategy_->clear_handles(); - bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); + bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_groups_to_nodes_); // Clean up any invalid nodes, if they were detected if (has_invalid_weak_nodes) { @@ -236,6 +236,20 @@ StaticExecutorEntitiesCollector::add_node( guard_conditions_.push_back(node_ptr->get_notify_guard_condition()); } +void +StaticExecutorEntitiesCollector::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; + auto insert_info = weak_groups_to_nodes_.insert(std::make_pair(weak_group_ptr, node_ptr)); + group_ptr->allow_executor_to_add().store(false); + bool was_inserted = insert_info.second; + if (!was_inserted) { + throw std::runtime_error("Callback group was already added to executor."); + } +} + bool StaticExecutorEntitiesCollector::remove_node( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index 728a0902a1..499d67e5dd 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -49,6 +49,28 @@ StaticSingleThreadedExecutor::spin() } } +void +StaticSingleThreadedExecutor::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify) +{ + // If the node already has an executor + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Node has already been added to an executor."); + } + + if (notify && + node_ptr->get_associated_with_executor_atomic().exchange(true)) { + // Interrupt waiting to handle new node + if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { + throw std::runtime_error(rcl_get_error_string().str); + } + } + entities_collector_->add_callback_group(group_ptr, node_ptr); +} + void StaticSingleThreadedExecutor::add_node( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) @@ -67,6 +89,15 @@ StaticSingleThreadedExecutor::add_node( } entities_collector_->add_node(node_ptr); + + std::for_each( + node_ptr->get_callback_groups().begin(), node_ptr->get_callback_groups().end(), + [this, node_ptr, notify] (rclcpp::CallbackGroup::WeakPtr weak_group){ + auto group_ptr = weak_group.lock(); + if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load()) { + add_callback_group(group_ptr, node_ptr, notify); + } + }); } void