Skip to content

Commit

Permalink
[Data] Hard deprecate Datasource-related methods (#42936)
Browse files Browse the repository at this point in the history
We deprecated write_datasource and write-related Datasource methods in Ray 2.9. This PR hard deprecates them for Ray 2.10.

In specific, this PR hard deprecates Dataset.write_datasource and removes the following user-facing APIs:

Datasource.on_write_start
Datasource.write
Datasource.on_write_completed
Datasource.on_write_failed
DummyOutputDatasource (renamed to DummyOutputDatasink)
If a user implements one of the removed methods, their program won't break. Instead, they'll receive a deprecation error when they call Dataset.write_datasource.

---------

Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
  • Loading branch information
bveeramani committed Feb 22, 2024
1 parent 153af0f commit d70b5f8
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 435 deletions.
5 changes: 3 additions & 2 deletions python/ray/data/_internal/planner/plan_write_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def generate_write_fn(
datasink_or_legacy_datasource: Union[Datasink, Datasource], **write_args
) -> Callable[[Iterator[Block], TaskContext], Iterator[Block]]:
# If the write op succeeds, the resulting Dataset is a list of
# WriteResult (one element per write task). Otherwise, an error will
# arbitrary objects (one object per write task). Otherwise, an error will
# be raised. The Datasource can handle execution outcomes with the
# on_write_complete() and on_write_failed().
def fn(blocks: Iterator[Block], ctx) -> Iterator[Block]:
Expand All @@ -29,7 +29,8 @@ def fn(blocks: Iterator[Block], ctx) -> Iterator[Block]:
blocks, ctx, **write_args
)

# NOTE: `WriteResult` isn't a valid block type, so we need to wrap it up.
# NOTE: Write tasks can return anything, so we need to wrap it in a valid block
# type.
import pandas as pd

block = pd.DataFrame({"write_result": [write_result]})
Expand Down
50 changes: 3 additions & 47 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,7 @@
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.split import _get_num_rows, _split_at_indices
from ray.data._internal.stats import DatasetStats, DatasetStatsSummary, StatsManager
from ray.data._internal.util import (
AllToAllAPI,
ConsumptionAPI,
_is_local_scheme,
get_compute_strategy,
)
from ray.data._internal.util import AllToAllAPI, ConsumptionAPI, get_compute_strategy
from ray.data.aggregate import AggregateFn, Max, Mean, Min, Std, Sum
from ray.data.block import (
VALID_BATCH_FORMATS,
Expand Down Expand Up @@ -3545,51 +3540,12 @@ def write_datasource(
ray_remote_args: Kwargs passed to ``ray.remote`` in the write tasks.
write_args: Additional write args to pass to the :class:`~ray.data.Datasource`.
""" # noqa: E501
warnings.warn(
raise DeprecationWarning(
"`write_datasource` is deprecated in Ray 2.9. Create a `Datasink` and use "
"`write_datasink` instead. For more information, see "
"https://docs.ray.io/en/master/data/api/doc/ray.data.Datasource.html.", # noqa: E501
DeprecationWarning,
"https://docs.ray.io/en/master/data/api/doc/ray.data.Datasink.html.", # noqa: E501
)

if ray_remote_args is None:
ray_remote_args = {}
path = write_args.get("path", None)
if path and _is_local_scheme(path):
if ray.util.client.ray.is_connected():
raise ValueError(
f"The local scheme paths {path} are not supported in Ray Client."
)
ray_remote_args["scheduling_strategy"] = NodeAffinitySchedulingStrategy(
ray.get_runtime_context().get_node_id(),
soft=False,
)

plan = self._plan.copy()

write_op = Write(
self._logical_plan.dag,
datasource,
ray_remote_args=ray_remote_args,
**write_args,
)
logical_plan = LogicalPlan(write_op)

try:
import pandas as pd

datasource.on_write_start(**write_args)
self._write_ds = Dataset(plan, logical_plan).materialize()
blocks = ray.get(self._write_ds._plan.execute().get_blocks())
assert all(
isinstance(block, pd.DataFrame) and len(block) == 1 for block in blocks
)
write_results = [block["write_result"][0] for block in blocks]
datasource.on_write_complete(write_results, **write_args)
except Exception as e:
datasource.on_write_failed([], e)
raise

@ConsumptionAPI(pattern="Time complexity:")
def write_datasink(
self,
Expand Down
7 changes: 2 additions & 5 deletions python/ray/data/datasource/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@
)
from ray.data.datasource.csv_datasink import _CSVDatasink
from ray.data.datasource.csv_datasource import CSVDatasource
from ray.data.datasource.datasink import Datasink
from ray.data.datasource.datasink import Datasink, DummyOutputDatasink
from ray.data.datasource.datasource import (
Datasource,
DummyOutputDatasource,
RandomIntRowDatasource,
Reader,
ReadTask,
WriteResult,
)
from ray.data.datasource.file_based_datasource import (
FileBasedDatasource,
Expand Down Expand Up @@ -81,7 +79,7 @@
"DefaultBlockWritePathProvider",
"DefaultFileMetadataProvider",
"DefaultParquetMetadataProvider",
"DummyOutputDatasource",
"DummyOutputDatasink",
"FastFileMetadataProvider",
"FileBasedDatasource",
"FileExtensionFilter",
Expand Down Expand Up @@ -114,6 +112,5 @@
"TorchDatasource",
"_WebDatasetDatasink",
"WebDatasetDatasource",
"WriteResult",
"_S3FileSystemWrapper",
]
59 changes: 58 additions & 1 deletion python/ray/data/datasource/datasink.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Any, Iterable, List, Optional

import ray
from ray.data._internal.execution.interfaces import TaskContext
from ray.data.block import Block
from ray.data.block import Block, BlockAccessor
from ray.util.annotations import DeveloperAPI


Expand Down Expand Up @@ -85,3 +86,59 @@ def num_rows_per_write(self) -> Optional[int]:
If ``None``, Ray Data passes a system-chosen number of rows.
"""
return None


@DeveloperAPI
class DummyOutputDatasink(Datasink):
"""An example implementation of a writable datasource for testing.
Examples:
>>> import ray
>>> from ray.data.datasource import DummyOutputDatasink
>>> output = DummyOutputDatasink()
>>> ray.data.range(10).write_datasink(output)
>>> assert output.num_ok == 1
"""

def __init__(self):
ctx = ray.data.DataContext.get_current()

# Setup a dummy actor to send the data. In a real datasource, write
# tasks would send data to an external system instead of a Ray actor.
@ray.remote(scheduling_strategy=ctx.scheduling_strategy)
class DataSink:
def __init__(self):
self.rows_written = 0
self.enabled = True

def write(self, block: Block) -> str:
block = BlockAccessor.for_block(block)
self.rows_written += block.num_rows()
return "ok"

def get_rows_written(self):
return self.rows_written

self.data_sink = DataSink.remote()
self.num_ok = 0
self.num_failed = 0
self.enabled = True

def write(
self,
blocks: Iterable[Block],
ctx: TaskContext,
) -> Any:
tasks = []
if not self.enabled:
raise ValueError("disabled")
for b in blocks:
tasks.append(self.data_sink.write.remote(b))
ray.get(tasks)
return "ok"

def on_write_complete(self, write_results: List[Any]) -> None:
assert all(w == "ok" for w in write_results), write_results
self.num_ok += 1

def on_write_failed(self, error: Exception) -> None:
self.num_failed += 1
160 changes: 9 additions & 151 deletions python/ray/data/datasource/datasource.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
import warnings
from typing import Any, Callable, Iterable, List, Optional
from typing import Callable, Iterable, List, Optional

import numpy as np

import ray
from ray.data._internal.execution.interfaces import TaskContext
from ray.data._internal.util import _check_pyarrow_version
from ray.data.block import Block, BlockAccessor, BlockMetadata
from ray.data.context import DataContext
from ray.types import ObjectRef
from ray.data.block import Block, BlockMetadata
from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI

WriteResult = Any


@PublicAPI
class Datasource:
Expand All @@ -31,79 +24,13 @@ def create_reader(self, **read_args) -> "Reader":
Args:
read_args: Additional kwargs to pass to the datasource impl.
"""
warnings.warn(
"`create_reader` has been deprecated in Ray 2.9. Instead of creating a "
"`Reader`, implement `Datasource.get_read_tasks` and "
"`Datasource.estimate_inmemory_data_size`.",
DeprecationWarning,
)
return _LegacyDatasourceReader(self, **read_args)

@Deprecated
def prepare_read(self, parallelism: int, **read_args) -> List["ReadTask"]:
"""Deprecated: Please implement create_reader() instead."""
raise NotImplementedError

@Deprecated
def on_write_start(self, **write_args) -> None:
"""Callback for when a write job starts.
Use this method to perform setup for write tasks. For example, creating a
staging bucket in S3.
Args:
write_args: Additional kwargs to pass to the datasource impl.
"""
pass

@Deprecated
def write(
self,
blocks: Iterable[Block],
ctx: TaskContext,
**write_args,
) -> WriteResult:
"""Write blocks out to the datasource. This is used by a single write task.
Args:
blocks: List of data blocks.
ctx: ``TaskContext`` for the write task.
write_args: Additional kwargs to pass to the datasource impl.
Returns:
The output of the write task.
"""
raise NotImplementedError

@Deprecated
def on_write_complete(self, write_results: List[WriteResult], **kwargs) -> None:
"""Callback for when a write job completes.
This can be used to "commit" a write output. This method must
succeed prior to ``write_datasource()`` returning to the user. If this
method fails, then ``on_write_failed()`` will be called.
Args:
write_results: The list of the write task results.
kwargs: Forward-compatibility placeholder.
"""
pass

@Deprecated
def on_write_failed(
self, write_results: List[ObjectRef[WriteResult]], error: Exception, **kwargs
) -> None:
"""Callback for when a write job fails.
This is called on a best-effort basis on write failures.
Args:
write_results: The list of the write task result futures.
error: The first error encountered.
kwargs: Forward-compatibility placeholder.
"""
pass

def get_name(self) -> str:
"""Return a human-readable name for this datasource.
This will be used as the names of the read tasks.
Expand Down Expand Up @@ -237,66 +164,6 @@ def __call__(self) -> Iterable[Block]:
yield from result


@DeveloperAPI
class DummyOutputDatasource(Datasource):
"""An example implementation of a writable datasource for testing.
Examples:
>>> import ray
>>> from ray.data.datasource import DummyOutputDatasource
>>> output = DummyOutputDatasource() # doctest: +SKIP
>>> ray.data.range(10).write_datasource(output) # doctest: +SKIP
>>> assert output.num_ok == 1 # doctest: +SKIP
"""

def __init__(self):
ctx = DataContext.get_current()

# Setup a dummy actor to send the data. In a real datasource, write
# tasks would send data to an external system instead of a Ray actor.
@ray.remote(scheduling_strategy=ctx.scheduling_strategy)
class DataSink:
def __init__(self):
self.rows_written = 0
self.enabled = True

def write(self, block: Block) -> str:
block = BlockAccessor.for_block(block)
self.rows_written += block.num_rows()
return "ok"

def get_rows_written(self):
return self.rows_written

self.data_sink = DataSink.remote()
self.num_ok = 0
self.num_failed = 0
self.enabled = True

def write(
self,
blocks: Iterable[Block],
ctx: TaskContext,
**write_args,
) -> WriteResult:
tasks = []
if not self.enabled:
raise ValueError("disabled")
for b in blocks:
tasks.append(self.data_sink.write.remote(b))
ray.get(tasks)
return "ok"

def on_write_complete(self, write_results: List[WriteResult]) -> None:
assert all(w == "ok" for w in write_results), write_results
self.num_ok += 1

def on_write_failed(
self, write_results: List[ObjectRef[WriteResult]], error: Exception
) -> None:
self.num_failed += 1


@DeveloperAPI
class RandomIntRowDatasource(Datasource):
"""An example datasource that generates rows with random int64 columns.
Expand All @@ -311,22 +178,6 @@ class RandomIntRowDatasource(Datasource):
{'c_0': 4983608804013926748, 'c_1': 1160140066899844087}
"""

def get_name(self) -> str:
"""Return a human-readable name for this datasource.
This will be used as the names of the read tasks.
Note: overrides the base `Datasource` method.
"""
return "RandomInt"

def create_reader(
self,
n: int,
num_columns: int,
) -> List[ReadTask]:
return _RandomIntRowDatasourceReader(n, num_columns)


class _RandomIntRowDatasourceReader(Reader):
def __init__(self, n: int, num_columns: int):
self._n = n
self._num_columns = num_columns
Expand Down Expand Up @@ -379,3 +230,10 @@ def make_block(count: int, num_columns: int) -> Block:
i += block_size

return read_tasks

def get_name(self) -> str:
"""Return a human-readable name for this datasource.
This will be used as the names of the read tasks.
Note: overrides the base `Datasource` method.
"""
return "RandomInt"

0 comments on commit d70b5f8

Please sign in to comment.