Skip to content

Commit

Permalink
Context, Shutdown Callback, Condition Var per call
Browse files Browse the repository at this point in the history
The `Clock` doesn't have enough information to know which Context should
wake it on shutdown, so this adds a Context as an argument to
sleep_until().

Since the context is per call, the shutdown callback is also registered
per call and cannot be stored on impl_.

The condition_variable is also unique per call to reduce spurious
wakeups when multiple threads sleep on the same clock.
  • Loading branch information
sloretz committed Nov 3, 2021
1 parent 01b86ec commit 8302273
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 15 deletions.
7 changes: 6 additions & 1 deletion rclcpp/include/rclcpp/clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>
#include <mutex>

#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/time.hpp"
#include "rclcpp/visibility_control.hpp"
Expand Down Expand Up @@ -87,15 +88,19 @@ class Clock
* false. There is not a consistent choice of sleeping time when the time source changes,
* so this is up to the caller to call again if needed.
*
* \throws std::runtime_error if the context is invalid
* \param until absolute time according to current clock type to sleep until.
* \param context the rclcpp context the clock should use to check that ROS is still initialized.
* \return true immediately if `until` is in the past
* \return true when the time `until` is reached
* \return false if time cannot be reached reliably, for example from shutdown or a change
* of time source.
*/
RCLCPP_PUBLIC
bool
sleep_until(Time until);
sleep_until(
Time until,
Context::SharedPtr context = contexts::get_global_default_context());

/**
* Returns the clock of the type `RCL_ROS_TIME` is active.
Expand Down
38 changes: 24 additions & 14 deletions rclcpp/src/rclcpp/clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
#include <memory>
#include <thread>

#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/utilities.hpp"

#include "rcpputils/scope_exit.hpp"
#include "rcutils/logging_macros.h"

namespace rclcpp
Expand Down Expand Up @@ -50,8 +50,6 @@ class Clock::Impl
rcl_clock_t rcl_clock_;
rcl_allocator_t allocator_;
std::mutex clock_mutex_;
std::condition_variable cv_;
rclcpp::OnShutdownCallbackHandle shutdown_cb_;
};

JumpHandler::JumpHandler(
Expand All @@ -66,15 +64,10 @@ JumpHandler::JumpHandler(
Clock::Clock(rcl_clock_type_t clock_type)
: impl_(new Clock::Impl(clock_type))
{
impl_->shutdown_cb_ = rclcpp::contexts::get_global_default_context()->add_on_shutdown_callback(
[this]() {
impl_->cv_.notify_all();
});
}

Clock::~Clock()
{
rclcpp::contexts::get_global_default_context()->remove_on_shutdown_callback(impl_->shutdown_cb_);
}

Time
Expand All @@ -91,8 +84,11 @@ Clock::now()
}

bool
Clock::sleep_until(Time until)
Clock::sleep_until(Time until, Context::SharedPtr context)
{
if (!context || !context->is_valid()) {
throw std::runtime_error("context cannot be slept with because it's invalid");
}
const auto this_clock_type = get_clock_type();
if (until.get_clock_type() != this_clock_type) {
RCUTILS_LOG_ERROR(
Expand All @@ -102,14 +98,28 @@ Clock::sleep_until(Time until)
}
bool time_source_changed = false;

std::condition_variable cv;

// Wake this thread if the context is shutdown
rclcpp::OnShutdownCallbackHandle shutdown_cb_handle;
shutdown_cb_handle = context->add_on_shutdown_callback(
[&cv]() {
cv.notify_one();
});
// No longer need the shutdown callback when this function exits
auto callback_remover = rcpputils::scope_exit(
[context, &shutdown_cb_handle]() {
context->remove_on_shutdown_callback(shutdown_cb_handle);
});

if (this_clock_type == RCL_STEADY_TIME) {
auto steady_time = std::chrono::steady_clock::time_point(
std::chrono::nanoseconds(until.nanoseconds()));

// loop over spurious wakeups but notice shutdown
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && rclcpp::ok()) {
impl_->cv_.wait_until(lock, steady_time);
cv.wait_until(lock, steady_time);
}
} else if (this_clock_type == RCL_SYSTEM_TIME) {
auto system_time = std::chrono::time_point<
Expand All @@ -119,7 +129,7 @@ Clock::sleep_until(Time until)
// loop over spurious wakeups but notice shutdown
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && rclcpp::ok()) {
impl_->cv_.wait_until(lock, system_time);
cv.wait_until(lock, system_time);
}
} else if (this_clock_type == RCL_ROS_TIME) {
// Install jump handler for any amount of time change, for two purposes:
Expand All @@ -131,7 +141,7 @@ Clock::sleep_until(Time until)
threshold.min_forward.nanoseconds = 0;
auto clock_handler = create_jump_callback(
[]() {},
[this](const rcl_time_jump_t &) {impl_->cv_.notify_all();},
[&cv](const rcl_time_jump_t &) {cv.notify_one();},
threshold);

try {
Expand All @@ -143,7 +153,7 @@ Clock::sleep_until(Time until)
// loop over spurious wakeups but notice shutdown or time source change
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && rclcpp::ok() && !ros_time_is_active()) {
impl_->cv_.wait_until(lock, system_time);
cv.wait_until(lock, system_time);
}
time_source_changed = ros_time_is_active();
} else {
Expand All @@ -152,7 +162,7 @@ Clock::sleep_until(Time until)
// jump callbacks wake the cv on every new sample.
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && rclcpp::ok() && ros_time_is_active()) {
impl_->cv_.wait(lock);
cv.wait(lock);
}
time_source_changed = !ros_time_is_active();
}
Expand Down

0 comments on commit 8302273

Please sign in to comment.