Skip to content

Commit

Permalink
Use rcl_clock_t jump callbacks
Browse files Browse the repository at this point in the history
Relieves rclcpp::TimeSource responsibility of calling jump callbacks.
  • Loading branch information
sloretz committed Aug 27, 2018
1 parent 354d933 commit 9b55985
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 179 deletions.
69 changes: 12 additions & 57 deletions rclcpp/include/rclcpp/clock.hpp
Expand Up @@ -31,53 +31,18 @@ namespace rclcpp

class TimeSource;

/// A struct to represent a timejump.
/**
* It represents the time jump duration and whether it changed clock type.
*/
struct TimeJump
{
typedef enum ClockChange_t
{
ROS_TIME_NO_CHANGE, ///< The time type before and after the jump is ROS_TIME
ROS_TIME_ACTIVATED, ///< The time type switched to ROS_TIME from SYSTEM_TIME
ROS_TIME_DEACTIVATED, ///< The time type switched to SYSTEM_TIME from ROS_TIME
SYSTEM_TIME_NO_CHANGE ///< The time type before and after the jump is SYSTEM_TIME
} ClockChange_t;

ClockChange_t jump_type_; ///< The change in clock_type if there is one.
rcl_duration_t delta_; ///< The change in time value.
};

/// A class to store a threshold for a TimeJump.
/**
* This class can be used to evaluate a time jump's magnitude.
*/
class JumpThreshold
{
public:
uint64_t min_forward_; ///< The minimum jump forward to be considered exceeded..
uint64_t min_backward_; ///< The minimum backwards jump to be considered exceeded.
bool on_clock_change_; ///< Whether to trigger on any clock type change.

// Test if the threshold is exceeded by a TimeJump
RCLCPP_PUBLIC
bool
is_exceeded(const TimeJump & jump);
};

class JumpHandler
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(JumpHandler)
JumpHandler(
std::function<void()> pre_callback,
std::function<void(TimeJump)> post_callback,
JumpThreshold & threshold);
std::function<void(const rcl_time_jump_t &)> post_callback,
const rcl_jump_threshold_t & threshold);

std::function<void()> pre_callback;
std::function<void(const TimeJump &)> post_callback;
JumpThreshold notice_threshold;
std::function<void(const rcl_time_jump_t &)> post_callback;
rcl_jump_threshold_t notice_threshold;
};

class Clock
Expand Down Expand Up @@ -116,32 +81,22 @@ class Clock
JumpHandler::SharedPtr
create_jump_callback(
std::function<void()> pre_callback,
std::function<void(const TimeJump &)> post_callback,
JumpThreshold & threshold);
std::function<void(const rcl_time_jump_t &)> post_callback,
const rcl_jump_threshold_t & threshold);

private:
// A method for TimeSource to get a list of callbacks to invoke while updating
RCLCPP_PUBLIC
std::vector<JumpHandler::SharedPtr>
get_triggered_callback_handlers(const TimeJump & jump);

// Invoke callbacks that are valid and outside threshold.
// Invoke time jump callback
RCLCPP_PUBLIC
static void invoke_prejump_callbacks(
const std::vector<JumpHandler::SharedPtr> & callbacks);

RCLCPP_PUBLIC
static void invoke_postjump_callbacks(
const std::vector<JumpHandler::SharedPtr> & callbacks,
const TimeJump & jump);
static void
on_time_jump(
const struct rcl_time_jump_t * time_jump,
bool before_jump,
void * user_data);

/// Internal storage backed by rcl
rcl_clock_t rcl_clock_;
friend TimeSource; /// Allow TimeSource to access the rcl_clock_ datatype.
rcl_allocator_t allocator_;

std::mutex callback_list_mutex_;
std::vector<std::weak_ptr<JumpHandler>> active_jump_handlers_;
};

} // namespace rclcpp
Expand Down
107 changes: 38 additions & 69 deletions rclcpp/src/rclcpp/clock.cpp
Expand Up @@ -28,27 +28,11 @@

namespace rclcpp
{
bool
JumpThreshold::is_exceeded(const TimeJump & jump)
{
if (on_clock_change_ &&
(jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_ACTIVATED ||
jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_DEACTIVATED))
{
return true;
}
if ((uint64_t)jump.delta_.nanoseconds > min_forward_ ||
(uint64_t)jump.delta_.nanoseconds < min_backward_)
{
return true;
}
return false;
}

JumpHandler::JumpHandler(
std::function<void()> pre_callback,
std::function<void(TimeJump)> post_callback,
JumpThreshold & threshold)
std::function<void(const rcl_time_jump_t &)> post_callback,
const rcl_jump_threshold_t & threshold)
: pre_callback(pre_callback),
post_callback(post_callback),
notice_threshold(threshold)
Expand Down Expand Up @@ -115,66 +99,51 @@ Clock::get_clock_type()
return rcl_clock_.type;
}

rclcpp::JumpHandler::SharedPtr
Clock::create_jump_callback(
std::function<void()> pre_callback,
std::function<void(const TimeJump &)> post_callback,
JumpThreshold & threshold)
void
Clock::on_time_jump(
const struct rcl_time_jump_t * time_jump,
bool before_jump,
void * user_data)
{
// JumpHandler jump_callback;
auto jump_callback =
std::make_shared<rclcpp::JumpHandler>(pre_callback, post_callback, threshold);
{
std::lock_guard<std::mutex> guard(callback_list_mutex_);
active_jump_handlers_.push_back(jump_callback);
rclcpp::JumpHandler * handler = static_cast<rclcpp::JumpHandler *>(user_data);
if (before_jump && handler->pre_callback) {
handler->pre_callback();
} else if (handler->post_callback) {
handler->post_callback(*time_jump);
}
return jump_callback;
}

std::vector<JumpHandler::SharedPtr>
Clock::get_triggered_callback_handlers(const TimeJump & jump)
rclcpp::JumpHandler::SharedPtr
Clock::create_jump_callback(
std::function<void()> pre_callback,
std::function<void(const rcl_time_jump_t &)> post_callback,
const rcl_jump_threshold_t & threshold)
{
std::vector<JumpHandler::SharedPtr> callbacks;
std::lock_guard<std::mutex> guard(callback_list_mutex_);
active_jump_handlers_.erase(
std::remove_if(
active_jump_handlers_.begin(),
active_jump_handlers_.end(),
[&callbacks, &jump](const std::weak_ptr<JumpHandler> & wjcb) {
if (auto jcb = wjcb.lock()) {
if (jcb->notice_threshold.is_exceeded(jump)) {
callbacks.push_back(jcb);
}
return false;
}
// Lock failed so clear the weak pointer.
return true;
}),
active_jump_handlers_.end());
return callbacks;
}
// Allocate a new jump handler
auto handler = new rclcpp::JumpHandler(pre_callback, post_callback, threshold);
if (nullptr == handler) {
rclcpp::exceptions::throw_from_rcl_error(RCL_RET_BAD_ALLOC, "Failed to allocate jump handler");
}

void
Clock::invoke_prejump_callbacks(
const std::vector<JumpHandler::SharedPtr> & callbacks)
{
for (const auto cb : callbacks) {
if (cb->pre_callback != nullptr) {
cb->pre_callback();
}
// Try to add the jump callback to the clock
rcl_ret_t ret = rcl_clock_add_jump_callback(&rcl_clock_, threshold,
rclcpp::Clock::on_time_jump, handler);
if (RCL_RET_OK != ret) {
delete handler;
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to add time jump callback");
}
}

void
Clock::invoke_postjump_callbacks(
const std::vector<JumpHandler::SharedPtr> & callbacks,
const TimeJump & jump)
{
for (auto cb : callbacks) {
if (cb->post_callback != nullptr) {
cb->post_callback(jump);
// *INDENT-OFF*
// create shared_ptr that removes the callback automatically when all copies are destructed
return rclcpp::JumpHandler::SharedPtr(handler, [this](rclcpp::JumpHandler * handler) noexcept {
rcl_ret_t ret = rcl_clock_remove_jump_callback(&rcl_clock_, rclcpp::Clock::on_time_jump,
handler);
delete handler;
if (RCL_RET_OK != ret) {
RCUTILS_LOG_ERROR("Failed to remove time jump callback");
}
}
});
// *INDENT-ON*
}

} // namespace rclcpp
48 changes: 6 additions & 42 deletions rclcpp/src/rclcpp/time_source.cpp
Expand Up @@ -147,54 +147,18 @@ void TimeSource::set_clock(
const builtin_interfaces::msg::Time::SharedPtr msg, bool set_ros_time_enabled,
std::shared_ptr<rclcpp::Clock> clock)
{
// Compute diff
rclcpp::Time msg_time = rclcpp::Time(*msg);
rclcpp::Time now = clock->now();
auto diff = now - msg_time;
rclcpp::TimeJump jump;
jump.delta_.nanoseconds = diff.nanoseconds();

// Compute jump type
if (clock->ros_time_is_active()) {
if (set_ros_time_enabled) {
jump.jump_type_ = TimeJump::ClockChange_t::ROS_TIME_NO_CHANGE;
} else {
jump.jump_type_ = TimeJump::ClockChange_t::ROS_TIME_DEACTIVATED;
}
} else if (!clock->ros_time_is_active()) {
if (set_ros_time_enabled) {
jump.jump_type_ = TimeJump::ClockChange_t::ROS_TIME_ACTIVATED;
} else {
jump.jump_type_ = TimeJump::ClockChange_t::SYSTEM_TIME_NO_CHANGE;
}
}

if (jump.jump_type_ == TimeJump::ClockChange_t::SYSTEM_TIME_NO_CHANGE) {
// No change/no updates don't act.
return;
}

auto active_callbacks = clock->get_triggered_callback_handlers(jump);
clock->invoke_prejump_callbacks(active_callbacks);

// Do change
if (jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_DEACTIVATED) {
if (!set_ros_time_enabled && clock->ros_time_is_active()) {
disable_ros_time(clock);
} else if (jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_ACTIVATED) {
} else if (set_ros_time_enabled && !clock->ros_time_is_active()) {
enable_ros_time(clock);
}

if (jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_ACTIVATED ||
jump.jump_type_ == TimeJump::ClockChange_t::ROS_TIME_NO_CHANGE)
{
auto ret = rcl_set_ros_time_override(&(clock->rcl_clock_), msg_time.nanoseconds());
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "Failed to set ros_time_override_status");
}
auto ret = rcl_set_ros_time_override(&(clock->rcl_clock_), rclcpp::Time(*msg).nanoseconds());
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "Failed to set ros_time_override_status");
}
// Post change callbacks
clock->invoke_postjump_callbacks(active_callbacks, jump);
}

void TimeSource::clock_cb(const rosgraph_msgs::msg::Clock::SharedPtr msg)
Expand Down
22 changes: 11 additions & 11 deletions rclcpp/test/test_time_source.cpp
Expand Up @@ -141,19 +141,19 @@ class CallbackObject
void pre_callback(int id) {last_precallback_id_ = id;}

int last_postcallback_id_;
rclcpp::TimeJump last_timejump_;
void post_callback(const rclcpp::TimeJump & jump, int id)
rcl_time_jump_t last_timejump_;
void post_callback(const rcl_time_jump_t & jump, int id)
{
last_postcallback_id_ = id; last_timejump_ = jump;
}
};

TEST_F(TestTimeSource, callbacks) {
CallbackObject cbo;
rclcpp::JumpThreshold jump_threshold;
jump_threshold.min_forward_ = 0;
jump_threshold.min_backward_ = 0;
jump_threshold.on_clock_change_ = true;
rcl_jump_threshold_t jump_threshold;
jump_threshold.min_forward.nanoseconds = 0;
jump_threshold.min_backward.nanoseconds = 0;
jump_threshold.on_clock_change = true;

rclcpp::TimeSource ts(node);
auto ros_clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
Expand Down Expand Up @@ -212,7 +212,7 @@ TEST_F(TestTimeSource, callbacks) {
// Register a callback handler with only pre_callback
rclcpp::JumpHandler::SharedPtr callback_handler3 = ros_clock->create_jump_callback(
std::bind(&CallbackObject::pre_callback, &cbo, 3),
std::function<void(rclcpp::TimeJump)>(),
std::function<void(rcl_time_jump_t)>(),
jump_threshold);

trigger_clock_changes(node);
Expand All @@ -233,10 +233,10 @@ TEST_F(TestTimeSource, callbacks) {

TEST_F(TestTimeSource, callback_handler_erasure) {
CallbackObject cbo;
rclcpp::JumpThreshold jump_threshold;
jump_threshold.min_forward_ = 0;
jump_threshold.min_backward_ = 0;
jump_threshold.on_clock_change_ = true;
rcl_jump_threshold_t jump_threshold;
jump_threshold.min_forward.nanoseconds = 0;
jump_threshold.min_backward.nanoseconds = 0;
jump_threshold.on_clock_change = true;

rclcpp::TimeSource ts(node);
auto ros_clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
Expand Down

0 comments on commit 9b55985

Please sign in to comment.