Skip to content

Commit

Permalink
feat(SerializedBagMessage): Added publish_time_stamp to and renamed t…
Browse files Browse the repository at this point in the history
…ime_stamp to receive_time_stamp

Signed-off-by: Janosch Machowinski <J.Machowinski@cellumation.com>
  • Loading branch information
Janosch Machowinski authored and Janosch Machowinski committed Jan 31, 2024
1 parent a2f06c8 commit aa55430
Show file tree
Hide file tree
Showing 27 changed files with 103 additions and 53 deletions.
Expand Up @@ -360,7 +360,7 @@ SequentialCompressionWriter::compress_message(
std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
{
auto compressed_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
compressed_message->time_stamp = message->time_stamp;
compressed_message->receive_time_stamp = message->receive_time_stamp;
compressed_message->topic_name = message->topic_name;
compressor.compress_serialized_bag_message(message.get(), compressed_message.get());
return compressed_message;
Expand Down
20 changes: 20 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Expand Up @@ -199,6 +199,26 @@ class ROSBAG2_CPP_PUBLIC Writer
const std::string & type_name,
const rclcpp::Time & time);

/**
* Write a serialized message to a bagfile.
* The topic will be created if it has not been created already.
*
* \warning after calling this function, the serialized data will no longer be managed by message.
*
* \param message rclcpp::SerializedMessage The serialized message to be written to the bagfile
* \param topic_name the string of the topic this messages belongs to
* \param type_name the string of the type associated with this message
* \param recv_time The time stamp when the message was received
* \param send_time The time stamp when the message was send
* \throws runtime_error if the Writer is not open.
*/
void write(
std::shared_ptr<const rclcpp::SerializedMessage> message,
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & recv_time,
const rclcpp::Time & send_time);

/**
* Write a non-serialized message to a bagfile.
* The topic will be created if it has not been created already.
Expand Down
4 changes: 2 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/converter.cpp
Expand Up @@ -75,13 +75,13 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> Converter::convert(
// deserialize
rosbag2_cpp::introspection_message_set_topic_name(
allocated_ros_message.get(), message->topic_name.c_str());
allocated_ros_message->time_stamp = message->time_stamp;
allocated_ros_message->time_stamp = message->receive_time_stamp;
input_converter_->deserialize(message, introspection_ts, allocated_ros_message);

// re-serialize
output_message->serialized_data = rosbag2_storage::make_empty_serialized_message(0);
output_message->topic_name = std::string(allocated_ros_message->topic_name);
output_message->time_stamp = allocated_ros_message->time_stamp;
output_message->receive_time_stamp = allocated_ros_message->time_stamp;
output_converter_->serialize(allocated_ros_message, introspection_ts, output_message);
return output_message;
}
Expand Down
16 changes: 14 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Expand Up @@ -135,7 +135,8 @@ void Writer::write(
{
auto serialized_bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
serialized_bag_message->topic_name = topic_name;
serialized_bag_message->time_stamp = time.nanoseconds();
serialized_bag_message->receive_time_stamp = time.nanoseconds();
serialized_bag_message->publish_time_stamp = time.nanoseconds();

serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t,
Expand Down Expand Up @@ -182,10 +183,21 @@ void Writer::write(
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & time)
{
write(message, topic_name, type_name, time, time);
}

void Writer::write(
std::shared_ptr<const rclcpp::SerializedMessage> message,
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & recv_time,
const rclcpp::Time & send_time)
{
auto serialized_bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
serialized_bag_message->topic_name = topic_name;
serialized_bag_message->time_stamp = time.nanoseconds();
serialized_bag_message->receive_time_stamp = recv_time.nanoseconds();
serialized_bag_message->publish_time_stamp = send_time.nanoseconds();
// point to actual data and keep reference to original message to avoid premature releasing
serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t(message->get_rcl_serialized_message()),
Expand Down
4 changes: 2 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Expand Up @@ -349,7 +349,7 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
throw std::runtime_error("Bag is not open. Call open() before writing.");
}

if (!message_within_accepted_time_range(message->time_stamp)) {
if (!message_within_accepted_time_range(message->receive_time_stamp)) {
return;
}

Expand All @@ -365,7 +365,7 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
}

const auto message_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(message->time_stamp));
std::chrono::nanoseconds(message->receive_time_stamp));

if (is_first_message_) {
// Update bagfile starting time
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp
Expand Up @@ -65,7 +65,7 @@ void write_sample_split_bag(

auto msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
msg->serialized_data = rosbag2_storage::make_serialized_message(&value, sizeof(value));
msg->time_stamp = time_stamp;
msg->receive_time_stamp = time_stamp;
msg->topic_name = topic_name;
writer.write(msg);
}
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp
Expand Up @@ -309,7 +309,7 @@ class ReadOrderTest : public ParametrizedTemporaryDirectoryFixture
// Check both timestamp and value to uniquely identify messages in expected order
ASSERT_TRUE(reader.has_next());
auto next = reader.read_next();
EXPECT_EQ(next->time_stamp, expect_timestamp);
EXPECT_EQ(next->receive_time_stamp, expect_timestamp);

ASSERT_EQ(next->serialized_data->buffer_length, 4u);
uint32_t value = *reinterpret_cast<uint32_t *>(next->serialized_data->buffer);
Expand Down
Expand Up @@ -113,7 +113,7 @@ void WriterBenchmark::start_benchmark()
get_logger(), "Error getting current time. Error:" <<
rcutils_get_error_string().str);
}
message->time_stamp = time_stamp;
message->receive_time_stamp = time_stamp;
message->topic_name = queue->topic_name();

try {
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_py/src/rosbag2_py/_reader.cpp
Expand Up @@ -52,7 +52,7 @@ class Reader : public rosbag2_cpp::Reader
std::string serialized_data(rcutils_data.buffer,
rcutils_data.buffer + rcutils_data.buffer_length);
return pybind11::make_tuple(
next->topic_name, pybind11::bytes(serialized_data), next->time_stamp);
next->topic_name, pybind11::bytes(serialized_data), next->receive_time_stamp);
}
};

Expand Down
2 changes: 1 addition & 1 deletion rosbag2_py/src/rosbag2_py/_writer.cpp
Expand Up @@ -54,7 +54,7 @@ class Writer : public rosbag2_cpp::Writer
bag_message->topic_name = topic_name;
bag_message->serialized_data =
rosbag2_storage::make_serialized_message(message.c_str(), message.length());
bag_message->time_stamp = time_stamp;
bag_message->receive_time_stamp = time_stamp;

rosbag2_cpp::Writer::write(bag_message);
}
Expand Down
Expand Up @@ -27,7 +27,15 @@ namespace rosbag2_storage
struct SerializedBagMessage
{
std::shared_ptr<rcutils_uint8_array_t> serialized_data;
rcutils_time_point_value_t time_stamp;
/**
* @brief Nanosecond timestamp when this message was received.
*/
rcutils_time_point_value_t receive_time_stamp;
/**
* @brief Nanosecond timestamp when this message was initially published. If
* not available, this will be set to time_stamp.
*/
rcutils_time_point_value_t publish_time_stamp;
std::string topic_name;
};

Expand Down
17 changes: 9 additions & 8 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Expand Up @@ -512,7 +512,8 @@ bool MCAPStorage::read_and_enqueue_message()
const auto & messageView = *it;
auto msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
last_enqueued_message_offset_ = messageView.messageOffset;
msg->time_stamp = rcutils_time_point_value_t(messageView.message.logTime);
msg->receive_time_stamp = rcutils_time_point_value_t(messageView.message.logTime);
msg->publish_time_stamp = rcutils_time_point_value_t(messageView.message.publishTime);
msg->topic_name = messageView.channel->topic;
msg->serialized_data = rosbag2_storage::make_serialized_message(messageView.message.data,
messageView.message.dataSize);
Expand Down Expand Up @@ -583,7 +584,7 @@ bool MCAPStorage::enqueued_message_is_already_read()
if (next_ == nullptr) {
return false;
}
if (last_read_time_point_ != next_->time_stamp) {
if (last_read_time_point_ != next_->receive_time_stamp) {
return false;
}
if (read_order_ == mcap::ReadMessageOptions::ReadOrder::ReverseLogTimeOrder) {
Expand Down Expand Up @@ -670,7 +671,7 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> MCAPStorage::read_next()
if (!has_next()) {
throw std::runtime_error{"No next message is available."};
}
last_read_time_point_ = next_->time_stamp;
last_read_time_point_ = next_->receive_time_stamp;
last_read_message_offset_ = last_enqueued_message_offset_;
// Importantly, clear next_ via move so that a next message can be read.
return std::move(next_);
Expand Down Expand Up @@ -750,11 +751,11 @@ void MCAPStorage::write(std::shared_ptr<const rosbag2_storage::SerializedBagMess
mcap::Message mcap_msg;
mcap_msg.channelId = channel_it->second;
mcap_msg.sequence = 0;
if (msg->time_stamp < 0) {
RCUTILS_LOG_WARN_NAMED(LOG_NAME, "Invalid message timestamp %ld", msg->time_stamp);
if (msg->receive_time_stamp < 0) {
RCUTILS_LOG_WARN_NAMED(LOG_NAME, "Invalid message timestamp %ld", msg->receive_time_stamp);
}
mcap_msg.logTime = mcap::Timestamp(msg->time_stamp);
mcap_msg.publishTime = mcap_msg.logTime;
mcap_msg.logTime = mcap::Timestamp(msg->receive_time_stamp);
mcap_msg.publishTime = mcap::Timestamp(msg->publish_time_stamp);
mcap_msg.dataSize = msg->serialized_data->buffer_length;
mcap_msg.data = reinterpret_cast<const std::byte *>(msg->serialized_data->buffer);
const auto status = mcap_writer_->write(mcap_msg);
Expand All @@ -770,7 +771,7 @@ void MCAPStorage::write(std::shared_ptr<const rosbag2_storage::SerializedBagMess
// Increment global message count
metadata_.message_count++;
// Determine recording duration
const auto message_time = time_point(std::chrono::nanoseconds(msg->time_stamp));
const auto message_time = time_point(std::chrono::nanoseconds(msg->receive_time_stamp));
metadata_.duration = std::max(metadata_.duration, message_time - metadata_.starting_time);
}

Expand Down
Expand Up @@ -69,7 +69,7 @@ class McapStorageTestFixture : public rosbag2_test_common::TemporaryDirectoryFix
rw_storage->create_topic(topic_metadata, std::get<3>(msg));
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_message->serialized_data = make_serialized_message(std::get<0>(msg));
bag_message->time_stamp = std::get<1>(msg);
bag_message->receive_time_stamp = std::get<1>(msg);
bag_message->topic_name = topic_metadata.name;
rw_storage->write(bag_message);
}
Expand Down Expand Up @@ -207,7 +207,7 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
serialized_bag_msg->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
const_cast<rcutils_uint8_array_t *>(&serialized_msg->get_rcl_serialized_message()),
[](rcutils_uint8_array_t * /* data */) {});
serialized_bag_msg->time_stamp = time_stamp;
serialized_bag_msg->receive_time_stamp = time_stamp;
serialized_bag_msg->topic_name = topic_name;
writer->write(serialized_bag_msg);
}
Expand Down Expand Up @@ -294,7 +294,7 @@ TEST_F(TemporaryDirectoryFixture, can_write_mcap_with_zstd_configured_from_yaml)
serialized_bag_msg->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
const_cast<rcutils_uint8_array_t *>(&serialized_msg->get_rcl_serialized_message()),
[](rcutils_uint8_array_t * /* data */) {});
serialized_bag_msg->time_stamp = time_stamp;
serialized_bag_msg->receive_time_stamp = time_stamp;
serialized_bag_msg->topic_name = topic_name;
writer->write(serialized_bag_msg);
writer->write(serialized_bag_msg);
Expand Down
Expand Up @@ -288,7 +288,9 @@ void SqliteStorage::write_locked(
}

try {
write_statement_->bind(message->time_stamp, topic_entry->second, message->serialized_data);
write_statement_->bind(
message->receive_time_stamp, topic_entry->second,
message->serialized_data);
} catch (const SqliteException & exc) {
if (SQLITE_TOOBIG == exc.get_sqlite_return_code()) {
// Get the sqlite string/blob limit.
Expand Down Expand Up @@ -361,12 +363,12 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> SqliteStorage::read_next(

auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_message->serialized_data = std::get<0>(*current_message_row_);
bag_message->time_stamp = std::get<1>(*current_message_row_);
bag_message->receive_time_stamp = std::get<1>(*current_message_row_);
bag_message->topic_name = std::get<2>(*current_message_row_);

// set start time to current time
// and set seek_row_id to the new row id up
seek_time_ = bag_message->time_stamp;
seek_time_ = bag_message->receive_time_stamp;
seek_row_id_ = std::get<3>(*current_message_row_) + (read_order_.reverse ? -1 : 1);

++current_message_row_;
Expand Down
Expand Up @@ -113,7 +113,7 @@ class StorageTestFixture : public TemporaryDirectoryFixture
rw_storage.create_topic({topic_name, type_name, rmw_format, {}, ""}, {});
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_message->serialized_data = make_serialized_message(std::get<0>(msg));
bag_message->time_stamp = std::get<1>(msg);
bag_message->receive_time_stamp = std::get<1>(msg);
bag_message->topic_name = topic_name;
rw_storage.write(bag_message);
}
Expand Down Expand Up @@ -189,14 +189,16 @@ class StorageTestFixture : public TemporaryDirectoryFixture
// Prepare rosbag2 serialized message to write
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->serialized_data = make_serialized_message(std::get<0>(msg));
message->time_stamp = std::get<1>(msg);
message->receive_time_stamp = std::get<1>(msg);
message->topic_name = topic_name;
// Write message to DB
auto topic_entry = topics.find(topic_name);
if (topic_entry == end(topics)) {
throw SqliteException("Topic '" + topic_name + "' has not been created yet!");
}
write_statement->bind(message->time_stamp, topic_entry->second, message->serialized_data);
write_statement->bind(
message->receive_time_stamp, topic_entry->second,
message->serialized_data);
write_statement->execute_and_reset();
}
}
Expand Down
Expand Up @@ -72,7 +72,7 @@ TEST_F(StorageTestFixture, string_messages_are_written_and_read_to_and_from_sqli
ASSERT_THAT(read_messages, SizeIs(3));
for (size_t i = 0; i < 3; i++) {
EXPECT_THAT(deserialize_message(read_messages[i]->serialized_data), Eq(string_messages[i]));
EXPECT_THAT(read_messages[i]->time_stamp, Eq(std::get<1>(messages[i])));
EXPECT_THAT(read_messages[i]->receive_time_stamp, Eq(std::get<1>(messages[i])));
EXPECT_THAT(read_messages[i]->topic_name, Eq(topics[i]));
}
}
Expand Down Expand Up @@ -112,10 +112,10 @@ TEST_F(StorageTestFixture, get_next_returns_messages_in_timestamp_order) {

EXPECT_TRUE(readable_storage->has_next());
auto first_message = readable_storage->read_next();
EXPECT_THAT(first_message->time_stamp, Eq(2));
EXPECT_THAT(first_message->receive_time_stamp, Eq(2));
EXPECT_TRUE(readable_storage->has_next());
auto second_message = readable_storage->read_next();
EXPECT_THAT(second_message->time_stamp, Eq(6));
EXPECT_THAT(second_message->receive_time_stamp, Eq(6));
EXPECT_FALSE(readable_storage->has_next());
}

Expand Down Expand Up @@ -335,7 +335,7 @@ TEST_F(StorageTestFixture, messages_readable_for_prefoxy_db_schema) {
ASSERT_THAT(read_messages, SizeIs(3));
for (size_t i = 0; i < 3; i++) {
EXPECT_THAT(deserialize_message(read_messages[i]->serialized_data), Eq(string_messages[i]));
EXPECT_THAT(read_messages[i]->time_stamp, Eq(std::get<1>(messages[i])));
EXPECT_THAT(read_messages[i]->receive_time_stamp, Eq(std::get<1>(messages[i])));
EXPECT_THAT(read_messages[i]->topic_name, Eq(topics[i]));
}
}
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp
Expand Up @@ -61,7 +61,7 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example)
writer.open(storage_options);

auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
auto ret = rcutils_system_time_now(&bag_message->time_stamp);
auto ret = rcutils_system_time_now(&bag_message->receive_time_stamp);
if (ret != RCL_RET_OK) {
FAIL() << "couldn't assign time rosbag message";
}
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp
Expand Up @@ -53,7 +53,7 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> get_next(
if (msg == nullptr) {
continue;
}
if (earliest_msg == nullptr || msg->time_stamp < earliest_msg->time_stamp) {
if (earliest_msg == nullptr || msg->receive_time_stamp < earliest_msg->receive_time_stamp) {
earliest_msg = msg;
earliest_msg_index = i;
}
Expand Down
8 changes: 4 additions & 4 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Expand Up @@ -642,10 +642,10 @@ bool PlayerImpl::play_next()

bool next_message_published = false;
while (rclcpp::ok() && !next_message_published && !stop_playback_ &&
message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->time_stamp))
message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->receive_time_stamp))
{
next_message_published = publish_message(message_ptr);
clock_->jump(message_ptr->time_stamp);
clock_->jump(message_ptr->receive_time_stamp);

message_queue_.pop();
message_ptr = peek_next_message_from_queue();
Expand Down Expand Up @@ -847,11 +847,11 @@ void PlayerImpl::play_messages_from_queue()
ready_to_play_from_queue_cv_.notify_all();
}
while (rclcpp::ok() && !stop_playback_ &&
message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->time_stamp))
message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->receive_time_stamp))
{
// Do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
while (rclcpp::ok() && !clock_->sleep_until(message_ptr->time_stamp)) {
while (rclcpp::ok() && !clock_->sleep_until(message_ptr->receive_time_stamp)) {
if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) {
break;
}
Expand Down
3 changes: 2 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/recorder.cpp
Expand Up @@ -539,7 +539,8 @@ RecorderImpl::create_subscription(
if (!paused_.load()) {
writer_->write(
message, topic_name, topic_type,
rclcpp::Time(mi.get_rmw_message_info().received_timestamp));
rclcpp::Time(mi.get_rmw_message_info().received_timestamp),
rclcpp::Time(mi.get_rmw_message_info().source_timestamp));
}
});
}
Expand Down
Expand Up @@ -121,7 +121,7 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn
metadata_.message_count = messages.size();
if (!messages.empty()) {
const auto message_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(messages[0]->time_stamp));
std::chrono::nanoseconds(messages[0]->receive_time_stamp));
metadata_.starting_time = message_timestamp;
}
messages_ = std::move(messages);
Expand Down

0 comments on commit aa55430

Please sign in to comment.