Skip to content

Commit

Permalink
Add callbacks for PlayerClock::jump(time_point) API with CI fix (#779)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
MichaelOrlov committed Jun 15, 2021
1 parent 758d96b commit 6f7503e
Show file tree
Hide file tree
Showing 4 changed files with 504 additions and 11 deletions.
98 changes: 96 additions & 2 deletions rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef ROSBAG2_CPP__CLOCKS__PLAYER_CLOCK_HPP_
#define ROSBAG2_CPP__CLOCKS__PLAYER_CLOCK_HPP_

#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
Expand All @@ -35,6 +36,76 @@ namespace rosbag2_cpp
class PlayerClock
{
public:
class JumpHandler final
{
public:
using SharedPtr = std::shared_ptr<JumpHandler>;
using pre_callback_t = std::function<void ()>;
using post_callback_t = std::function<void (const rcl_time_jump_t &)>;

JumpHandler() = delete;
JumpHandler(const JumpHandler &) = delete;
const JumpHandler & operator=(const JumpHandler &) = delete;

/**
* \brief Creates JumpHandler object with callbacks for jump operation.
* \param pre_callback Pre-time jump callback. Must be non-throwing.
* \param post_callback Post-time jump callback. Must be non-throwing.
* \param threshold Callbacks will be triggered if the time jump is greater then the threshold.
* \note These callback functions must remain valid as long as the returned shared pointer is
* valid.
* \return Shared pointer to the newly created JumpHandler object.
* \throws std::bad_alloc if the allocation of the JumpHandler fails.
* \throws std::invalid argument if any of the provided callbacks are nullptr.
*/
ROSBAG2_CPP_PUBLIC
static SharedPtr create(
const pre_callback_t & pre_callback, const post_callback_t & post_callback,
const rcl_jump_threshold_t & threshold)
{
JumpHandler::SharedPtr handler(new JumpHandler(pre_callback, post_callback, threshold));
if (handler == nullptr) {
throw std::bad_alloc{};
}
return handler;
}

/**
* \brief Equal operator for PlayerClock::JumpHandler class.
* \param right Right side operand for equal comparison operation.
* \return true if internal id's of two JumpHandlers match, otherwise false.
*/
ROSBAG2_CPP_PUBLIC
bool operator==(const PlayerClock::JumpHandler & right) const
{
return id == right.id;
}

const pre_callback_t pre_callback;
const post_callback_t post_callback;
const rcl_jump_threshold_t notice_threshold;

private:
uint64_t id;

JumpHandler(
const pre_callback_t & pre_callback, const post_callback_t & post_callback,
const rcl_jump_threshold_t & threshold)
: pre_callback(pre_callback), post_callback(post_callback), notice_threshold(threshold)
{
if (pre_callback == nullptr && post_callback == nullptr) {
throw std::invalid_argument("At least one callback for JumpHandler should be not nullptr");
}
id = create_id();
}

static uint64_t create_id()
{
static std::atomic<uint64_t> id_count{0};
return id_count.fetch_add(1, std::memory_order_relaxed) + 1;
}
};

/**
* Type representing an arbitrary steady time, used to measure real-time durations
* This type is never exposed by the PlayerClock - it is only used as input to the PlayerClock.
Expand Down Expand Up @@ -101,16 +172,39 @@ class PlayerClock
virtual bool is_paused() const = 0;

/**
* Change the current ROS time to an arbitrary time.
* \brief Change the current ROS time to an arbitrary time.
* \note This will wake any waiting `sleep_until` and trigger any registered JumpHandler
* callbacks.
* \param time_point Time point in ROS playback timeline.
*/
ROSBAG2_CPP_PUBLIC
virtual void jump(rcutils_time_point_value_t) = 0;
virtual void jump(rcutils_time_point_value_t time_point) = 0;

/**
* \sa jump
*/
ROSBAG2_CPP_PUBLIC
virtual void jump(rclcpp::Time time) = 0;

/**
* \brief Add callbacks to be called when a time jump exceeds a threshold.
* \details Callbacks specified in JumpHandler object will be called in two cases:
* 1. use_sim_time is true: if the external time source jumps back in time, or forward
* farther than the threshold.
* 2. use_sim_time is false: if jump(time_point) is called and time jumps back or forward
* farther than the threshold.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
* \throws std::invalid argument if jump threshold has invalid value.
*/
ROSBAG2_CPP_PUBLIC
virtual void add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler) = 0;

/**
* \brief remove jump callbacks from processing list.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
*/
ROSBAG2_CPP_PUBLIC
virtual void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler) = 0;
};

} // namespace rosbag2_cpp
Expand Down
26 changes: 23 additions & 3 deletions rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ class TimeControllerClock : public PlayerClock

/**
* Change the current ROS time to an arbitrary time.
* This will wake any waiting `sleep_until`.
* Note: this initial implementation does not have any jump-callback handling.
* The Player should not use this method while its queues are active ("during playback")
* \note This will wake any waiting `sleep_until` and trigger any registered JumpHandler
* callbacks.
* \note The Player should not use this method while its queues are active ("during playback")
* as an arbitrary time jump can make those queues, and the state of the Reader/Storage, invalid
* The current Player implementation should only use this method in between calls to `play`,
* to reset the initial time of the clock.
Expand All @@ -124,6 +124,26 @@ class TimeControllerClock : public PlayerClock
ROSBAG2_CPP_PUBLIC
void jump(rclcpp::Time ros_time) override;

/**
* \brief Add callbacks to be called when a time jump exceeds a threshold.
* \details Callbacks specified in JumpHandler object will be called in two cases:
* 1. use_sim_time is true: if the external time source jumps back in time, or forward
* farther than the threshold.
* 2. use_sim_time is false: if jump(time_point) is called and time jumps back or forward
* farther than the threshold.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
* \throws std::invalid argument if jump threshold has invalid value.
*/
ROSBAG2_CPP_PUBLIC
void add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler) override;

/**
* \brief remove jump callbacks from processing list.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
*/
ROSBAG2_CPP_PUBLIC
void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler) override;

private:
std::unique_ptr<TimeControllerClockImpl> impl_;
};
Expand Down
128 changes: 123 additions & 5 deletions rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include "rcpputils/thread_safety_annotations.hpp"
#include "rosbag2_cpp/clocks/time_controller_clock.hpp"
Expand Down Expand Up @@ -114,6 +115,72 @@ class TimeControllerClockImpl
snapshot(ros_now());
}

/**
* \brief Adjust internal clock to the specified timestamp.
* \details It will change the current internally maintained offset so that next published time
* is different.
* \note Will trigger any registered JumpHandler callbacks.
* \param ros_time Time point in ROS playback timeline.
*/
void jump(rcutils_time_point_value_t ros_time)
{
rcl_duration_t time_jump_delta;
{
std::lock_guard<std::mutex> lock(state_mutex);
time_jump_delta.nanoseconds = ros_time - steady_to_ros(now_fn());
}

rcl_time_jump_t time_jump{};
time_jump.clock_change = RCL_ROS_TIME_NO_CHANGE;
time_jump.delta = time_jump_delta;

process_callbacks_before_jump(time_jump);
{
std::lock_guard<std::mutex> lock(state_mutex);
snapshot(ros_time);
}
process_callbacks_after_jump(time_jump);
cv.notify_all();
}

/**
* \brief Add callbacks to be called when a time jump exceeds a threshold.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
* \throws std::invalid argument if jump threshold has invalid value.
*/
void add_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler)
{
if (handler->notice_threshold.min_forward.nanoseconds < 0) {
throw std::invalid_argument("forward jump threshold must be positive or zero");
}
if (handler->notice_threshold.min_backward.nanoseconds > 0) {
throw std::invalid_argument("backward jump threshold must be negative or zero");
}

std::lock_guard<std::mutex> lock(callback_list_mutex_);
for (auto const & registered_handler : callback_list_) {
if (*registered_handler == *handler) {
return; // Already have this callback in the list.
}
}
callback_list_.push_back(handler);
}

/**
* \brief remove jump callbacks from processing list.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
*/
void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler)
{
std::lock_guard<std::mutex> lock(callback_list_mutex_);
for (auto it = callback_list_.begin(); it != callback_list_.end(); ++it) {
if (**it == *handler) {
callback_list_.erase(it);
return;
}
}
}

const PlayerClock::NowFunction now_fn;
const std::chrono::milliseconds sleep_time_while_paused;

Expand All @@ -122,6 +189,49 @@ class TimeControllerClockImpl
double rate RCPPUTILS_TSA_GUARDED_BY(state_mutex) = 1.0;
bool paused RCPPUTILS_TSA_GUARDED_BY(state_mutex) = false;
TimeReference reference RCPPUTILS_TSA_GUARDED_BY(state_mutex);

private:
std::mutex callback_list_mutex_;
std::vector<PlayerClock::JumpHandler::SharedPtr> callback_list_
RCPPUTILS_TSA_GUARDED_BY(callback_list_mutex_);

void process_callbacks_before_jump(const rcl_time_jump_t & time_jump)
{
std::lock_guard<std::mutex> lock(callback_list_mutex_);
for (auto const & handler : callback_list_) {
process_callback(handler, time_jump, true);
}
}

void process_callbacks_after_jump(const rcl_time_jump_t & time_jump)
{
std::lock_guard<std::mutex> lock(callback_list_mutex_);
for (auto const & handler : callback_list_) {
process_callback(handler, time_jump, false);
}
}

static void process_callback(
PlayerClock::JumpHandler::SharedPtr handler, const rcl_time_jump_t & time_jump,
bool before_jump) RCPPUTILS_TSA_REQUIRES(callback_list_mutex_)
{
bool is_clock_change = time_jump.clock_change == RCL_ROS_TIME_ACTIVATED ||
time_jump.clock_change == RCL_ROS_TIME_DEACTIVATED;
if ((is_clock_change && handler->notice_threshold.on_clock_change) ||
(handler->notice_threshold.min_backward.nanoseconds != 0 &&
time_jump.delta.nanoseconds < 0 &&
time_jump.delta.nanoseconds <= handler->notice_threshold.min_backward.nanoseconds) ||
(handler->notice_threshold.min_forward.nanoseconds != 0 &&
time_jump.delta.nanoseconds > 0 &&
time_jump.delta.nanoseconds >= handler->notice_threshold.min_forward.nanoseconds))
{
if (before_jump && handler->pre_callback) {
handler->pre_callback();
} else if (!before_jump && handler->post_callback) {
handler->post_callback(time_jump);
}
}
}
};

TimeControllerClock::TimeControllerClock(
Expand Down Expand Up @@ -186,6 +296,16 @@ bool TimeControllerClock::set_rate(double rate)
return true;
}

void TimeControllerClock::add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler)
{
impl_->add_jump_callbacks(handler);
}

void TimeControllerClock::remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler)
{
impl_->remove_jump_callbacks(handler);
}

double TimeControllerClock::get_rate() const
{
std::lock_guard<std::mutex> lock(impl_->state_mutex);
Expand All @@ -198,7 +318,7 @@ void TimeControllerClock::pause()
if (impl_->paused) {
return;
}
// Take snaphot before changing state
// Take snapshot before changing state
impl_->snapshot();
impl_->paused = true;
impl_->cv.notify_all();
Expand All @@ -210,7 +330,7 @@ void TimeControllerClock::resume()
if (!impl_->paused) {
return;
}
// Take snaphot before changing state
// Take snapshot before changing state
impl_->snapshot();
impl_->paused = false;
impl_->cv.notify_all();
Expand All @@ -224,9 +344,7 @@ bool TimeControllerClock::is_paused() const

void TimeControllerClock::jump(rcutils_time_point_value_t ros_time)
{
std::lock_guard<std::mutex> lock(impl_->state_mutex);
impl_->snapshot(ros_time);
impl_->cv.notify_all();
impl_->jump(ros_time);
}

void TimeControllerClock::jump(rclcpp::Time ros_time)
Expand Down
Loading

0 comments on commit 6f7503e

Please sign in to comment.