Skip to content

Commit

Permalink
[Data] Add retry for _sample_fragment during `ParquetDatasource._es…
Browse files Browse the repository at this point in the history
…timate_files_encoding_ratio()` (#42759)
  • Loading branch information
scottjlee committed Jan 27, 2024
1 parent 0c46f31 commit 3627e94
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions python/ray/data/datasource/parquet_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,20 @@ def _deserialize_fragments(
return [p.deserialize() for p in serialized_fragments]


# This retry helps when the upstream datasource is not able to handle
# overloaded read request or failed with some retriable failures.
# For example when reading data from HA hdfs service, hdfs might
# lose connection for some unknown reason expecially when
# simutaneously running many hyper parameter tuning jobs
# with ray.data parallelism setting at high value like the default 200
# Such connection failure can be restored with some waiting and retry.
def _deserialize_fragments_with_retry(
serialized_fragments: List[_SerializedFragment],
) -> List["pyarrow._dataset.ParquetFileFragment"]:
"""
Deserialize the given serialized_fragments with retry upon errors.
This retry helps when the upstream datasource is not able to handle
overloaded read request or failed with some retriable failures.
For example when reading data from HA hdfs service, hdfs might
lose connection for some unknown reason expecially when
simutaneously running many hyper parameter tuning jobs
with ray.data parallelism setting at high value like the default 200
Such connection failure can be restored with some waiting and retry.
"""
min_interval = 0
final_exception = None
for i in range(FILE_READING_RETRY):
Expand Down Expand Up @@ -459,7 +463,11 @@ def _estimate_files_encoding_ratio(self) -> float:
# Use SPREAD scheduling strategy to avoid packing many sampling tasks on
# same machine to cause OOM issue, as sampling can be memory-intensive.
futures.append(
sample_fragment.options(scheduling_strategy=scheduling).remote(
sample_fragment.options(
scheduling_strategy=scheduling,
# Retry in case of transient errors during sampling.
retry_exceptions=[OSError],
).remote(
self._to_batches_kwargs,
self._columns,
self._schema,
Expand Down

0 comments on commit 3627e94

Please sign in to comment.