Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add callbacks for PlayerClock::jump(time_point) API with CI fix #779

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 96 additions & 2 deletions rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef ROSBAG2_CPP__CLOCKS__PLAYER_CLOCK_HPP_
#define ROSBAG2_CPP__CLOCKS__PLAYER_CLOCK_HPP_

#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
Expand All @@ -35,6 +36,76 @@ namespace rosbag2_cpp
class PlayerClock
{
public:
class JumpHandler final
{
public:
using SharedPtr = std::shared_ptr<JumpHandler>;
using pre_callback_t = std::function<void ()>;
using post_callback_t = std::function<void (const rcl_time_jump_t &)>;

JumpHandler() = delete;
JumpHandler(const JumpHandler &) = delete;
const JumpHandler & operator=(const JumpHandler &) = delete;

/**
* \brief Creates JumpHandler object with callbacks for jump operation.
* \param pre_callback Pre-time jump callback. Must be non-throwing.
* \param post_callback Post-time jump callback. Must be non-throwing.
* \param threshold Callbacks will be triggered if the time jump is greater then the threshold.
* \note These callback functions must remain valid as long as the returned shared pointer is
* valid.
* \return Shared pointer to the newly created JumpHandler object.
* \throws std::bad_alloc if the allocation of the JumpHandler fails.
* \throws std::invalid argument if any of the provided callbacks are nullptr.
*/
ROSBAG2_CPP_PUBLIC
static SharedPtr create(
const pre_callback_t & pre_callback, const post_callback_t & post_callback,
const rcl_jump_threshold_t & threshold)
{
JumpHandler::SharedPtr handler(new JumpHandler(pre_callback, post_callback, threshold));
if (handler == nullptr) {
throw std::bad_alloc{};
}
return handler;
}

/**
* \brief Equal operator for PlayerClock::JumpHandler class.
* \param right Right side operand for equal comparison operation.
* \return true if internal id's of two JumpHandlers match, otherwise false.
*/
ROSBAG2_CPP_PUBLIC
bool operator==(const PlayerClock::JumpHandler & right) const
{
return id == right.id;
}

const pre_callback_t pre_callback;
const post_callback_t post_callback;
const rcl_jump_threshold_t notice_threshold;

private:
uint64_t id;

JumpHandler(
const pre_callback_t & pre_callback, const post_callback_t & post_callback,
const rcl_jump_threshold_t & threshold)
: pre_callback(pre_callback), post_callback(post_callback), notice_threshold(threshold)
{
if (pre_callback == nullptr && post_callback == nullptr) {
throw std::invalid_argument("At least one callback for JumpHandler should be not nullptr");
}
id = create_id();
}

static uint64_t create_id()
{
static std::atomic<uint64_t> id_count{0};
return id_count.fetch_add(1, std::memory_order_relaxed) + 1;
}
};

/**
* Type representing an arbitrary steady time, used to measure real-time durations
* This type is never exposed by the PlayerClock - it is only used as input to the PlayerClock.
Expand Down Expand Up @@ -101,16 +172,39 @@ class PlayerClock
virtual bool is_paused() const = 0;

/**
* Change the current ROS time to an arbitrary time.
* \brief Change the current ROS time to an arbitrary time.
* \note This will wake any waiting `sleep_until` and trigger any registered JumpHandler
* callbacks.
* \param time_point Time point in ROS playback timeline.
*/
ROSBAG2_CPP_PUBLIC
virtual void jump(rcutils_time_point_value_t) = 0;
virtual void jump(rcutils_time_point_value_t time_point) = 0;

/**
* \sa jump
*/
ROSBAG2_CPP_PUBLIC
virtual void jump(rclcpp::Time time) = 0;

/**
* \brief Add callbacks to be called when a time jump exceeds a threshold.
* \details Callbacks specified in JumpHandler object will be called in two cases:
* 1. use_sim_time is true: if the external time source jumps back in time, or forward
* farther than the threshold.
* 2. use_sim_time is false: if jump(time_point) is called and time jumps back or forward
* farther than the threshold.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
* \throws std::invalid argument if jump threshold has invalid value.
*/
ROSBAG2_CPP_PUBLIC
virtual void add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler) = 0;

/**
* \brief remove jump callbacks from processing list.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
*/
ROSBAG2_CPP_PUBLIC
virtual void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler) = 0;
};

} // namespace rosbag2_cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ class TimeControllerClock : public PlayerClock

/**
* Change the current ROS time to an arbitrary time.
* This will wake any waiting `sleep_until`.
* Note: this initial implementation does not have any jump-callback handling.
* The Player should not use this method while its queues are active ("during playback")
* \note This will wake any waiting `sleep_until` and trigger any registered JumpHandler
* callbacks.
* \note The Player should not use this method while its queues are active ("during playback")
* as an arbitrary time jump can make those queues, and the state of the Reader/Storage, invalid
* The current Player implementation should only use this method in between calls to `play`,
* to reset the initial time of the clock.
Expand All @@ -124,6 +124,26 @@ class TimeControllerClock : public PlayerClock
ROSBAG2_CPP_PUBLIC
void jump(rclcpp::Time ros_time) override;

/**
* \brief Add callbacks to be called when a time jump exceeds a threshold.
* \details Callbacks specified in JumpHandler object will be called in two cases:
* 1. use_sim_time is true: if the external time source jumps back in time, or forward
* farther than the threshold.
* 2. use_sim_time is false: if jump(time_point) is called and time jumps back or forward
* farther than the threshold.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
* \throws std::invalid argument if jump threshold has invalid value.
*/
ROSBAG2_CPP_PUBLIC
void add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler) override;

/**
* \brief remove jump callbacks from processing list.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
*/
ROSBAG2_CPP_PUBLIC
void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler) override;

private:
std::unique_ptr<TimeControllerClockImpl> impl_;
};
Expand Down
128 changes: 123 additions & 5 deletions rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include "rcpputils/thread_safety_annotations.hpp"
#include "rosbag2_cpp/clocks/time_controller_clock.hpp"
Expand Down Expand Up @@ -114,6 +115,72 @@ class TimeControllerClockImpl
snapshot(ros_now());
}

/**
* \brief Adjust internal clock to the specified timestamp.
* \details It will change the current internally maintained offset so that next published time
* is different.
* \note Will trigger any registered JumpHandler callbacks.
* \param ros_time Time point in ROS playback timeline.
*/
void jump(rcutils_time_point_value_t ros_time)
{
rcl_duration_t time_jump_delta;
{
std::lock_guard<std::mutex> lock(state_mutex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since ROS 2 supports C++ 17, we now have access to the std::scoped_lock class. Should we use that here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We haven't yet changed rosbag2 to target C++17 - this is an upgrade that could be done in a new PR if we're inclined

time_jump_delta.nanoseconds = ros_time - steady_to_ros(now_fn());
}

rcl_time_jump_t time_jump{};
time_jump.clock_change = RCL_ROS_TIME_NO_CHANGE;
time_jump.delta = time_jump_delta;

process_callbacks_before_jump(time_jump);
{
std::lock_guard<std::mutex> lock(state_mutex);
snapshot(ros_time);
}
process_callbacks_after_jump(time_jump);
cv.notify_all();
}

/**
* \brief Add callbacks to be called when a time jump exceeds a threshold.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
* \throws std::invalid argument if jump threshold has invalid value.
*/
void add_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler)
{
if (handler->notice_threshold.min_forward.nanoseconds < 0) {
throw std::invalid_argument("forward jump threshold must be positive or zero");
}
if (handler->notice_threshold.min_backward.nanoseconds > 0) {
throw std::invalid_argument("backward jump threshold must be negative or zero");
}

std::lock_guard<std::mutex> lock(callback_list_mutex_);
for (auto const & registered_handler : callback_list_) {
if (*registered_handler == *handler) {
return; // Already have this callback in the list.
}
}
callback_list_.push_back(handler);
}

/**
* \brief remove jump callbacks from processing list.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
*/
void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler)
{
std::lock_guard<std::mutex> lock(callback_list_mutex_);
for (auto it = callback_list_.begin(); it != callback_list_.end(); ++it) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name and comment make it sound like this will remove multiple jump callbacks from the processing list. However, the code only removes one. Should the comment/function name change to make it more clear that we're removing one callback?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A JumpHandler contains multiple callback functions. Maybe remove_jump_handler would be more clear - but the language is consistent

if (**it == *handler) {
callback_list_.erase(it);
return;
}
}
}

const PlayerClock::NowFunction now_fn;
const std::chrono::milliseconds sleep_time_while_paused;

Expand All @@ -122,6 +189,49 @@ class TimeControllerClockImpl
double rate RCPPUTILS_TSA_GUARDED_BY(state_mutex) = 1.0;
bool paused RCPPUTILS_TSA_GUARDED_BY(state_mutex) = false;
TimeReference reference RCPPUTILS_TSA_GUARDED_BY(state_mutex);

private:
std::mutex callback_list_mutex_;
std::vector<PlayerClock::JumpHandler::SharedPtr> callback_list_
RCPPUTILS_TSA_GUARDED_BY(callback_list_mutex_);

void process_callbacks_before_jump(const rcl_time_jump_t & time_jump)
{
std::lock_guard<std::mutex> lock(callback_list_mutex_);
for (auto const & handler : callback_list_) {
process_callback(handler, time_jump, true);
}
}

void process_callbacks_after_jump(const rcl_time_jump_t & time_jump)
{
std::lock_guard<std::mutex> lock(callback_list_mutex_);
for (auto const & handler : callback_list_) {
process_callback(handler, time_jump, false);
}
}

static void process_callback(
PlayerClock::JumpHandler::SharedPtr handler, const rcl_time_jump_t & time_jump,
bool before_jump) RCPPUTILS_TSA_REQUIRES(callback_list_mutex_)
{
bool is_clock_change = time_jump.clock_change == RCL_ROS_TIME_ACTIVATED ||
time_jump.clock_change == RCL_ROS_TIME_DEACTIVATED;
if ((is_clock_change && handler->notice_threshold.on_clock_change) ||
(handler->notice_threshold.min_backward.nanoseconds != 0 &&
time_jump.delta.nanoseconds < 0 &&
time_jump.delta.nanoseconds <= handler->notice_threshold.min_backward.nanoseconds) ||
(handler->notice_threshold.min_forward.nanoseconds != 0 &&
time_jump.delta.nanoseconds > 0 &&
time_jump.delta.nanoseconds >= handler->notice_threshold.min_forward.nanoseconds))
{
if (before_jump && handler->pre_callback) {
handler->pre_callback();
} else if (!before_jump && handler->post_callback) {
handler->post_callback(time_jump);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no else statement here. If this is intentional, can we add a comment stating that we intend to do nothing?

}
}
};

TimeControllerClock::TimeControllerClock(
Expand Down Expand Up @@ -186,6 +296,16 @@ bool TimeControllerClock::set_rate(double rate)
return true;
}

void TimeControllerClock::add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler)
{
impl_->add_jump_callbacks(handler);
}

void TimeControllerClock::remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler)
{
impl_->remove_jump_callbacks(handler);
}

double TimeControllerClock::get_rate() const
{
std::lock_guard<std::mutex> lock(impl_->state_mutex);
Expand All @@ -198,7 +318,7 @@ void TimeControllerClock::pause()
if (impl_->paused) {
return;
}
// Take snaphot before changing state
// Take snapshot before changing state
impl_->snapshot();
impl_->paused = true;
impl_->cv.notify_all();
Expand All @@ -210,7 +330,7 @@ void TimeControllerClock::resume()
if (!impl_->paused) {
return;
}
// Take snaphot before changing state
// Take snapshot before changing state
impl_->snapshot();
impl_->paused = false;
impl_->cv.notify_all();
Expand All @@ -224,9 +344,7 @@ bool TimeControllerClock::is_paused() const

void TimeControllerClock::jump(rcutils_time_point_value_t ros_time)
{
std::lock_guard<std::mutex> lock(impl_->state_mutex);
impl_->snapshot(ros_time);
impl_->cv.notify_all();
impl_->jump(ros_time);
}

void TimeControllerClock::jump(rclcpp::Time ros_time)
Expand Down
Loading