[Data] Introduce concurrency argument to replace ComputeStrategy in map-like APIs #41461

Merged
merged 19 commits · Dec 4, 2023
Changes from 13 commits
2 changes: 1 addition & 1 deletion doc/source/data/examples/gptj_batch_prediction.ipynb
@@ -95,7 +95,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Since we will be using a pretrained model from Hugging Face hub, the simplest way is to use {meth}`map_batches <ray.data.Dataset.map_batches>` with a [callable class UDF](transforming_data_actors). This will allow us to save time by initializing a model just once and then feed it multiple batches of data."
"Since we will be using a pretrained model from Hugging Face hub, the simplest way is to use {meth}`map_batches <ray.data.Dataset.map_batches>` with a [callable class UDF](stateful_transformation). This will allow us to save time by initializing a model just once and then feed it multiple batches of data."
]
},
{
@@ -89,7 +89,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Since we will be using a pretrained model from Hugging Face hub, the simplest way is to use {meth}`map_batches <ray.data.Dataset.map_batches>` with a [callable class UDF](transforming_data_actors). This will allow us to save time by initializing a model just once and then feed it multiple batches of data."
"Since we will be using a pretrained model from Hugging Face hub, the simplest way is to use {meth}`map_batches <ray.data.Dataset.map_batches>` with a [callable class UDF](stateful_transformation). This will allow us to save time by initializing a model just once and then feed it multiple batches of data."
]
},
{
165 changes: 81 additions & 84 deletions doc/source/data/transforming-data.rst
@@ -14,6 +14,7 @@ This guide shows you how to:

* :ref:`Transform rows <transforming_rows>`
* :ref:`Transform batches <transforming_batches>`
* :ref:`Stateful transformation <stateful_transformation>`
* :ref:`Groupby and transform groups <transforming_groupby>`
* :ref:`Shuffle rows <shuffling_rows>`
* :ref:`Repartition data <repartitioning_data>`
@@ -84,22 +85,6 @@ Transforming batches
If your transformation is vectorized like most NumPy or pandas operations, transforming
batches is more performant than transforming rows.

Choosing between tasks and actors
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Ray Data transforms batches with either tasks or actors. Actors perform setup exactly
once. In contrast, tasks require setup every batch. So, if your transformation involves
expensive setup like downloading model weights, use actors. Otherwise, use tasks.

To learn more about tasks and actors, read the
:ref:`Ray Core Key Concepts <core-key-concepts>`.

Transforming batches with tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To transform batches with tasks, call :meth:`~ray.data.Dataset.map_batches`. Ray Data
uses tasks by default.

.. testcode::

from typing import Dict
@@ -115,19 +100,90 @@ uses tasks by default.
.map_batches(increase_brightness)
)

.. _transforming_data_actors:
.. _configure_batch_format:
Contributor Author: GitHub somehow shows a change here, but this section (Configuring batch format) is not changed.

Configuring batch format
~~~~~~~~~~~~~~~~~~~~~~~~

Ray Data represents batches as dicts of NumPy ndarrays or pandas DataFrames. By
default, Ray Data represents batches as dicts of NumPy ndarrays.

To configure the batch type, specify ``batch_format`` in
:meth:`~ray.data.Dataset.map_batches`. You can return either format from your function.

.. tab-set::

.. tab-item:: NumPy

.. testcode::

from typing import Dict
import numpy as np
import ray

def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
batch["image"] = np.clip(batch["image"] + 4, 0, 255)
return batch

ds = (
ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
.map_batches(increase_brightness, batch_format="numpy")
)

.. tab-item:: pandas

.. testcode::

import pandas as pd
import ray

def drop_nas(batch: pd.DataFrame) -> pd.DataFrame:
return batch.dropna()

ds = (
ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
.map_batches(drop_nas, batch_format="pandas")
)

Configuring batch size
~~~~~~~~~~~~~~~~~~~~~~

Contributor Author: GitHub somehow shows a change here, but this section (Configuring batch size) is not changed.

Increasing ``batch_size`` improves the performance of vectorized transformations like
NumPy functions and model inference. However, if your batch size is too large, your
program might run out of memory. If you encounter an out-of-memory error, decrease your
``batch_size``.

.. note::

The default batch size depends on your resource type. If you're using CPUs,
the default batch size is 4096. If you're using GPUs, you must specify an explicit
batch size.

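For illustration, here is a minimal sketch that passes an explicit ``batch_size``, reusing the ``increase_brightness`` function from above; the value ``1024`` is an arbitrary assumption to tune against your memory budget, not a recommendation.

.. testcode::

    from typing import Dict
    import numpy as np
    import ray

    def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        batch["image"] = np.clip(batch["image"] + 4, 0, 255)
        return batch

    ds = (
        ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
        # An explicit batch size; decrease it if you hit out-of-memory errors.
        .map_batches(increase_brightness, batch_size=1024)
    )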
.. _stateful_transformation:

Stateful transformation
=======================

Transforming batches with actors
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If your transformation requires expensive setup such as downloading model weights,
use a callable Python class instead of a function. When you use a class, Ray Data
calls its ``__init__`` method exactly once per worker to perform setup. In contrast,
a function is stateless, so any setup must be performed for each data item.

To transform batches with actors, complete these steps:
Internally, Ray Data uses tasks to execute functions, and uses actors to execute classes.
To learn more about tasks and actors, read the
:ref:`Ray Core Key Concepts <core-key-concepts>`.

To transform data with a Python class, complete these steps:

1. Implement a class. Perform setup in ``__init__`` and transform data in ``__call__``.

2. Create an :class:`~ray.data.ActorPoolStrategy` and configure the number of concurrent
workers. Each worker transforms a partition of data.
2. Configure ``concurrency`` with the number of concurrent workers. Each worker
transforms a partition of data in parallel. You can also pass a tuple of
``(min, max)`` to allow Ray Data to automatically scale the number of concurrent
workers, as shown in the sketch after the examples below.

3. Call :meth:`~ray.data.Dataset.map_batches` and pass your ``ActorPoolStrategy`` to ``compute``.
3. Call :meth:`~ray.data.Dataset.map_batches`, :meth:`~ray.data.Dataset.map`, or
:meth:`~ray.data.Dataset.flat_map`.

.. tab-set::

@@ -154,7 +210,7 @@ To transform batches with actors, complete these steps:

ds = (
ray.data.from_numpy(np.ones((32, 100)))
.map_batches(TorchPredictor, compute=ray.data.ActorPoolStrategy(size=2))
.map_batches(TorchPredictor, concurrency=2)
)

.. testcode::
@@ -188,7 +244,7 @@ To transform batches with actors, complete these steps:
.map_batches(
TorchPredictor,
# Two workers with one GPU each
compute=ray.data.ActorPoolStrategy(size=2),
concurrency=2,
# Batch size is required if you're using GPUs.
batch_size=4,
num_gpus=1
@@ -200,65 +256,6 @@ To transform batches with actors, complete these steps:

ds.materialize()

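As a minimal sketch of the autoscaling form from step 2, you can pass a ``(min, max)`` tuple as ``concurrency``; this reuses the ``TorchPredictor`` class from the CPU tab above, and the bounds ``(2, 4)`` are arbitrary assumptions.

.. testcode::

    ds = (
        ray.data.from_numpy(np.ones((32, 100)))
        # Ray Data scales the worker pool between 2 and 4 workers.
        .map_batches(TorchPredictor, concurrency=(2, 4))
    )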
.. _configure_batch_format:

Configuring batch format
~~~~~~~~~~~~~~~~~~~~~~~~

Ray Data represents batches as dicts of NumPy ndarrays or pandas DataFrames. By
default, Ray Data represents batches as dicts of NumPy ndarrays.

To configure the batch type, specify ``batch_format`` in
:meth:`~ray.data.Dataset.map_batches`. You can return either format from your function.

.. tab-set::

.. tab-item:: NumPy

.. testcode::

from typing import Dict
import numpy as np
import ray

def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
batch["image"] = np.clip(batch["image"] + 4, 0, 255)
return batch

ds = (
ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
.map_batches(increase_brightness, batch_format="numpy")
)

.. tab-item:: pandas

.. testcode::

import pandas as pd
import ray

def drop_nas(batch: pd.DataFrame) -> pd.DataFrame:
return batch.dropna()

ds = (
ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
.map_batches(drop_nas, batch_format="pandas")
)

Configuring batch size
~~~~~~~~~~~~~~~~~~~~~~

Increasing ``batch_size`` improves the performance of vectorized transformations like
NumPy functions and model inference. However, if your batch size is too large, your
program might run out of memory. If you encounter an out-of-memory error, decrease your
``batch_size``.

.. note::

The default batch size depends on your resource type. If you're using CPUs,
the default batch size is 4096. If you're using GPUs, you must specify an explicit
batch size.

.. _transforming_groupby:

Groupby and transforming groups
2 changes: 1 addition & 1 deletion doc/source/data/working-with-images.rst
@@ -243,7 +243,7 @@ Finally, call :meth:`Dataset.map_batches() <ray.data.Dataset.map_batches>`.

For more information on performing inference, see
:ref:`End-to-end: Offline Batch Inference <batch_inference_home>`
and :ref:`Transforming batches with actors <transforming_data_actors>`.
and :ref:`Stateful transformation <stateful_transformation>`.

.. _saving_images:

2 changes: 1 addition & 1 deletion doc/source/data/working-with-text.rst
@@ -169,7 +169,7 @@ that sets up and invokes a model. Then, call

For more information on performing inference, see
:ref:`End-to-end: Offline Batch Inference <batch_inference_home>`
and :ref:`Transforming batches with actors <transforming_data_actors>`.
and :ref:`Stateful transformation <stateful_transformation>`.

.. _saving-text:

11 changes: 9 additions & 2 deletions doc/source/ray-core/_examples/datasets_train/datasets_train.py
@@ -263,12 +263,18 @@ def column_standard_scaler(s: pd.Series):


def inference(
dataset, model_cls: type, batch_size: int, result_path: str, use_gpu: bool
dataset,
load_model_func,
model_cls: type,
batch_size: int,
result_path: str,
use_gpu: bool,
):
print("inferencing...")
num_gpus = 1 if use_gpu else 0
dataset.map_batches(
model_cls,
fn_constructor_args=[load_model_func],
compute=ray.data.ActorPoolStrategy(),
batch_size=batch_size,
batch_format="pandas",
@@ -699,7 +705,8 @@ def __call__(self, batch) -> "pd.DataFrame":
)
inference(
inference_dataset,
BatchInferModel(load_model_func),
load_model_func,
BatchInferModel,
100,
inference_output_path,
use_gpu,
2 changes: 0 additions & 2 deletions python/ray/data/_internal/execution/legacy_compat.py
@@ -31,7 +31,6 @@
from ray.data._internal.plan import AllToAllStage, ExecutionPlan, OneToOneStage, Stage
from ray.data._internal.stage_impl import LimitStage, RandomizeBlocksStage
from ray.data._internal.stats import DatasetStats, StatsDict
from ray.data._internal.util import validate_compute
from ray.data.block import Block, BlockMetadata, CallableClass, List
from ray.data.context import DataContext
from ray.data.datasource import ReadTask
@@ -266,7 +265,6 @@ def _stage_to_operator(stage: Stage, input_op: PhysicalOperator) -> PhysicalOper

if isinstance(stage, OneToOneStage):
compute = get_compute(stage.compute)
validate_compute(stage.fn, compute)

block_fn = stage.block_fn
if stage.fn:
3 changes: 1 addition & 2 deletions python/ray/data/_internal/planner/plan_udf_map_op.py
@@ -30,7 +30,7 @@
MapRows,
)
from ray.data._internal.numpy_support import is_valid_udf_return
from ray.data._internal.util import _truncated_repr, validate_compute
from ray.data._internal.util import _truncated_repr
from ray.data.block import (
Block,
BlockAccessor,
@@ -51,7 +51,6 @@ def plan_udf_map_op(
"""

compute = get_compute(op._compute)
validate_compute(op._fn, compute)
fn, init_fn = _parse_op_fn(op)

if isinstance(op, MapBatches):