Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow timers to keep up the intended rate in MultiThreadedExecutor #1516

Merged
merged 10 commits into from
Feb 17, 2021
65 changes: 64 additions & 1 deletion rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,70 @@ class MultiThreadedExecutor : public rclcpp::Executor
private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

std::mutex wait_mutex_;
/// \internal A mutex that has two locking mechanism, one with higher priority than the other.
/**
* After the current mutex owner release the lock, a thread that used the high
* priority mechanism will have priority over threads that used the low priority mechanism.
*/
class MutexTwoPriorities
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
{
public:
class HpMutex
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
{
public:
explicit HpMutex(MutexTwoPriorities & parent)
: parent_(parent) {}

void lock()
{
parent_.data_.lock();
}

void unlock()
{
parent_.data_.unlock();
}

private:
MutexTwoPriorities & parent_;
};

class LpMutex
{
public:
explicit LpMutex(MutexTwoPriorities & parent)
: parent_(parent) {}

void lock()
{
// low_prio_.lock(); data_.lock();
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
std::unique_lock<std::mutex> lpg{parent_.low_prio_};
parent_.data_.lock();
lpg.release();
}

void unlock()
{
// data_.unlock(); low_prio_.unlock()
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
std::lock_guard<std::mutex> lpg{parent_.low_prio_, std::adopt_lock};
parent_.data_.unlock();
}

private:
MutexTwoPriorities & parent_;
};

HpMutex hp() {return HpMutex{*this};}
LpMutex lp() {return LpMutex{*this};}

private:
// Implementation detail: the whole idea here is that only one low priority thread can be
// trying to take the data_ mutex, while all high priority threads are already waiting there.
std::mutex low_prio_;
std::mutex data_;
};

MutexTwoPriorities wait_mutex_;
size_t number_of_threads_;
bool yield_before_execute_;
std::chrono::nanoseconds next_exec_timeout_;
Expand Down
11 changes: 8 additions & 3 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {}
void
MultiThreadedExecutor::spin()
{
using MutexTwoPriorities = rclcpp::executors::MultiThreadedExecutor::MutexTwoPriorities;
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
std::vector<std::thread> threads;
size_t thread_id = 0;
{
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
auto lp_wait_mutex = wait_mutex_.lp();
std::lock_guard<MutexTwoPriorities::LpMutex> wait_lock(lp_wait_mutex);
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
threads.emplace_back(func);
Expand All @@ -73,10 +75,12 @@ MultiThreadedExecutor::get_number_of_threads()
void
MultiThreadedExecutor::run(size_t)
{
using MutexTwoPriorities = rclcpp::executors::MultiThreadedExecutor::MutexTwoPriorities;
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;
{
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
auto lp_wait_mutex = wait_mutex_.lp();
std::lock_guard<MutexTwoPriorities::LpMutex> wait_lock(lp_wait_mutex);
if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}
Expand All @@ -103,7 +107,8 @@ MultiThreadedExecutor::run(size_t)
execute_any_executable(any_exec);
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved

if (any_exec.timer) {
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
auto hp_wait_mutex = wait_mutex_.hp();
std::lock_guard<MutexTwoPriorities::HpMutex> wait_lock(hp_wait_mutex);
auto it = scheduled_timers_.find(any_exec.timer);
if (it != scheduled_timers_.end()) {
scheduled_timers_.erase(it);
Expand Down