[Data] Add example of how to read and write custom file types (#41785)
#40127 removed the "Implementing a Custom Datasource" example because it used deprecated APIs. This PR introduces a new example that uses up-to-date APIs.

---------

Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
bveeramani committed Dec 12, 2023
1 parent b7ed648 commit ee10ea6
Showing 7 changed files with 290 additions and 28 deletions.
18 changes: 3 additions & 15 deletions doc/source/data/api/input_output.rst
@@ -256,7 +256,6 @@ Datasource API
read_datasource
Datasource
ReadTask
datasource.Reader
datasource.FilenameProvider

Datasink API
@@ -268,8 +267,9 @@ Datasink API

Dataset.write_datasink
Datasink
-RowBasedFileDatasink
-BlockBasedFileDatasink
+datasource.RowBasedFileDatasink
+datasource.BlockBasedFileDatasink
+datasource.FileBasedDatasource

Partitioning API
----------------
@@ -299,16 +299,4 @@ MetadataProvider API
datasource.DefaultParquetMetadataProvider
datasource.FastFileMetadataProvider


.. _block_write_path_provider:

BlockWritePathProvider API
--------------------------

.. autosummary::
:nosignatures:
:toctree: doc/

datasource.BlockWritePathProvider
datasource.DefaultBlockWritePathProvider

130 changes: 130 additions & 0 deletions doc/source/data/custom-datasource-example.rst
@@ -0,0 +1,130 @@
Advanced: Read and Write Custom File Types
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. vale off

.. Ignoring Vale because of future tense.

This guide shows you how to extend Ray Data to read and write file types that aren't
natively supported. This is an advanced guide, and you'll use unstable internal APIs.

.. vale on

Images are already supported with the :func:`~ray.data.read_images`
and :meth:`~ray.data.Dataset.write_images` APIs, but this example shows you how to
implement them for illustrative purposes.

Read data from files
--------------------

.. tip::
    If you're not contributing to Ray Data, you don't need to create a
    :class:`~ray.data.Datasource`. Instead, you can call
    :func:`~ray.data.read_binary_files` and decode files with
    :meth:`~ray.data.Dataset.map`.
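
As a rough sketch of the approach in the tip, assume a hypothetical text format with
one ``key=value`` pair per line and the same keys in every file. ``decode_row`` and
``read_custom_files`` are illustrative names, not Ray Data APIs. The sketch relies on
:func:`~ray.data.read_binary_files` producing one row per file with the file contents
in a ``bytes`` column.

```python
from typing import Any, Dict


def decode_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Decode one file's raw bytes from a hypothetical ``key=value`` format."""
    record = {}
    for line in row["bytes"].decode("utf-8").splitlines():
        if not line.strip():
            continue  # Skip blank lines
        key, _, value = line.partition("=")
        record[key.strip()] = value.strip()
    return record


def read_custom_files(paths):
    """Read whole files as bytes, then decode each file into one row."""
    import ray  # Imported lazily so decode_row stays usable without Ray

    # read_binary_files yields one row per file; the contents are in "bytes".
    return ray.data.read_binary_files(paths).map(decode_row)
```

Because the decoding logic lives in a plain function, you can unit test it without
starting Ray.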

The core abstraction for reading files is :class:`~ray.data.datasource.FileBasedDatasource`.
It provides file-specific functionality on top of the
:class:`~ray.data.Datasource` interface.

To subclass :class:`~ray.data.datasource.FileBasedDatasource`, implement the constructor
and :meth:`~ray.data.datasource.FileBasedDatasource._read_stream`.

Implement the constructor
=========================

Call the superclass constructor and specify the files you want to read.
Optionally, specify valid file extensions. Ray Data ignores files with other extensions.

.. literalinclude:: doc_code/custom_datasource_example.py
    :language: python
    :start-after: __datasource_constructor_start__
    :end-before: __datasource_constructor_end__
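
To make the effect of ``file_extensions`` concrete, here is a minimal sketch of
extension filtering. It isn't Ray Data's implementation: ``has_file_extension`` is a
hypothetical helper, and the case-insensitive match is an assumption of this sketch.

```python
from typing import List, Optional


def has_file_extension(path: str, extensions: Optional[List[str]]) -> bool:
    """Return True if ``path`` ends with one of ``extensions`` (no leading dot)."""
    if extensions is None:
        return True  # No filter: every file is kept
    lowered = path.lower()
    return any(lowered.endswith(f".{ext.lower()}") for ext in extensions)


paths = ["data/ray.png", "data/README.md", "data/SKATE.JPG"]
# Keep only the paths whose extension is in the allowed list.
kept = [p for p in paths if has_file_extension(p, ["png", "jpg"])]
```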

Implement ``_read_stream``
==========================

``_read_stream`` is a generator that yields one or more blocks of data from a file.

`Blocks <https://github.com/ray-project/ray/blob/23d3bfcb9dd97ea666b7b4b389f29b9cc0810121/python/ray/data/block.py#L54>`_
are a Data-internal abstraction for a collection of rows. They can be PyArrow tables,
pandas DataFrames, or dictionaries of NumPy arrays.

Don't create a block directly. Instead, add rows of data to a
`DelegatingBlockBuilder <https://github.com/ray-project/ray/blob/23d3bfcb9dd97ea666b7b4b389f29b9cc0810121/python/ray/data/_internal/delegating_block_builder.py#L10>`_.

.. literalinclude:: doc_code/custom_datasource_example.py
    :language: python
    :start-after: __read_stream_start__
    :end-before: __read_stream_end__
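
The image example yields a single block per file. For a file that holds many records,
a generator can yield several blocks instead. The sketch below shows only the chunking
pattern, assuming a hypothetical line-oriented text format. ``iter_row_chunks`` is an
illustrative name, and plain lists of rows stand in for blocks so that the sketch runs
without Ray.

```python
import io
from typing import Any, Dict, Iterator, List


def iter_row_chunks(f, rows_per_block: int = 2) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of rows parsed from a file-like object, one list per block."""
    rows: List[Dict[str, Any]] = []
    for line in f.read().decode("utf-8").splitlines():
        rows.append({"text": line})
        if len(rows) == rows_per_block:
            # In _read_stream, you'd add each row to a block builder here
            # and yield the built block instead of the raw list.
            yield rows
            rows = []
    if rows:
        yield rows  # Emit the final, possibly smaller, chunk


# io.BytesIO stands in for the file object passed to _read_stream.
chunks = list(iter_row_chunks(io.BytesIO(b"a\nb\nc\n")))
```

Yielding several small blocks rather than one large block keeps the memory needed per
yielded block bounded by ``rows_per_block``.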

Read your data
==============

Once you've implemented ``ImageDatasource``, call :func:`~ray.data.read_datasource` to
read images into a :class:`~ray.data.Dataset`. Ray Data reads your files in parallel.

.. literalinclude:: doc_code/custom_datasource_example.py
    :language: python
    :start-after: __read_datasource_start__
    :end-before: __read_datasource_end__

Write data to files
-------------------

.. note::
    The write interface is under active development and might change in the future. If
    you have feature requests,
    `open a GitHub Issue <https://github.com/ray-project/ray/issues/new?assignees=&labels=enhancement%2Ctriage&projects=&template=feature-request.yml&title=%5B%3CRay+component%3A+Core%7CRLlib%7Cetc...%3E%5D+>`_.

The core abstractions for writing data to files are :class:`~ray.data.datasource.RowBasedFileDatasink` and
:class:`~ray.data.datasource.BlockBasedFileDatasink`. They provide file-specific functionality on top of the
:class:`~ray.data.Datasink` interface.

If you want to write one row per file, subclass :class:`~ray.data.datasource.RowBasedFileDatasink`.
Otherwise, subclass :class:`~ray.data.datasource.BlockBasedFileDatasink`.

.. vale off

.. Ignoring Vale because of future tense.

In this example, you'll write one image per file, so you'll subclass
:class:`~ray.data.datasource.RowBasedFileDatasink`. To subclass
:class:`~ray.data.datasource.RowBasedFileDatasink`, implement the constructor and
:meth:`~ray.data.datasource.RowBasedFileDatasink.write_row_to_file`.

.. vale on

Implement the constructor
=========================

Call the superclass constructor and specify the folder to write to. Optionally, specify
a string representing the file format (for example, ``"png"``). Ray Data uses the
file format as the file extension.

.. literalinclude:: doc_code/custom_datasource_example.py
    :language: python
    :start-after: __datasink_constructor_start__
    :end-before: __datasink_constructor_end__

Implement ``write_row_to_file``
===============================

``write_row_to_file`` writes a row of data to a file. Each row is a dictionary that maps
column names to values.

.. literalinclude:: doc_code/custom_datasource_example.py
    :language: python
    :start-after: __write_row_to_file_start__
    :end-before: __write_row_to_file_end__
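
The same pattern applies to other formats: serialize the row to bytes in memory, then
pass the bytes to ``file.write``. The sketch below is a standalone function rather
than a method, and it assumes a hypothetical one-JSON-document-per-file format. An
``io.BytesIO`` object stands in for the ``pyarrow.NativeFile`` that Ray Data passes to
the method.

```python
import io
import json
from typing import Any, Dict


def write_row_to_file(row: Dict[str, Any], file) -> None:
    """Serialize one row to JSON bytes, then write them in a single call."""
    payload = json.dumps(row, sort_keys=True).encode("utf-8")
    file.write(payload)


# io.BytesIO is a stand-in for the file object that Ray Data would provide.
out = io.BytesIO()
write_row_to_file({"label": "skate", "id": 7}, out)
```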

Write your data
===============

Once you've implemented ``ImageDatasink``, call :meth:`~ray.data.Dataset.write_datasink`
to write images to files. Ray Data writes to multiple files in parallel.

.. literalinclude:: doc_code/custom_datasource_example.py
    :language: python
    :start-after: __write_datasink_start__
    :end-before: __write_datasink_end__
79 changes: 79 additions & 0 deletions doc/source/data/doc_code/custom_datasource_example.py
@@ -0,0 +1,79 @@
# flake8: noqa
# fmt: off

from typing import Iterator, Union, List

import pyarrow

from ray.data.block import Block

# __datasource_constructor_start__
from ray.data.datasource import FileBasedDatasource

class ImageDatasource(FileBasedDatasource):
    def __init__(self, paths: Union[str, List[str]], *, mode: str):
        super().__init__(
            paths,
            file_extensions=["png", "jpg", "jpeg", "bmp", "gif", "tiff"],
        )

        self.mode = mode  # Specify read options in the constructor
# __datasource_constructor_end__

# __read_stream_start__
    def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
        import io
        import numpy as np
        from PIL import Image
        from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder

        data = f.readall()
        image = Image.open(io.BytesIO(data))
        image = image.convert(self.mode)

        # Each block contains one row
        builder = DelegatingBlockBuilder()
        array = np.array(image)
        item = {"image": array}
        builder.add(item)
        yield builder.build()
# __read_stream_end__

# __read_datasource_start__
import ray

ds = ray.data.read_datasource(
    ImageDatasource("s3://anonymous@ray-example-data/batoidea", mode="RGB")
)
# __read_datasource_end__


from typing import Any, Dict
import pyarrow

# __datasink_constructor_start__
from ray.data.datasource import RowBasedFileDatasink

class ImageDatasink(RowBasedFileDatasink):
    def __init__(self, path: str, column: str, file_format: str):
        super().__init__(path, file_format=file_format)

        self.column = column
        self.file_format = file_format  # Specify write options in the constructor
# __datasink_constructor_end__

# __write_row_to_file_start__
    def write_row_to_file(self, row: Dict[str, Any], file: pyarrow.NativeFile):
        import io
        from PIL import Image

        # PIL can't write to a NativeFile, so we have to write to a buffer first.
        image = Image.fromarray(row[self.column])
        buffer = io.BytesIO()
        image.save(buffer, format=self.file_format)
        file.write(buffer.getvalue())
# __write_row_to_file_end__

# __write_datasink_start__
ds.write_datasink(ImageDatasink("/tmp/results", column="image", file_format="png"))
# __write_datasink_end__
1 change: 1 addition & 0 deletions doc/source/data/user-guide.rst
@@ -25,3 +25,4 @@ show you how achieve several tasks.
performance-tips
preprocessors
monitoring-your-workload
custom-datasource-example
9 changes: 0 additions & 9 deletions python/ray/data/datasource/datasource.py
@@ -19,15 +19,6 @@ class Datasource:
"""Interface for defining a custom :class:`~ray.data.Dataset` datasource.
To read a datasource into a dataset, use :meth:`~ray.data.read_datasource`.
To write to a writable datasource, use :meth:`~ray.data.Dataset.write_datasource`.
See ``RangeDatasource`` and ``DummyOutputDatasource`` for examples
of how to implement readable and writable datasources.
.. note::
Datasource instances must be serializable, since
:meth:`~ray.data.Datasource.write` is called in remote tasks.
""" # noqa: E501

@Deprecated
6 changes: 2 additions & 4 deletions python/ray/data/datasource/file_based_datasource.py
@@ -125,11 +125,9 @@ def __repr__(self):

@DeveloperAPI
class FileBasedDatasource(Datasource):
-    """File-based datasource, for reading and writing files.
+    """File-based datasource for reading files.

-    This class should not be used directly, and should instead be subclassed
-    and tailored to particular file formats. Classes deriving from this class
-    must implement _read_file().
+    Don't use this class directly. Instead, subclass it and implement `_read_stream()`.
"""

# If `_WRITE_FILE_PER_ROW` is `True`, this datasource calls `_write_row` and writes
75 changes: 75 additions & 0 deletions python/ray/data/datasource/file_datasink.py
@@ -41,6 +41,21 @@ def __init__(
        dataset_uuid: Optional[str] = None,
        file_format: Optional[str] = None,
    ):
        """Initialize this datasink.

        Args:
            path: The folder to write files to.
            filesystem: The filesystem to write files to. If not provided, the
                filesystem is inferred from the path.
            try_create_dir: Whether to create the directory to write files to.
            open_stream_args: Arguments to pass to ``filesystem.open_output_stream``.
            filename_provider: A :class:`ray.data.datasource.FilenameProvider` that
                generates filenames for each row or block.
            dataset_uuid: The UUID of the dataset being written. If specified, it's
                included in the filename.
            file_format: The file extension. If specified, files are written with this
                extension.
        """
        if open_stream_args is None:
            open_stream_args = {}

@@ -132,7 +147,44 @@ def supports_distributed_writes(self) -> bool:

@DeveloperAPI
class RowBasedFileDatasink(_FileDatasink):
    """A datasink that writes one row to each file.

    Subclasses must implement ``write_row_to_file`` and call the superclass constructor.

    Examples:
        .. testcode::

            import io
            from typing import Any, Dict

            from PIL import Image

            from ray.data.datasource import RowBasedFileDatasink

            class ImageDatasink(RowBasedFileDatasink):
                def __init__(self, path: str, column: str, file_format: str):
                    super().__init__(path, file_format=file_format)
                    self.column = column
                    self.file_format = file_format

                def write_row_to_file(self, row: Dict[str, Any], file: "pyarrow.NativeFile"):
                    # PIL can't write to a NativeFile, so write to a buffer first.
                    image = Image.fromarray(row[self.column])
                    buffer = io.BytesIO()
                    image.save(buffer, format=self.file_format)
                    file.write(buffer.getvalue())
    """  # noqa: E501

    def write_row_to_file(self, row: Dict[str, Any], file: "pyarrow.NativeFile"):
        """Write a row to a file.

        Args:
            row: The row to write.
            file: The file to write the row to.
        """
        raise NotImplementedError

    def _write_row_to_file_with_retry(
@@ -172,7 +224,30 @@ def write_block(self, block: BlockAccessor, block_index: int, ctx: TaskContext):

@DeveloperAPI
class BlockBasedFileDatasink(_FileDatasink):
    """A datasink that writes multiple rows to each file.

    Subclasses must implement ``write_block_to_file`` and call the superclass
    constructor.

    Examples:
        .. testcode::

            from ray.data.block import BlockAccessor
            from ray.data.datasource import BlockBasedFileDatasink

            class CSVDatasink(BlockBasedFileDatasink):
                def __init__(self, path: str):
                    super().__init__(path, file_format="csv")

                def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
                    from pyarrow import csv
                    csv.write_csv(block.to_arrow(), file)
    """  # noqa: E501

    def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
        """Write a block of data to a file.

        Args:
            block: The block to write.
            file: The file to write the block to.
        """
        raise NotImplementedError

    def _write_block_to_file_with_retry(
