
[Train] Update docstring and user guides for train_loop_config #43691

Merged · 9 commits · Mar 8, 2024
2 changes: 1 addition & 1 deletion doc/source/data/working-with-pytorch.rst
@@ -51,7 +51,7 @@ Ray Data integrates with :ref:`Ray Train <train-docs>` for easy data ingest for
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
def train_func():
model = nn.Sequential(nn.Linear(30, 1), nn.Sigmoid())
loss_fn = torch.nn.BCELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
51 changes: 51 additions & 0 deletions doc/source/train/common/torch-configure-train_func.rst
@@ -0,0 +1,51 @@
First, update your training code to support distributed training.
Begin by wrapping your code in a :ref:`training function <train-overview-training-function>`:

.. testcode::
    :skipif: True

    def train_func():
        # Your model training code here.
        ...

Each distributed training worker executes this function.

You can also pass inputs into `train_func` as a dictionary via the Trainer's `train_loop_config` argument; Ray Train forwards this dictionary to the training function as its single argument. For example:

.. testcode:: python
    :skipif: True

    def train_func(config):
        lr = config["lr"]
        num_epochs = config["num_epochs"]

    config = {"lr": 1e-4, "num_epochs": 10}
    trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)

.. warning::

    Avoid passing large data objects through `train_loop_config`, because they add
    serialization and deserialization overhead. Instead, initialize large objects
    (for example, datasets and models) directly in `train_func`.

.. code-block:: diff

    def load_dataset():
        # Return a large in-memory dataset
        ...

    def load_model():
        # Return a large in-memory model instance
        ...

    -config = {"data": load_dataset(), "model": load_model()}

    def train_func(config):
    -    data = config["data"]
    -    model = config["model"]

    +    data = load_dataset()
    +    model = load_model()
        ...

    trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)
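
For orientation, here is a minimal end-to-end sketch of the pattern above. The hyperparameter values, the reported metrics, and the two-worker scaling setup are illustrative assumptions rather than part of the documented snippet:

.. code-block:: python

    import ray.train
    from ray.train import ScalingConfig
    from ray.train.torch import TorchTrainer

    def train_func(config):
        # Hyperparameters arrive through the dictionary passed as train_loop_config.
        lr = config["lr"]
        num_epochs = config["num_epochs"]
        for epoch in range(num_epochs):
            # ... your training loop here ...
            ray.train.report({"epoch": epoch, "lr": lr})

    trainer = TorchTrainer(
        train_func,
        train_loop_config={"lr": 1e-4, "num_epochs": 10},
        scaling_config=ScalingConfig(num_workers=2),
    )
    result = trainer.fit()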
2 changes: 1 addition & 1 deletion doc/source/train/deepspeed.rst
@@ -16,7 +16,7 @@ You only need to run your existing training code with a TorchTrainer. You can ex
import deepspeed
from deepspeed.accelerator import get_accelerator

def train_func(config):
def train_func():
# Instantiate your model and dataset
model = ...
train_dataset = ...
30 changes: 10 additions & 20 deletions doc/source/train/getting-started-pytorch-lightning.rst
@@ -23,7 +23,7 @@ For reference, the final code is as follows:
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func(config):
@woshiyyya (Member, Author) commented on Mar 6, 2024:
Not showing the config argument in the first place, since we didn't specify train_loop_config in the TorchTrainer in this code snippet. Otherwise, users will be confused about where to put the train_func arguments.

woshiyyya marked this conversation as resolved.
def train_func():
# Your PyTorch Lightning training code here.

scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
@@ -128,7 +128,7 @@ Compare a PyTorch Lightning training script with and without Ray Train.
return torch.optim.Adam(self.model.parameters(), lr=0.001)


def train_func(config):
def train_func():
# Data
transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])
data_dir = os.path.join(tempfile.gettempdir(), "data")
@@ -179,17 +179,7 @@ Compare a PyTorch Lightning training script with and without Ray Train.
Set up a training function
--------------------------

First, update your training code to support distributed training.
Begin by wrapping your code in a :ref:`training function <train-overview-training-function>`:

.. testcode::
:skipif: True

def train_func(config):
# Your PyTorch Lightning training code here.

Each distributed training worker executes this function.

.. include:: ./common/torch-configure-train_func.rst

Ray Train sets up your distributed process group on each worker. You only need to
make a few changes to your Lightning Trainer definition.
@@ -201,7 +191,7 @@
-from pl.plugins.environments import LightningEnvironment
+import ray.train.lightning

def train_func(config):
def train_func():
...
model = MyLightningModule(...)
datamodule = MyLightningDataModule(...)
@@ -240,7 +230,7 @@ sampler arguments.
-from pl.strategies import DDPStrategy
+import ray.train.lightning

def train_func(config):
def train_func():
...
trainer = pl.Trainer(
...
@@ -264,7 +254,7 @@ local, global, and node rank and world size.
-from pl.plugins.environments import LightningEnvironment
+import ray.train.lightning

def train_func(config):
def train_func():
...
trainer = pl.Trainer(
...
@@ -287,7 +277,7 @@ GPUs by setting ``devices="auto"`` and ``accelerator="auto"``.

import lightning.pytorch as pl

def train_func(config):
def train_func():
...
trainer = pl.Trainer(
...
@@ -312,7 +302,7 @@ To persist your checkpoints and monitor training progress, add a
import lightning.pytorch as pl
from ray.train.lightning import RayTrainReportCallback

def train_func(config):
def train_func():
...
trainer = pl.Trainer(
...
@@ -338,7 +328,7 @@ your configurations.
import lightning.pytorch as pl
import ray.train.lightning

def train_func(config):
def train_func():
...
trainer = pl.Trainer(...)
+ trainer = ray.train.lightning.prepare_trainer(trainer)
@@ -458,7 +448,7 @@ control over their native Lightning code.
prepare_trainer
)

def train_func(config):
def train_func():
# [1] Create a Lightning model
model = MyLightningModule(lr=1e-3, feature_dim=128)

22 changes: 6 additions & 16 deletions doc/source/train/getting-started-pytorch.rst
@@ -24,7 +24,7 @@ For reference, the final code will look something like the following:
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func(config):
def train_func():
# Your PyTorch training code here.
...

@@ -108,7 +108,7 @@ Compare a PyTorch training script with and without Ray Train.

import ray.train.torch

def train_func(config):
def train_func():
# Model, Loss, Optimizer
model = resnet18(num_classes=10)
model.conv1 = torch.nn.Conv2d(
@@ -183,17 +183,7 @@ Compare a PyTorch training script with and without Ray Train.
Set up a training function
--------------------------

First, update your training code to support distributed training.
Begin by wrapping your code in a :ref:`training function <train-overview-training-function>`:

.. testcode::
:skipif: True

def train_func(config):
# Your PyTorch training code here.
...

Each distributed training worker executes this function.
.. include:: ./common/torch-configure-train_func.rst

Set up a model
^^^^^^^^^^^^^^
@@ -208,7 +198,7 @@ Use the :func:`ray.train.torch.prepare_model` utility function to:
-from torch.nn.parallel import DistributedDataParallel
+import ray.train.torch

def train_func(config):
def train_func():

...

@@ -241,7 +231,7 @@ See :ref:`data-ingest-torch`.
from torch.utils.data import DataLoader
+import ray.train.torch

def train_func(config):
def train_func():

...

@@ -295,7 +285,7 @@ To monitor progress, you can report intermediate metrics and checkpoints using t

+import ray.train

def train_func(config):
def train_func():

...

23 changes: 7 additions & 16 deletions doc/source/train/getting-started-transformers.rst
@@ -22,7 +22,7 @@ For reference, the final code follows:
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func(config):
def train_func():
# Your Transformers training code here.

scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
@@ -122,7 +122,7 @@ Compare a Hugging Face Transformers training script with and without Ray Train.
# [1] Encapsulate data preprocessing, training, and evaluation
# logic in a training function
# ============================================================
def train_func(config):
def train_func():
# Datasets
dataset = load_dataset("yelp_review_full")
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
@@ -203,18 +203,9 @@ Compare a Hugging Face Transformers training script with and without Ray Train.
Set up a training function
--------------------------

First, update your training code to support distributed training.
You can begin by wrapping your code in a :ref:`training function <train-overview-training-function>`:

.. testcode::
:skipif: True

def train_func(config):
# Your Transformers training code here.

This function executes on each distributed training worker. Ray Train sets up the distributed
process group on each worker before entering this function.
.. include:: ./common/torch-configure-train_func.rst

Ray Train sets up the distributed process group on each worker before entering this function.
Put all of the training logic into this function, including dataset construction and preprocessing,
model initialization, the Transformers ``Trainer`` definition, and more.
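
To make this concrete, here is a condensed sketch of a training function that keeps all of this logic inside it. It reuses the dataset and model from the quickstart above; the 1,000-row subsampling, the ``TrainingArguments`` values, and ``num_labels=5`` are illustrative assumptions:

.. code-block:: python

    import transformers
    from datasets import load_dataset

    import ray.train.huggingface.transformers
    from ray.train.huggingface.transformers import RayTrainReportCallback

    def train_func():
        # Dataset construction and preprocessing
        dataset = load_dataset("yelp_review_full")
        tokenizer = transformers.AutoTokenizer.from_pretrained("bert-base-cased")

        def tokenize(batch):
            return tokenizer(batch["text"], padding="max_length", truncation=True)

        train_ds = dataset["train"].select(range(1000)).map(tokenize, batched=True)
        eval_ds = dataset["test"].select(range(1000)).map(tokenize, batched=True)

        # Model initialization
        model = transformers.AutoModelForSequenceClassification.from_pretrained(
            "bert-base-cased", num_labels=5
        )

        # Transformers Trainer definition
        args = transformers.TrainingArguments(
            output_dir="output",
            num_train_epochs=1,
            report_to="none",
        )
        trainer = transformers.Trainer(
            model=model, args=args, train_dataset=train_ds, eval_dataset=eval_ds
        )

        # Ray Train integration (explained in the sections below)
        trainer.add_callback(RayTrainReportCallback())
        trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)
        trainer.train()

You would then pass this ``train_func`` to a ``TorchTrainer`` exactly as shown in the quickstart at the top of this page.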

@@ -237,7 +228,7 @@ To persist your checkpoints and monitor training progress, add a
import transformers
from ray.train.huggingface.transformers import RayTrainReportCallback

def train_func(config):
def train_func():
...
trainer = transformers.Trainer(...)
+ trainer.add_callback(RayTrainReportCallback())
@@ -261,7 +252,7 @@ your configurations and enable Ray Data Integration.
import transformers
import ray.train.huggingface.transformers

def train_func(config):
def train_func():
...
trainer = transformers.Trainer(...)
+ trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)
@@ -384,7 +375,7 @@ native Transformers training code.

# [1] Define the full training function
# =====================================
def train_func(config):
def train_func():
MODEL_NAME = "gpt2"
model_config = AutoConfig.from_pretrained(MODEL_NAME)
model = AutoModelForCausalLM.from_config(model_config)
6 changes: 3 additions & 3 deletions doc/source/train/huggingface-accelerate.rst
@@ -12,7 +12,7 @@ You only need to run your existing training code with a TorchTrainer. You can ex

from accelerate import Accelerator

def train_func(config):
def train_func():
# Instantiate the accelerator
accelerator = Accelerator(...)

@@ -102,7 +102,7 @@ object in your training function. Below are starter examples for configuring Acc
"wall_clock_breakdown": False
}

def train_func(config):
def train_func():
# Create a DeepSpeedPlugin from config dict
ds_plugin = DeepSpeedPlugin(hf_ds_config=DEEPSPEED_CONFIG)

@@ -138,7 +138,7 @@ object in your training function. Below are starter examples for configuring Acc
from torch.distributed.fsdp.fully_sharded_data_parallel import FullOptimStateDictConfig, FullStateDictConfig
from accelerate import Accelerator, FullyShardedDataParallelPlugin

def train_func(config):
def train_func():
fsdp_plugin = FullyShardedDataParallelPlugin(
state_dict_config=FullStateDictConfig(
offload_to_cpu=False,
4 changes: 2 additions & 2 deletions doc/source/train/user-guides/data-loading-preprocessing.rst
@@ -59,7 +59,7 @@ Data ingestion can be set up with four basic steps:
train_dataset = train_dataset.map_batches(increment)


def train_func(config):
def train_func():
batch_size = 16

# Step 4: Access the dataset shard for the training worker via
@@ -141,7 +141,7 @@ Data ingestion can be set up with four basic steps:
train_data = ray.data.from_huggingface(hf_train_ds)
eval_data = ray.data.from_huggingface(hf_eval_ds)

def train_func(config):
def train_func():
# Access Ray datasets in your train_func via ``get_dataset_shard``.
# The "train" dataset gets sharded across workers by default
train_ds = ray.train.get_dataset_shard("train")
12 changes: 6 additions & 6 deletions doc/source/train/user-guides/experiment-tracking.rst
@@ -23,7 +23,7 @@ inside of Ray Train:
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func(config):
def train_func():
# Training code and native experiment tracking library calls go here.

scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
@@ -56,7 +56,7 @@ The following examples uses Weights & Biases (W&B) and MLflow but it's adaptable
# This ensures that all ray worker processes have `WANDB_API_KEY` set.
ray.init(runtime_env={"env_vars": {"WANDB_API_KEY": "your_api_key"}})

def train_func(config):
def train_func():
# Step 1 and 2
if train.get_context().get_world_rank() == 0:
wandb.init(
@@ -91,7 +91,7 @@ The following examples uses Weights & Biases (W&B) and MLflow but it's adaptable
# Run the following on the head node:
# $ databricks configure --token
# mv ~/.databrickscfg YOUR_SHARED_STORAGE_PATH
# This function assumes `databricks_config_file` in config
# This function assumes `databricks_config_file` is specified in the Trainer's `train_loop_config`.
def train_func(config):
# Step 1 and 2
os.environ["DATABRICKS_CONFIG_FILE"] = config["databricks_config_file"]
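
For context, here is a minimal sketch of how the value referenced above could be supplied; the config-file path and the two-worker scaling setup are hypothetical placeholders:

.. code-block:: python

    from ray.train import ScalingConfig
    from ray.train.torch import TorchTrainer

    trainer = TorchTrainer(
        train_func,
        # The training function reads this key via config["databricks_config_file"].
        train_loop_config={"databricks_config_file": "/shared/storage/.databrickscfg"},
        scaling_config=ScalingConfig(num_workers=2),
    )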
@@ -123,7 +123,7 @@ The following examples uses Weights & Biases (W&B) and MLflow but it's adaptable
:skipif: True

from ray import train
def train_func(config):
def train_func():
...
if train.get_context().get_world_rank() == 0:
# Add your logging logic only for rank0 worker.
@@ -245,7 +245,7 @@ Refer to the tracking libraries' documentation for semantics.
..
.. testcode::

def train_func(config):
def train_func():
if ray.train.get_context().get_world_rank() == 0:
wandb.init(..., config={"ray_train_persistent_storage_path": "TODO: fill in when API stabilizes"})

@@ -263,7 +253,7 @@ Refer to the tracking libraries' documentation for semantics.
from ray.train import ScalingConfig, RunConfig, FailureConfig
from ray.train.torch import TorchTrainer

def train_func(config):
def train_func():
if ray.train.get_context().get_world_rank() == 0:
wandb.init(id=ray.train.get_context().get_trial_id())
...