Skip to content

Commit

Permalink
Implement storing and loading ROS_DISTRO from metadata.yaml and mcap …
Browse files Browse the repository at this point in the history
…files (#1241)

* Implement storing and loading ROS_DISTRO from mcap metadata

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>

* Just add ros_distro to BagMetadata and skip all the side channeling, why do that

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>

* Fix functionality

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>

* Add mcap test

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>

* Move functions to get_metadata and update_metadata

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>

* Update metadata version

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>

* Add warning message when ROS_DISTRO not set

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>

* Small tweaks

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>

* Update test expectation and print unknown when value unknown

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>

* comment out unused variable

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>

---------

Signed-off-by: Emerson Knapp <emerson.b.knapp@gmail.com>
  • Loading branch information
emersonknapp committed Jun 3, 2023
1 parent 13f5151 commit 02a70b4
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 31 deletions.
6 changes: 6 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <utility>
#include <vector>

#include "rcpputils/env.hpp"
#include "rcpputils/filesystem_helper.hpp"

#include "rosbag2_cpp/info.hpp"
Expand Down Expand Up @@ -78,6 +79,11 @@ void SequentialWriter::init_metadata()
file_info.message_count = 0;
metadata_.custom_data = storage_options_.custom_data;
metadata_.files = {file_info};
metadata_.ros_distro = rcpputils::get_env_var("ROS_DISTRO");
if (metadata_.ros_distro.empty()) {
ROSBAG2_CPP_LOG_WARN(
"Environment variable ROS_DISTRO not set, can't store value in bag metadata.");
}
}

void SequentialWriter::open(
Expand Down
12 changes: 8 additions & 4 deletions rosbag2_py/src/rosbag2_py/_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ PYBIND11_MODULE(_storage, m) {
std::vector<rosbag2_storage::TopicInformation> topics_with_message_count,
std::string compression_format,
std::string compression_mode,
std::unordered_map<std::string, std::string> custom_data)
std::unordered_map<std::string, std::string> custom_data,
std::string ros_distro)
{
return rosbag2_storage::BagMetadata{
version,
Expand All @@ -243,10 +244,11 @@ PYBIND11_MODULE(_storage, m) {
topics_with_message_count,
compression_format,
compression_mode,
custom_data
custom_data,
ros_distro,
};
}),
pybind11::arg("version") = 7,
pybind11::arg("version") = rosbag2_storage::BagMetadata{}.version,
pybind11::arg("bag_size") = 0,
pybind11::arg("storage_identifier") = "",
pybind11::arg("relative_file_paths") = std::vector<std::string>(),
Expand All @@ -258,7 +260,8 @@ PYBIND11_MODULE(_storage, m) {
pybind11::arg("topics_with_message_count") = std::vector<rosbag2_storage::TopicInformation>(),
pybind11::arg("compression_format") = "",
pybind11::arg("compression_mode") = "",
pybind11::arg("custom_data") = std::unordered_map<std::string, std::string>())
pybind11::arg("custom_data") = std::unordered_map<std::string, std::string>(),
pybind11::arg("ros_distro") = "")
.def_readwrite("version", &rosbag2_storage::BagMetadata::version)
.def_readwrite("bag_size", &rosbag2_storage::BagMetadata::bag_size)
.def_readwrite("storage_identifier", &rosbag2_storage::BagMetadata::storage_identifier)
Expand Down Expand Up @@ -287,6 +290,7 @@ PYBIND11_MODULE(_storage, m) {
.def_readwrite("compression_format", &rosbag2_storage::BagMetadata::compression_format)
.def_readwrite("compression_mode", &rosbag2_storage::BagMetadata::compression_mode)
.def_readwrite("custom_data", &rosbag2_storage::BagMetadata::custom_data)
.def_readwrite("ros_distro", &rosbag2_storage::BagMetadata::ros_distro)
.def(
"__repr__", [](const rosbag2_storage::BagMetadata & metadata) {
return format_bag_meta_data(metadata);
Expand Down
5 changes: 5 additions & 0 deletions rosbag2_py/src/rosbag2_py/format_bag_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,18 @@ std::string format_bag_meta_data(const rosbag2_storage::BagMetadata & metadata)
auto end_time = start_time + metadata.duration;
std::stringstream info_stream;
int indentation_spaces = 19; // The longest info field (Topics with Type:) plus one space.
std::string ros_distro = metadata.ros_distro;
if (ros_distro.empty()) {
ros_distro = "unknown";
}

info_stream << std::endl;
info_stream << "Files: ";
format_file_paths(metadata.relative_file_paths, info_stream, indentation_spaces);
info_stream << "Bag size: " << format_file_size(
metadata.bag_size) << std::endl;
info_stream << "Storage id: " << metadata.storage_identifier << std::endl;
info_stream << "ROS Distro: " << ros_distro << std::endl;
info_stream << "Duration: " << format_duration(
metadata.duration)["time_in_sec"] << "s" << std::endl;
info_stream << "Start: " << format_time_point(start_time) <<
Expand Down
3 changes: 2 additions & 1 deletion rosbag2_storage/include/rosbag2_storage/bag_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct FileInformation

struct BagMetadata
{
int version = 7; // upgrade this number when changing the content of the struct
int version = 8; // upgrade this number when changing the content of the struct
uint64_t bag_size = 0; // Will not be serialized
std::string storage_identifier;
std::vector<std::string> relative_file_paths;
Expand All @@ -54,6 +54,7 @@ struct BagMetadata
std::string compression_format;
std::string compression_mode;
std::unordered_map<std::string, std::string> custom_data; // {key: value, ...}
std::string ros_distro;
};

} // namespace rosbag2_storage
Expand Down
6 changes: 4 additions & 2 deletions rosbag2_storage/include/rosbag2_storage/yaml.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ struct convert<rosbag2_storage::BagMetadata>
node["relative_file_paths"] = metadata.relative_file_paths;
node["files"] = metadata.files;
node["custom_data"] = metadata.custom_data;

node["ros_distro"] = metadata.ros_distro;
return node;
}

Expand Down Expand Up @@ -273,7 +273,9 @@ struct convert<rosbag2_storage::BagMetadata>
if (metadata.version >= 6) {
metadata.custom_data = node["custom_data"].as<std::unordered_map<std::string, std::string>>();
}

if (metadata.version >= 8) {
metadata.ros_distro = node["ros_distro"].as<std::string>();
}
return true;
}
};
Expand Down
33 changes: 33 additions & 0 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,31 @@ rosbag2_storage::BagMetadata MCAPStorage::get_metadata()
metadata_.topics_with_message_count.push_back(topic_info);
}

const auto & mcap_metadatas = mcap_reader_->metadataIndexes();
auto range = mcap_metadatas.equal_range("rosbag2");
mcap::Status status{};
mcap::Record mcap_record{};
mcap::Metadata mcap_metadata{};
for (auto i = range.first; i != range.second; ++i) {
status = mcap::McapReader::ReadRecord(*data_source_, i->second.offset, &mcap_record);
if (!status.ok()) {
OnProblem(status);
continue;
}
status = mcap::McapReader::ParseMetadata(mcap_record, &mcap_metadata);
if (!status.ok()) {
OnProblem(status);
continue;
}
try {
metadata_.ros_distro = mcap_metadata.metadata.at("ROS_DISTRO");
} catch (const std::out_of_range & /* err */) {
RCUTILS_LOG_ERROR_NAMED(
LOG_NAME, "Metadata record with name 'rosbag2' did not contain key 'ROS_DISTRO'.");
}
break;
}

return metadata_;
}

Expand Down Expand Up @@ -803,6 +828,14 @@ void MCAPStorage::update_metadata(const rosbag2_storage::BagMetadata & bag_metad
"MCAP storage plugin does not support message compression, "
"consider using chunk compression by setting `compression: 'Zstd'` in storage config");
}

mcap::Metadata metadata;
metadata.name = "rosbag2";
metadata.metadata = {{"ROS_DISTRO", bag_metadata.ros_distro}};
mcap::Status status = mcap_writer_->write(metadata);
if (!status.ok()) {
OnProblem(status);
}
}
#endif

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rcpputils/env.hpp"
#include "rcpputils/filesystem_helper.hpp"
#include "rosbag2_storage/storage_factory.hpp"
#ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS
Expand Down Expand Up @@ -52,6 +53,7 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
// COMPATIBILITY(foxy)
// using verbose APIs for Foxy compatibility which did not yet provide plain-message API
rclcpp::Serialization<std_msgs::msg::String> serialization;
rosbag2_storage::StorageFactory factory;

{
rosbag2_storage::TopicMetadata topic_metadata;
Expand All @@ -64,7 +66,6 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
std_msgs::msg::String msg;
msg.data = message_data;

rosbag2_storage::StorageFactory factory;
#ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS
rosbag2_storage::StorageOptions options;
options.uri = uri.string();
Expand All @@ -89,10 +90,9 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
serialized_bag_msg->time_stamp = time_stamp;
serialized_bag_msg->topic_name = topic_name;
writer->write(serialized_bag_msg);
EXPECT_TRUE(expected_bag.is_regular_file());
}
EXPECT_TRUE(expected_bag.is_regular_file());
{
rosbag2_storage::StorageFactory factory;
#ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS
rosbag2_storage::StorageOptions options;
options.uri = expected_bag.string();
Expand All @@ -118,6 +118,9 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
1u}}));
EXPECT_THAT(metadata.message_count, Eq(1u));

const auto current_distro = rcpputils::get_env_var("ROS_DISTRO");
EXPECT_EQ(metadata.ros_distro, current_distro);

EXPECT_TRUE(reader->has_next());

std_msgs::msg::String msg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
SqliteWrapper & get_sqlite_database_wrapper();

int get_db_schema_version() const;
std::string get_recorded_ros_distro() const;

enum class PresetProfile
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,13 @@ void SqliteStorage::read_metadata()
std::chrono::time_point<std::chrono::high_resolution_clock>(std::chrono::nanoseconds(min_time));
metadata_.duration = std::chrono::nanoseconds(max_time) - std::chrono::nanoseconds(min_time);
metadata_.bag_size = get_bagfile_size();

if (db_schema_version_ >= 3 && database_->table_exists("schema")) {
// Read schema version
auto statement = database_->prepare_statement("SELECT ros_distro from schema;");
auto query_results = statement->execute_query<std::string>();
metadata_.ros_distro = std::get<0>(*query_results.begin());
}
}

rosbag2_storage::BagMetadata SqliteStorage::get_metadata()
Expand Down Expand Up @@ -802,18 +809,6 @@ int SqliteStorage::get_db_schema_version() const
return db_schema_version_;
}

std::string SqliteStorage::get_recorded_ros_distro() const
{
std::string ros_distro;
if (db_schema_version_ >= 3 && database_->table_exists("schema")) {
// Read schema version
auto statement = database_->prepare_statement("SELECT ros_distro from schema;");
auto query_results = statement->execute_query<std::string>();
ros_distro = std::get<0>(*query_results.begin());
}
return ros_distro;
}

int SqliteStorage::get_last_rowid()
{
auto statement = database_->prepare_statement("SELECT max(rowid) from messages;");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,15 +350,13 @@ TEST_F(StorageTestFixture, get_db_schema_version_returns_correct_value) {
EXPECT_GE(writable_storage->get_db_schema_version(), 3);
}

TEST_F(StorageTestFixture, get_recorded_ros_distro_returns_correct_value) {
TEST_F(StorageTestFixture, metadata_ros_distro_returns_correct_value) {
auto writable_storage = std::make_shared<rosbag2_storage_plugins::SqliteStorage>();
EXPECT_TRUE(writable_storage->get_recorded_ros_distro().empty());

auto db_file = (rcpputils::fs::path(temporary_dir_path_) / "rosbag").string();
writable_storage->open({db_file, plugin_id_});

std::string current_ros_distro = rcpputils::get_env_var("ROS_DISTRO");
EXPECT_EQ(writable_storage->get_recorded_ros_distro(), current_ros_distro);
EXPECT_EQ(writable_storage->get_metadata().ros_distro, current_ros_distro);
}

TEST_F(StorageTestFixture, check_backward_compatibility_with_schema_version_2) {
Expand All @@ -367,17 +365,14 @@ TEST_F(StorageTestFixture, check_backward_compatibility_with_schema_version_2) {
std::make_unique<rosbag2_storage_plugins::SqliteStorage>();

EXPECT_EQ(readable_storage->get_db_schema_version(), -1);
EXPECT_TRUE(readable_storage->get_recorded_ros_distro().empty());

auto db_file = (rcpputils::fs::path(temporary_dir_path_) / "rosbag.db3").string();

readable_storage->open(
{db_file, plugin_id_},
rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY);
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> read_messages;

EXPECT_EQ(readable_storage->get_db_schema_version(), 2);
EXPECT_TRUE(readable_storage->get_recorded_ros_distro().empty());
EXPECT_TRUE(readable_storage->get_metadata().ros_distro.empty());
}

TEST_F(StorageTestFixture, get_metadata_returns_correct_struct_if_no_messages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ TEST_P(InfoEndToEndTestFixture, info_end_to_end_test) {
std::string output = internal::GetCapturedStdout();
auto expected_storage = GetParam();
auto expected_file = rosbag2_test_common::bag_filename_for_storage_id("cdr_test_0", GetParam());
std::string expected_ros_distro = "unknown";

EXPECT_THAT(exit_code, Eq(EXIT_SUCCESS));
// The bag size depends on the os and is not asserted, the time is asserted time zone independent
Expand All @@ -51,6 +52,7 @@ TEST_P(InfoEndToEndTestFixture, info_end_to_end_test) {
"\nFiles: " + expected_file +
"\nBag size: .*B"
"\nStorage id: " + expected_storage +
"\nROS Distro: " + expected_ros_distro +
"\nDuration: 0\\.151s"
"\nStart: Apr .+ 2020 .*:.*:36.763 \\(1586406456\\.763\\)"
"\nEnd: Apr .+ 2020 .*:.*:36.914 \\(1586406456\\.914\\)"
Expand Down

0 comments on commit 02a70b4

Please sign in to comment.