Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix throw in playback of split+compressed bagfiles #294

Merged
merged 5 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,17 @@ void SequentialCompressionReader::open(
storage_ = storage_factory_->open_read_only(
*current_file_iterator_, metadata_.storage_identifier);
if (!storage_) {
throw std::runtime_error{"No storage could be initialized. Abort"};
std::stringstream errmsg;
errmsg << "No storage could be initialized for: \"" <<
storage_options.uri << "\".";

throw std::runtime_error{errmsg.str()};
}
} else {
throw std::runtime_error{"Compression is not supported for legacy bag files."};
std::stringstream errmsg;
errmsg << "Could not find metadata for bag: \"" << storage_options.uri <<
"\". Legacy bag files are not supported if this is a ROS 1 bag file.";
throw std::runtime_error{errmsg.str()};
}
const auto & topics = metadata_.topics_with_message_count;
if (topics.empty()) {
Expand Down Expand Up @@ -151,12 +158,19 @@ void SequentialCompressionReader::load_next_file()
throw std::runtime_error{"Cannot load next file; already on last file!"};
}

if (decompressor_) {
throw std::runtime_error{"Bagfile is not opened"};
if (compression_mode_ == rosbag2_compression::CompressionMode::NONE) {
throw std::runtime_error{"Cannot use SequentialCompressionWriter with NONE compression mode."};
}

++current_file_iterator_;
if (compression_mode_ == rosbag2_compression::CompressionMode::FILE) {
if (decompressor_ == nullptr) {
throw std::runtime_error{
"The bag file was not properly opened. "
"Somehow the compression mode was set without opening a decompressor."
};
}

ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM("Decompressing " << get_current_file().c_str());
*current_file_iterator_ = decompressor_->decompress_uri(get_current_file());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,48 @@ TEST_F(SequentialCompressionReaderTest, reader_calls_create_decompressor)
reader_->open(
rosbag2_cpp::StorageOptions(), {"", storage_serialization_format_});
}

TEST_F(SequentialCompressionReaderTest, compression_called_when_splitting_bagfile)
{
const auto relative_path_1 = "/path/to/storage1";
const auto relative_path_2 = "/path/to/storage2";
rosbag2_storage::BagMetadata metadata;
metadata.relative_file_paths = {relative_path_1, relative_path_2};
metadata.topics_with_message_count.push_back({{topic_with_type_}, 10});
metadata.bag_size = 512000;
metadata.compression_format = "zstd";
metadata.compression_mode =
rosbag2_compression::compression_mode_to_string(rosbag2_compression::CompressionMode::FILE);
ON_CALL(*metadata_io_, read_metadata(_)).WillByDefault(Return(metadata));
ON_CALL(*metadata_io_, metadata_file_exists(_)).WillByDefault(Return(true));

auto decompressor = std::make_unique<NiceMock<MockDecompressor>>();
// We are mocking two splits, so only file decompression should occur twice
EXPECT_CALL(*decompressor, decompress_uri(_)).Times(2)
.WillOnce(Return(relative_path_1))
.WillOnce(Return(relative_path_2));
EXPECT_CALL(*decompressor, decompress_serialized_bag_message(_)).Times(0);

auto compression_factory = std::make_unique<StrictMock<MockCompressionFactory>>();
ON_CALL(*compression_factory, create_decompressor(_))
.WillByDefault(Return(ByMove(std::move(decompressor))));
EXPECT_CALL(*compression_factory, create_decompressor(_)).Times(1);
// open_read_only should be called twice when opening 2 split bags
EXPECT_CALL(*storage_factory_, open_read_only(_, _)).Times(2);
// storage::has_next() is called twice when reader::has_next() is called
EXPECT_CALL(*storage_, has_next()).Times(2)
.WillOnce(Return(false)) // Load the next file
.WillOnce(Return(true)); // We have a message from the new file

auto compression_reader = std::make_unique<rosbag2_compression::SequentialCompressionReader>(
std::move(compression_factory),
std::move(storage_factory_),
converter_factory_,
std::move(metadata_io_));

compression_reader->open(
rosbag2_cpp::StorageOptions(), {"", storage_serialization_format_});
EXPECT_EQ(compression_reader->has_next_file(), true);
EXPECT_EQ(compression_reader->has_next(), true);
compression_reader->read_next();
}