[Data] [Docs] Consolidate shuffling-related information into Shuffling Data page #44098

Merged
merged 6 commits into from Mar 20, 2024

Changes from 4 commits
2 changes: 1 addition & 1 deletion doc/source/data/iterating-over-data.rst
@@ -155,7 +155,7 @@ shuffles all rows. If a full global shuffle isn't required, you can shuffle a subset of
rows up to a provided buffer size during iteration by specifying
``local_shuffle_buffer_size``. While this isn't a true global shuffle like
``random_shuffle``, it's more performant because it doesn't require excessive data
movement.
movement. For more details about these options, see :doc:`Shuffling Data <shuffling-data>`.

.. tip::

130 changes: 0 additions & 130 deletions doc/source/data/performance-tips.rst
@@ -393,136 +393,6 @@ To illustrate these, the following code uses both strategies to coalesce the 10
...
* Output num rows: 10 min, 10 max, 10 mean, 10 total


.. _optimizing_shuffles:

Optimizing shuffles
-------------------

*Shuffle* operations are all-to-all operations where the entire Dataset must be materialized in memory before execution can proceed.
Currently, these are:

* :meth:`Dataset.groupby <ray.data.Dataset.groupby>`
* :meth:`Dataset.random_shuffle <ray.data.Dataset.random_shuffle>`
* :meth:`Dataset.repartition <ray.data.Dataset.repartition>`
* :meth:`Dataset.sort <ray.data.Dataset.sort>`

.. note:: This is an active area of development. If your Dataset uses a shuffle operation and you are having trouble configuring shuffle, `file a Ray Data issue on GitHub`_

When should you use global per-epoch shuffling?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Use global per-epoch shuffling only if your model is sensitive to the
randomness of the training data. Based on a
`theoretical foundation <https://arxiv.org/abs/1709.10432>`__ all
gradient-descent-based model trainers benefit from improved (global) shuffle quality.
In practice, the benefit is particularly pronounced for tabular data/models.
However, the more global the shuffle is, the more expensive the shuffling operation.
The increase compounds with distributed data-parallel training on a multi-node cluster due
to data transfer costs. This cost can be prohibitive when using very large datasets.

The best route for determining the best tradeoff between preprocessing time and cost and
per-epoch shuffle quality is to measure the precision gain per training step for your
particular model under different shuffling policies:

* no shuffling,
* local (per-shard) limited-memory shuffle buffer,
* local (per-shard) shuffling,
* windowed (pseudo-global) shuffling, and
* fully global shuffling.

As long as your data loading and shuffling throughput is higher than your training throughput, your GPU should
be saturated. If you have shuffle-sensitive models, push the
shuffle quality higher until this threshold is hit.

.. _shuffle_performance_tips:

Enabling push-based shuffle
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Some Dataset operations require a *shuffle* operation, meaning that data is shuffled from all of the input partitions to all of the output partitions.
These operations include :meth:`Dataset.random_shuffle <ray.data.Dataset.random_shuffle>`,
:meth:`Dataset.sort <ray.data.Dataset.sort>` and :meth:`Dataset.groupby <ray.data.Dataset.groupby>`.
Shuffle can be challenging to scale to large data sizes and clusters, especially when the total dataset size can't fit into memory.

Datasets provides an alternative shuffle implementation known as push-based shuffle for improving large-scale performance.
Try this out if your dataset has more than 1000 blocks or is larger than 1 TB in size.

To try this out locally or on a cluster, you can start with the `nightly release test <https://github.com/ray-project/ray/blob/master/release/nightly_tests/dataset/sort.py>`_ that Ray runs for :meth:`Dataset.random_shuffle <ray.data.Dataset.random_shuffle>` and :meth:`Dataset.sort <ray.data.Dataset.sort>`.
To get an idea of the performance you can expect, here are some run time results for :meth:`Dataset.random_shuffle <ray.data.Dataset.random_shuffle>` on 1-10 TB of data on 20 machines (m5.4xlarge instances on AWS EC2, each with 16 vCPUs, 64 GB RAM).

.. image:: https://docs.google.com/spreadsheets/d/e/2PACX-1vQvBWpdxHsW0-loasJsBpdarAixb7rjoo-lTgikghfCeKPQtjQDDo2fY51Yc1B6k_S4bnYEoChmFrH2/pubchart?oid=598567373&format=image
   :align: center

To try out push-based shuffle, set the environment variable ``RAY_DATA_PUSH_BASED_SHUFFLE=1`` when running your application:

.. code-block:: bash

    $ wget https://raw.githubusercontent.com/ray-project/ray/master/release/nightly_tests/dataset/sort.py
    $ RAY_DATA_PUSH_BASED_SHUFFLE=1 python sort.py --num-partitions=10 --partition-size=1e7

    # Dataset size: 10 partitions, 0.01GB partition size, 0.1GB total
    # [dataset]: Run `pip install tqdm` to enable progress reporting.
    # 2022-05-04 17:30:28,806 INFO push_based_shuffle.py:118 -- Using experimental push-based shuffle.
    # Finished in 9.571171760559082
    # ...

You can also specify the shuffle implementation during program execution by
setting the ``DataContext.use_push_based_shuffle`` flag:

.. testcode::
    :hide:

    import ray
    ray.shutdown()

.. testcode::

    import ray

    ctx = ray.data.DataContext.get_current()
    ctx.use_push_based_shuffle = True

    ds = (
        ray.data.range(1000)
        .random_shuffle()
    )

Large-scale shuffles can take a while to finish.
For debugging purposes, shuffle operations support executing only part of the shuffle, so that you can collect an execution profile more quickly.
Here is an example that shows how to limit a random shuffle operation to two output blocks:

.. testcode::
    :hide:

    import ray
    ray.shutdown()

.. testcode::

    import ray

    ctx = ray.data.DataContext.get_current()
    ctx.set_config(
        "debug_limit_shuffle_execution_to_num_blocks", 2
    )

    ds = (
        ray.data.range(1000, override_num_blocks=10)
        .random_shuffle()
        .materialize()
    )
    print(ds.stats())

.. testoutput::
    :options: +MOCK

    Operator 1 ReadRange->RandomShuffle: executed in 0.08s

    Suboperator 0 ReadRange->RandomShuffleMap: 2/2 blocks executed
    ...


Configuring execution
---------------------

225 changes: 225 additions & 0 deletions doc/source/data/shuffling-data.rst
@@ -0,0 +1,225 @@
.. _shuffling_data:

==============
Shuffling Data
==============

When consuming or iterating over Ray :class:`Datasets <ray.data.dataset.Dataset>`, it can be useful to
shuffle or randomize the order of data (for example, randomizing data ingest order during ML training).
This guide shows several different methods of shuffling data with Ray Data and their respective trade-offs.

Types of shuffling
==================

Ray Data provides several different options for shuffling data, trading off the granularity of shuffle
control with memory consumption and runtime. The options below are listed in increasing order of
resource consumption and runtime; choose the most appropriate method for your use case.

.. _shuffling_file_order:

Shuffle the ordering of files
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To randomly shuffle the ordering of input files before reading, call a function like
:func:`~ray.data.read_images` and specify ``shuffle="files"``. This randomly assigns
input files to workers for reading.

Contributor: Maybe change to something like "call a read function that supports shuffling, e.g. call ``read_images``...". Seems a little unclear what "function like ``read_images``" actually means?

.. testcode::

    import ray

    ds = ray.data.read_images(
        "s3://anonymous@ray-example-data/image-datasets/simple",
        shuffle="files",
    )

.. tip::

    This is the fastest option for shuffle, and is a purely metadata operation. This
    option doesn't shuffle the actual rows inside files, so the randomness might be
    poor if each file has many rows.

Member: This didn't really feel like a tip to me. IMO, it might be better to make this regular text here and in the other sections. In general, I think we should try to use admonitions sparingly.

Contributor Author: moved out of tip and into regular text

.. _local_shuffle_buffer:

Local shuffle when iterating over batches
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To locally shuffle a subset of rows, call a function like :meth:`~ray.data.Dataset.iter_batches`
and specify ``local_shuffle_buffer_size``. This shuffles the rows up to a provided buffer
size during iteration. See more details in
:ref:`Iterating over batches with shuffling <iterating-over-batches-with-shuffling>`.

Contributor: Again maybe be more descriptive than "function like". For this case, if there are few enough options (3ish?), maybe just list them.

Contributor: Should we move that information to this page and then have a small reference on that page to this broader shuffle page?

Contributor Author: I felt that it was important to keep this information in the iteration page as well, since it can be a pretty core part of iter_batch-like methods for ML training. And there's greater detail about each iter_batch method for torch/tf, which seems out of place to put in this shuffle page. But if others feel the same, we can move it here.


.. testcode::

    import ray

    ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

    for batch in ds.iter_batches(
        batch_size=2,
        batch_format="numpy",
        local_shuffle_buffer_size=250,
    ):
        print(batch)

.. tip::

    This is slower than shuffling ordering of files, and shuffles rows locally without
    network transfer. This local shuffle buffer can be used together with shuffling
    ordering of files; see :ref:`Shuffle the ordering of files <shuffling_file_order>`.

    If you observe reduced throughput when using ``local_shuffle_buffer_size``;
    one way to diagnose this is to check the total time spent in batch creation by
    examining the ``ds.stats()`` output (``In batch formatting``, under
    ``Batch iteration time breakdown``).
    If this time is significantly larger than the
    time spent in other steps, one way to improve performance is to decrease
    ``local_shuffle_buffer_size`` or turn off the local shuffle buffer altogether and only :ref:`shuffle the ordering of files <shuffling_file_order>`.

Member: Suggested change:

    - If you observe reduced throughput when using ``local_shuffle_buffer_size``;
    - one way to diagnose this is to check the total time spent in batch creation by
    + If you observe reduced throughput when using ``local_shuffle_buffer_size``,
    + check the total time spent in batch creation by

Member: Seems like these sentences should be part of the same paragraph?

    Suggested change:

    - ``Batch iteration time breakdown``).
    - If this time is significantly larger than the
    + ``Batch iteration time breakdown``). If this time is significantly larger than the

Member: Suggested change:

    - time spent in other steps, one way to improve performance is to decrease
    + time spent in other steps, decrease
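
As an illustrative aside (not part of this diff), here is a minimal sketch of how you might check those iteration stats in practice, reusing the example bucket from above; the exact layout of the ``ds.stats()`` output can vary across Ray versions:

.. code-block:: python

    import ray

    ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

    # Consume the dataset with a local shuffle buffer, then inspect where time went.
    for batch in ds.iter_batches(
        batch_size=2,
        batch_format="numpy",
        local_shuffle_buffer_size=250,
    ):
        pass

    # Look for "In batch formatting" under "Batch iteration time breakdown".
    print(ds.stats())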


Shuffle all rows
~~~~~~~~~~~~~~~~

To randomly shuffle all rows globally, call :meth:`~ray.data.Dataset.random_shuffle`.

.. testcode::

    import ray

    ds = (
        ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
        .random_shuffle()
    )

.. tip::

    This is the slowest option for shuffle, and requires transferring data across
    network between workers. This option achieves the best randomness among all options.

.. _optimizing_shuffles:

Advanced: Optimizing shuffles
-----------------------------

Contributor: Might just be a formatting thing, but should this be a subheading or a top-level heading? Actually seems like all of the subsections are subsections of "Types of shuffling", is that intentional?

Contributor Author: fixed, made this a new subheading outside of Types of shuffling, and titles below are subtitled under the new Advanced: Optimizing shuffles section.

Shuffle operations are *all-to-all* operations where the entire Dataset must be materialized in memory before execution can proceed.
Contributor: This section seems to not be super relevant to shuffling? Is the idea that these optimizations might also apply to other all-to-all operations? The "these" in the line below is also unclear. I would have thought it was talking about shuffle operations but think it is talking about all-to-all operations?

Contributor: Maybe move some of this to the "Enabling push-based shuffle" below, which seems related?

Member: +1, seemed slightly out of place to me. Wonder if we should just remove this section?

Contributor Author: removed this section (but kept the note), since the content is also discussed under the Enabling push-based shuffle subsection.

Currently, these are:

* :meth:`Dataset.groupby <ray.data.Dataset.groupby>`
* :meth:`Dataset.random_shuffle <ray.data.Dataset.random_shuffle>`
* :meth:`Dataset.sort <ray.data.Dataset.sort>`

.. note:: This is an active area of development. If your Dataset uses a shuffle operation and you are having trouble configuring shuffle,
   `file a Ray Data issue on GitHub <https://github.com/ray-project/ray/issues/new?assignees=&labels=bug%2Ctriage%2Cdata&projects=&template=bug-report.yml&title=[data]+>`_.
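
For illustration only (not part of this diff), a short sketch of the operations listed above; the ``id`` column name is what :func:`ray.data.range` produces in recent Ray releases:

.. code-block:: python

    import ray

    ds = ray.data.range(1000)

    # Each of these is an all-to-all operation that must shuffle blocks across the cluster.
    grouped = ds.groupby("id").count()
    shuffled = ds.random_shuffle()
    sorted_ds = ds.sort("id")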

When should you use global per-epoch shuffling?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Use global per-epoch shuffling only if your model is sensitive to the
randomness of the training data. Based on a
`theoretical foundation <https://arxiv.org/abs/1709.10432>`__ all
gradient-descent-based model trainers benefit from improved (global) shuffle quality.
In practice, the benefit is particularly pronounced for tabular data/models.
However, the more global the shuffle is, the more expensive the shuffling operation.
The increase compounds with distributed data-parallel training on a multi-node cluster due
to data transfer costs. This cost can be prohibitive when using very large datasets.

Contributor: Suggested change:

    - `theoretical foundation <https://arxiv.org/abs/1709.10432>`__ all
    + `theoretical foundation <https://arxiv.org/abs/1709.10432>`__ , all

The best route for determining the best tradeoff between preprocessing time and cost and
per-epoch shuffle quality is to measure the precision gain per training step for your
particular model under different shuffling policies:

* no shuffling,
* local (per-shard) limited-memory shuffle buffer,
* local (per-shard) shuffling,
* windowed (pseudo-global) shuffling, and
* fully global shuffling.

As long as your data loading and shuffling throughput is higher than your training throughput, your GPU should
be saturated. If you have shuffle-sensitive models, push the
shuffle quality higher until this threshold is hit.
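
As a rough sketch (not part of this diff), here is one way you might compare data loading throughput under a few of these policies before paying for the more expensive ones; the helper name ``epoch_time``, the batch size, and the dataset size are all illustrative:

.. code-block:: python

    import time

    import ray

    def epoch_time(ds, **iter_kwargs):
        """Time one full pass over the dataset with the given iteration options."""
        start = time.time()
        for _ in ds.iter_batches(batch_size=32, **iter_kwargs):
            pass
        return time.time() - start

    ds = ray.data.range(100_000)

    t_none = epoch_time(ds)                                    # no shuffling
    t_local = epoch_time(ds, local_shuffle_buffer_size=1_000)  # local shuffle buffer
    t_global = epoch_time(ds.random_shuffle())                 # fully global shuffle

    print(t_none, t_local, t_global)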

.. _shuffle_performance_tips:

Enabling push-based shuffle
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Some Dataset operations require a *shuffle* operation, meaning that data is shuffled from all of the input partitions to all of the output partitions.
These operations include :meth:`Dataset.random_shuffle <ray.data.Dataset.random_shuffle>`,
:meth:`Dataset.sort <ray.data.Dataset.sort>` and :meth:`Dataset.groupby <ray.data.Dataset.groupby>`.
Contributor: It's not super intuitive why these operations require a shuffle, if possible maybe add a quick sentence explaining?

Shuffle can be challenging to scale to large data sizes and clusters, especially when the total dataset size can't fit into memory.

Contributor: Suggested change:

    - Shuffle can be challenging to scale to large data sizes and clusters, especially when the total dataset size can't fit into memory.
    + Shuffling can be challenging to scale to large data sizes and clusters, especially when the total dataset size can't fit into memory.

Datasets provides an alternative shuffle implementation known as push-based shuffle for improving large-scale performance.
Try this out if your dataset has more than 1000 blocks or is larger than 1 TB in size.

Contributor: Think this is outdated verbiage?

    Suggested change:

    - Datasets provides an alternative shuffle implementation known as push-based shuffle for improving large-scale performance.
    + Ray Data provides an alternative shuffle implementation known as push-based shuffle for improving large-scale performance.

To try this out locally or on a cluster, you can start with the `nightly release test <https://github.com/ray-project/ray/blob/master/release/nightly_tests/dataset/sort.py>`_ that Ray runs for :meth:`Dataset.random_shuffle <ray.data.Dataset.random_shuffle>` and :meth:`Dataset.sort <ray.data.Dataset.sort>`.
To get an idea of the performance you can expect, here are some run time results for :meth:`Dataset.random_shuffle <ray.data.Dataset.random_shuffle>` on 1-10 TB of data on 20 machines (m5.4xlarge instances on AWS EC2, each with 16 vCPUs, 64 GB RAM).

.. image:: https://docs.google.com/spreadsheets/d/e/2PACX-1vQvBWpdxHsW0-loasJsBpdarAixb7rjoo-lTgikghfCeKPQtjQDDo2fY51Yc1B6k_S4bnYEoChmFrH2/pubchart?oid=598567373&format=image
   :align: center

To try out push-based shuffle, set the environment variable ``RAY_DATA_PUSH_BASED_SHUFFLE=1`` when running your application:

.. code-block:: bash

    $ wget https://raw.githubusercontent.com/ray-project/ray/master/release/nightly_tests/dataset/sort.py
    $ RAY_DATA_PUSH_BASED_SHUFFLE=1 python sort.py --num-partitions=10 --partition-size=1e7

    # Dataset size: 10 partitions, 0.01GB partition size, 0.1GB total
    # [dataset]: Run `pip install tqdm` to enable progress reporting.
    # 2022-05-04 17:30:28,806 INFO push_based_shuffle.py:118 -- Using experimental push-based shuffle.
    # Finished in 9.571171760559082
    # ...

You can also specify the shuffle implementation during program execution by
setting the ``DataContext.use_push_based_shuffle`` flag:

.. testcode::
    :hide:

    import ray
    ray.shutdown()

.. testcode::

    import ray

    ctx = ray.data.DataContext.get_current()
    ctx.use_push_based_shuffle = True

    ds = (
        ray.data.range(1000)
        .random_shuffle()
    )

Large-scale shuffles can take a while to finish.
For debugging purposes, shuffle operations support executing only part of the shuffle, so that you can collect an execution profile more quickly.
Here is an example that shows how to limit a random shuffle operation to two output blocks:

.. testcode::
    :hide:

    import ray
    ray.shutdown()

.. testcode::

    import ray

    ctx = ray.data.DataContext.get_current()
    ctx.set_config(
        "debug_limit_shuffle_execution_to_num_blocks", 2
    )

    ds = (
        ray.data.range(1000, override_num_blocks=10)
        .random_shuffle()
        .materialize()
    )
    print(ds.stats())

.. testoutput::
    :options: +MOCK

    Operator 1 ReadRange->RandomShuffle: executed in 0.08s

    Suboperator 0 ReadRange->RandomShuffleMap: 2/2 blocks executed
    ...