From 20cfc02ede90151a4f7e8859972d4637f8c696c5 Mon Sep 17 00:00:00 2001 From: yschulz Date: Mon, 6 May 2024 23:59:57 +0200 Subject: [PATCH] Bugfix for writer not being able to open again after closing (#1599) * re-applies fixes from #1590 to rolling. Also removes new message definition in sequential writer test for multiple open operations. Also clears topic_names_to_message_definitions_ and handles message_definitions_s underlying container similarly. Lastly, also avoids reset of factory in the compression writer, adds unit test there too. Signed-off-by: Yannick Schulz * removes unused compressor_ member from compresser writer class. Also delegates rest of the closing behavior to the base class in close method, as it is handled in the open and write methods of the compression writer Signed-off-by: Yannick Schulz * Remove unrelated delta - message_definitions_ was intentionally allocated on the stack and should persist between writer close() and open() because it represents cache for message definitions which is not changes. Signed-off-by: Michael Orlov * Don't call virtual methods from destructors Signed-off-by: Michael Orlov * Cleanup 'rosbag2_directory_next' after the test run Signed-off-by: Michael Orlov * Protect Writer::open(..) and Writer::close() with mutex on upper level - Rationale: To avoid race conditions if open(..) and close() could be ever be called from different threads. Signed-off-by: Michael Orlov * Bugfix for WRITE_SPLIT callback not called for the last compressed file Signed-off-by: Michael Orlov * Bugfix for lost messages from cache when closing compression writer Signed-off-by: Michael Orlov * Use dedicated is_open_ flag instead of relying on storage_ Signed-off-by: Michael Orlov --------- Signed-off-by: Yannick Schulz Signed-off-by: Michael Orlov Co-authored-by: Michael Orlov --- .../sequential_compression_writer.hpp | 2 +- .../sequential_compression_writer.cpp | 42 +++++++++++-------- .../test_sequential_compression_writer.cpp | 25 +++++++++++ .../rosbag2_cpp/writers/sequential_writer.hpp | 4 ++ rosbag2_cpp/src/rosbag2_cpp/writer.cpp | 2 + .../rosbag2_cpp/writers/sequential_writer.cpp | 38 ++++++++++++----- .../rosbag2_tests/test_rosbag2_cpp_api.cpp | 34 ++++++++++++++- 7 files changed, 118 insertions(+), 29 deletions(-) diff --git a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp index 192fb4d4f7..d1265848ed 100644 --- a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp +++ b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp @@ -182,7 +182,6 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter private: std::unique_ptr compression_factory_{}; - std::shared_ptr compressor_{}; std::mutex compressor_queue_mutex_; std::queue> compressor_message_queue_ RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_); @@ -198,6 +197,7 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter rosbag2_compression::CompressionOptions compression_options_{}; bool should_compress_last_file_{true}; + std::atomic_bool is_open_{false}; // Runs a while loop that pulls data from the compression queue until // compression_is_running_ is false; should be run in a separate thread diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index b60d31bbc7..a5da3e7891 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -65,7 +65,7 @@ SequentialCompressionWriter::SequentialCompressionWriter( SequentialCompressionWriter::~SequentialCompressionWriter() { - close(); + SequentialCompressionWriter::close(); } void SequentialCompressionWriter::compression_thread_fn() @@ -225,13 +225,22 @@ void SequentialCompressionWriter::open( const rosbag2_storage::StorageOptions & storage_options, const rosbag2_cpp::ConverterOptions & converter_options) { + // Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level. + if (this->is_open_) { + return; // The writer already opened. + } std::lock_guard lock(storage_mutex_); SequentialWriter::open(storage_options, converter_options); setup_compression(); + this->is_open_ = true; } void SequentialCompressionWriter::close() { + // Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level. + if (!this->is_open_.exchange(false)) { + return; // The writer is not open + } if (!base_folder_.empty()) { // Reset may be called before initializing the compressor (ex. bad options). // We compress the last file only if it hasn't been compressed earlier (ex. in split_bagfile()). @@ -241,7 +250,18 @@ void SequentialCompressionWriter::close() std::lock_guard lock(storage_mutex_); std::lock_guard compressor_lock(compressor_queue_mutex_); try { - storage_.reset(); // Storage must be closed before it can be compressed. + // Storage must be closed before file can be compressed. + if (use_cache_) { + // destructor will flush message cache + cache_consumer_.reset(); + message_cache_.reset(); + } + finalize_metadata(); + if (storage_) { + storage_->update_metadata(metadata_); + storage_.reset(); // Storage will be closed in storage_ destructor + } + if (!metadata_.relative_file_paths.empty()) { std::string file = metadata_.relative_file_paths.back(); compressor_file_queue_.push(file); @@ -251,22 +271,10 @@ void SequentialCompressionWriter::close() ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Could not compress the last bag file.\n" << e.what()); } } - - stop_compressor_threads(); - - finalize_metadata(); - if (storage_) { - storage_->update_metadata(metadata_); - } - metadata_io_->write_metadata(base_folder_, metadata_); - } - - if (use_cache_) { - cache_consumer_.reset(); - message_cache_.reset(); } - storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory - storage_factory_.reset(); + stop_compressor_threads(); // Note: The metadata_.relative_file_paths will be updated with + // compressed filename when compressor threads will finish. + SequentialWriter::close(); } void SequentialCompressionWriter::create_topic( diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index c17a0da25d..fbcc80b908 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -222,6 +222,31 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f writer_->open(tmp_dir_storage_options_, {serialization_format_, serialization_format_})); } +TEST_F(SequentialCompressionWriterTest, open_succeeds_twice) +{ + rosbag2_compression::CompressionOptions compression_options{ + DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority}; + initializeWriter(compression_options); + + auto tmp_dir = fs::temp_directory_path() / "path_not_empty"; + auto tmp_dir_next = fs::temp_directory_path() / "path_not_empty_next"; + + auto storage_options = rosbag2_storage::StorageOptions(); + auto storage_options_next = rosbag2_storage::StorageOptions(); + + storage_options.uri = tmp_dir.string(); + storage_options_next.uri = tmp_dir_next.string(); + + EXPECT_NO_THROW( + writer_->open(storage_options, {serialization_format_, serialization_format_})); + + writer_->close(); + EXPECT_NO_THROW( + writer_->open(storage_options_next, {serialization_format_, serialization_format_})); +} + TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor) { rosbag2_compression::CompressionOptions compression_options{ diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index 1695203185..d9c541d101 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -162,6 +162,9 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter // Used to track topic -> message count. If cache is present, it is updated by CacheConsumer std::unordered_map topics_names_to_info_; + // Note: topics_names_to_info_ needs to be protected with mutex only when we are explicitly + // adding or deleting items (create_topic(..)/remove_topic(..)) and when we access it from + // CacheConsumer callback i.e., write_messages(..) std::mutex topics_info_mutex_; LocalMessageDefinitionSource message_definitions_; @@ -197,6 +200,7 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter void write_messages( const std::vector> & messages); bool is_first_message_ {true}; + std::atomic_bool is_open_{false}; bag_events::EventCallbackManager callback_manager_; }; diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index a6e14b4f41..56d9788787 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -62,11 +62,13 @@ void Writer::open( const rosbag2_storage::StorageOptions & storage_options, const ConverterOptions & converter_options) { + std::lock_guard writer_lock(writer_mutex_); writer_impl_->open(storage_options, converter_options); } void Writer::close() { + std::lock_guard writer_lock(writer_mutex_); writer_impl_->close(); } diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 10a69b9235..a058a20733 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -59,7 +59,7 @@ SequentialWriter::SequentialWriter( metadata_io_(std::move(metadata_io)), converter_(nullptr), topics_names_to_info_(), - message_definitions_(), + topic_names_to_message_definitions_(), metadata_() {} @@ -69,7 +69,7 @@ SequentialWriter::~SequentialWriter() // Callbacks likely was created after SequentialWriter object and may point to the already // destructed objects. callback_manager_.delete_all_callbacks(); - close(); + SequentialWriter::close(); } void SequentialWriter::init_metadata() @@ -97,8 +97,13 @@ void SequentialWriter::open( const rosbag2_storage::StorageOptions & storage_options, const ConverterOptions & converter_options) { + // Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level. + if (is_open_) { + return; // The writer already opened + } base_folder_ = storage_options.uri; storage_options_ = storage_options; + if (storage_options_.storage_id.empty()) { storage_options_.storage_id = rosbag2_storage::get_default_storage_id(); } @@ -161,10 +166,15 @@ void SequentialWriter::open( init_metadata(); storage_->update_metadata(metadata_); + is_open_ = true; } void SequentialWriter::close() { + // Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level. + if (!is_open_.exchange(false)) { + return; // The writer is not open + } if (use_cache_) { // destructor will flush message cache cache_consumer_.reset(); @@ -180,13 +190,22 @@ void SequentialWriter::close() } if (storage_) { - auto info = std::make_shared(); - info->closed_file = storage_->get_relative_file_path(); storage_.reset(); // Destroy storage before calling WRITE_SPLIT callback to make sure that // bag file was closed before callback call. + } + if (!metadata_.relative_file_paths.empty()) { + auto info = std::make_shared(); + // Take the latest file name from metadata in case if it was updated after compression in + // derived class + info->closed_file = + (fs::path(base_folder_) / metadata_.relative_file_paths.back()).generic_string(); callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info); } - storage_factory_.reset(); + + topics_names_to_info_.clear(); + topic_names_to_message_definitions_.clear(); + + converter_.reset(); } void SequentialWriter::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) @@ -226,7 +245,7 @@ void SequentialWriter::create_topic( return; } - if (!storage_) { + if (!is_open_) { throw std::runtime_error("Bag is not open. Call open() before writing."); } @@ -260,7 +279,7 @@ void SequentialWriter::create_topic( void SequentialWriter::remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type) { - if (!storage_) { + if (!is_open_) { throw std::runtime_error("Bag is not open. Call open() before removing."); } @@ -308,14 +327,13 @@ void SequentialWriter::switch_to_next_storage() base_folder_, metadata_.relative_file_paths.size()); storage_ = storage_factory_->open_read_write(storage_options_); - storage_->update_metadata(metadata_); - if (!storage_) { std::stringstream errmsg; errmsg << "Failed to rollover bagfile to new file: \"" << storage_options_.uri << "\"!"; throw std::runtime_error(errmsg.str()); } + storage_->update_metadata(metadata_); // Re-register all topics since we rolled-over to a new bagfile. for (const auto & topic : topics_names_to_info_) { auto const & md = topic_names_to_message_definitions_[topic.first]; @@ -348,7 +366,7 @@ void SequentialWriter::split_bagfile() void SequentialWriter::write(std::shared_ptr message) { - if (!storage_) { + if (!is_open_) { throw std::runtime_error("Bag is not open. Call open() before writing."); } diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp index f1d14cc0b8..377baa0fe6 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp @@ -52,8 +52,10 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example) serialization.serialize_message(&test_msg, &serialized_msg); auto rosbag_directory = fs::path("test_rosbag2_writer_api_bag"); + auto rosbag_directory_next = fs::path("test_rosbag2_writer_api_bag_next"); // in case the bag was previously not cleaned up fs::remove_all(rosbag_directory); + fs::remove_all(rosbag_directory_next); { rosbag2_cpp::Writer writer; @@ -99,7 +101,18 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example) // writing a non-serialized message writer.write(test_msg, "/a/ros2/message", rclcpp::Clock().now()); - // close on scope exit + // close as prompted + writer.close(); + + // open a new bag with the same writer + writer.open(rosbag_directory_next.string()); + + // write same topic to different bag + writer.write( + serialized_msg2, "/yet/another/topic", "test_msgs/msg/BasicTypes", + rclcpp::Clock().now()); + + // close by scope } { @@ -126,6 +139,24 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example) // close on scope exit } + { + rosbag2_cpp::Reader reader; + std::string topic; + reader.open(rosbag_directory_next.string()); + ASSERT_TRUE(reader.has_next()); + + auto bag_message = reader.read_next(); + topic = bag_message->topic_name; + + TestMsgT extracted_test_msg; + rclcpp::SerializedMessage extracted_serialized_msg(*bag_message->serialized_data); + serialization.deserialize_message( + &extracted_serialized_msg, &extracted_test_msg); + + EXPECT_EQ(test_msg, extracted_test_msg); + EXPECT_EQ("/yet/another/topic", topic); + } + // alternative reader { rosbag2_cpp::Reader reader; @@ -140,6 +171,7 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example) // remove the rosbag again after the test EXPECT_TRUE(fs::remove_all(rosbag_directory)); + EXPECT_TRUE(fs::remove_all(rosbag_directory_next)); } INSTANTIATE_TEST_SUITE_P(