Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove some code duplication between SequentialWriter and SequentialCompressionWriter #527

Merged
merged 4 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory_interface.hpp"
#include "rosbag2_cpp/storage_options.hpp"
#include "rosbag2_cpp/writer_interfaces/base_writer_interface.hpp"
#include "rosbag2_cpp/writers/sequential_writer.hpp"

#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/storage_factory.hpp"
Expand All @@ -48,7 +48,7 @@ namespace rosbag2_compression
{

class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
: public rosbag2_cpp::writer_interfaces::BaseWriterInterface
: public rosbag2_cpp::writers::SequentialWriter
{
public:
explicit SequentialCompressionWriter(
Expand Down Expand Up @@ -81,33 +81,6 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
*/
void reset() override;

/**
* Create a new topic in the underlying storage. Needs to be called for every topic used within
* a message which is passed to write(...).
*
* \param topic_with_type name and type identifier of topic to be created
* \throws runtime_error if the Writer is not open.
*/
void create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override;

/**
* Remove a new topic in the underlying storage.
* If creation of subscription fails remove the topic
* from the db (more of cleanup)
*
* \param topic_with_type name and type identifier of topic to be created
* \throws runtime_error if the Writer is not open.
*/
void remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override;

/**
* Write a message to a bagfile. The topic needs to have been created before writing is possible.
*
* \param message to be written to the bagfile
* \throws runtime_error if the Writer is not open.
*/
void write(std::shared_ptr<rosbag2_storage::SerializedBagMessage> message) override;

protected:
/**
* Compress the most recent file and update the metadata file path.
Expand All @@ -132,38 +105,27 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
virtual void setup_compression();

private:
std::string base_folder_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_{};
std::shared_ptr<rosbag2_cpp::SerializationFormatConverterFactoryInterface> converter_factory_{};
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> storage_{};
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io_{};
std::unique_ptr<rosbag2_cpp::Converter> converter_{};
std::unique_ptr<rosbag2_compression::BaseCompressorInterface> compressor_{};
std::unique_ptr<rosbag2_compression::CompressionFactory> compression_factory_{};

// Used in bagfile splitting; specifies the best-effort maximum sub-section of a bagfile in bytes.
uint64_t max_bagfile_size_{rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT};

// Used to track topic -> message count
std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_names_to_info_{};

rosbag2_storage::BagMetadata metadata_{};

rosbag2_compression::CompressionOptions compression_options_{};

bool should_compress_last_file_{true};

// Closes the current backed storage and opens the next bagfile.
void split_bagfile();

// Checks if the current recording bagfile needs to be split and rolled over to a new file.
bool should_split_bagfile() const;
void split_bagfile() override;

// Prepares the metadata by setting initial values.
void init_metadata();

// Record TopicInformation into metadata
void finalize_metadata();
void init_metadata() override;

// Helper method used by write to get the message in a format that is ready to be written.
// Common use cases include converting the message using the converter or
// performing other operations like compression on it
std::shared_ptr<rosbag2_storage::SerializedBagMessage>
get_writeable_message(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message) override;
};

} // namespace rosbag2_compression

#endif // ROSBAG2_COMPRESSION__SEQUENTIAL_COMPRESSION_WRITER_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ std::string format_storage_uri(const std::string & base_folder, uint64_t storage

SequentialCompressionWriter::SequentialCompressionWriter(
const rosbag2_compression::CompressionOptions & compression_options)
: storage_factory_{std::make_unique<rosbag2_storage::StorageFactory>()},
converter_factory_{std::make_shared<rosbag2_cpp::SerializationFormatConverterFactory>()},
metadata_io_{std::make_unique<rosbag2_storage::MetadataIo>()},
: SequentialWriter(),
compression_factory_{std::make_unique<rosbag2_compression::CompressionFactory>()},
compression_options_{compression_options}
{}
Expand All @@ -66,9 +64,7 @@ SequentialCompressionWriter::SequentialCompressionWriter(
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory,
std::shared_ptr<rosbag2_cpp::SerializationFormatConverterFactoryInterface> converter_factory,
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io)
: storage_factory_{std::move(storage_factory)},
converter_factory_{std::move(converter_factory)},
metadata_io_{std::move(metadata_io)},
: SequentialWriter(std::move(storage_factory), converter_factory, std::move(metadata_io)),
compression_factory_{std::move(compression_factory)},
compression_options_{compression_options}
{}
Expand Down Expand Up @@ -103,50 +99,11 @@ void SequentialCompressionWriter::open(
const rosbag2_cpp::StorageOptions & storage_options,
const rosbag2_cpp::ConverterOptions & converter_options)
{
max_bagfile_size_ = storage_options.max_bagfile_size;
base_folder_ = storage_options.uri;

if (converter_options.output_serialization_format !=
converter_options.input_serialization_format)
{
converter_ = std::make_unique<rosbag2_cpp::Converter>(converter_options, converter_factory_);
}

rcpputils::fs::path db_path(base_folder_);
if (db_path.is_directory()) {
std::stringstream error;
error << "Database directory already exists (" << db_path.string() <<
"), can't overwrite existing database";
throw std::runtime_error{error.str()};
}

bool dir_created = rcpputils::fs::create_directories(db_path);
if (!dir_created) {
std::stringstream error;
error << "Failed to create database directory (" << db_path.string() << ").";
throw std::runtime_error{error.str()};
}

const auto storage_uri = format_storage_uri(base_folder_, 0);
storage_ = storage_factory_->open_read_write(storage_uri, storage_options.storage_id);
if (!storage_) {
throw std::runtime_error{"No storage could be initialized. Abort"};
}

if (max_bagfile_size_ != 0 &&
max_bagfile_size_ < storage_->get_minimum_split_file_size())
{
std::stringstream error;
error << "Invalid bag splitting size given. Please provide a value greater than " <<
storage_->get_minimum_split_file_size() << ". Specified value of " <<
storage_options.max_bagfile_size;
throw std::runtime_error{error.str()};
}

SequentialWriter::open(storage_options, converter_options);
setup_compression();
init_metadata();
}


void SequentialCompressionWriter::reset()
{
if (!base_folder_.empty() && compressor_) {
Expand All @@ -170,52 +127,6 @@ void SequentialCompressionWriter::reset()
storage_factory_.reset();
}

void SequentialCompressionWriter::create_topic(
const rosbag2_storage::TopicMetadata & topic_with_type)
{
if (!storage_) {
throw std::runtime_error{"Bag is not open. Call open() before writing."};
}

if (converter_) {
converter_->add_topic(topic_with_type.name, topic_with_type.type);
}

if (topics_names_to_info_.find(topic_with_type.name) ==
topics_names_to_info_.end())
{
rosbag2_storage::TopicInformation info{};
info.topic_metadata = topic_with_type;

const auto insert_res = topics_names_to_info_.insert(
std::make_pair(topic_with_type.name, info));

if (!insert_res.second) {
std::stringstream errmsg;
errmsg << "Failed to insert topic \"" << topic_with_type.name << "\"!";
throw std::runtime_error{errmsg.str()};
}

storage_->create_topic(topic_with_type);
}
}

void SequentialCompressionWriter::remove_topic(
const rosbag2_storage::TopicMetadata & topic_with_type)
{
if (!storage_) {
throw std::runtime_error{"Bag is not open. Call open() before removing."};
}

if (topics_names_to_info_.erase(topic_with_type.name) > 0) {
storage_->remove_topic(topic_with_type);
} else {
std::stringstream errmsg;
errmsg << "Failed to remove the non-existing topic \"" << topic_with_type.name << "\"!";
throw std::runtime_error{errmsg.str()};
}
}

void SequentialCompressionWriter::compress_last_file()
{
if (!compressor_) {
Expand Down Expand Up @@ -282,64 +193,15 @@ void SequentialCompressionWriter::compress_message(
compressor_->compress_serialized_bag_message(message.get());
}

void SequentialCompressionWriter::write(
std::shared_ptr<rosbag2_storage::SerializedBagMessage>
SequentialCompressionWriter::get_writeable_message(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message)
{
if (!storage_) {
throw std::runtime_error{"Bag is not open. Call open() before writing."};
}

// Update the message count for the Topic.
++topics_names_to_info_.at(message->topic_name).message_count;

if (should_split_bagfile()) {
split_bagfile();
}

const auto message_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>{
std::chrono::nanoseconds(message->time_stamp)};
metadata_.starting_time = std::min(metadata_.starting_time, message_timestamp);

const auto duration = message_timestamp - metadata_.starting_time;
metadata_.duration = std::max(metadata_.duration, duration);

auto converted_message = converter_ ? converter_->convert(message) : message;
auto writeable_msg = SequentialWriter::get_writeable_message(message);
if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::MESSAGE) {
compress_message(converted_message);
}

storage_->write(converted_message);
}

bool SequentialCompressionWriter::should_split_bagfile() const
{
if (max_bagfile_size_ == rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT) {
return false;
} else {
return storage_->get_bagfile_size() > max_bagfile_size_;
}
}

void SequentialCompressionWriter::finalize_metadata()
{
metadata_.bag_size = 0;

for (const auto & path : metadata_.relative_file_paths) {
const auto bag_path = rcpputils::fs::path{path};

if (bag_path.exists()) {
metadata_.bag_size += bag_path.file_size();
}
}

metadata_.topics_with_message_count.clear();
metadata_.topics_with_message_count.reserve(topics_names_to_info_.size());
metadata_.message_count = 0;

for (const auto & topic : topics_names_to_info_) {
metadata_.topics_with_message_count.push_back(topic.second);
metadata_.message_count += topic.second.message_count;
compress_message(writeable_msg);
}
return writeable_msg;
}

jaisontj marked this conversation as resolved.
Show resolved Hide resolved
} // namespace rosbag2_compression
13 changes: 10 additions & 3 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
*/
void write(std::shared_ptr<rosbag2_storage::SerializedBagMessage> message) override;

private:
protected:
std::string base_folder_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_;
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_;
Expand All @@ -128,16 +128,23 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
rosbag2_storage::BagMetadata metadata_;

// Closes the current backed storage and opens the next bagfile.
void split_bagfile();
virtual void split_bagfile();

// Checks if the current recording bagfile needs to be split and rolled over to a new file.
bool should_split_bagfile() const;

// Prepares the metadata by setting initial values.
void init_metadata();
virtual void init_metadata();

// Record TopicInformation into metadata
void finalize_metadata();

// Helper method used by write to get the message in a format that is ready to be written.
// Common use cases include converting the message using the converter or
// performing other operations like compression on it
virtual std::shared_ptr<rosbag2_storage::SerializedBagMessage>
get_writeable_message(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message);
};

} // namespace writers
Expand Down
12 changes: 10 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,12 @@ void SequentialWriter::write(std::shared_ptr<rosbag2_storage::SerializedBagMessa
const auto duration = message_timestamp - metadata_.starting_time;
metadata_.duration = std::max(metadata_.duration, duration);

auto converted_msg = get_writeable_message(message);
// if cache size is set to zero, we directly call write
if (max_cache_size_ == 0u) {
storage_->write(converter_ ? converter_->convert(message) : message);
storage_->write(converted_msg);
} else {
cache_.push_back(converter_ ? converter_->convert(message) : message);
cache_.push_back(converted_msg);
if (cache_.size() >= max_cache_size_) {
storage_->write(cache_);
// reset cache
Expand All @@ -257,6 +258,13 @@ void SequentialWriter::write(std::shared_ptr<rosbag2_storage::SerializedBagMessa
}
}

std::shared_ptr<rosbag2_storage::SerializedBagMessage>
SequentialWriter::get_writeable_message(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message)
{
return converter_ ? converter_->convert(message) : message;
}

bool SequentialWriter::should_split_bagfile() const
{
// Assume we aren't splitting
Expand Down