diff --git a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp index 192fb4d4f7..d1265848ed 100644 --- a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp +++ b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp @@ -182,7 +182,6 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter private: std::unique_ptr compression_factory_{}; - std::shared_ptr compressor_{}; std::mutex compressor_queue_mutex_; std::queue> compressor_message_queue_ RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_); @@ -198,6 +197,7 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter rosbag2_compression::CompressionOptions compression_options_{}; bool should_compress_last_file_{true}; + std::atomic_bool is_open_{false}; // Runs a while loop that pulls data from the compression queue until // compression_is_running_ is false; should be run in a separate thread diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index b60d31bbc7..a5da3e7891 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -65,7 +65,7 @@ SequentialCompressionWriter::SequentialCompressionWriter( SequentialCompressionWriter::~SequentialCompressionWriter() { - close(); + SequentialCompressionWriter::close(); } void SequentialCompressionWriter::compression_thread_fn() @@ -225,13 +225,22 @@ void SequentialCompressionWriter::open( const rosbag2_storage::StorageOptions & storage_options, const rosbag2_cpp::ConverterOptions & converter_options) { + // Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level. + if (this->is_open_) { + return; // The writer already opened. + } std::lock_guard lock(storage_mutex_); SequentialWriter::open(storage_options, converter_options); setup_compression(); + this->is_open_ = true; } void SequentialCompressionWriter::close() { + // Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level. + if (!this->is_open_.exchange(false)) { + return; // The writer is not open + } if (!base_folder_.empty()) { // Reset may be called before initializing the compressor (ex. bad options). // We compress the last file only if it hasn't been compressed earlier (ex. in split_bagfile()). @@ -241,7 +250,18 @@ void SequentialCompressionWriter::close() std::lock_guard lock(storage_mutex_); std::lock_guard compressor_lock(compressor_queue_mutex_); try { - storage_.reset(); // Storage must be closed before it can be compressed. + // Storage must be closed before file can be compressed. + if (use_cache_) { + // destructor will flush message cache + cache_consumer_.reset(); + message_cache_.reset(); + } + finalize_metadata(); + if (storage_) { + storage_->update_metadata(metadata_); + storage_.reset(); // Storage will be closed in storage_ destructor + } + if (!metadata_.relative_file_paths.empty()) { std::string file = metadata_.relative_file_paths.back(); compressor_file_queue_.push(file); @@ -251,22 +271,10 @@ void SequentialCompressionWriter::close() ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Could not compress the last bag file.\n" << e.what()); } } - - stop_compressor_threads(); - - finalize_metadata(); - if (storage_) { - storage_->update_metadata(metadata_); - } - metadata_io_->write_metadata(base_folder_, metadata_); - } - - if (use_cache_) { - cache_consumer_.reset(); - message_cache_.reset(); } - storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory - storage_factory_.reset(); + stop_compressor_threads(); // Note: The metadata_.relative_file_paths will be updated with + // compressed filename when compressor threads will finish. + SequentialWriter::close(); } void SequentialCompressionWriter::create_topic( diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index c17a0da25d..fbcc80b908 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -222,6 +222,31 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f writer_->open(tmp_dir_storage_options_, {serialization_format_, serialization_format_})); } +TEST_F(SequentialCompressionWriterTest, open_succeeds_twice) +{ + rosbag2_compression::CompressionOptions compression_options{ + DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE, + kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads, + kDefaultCompressionQueueThreadsPriority}; + initializeWriter(compression_options); + + auto tmp_dir = fs::temp_directory_path() / "path_not_empty"; + auto tmp_dir_next = fs::temp_directory_path() / "path_not_empty_next"; + + auto storage_options = rosbag2_storage::StorageOptions(); + auto storage_options_next = rosbag2_storage::StorageOptions(); + + storage_options.uri = tmp_dir.string(); + storage_options_next.uri = tmp_dir_next.string(); + + EXPECT_NO_THROW( + writer_->open(storage_options, {serialization_format_, serialization_format_})); + + writer_->close(); + EXPECT_NO_THROW( + writer_->open(storage_options_next, {serialization_format_, serialization_format_})); +} + TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor) { rosbag2_compression::CompressionOptions compression_options{ diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index 1695203185..d9c541d101 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -162,6 +162,9 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter // Used to track topic -> message count. If cache is present, it is updated by CacheConsumer std::unordered_map topics_names_to_info_; + // Note: topics_names_to_info_ needs to be protected with mutex only when we are explicitly + // adding or deleting items (create_topic(..)/remove_topic(..)) and when we access it from + // CacheConsumer callback i.e., write_messages(..) std::mutex topics_info_mutex_; LocalMessageDefinitionSource message_definitions_; @@ -197,6 +200,7 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter void write_messages( const std::vector> & messages); bool is_first_message_ {true}; + std::atomic_bool is_open_{false}; bag_events::EventCallbackManager callback_manager_; }; diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index a6e14b4f41..56d9788787 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -62,11 +62,13 @@ void Writer::open( const rosbag2_storage::StorageOptions & storage_options, const ConverterOptions & converter_options) { + std::lock_guard writer_lock(writer_mutex_); writer_impl_->open(storage_options, converter_options); } void Writer::close() { + std::lock_guard writer_lock(writer_mutex_); writer_impl_->close(); } diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 10a69b9235..a058a20733 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -59,7 +59,7 @@ SequentialWriter::SequentialWriter( metadata_io_(std::move(metadata_io)), converter_(nullptr), topics_names_to_info_(), - message_definitions_(), + topic_names_to_message_definitions_(), metadata_() {} @@ -69,7 +69,7 @@ SequentialWriter::~SequentialWriter() // Callbacks likely was created after SequentialWriter object and may point to the already // destructed objects. callback_manager_.delete_all_callbacks(); - close(); + SequentialWriter::close(); } void SequentialWriter::init_metadata() @@ -97,8 +97,13 @@ void SequentialWriter::open( const rosbag2_storage::StorageOptions & storage_options, const ConverterOptions & converter_options) { + // Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level. + if (is_open_) { + return; // The writer already opened + } base_folder_ = storage_options.uri; storage_options_ = storage_options; + if (storage_options_.storage_id.empty()) { storage_options_.storage_id = rosbag2_storage::get_default_storage_id(); } @@ -161,10 +166,15 @@ void SequentialWriter::open( init_metadata(); storage_->update_metadata(metadata_); + is_open_ = true; } void SequentialWriter::close() { + // Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level. + if (!is_open_.exchange(false)) { + return; // The writer is not open + } if (use_cache_) { // destructor will flush message cache cache_consumer_.reset(); @@ -180,13 +190,22 @@ void SequentialWriter::close() } if (storage_) { - auto info = std::make_shared(); - info->closed_file = storage_->get_relative_file_path(); storage_.reset(); // Destroy storage before calling WRITE_SPLIT callback to make sure that // bag file was closed before callback call. + } + if (!metadata_.relative_file_paths.empty()) { + auto info = std::make_shared(); + // Take the latest file name from metadata in case if it was updated after compression in + // derived class + info->closed_file = + (fs::path(base_folder_) / metadata_.relative_file_paths.back()).generic_string(); callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info); } - storage_factory_.reset(); + + topics_names_to_info_.clear(); + topic_names_to_message_definitions_.clear(); + + converter_.reset(); } void SequentialWriter::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) @@ -226,7 +245,7 @@ void SequentialWriter::create_topic( return; } - if (!storage_) { + if (!is_open_) { throw std::runtime_error("Bag is not open. Call open() before writing."); } @@ -260,7 +279,7 @@ void SequentialWriter::create_topic( void SequentialWriter::remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type) { - if (!storage_) { + if (!is_open_) { throw std::runtime_error("Bag is not open. Call open() before removing."); } @@ -308,14 +327,13 @@ void SequentialWriter::switch_to_next_storage() base_folder_, metadata_.relative_file_paths.size()); storage_ = storage_factory_->open_read_write(storage_options_); - storage_->update_metadata(metadata_); - if (!storage_) { std::stringstream errmsg; errmsg << "Failed to rollover bagfile to new file: \"" << storage_options_.uri << "\"!"; throw std::runtime_error(errmsg.str()); } + storage_->update_metadata(metadata_); // Re-register all topics since we rolled-over to a new bagfile. for (const auto & topic : topics_names_to_info_) { auto const & md = topic_names_to_message_definitions_[topic.first]; @@ -348,7 +366,7 @@ void SequentialWriter::split_bagfile() void SequentialWriter::write(std::shared_ptr message) { - if (!storage_) { + if (!is_open_) { throw std::runtime_error("Bag is not open. Call open() before writing."); } diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp index f1d14cc0b8..377baa0fe6 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp @@ -52,8 +52,10 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example) serialization.serialize_message(&test_msg, &serialized_msg); auto rosbag_directory = fs::path("test_rosbag2_writer_api_bag"); + auto rosbag_directory_next = fs::path("test_rosbag2_writer_api_bag_next"); // in case the bag was previously not cleaned up fs::remove_all(rosbag_directory); + fs::remove_all(rosbag_directory_next); { rosbag2_cpp::Writer writer; @@ -99,7 +101,18 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example) // writing a non-serialized message writer.write(test_msg, "/a/ros2/message", rclcpp::Clock().now()); - // close on scope exit + // close as prompted + writer.close(); + + // open a new bag with the same writer + writer.open(rosbag_directory_next.string()); + + // write same topic to different bag + writer.write( + serialized_msg2, "/yet/another/topic", "test_msgs/msg/BasicTypes", + rclcpp::Clock().now()); + + // close by scope } { @@ -126,6 +139,24 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example) // close on scope exit } + { + rosbag2_cpp::Reader reader; + std::string topic; + reader.open(rosbag_directory_next.string()); + ASSERT_TRUE(reader.has_next()); + + auto bag_message = reader.read_next(); + topic = bag_message->topic_name; + + TestMsgT extracted_test_msg; + rclcpp::SerializedMessage extracted_serialized_msg(*bag_message->serialized_data); + serialization.deserialize_message( + &extracted_serialized_msg, &extracted_test_msg); + + EXPECT_EQ(test_msg, extracted_test_msg); + EXPECT_EQ("/yet/another/topic", topic); + } + // alternative reader { rosbag2_cpp::Reader reader; @@ -140,6 +171,7 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example) // remove the rosbag again after the test EXPECT_TRUE(fs::remove_all(rosbag_directory)); + EXPECT_TRUE(fs::remove_all(rosbag_directory_next)); } INSTANTIATE_TEST_SUITE_P(