Commit

[Data] [Docs] Update Loading, Transforming, Inspecting, Iterating Over, and Saving Data pages (ray-project#44093)

---------

Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <omatthew98@berkeley.edu>
Co-authored-by: angelinalg <122562471+angelinalg@users.noreply.github.com>
2 people authored and stephanie-wang committed Mar 27, 2024
1 parent 0d4c7bd commit 2f2363d
Showing 6 changed files with 188 additions and 116 deletions.
2 changes: 2 additions & 0 deletions doc/source/data/custom-datasource-example.rst
@@ -1,3 +1,5 @@
.. _custom_datasource:

Advanced: Read and Write Custom File Types
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

72 changes: 42 additions & 30 deletions doc/source/data/inspecting-data.rst
@@ -11,7 +11,7 @@ This guide shows you how to:
* `Describe datasets <#describing-datasets>`_
* `Inspect rows <#inspecting-rows>`_
* `Inspect batches <#inspecting-batches>`_
* `Inspect execution statistics <#inspecting-stats>`_
* `Inspect execution statistics <#inspecting-execution-statistics>`_

.. _describing-datasets:

@@ -151,46 +151,58 @@ Inspecting execution statistics
Ray Data calculates statistics during execution for each operator, such as wall clock time and memory usage.

To view stats about your :class:`Datasets <ray.data.Dataset>`, call :meth:`Dataset.stats() <ray.data.Dataset.stats>` on an executed dataset. The stats are also persisted under ``/tmp/ray/session_*/logs/ray-data.log``.
For more on how to read this output, see :ref:`Monitoring Your Workload with the Ray Data Dashboard <monitoring-your-workload>`.

.. testcode::

    import ray
-   import time
+   import datasets

-   def pause(x):
-       time.sleep(.0001)
-       return x
+   def f(batch):
+       return batch
+
+   def g(row):
+       return True
+
+   hf_ds = datasets.load_dataset("mnist", "mnist")

    ds = (
-       ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
-       .map(lambda x: x)
-       .map(pause)
+       ray.data.from_huggingface(hf_ds["train"])
+       .map_batches(f)
+       .filter(g)
+       .materialize()
    )

-   for batch in ds.iter_batches():
-       pass
-
    print(ds.stats())

.. testoutput::
:options: +MOCK

Operator 1 ReadCSV->SplitBlocks(4): 1 tasks executed, 4 blocks produced in 0.22s
* Remote wall time: 222.1ms min, 222.1ms max, 222.1ms mean, 222.1ms total
* Remote cpu time: 15.6ms min, 15.6ms max, 15.6ms mean, 15.6ms total
* Peak heap memory usage (MiB): 157953.12 min, 157953.12 max, 157953 mean
* Output num rows: 150 min, 150 max, 150 mean, 150 total
* Output size bytes: 6000 min, 6000 max, 6000 mean, 6000 total
Operator 1 ReadParquet->SplitBlocks(32): 1 tasks executed, 32 blocks produced in 2.92s
* Remote wall time: 103.38us min, 1.34s max, 42.14ms mean, 1.35s total
* Remote cpu time: 102.0us min, 164.66ms max, 5.37ms mean, 171.72ms total
* UDF time: 0us min, 0us max, 0.0us mean, 0us total
* Peak heap memory usage (MiB): 266375.0 min, 281875.0 max, 274491 mean
* Output num rows per block: 1875 min, 1875 max, 1875 mean, 60000 total
* Output size bytes per block: 537986 min, 555360 max, 545963 mean, 17470820 total
* Output rows per task: 60000 min, 60000 max, 60000 mean, 1 tasks used
* Tasks per node: 1 min, 1 max, 1 mean; 1 nodes used
* Extra metrics: {'obj_store_mem_freed': 5761}

Dataset iterator time breakdown:
* Total time user code is blocked: 5.68ms
* Total time in user code: 0.96us
* Total time overall: 238.93ms
* Num blocks local: 0
* Num blocks remote: 0
* Num blocks unknown location: 1
* Batch iteration time breakdown (summed across prefetch threads):
* In ray.get(): 2.16ms min, 2.16ms max, 2.16ms avg, 2.16ms total
* In batch creation: 897.67us min, 897.67us max, 897.67us avg, 897.67us total
* In batch formatting: 836.87us min, 836.87us max, 836.87us avg, 836.87us total
* Operator throughput:
* Ray Data throughput: 20579.80984833993 rows/s
* Estimated single node throughput: 44492.67361278733 rows/s

Operator 2 MapBatches(f)->Filter(g): 32 tasks executed, 32 blocks produced in 3.63s
* Remote wall time: 675.48ms min, 1.0s max, 797.07ms mean, 25.51s total
* Remote cpu time: 673.41ms min, 897.32ms max, 768.09ms mean, 24.58s total
* UDF time: 661.65ms min, 978.04ms max, 778.13ms mean, 24.9s total
* Peak heap memory usage (MiB): 152281.25 min, 286796.88 max, 164231 mean
* Output num rows per block: 1875 min, 1875 max, 1875 mean, 60000 total
* Output size bytes per block: 530251 min, 547625 max, 538228 mean, 17223300 total
* Output rows per task: 1875 min, 1875 max, 1875 mean, 32 tasks used
* Tasks per node: 32 min, 32 max, 32 mean; 1 nodes used
* Operator throughput:
* Ray Data throughput: 16512.364546087643 rows/s
* Estimated single node throughput: 2352.3683708977856 rows/s

Dataset throughput:
* Ray Data throughput: 11463.372316361854 rows/s
* Estimated single node throughput: 25580.963670075285 rows/s
117 changes: 75 additions & 42 deletions doc/source/data/loading-data.rst
@@ -243,7 +243,7 @@ To read formats other than Parquet, see the :ref:`Input/Output reference <input-
petal.width double
variety string

.. tab-item:: ABL
.. tab-item:: ABS

To read files from Azure Blob Storage, install the
`Filesystem interface to Azure-Datalake Gen1 and Gen2 Storage <https://pypi.org/project/adlfs/>`_
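
As a rough sketch of what the read can look like once ``adlfs`` is installed (the storage
account name and ``az://`` path below are illustrative placeholders, not values from this page):

.. testcode::
    :skipif: True

    import adlfs
    import ray

    # Placeholder account and container; substitute your own Azure Blob Storage details.
    filesystem = adlfs.AzureBlobFileSystem(account_name="my-storage-account")
    ds = ray.data.read_parquet(
        "az://my-container/my-folder/iris.parquet",
        filesystem=filesystem,
    )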
@@ -454,6 +454,11 @@ Ray Data interoperates with distributed data processing frameworks like
:ref:`Dask <dask-on-ray>`, :ref:`Spark <spark-on-ray>`, :ref:`Modin <modin-on-ray>`, and
:ref:`Mars <mars-on-ray>`.

.. note::

The Ray community provides these operations but may not actively maintain them. If you run into issues,
file a `GitHub issue <https://github.com/ray-project/ray/issues>`__.

.. tab-set::

.. tab-item:: Dask
@@ -573,21 +578,25 @@ Ray Data interoperates with distributed data processing frameworks like
Loading data from ML libraries
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Ray Data interoperates with HuggingFace and TensorFlow datasets.
Ray Data interoperates with HuggingFace, PyTorch, and TensorFlow datasets.

.. tab-set::

.. tab-item:: HuggingFace

To convert a 🤗 Dataset to a Ray Datasets, call
To convert a HuggingFace Dataset to a Ray Dataset, call
:func:`~ray.data.from_huggingface`. This function accesses the underlying Arrow
table and converts it to a Dataset directly.

.. warning::
:class:`~ray.data.from_huggingface` doesn't support parallel
reads. This isn't an issue with in-memory 🤗 Datasets, but may fail with
large memory-mapped 🤗 Datasets. Also, 🤗 ``IterableDataset`` objects aren't
supported.
:func:`~ray.data.from_huggingface` only supports parallel reads in certain
instances, namely for untransformed public HuggingFace Datasets. For those datasets,
Ray Data uses `hosted parquet files <https://huggingface.co/docs/datasets-server/parquet#list-parquet-files>`_
to perform a distributed read; otherwise, Ray Data uses a single node read.
This behavior shouldn't be an issue with in-memory HuggingFace Datasets, but may cause a failure with
large memory-mapped HuggingFace Datasets. Additionally, HuggingFace `DatasetDict <https://huggingface.co/docs/datasets/en/package_reference/main_classes#datasets.DatasetDict>`_ and
`IterableDatasetDict <https://huggingface.co/docs/datasets/en/package_reference/main_classes#datasets.IterableDatasetDict>`_
objects aren't supported.

.. testcode::

@@ -603,6 +612,31 @@ Ray Data interoperates with HuggingFace and TensorFlow datasets.

[{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}]

.. tab-item:: PyTorch

To convert a PyTorch dataset to a Ray Dataset, call :func:`~ray.data.from_torch`.

.. testcode::

import ray
from torchvision import datasets
from torchvision.transforms import ToTensor

tds = datasets.CIFAR10(root="data", train=True, download=True, transform=ToTensor())
ds = ray.data.from_torch(tds)

print(ds)

.. testoutput::
:options: +MOCK

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to data/cifar-10-python.tar.gz
100%|███████████████████████| 170498071/170498071 [00:07<00:00, 23494838.54it/s]
Extracting data/cifar-10-python.tar.gz to data
Dataset(num_rows=50000, schema={item: object})


.. tab-item:: TensorFlow

To convert a TensorFlow dataset to a Ray Dataset, call :func:`~ray.data.from_tf`.
@@ -799,45 +833,41 @@ Call :func:`~ray.data.read_sql` to read data from a database that provides a
query="SELECT title, score FROM movie WHERE year >= 1980",
)

-.. _reading_bigquery:
+.. tab-item:: BigQuery

-Reading BigQuery
-~~~~~~~~~~~~~~~~
-
-To read from BigQuery, install the
-`Python Client for Google BigQuery <https://cloud.google.com/python/docs/reference/bigquery/latest>`_ and the `Python Client for Google BigQueryStorage <https://cloud.google.com/python/docs/reference/bigquerystorage/latest>`_.
+    To read from BigQuery, install the
+    `Python Client for Google BigQuery <https://cloud.google.com/python/docs/reference/bigquery/latest>`_ and the `Python Client for Google BigQueryStorage <https://cloud.google.com/python/docs/reference/bigquerystorage/latest>`_.

-.. code-block:: console
+    .. code-block:: console

-    pip install google-cloud-bigquery
-    pip install google-cloud-bigquery-storage
+        pip install google-cloud-bigquery
+        pip install google-cloud-bigquery-storage

-To read data from BigQuery, call :func:`~ray.data.read_bigquery` and specify the project id, dataset, and query (if applicable).
+    To read data from BigQuery, call :func:`~ray.data.read_bigquery` and specify the project id, dataset, and query (if applicable).

-.. testcode::
-    :skipif: True
+    .. testcode::
+        :skipif: True

-    import ray
+        import ray

-    # Read the entire dataset (do not specify query)
-    ds = ray.data.read_bigquery(
-        project_id="my_gcloud_project_id",
-        dataset="bigquery-public-data.ml_datasets.iris",
-    )
+        # Read the entire dataset. Do not specify query.
+        ds = ray.data.read_bigquery(
+            project_id="my_gcloud_project_id",
+            dataset="bigquery-public-data.ml_datasets.iris",
+        )

-    # Read from a SQL query of the dataset (do not specify dataset)
-    ds = ray.data.read_bigquery(
-        project_id="my_gcloud_project_id",
-        query = "SELECT * FROM `bigquery-public-data.ml_datasets.iris` LIMIT 50",
-    )
+        # Read from a SQL query of the dataset. Do not specify dataset.
+        ds = ray.data.read_bigquery(
+            project_id="my_gcloud_project_id",
+            query = "SELECT * FROM `bigquery-public-data.ml_datasets.iris` LIMIT 50",
+        )

-    # Write back to BigQuery
-    ds.write_bigquery(
-        project_id="my_gcloud_project_id",
-        dataset="destination_dataset.destination_table",
-        overwrite_table=True,
-    )
+        # Write back to BigQuery
+        ds.write_bigquery(
+            project_id="my_gcloud_project_id",
+            dataset="destination_dataset.destination_table",
+            overwrite_table=True,
+        )

.. _reading_mongodb:

@@ -928,16 +958,19 @@ Loading other datasources

If Ray Data can't load your data, subclass
:class:`~ray.data.Datasource`. Then, construct an instance of your custom
datasource and pass it to :func:`~ray.data.read_datasource`.
datasource and pass it to :func:`~ray.data.read_datasource`. To write results, you might
also need to subclass :class:`~ray.data.Datasink`. Then, create an instance of your custom
datasink and pass it to :func:`~ray.data.Dataset.write_datasink`. For more details, see
:ref:`Advanced: Read and Write Custom File Types <custom_datasource>`.

.. testcode::
:skipif: True

# Read from a custom datasource.
ds = ray.data.read_datasource(YourCustomDatasource(), **read_args)
# Write to a custom datasource.
ds.write_datasource(YourCustomDatasource(), **write_args)
# Write to a custom datasink.
ds.write_datasink(YourCustomDatasink())

Performance considerations
==========================
@@ -950,5 +983,5 @@ utilize the cluster, ranging from ``1...override_num_blocks`` tasks. In other words,
the higher the ``override_num_blocks``, the smaller the data blocks in the Dataset and
hence more opportunities for parallel execution.
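
For example, here is a minimal sketch of overriding the number of output blocks for a read.
The CSV path is the public example dataset used elsewhere on this page, and the block count
of 8 is an arbitrary illustration:

.. testcode::

    import ray

    # Ask Ray Data to split this read into (up to) 8 output blocks.
    ds = ray.data.read_csv(
        "s3://anonymous@air-example-data/iris.csv",
        override_num_blocks=8,
    )

    # Materializing lets you check how many blocks were actually produced.
    print(ds.materialize().num_blocks())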

For more information on how to tune the number of output blocks, see
:ref:`Tuning output blocks for read <read_output_blocks>`.
For more information on how to tune the number of output blocks and other suggestions
for optimizing read performance, see `Optimizing reads <performance-tips.html#optimizing-reads>`__.
2 changes: 1 addition & 1 deletion doc/source/data/saving-data.rst
@@ -83,7 +83,7 @@ the appropriate scheme. URI can point to buckets or folders.
filesystem = gcsfs.GCSFileSystem(project="my-google-project")
ds.write_parquet("gcs://my-bucket/my-folder", filesystem=filesystem)

.. tab-item:: ABL
.. tab-item:: ABS

To save data to Azure Blob Storage, install the
`Filesystem interface to Azure-Datalake Gen1 and Gen2 Storage <https://pypi.org/project/adlfs/>`_
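
A minimal sketch of the write, mirroring the GCS example above and assuming ``adlfs`` is
installed; the account name and container path are illustrative placeholders:

.. testcode::
    :skipif: True

    import adlfs

    filesystem = adlfs.AzureBlobFileSystem(account_name="my-storage-account")
    ds.write_parquet("az://my-container/my-folder", filesystem=filesystem)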
