Skip to content

Commit

Permalink
Clock publisher in Player
Browse files Browse the repository at this point in the history
Fixes #99
Design: #675
Depends on #689
Depends on #693 to expose to the CLI

Add a `rosgraph_msgs/Clock` publisher to the `Player` - that uses `PlayerClock::now()` to publish current time.

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
Emerson Knapp committed Mar 31, 2021
1 parent f06836e commit 009c60d
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 19 deletions.
4 changes: 4 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/play_options.hpp
Expand Up @@ -40,6 +40,10 @@ struct PlayOptions
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides = {};
bool loop = false;
std::vector<std::string> topic_remapping_options = {};

// Rate in Hz at which to publish to /clock.
// 0 (or negative) means that no publisher will be created
float clock_publish_frequency = 0.f;
};

} // namespace rosbag2_transport
Expand Down
65 changes: 46 additions & 19 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Expand Up @@ -93,25 +93,7 @@ bool Player::is_storage_completely_loaded() const

void Player::play(const PlayOptions & options)
{
// Initialize Clock
{
rosbag2_cpp::PlayerClock::ROSTimePoint time_first_message;
float rate = 1.0;

if (options.rate > 0.0) {
rate = options.rate;
}
if (reader_->has_next()) {
auto message = reader_->read_next();
time_first_message = message->time_stamp;
// Could not peek, so need to enqueue this popped first message to be played.
message_queue_.enqueue(message);
} else {
// There are no messages in the storage at all
return;
}
clock_ = std::make_unique<rosbag2_cpp::PlayerClock>(time_first_message, rate);
}
prepare_clock(options);

topic_qos_profile_overrides_ = options.topic_qos_profile_overrides;
prepare_publishers(options);
Expand Down Expand Up @@ -178,6 +160,7 @@ void Player::play_messages_until_queue_empty()
{
rosbag2_storage::SerializedBagMessagePtr message;
while (message_queue_.try_dequeue(message) && rclcpp::ok()) {
rclcpp::spin_some(rosbag2_transport_);
clock_->sleep_until(message->time_stamp);
if (rclcpp::ok()) {
auto publisher_iter = publishers_.find(message->topic_name);
Expand Down Expand Up @@ -220,4 +203,48 @@ void Player::prepare_publishers(const PlayOptions & options)
}
}

void Player::prepare_clock(const PlayOptions & options)
{
// Initialize Clock
rosbag2_cpp::PlayerClock::ROSTimePoint time_first_message;
float rate = 1.0;

if (options.rate > 0.0) {
rate = options.rate;
}
if (reader_->has_next()) {
auto message = reader_->read_next();
time_first_message = message->time_stamp;
// Could not peek, so need to enqueue this popped first message to be played.
message_queue_.enqueue(message);
} else {
// There are no messages in the storage at all
return;
}
clock_ = std::make_unique<rosbag2_cpp::PlayerClock>(time_first_message, rate);

// Create /clock publisher
if (options.clock_publish_frequency > 0.f) {
// NOTE: PlayerClock does not own this publisher because rosbag2_cpp
// should not own transport-based functionality
std::chrono::duration<double> clock_period{1.0 / options.clock_publish_frequency};
auto nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(clock_period);
ROSBAG2_TRANSPORT_LOG_WARN("Nanos is %lu", nanos.count());
rclcpp::QoS clock_qos = rclcpp::QoS(1)
// offer as strict as possible, will match if only best_effort requested
.reliable()
// new joiner can get a clock sample immediately no matter when it was published
.transient_local()
// offer a deadline based on the current rate so that subscriptions can use it if wanted
.deadline(clock_period);
clock_publisher_ = rosbag2_transport_->create_publisher<rosgraph_msgs::msg::Clock>(
"/clock", clock_qos);
clock_publish_timer_ = rosbag2_transport_->create_wall_timer(nanos, [this]() {
auto msg = rosgraph_msgs::msg::Clock();
msg.clock = rclcpp::Time(clock_->now());
clock_publisher_->publish(msg);
});
}
}

} // namespace rosbag2_transport
5 changes: 5 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Expand Up @@ -22,8 +22,10 @@
#include <string>
#include <unordered_map>

#include "rosgraph_msgs/msg/clock.hpp"
#include "moodycamel/readerwriterqueue.h"

#include "rclcpp/publisher.hpp"
#include "rclcpp/qos.hpp"

#include "rosbag2_cpp/player_clock.hpp"
Expand Down Expand Up @@ -58,6 +60,7 @@ class Player
void play_messages_from_queue();
void play_messages_until_queue_empty();
void prepare_publishers(const PlayOptions & options);
void prepare_clock(const PlayOptions & options);
static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;

Expand All @@ -68,6 +71,8 @@ class Player
std::unordered_map<std::string, std::shared_ptr<GenericPublisher>> publishers_;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unique_ptr<rosbag2_cpp::PlayerClock> clock_;
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr clock_publisher_;
std::shared_ptr<rclcpp::TimerBase> clock_publish_timer_;
};

} // namespace rosbag2_transport
Expand Down

0 comments on commit 009c60d

Please sign in to comment.