From 6f7503eafede497bda4f04c8e4b2de798b9cca99 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Mon, 14 Jun 2021 17:00:51 -0700 Subject: [PATCH] Add callbacks for PlayerClock::jump(time_point) API with CI fix (#779) Signed-off-by: Michael Orlov --- .../rosbag2_cpp/clocks/player_clock.hpp | 98 ++++++- .../clocks/time_controller_clock.hpp | 26 +- .../clocks/time_controller_clock.cpp | 128 ++++++++- .../test_time_controller_clock.cpp | 263 +++++++++++++++++- 4 files changed, 504 insertions(+), 11 deletions(-) diff --git a/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp b/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp index 64d3797679..4bdb874196 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp @@ -15,6 +15,7 @@ #ifndef ROSBAG2_CPP__CLOCKS__PLAYER_CLOCK_HPP_ #define ROSBAG2_CPP__CLOCKS__PLAYER_CLOCK_HPP_ +#include #include #include #include @@ -35,6 +36,76 @@ namespace rosbag2_cpp class PlayerClock { public: + class JumpHandler final + { +public: + using SharedPtr = std::shared_ptr; + using pre_callback_t = std::function; + using post_callback_t = std::function; + + JumpHandler() = delete; + JumpHandler(const JumpHandler &) = delete; + const JumpHandler & operator=(const JumpHandler &) = delete; + + /** + * \brief Creates JumpHandler object with callbacks for jump operation. + * \param pre_callback Pre-time jump callback. Must be non-throwing. + * \param post_callback Post-time jump callback. Must be non-throwing. + * \param threshold Callbacks will be triggered if the time jump is greater then the threshold. + * \note These callback functions must remain valid as long as the returned shared pointer is + * valid. + * \return Shared pointer to the newly created JumpHandler object. + * \throws std::bad_alloc if the allocation of the JumpHandler fails. + * \throws std::invalid argument if any of the provided callbacks are nullptr. + */ + ROSBAG2_CPP_PUBLIC + static SharedPtr create( + const pre_callback_t & pre_callback, const post_callback_t & post_callback, + const rcl_jump_threshold_t & threshold) + { + JumpHandler::SharedPtr handler(new JumpHandler(pre_callback, post_callback, threshold)); + if (handler == nullptr) { + throw std::bad_alloc{}; + } + return handler; + } + + /** + * \brief Equal operator for PlayerClock::JumpHandler class. + * \param right Right side operand for equal comparison operation. + * \return true if internal id's of two JumpHandlers match, otherwise false. + */ + ROSBAG2_CPP_PUBLIC + bool operator==(const PlayerClock::JumpHandler & right) const + { + return id == right.id; + } + + const pre_callback_t pre_callback; + const post_callback_t post_callback; + const rcl_jump_threshold_t notice_threshold; + +private: + uint64_t id; + + JumpHandler( + const pre_callback_t & pre_callback, const post_callback_t & post_callback, + const rcl_jump_threshold_t & threshold) + : pre_callback(pre_callback), post_callback(post_callback), notice_threshold(threshold) + { + if (pre_callback == nullptr && post_callback == nullptr) { + throw std::invalid_argument("At least one callback for JumpHandler should be not nullptr"); + } + id = create_id(); + } + + static uint64_t create_id() + { + static std::atomic id_count{0}; + return id_count.fetch_add(1, std::memory_order_relaxed) + 1; + } + }; + /** * Type representing an arbitrary steady time, used to measure real-time durations * This type is never exposed by the PlayerClock - it is only used as input to the PlayerClock. @@ -101,16 +172,39 @@ class PlayerClock virtual bool is_paused() const = 0; /** - * Change the current ROS time to an arbitrary time. + * \brief Change the current ROS time to an arbitrary time. + * \note This will wake any waiting `sleep_until` and trigger any registered JumpHandler + * callbacks. + * \param time_point Time point in ROS playback timeline. */ ROSBAG2_CPP_PUBLIC - virtual void jump(rcutils_time_point_value_t) = 0; + virtual void jump(rcutils_time_point_value_t time_point) = 0; /** * \sa jump */ ROSBAG2_CPP_PUBLIC virtual void jump(rclcpp::Time time) = 0; + + /** + * \brief Add callbacks to be called when a time jump exceeds a threshold. + * \details Callbacks specified in JumpHandler object will be called in two cases: + * 1. use_sim_time is true: if the external time source jumps back in time, or forward + * farther than the threshold. + * 2. use_sim_time is false: if jump(time_point) is called and time jumps back or forward + * farther than the threshold. + * \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..) + * \throws std::invalid argument if jump threshold has invalid value. + */ + ROSBAG2_CPP_PUBLIC + virtual void add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler) = 0; + + /** + * \brief remove jump callbacks from processing list. + * \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..) + */ + ROSBAG2_CPP_PUBLIC + virtual void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler) = 0; }; } // namespace rosbag2_cpp diff --git a/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp b/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp index e74c4d5f39..1a737b151b 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp @@ -109,9 +109,9 @@ class TimeControllerClock : public PlayerClock /** * Change the current ROS time to an arbitrary time. - * This will wake any waiting `sleep_until`. - * Note: this initial implementation does not have any jump-callback handling. - * The Player should not use this method while its queues are active ("during playback") + * \note This will wake any waiting `sleep_until` and trigger any registered JumpHandler + * callbacks. + * \note The Player should not use this method while its queues are active ("during playback") * as an arbitrary time jump can make those queues, and the state of the Reader/Storage, invalid * The current Player implementation should only use this method in between calls to `play`, * to reset the initial time of the clock. @@ -124,6 +124,26 @@ class TimeControllerClock : public PlayerClock ROSBAG2_CPP_PUBLIC void jump(rclcpp::Time ros_time) override; + /** + * \brief Add callbacks to be called when a time jump exceeds a threshold. + * \details Callbacks specified in JumpHandler object will be called in two cases: + * 1. use_sim_time is true: if the external time source jumps back in time, or forward + * farther than the threshold. + * 2. use_sim_time is false: if jump(time_point) is called and time jumps back or forward + * farther than the threshold. + * \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..) + * \throws std::invalid argument if jump threshold has invalid value. + */ + ROSBAG2_CPP_PUBLIC + void add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler) override; + + /** + * \brief remove jump callbacks from processing list. + * \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..) + */ + ROSBAG2_CPP_PUBLIC + void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler) override; + private: std::unique_ptr impl_; }; diff --git a/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp b/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp index 9dbe59bfe3..27bbde1ac9 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include "rcpputils/thread_safety_annotations.hpp" #include "rosbag2_cpp/clocks/time_controller_clock.hpp" @@ -114,6 +115,72 @@ class TimeControllerClockImpl snapshot(ros_now()); } + /** + * \brief Adjust internal clock to the specified timestamp. + * \details It will change the current internally maintained offset so that next published time + * is different. + * \note Will trigger any registered JumpHandler callbacks. + * \param ros_time Time point in ROS playback timeline. + */ + void jump(rcutils_time_point_value_t ros_time) + { + rcl_duration_t time_jump_delta; + { + std::lock_guard lock(state_mutex); + time_jump_delta.nanoseconds = ros_time - steady_to_ros(now_fn()); + } + + rcl_time_jump_t time_jump{}; + time_jump.clock_change = RCL_ROS_TIME_NO_CHANGE; + time_jump.delta = time_jump_delta; + + process_callbacks_before_jump(time_jump); + { + std::lock_guard lock(state_mutex); + snapshot(ros_time); + } + process_callbacks_after_jump(time_jump); + cv.notify_all(); + } + + /** + * \brief Add callbacks to be called when a time jump exceeds a threshold. + * \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..) + * \throws std::invalid argument if jump threshold has invalid value. + */ + void add_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler) + { + if (handler->notice_threshold.min_forward.nanoseconds < 0) { + throw std::invalid_argument("forward jump threshold must be positive or zero"); + } + if (handler->notice_threshold.min_backward.nanoseconds > 0) { + throw std::invalid_argument("backward jump threshold must be negative or zero"); + } + + std::lock_guard lock(callback_list_mutex_); + for (auto const & registered_handler : callback_list_) { + if (*registered_handler == *handler) { + return; // Already have this callback in the list. + } + } + callback_list_.push_back(handler); + } + + /** + * \brief remove jump callbacks from processing list. + * \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..) + */ + void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler) + { + std::lock_guard lock(callback_list_mutex_); + for (auto it = callback_list_.begin(); it != callback_list_.end(); ++it) { + if (**it == *handler) { + callback_list_.erase(it); + return; + } + } + } + const PlayerClock::NowFunction now_fn; const std::chrono::milliseconds sleep_time_while_paused; @@ -122,6 +189,49 @@ class TimeControllerClockImpl double rate RCPPUTILS_TSA_GUARDED_BY(state_mutex) = 1.0; bool paused RCPPUTILS_TSA_GUARDED_BY(state_mutex) = false; TimeReference reference RCPPUTILS_TSA_GUARDED_BY(state_mutex); + +private: + std::mutex callback_list_mutex_; + std::vector callback_list_ + RCPPUTILS_TSA_GUARDED_BY(callback_list_mutex_); + + void process_callbacks_before_jump(const rcl_time_jump_t & time_jump) + { + std::lock_guard lock(callback_list_mutex_); + for (auto const & handler : callback_list_) { + process_callback(handler, time_jump, true); + } + } + + void process_callbacks_after_jump(const rcl_time_jump_t & time_jump) + { + std::lock_guard lock(callback_list_mutex_); + for (auto const & handler : callback_list_) { + process_callback(handler, time_jump, false); + } + } + + static void process_callback( + PlayerClock::JumpHandler::SharedPtr handler, const rcl_time_jump_t & time_jump, + bool before_jump) RCPPUTILS_TSA_REQUIRES(callback_list_mutex_) + { + bool is_clock_change = time_jump.clock_change == RCL_ROS_TIME_ACTIVATED || + time_jump.clock_change == RCL_ROS_TIME_DEACTIVATED; + if ((is_clock_change && handler->notice_threshold.on_clock_change) || + (handler->notice_threshold.min_backward.nanoseconds != 0 && + time_jump.delta.nanoseconds < 0 && + time_jump.delta.nanoseconds <= handler->notice_threshold.min_backward.nanoseconds) || + (handler->notice_threshold.min_forward.nanoseconds != 0 && + time_jump.delta.nanoseconds > 0 && + time_jump.delta.nanoseconds >= handler->notice_threshold.min_forward.nanoseconds)) + { + if (before_jump && handler->pre_callback) { + handler->pre_callback(); + } else if (!before_jump && handler->post_callback) { + handler->post_callback(time_jump); + } + } + } }; TimeControllerClock::TimeControllerClock( @@ -186,6 +296,16 @@ bool TimeControllerClock::set_rate(double rate) return true; } +void TimeControllerClock::add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler) +{ + impl_->add_jump_callbacks(handler); +} + +void TimeControllerClock::remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler) +{ + impl_->remove_jump_callbacks(handler); +} + double TimeControllerClock::get_rate() const { std::lock_guard lock(impl_->state_mutex); @@ -198,7 +318,7 @@ void TimeControllerClock::pause() if (impl_->paused) { return; } - // Take snaphot before changing state + // Take snapshot before changing state impl_->snapshot(); impl_->paused = true; impl_->cv.notify_all(); @@ -210,7 +330,7 @@ void TimeControllerClock::resume() if (!impl_->paused) { return; } - // Take snaphot before changing state + // Take snapshot before changing state impl_->snapshot(); impl_->paused = false; impl_->cv.notify_all(); @@ -224,9 +344,7 @@ bool TimeControllerClock::is_paused() const void TimeControllerClock::jump(rcutils_time_point_value_t ros_time) { - std::lock_guard lock(impl_->state_mutex); - impl_->snapshot(ros_time); - impl_->cv.notify_all(); + impl_->jump(ros_time); } void TimeControllerClock::jump(rclcpp::Time ros_time) diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp index ed20144c4c..a2603d99fc 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp @@ -16,17 +16,18 @@ #include #include - #include "rosbag2_cpp/clocks/time_controller_clock.hpp" using namespace testing; // NOLINT using SteadyTimePoint = std::chrono::steady_clock::time_point; +using SteadyTimeDurationSecT = std::chrono::duration; using NowFunction = rosbag2_cpp::PlayerClock::NowFunction; class TimeControllerClockTest : public Test { public: TimeControllerClockTest() + : return_time(std::chrono::nanoseconds(0)) { now_fn = [this]() { return return_time; @@ -258,3 +259,263 @@ TEST_F(TimeControllerClockTest, jump) return_time += std::chrono::seconds(2); EXPECT_EQ(clock.now(), RCUTILS_S_TO_NS(52)); } + +TEST_F(TimeControllerClockTest, jump_forward_with_playback_rate) +{ + // Use non zero start time along side with payback rate differ than 1.0 + ros_start_time = RCUTILS_S_TO_NS(1); + const SteadyTimePoint wall_time_begin(std::chrono::seconds(2)); + return_time = wall_time_begin; // Init now_fn() to some non zero wall_time_begin + const double playback_rate = 1.5; + rosbag2_cpp::TimeControllerClock testing_clock(ros_start_time, now_fn); + testing_clock.set_rate(playback_rate); + + const rcutils_time_point_value_t ros_time_point_for_jump = ros_start_time + RCUTILS_S_TO_NS(3); + testing_clock.jump(ros_time_point_for_jump); + + // Emulate wall clock ticks + const SteadyTimePoint current_wall_time = wall_time_begin + SteadyTimeDurationSecT(5); + return_time = current_wall_time; // now_fn() will return wall_time_begin + 5 sec + + // Jump shouldn't take in to account playback rate when jumping in time, + // although TimeControllerClock::now() should respect playback rate after jump. + rcutils_time_point_value_t expected_ros_time = ros_time_point_for_jump + + static_cast(playback_rate * (as_nanos(current_wall_time) - + as_nanos(wall_time_begin))); + + EXPECT_EQ(testing_clock.now(), expected_ros_time); +} + +TEST_F(TimeControllerClockTest, jump_backward_with_playback_rate) +{ + // Use non zero start time along side with payback rate differ than 1.0 + ros_start_time = RCUTILS_S_TO_NS(5); + const SteadyTimePoint wall_time_begin(std::chrono::seconds(2)); + return_time = wall_time_begin; // Init now_fn() to some non zero wall_time_begin + const double playback_rate = 1.5; + rosbag2_cpp::TimeControllerClock testing_clock(ros_start_time, now_fn); + testing_clock.set_rate(playback_rate); + + const rcutils_time_point_value_t ros_time_point_for_jump = ros_start_time - RCUTILS_S_TO_NS(3); + testing_clock.jump(ros_time_point_for_jump); + + // Emulate wall clock ticks + const SteadyTimePoint current_wall_time = wall_time_begin + SteadyTimeDurationSecT(5); + return_time = current_wall_time; // now_fn() will return wall_time_begin + 5 sec + + // Jump shouldn't take in to account playback rate when jumping in time, + // although TimeControllerClock::now() should respect playback rate after jump. + rcutils_time_point_value_t expected_ros_time = ros_time_point_for_jump + + static_cast(playback_rate * (as_nanos(current_wall_time) - + as_nanos(wall_time_begin))); + + EXPECT_EQ(testing_clock.now(), expected_ros_time); +} + +TEST_F(TimeControllerClockTest, jump_handler_basic) +{ + auto pre_callback = [&]() {}; + auto post_callback = [&](const rcl_time_jump_t &) {}; + rcl_jump_threshold_t threshold{}; + EXPECT_THROW( + rosbag2_cpp::TimeControllerClock::JumpHandler::create(nullptr, nullptr, threshold), + std::invalid_argument); + EXPECT_NO_THROW( + rosbag2_cpp::TimeControllerClock::JumpHandler::create(nullptr, post_callback, threshold)); + EXPECT_NO_THROW( + rosbag2_cpp::TimeControllerClock::JumpHandler::create(pre_callback, nullptr, threshold)); + EXPECT_NO_THROW( + rosbag2_cpp::TimeControllerClock::JumpHandler::create(pre_callback, post_callback, threshold)); +} + +TEST_F(TimeControllerClockTest, add_jump_callbacks) +{ + auto pre_callback = [&]() {}; + auto post_callback = [&](const rcl_time_jump_t &) {}; + + rosbag2_cpp::TimeControllerClock testing_clock(ros_start_time, now_fn); + + rcl_jump_threshold_t threshold{}; + threshold.min_backward.nanoseconds = 10; + threshold.min_forward.nanoseconds = 10; + auto callback_handler = + rosbag2_cpp::TimeControllerClock::JumpHandler::create(pre_callback, post_callback, threshold); + EXPECT_THROW(testing_clock.add_jump_calbacks(callback_handler), std::invalid_argument); + + threshold.min_backward.nanoseconds = -10; + threshold.min_forward.nanoseconds = -10; + callback_handler = + rosbag2_cpp::TimeControllerClock::JumpHandler::create(pre_callback, post_callback, threshold); + EXPECT_THROW(testing_clock.add_jump_calbacks(callback_handler), std::invalid_argument); + + threshold.min_backward.nanoseconds = -10; + threshold.min_forward.nanoseconds = 10; + callback_handler = + rosbag2_cpp::TimeControllerClock::JumpHandler::create(pre_callback, post_callback, threshold); + EXPECT_NO_THROW(testing_clock.add_jump_calbacks(callback_handler)); +} + +TEST_F(TimeControllerClockTest, jump_forward_with_callbacks_called) +{ + ros_start_time = RCUTILS_S_TO_NS(5); + const rcutils_time_point_value_t ros_time_point_for_jump = ros_start_time + RCUTILS_S_TO_NS(10); + const rcutils_duration_value_t time_jump_delta_ns = ros_time_point_for_jump - ros_start_time; + rosbag2_cpp::TimeControllerClock testing_clock(ros_start_time, now_fn); + + int pre_callback_called_times = 0; + int post_callback_called_times = 0; + + auto pre_callback = [&]() { + pre_callback_called_times++; + // Check that clock hasn't been adjusted yet + EXPECT_EQ(testing_clock.now(), ros_start_time); + }; + + auto post_callback = [&](const rcl_time_jump_t & jump_time) { + post_callback_called_times++; + // Check that clock already adjusted + EXPECT_EQ(testing_clock.now(), ros_time_point_for_jump); + EXPECT_EQ(jump_time.delta.nanoseconds, time_jump_delta_ns); + EXPECT_EQ(jump_time.clock_change, RCL_ROS_TIME_NO_CHANGE); + }; + + rcl_jump_threshold_t threshold{}; + threshold.min_forward.nanoseconds = time_jump_delta_ns - 1; + threshold.min_backward.nanoseconds = 0; + auto callback_handler = + rosbag2_cpp::TimeControllerClock::JumpHandler::create(pre_callback, post_callback, threshold); + testing_clock.add_jump_calbacks(callback_handler); + testing_clock.jump(ros_time_point_for_jump); + EXPECT_EQ(testing_clock.now(), ros_time_point_for_jump); + EXPECT_EQ(pre_callback_called_times, 1); + EXPECT_EQ(post_callback_called_times, 1); + testing_clock.remove_jump_callbacks(callback_handler); +} + +TEST_F(TimeControllerClockTest, jump_backward_with_callbacks_called) +{ + ros_start_time = RCUTILS_S_TO_NS(10); + const rcutils_time_point_value_t ros_time_point_for_jump = ros_start_time - RCUTILS_S_TO_NS(3); + const rcutils_duration_value_t time_jump_delta_ns = ros_time_point_for_jump - ros_start_time; + rosbag2_cpp::TimeControllerClock testing_clock(ros_start_time, now_fn); + + int pre_callback_called_times = 0; + int post_callback_called_times = 0; + + auto pre_callback = [&]() { + pre_callback_called_times++; + // Check that clock hasn't been adjusted yet + EXPECT_EQ(testing_clock.now(), ros_start_time); + }; + + auto post_callback = [&](const rcl_time_jump_t & jump_time) { + post_callback_called_times++; + // Check that clock already adjusted + EXPECT_EQ(testing_clock.now(), ros_time_point_for_jump); + EXPECT_EQ(jump_time.delta.nanoseconds, time_jump_delta_ns); + EXPECT_EQ(jump_time.clock_change, RCL_ROS_TIME_NO_CHANGE); + }; + + rcl_jump_threshold_t threshold{}; + threshold.min_forward.nanoseconds = 0; + threshold.min_backward.nanoseconds = time_jump_delta_ns + 1; + auto callback_handler = + rosbag2_cpp::TimeControllerClock::JumpHandler::create(pre_callback, post_callback, threshold); + testing_clock.add_jump_calbacks(callback_handler); + testing_clock.jump(ros_time_point_for_jump); + ASSERT_EQ(testing_clock.now(), ros_time_point_for_jump); + EXPECT_EQ(pre_callback_called_times, 1); + EXPECT_EQ(post_callback_called_times, 1); + testing_clock.remove_jump_callbacks(callback_handler); +} + +TEST_F(TimeControllerClockTest, jump_callbacks_threshold_boundary_check) +{ + ros_start_time = RCUTILS_S_TO_NS(5); + const rcutils_time_point_value_t ros_time_point_for_jump = ros_start_time + RCUTILS_S_TO_NS(10); + const rcutils_duration_value_t time_jump_delta_ns = ros_time_point_for_jump - ros_start_time; + rosbag2_cpp::TimeControllerClock testing_clock(ros_start_time, now_fn); + + int pre_callback_called_times = 0; + int post_callback_called_times = 0; + auto pre_callback = [&]() {pre_callback_called_times++;}; + auto post_callback = [&](const rcl_time_jump_t &) {post_callback_called_times++;}; + + rcl_jump_threshold_t threshold{}; + threshold.min_forward.nanoseconds = 0; + threshold.min_backward.nanoseconds = -1 * (time_jump_delta_ns - RCUTILS_S_TO_NS(2)); + + auto callback_handler = + rosbag2_cpp::TimeControllerClock::JumpHandler::create(pre_callback, post_callback, threshold); + testing_clock.add_jump_calbacks(callback_handler); + testing_clock.jump(ros_time_point_for_jump); + ASSERT_EQ(testing_clock.now(), ros_time_point_for_jump); + EXPECT_EQ(pre_callback_called_times, 0); + EXPECT_EQ(post_callback_called_times, 0); + testing_clock.remove_jump_callbacks(callback_handler); + + pre_callback_called_times = 0; + post_callback_called_times = 0; + threshold.min_forward.nanoseconds = time_jump_delta_ns - RCUTILS_S_TO_NS(2); + threshold.min_backward.nanoseconds = 0; + callback_handler = + rosbag2_cpp::TimeControllerClock::JumpHandler::create(pre_callback, post_callback, threshold); + testing_clock.add_jump_calbacks(callback_handler); + testing_clock.jump(ros_start_time); + ASSERT_EQ(testing_clock.now(), ros_start_time); + EXPECT_EQ(pre_callback_called_times, 0); + EXPECT_EQ(post_callback_called_times, 0); + testing_clock.remove_jump_callbacks(callback_handler); + + pre_callback_called_times = 0; + post_callback_called_times = 0; + threshold.min_forward.nanoseconds = time_jump_delta_ns + 1; + threshold.min_backward.nanoseconds = 0; + callback_handler = + rosbag2_cpp::TimeControllerClock::JumpHandler::create(pre_callback, post_callback, threshold); + testing_clock.add_jump_calbacks(callback_handler); + testing_clock.jump(ros_time_point_for_jump); + ASSERT_EQ(testing_clock.now(), ros_time_point_for_jump); + EXPECT_EQ(pre_callback_called_times, 0); + EXPECT_EQ(post_callback_called_times, 0); + testing_clock.remove_jump_callbacks(callback_handler); + + pre_callback_called_times = 0; + post_callback_called_times = 0; + threshold.min_forward.nanoseconds = 0; + threshold.min_backward.nanoseconds = -1 * (time_jump_delta_ns + 1); + callback_handler = + rosbag2_cpp::TimeControllerClock::JumpHandler::create(pre_callback, post_callback, threshold); + testing_clock.add_jump_calbacks(callback_handler); + testing_clock.jump(ros_start_time); + ASSERT_EQ(testing_clock.now(), ros_start_time); + EXPECT_EQ(pre_callback_called_times, 0); + EXPECT_EQ(post_callback_called_times, 0); + testing_clock.remove_jump_callbacks(callback_handler); + + pre_callback_called_times = 0; + post_callback_called_times = 0; + threshold.min_forward.nanoseconds = time_jump_delta_ns; + threshold.min_backward.nanoseconds = 0; + callback_handler = + rosbag2_cpp::TimeControllerClock::JumpHandler::create(pre_callback, post_callback, threshold); + testing_clock.add_jump_calbacks(callback_handler); + testing_clock.jump(ros_time_point_for_jump); + ASSERT_EQ(testing_clock.now(), ros_time_point_for_jump); + EXPECT_EQ(pre_callback_called_times, 1); + EXPECT_EQ(post_callback_called_times, 1); + testing_clock.remove_jump_callbacks(callback_handler); + + pre_callback_called_times = 0; + post_callback_called_times = 0; + threshold.min_forward.nanoseconds = 0; + threshold.min_backward.nanoseconds = -1 * time_jump_delta_ns; + callback_handler = + rosbag2_cpp::TimeControllerClock::JumpHandler::create(pre_callback, post_callback, threshold); + testing_clock.add_jump_calbacks(callback_handler); + testing_clock.jump(ros_start_time); + ASSERT_EQ(testing_clock.now(), ros_start_time); + EXPECT_EQ(pre_callback_called_times, 1); + EXPECT_EQ(post_callback_called_times, 1); + testing_clock.remove_jump_callbacks(callback_handler); +}