[Data] Support UDF retries in case of transient exceptions #63023
Signed-off-by: Ayush Kumar <ayushk7102@gmail.com>
Code Review
This pull request implements a retry mechanism for User Defined Functions (UDFs) within Ray Data map tasks, allowing for transient error recovery based on configurable exception patterns. Key changes include the addition of retried_udf_errors and max_udf_retries to the DataContext, updates to the iterate_with_retry utility to include exception causes in pattern matching, and the integration of this retry logic into the MapOperator. Feedback focuses on refactoring duplicated iterator creation logic in map_operator.py to improve maintainability and simplifying the exception string construction in util.py.
richardliaw left a comment:
A couple comments:
- I'm not that big of a fan of putting `udf` in the naming
- Is there really a use case where you want to actually separate the read task exception classes from the map task exception classes?
I think it makes sense to deduplicate behaviour by unifying the transient failures in map tasks and read tasks. The only counter-example I can think of is when a user explicitly wants RateLimit errors to be retried in the IO stage, and not in the
That being said, if we decide to unify io errors and map tasks, I think that would be better as a follow-up PR to limit scope.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 4c44074.
Kunchd left a comment:
Looks good overall, left some nits for core side.
MengjinYan left a comment:
Only nit comments for the core side change.
…ct#63023)

## Description

Adds support for retrying UDF exceptions in Ray Data map tasks. Previously, any exception raised inside a `map_batches` / `map` UDF would immediately fail the task. This PR allows users to configure which exceptions should trigger a retry, enabling more resilient pipelines for transient errors (e.g. rate limits, flaky external services).

Two new `DataContext` fields control the behavior of UDF retries:

- `retried_map_errors`: `False` (default, no retries), `True` (retry any exception), or a `List[str]` (retry only when the exception message contains one of the given substrings).
- `max_map_retries`: Maximum retry attempts per task. Default is 3.

Retries use the existing `iterate_with_retry` utility function with exponential backoff. We unwrap `UserCodeException.__cause__` so that the original UDF error message is matched, rather than the Ray Data wrapper used for all exceptions arising from user code.

Example usage:

To retry a transform when it hits a rate limit error such as the following:

```
openai.RateLimitError: Error code: 429 - {'error': {'message': 'You exceeded your current quota, please check your plan and billing details.', 'type': 'insufficient_quota', 'param': None, 'code': 'insufficient_quota'}}
```

we can set the following parameters in `DataContext`:

```
import ray

# `ds` and `my_udf` are assumed to be defined elsewhere.
ctx = ray.data.DataContext.get_current()
ctx.retried_map_errors = ["RateLimit", "429"]
ctx.max_map_retries = 5
ds.map_batches(my_udf).take_all()
```

## Additional information

### Implementation

- In `_map_task`, read `retried_map_errors` from the context. If set, wrap the transform pipeline in a factory function and pass it to the existing `iterate_with_retry` utility instead of iterating directly.
- `iterate_with_retry` catches exceptions, checks whether the message matches any of the input patterns, and retries with backoff for up to `max_map_retries` attempts. We extend `iterate_with_retry` to check `e.__cause__` so that the `UserCodeException` is unwrapped into the actual exception from the UDF.
- 4 unit tests added in `test_map.py` covering retries exhausted, successful retry, retry all, and non-matching exceptions.

---------

Signed-off-by: Ayush Kumar <ayushk7102@gmail.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
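For readers who want a feel for the retry flow described in the implementation notes above, here is a minimal, self-contained sketch in plain Python. It is not Ray's `iterate_with_retry` and does not import Ray. The names `WrapperError`, `is_retryable`, `retrying_iter`, `max_attempts`, `base_delay_s`, and the injectable `sleep` are hypothetical. Including the exception class name in the matched text, and skipping items already yielded by an earlier attempt, are assumptions made for illustration.

```python
import time
from typing import Any, Callable, Iterable, Iterator, List, Union


class WrapperError(Exception):
    """Hypothetical stand-in for a wrapper such as UserCodeException."""


def is_retryable(exc: BaseException, retried_errors: Union[bool, List[str]]) -> bool:
    """Return True if `exc` should be retried under the given setting."""
    # A bool means "retry nothing" (False) or "retry everything" (True).
    if isinstance(retried_errors, bool):
        return retried_errors
    # Look at the wrapper and its direct cause, so that the original
    # error text is matched even when it has been wrapped.
    candidates = [exc]
    if exc.__cause__ is not None:
        candidates.append(exc.__cause__)
    text = " | ".join(f"{type(c).__name__}: {c}" for c in candidates)
    return any(pattern in text for pattern in retried_errors)


def retrying_iter(
    make_iterable: Callable[[], Iterable[Any]],
    retried_errors: Union[bool, List[str]],
    max_attempts: int = 3,
    base_delay_s: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[Any]:
    """Iterate over a fresh iterable per attempt, retrying matching errors."""
    num_yielded = 0
    for attempt in range(max_attempts):
        try:
            for index, item in enumerate(make_iterable()):
                if index < num_yielded:
                    # Already delivered by an earlier attempt; skip it.
                    continue
                num_yielded += 1
                yield item
            return
        except Exception as exc:
            out_of_attempts = attempt + 1 >= max_attempts
            if out_of_attempts or not is_retryable(exc, retried_errors):
                raise
            # Exponential backoff: base, 2x base, 4x base, ...
            sleep(base_delay_s * 2 ** attempt)
```

The factory argument matters because a generator that has raised cannot be resumed, so each attempt needs a fresh iterable. A real implementation would likely also cap the backoff and add jitter.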
