From aa55430ecae86dc70b8f59c76a75a4a042b8125b Mon Sep 17 00:00:00 2001 From: Janosch Machowinski Date: Wed, 31 Jan 2024 15:52:38 +0000 Subject: [PATCH] feat(SerializedBagMessage): Added publish_time_stamp to and renamed time_stamp to receive_time_stamp Signed-off-by: Janosch Machowinski --- .../sequential_compression_writer.cpp | 2 +- rosbag2_cpp/include/rosbag2_cpp/writer.hpp | 20 +++++++++++++++++++ rosbag2_cpp/src/rosbag2_cpp/converter.cpp | 4 ++-- rosbag2_cpp/src/rosbag2_cpp/writer.cpp | 16 +++++++++++++-- .../rosbag2_cpp/writers/sequential_writer.cpp | 4 ++-- rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp | 2 +- .../rosbag2_cpp/test_sequential_reader.cpp | 2 +- .../src/writer_benchmark.cpp | 2 +- rosbag2_py/src/rosbag2_py/_reader.cpp | 2 +- rosbag2_py/src/rosbag2_py/_writer.cpp | 2 +- .../serialized_bag_message.hpp | 10 +++++++++- rosbag2_storage_mcap/src/mcap_storage.cpp | 17 ++++++++-------- .../test_mcap_storage.cpp | 6 +++--- .../sqlite_storage.cpp | 8 +++++--- .../storage_test_fixture.hpp | 8 +++++--- .../test_sqlite_storage.cpp | 8 ++++---- .../rosbag2_tests/test_rosbag2_cpp_api.cpp | 2 +- .../src/rosbag2_transport/bag_rewrite.cpp | 2 +- .../src/rosbag2_transport/player.cpp | 8 ++++---- .../src/rosbag2_transport/recorder.cpp | 3 ++- .../mock_sequential_reader.hpp | 2 +- .../rosbag2_transport_test_fixture.hpp | 2 +- .../test/rosbag2_transport/test_burst.cpp | 2 +- .../test_keyboard_controls.cpp | 13 +++++++----- .../test/rosbag2_transport/test_play_next.cpp | 2 +- .../rosbag2_transport/test_play_timing.cpp | 5 +++-- .../test_record_all_use_sim_time.cpp | 2 +- 27 files changed, 103 insertions(+), 53 deletions(-) diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 54b94d42bb..93e1815c8f 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -360,7 +360,7 @@ SequentialCompressionWriter::compress_message( std::shared_ptr message) { auto compressed_message = std::make_shared(); - compressed_message->time_stamp = message->time_stamp; + compressed_message->receive_time_stamp = message->receive_time_stamp; compressed_message->topic_name = message->topic_name; compressor.compress_serialized_bag_message(message.get(), compressed_message.get()); return compressed_message; diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp index 4a738ee226..230a3703a8 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp @@ -199,6 +199,26 @@ class ROSBAG2_CPP_PUBLIC Writer const std::string & type_name, const rclcpp::Time & time); + /** + * Write a serialized message to a bagfile. + * The topic will be created if it has not been created already. + * + * \warning after calling this function, the serialized data will no longer be managed by message. + * + * \param message rclcpp::SerializedMessage The serialized message to be written to the bagfile + * \param topic_name the string of the topic this messages belongs to + * \param type_name the string of the type associated with this message + * \param recv_time The time stamp when the message was received + * \param send_time The time stamp when the message was send + * \throws runtime_error if the Writer is not open. + */ + void write( + std::shared_ptr message, + const std::string & topic_name, + const std::string & type_name, + const rclcpp::Time & recv_time, + const rclcpp::Time & send_time); + /** * Write a non-serialized message to a bagfile. * The topic will be created if it has not been created already. diff --git a/rosbag2_cpp/src/rosbag2_cpp/converter.cpp b/rosbag2_cpp/src/rosbag2_cpp/converter.cpp index e12859f5a5..4591e7fbb7 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/converter.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/converter.cpp @@ -75,13 +75,13 @@ std::shared_ptr Converter::convert( // deserialize rosbag2_cpp::introspection_message_set_topic_name( allocated_ros_message.get(), message->topic_name.c_str()); - allocated_ros_message->time_stamp = message->time_stamp; + allocated_ros_message->time_stamp = message->receive_time_stamp; input_converter_->deserialize(message, introspection_ts, allocated_ros_message); // re-serialize output_message->serialized_data = rosbag2_storage::make_empty_serialized_message(0); output_message->topic_name = std::string(allocated_ros_message->topic_name); - output_message->time_stamp = allocated_ros_message->time_stamp; + output_message->receive_time_stamp = allocated_ros_message->time_stamp; output_converter_->serialize(allocated_ros_message, introspection_ts, output_message); return output_message; } diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 355977d03d..0984069d27 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -135,7 +135,8 @@ void Writer::write( { auto serialized_bag_message = std::make_shared(); serialized_bag_message->topic_name = topic_name; - serialized_bag_message->time_stamp = time.nanoseconds(); + serialized_bag_message->receive_time_stamp = time.nanoseconds(); + serialized_bag_message->publish_time_stamp = time.nanoseconds(); serialized_bag_message->serialized_data = std::shared_ptr( new rcutils_uint8_array_t, @@ -182,10 +183,21 @@ void Writer::write( const std::string & topic_name, const std::string & type_name, const rclcpp::Time & time) +{ + write(message, topic_name, type_name, time, time); +} + +void Writer::write( + std::shared_ptr message, + const std::string & topic_name, + const std::string & type_name, + const rclcpp::Time & recv_time, + const rclcpp::Time & send_time) { auto serialized_bag_message = std::make_shared(); serialized_bag_message->topic_name = topic_name; - serialized_bag_message->time_stamp = time.nanoseconds(); + serialized_bag_message->receive_time_stamp = recv_time.nanoseconds(); + serialized_bag_message->publish_time_stamp = send_time.nanoseconds(); // point to actual data and keep reference to original message to avoid premature releasing serialized_bag_message->serialized_data = std::shared_ptr( new rcutils_uint8_array_t(message->get_rcl_serialized_message()), diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 37d3d49b0e..babf38edf8 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -349,7 +349,7 @@ void SequentialWriter::write(std::shared_ptrtime_stamp)) { + if (!message_within_accepted_time_range(message->receive_time_stamp)) { return; } @@ -365,7 +365,7 @@ void SequentialWriter::write(std::shared_ptr( - std::chrono::nanoseconds(message->time_stamp)); + std::chrono::nanoseconds(message->receive_time_stamp)); if (is_first_message_) { // Update bagfile starting time diff --git a/rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp b/rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp index 08e4354817..089242b62f 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp @@ -65,7 +65,7 @@ void write_sample_split_bag( auto msg = std::make_shared(); msg->serialized_data = rosbag2_storage::make_serialized_message(&value, sizeof(value)); - msg->time_stamp = time_stamp; + msg->receive_time_stamp = time_stamp; msg->topic_name = topic_name; writer.write(msg); } diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp index 9ec24ac9c2..62e06f7be9 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp @@ -309,7 +309,7 @@ class ReadOrderTest : public ParametrizedTemporaryDirectoryFixture // Check both timestamp and value to uniquely identify messages in expected order ASSERT_TRUE(reader.has_next()); auto next = reader.read_next(); - EXPECT_EQ(next->time_stamp, expect_timestamp); + EXPECT_EQ(next->receive_time_stamp, expect_timestamp); ASSERT_EQ(next->serialized_data->buffer_length, 4u); uint32_t value = *reinterpret_cast(next->serialized_data->buffer); diff --git a/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp b/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp index e5f621faf3..4598ddd064 100644 --- a/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp +++ b/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp @@ -113,7 +113,7 @@ void WriterBenchmark::start_benchmark() get_logger(), "Error getting current time. Error:" << rcutils_get_error_string().str); } - message->time_stamp = time_stamp; + message->receive_time_stamp = time_stamp; message->topic_name = queue->topic_name(); try { diff --git a/rosbag2_py/src/rosbag2_py/_reader.cpp b/rosbag2_py/src/rosbag2_py/_reader.cpp index 3f68afc9d1..6e158c770f 100644 --- a/rosbag2_py/src/rosbag2_py/_reader.cpp +++ b/rosbag2_py/src/rosbag2_py/_reader.cpp @@ -52,7 +52,7 @@ class Reader : public rosbag2_cpp::Reader std::string serialized_data(rcutils_data.buffer, rcutils_data.buffer + rcutils_data.buffer_length); return pybind11::make_tuple( - next->topic_name, pybind11::bytes(serialized_data), next->time_stamp); + next->topic_name, pybind11::bytes(serialized_data), next->receive_time_stamp); } }; diff --git a/rosbag2_py/src/rosbag2_py/_writer.cpp b/rosbag2_py/src/rosbag2_py/_writer.cpp index 46b498f4d7..08ce82ad91 100644 --- a/rosbag2_py/src/rosbag2_py/_writer.cpp +++ b/rosbag2_py/src/rosbag2_py/_writer.cpp @@ -54,7 +54,7 @@ class Writer : public rosbag2_cpp::Writer bag_message->topic_name = topic_name; bag_message->serialized_data = rosbag2_storage::make_serialized_message(message.c_str(), message.length()); - bag_message->time_stamp = time_stamp; + bag_message->receive_time_stamp = time_stamp; rosbag2_cpp::Writer::write(bag_message); } diff --git a/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp b/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp index ab28af6734..032a592b87 100644 --- a/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp +++ b/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp @@ -27,7 +27,15 @@ namespace rosbag2_storage struct SerializedBagMessage { std::shared_ptr serialized_data; - rcutils_time_point_value_t time_stamp; + /** + * @brief Nanosecond timestamp when this message was received. + */ + rcutils_time_point_value_t receive_time_stamp; + /** + * @brief Nanosecond timestamp when this message was initially published. If + * not available, this will be set to time_stamp. + */ + rcutils_time_point_value_t publish_time_stamp; std::string topic_name; }; diff --git a/rosbag2_storage_mcap/src/mcap_storage.cpp b/rosbag2_storage_mcap/src/mcap_storage.cpp index 20a6633abe..1fe9f18e11 100644 --- a/rosbag2_storage_mcap/src/mcap_storage.cpp +++ b/rosbag2_storage_mcap/src/mcap_storage.cpp @@ -512,7 +512,8 @@ bool MCAPStorage::read_and_enqueue_message() const auto & messageView = *it; auto msg = std::make_shared(); last_enqueued_message_offset_ = messageView.messageOffset; - msg->time_stamp = rcutils_time_point_value_t(messageView.message.logTime); + msg->receive_time_stamp = rcutils_time_point_value_t(messageView.message.logTime); + msg->publish_time_stamp = rcutils_time_point_value_t(messageView.message.publishTime); msg->topic_name = messageView.channel->topic; msg->serialized_data = rosbag2_storage::make_serialized_message(messageView.message.data, messageView.message.dataSize); @@ -583,7 +584,7 @@ bool MCAPStorage::enqueued_message_is_already_read() if (next_ == nullptr) { return false; } - if (last_read_time_point_ != next_->time_stamp) { + if (last_read_time_point_ != next_->receive_time_stamp) { return false; } if (read_order_ == mcap::ReadMessageOptions::ReadOrder::ReverseLogTimeOrder) { @@ -670,7 +671,7 @@ std::shared_ptr MCAPStorage::read_next() if (!has_next()) { throw std::runtime_error{"No next message is available."}; } - last_read_time_point_ = next_->time_stamp; + last_read_time_point_ = next_->receive_time_stamp; last_read_message_offset_ = last_enqueued_message_offset_; // Importantly, clear next_ via move so that a next message can be read. return std::move(next_); @@ -750,11 +751,11 @@ void MCAPStorage::write(std::shared_ptrsecond; mcap_msg.sequence = 0; - if (msg->time_stamp < 0) { - RCUTILS_LOG_WARN_NAMED(LOG_NAME, "Invalid message timestamp %ld", msg->time_stamp); + if (msg->receive_time_stamp < 0) { + RCUTILS_LOG_WARN_NAMED(LOG_NAME, "Invalid message timestamp %ld", msg->receive_time_stamp); } - mcap_msg.logTime = mcap::Timestamp(msg->time_stamp); - mcap_msg.publishTime = mcap_msg.logTime; + mcap_msg.logTime = mcap::Timestamp(msg->receive_time_stamp); + mcap_msg.publishTime = mcap::Timestamp(msg->publish_time_stamp); mcap_msg.dataSize = msg->serialized_data->buffer_length; mcap_msg.data = reinterpret_cast(msg->serialized_data->buffer); const auto status = mcap_writer_->write(mcap_msg); @@ -770,7 +771,7 @@ void MCAPStorage::write(std::shared_ptrtime_stamp)); + const auto message_time = time_point(std::chrono::nanoseconds(msg->receive_time_stamp)); metadata_.duration = std::max(metadata_.duration, message_time - metadata_.starting_time); } diff --git a/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp b/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp index 4daba43f2b..1309d2dc28 100644 --- a/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp +++ b/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp @@ -69,7 +69,7 @@ class McapStorageTestFixture : public rosbag2_test_common::TemporaryDirectoryFix rw_storage->create_topic(topic_metadata, std::get<3>(msg)); auto bag_message = std::make_shared(); bag_message->serialized_data = make_serialized_message(std::get<0>(msg)); - bag_message->time_stamp = std::get<1>(msg); + bag_message->receive_time_stamp = std::get<1>(msg); bag_message->topic_name = topic_metadata.name; rw_storage->write(bag_message); } @@ -207,7 +207,7 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file) serialized_bag_msg->serialized_data = std::shared_ptr( const_cast(&serialized_msg->get_rcl_serialized_message()), [](rcutils_uint8_array_t * /* data */) {}); - serialized_bag_msg->time_stamp = time_stamp; + serialized_bag_msg->receive_time_stamp = time_stamp; serialized_bag_msg->topic_name = topic_name; writer->write(serialized_bag_msg); } @@ -294,7 +294,7 @@ TEST_F(TemporaryDirectoryFixture, can_write_mcap_with_zstd_configured_from_yaml) serialized_bag_msg->serialized_data = std::shared_ptr( const_cast(&serialized_msg->get_rcl_serialized_message()), [](rcutils_uint8_array_t * /* data */) {}); - serialized_bag_msg->time_stamp = time_stamp; + serialized_bag_msg->receive_time_stamp = time_stamp; serialized_bag_msg->topic_name = topic_name; writer->write(serialized_bag_msg); writer->write(serialized_bag_msg); diff --git a/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp b/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp index a1e32ebee4..bdd3f6469b 100644 --- a/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp +++ b/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp @@ -288,7 +288,9 @@ void SqliteStorage::write_locked( } try { - write_statement_->bind(message->time_stamp, topic_entry->second, message->serialized_data); + write_statement_->bind( + message->receive_time_stamp, topic_entry->second, + message->serialized_data); } catch (const SqliteException & exc) { if (SQLITE_TOOBIG == exc.get_sqlite_return_code()) { // Get the sqlite string/blob limit. @@ -361,12 +363,12 @@ std::shared_ptr SqliteStorage::read_next( auto bag_message = std::make_shared(); bag_message->serialized_data = std::get<0>(*current_message_row_); - bag_message->time_stamp = std::get<1>(*current_message_row_); + bag_message->receive_time_stamp = std::get<1>(*current_message_row_); bag_message->topic_name = std::get<2>(*current_message_row_); // set start time to current time // and set seek_row_id to the new row id up - seek_time_ = bag_message->time_stamp; + seek_time_ = bag_message->receive_time_stamp; seek_row_id_ = std::get<3>(*current_message_row_) + (read_order_.reverse ? -1 : 1); ++current_message_row_; diff --git a/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/storage_test_fixture.hpp b/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/storage_test_fixture.hpp index 5c99f9e9b2..e57f66ea7f 100644 --- a/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/storage_test_fixture.hpp +++ b/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/storage_test_fixture.hpp @@ -113,7 +113,7 @@ class StorageTestFixture : public TemporaryDirectoryFixture rw_storage.create_topic({topic_name, type_name, rmw_format, {}, ""}, {}); auto bag_message = std::make_shared(); bag_message->serialized_data = make_serialized_message(std::get<0>(msg)); - bag_message->time_stamp = std::get<1>(msg); + bag_message->receive_time_stamp = std::get<1>(msg); bag_message->topic_name = topic_name; rw_storage.write(bag_message); } @@ -189,14 +189,16 @@ class StorageTestFixture : public TemporaryDirectoryFixture // Prepare rosbag2 serialized message to write auto message = std::make_shared(); message->serialized_data = make_serialized_message(std::get<0>(msg)); - message->time_stamp = std::get<1>(msg); + message->receive_time_stamp = std::get<1>(msg); message->topic_name = topic_name; // Write message to DB auto topic_entry = topics.find(topic_name); if (topic_entry == end(topics)) { throw SqliteException("Topic '" + topic_name + "' has not been created yet!"); } - write_statement->bind(message->time_stamp, topic_entry->second, message->serialized_data); + write_statement->bind( + message->receive_time_stamp, topic_entry->second, + message->serialized_data); write_statement->execute_and_reset(); } } diff --git a/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/test_sqlite_storage.cpp b/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/test_sqlite_storage.cpp index 9070d85bae..2eea0b801a 100644 --- a/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/test_sqlite_storage.cpp +++ b/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/test_sqlite_storage.cpp @@ -72,7 +72,7 @@ TEST_F(StorageTestFixture, string_messages_are_written_and_read_to_and_from_sqli ASSERT_THAT(read_messages, SizeIs(3)); for (size_t i = 0; i < 3; i++) { EXPECT_THAT(deserialize_message(read_messages[i]->serialized_data), Eq(string_messages[i])); - EXPECT_THAT(read_messages[i]->time_stamp, Eq(std::get<1>(messages[i]))); + EXPECT_THAT(read_messages[i]->receive_time_stamp, Eq(std::get<1>(messages[i]))); EXPECT_THAT(read_messages[i]->topic_name, Eq(topics[i])); } } @@ -112,10 +112,10 @@ TEST_F(StorageTestFixture, get_next_returns_messages_in_timestamp_order) { EXPECT_TRUE(readable_storage->has_next()); auto first_message = readable_storage->read_next(); - EXPECT_THAT(first_message->time_stamp, Eq(2)); + EXPECT_THAT(first_message->receive_time_stamp, Eq(2)); EXPECT_TRUE(readable_storage->has_next()); auto second_message = readable_storage->read_next(); - EXPECT_THAT(second_message->time_stamp, Eq(6)); + EXPECT_THAT(second_message->receive_time_stamp, Eq(6)); EXPECT_FALSE(readable_storage->has_next()); } @@ -335,7 +335,7 @@ TEST_F(StorageTestFixture, messages_readable_for_prefoxy_db_schema) { ASSERT_THAT(read_messages, SizeIs(3)); for (size_t i = 0; i < 3; i++) { EXPECT_THAT(deserialize_message(read_messages[i]->serialized_data), Eq(string_messages[i])); - EXPECT_THAT(read_messages[i]->time_stamp, Eq(std::get<1>(messages[i]))); + EXPECT_THAT(read_messages[i]->receive_time_stamp, Eq(std::get<1>(messages[i]))); EXPECT_THAT(read_messages[i]->topic_name, Eq(topics[i])); } } diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp index 8ec6de1ce0..eff6c529ac 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp @@ -61,7 +61,7 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example) writer.open(storage_options); auto bag_message = std::make_shared(); - auto ret = rcutils_system_time_now(&bag_message->time_stamp); + auto ret = rcutils_system_time_now(&bag_message->receive_time_stamp); if (ret != RCL_RET_OK) { FAIL() << "couldn't assign time rosbag message"; } diff --git a/rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp b/rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp index 0ea9771b10..faff6e1d1e 100644 --- a/rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp +++ b/rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp @@ -53,7 +53,7 @@ std::shared_ptr get_next( if (msg == nullptr) { continue; } - if (earliest_msg == nullptr || msg->time_stamp < earliest_msg->time_stamp) { + if (earliest_msg == nullptr || msg->receive_time_stamp < earliest_msg->receive_time_stamp) { earliest_msg = msg; earliest_msg_index = i; } diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 83f3f98d05..585369a234 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -642,10 +642,10 @@ bool PlayerImpl::play_next() bool next_message_published = false; while (rclcpp::ok() && !next_message_published && !stop_playback_ && - message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->time_stamp)) + message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->receive_time_stamp)) { next_message_published = publish_message(message_ptr); - clock_->jump(message_ptr->time_stamp); + clock_->jump(message_ptr->receive_time_stamp); message_queue_.pop(); message_ptr = peek_next_message_from_queue(); @@ -847,11 +847,11 @@ void PlayerImpl::play_messages_from_queue() ready_to_play_from_queue_cv_.notify_all(); } while (rclcpp::ok() && !stop_playback_ && - message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->time_stamp)) + message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->receive_time_stamp)) { // Do not move on until sleep_until returns true // It will always sleep, so this is not a tight busy loop on pause - while (rclcpp::ok() && !clock_->sleep_until(message_ptr->time_stamp)) { + while (rclcpp::ok() && !clock_->sleep_until(message_ptr->receive_time_stamp)) { if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { break; } diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 3c9aae665f..016eda203c 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -539,7 +539,8 @@ RecorderImpl::create_subscription( if (!paused_.load()) { writer_->write( message, topic_name, topic_type, - rclcpp::Time(mi.get_rmw_message_info().received_timestamp)); + rclcpp::Time(mi.get_rmw_message_info().received_timestamp), + rclcpp::Time(mi.get_rmw_message_info().source_timestamp)); } }); } diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp index 46ac168c01..37ea88f400 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp @@ -121,7 +121,7 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn metadata_.message_count = messages.size(); if (!messages.empty()) { const auto message_timestamp = std::chrono::time_point( - std::chrono::nanoseconds(messages[0]->time_stamp)); + std::chrono::nanoseconds(messages[0]->receive_time_stamp)); metadata_.starting_time = message_timestamp; } messages_ = std::move(messages); diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp index 9e75cc84fa..69bd31554e 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp @@ -60,7 +60,7 @@ class Rosbag2TransportTestFixture : public Test { auto bag_msg = std::make_shared(); bag_msg->serialized_data = memory_management_.serialize_message(message); - bag_msg->time_stamp = milliseconds * 1000000; + bag_msg->receive_time_stamp = milliseconds * 1000000; bag_msg->topic_name = topic; return bag_msg; diff --git a/rosbag2_transport/test/rosbag2_transport/test_burst.cpp b/rosbag2_transport/test/rosbag2_transport/test_burst.cpp index 0d53e32aae..45a95f25cd 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_burst.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_burst.cpp @@ -249,7 +249,7 @@ TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_burst) { auto replay_time = std::chrono::steady_clock::now() - start; auto expected_replay_time = - std::chrono::nanoseconds(messages.back()->time_stamp - messages[1]->time_stamp); + std::chrono::nanoseconds(messages.back()->receive_time_stamp - messages[1]->receive_time_stamp); // Check for lower bound with some tolerance ASSERT_THAT(replay_time, Gt(expected_replay_time - std::chrono::milliseconds(50))); // Check for upper bound with some tolerance diff --git a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp index 8d3f8fd605..61c4f5ef93 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp @@ -103,8 +103,9 @@ TEST_F(RosBag2PlayTestFixture, invalid_keybindings) {serialize_test_message("topic1", 0, primitive_message1), serialize_test_message("topic1", 0, primitive_message2)}; - messages[0]->time_stamp = 100; - messages[1]->time_stamp = messages[0]->time_stamp + message_time_difference.nanoseconds(); + messages[0]->receive_time_stamp = 100; + messages[1]->receive_time_stamp = messages[0]->receive_time_stamp + + message_time_difference.nanoseconds(); play_options_.play_next_key = KeyboardHandler::KeyCode::UNKNOWN; @@ -138,9 +139,11 @@ TEST_F(RosBag2PlayTestFixture, test_keyboard_controls) serialize_test_message("topic1", 0, primitive_message2), serialize_test_message("topic1", 0, primitive_message3)}; - messages[0]->time_stamp = 100; - messages[1]->time_stamp = messages[0]->time_stamp + message_time_difference.nanoseconds(); - messages[2]->time_stamp = messages[1]->time_stamp + message_time_difference.nanoseconds(); + messages[0]->receive_time_stamp = 100; + messages[1]->receive_time_stamp = messages[0]->receive_time_stamp + + message_time_difference.nanoseconds(); + messages[2]->receive_time_stamp = messages[1]->receive_time_stamp + + message_time_difference.nanoseconds(); auto prepared_mock_reader = std::make_unique(); prepared_mock_reader->prepare(messages, topics_and_types); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp index 12886e8fc0..70c58467b3 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp @@ -202,7 +202,7 @@ TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_play_next) { auto replay_time = std::chrono::steady_clock::now() - start; auto expected_replay_time = - std::chrono::nanoseconds(messages.back()->time_stamp - messages[1]->time_stamp); + std::chrono::nanoseconds(messages.back()->receive_time_stamp - messages[1]->receive_time_stamp); // Check for lower bound with some tolerance ASSERT_THAT(replay_time, Gt(expected_replay_time - std::chrono::milliseconds(50))); // Check for upper bound with some tolerance diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp index befdc2defe..dec61187b6 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp @@ -55,8 +55,9 @@ class PlayerTestFixture : public Rosbag2TransportTestFixture serialize_test_message("topic1", 0, primitive_message2) }; - messages[0]->time_stamp = 100; - messages[1]->time_stamp = messages[0]->time_stamp + message_time_difference.nanoseconds(); + messages[0]->receive_time_stamp = 100; + messages[1]->receive_time_stamp = messages[0]->receive_time_stamp + + message_time_difference.nanoseconds(); prepared_mock_reader = std::make_unique(); prepared_mock_reader->prepare(messages, topics_and_types); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp index 0d9241aff2..002cdc2bc8 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp @@ -135,5 +135,5 @@ TEST_F(RecordIntegrationTestFixture, record_all_with_sim_time) } } // check that the timestamp is same as the clock message - EXPECT_THAT(string_messages[0]->time_stamp, time_value); + EXPECT_THAT(string_messages[0]->receive_time_stamp, time_value); }