Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[iron] Gracefully handle SIGINT and SIGTERM in rosbag2 recorder (backport #1301) #1394

Merged
merged 1 commit into from Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 39 additions & 12 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Expand Up @@ -189,17 +189,20 @@ class Player

class Recorder
{
private:
std::unique_ptr<rclcpp::executors::SingleThreadedExecutor> exec_;

public:
Recorder()
{
rclcpp::init(0, nullptr);
exec_ = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
auto init_options = rclcpp::InitOptions();
init_options.shutdown_on_signal = false;
rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None);

std::signal(
SIGTERM, [](int /* signal */) {
rclcpp::shutdown();
rosbag2_py::Recorder::cancel();
});
std::signal(
SIGINT, [](int /* signal */) {
rosbag2_py::Recorder::cancel();
});
}

Expand All @@ -213,6 +216,8 @@ class Recorder
RecordOptions & record_options,
std::string & node_name)
{
exit_ = false;
auto exec = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
if (record_options.rmw_serialization_format.empty()) {
record_options.rmw_serialization_format = std::string(rmw_get_serialization_format());
}
Expand All @@ -222,20 +227,42 @@ class Recorder
std::move(writer), storage_options, record_options, node_name);
recorder->record();

exec_->add_node(recorder);
// Release the GIL for long-running record, so that calling Python code can use other threads
exec->add_node(recorder);
// Run exec->spin() in a separate thread, because we need to call exec->cancel() after
// recorder->stop() to be able to send notifications about bag split and close.
auto spin_thread = std::thread(
[&exec]() {
exec->spin();
});
{
// Release the GIL for long-running record, so that calling Python code can use other threads
py::gil_scoped_release release;
exec_->spin();
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();});
recorder->stop();
}
exec->cancel();
if (spin_thread.joinable()) {
spin_thread.join();
}
exec->remove_node(recorder);
}

void cancel()
static void cancel()
{
exec_->cancel();
exit_ = true;
wait_for_exit_cv_.notify_all();
}

protected:
static std::atomic_bool exit_;
static std::condition_variable wait_for_exit_cv_;
std::mutex wait_for_exit_mutex_;
};

std::atomic_bool Recorder::exit_{false};
std::condition_variable Recorder::wait_for_exit_cv_{};

// Return a RecordOptions struct with defaults set for rewriting bags.
rosbag2_transport::RecordOptions bag_rewrite_default_record_options()
{
Expand Down Expand Up @@ -360,7 +387,7 @@ PYBIND11_MODULE(_transport, m) {
.def(
"record", &rosbag2_py::Recorder::record, py::arg("storage_options"), py::arg("record_options"),
py::arg("node_name") = "rosbag2_recorder")
.def("cancel", &rosbag2_py::Recorder::cancel)
.def_static("cancel", &rosbag2_py::Recorder::cancel)
;

m.def(
Expand Down
1 change: 1 addition & 0 deletions rosbag2_py/test/test_transport.py
Expand Up @@ -80,6 +80,7 @@ def test_record_cancel(tmp_path, storage_id):
metadata_io = rosbag2_py.MetadataIo()
assert wait_for(lambda: metadata_io.metadata_file_exists(bag_path),
timeout=rclpy.duration.Duration(seconds=3))
record_thread.join()

metadata = metadata_io.read_metadata(bag_path)
assert(len(metadata.relative_file_paths))
Expand Down