From eed8d68c9e2be0d451c8d3dd7a259aa4a7b432bb Mon Sep 17 00:00:00 2001 From: Ishu Goel Date: Tue, 17 Sep 2019 15:23:29 +0200 Subject: [PATCH 01/11] Added static single threaded executor functionality Signed-off-by: Ishu Goel executor enhanced to run clients and waitable Signed-off-by: Ishu Goel tested executor Signed-off-by: Ishu Goel added semi-dynamic feature to the executor Signed-off-by: Ishu Goel Jenkins error fixes Signed-off-by: Ishu Goel Added static single threaded executor functionality Signed-off-by: Ishu Goel --- rclcpp/CMakeLists.txt | 2 + rclcpp/include/rclcpp/executable_list.hpp | 69 ++++ rclcpp/include/rclcpp/executor.hpp | 1 - rclcpp/include/rclcpp/executors.hpp | 1 + .../static_single_threaded_executor.hpp | 208 +++++++++++ rclcpp/include/rclcpp/memory_strategy.hpp | 1 + rclcpp/src/rclcpp/executable_list.cpp | 29 ++ rclcpp/src/rclcpp/executor.cpp | 1 + .../static_single_threaded_executor.cpp | 329 ++++++++++++++++++ 9 files changed, 640 insertions(+), 1 deletion(-) create mode 100644 rclcpp/include/rclcpp/executable_list.hpp create mode 100644 rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp create mode 100644 rclcpp/src/rclcpp/executable_list.cpp create mode 100644 rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 26361a2d83..524f90f287 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -40,11 +40,13 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/duration.cpp src/rclcpp/event.cpp src/rclcpp/exceptions.cpp + src/rclcpp/executable_list.cpp src/rclcpp/executor.cpp src/rclcpp/executors.cpp src/rclcpp/expand_topic_or_service_name.cpp src/rclcpp/executors/multi_threaded_executor.cpp src/rclcpp/executors/single_threaded_executor.cpp + src/rclcpp/executors/static_single_threaded_executor.cpp src/rclcpp/graph_listener.cpp src/rclcpp/init_options.cpp src/rclcpp/intra_process_manager.cpp diff --git a/rclcpp/include/rclcpp/executable_list.hpp b/rclcpp/include/rclcpp/executable_list.hpp new file mode 100644 index 0000000000..c70de2a13e --- /dev/null +++ b/rclcpp/include/rclcpp/executable_list.hpp @@ -0,0 +1,69 @@ +// Copyright 2019 Nobleo Technology +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTABLE_LIST_HPP_ +#define RCLCPP__EXECUTABLE_LIST_HPP_ + +#include + +#include "rclcpp/callback_group.hpp" +#include "rclcpp/client.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/node_interfaces/node_base_interface.hpp" +#include "rclcpp/service.hpp" +#include "rclcpp/subscription.hpp" +#include "rclcpp/timer.hpp" +#include "rclcpp/visibility_control.hpp" +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace executor +{ + +/// This struct contains subscriptionbase, timerbase, etc. which can be used to run callbacks. +struct ExecutableList +{ + RCLCPP_PUBLIC + ExecutableList(); + + RCLCPP_PUBLIC + virtual ~ExecutableList(); + + // Vector containing the SubscriptionBase of all the subscriptions added to the executor. + std::vector subscription; + // Contains the count of added subscriptions + size_t number_of_subscriptions; + // Vector containing the TimerBase of all the timers added to the executor. + std::vector timer; + // Contains the count of added timers + size_t number_of_timers; + // Vector containing the ServiceBase of all the services added to the executor. + std::vector service; + // Contains the count of added services + size_t number_of_services; + // Vector containing the ClientBase of all the clients added to the executor. + std::vector client; + // Contains the count of added clients + size_t number_of_clients; + // Vector containing all the waitables added to the executor. + std::vector waitable; + // Contains the count of added waitables + size_t number_of_waitables; +}; + +} // namespace executor +} // namespace rclcpp + +#endif // RCLCPP__EXECUTABLE_LIST_HPP_ diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index b5ff96d66d..aa94c01476 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -357,7 +357,6 @@ class Executor /// The context associated with this executor. std::shared_ptr context_; -private: RCLCPP_DISABLE_COPY(Executor) std::list weak_nodes_; diff --git a/rclcpp/include/rclcpp/executors.hpp b/rclcpp/include/rclcpp/executors.hpp index 6c73994dea..ac91141a43 100644 --- a/rclcpp/include/rclcpp/executors.hpp +++ b/rclcpp/include/rclcpp/executors.hpp @@ -20,6 +20,7 @@ #include "rclcpp/executors/multi_threaded_executor.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" +#include "rclcpp/executors/static_single_threaded_executor.hpp" #include "rclcpp/node.hpp" #include "rclcpp/utilities.hpp" #include "rclcpp/visibility_control.hpp" diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp new file mode 100644 index 0000000000..62c66aef7d --- /dev/null +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -0,0 +1,208 @@ +// Copyright 2019 Nobleo Technology. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ +#define RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ + +#include + +#include +#include +#include +#include + +#include "rclcpp/executor.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/memory_strategies.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/utilities.hpp" +#include "rclcpp/rate.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ +namespace executors +{ + +/// Static executor implementation +/** + * This executor is a static version of original single threaded executor. + * This executor makes the assumption that system does not change during runtime. + * This means all nodes, callbackgroups, timers, subscriptions etc. are created + * before .spin() is called. + * + * To run this executor instead of SingleThreadedExecutor replace: + * rclcpp::executors::SingleThreadedExecutor exec; + * by + * rclcpp::executors::StaticSingleThreadedExecutor exec; + * in your source code and spin node(s) in the following way: + * exec.add_node(node); + * exec.spin(); + * exec.remove_node(node); + */ +class StaticSingleThreadedExecutor : public executor::Executor +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(StaticSingleThreadedExecutor) + + /// Default constructor. See the default constructor for Executor. + RCLCPP_PUBLIC + StaticSingleThreadedExecutor( + const executor::ExecutorArgs & args = executor::ExecutorArgs()); + + /// Default destrcutor. + RCLCPP_PUBLIC + virtual ~StaticSingleThreadedExecutor(); + + /// Static executor implementation of spin. + // This function will block until work comes in, execute it, and keep blocking. + // It will only be interrupt by a CTRL-C (managed by the global signal handler). + RCLCPP_PUBLIC + void + spin(); + + /// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted. + /** + * \param[in] future The future to wait on. If this function returns SUCCESS, the future can be + * accessed without blocking (though it may still throw an exception). + * \param[in] timeout Optional timeout parameter, which gets passed to + * Executor::execute_ready_executables. + * `-1` is block forever, `0` is non-blocking. + * If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return + * code. + * \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`. + * + * Example usage: + * rclcpp::executors::StaticSingleThreadedExecutor exec; + * // ... other part of code like creating node + * // define future + * exec.add_node(node); + * exec.spin_until_future_complete(future); + */ + template + rclcpp::executor::FutureReturnCode + spin_until_future_complete( + std::shared_future & future, + std::chrono::duration timeout = std::chrono::duration(-1)) + { + std::future_status status = future.wait_for(std::chrono::seconds(0)); + if (status == std::future_status::ready) { + return rclcpp::executor::FutureReturnCode::SUCCESS; + } + + auto end_time = std::chrono::steady_clock::now(); + std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( + timeout); + if (timeout_ns > std::chrono::nanoseconds::zero()) { + end_time += timeout_ns; + } + std::chrono::nanoseconds timeout_left = timeout_ns; + + rclcpp::executor::ExecutableList executable_list; + // Collect entities and clean any invalid nodes. + run_collect_entities(); + get_executable_list(executable_list); + while (rclcpp::ok(this->context_)) { + // Do one set of work. + execute_ready_executables(executable_list, timeout_left); + // Check if the future is set, return SUCCESS if it is. + status = future.wait_for(std::chrono::seconds(0)); + if (status == std::future_status::ready) { + return rclcpp::executor::FutureReturnCode::SUCCESS; + } + // If the original timeout is < 0, then this is blocking, never TIMEOUT. + if (timeout_ns < std::chrono::nanoseconds::zero()) { + continue; + } + // Otherwise check if we still have time to wait, return TIMEOUT if not. + auto now = std::chrono::steady_clock::now(); + if (now >= end_time) { + return rclcpp::executor::FutureReturnCode::TIMEOUT; + } + // Subtract the elapsed time from the original timeout. + timeout_left = std::chrono::duration_cast(end_time - now); + } + + // The future did not complete before ok() returned false, return INTERRUPTED. + return rclcpp::executor::FutureReturnCode::INTERRUPTED; + } + +protected: + /// Check which executables in ExecutableList struct are ready from wait_set and execute them. + /** + * \param[in] exec_list Structure that can hold subscriptionbases, timerbases, etc + * \param[in] timeout Optional timeout parameter. + */ + RCLCPP_PUBLIC + void + execute_ready_executables(executor::ExecutableList & exec_list, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + + /// Get list of TimerBase of all available timers. + RCLCPP_PUBLIC + void + get_timer_list(executor::ExecutableList & exec_list); + + /// Get list of SubscriptionBase of all available subscriptions. + RCLCPP_PUBLIC + void + get_subscription_list(executor::ExecutableList & exec_list); + + /// Get list of ServiceBase of all available services. + RCLCPP_PUBLIC + void + get_service_list(executor::ExecutableList & exec_list); + + /// Get list of ClientBase of all available clients. + RCLCPP_PUBLIC + void + get_client_list(executor::ExecutableList & exec_list); + + /// Get list of all available waitables. + RCLCPP_PUBLIC + void + get_waitable_list(executor::ExecutableList & exec_list); + + /// Get available timers, subscribers, services, clients and waitables in ExecutableList struct. + /** + * \param[in] exec_list Structure that can hold subscriptionbases, timerbases, etc + * \param[in] timeout Optional timeout parameter. + */ + RCLCPP_PUBLIC + void + get_executable_list(executor::ExecutableList & executable_list, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + + /// Function to run collect_entities() and clean any invalid nodes. + RCLCPP_PUBLIC + void run_collect_entities(); + + /// Function to add_handles_to_wait_set and wait for work and + // block until the wait set is ready or until the timeout has been exceeded. + RCLCPP_PUBLIC + void refresh_wait_set(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + + /// Function to reallocate space for entities in the wait set. + RCLCPP_PUBLIC + void prepare_wait_set(); + + +private: + RCLCPP_DISABLE_COPY(StaticSingleThreadedExecutor) +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index aaf730b9b2..03a07d0bf1 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -22,6 +22,7 @@ #include "rcl/wait.h" #include "rclcpp/any_executable.hpp" +#include "rclcpp/executable_list.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/visibility_control.hpp" diff --git a/rclcpp/src/rclcpp/executable_list.cpp b/rclcpp/src/rclcpp/executable_list.cpp new file mode 100644 index 0000000000..ccd9e91722 --- /dev/null +++ b/rclcpp/src/rclcpp/executable_list.cpp @@ -0,0 +1,29 @@ +// Copyright 2019 Nobleo Technology +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/executable_list.hpp" + +using rclcpp::executor::ExecutableList; + +ExecutableList::ExecutableList() +: + number_of_subscriptions(0), + number_of_timers(0), + number_of_services(0), + number_of_clients(0), + number_of_waitables(0) + {} + +ExecutableList::~ExecutableList() +{} diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 22f5c2f063..8670527b97 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -457,6 +457,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), memory_strategy_->number_of_ready_events()); + if (RCL_RET_OK != ret) { throw std::runtime_error( std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str); diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp new file mode 100644 index 0000000000..526243d40c --- /dev/null +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -0,0 +1,329 @@ +// Copyright 2019 Nobleo Technology. +// + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/executors/static_single_threaded_executor.hpp" +#include "rclcpp/executable_list.hpp" +#include "rclcpp/scope_exit.hpp" + +using rclcpp::executors::StaticSingleThreadedExecutor; +using rclcpp::executor::ExecutableList; + +StaticSingleThreadedExecutor::StaticSingleThreadedExecutor(const rclcpp::executor::ExecutorArgs & args) +: executor::Executor(args) {} + +StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor() {} + + +void +StaticSingleThreadedExecutor::spin() +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCLCPP_SCOPE_EXIT(this->spinning.store(false); ); + rclcpp::executor::ExecutableList executable_list; + run_collect_entities(); + get_executable_list(executable_list); + while (rclcpp::ok(this->context_) && spinning.load()) { + execute_ready_executables(executable_list); + } +} + +void +StaticSingleThreadedExecutor::get_timer_list(ExecutableList & exec_list) +{ + // Clear the previous timers (if any) from the ExecutableList struct + exec_list.timer.clear(); + exec_list.number_of_timers = 0; + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + // Check in all the callback groups + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group || !group->can_be_taken_from().load()) { + continue; + } + group->find_timer_ptrs_if([&exec_list](const rclcpp::TimerBase::SharedPtr & timer) { + if(timer){ + //if any timer is found, push it in the exec_list struct + exec_list.timer.push_back(timer); + exec_list.number_of_timers++; + } + return false; + }); + } + } +} + +void +StaticSingleThreadedExecutor::get_subscription_list(ExecutableList & exec_list) +{ + // Clear the previous subscriptions (if any) from the ExecutableList struct + exec_list.subscription.clear(); + exec_list.number_of_subscriptions = 0; + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + // Check in all the callback groups + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group || !group->can_be_taken_from().load()) { + continue; + } + group->find_subscription_ptrs_if([&exec_list](const rclcpp::SubscriptionBase::SharedPtr & subscription) { + if(subscription){ + //if any subscription (intra-process as well) is found, push it in the exec_list struct + exec_list.subscription.push_back(subscription); + exec_list.number_of_subscriptions++; + } + return false; + }); + } + } +} + +void +StaticSingleThreadedExecutor::get_service_list(ExecutableList & exec_list) +{ + // Clear the previous services (if any) from the ExecutableList struct + exec_list.service.clear(); + exec_list.number_of_services = 0; + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + // Check in all the callback groups + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group || !group->can_be_taken_from().load()) { + continue; + } + group->find_service_ptrs_if([&exec_list](const rclcpp::ServiceBase::SharedPtr & service) { + if(service){ + //if any service is found, push it in the exec_list struct + exec_list.service.push_back(service); + exec_list.number_of_services++; + } + return false; + }); + } + } +} + +void +StaticSingleThreadedExecutor::get_client_list(ExecutableList & exec_list) +{ + // Clear the previous clients (if any) from the ExecutableList struct + exec_list.client.clear(); + exec_list.number_of_clients = 0; + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + // Check in all the callback groups + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group || !group->can_be_taken_from().load()) { + continue; + } + group->find_client_ptrs_if([&exec_list](const rclcpp::ClientBase::SharedPtr & client) { + if(client){ + //if any client is found, push it in the exec_list struct + exec_list.client.push_back(client); + exec_list.number_of_clients++; + } + return false; + }); + } + } +} + +void +StaticSingleThreadedExecutor::get_waitable_list(ExecutableList & exec_list) +{ + // Clear the previous waitables (if any) from the ExecutableList struct + exec_list.waitable.clear(); + exec_list.number_of_waitables = 0; + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + // Check in all the callback groups + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group || !group->can_be_taken_from().load()) { + continue; + } + group->find_waitable_ptrs_if([&exec_list](const rclcpp::Waitable::SharedPtr & waitable) { + if(waitable){ + //if any waitable is found, push it in the exec_list struct + exec_list.waitable.push_back(waitable); + exec_list.number_of_waitables++; + } + return false; + }); + } + } +} + +void +StaticSingleThreadedExecutor::get_executable_list( + ExecutableList & executable_list, std::chrono::nanoseconds timeout) +{ + // prepare the wait_set + prepare_wait_set(); + + // add handles to the wait_set and wait for work + refresh_wait_set(timeout); + + // Get all the timers + get_timer_list(executable_list); + + // Get all the subscribers + get_subscription_list(executable_list); + + // Get all the services + get_service_list(executable_list); + + // Get all the clients + get_client_list(executable_list); + + // Get all the waitables + get_waitable_list(executable_list); +} + +void +StaticSingleThreadedExecutor::execute_ready_executables( + ExecutableList & exec_list, std::chrono::nanoseconds timeout) +{ + refresh_wait_set(timeout); + //Execute all the ready subscriptions + for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) { + if (wait_set_.size_of_subscriptions && i < exec_list.number_of_subscriptions) { + if (wait_set_.subscriptions[i]) { + if (exec_list.subscription[i]->get_intra_process_subscription_handle()) { + execute_intra_process_subscription(exec_list.subscription[i]); + } + else { + execute_subscription(exec_list.subscription[i]); + } + } + } + } + //Execute all the ready timers + for (size_t i = 0; i < wait_set_.size_of_timers; ++i) { + if (wait_set_.size_of_timers && i < exec_list.number_of_timers) { + if (wait_set_.timers[i] && exec_list.timer[i]->is_ready()) { + execute_timer(exec_list.timer[i]); + } + } + } + //Execute all the ready services + for (size_t i = 0; i < wait_set_.size_of_services; ++i) { + if (wait_set_.size_of_services && i < exec_list.number_of_services) { + if (wait_set_.services[i]) { + execute_service(exec_list.service[i]); + } + } + } + //Execute all the ready clients + for (size_t i = 0; i < wait_set_.size_of_clients; ++i) { + if (wait_set_.size_of_clients && i < exec_list.number_of_clients) { + if (wait_set_.clients[i]) { + execute_client(exec_list.client[i]); + } + } + } + //Execute all the ready waitables + for (size_t i = 0; i < exec_list.number_of_waitables; ++i) { + if (exec_list.number_of_waitables && exec_list.waitable[i]->is_ready(&wait_set_)) { + exec_list.waitable[i]->execute(); + } + } +} + +void +StaticSingleThreadedExecutor::run_collect_entities() +{ + memory_strategy_->clear_handles(); + bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); + + // Clean up any invalid nodes, if they were detected + if (has_invalid_weak_nodes) { + auto node_it = weak_nodes_.begin(); + auto gc_it = guard_conditions_.begin(); + while (node_it != weak_nodes_.end()) { + if (node_it->expired()) { + node_it = weak_nodes_.erase(node_it); + memory_strategy_->remove_guard_condition(*gc_it); + gc_it = guard_conditions_.erase(gc_it); + } else { + ++node_it; + ++gc_it; + } + } + } +} + +void +StaticSingleThreadedExecutor::prepare_wait_set() +{ + // clear wait set + if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) { + throw std::runtime_error("Couldn't clear wait set"); + } + + // The size of waitables are accounted for in size of the other entities + rcl_ret_t ret = rcl_wait_set_resize( + &wait_set_, memory_strategy_->number_of_ready_subscriptions(), + memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), + memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), + memory_strategy_->number_of_ready_events()); + if (RCL_RET_OK != ret) { + throw std::runtime_error( + std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str); + } +} + +void +StaticSingleThreadedExecutor::refresh_wait_set(std::chrono::nanoseconds timeout) +{ + // clear wait set + if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) { + throw std::runtime_error("Couldn't clear wait set"); + } + // add handles to the wait_set + if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) { + throw std::runtime_error("Couldn't fill wait set"); + } + rcl_ret_t status = + rcl_wait(&wait_set_, std::chrono::duration_cast(timeout).count()); + if (status == RCL_RET_WAIT_SET_EMPTY) { + RCUTILS_LOG_WARN_NAMED( + "rclcpp", + "empty wait set received in rcl_wait(). This should never happen."); + } else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { + using rclcpp::exceptions::throw_from_rcl_error; + throw_from_rcl_error(status, "rcl_wait() failed"); + } +} From 8a3d56cb859a3b86d6146c6a05c197f81690e464 Mon Sep 17 00:00:00 2001 From: Ishu Goel Date: Fri, 18 Oct 2019 11:00:41 +0200 Subject: [PATCH 02/11] Added semi-dynamic feature and made changes based on review comments Signed-off-by: Ishu Goel --- rclcpp/include/rclcpp/executable_list.hpp | 3 +- .../static_single_threaded_executor.hpp | 17 +- rclcpp/include/rclcpp/memory_strategy.hpp | 1 - rclcpp/src/rclcpp/executable_list.cpp | 7 +- rclcpp/src/rclcpp/executor.cpp | 1 - .../static_single_threaded_executor.cpp | 156 +++++++++--------- 6 files changed, 93 insertions(+), 92 deletions(-) diff --git a/rclcpp/include/rclcpp/executable_list.hpp b/rclcpp/include/rclcpp/executable_list.hpp index c70de2a13e..1939ee50b5 100644 --- a/rclcpp/include/rclcpp/executable_list.hpp +++ b/rclcpp/include/rclcpp/executable_list.hpp @@ -1,5 +1,3 @@ -// Copyright 2019 Nobleo Technology -// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,6 +14,7 @@ #define RCLCPP__EXECUTABLE_LIST_HPP_ #include +#include #include "rclcpp/callback_group.hpp" #include "rclcpp/client.hpp" diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp index 62c66aef7d..b3365416be 100644 --- a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -1,5 +1,3 @@ -// Copyright 2019 Nobleo Technology. -// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -21,10 +19,11 @@ #include #include #include - +#include #include "rclcpp/executor.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" +#include "rclcpp/executable_list.hpp" #include "rclcpp/node.hpp" #include "rclcpp/utilities.hpp" #include "rclcpp/rate.hpp" @@ -146,8 +145,9 @@ class StaticSingleThreadedExecutor : public executor::Executor */ RCLCPP_PUBLIC void - execute_ready_executables(executor::ExecutableList & exec_list, - std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + execute_ready_executables( + executor::ExecutableList & exec_list, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); /// Get list of TimerBase of all available timers. RCLCPP_PUBLIC @@ -181,15 +181,16 @@ class StaticSingleThreadedExecutor : public executor::Executor */ RCLCPP_PUBLIC void - get_executable_list(executor::ExecutableList & executable_list, - std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + get_executable_list( + executor::ExecutableList & executable_list, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); /// Function to run collect_entities() and clean any invalid nodes. RCLCPP_PUBLIC void run_collect_entities(); /// Function to add_handles_to_wait_set and wait for work and - // block until the wait set is ready or until the timeout has been exceeded. + // block until the wait set is ready or until the timeout has been exceeded. RCLCPP_PUBLIC void refresh_wait_set(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index 03a07d0bf1..aaf730b9b2 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -22,7 +22,6 @@ #include "rcl/wait.h" #include "rclcpp/any_executable.hpp" -#include "rclcpp/executable_list.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/visibility_control.hpp" diff --git a/rclcpp/src/rclcpp/executable_list.cpp b/rclcpp/src/rclcpp/executable_list.cpp index ccd9e91722..a1ed845055 100644 --- a/rclcpp/src/rclcpp/executable_list.cpp +++ b/rclcpp/src/rclcpp/executable_list.cpp @@ -1,5 +1,3 @@ -// Copyright 2019 Nobleo Technology -// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,13 +15,12 @@ using rclcpp::executor::ExecutableList; ExecutableList::ExecutableList() -: - number_of_subscriptions(0), +: number_of_subscriptions(0), number_of_timers(0), number_of_services(0), number_of_clients(0), number_of_waitables(0) - {} +{} ExecutableList::~ExecutableList() {} diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 8670527b97..22f5c2f063 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -457,7 +457,6 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), memory_strategy_->number_of_ready_events()); - if (RCL_RET_OK != ret) { throw std::runtime_error( std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str); diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index 526243d40c..f3a79e76c9 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -1,6 +1,3 @@ -// Copyright 2019 Nobleo Technology. -// - // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -14,13 +11,13 @@ // limitations under the License. #include "rclcpp/executors/static_single_threaded_executor.hpp" -#include "rclcpp/executable_list.hpp" #include "rclcpp/scope_exit.hpp" using rclcpp::executors::StaticSingleThreadedExecutor; using rclcpp::executor::ExecutableList; -StaticSingleThreadedExecutor::StaticSingleThreadedExecutor(const rclcpp::executor::ExecutorArgs & args) +StaticSingleThreadedExecutor::StaticSingleThreadedExecutor( + const rclcpp::executor::ExecutorArgs & args) : executor::Executor(args) {} StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor() {} @@ -59,13 +56,13 @@ StaticSingleThreadedExecutor::get_timer_list(ExecutableList & exec_list) continue; } group->find_timer_ptrs_if([&exec_list](const rclcpp::TimerBase::SharedPtr & timer) { - if(timer){ - //if any timer is found, push it in the exec_list struct - exec_list.timer.push_back(timer); - exec_list.number_of_timers++; - } - return false; - }); + if(timer){ + // If any timer is found, push it in the exec_list struct + exec_list.timer.push_back(timer); + exec_list.number_of_timers++; + } + return false; + }); } } } @@ -87,14 +84,15 @@ StaticSingleThreadedExecutor::get_subscription_list(ExecutableList & exec_list) if (!group || !group->can_be_taken_from().load()) { continue; } - group->find_subscription_ptrs_if([&exec_list](const rclcpp::SubscriptionBase::SharedPtr & subscription) { - if(subscription){ - //if any subscription (intra-process as well) is found, push it in the exec_list struct - exec_list.subscription.push_back(subscription); - exec_list.number_of_subscriptions++; - } - return false; - }); + group->find_subscription_ptrs_if([&exec_list]( + const rclcpp::SubscriptionBase::SharedPtr & subscription) { + if(subscription){ + // If any subscription (intra-process as well) is found, push it in the exec_list struct + exec_list.subscription.push_back(subscription); + exec_list.number_of_subscriptions++; + } + return false; + }); } } } @@ -117,13 +115,13 @@ StaticSingleThreadedExecutor::get_service_list(ExecutableList & exec_list) continue; } group->find_service_ptrs_if([&exec_list](const rclcpp::ServiceBase::SharedPtr & service) { - if(service){ - //if any service is found, push it in the exec_list struct - exec_list.service.push_back(service); - exec_list.number_of_services++; - } - return false; - }); + if(service){ + // If any service is found, push it in the exec_list struct + exec_list.service.push_back(service); + exec_list.number_of_services++; + } + return false; + }); } } } @@ -146,13 +144,13 @@ StaticSingleThreadedExecutor::get_client_list(ExecutableList & exec_list) continue; } group->find_client_ptrs_if([&exec_list](const rclcpp::ClientBase::SharedPtr & client) { - if(client){ - //if any client is found, push it in the exec_list struct - exec_list.client.push_back(client); - exec_list.number_of_clients++; - } - return false; - }); + if(client){ + // If any client is found, push it in the exec_list struct + exec_list.client.push_back(client); + exec_list.number_of_clients++; + } + return false; + }); } } } @@ -175,13 +173,13 @@ StaticSingleThreadedExecutor::get_waitable_list(ExecutableList & exec_list) continue; } group->find_waitable_ptrs_if([&exec_list](const rclcpp::Waitable::SharedPtr & waitable) { - if(waitable){ - //if any waitable is found, push it in the exec_list struct - exec_list.waitable.push_back(waitable); - exec_list.number_of_waitables++; - } - return false; - }); + if(waitable){ + // If any waitable is found, push it in the exec_list struct + exec_list.waitable.push_back(waitable); + exec_list.number_of_waitables++; + } + return false; + }); } } } @@ -216,50 +214,57 @@ void StaticSingleThreadedExecutor::execute_ready_executables( ExecutableList & exec_list, std::chrono::nanoseconds timeout) { - refresh_wait_set(timeout); - //Execute all the ready subscriptions - for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) { - if (wait_set_.size_of_subscriptions && i < exec_list.number_of_subscriptions) { - if (wait_set_.subscriptions[i]) { - if (exec_list.subscription[i]->get_intra_process_subscription_handle()) { - execute_intra_process_subscription(exec_list.subscription[i]); - } - else { - execute_subscription(exec_list.subscription[i]); - } + refresh_wait_set(timeout); + // Execute all the ready subscriptions + for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) { + if (wait_set_.size_of_subscriptions && i < exec_list.number_of_subscriptions) { + if (wait_set_.subscriptions[i]) { + if (exec_list.subscription[i]->get_intra_process_subscription_handle()) { + execute_intra_process_subscription(exec_list.subscription[i]); + } else { + execute_subscription(exec_list.subscription[i]); } } } - //Execute all the ready timers - for (size_t i = 0; i < wait_set_.size_of_timers; ++i) { - if (wait_set_.size_of_timers && i < exec_list.number_of_timers) { - if (wait_set_.timers[i] && exec_list.timer[i]->is_ready()) { - execute_timer(exec_list.timer[i]); - } + } + // Execute all the ready timers + for (size_t i = 0; i < wait_set_.size_of_timers; ++i) { + if (wait_set_.size_of_timers && i < exec_list.number_of_timers) { + if (wait_set_.timers[i] && exec_list.timer[i]->is_ready()) { + execute_timer(exec_list.timer[i]); } } - //Execute all the ready services - for (size_t i = 0; i < wait_set_.size_of_services; ++i) { - if (wait_set_.size_of_services && i < exec_list.number_of_services) { - if (wait_set_.services[i]) { - execute_service(exec_list.service[i]); - } + } + // Execute all the ready services + for (size_t i = 0; i < wait_set_.size_of_services; ++i) { + if (wait_set_.size_of_services && i < exec_list.number_of_services) { + if (wait_set_.services[i]) { + execute_service(exec_list.service[i]); } } - //Execute all the ready clients - for (size_t i = 0; i < wait_set_.size_of_clients; ++i) { - if (wait_set_.size_of_clients && i < exec_list.number_of_clients) { - if (wait_set_.clients[i]) { - execute_client(exec_list.client[i]); - } + } + // Execute all the ready clients + for (size_t i = 0; i < wait_set_.size_of_clients; ++i) { + if (wait_set_.size_of_clients && i < exec_list.number_of_clients) { + if (wait_set_.clients[i]) { + execute_client(exec_list.client[i]); } } - //Execute all the ready waitables - for (size_t i = 0; i < exec_list.number_of_waitables; ++i) { - if (exec_list.number_of_waitables && exec_list.waitable[i]->is_ready(&wait_set_)) { - exec_list.waitable[i]->execute(); - } + } + // Execute all the ready waitables + for (size_t i = 0; i < exec_list.number_of_waitables; ++i) { + if (exec_list.number_of_waitables && exec_list.waitable[i]->is_ready(&wait_set_)) { + exec_list.waitable[i]->execute(); } + } + // Check the guard_conditions to see if anything is added to the executor + for (size_t i = 0; i < wait_set_.size_of_guard_conditions; ++i) { + if (wait_set_.guard_conditions[i]) { + // rebuild the wait_set and ExecutableList struct + run_collect_entities(); + get_executable_list(exec_list); + } + } } void @@ -299,6 +304,7 @@ StaticSingleThreadedExecutor::prepare_wait_set() memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), memory_strategy_->number_of_ready_events()); + if (RCL_RET_OK != ret) { throw std::runtime_error( std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str); From fb010821e1ba8f270c3adae6ff0474989e65deaf Mon Sep 17 00:00:00 2001 From: MartinCornelis2 Date: Tue, 3 Mar 2020 16:21:47 +0100 Subject: [PATCH 03/11] re-added accidentally deleted code in node.hpp, fixed static_single_threaded_executor.cpp w.r.t. intra-process change since last commit Signed-off-by: MartinCornelis2 --- .../static_single_threaded_executor.cpp | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index f3a79e76c9..5db47f5b95 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -214,16 +214,12 @@ void StaticSingleThreadedExecutor::execute_ready_executables( ExecutableList & exec_list, std::chrono::nanoseconds timeout) { - refresh_wait_set(timeout); + refresh_wait_set(timeout); // Execute all the ready subscriptions for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) { if (wait_set_.size_of_subscriptions && i < exec_list.number_of_subscriptions) { if (wait_set_.subscriptions[i]) { - if (exec_list.subscription[i]->get_intra_process_subscription_handle()) { - execute_intra_process_subscription(exec_list.subscription[i]); - } else { - execute_subscription(exec_list.subscription[i]); - } + execute_subscription(exec_list.subscription[i]); } } } @@ -231,7 +227,7 @@ StaticSingleThreadedExecutor::execute_ready_executables( for (size_t i = 0; i < wait_set_.size_of_timers; ++i) { if (wait_set_.size_of_timers && i < exec_list.number_of_timers) { if (wait_set_.timers[i] && exec_list.timer[i]->is_ready()) { - execute_timer(exec_list.timer[i]); + execute_timer(exec_list.timer[i]); } } } @@ -239,7 +235,7 @@ StaticSingleThreadedExecutor::execute_ready_executables( for (size_t i = 0; i < wait_set_.size_of_services; ++i) { if (wait_set_.size_of_services && i < exec_list.number_of_services) { if (wait_set_.services[i]) { - execute_service(exec_list.service[i]); + execute_service(exec_list.service[i]); } } } @@ -247,7 +243,7 @@ StaticSingleThreadedExecutor::execute_ready_executables( for (size_t i = 0; i < wait_set_.size_of_clients; ++i) { if (wait_set_.size_of_clients && i < exec_list.number_of_clients) { if (wait_set_.clients[i]) { - execute_client(exec_list.client[i]); + execute_client(exec_list.client[i]); } } } @@ -259,10 +255,12 @@ StaticSingleThreadedExecutor::execute_ready_executables( } // Check the guard_conditions to see if anything is added to the executor for (size_t i = 0; i < wait_set_.size_of_guard_conditions; ++i) { - if (wait_set_.guard_conditions[i]) { - // rebuild the wait_set and ExecutableList struct + if (wait_set_.guard_conditions[i] && + wait_set_.guard_conditions[i]!= context_->get_interrupt_guard_condition(&wait_set_) && + wait_set_.guard_conditions[i]!= &interrupt_guard_condition_) { run_collect_entities(); get_executable_list(exec_list); + break; } } } From 0ccc766e562487d0fed8aa348f5d87bf92753c38 Mon Sep 17 00:00:00 2001 From: Mauro Date: Tue, 3 Mar 2020 18:29:15 +0000 Subject: [PATCH 04/11] Remove not needed comparison wait_set_.size_of_* is always different than '0' if we are inside the for loop Signed-off-by: Mauro --- .../executors/static_single_threaded_executor.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index 5db47f5b95..6fbe273de3 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -217,7 +217,7 @@ StaticSingleThreadedExecutor::execute_ready_executables( refresh_wait_set(timeout); // Execute all the ready subscriptions for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) { - if (wait_set_.size_of_subscriptions && i < exec_list.number_of_subscriptions) { + if (i < exec_list.number_of_subscriptions) { if (wait_set_.subscriptions[i]) { execute_subscription(exec_list.subscription[i]); } @@ -225,7 +225,7 @@ StaticSingleThreadedExecutor::execute_ready_executables( } // Execute all the ready timers for (size_t i = 0; i < wait_set_.size_of_timers; ++i) { - if (wait_set_.size_of_timers && i < exec_list.number_of_timers) { + if (i < exec_list.number_of_timers) { if (wait_set_.timers[i] && exec_list.timer[i]->is_ready()) { execute_timer(exec_list.timer[i]); } @@ -233,7 +233,7 @@ StaticSingleThreadedExecutor::execute_ready_executables( } // Execute all the ready services for (size_t i = 0; i < wait_set_.size_of_services; ++i) { - if (wait_set_.size_of_services && i < exec_list.number_of_services) { + if (i < exec_list.number_of_services) { if (wait_set_.services[i]) { execute_service(exec_list.service[i]); } @@ -241,7 +241,7 @@ StaticSingleThreadedExecutor::execute_ready_executables( } // Execute all the ready clients for (size_t i = 0; i < wait_set_.size_of_clients; ++i) { - if (wait_set_.size_of_clients && i < exec_list.number_of_clients) { + if (i < exec_list.number_of_clients) { if (wait_set_.clients[i]) { execute_client(exec_list.client[i]); } @@ -249,7 +249,7 @@ StaticSingleThreadedExecutor::execute_ready_executables( } // Execute all the ready waitables for (size_t i = 0; i < exec_list.number_of_waitables; ++i) { - if (exec_list.number_of_waitables && exec_list.waitable[i]->is_ready(&wait_set_)) { + if (exec_list.waitable[i]->is_ready(&wait_set_)) { exec_list.waitable[i]->execute(); } } From 05c18033ff1265651a66a095d7189028902c2ab9 Mon Sep 17 00:00:00 2001 From: Mauro Date: Wed, 4 Mar 2020 11:14:23 +0000 Subject: [PATCH 05/11] If new entity added to a node: re-collect entities Now we check ONLY node guard_conditions_ Some possible guard conditions to be triggered HERE are: 1. Ctrl+C guard condition 2. Executor interrupt_guard_condition_ 3. Node guard_conditions_ 4. Waitables guard conditions 5. ..more The previous approach was only checking if NOT (1 & 2), so if a Waitable was triggered, it would re-collect all entities, even if no new node entity was added. This was the case of the intra process manager, who relies on waitables. Every time a subscriber got a message, all the entities were collected. Signed-off-by: Mauro --- .../static_single_threaded_executor.cpp | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index 6fbe273de3..ca68850443 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -253,14 +253,19 @@ StaticSingleThreadedExecutor::execute_ready_executables( exec_list.waitable[i]->execute(); } } - // Check the guard_conditions to see if anything is added to the executor + // Check the guard_conditions to see if a new entity was added to a node for (size_t i = 0; i < wait_set_.size_of_guard_conditions; ++i) { - if (wait_set_.guard_conditions[i] && - wait_set_.guard_conditions[i]!= context_->get_interrupt_guard_condition(&wait_set_) && - wait_set_.guard_conditions[i]!= &interrupt_guard_condition_) { - run_collect_entities(); - get_executable_list(exec_list); - break; + if (wait_set_.guard_conditions[i]) { + // Check if the guard condition triggered belongs to a node + auto it = std::find(guard_conditions_.begin(), guard_conditions_.end(), + wait_set_.guard_conditions[i]); + + // If it does, re-collect entities + if (it != guard_conditions_.end()) { + run_collect_entities(); + get_executable_list(exec_list); + break; + } } } } From 2e9bea3902f250729bd36848d15eaf8d9da3d8b1 Mon Sep 17 00:00:00 2001 From: Mauro Date: Wed, 4 Mar 2020 21:15:30 +0000 Subject: [PATCH 06/11] Implement static executor entities collector Signed-off-by: Mauro --- rclcpp/CMakeLists.txt | 1 + rclcpp/include/rclcpp/executable_list.hpp | 26 +- .../static_executor_entities_collector.hpp | 154 +++++++++ .../static_single_threaded_executor.hpp | 107 +++---- rclcpp/include/rclcpp/memory_strategy.hpp | 1 + .../strategies/allocator_memory_strategy.hpp | 9 +- rclcpp/src/rclcpp/executable_list.cpp | 54 ++++ .../static_executor_entities_collector.cpp | 281 ++++++++++++++++ .../static_single_threaded_executor.cpp | 302 +++--------------- 9 files changed, 622 insertions(+), 313 deletions(-) create mode 100644 rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp create mode 100644 rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 524f90f287..fb80b4be88 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -46,6 +46,7 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/expand_topic_or_service_name.cpp src/rclcpp/executors/multi_threaded_executor.cpp src/rclcpp/executors/single_threaded_executor.cpp + src/rclcpp/executors/static_executor_entities_collector.cpp src/rclcpp/executors/static_single_threaded_executor.cpp src/rclcpp/graph_listener.cpp src/rclcpp/init_options.cpp diff --git a/rclcpp/include/rclcpp/executable_list.hpp b/rclcpp/include/rclcpp/executable_list.hpp index 1939ee50b5..2a243442a2 100644 --- a/rclcpp/include/rclcpp/executable_list.hpp +++ b/rclcpp/include/rclcpp/executable_list.hpp @@ -38,7 +38,31 @@ struct ExecutableList ExecutableList(); RCLCPP_PUBLIC - virtual ~ExecutableList(); + ~ExecutableList(); + + RCLCPP_PUBLIC + void + clear(); + + RCLCPP_PUBLIC + void + add_subscription(rclcpp::SubscriptionBase::SharedPtr); + + RCLCPP_PUBLIC + void + add_timer(rclcpp::TimerBase::SharedPtr); + + RCLCPP_PUBLIC + void + add_service(rclcpp::ServiceBase::SharedPtr); + + RCLCPP_PUBLIC + void + add_client(rclcpp::ClientBase::SharedPtr); + + RCLCPP_PUBLIC + void + add_waitable(rclcpp::Waitable::SharedPtr); // Vector containing the SubscriptionBase of all the subscriptions added to the executor. std::vector subscription; diff --git a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp new file mode 100644 index 0000000000..c10705eb29 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp @@ -0,0 +1,154 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ +#define RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ + +namespace rclcpp +{ +namespace executors +{ + +class StaticExecutorEntitiesCollector : + public rclcpp::Waitable, + public std::enable_shared_from_this +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(StaticExecutorEntitiesCollector) + + // Constructor + RCLCPP_PUBLIC + StaticExecutorEntitiesCollector() = default; + + // Destructor + ~StaticExecutorEntitiesCollector(); + + RCLCPP_PUBLIC + void + init(rcl_wait_set_t* p_wait_set, + memory_strategy::MemoryStrategy::SharedPtr& memory_strategy, + rcl_guard_condition_t* executor_guard_condition); + + RCLCPP_PUBLIC + void + execute(); + + RCLCPP_PUBLIC + void + fill_memory_strategy(); + + RCLCPP_PUBLIC + void + fill_executable_list(); + + /// Function to reallocate space for entities in the wait set. + RCLCPP_PUBLIC + void + prepare_wait_set(); + + /// Function to add_handles_to_wait_set and wait for work and + // block until the wait set is ready or until the timeout has been exceeded. + RCLCPP_PUBLIC + void + refresh_wait_set(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + + RCLCPP_PUBLIC + bool + add_to_wait_set(rcl_wait_set_t * wait_set) override; + + RCLCPP_PUBLIC + size_t + get_number_of_ready_guard_conditions() override; + + RCLCPP_PUBLIC + void + add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + RCLCPP_PUBLIC + bool + remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /// Complete all available queued work without blocking. + /** + * This function checks if after the guard condition was triggered + * (or a spurious wakeup happened) we are really ready to execute + * i.e. re-collect entities + */ + RCLCPP_PUBLIC + bool + is_ready(rcl_wait_set_t * wait_set) override; + + RCLCPP_PUBLIC + size_t + get_number_of_timers() {return exec_list_.number_of_timers;} + + RCLCPP_PUBLIC + size_t + get_number_of_subscriptions() {return exec_list_.number_of_subscriptions;} + + RCLCPP_PUBLIC + size_t + get_number_of_services() {return exec_list_.number_of_services;} + + RCLCPP_PUBLIC + size_t + get_number_of_clients() {return exec_list_.number_of_clients;} + + RCLCPP_PUBLIC + size_t + get_number_of_waitables() {return exec_list_.number_of_waitables;} + + RCLCPP_PUBLIC + rclcpp::SubscriptionBase::SharedPtr + get_subscription(size_t i) {return exec_list_.subscription[i];} + + RCLCPP_PUBLIC + rclcpp::TimerBase::SharedPtr + get_timer(size_t i) {return exec_list_.timer[i];} + + RCLCPP_PUBLIC + rclcpp::ServiceBase::SharedPtr + get_service(size_t i) {return exec_list_.service[i];} + + RCLCPP_PUBLIC + rclcpp::ClientBase::SharedPtr + get_client(size_t i) {return exec_list_.client[i];} + + RCLCPP_PUBLIC + rclcpp::Waitable::SharedPtr + get_waitable(size_t i) {return exec_list_.waitable[i];} + +private: + /// Nodes guard conditions which trigger this waitable + std::list guard_conditions_; + + /// Memory strategy: an interface for handling user-defined memory allocation strategies. + memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; + + /// List of weak nodes registered in the static executor + std::list weak_nodes_; + + /// Wait set for managing entities that the rmw layer waits on. + rcl_wait_set_t* p_wait_set_ = nullptr; + + /// Executable list: timers, subscribers, clients, services and waitables + executor::ExecutableList exec_list_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ \ No newline at end of file diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp index b3365416be..b73fbce8be 100644 --- a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -28,6 +28,7 @@ #include "rclcpp/utilities.hpp" #include "rclcpp/rate.hpp" #include "rclcpp/visibility_control.hpp" +#include "rclcpp/executors/static_executor_entities_collector.hpp" namespace rclcpp { @@ -36,10 +37,10 @@ namespace executors /// Static executor implementation /** - * This executor is a static version of original single threaded executor. - * This executor makes the assumption that system does not change during runtime. - * This means all nodes, callbackgroups, timers, subscriptions etc. are created - * before .spin() is called. + * This executor is a static version of the original single threaded executor. + * It's static because it doesn't reconstruct the executable list for every iteration. + * All nodes, callbackgroups, timers, subscriptions etc. are created before + * spin() is called, and modified only when an entity is added/removed to/from a node. * * To run this executor instead of SingleThreadedExecutor replace: * rclcpp::executors::SingleThreadedExecutor exec; @@ -71,6 +72,39 @@ class StaticSingleThreadedExecutor : public executor::Executor void spin(); + /// Add a node to the executor. + /** + * An executor can have zero or more nodes which provide work during `spin` functions. + * \param[in] node_ptr Shared pointer to the node to be added. + * \param[in] notify True to trigger the interrupt guard condition during this function. If + * the executor is blocked at the rmw layer while waiting for work and it is notified that a new + * node was added, it will wake up. + */ + RCLCPP_PUBLIC + void + add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true) override; + + /// Convenience function which takes Node and forwards NodeBaseInterface. + RCLCPP_PUBLIC + void + add_node(std::shared_ptr node_ptr, bool notify = true) override; + + /// Remove a node from the executor. + /** + * \param[in] node_ptr Shared pointer to the node to remove. + * \param[in] notify True to trigger the interrupt guard condition and wake up the executor. + * This is useful if the last node was removed from the executor while the executor was blocked + * waiting for work in another thread, because otherwise the executor would never be notified. + */ + RCLCPP_PUBLIC + void + remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true) override; + + /// Convenience function which takes Node and forwards NodeBaseInterface. + RCLCPP_PUBLIC + void + remove_node(std::shared_ptr node_ptr, bool notify = true) override; + /// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted. /** * \param[in] future The future to wait on. If this function returns SUCCESS, the future can be @@ -108,13 +142,13 @@ class StaticSingleThreadedExecutor : public executor::Executor } std::chrono::nanoseconds timeout_left = timeout_ns; - rclcpp::executor::ExecutableList executable_list; - // Collect entities and clean any invalid nodes. - run_collect_entities(); - get_executable_list(executable_list); + entities_collector_ = std::make_shared(); + entities_collector_->init(&wait_set_, memory_strategy_, &interrupt_guard_condition_); + while (rclcpp::ok(this->context_)) { // Do one set of work. - execute_ready_executables(executable_list, timeout_left); + entities_collector_->refresh_wait_set(timeout_left); + execute_ready_executables(); // Check if the future is set, return SUCCESS if it is. status = future.wait_for(std::chrono::seconds(0)); if (status == std::future_status::ready) { @@ -145,62 +179,13 @@ class StaticSingleThreadedExecutor : public executor::Executor */ RCLCPP_PUBLIC void - execute_ready_executables( - executor::ExecutableList & exec_list, - std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); - - /// Get list of TimerBase of all available timers. - RCLCPP_PUBLIC - void - get_timer_list(executor::ExecutableList & exec_list); - - /// Get list of SubscriptionBase of all available subscriptions. - RCLCPP_PUBLIC - void - get_subscription_list(executor::ExecutableList & exec_list); - - /// Get list of ServiceBase of all available services. - RCLCPP_PUBLIC - void - get_service_list(executor::ExecutableList & exec_list); - - /// Get list of ClientBase of all available clients. - RCLCPP_PUBLIC - void - get_client_list(executor::ExecutableList & exec_list); - - /// Get list of all available waitables. - RCLCPP_PUBLIC - void - get_waitable_list(executor::ExecutableList & exec_list); - - /// Get available timers, subscribers, services, clients and waitables in ExecutableList struct. - /** - * \param[in] exec_list Structure that can hold subscriptionbases, timerbases, etc - * \param[in] timeout Optional timeout parameter. - */ - RCLCPP_PUBLIC - void - get_executable_list( - executor::ExecutableList & executable_list, - std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); - - /// Function to run collect_entities() and clean any invalid nodes. - RCLCPP_PUBLIC - void run_collect_entities(); - - /// Function to add_handles_to_wait_set and wait for work and - // block until the wait set is ready or until the timeout has been exceeded. - RCLCPP_PUBLIC - void refresh_wait_set(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); - - /// Function to reallocate space for entities in the wait set. - RCLCPP_PUBLIC - void prepare_wait_set(); + execute_ready_executables(); private: RCLCPP_DISABLE_COPY(StaticSingleThreadedExecutor) + + StaticExecutorEntitiesCollector::SharedPtr entities_collector_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index aaf730b9b2..3549802bdd 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -56,6 +56,7 @@ class RCLCPP_PUBLIC MemoryStrategy virtual size_t number_of_guard_conditions() const = 0; virtual size_t number_of_waitables() const = 0; + virtual void add_waitable_handle(const rclcpp::Waitable::SharedPtr & waitable) = 0; virtual bool add_handles_to_wait_set(rcl_wait_set_t * wait_set) = 0; virtual void clear_handles() = 0; virtual void remove_null_handles(rcl_wait_set_t * wait_set) = 0; diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index dae737060b..487799f750 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -169,7 +169,6 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy subscription_handles_.push_back(subscription->get_subscription_handle()); return false; }); - group->find_service_ptrs_if( [this](const rclcpp::ServiceBase::SharedPtr & service) { service_handles_.push_back(service->get_service_handle()); @@ -195,6 +194,14 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy return has_invalid_weak_nodes; } + void add_waitable_handle(const rclcpp::Waitable::SharedPtr & waitable) + { + if(waitable == nullptr){ + throw std::runtime_error("rclcpp: Received NULL waitable."); + } + waitable_handles_.push_back(waitable); + } + bool add_handles_to_wait_set(rcl_wait_set_t * wait_set) override { for (auto subscription : subscription_handles_) { diff --git a/rclcpp/src/rclcpp/executable_list.cpp b/rclcpp/src/rclcpp/executable_list.cpp index a1ed845055..f4e716d422 100644 --- a/rclcpp/src/rclcpp/executable_list.cpp +++ b/rclcpp/src/rclcpp/executable_list.cpp @@ -24,3 +24,57 @@ ExecutableList::ExecutableList() ExecutableList::~ExecutableList() {} + +void +ExecutableList::clear() +{ + this->timer.clear(); + this->number_of_timers = 0; + + this->subscription.clear(); + this->number_of_subscriptions = 0; + + this->service.clear(); + this->number_of_services = 0; + + this->client.clear(); + this->number_of_clients = 0; + + this->waitable.clear(); + this->number_of_waitables = 0; +} + +void +ExecutableList::add_subscription(rclcpp::SubscriptionBase::SharedPtr subscription) +{ + this->subscription.push_back(std::move(subscription)); + this->number_of_subscriptions++; +} + +void +ExecutableList::add_timer(rclcpp::TimerBase::SharedPtr timer) +{ + this->timer.push_back(std::move(timer)); + this->number_of_timers++; +} + +void +ExecutableList::add_service(rclcpp::ServiceBase::SharedPtr service) +{ + this->service.push_back(std::move(service)); + this->number_of_services++; +} + +void +ExecutableList::add_client(rclcpp::ClientBase::SharedPtr client) +{ + this->client.push_back(std::move(client)); + this->number_of_clients++; +} + +void +ExecutableList::add_waitable(rclcpp::Waitable::SharedPtr waitable) +{ + this->waitable.push_back(std::move(waitable)); + this->number_of_waitables++; +} \ No newline at end of file diff --git a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp new file mode 100644 index 0000000000..a0eebb7604 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp @@ -0,0 +1,281 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/executors/static_single_threaded_executor.hpp" +#include "rclcpp/executors/static_executor_entities_collector.hpp" + +using rclcpp::executors::StaticExecutorEntitiesCollector; + +StaticExecutorEntitiesCollector::~StaticExecutorEntitiesCollector() +{ + // Disassociate all nodes + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node) { + std::atomic_bool & has_executor = node->get_associated_with_executor_atomic(); + has_executor.store(false); + } + } + exec_list_.clear(); + weak_nodes_.clear(); + guard_conditions_.clear(); +} + +void +StaticExecutorEntitiesCollector::init( + rcl_wait_set_t* p_wait_set, + memory_strategy::MemoryStrategy::SharedPtr& memory_strategy, + rcl_guard_condition_t* executor_guard_condition) +{ + // Empty initialize executable list + exec_list_ = executor::ExecutableList(); + // Get executor's wait_set_ pointer + p_wait_set_ = p_wait_set; + // Get executor's memory strategy ptr + if (memory_strategy == nullptr) { + throw std::runtime_error("Received NULL memory strategy in executor waitable."); + } + memory_strategy_ = memory_strategy; + + // Add executor's guard condition + guard_conditions_.push_back(executor_guard_condition); + + // Get memory strategy and executable list. Prepare wait_set_ + execute(); +} + +void +StaticExecutorEntitiesCollector::execute() +{ + // Fill memory strategy with entities coming from weak_nodes_ + fill_memory_strategy(); + // Fill exec_list_ with entities coming from weak_nodes_ (same as memory strategy) + fill_executable_list(); + // Resize the wait_set_ based on memory_strategy handles (rcl_wait_set_resize) + prepare_wait_set(); +} + +void +StaticExecutorEntitiesCollector::fill_memory_strategy() +{ + memory_strategy_->clear_handles(); + bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); + + // Clean up any invalid nodes, if they were detected + if (has_invalid_weak_nodes) { + auto node_it = weak_nodes_.begin(); + while (node_it != weak_nodes_.end()) { + if (node_it->expired()) { + node_it = weak_nodes_.erase(node_it); + } else { + ++node_it; + } + } + } + + // Add the static executor waitable to the memory strategy + memory_strategy_->add_waitable_handle(this->shared_from_this()); +} + +void +StaticExecutorEntitiesCollector::fill_executable_list() +{ + exec_list_.clear(); + + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + // Check in all the callback groups + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group || !group->can_be_taken_from().load()) { + continue; + } + + group->find_timer_ptrs_if( + [this](const rclcpp::TimerBase::SharedPtr & timer) { + if(timer){ + exec_list_.add_timer(timer); + } + return false; + }); + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { + if(subscription){ + exec_list_.add_subscription(subscription); + } + return false; + }); + group->find_service_ptrs_if( + [this](const rclcpp::ServiceBase::SharedPtr & service) { + if(service){ + exec_list_.add_service(service); + } + return false; + }); + group->find_client_ptrs_if( + [this](const rclcpp::ClientBase::SharedPtr & client) { + if(client){ + exec_list_.add_client(client); + } + return false; + }); + group->find_waitable_ptrs_if( + [this](const rclcpp::Waitable::SharedPtr & waitable) { + if(waitable){ + exec_list_.add_waitable(waitable); + } + return false; + }); + } + } + + // Add the executor's waitable to the executable list + exec_list_.add_waitable(shared_from_this()); +} + +void +StaticExecutorEntitiesCollector::prepare_wait_set() +{ + // clear wait set + if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) { + throw std::runtime_error("Couldn't clear wait set"); + } + + // The size of waitables are accounted for in size of the other entities + rcl_ret_t ret = rcl_wait_set_resize( + p_wait_set_, memory_strategy_->number_of_ready_subscriptions(), + memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), + memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), + memory_strategy_->number_of_ready_events()); + + if (RCL_RET_OK != ret) { + throw std::runtime_error( + std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str); + } +} + +void +StaticExecutorEntitiesCollector::refresh_wait_set(std::chrono::nanoseconds timeout) +{ + // clear wait set (memeset to '0' all wait_set_ entities + // but keeps the wait_set_ number of entities) + if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) { + throw std::runtime_error("Couldn't clear wait set"); + } + + if (!memory_strategy_->add_handles_to_wait_set(p_wait_set_)) { + throw std::runtime_error("Couldn't fill wait set"); + } + + rcl_ret_t status = + rcl_wait(p_wait_set_, std::chrono::duration_cast(timeout).count()); + + if (status == RCL_RET_WAIT_SET_EMPTY) { + RCUTILS_LOG_WARN_NAMED( + "rclcpp", + "empty wait set received in rcl_wait(). This should never happen."); + } + else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { + using rclcpp::exceptions::throw_from_rcl_error; + throw_from_rcl_error(status, "rcl_wait() failed"); + } +} + +bool +StaticExecutorEntitiesCollector::add_to_wait_set(rcl_wait_set_t * wait_set) +{ + // Add waitable guard conditions (one for each registered node) into the wait set. + for (const auto & gc : guard_conditions_) { + rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, gc, NULL); + if (ret != RCL_RET_OK) { + throw std::runtime_error("Executor waitable: couldn't add guard condition to wait set"); + } + } + return true; +} + +size_t StaticExecutorEntitiesCollector::get_number_of_ready_guard_conditions() +{ + return guard_conditions_.size(); +} + +void +StaticExecutorEntitiesCollector::add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + // Check to ensure node not already added + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node == node_ptr) { + // TODO(jacquelinekay): Use a different error here? + throw std::runtime_error("Cannot add node to executor, node already added."); + } + } + + weak_nodes_.push_back(node_ptr); + guard_conditions_.push_back(node_ptr->get_notify_guard_condition()); +} + +bool +StaticExecutorEntitiesCollector::remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + auto node_it = weak_nodes_.begin(); + + while (node_it != weak_nodes_.end()) { + bool matched = (node_it->lock() == node_ptr); + if (matched) { + // Find and remove node and its guard condition + auto gc_it = std::find(guard_conditions_.begin(), + guard_conditions_.end(), + node_ptr->get_notify_guard_condition()); + + if (gc_it != guard_conditions_.end()) { + guard_conditions_.erase(gc_it); + weak_nodes_.erase(node_it); + return true; + } + + throw std::runtime_error("Didn't find guard condition associated with node."); + + } else { + ++node_it; + } + } + + return false; +} + +bool +StaticExecutorEntitiesCollector::is_ready(rcl_wait_set_t * p_wait_set) +{ + // Check wait_set guard_conditions for added/removed entities to/from a node + for (size_t i = 0; i < p_wait_set->size_of_guard_conditions; ++i) { + if (p_wait_set->guard_conditions[i] != NULL) { + // Check if the guard condition triggered belongs to a node + auto it = std::find(guard_conditions_.begin(), guard_conditions_.end(), + p_wait_set->guard_conditions[i]); + + // If it does, we are ready to re-collect entities + if (it != guard_conditions_.end()) { + return true; + } + } + } + // None of the guard conditions triggered belong to a registered node + return false; +} \ No newline at end of file diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index ca68850443..d4a230f2d9 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -18,11 +18,13 @@ using rclcpp::executor::ExecutableList; StaticSingleThreadedExecutor::StaticSingleThreadedExecutor( const rclcpp::executor::ExecutorArgs & args) -: executor::Executor(args) {} +: executor::Executor(args) +{ + entities_collector_ = std::make_shared(); +} StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor() {} - void StaticSingleThreadedExecutor::spin() { @@ -30,309 +32,109 @@ StaticSingleThreadedExecutor::spin() throw std::runtime_error("spin() called while already spinning"); } RCLCPP_SCOPE_EXIT(this->spinning.store(false); ); - rclcpp::executor::ExecutableList executable_list; - run_collect_entities(); - get_executable_list(executable_list); + + // Set memory_strategy_ and exec_list_ based on weak_nodes_ + // Prepare wait_set_ based on memory_strategy_ + entities_collector_->init(&wait_set_, memory_strategy_, &interrupt_guard_condition_); + while (rclcpp::ok(this->context_) && spinning.load()) { - execute_ready_executables(executable_list); + // Refresh wait set and wait for work + entities_collector_->refresh_wait_set(); + execute_ready_executables(); } } void -StaticSingleThreadedExecutor::get_timer_list(ExecutableList & exec_list) +StaticSingleThreadedExecutor::add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { - // Clear the previous timers (if any) from the ExecutableList struct - exec_list.timer.clear(); - exec_list.number_of_timers = 0; - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - // Check in all the callback groups - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group || !group->can_be_taken_from().load()) { - continue; - } - group->find_timer_ptrs_if([&exec_list](const rclcpp::TimerBase::SharedPtr & timer) { - if(timer){ - // If any timer is found, push it in the exec_list struct - exec_list.timer.push_back(timer); - exec_list.number_of_timers++; - } - return false; - }); - } + // If the node already has an executor + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Node has already been added to an executor."); } -} -void -StaticSingleThreadedExecutor::get_subscription_list(ExecutableList & exec_list) -{ - // Clear the previous subscriptions (if any) from the ExecutableList struct - exec_list.subscription.clear(); - exec_list.number_of_subscriptions = 0; - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - // Check in all the callback groups - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group || !group->can_be_taken_from().load()) { - continue; - } - group->find_subscription_ptrs_if([&exec_list]( - const rclcpp::SubscriptionBase::SharedPtr & subscription) { - if(subscription){ - // If any subscription (intra-process as well) is found, push it in the exec_list struct - exec_list.subscription.push_back(subscription); - exec_list.number_of_subscriptions++; - } - return false; - }); + if (notify) { + // Interrupt waiting to handle new node + if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { + throw std::runtime_error(rcl_get_error_string().str); } } -} -void -StaticSingleThreadedExecutor::get_service_list(ExecutableList & exec_list) -{ - // Clear the previous services (if any) from the ExecutableList struct - exec_list.service.clear(); - exec_list.number_of_services = 0; - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - // Check in all the callback groups - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group || !group->can_be_taken_from().load()) { - continue; - } - group->find_service_ptrs_if([&exec_list](const rclcpp::ServiceBase::SharedPtr & service) { - if(service){ - // If any service is found, push it in the exec_list struct - exec_list.service.push_back(service); - exec_list.number_of_services++; - } - return false; - }); - } - } + entities_collector_->add_node(node_ptr); } void -StaticSingleThreadedExecutor::get_client_list(ExecutableList & exec_list) +StaticSingleThreadedExecutor::add_node(std::shared_ptr node_ptr, bool notify) { - // Clear the previous clients (if any) from the ExecutableList struct - exec_list.client.clear(); - exec_list.number_of_clients = 0; - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - // Check in all the callback groups - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group || !group->can_be_taken_from().load()) { - continue; - } - group->find_client_ptrs_if([&exec_list](const rclcpp::ClientBase::SharedPtr & client) { - if(client){ - // If any client is found, push it in the exec_list struct - exec_list.client.push_back(client); - exec_list.number_of_clients++; - } - return false; - }); - } - } + this->add_node(node_ptr->get_node_base_interface(), notify); } void -StaticSingleThreadedExecutor::get_waitable_list(ExecutableList & exec_list) +StaticSingleThreadedExecutor::remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { - // Clear the previous waitables (if any) from the ExecutableList struct - exec_list.waitable.clear(); - exec_list.number_of_waitables = 0; - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - // Check in all the callback groups - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group || !group->can_be_taken_from().load()) { - continue; + bool node_removed = entities_collector_->remove_node(node_ptr); + + if (notify) { + // If the node was matched and removed, interrupt waiting + if (node_removed) { + if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { + throw std::runtime_error(rcl_get_error_string().str); } - group->find_waitable_ptrs_if([&exec_list](const rclcpp::Waitable::SharedPtr & waitable) { - if(waitable){ - // If any waitable is found, push it in the exec_list struct - exec_list.waitable.push_back(waitable); - exec_list.number_of_waitables++; - } - return false; - }); } } + + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); } void -StaticSingleThreadedExecutor::get_executable_list( - ExecutableList & executable_list, std::chrono::nanoseconds timeout) +StaticSingleThreadedExecutor::remove_node(std::shared_ptr node_ptr, bool notify) { - // prepare the wait_set - prepare_wait_set(); - - // add handles to the wait_set and wait for work - refresh_wait_set(timeout); - - // Get all the timers - get_timer_list(executable_list); - - // Get all the subscribers - get_subscription_list(executable_list); - - // Get all the services - get_service_list(executable_list); - - // Get all the clients - get_client_list(executable_list); - - // Get all the waitables - get_waitable_list(executable_list); + this->remove_node(node_ptr->get_node_base_interface(), notify); } void -StaticSingleThreadedExecutor::execute_ready_executables( - ExecutableList & exec_list, std::chrono::nanoseconds timeout) +StaticSingleThreadedExecutor::execute_ready_executables() { - refresh_wait_set(timeout); // Execute all the ready subscriptions for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) { - if (i < exec_list.number_of_subscriptions) { + if (i < entities_collector_->get_number_of_subscriptions()) { if (wait_set_.subscriptions[i]) { - execute_subscription(exec_list.subscription[i]); + execute_subscription(entities_collector_->get_subscription(i)); } } } // Execute all the ready timers for (size_t i = 0; i < wait_set_.size_of_timers; ++i) { - if (i < exec_list.number_of_timers) { - if (wait_set_.timers[i] && exec_list.timer[i]->is_ready()) { - execute_timer(exec_list.timer[i]); + if (i < entities_collector_->get_number_of_timers()) { + if (wait_set_.timers[i] && entities_collector_->get_timer(i)->is_ready()) { + execute_timer(entities_collector_->get_timer(i)); } } } // Execute all the ready services for (size_t i = 0; i < wait_set_.size_of_services; ++i) { - if (i < exec_list.number_of_services) { + if (i < entities_collector_->get_number_of_services()) { if (wait_set_.services[i]) { - execute_service(exec_list.service[i]); + execute_service(entities_collector_->get_service(i)); } } } // Execute all the ready clients for (size_t i = 0; i < wait_set_.size_of_clients; ++i) { - if (i < exec_list.number_of_clients) { + if (i < entities_collector_->get_number_of_clients()) { if (wait_set_.clients[i]) { - execute_client(exec_list.client[i]); + execute_client(entities_collector_->get_client(i)); } } } // Execute all the ready waitables - for (size_t i = 0; i < exec_list.number_of_waitables; ++i) { - if (exec_list.waitable[i]->is_ready(&wait_set_)) { - exec_list.waitable[i]->execute(); - } - } - // Check the guard_conditions to see if a new entity was added to a node - for (size_t i = 0; i < wait_set_.size_of_guard_conditions; ++i) { - if (wait_set_.guard_conditions[i]) { - // Check if the guard condition triggered belongs to a node - auto it = std::find(guard_conditions_.begin(), guard_conditions_.end(), - wait_set_.guard_conditions[i]); - - // If it does, re-collect entities - if (it != guard_conditions_.end()) { - run_collect_entities(); - get_executable_list(exec_list); - break; - } - } - } -} - -void -StaticSingleThreadedExecutor::run_collect_entities() -{ - memory_strategy_->clear_handles(); - bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); - - // Clean up any invalid nodes, if they were detected - if (has_invalid_weak_nodes) { - auto node_it = weak_nodes_.begin(); - auto gc_it = guard_conditions_.begin(); - while (node_it != weak_nodes_.end()) { - if (node_it->expired()) { - node_it = weak_nodes_.erase(node_it); - memory_strategy_->remove_guard_condition(*gc_it); - gc_it = guard_conditions_.erase(gc_it); - } else { - ++node_it; - ++gc_it; - } + for (size_t i = 0; i < entities_collector_->get_number_of_waitables(); ++i) { + if (entities_collector_->get_waitable(i)->is_ready(&wait_set_)) { + entities_collector_->get_waitable(i)->execute(); } } } -void -StaticSingleThreadedExecutor::prepare_wait_set() -{ - // clear wait set - if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) { - throw std::runtime_error("Couldn't clear wait set"); - } - - // The size of waitables are accounted for in size of the other entities - rcl_ret_t ret = rcl_wait_set_resize( - &wait_set_, memory_strategy_->number_of_ready_subscriptions(), - memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), - memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), - memory_strategy_->number_of_ready_events()); - - if (RCL_RET_OK != ret) { - throw std::runtime_error( - std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str); - } -} - -void -StaticSingleThreadedExecutor::refresh_wait_set(std::chrono::nanoseconds timeout) -{ - // clear wait set - if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) { - throw std::runtime_error("Couldn't clear wait set"); - } - // add handles to the wait_set - if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) { - throw std::runtime_error("Couldn't fill wait set"); - } - rcl_ret_t status = - rcl_wait(&wait_set_, std::chrono::duration_cast(timeout).count()); - if (status == RCL_RET_WAIT_SET_EMPTY) { - RCUTILS_LOG_WARN_NAMED( - "rclcpp", - "empty wait set received in rcl_wait(). This should never happen."); - } else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { - using rclcpp::exceptions::throw_from_rcl_error; - throw_from_rcl_error(status, "rcl_wait() failed"); - } -} From 5b1af664f699ddd5b43d88698c2b706482b44903 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Fri, 27 Mar 2020 05:01:13 -0700 Subject: [PATCH 07/11] fixup and style Signed-off-by: William Woodall --- rclcpp/include/rclcpp/executable_list.hpp | 21 +++++----- .../static_executor_entities_collector.hpp | 32 ++++++++++----- .../static_single_threaded_executor.hpp | 14 ++++--- .../strategies/allocator_memory_strategy.hpp | 4 +- rclcpp/src/rclcpp/executable_list.cpp | 2 +- .../static_executor_entities_collector.cpp | 39 ++++++++++--------- .../static_single_threaded_executor.cpp | 1 - 7 files changed, 63 insertions(+), 50 deletions(-) diff --git a/rclcpp/include/rclcpp/executable_list.hpp b/rclcpp/include/rclcpp/executable_list.hpp index 2a243442a2..1456fd31df 100644 --- a/rclcpp/include/rclcpp/executable_list.hpp +++ b/rclcpp/include/rclcpp/executable_list.hpp @@ -16,12 +16,9 @@ #include #include -#include "rclcpp/callback_group.hpp" #include "rclcpp/client.hpp" -#include "rclcpp/macros.hpp" -#include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/service.hpp" -#include "rclcpp/subscription.hpp" +#include "rclcpp/subscription_base.hpp" #include "rclcpp/timer.hpp" #include "rclcpp/visibility_control.hpp" #include "rclcpp/waitable.hpp" @@ -31,13 +28,15 @@ namespace rclcpp namespace executor { -/// This struct contains subscriptionbase, timerbase, etc. which can be used to run callbacks. -struct ExecutableList +/// This class contains subscriptionbase, timerbase, etc. which can be used to run callbacks. +class ExecutableList { +public: RCLCPP_PUBLIC ExecutableList(); RCLCPP_PUBLIC + virtual ~ExecutableList(); RCLCPP_PUBLIC @@ -46,23 +45,23 @@ struct ExecutableList RCLCPP_PUBLIC void - add_subscription(rclcpp::SubscriptionBase::SharedPtr); + add_subscription(rclcpp::SubscriptionBase::SharedPtr subscription); RCLCPP_PUBLIC void - add_timer(rclcpp::TimerBase::SharedPtr); + add_timer(rclcpp::TimerBase::SharedPtr timer); RCLCPP_PUBLIC void - add_service(rclcpp::ServiceBase::SharedPtr); + add_service(rclcpp::ServiceBase::SharedPtr service); RCLCPP_PUBLIC void - add_client(rclcpp::ClientBase::SharedPtr); + add_client(rclcpp::ClientBase::SharedPtr client); RCLCPP_PUBLIC void - add_waitable(rclcpp::Waitable::SharedPtr); + add_waitable(rclcpp::Waitable::SharedPtr waitable); // Vector containing the SubscriptionBase of all the subscriptions added to the executor. std::vector subscription; diff --git a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp index c10705eb29..2c570a5f80 100644 --- a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp +++ b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp @@ -15,14 +15,25 @@ #ifndef RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ #define RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ +#include + +#include +#include + +#include +#include +#include +#include +#include + namespace rclcpp { namespace executors { -class StaticExecutorEntitiesCollector : - public rclcpp::Waitable, - public std::enable_shared_from_this +class StaticExecutorEntitiesCollector + : public rclcpp::Waitable, + public std::enable_shared_from_this { public: RCLCPP_SMART_PTR_DEFINITIONS(StaticExecutorEntitiesCollector) @@ -36,13 +47,14 @@ class StaticExecutorEntitiesCollector : RCLCPP_PUBLIC void - init(rcl_wait_set_t* p_wait_set, - memory_strategy::MemoryStrategy::SharedPtr& memory_strategy, - rcl_guard_condition_t* executor_guard_condition); + init( + rcl_wait_set_t * p_wait_set, + rclcpp::memory_strategy::MemoryStrategy::SharedPtr & memory_strategy, + rcl_guard_condition_t * executor_guard_condition); RCLCPP_PUBLIC void - execute(); + execute() override; RCLCPP_PUBLIC void @@ -142,13 +154,13 @@ class StaticExecutorEntitiesCollector : std::list weak_nodes_; /// Wait set for managing entities that the rmw layer waits on. - rcl_wait_set_t* p_wait_set_ = nullptr; + rcl_wait_set_t * p_wait_set_ = nullptr; /// Executable list: timers, subscribers, clients, services and waitables - executor::ExecutableList exec_list_; + rclcpp::executor::ExecutableList exec_list_; }; } // namespace executors } // namespace rclcpp -#endif // RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ \ No newline at end of file +#endif // RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp index b73fbce8be..e1a85e385b 100644 --- a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -58,8 +58,7 @@ class StaticSingleThreadedExecutor : public executor::Executor /// Default constructor. See the default constructor for Executor. RCLCPP_PUBLIC - StaticSingleThreadedExecutor( - const executor::ExecutorArgs & args = executor::ExecutorArgs()); + StaticSingleThreadedExecutor(const executor::ExecutorArgs & args = executor::ExecutorArgs()); /// Default destrcutor. RCLCPP_PUBLIC @@ -70,7 +69,7 @@ class StaticSingleThreadedExecutor : public executor::Executor // It will only be interrupt by a CTRL-C (managed by the global signal handler). RCLCPP_PUBLIC void - spin(); + spin() override; /// Add a node to the executor. /** @@ -82,7 +81,9 @@ class StaticSingleThreadedExecutor : public executor::Executor */ RCLCPP_PUBLIC void - add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true) override; + add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; /// Convenience function which takes Node and forwards NodeBaseInterface. RCLCPP_PUBLIC @@ -98,7 +99,9 @@ class StaticSingleThreadedExecutor : public executor::Executor */ RCLCPP_PUBLIC void - remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true) override; + remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; /// Convenience function which takes Node and forwards NodeBaseInterface. RCLCPP_PUBLIC @@ -181,7 +184,6 @@ class StaticSingleThreadedExecutor : public executor::Executor void execute_ready_executables(); - private: RCLCPP_DISABLE_COPY(StaticSingleThreadedExecutor) diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index 487799f750..bc703f5f59 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -196,8 +196,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy void add_waitable_handle(const rclcpp::Waitable::SharedPtr & waitable) { - if(waitable == nullptr){ - throw std::runtime_error("rclcpp: Received NULL waitable."); + if (nullptr == waitable) { + throw std::runtime_error("waitable object unexpectedly nullptr"); } waitable_handles_.push_back(waitable); } diff --git a/rclcpp/src/rclcpp/executable_list.cpp b/rclcpp/src/rclcpp/executable_list.cpp index f4e716d422..5bd7712be8 100644 --- a/rclcpp/src/rclcpp/executable_list.cpp +++ b/rclcpp/src/rclcpp/executable_list.cpp @@ -77,4 +77,4 @@ ExecutableList::add_waitable(rclcpp::Waitable::SharedPtr waitable) { this->waitable.push_back(std::move(waitable)); this->number_of_waitables++; -} \ No newline at end of file +} diff --git a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp index a0eebb7604..45cc62c50d 100644 --- a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp +++ b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp @@ -34,9 +34,9 @@ StaticExecutorEntitiesCollector::~StaticExecutorEntitiesCollector() void StaticExecutorEntitiesCollector::init( - rcl_wait_set_t* p_wait_set, - memory_strategy::MemoryStrategy::SharedPtr& memory_strategy, - rcl_guard_condition_t* executor_guard_condition) + rcl_wait_set_t * p_wait_set, + memory_strategy::MemoryStrategy::SharedPtr & memory_strategy, + rcl_guard_condition_t * executor_guard_condition) { // Empty initialize executable list exec_list_ = executor::ExecutableList(); @@ -107,38 +107,38 @@ StaticExecutorEntitiesCollector::fill_executable_list() group->find_timer_ptrs_if( [this](const rclcpp::TimerBase::SharedPtr & timer) { - if(timer){ + if (timer) { exec_list_.add_timer(timer); } return false; }); group->find_subscription_ptrs_if( [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { - if(subscription){ + if (subscription) { exec_list_.add_subscription(subscription); } return false; }); group->find_service_ptrs_if( [this](const rclcpp::ServiceBase::SharedPtr & service) { - if(service){ + if (service) { exec_list_.add_service(service); } return false; }); group->find_client_ptrs_if( [this](const rclcpp::ClientBase::SharedPtr & client) { - if(client){ + if (client) { exec_list_.add_client(client); } return false; }); - group->find_waitable_ptrs_if( - [this](const rclcpp::Waitable::SharedPtr & waitable) { - if(waitable){ + group->find_waitable_ptrs_if( + [this](const rclcpp::Waitable::SharedPtr & waitable) { + if (waitable) { exec_list_.add_waitable(waitable); } - return false; + return false; }); } } @@ -188,8 +188,7 @@ StaticExecutorEntitiesCollector::refresh_wait_set(std::chrono::nanoseconds timeo RCUTILS_LOG_WARN_NAMED( "rclcpp", "empty wait set received in rcl_wait(). This should never happen."); - } - else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { + } else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { using rclcpp::exceptions::throw_from_rcl_error; throw_from_rcl_error(status, "rcl_wait() failed"); } @@ -240,9 +239,10 @@ StaticExecutorEntitiesCollector::remove_node( bool matched = (node_it->lock() == node_ptr); if (matched) { // Find and remove node and its guard condition - auto gc_it = std::find(guard_conditions_.begin(), - guard_conditions_.end(), - node_ptr->get_notify_guard_condition()); + auto gc_it = std::find( + guard_conditions_.begin(), + guard_conditions_.end(), + node_ptr->get_notify_guard_condition()); if (gc_it != guard_conditions_.end()) { guard_conditions_.erase(gc_it); @@ -267,8 +267,9 @@ StaticExecutorEntitiesCollector::is_ready(rcl_wait_set_t * p_wait_set) for (size_t i = 0; i < p_wait_set->size_of_guard_conditions; ++i) { if (p_wait_set->guard_conditions[i] != NULL) { // Check if the guard condition triggered belongs to a node - auto it = std::find(guard_conditions_.begin(), guard_conditions_.end(), - p_wait_set->guard_conditions[i]); + auto it = std::find( + guard_conditions_.begin(), guard_conditions_.end(), + p_wait_set->guard_conditions[i]); // If it does, we are ready to re-collect entities if (it != guard_conditions_.end()) { @@ -278,4 +279,4 @@ StaticExecutorEntitiesCollector::is_ready(rcl_wait_set_t * p_wait_set) } // None of the guard conditions triggered belong to a registered node return false; -} \ No newline at end of file +} diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index d4a230f2d9..c52268dad4 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -137,4 +137,3 @@ StaticSingleThreadedExecutor::execute_ready_executables() } } } - From 6b1535199e230f667762bf611dd308e1c87bc3f4 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Fri, 27 Mar 2020 05:13:10 -0700 Subject: [PATCH 08/11] mark new classes as final, with non-virtual destructors Signed-off-by: William Woodall --- rclcpp/include/rclcpp/executable_list.hpp | 3 +-- .../rclcpp/executors/static_executor_entities_collector.hpp | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/rclcpp/include/rclcpp/executable_list.hpp b/rclcpp/include/rclcpp/executable_list.hpp index 1456fd31df..ca5b6f76c4 100644 --- a/rclcpp/include/rclcpp/executable_list.hpp +++ b/rclcpp/include/rclcpp/executable_list.hpp @@ -29,14 +29,13 @@ namespace executor { /// This class contains subscriptionbase, timerbase, etc. which can be used to run callbacks. -class ExecutableList +class ExecutableList final { public: RCLCPP_PUBLIC ExecutableList(); RCLCPP_PUBLIC - virtual ~ExecutableList(); RCLCPP_PUBLIC diff --git a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp index 2c570a5f80..502bba05a6 100644 --- a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp +++ b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp @@ -31,7 +31,7 @@ namespace rclcpp namespace executors { -class StaticExecutorEntitiesCollector +class StaticExecutorEntitiesCollector final : public rclcpp::Waitable, public std::enable_shared_from_this { From 828785e7a592fc423f58ddd4d41ffedf6bd5ff20 Mon Sep 17 00:00:00 2001 From: Ishu Goel Date: Fri, 27 Mar 2020 14:47:55 +0100 Subject: [PATCH 09/11] adding copyright to static executor files Signed-off-by: Ishu Goel --- rclcpp/include/rclcpp/executable_list.hpp | 2 ++ .../rclcpp/executors/static_single_threaded_executor.hpp | 2 ++ rclcpp/src/rclcpp/executable_list.cpp | 2 ++ rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp | 2 ++ 4 files changed, 8 insertions(+) diff --git a/rclcpp/include/rclcpp/executable_list.hpp b/rclcpp/include/rclcpp/executable_list.hpp index ca5b6f76c4..2929b5410a 100644 --- a/rclcpp/include/rclcpp/executable_list.hpp +++ b/rclcpp/include/rclcpp/executable_list.hpp @@ -1,3 +1,5 @@ +// Copyright 2019 Nobleo Technology +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp index e1a85e385b..56f2ca5b8c 100644 --- a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -1,3 +1,5 @@ +// Copyright 2019 Nobleo Technology +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/rclcpp/src/rclcpp/executable_list.cpp b/rclcpp/src/rclcpp/executable_list.cpp index 5bd7712be8..6f9345ccd5 100644 --- a/rclcpp/src/rclcpp/executable_list.cpp +++ b/rclcpp/src/rclcpp/executable_list.cpp @@ -1,3 +1,5 @@ +// Copyright 2019 Nobleo Technology +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index c52268dad4..060121d6f1 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -1,3 +1,5 @@ +// Copyright 2019 Nobleo Technology +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at From 0a389b9e8323ede6bd295b0f7a35e3f4ecee3d1c Mon Sep 17 00:00:00 2001 From: William Woodall Date: Fri, 27 Mar 2020 15:50:17 -0700 Subject: [PATCH 10/11] fixup Signed-off-by: William Woodall --- .../rclcpp/executors/static_executor_entities_collector.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp index 502bba05a6..eeeec38959 100644 --- a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp +++ b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp @@ -148,7 +148,7 @@ class StaticExecutorEntitiesCollector final std::list guard_conditions_; /// Memory strategy: an interface for handling user-defined memory allocation strategies. - memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; + rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; /// List of weak nodes registered in the static executor std::list weak_nodes_; From adced0c9fcf7a8b5ef4ad17c31a0d065abb81923 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Fri, 27 Mar 2020 16:12:34 -0700 Subject: [PATCH 11/11] cpplint fixes Signed-off-by: William Woodall --- .../static_executor_entities_collector.hpp | 15 ++++++++------- .../executors/static_single_threaded_executor.hpp | 14 ++++++++------ rclcpp/src/rclcpp/executable_list.cpp | 2 ++ .../static_executor_entities_collector.cpp | 6 +++++- .../executors/static_single_threaded_executor.cpp | 3 +++ 5 files changed, 26 insertions(+), 14 deletions(-) diff --git a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp index eeeec38959..49d069e5a7 100644 --- a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp +++ b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp @@ -15,16 +15,17 @@ #ifndef RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ #define RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ +#include #include -#include -#include +#include "rcl/guard_condition.h" +#include "rcl/wait.h" -#include -#include -#include -#include -#include +#include "rclcpp/executable_list.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/memory_strategy.hpp" +#include "rclcpp/visibility_control.hpp" +#include "rclcpp/waitable.hpp" namespace rclcpp { diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp index 56f2ca5b8c..43d086f7db 100644 --- a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -15,22 +15,23 @@ #ifndef RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ #define RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ -#include - #include #include #include #include #include + +#include "rmw/rmw.h" + +#include "rclcpp/executable_list.hpp" #include "rclcpp/executor.hpp" +#include "rclcpp/executors/static_executor_entities_collector.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" -#include "rclcpp/executable_list.hpp" #include "rclcpp/node.hpp" -#include "rclcpp/utilities.hpp" #include "rclcpp/rate.hpp" +#include "rclcpp/utilities.hpp" #include "rclcpp/visibility_control.hpp" -#include "rclcpp/executors/static_executor_entities_collector.hpp" namespace rclcpp { @@ -60,7 +61,8 @@ class StaticSingleThreadedExecutor : public executor::Executor /// Default constructor. See the default constructor for Executor. RCLCPP_PUBLIC - StaticSingleThreadedExecutor(const executor::ExecutorArgs & args = executor::ExecutorArgs()); + explicit StaticSingleThreadedExecutor( + const executor::ExecutorArgs & args = executor::ExecutorArgs()); /// Default destrcutor. RCLCPP_PUBLIC diff --git a/rclcpp/src/rclcpp/executable_list.cpp b/rclcpp/src/rclcpp/executable_list.cpp index 6f9345ccd5..d9dae57cdd 100644 --- a/rclcpp/src/rclcpp/executable_list.cpp +++ b/rclcpp/src/rclcpp/executable_list.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "rclcpp/executable_list.hpp" using rclcpp::executor::ExecutableList; diff --git a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp index 45cc62c50d..f8e32dad66 100644 --- a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp +++ b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp @@ -12,9 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "rclcpp/executors/static_single_threaded_executor.hpp" #include "rclcpp/executors/static_executor_entities_collector.hpp" +#include +#include + +#include "rclcpp/executors/static_single_threaded_executor.hpp" + using rclcpp::executors::StaticExecutorEntitiesCollector; StaticExecutorEntitiesCollector::~StaticExecutorEntitiesCollector() diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index 060121d6f1..c6c1d1be9c 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -13,6 +13,9 @@ // limitations under the License. #include "rclcpp/executors/static_single_threaded_executor.hpp" + +#include + #include "rclcpp/scope_exit.hpp" using rclcpp::executors::StaticSingleThreadedExecutor;