Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes an init race condition #93

Merged
merged 5 commits into from
May 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions rosbag2/include/rosbag2/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ class ROSBAG2_PUBLIC Writer
*/
virtual void create_topic(const TopicMetadata & topic_with_type);

/**
* Remove a new topic in the underlying storage.
* If creation of subscription fails remove the topic
* from the db (more of cleanup)
*
* \param topic_with_type name and type identifier of topic to be created
* \throws runtime_error if the Writer is not open.
*/
virtual void remove_topic(const TopicMetadata & topic_with_type);

/**
* Write a message to a bagfile. The topic needs to have been created before writing is possible.
*
Expand Down
9 changes: 9 additions & 0 deletions rosbag2/src/rosbag2/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ void Writer::create_topic(const TopicMetadata & topic_with_type)
storage_->create_topic(topic_with_type);
}

void Writer::remove_topic(const TopicMetadata & topic_with_type)
{
if (!storage_) {
throw std::runtime_error("Bag is not open. Call open() before removing.");
}

storage_->remove_topic(topic_with_type);
}

void Writer::write(std::shared_ptr<SerializedBagMessage> message)
{
if (!storage_) {
Expand Down
1 change: 1 addition & 0 deletions rosbag2/test/rosbag2/mock_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
public:
MOCK_METHOD2(open, void(const std::string &, rosbag2_storage::storage_interfaces::IOFlag));
MOCK_METHOD1(create_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(remove_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD0(has_next, bool());
MOCK_METHOD0(read_next, std::shared_ptr<rosbag2_storage::SerializedBagMessage>());
MOCK_METHOD1(write, void(std::shared_ptr<const rosbag2_storage::SerializedBagMessage>));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class ROSBAG2_STORAGE_PUBLIC BaseWriteInterface
virtual void write(std::shared_ptr<const SerializedBagMessage> msg) = 0;

virtual void create_topic(const TopicMetadata & topic) = 0;

virtual void remove_topic(const TopicMetadata & topic) = 0;
};

} // namespace storage_interfaces
Expand Down
5 changes: 5 additions & 0 deletions rosbag2_storage/test/rosbag2_storage/test_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ void TestPlugin::create_topic(const rosbag2_storage::TopicMetadata & topic)
std::cout << "Created topic with name =" << topic.name << " and type =" << topic.type << ".\n";
}

void TestPlugin::remove_topic(const rosbag2_storage::TopicMetadata & topic)
{
std::cout << "Removed topic with name =" << topic.name << " and type =" << topic.type << ".\n";
}

void TestPlugin::write(const std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg)
{
(void) msg;
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_storage/test/rosbag2_storage/test_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class TestPlugin : public rosbag2_storage::storage_interfaces::ReadWriteInterfac

void create_topic(const rosbag2_storage::TopicMetadata & topic) override;

void remove_topic(const rosbag2_storage::TopicMetadata & topic) override;

bool has_next() override;

std::shared_ptr<rosbag2_storage::SerializedBagMessage> read_next() override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
rosbag2_storage::storage_interfaces::IOFlag io_flag =
rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE) override;

void remove_topic(const rosbag2_storage::TopicMetadata & topic) override;

void create_topic(const rosbag2_storage::TopicMetadata & topic) override;

void write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,18 @@ void SqliteStorage::create_topic(const rosbag2_storage::TopicMetadata & topic)
}
}

void SqliteStorage::remove_topic(const rosbag2_storage::TopicMetadata & topic)
{
if (topics_.find(topic.name) != std::end(topics_)) {
auto delete_topic =
database_->prepare_statement(
"DELETE FROM topics where name = ? and type = ? and serialization_format = ?");
delete_topic->bind(topic.name, topic.type, topic.serialization_format);
delete_topic->execute_and_reset();
topics_.erase(topic.name);
}
}

void SqliteStorage::prepare_for_writing()
{
write_statement_ = database_->prepare_statement(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,22 @@ TEST_F(StorageTestFixture, get_metadata_returns_correct_struct_if_no_messages) {
));
EXPECT_THAT(metadata.duration, Eq(std::chrono::seconds(0)));
}

TEST_F(StorageTestFixture, remove_topics_and_types_returns_the_empty_vector) {
std::unique_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> writable_storage =
std::make_unique<rosbag2_storage_plugins::SqliteStorage>();
writable_storage->open(temporary_dir_path_);
writable_storage->create_topic({"topic1", "type1", "rmw1"});
metadata_io_.write_metadata(temporary_dir_path_, writable_storage->get_metadata());
writable_storage->remove_topic({"topic1", "type1", "rmw1"});
writable_storage.reset();

// Remove topics

auto readable_storage = std::make_unique<rosbag2_storage_plugins::SqliteStorage>();
readable_storage->open(
temporary_dir_path_, rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY);
auto topics_and_types = readable_storage->get_all_topics_and_types();

EXPECT_THAT(topics_and_types, IsEmpty());
}
6 changes: 5 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,15 @@ void Recorder::subscribe_topics(
void Recorder::subscribe_topic(const rosbag2::TopicMetadata & topic)
{
auto subscription = create_subscription(topic.name, topic.type);

if (subscription) {
writer_->create_topic(topic);
subscribed_topics_.insert(topic.name);
subscriptions_.push_back(subscription);
writer_->create_topic(topic);
ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Subscribed to topic '" << topic.name << "'");
} else {
writer_->remove_topic(topic);
subscribed_topics_.erase(topic.name);
}
}

Expand Down