From 0bc652458c00958088d9ef1be942df6e6c4c8793 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Mon, 12 Jun 2023 23:21:53 -0700 Subject: [PATCH] Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (#1301) * Gracefully handle SIGINT and SIGTERM signal for rosbag2 recorder - Call recorder->stop() and exec_->cancel() instead of rclcpp::shutdown Signed-off-by: Michael Orlov * Use singleton for static executor in the rosbag2_py::Recorder Signed-off-by: Michael Orlov * Rollback to the non-static executor and don't call executor->cancel() - In case when signal will arrive we will trigger our internal exit_ variable and wait while current exec_->spin_all(10msec) will exit. Signed-off-by: Michael Orlov * Spin recorder node in a separate thread for better handling exit - Run exec->spin() in a separate thread, because we need to call exec->cancel() after recorder->stop() to be able to send notifications about bag split and close. - Wait on conditional variable for exit_ flag Signed-off-by: Michael Orlov * Address race condition in rosbag2_py.test_record_cancel - Add `record_thread.join()` before trying to parse metadata. Signed-off-by: Michael Orlov --------- Signed-off-by: Michael Orlov (cherry picked from commit 46a23e9c0a6f4ab4e41b2b2d8471b0a25ab1be79) --- rosbag2_py/src/rosbag2_py/_transport.cpp | 51 ++++++++++++++++++------ rosbag2_py/test/test_transport.py | 1 + 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 1b51f3042..0237bf5cf 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -189,17 +189,20 @@ class Player class Recorder { -private: - std::unique_ptr exec_; - public: Recorder() { - rclcpp::init(0, nullptr); - exec_ = std::make_unique(); + auto init_options = rclcpp::InitOptions(); + init_options.shutdown_on_signal = false; + rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None); + std::signal( SIGTERM, [](int /* signal */) { - rclcpp::shutdown(); + rosbag2_py::Recorder::cancel(); + }); + std::signal( + SIGINT, [](int /* signal */) { + rosbag2_py::Recorder::cancel(); }); } @@ -213,6 +216,8 @@ class Recorder RecordOptions & record_options, std::string & node_name) { + exit_ = false; + auto exec = std::make_unique(); if (record_options.rmw_serialization_format.empty()) { record_options.rmw_serialization_format = std::string(rmw_get_serialization_format()); } @@ -222,20 +227,42 @@ class Recorder std::move(writer), storage_options, record_options, node_name); recorder->record(); - exec_->add_node(recorder); - // Release the GIL for long-running record, so that calling Python code can use other threads + exec->add_node(recorder); + // Run exec->spin() in a separate thread, because we need to call exec->cancel() after + // recorder->stop() to be able to send notifications about bag split and close. + auto spin_thread = std::thread( + [&exec]() { + exec->spin(); + }); { + // Release the GIL for long-running record, so that calling Python code can use other threads py::gil_scoped_release release; - exec_->spin(); + std::unique_lock lock(wait_for_exit_mutex_); + wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();}); + recorder->stop(); + } + exec->cancel(); + if (spin_thread.joinable()) { + spin_thread.join(); } + exec->remove_node(recorder); } - void cancel() + static void cancel() { - exec_->cancel(); + exit_ = true; + wait_for_exit_cv_.notify_all(); } + +protected: + static std::atomic_bool exit_; + static std::condition_variable wait_for_exit_cv_; + std::mutex wait_for_exit_mutex_; }; +std::atomic_bool Recorder::exit_{false}; +std::condition_variable Recorder::wait_for_exit_cv_{}; + // Return a RecordOptions struct with defaults set for rewriting bags. rosbag2_transport::RecordOptions bag_rewrite_default_record_options() { @@ -360,7 +387,7 @@ PYBIND11_MODULE(_transport, m) { .def( "record", &rosbag2_py::Recorder::record, py::arg("storage_options"), py::arg("record_options"), py::arg("node_name") = "rosbag2_recorder") - .def("cancel", &rosbag2_py::Recorder::cancel) + .def_static("cancel", &rosbag2_py::Recorder::cancel) ; m.def( diff --git a/rosbag2_py/test/test_transport.py b/rosbag2_py/test/test_transport.py index 345cb1323..548708d40 100644 --- a/rosbag2_py/test/test_transport.py +++ b/rosbag2_py/test/test_transport.py @@ -80,6 +80,7 @@ def test_record_cancel(tmp_path, storage_id): metadata_io = rosbag2_py.MetadataIo() assert wait_for(lambda: metadata_io.metadata_file_exists(bag_path), timeout=rclpy.duration.Duration(seconds=3)) + record_thread.join() metadata = metadata_io.read_metadata(bag_path) assert(len(metadata.relative_file_paths))