[Data] Hard deprecate BlockWritePathProvider #43342

Merged
92 changes: 8 additions & 84 deletions python/ray/data/datasource/block_path_provider.py
@@ -1,13 +1,7 @@
import posixpath
from typing import TYPE_CHECKING, Optional
from ray.util.annotations import Deprecated

from ray.util.annotations import DeveloperAPI

if TYPE_CHECKING:
import pyarrow


@DeveloperAPI
@Deprecated
class BlockWritePathProvider:
"""Abstract callable that provides concrete output paths when writing
dataset blocks.
@@ -16,87 +10,17 @@ class BlockWritePathProvider:
DefaultBlockWritePathProvider
"""

def _get_write_path_for_block(
self,
base_path: str,
*,
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
dataset_uuid: Optional[str] = None,
task_index: Optional[int] = None,
block_index: Optional[int] = None,
file_format: Optional[str] = None,
) -> str:
"""
Resolves and returns the write path for the given dataset block. When
implementing this method, care should be taken to ensure that a unique
path is provided for every dataset block.

Args:
base_path: The base path to write the dataset block out to. This is
expected to be the same for all blocks in the dataset, and may
point to either a directory or file prefix.
filesystem: The filesystem implementation that will be used to
write a file out to the write path returned.
dataset_uuid: Unique identifier for the dataset that this block
belongs to.
block: The block to write.
task_index: Ordered index of the write task within its parent
dataset.
block_index: Ordered index of the block to write within its parent
write task.
file_format: File format string for the block that can be used as
the file extension in the write path returned.

Returns:
The dataset block write path.
"""
raise NotImplementedError

def __call__(
self,
base_path: str,
*,
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
dataset_uuid: Optional[str] = None,
task_index: Optional[int] = None,
block_index: Optional[int] = None,
file_format: Optional[str] = None,
) -> str:
return self._get_write_path_for_block(
base_path,
filesystem=filesystem,
dataset_uuid=dataset_uuid,
task_index=task_index,
block_index=block_index,
file_format=file_format,
def __init__(self) -> None:
raise DeprecationWarning(
"`BlockWritePathProvider` has been deprecated in favor of "
"`FilenameProvider`. For more information, see "
"https://docs.ray.io/en/master/data/api/doc/ray.data.datasource.FilenameProvider.html", # noqa: E501
)


@DeveloperAPI
@Deprecated
class DefaultBlockWritePathProvider(BlockWritePathProvider):
"""Default block write path provider implementation that writes each
dataset block out to a file of the form:
{base_path}/{dataset_uuid}_{task_index}_{block_index}.{file_format}
"""

def _get_write_path_for_block(
self,
base_path: str,
*,
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
dataset_uuid: Optional[str] = None,
task_index: Optional[int] = None,
block_index: Optional[int] = None,
file_format: Optional[str] = None,
) -> str:
assert task_index is not None
# Add the task index to the filename to make sure that each task writes
# to a different and deterministically generated filename.
if block_index is not None:
suffix = f"{dataset_uuid}_{task_index:06}_{block_index:06}.{file_format}"
else:
suffix = f"{dataset_uuid}_{task_index:06}.{file_format}"
# Uses POSIX path for cross-filesystem compatibility, since PyArrow
# FileSystem paths are always forward slash separated, see:
# https://arrow.apache.org/docs/python/filesystems.html
return posixpath.join(base_path, suffix)
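
For code that subclassed the removed provider, the replacement hook is FilenameProvider, referenced by the new DeprecationWarning. The sketch below reproduces the old {dataset_uuid}_{task_index:06}_{block_index:06}.{file_format} suffix; note that a FilenameProvider returns only a filename, which the datasink joins onto the base path (see file_datasink.py below), whereas the old provider returned the full write path. The constructor arguments here are illustrative, not part of the FilenameProvider interface.

from ray.data.datasource import FilenameProvider


class LegacyNamingFilenameProvider(FilenameProvider):
    """Sketch: reproduces the suffix scheme of the removed DefaultBlockWritePathProvider."""

    def __init__(self, dataset_uuid: str, file_format: str) -> None:
        # Illustrative constructor; the base class does not prescribe one.
        self._dataset_uuid = dataset_uuid
        self._file_format = file_format

    def get_filename_for_block(self, block, task_index: int, block_index: int) -> str:
        # Old scheme: {dataset_uuid}_{task_index:06}_{block_index:06}.{file_format}
        return (
            f"{self._dataset_uuid}_{task_index:06}_{block_index:06}"
            f".{self._file_format}"
        )
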
38 changes: 9 additions & 29 deletions python/ray/data/datasource/file_datasink.py
@@ -1,5 +1,4 @@
import posixpath
import warnings
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional

from ray._private.utils import _add_creatable_buckets_param_if_s3_uri
@@ -60,14 +59,13 @@ def __init__(
open_stream_args = {}

if block_path_provider is not None:
warnings.warn(
raise DeprecationWarning(
"`block_path_provider` has been deprecated in favor of "
"`filename_provider`. For more information, see "
"https://docs.ray.io/en/master/data/api/doc/ray.data.datasource.FilenameProvider.html", # noqa: E501
DeprecationWarning,
)

if filename_provider is None and block_path_provider is None:
if filename_provider is None:
filename_provider = _DefaultFilenameProvider(
dataset_uuid=dataset_uuid, file_format=file_format
)
@@ -183,16 +181,9 @@ def write_row_to_file(self, row: Dict[str, Any], file: "pyarrow.NativeFile"):

def write_block(self, block: BlockAccessor, block_index: int, ctx: TaskContext):
for row_index, row in enumerate(block.iter_rows(public_row_format=False)):
if self.filename_provider is not None:
filename = self.filename_provider.get_filename_for_row(
row, ctx.task_idx, block_index, row_index
)
else:
# TODO: Remove this code path once we remove `BlockWritePathProvider`.
filename = (
f"{self.dataset_uuid}_{ctx.task_idx:06}_{block_index:06}_"
f"{row_index:06}.{self.file_format}"
)
filename = self.filename_provider.get_filename_for_row(
row, ctx.task_idx, block_index, row_index
)
write_path = posixpath.join(self.path, filename)

def write_row_to_path():
@@ -245,21 +236,10 @@ def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
raise NotImplementedError

def write_block(self, block: BlockAccessor, block_index: int, ctx: TaskContext):
if self.filename_provider is not None:
filename = self.filename_provider.get_filename_for_block(
block, ctx.task_idx, block_index
)
write_path = posixpath.join(self.path, filename)
else:
# TODO: Remove this code path once we remove `BlockWritePathProvider`.
write_path = self.block_path_provider(
self.path,
filesystem=self.filesystem,
dataset_uuid=self.dataset_uuid,
task_index=ctx.task_idx,
block_index=block_index,
file_format=self.file_format,
)
filename = self.filename_provider.get_filename_for_block(
block, ctx.task_idx, block_index
)
write_path = posixpath.join(self.path, filename)

def write_block_to_path():
with self.open_output_stream(write_path) as file:
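
Both sink flavors now resolve output names exclusively through the provider: block-based sinks call get_filename_for_block (as sketched above), while row-based sinks call get_filename_for_row once per row. A provider meant for a row-based sink therefore needs the per-row hook; a minimal sketch, with the prefix argument standing in for the removed dataset_uuid:

from typing import Any, Dict

from ray.data.datasource import FilenameProvider


class PerRowFilenameProvider(FilenameProvider):
    """Sketch of a provider for a row-based datasink (one output file per row)."""

    def __init__(self, prefix: str, file_format: str) -> None:
        self._prefix = prefix
        self._file_format = file_format

    def get_filename_for_row(
        self, row: Dict[str, Any], task_index: int, block_index: int, row_index: int
    ) -> str:
        # Mirrors the removed fallback naming:
        # {dataset_uuid}_{task_index:06}_{block_index:06}_{row_index:06}.{file_format}
        return (
            f"{self._prefix}_{task_index:06}_{block_index:06}_{row_index:06}"
            f".{self._file_format}"
        )
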
22 changes: 0 additions & 22 deletions python/ray/data/tests/conftest.py
@@ -16,7 +16,6 @@
from ray.air.constants import TENSOR_COLUMN_NAME
from ray.air.util.tensor_extensions.arrow import ArrowTensorArray
from ray.data.block import BlockExecStats, BlockMetadata
from ray.data.datasource.block_path_provider import BlockWritePathProvider
from ray.data.tests.mock_server import * # noqa

# Trigger pytest hook to automatically zip test cluster logs to archive dir on failure
@@ -153,27 +152,6 @@ def local_fs():
yield pa.fs.LocalFileSystem()


@pytest.fixture(scope="function")
def mock_block_write_path_provider():
class MockBlockWritePathProvider(BlockWritePathProvider):
def _get_write_path_for_block(
self,
base_path,
*,
filesystem=None,
dataset_uuid=None,
task_index=None,
block_index=None,
file_format=None,
):
suffix = (
f"{task_index:06}_{block_index:06}_{dataset_uuid}.test.{file_format}"
)
return posixpath.join(base_path, suffix)

yield MockBlockWritePathProvider()


@pytest.fixture(scope="function")
def base_partitioned_df():
yield pd.DataFrame(
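
A fixture with equivalent intent can be built on FilenameProvider. Unlike the removed mock, the new hooks do not receive dataset_uuid or file_format per call, so this sketch (names hypothetical) fixes them at construction time and yields the class for tests to instantiate:

import pytest

from ray.data.datasource import FilenameProvider


@pytest.fixture(scope="function")
def mock_filename_provider_cls():
    class MockFilenameProvider(FilenameProvider):
        """Mirrors the removed mock's {task:06}_{block:06}_{uuid}.test.{format} naming."""

        def __init__(self, dataset_uuid, file_format):
            self._dataset_uuid = dataset_uuid
            self._file_format = file_format

        def get_filename_for_block(self, block, task_index, block_index):
            return (
                f"{task_index:06}_{block_index:06}_{self._dataset_uuid}"
                f".test.{self._file_format}"
            )

    yield MockFilenameProvider
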
51 changes: 0 additions & 51 deletions python/ray/data/tests/test_csv.py
@@ -746,57 +746,6 @@ def test_csv_roundtrip(ray_start_regular_shared, fs, data_path):
BlockAccessor.for_block(ray.get(block)).size_bytes() == meta.size_bytes


@pytest.mark.parametrize(
"fs,data_path,endpoint_url",
[
(None, lazy_fixture("local_path"), None),
(lazy_fixture("local_fs"), lazy_fixture("local_path"), None),
(lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")),
],
)
def test_csv_write_block_path_provider(
# NOTE: This is shutdown_only because this is the last use of the shared local
# cluster; following this, we start a new cluster, so we need to shut this one down.
# If the ordering of these tests change, then this needs to change.
shutdown_only,
fs,
data_path,
endpoint_url,
mock_block_write_path_provider,
):
if endpoint_url is None:
storage_options = {}
else:
storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url))

# Single block.
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
ds = ray.data.from_pandas([df1])
ds._set_uuid("data")
ds.write_csv(
data_path, filesystem=fs, block_path_provider=mock_block_write_path_provider
)
file_path = os.path.join(data_path, "000000_000000_data.test.csv")
assert df1.equals(pd.read_csv(file_path, storage_options=storage_options))

# Two blocks.
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
ds = ray.data.from_pandas([df1, df2])
ds._set_uuid("data")
ds.write_csv(
data_path, filesystem=fs, block_path_provider=mock_block_write_path_provider
)
file_path2 = os.path.join(data_path, "000001_000000_data.test.csv")
df = pd.concat([df1, df2])
ds_df = pd.concat(
[
pd.read_csv(file_path, storage_options=storage_options),
pd.read_csv(file_path2, storage_options=storage_options),
]
)
assert df.equals(ds_df)


# NOTE: The last test using the shared ray_start_regular_shared cluster must use the
# shutdown_only fixture so the shared cluster is shut down, otherwise the below
# test_write_datasink_ray_remote_args test, which uses a cluster_utils cluster, will
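
For reference, the deleted test's single-block case maps onto the new keyword roughly as follows. This is a sketch rather than a replacement test: it assumes write_csv forwards filename_provider to the datasink the same way the deleted test passed block_path_provider, and the provider class and output directory are illustrative.

import os

import pandas as pd
import ray
from ray.data.datasource import FilenameProvider


class CSVTestFilenameProvider(FilenameProvider):
    def get_filename_for_block(self, block, task_index, block_index):
        # Same shape as the deleted mock fixture's expected filename.
        return f"{task_index:06}_{block_index:06}_data.test.csv"


df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
ds = ray.data.from_pandas([df1])
ds.write_csv("/tmp/csv_out", filename_provider=CSVTestFilenameProvider())

# Single block written by a single task, as in the deleted test's first assertion.
file_path = os.path.join("/tmp/csv_out", "000000_000000_data.test.csv")
assert df1.equals(pd.read_csv(file_path))
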
59 changes: 0 additions & 59 deletions python/ray/data/tests/test_json.py
@@ -575,65 +575,6 @@ def test_json_roundtrip(ray_start_regular_shared, fs, data_path):
BlockAccessor.for_block(ray.get(block)).size_bytes() == meta.size_bytes


@pytest.mark.parametrize(
"fs,data_path,endpoint_url",
[
(None, lazy_fixture("local_path"), None),
(lazy_fixture("local_fs"), lazy_fixture("local_path"), None),
(lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")),
],
)
def test_json_write_block_path_provider(
ray_start_regular_shared,
fs,
data_path,
endpoint_url,
mock_block_write_path_provider,
):
if endpoint_url is None:
storage_options = {}
else:
storage_options = dict(client_kwargs=dict(endpoint_url=endpoint_url))

# Single block.
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
ds = ray.data.from_pandas([df1])
ds._set_uuid("data")
ds.write_json(
data_path, filesystem=fs, block_path_provider=mock_block_write_path_provider
)
file_path = os.path.join(data_path, "000000_000000_data.test.json")
assert df1.equals(
pd.read_json(
file_path, orient="records", lines=True, storage_options=storage_options
)
)

# Two blocks.
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
ds = ray.data.from_pandas([df1, df2])
ds._set_uuid("data")
ds.write_json(
data_path, filesystem=fs, block_path_provider=mock_block_write_path_provider
)
file_path2 = os.path.join(data_path, "000001_000000_data.test.json")
df = pd.concat([df1, df2])
ds_df = pd.concat(
[
pd.read_json(
file_path, orient="records", lines=True, storage_options=storage_options
),
pd.read_json(
file_path2,
orient="records",
lines=True,
storage_options=storage_options,
),
]
)
assert df.equals(ds_df)


@pytest.mark.parametrize(
"fs,data_path,endpoint_url",
[
42 changes: 0 additions & 42 deletions python/ray/data/tests/test_numpy.py
@@ -295,48 +295,6 @@ def test_numpy_write(ray_start_regular_shared, fs, data_path, endpoint_url):
np.testing.assert_equal(extract_values("data", ds.take(1)), [np.array([0])])


@pytest.mark.parametrize(
"fs,data_path,endpoint_url",
[
(None, lazy_fixture("local_path"), None),
(lazy_fixture("local_fs"), lazy_fixture("local_path"), None),
(lazy_fixture("s3_fs"), lazy_fixture("s3_path"), lazy_fixture("s3_server")),
],
)
def test_numpy_write_block_path_provider(
ray_start_regular_shared,
fs,
data_path,
endpoint_url,
mock_block_write_path_provider,
):
ds = ray.data.range_tensor(10, parallelism=2)
ds._set_uuid("data")
ds.write_numpy(
data_path,
filesystem=fs,
block_path_provider=mock_block_write_path_provider,
column="data",
)
file_path1 = os.path.join(data_path, "000000_000000_data.test.npy")
file_path2 = os.path.join(data_path, "000001_000000_data.test.npy")
if endpoint_url is None:
arr1 = np.load(file_path1)
arr2 = np.load(file_path2)
else:
from s3fs.core import S3FileSystem

s3 = S3FileSystem(client_kwargs={"endpoint_url": endpoint_url})
arr1 = np.load(s3.open(file_path1))
arr2 = np.load(s3.open(file_path2))
assert ds.count() == 10
assert len(arr1) == 5
assert len(arr2) == 5
assert arr1.sum() == 10
assert arr2.sum() == 35
np.testing.assert_equal(extract_values("data", ds.take(1)), [np.array([0])])


@pytest.mark.parametrize("num_rows_per_file", [5, 10, 50])
def test_write_num_rows_per_file(tmp_path, ray_start_regular_shared, num_rows_per_file):
ray.data.range(100, parallelism=20).write_numpy(
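
The deleted NumPy test migrates the same way. A sketch under the same assumptions (write_numpy forwards filename_provider; provider and paths illustrative), mirroring the deleted test's expectations about block counts and sums:

import os

import numpy as np
import ray
from ray.data.datasource import FilenameProvider


class NPYTestFilenameProvider(FilenameProvider):
    def get_filename_for_block(self, block, task_index, block_index):
        return f"{task_index:06}_{block_index:06}_data.test.npy"


ds = ray.data.range_tensor(10, parallelism=2)
ds.write_numpy(
    "/tmp/npy_out",
    column="data",
    filename_provider=NPYTestFilenameProvider(),
)

# Two blocks of five rows each, written by two tasks, as the deleted test asserted.
arr1 = np.load(os.path.join("/tmp/npy_out", "000000_000000_data.test.npy"))
arr2 = np.load(os.path.join("/tmp/npy_out", "000001_000000_data.test.npy"))
assert len(arr1) == 5 and len(arr2) == 5
assert arr1.sum() == 10 and arr2.sum() == 35
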