Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use rcl_clock_t jump callbacks #543

Merged
merged 2 commits into from Aug 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
69 changes: 12 additions & 57 deletions rclcpp/include/rclcpp/clock.hpp
Expand Up @@ -31,53 +31,18 @@ namespace rclcpp

class TimeSource;

/// A struct to represent a timejump.
/**
* It represents the time jump duration and whether it changed clock type.
*/
struct TimeJump
{
typedef enum ClockChange_t
{
ROS_TIME_NO_CHANGE, ///< The time type before and after the jump is ROS_TIME
ROS_TIME_ACTIVATED, ///< The time type switched to ROS_TIME from SYSTEM_TIME
ROS_TIME_DEACTIVATED, ///< The time type switched to SYSTEM_TIME from ROS_TIME
SYSTEM_TIME_NO_CHANGE ///< The time type before and after the jump is SYSTEM_TIME
} ClockChange_t;

ClockChange_t jump_type_; ///< The change in clock_type if there is one.
rcl_duration_t delta_; ///< The change in time value.
};

/// A class to store a threshold for a TimeJump.
/**
* This class can be used to evaluate a time jump's magnitude.
*/
class JumpThreshold
{
public:
uint64_t min_forward_; ///< The minimum jump forward to be considered exceeded..
uint64_t min_backward_; ///< The minimum backwards jump to be considered exceeded.
bool on_clock_change_; ///< Whether to trigger on any clock type change.

// Test if the threshold is exceeded by a TimeJump
RCLCPP_PUBLIC
bool
is_exceeded(const TimeJump & jump);
};

class JumpHandler
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(JumpHandler)
JumpHandler(
std::function<void()> pre_callback,
std::function<void(TimeJump)> post_callback,
JumpThreshold & threshold);
std::function<void(const rcl_time_jump_t &)> post_callback,
const rcl_jump_threshold_t & threshold);

std::function<void()> pre_callback;
std::function<void(const TimeJump &)> post_callback;
JumpThreshold notice_threshold;
std::function<void(const rcl_time_jump_t &)> post_callback;
rcl_jump_threshold_t notice_threshold;
};

class Clock
Expand Down Expand Up @@ -116,32 +81,22 @@ class Clock
JumpHandler::SharedPtr
create_jump_callback(
std::function<void()> pre_callback,
std::function<void(const TimeJump &)> post_callback,
JumpThreshold & threshold);
std::function<void(const rcl_time_jump_t &)> post_callback,
const rcl_jump_threshold_t & threshold);

private:
// A method for TimeSource to get a list of callbacks to invoke while updating
RCLCPP_PUBLIC
std::vector<JumpHandler::SharedPtr>
get_triggered_callback_handlers(const TimeJump & jump);

// Invoke callbacks that are valid and outside threshold.
// Invoke time jump callback
RCLCPP_PUBLIC
static void invoke_prejump_callbacks(
const std::vector<JumpHandler::SharedPtr> & callbacks);

RCLCPP_PUBLIC
static void invoke_postjump_callbacks(
const std::vector<JumpHandler::SharedPtr> & callbacks,
const TimeJump & jump);
static void
on_time_jump(
const struct rcl_time_jump_t * time_jump,
bool before_jump,
void * user_data);

/// Internal storage backed by rcl
rcl_clock_t rcl_clock_;
friend TimeSource; /// Allow TimeSource to access the rcl_clock_ datatype.
rcl_allocator_t allocator_;

std::mutex callback_list_mutex_;
std::vector<std::weak_ptr<JumpHandler>> active_jump_handlers_;
};

} // namespace rclcpp
Expand Down
107 changes: 38 additions & 69 deletions rclcpp/src/rclcpp/clock.cpp
Expand Up @@ -28,27 +28,11 @@

namespace rclcpp
{
bool
JumpThreshold::is_exceeded(const TimeJump & jump)
{
if (on_clock_change_ &&
(jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_ACTIVATED ||
jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_DEACTIVATED))
{
return true;
}
if ((uint64_t)jump.delta_.nanoseconds > min_forward_ ||
(uint64_t)jump.delta_.nanoseconds < min_backward_)
{
return true;
}
return false;
}

JumpHandler::JumpHandler(
std::function<void()> pre_callback,
std::function<void(TimeJump)> post_callback,
JumpThreshold & threshold)
std::function<void(const rcl_time_jump_t &)> post_callback,
const rcl_jump_threshold_t & threshold)
: pre_callback(pre_callback),
post_callback(post_callback),
notice_threshold(threshold)
Expand Down Expand Up @@ -115,66 +99,51 @@ Clock::get_clock_type()
return rcl_clock_.type;
}

rclcpp::JumpHandler::SharedPtr
Clock::create_jump_callback(
std::function<void()> pre_callback,
std::function<void(const TimeJump &)> post_callback,
JumpThreshold & threshold)
void
Clock::on_time_jump(
const struct rcl_time_jump_t * time_jump,
bool before_jump,
void * user_data)
{
// JumpHandler jump_callback;
auto jump_callback =
std::make_shared<rclcpp::JumpHandler>(pre_callback, post_callback, threshold);
{
std::lock_guard<std::mutex> guard(callback_list_mutex_);
active_jump_handlers_.push_back(jump_callback);
rclcpp::JumpHandler * handler = static_cast<rclcpp::JumpHandler *>(user_data);
if (before_jump && handler->pre_callback) {
handler->pre_callback();
} else if (!before_jump && handler->post_callback) {
handler->post_callback(*time_jump);
}
return jump_callback;
}

std::vector<JumpHandler::SharedPtr>
Clock::get_triggered_callback_handlers(const TimeJump & jump)
rclcpp::JumpHandler::SharedPtr
Clock::create_jump_callback(
std::function<void()> pre_callback,
std::function<void(const rcl_time_jump_t &)> post_callback,
const rcl_jump_threshold_t & threshold)
{
std::vector<JumpHandler::SharedPtr> callbacks;
std::lock_guard<std::mutex> guard(callback_list_mutex_);
active_jump_handlers_.erase(
std::remove_if(
active_jump_handlers_.begin(),
active_jump_handlers_.end(),
[&callbacks, &jump](const std::weak_ptr<JumpHandler> & wjcb) {
if (auto jcb = wjcb.lock()) {
if (jcb->notice_threshold.is_exceeded(jump)) {
callbacks.push_back(jcb);
}
return false;
}
// Lock failed so clear the weak pointer.
return true;
}),
active_jump_handlers_.end());
return callbacks;
}
// Allocate a new jump handler
auto handler = new rclcpp::JumpHandler(pre_callback, post_callback, threshold);
if (nullptr == handler) {
rclcpp::exceptions::throw_from_rcl_error(RCL_RET_BAD_ALLOC, "Failed to allocate jump handler");
}

void
Clock::invoke_prejump_callbacks(
const std::vector<JumpHandler::SharedPtr> & callbacks)
{
for (const auto cb : callbacks) {
if (cb->pre_callback != nullptr) {
cb->pre_callback();
}
// Try to add the jump callback to the clock
rcl_ret_t ret = rcl_clock_add_jump_callback(&rcl_clock_, threshold,
rclcpp::Clock::on_time_jump, handler);
if (RCL_RET_OK != ret) {
delete handler;
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to add time jump callback");
}
}

void
Clock::invoke_postjump_callbacks(
const std::vector<JumpHandler::SharedPtr> & callbacks,
const TimeJump & jump)
{
for (auto cb : callbacks) {
if (cb->post_callback != nullptr) {
cb->post_callback(jump);
// *INDENT-OFF*
// create shared_ptr that removes the callback automatically when all copies are destructed
return rclcpp::JumpHandler::SharedPtr(handler, [this](rclcpp::JumpHandler * handler) noexcept {
rcl_ret_t ret = rcl_clock_remove_jump_callback(&rcl_clock_, rclcpp::Clock::on_time_jump,
handler);
delete handler;
if (RCL_RET_OK != ret) {
RCUTILS_LOG_ERROR("Failed to remove time jump callback");
}
}
});
// *INDENT-ON*
}

} // namespace rclcpp
48 changes: 6 additions & 42 deletions rclcpp/src/rclcpp/time_source.cpp
Expand Up @@ -147,54 +147,18 @@ void TimeSource::set_clock(
const builtin_interfaces::msg::Time::SharedPtr msg, bool set_ros_time_enabled,
std::shared_ptr<rclcpp::Clock> clock)
{
// Compute diff
rclcpp::Time msg_time = rclcpp::Time(*msg);
rclcpp::Time now = clock->now();
auto diff = now - msg_time;
rclcpp::TimeJump jump;
jump.delta_.nanoseconds = diff.nanoseconds();

// Compute jump type
if (clock->ros_time_is_active()) {
if (set_ros_time_enabled) {
jump.jump_type_ = TimeJump::ClockChange_t::ROS_TIME_NO_CHANGE;
} else {
jump.jump_type_ = TimeJump::ClockChange_t::ROS_TIME_DEACTIVATED;
}
} else if (!clock->ros_time_is_active()) {
if (set_ros_time_enabled) {
jump.jump_type_ = TimeJump::ClockChange_t::ROS_TIME_ACTIVATED;
} else {
jump.jump_type_ = TimeJump::ClockChange_t::SYSTEM_TIME_NO_CHANGE;
}
}

if (jump.jump_type_ == TimeJump::ClockChange_t::SYSTEM_TIME_NO_CHANGE) {
// No change/no updates don't act.
return;
}

auto active_callbacks = clock->get_triggered_callback_handlers(jump);
clock->invoke_prejump_callbacks(active_callbacks);

// Do change
if (jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_DEACTIVATED) {
if (!set_ros_time_enabled && clock->ros_time_is_active()) {
disable_ros_time(clock);
} else if (jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_ACTIVATED) {
} else if (set_ros_time_enabled && !clock->ros_time_is_active()) {
enable_ros_time(clock);
}

if (jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_ACTIVATED ||
jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_NO_CHANGE)
{
auto ret = rcl_set_ros_time_override(&(clock->rcl_clock_), msg_time.nanoseconds());
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "Failed to set ros_time_override_status");
}
auto ret = rcl_set_ros_time_override(&(clock->rcl_clock_), rclcpp::Time(*msg).nanoseconds());
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "Failed to set ros_time_override_status");
}
// Post change callbacks
clock->invoke_postjump_callbacks(active_callbacks, jump);
}

void TimeSource::clock_cb(const rosgraph_msgs::msg::Clock::SharedPtr msg)
Expand Down