Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use middleware send and receive timestamps from message_info during recording #1531

Merged
merged 9 commits into from
Apr 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,8 @@ SequentialCompressionWriter::compress_message(
std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
{
auto compressed_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
compressed_message->time_stamp = message->time_stamp;
compressed_message->recv_timestamp = message->recv_timestamp;
compressed_message->send_timestamp = message->send_timestamp;
compressed_message->topic_name = message->topic_name;
compressor.compress_serialized_bag_message(message.get(), compressed_message.get());
return compressed_message;
Expand Down
19 changes: 19 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,25 @@ class ROSBAG2_CPP_PUBLIC Writer
const std::string & type_name,
const rclcpp::Time & time);

/**
* Write a serialized message to a bagfile.
* The topic will be created if it has not been created already.
*
* \warning after calling this function, the serialized data will no longer be managed by message.
*
* \param message rclcpp::SerializedMessage The serialized message to be written to the bagfile
* \param topic_name the string of the topic this messages belongs to
* \param type_name the string of the type associated with this message
* \param recv_time The time stamp when the message was received
* \param send_time The time stamp when the message was send
* \throws runtime_error if the Writer is not open.
*/
void write(
std::shared_ptr<const rclcpp::SerializedMessage> message,
const std::string & topic_name,
const std::string & type_name,
const rcutils_time_point_value_t & recv_timestamp,
const rcutils_time_point_value_t & send_timestamp);
/**
* Write a non-serialized message to a bagfile.
* The topic will be created if it has not been created already.
Expand Down
5 changes: 3 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,14 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> Converter::convert(
// deserialize
rosbag2_cpp::introspection_message_set_topic_name(
allocated_ros_message.get(), message->topic_name.c_str());
allocated_ros_message->time_stamp = message->time_stamp;
allocated_ros_message->time_stamp = message->recv_timestamp;
input_converter_->deserialize(message, introspection_ts, allocated_ros_message);

// re-serialize
output_message->serialized_data = rosbag2_storage::make_empty_serialized_message(0);
output_message->topic_name = std::string(allocated_ros_message->topic_name);
output_message->time_stamp = allocated_ros_message->time_stamp;
output_message->recv_timestamp = message->recv_timestamp;
output_message->send_timestamp = message->send_timestamp;
output_converter_->serialize(allocated_ros_message, introspection_ts, output_message);
return output_message;
}
Expand Down
16 changes: 14 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ void Writer::write(
{
auto serialized_bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
serialized_bag_message->topic_name = topic_name;
serialized_bag_message->time_stamp = time.nanoseconds();
serialized_bag_message->recv_timestamp = time.nanoseconds();
serialized_bag_message->send_timestamp = time.nanoseconds();

serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t,
Expand Down Expand Up @@ -182,10 +183,21 @@ void Writer::write(
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & time)
{
write(message, topic_name, type_name, time.nanoseconds(), time.nanoseconds());
}

void Writer::write(
std::shared_ptr<const rclcpp::SerializedMessage> message,
const std::string & topic_name,
const std::string & type_name,
const rcutils_time_point_value_t & recv_timestamp,
const rcutils_time_point_value_t & send_timestamp)
{
auto serialized_bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
serialized_bag_message->topic_name = topic_name;
serialized_bag_message->time_stamp = time.nanoseconds();
serialized_bag_message->recv_timestamp = recv_timestamp;
serialized_bag_message->send_timestamp = send_timestamp;
// point to actual data and keep reference to original message to avoid premature releasing
serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t(message->get_rcl_serialized_message()),
Expand Down
4 changes: 2 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
throw std::runtime_error("Bag is not open. Call open() before writing.");
}

if (!message_within_accepted_time_range(message->time_stamp)) {
if (!message_within_accepted_time_range(message->recv_timestamp)) {
return;
}

Expand All @@ -368,7 +368,7 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
}

const auto message_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(message->time_stamp));
std::chrono::nanoseconds(message->recv_timestamp));

if (is_first_message_) {
// Update bagfile starting time
Expand Down
3 changes: 2 additions & 1 deletion rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ void write_sample_split_bag(

auto msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
msg->serialized_data = rosbag2_storage::make_serialized_message(&value, sizeof(value));
msg->time_stamp = time_stamp;
msg->recv_timestamp = time_stamp;
msg->send_timestamp = time_stamp;
msg->topic_name = topic_name;
writer.write(msg);
}
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ class ReadOrderTest : public ParametrizedTemporaryDirectoryFixture
// Check both timestamp and value to uniquely identify messages in expected order
ASSERT_TRUE(reader.has_next());
auto next = reader.read_next();
EXPECT_EQ(next->time_stamp, expect_timestamp);
EXPECT_EQ(next->recv_timestamp, expect_timestamp);

ASSERT_EQ(next->serialized_data->buffer_length, 4u);
uint32_t value = *reinterpret_cast<uint32_t *>(next->serialized_data->buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void WriterBenchmark::start_benchmark()
get_logger(), "Error getting current time. Error:" <<
rcutils_get_error_string().str);
}
message->time_stamp = time_stamp;
message->recv_timestamp = time_stamp;
message->topic_name = queue->topic_name();

try {
Expand Down
7 changes: 7 additions & 0 deletions rosbag2_py/rosbag2_py/_writer.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import overload
import rosbag2_py._compression_options
import rosbag2_py._storage

Expand All @@ -8,7 +9,10 @@ class SequentialCompressionWriter:
def remove_topic(self, arg0: rosbag2_py._storage.TopicMetadata) -> None: ...
def split_bagfile(self) -> None: ...
def take_snapshot(self) -> bool: ...
@overload
def write(self, arg0: str, arg1: str, arg2: int) -> None: ...
@overload
def write(self, arg0: str, arg1: str, arg2: int, arg3: int) -> None: ...

class SequentialWriter:
def __init__(self) -> None: ...
Expand All @@ -18,7 +22,10 @@ class SequentialWriter:
def remove_topic(self, arg0: rosbag2_py._storage.TopicMetadata) -> None: ...
def split_bagfile(self) -> None: ...
def take_snapshot(self) -> bool: ...
@overload
def write(self, arg0: str, arg1: str, arg2: int) -> None: ...
@overload
def write(self, arg0: str, arg1: str, arg2: int, arg3: int) -> None: ...

def get_registered_compressors() -> Set[str]: ...
def get_registered_serializers() -> Set[str]: ...
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_py/src/rosbag2_py/_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Reader : public rosbag2_cpp::Reader
std::string serialized_data(rcutils_data.buffer,
rcutils_data.buffer + rcutils_data.buffer_length);
return pybind11::make_tuple(
next->topic_name, pybind11::bytes(serialized_data), next->time_stamp);
next->topic_name, pybind11::bytes(serialized_data), next->recv_timestamp);
}
};

Expand Down
26 changes: 23 additions & 3 deletions rosbag2_py/src/rosbag2_py/_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,23 @@ class Writer : public rosbag2_cpp::Writer
void write(
const std::string & topic_name, const std::string & message,
const rcutils_time_point_value_t & time_stamp)
{
write(topic_name, message, time_stamp, time_stamp);
}

void write(
const std::string & topic_name, const std::string & message,
const rcutils_time_point_value_t & recv_timestamp,
const rcutils_time_point_value_t & send_timestamp)
{
auto bag_message =
std::make_shared<rosbag2_storage::SerializedBagMessage>();

bag_message->topic_name = topic_name;
bag_message->serialized_data =
rosbag2_storage::make_serialized_message(message.c_str(), message.length());
bag_message->time_stamp = time_stamp;
bag_message->recv_timestamp = recv_timestamp;
bag_message->send_timestamp = send_timestamp;

rosbag2_cpp::Writer::write(bag_message);
}
Expand Down Expand Up @@ -97,7 +106,12 @@ PYBIND11_MODULE(_writer, m) {
pybind11::overload_cast<
const rosbag2_storage::StorageOptions &, const rosbag2_cpp::ConverterOptions &
>(&PyWriter::open))
.def("write", &PyWriter::write)
.def(
"write", pybind11::overload_cast<const std::string &, const std::string &,
const rcutils_time_point_value_t &>(&PyWriter::write))
.def(
"write", pybind11::overload_cast<const std::string &, const std::string &,
const rcutils_time_point_value_t &, const rcutils_time_point_value_t &>(&PyWriter::write))
.def("close", &PyWriter::close)
.def("remove_topic", &PyWriter::remove_topic)
.def(
Expand All @@ -114,7 +128,13 @@ PYBIND11_MODULE(_writer, m) {
pybind11::overload_cast<
const rosbag2_storage::StorageOptions &, const rosbag2_cpp::ConverterOptions &
>(&PyCompressionWriter::open))
.def("write", &PyCompressionWriter::write)
.def(
"write", pybind11::overload_cast<const std::string &, const std::string &,
const rcutils_time_point_value_t &>(&PyCompressionWriter::write))
.def(
"write", pybind11::overload_cast<const std::string &, const std::string &,
const rcutils_time_point_value_t &,
const rcutils_time_point_value_t &>(&PyCompressionWriter::write))
.def("remove_topic", &PyCompressionWriter::remove_topic)
.def(
"create_topic",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,15 @@ namespace rosbag2_storage
struct SerializedBagMessage
{
std::shared_ptr<rcutils_uint8_array_t> serialized_data;
rcutils_time_point_value_t time_stamp;
/**
* @brief Nanosecond timestamp when this message was received.
*/
rcutils_time_point_value_t recv_timestamp;
/**
* @brief Nanosecond timestamp when this message was initially published. If
* not available, this will be set to recv_timestamp.
*/
rcutils_time_point_value_t send_timestamp;
std::string topic_name;
};

Expand Down
17 changes: 9 additions & 8 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,8 @@ bool MCAPStorage::read_and_enqueue_message()
const auto & messageView = *it;
auto msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
last_enqueued_message_offset_ = messageView.messageOffset;
msg->time_stamp = rcutils_time_point_value_t(messageView.message.logTime);
msg->recv_timestamp = rcutils_time_point_value_t(messageView.message.logTime);
msg->send_timestamp = rcutils_time_point_value_t(messageView.message.publishTime);
msg->topic_name = messageView.channel->topic;
msg->serialized_data = rosbag2_storage::make_serialized_message(messageView.message.data,
messageView.message.dataSize);
Expand Down Expand Up @@ -590,7 +591,7 @@ bool MCAPStorage::enqueued_message_is_already_read()
if (next_ == nullptr) {
return false;
}
if (last_read_time_point_ != next_->time_stamp) {
if (last_read_time_point_ != next_->recv_timestamp) {
return false;
}
if (read_order_ == mcap::ReadMessageOptions::ReadOrder::ReverseLogTimeOrder) {
Expand Down Expand Up @@ -677,7 +678,7 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> MCAPStorage::read_next()
if (!has_next()) {
throw std::runtime_error{"No next message is available."};
}
last_read_time_point_ = next_->time_stamp;
last_read_time_point_ = next_->recv_timestamp;
last_read_message_offset_ = last_enqueued_message_offset_;
// Importantly, clear next_ via move so that a next message can be read.
return std::move(next_);
Expand Down Expand Up @@ -772,11 +773,11 @@ void MCAPStorage::write_lock_free(std::shared_ptr<const rosbag2_storage::Seriali
mcap::Message mcap_msg;
mcap_msg.channelId = channel_it->second;
mcap_msg.sequence = 0;
if (msg->time_stamp < 0) {
RCUTILS_LOG_WARN_NAMED(LOG_NAME, "Invalid message timestamp %ld", msg->time_stamp);
if (msg->recv_timestamp < 0) {
RCUTILS_LOG_WARN_NAMED(LOG_NAME, "Invalid message timestamp %ld", msg->recv_timestamp);
}
mcap_msg.logTime = mcap::Timestamp(msg->time_stamp);
mcap_msg.publishTime = mcap_msg.logTime;
mcap_msg.logTime = mcap::Timestamp(msg->recv_timestamp);
mcap_msg.publishTime = mcap::Timestamp(msg->send_timestamp);
mcap_msg.dataSize = msg->serialized_data->buffer_length;
mcap_msg.data = reinterpret_cast<const std::byte *>(msg->serialized_data->buffer);
const auto status = mcap_writer_->write(mcap_msg);
Expand All @@ -792,7 +793,7 @@ void MCAPStorage::write_lock_free(std::shared_ptr<const rosbag2_storage::Seriali
// Increment global message count
metadata_.message_count++;
// Determine recording duration
const auto message_time = time_point(std::chrono::nanoseconds(msg->time_stamp));
const auto message_time = time_point(std::chrono::nanoseconds(msg->recv_timestamp));
metadata_.duration = std::max(metadata_.duration, message_time - metadata_.starting_time);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class McapStorageTestFixture : public rosbag2_test_common::TemporaryDirectoryFix
rw_storage->create_topic(topic_metadata, std::get<3>(msg));
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_message->serialized_data = make_serialized_message(std::get<0>(msg));
bag_message->time_stamp = std::get<1>(msg);
bag_message->recv_timestamp = std::get<1>(msg);
bag_message->topic_name = topic_metadata.name;
rw_storage->write(bag_message);
}
Expand Down Expand Up @@ -208,7 +208,7 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
serialized_bag_msg->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
const_cast<rcutils_uint8_array_t *>(&serialized_msg->get_rcl_serialized_message()),
[](rcutils_uint8_array_t * /* data */) {});
serialized_bag_msg->time_stamp = time_stamp;
serialized_bag_msg->recv_timestamp = time_stamp;
serialized_bag_msg->topic_name = topic_name;
writer->write(serialized_bag_msg);
}
Expand Down Expand Up @@ -295,7 +295,7 @@ TEST_F(TemporaryDirectoryFixture, can_write_mcap_with_zstd_configured_from_yaml)
serialized_bag_msg->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
const_cast<rcutils_uint8_array_t *>(&serialized_msg->get_rcl_serialized_message()),
[](rcutils_uint8_array_t * /* data */) {});
serialized_bag_msg->time_stamp = time_stamp;
serialized_bag_msg->recv_timestamp = time_stamp;
serialized_bag_msg->topic_name = topic_name;
writer->write(serialized_bag_msg);
writer->write(serialized_bag_msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,9 @@ void SqliteStorage::write_locked(
}

try {
write_statement_->bind(message->time_stamp, topic_entry->second, message->serialized_data);
write_statement_->bind(
message->recv_timestamp, topic_entry->second,
message->serialized_data);
} catch (const SqliteException & exc) {
if (SQLITE_TOOBIG == exc.get_sqlite_return_code()) {
// Get the sqlite string/blob limit.
Expand Down Expand Up @@ -380,12 +382,12 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> SqliteStorage::read_next(

auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_message->serialized_data = std::get<0>(*current_message_row_);
bag_message->time_stamp = std::get<1>(*current_message_row_);
bag_message->recv_timestamp = std::get<1>(*current_message_row_);
bag_message->topic_name = std::get<2>(*current_message_row_);

// set start time to current time
// and set seek_row_id to the new row id up
seek_time_ = bag_message->time_stamp;
seek_time_ = bag_message->recv_timestamp;
seek_row_id_ = std::get<3>(*current_message_row_) + (read_order_.reverse ? -1 : 1);

++current_message_row_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class StorageTestFixture : public TemporaryDirectoryFixture
rw_storage.create_topic({0u, topic_name, type_name, rmw_format, {}, ""}, {});
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_message->serialized_data = make_serialized_message(std::get<0>(msg));
bag_message->time_stamp = std::get<1>(msg);
bag_message->recv_timestamp = std::get<1>(msg);
bag_message->topic_name = topic_name;
rw_storage.write(bag_message);
}
Expand Down Expand Up @@ -188,14 +188,16 @@ class StorageTestFixture : public TemporaryDirectoryFixture
// Prepare rosbag2 serialized message to write
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->serialized_data = make_serialized_message(std::get<0>(msg));
message->time_stamp = std::get<1>(msg);
message->recv_timestamp = std::get<1>(msg);
message->topic_name = topic_name;
// Write message to DB
auto topic_entry = topics.find(topic_name);
if (topic_entry == end(topics)) {
throw SqliteException("Topic '" + topic_name + "' has not been created yet!");
}
write_statement->bind(message->time_stamp, topic_entry->second, message->serialized_data);
write_statement->bind(
message->recv_timestamp, topic_entry->second,
message->serialized_data);
write_statement->execute_and_reset();
}
}
Expand Down