Skip to content

Commit

Permalink
Install signal handlers in recorder only inside record method (#1464)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
(cherry picked from commit 195e406)

# Conflicts:
#	rosbag2_py/src/rosbag2_py/_transport.cpp
  • Loading branch information
MichaelOrlov authored and mergify[bot] committed Jan 4, 2024
1 parent 8810fec commit 1bebffe
Showing 1 changed file with 62 additions and 10 deletions.
72 changes: 62 additions & 10 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Expand Up @@ -144,6 +144,7 @@ class Recorder
public:
Recorder()
{
<<<<<<< HEAD
auto init_options = rclcpp::InitOptions();
init_options.shutdown_on_signal = false;
rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::SigInt);
Expand All @@ -157,6 +158,9 @@ class Recorder
SIGINT, [](int /* signal */) {
rosbag2_py::Recorder::cancel();
});
=======
rclcpp::init(0, nullptr);
>>>>>>> 195e406 (Install signal handlers in recorder only inside record method (#1464))
}

virtual ~Recorder()
Expand All @@ -169,6 +173,7 @@ class Recorder
const rosbag2_storage::StorageOptions & storage_options,
RecordOptions & record_options)
{
<<<<<<< HEAD
exit_ = false;
auto exec = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
if (record_options.rmw_serialization_format.empty()) {
Expand All @@ -186,19 +191,66 @@ class Recorder
auto spin_thread = std::thread(
[&exec]() {
exec->spin();
=======
auto old_sigterm_handler = std::signal(
SIGTERM, [](int /* signal */) {
rosbag2_py::Recorder::cancel();
>>>>>>> 195e406 (Install signal handlers in recorder only inside record method (#1464))
});
auto old_sigint_handler = std::signal(
SIGINT, [](int /* signal */) {
rosbag2_py::Recorder::cancel();
});
{
// Release the GIL for long-running record, so that calling Python code can use other threads
py::gil_scoped_release release;
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();});
recorder->stop();

try {
exit_ = false;
auto exec = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
if (record_options.rmw_serialization_format.empty()) {
record_options.rmw_serialization_format = std::string(rmw_get_serialization_format());
}

auto writer = rosbag2_transport::ReaderWriterFactory::make_writer(record_options);
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer), storage_options, record_options, node_name);
recorder->record();

exec->add_node(recorder);
// Run exec->spin() in a separate thread, because we need to call exec->cancel() after
// recorder->stop() to be able to send notifications about bag split and close.
auto spin_thread = std::thread(
[&exec]() {
exec->spin();
});
{
// Release the GIL for long-running record, so that calling Python code
// can use other threads
py::gil_scoped_release release;
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Recorder::exit_.load();});
recorder->stop();
}
exec->cancel();
if (spin_thread.joinable()) {
spin_thread.join();
}
exec->remove_node(recorder);
} catch (...) {
// Return old signal handlers anyway
if (old_sigterm_handler != SIG_ERR) {
std::signal(SIGTERM, old_sigterm_handler);
}
if (old_sigint_handler != SIG_ERR) {
std::signal(SIGTERM, old_sigint_handler);
}
throw;
}
// Return old signal handlers
if (old_sigterm_handler != SIG_ERR) {
std::signal(SIGTERM, old_sigterm_handler);
}
exec->cancel();
if (spin_thread.joinable()) {
spin_thread.join();
if (old_sigint_handler != SIG_ERR) {
std::signal(SIGTERM, old_sigint_handler);
}
exec->remove_node(recorder);
}

static void cancel()
Expand Down

0 comments on commit 1bebffe

Please sign in to comment.