
[data] allow task failures during execution #41226

Merged
merged 17 commits into from
Nov 28, 2023

Conversation

raulchen
Contributor

@raulchen raulchen commented Nov 17, 2023

Why are these changes needed?

Add an option to allow task failures during dataset execution. Data from the failed tasks will be dropped. This can be useful to avoid the entire execution failing because of a small number of unexpected exceptions (e.g., due to corrupted data) that may happen after the execution has been running for a long time.
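The intended semantics can be sketched in plain Python. This is an illustrative simulation based on the description above, not the actual Ray Data executor; the function name and structure are assumptions:

```python
# Illustrative sketch only: simulates the proposed semantics with plain
# callables, not real Ray tasks.

def run_with_allowed_failures(tasks, max_errored=0):
    """Run tasks, dropping the outputs of failed ones.

    max_errored: abort once more than this many tasks have failed;
    a negative value means unlimited failures are tolerated.
    """
    results, num_errored = [], 0
    for task in tasks:
        try:
            results.append(task())
        except Exception:
            num_errored += 1
            # Data from the failed task is simply dropped.
            if max_errored >= 0 and num_errored > max_errored:
                raise RuntimeError(
                    f"Aborting: {num_errored} task failures exceed "
                    f"the allowed maximum of {max_errored}."
                )
    return results
```

With the default of 0, any failure aborts the run; with a negative threshold, all failures are silently dropped.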

Related issue number

close #41213

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Contributor

@stephanie-wang stephanie-wang left a comment


Can we add the failures to the stats?

@@ -227,6 +227,11 @@ def __init__(
# The additional ray remote args that should be added to
# the task-pool-based data tasks.
self._task_pool_data_task_remote_args: Dict[str, Any] = {}
# Max number of task failures that are allowed before aborting
Contributor

Can we call this max_allowed_block_failures or something similar, since it counts blocks rather than tasks?

Also, prefer the term "errors" over "failures", since "failures" usually refers to system-level exceptions. Actually, does this option also apply to tasks that failed due to system-level exceptions? The comment should make clear what counts.

Member

+1

Contributor Author

sounds good

# Max number of task failures that are allowed before aborting
# the Dataset execution. Data of the failed tasks will be dropped.
# Unlimited if negative.
# By default, not failures are allowed.
Contributor

Suggested change
# By default, not failures are allowed.
# By default, no failures are allowed.

max_blocks_to_read_per_op[state] -= num_blocks_read
except Exception as e:
error_message = (
f'An exception occurred in a task of operator "{state.op}".'
Contributor

It would be good to print the exception traceback here.

Contributor Author

Yeah, it's printed below

@stephanie-wang stephanie-wang added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Nov 17, 2023
# the Dataset execution. Data of the failed tasks will be dropped.
# Unlimited if negative.
# By default, not failures are allowed.
self.max_allowed_task_failures = 0
Member

For consistency with the DataContext options, should we expose this in the constructor and define a constant to represent the default?

Contributor Author

I thought of that. But the parameter list is already too long, and in practice we never create DataContext objects with parameters, so I prefer not to add new ones.
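The pattern described here, configuring the option as an attribute on the current context rather than through a constructor parameter, might look like this. Class and attribute names are illustrative, not the real DataContext implementation:

```python
# Hypothetical sketch: a process-wide context object whose options are
# set as attributes after creation, rather than through an ever-growing
# constructor parameter list.

class DataContext:
    _current = None

    def __init__(self):
        # Default: no task failures allowed.
        self.max_errored_blocks = 0

    @classmethod
    def get_current(cls):
        # Lazily create and reuse a single process-wide instance.
        if cls._current is None:
            cls._current = cls()
        return cls._current


ctx = DataContext.get_current()
ctx.max_errored_blocks = 10  # opt in to tolerating up to 10 failures
```

Because every caller goes through get_current(), a setting changed once is visible everywhere without threading it through constructors.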

@@ -227,6 +227,11 @@ def __init__(
# The additional ray remote args that should be added to
# the task-pool-based data tasks.
self._task_pool_data_task_remote_args: Dict[str, Any] = {}
# Max number of task failures that are allowed before aborting
Member

+1

Comment on lines 717 to 722
_run(5, 0, 0, False)
_run(5, 0, 1, True)
_run(5, 2, 1, False)
_run(5, 2, 2, False)
_run(5, 2, 3, True)
_run(5, -1, 5, False)
Member

Nit: Might be cleaner to use pytest.mark.parametrize here

Signed-off-by: Hao Chen <chenh1024@gmail.com>
@@ -76,6 +76,8 @@ class OpRuntimeMetrics:
num_tasks_have_outputs: int = field(default=0, metadata={"map_only": True})
# Number of finished tasks.
num_tasks_finished: int = field(default=0, metadata={"map_only": True})
# Number of failed tasks.
num_tasks_failed: int = field(default=0, metadata={"map_only": True})
Contributor Author

Keep using the term "tasks" here for consistency with other metrics.

Contributor

Can we add this to the Dataset.stats() output?

assert (
"During handling of the above exception, another exception occurred"
not in out_str
), out_str
Contributor Author

This PR also makes the error stack trace more concise. cc @stephanie-wang @c21

Previously, it printed:

Traceback (most recent call last):
  File "python/ray/_raylet.pyx", line 347, in ray._raylet.StreamingObjectRefGenerator._next_sync
  File "python/ray/_raylet.pyx", line 4653, in ray._raylet.CoreWorker.try_read_next_object_ref_stream
  File "python/ray/_raylet.pyx", line 447, in ray._raylet.check_status
ray.exceptions.ObjectRefStreamEndOfStreamError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 80, in on_data_ready
    meta = ray.get(next(self._streaming_gen))
  File "python/ray/_raylet.pyx", line 302, in ray._raylet.StreamingObjectRefGenerator.__next__
  File "python/ray/_raylet.pyx", line 365, in ray._raylet.StreamingObjectRefGenerator._next_sync
StopIteration

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "<string>", line 8, in <module>
  File "/Users/chenh/code/ray/python/ray/data/dataset.py", line 2459, in take_all
    for row in self.iter_rows():
  File "/Users/chenh/code/ray/python/ray/data/iterator.py", line 225, in _wrapped_iterator
    for batch in batch_iterable:
  File "/Users/chenh/code/ray/python/ray/data/iterator.py", line 183, in _create_iterator
    for batch in iterator:
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 176, in iter_batches
    next_batch = next(async_batch_iter)
  File "/Users/chenh/code/ray/python/ray/data/_internal/util.py", line 851, in make_async_gen
    raise next_item
  File "/Users/chenh/code/ray/python/ray/data/_internal/util.py", line 828, in execute_computation
    for item in fn(thread_safe_generator):
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 167, in _async_iter_batches
    yield from extract_data_from_batch(batch_iter)
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/util.py", line 210, in extract_data_from_batch
    for batch in batch_iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 306, in restore_original_order
    for batch in batch_iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 218, in threadpool_computations_format_collate
    yield from formatted_batch_iter
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/util.py", line 158, in format_batches
    for batch in block_iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/util.py", line 117, in blocks_to_batches
    for block in block_iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/util.py", line 54, in resolve_block_refs
    for block_ref in block_ref_iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 254, in prefetch_batches_locally
    for block_ref, metadata in block_ref_iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/util.py", line 808, in __next__
    return next(self.it)
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/legacy_compat.py", line 54, in execute_to_legacy_block_iterator
    for bundle in bundle_iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 154, in get_next
    raise item
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 223, in run
    while self._scheduling_loop_step(self._topology) and not self._shutdown:
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 271, in _scheduling_loop_step
    num_errored_blocks = process_completed_tasks(
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 416, in process_completed_tasks
    raise e
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 383, in process_completed_tasks
    num_blocks_read = task.on_data_ready(
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 93, in on_data_ready
    raise ex
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 89, in on_data_ready
    ray.get(block_ref)
  File "/Users/chenh/code/ray/python/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/Users/chenh/code/ray/python/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/Users/chenh/code/ray/python/ray/_private/worker.py", line 2595, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::Map(map)() (pid=94579, ip=127.0.0.1)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
    yield from self._row_fn(input, ctx)
  File "/Users/chenh/code/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 234, in transform_fn
    out_row = fn(row)
  File "/Users/chenh/code/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 120, in fn
    return op_fn(item, *fn_args, **fn_kwargs)
  File "<string>", line 6, in map
ValueError: foo

Now it prints:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "<string>", line 8, in <module>
  File "/Users/chenh/code/ray/python/ray/data/dataset.py", line 2459, in take_all
    for row in self.iter_rows():
  File "/Users/chenh/code/ray/python/ray/data/iterator.py", line 225, in _wrapped_iterator
    for batch in batch_iterable:
  File "/Users/chenh/code/ray/python/ray/data/iterator.py", line 183, in _create_iterator
    for batch in iterator:
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 176, in iter_batches
    next_batch = next(async_batch_iter)
  File "/Users/chenh/code/ray/python/ray/data/_internal/util.py", line 851, in make_async_gen
    raise next_item
  File "/Users/chenh/code/ray/python/ray/data/_internal/util.py", line 828, in execute_computation
    for item in fn(thread_safe_generator):
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 167, in _async_iter_batches
    yield from extract_data_from_batch(batch_iter)
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/util.py", line 210, in extract_data_from_batch
    for batch in batch_iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 306, in restore_original_order
    for batch in batch_iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 218, in threadpool_computations_format_collate
    yield from formatted_batch_iter
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/util.py", line 158, in format_batches
    for batch in block_iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/util.py", line 117, in blocks_to_batches
    for block in block_iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/util.py", line 54, in resolve_block_refs
    for block_ref in block_ref_iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/block_batching/iter_batches.py", line 254, in prefetch_batches_locally
    for block_ref, metadata in block_ref_iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/util.py", line 808, in __next__
    return next(self.it)
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/legacy_compat.py", line 54, in execute_to_legacy_block_iterator
    for bundle in bundle_iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 154, in get_next
    raise item
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 223, in run
    while self._scheduling_loop_step(self._topology) and not self._shutdown:
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/streaming_executor.py", line 271, in _scheduling_loop_step
    num_errored_blocks = process_completed_tasks(
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 416, in process_completed_tasks
    raise e from None
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 383, in process_completed_tasks
    num_blocks_read = task.on_data_ready(
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 93, in on_data_ready
    raise ex from None
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 89, in on_data_ready
    ray.get(block_ref)
  File "/Users/chenh/code/ray/python/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/Users/chenh/code/ray/python/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/Users/chenh/code/ray/python/ray/_private/worker.py", line 2595, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::Map(map)() (pid=95483, ip=127.0.0.1)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/Users/chenh/code/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
    yield from self._row_fn(input, ctx)
  File "/Users/chenh/code/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 234, in transform_fn
    out_row = fn(row)
  File "/Users/chenh/code/ray/python/ray/data/_internal/planner/plan_udf_map_op.py", line 120, in fn
    return op_fn(item, *fn_args, **fn_kwargs)
  File "<string>", line 6, in map
ValueError: foo
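The shorter trace comes from re-raising with `from None` (visible in the new trace's `raise e from None` frames), which suppresses Python's implicit exception chaining. A minimal standalone demonstration:

```python
# Minimal demonstration of why `raise e from None` shortens the trace:
# it suppresses implicit exception chaining, so the "During handling of
# the above exception, another exception occurred" sections disappear.
import traceback


def reraise(chained):
    try:
        raise StopIteration  # stands in for the internal stream-end error
    except StopIteration:
        try:
            raise ValueError("foo")  # the user's actual error
        except ValueError as e:
            if chained:
                raise  # implicit chaining: both tracebacks are printed
            else:
                raise e from None  # suppress the chained traceback


def trace_of(chained):
    try:
        reraise(chained)
    except ValueError:
        return traceback.format_exc()
```

Calling trace_of(True) yields a trace containing the "During handling of the above exception" section, while trace_of(False) yields only the ValueError portion.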

@raulchen raulchen removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Nov 21, 2023
@raulchen
Contributor Author

@stephanie-wang @bveeramani All comments are addressed. Thanks!

Signed-off-by: Hao Chen <chenh1024@gmail.com>
@stephanie-wang
Contributor

LGTM, but can you add the failure stats to Dataset.stats()?

@raulchen
Contributor Author

LGTM, but can you add the failure stats to Dataset.stats()?

All the metrics with export_metric=True are already part of stats().
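The export mechanism described here can be sketched as a dataclass whose fields carry an export flag in their metadata; only flagged fields are collected into the stats output. Field and flag names are illustrative, not the exact OpRuntimeMetrics implementation:

```python
# Hypothetical sketch: metrics are dataclass fields, and only fields
# whose metadata flags them for export appear in the stats output.
from dataclasses import dataclass, field, fields


@dataclass
class OpRuntimeMetrics:
    num_tasks_finished: int = field(default=0, metadata={"export_metric": True})
    num_tasks_failed: int = field(default=0, metadata={"export_metric": True})
    internal_counter: int = field(default=0, metadata={})  # not exported

    def exported(self):
        # Collect only the fields flagged for export into a stats dict.
        return {
            f.name: getattr(self, f.name)
            for f in fields(self)
            if f.metadata.get("export_metric")
        }


m = OpRuntimeMetrics(num_tasks_finished=5, num_tasks_failed=1)
```

Under this scheme, adding a new exported metric is just a matter of declaring the field with the right metadata; no stats-reporting code needs to change.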

@raulchen raulchen merged commit ca51322 into ray-project:master Nov 28, 2023
15 of 16 checks passed
@raulchen raulchen deleted the allow-task-failure branch November 28, 2023 00:52
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Nov 29, 2023
Add an option to allow task failures during dataset execution. Data from the failed tasks will be dropped. This can be useful to avoid the entire execution failing because of a small number of unexpected exceptions (e.g., due to corrupted data) that may happen after the execution has been running for long time.

---------

Signed-off-by: Hao Chen <chenh1024@gmail.com>

Successfully merging this pull request may close these issues.

[data] allow execution to continue when some tasks fail
6 participants