[cherry-pick][doc][train] Clarify error message when trying to use local storage for multi-node distributed training and checkpointing #41844

Merged 1 commit on Dec 14, 2023
86 changes: 86 additions & 0 deletions doc/source/train/common/torch-configure-run.rst
@@ -0,0 +1,86 @@
Configure scale and GPUs
------------------------

Outside of your training function, create a :class:`~ray.train.ScalingConfig` object to configure:

1. :class:`num_workers <ray.train.ScalingConfig>` - The number of distributed training worker processes.
2. :class:`use_gpu <ray.train.ScalingConfig>` - Whether each worker should use a GPU (or CPU).

.. testcode::

from ray.train import ScalingConfig
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)


For more details, see :ref:`train_scaling_config`.
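
If your workers need more than the defaults, ``ScalingConfig`` also accepts a
``resources_per_worker`` dictionary. The snippet below is only an illustrative
sketch; adjust the worker count and resources to your own cluster.

.. testcode::
    :skipif: True

    # Reserve 2 CPUs and 1 GPU for each training worker (illustrative values).
    scaling_config = ScalingConfig(
        num_workers=4,
        use_gpu=True,
        resources_per_worker={"CPU": 2, "GPU": 1},
    )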

Configure persistent storage
----------------------------

Create a :class:`~ray.train.RunConfig` object to specify the path where results
(including checkpoints and artifacts) will be saved.

.. testcode::

from ray.train import RunConfig

# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")

# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")

# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")


.. warning::

Specifying a *shared storage location* (such as cloud storage or NFS) is
*optional* for single-node clusters, but it is **required for multi-node clusters.**
Using a local path will :ref:`raise an error <multinode-local-storage-warning>`
during checkpointing for multi-node clusters.


For more details, see :ref:`persistent-storage-guide`.


Launch a training job
---------------------

Tying this all together, you can now launch a distributed training job
with a :class:`~ray.train.torch.TorchTrainer`.

.. testcode::
:hide:

from ray.train import ScalingConfig

train_func = lambda: None
scaling_config = ScalingConfig(num_workers=1)
run_config = None

.. testcode::

from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()


Access training results
-----------------------

After training completes, Ray Train returns a :class:`~ray.train.Result` object, which contains
information about the training run, including the metrics and checkpoints reported during training.

.. testcode::

result.metrics # The metrics reported during training.
result.checkpoint # The latest checkpoint reported during training.
result.path # The path where logs are stored.
result.error # The exception that was raised, if training failed.
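
As a quick illustration, the contents of the latest checkpoint can be
inspected locally. This is only a sketch: the exact files depend on what your
training function saved when it reported the checkpoint.

.. testcode::
    :skipif: True

    import os

    # `as_directory` downloads the checkpoint if it lives in remote storage
    # and yields a local directory containing its files.
    with result.checkpoint.as_directory() as checkpoint_dir:
        print(os.listdir(checkpoint_dir))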

For more usage examples, see :ref:`train-inspect-results`.
8 changes: 6 additions & 2 deletions doc/source/train/deepspeed.rst
@@ -44,9 +44,13 @@ You only need to run your existing training code with a TorchTrainer. You can ex
trainer = TorchTrainer(
train_func,
scaling_config=ScalingConfig(...),
# If running in a multi-node cluster, this is where you
# should configure the run's persistent storage that is accessible
# across all worker nodes.
# run_config=ray.train.RunConfig(storage_path="s3://..."),
...
)
trainer.fit()
result = trainer.fit()


Below is a simple example of ZeRO-3 training with DeepSpeed only.
@@ -74,7 +78,7 @@ Below is a simple example of ZeRO-3 training with DeepSpeed only.
.. tip::

To run DeepSpeed with pure PyTorch, you **don't need to** provide any additional Ray Train utilities
like :meth:`~ray.train.torch.prepare_model` or :meth:`~ray.train.torch.prepare_data_loader` in your training funciton. Instead,
like :meth:`~ray.train.torch.prepare_model` or :meth:`~ray.train.torch.prepare_data_loader` in your training function. Instead,
keep using `deepspeed.initialize() <https://deepspeed.readthedocs.io/en/latest/initialize.html>`_ as usual to prepare everything
for distributed training.
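
To make this concrete, a training function might look like the following
minimal sketch. The model construction and the ``ds_config`` dictionary are
placeholders that you supply yourself; this is not taken verbatim from the
DeepSpeed example below.

.. testcode::
    :skipif: True

    import deepspeed

    def train_func(config):
        model = build_model()            # placeholder: your torch.nn.Module
        ds_config = config["deepspeed"]  # placeholder: your DeepSpeed config dict

        # DeepSpeed sets up distributed training by itself; no
        # ray.train.torch.prepare_model / prepare_data_loader calls are needed.
        model_engine, optimizer, _, _ = deepspeed.initialize(
            model=model,
            model_parameters=model.parameters(),
            config=ds_config,
        )

        # ... training loop using `model_engine.backward(loss)` and
        # `model_engine.step()` goes here ...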

15 changes: 15 additions & 0 deletions doc/source/train/distributed-xgboost-lightgbm.rst
@@ -142,6 +142,21 @@ Following are some examples of common use-cases:
automatically.


.. warning::

Specifying a *shared storage location* (such as cloud storage or NFS) is
*optional* for single-node clusters, but it is **required for multi-node clusters.**
Using a local path will :ref:`raise an error <multinode-local-storage-warning>`
during checkpointing for multi-node clusters.

.. testcode:: python
:skipif: True

trainer = XGBoostTrainer(
..., run_config=ray.train.RunConfig(storage_path="s3://...")
)


How many remote actors should you use?
--------------------------------------

5 changes: 4 additions & 1 deletion doc/source/train/doc_code/checkpoints.py
@@ -417,12 +417,15 @@ def train_func(config):

from ray.train import Checkpoint

# Create a sample locally available checkpoint
# For demonstration, create a locally available directory with a `model.pt` file.
example_checkpoint_dir = Path("/tmp/test-checkpoint")
example_checkpoint_dir.mkdir()
example_checkpoint_dir.joinpath("model.pt").touch()

# Create the checkpoint, which is a reference to the directory.
checkpoint = Checkpoint.from_directory(example_checkpoint_dir)

# Inspect the checkpoint's contents with either `as_directory` or `to_directory`:
with checkpoint.as_directory() as checkpoint_dir:
assert Path(checkpoint_dir).joinpath("model.pt").exists()
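
# As an alternative sketch: `to_directory` copies the checkpoint contents into
# a local directory and returns its path, without using a context manager.
checkpoint_path = checkpoint.to_directory()
assert Path(checkpoint_path).joinpath("model.pt").exists()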

20 changes: 15 additions & 5 deletions doc/source/train/doc_code/gbdt_user_guide.py
@@ -16,18 +16,23 @@
scaling_config=ScalingConfig(
# Number of workers to use for data parallelism.
num_workers=2,
# Whether to use GPU acceleration.
# Whether to use GPU acceleration. Set to True to schedule GPU workers.
use_gpu=False,
),
label_column="target",
num_boost_round=20,
params={
# XGBoost specific params
# XGBoost specific params (see the `xgboost.train` API reference)
"objective": "binary:logistic",
# "tree_method": "gpu_hist", # uncomment this to use GPU for training
# uncomment this and set `use_gpu=True` to use GPU for training
# "tree_method": "gpu_hist",
"eval_metric": ["logloss", "error"],
},
datasets={"train": train_dataset, "valid": valid_dataset},
# If running in a multi-node cluster, this is where you
# should configure the run's persistent storage that is accessible
# across all worker nodes.
# run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
print(result.metrics)
@@ -64,7 +69,7 @@
params={
# XGBoost specific params
"objective": "binary:logistic",
# "tree_method": "gpu_hist", # uncomment this to use GPU for training
# uncomment this and set `use_gpu=True` to use GPU for training
# "tree_method": "gpu_hist",
"eval_metric": ["logloss", "error"],
},
datasets={"train": train_dataset, "valid": valid_dataset},
@@ -92,7 +98,7 @@
scaling_config=ScalingConfig(
# Number of workers to use for data parallelism.
num_workers=2,
# Whether to use GPU acceleration.
# Whether to use GPU acceleration. Set to True to schedule GPU workers.
use_gpu=False,
),
label_column="target",
@@ -103,6 +109,10 @@
"metric": ["binary_logloss", "binary_error"],
},
datasets={"train": train_dataset, "valid": valid_dataset},
# If running in a multi-node cluster, this is where you
# should configure the run's persistent storage that is accessible
# across all worker nodes.
# run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
print(result.metrics)
12 changes: 9 additions & 3 deletions doc/source/train/doc_code/key_concepts.py
@@ -114,16 +114,22 @@ def train_fn(config):
print("Loss", metrics["loss"], "checkpoint", checkpoint)

# Get checkpoint with minimal loss
best_checkpoint = min(result.best_checkpoints, key=lambda bc: bc[1]["loss"])[0]
best_checkpoint = min(
result.best_checkpoints, key=lambda checkpoint: checkpoint[1]["loss"]
)[0]

with best_checkpoint.as_directory() as tmpdir:
# Load model from directory
...
# __result_best_checkpoint_end__

import pyarrow

# __result_path_start__
result_path = result.path
print("Results location", result_path)
result_path: str = result.path
result_filesystem: pyarrow.fs.FileSystem = result.filesystem

print("Results location (fs, path) = ({result_filesystem}, {result_path})")
# __result_path_end__
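
# As an illustrative sketch (assuming the default local filesystem here), the
# filesystem handle can also be used to list the files under the result path:
file_infos = result_filesystem.get_file_info(
    pyarrow.fs.FileSelector(result_path, recursive=False)
)
print([info.path for info in file_infos])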


56 changes: 3 additions & 53 deletions doc/source/train/getting-started-pytorch-lightning.rst
@@ -160,7 +160,8 @@ Compare a PyTorch Lightning training script with and without Ray Train.
train_func,
scaling_config=scaling_config,
# [3a] If running in a multi-node cluster, this is where you
# should configure the run's persistent storage.
# should configure the run's persistent storage that is accessible
# across all worker nodes.
# run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result: ray.train.Result = trainer.fit()
@@ -344,59 +345,8 @@ your configurations.
...


Configure scale and GPUs
------------------------
.. include:: ./common/torch-configure-run.rst

Outside of your training function, create a :class:`~ray.train.ScalingConfig` object to configure:

1. `num_workers` - The number of distributed training worker processes.
2. `use_gpu` - Whether each worker should use a GPU (or CPU).

.. testcode::

from ray.train import ScalingConfig
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)


For more details, see :ref:`train_scaling_config`.

Launch a training job
---------------------

Tying this all together, you can now launch a distributed training job
with a :class:`~ray.train.torch.TorchTrainer`.

.. testcode::
:hide:

from ray.train import ScalingConfig

train_func = lambda: None
scaling_config = ScalingConfig(num_workers=1)

.. testcode::

from ray.train.torch import TorchTrainer

trainer = TorchTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()

See :ref:`train-run-config` for more configuration options for `TorchTrainer`.

Access training results
-----------------------

After training completes, Ray Train returns a :class:`~ray.train.Result` object, which contains
information about the training run, including the metrics and checkpoints reported during training.

.. testcode::

result.metrics # The metrics reported during training.
result.checkpoint # The latest checkpoint reported during training.
result.path # The path where logs are stored.
result.error # The exception that was raised, if training failed.

.. TODO: Add results guide

Next steps
----------
57 changes: 5 additions & 52 deletions doc/source/train/getting-started-pytorch.rst
@@ -16,7 +16,7 @@ Learn how to:
Quickstart
----------

For reference, the final code is as follows:
For reference, the final code will look something like the following:

.. testcode::
:skipif: True
@@ -26,6 +26,7 @@ For reference, the final code is as follows:

def train_func(config):
# Your PyTorch training code here.
...

scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
trainer = TorchTrainer(train_func, scaling_config=scaling_config)
@@ -160,7 +161,8 @@ Compare a PyTorch training script with and without Ray Train.
train_func,
scaling_config=scaling_config,
# [5a] If running in a multi-node cluster, this is where you
# should configure the run's persistent storage.
# should configure the run's persistent storage that is accessible
# across all worker nodes.
# run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
@@ -285,57 +287,8 @@ To monitor progress, you can report intermediate metrics and checkpoints using t
For more details, see :ref:`train-monitoring-and-logging` and :ref:`train-checkpointing`.
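
For example, a report call inside the training loop could look like the
following minimal sketch (``model`` and ``loss`` are placeholders from your
own training code):

.. testcode::
    :skipif: True

    import os
    import tempfile

    import torch
    import ray.train
    from ray.train import Checkpoint

    # Save a checkpoint to a temporary directory and report it together with
    # the current metrics.
    with tempfile.TemporaryDirectory() as tmpdir:
        torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
        ray.train.report(
            metrics={"loss": loss.item()},
            checkpoint=Checkpoint.from_directory(tmpdir),
        )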


Configure scale and GPUs
------------------------
.. include:: ./common/torch-configure-run.rst

Outside of your training function, create a :class:`~ray.train.ScalingConfig` object to configure:

1. :class:`num_workers <ray.train.ScalingConfig>` - The number of distributed training worker processes.
2. :class:`use_gpu <ray.train.ScalingConfig>` - Whether each worker should use a GPU (or CPU).

.. testcode::

from ray.train import ScalingConfig
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)


For more details, see :ref:`train_scaling_config`.

Launch a training job
---------------------

Tying this all together, you can now launch a distributed training job
with a :class:`~ray.train.torch.TorchTrainer`.

.. testcode::
:hide:

from ray.train import ScalingConfig

train_func = lambda: None
scaling_config = ScalingConfig(num_workers=1)

.. testcode::

from ray.train.torch import TorchTrainer

trainer = TorchTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()

Access training results
-----------------------

After training completes, a :class:`~ray.train.Result` object is returned which contains
information about the training run, including the metrics and checkpoints reported during training.

.. testcode::

result.metrics # The metrics reported during training.
result.checkpoint # The latest checkpoint reported during training.
result.path # The path where logs are stored.
result.error # The exception that was raised, if training failed.

.. TODO: Add results guide

Next steps
----------