Skip to content

Commit

Permalink
Remove JumpHandler copy-implementation from PlayerClock/TimeControlle…
Browse files Browse the repository at this point in the history
…rClock

This API was originally envisioned as a way to handle the `seek`/`jump` operation, but that feature ended up being implemented inline in the `Player`. This is not used now, and I do not anticipate using it in the future. Removing as housekeeping - if it is needed later it can always be re-added from history.

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>
  • Loading branch information
emersonknapp committed Dec 8, 2021
1 parent d6ffd91 commit baa8613
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 423 deletions.
94 changes: 6 additions & 88 deletions rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,76 +36,6 @@ namespace rosbag2_cpp
class PlayerClock
{
public:
class JumpHandler final
{
public:
using SharedPtr = std::shared_ptr<JumpHandler>;
using pre_callback_t = std::function<void ()>;
using post_callback_t = std::function<void (const rcl_time_jump_t &)>;

JumpHandler() = delete;
JumpHandler(const JumpHandler &) = delete;
const JumpHandler & operator=(const JumpHandler &) = delete;

/**
* \brief Creates JumpHandler object with callbacks for jump operation.
* \param pre_callback Pre-time jump callback. Must be non-throwing.
* \param post_callback Post-time jump callback. Must be non-throwing.
* \param threshold Callbacks will be triggered if the time jump is greater then the threshold.
* \note These callback functions must remain valid as long as the returned shared pointer is
* valid.
* \return Shared pointer to the newly created JumpHandler object.
* \throws std::bad_alloc if the allocation of the JumpHandler fails.
* \throws std::invalid argument if any of the provided callbacks are nullptr.
*/
ROSBAG2_CPP_PUBLIC
static SharedPtr create(
const pre_callback_t & pre_callback, const post_callback_t & post_callback,
const rcl_jump_threshold_t & threshold)
{
JumpHandler::SharedPtr handler(new JumpHandler(pre_callback, post_callback, threshold));
if (handler == nullptr) {
throw std::bad_alloc{};
}
return handler;
}

/**
* \brief Equal operator for PlayerClock::JumpHandler class.
* \param right Right side operand for equal comparison operation.
* \return true if internal id's of two JumpHandlers match, otherwise false.
*/
ROSBAG2_CPP_PUBLIC
bool operator==(const PlayerClock::JumpHandler & right) const
{
return id == right.id;
}

const pre_callback_t pre_callback;
const post_callback_t post_callback;
const rcl_jump_threshold_t notice_threshold;

private:
uint64_t id;

JumpHandler(
const pre_callback_t & pre_callback, const post_callback_t & post_callback,
const rcl_jump_threshold_t & threshold)
: pre_callback(pre_callback), post_callback(post_callback), notice_threshold(threshold)
{
if (pre_callback == nullptr && post_callback == nullptr) {
throw std::invalid_argument("At least one callback for JumpHandler should be not nullptr");
}
id = create_id();
}

static uint64_t create_id()
{
static std::atomic<uint64_t> id_count{0};
return id_count.fetch_add(1, std::memory_order_relaxed) + 1;
}
};

/**
* Type representing an arbitrary steady time, used to measure real-time durations
* This type is never exposed by the PlayerClock - it is only used as input to the PlayerClock.
Expand Down Expand Up @@ -186,25 +116,13 @@ class PlayerClock
ROSBAG2_CPP_PUBLIC
virtual void jump(rclcpp::Time time) = 0;

/**
* \brief Add callbacks to be called when a time jump exceeds a threshold.
* \details Callbacks specified in JumpHandler object will be called in two cases:
* 1. use_sim_time is true: if the external time source jumps back in time, or forward
* farther than the threshold.
* 2. use_sim_time is false: if jump(time_point) is called and time jumps back or forward
* farther than the threshold.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
* \throws std::invalid argument if jump threshold has invalid value.
*/
ROSBAG2_CPP_PUBLIC
virtual void add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler) = 0;

/**
* \brief remove jump callbacks from processing list.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
*/
/// Add a callback to invoke if the jump threshold is exceeded.
/// \sa rclcpp::Clock::create_jump_callback
ROSBAG2_CPP_PUBLIC
virtual void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler) = 0;
virtual rclcpp::Clock::JumpHandler::SharedPtr create_jump_callback(
rclcpp::Clock::JumpHandler::pre_callback_t pre_callback,
rclcpp::Clock::JumpHandler::post_callback_t post_callback,
const rcl_jump_threshold_t & threshold) = 0;
};

} // namespace rosbag2_cpp
Expand Down
28 changes: 9 additions & 19 deletions rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace rosbag2_cpp

/**
* Version of the PlayerClock interface that has control over time.
* It does not listen to any external ROS Time Source and can optionally publish /clock
* It does not listen to any ROS time source, instead using an internal steady time.
*/
class TimeControllerClockImpl;
class TimeControllerClock : public PlayerClock
Expand Down Expand Up @@ -126,25 +126,15 @@ class TimeControllerClock : public PlayerClock
ROSBAG2_CPP_PUBLIC
void jump(rclcpp::Time ros_time) override;

/**
* \brief Add callbacks to be called when a time jump exceeds a threshold.
* \details Callbacks specified in JumpHandler object will be called in two cases:
* 1. use_sim_time is true: if the external time source jumps back in time, or forward
* farther than the threshold.
* 2. use_sim_time is false: if jump(time_point) is called and time jumps back or forward
* farther than the threshold.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
* \throws std::invalid argument if jump threshold has invalid value.
*/
ROSBAG2_CPP_PUBLIC
void add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler) override;
/// Since a jump can only occur via a `jump` call by the owner of this Clock, jump callbacks
/// are not handled here, they are a no-op. It is expected that the caller handles jumps
/// in their calling code.
rclcpp::Clock::JumpHandler::SharedPtr create_jump_callback(
rclcpp::Clock::JumpHandler::pre_callback_t pre_callback,
rclcpp::Clock::JumpHandler::post_callback_t post_callback,
const rcl_jump_threshold_t & threshold) override
{}

/**
* \brief remove jump callbacks from processing list.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
*/
ROSBAG2_CPP_PUBLIC
void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler) override;

private:
std::unique_ptr<TimeControllerClockImpl> impl_;
Expand Down
111 changes: 3 additions & 108 deletions rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,61 +127,9 @@ class TimeControllerClockImpl
*/
void jump(rcutils_time_point_value_t ros_time)
{
rcl_duration_t time_jump_delta;
{
std::lock_guard<std::mutex> lock(state_mutex);
time_jump_delta.nanoseconds = ros_time - steady_to_ros(now_fn());
}

rcl_time_jump_t time_jump{};
time_jump.clock_change = RCL_ROS_TIME_NO_CHANGE;
time_jump.delta = time_jump_delta;

process_callbacks_before_jump(time_jump);
{
std::lock_guard<std::mutex> lock(state_mutex);
snapshot(ros_time);
cv.notify_all();
}
process_callbacks_after_jump(time_jump);
}

/**
* \brief Add callbacks to be called when a time jump exceeds a threshold.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
* \throws std::invalid argument if jump threshold has invalid value.
*/
void add_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler)
{
if (handler->notice_threshold.min_forward.nanoseconds < 0) {
throw std::invalid_argument("forward jump threshold must be positive or zero");
}
if (handler->notice_threshold.min_backward.nanoseconds > 0) {
throw std::invalid_argument("backward jump threshold must be negative or zero");
}

std::lock_guard<std::mutex> lock(callback_list_mutex_);
for (auto const & registered_handler : callback_list_) {
if (*registered_handler == *handler) {
return; // Already have this callback in the list.
}
}
callback_list_.push_back(handler);
}

/**
* \brief remove jump callbacks from processing list.
* \param handler Shared pointer to the JumpHandler object returned from JumpHandler::create(..)
*/
void remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler)
{
std::lock_guard<std::mutex> lock(callback_list_mutex_);
for (auto it = callback_list_.begin(); it != callback_list_.end(); ++it) {
if (**it == *handler) {
callback_list_.erase(it);
return;
}
}
std::lock_guard<std::mutex> lock(state_mutex);
snapshot(ros_time);
cv.notify_all();
}

const PlayerClock::NowFunction now_fn;
Expand All @@ -192,49 +140,6 @@ class TimeControllerClockImpl
double rate RCPPUTILS_TSA_GUARDED_BY(state_mutex) = 1.0;
bool paused RCPPUTILS_TSA_GUARDED_BY(state_mutex) = false;
TimeReference reference RCPPUTILS_TSA_GUARDED_BY(state_mutex);

private:
std::mutex callback_list_mutex_;
std::vector<PlayerClock::JumpHandler::SharedPtr> callback_list_
RCPPUTILS_TSA_GUARDED_BY(callback_list_mutex_);

void process_callbacks_before_jump(const rcl_time_jump_t & time_jump)
{
std::lock_guard<std::mutex> lock(callback_list_mutex_);
for (auto const & handler : callback_list_) {
process_callback(handler, time_jump, true);
}
}

void process_callbacks_after_jump(const rcl_time_jump_t & time_jump)
{
std::lock_guard<std::mutex> lock(callback_list_mutex_);
for (auto const & handler : callback_list_) {
process_callback(handler, time_jump, false);
}
}

void process_callback(
PlayerClock::JumpHandler::SharedPtr handler, const rcl_time_jump_t & time_jump,
bool before_jump) const RCPPUTILS_TSA_REQUIRES(callback_list_mutex_)
{
bool is_clock_change = time_jump.clock_change == RCL_ROS_TIME_ACTIVATED ||
time_jump.clock_change == RCL_ROS_TIME_DEACTIVATED;
if ((is_clock_change && handler->notice_threshold.on_clock_change) ||
(handler->notice_threshold.min_backward.nanoseconds != 0 &&
time_jump.delta.nanoseconds < 0 &&
time_jump.delta.nanoseconds <= handler->notice_threshold.min_backward.nanoseconds) ||
(handler->notice_threshold.min_forward.nanoseconds != 0 &&
time_jump.delta.nanoseconds > 0 &&
time_jump.delta.nanoseconds >= handler->notice_threshold.min_forward.nanoseconds))
{
if (before_jump && handler->pre_callback) {
handler->pre_callback();
} else if (!before_jump && handler->post_callback) {
handler->post_callback(time_jump);
}
}
}
};

TimeControllerClock::TimeControllerClock(
Expand Down Expand Up @@ -300,16 +205,6 @@ bool TimeControllerClock::set_rate(double rate)
return true;
}

void TimeControllerClock::add_jump_calbacks(PlayerClock::JumpHandler::SharedPtr handler)
{
impl_->add_jump_callbacks(handler);
}

void TimeControllerClock::remove_jump_callbacks(PlayerClock::JumpHandler::SharedPtr handler)
{
impl_->remove_jump_callbacks(handler);
}

double TimeControllerClock::get_rate() const
{
std::lock_guard<std::mutex> lock(impl_->state_mutex);
Expand Down
Loading

0 comments on commit baa8613

Please sign in to comment.