[Data] Retry open files with exponential backoff #38773
Conversation
    OPEN_FILE_RETRY_MAX_BACKOFF_SECONDS = 32

    # The max number of retry attempts for opening file.
    OPEN_FILE_RETRY_MAX_ATTEMPTS = 10
Can we make these configurable by the user?
I feel these configurations are advanced, so I hesitate to put them in DataContext now. Users can change these Python variables directly, such as FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD and PATHS_PER_FILE_SIZE_FETCH_TASK above. We can add them to DataContext if we see more users ask for this.
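For reference, a minimal sketch of overriding these module-level variables directly, as suggested above (the module path matches the test further down in this conversation; the values here are purely illustrative):

    # Override the retry knobs at module level before reading/writing.
    # These are module-level constants in this PR, not DataContext fields.
    import ray.data.datasource.file_based_datasource as file_based_datasource

    file_based_datasource.OPEN_FILE_RETRY_MAX_BACKOFF_SECONDS = 64  # raise the backoff cap
    file_based_datasource.OPEN_FILE_RETRY_MAX_ATTEMPTS = 5  # allow fewer attempts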
    import random
    import time

    for i in range(OPEN_FILE_RETRY_MAX_ATTEMPTS):
Can we rewrite this to guard against OPEN_FILE_RETRY_MAX_ATTEMPTS set to 0? The name is also slightly confusing (can't tell if it's the total number of attempts or number of retries).
Let me rename it to OPEN_FILE_MAX_ATTEMPTS, and throw an exception if it's set smaller than 1.
Updated.
Signed-off-by: Cheng Su <scnju13@gmail.com>
    if OPEN_FILE_MAX_ATTEMPTS < 1:
        raise ValueError(
            "OPEN_FILE_MAX_ATTEMPTS cannot be negative or 0, but get: "
"OPEN_FILE_MAX_ATTEMPTS cannot be negative or 0, but get: " | |
"OPEN_FILE_MAX_ATTEMPTS cannot be negative or 0. Get: " |
Updated.
            (2 ** (i + 1)) * random.random(),
            OPEN_FILE_RETRY_MAX_BACKOFF_SECONDS,
        )
        logger.get_logger().debug(
Should this be an info? And also print the file name?
> should this be an info?

I think the point of the PR is to not pollute the user console, so we do not print the stack trace and we retry automatically. I thought about making it an info at the beginning, but I think users probably don't care about it. Think about the AWS CLI: it never prints out whether it retried aws s3 cp <...>.
> print the file name?

The file name is already included in the log; am I missing anything?
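Pieced together from the diff fragments in this conversation, here is a runnable sketch of the retry helper under review. The retryable-error check (OPEN_FILE_RETRY_MAX_ERROR_MESSAGES), the plain logging setup, and the exact log message are assumptions, not the PR's verbatim code:

    import logging
    import random
    import time

    logger = logging.getLogger(__name__)

    OPEN_FILE_MAX_ATTEMPTS = 10
    OPEN_FILE_RETRY_MAX_BACKOFF_SECONDS = 32
    # Assumed list of error substrings that are treated as retryable.
    OPEN_FILE_RETRY_MAX_ERROR_MESSAGES = ["AWS Error SLOW_DOWN"]

    def _open_file_with_retry(file_path, open_file):
        if OPEN_FILE_MAX_ATTEMPTS < 1:
            raise ValueError(
                "OPEN_FILE_MAX_ATTEMPTS cannot be negative or 0. "
                f"Get: {OPEN_FILE_MAX_ATTEMPTS}"
            )
        for i in range(OPEN_FILE_MAX_ATTEMPTS):
            try:
                return open_file()
            except Exception as e:
                is_retryable = any(m in str(e) for m in OPEN_FILE_RETRY_MAX_ERROR_MESSAGES)
                if is_retryable and i + 1 < OPEN_FILE_MAX_ATTEMPTS:
                    # Binary exponential backoff with full random jitter,
                    # capped at OPEN_FILE_RETRY_MAX_BACKOFF_SECONDS.
                    backoff = min(
                        (2 ** (i + 1)) * random.random(),
                        OPEN_FILE_RETRY_MAX_BACKOFF_SECONDS,
                    )
                    # Debug-level, so the user console is not polluted;
                    # the file name is included, per the thread above.
                    logger.debug(
                        f"Retrying open of {file_path} after {backoff:.2f}s "
                        f"(attempt {i + 1} failed with: {e})"
                    )
                    time.sleep(backoff)
                else:
                    raise e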
    @@ -162,6 +166,33 @@ def test_write_datasource(ray_start_regular_shared):
        assert ray.get(output.data_sink.get_rows_written.remote()) == 10


    def test_open_file_with_retry(ray_start_regular_shared):
It's kind of weird to put this test in test_formats.py.
Good catch, moved to test_file_based_datasource.py.
    ray.data.datasource.file_based_datasource.OPEN_FILE_MAX_ATTEMPTS = 3
    counter = Counter()
    with pytest.raises(OSError):
        _open_file_with_retry("dummy", lambda: counter.foo(4))
Also test the case where the function can succeed after retry?
@raulchen the assertion on line 184 above already tests it. Any other situation you are thinking about?
    _open_file_with_retry("dummy", lambda: counter.foo(4))
    ray.data.datasource.file_based_datasource.OPEN_FILE_MAX_ATTEMPTS = (
        original_max_attempts
    )
Nit: patch this attribute so it restores the original value even if the test fails. Example:

    def test_resource_constrained_triggers_autoscaling(monkeypatch):
        RESOURCE_REQUEST_TIMEOUT = 5
        monkeypatch.setattr(
            ray.data._internal.execution.autoscaling_requester,
            "RESOURCE_REQUEST_TIMEOUT",
            RESOURCE_REQUEST_TIMEOUT,
        )
Changed to try/finally to restore for now. I feel that's already simple enough.
    @@ -162,6 +166,33 @@ def test_write_datasource(ray_start_regular_shared):
        assert ray.get(output.data_sink.get_rows_written.remote()) == 10


    def test_open_file_with_retry(ray_start_regular_shared):
        class Counter:
Rename it to something like FlakyFileOpener? Also, max_attempts can be set in __init__.
Updated.
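Putting the review feedback together, a sketch of what the updated test might look like: a FlakyFileOpener with max_attempts set in __init__, and try/finally to restore the module constant. The retryable error string and assertion details are assumptions, not the PR's verbatim test:

    import pytest

    import ray.data.datasource.file_based_datasource as file_based_datasource
    from ray.data.datasource.file_based_datasource import _open_file_with_retry

    class FlakyFileOpener:
        def __init__(self, max_attempts: int):
            self.attempts = 0
            self.max_attempts = max_attempts

        def open(self):
            self.attempts += 1
            if self.attempts < self.max_attempts:
                # Assumed to match a retryable error message.
                raise OSError("AWS Error SLOW_DOWN")
            return "dummy"

    def test_open_file_with_retry(ray_start_regular_shared):
        original_max_attempts = file_based_datasource.OPEN_FILE_MAX_ATTEMPTS
        try:
            file_based_datasource.OPEN_FILE_MAX_ATTEMPTS = 3
            # Succeeds on the last allowed attempt.
            opener = FlakyFileOpener(3)
            assert _open_file_with_retry("dummy", opener.open) == "dummy"
            # Needs more attempts than allowed: the error is re-raised.
            opener = FlakyFileOpener(4)
            with pytest.raises(OSError):
                _open_file_with_retry("dummy", opener.open)
        finally:
            file_based_datasource.OPEN_FILE_MAX_ATTEMPTS = original_max_attempts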
Signed-off-by: Cheng Su <scnju13@gmail.com>
    if is_retryable and i + 1 < OPEN_FILE_MAX_ATTEMPTS:
        # Retry with binary exponential backoff with random jitter.
        backoff = min(
            (2 ** (i + 1)) * random.random(),
This is the same as the AWS retry behavior: https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
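To make the jitter behavior concrete, a small illustration of the sampled backoff per attempt under this formula (the 32s cap starts to bind from the sixth attempt, i = 5, onward):

    import random

    OPEN_FILE_RETRY_MAX_BACKOFF_SECONDS = 32
    for i in range(6):
        # Attempt i draws uniformly from [0, 2 ** (i + 1)), capped at 32s.
        backoff = min((2 ** (i + 1)) * random.random(), OPEN_FILE_RETRY_MAX_BACKOFF_SECONDS)
        print(f"attempt {i}: backoff = {backoff:.2f}s")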
This is a P0 bugfix for our users, who encountered this S3 issue in 2.6. cc @zhe-thoughts for review.
The test failure is not related here.
P0 fix awaited by a user; OK to merge.
* Retry open files with exponential backoff
Signed-off-by: Cheng Su <scnju13@gmail.com>
Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
* Retry open files with exponential backoff
Signed-off-by: Cheng Su <scnju13@gmail.com>
Signed-off-by: Victor <vctr.y.m@example.com>
Why are these changes needed?
This PR retries the open-file call (for both read and write) with exponential backoff, internally in the Ray Data task. The motivation is to avoid throwing throttling exceptions at users when reading/writing many files on remote storage, such as S3.
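For context, a hypothetical workload where this matters: reading or writing many files against S3 can trigger throttling (e.g., SlowDown), and with this change the retry happens inside the task instead of surfacing to the user. The bucket paths below are illustrative:

    import ray

    # Each read/write task opens many files; transient S3 throttling errors
    # are now retried with exponential backoff inside the task.
    ds = ray.data.read_parquet("s3://example-bucket/input/")
    ds.write_parquet("s3://example-bucket/output/")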
Related issue number

Checks

- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.