Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Karsten Knese <karsten@openrobotics.org>
  • Loading branch information
Karsten1987 committed Apr 1, 2020
1 parent 23804e8 commit 21b2c80
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 29 deletions.
6 changes: 3 additions & 3 deletions rosbag2_cpp/include/rosbag2_cpp/storage_options.hpp
Expand Up @@ -30,10 +30,10 @@ struct StorageOptions
// A value of 0 indicates that bagfile splitting will not be used.
uint64_t max_bagfile_size;

// The chunk size indiciates how many messages should be hold in cache
// The cache size indiciates how many messages can maximally be hold in cache
// before these being written to disk.
// Defaults to 1, and effectively disables the caching.
size_t chunk_size = 1;
// Defaults to 0, and effectively disables the caching.
uint64_t max_cache_size = 0;
};

} // namespace rosbag2_cpp
Expand Down
10 changes: 5 additions & 5 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Expand Up @@ -110,14 +110,14 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io_;
std::unique_ptr<Converter> converter_;

// Intermediate cache to write multiple messages into the storage.
// chunk size is the amount of messages to hold in storage before writing to disk.
size_t chunk_size_;
std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> cache_;

// Used in bagfile splitting; specifies the best-effort maximum sub-section of a bagfile in bytes.
uint64_t max_bagfile_size_;

// Intermediate cache to write multiple messages into the storage.
// `max_cache_size` is the amount of messages to hold in storage before writing to disk.
size_t max_cache_size_;
std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> cache_;

// Used to track topic -> message count
std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_names_to_info_;

Expand Down
28 changes: 13 additions & 15 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Expand Up @@ -77,11 +77,11 @@ void SequentialWriter::open(
const StorageOptions & storage_options,
const ConverterOptions & converter_options)
{
max_bagfile_size_ = storage_options.max_bagfile_size;
base_folder_ = storage_options.uri;
chunk_size_ = storage_options.chunk_size;
max_bagfile_size_ = storage_options.max_bagfile_size;
max_cache_size_ = storage_options.max_cache_size;

cache_.reserve(chunk_size_);
cache_.reserve(max_cache_size_);

if (converter_options.output_serialization_format !=
converter_options.input_serialization_format)
Expand Down Expand Up @@ -206,19 +206,17 @@ void SequentialWriter::write(std::shared_ptr<rosbag2_storage::SerializedBagMessa
const auto duration = message_timestamp - metadata_.starting_time;
metadata_.duration = std::max(metadata_.duration, duration);

// No need to cache if there's only space for 1 message
// We'll call write directly in this case.
if (chunk_size_ <= 1) {
// if cache size is set to zero, we directly call write
if (max_cache_size_ == 0u) {
storage_->write(converter_ ? converter_->convert(message) : message);
return;
}

cache_.push_back(converter_ ? converter_->convert(message) : message);
if (cache_.size() >= chunk_size_) {
storage_->write(cache_);
// reset cache
cache_.clear();
cache_.reserve(chunk_size_);
} else {
cache_.push_back(converter_ ? converter_->convert(message) : message);
if (cache_.size() >= max_cache_size_) {
storage_->write(cache_);
// reset cache
cache_.clear();
cache_.reserve(max_cache_size_);
}
}
}

Expand Down
12 changes: 6 additions & 6 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp
Expand Up @@ -258,12 +258,12 @@ TEST_F(SequentialWriterTest, writer_splits_when_storage_bagfile_size_gt_max_bagf

TEST_F(SequentialWriterTest, only_write_after_cache_is_full) {
const size_t counter = 1000;
const size_t chunk_size = 100;
const size_t max_cache_size = 100;

EXPECT_CALL(
*storage_,
write(An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())).
Times(counter / chunk_size);
Times(counter / max_cache_size);
EXPECT_CALL(
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).Times(0);
Expand All @@ -278,7 +278,7 @@ TEST_F(SequentialWriterTest, only_write_after_cache_is_full) {
message->topic_name = "test_topic";

storage_options_.max_bagfile_size = 0;
storage_options_.chunk_size = chunk_size;
storage_options_.max_cache_size = max_cache_size;

writer_->open(storage_options_, {rmw_format, rmw_format});
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""});
Expand All @@ -288,9 +288,9 @@ TEST_F(SequentialWriterTest, only_write_after_cache_is_full) {
}
}

TEST_F(SequentialWriterTest, do_not_use_cache_if_chunk_size_is_one) {
TEST_F(SequentialWriterTest, do_not_use_cache_if_cache_size_is_zero) {
const size_t counter = 1000;
const size_t chunk_size = 1;
const size_t max_cache_size = 0;

EXPECT_CALL(
*storage_,
Expand All @@ -310,7 +310,7 @@ TEST_F(SequentialWriterTest, do_not_use_cache_if_chunk_size_is_one) {
message->topic_name = "test_topic";

storage_options_.max_bagfile_size = 0;
storage_options_.chunk_size = chunk_size;
storage_options_.max_cache_size = max_cache_size;

writer_->open(storage_options_, {rmw_format, rmw_format});
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""});
Expand Down

0 comments on commit 21b2c80

Please sign in to comment.