Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Humble backport]: rosbag2_storage_mcap: fix seeking behavior and iteration order #1205

Merged
merged 1 commit into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions mcap_vendor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ endif()
macro(build_mcap_vendor)
include(FetchContent)
fetchcontent_declare(mcap
URL https://github.com/foxglove/mcap/archive/refs/tags/releases/cpp/v0.7.0.tar.gz
URL_HASH SHA1=0fccc7bf49e3d8f4dc05219f537cf7d9fa4adb42)
URL https://github.com/foxglove/mcap/archive/refs/tags/releases/cpp/v0.8.0.tar.gz
URL_HASH SHA1=b44637791da2c9c1cec61a3ba6994f1ef63a228c # v0.8.0
)
fetchcontent_makeavailable(mcap)

fetchcontent_declare(lz4
Expand Down
3 changes: 1 addition & 2 deletions rosbag2_storage_mcap/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ if(BUILD_TESTING)
find_package(ament_cmake_gmock REQUIRED)
find_package(ament_lint_auto REQUIRED)
find_package(rcpputils REQUIRED)
find_package(rosbag2_cpp REQUIRED)
find_package(rosbag2_test_common REQUIRED)
find_package(std_msgs REQUIRED)

Expand All @@ -101,7 +100,7 @@ if(BUILD_TESTING)

ament_add_gmock(test_mcap_storage test/rosbag2_storage_mcap/test_mcap_storage.cpp)
target_link_libraries(test_mcap_storage ${PROJECT_NAME})
ament_target_dependencies(test_mcap_storage rosbag2_storage rosbag2_cpp rosbag2_test_common std_msgs)
ament_target_dependencies(test_mcap_storage rosbag2_storage rosbag2_test_common std_msgs)
target_compile_definitions(test_mcap_storage PRIVATE ${MCAP_COMPILE_DEFS})

ament_add_gmock(test_message_definition_cache test/rosbag2_storage_mcap/test_message_definition_cache.cpp)
Expand Down
1 change: 0 additions & 1 deletion rosbag2_storage_mcap/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>rcpputils</test_depend>
<test_depend>rosbag2_cpp</test_depend>
<test_depend>rosbag2_test_common</test_depend>
<test_depend>std_msgs</test_depend>
<test_depend>rosbag2_storage_mcap_testdata</test_depend>
Expand Down
63 changes: 55 additions & 8 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,10 @@ class MCAPStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
rosbag2_storage::storage_interfaces::IOFlag io_flag,
const std::string & storage_config_uri);

void reset_iterator(rcutils_time_point_value_t start_time = 0);
void reset_iterator();
bool read_and_enqueue_message();
bool enqueued_message_is_already_read();
bool message_indexes_present();
void ensure_summary_read();

std::optional<rosbag2_storage::storage_interfaces::IOFlag> opened_as_;
Expand All @@ -247,6 +249,9 @@ class MCAPStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
rosbag2_storage_mcap::internal::MessageDefinitionCache msgdef_cache_{};

bool has_read_summary_ = false;
rcutils_time_point_value_t last_read_time_point_ = 0;
std::optional<mcap::RecordOffset> last_read_message_offset_;
std::optional<mcap::RecordOffset> last_enqueued_message_offset_;
};

MCAPStorage::MCAPStorage()
Expand Down Expand Up @@ -318,6 +323,7 @@ void MCAPStorage::open_impl(const std::string & uri, const std::string & preset_
if (!status.ok()) {
throw std::runtime_error(status.message);
}
last_read_time_point_ = 0;
reset_iterator();
break;
}
Expand Down Expand Up @@ -437,20 +443,18 @@ bool MCAPStorage::read_and_enqueue_message()
if (!linear_iterator_) {
return false;
}
// Already have popped and queued the next message.
if (next_ != nullptr) {
return true;
}

auto & it = *linear_iterator_;

// At the end of the recording
if (it == linear_view_->end()) {
next_ = nullptr;
return false;
}

const auto & messageView = *it;
auto msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
last_enqueued_message_offset_ = messageView.messageOffset;
msg->time_stamp = rcutils_time_point_value_t(messageView.message.logTime);
msg->topic_name = messageView.channel->topic;
msg->serialized_data = rosbag2_storage::make_serialized_message(messageView.message.data,
Expand All @@ -463,11 +467,20 @@ bool MCAPStorage::read_and_enqueue_message()
return true;
}

void MCAPStorage::reset_iterator(rcutils_time_point_value_t start_time)
void MCAPStorage::reset_iterator()
{
ensure_summary_read();
mcap::ReadMessageOptions options;
options.startTime = mcap::Timestamp(start_time);
if (read_order_ == mcap::ReadMessageOptions::ReadOrder::ReverseLogTimeOrder) {
options.startTime = 0;
// endTime is an exclusive range endpoint, but we want to start from `last_read_time_point_`.
// therefore, the time range we pass to the MCAP library is one nanosecond later than the
// seek point specified.
options.endTime = mcap::Timestamp(last_read_time_point_ + 1);
} else {
options.startTime = mcap::Timestamp(last_read_time_point_);
options.endTime = mcap::MaxTime;
}
options.readOrder = read_order_;
if (!storage_filter_.topics.empty()) {
options.topicFilter = [this](std::string_view topic) {
Expand All @@ -492,6 +505,34 @@ void MCAPStorage::reset_iterator(rcutils_time_point_value_t start_time)
linear_view_ =
std::make_unique<mcap::LinearMessageView>(mcap_reader_->readMessages(OnProblem, options));
linear_iterator_ = std::make_unique<mcap::LinearMessageView::Iterator>(linear_view_->begin());
if (!read_and_enqueue_message()) {
return;
}
while (enqueued_message_is_already_read()) {
if (!read_and_enqueue_message()) {
return;
}
}
}

bool MCAPStorage::enqueued_message_is_already_read()
{
if (last_read_message_offset_ == std::nullopt) {
return false;
}
if (last_enqueued_message_offset_ == std::nullopt) {
return false;
}
if (next_ == nullptr) {
return false;
}
if (last_read_time_point_ != next_->time_stamp) {
return false;
}
if (read_order_ == mcap::ReadMessageOptions::ReadOrder::ReverseLogTimeOrder) {
return (*last_enqueued_message_offset_ >= *last_read_message_offset_);
}
return (*last_enqueued_message_offset_ <= *last_read_message_offset_);
}

void MCAPStorage::ensure_summary_read()
Expand Down Expand Up @@ -567,6 +608,8 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> MCAPStorage::read_next()
if (!has_next()) {
throw std::runtime_error{"No next message is available."};
}
last_read_time_point_ = next_->time_stamp;
last_read_message_offset_ = last_enqueued_message_offset_;
// Importantly, clear next_ via move so that a next message can be read.
return std::move(next_);
}
Expand Down Expand Up @@ -595,7 +638,11 @@ void MCAPStorage::reset_filter()

void MCAPStorage::seek(const rcutils_time_point_value_t & time_stamp)
{
reset_iterator(time_stamp);
if (time_stamp != last_read_time_point_) {
last_read_message_offset_ = std::nullopt;
}
last_read_time_point_ = time_stamp;
reset_iterator();
}

/** ReadWriteInterface **/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,9 @@
#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rcpputils/filesystem_helper.hpp"
#include "rosbag2_cpp/reader.hpp"
#include "rosbag2_cpp/readers/sequential_reader.hpp"
#include "rosbag2_cpp/writer.hpp"
#include "rosbag2_cpp/writers/sequential_writer.hpp"
#include "rosbag2_storage/storage_factory.hpp"
#ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS
#include "rosbag2_storage/storage_options.hpp"
using StorageOptions = rosbag2_storage::StorageOptions;
#else
#include "rosbag2_cpp/storage_options.hpp"
using StorageOptions = rosbag2_cpp::StorageOptions;
#endif
#include "rosbag2_test_common/temporary_directory_fixture.hpp"
#include "std_msgs/msg/string.hpp"
Expand All @@ -40,7 +33,7 @@ using TemporaryDirectoryFixture = rosbag2_test_common::TemporaryDirectoryFixture
TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
{
auto uri = rcpputils::fs::path(temporary_dir_path_) / "bag";
auto expected_bag = uri / "bag_0.mcap";
auto expected_bag = rcpputils::fs::path(temporary_dir_path_) / "bag.mcap";
const int64_t timestamp_nanos = 100; // arbitrary value
rcutils_time_point_value_t time_stamp{timestamp_nanos};
const std::string topic_name = "test_topic";
Expand All @@ -51,22 +44,23 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
rclcpp::Serialization<std_msgs::msg::String> serialization;

{
StorageOptions options;
options.uri = uri.string();
options.storage_id = storage_id;
rosbag2_storage::TopicMetadata topic_metadata;
topic_metadata.name = topic_name;
topic_metadata.type = "std_msgs/msg/String";

std_msgs::msg::String msg;
msg.data = message_data;

rosbag2_cpp::Writer writer{std::make_unique<rosbag2_cpp::writers::SequentialWriter>()};
#ifndef ROSBAG2_STORAGE_MCAP_WRITER_CREATES_DIRECTORY
rcpputils::fs::create_directories(uri);
rosbag2_storage::StorageFactory factory;
#ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS
rosbag2_storage::StorageOptions options;
options.uri = uri.string();
options.storage_id = storage_id;
auto writer = factory.open_read_write(options);
#else
auto writer = factory.open_read_write(uri.string(), storage_id);
#endif
writer.open(options, rosbag2_cpp::ConverterOptions{});
writer.create_topic(topic_metadata);
writer->create_topic(topic_metadata);

auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();
serialization.serialize_message(&msg, serialized_msg.get());
Expand All @@ -81,20 +75,24 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
[](rcutils_uint8_array_t * /* data */) {});
serialized_bag_msg->time_stamp = time_stamp;
serialized_bag_msg->topic_name = topic_name;
writer.write(serialized_bag_msg);
writer->write(serialized_bag_msg);
EXPECT_TRUE(expected_bag.is_regular_file());
}
{
StorageOptions options;
rosbag2_storage::StorageFactory factory;
#ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS
rosbag2_storage::StorageOptions options;
options.uri = expected_bag.string();
options.storage_id = storage_id;

rosbag2_cpp::Reader reader{std::make_unique<rosbag2_cpp::readers::SequentialReader>()};
reader.open(options, rosbag2_cpp::ConverterOptions{});
EXPECT_TRUE(reader.has_next());
auto reader = factory.open_read_only(options);
#else
auto reader = factory.open_read_only(expected_bag.string(), storage_id);
#endif
reader->open(options);
EXPECT_TRUE(reader->has_next());

std_msgs::msg::String msg;
auto serialized_bag_msg = reader.read_next();
auto serialized_bag_msg = reader->read_next();
rclcpp::SerializedMessage extracted_serialized_msg(*serialized_bag_msg->serialized_data);
serialization.deserialize_message(&extracted_serialized_msg, &msg);
EXPECT_EQ(msg.data, message_data);
Expand All @@ -106,19 +104,17 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
TEST_F(TemporaryDirectoryFixture, can_write_mcap_with_zstd_configured_from_yaml)
{
auto uri = rcpputils::fs::path(temporary_dir_path_) / "bag";
auto expected_bag = uri / "bag_0.mcap";
auto expected_bag = rcpputils::fs::path(temporary_dir_path_) / "bag.mcap";
const int64_t timestamp_nanos = 100; // arbitrary value
rcutils_time_point_value_t time_stamp{timestamp_nanos};
const std::string topic_name = "test_topic";
const std::string message_data = "Test Message 1";
const std::string storage_id = "mcap";
const std::string config_path = _TEST_RESOURCES_DIR_PATH;
// COMPATIBILITY(foxy)
// using verbose APIs for Foxy compatibility which did not yet provide plain-message API
rclcpp::Serialization<std_msgs::msg::String> serialization;

{
StorageOptions options;
rosbag2_storage::StorageOptions options;
options.uri = uri.string();
options.storage_id = storage_id;
options.storage_config_uri = config_path + "/mcap_writer_options_zstd.yaml";
Expand All @@ -129,12 +125,9 @@ TEST_F(TemporaryDirectoryFixture, can_write_mcap_with_zstd_configured_from_yaml)
std_msgs::msg::String msg;
msg.data = message_data;

rosbag2_cpp::Writer writer{std::make_unique<rosbag2_cpp::writers::SequentialWriter>()};
#ifndef ROSBAG2_STORAGE_MCAP_WRITER_CREATES_DIRECTORY
rcpputils::fs::create_directories(uri);
#endif
writer.open(options, rosbag2_cpp::ConverterOptions{});
writer.create_topic(topic_metadata);
rosbag2_storage::StorageFactory factory;
auto writer = factory.open_read_write(options);
writer->create_topic(topic_metadata);

auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();
serialization.serialize_message(&msg, serialized_msg.get());
Expand All @@ -149,21 +142,21 @@ TEST_F(TemporaryDirectoryFixture, can_write_mcap_with_zstd_configured_from_yaml)
[](rcutils_uint8_array_t * /* data */) {});
serialized_bag_msg->time_stamp = time_stamp;
serialized_bag_msg->topic_name = topic_name;
writer.write(serialized_bag_msg);
writer.write(serialized_bag_msg);
writer->write(serialized_bag_msg);
writer->write(serialized_bag_msg);
EXPECT_TRUE(expected_bag.is_regular_file());
}
{
StorageOptions options;
rosbag2_storage::StorageOptions options;
options.uri = expected_bag.string();
options.storage_id = storage_id;

rosbag2_cpp::Reader reader{std::make_unique<rosbag2_cpp::readers::SequentialReader>()};
reader.open(options, rosbag2_cpp::ConverterOptions{});
EXPECT_TRUE(reader.has_next());
rosbag2_storage::StorageFactory factory;
auto reader = factory.open_read_only(options);
EXPECT_TRUE(reader->has_next());

std_msgs::msg::String msg;
auto serialized_bag_msg = reader.read_next();
auto serialized_bag_msg = reader->read_next();
rclcpp::SerializedMessage extracted_serialized_msg(*serialized_bag_msg->serialized_data);
serialization.deserialize_message(&extracted_serialized_msg, &msg);
EXPECT_EQ(msg.data, message_data);
Expand Down