-
Notifications
You must be signed in to change notification settings - Fork 7k
[core] Improve worker/driver signal handling #57086
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0b5540e
d914f38
06182da
2c16f4c
e97e2c4
40c3f42
63e3039
e3a4843
7a92063
5b85f9c
1f6b236
0e281c7
2a33f08
2a70ea8
ead19f0
fabf28b
801e762
ff60e40
2c5c8de
d1fdf42
d5acae6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Any, | ||
| Callable, | ||
| Dict, | ||
| List, | ||
| Mapping, | ||
|
|
@@ -801,6 +802,97 @@ def set_sigterm_handler(sigterm_handler): | |
| signal.signal(signal.SIGTERM, sigterm_handler) | ||
|
|
||
|
|
||
| _signal_handler_installed = False | ||
| _graceful_shutdown_in_progress = False | ||
|
|
||
|
|
||
def install_driver_signal_handler() -> None:
    """Register the SIGTERM handler used by Ray driver processes.

    The installed handler gives the driver one chance to exit cleanly:

    - First SIGTERM: calls ``sys.exit(signum)`` so that atexit handlers can run.
    - Any later SIGTERM: logs a warning and terminates via ``os._exit(1)``.

    The call is a no-op when the module-level ``_signal_handler_installed``
    flag is already set. When invoked from a thread other than the main
    thread, a warning is logged and nothing is installed, because Python only
    allows signal handlers to be set from the main thread. Refer to
    https://docs.python.org/3/library/signal.html#signals-and-threads for
    more details.
    """
    global _signal_handler_installed, _graceful_shutdown_in_progress

    if _signal_handler_installed:
        return

    on_main_thread = threading.current_thread() is threading.main_thread()
    if not on_main_thread:
        # NOTE(review): a collaborator suggested this path should raise an
        # exception instead of warning, and asked whether any caller hits it
        # today. Left as a warning pending confirmation.
        logger.warning(
            "Signal handlers not installed because current thread is not the main thread. Refer to https://docs.python.org/3/library/signal.html#signals-and-threads for more details."
        )
        return

    def _on_sigterm(signum, _frame):
        global _graceful_shutdown_in_progress
        if _graceful_shutdown_in_progress:
            # A graceful shutdown was already requested; do not wait any longer.
            logger.warning(
                "Received second SIGTERM signal; escalating to immediate forced shutdown."
            )
            os._exit(1)
        else:
            # Record the request before raising SystemExit so that a repeated
            # signal during cleanup takes the forced path above.
            _graceful_shutdown_in_progress = True
            sys.exit(signum)

    set_sigterm_handler(_on_sigterm)
    _signal_handler_installed = True
|
|
||
|
|
||
codope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
def install_worker_signal_handler(force_shutdown_fn: Callable[[str], None]) -> None:
    """Install SIGTERM handler for Ray worker processes.

    Workers receive external SIGTERM as a forced shutdown signal to avoid hangs
    during blocking operations like ray.get()/wait(). This is different from
    driver semantics where the first signal is graceful.

    Must be called from the main thread (Python signal handlers requirement);
    when called from any other thread a warning is logged and no handler is
    installed. The call is also a no-op when the module-level
    ``_signal_handler_installed`` flag is already set.
    Refer to https://docs.python.org/3/library/signal.html#signals-and-threads for more details.

    Args:
        force_shutdown_fn: Function to call for forced shutdown. Should accept a
            single string argument (detail message). The handler passes the
            name of the received signal (e.g. ``"SIGTERM"``).

    Raises:
        AssertionError: If force_shutdown_fn is None.
    """
    global _signal_handler_installed

    # Raised explicitly rather than via `assert` so the check is not stripped
    # under `python -O`; otherwise a missing callback would only surface as a
    # TypeError inside the signal handler.
    if force_shutdown_fn is None:
        raise AssertionError("Worker signal handlers require force_shutdown_fn")

    if _signal_handler_installed:
        return

    if threading.current_thread() is not threading.main_thread():
        logger.warning(
            "Signal handlers not installed because current thread is not the main thread. Refer to https://docs.python.org/3/library/signal.html#signals-and-threads for more details."
        )
        return

    def _handler(signum, _frame):
        # Workers treat external SIGTERM as immediate forced exit to avoid hangs.
        signal_name = signal.Signals(signum).name
        force_shutdown_fn(signal_name)

    set_sigterm_handler(_handler)
    _signal_handler_installed = True
codope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
def reset_signal_handler_state() -> None:
    """Clear the module-level signal handler flags.

    Invoked from ray.shutdown() so that a later ray.init() in the same process
    can install its signal handler again. Only the two bookkeeping flags are
    cleared here; the handler registered with the ``signal`` module is not
    touched by this function.
    """
    global _signal_handler_installed, _graceful_shutdown_in_progress
    _graceful_shutdown_in_progress = False
    _signal_handler_installed = False
|
|
||
|
|
||
| def try_to_symlink(symlink_path, target_path): | ||
| """Attempt to create a symlink. | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -87,7 +87,6 @@ | |||||
| from ray._raylet import ( | ||||||
| ObjectRefGenerator, | ||||||
| TaskID, | ||||||
| raise_sys_exit_with_custom_error_message, | ||||||
| ) | ||||||
| from ray.actor import ActorClass | ||||||
| from ray.exceptions import ObjectStoreFullError, RayError, RaySystemError, RayTaskError | ||||||
|
|
@@ -1037,13 +1036,12 @@ def get_objects( | |||||
| def main_loop(self): | ||||||
| """The main loop a worker runs to receive and execute tasks.""" | ||||||
|
|
||||||
| def sigterm_handler(signum, frame): | ||||||
| raise_sys_exit_with_custom_error_message( | ||||||
| "The process receives a SIGTERM.", exit_code=1 | ||||||
| def force_shutdown(detail: str): | ||||||
| self.core_worker.force_exit_worker( | ||||||
| ray_constants.WORKER_EXIT_TYPE_SYSTEM, detail.encode("utf-8") | ||||||
| ) | ||||||
| # Note: shutdown() function is called from atexit handler. | ||||||
|
|
||||||
| ray._private.utils.set_sigterm_handler(sigterm_handler) | ||||||
| ray._private.utils.install_worker_signal_handler(force_shutdown) | ||||||
| self.core_worker.run_task_loop() | ||||||
| sys.exit(0) | ||||||
|
|
||||||
|
|
@@ -1676,17 +1674,7 @@ def init( | |||||
| system_reserved_memory=system_reserved_memory, | ||||||
| ) | ||||||
|
|
||||||
| # terminate any signal before connecting driver | ||||||
| def sigterm_handler(signum, frame): | ||||||
| sys.exit(signum) | ||||||
|
|
||||||
| if threading.current_thread() is threading.main_thread(): | ||||||
| ray._private.utils.set_sigterm_handler(sigterm_handler) | ||||||
| else: | ||||||
| logger.warning( | ||||||
| "SIGTERM handler is not set because current thread " | ||||||
| "is not the main thread." | ||||||
| ) | ||||||
| ray._private.utils.install_driver_signal_handler() | ||||||
|
|
||||||
| # If available, use RAY_ADDRESS to override if the address was left | ||||||
| # unspecified, or set to "auto" in the call to init | ||||||
|
|
@@ -2101,6 +2089,7 @@ def shutdown(_exiting_interpreter: bool = False): | |||||
| from ray.dag.compiled_dag_node import _shutdown_all_compiled_dags | ||||||
|
|
||||||
| _shutdown_all_compiled_dags() | ||||||
| ray._private.utils.reset_signal_handler_state() | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
there's no real state, just the handler |
||||||
| global_worker.shutdown_gpu_object_manager() | ||||||
|
|
||||||
| if _exiting_interpreter and global_worker.mode == SCRIPT_MODE: | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -130,6 +130,7 @@ from ray.includes.common cimport ( | |
| WORKER_EXIT_TYPE_USER_ERROR, | ||
| WORKER_EXIT_TYPE_SYSTEM_ERROR, | ||
| WORKER_EXIT_TYPE_INTENTIONAL_SYSTEM_ERROR, | ||
| WORKER_EXIT_TYPE_INTENDED_USER_EXIT, | ||
| kResourceUnitScaling, | ||
| kImplicitResourcePrefix, | ||
| kWorkerSetupHookKeyName, | ||
|
|
@@ -3020,18 +3021,56 @@ cdef class CoreWorker: | |
| CWorkerExitType c_exit_type | ||
| cdef const shared_ptr[LocalMemoryBuffer] null_ptr | ||
|
|
||
| if exit_type == "user": | ||
| if exit_type == ray_constants.WORKER_EXIT_TYPE_USER: | ||
| c_exit_type = WORKER_EXIT_TYPE_USER_ERROR | ||
| elif exit_type == "system": | ||
| elif exit_type == ray_constants.WORKER_EXIT_TYPE_SYSTEM: | ||
| c_exit_type = WORKER_EXIT_TYPE_SYSTEM_ERROR | ||
| elif exit_type == "intentional_system_exit": | ||
| elif exit_type == ray_constants.WORKER_EXIT_TYPE_INTENTIONAL_SYSTEM: | ||
| c_exit_type = WORKER_EXIT_TYPE_INTENTIONAL_SYSTEM_ERROR | ||
| else: | ||
| raise ValueError(f"Invalid exit type: {exit_type}") | ||
| assert not self.is_driver | ||
| with nogil: | ||
| CCoreWorkerProcess.GetCoreWorker().Exit(c_exit_type, detail, null_ptr) | ||
|
|
||
def force_exit_worker(self, exit_type: str, c_string detail):
    """Force exit the current worker process immediately without draining.

    Translates ``exit_type`` into a ``CWorkerExitType`` and passes it, along
    with ``detail``, to ``CoreWorker.ForceExit``. Intended for forced
    shutdowns triggered by signals or other immediate termination scenarios,
    where the graceful path (task draining) is bypassed.

    NOTE(review): a collaborator asked whether skipping the drain matches the
    behavior that existed before this method was introduced — confirm.

    Args:
        exit_type: Type of exit. Must be one of:
            - "user": User-initiated forced exit (INTENDED_USER_EXIT)
            - "system": System error forced exit (SYSTEM_ERROR)
            - "intentional_system_exit": Intentional system-initiated exit
        detail: Human-readable detail string describing the exit reason.

    Raises:
        AssertionError: If called from a driver process (must be worker-only).
        ValueError: If exit_type is not one of the valid options.
    """
    cdef CWorkerExitType c_exit_type

    assert not self.is_driver, (
        "force_exit_worker must only be called by workers, not drivers"
    )

    user_kind = ray_constants.WORKER_EXIT_TYPE_USER
    system_kind = ray_constants.WORKER_EXIT_TYPE_SYSTEM
    intentional_kind = ray_constants.WORKER_EXIT_TYPE_INTENTIONAL_SYSTEM

    if exit_type == user_kind:
        # Unlike exit_worker, a forced "user" exit is reported as an intended
        # user exit rather than a user error.
        c_exit_type = WORKER_EXIT_TYPE_INTENDED_USER_EXIT
    elif exit_type == system_kind:
        c_exit_type = WORKER_EXIT_TYPE_SYSTEM_ERROR
    elif exit_type == intentional_kind:
        c_exit_type = WORKER_EXIT_TYPE_INTENTIONAL_SYSTEM_ERROR
    else:
        raise ValueError(
            f"Invalid exit_type '{exit_type}'; expected "
            f"'{user_kind}', "
            f"'{system_kind}', or "
            f"'{intentional_kind}'"
        )

    # The GIL is released around the C++ call.
    with nogil:
        CCoreWorkerProcess.GetCoreWorker().ForceExit(c_exit_type, detail)
|
|
||
| def get_current_task_name(self) -> str: | ||
| """Return the current task name. | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -591,6 +591,16 @@ def ray_start_no_cpu(request, maybe_setup_external_redis): | |
| yield res | ||
|
|
||
|
|
||
# Minimal fixture: bring Ray up with default settings, run the test, tear Ray down.
@pytest.fixture
def ray_start():
    """Run the test body between a default ``ray.init()`` and ``ray.shutdown()``."""
    ray.init()
    try:
        yield
    finally:
        # Runs even when the test body raises.
        ray.shutdown()
|
Comment on lines
+594
to
+601
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. there's a fixture just below that does the same thing? |
||
|
|
||
|
|
||
| # The following fixture will start ray with 1 cpu. | ||
| @pytest.fixture | ||
| def ray_start_regular(request, maybe_setup_external_redis): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`SIGTERM` handler should be idempotent. There's no purpose to having an "escalation" here because the caller can & should `SIGKILL` if they want to un-gracefully kill the process.