Skip to content

Commit

Permalink
Crash in callback group pointer vector iterator
Browse files Browse the repository at this point in the history
1. Memory leak in callback group weak reference vectors
  There is leak in the callback group weak reference vectors that occurs when a weak reference becomes invalid (subscription is dropped, service deleted, etc).
The now obsolete weak pointer reference is never deleted from the callback group pointer vector causing the leak.
  The resolution of this problem is implemented by scanning and deleting expired weak pointer at the time of insertion of a new weak pointer into the callback group vectors.
This approach is the lowest computational cost to purging obsolete weak pointers.

2. Crash in iterator for callback group pointer vectors
  This problem exists because a reference to the callback group pointer vector is provided as a return value to facilitate loop iterator.
This is a significant crash root cause with a multithreaded executor where a thread is able to add new subscription to the callback group.
The crash is caused by a concurrent modification of the weak pointer vector while another thread is iterating over that same vector resulting in a crash.

Testing:
  These changes where implemented and tested using a test application which creates / publish / deletes thousands of topics (up to 100,000) from a separate standalone thread while the ROS2 layer is receiving data on the topics deleted.
The muiltithreaded was setup to contain 10 separate executor threads on a single mutually exclusive callback group containing thousands of topics.

issue: #813
Signed-off-by: Guillaume Autran <gautran@clearpath.ai>
  • Loading branch information
guillaumeautran committed Aug 28, 2019
1 parent 17841d6 commit bd6351b
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 135 deletions.
68 changes: 49 additions & 19 deletions rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,40 @@ class CallbackGroup
RCLCPP_PUBLIC
explicit CallbackGroup(CallbackGroupType group_type);

RCLCPP_PUBLIC
const std::vector<rclcpp::SubscriptionBase::WeakPtr> &
get_subscription_ptrs() const;

RCLCPP_PUBLIC
const std::vector<rclcpp::TimerBase::WeakPtr> &
get_timer_ptrs() const;

RCLCPP_PUBLIC
const std::vector<rclcpp::ServiceBase::WeakPtr> &
get_service_ptrs() const;

RCLCPP_PUBLIC
const std::vector<rclcpp::ClientBase::WeakPtr> &
get_client_ptrs() const;

RCLCPP_PUBLIC
const std::vector<rclcpp::Waitable::WeakPtr> &
get_waitable_ptrs() const;
template<typename Function>
rclcpp::SubscriptionBase::SharedPtr
find_subscription_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::SubscriptionBase, Function>(func, subscription_ptrs_);
}

template<typename Function>
rclcpp::TimerBase::SharedPtr
find_timer_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::TimerBase, Function>(func, timer_ptrs_);
}

template<typename Function>
rclcpp::ServiceBase::SharedPtr
find_service_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::ServiceBase, Function>(func, service_ptrs_);
}

template<typename Function>
rclcpp::ClientBase::SharedPtr
find_client_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::ClientBase, Function>(func, client_ptrs_);
}

template<typename Function>
rclcpp::Waitable::SharedPtr
find_waitable_ptrs_if(Function func) const
{
return _find_ptrs_if_impl<rclcpp::Waitable, Function>(func, waitable_ptrs_);
}

RCLCPP_PUBLIC
std::atomic_bool &
Expand Down Expand Up @@ -130,6 +145,21 @@ class CallbackGroup
std::vector<rclcpp::ClientBase::WeakPtr> client_ptrs_;
std::vector<rclcpp::Waitable::WeakPtr> waitable_ptrs_;
std::atomic_bool can_be_taken_from_;

private:
template<typename TypeT, typename Function>
typename TypeT::SharedPtr _find_ptrs_if_impl(
Function func, const std::vector<typename TypeT::WeakPtr> & vect_ptrs) const
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto & weak_ptr : vect_ptrs) {
auto ref_ptr = weak_ptr.lock();
if (ref_ptr && func(ref_ptr)) {
return ref_ptr;
}
}
return typename TypeT::SharedPtr();
}
};

} // namespace callback_group
Expand Down
5 changes: 4 additions & 1 deletion rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_
#define RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_

#include <chrono>
#include <memory>
#include <mutex>
#include <set>
Expand Down Expand Up @@ -53,7 +54,8 @@ class MultiThreadedExecutor : public executor::Executor
MultiThreadedExecutor(
const executor::ExecutorArgs & args = executor::ExecutorArgs(),
size_t number_of_threads = 0,
bool yield_before_execute = false);
bool yield_before_execute = false,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

RCLCPP_PUBLIC
virtual ~MultiThreadedExecutor();
Expand All @@ -77,6 +79,7 @@ class MultiThreadedExecutor : public executor::Executor
std::mutex wait_mutex_;
size_t number_of_threads_;
bool yield_before_execute_;
std::chrono::nanoseconds next_exec_timeout_;

std::set<TimerBase::SharedPtr> scheduled_timers_;
};
Expand Down
41 changes: 16 additions & 25 deletions rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,40 +164,31 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
if (!group || !group->can_be_taken_from().load()) {
continue;
}
for (auto & weak_subscription : group->get_subscription_ptrs()) {
auto subscription = weak_subscription.lock();
if (subscription) {
group->find_subscription_ptrs_if(
[this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
subscription_handles_.push_back(subscription->get_subscription_handle());
if (subscription->get_intra_process_subscription_handle()) {
subscription_handles_.push_back(
subscription->get_intra_process_subscription_handle());
}
}
}
for (auto & weak_service : group->get_service_ptrs()) {
auto service = weak_service.lock();
if (service) {
return false;
});
group->find_service_ptrs_if([this](const rclcpp::ServiceBase::SharedPtr & service) {
service_handles_.push_back(service->get_service_handle());
}
}
for (auto & weak_client : group->get_client_ptrs()) {
auto client = weak_client.lock();
if (client) {
return false;
});
group->find_client_ptrs_if([this](const rclcpp::ClientBase::SharedPtr & client) {
client_handles_.push_back(client->get_client_handle());
}
}
for (auto & weak_timer : group->get_timer_ptrs()) {
auto timer = weak_timer.lock();
if (timer) {
return false;
});
group->find_timer_ptrs_if([this](const rclcpp::TimerBase::SharedPtr & timer) {
timer_handles_.push_back(timer->get_timer_handle());
}
}
for (auto & weak_waitable : group->get_waitable_ptrs()) {
auto waitable = weak_waitable.lock();
if (waitable) {
return false;
});
group->find_waitable_ptrs_if([this](const rclcpp::Waitable::SharedPtr & waitable) {
waitable_handles_.push_back(waitable);
}
}
return false;
});
}
}
return has_invalid_weak_nodes;
Expand Down
64 changes: 30 additions & 34 deletions rclcpp/src/rclcpp/callback_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,6 @@ CallbackGroup::CallbackGroup(CallbackGroupType group_type)
: type_(group_type), can_be_taken_from_(true)
{}

const std::vector<rclcpp::SubscriptionBase::WeakPtr> &
CallbackGroup::get_subscription_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return subscription_ptrs_;
}

const std::vector<rclcpp::TimerBase::WeakPtr> &
CallbackGroup::get_timer_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return timer_ptrs_;
}

const std::vector<rclcpp::ServiceBase::WeakPtr> &
CallbackGroup::get_service_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return service_ptrs_;
}

const std::vector<rclcpp::ClientBase::WeakPtr> &
CallbackGroup::get_client_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return client_ptrs_;
}

const std::vector<rclcpp::Waitable::WeakPtr> &
CallbackGroup::get_waitable_ptrs() const
{
std::lock_guard<std::mutex> lock(mutex_);
return waitable_ptrs_;
}

std::atomic_bool &
CallbackGroup::can_be_taken_from()
Expand All @@ -76,34 +42,64 @@ CallbackGroup::add_subscription(
{
std::lock_guard<std::mutex> lock(mutex_);
subscription_ptrs_.push_back(subscription_ptr);
subscription_ptrs_.erase(
std::remove_if(
subscription_ptrs_.begin(),
subscription_ptrs_.end(),
[](rclcpp::SubscriptionBase::WeakPtr x) {return x.expired();}),
subscription_ptrs_.end());
}

void
CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr timer_ptr)
{
std::lock_guard<std::mutex> lock(mutex_);
timer_ptrs_.push_back(timer_ptr);
timer_ptrs_.erase(
std::remove_if(
timer_ptrs_.begin(),
timer_ptrs_.end(),
[](rclcpp::TimerBase::WeakPtr x) {return x.expired();}),
timer_ptrs_.end());
}

void
CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr service_ptr)
{
std::lock_guard<std::mutex> lock(mutex_);
service_ptrs_.push_back(service_ptr);
service_ptrs_.erase(
std::remove_if(
service_ptrs_.begin(),
service_ptrs_.end(),
[](rclcpp::ServiceBase::WeakPtr x) {return x.expired();}),
service_ptrs_.end());
}

void
CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr client_ptr)
{
std::lock_guard<std::mutex> lock(mutex_);
client_ptrs_.push_back(client_ptr);
client_ptrs_.erase(
std::remove_if(
client_ptrs_.begin(),
client_ptrs_.end(),
[](rclcpp::ClientBase::WeakPtr x) {return x.expired();}),
client_ptrs_.end());
}

void
CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr)
{
std::lock_guard<std::mutex> lock(mutex_);
waitable_ptrs_.push_back(waitable_ptr);
waitable_ptrs_.erase(
std::remove_if(
waitable_ptrs_.begin(),
waitable_ptrs_.end(),
[](rclcpp::Waitable::WeakPtr x) {return x.expired();}),
waitable_ptrs_.end());
}

void
Expand Down
28 changes: 15 additions & 13 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,12 @@ Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
if (!group) {
continue;
}
for (auto & weak_timer : group->get_timer_ptrs()) {
auto t = weak_timer.lock();
if (t == timer) {
return group;
}
auto timer_ref = group->find_timer_ptrs_if(
[timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool {
return timer_ptr == timer;
});
if (timer_ref) {
return group;
}
}
}
Expand All @@ -535,14 +536,15 @@ Executor::get_next_timer(AnyExecutable & any_exec)
if (!group || !group->can_be_taken_from().load()) {
continue;
}
for (auto & timer_ref : group->get_timer_ptrs()) {
auto timer = timer_ref.lock();
if (timer && timer->is_ready()) {
any_exec.timer = timer;
any_exec.callback_group = group;
any_exec.node_base = node;
return;
}
auto timer_ref = group->find_timer_ptrs_if(
[](const rclcpp::TimerBase::SharedPtr & timer) -> bool {
return timer->is_ready();
});
if (timer_ref) {
any_exec.timer = timer_ref;
any_exec.callback_group = group;
any_exec.node_base = node;
return;
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ using rclcpp::executors::MultiThreadedExecutor;
MultiThreadedExecutor::MultiThreadedExecutor(
const rclcpp::executor::ExecutorArgs & args,
size_t number_of_threads,
bool yield_before_execute)
: executor::Executor(args), yield_before_execute_(yield_before_execute)
bool yield_before_execute,
std::chrono::nanoseconds next_exec_timeout)
: executor::Executor(args),
yield_before_execute_(yield_before_execute),
next_exec_timeout_(next_exec_timeout)
{
number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency();
if (number_of_threads_ == 0) {
Expand Down Expand Up @@ -77,7 +80,7 @@ MultiThreadedExecutor::run(size_t)
if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}
if (!get_next_executable(any_exec)) {
if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue;
}
if (any_exec.timer) {
Expand Down
Loading

0 comments on commit bd6351b

Please sign in to comment.