Skip to content

Commit

Permalink
[Data] Remove prefetch_blocks parameter of batch-based iteration APIs (#43347)
Browse files Browse the repository at this point in the history

Since Ray 2.4, we raise an error when users use the prefetch_blocks parameter of iteration APIs like iter_batches (users are instructed to use prefetch_batches instead). This PR removes the parameter entirely and any associated dead code.

---------

Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
  • Loading branch information
bveeramani committed Feb 22, 2024
1 parent a8cb077 commit 0aea6db
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 42 deletions.
6 changes: 1 addition & 5 deletions python/ray/air/util/check_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,13 @@ def __init__(
num_epochs: int = 1,
prefetch_batches: int = 1,
batch_size: Optional[int] = 4096,
# Deprecated.
prefetch_blocks: int = 0,
**kwargs,
):
if not scaling_config:
scaling_config = ScalingConfig(num_workers=1)
super().__init__(
train_loop_per_worker=DummyTrainer.make_train_loop(
num_epochs, prefetch_batches, prefetch_blocks, batch_size
num_epochs, prefetch_batches, batch_size
),
*args,
scaling_config=scaling_config,
Expand All @@ -63,7 +61,6 @@ def __init__(
def make_train_loop(
num_epochs: int,
prefetch_batches: int,
prefetch_blocks: int,
batch_size: Optional[int],
):
"""Make a debug train loop that runs for the given amount of epochs."""
Expand All @@ -83,7 +80,6 @@ def train_loop_per_worker():
batch_start = time.perf_counter()
for batch in data_shard.iter_batches(
prefetch_batches=prefetch_batches,
prefetch_blocks=prefetch_blocks,
batch_size=batch_size,
):
batch_delay = time.perf_counter() - batch_start
Expand Down
15 changes: 0 additions & 15 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3661,8 +3661,6 @@ def iter_batches(
local_shuffle_buffer_size: Optional[int] = None,
local_shuffle_seed: Optional[int] = None,
_collate_fn: Optional[Callable[[DataBatch], CollatedData]] = None,
# Deprecated.
prefetch_blocks: int = 0,
) -> Iterable[DataBatch]:
"""Return an iterable over batches of data.
Expand Down Expand Up @@ -3714,7 +3712,6 @@ def iter_batches(
batch_format = _apply_batch_format(batch_format)
return self.iterator().iter_batches(
prefetch_batches=prefetch_batches,
prefetch_blocks=prefetch_blocks,
batch_size=batch_size,
batch_format=batch_format,
drop_last=drop_last,
Expand All @@ -3735,8 +3732,6 @@ def iter_torch_batches(
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
local_shuffle_seed: Optional[int] = None,
# Deprecated
prefetch_blocks: int = 0,
) -> Iterable[TorchBatchType]:
"""Return an iterable over batches of data represented as Torch tensors.
Expand Down Expand Up @@ -3822,7 +3817,6 @@ def iter_torch_batches(
""" # noqa: E501
return self.iterator().iter_torch_batches(
prefetch_batches=prefetch_batches,
prefetch_blocks=prefetch_blocks,
batch_size=batch_size,
dtypes=dtypes,
device=device,
Expand All @@ -3842,8 +3836,6 @@ def iter_tf_batches(
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
local_shuffle_seed: Optional[int] = None,
# Deprecated
prefetch_blocks: int = 0,
) -> Iterable[TensorFlowTensorBatchType]:
"""Return an iterable over batches of data represented as TensorFlow tensors.
Expand Down Expand Up @@ -3909,7 +3901,6 @@ def iter_tf_batches(
""" # noqa: E501
return self.iterator().iter_tf_batches(
prefetch_batches=prefetch_batches,
prefetch_blocks=prefetch_blocks,
batch_size=batch_size,
dtypes=dtypes,
drop_last=drop_last,
Expand All @@ -3936,8 +3927,6 @@ def to_torch(
local_shuffle_seed: Optional[int] = None,
unsqueeze_label_tensor: bool = True,
unsqueeze_feature_tensors: bool = True,
# Deprecated
prefetch_blocks: int = 0,
) -> "torch.utils.data.IterableDataset":
"""Return a
`Torch IterableDataset <https://pytorch.org/docs/stable/data.html#torch.utils.data.IterableDataset>`_
Expand Down Expand Up @@ -4037,7 +4026,6 @@ def to_torch(
label_column_dtype=label_column_dtype,
feature_column_dtypes=feature_column_dtypes,
batch_size=batch_size,
prefetch_blocks=prefetch_blocks,
prefetch_batches=prefetch_batches,
drop_last=drop_last,
local_shuffle_buffer_size=local_shuffle_buffer_size,
Expand All @@ -4059,8 +4047,6 @@ def to_tf(
local_shuffle_seed: Optional[int] = None,
feature_type_spec: Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]] = None,
label_type_spec: Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]] = None,
# Deprecated
prefetch_blocks: int = 0,
) -> "tf.data.Dataset":
"""Return a `TensorFlow Dataset <https://www.tensorflow.org/api_docs/python/tf/data/Dataset/>`_
over this :class:`~ray.data.Dataset`.
Expand Down Expand Up @@ -4165,7 +4151,6 @@ def to_tf(
feature_columns=feature_columns,
label_columns=label_columns,
prefetch_batches=prefetch_batches,
prefetch_blocks=prefetch_blocks,
drop_last=drop_last,
batch_size=batch_size,
local_shuffle_buffer_size=local_shuffle_buffer_size,
Expand Down
22 changes: 0 additions & 22 deletions python/ray/data/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ def iter_batches(
local_shuffle_seed: Optional[int] = None,
_collate_fn: Optional[Callable[[DataBatch], "CollatedData"]] = None,
_finalize_fn: Optional[Callable[[Any], Any]] = None,
# Deprecated.
prefetch_blocks: int = 0,
) -> Iterable[DataBatch]:
"""Return a batched iterable over the dataset.
Expand Down Expand Up @@ -153,14 +151,6 @@ def iter_batches(
Returns:
An iterable over record batches.
"""

if prefetch_blocks > 0:
raise DeprecationWarning(
"`prefetch_blocks` arg is deprecated in Ray 2.4. Use "
"the `prefetch_batches` arg instead to specify the amount of "
"prefetching in terms of batches instead of blocks."
)

batch_format = _apply_batch_format(batch_format)

def _create_iterator() -> Iterator[DataBatch]:
Expand Down Expand Up @@ -276,8 +266,6 @@ def iter_torch_batches(
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
local_shuffle_seed: Optional[int] = None,
# Deprecated.
prefetch_blocks: int = 0,
) -> Iterable["TorchBatchType"]:
"""Return a batched iterable of Torch Tensors over the dataset.
Expand Down Expand Up @@ -404,7 +392,6 @@ def finalize_fn(batch: Union["torch.Tensor", Dict[str, "torch.Tensor"]]):

return self.iter_batches(
prefetch_batches=prefetch_batches,
prefetch_blocks=prefetch_blocks,
batch_size=batch_size,
drop_last=drop_last,
local_shuffle_buffer_size=local_shuffle_buffer_size,
Expand All @@ -422,8 +409,6 @@ def iter_tf_batches(
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
local_shuffle_seed: Optional[int] = None,
# Deprecated.
prefetch_blocks: int = 0,
) -> Iterable["TensorFlowTensorBatchType"]:
"""Return a batched iterable of TensorFlow Tensors over the dataset.
Expand Down Expand Up @@ -479,7 +464,6 @@ def iter_tf_batches(

batch_iterable = self.iter_batches(
prefetch_batches=prefetch_batches,
prefetch_blocks=prefetch_blocks,
batch_size=batch_size,
drop_last=drop_last,
local_shuffle_buffer_size=local_shuffle_buffer_size,
Expand Down Expand Up @@ -512,8 +496,6 @@ def to_torch(
local_shuffle_seed: Optional[int] = None,
unsqueeze_label_tensor: bool = True,
unsqueeze_feature_tensors: bool = True,
# Deprecated.
prefetch_blocks: int = 0,
) -> "torch.utils.data.IterableDataset":
"""Return a Torch IterableDataset over this dataset.
Expand Down Expand Up @@ -647,7 +629,6 @@ def make_generator():
for batch in self.iter_batches(
batch_size=batch_size,
batch_format="pandas",
prefetch_blocks=prefetch_blocks,
prefetch_batches=prefetch_batches,
drop_last=drop_last,
local_shuffle_buffer_size=local_shuffle_buffer_size,
Expand Down Expand Up @@ -702,8 +683,6 @@ def to_tf(
local_shuffle_seed: Optional[int] = None,
feature_type_spec: Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]] = None,
label_type_spec: Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]] = None,
# Deprecated.
prefetch_blocks: int = 0,
) -> "tf.data.Dataset":
"""Return a TF Dataset over this dataset.
Expand Down Expand Up @@ -843,7 +822,6 @@ def convert_batch_to_tensors(
def generator():
for batch in self.iter_batches(
prefetch_batches=prefetch_batches,
prefetch_blocks=prefetch_blocks,
batch_size=batch_size,
drop_last=drop_last,
local_shuffle_buffer_size=local_shuffle_buffer_size,
Expand Down

0 comments on commit 0aea6db

Please sign in to comment.