From c30a2d7acd4872fe391d6085895476488f53c2da Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sun, 21 May 2023 17:55:34 -0700 Subject: [PATCH] Gracefully handle CTRL+C and CTR+Break events on Windows - Add handler for the native Windows CTRL_C_EVENT, CTRL_BREAK_EVENT and CTRL_CLOSE_EVENT in rosbag2 recorder and player. - Map SIGINT signal to the native Windows CTRL_C_EVENT in the stop_execution(handle, signum) version for Windows. To be able correctly use start and stop execution routines from unit tests. - Enable integration tests which was disabled for Windows due to the incorrect sending and handling of the SIGINT event. - Got rid of the `finalize_metadata_kludge(..)` helper function. Signed-off-by: Michael Orlov --- rosbag2_py/src/rosbag2_py/_transport.cpp | 97 +++++++++++++++++-- .../process_execution_helpers_windows.hpp | 54 ++++++++--- .../test/rosbag2_tests/record_fixture.hpp | 40 +------- .../test_rosbag2_play_end_to_end.cpp | 2 - .../test_rosbag2_record_end_to_end.cpp | 19 +--- .../rosbag2_transport_test_fixture.hpp | 5 - 6 files changed, 136 insertions(+), 81 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 1b51f30429..4abb4eeb4f 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -31,6 +31,11 @@ #include "./pybind11.hpp" +#ifdef _WIN32 +#include +#include +#endif + namespace py = pybind11; typedef std::unordered_map QoSMap; @@ -132,7 +137,49 @@ class Player public: Player() { - rclcpp::init(0, nullptr); + auto init_options = rclcpp::InitOptions(); + init_options.shutdown_on_signal = false; + rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None); + std::signal( + SIGTERM, [](int /* signal */) { + rclcpp::shutdown(); + }); + std::signal( + SIGINT, [](int /* signal */) { + rclcpp::shutdown(); + }); +#ifdef _WIN32 + // According to the + // https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/signal?view=msvc-170 + // SIGINT is not supported for any Win32 application. When a CTRL+C interrupt occurs, + // Win32 operating systems generate a new thread to specifically handle that interrupt. + // Therefore, need to use native Windows control event handler as analog for the SIGINT. + + // Enable default CTRL+C handler first. This is workaround and needed for the cases when + // process created with CREATE_NEW_PROCESS_GROUP flag. Without it, installing custom Ctrl+C + // handler will not work. + if (!SetConsoleCtrlHandler(nullptr, false)) { + std::cerr << "Rosbag2 player error: Failed to enable default CTL+C handler.\n"; + } + // Installing our own control handler + auto CtrlHandler = [](DWORD fdwCtrlType) -> BOOL { + switch (fdwCtrlType) { + case CTRL_C_EVENT: + case CTRL_BREAK_EVENT: + case CTRL_CLOSE_EVENT: + rclcpp::shutdown(); + return TRUE; + case CTRL_SHUTDOWN_EVENT: + rclcpp::shutdown(); + return FALSE; + default: + return FALSE; + } + }; + if (!SetConsoleCtrlHandler(CtrlHandler, TRUE)) { + std::cerr << "Rosbag2 player error: Could not set control handler.\n"; + } +#endif // #ifdef _WIN32 } virtual ~Player() @@ -157,7 +204,7 @@ class Player player->play(); exec.cancel(); - spin_thread.join(); + if (spin_thread.joinable()) {spin_thread.join();} } void burst( @@ -182,8 +229,8 @@ class Player player->burst(num_messages); exec.cancel(); - spin_thread.join(); - play_thread.join(); + if (spin_thread.joinable()) {spin_thread.join();} + if (play_thread.joinable()) {play_thread.join();} } }; @@ -195,12 +242,50 @@ class Recorder public: Recorder() { - rclcpp::init(0, nullptr); - exec_ = std::make_unique(); + auto init_options = rclcpp::InitOptions(); + init_options.shutdown_on_signal = false; + rclcpp::init(0, nullptr, init_options, rclcpp::SignalHandlerOptions::None); std::signal( SIGTERM, [](int /* signal */) { rclcpp::shutdown(); }); + std::signal( + SIGINT, [](int /* signal */) { + rclcpp::shutdown(); + }); + exec_ = std::make_unique(); +#ifdef _WIN32 + // According to the + // https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/signal?view=msvc-170 + // SIGINT is not supported for any Win32 application. When a CTRL+C interrupt occurs, + // Win32 operating systems generate a new thread to specifically handle that interrupt. + // Therefore, need to use native Windows control event handler as analog for the SIGINT. + + // Enable default CTRL+C handler first. This is workaround and needed for the cases when + // process created with CREATE_NEW_PROCESS_GROUP flag. Without it, installing custom Ctrl+C + // handler will not work. + if (!SetConsoleCtrlHandler(nullptr, false)) { + std::cerr << "Rosbag2 recorder error: Failed to enable default CTL+C handler.\n"; + } + // Installing our own control handler + auto CtrlHandler = [](DWORD fdwCtrlType) -> BOOL { + switch (fdwCtrlType) { + case CTRL_C_EVENT: + case CTRL_BREAK_EVENT: + case CTRL_CLOSE_EVENT: + rclcpp::shutdown(); + return TRUE; + case CTRL_SHUTDOWN_EVENT: + rclcpp::shutdown(); + return FALSE; + default: + return FALSE; + } + }; + if (!SetConsoleCtrlHandler(CtrlHandler, TRUE)) { + std::cerr << "Rosbag2 recorder error: Could not set control handler.\n"; + } +#endif // #ifdef _WIN32 } virtual ~Recorder() diff --git a/rosbag2_test_common/include/rosbag2_test_common/process_execution_helpers_windows.hpp b/rosbag2_test_common/include/rosbag2_test_common/process_execution_helpers_windows.hpp index eb840c481d..5bc83bf196 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/process_execution_helpers_windows.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/process_execution_helpers_windows.hpp @@ -18,7 +18,7 @@ #include #include -#include +#include #include #include @@ -46,7 +46,10 @@ PROCESS_INFORMATION create_process(TCHAR * command, const char * path = nullptr) nullptr, nullptr, false, - 0, + // Create process suspended and resume it after adding to the newly created job. Otherwise, + // there is a potential race condition where newly created process starts a subprocess + // before we've called AssignProcessToJobObject(); + CREATE_NEW_PROCESS_GROUP | CREATE_SUSPENDED, nullptr, path, &start_up_info, @@ -73,8 +76,9 @@ int execute_and_wait_until_completion(const std::string & command, const std::st const_char_to_tchar(command.c_str(), command_char); auto process = create_process(command_char, path.c_str()); - DWORD exit_code = 259; // 259 is the code one gets if the process is still active. - while (exit_code == 259) { + ResumeThread(process.hThread); + DWORD exit_code = STILL_ACTIVE; + while (exit_code == STILL_ACTIVE) { EXPECT_TRUE(GetExitCodeProcess(process.hProcess, &exit_code)); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } @@ -97,6 +101,7 @@ ProcessHandle start_execution(const std::string & command) auto process_info = create_process(command_char); AssignProcessToJobObject(h_job, process_info.hProcess); + ResumeThread(process_info.hThread); Process process; process.process_info = process_info; process.job_handle = h_job; @@ -117,12 +122,12 @@ bool wait_until_completion( DWORD exit_code = 0; std::chrono::steady_clock::time_point const start = std::chrono::steady_clock::now(); EXPECT_TRUE(GetExitCodeProcess(handle.process_info.hProcess, &exit_code)); - // 259 indicates that the process is still active - while (exit_code == 259 && std::chrono::steady_clock::now() - start < timeout) { + while (exit_code == STILL_ACTIVE && std::chrono::steady_clock::now() - start < timeout) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); EXPECT_TRUE(GetExitCodeProcess(handle.process_info.hProcess, &exit_code)); } - return exit_code != 259; + EXPECT_EQ(exit_code, 0); + return exit_code != STILL_ACTIVE; } /// @brief Force to stop process with signal if it's currently running @@ -135,16 +140,37 @@ void stop_execution( std::chrono::duration timeout = std::chrono::seconds(10)) { // Match the Unix version by allowing for int signum argument - however Windows does not have - // Linux signals in the same way, so there isn't a 1:1 mapping to dispatch e.g. SIGTERM - DWORD exit_code; + // Linux signals in the same way, so there isn't a 1:1 mapping to dispatch e.g. SIGINT or SIGTERM + DWORD exit_code = STILL_ACTIVE; EXPECT_TRUE(GetExitCodeProcess(handle.process_info.hProcess, &exit_code)); - // 259 indicates that the process is still active: we want to make sure that the process is - // still running properly before killing it. - if (exit_code == 259) { - EXPECT_TRUE(GenerateConsoleCtrlEvent(CTRL_C_EVENT, handle.process_info.dwThreadId)); + // Make sure that the process is still running properly before stopping it. + if (exit_code == STILL_ACTIVE) { + switch (signum) { + // According to the + // https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/signal?view=msvc-170 + // SIGINT and SIGBREAK is not supported for any Win32 application. + // Need to use native Windows control event instead. + case SIGINT: + EXPECT_TRUE(GenerateConsoleCtrlEvent(CTRL_C_EVENT, handle.process_info.dwProcessId)); + break; + case SIGBREAK: + EXPECT_TRUE(GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, handle.process_info.dwProcessId)); + break; + case SIGTERM: + // The CTRL_CLOSE_EVENT is analog of the SIGTERM from POSIX. Windows sends CTRL_CLOSE_EVENT + // to all processes attached to a console when the user closes the console (either by + // clicking Close on the console window's window menu, or by clicking the End Task + // button command from Task Manager). Although according to the + // https://learn.microsoft.com/en-us/windows/console/generateconsolectrlevent the + // GenerateConsoleCtrlEvent doesn't support sending CTRL_CLOSE_EVENT. There are no way to + // directly send CTRL_CLOSE_EVENT to the process in the same console application. + // Therefore, adding SIGTERM to the unsupported events. + default: + throw std::runtime_error("Unsupported signum: " + std::to_string(signum)); + } bool process_finished = wait_until_completion(handle, timeout); if (!process_finished) { - std::cerr << "Testing process " << handle.process_info.hProcess << + std::cerr << "Testing process " << handle.process_info.dwProcessId << " hangout. Killing it with TerminateProcess(..) \n"; EXPECT_TRUE(TerminateProcess(handle.process_info.hProcess, 2)); EXPECT_TRUE(process_finished); diff --git a/rosbag2_tests/test/rosbag2_tests/record_fixture.hpp b/rosbag2_tests/test/rosbag2_tests/record_fixture.hpp index 694e6e4973..a3f4158b05 100644 --- a/rosbag2_tests/test/rosbag2_tests/record_fixture.hpp +++ b/rosbag2_tests/test/rosbag2_tests/record_fixture.hpp @@ -126,7 +126,7 @@ class RecordFixture : public ParametrizedTemporaryDirectoryFixture std::this_thread::sleep_for(50ms); } ASSERT_EQ(metadata_io.metadata_file_exists(bag_path), true) - << "Could not find metadata file."; + << "Could not find " << bag_path << " metadata file."; } void wait_for_storage_file(std::chrono::duration timeout = std::chrono::seconds(10)) @@ -181,44 +181,6 @@ class RecordFixture : public ParametrizedTemporaryDirectoryFixture return topic_it->serialization_format; } - void finalize_metadata_kludge( - int expected_splits = 0, - const std::string & compression_format = "", - const std::string & compression_mode = "") - { - // TODO(ros-tooling): Find out how to correctly send a Ctrl-C signal on Windows - // This is necessary as the process is killed hard on Windows and doesn't write a metadata file - #ifdef _WIN32 - rosbag2_storage::BagMetadata metadata{}; - metadata.storage_identifier = rosbag2_storage::get_default_storage_id(); - for (int i = 0; i <= expected_splits; i++) { - rcpputils::fs::path bag_file_path; - if (!compression_format.empty()) { - bag_file_path = get_bag_file_path(i); - } else { - bag_file_path = get_compressed_bag_file_path(i); - } - - if (rcpputils::fs::exists(bag_file_path)) { - metadata.relative_file_paths.push_back(bag_file_path.string()); - } - } - metadata.duration = std::chrono::nanoseconds(0); - metadata.starting_time = - std::chrono::time_point(std::chrono::nanoseconds(0)); - metadata.message_count = 0; - metadata.compression_mode = compression_mode; - metadata.compression_format = compression_format; - - rosbag2_storage::MetadataIo metadata_io; - metadata_io.write_metadata(root_bag_path_.string(), metadata); - #else - (void)expected_splits; - (void)compression_format; - (void)compression_mode; - #endif - } - // relative path to the root of the bag file. rcpputils::fs::path root_bag_path_; diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_play_end_to_end.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_play_end_to_end.cpp index de3ce26812..4315198dc7 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_play_end_to_end.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_play_end_to_end.cpp @@ -271,7 +271,6 @@ TEST_P(PlayEndToEndTestFixture, play_filters_by_topic) { } } -#ifndef _WIN32 TEST_P(PlayEndToEndTestFixture, play_end_to_end_exits_gracefully_on_sigint) { sub_->add_subscription("/test_topic", 3, sub_qos_); sub_->add_subscription("/array_topic", 2, sub_qos_); @@ -293,7 +292,6 @@ TEST_P(PlayEndToEndTestFixture, play_end_to_end_exits_gracefully_on_sigint) { stop_execution(process_id, SIGINT); cleanup_process_handle.cancel(); } -#endif // #ifndef _WIN32 INSTANTIATE_TEST_SUITE_P( TestPlayEndToEnd, diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp index 74277e4965..e12a0f0da7 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp @@ -58,7 +58,6 @@ std::shared_ptr create_string_message( } } // namespace -#ifndef _WIN32 TEST_P(RecordFixture, record_end_to_end_test_with_zstd_file_compression) { constexpr const char topic_name[] = "/test_topic"; const auto compression_format = "zstd"; @@ -118,7 +117,6 @@ TEST_P(RecordFixture, record_end_to_end_test_with_zstd_file_compression) { EXPECT_EQ(message->string_value, "test"); } } -#endif TEST_P(RecordFixture, record_end_to_end_test) { auto message = get_messages_strings()[0]; @@ -149,7 +147,6 @@ TEST_P(RecordFixture, record_end_to_end_test) { stop_execution(process_handle); cleanup_process_handle.cancel(); - finalize_metadata_kludge(); wait_for_metadata(); auto test_topic_messages = get_messages_for_topic("/test_topic"); EXPECT_THAT(test_topic_messages, SizeIs(Ge(expected_test_messages))); @@ -191,12 +188,14 @@ TEST_P(RecordFixture, record_end_to_end_test_start_paused) { stop_execution(process_handle); cleanup_process_handle.cancel(); - finalize_metadata_kludge(); wait_for_metadata(); auto test_topic_messages = get_messages_for_topic("/test_topic"); EXPECT_THAT(test_topic_messages, IsEmpty()); } +#ifndef _WIN32 +// This test is disabled for Windows platform because there are no way to send SIGTERM or its +// alternative CTRL_CLOSE_EVENT directly to the child process in the same console application. TEST_P(RecordFixture, record_end_to_end_exits_gracefully_on_sigterm) { const std::string topic_name = "/test_sigterm"; auto message = get_messages_strings()[0]; @@ -214,11 +213,8 @@ TEST_P(RecordFixture, record_end_to_end_exits_gracefully_on_sigterm) { stop_execution(process_handle, SIGTERM); wait_for_metadata(); } +#endif // #ifndef _WIN32 -// TODO(zmichaels11): Fix and enable this test on Windows. -// This tests depends on the ability to read the metadata file. -// Stopping the process on Windows does a hard kill and the metadata file is not written. -#ifndef _WIN32 TEST_P(RecordFixture, record_end_to_end_with_splitting_metadata_contains_all_topics) { constexpr const int bagfile_split_size = 4 * 1024 * 1024; // 4MB. constexpr const char first_topic_name[] = "/test_topic0"; @@ -273,7 +269,6 @@ TEST_P(RecordFixture, record_end_to_end_with_splitting_metadata_contains_all_top EXPECT_NE(topic_names.end(), topic_names.find(first_topic_name)); EXPECT_NE(topic_names.end(), topic_names.find(second_topic_name)); } -#endif TEST_P(RecordFixture, record_end_to_end_with_splitting_bagsize_split_is_at_least_specified_size) { constexpr const char topic_name[] = "/test_topic"; @@ -307,7 +302,6 @@ TEST_P(RecordFixture, record_end_to_end_with_splitting_bagsize_split_is_at_least pub_manager.run_publishers(); - finalize_metadata_kludge(expected_splits); wait_for_metadata(); rosbag2_storage::MetadataIo metadata_io; const auto metadata = metadata_io.read_metadata(root_bag_path_.string()); @@ -362,7 +356,6 @@ TEST_P(RecordFixture, record_end_to_end_with_splitting_max_size_not_reached) { stop_execution(process_handle); cleanup_process_handle.cancel(); - finalize_metadata_kludge(); wait_for_metadata(); rosbag2_storage::MetadataIo metadata_io; const auto metadata = metadata_io.read_metadata(root_bag_path_.string()); @@ -413,7 +406,6 @@ TEST_P(RecordFixture, record_end_to_end_with_splitting_splits_bagfile) { cleanup_process_handle.cancel(); wait_for_metadata(); - finalize_metadata_kludge(expected_splits); rosbag2_storage::MetadataIo metadata_io; const auto metadata = metadata_io.read_metadata(root_bag_path_.string()); @@ -459,7 +451,6 @@ TEST_P(RecordFixture, record_end_to_end_with_duration_splitting_splits_bagfile) stop_execution(process_handle); cleanup_process_handle.cancel(); - finalize_metadata_kludge(); wait_for_metadata(); rosbag2_storage::MetadataIo metadata_io; const auto metadata = metadata_io.read_metadata(root_bag_path_.string()); @@ -506,7 +497,6 @@ TEST_P(RecordFixture, record_end_to_end_test_with_zstd_file_compression_compress stop_execution(process_handle); cleanup_process_handle.cancel(); - finalize_metadata_kludge(expected_splits); wait_for_metadata(); rosbag2_storage::MetadataIo metadata_io; const auto metadata = metadata_io.read_metadata(root_bag_path_.string()); @@ -594,7 +584,6 @@ TEST_P(RecordFixture, record_end_to_end_test_with_cache) { stop_execution(process_handle); cleanup_process_handle.cancel(); - finalize_metadata_kludge(); wait_for_metadata(); auto test_topic_messages = get_messages_for_topic(topic_name); diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp index 9e75cc84fa..42f8f8f761 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp @@ -26,11 +26,6 @@ #include "rosbag2_cpp/types.hpp" #include "rosbag2_cpp/writer.hpp" -#ifdef _WIN32 -# include -# include -#endif - #include "rosbag2_storage/storage_options.hpp" #include "rosbag2_transport/play_options.hpp"