Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transaction based sqlite3 inserts #225

Merged
merged 25 commits into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ef0a73f
First iteration of batched/transation based inserts.
Dec 3, 2019
4dc0a57
Commit trasactions on intermediate failures
Dec 3, 2019
a254e19
Changes made to fit coding style and first level of review comments
Dec 13, 2019
67bcdcd
Fix compilation issue and uncrustify errors
Dec 17, 2019
1d6b4cf
First iteration of batched/transation based inserts.
Dec 3, 2019
6fef091
Commit trasactions on intermediate failures
Dec 3, 2019
8898cd4
Changes made to fit coding style and first level of review comments
Dec 13, 2019
37c0e40
Fix compilation issue and uncrustify errors
Dec 17, 2019
fe65e3e
Rework transaction to fit in the comments from pull request,
Feb 13, 2020
2a5f601
Fix errors creeped in due to rebase. Compilation fails due to
Feb 13, 2020
c1bd37d
Fix compilation issue and uncrustify errors
Dec 17, 2019
b3d2b3e
Fix errors creeped in due to rebase. Compilation fails due to
Feb 13, 2020
342a4c7
Fix, uncrustify issues and address comments from github issue 225
Mar 24, 2020
a7effdc
review comments
Karsten1987 Mar 30, 2020
c0768c9
reset cache
Karsten1987 Mar 30, 2020
bc60c79
disable cache when set to lt 1
Karsten1987 Mar 30, 2020
1ebc289
test cache behavior in sequential writer
Karsten1987 Mar 30, 2020
381a103
uncrustify
Karsten1987 Mar 30, 2020
23804e8
fix cpplint
Karsten1987 Mar 30, 2020
21b2c80
address review comments
Karsten1987 Apr 1, 2020
83042cc
correct sqlite statements
Karsten1987 Apr 1, 2020
bac3558
expose max cache size through transport
Karsten1987 Apr 1, 2020
3d2647a
linters
Karsten1987 Apr 1, 2020
d46f141
add end to end test for cache
Karsten1987 Apr 1, 2020
b4401d6
uint64_t
Karsten1987 Apr 1, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'Default it is zero, recording written in single bagfile and splitting '
'is disabled.'
)
parser.add_argument(
'--max-cache-size', type=int, default=0,
help='maximum amount of messages to hold in cache before writing to disk. '
'Default it is zero, writing every message directly to disk.'
)
parser.add_argument(
'--compression-mode', type=str, default='none',
choices=['none', 'file', 'message'],
Expand Down Expand Up @@ -109,6 +114,7 @@ def main(self, *, args): # noqa: D102
no_discovery=args.no_discovery,
polling_interval=args.polling_interval,
max_bagfile_size=args.max_bag_size,
max_cache_size=args.max_cache_size,
include_hidden_topics=args.include_hidden_topics)
elif args.topics and len(args.topics) > 0:
# NOTE(hidmic): in merged install workspaces on Windows, Python entrypoint lookups
Expand All @@ -128,6 +134,7 @@ def main(self, *, args): # noqa: D102
no_discovery=args.no_discovery,
polling_interval=args.polling_interval,
max_bagfile_size=args.max_bag_size,
max_cache_size=args.max_cache_size,
topics=args.topics,
include_hidden_topics=args.include_hidden_topics)
else:
Expand Down
11 changes: 7 additions & 4 deletions rosbag2_cpp/include/rosbag2_cpp/storage_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ struct StorageOptions
std::string uri;
std::string storage_id;

/**
* The maximum size a bagfile can be, in bytes, before it is split.
* A value of 0 indicates that bagfile splitting will not be used.
*/
// The maximum size a bagfile can be, in bytes, before it is split.
// A value of 0 indicates that bagfile splitting will not be used.
uint64_t max_bagfile_size;

// The cache size indiciates how many messages can maximally be hold in cache
// before these being written to disk.
// Defaults to 0, and effectively disables the caching.
uint64_t max_cache_size = 0;
};

} // namespace rosbag2_cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
// Used in bagfile splitting; specifies the best-effort maximum sub-section of a bagfile in bytes.
uint64_t max_bagfile_size_;

// Intermediate cache to write multiple messages into the storage.
// `max_cache_size` is the amount of messages to hold in storage before writing to disk.
uint64_t max_cache_size_;
std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> cache_;

// Used to track topic -> message count
std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_names_to_info_;

Expand Down
18 changes: 16 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ void SequentialWriter::open(
const StorageOptions & storage_options,
const ConverterOptions & converter_options)
{
max_bagfile_size_ = storage_options.max_bagfile_size;
base_folder_ = storage_options.uri;
max_bagfile_size_ = storage_options.max_bagfile_size;
max_cache_size_ = storage_options.max_cache_size;

cache_.reserve(max_cache_size_);

if (converter_options.output_serialization_format !=
converter_options.input_serialization_format)
Expand Down Expand Up @@ -203,7 +206,18 @@ void SequentialWriter::write(std::shared_ptr<rosbag2_storage::SerializedBagMessa
const auto duration = message_timestamp - metadata_.starting_time;
metadata_.duration = std::max(metadata_.duration, duration);

storage_->write(converter_ ? converter_->convert(message) : message);
// if cache size is set to zero, we directly call write
if (max_cache_size_ == 0u) {
storage_->write(converter_ ? converter_->convert(message) : message);
} else {
cache_.push_back(converter_ ? converter_->convert(message) : message);
if (cache_.size() >= max_cache_size_) {
storage_->write(cache_);
// reset cache
cache_.clear();
cache_.reserve(max_cache_size_);
}
}
}

bool SequentialWriter::should_split_bagfile() const
Expand Down
3 changes: 3 additions & 0 deletions rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
MOCK_METHOD0(has_next, bool());
MOCK_METHOD0(read_next, std::shared_ptr<rosbag2_storage::SerializedBagMessage>());
MOCK_METHOD1(write, void(std::shared_ptr<const rosbag2_storage::SerializedBagMessage>));
MOCK_METHOD1(
write,
void(const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>&));
MOCK_METHOD0(get_all_topics_and_types, std::vector<rosbag2_storage::TopicMetadata>());
MOCK_METHOD0(get_metadata, rosbag2_storage::BagMetadata());
MOCK_CONST_METHOD0(get_bagfile_size, uint64_t());
Expand Down
68 changes: 67 additions & 1 deletion rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ TEST_F(SequentialWriterTest, writer_splits_when_storage_bagfile_size_gt_max_bagf
const auto expected_splits = message_count / max_bagfile_size;
fake_storage_size_ = 0;

ON_CALL(*storage_, write).WillByDefault(
ON_CALL(
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).WillByDefault(
[this](std::shared_ptr<const rosbag2_storage::SerializedBagMessage>) {
fake_storage_size_ += 1;
});
Expand Down Expand Up @@ -253,3 +255,67 @@ TEST_F(SequentialWriterTest, writer_splits_when_storage_bagfile_size_gt_max_bagf
EXPECT_EQ(expected_path, path);
}
}

TEST_F(SequentialWriterTest, only_write_after_cache_is_full) {
const size_t counter = 1000;
const uint64_t max_cache_size = 100;

EXPECT_CALL(
*storage_,
write(An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())).
Times(counter / max_cache_size);
EXPECT_CALL(
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).Times(0);

auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));

std::string rmw_format = "rmw_format";

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";

storage_options_.max_bagfile_size = 0;
storage_options_.max_cache_size = max_cache_size;

writer_->open(storage_options_, {rmw_format, rmw_format});
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""});

for (auto i = 0u; i < counter; ++i) {
writer_->write(message);
}
}

TEST_F(SequentialWriterTest, do_not_use_cache_if_cache_size_is_zero) {
const size_t counter = 1000;
const uint64_t max_cache_size = 0;

EXPECT_CALL(
*storage_,
write(An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())).
Times(0);
EXPECT_CALL(
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).Times(counter);

auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));

std::string rmw_format = "rmw_format";

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";

storage_options_.max_bagfile_size = 0;
storage_options_.max_cache_size = max_cache_size;

writer_->open(storage_options_, {rmw_format, rmw_format});
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""});

for (auto i = 0u; i < counter; ++i) {
writer_->write(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <memory>
#include <string>
#include <vector>

#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_storage/bag_metadata.hpp"
Expand All @@ -35,6 +36,8 @@ class ROSBAG2_STORAGE_PUBLIC BaseWriteInterface

virtual void write(std::shared_ptr<const SerializedBagMessage> msg) = 0;

virtual void write(const std::vector<std::shared_ptr<const SerializedBagMessage>> & msg) = 0;

virtual void create_topic(const TopicMetadata & topic) = 0;

virtual void remove_topic(const TopicMetadata & topic) = 0;
Expand Down
7 changes: 7 additions & 0 deletions rosbag2_storage/test/rosbag2_storage/test_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ void TestPlugin::write(const std::shared_ptr<const rosbag2_storage::SerializedBa
std::cout << "\nwriting\n";
}

void TestPlugin::write(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & msg)
{
(void) msg;
std::cout << "\nwriting multiple\n";
}

std::vector<rosbag2_storage::TopicMetadata> TestPlugin::get_all_topics_and_types()
{
std::cout << "\nreading topics and types\n";
Expand Down
3 changes: 3 additions & 0 deletions rosbag2_storage/test/rosbag2_storage/test_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class TestPlugin : public rosbag2_storage::storage_interfaces::ReadWriteInterfac

void write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg) override;

void write(const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & msg)
override;

std::vector<rosbag2_storage::TopicMetadata> get_all_topics_and_types() override;

rosbag2_storage::BagMetadata get_metadata() override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef ROSBAG2_STORAGE_DEFAULT_PLUGINS__SQLITE__SQLITE_STORAGE_HPP_
#define ROSBAG2_STORAGE_DEFAULT_PLUGINS__SQLITE__SQLITE_STORAGE_HPP_

#include <atomic>
#include <memory>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -43,7 +44,8 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
{
public:
SqliteStorage() = default;
~SqliteStorage() override = default;

~SqliteStorage() override;
Karsten1987 marked this conversation as resolved.
Show resolved Hide resolved

void open(
const std::string & uri,
Expand All @@ -56,6 +58,10 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage

void write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message) override;

void write(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & messages)
override;

bool has_next() override;

std::shared_ptr<rosbag2_storage::SerializedBagMessage> read_next() override;
Expand All @@ -77,6 +83,8 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
void prepare_for_writing();
void prepare_for_reading();
void fill_topics_and_types();
void activate_transaction();
void commit_transaction();

using ReadQueryResult = SqliteStatementWrapper::QueryResult<
std::shared_ptr<rcutils_uint8_array_t>, rcutils_time_point_value_t, std::string>;
Expand All @@ -90,6 +98,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
std::unordered_map<std::string, int> topics_;
std::vector<rosbag2_storage::TopicMetadata> all_topics_and_types_;
std::string relative_path_;
std::atomic_bool active_transaction_ {false};
};

} // namespace rosbag2_storage_plugins
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <sys/stat.h>

#include <atomic>
#include <chrono>
#include <cstring>
#include <iostream>
Expand Down Expand Up @@ -63,6 +64,12 @@ constexpr const uint64_t MIN_SPLIT_FILE_SIZE = 86016;

namespace rosbag2_storage_plugins
{
SqliteStorage::~SqliteStorage()
{
if (active_transaction_) {
commit_transaction();
}
}

void SqliteStorage::open(
const std::string & uri, rosbag2_storage::storage_interfaces::IOFlag io_flag)
Expand Down Expand Up @@ -105,6 +112,30 @@ void SqliteStorage::open(
"Opened database '" << relative_path_ << "' for " << to_string(io_flag) << ".");
}

void SqliteStorage::activate_transaction()
{
if (active_transaction_) {
Karsten1987 marked this conversation as resolved.
Show resolved Hide resolved
return;
}

ROSBAG2_STORAGE_DEFAULT_PLUGINS_LOG_DEBUG_STREAM("begin transaction");
database_->prepare_statement("BEGIN TRANSACTION;")->execute_and_reset();

active_transaction_ = true;
}

void SqliteStorage::commit_transaction()
{
if (!active_transaction_) {
Karsten1987 marked this conversation as resolved.
Show resolved Hide resolved
Karsten1987 marked this conversation as resolved.
Show resolved Hide resolved
return;
}

ROSBAG2_STORAGE_DEFAULT_PLUGINS_LOG_DEBUG_STREAM("commit transaction");
database_->prepare_statement("COMMIT;")->execute_and_reset();

active_transaction_ = false;
}

void SqliteStorage::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
{
if (!write_statement_) {
Expand All @@ -121,6 +152,22 @@ void SqliteStorage::write(std::shared_ptr<const rosbag2_storage::SerializedBagMe
write_statement_->execute_and_reset();
Karsten1987 marked this conversation as resolved.
Show resolved Hide resolved
}

void SqliteStorage::write(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & messages)
{
if (!write_statement_) {
prepare_for_writing();
}

activate_transaction();

for (auto & message : messages) {
write(message);
}

commit_transaction();
}

bool SqliteStorage::has_next()
{
if (!read_statement_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,3 +517,50 @@ TEST_F(RecordFixture, record_fails_gracefully_if_plugin_for_given_encoding_does_
EXPECT_THAT(
error_output, HasSubstr("Requested converter for format 'some_rmw' does not exist"));
}

TEST_F(RecordFixture, record_end_to_end_test_with_cache) {
auto max_cache_size = 10;
auto topic_name = "/rosbag2_cache_test_topic";

auto message = get_messages_strings()[0];
message->string_value = "test";
size_t expected_test_messages = 33;

auto process_handle = start_execution(
"ros2 bag record --output " + root_bag_path_.string() + " " + topic_name + " " +
"--max-cache-size " + std::to_string(max_cache_size));
wait_for_db();

pub_man_.add_publisher(topic_name, message, expected_test_messages);

const auto database_path = get_bag_file_path(0).string();

rosbag2_storage_plugins::SqliteWrapper db{
database_path, rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY};
pub_man_.run_publishers(
[this, &db](const std::string & topic_name) {
return count_stored_messages(db, topic_name);
});

stop_execution(process_handle);

// TODO(Martin-Idel-SI): Find out how to correctly send a Ctrl-C signal on Windows
// This is necessary as the process is killed hard on Windows and doesn't write a metadata file
#ifdef _WIN32
rosbag2_storage::BagMetadata metadata{};
metadata.version = 1;
metadata.storage_identifier = "sqlite3";
metadata.relative_file_paths = {get_bag_file_path(0).string()};
metadata.duration = std::chrono::nanoseconds(0);
metadata.starting_time =
std::chrono::time_point<std::chrono::high_resolution_clock>(std::chrono::nanoseconds(0));
metadata.message_count = 0;
rosbag2_storage::MetadataIo metadata_io;
metadata_io.write_metadata(root_bag_path_.string(), metadata);
#endif

wait_for_metadata();
auto test_topic_messages =
get_messages_for_topic<test_msgs::msg::Strings>(topic_name);
EXPECT_THAT(test_topic_messages, SizeIs(Ge(expected_test_messages)));
}
Loading