Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 56 additions & 15 deletions packages/jumpstarter-cli/jumpstarter_cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,34 @@
logger = logging.getLogger(__name__)


def _handle_exporter_exceptions(excgroup):
"""Handle exceptions from exporter serving."""
from jumpstarter_cli_common.exceptions import leaf_exceptions
for exc in leaf_exceptions(excgroup):
if not isinstance(exc, anyio.get_cancelled_exc_class()):
click.echo(
f"Exception while serving on the exporter: {type(exc).__name__}: {exc}",
err=True,
)


def _reap_zombie_processes(capture_child=None):
"""Reap zombie processes when running as PID 1."""
try:
while True:
try:
pid, status = os.waitpid(-1, os.WNOHANG)
if pid == 0:
break # No more children
if capture_child and pid == capture_child['pid']:
capture_child['status'] = status
logger.debug(f"PARENT: Reaped zombie process {pid} with status {status}")
except ChildProcessError:
break # No more children
except Exception as e:
logger.warning(f"PARENT: Error during zombie reaping: {e}")


def _handle_child(config):
"""Handle child process with graceful shutdown."""
async def serve_with_graceful_shutdown():
Expand All @@ -28,6 +56,7 @@ async def signal_handler():
continue # Ignore duplicate signals
received_signal = sig
logger.info("CHILD: Received %d (%s)", received_signal, signal.Signals(received_signal).name)

if exporter:
# Terminate exporter. SIGHUP waits until current lease is let go. Later SIGTERM still overrides
if received_signal != signal.SIGHUP:
Expand All @@ -45,13 +74,7 @@ async def signal_handler():
try:
await exporter.serve()
except* Exception as excgroup:
from jumpstarter_cli_common.exceptions import leaf_exceptions
for exc in leaf_exceptions(excgroup):
if not isinstance(exc, anyio.get_cancelled_exc_class()):
click.echo(
f"Exception while serving on the exporter: {type(exc).__name__}: {exc}",
err=True,
)
_handle_exporter_exceptions(excgroup)

# Cancel the signal handler after exporter completes
signal_tg.cancel_scope.cancel()
Expand All @@ -62,21 +85,38 @@ async def signal_handler():
sys.exit(anyio.run(serve_with_graceful_shutdown))


def _wait_for_child(pid, child_info):
"""Wait for child process, get status from signal handler if reaped."""
try:
_, status = os.waitpid(pid, 0)
except ChildProcessError:
status = child_info['status']
return status


def _handle_parent(pid):
"""Handle parent process waiting for child and signal forwarding."""
child_info = {'pid': pid, 'status': None}

def parent_signal_handler(signum, _):
logger.info("PARENT: Received %d (%s), forwarding to child PID %d", signum, signal.Signals(signum).name, pid)
if pid and pid > 0:
try:
os.kill(pid, signum)
except ProcessLookupError:
pass
if signum == signal.SIGCHLD and os.getpid() == 1:
_reap_zombie_processes(capture_child=child_info) # capture our own direct child if reaped
elif signum != signal.SIGCHLD:
logger.info("PARENT: Got %d (%s), forwarding to child PG %d", signum, signal.Signals(signum).name, pid)
if pid > 0:
try:
os.killpg(pid, signum)
except (ProcessLookupError, OSError):
pass

# Set up signal handlers after fork
for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP, signal.SIGQUIT):
for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP, signal.SIGQUIT, signal.SIGCHLD):
signal.signal(sig, parent_signal_handler)

_, status = os.waitpid(pid, 0)
status = _wait_for_child(pid, child_info)
if status is None:
return None

if os.WIFEXITED(status):
# Interpret child exit code
child_exit_code = os.WEXITSTATUS(status)
Expand All @@ -100,6 +140,7 @@ def _serve_with_exc_handling(config):
if (exit_code := _handle_parent(pid)) is not None:
return exit_code
else:
os.setsid() # Become group leader so all spawned subprocesses are reached by parent's signals
_handle_child(config)
sys.exit(1) # should never happen

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ async def _run_inline_shell_script(
cmd = self.shell + [script, method] + list(args)

# Start the process with pipes for streaming and new process group
self.logger.debug( f"running {method} with cmd: {cmd} and env: {combined_env} " f"and args: {args}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
Expand All @@ -152,7 +153,6 @@ async def _run_inline_shell_script(

# Read output in real-time
while process.returncode is None:
self.logger.debug(f"running {method} with cmd: {cmd} and env: {combined_env} and args: {args}")
if asyncio.get_event_loop().time() - start_time > self.timeout:
# Send SIGTERM to entire process group for graceful termination
try:
Expand Down
2 changes: 2 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading