
[Data] Add fault tolerance to remote tasks #41084

Merged
merged 5 commits into ray-project:master from fault-tolerance on Nov 14, 2023

Conversation

@bveeramani (Member) commented Nov 13, 2023

Why are these changes needed?

Fault tolerance is table stakes for Ray Data, and this PR adds the feature for batch inference. To ensure that tasks are retried in the case of system failures (like nodes crashing), this PR configures max_retries for all remote tasks. It also adds a chaos release test.

Successful release test run: https://buildkite.com/ray-project/release/builds/1001#018bc754-e4e0-4e19-bceb-6954bb95fc81
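For readers unfamiliar with the knobs involved, here is a minimal sketch of the Ray options that control task-level fault tolerance (illustration only, not the exact diff in this PR; -1 means retry or restart indefinitely after system failures):

import ray

# Sketch only: Ray Data sets defaults like these internally, so users don't
# have to pass them explicitly.
@ray.remote(max_retries=-1)  # re-run the task if its worker or node dies
def map_batch(batch):
    return batch

@ray.remote(max_restarts=-1, max_task_retries=-1)  # actor-task equivalent
class MapWorker:
    def map_batch(self, batch):
        return batch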

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
@@ -1394,6 +1394,7 @@ def get_and_run_node_killer(
     lifetime=None,
     no_start=False,
     max_nodes_to_kill=2,
+    node_kill_delay_s=0,
bveeramani (Member, Author) commented:

The node killer kills nodes at random times:

sleep_interval = random.random() * self.node_kill_interval_s

If a node is killed while the runtime env is being initialized, the program will fail even if Ray Data is fault-tolerant. To avoid this sort of flakiness, I've added a delay.
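Roughly, the intended behavior with the new delay looks like this (a hypothetical sketch built around the line quoted above; the class and helper names are assumptions, not the actual node-killer code):

import asyncio
import random

class NodeKillerSketch:
    def __init__(self, node_kill_delay_s=0, node_kill_interval_s=60, max_nodes_to_kill=2):
        self.node_kill_delay_s = node_kill_delay_s
        self.node_kill_interval_s = node_kill_interval_s
        self.max_nodes_to_kill = max_nodes_to_kill

    async def run(self):
        # Wait before the first kill so the job can finish runtime-env setup,
        # which task retries don't protect against.
        await asyncio.sleep(self.node_kill_delay_s)
        for _ in range(self.max_nodes_to_kill):
            # Then kill nodes at random times, as in the quoted line.
            sleep_interval = random.random() * self.node_kill_interval_s
            await asyncio.sleep(sleep_interval)
            self._kill_random_node()

    def _kill_random_node(self):
        # Placeholder: the real release test terminates a random worker node.
        pass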

@@ -352,7 +352,7 @@ def _apply_default_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str, Any
         "max_task_retries" not in ray_remote_args
         and ray_remote_args.get("max_restarts") != 0
     ):
-        ray_remote_args["max_task_retries"] = 5
+        ray_remote_args["max_task_retries"] = -1
bveeramani (Member, Author) commented:

Defaults are specified all over the code base. We should really consolidate them at some point. Maybe something like #39797.

I took a stab at it, but it's a bit more involved than I expected, so I'm leaving it to a future PR.
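For reference, after this change the helper reads roughly like the following (a simplified sketch reconstructed from the hunk above, not the exact Ray Data source):

from typing import Any, Dict

def _apply_default_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str, Any]:
    ray_remote_args = dict(ray_remote_args)
    if (
        "max_task_retries" not in ray_remote_args
        # Respect users who explicitly disabled actor restarts.
        and ray_remote_args.get("max_restarts") != 0
    ):
        # -1 retries actor tasks indefinitely after system failures.
        ray_remote_args["max_task_retries"] = -1
    return ray_remote_args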

@@ -18,12 +18,12 @@ def cached_remote_fn(fn: Any, **ray_remote_args) -> Any:
     """
     if fn not in CACHED_FUNCTIONS:
         default_ray_remote_args = {
-            "retry_exceptions": True,
bveeramani (Member, Author) commented:

This default was set two years ago by #18296, but I think we should get rid of it at this point, for a couple of reasons:

  1. It's obsolete. #38773 ([Data] Retry open files with exponential backoff) added more specific retries.
  2. It can lead to bad UX. For example, if a non-transient error occurs, Ray will repeatedly retry the task, and you won't realize there's an error until your program times out (or it never does!). Ideally, your program would fail immediately with the non-transient error (see the sketch below).
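To illustrate the second point, a hedged example of the failure mode (a hypothetical user task, not Ray Data code):

import ray

@ray.remote(retry_exceptions=True, max_retries=3)
def load(path: str):
    # A non-transient application bug: with retry_exceptions=True, Ray
    # re-runs the task on every failure instead of surfacing the error
    # right away, so the user just sees the job stall and retry.
    raise ValueError(f"unsupported file format: {path}")

# ray.get(load.remote("data.bin")) only raises after all retries are
# exhausted; with unlimited retries it would never raise at all.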

raulchen (Contributor) commented:

When I was running some large-scale workloads, I depended on this flag to retry tasks that failed due to AWS throttling or other temporary network issues.
If we remove it, we should allow retrying per op, e.g., setting it to true only for read/write.
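Something like the following would express that opt-in at the call sites (a sketch, assuming the read/write APIs accept ray_remote_args; bucket paths are hypothetical):

import ray

ds = ray.data.read_parquet(
    "s3://example-bucket/input/",
    ray_remote_args={"retry_exceptions": True},  # retry only this op's tasks
)
ds.write_parquet(
    "s3://example-bucket/output/",
    ray_remote_args={"retry_exceptions": True},
)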

bveeramani (Member, Author) commented:

@raulchen what were the specific errors you ran into? Shouldn't throttling be handled by #38773?

@raulchen (Contributor) commented Nov 13, 2023:

One example is "AWS Error NETWORK_CONNECTION during CreateMultipartUpload operation: curlCode: 28, Timeout was reached".
I guess it's because writes aren't covered by that PR.

bveeramani (Member, Author) commented:

Weird. We should be performing retries for writes:

with _open_file_with_retry(
    write_path,
    lambda: self.filesystem.open_output_stream(
        write_path, **self.open_stream_args
    ),
) as file:
    self.write_row_to_file(row, file)

@raulchen did you run into this issue after #38773 was merged? If so, I can add retry_exceptions for reads and writes.

raulchen (Contributor) commented:

Yes, I got that error with last week's nightly.
On second thought, I don't think it's a good idea to simply add retry_exceptions for reads and writes.
A counterexample: if users pass in a list of Parquet files to read and the files have different schemas, the read tasks will retry indefinitely without printing any info.
Maybe it's better to just add retries for IO-related code. I think #38773 already covers reads; we'll need to do something similar for writes.
It's ok to do that in a follow-up PR.
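A hedged sketch of what an IO-scoped retry for writes could look like (the helper name and the retryable-error check are assumptions, mirroring the open-file retry added in #38773):

import random
import time

def _call_with_retry(f, description, max_attempts=5, base_delay_s=1.0):
    # Retry only the filesystem call, not the whole task, so application
    # errors (e.g., mismatched schemas) still fail fast.
    for attempt in range(max_attempts):
        try:
            return f()
        except OSError as e:  # assumption: treat only IO errors as transient
            if attempt == max_attempts - 1:
                raise
            # Exponential backoff with jitter.
            backoff = base_delay_s * (2 ** attempt) * random.uniform(0.5, 1.5)
            print(f"Retrying {description} after error: {e}. Sleeping {backoff:.1f}s.")
            time.sleep(backoff)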

raulchen (Contributor) commented:

@bveeramani created an issue here #41211

@bveeramani bveeramani marked this pull request as ready for review November 13, 2023 07:57
@bveeramani bveeramani merged commit 29aea3d into ray-project:master Nov 14, 2023
17 of 23 checks passed
@bveeramani bveeramani deleted the fault-tolerance branch November 14, 2023 19:56
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Nov 29, 2023

3 participants