Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Soft deprecate prefetch_blocks parameter of iter_rows #43349

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
14 changes: 10 additions & 4 deletions python/ray/data/dataset.py
Expand Up @@ -3623,7 +3623,9 @@ def iterator(self) -> DataIterator:
return DataIteratorImpl(self)

@ConsumptionAPI
def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterable[Dict[str, Any]]:
def iter_rows(
self, *, prefetch_batches: int = 0, prefetch_blocks: int = 0
) -> Iterable[Dict[str, Any]]:
"""Return an iterable over the rows in this dataset.

Examples:
Expand All @@ -3637,13 +3639,17 @@ def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterable[Dict[str, Any]]:
Time complexity: O(1)

Args:
prefetch_blocks: The number of blocks to prefetch ahead of the
current block during the scan.
prefetch_batches: The number of batches to prefetch ahead of the current
batch during the scan.
prefetch_blocks: This argument is deprecated. Use ``prefetch_batches``
instead.

Returns:
An iterable over the rows in this dataset.
"""
return self.iterator().iter_rows(prefetch_blocks=prefetch_blocks)
return self.iterator().iter_rows(
prefetch_batches=prefetch_batches, prefetch_blocks=prefetch_blocks
)

@ConsumptionAPI
def iter_batches(
Expand Down
27 changes: 21 additions & 6 deletions python/ray/data/iterator.py
@@ -1,5 +1,6 @@
import abc
import time
import warnings
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -204,7 +205,9 @@ def _create_iterator() -> Iterator[DataBatch]:
def _get_dataset_tag(self) -> str:
return "unknown_dataset"

def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterable[Dict[str, Any]]:
def iter_rows(
self, *, prefetch_batches: int = 0, prefetch_blocks: int = 0
) -> Iterable[Dict[str, Any]]:
"""Return a local row iterable over the dataset.

If the dataset is a tabular dataset (Arrow/Pandas blocks), dicts
Expand All @@ -220,15 +223,27 @@ def iter_rows(self, *, prefetch_blocks: int = 0) -> Iterable[Dict[str, Any]]:
Time complexity: O(1)

Args:
prefetch_blocks: The number of blocks to prefetch ahead of the
current block during the scan.
prefetch_batches: The number of batches to prefetch ahead of the current
batch during the scan.
prefetch_blocks: This argument is deprecated. Use ``prefetch_batches``
instead.

Returns:
An iterable over rows of the dataset.
"""
iter_batch_args = {"batch_size": None, "batch_format": None}

iter_batch_args["prefetch_batches"] = prefetch_blocks
iter_batch_args = {
"batch_size": None,
"batch_format": None,
"prefetch_batches": prefetch_batches,
}
if prefetch_blocks > 0:
warnings.warn(
"`prefetch_blocks` is deprecated in Ray 2.10. Use "
"the `prefetch_batches` parameter to specify the amount of prefetching "
"in terms of batches instead of blocks.",
DeprecationWarning,
)
iter_batch_args["prefetch_batches"] = prefetch_blocks

batch_iterable = self.iter_batches(**iter_batch_args)

Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_consumption.py
Expand Up @@ -715,7 +715,7 @@ def to_pylist(table):
assert row == df_row.to_dict()

# Prefetch.
for row, t_row in zip(ds.iter_rows(prefetch_blocks=1), to_pylist(t)):
for row, t_row in zip(ds.iter_rows(prefetch_batches=1), to_pylist(t)):
assert isinstance(row, dict)
assert row == t_row

Expand Down