From f0ea37d4555a072b15087cb17148349228483e8c Mon Sep 17 00:00:00 2001 From: Thien Tran Date: Fri, 28 Feb 2025 10:01:21 +0800 Subject: [PATCH 1/2] bug/feat: improve subprocess --- engine/cli/commands/server_start_cmd.cc | 6 +- .../extensions/python-engine/python_engine.cc | 15 +- .../extensions/python-engine/python_engine.h | 2 +- engine/services/hardware_service.cc | 6 +- engine/utils/process/utils.cc | 265 +++++++++++++++--- engine/utils/process/utils.h | 27 +- 6 files changed, 265 insertions(+), 56 deletions(-) diff --git a/engine/cli/commands/server_start_cmd.cc b/engine/cli/commands/server_start_cmd.cc index c2ef779f1..a4bcb1eb5 100644 --- a/engine/cli/commands/server_start_cmd.cc +++ b/engine/cli/commands/server_start_cmd.cc @@ -119,10 +119,10 @@ bool ServerStartCmd::Exec(const std::string& host, int port, commands.push_back(get_data_folder_path()); commands.push_back("--loglevel"); commands.push_back(log_level_); - auto pid = cortex::process::SpawnProcess(commands); - if (pid < 0) { + auto result = cortex::process::SpawnProcess(commands); + if (result.has_error()) { // Fork failed - std::cerr << "Could not start server: " << std::endl; + std::cerr << "Could not start server: " << result.error() << std::endl; return false; } else { // Parent process diff --git a/engine/extensions/python-engine/python_engine.cc b/engine/extensions/python-engine/python_engine.cc index 685301b47..a1d4b395f 100644 --- a/engine/extensions/python-engine/python_engine.cc +++ b/engine/extensions/python-engine/python_engine.cc @@ -286,16 +286,18 @@ void PythonEngine::LoadModel( // Add the parsed arguments to the command command.insert(command.end(), args.begin(), args.end()); - pid = cortex::process::SpawnProcess(command); - process_map_[model] = pid; - if (pid == -1) { + auto result = cortex::process::SpawnProcess(command); + + if (result.has_error()) { + CTL_ERR("Fail to spawn process. " << result.error()); + std::unique_lock lock(models_mutex_); if (models_.find(model) != models_.end()) { models_.erase(model); } Json::Value error; - error["error"] = "Fail to spawn process with pid -1"; + error["error"] = "Fail to spawn process"; Json::Value status; status["is_done"] = true; status["has_error"] = true; @@ -304,6 +306,9 @@ void PythonEngine::LoadModel( callback(std::move(status), std::move(error)); return; } + + pid = result.value().pid; + process_map_[model] = result.value(); } catch (const std::exception& e) { std::unique_lock lock(models_mutex_); if (models_.find(model) != models_.end()) { @@ -356,7 +361,7 @@ void PythonEngine::UnloadModel( } else { Json::Value error; error["error"] = "Fail to terminate process with id: " + - std::to_string(process_map_[model]); + std::to_string(process_map_[model].pid); Json::Value status; status["is_done"] = true; status["has_error"] = true; diff --git a/engine/extensions/python-engine/python_engine.h b/engine/extensions/python-engine/python_engine.h index 842ce8259..2c2883809 100644 --- a/engine/extensions/python-engine/python_engine.h +++ b/engine/extensions/python-engine/python_engine.h @@ -39,7 +39,7 @@ class PythonEngine : public EngineI { std::unordered_map models_; extensions::TemplateRenderer renderer_; std::unique_ptr async_file_logger_; - std::unordered_map process_map_; + std::unordered_map process_map_; trantor::ConcurrentTaskQueue q_; // Helper functions diff --git a/engine/services/hardware_service.cc b/engine/services/hardware_service.cc index 972647b51..e6bcc89ef 100644 --- a/engine/services/hardware_service.cc +++ b/engine/services/hardware_service.cc @@ -197,10 +197,10 @@ bool HardwareService::Restart(const std::string& host, int port) { commands.push_back(get_data_folder_path()); commands.push_back("--loglevel"); commands.push_back(luh::LogLevelStr(luh::global_log_level)); - auto pid = cortex::process::SpawnProcess(commands); - if (pid < 0) { + auto result = cortex::process::SpawnProcess(commands); + if (result.has_error()) { // Fork failed - std::cerr << "Could not start server: " << std::endl; + std::cerr << "Could not start server: " << result.error() << std::endl; return false; } else { // Parent process diff --git a/engine/utils/process/utils.cc b/engine/utils/process/utils.cc index f81796c5a..c09e66889 100644 --- a/engine/utils/process/utils.cc +++ b/engine/utils/process/utils.cc @@ -1,10 +1,13 @@ #include "utils/process/utils.h" +#include #include "utils/logging_utils.h" #if defined(_WIN32) #include #elif defined(__APPLE__) || defined(__linux__) -extern char **environ; // environment variables +extern char** environ; // environment variables +#include +#include #endif namespace cortex::process { @@ -36,7 +39,9 @@ std::vector ConvertToArgv(const std::vector& args) { return argv; } -pid_t SpawnProcess(const std::vector& command) { +cpp::result SpawnProcess( + const std::vector& command, const std::string& stdout_file, + const std::string& stderr_file) { try { #if defined(_WIN32) // Windows process creation @@ -44,6 +49,50 @@ pid_t SpawnProcess(const std::vector& command) { PROCESS_INFORMATION pi = {0}; si.cb = sizeof(si); + HANDLE hJob = NULL, hStdOut = NULL, hStdErr = NULL; + + // redirect stdout and stderr + if (!stdout_file.empty() || !stderr_file.empty()) { + si.dwFlags |= STARTF_USESTDHANDLES; + + // when STARTF_USESTDHANDLES is set, we have to explicitly inherit + // parent's handles, otherwise subprocess may successfuly spawn but + // exit immediately. + si.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE); + si.hStdError = GetStdHandle(STD_ERROR_HANDLE); + si.hStdInput = GetStdHandle(STD_INPUT_HANDLE); + + SECURITY_ATTRIBUTES sa; + sa.nLength = sizeof(sa); + sa.lpSecurityDescriptor = NULL; + sa.bInheritHandle = TRUE; + + if (!stdout_file.empty()) { + hStdOut = CreateFileA(stdout_file.c_str(), FILE_APPEND_DATA, + FILE_SHARE_READ | FILE_SHARE_WRITE, &sa, + OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); + if (hStdOut == INVALID_HANDLE_VALUE) + throw std::runtime_error("Unable to create " + stdout_file + + " to redirect stdout"); + + si.hStdOutput = hStdOut; + } + if (!stderr_file.empty()) { + hStdErr = CreateFileA(stderr_file.c_str(), FILE_APPEND_DATA, + FILE_SHARE_WRITE | FILE_SHARE_READ, &sa, + OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); + if (hStdErr == INVALID_HANDLE_VALUE) { + if (hStdOut != NULL) + CloseHandle(hStdOut); + + throw std::runtime_error("Unable to create " + stderr_file + + " to redirect stderr"); + } + + si.hStdError = hStdErr; + } + } + // Construct command line std::string cmd_line = ConstructWindowsCommandLine(command); @@ -51,28 +100,64 @@ pid_t SpawnProcess(const std::vector& command) { char command_buffer[4096]; strncpy_s(command_buffer, cmd_line.c_str(), sizeof(command_buffer)); - if (!CreateProcessA(NULL, // lpApplicationName - command_buffer, // lpCommandLine - NULL, // lpProcessAttributes - NULL, // lpThreadAttributes - FALSE, // bInheritHandles - 0, // dwCreationFlags - NULL, // lpEnvironment - NULL, // lpCurrentDirectory - &si, // lpStartupInfo - &pi // lpProcessInformation + // create a suspended process. we will resume it later after adding it to + // a job (see below) + if (!CreateProcessA(NULL, // lpApplicationName + command_buffer, // lpCommandLine + NULL, // lpProcessAttributes + NULL, // lpThreadAttributes + TRUE, // bInheritHandles + CREATE_SUSPENDED, // dwCreationFlags + NULL, // lpEnvironment + NULL, // lpCurrentDirectory + &si, // lpStartupInfo + &pi // lpProcessInformation )) { + if (hStdOut != NULL) + CloseHandle(hStdOut); + if (hStdErr != NULL) + CloseHandle(hStdErr); throw std::runtime_error("Failed to create process on Windows"); } - // Store the process ID - pid_t pid = pi.dwProcessId; + // https://devblogs.microsoft.com/oldnewthing/20131209-00/?p=2433 + // resume thread after job object assignment to make sure child processes + // will be spawned in the same job object. + hJob = CreateJobObjectA(NULL, NULL); + std::string err_msg; + bool success = false; + if (!AssignProcessToJobObject(hJob, pi.hProcess)) { + err_msg = "Unable to assign process to job object"; + } else if (ResumeThread(pi.hThread) == (DWORD)(-1)) { + err_msg = "Unable to resume thread"; + } else { + success = true; + } + + // clean up if not successful + if (!success) { + TerminateProcess(pi.hProcess, 0); + CloseHandle(pi.hProcess); + CloseHandle(pi.hThread); + CloseHandle(hJob); + if (hStdOut != NULL) + CloseHandle(hStdOut); + if (hStdErr != NULL) + CloseHandle(hStdErr); + throw std::runtime_error(err_msg); + } // Close handles to avoid resource leaks CloseHandle(pi.hProcess); CloseHandle(pi.hThread); - return pid; + ProcessInfo proc_info; + proc_info.pid = pi.dwProcessId; + proc_info.hJob = hJob; + proc_info.hStdOut = hStdOut; + proc_info.hStdErr = hStdErr; + + return proc_info; #elif defined(__APPLE__) || defined(__linux__) // POSIX process creation @@ -81,31 +166,98 @@ pid_t SpawnProcess(const std::vector& command) { // Convert command vector to char*[] auto argv = ConvertToArgv(command); + // redirect stdout and stderr + // caller should make sure the redirect files exist. + posix_spawn_file_actions_t* action_ptr = NULL; + + if (!stdout_file.empty() || !stderr_file.empty()) { + posix_spawn_file_actions_t action; + posix_spawn_file_actions_init(&action); + action_ptr = &action; + + if (!stdout_file.empty()) { + if (std::filesystem::exists(stdout_file)) { + int rc = posix_spawn_file_actions_addopen(&action, STDOUT_FILENO, + stdout_file.data(), + O_WRONLY | O_APPEND, 0); + if (rc != 0) { + posix_spawn_file_actions_destroy(action_ptr); + throw std::runtime_error("Unable to add stdout to file action"); + } + } + } + + if (!stderr_file.empty()) { + if (std::filesystem::exists(stderr_file)) { + int rc = posix_spawn_file_actions_addopen(&action, STDERR_FILENO, + stderr_file.data(), + O_WRONLY | O_APPEND, 0); + if (rc != 0) { + posix_spawn_file_actions_destroy(action_ptr); + throw std::runtime_error("Unable to add stderr to file action"); + } + } + } + } + // Use posix_spawn for cross-platform compatibility auto spawn_result = posix_spawn(&pid, // pid output command[0].c_str(), // executable path - NULL, // file actions + action_ptr, // file actions NULL, // spawn attributes argv.data(), // argument vector environ // environment (inherit) ); + // NOTE: it seems like it's ok to destroy this immediately before + // subprocess terminates. + if (action_ptr != NULL) { + posix_spawn_file_actions_destroy(action_ptr); + } + if (spawn_result != 0) { throw std::runtime_error("Failed to spawn process"); } - return pid; + ProcessInfo proc_info; + proc_info.pid = pid; + + return proc_info; #else #error Unsupported platform #endif } catch (const std::exception& e) { LOG_ERROR << "Process spawning error: " << e.what(); - return -1; + return cpp::fail(e.what()); + } +} + +static void SetProcessTerminated(ProcessInfo& proc_info) { + if (proc_info.pid == PID_TERMINATED) + return; + + proc_info.pid = PID_TERMINATED; + + // close handles on Windows +#if defined(_WIN32) + CloseHandle(proc_info.hJob); + proc_info.hJob = NULL; + if (proc_info.hStdOut != NULL) { + CloseHandle(proc_info.hStdOut); + proc_info.hStdOut = NULL; + } + if (proc_info.hStdErr != NULL) { + CloseHandle(proc_info.hStdErr); + proc_info.hStdErr = NULL; } +#endif } -bool IsProcessAlive(pid_t pid) { +bool IsProcessAlive(ProcessInfo& proc_info) { + if (proc_info.pid == PID_TERMINATED) + return false; + #ifdef _WIN32 // Windows implementation HANDLE snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0); @@ -118,53 +270,88 @@ bool IsProcessAlive(pid_t pid) { if (Process32First(snapshot, &processEntry)) { do { - if (processEntry.th32ProcessID == pid) { + if (processEntry.th32ProcessID == proc_info.pid) { CloseHandle(snapshot); return true; } } while (Process32Next(snapshot, &processEntry)); } + // pid not found in snapshot -> process has terminated. CloseHandle(snapshot); + SetProcessTerminated(proc_info); return false; #elif defined(__APPLE__) || defined(__linux__) // Unix-like systems (Linux and macOS) implementation - if (pid <= 0) { - return false; - } + + // NOTE: kill(pid, 0) only works if the process has been reaped. + // if the process has terminated but not reaped (exit status is still + // stored in the process table), kill(pid, 0) still returns 0. // Try to send signal 0 to the process // This doesn't actually send a signal but checks if we can send signals to the process - int result = kill(pid, 0); + // Process exists and we have permission to send it signals + // if (kill(proc_info.pid, 0) == 0) { + // return true; + // } - if (result == 0) { - return true; // Process exists and we have permission to send it signals - } + // // process exists but we don't have permission to send signal + // if (errno == EPERM) + // return true; - return errno != ESRCH; // ESRCH means "no such process" + if (waitpid(proc_info.pid, NULL, WNOHANG) == 0) + return true; + SetProcessTerminated(proc_info); + return false; #else #error "Unsupported platform" #endif } -bool KillProcess(pid_t pid) { -#if defined(_WIN32) - HANDLE hProcess = OpenProcess(PROCESS_TERMINATE, FALSE, pid); - if (hProcess == NULL) { - LOG_ERROR << "Failed to open process"; - return false; - } +bool WaitProcess(ProcessInfo& proc_info) { + if (proc_info.pid == PID_TERMINATED) + return true; + + bool success; - bool is_success = TerminateProcess(hProcess, 0) == TRUE; +#if defined(_WIN32) + // NOTE: OpenProcess() may fail if the process has terminated. + HANDLE hProcess = OpenProcess(SYNCHRONIZE, FALSE, proc_info.pid); + success = WaitForSingleObject(hProcess, INFINITE) == WAIT_OBJECT_0; CloseHandle(hProcess); - return is_success; #elif defined(__APPLE__) || defined(__linux__) - // NOTE: should we use SIGKILL here to be consistent with Windows? - return kill(pid, SIGTERM) == 0; + // NOTE: waitpid() may fail if the process has terminated and the OS + // has reaped it (i.e. clear its exit status). + success = waitpid(proc_info.pid, NULL, 0) == proc_info.pid; +#else +#error "Unsupported platform" +#endif + + if (success) + SetProcessTerminated(proc_info); + return success; +} + +bool KillProcess(ProcessInfo& proc_info) { + if (proc_info.pid == PID_TERMINATED) + return true; + + bool success; + +#if defined(_WIN32) + success = TerminateJobObject(proc_info.hJob, 0) == 0; +#elif defined(__APPLE__) || defined(__linux__) + // we send SIGTERM to subprocess. we trust that this subprocess will + // propagate SIGTERM correctly to its children processes. + success = kill(proc_info.pid, SIGTERM) == 0; #else #error "Unsupported platform" #endif + + if (success) + SetProcessTerminated(proc_info); + return success; } } // namespace cortex::process diff --git a/engine/utils/process/utils.h b/engine/utils/process/utils.h index 2a5c62dfa..19b821cef 100644 --- a/engine/utils/process/utils.h +++ b/engine/utils/process/utils.h @@ -12,16 +12,33 @@ using pid_t = DWORD; #include #endif -#include #include +#include +#include "utils/result.hpp" namespace cortex::process { + +// set pid to this value to signal that this pid should not be used. +constexpr pid_t PID_TERMINATED = 0; + +struct ProcessInfo { + pid_t pid; +#ifdef _WIN32 + // hJob is used to terminate process and its children. + // hStdOut and hStdErr must be manually closed upon process termination. + HANDLE hJob, hStdOut, hStdErr; +#endif +}; + std::string ConstructWindowsCommandLine(const std::vector& args); std::vector ConvertToArgv(const std::vector& args); -pid_t SpawnProcess(const std::vector& command); -bool IsProcessAlive(pid_t pid); -bool KillProcess(pid_t pid); +cpp::result SpawnProcess( + const std::vector& command, + const std::string& stdout_file = "", const std::string& stderr_file = ""); +bool IsProcessAlive(ProcessInfo& proc_info); +bool WaitProcess(ProcessInfo& proc_info); +bool KillProcess(ProcessInfo& proc_info); -} +} // namespace cortex::process From 7ec13300ab096a142d744100626775faf64e7d92 Mon Sep 17 00:00:00 2001 From: Thien Tran Date: Mon, 3 Mar 2025 21:10:19 +0800 Subject: [PATCH 2/2] log subprocess command for easier debugging --- engine/utils/process/utils.cc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/engine/utils/process/utils.cc b/engine/utils/process/utils.cc index c09e66889..8cd0adc64 100644 --- a/engine/utils/process/utils.cc +++ b/engine/utils/process/utils.cc @@ -1,5 +1,6 @@ #include "utils/process/utils.h" #include +#include #include "utils/logging_utils.h" #if defined(_WIN32) @@ -42,6 +43,12 @@ std::vector ConvertToArgv(const std::vector& args) { cpp::result SpawnProcess( const std::vector& command, const std::string& stdout_file, const std::string& stderr_file) { + std::stringstream ss; + for (const auto item : command) { + ss << item << " "; + } + CTL_INF("Spawning process with command: " << ss.str()); + try { #if defined(_WIN32) // Windows process creation