Skip to content

Commit

Permalink
Apply similar suggestions from SequentialReader
Browse files Browse the repository at this point in the history
Signed-off-by: Zachary Michaels <zmichaels11@gmail.com>
  • Loading branch information
zmichaels11 committed Jan 21, 2020
1 parent fa99bec commit 506dc78
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 49 deletions.
5 changes: 5 additions & 0 deletions rosbag2_compression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 14)
endif()

# Windows supplies macros for min and max by default. We should only use min and max from stl
if(WIN32)
add_definitions(-DNOMINMAX)
endif()

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@
#include "rosbag2_cpp/writer_interfaces/base_writer_interface.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory_interface.hpp"
#include "rosbag2_cpp/compression_options.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/converter.hpp"
#include "rosbag2_cpp/storage_options.hpp"

#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/storage_factory_interface.hpp"
#include "rosbag2_storage/storage_factory.hpp"

#include "rosbag2_compression/compression_options.hpp"

#include "base_compressor_interface.hpp"
#include "visibility_control.hpp"
#include "compression_options.hpp"
Expand Down Expand Up @@ -111,14 +113,21 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
*/
virtual void compress_message(std::shared_ptr<rosbag2_storage::SerializedBagMessage> message);

/**
* Initializes the compressor if a compression mode is specified.
*
* \throws std::invalid_argument if compression_options isn't supported.
*/
virtual void setup_compression();

private:
std::string base_folder_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_{};
std::shared_ptr<rosbag2_cpp::SerializationFormatConverterFactoryInterface> converter_factory_{};
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> storage_{nullptr};
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> storage_{};
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io_{};
std::unique_ptr<rosbag2_cpp::Converter> converter_{nullptr};
std::unique_ptr<rosbag2_compression::BaseCompressorInterface> compressor_{nullptr};
std::unique_ptr<rosbag2_cpp::Converter> converter_{};
std::unique_ptr<rosbag2_compression::BaseCompressorInterface> compressor_{};

// Used in bagfile splitting; specifies the best-effort maximum sub-section of a bagfile in bytes.
uint64_t max_bagfile_size_{rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT};
Expand All @@ -128,9 +137,7 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter

rosbag2_storage::BagMetadata metadata_{};

// Used in invoking compression
rosbag2_compression::CompressionOptions compression_options_{};
std::vector<std::string> supported_compressors_{"zstd"};

bool should_compress_last_file_{true};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include "rosbag2_compression/sequential_compression_writer.hpp"

#include <algorithm>
#include <cassert>
#include <chrono>
#include <memory>
#include <stdexcept>
Expand Down Expand Up @@ -48,20 +47,20 @@ std::string format_storage_uri(const std::string & base_folder, uint64_t storage

SequentialCompressionWriter::SequentialCompressionWriter(
const rosbag2_compression::CompressionOptions & compression_options)
: storage_factory_(std::make_unique<rosbag2_storage::StorageFactory>()),
converter_factory_(std::make_shared<rosbag2_cpp::SerializationFormatConverterFactory>()),
metadata_io_(std::make_unique<rosbag2_storage::MetadataIo>()),
compression_options_(compression_options) {}
: storage_factory_{std::make_unique<rosbag2_storage::StorageFactory>()},
converter_factory_{std::make_shared<rosbag2_cpp::SerializationFormatConverterFactory>()},
metadata_io_{std::make_unique<rosbag2_storage::MetadataIo>()},
compression_options_{compression_options} {}

SequentialCompressionWriter::SequentialCompressionWriter(
const rosbag2_compression::CompressionOptions & compression_options,
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory,
std::shared_ptr<rosbag2_cpp::SerializationFormatConverterFactoryInterface> converter_factory,
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io)
: storage_factory_(std::move(storage_factory)),
converter_factory_(std::move(converter_factory)),
metadata_io_(std::move(metadata_io)),
compression_options_(compression_options) {}
: storage_factory_{std::move(storage_factory)},
converter_factory_{std::move(converter_factory)},
metadata_io_{std::move(metadata_io)},
compression_options_{compression_options} {}

SequentialCompressionWriter::~SequentialCompressionWriter()
{
Expand All @@ -72,14 +71,31 @@ void SequentialCompressionWriter::init_metadata()
{
metadata_ = rosbag2_storage::BagMetadata{};
metadata_.storage_identifier = storage_->get_storage_identifier();
metadata_.starting_time = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds::max());
metadata_.starting_time = std::chrono::time_point<std::chrono::high_resolution_clock>{
std::chrono::nanoseconds::max()};
metadata_.relative_file_paths = {storage_->get_relative_file_path()};
metadata_.compression_format = compression_options_.compression_format;
metadata_.compression_mode =
rosbag2_compression::compression_mode_to_string(compression_options_.compression_mode);
}

void SequentialCompressionWriter::setup_compression()
{
if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::NONE) {
throw std::invalid_argument{
"SequentialCompressionWriter requires a CompressionMode that is not NONE!"};
}

// TODO(zmichaels11) Support additional compression formats
if (compression_options_.compression_format == "zstd") {
compressor_ = std::make_unique<rosbag2_compression::ZstdCompressor>();
} else {
std::stringstream err;
err << "Unsupported compression format: \"" << compression_options_.compression_format << "\"";
throw std::invalid_argument{err.str()};
}
}

void SequentialCompressionWriter::open(
const rosbag2_cpp::StorageOptions & storage_options,
const rosbag2_cpp::ConverterOptions & converter_options)
Expand All @@ -96,7 +112,7 @@ void SequentialCompressionWriter::open(
const auto storage_uri = format_storage_uri(base_folder_, 0);
storage_ = storage_factory_->open_read_write(storage_uri, storage_options.storage_id);
if (!storage_) {
throw std::runtime_error("No storage could be initialized. Abort");
throw std::runtime_error{"No storage could be initialized. Abort"};
}

if (storage_options.max_bagfile_size != 0 &&
Expand All @@ -106,25 +122,20 @@ void SequentialCompressionWriter::open(
"Invalid bag splitting size given. Please provide a different value."};
}

if (std::find(supported_compressors_.begin(), supported_compressors_.end(),
compression_options_.compression_format) == supported_compressors_.end())
{
std::stringstream err;
err << "Compression format " << compression_options_.compression_format << " is not supported.";
throw std::invalid_argument{err.str()};
}
// Currently we support only Zstd compression
compressor_ = std::make_unique<rosbag2_compression::ZstdCompressor>();
setup_compression();
init_metadata();
}

void SequentialCompressionWriter::reset()
{
if (!base_folder_.empty()) {
if (!compressor_) {
throw std::runtime_error{"Compressor was not opened!"};
}

// Reset may be called before initializing the compressor (ex. bad options).
// We compress the last file only if it hasn't been compressed earlier (ex. in split_bagfile()).
if (compressor_ &&
compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE &&
if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE &&
should_compress_last_file_)
{
try {
Expand Down Expand Up @@ -175,7 +186,7 @@ void SequentialCompressionWriter::remove_topic(
const rosbag2_storage::TopicMetadata & topic_with_type)
{
if (!storage_) {
throw std::runtime_error("Bag is not open. Call open() before removing.");
throw std::runtime_error{"Bag is not open. Call open() before removing."};
}

if (topics_names_to_info_.erase(topic_with_type.name) > 0) {
Expand All @@ -189,9 +200,15 @@ void SequentialCompressionWriter::remove_topic(

void SequentialCompressionWriter::compress_last_file()
{
assert(compressor_ != nullptr);
metadata_.relative_file_paths.back() =
compressor_->compress_uri(metadata_.relative_file_paths.back());
if (!compressor_) {
throw std::runtime_error{"Compressor was not opened!"};
}

const auto & to_compress = metadata_.relative_file_paths.back();

ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM("Compressing \"" << to_compress << "\"");

metadata_.relative_file_paths.back() = compressor_->compress_uri(to_compress);
}

void SequentialCompressionWriter::split_bagfile()
Expand All @@ -200,23 +217,25 @@ void SequentialCompressionWriter::split_bagfile()
compress_last_file();
}

// Add a check to make sure reset() does not compress the file again if we couldn't load the
// storage plugin.
should_compress_last_file_ = false;
const auto storage_uri = format_storage_uri(
base_folder_,
metadata_.relative_file_paths.size());

storage_ = storage_factory_->open_read_write(storage_uri, metadata_.storage_identifier);

if (!storage_) {
// Add a check to make sure reset() does not compress the file again if we couldn't load the
// storage plugin.
should_compress_last_file_ = false;

std::stringstream errmsg;
errmsg << "Failed to rollover bagfile to new file: \"" << storage_uri << "\"!";
throw std::runtime_error(errmsg.str());
throw std::runtime_error{errmsg.str()};
}
should_compress_last_file_ = true;

metadata_.relative_file_paths.push_back(storage_->get_relative_file_path());

// Re-register all topics since we rolled-over to a new bagfile.
// Re-register all topics since we rolled-over to a new bagfile.
for (const auto & topic : topics_names_to_info_) {
storage_->create_topic(topic.second.topic_metadata);
}
Expand All @@ -225,15 +244,18 @@ void SequentialCompressionWriter::split_bagfile()
void SequentialCompressionWriter::compress_message(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message)
{
assert(compressor_ != nullptr);
if (!compressor_) {
throw std::runtime_error{"Cannot compress message; Writer is not open!"};
}

compressor_->compress_serialized_bag_message(message.get());
}

void SequentialCompressionWriter::write(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message)
{
if (!storage_) {
throw std::runtime_error("Bag is not open. Call open() before writing.");
throw std::runtime_error{"Bag is not open. Call open() before writing."};
}

// Update the message count for the Topic.
Expand All @@ -243,8 +265,8 @@ void SequentialCompressionWriter::write(
split_bagfile();
}

const auto message_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(message->time_stamp));
const auto message_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>{
std::chrono::nanoseconds(message->time_stamp)};
metadata_.starting_time = std::min(metadata_.starting_time, message_timestamp);

const auto duration = message_timestamp - metadata_.starting_time;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ class SequentialCompressionWriterTest : public Test
{
public:
SequentialCompressionWriterTest()
: storage_factory_(std::make_unique<StrictMock<MockStorageFactory>>()),
storage_(std::make_shared<NiceMock<MockStorage>>()),
converter_factory_(std::make_shared<StrictMock<MockConverterFactory>>()),
metadata_io_(std::make_unique<NiceMock<MockMetadataIo>>()),
storage_options_(),
serialization_format_("rmw_format")
: storage_factory_{std::make_unique<StrictMock<MockStorageFactory>>()},
storage_{std::make_shared<NiceMock<MockStorage>>()},
converter_factory_{std::make_shared<StrictMock<MockConverterFactory>>()},
metadata_io_{std::make_unique<NiceMock<MockMetadataIo>>()},
storage_options_{},
serialization_format_{"rmw_format"}
{
ON_CALL(*storage_factory_, open_read_write(_, _)).WillByDefault(Return(storage_));
EXPECT_CALL(*storage_factory_, open_read_write(_, _)).Times(AtLeast(0));
Expand Down

0 comments on commit 506dc78

Please sign in to comment.