Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Independent mutex to check and remove timer from scheduled timer #1168

Open
wants to merge 6 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -82,6 +82,7 @@ class MultiThreadedExecutor : public rclcpp::Executor
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

std::mutex wait_mutex_;
std::mutex scheduled_timers_mutex_;
size_t number_of_threads_;
bool yield_before_execute_;
std::chrono::nanoseconds next_exec_timeout_;
Expand Down
25 changes: 14 additions & 11 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Expand Up @@ -83,27 +83,30 @@ MultiThreadedExecutor::run(size_t)
if (!get_next_executable(any_exec, next_exec_timeout_)) {
continue;
}
if (any_exec.timer) {
// Guard against multiple threads getting the same timer.
if (scheduled_timers_.count(any_exec.timer) != 0) {
// Make sure that any_exec's callback group is reset before
// the lock is released.
if (any_exec.callback_group) {
any_exec.callback_group->can_be_taken_from().store(true);
}
continue;
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. context switch here with already scheduled timer by Thread-1. (Thread-2)

if (any_exec.timer) {
std::lock_guard<std::mutex> wait_lock(scheduled_timers_mutex_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. the same timer will be scheduled again and executed. (Thread-2)

// Guard against multiple threads getting the same timer.
if (scheduled_timers_.count(any_exec.timer) != 0) {
// Make sure that any_exec's callback group is reset before
// the lock is released.
if (any_exec.callback_group) {
any_exec.callback_group->can_be_taken_from().store(true);
}
scheduled_timers_.insert(any_exec.timer);
continue;
}
scheduled_timers_.insert(any_exec.timer);
}

if (yield_before_execute_) {
std::this_thread::yield();
}

execute_any_executable(any_exec);

if (any_exec.timer) {
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
std::lock_guard<std::mutex> wait_lock(scheduled_timers_mutex_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. execute timer and remove it from scheduled_timers_mutex_ (Thread-1)

auto it = scheduled_timers_.find(any_exec.timer);
if (it != scheduled_timers_.end()) {
scheduled_timers_.erase(it);
Expand Down
6 changes: 6 additions & 0 deletions rclcpp/test/CMakeLists.txt
Expand Up @@ -464,6 +464,12 @@ if(TARGET test_guard_condition)
target_link_libraries(test_guard_condition ${PROJECT_NAME})
endif()

ament_add_gtest(test_timer_count rclcpp/test_timer_call_count.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
if(TARGET test_timer_count)
target_link_libraries(test_timer_count ${PROJECT_NAME})
endif()

ament_add_gtest(test_wait_set rclcpp/test_wait_set.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}")
if(TARGET test_wait_set)
Expand Down
116 changes: 116 additions & 0 deletions rclcpp/test/rclcpp/test_timer_call_count.cpp
@@ -0,0 +1,116 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gtest/gtest.h>

#include <chrono>
#include <string>
#include <memory>

#include "rclcpp/rclcpp.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/executors.hpp"
#include "rclcpp/executor.hpp"

using namespace std::chrono_literals;

class TestTimerCount : public ::testing::Test
{
protected:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}
};

constexpr int EXECUTION_COUNT = 5;
constexpr rcl_duration_value_t TIME_ELAPSED = 12;
/*
Test timer wait mutex with multithreaded executor.
After 5 call
*/

TEST_F(TestTimerCount, timer_call_count_multi_threaded) {
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
rclcpp::executors::MultiThreadedExecutor executor;

std::shared_ptr<rclcpp::Node> node =
std::make_shared<rclcpp::Node>("timer_call_count_multi_threaded");

auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::Reentrant);

rclcpp::Clock system_clock(RCL_STEADY_TIME);
std::mutex last_mutex;
rclcpp::Time initial = system_clock.now();

std::atomic_int timer_count {0};

auto timer_callback = [&timer_count, &executor, &system_clock, &last_mutex, &initial]() {
rclcpp::Time now = system_clock.now();
timer_count++;

{
std::lock_guard<std::mutex> lock(last_mutex);
rcl_duration_value_t diff = (now - initial).nanoseconds() / (rcl_duration_value_t)1.0e9;

if (diff > TIME_ELAPSED) {
executor.cancel();
ASSERT_GT(timer_count, EXECUTION_COUNT);
}
}
};

auto timer_ = node->create_wall_timer(
2s, timer_callback, cbg);

executor.add_node(node);
executor.spin();
}

/*
Test timer wait mutex with singlethreaded executor
*/
TEST_F(TestTimerCount, timer_call_count_single_threaded) {
rclcpp::executors::SingleThreadedExecutor executor;

std::shared_ptr<rclcpp::Node> node =
std::make_shared<rclcpp::Node>("timer_call_count_single_threaded");

rclcpp::Clock system_clock(RCL_STEADY_TIME);
std::mutex last_mutex;
rclcpp::Time initial = system_clock.now();

std::atomic_int timer_count {0};

auto timer_callback = [&timer_count, &executor, &system_clock, &last_mutex, &initial]() {
rclcpp::Time now = system_clock.now();
timer_count++;

{
std::lock_guard<std::mutex> lock(last_mutex);
rcl_duration_value_t diff = (now - initial).nanoseconds() / (rcl_duration_value_t)1.0e9;

if (diff > TIME_ELAPSED) {
executor.cancel();
ASSERT_GT(timer_count, EXECUTION_COUNT);
}
}
};

auto timer_ = node->create_wall_timer(
2s, timer_callback);

executor.add_node(node);
executor.spin();
}