diff --git a/packages/jumpstarter-cli/jumpstarter_cli/run.py b/packages/jumpstarter-cli/jumpstarter_cli/run.py index 484e16d6b..b288d8968 100644 --- a/packages/jumpstarter-cli/jumpstarter_cli/run.py +++ b/packages/jumpstarter-cli/jumpstarter_cli/run.py @@ -12,6 +12,34 @@ logger = logging.getLogger(__name__) +def _handle_exporter_exceptions(excgroup): + """Handle exceptions from exporter serving.""" + from jumpstarter_cli_common.exceptions import leaf_exceptions + for exc in leaf_exceptions(excgroup): + if not isinstance(exc, anyio.get_cancelled_exc_class()): + click.echo( + f"Exception while serving on the exporter: {type(exc).__name__}: {exc}", + err=True, + ) + + +def _reap_zombie_processes(capture_child=None): + """Reap zombie processes when running as PID 1.""" + try: + while True: + try: + pid, status = os.waitpid(-1, os.WNOHANG) + if pid == 0: + break # No more children + if capture_child and pid == capture_child['pid']: + capture_child['status'] = status + logger.debug(f"PARENT: Reaped zombie process {pid} with status {status}") + except ChildProcessError: + break # No more children + except Exception as e: + logger.warning(f"PARENT: Error during zombie reaping: {e}") + + def _handle_child(config): """Handle child process with graceful shutdown.""" async def serve_with_graceful_shutdown(): @@ -28,6 +56,7 @@ async def signal_handler(): continue # Ignore duplicate signals received_signal = sig logger.info("CHILD: Received %d (%s)", received_signal, signal.Signals(received_signal).name) + if exporter: # Terminate exporter. SIGHUP waits until current lease is let go. Later SIGTERM still overrides if received_signal != signal.SIGHUP: @@ -45,13 +74,7 @@ async def signal_handler(): try: await exporter.serve() except* Exception as excgroup: - from jumpstarter_cli_common.exceptions import leaf_exceptions - for exc in leaf_exceptions(excgroup): - if not isinstance(exc, anyio.get_cancelled_exc_class()): - click.echo( - f"Exception while serving on the exporter: {type(exc).__name__}: {exc}", - err=True, - ) + _handle_exporter_exceptions(excgroup) # Cancel the signal handler after exporter completes signal_tg.cancel_scope.cancel() @@ -62,21 +85,38 @@ async def signal_handler(): sys.exit(anyio.run(serve_with_graceful_shutdown)) +def _wait_for_child(pid, child_info): + """Wait for child process, get status from signal handler if reaped.""" + try: + _, status = os.waitpid(pid, 0) + except ChildProcessError: + status = child_info['status'] + return status + + def _handle_parent(pid): """Handle parent process waiting for child and signal forwarding.""" + child_info = {'pid': pid, 'status': None} + def parent_signal_handler(signum, _): - logger.info("PARENT: Received %d (%s), forwarding to child PID %d", signum, signal.Signals(signum).name, pid) - if pid and pid > 0: - try: - os.kill(pid, signum) - except ProcessLookupError: - pass + if signum == signal.SIGCHLD and os.getpid() == 1: + _reap_zombie_processes(capture_child=child_info) # capture our own direct child if reaped + elif signum != signal.SIGCHLD: + logger.info("PARENT: Got %d (%s), forwarding to child PG %d", signum, signal.Signals(signum).name, pid) + if pid > 0: + try: + os.killpg(pid, signum) + except (ProcessLookupError, OSError): + pass # Set up signal handlers after fork - for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP, signal.SIGQUIT): + for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP, signal.SIGQUIT, signal.SIGCHLD): signal.signal(sig, parent_signal_handler) - _, status = os.waitpid(pid, 0) + status = _wait_for_child(pid, child_info) + if status is None: + return None + if os.WIFEXITED(status): # Interpret child exit code child_exit_code = os.WEXITSTATUS(status) @@ -100,6 +140,7 @@ def _serve_with_exc_handling(config): if (exit_code := _handle_parent(pid)) is not None: return exit_code else: + os.setsid() # Become group leader so all spawned subprocesses are reached by parent's signals _handle_child(config) sys.exit(1) # should never happen diff --git a/packages/jumpstarter-driver-shell/jumpstarter_driver_shell/driver.py b/packages/jumpstarter-driver-shell/jumpstarter_driver_shell/driver.py index bbabadfc0..f790a30f2 100644 --- a/packages/jumpstarter-driver-shell/jumpstarter_driver_shell/driver.py +++ b/packages/jumpstarter-driver-shell/jumpstarter_driver_shell/driver.py @@ -138,6 +138,7 @@ async def _run_inline_shell_script( cmd = self.shell + [script, method] + list(args) # Start the process with pipes for streaming and new process group + self.logger.debug( f"running {method} with cmd: {cmd} and env: {combined_env} " f"and args: {args}") process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, @@ -152,7 +153,6 @@ async def _run_inline_shell_script( # Read output in real-time while process.returncode is None: - self.logger.debug(f"running {method} with cmd: {cmd} and env: {combined_env} and args: {args}") if asyncio.get_event_loop().time() - start_time > self.timeout: # Send SIGTERM to entire process group for graceful termination try: diff --git a/uv.lock b/uv.lock index ed2d7e723..1564fc10a 100644 --- a/uv.lock +++ b/uv.lock @@ -1860,6 +1860,7 @@ name = "jumpstarter-driver-shell" source = { editable = "packages/jumpstarter-driver-shell" } dependencies = [ { name = "anyio" }, + { name = "click" }, { name = "jumpstarter" }, ] @@ -1872,6 +1873,7 @@ dev = [ [package.metadata] requires-dist = [ { name = "anyio", specifier = ">=4.10.0" }, + { name = "click", specifier = ">=8.1.8" }, { name = "jumpstarter", editable = "packages/jumpstarter" }, ]