[Core] Fix a bug where SIGTERM is ignored to worker processes #40210

Merged · 20 commits · Oct 25, 2023

Conversation

@rkooo567 rkooo567 commented Oct 9, 2023

Why are these changes needed?

When a worker process exits, it should follow the process specified below.

  1. If a task is running and sys.exit is called, SystemExit is raised. The task execution loop catches this exception and calls CoreWorker::Exit.
  2. Normally, the worker should call the CoreWorker::Exit API as part of shutdown. This is the API that drains the worker and shuts it down.
  3. CoreWorker::Exit stops the main execution loop, which terminates the process.
  4. The atexit handler should call the shutdown() API from Python.
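As a rough illustration, the intended exit path above can be sketched in Python (a simplified, hypothetical sketch; the CoreWorker class below is only a stand-in for the real C++ CoreWorker):

```python
import atexit
import sys


class CoreWorker:
    """Hypothetical stand-in for the C++ CoreWorker."""

    def __init__(self):
        self.running = True

    def exit(self):
        # Steps 2-3: drain the worker, then stop the main execution
        # loop, which terminates the process.
        self.running = False


worker = CoreWorker()


def task_execution_loop(task):
    # Step 1: sys.exit() inside a task raises SystemExit; the task
    # execution loop catches it and calls CoreWorker::Exit.
    try:
        task()
    except SystemExit:
        worker.exit()


@atexit.register
def shutdown():
    # Step 4: the atexit handler runs the Python-side shutdown path.
    if worker.running:
        worker.exit()


task_execution_loop(lambda: sys.exit(0))
```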

Currently, there are 3 issues.

  • When a SIGTERM is received while a worker is idle, it is ignored.
  • When it is not ignored, the behavior is essentially undefined (because this path never worked :)). This PR defines the behavior more clearly.
  • I also found that while a worker is blocked on ray.get, it keeps checking signals, but if the signal handler calls sys.exit, the behavior is undefined.

This PR fixes all 3 issues. Here is the proposed new behavior upon SIGTERM.

New behavior for SIGTERM to a worker

  • When a SIGTERM is received, we drain the current worker and shut down. There is no timeout (i.e., no fallback such as "drain, and if draining fails, kill itself ungracefully").
  • We treat SIGTERM as a system error (not intentional). This also means that when a task is terminated by a SIGTERM, it could be retried. This is debatable.
  • Ray can handle a SIGTERM sent to a worker at any time unless the handler is overwritten. Overwriting SIGTERM is not recommended (we should provide a different way to run a shutdown handler). I.e., whether a worker is idle, running a task, or blocked (by ray.get, sleep, or backpressure), the signal should always be handled.
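Under this behavior the handler itself stays trivial: it just raises SystemExit, and the rest of the worker machinery decides how to drain and exit. A minimal sketch (hypothetical; the real handler is installed by the worker startup code):

```python
import os
import signal
import sys


def sigterm_handler(signum, frame):
    # Treat SIGTERM as a (non-intentional) exit request: raise
    # SystemExit and let the periodic check_signals path observe it
    # and initiate a graceful drain-and-shutdown.
    sys.exit(signum)


signal.signal(signal.SIGTERM, sigterm_handler)

# Demonstration: delivering SIGTERM to ourselves now surfaces as a
# catchable SystemExit in the main thread.
try:
    os.kill(os.getpid(), signal.SIGTERM)
    caught = None
except SystemExit as e:
    caught = e.code
```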

How are the issues fixed?

  1. When a worker is idle
    • check_signals is called periodically, and when the status indicates exit (i.e., a signal handler raised SystemExit, meaning sys.exit was called), the worker initiates the exit.
  2. When a worker is running a task
    • Raising SystemExit interrupts the currently running task and exits. This path has been working so far.
  3. When ray.get is running
    • ray.get already calls check_signals periodically. However, a SystemExit raised by a signal handler used to be ignored there. Instead, we now catch SystemExit inside check_signals, which makes ray.get see IntentionalSystemExit (exit code = 0) or UnexpectedSystemExit (exit code != 0). Then, from check_status, we raise SystemExit, which triggers the code path for case 2 (when a worker is running a task).
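Conceptually, the fix in case 3 turns a SystemExit raised by a signal handler into a status code that the blocked ray.get loop can act on. A simplified Python sketch (the status names mirror the PR; everything else here is a hypothetical stand-in for the Cython check_signals):

```python
import sys

# Status names mirror the PR; these constants stand in for the
# corresponding C++ RayStatus values.
OK = "OK"
INTENTIONAL_SYSTEM_EXIT = "IntentionalSystemExit"   # exit code = 0
UNEXPECTED_SYSTEM_EXIT = "UnexpectedSystemExit"     # exit code != 0


def check_signals(run_pending_handlers):
    # Previously, a SystemExit raised by a signal handler here was
    # swallowed. Now it is caught and converted into a status; the
    # blocked ray.get sees the status, re-raises SystemExit from
    # check_status, and takes the normal task-exit path (case 2).
    try:
        run_pending_handlers()
        return OK
    except SystemExit as e:
        if e.code in (None, 0):
            return INTENTIONAL_SYSTEM_EXIT
        return UNEXPECTED_SYSTEM_EXIT
```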

Related issue number

Closes #40182

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@rkooo567

Note: our shutdown path is a bit complex, and it'd be nice to clean it up. Added to the tech-debt list (there's also a doc on how the shutdown path works here: https://docs.google.com/document/d/1mr4j5p9a0Ce5oYaUDoaiHUdOsbYfBOmAO4CPQTSjwR4/edit#heading=h.fwxjsbhjhpi5).


@rickyyx rickyyx left a comment


LGTM - a few high level questions.

Resolved review threads: python/ray/tests/test_ray_shutdown.py, src/ray/common/ray_config_def.h, python/ray/_private/worker.py (outdated)

@jjyao jjyao left a comment


+10000 for cleaning up the shutdown code

@rkooo567

@jjyao @rickyyx I found another issue from Antoni where sys.exit during ray.get is not properly handled. I addressed that in this PR as well, updated the PR description, and simplified some parts (e.g., the SIGTERM handler now just raises SystemExit).

@@ -689,6 +697,33 @@ cdef int prepare_actor_concurrency_groups(
concurrency_groups.push_back(cg)
return 1


def raise_sys_exit_with_custom_error_message(
Contributor:

nit: can we move this out of the _raylet.pyx file to a better place?

@@ -2057,7 +2086,7 @@ cdef CRayStatus task_execution_handler(
except SystemExit as e:
# Tell the core worker to exit as soon as the result objects
# are processed.
if hasattr(e, "is_ray_terminate"):
if hasattr(e, "is_ray_terminate") and e.is_ray_terminate:
Contributor Author:

Now is_ray_terminate can be present with is_ray_terminate = False, so that we can attach an additional message when is_ray_terminate is False.
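A small illustration of the changed check (is_ray_terminate is the real attribute; the helper function below is hypothetical):

```python
def classify_system_exit(e: SystemExit) -> str:
    # Before: the mere presence of is_ray_terminate meant "intentional".
    # After: it must be present AND true; a present-but-False value can
    # carry an extra message while still being treated as unexpected.
    if hasattr(e, "is_ray_terminate") and e.is_ray_terminate:
        return "intentional"
    return "unexpected"


intentional = SystemExit(0)
intentional.is_ray_terminate = True

unexpected = SystemExit(1)
unexpected.is_ray_terminate = False  # present, but False
```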

Contributor:

So before:

  • if is_ray_terminate present => we treat it as intentional system exit

Now:

  • If is_ray_terminate present AND true => we raise intentional system exit
  • else: UnexpectedSystemExit?

Contributor Author:

Let's remove is_ray_terminate

/// The overhead of this is only a single digit microsecond.
auto status = options_.check_signals();
if (status.IsIntentionalSystemExit()) {
Exit(rpc::WorkerExitType::INTENDED_USER_EXIT,
Contributor Author:

This is a new change (we handle Exit inside C++ instead of Python).

Contributor:

why so?

@@ -181,7 +183,7 @@ void CoreWorkerDirectTaskSubmitter::ReturnWorker(const rpc::WorkerAddress addr,
}

auto status = lease_entry.lease_client->ReturnWorker(
addr.port, addr.worker_id, was_error, worker_exiting);
addr.port, addr.worker_id, was_error, error_detail, worker_exiting);
Contributor Author:

This file change adds more detail when a worker is returned with was_error = true. I found that it currently doesn't give any detail.

@@ -150,12 +150,15 @@ class CoreWorkerDirectTaskSubmitter {
/// \param[in] addr The address of the worker.
/// \param[in] task_queue_key The scheduling class of the worker.
/// \param[in] was_error Whether the task failed to be submitted.
/// \param[in] error_detail The reason why it was errored.
Contributor Author: same as above (this change adds more detail when a worker is returned with was_error = true).

@@ -131,6 +131,8 @@ message ReturnWorkerRequest {
bool disconnect_worker = 3;
// Whether the worker is exiting and cannot be reused.
bool worker_exiting = 4;
// The error message for disconnect_worker.
Contributor Author: same as above (this change adds more detail when a worker is returned with was_error = true).

<< " died.";
const auto &err_msg = stream.str();
RAY_LOG(INFO) << err_msg;
DestroyWorker(worker, rpc::WorkerExitType::SYSTEM_ERROR, err_msg);
Contributor Author: same as above (this change adds more detail when a worker is returned with was_error = true).

@rkooo567

@jjyao I think we can do this all together with the exit code work for 2.9. The scope I am thinking:

  1. Clearly define and document the exit behavior for various failure scenarios (including SIGTERM)
  2. Add exit codes to all the different failures
  3. Add more detail to task failures ([Core] Task failure reason is not properly propagated when it is failed by worker/node #40359)
  4. Potentially revamp the retry APIs with other requirements

@rkooo567 rkooo567 requested a review from a team as a code owner October 16, 2023 13:56

@rickyyx rickyyx left a comment


I think I am still trying to understand the changes (as well as the original issues).

Had a couple of questions there. Not so much about requests for code changes. The change looks pretty clean to me.

And another question: how does this relate to the original issue that Antoni ran into? AFAIK, the original issue, where an actor leaked when its placement group was removed, didn't even get to the point of SIGTERM handling?

elif status.IsUnexpectedSystemExit():
with gil:
raise_sys_exit_with_custom_error_message(
message, exit_code=1)
else:
raise RaySystemError(message)
Contributor:

How would RaySystemError differ from the above 2 cases now?

@@ -2093,10 +2123,30 @@ cdef c_bool kill_main_task(const CTaskID &task_id) nogil:

cdef CRayStatus check_signals() nogil:
with gil:
# The Python exceptions are not handled if it is raised from cdef,
Contributor:

what does this mean?


reported to GCS, and any worker failure error will contain them.

The behavior of this API while a task is running is undefined.
Avoid using the API when a task is still running.
Contributor:

so this should only be called as part of the shutdown routine? But not an API for exiting workers in general?

@@ -794,8 +794,13 @@ void CoreWorker::Exit(
exiting_detail_ = std::optional<std::string>{detail};
}
// Release the resources early in case draining takes a long time.
RAY_CHECK_OK(
local_raylet_client_->NotifyDirectCallTaskBlocked(/*release_resources*/ true));
auto status =
Contributor:

is this intentional?


@@ -365,6 +365,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler,

/// Kill a worker.
///
/// This shouldn't be directly used to kill a worker. If you use this API
Contributor:

what's then the use case for this?

Contributor Author:

I will remove this comment.


@rickyyx rickyyx left a comment


Synced offline.


return CRayStatus.IntentionalSystemExit(error_msg.encode("utf-8"))
else:
return CRayStatus.UnexpectedSystemExit(error_msg.encode("utf-8"))
# By default, if signals raise an exception, Python just prints them.
Contributor Author:

This tests the case where a signal handler raises an exception.


@rickyyx rickyyx merged commit 6603dcb into ray-project:master Oct 25, 2023
40 of 47 checks passed
@YQ-Wang

YQ-Wang commented Oct 25, 2023

Thanks for fixing this. This helps make the error handling more predictable.

rkooo567 added a commit to rkooo567/ray that referenced this pull request Oct 26, 2023
…oject#40210)


---------

Co-authored-by: SangBin Cho <sangcho@sangcho-LT93GQWG9C.local>
Co-authored-by: sangcho <sangcho@anyscale.com>
vitsai pushed a commit that referenced this pull request Oct 26, 2023
#40690)

---------

Co-authored-by: SangBin Cho <sangcho@sangcho-LT93GQWG9C.local>
Co-authored-by: sangcho <sangcho@anyscale.com>

Successfully merging this pull request may close these issues.

[Core] Ray workers are not killed by SIGTERM
5 participants