Add correct timing behaviour for rosbag play #32
8e8af82
to
6eea477
Compare
rosbag2/src/rosbag2/demo_play.cpp
Outdated
#include "rosbag2/rosbag2.hpp"

int main(int argc, const char ** argv)
{
  rclcpp::init(argc, argv);

  auto options = rosbag2::Rosbag2PlayOptions();
  options.queue_buffer_length_ = 1000;
What exactly does this queue_buffer_length describe? Is it the number of serialized messages to load?
Yes, it is the queue size up to which the storage is preloaded for smooth replaying.
rosbag2/src/rosbag2/player.cpp
Outdated
bool is_pending(const std::future<void> & future)
{
  return !(future.valid() && future.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
What does wait_for(std::chrono::seconds(0)) do? Wait forever or return immediately?
It will return immediately.
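To illustrate the answer, here is a minimal standalone sketch. The is_pending helper is copied from the diff; the std::promise mentioned in the usage note is only an assumed stand-in for the storage loading task and is not part of the PR.

```cpp
#include <chrono>
#include <future>

// Same check as in the diff: a future counts as pending unless it is
// valid and its result is already available.
bool is_pending(const std::future<void> & future)
{
  // A zero timeout turns wait_for into a non-blocking poll: it reports
  // std::future_status::timeout right away if the result is not ready yet.
  return !(future.valid() &&
         future.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
}
```

For a future obtained from a std::promise<void>, is_pending returns true before set_value() is called and false afterwards. A default-constructed (invalid) future is also reported as pending.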
namespace rosbag2
{
struct Rosbag2PlayOptions
The options should also encompass the qos options in order to guarantee correct replay behavior.
Agreed. However, I would prefer to do this later in a follow-up PR (#34), as we do not yet have the CLI integration.
rosbag2/src/rosbag2/player.cpp
Outdated
  message_queue_.enqueue(message);
}

auto queue_lower_boundary = options.queue_buffer_length_ - options.queue_buffer_length_ / 10;
A few comments on this:
- Where does that magic number (90%) come from?
- The result of this should be cast explicitly to a type. I believe you can lose decimal precision here.
- Should this number be part of the Rosbag2PlayOptions struct?
- About being part of Rosbag2PlayOptions: I am not sure. It would be an expert user option only and we definitely would need to provide a useful default, e.g. 90%.
- There is actually no cast necessary as / is integer division in C++, which is what we want here.
- This is also a candidate for "Allow to specify more parameters for rosbag play" (#34).
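The integer-division point can be checked with a small sketch. The helper name queue_lower_boundary and the std::size_t type are assumptions for illustration; the thread does not state the actual type of queue_buffer_length_.

```cpp
#include <cstddef>

// Hypothetical helper wrapping the expression from the diff: the lower
// refill boundary sits at roughly 90% of the configured queue length.
constexpr std::size_t queue_lower_boundary(std::size_t queue_buffer_length)
{
  // Integer division truncates toward zero, so the result stays an
  // integer and no cast is involved.
  return queue_buffer_length - queue_buffer_length / 10;
}

static_assert(queue_lower_boundary(1000) == 900, "exactly 90% for multiples of 10");
static_assert(queue_lower_boundary(1005) == 905, "1005 / 10 truncates to 100");
static_assert(queue_lower_boundary(5) == 5, "5 / 10 is 0, so the boundary equals the length");
```

The last case shows the one consequence of truncation worth keeping in mind: for queue lengths below 10 the boundary equals the full length.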
namespace rosbag2
{

struct ReplayableMessage
I think I would prefer that the SerializedBagMessage struct gets enhanced by the time_to_start field (maybe under another name), but I don't see the point of cascading these types, given that the serialized bag message was introduced for this purpose.
Because I believe this time point field can also be used for purposes other than publishing.
I disagree on that one. The SerializedBagMessage represents a message with metadata that can be stored in a bag file.
The ReplayableMessage represents a SerializedBagMessage enriched with play-specific metadata that is needed for playing. These are different things and thus should have different types.
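The separation argued for here could look roughly like the following sketch. Only the names SerializedBagMessage, ReplayableMessage and time_since_start appear in this thread; every other field, the use of std::chrono::nanoseconds, and the helper make_replayable are assumptions for illustration, not the actual rosbag2 definitions.

```cpp
#include <chrono>
#include <memory>
#include <string>
#include <utility>

// Simplified stand-in for the storage-level message (fields are assumptions).
struct SerializedBagMessage
{
  std::string topic_name;
  std::string serialized_data;
  std::chrono::nanoseconds time_stamp{0};
};

// Play-specific wrapper: the stored message plus the offset from the start
// of the recording at which it should be published.
struct ReplayableMessage
{
  std::shared_ptr<SerializedBagMessage> message;
  std::chrono::nanoseconds time_since_start{0};
};

// Hypothetical helper: derives the play-specific offset without touching
// the stored message type.
ReplayableMessage make_replayable(
  std::shared_ptr<SerializedBagMessage> message,
  std::chrono::nanoseconds first_time_stamp)
{
  ReplayableMessage replayable;
  replayable.time_since_start = message->time_stamp - first_time_stamp;
  replayable.message = std::move(message);
  return replayable;
}
```

With this layout the storage type carries nothing that only the player needs, which is the point made in the reply above.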
The tests seem to be (at least) flaky on my OSX.
Also, I guess it'd be good to cleanup the
The demo applications are currently not installed. So you can't run them with the
When running the demo application, I have three files in my workspace
The print of for the
For the generic subscription:
Overall:
rosbag2/src/rosbag2/player.cpp
Outdated
  message_queue_.size_approx() < options.queue_buffer_length_ &&
  is_pending(storage_loading_future))
{
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
I could see that this parameter should be exposed or configurable. Could we pack this into the Rosbag2PlayOptions struct?
Definitely possible. I am not yet sure how useful this parameter will be. I suggest we discuss this in #34.
rosbag2/src/rosbag2/player.cpp
Outdated
{
  while (
    message_queue_.size_approx() < options.queue_buffer_length_ &&
    is_pending(storage_loading_future))
    message_queue_.size_approx() < options.read_ahead_queue_size &&
Does this queue have to be initialized in some way? Or is it guaranteed that size_approx() returns 0 by default initialization?
rosbag2/src/rosbag2/player.cpp
Outdated
while (message_queue_.size_approx() != 0 || is_pending(storage_loading_future)) {
  ReplayableMessage message;
  if (message_queue_.try_dequeue(message)) {
    std::this_thread::sleep_until(start_time + message.time_since_start);
Yeah, my point is that start_time + message.time_since_start could be smaller than now, right?
But from what I read, the sleep_until command doesn't block on a time point in the past. So this should be alright.
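The sleep_until behaviour described here can be observed with a small sketch. The helper blocked_for is hypothetical and only measures how long the call blocks; std::chrono::steady_clock is used here as an assumption, the thread does not say which clock start_time comes from.

```cpp
#include <chrono>
#include <thread>

// Hypothetical helper: reports how long sleep_until blocked for a deadline
// that lies `offset` away from the current time (negative means in the past).
std::chrono::steady_clock::duration blocked_for(std::chrono::milliseconds offset)
{
  const auto before = std::chrono::steady_clock::now();
  // A deadline that has already passed should make sleep_until return
  // without any noticeable blocking.
  std::this_thread::sleep_until(before + offset);
  return std::chrono::steady_clock::now() - before;
}
```

Calling blocked_for with a negative offset should return almost immediately, while a positive offset blocks for at least that long. For playback this means a message whose publish time has already passed is simply published late rather than stalling the loop.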
rosbag2/src/rosbag2/rosbag2.cpp
Outdated
void Rosbag2::record(const std::string & file_name)
{
  auto topics_and_types = node_->get_all_topics_with_types();
  std::vector<std::string> topic_names(topics_and_types.size());
Should this function throw when no topics are available?
  node->get_all_topics_with_types() :
  node->get_topics_with_types(topic_names);

auto topics_and_types = node_->get_topics_with_types(topic_names);
if (topics_and_types.empty()) {
I think this check should go into the other record function
As long as we do not have topic discovery after startup, we should also report if we could not detect the topics that were explicitly specified. So I think it is an error for both record overloads and the location is fine.
The last commit fixes the record all test for me:
For now the publishing starts only after the reading is completely done. This should change after ros2GH-68 is done and a thread-safe queue can be used instead of std::queue.
- The main (play) thread sleeps until the time for publishing the message is reached.
- Using std::chrono time_point and duration for type-safe time arithmetic instead of rcutils time types.
- Subscribers need to maintain a longer history if the messages are not consumed fast enough.
The Classloader instance needs to outlive all objects created by it.
Reason: record and play have almost no common code but do the exact opposite with the storage and rclcpp.
- only required in tests
- this decreases the amount of packages needed for a clean build without tests
- Download and install headers from moodycamel readerwriterqueue
- Download and install headers from moodycamel concurrentqueue
- Use readerwriterqueue in code to load and publish concurrently
- Only load if queue contains fewer than 1000 messages
- Wait a millisecond before loading again once the queue is long enough
- gets rid of string_msgs dependency
- Currently, we write sequentially in order of arrival time so reading in id order is fine
- This may change at a later time and should not change the reading behaviour, i.e. we need to read in order of timestamps
Make the future a class member of player to avoid having to hand it into several functions which is difficult with a move-only type.
Add an explicit overload for record without a topic_names argument to record all topics.
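The load-and-publish pattern described in the commit messages above can be sketched as follows. This is not the PR's code: a mutex-guarded std::deque stands in for the moodycamel queue, int stands in for the message type, the function play is hypothetical, and publishing is reduced to appending to a vector. The sketch assumes read_ahead_queue_size is greater than zero.

```cpp
#include <chrono>
#include <cstddef>
#include <deque>
#include <future>
#include <mutex>
#include <thread>
#include <vector>

// Mutex-guarded stand-in for the lock-free moodycamel queue (assumption).
class MessageQueue
{
public:
  void enqueue(int message)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push_back(message);
  }

  bool try_dequeue(int & message)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) {
      return false;
    }
    message = queue_.front();
    queue_.pop_front();
    return true;
  }

  std::size_t size() const
  {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.size();
  }

private:
  mutable std::mutex mutex_;
  std::deque<int> queue_;
};

// Loads `storage` on a background task while the calling thread "publishes".
std::vector<int> play(const std::vector<int> & storage, std::size_t read_ahead_queue_size)
{
  MessageQueue queue;
  std::future<void> loading = std::async(std::launch::async, [&queue, &storage, read_ahead_queue_size]() {
    for (int message : storage) {
      // Only load while the queue holds fewer messages than the read-ahead limit.
      while (queue.size() >= read_ahead_queue_size) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
      }
      queue.enqueue(message);
    }
  });

  std::vector<int> published;
  for (;; ) {
    // Poll the loader before dequeuing, so that an empty queue combined with
    // a finished loader reliably means that everything has been published.
    const bool still_loading =
      loading.wait_for(std::chrono::seconds(0)) != std::future_status::ready;
    int message;
    if (queue.try_dequeue(message)) {
      published.push_back(message);
    } else if (!still_loading) {
      break;
    } else {
      std::this_thread::yield();
    }
  }
  loading.get();
  return published;
}
```

With a single loader and a single publisher the messages come out in storage order, so play({1, 2, 3, 4, 5}, 2) returns the same five values in the same order.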
3255d5c
to
08cfd51
Compare
I'll go ahead and merge this. The flaky test is already ticketed in #37.
Signed-off-by: Jacob Bandes-Storch <jacob@foxglove.dev>
Signed-off-by: Jacob Bandes-Storch <jacob@foxglove.dev> Signed-off-by: James Smith <james@foxglove.dev>
This PR adds correct timing behaviour for rosbag play.
In addition, a few technical aspects have been improved:
- test_msgs (not std_msgs) in rosbag2 tests