Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metadata per file info #870

Merged
merged 2 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ void SequentialWriter::init_metadata()
metadata_.starting_time = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds::max());
metadata_.relative_file_paths = {strip_parent_path(storage_->get_relative_file_path())};
rosbag2_storage::FileInformation file_info{};
file_info.path = strip_parent_path(storage_->get_relative_file_path());
file_info.starting_time = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds::max());
file_info.message_count = 0;
metadata_.files = {file_info};
}

void SequentialWriter::open(
Expand Down Expand Up @@ -276,6 +282,10 @@ void SequentialWriter::split_bagfile()
switch_to_next_storage();

metadata_.relative_file_paths.push_back(strip_parent_path(storage_->get_relative_file_path()));

rosbag2_storage::FileInformation file_info{};
file_info.path = strip_parent_path(storage_->get_relative_file_path());
metadata_.files.push_back(file_info);
}

void SequentialWriter::write(std::shared_ptr<rosbag2_storage::SerializedBagMessage> message)
Expand All @@ -300,17 +310,25 @@ void SequentialWriter::write(std::shared_ptr<rosbag2_storage::SerializedBagMessa

// Update bagfile starting time
metadata_.starting_time = std::chrono::high_resolution_clock::now();
metadata_.files.back().starting_time = std::chrono::high_resolution_clock::now();
}

const auto message_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(message->time_stamp));
metadata_.starting_time = std::min(metadata_.starting_time, message_timestamp);

metadata_.files.back().starting_time =
std::min(metadata_.files.back().starting_time, message_timestamp);
const auto duration = message_timestamp - metadata_.starting_time;
metadata_.duration = std::max(metadata_.duration, duration);

const auto file_duration = message_timestamp - metadata_.files.back().starting_time;
metadata_.files.back().duration =
std::max(metadata_.files.back().duration, file_duration);

auto converted_msg = get_writeable_message(message);

metadata_.files.back().message_count++;
if (storage_options_.max_cache_size == 0u) {
// If cache size is set to zero, we write to storage directly
storage_->write(converted_msg);
Expand Down
1 change: 1 addition & 0 deletions rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class MultifileReaderTest : public Test

metadata.relative_file_paths =
{relative_path_1_, relative_path_2_, absolute_path_1_};
metadata.version = 4;

return metadata;
}
Expand Down
1 change: 1 addition & 0 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class SequentialReaderTest : public Test
auto metadata_io = std::make_unique<NiceMock<MockMetadataIo>>();
rosbag2_storage::BagMetadata metadata;
metadata.relative_file_paths = {relative_file_path};
metadata.version = 4;
metadata.topics_with_message_count.push_back({{topic_with_type}, 1});

EXPECT_CALL(*metadata_io, read_metadata(_)).WillRepeatedly(Return(metadata));
Expand Down
18 changes: 18 additions & 0 deletions rosbag2_py/src/rosbag2_py/_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,29 @@ PYBIND11_MODULE(_storage, m) {
.def_readwrite("topic_metadata", &rosbag2_storage::TopicInformation::topic_metadata)
.def_readwrite("message_count", &rosbag2_storage::TopicInformation::message_count);

pybind11::class_<rosbag2_storage::FileInformation>(m, "FileInformation")
.def(
pybind11::init<std::string,
std::chrono::time_point<std::chrono::high_resolution_clock>,
std::chrono::nanoseconds,
size_t>(),
pybind11::arg("path"),
pybind11::arg("starting_time"),
pybind11::arg("duration"),
pybind11::arg("message_count"))
.def_readwrite("path", &rosbag2_storage::FileInformation::path)
.def_readwrite("starting_time", &rosbag2_storage::FileInformation::starting_time)
.def_readwrite("duration", &rosbag2_storage::FileInformation::duration)
.def_readwrite("message_count", &rosbag2_storage::FileInformation::message_count);

pybind11::class_<rosbag2_storage::BagMetadata>(m, "BagMetadata")
.def(
pybind11::init<
int,
uint64_t,
std::string,
std::vector<std::string>,
std::vector<rosbag2_storage::FileInformation>,
std::chrono::nanoseconds,
std::chrono::time_point<std::chrono::high_resolution_clock>,
uint64_t,
Expand All @@ -117,6 +133,7 @@ PYBIND11_MODULE(_storage, m) {
pybind11::arg("bag_size"),
pybind11::arg("storage_identifier"),
pybind11::arg("relative_file_paths"),
pybind11::arg("files"),
pybind11::arg("duration"),
pybind11::arg("starting_time"),
pybind11::arg("message_count"),
Expand All @@ -127,6 +144,7 @@ PYBIND11_MODULE(_storage, m) {
.def_readwrite("bag_size", &rosbag2_storage::BagMetadata::bag_size)
.def_readwrite("storage_identifier", &rosbag2_storage::BagMetadata::storage_identifier)
.def_readwrite("relative_file_paths", &rosbag2_storage::BagMetadata::relative_file_paths)
.def_readwrite("files", &rosbag2_storage::BagMetadata::files)
.def_readwrite("duration", &rosbag2_storage::BagMetadata::duration)
.def_readwrite("starting_time", &rosbag2_storage::BagMetadata::starting_time)
.def_readwrite("message_count", &rosbag2_storage::BagMetadata::message_count)
Expand Down
11 changes: 10 additions & 1 deletion rosbag2_storage/include/rosbag2_storage/bag_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,21 @@ struct TopicInformation
size_t message_count;
};

struct FileInformation
{
std::string path;
std::chrono::time_point<std::chrono::high_resolution_clock> starting_time;
std::chrono::nanoseconds duration;
size_t message_count;
};

struct BagMetadata
{
int version = 4; // upgrade this number when changing the content of the struct
int version = 5; // upgrade this number when changing the content of the struct
uint64_t bag_size = 0; // Will not be serialized
std::string storage_identifier;
std::vector<std::string> relative_file_paths;
std::vector<FileInformation> files;
std::chrono::nanoseconds duration;
std::chrono::time_point<std::chrono::high_resolution_clock> starting_time;
uint64_t message_count;
Expand Down
40 changes: 34 additions & 6 deletions rosbag2_storage/src/rosbag2_storage/metadata_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,30 @@ struct convert<std::vector<rosbag2_storage::TopicInformation>>
}
};

template<>
struct convert<rosbag2_storage::FileInformation>
{
static Node encode(const rosbag2_storage::FileInformation & metadata)
{
Node node;
node["path"] = metadata.path;
node["starting_time"] = metadata.starting_time;
node["duration"] = metadata.duration;
node["message_count"] = metadata.message_count;
return node;
}

static bool decode(const Node & node, rosbag2_storage::FileInformation & metadata)
{
metadata.path = node["path"].as<std::string>();
metadata.starting_time =
node["starting_time"].as<std::chrono::time_point<std::chrono::high_resolution_clock>>();
metadata.duration = node["duration"].as<std::chrono::nanoseconds>();
metadata.message_count = node["message_count"].as<uint64_t>();
return true;
}
};

template<>
struct convert<std::chrono::nanoseconds>
{
Expand Down Expand Up @@ -179,24 +203,22 @@ struct convert<rosbag2_storage::BagMetadata>
Node node;
node["version"] = metadata.version;
node["storage_identifier"] = metadata.storage_identifier;
node["relative_file_paths"] = metadata.relative_file_paths;
node["duration"] = metadata.duration;
node["starting_time"] = metadata.starting_time;
node["message_count"] = metadata.message_count;
node["topics_with_message_count"] = metadata.topics_with_message_count;
node["compression_format"] = metadata.compression_format;
node["compression_mode"] = metadata.compression_mode;
node["relative_file_paths"] = metadata.relative_file_paths;
node["files"] = metadata.files;

if (metadata.version >= 3) { // fields introduced by rosbag2_compression
node["compression_format"] = metadata.compression_format;
node["compression_mode"] = metadata.compression_mode;
}
return node;
}

static bool decode(const Node & node, rosbag2_storage::BagMetadata & metadata)
{
metadata.version = node["version"].as<int>();
metadata.storage_identifier = node["storage_identifier"].as<std::string>();
metadata.relative_file_paths = node["relative_file_paths"].as<std::vector<std::string>>();
metadata.duration = node["duration"].as<std::chrono::nanoseconds>();
metadata.starting_time = node["starting_time"]
.as<std::chrono::time_point<std::chrono::high_resolution_clock>>();
Expand All @@ -205,10 +227,16 @@ struct convert<rosbag2_storage::BagMetadata>
decode_for_version<std::vector<rosbag2_storage::TopicInformation>>(
node["topics_with_message_count"], metadata.version);

metadata.relative_file_paths = node["relative_file_paths"].as<std::vector<std::string>>();

if (metadata.version >= 3) { // fields introduced by rosbag2_compression
metadata.compression_format = node["compression_format"].as<std::string>();
metadata.compression_mode = node["compression_mode"].as<std::string>();
}
if (metadata.version >= 5) {
metadata.files =
node["files"].as<std::vector<rosbag2_storage::FileInformation>>();
}
return true;
}
};
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,21 +1,40 @@
rosbag2_bagfile_information:
version: 4
version: 5
storage_identifier: sqlite3
relative_file_paths:
- multiple_files_0.db3
- multiple_files_1.db3
- multiple_files_2.db3
duration:
nanoseconds: 1616630880179528024
nanoseconds: 9500166674
starting_time:
nanoseconds_since_epoch: 22460796745086
message_count: 3177
nanoseconds_since_epoch: 1630486341839168680
message_count: 52
topics_with_message_count:
- topic_metadata:
name: /chatter
name: /topic
type: std_msgs/msg/String
serialization_format: cdr
offered_qos_profiles: "- history: 1\n depth: 1\n reliability: 1\n durability: 2\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false"
message_count: 3177
offered_qos_profiles: "- history: 1\n depth: 10\n reliability: 1\n durability: 2\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false"
message_count: 52
compression_format: ""
compression_mode: ""
compression_mode: ""
files:
- path: multiple_files_0.db3
starting_time:
nanoseconds_since_epoch: 1630486321838886148
duration:
nanoseconds: 9500166674
message_count: 20
- path: multiple_files_1.db3
starting_time:
nanoseconds_since_epoch: 1630486331839194483
duration:
nanoseconds: 9499955003
message_count: 20
- path: multiple_files_2.db3
starting_time:
nanoseconds_since_epoch: 1630486341839168680
duration:
nanoseconds: 5499907705
message_count: 12
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_bagsize_split_is_at_least

wait_for_metadata();
const auto metadata = metadata_io.read_metadata(root_bag_path_.string());
const auto actual_splits = static_cast<int>(metadata.relative_file_paths.size());
const auto actual_splits = static_cast<int>(metadata.files.size());

// TODO(zmichaels11): Support reliable sync-to-disk for more accurate splits.
// The only guarantee with splits right now is that they will not occur until
Expand All @@ -319,7 +319,7 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_bagsize_split_is_at_least

// Don't include the last bagfile since it won't be full
for (int i = 0; i < actual_splits - 1; ++i) {
const auto bagfile_path = root_bag_path_ / rcpputils::fs::path{metadata.relative_file_paths[i]};
const auto bagfile_path = root_bag_path_ / rcpputils::fs::path{metadata.files[i].path};
ASSERT_TRUE(bagfile_path.exists()) <<
"Expected bag file: \"" << bagfile_path.string() << "\" to exist.";

Expand Down Expand Up @@ -381,8 +381,8 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_max_size_not_reached) {
const auto metadata = metadata_io.read_metadata(root_bag_path_.string());

// Check that there's only 1 bagfile and that it exists.
ASSERT_EQ(1u, metadata.relative_file_paths.size());
const auto bagfile_path = root_bag_path_ / rcpputils::fs::path{metadata.relative_file_paths[0]};
ASSERT_EQ(1u, metadata.files.size());
const auto bagfile_path = root_bag_path_ / rcpputils::fs::path{metadata.files[0].path};
ASSERT_TRUE(bagfile_path.exists()) <<
"Expected bag file: \"" << bagfile_path.string() << "\" to exist.";

Expand Down Expand Up @@ -456,8 +456,8 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_splits_bagfile) {
wait_for_metadata();
const auto metadata = metadata_io.read_metadata(root_bag_path_.string());

for (const auto & rel_path : metadata.relative_file_paths) {
auto path = root_bag_path_ / rcpputils::fs::path(rel_path);
for (const auto & file : metadata.files) {
auto path = root_bag_path_ / rcpputils::fs::path(file.path);
EXPECT_TRUE(rcpputils::fs::exists(path));
}
}
Expand Down Expand Up @@ -520,8 +520,8 @@ TEST_F(RecordFixture, record_end_to_end_with_duration_splitting_splits_bagfile)
wait_for_metadata();
const auto metadata = metadata_io.read_metadata(root_bag_path_.string());

for (const auto & rel_path : metadata.relative_file_paths) {
auto path = root_bag_path_ / rcpputils::fs::path(rel_path);
for (const auto & file : metadata.files) {
auto path = root_bag_path_ / rcpputils::fs::path(file.path);
EXPECT_TRUE(rcpputils::fs::exists(path));
}
}
Expand Down