diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index 57319b2c23..a1b7a1229f 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -62,25 +62,40 @@ class CallbackGroup RCLCPP_PUBLIC explicit CallbackGroup(CallbackGroupType group_type); - RCLCPP_PUBLIC - const std::vector & - get_subscription_ptrs() const; - - RCLCPP_PUBLIC - const std::vector & - get_timer_ptrs() const; - - RCLCPP_PUBLIC - const std::vector & - get_service_ptrs() const; - - RCLCPP_PUBLIC - const std::vector & - get_client_ptrs() const; - - RCLCPP_PUBLIC - const std::vector & - get_waitable_ptrs() const; + template + rclcpp::SubscriptionBase::SharedPtr + find_subscription_ptrs_if(Function func) const + { + return _find_ptrs_if_impl(func, subscription_ptrs_); + } + + template + rclcpp::TimerBase::SharedPtr + find_timer_ptrs_if(Function func) const + { + return _find_ptrs_if_impl(func, timer_ptrs_); + } + + template + rclcpp::ServiceBase::SharedPtr + find_service_ptrs_if(Function func) const + { + return _find_ptrs_if_impl(func, service_ptrs_); + } + + template + rclcpp::ClientBase::SharedPtr + find_client_ptrs_if(Function func) const + { + return _find_ptrs_if_impl(func, client_ptrs_); + } + + template + rclcpp::Waitable::SharedPtr + find_waitable_ptrs_if(Function func) const + { + return _find_ptrs_if_impl(func, waitable_ptrs_); + } RCLCPP_PUBLIC std::atomic_bool & @@ -130,6 +145,21 @@ class CallbackGroup std::vector client_ptrs_; std::vector waitable_ptrs_; std::atomic_bool can_be_taken_from_; + +private: + template + typename TypeT::SharedPtr _find_ptrs_if_impl( + Function func, const std::vector & vect_ptrs) const + { + std::lock_guard lock(mutex_); + for (auto & weak_ptr : vect_ptrs) { + auto ref_ptr = weak_ptr.lock(); + if (ref_ptr && func(ref_ptr)) { + return ref_ptr; + } + } + return typename TypeT::SharedPtr(); + } }; } // namespace callback_group diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 72df5dd82a..316c016a45 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ #define RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ +#include #include #include #include @@ -53,7 +54,8 @@ class MultiThreadedExecutor : public executor::Executor MultiThreadedExecutor( const executor::ExecutorArgs & args = executor::ExecutorArgs(), size_t number_of_threads = 0, - bool yield_before_execute = false); + bool yield_before_execute = false, + std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); RCLCPP_PUBLIC virtual ~MultiThreadedExecutor(); @@ -77,6 +79,7 @@ class MultiThreadedExecutor : public executor::Executor std::mutex wait_mutex_; size_t number_of_threads_; bool yield_before_execute_; + std::chrono::nanoseconds next_exec_timeout_; std::set scheduled_timers_; }; diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index 61d5b74097..b70d849347 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -164,40 +164,31 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy if (!group || !group->can_be_taken_from().load()) { continue; } - for (auto & weak_subscription : group->get_subscription_ptrs()) { - auto subscription = weak_subscription.lock(); - if (subscription) { + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { subscription_handles_.push_back(subscription->get_subscription_handle()); if (subscription->get_intra_process_subscription_handle()) { subscription_handles_.push_back( subscription->get_intra_process_subscription_handle()); } - } - } - for (auto & weak_service : group->get_service_ptrs()) { - auto service = weak_service.lock(); - if (service) { + return false; + }); + group->find_service_ptrs_if([this](const rclcpp::ServiceBase::SharedPtr & service) { service_handles_.push_back(service->get_service_handle()); - } - } - for (auto & weak_client : group->get_client_ptrs()) { - auto client = weak_client.lock(); - if (client) { + return false; + }); + group->find_client_ptrs_if([this](const rclcpp::ClientBase::SharedPtr & client) { client_handles_.push_back(client->get_client_handle()); - } - } - for (auto & weak_timer : group->get_timer_ptrs()) { - auto timer = weak_timer.lock(); - if (timer) { + return false; + }); + group->find_timer_ptrs_if([this](const rclcpp::TimerBase::SharedPtr & timer) { timer_handles_.push_back(timer->get_timer_handle()); - } - } - for (auto & weak_waitable : group->get_waitable_ptrs()) { - auto waitable = weak_waitable.lock(); - if (waitable) { + return false; + }); + group->find_waitable_ptrs_if([this](const rclcpp::Waitable::SharedPtr & waitable) { waitable_handles_.push_back(waitable); - } - } + return false; + }); } } return has_invalid_weak_nodes; diff --git a/rclcpp/src/rclcpp/callback_group.cpp b/rclcpp/src/rclcpp/callback_group.cpp index b812afc516..e9d0e80c48 100644 --- a/rclcpp/src/rclcpp/callback_group.cpp +++ b/rclcpp/src/rclcpp/callback_group.cpp @@ -23,40 +23,6 @@ CallbackGroup::CallbackGroup(CallbackGroupType group_type) : type_(group_type), can_be_taken_from_(true) {} -const std::vector & -CallbackGroup::get_subscription_ptrs() const -{ - std::lock_guard lock(mutex_); - return subscription_ptrs_; -} - -const std::vector & -CallbackGroup::get_timer_ptrs() const -{ - std::lock_guard lock(mutex_); - return timer_ptrs_; -} - -const std::vector & -CallbackGroup::get_service_ptrs() const -{ - std::lock_guard lock(mutex_); - return service_ptrs_; -} - -const std::vector & -CallbackGroup::get_client_ptrs() const -{ - std::lock_guard lock(mutex_); - return client_ptrs_; -} - -const std::vector & -CallbackGroup::get_waitable_ptrs() const -{ - std::lock_guard lock(mutex_); - return waitable_ptrs_; -} std::atomic_bool & CallbackGroup::can_be_taken_from() @@ -76,6 +42,12 @@ CallbackGroup::add_subscription( { std::lock_guard lock(mutex_); subscription_ptrs_.push_back(subscription_ptr); + subscription_ptrs_.erase( + std::remove_if( + subscription_ptrs_.begin(), + subscription_ptrs_.end(), + [](rclcpp::SubscriptionBase::WeakPtr x) {return x.expired();}), + subscription_ptrs_.end()); } void @@ -83,6 +55,12 @@ CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr timer_ptr) { std::lock_guard lock(mutex_); timer_ptrs_.push_back(timer_ptr); + timer_ptrs_.erase( + std::remove_if( + timer_ptrs_.begin(), + timer_ptrs_.end(), + [](rclcpp::TimerBase::WeakPtr x) {return x.expired();}), + timer_ptrs_.end()); } void @@ -90,6 +68,12 @@ CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr service_ptr) { std::lock_guard lock(mutex_); service_ptrs_.push_back(service_ptr); + service_ptrs_.erase( + std::remove_if( + service_ptrs_.begin(), + service_ptrs_.end(), + [](rclcpp::ServiceBase::WeakPtr x) {return x.expired();}), + service_ptrs_.end()); } void @@ -97,6 +81,12 @@ CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr client_ptr) { std::lock_guard lock(mutex_); client_ptrs_.push_back(client_ptr); + client_ptrs_.erase( + std::remove_if( + client_ptrs_.begin(), + client_ptrs_.end(), + [](rclcpp::ClientBase::WeakPtr x) {return x.expired();}), + client_ptrs_.end()); } void @@ -104,6 +94,12 @@ CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr) { std::lock_guard lock(mutex_); waitable_ptrs_.push_back(waitable_ptr); + waitable_ptrs_.erase( + std::remove_if( + waitable_ptrs_.begin(), + waitable_ptrs_.end(), + [](rclcpp::Waitable::WeakPtr x) {return x.expired();}), + waitable_ptrs_.end()); } void diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 341b8457d2..4820b7f095 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -511,11 +511,12 @@ Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer) if (!group) { continue; } - for (auto & weak_timer : group->get_timer_ptrs()) { - auto t = weak_timer.lock(); - if (t == timer) { - return group; - } + auto timer_ref = group->find_timer_ptrs_if( + [timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool { + return timer_ptr == timer; + }); + if (timer_ref) { + return group; } } } @@ -535,14 +536,15 @@ Executor::get_next_timer(AnyExecutable & any_exec) if (!group || !group->can_be_taken_from().load()) { continue; } - for (auto & timer_ref : group->get_timer_ptrs()) { - auto timer = timer_ref.lock(); - if (timer && timer->is_ready()) { - any_exec.timer = timer; - any_exec.callback_group = group; - any_exec.node_base = node; - return; - } + auto timer_ref = group->find_timer_ptrs_if( + [](const rclcpp::TimerBase::SharedPtr & timer) -> bool { + return timer->is_ready(); + }); + if (timer_ref) { + any_exec.timer = timer_ref; + any_exec.callback_group = group; + any_exec.node_base = node; + return; } } } diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 24c5c79b2c..cc34d09d87 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -27,8 +27,11 @@ using rclcpp::executors::MultiThreadedExecutor; MultiThreadedExecutor::MultiThreadedExecutor( const rclcpp::executor::ExecutorArgs & args, size_t number_of_threads, - bool yield_before_execute) -: executor::Executor(args), yield_before_execute_(yield_before_execute) + bool yield_before_execute, + std::chrono::nanoseconds next_exec_timeout) +: executor::Executor(args), + yield_before_execute_(yield_before_execute), + next_exec_timeout_(next_exec_timeout) { number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency(); if (number_of_threads_ == 0) { @@ -77,7 +80,7 @@ MultiThreadedExecutor::run(size_t) if (!rclcpp::ok(this->context_) || !spinning.load()) { return; } - if (!get_next_executable(any_exec)) { + if (!get_next_executable(any_exec, next_exec_timeout_)) { continue; } if (any_exec.timer) { diff --git a/rclcpp/src/rclcpp/memory_strategy.cpp b/rclcpp/src/rclcpp/memory_strategy.cpp index 01b6226d3d..0a886e4a39 100644 --- a/rclcpp/src/rclcpp/memory_strategy.cpp +++ b/rclcpp/src/rclcpp/memory_strategy.cpp @@ -32,16 +32,14 @@ MemoryStrategy::get_subscription_by_handle( if (!group) { continue; } - for (auto & weak_subscription : group->get_subscription_ptrs()) { - auto subscription = weak_subscription.lock(); - if (subscription) { - if (subscription->get_subscription_handle() == subscriber_handle) { - return subscription; - } - if (subscription->get_intra_process_subscription_handle() == subscriber_handle) { - return subscription; - } - } + auto match_subscription = group->find_subscription_ptrs_if( + [&subscriber_handle](const rclcpp::SubscriptionBase::SharedPtr & subscription) -> bool { + return + (subscription->get_subscription_handle() == subscriber_handle) || + (subscription->get_intra_process_subscription_handle() == subscriber_handle); + }); + if (match_subscription) { + return match_subscription; } } } @@ -63,11 +61,12 @@ MemoryStrategy::get_service_by_handle( if (!group) { continue; } - for (auto & weak_service : group->get_service_ptrs()) { - auto service = weak_service.lock(); - if (service && service->get_service_handle() == service_handle) { - return service; - } + auto service_ref = group->find_service_ptrs_if( + [&service_handle](const rclcpp::ServiceBase::SharedPtr & service) -> bool { + return service->get_service_handle() == service_handle; + }); + if (service_ref) { + return service_ref; } } } @@ -89,11 +88,12 @@ MemoryStrategy::get_client_by_handle( if (!group) { continue; } - for (auto & weak_client : group->get_client_ptrs()) { - auto client = weak_client.lock(); - if (client && client->get_client_handle() == client_handle) { - return client; - } + auto client_ref = group->find_client_ptrs_if( + [&client_handle](const rclcpp::ClientBase::SharedPtr & client) -> bool { + return client->get_client_handle() == client_handle; + }); + if (client_ref) { + return client_ref; } } } @@ -138,11 +138,12 @@ MemoryStrategy::get_group_by_subscription( if (!group) { continue; } - for (auto & weak_sub : group->get_subscription_ptrs()) { - auto sub = weak_sub.lock(); - if (sub == subscription) { - return group; - } + auto match_sub = group->find_subscription_ptrs_if( + [&subscription](const rclcpp::SubscriptionBase::SharedPtr & sub) -> bool { + return sub == subscription; + }); + if (match_sub) { + return group; } } } @@ -164,11 +165,12 @@ MemoryStrategy::get_group_by_service( if (!group) { continue; } - for (auto & weak_serv : group->get_service_ptrs()) { - auto serv = weak_serv.lock(); - if (serv && serv == service) { - return group; - } + auto service_ref = group->find_service_ptrs_if( + [&service](const rclcpp::ServiceBase::SharedPtr & serv) -> bool { + return serv == service; + }); + if (service_ref) { + return group; } } } @@ -190,11 +192,12 @@ MemoryStrategy::get_group_by_client( if (!group) { continue; } - for (auto & weak_client : group->get_client_ptrs()) { - auto cli = weak_client.lock(); - if (cli && cli == client) { - return group; - } + auto client_ref = group->find_client_ptrs_if( + [&client](const rclcpp::ClientBase::SharedPtr & cli) -> bool { + return cli == client; + }); + if (client_ref) { + return group; } } } @@ -216,11 +219,12 @@ MemoryStrategy::get_group_by_waitable( if (!group) { continue; } - for (auto & weak_waitable : group->get_waitable_ptrs()) { - auto group_waitable = weak_waitable.lock(); - if (group_waitable && group_waitable == waitable) { - return group; - } + auto waitable_ref = group->find_waitable_ptrs_if( + [&waitable](const rclcpp::Waitable::SharedPtr & group_waitable) -> bool { + return group_waitable == waitable; + }); + if (waitable_ref) { + return group; } } }