Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Fix a bug where SIGTERM is ignored to worker processes #40210

Merged
merged 20 commits into from
Oct 25, 2023
14 changes: 10 additions & 4 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@
import ray.job_config
import ray.remote_function
from ray import ActorID, JobID, Language, ObjectRef
from ray._raylet import StreamingObjectRefGenerator
from ray._raylet import (
StreamingObjectRefGenerator,
raise_sys_exit_with_custom_error_message,
)
from ray.runtime_env.runtime_env import _merge_runtime_env
from ray._private import ray_option_utils
from ray._private.client_mode_hook import client_mode_hook
Expand Down Expand Up @@ -785,8 +788,10 @@ def main_loop(self):
"""The main loop a worker runs to receive and execute tasks."""

def sigterm_handler(signum, frame):
shutdown(True)
sys.exit(1)
raise_sys_exit_with_custom_error_message(
"The process receives a SIGTERM.", exit_code=1
)
# Note: shutdown() function is called from atexit handler.

ray._private.utils.set_sigterm_handler(sigterm_handler)
self.core_worker.run_task_loop()
Expand Down Expand Up @@ -1741,7 +1746,8 @@ def shutdown(_exiting_interpreter: bool = False):
# we will tear down any processes spawned by ray.init() and the background
# IO thread in the core worker doesn't currently handle that gracefully.
if hasattr(global_worker, "core_worker"):
global_worker.core_worker.shutdown()
if global_worker.mode == SCRIPT_MODE or global_worker.mode == LOCAL_MODE:
global_worker.core_worker.shutdown_driver()
del global_worker.core_worker
# We need to reset function actor manager to clear the context
global_worker.function_actor_manager = FunctionActorManager(global_worker)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/_private/workers/default_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@
if worker_process_setup_hook_key:
error = load_and_execute_setup_hook(worker_process_setup_hook_key)
if error is not None:
worker.core_worker.exit_worker("system", error)
worker.core_worker.drain_and_exit_worker("system", error)

if mode == ray.WORKER_MODE:
worker.main_loop()
Expand Down
111 changes: 82 additions & 29 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ from ray.includes.common cimport (
PythonGetLogBatchLines,
WORKER_EXIT_TYPE_USER_ERROR,
WORKER_EXIT_TYPE_SYSTEM_ERROR,
WORKER_EXIT_TYPE_INTENTIONAL_SYSTEM_ERROR,
kResourceUnitScaling,
kImplicitResourcePrefix,
kWorkerSetupHookKeyName,
Expand Down Expand Up @@ -456,6 +457,13 @@ cdef int check_status(const CRayStatus& status) nogil except -1:
raise ValueError(message)
elif status.IsRpcError():
raise RpcError(message, rpc_code=status.rpc_code())
elif status.IsIntentionalSystemExit():
with gil:
raise_sys_exit_with_custom_error_message(message)
elif status.IsUnexpectedSystemExit():
with gil:
raise_sys_exit_with_custom_error_message(
message, exit_code=1)
else:
raise RaySystemError(message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would RaySystemError differ from the above 2 cases now?


Expand Down Expand Up @@ -692,6 +700,31 @@ cdef int prepare_actor_concurrency_groups(
concurrency_groups.push_back(cg)
return 1


def raise_sys_exit_with_custom_error_message(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we move this out of the _raylet.pyx file to a better place?

ray_terminate_msg: str,
exit_code: int = 0) -> None:
"""It is equivalent to sys.exit, but it can contain
a custom message. Custom message is reported to
raylet and is accessible via GCS (from `ray get workers`).

Note that sys.exit == raise SystemExit. I.e., this API
simply raises SystemExit with a custom error message accessible
via `e.ray_terminate_msg`.

Args:
ray_terminate_msg: The error message to propagate to GCS.
exit_code: The exit code. If it is not 0, it is considered
as a system error.
"""
# Raising SystemExit(0) is equivalent to
# sys.exit(0).
# https://docs.python.org/3/library/exceptions.html#SystemExit
e = SystemExit(exit_code)
e.ray_terminate_msg = ray_terminate_msg
raise e


cdef prepare_args_and_increment_put_refs(
CoreWorker core_worker,
Language language, args,
Expand Down Expand Up @@ -1112,13 +1145,13 @@ cdef report_streaming_generator_output(
is_plasma_object(return_obj.second)))

with nogil:
CCoreWorkerProcess.GetCoreWorker().ReportGeneratorItemReturns(
check_status(CCoreWorkerProcess.GetCoreWorker().ReportGeneratorItemReturns(
return_obj,
context.generator_id,
context.caller_address,
generator_index,
context.attempt_number,
context.waiter)
context.waiter))
context.generator_index += 1
return True
else:
Expand Down Expand Up @@ -1146,13 +1179,13 @@ cdef report_streaming_generator_output(
"Writes to a ObjectRefStream of an "
"index {}".format(context.generator_index))
with nogil:
CCoreWorkerProcess.GetCoreWorker().ReportGeneratorItemReturns(
check_status(CCoreWorkerProcess.GetCoreWorker().ReportGeneratorItemReturns(
return_obj,
context.generator_id,
context.caller_address,
generator_index,
context.attempt_number,
context.waiter)
context.waiter))
context.generator_index += 1
return False

Expand Down Expand Up @@ -1515,10 +1548,7 @@ cdef void execute_task(
# actor, Ray will exit the actor.
def exit_current_actor_if_asyncio():
if core_worker.current_actor_is_asyncio():
error = SystemExit(0)
error.is_ray_terminate = True
error.ray_terminate_msg = "exit_actor() is called."
raise error
raise_sys_exit_with_custom_error_message("exit_actor() is called.")

function_descriptor = CFunctionDescriptorToPython(
ray_function.GetFunctionDescriptor())
Expand Down Expand Up @@ -1963,7 +1993,7 @@ cdef execute_task_with_cancellation_handler(
actor = None
actor_id = core_worker.get_actor_id()
if not actor_id.is_nil():
actor = core_worker.actors[actor_id]
actor = worker.actors[actor_id]

store_task_errors(
worker, e,
Expand Down Expand Up @@ -1993,12 +2023,9 @@ cdef execute_task_with_cancellation_handler(
# If we've reached the max number of executions for this worker, exit.
task_counter = manager.get_task_counter(function_descriptor)
if task_counter == execution_info.max_calls:
exit = SystemExit(0)
exit.is_ray_terminate = True
exit.ray_terminate_msg = (
raise_sys_exit_with_custom_error_message(
"max_call has reached, "
f"max_calls: {execution_info.max_calls}")
raise exit

cdef shared_ptr[LocalMemoryBuffer] ray_error_to_memory_buf(ray_error):
cdef bytes py_bytes = ray_error.to_bytes()
Expand Down Expand Up @@ -2086,26 +2113,27 @@ cdef CRayStatus task_execution_handler(
except SystemExit as e:
# Tell the core worker to exit as soon as the result objects
# are processed.
if hasattr(e, "is_ray_terminate"):
return CRayStatus.IntentionalSystemExit(e.ray_terminate_msg)
elif hasattr(e, "is_creation_task_error"):
if hasattr(e, "is_creation_task_error"):
return CRayStatus.CreationTaskError(e.init_error_message)
elif e.code is not None and e.code == 0:
# This means the system exit was
# normal based on the python convention.
# https://docs.python.org/3/library/sys.html#sys.exit
return CRayStatus.IntentionalSystemExit(
f"Worker exits with an exit code {e.code}.")
msg = f"Worker exits with an exit code {e.code}."
if hasattr(e, "ray_terminate_msg"):
msg += (f" {e.ray_terminate_msg}")
return CRayStatus.IntentionalSystemExit(msg)
else:
msg = f"Worker exits with an exit code {e.code}."
# In K8s, SIGTERM likely means we hit memory limits, so print
# a more informative message there.
if "KUBERNETES_SERVICE_HOST" in os.environ:
msg += (
" The worker may have exceeded K8s pod memory limits.")
if hasattr(e, "ray_terminate_msg"):
msg += (f" {e.ray_terminate_msg}")
if hasattr(e, "unexpected_error_traceback"):
msg += (f"\n {e.unexpected_error_traceback}")
logger.exception(msg)
msg += (f" {e.unexpected_error_traceback}")
return CRayStatus.UnexpectedSystemExit(msg)

return CRayStatus.OK()
Expand All @@ -2122,10 +2150,30 @@ cdef c_bool kill_main_task(const CTaskID &task_id) nogil:

cdef CRayStatus check_signals() nogil:
with gil:
# The Python exceptions are not handled if it is raised from cdef,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this mean?

# so we have to handle it here.
try:
    # Run any pending Python-level signal handlers; a handler may raise.
    PyErr_CheckSignals()
except KeyboardInterrupt:
    return CRayStatus.Interrupted(b"")
except SystemExit as e:
    # A signal handler requested process exit. Build a human-readable
    # message and translate the exit code into the matching status.
    error_msg = "SystemExit is raised (sys.exit is called)."
    if e.code is not None:
        error_msg += f" Exit code: {e.code}."
    else:
        error_msg += " Exit code was not specified."

    if hasattr(e, "ray_terminate_msg"):
        error_msg += f" {e.ray_terminate_msg}"

    # NOTE: `e.code and e.code == 0` can never be true because 0 is falsy,
    # which made every SystemExit (including a clean exit code 0) be
    # reported as unexpected. Compare against None explicitly instead,
    # the same way task_execution_handler classifies exit codes.
    if e.code is not None and e.code == 0:
        return CRayStatus.IntentionalSystemExit(error_msg.encode("utf-8"))
    else:
        return CRayStatus.UnexpectedSystemExit(error_msg.encode("utf-8"))
# By default, if signals raise an exception, Python just prints them.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test signals raising an exception

# To keep the same behavior, we don't handle any other exceptions.

return CRayStatus.OK()


Expand Down Expand Up @@ -3059,15 +3107,15 @@ cdef class CoreWorker:
self._task_id_to_future = {}
self.thread_pool_for_async_event_loop = None

def shutdown(self):
def shutdown_driver(self):
# If it's a worker, the core worker process should have been
# shutdown. So we can't call
# `CCoreWorkerProcess.GetCoreWorker().GetWorkerType()` here.
# Instead, we use the cached `is_driver` flag to test if it's a
# driver.
if self.is_driver:
with nogil:
CCoreWorkerProcess.Shutdown()
assert self.is_driver
with nogil:
CCoreWorkerProcess.Shutdown()

def notify_raylet(self):
with nogil:
Expand All @@ -3077,13 +3125,16 @@ cdef class CoreWorker:
with nogil:
CCoreWorkerProcess.RunTaskExecutionLoop()

def exit_worker(self, exit_type: str, c_string detail):
def drain_and_exit_worker(self, exit_type: str, c_string detail):
"""
Exit the current worker process. This API should only be used by
a worker. If this API is called, the worker will finish currently
executing task, initiate the shutdown, and stop itself gracefully.
The given exit_type and detail will be reported to GCS, and any
worker failure error will contain them.
a worker. If this API is called, the worker will wait to finish
currently executing task, initiate the shutdown, and stop
itself gracefully. The given exit_type and detail will be
reported to GCS, and any worker failure error will contain them.

The behavior of this API while a task is running is undefined.
Avoid using the API when a task is still running.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this should only be called as part of the shutdown routine, and is not an API for exiting workers in general?

"""
cdef:
CWorkerExitType c_exit_type
Expand All @@ -3093,6 +3144,8 @@ cdef class CoreWorker:
c_exit_type = WORKER_EXIT_TYPE_USER_ERROR
if exit_type == "system":
c_exit_type = WORKER_EXIT_TYPE_SYSTEM_ERROR
elif exit_type == "intentional_system_exit":
c_exit_type = WORKER_EXIT_TYPE_INTENTIONAL_SYSTEM_ERROR
else:
raise ValueError(f"Invalid exit type: {exit_type}")
assert not self.is_driver
Expand Down
6 changes: 2 additions & 4 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
STREAMING_GENERATOR_RETURN,
PythonFunctionDescriptor,
StreamingObjectRefGenerator,
raise_sys_exit_with_custom_error_message,
)
from ray.exceptions import AsyncioActorExit
from ray.util.annotations import DeveloperAPI, PublicAPI
Expand Down Expand Up @@ -1462,10 +1463,7 @@ def exit_actor():

# Set a flag to indicate this is an intentional actor exit. This
# reduces log verbosity.
exit = SystemExit(0)
exit.is_ray_terminate = True
exit.ray_terminate_msg = "exit_actor() is called."
raise exit
raise_sys_exit_with_custom_error_message("exit_actor() is called.")
else:
raise TypeError(
"exit_actor API is called on a non-actor worker, "
Expand Down
Loading
Loading