[AIR] Remove head node syncing as the default storage option #37142

Merged on Jul 13, 2023 (49 commits; showing changes from 37 of the 49 commits)

Commits:
e46dc89 - No-op on sync trial dir + raise error with call to action upon missin… (justinvyu, Jul 6, 2023)
ec10d0d - Some fixes (justinvyu, Jul 6, 2023)
6744ddf - Add env var as a tracked constant (justinvyu, Jul 6, 2023)
d0e639e - Add a warning log message to cover the multi-node + no checkpoints + … (justinvyu, Jul 7, 2023)
6544a28 - Update the test (justinvyu, Jul 7, 2023)
b792842 - Make sure no warning is logged if syncer callback is disabled (justinvyu, Jul 7, 2023)
d79d0b2 - Fix test (justinvyu, Jul 7, 2023)
837dfcf - Add link to GH issue (justinvyu, Jul 7, 2023)
0fe99dd - Improve the error message (justinvyu, Jul 7, 2023)
77c62ab - Enable env var for legacy tests (justinvyu, Jul 7, 2023)
19b8d9c - Set env var for multinode legacy sync test (justinvyu, Jul 7, 2023)
b29ac3b - Set env var for release test for head node syncing (justinvyu, Jul 7, 2023)
ce13f18 - Add some sanity checks for multi-node at the start of head node synci… (justinvyu, Jul 7, 2023)
1e58891 - Merge branch 'master' of https://github.com/ray-project/ray into air/… (justinvyu, Jul 7, 2023)
409a4f3 - Add RAY_ prefix (justinvyu, Jul 7, 2023)
74d5695 - Fix session.report call (justinvyu, Jul 7, 2023)
b811337 - Don't show error if syncing is manually disabled (justinvyu, Jul 7, 2023)
818ee1b - Set env var for multinode test properly (justinvyu, Jul 7, 2023)
94768f4 - Merge branch 'master' of https://github.com/ray-project/ray into air/… (justinvyu, Jul 7, 2023)
1995b8a - Merge branch 'master' of https://github.com/ray-project/ray into air/… (justinvyu, Jul 7, 2023)
b489ff6 - use fail_fast=raise to catch deprecation error (justinvyu, Jul 7, 2023)
1a83c83 - Fix lint (justinvyu, Jul 7, 2023)
b621f07 - Only log warning once across all trials (justinvyu, Jul 7, 2023)
0afda7b - env_var=0 -> flag not enabled (justinvyu, Jul 7, 2023)
ec4467b - use /mnt/cluster_storage for release tests that checkpoint across mul… (justinvyu, Jul 8, 2023)
25ec545 - Update dolly finetuning (justinvyu, Jul 8, 2023)
84142db - Fix ml rllib connect test (justinvyu, Jul 8, 2023)
386d93c - Fix lint (justinvyu, Jul 8, 2023)
da500eb - Fix mmt workspace template (justinvyu, Jul 8, 2023)
818a6ee - Merge branch 'master' of https://github.com/ray-project/ray into air/… (justinvyu, Jul 8, 2023)
77f17c3 - Fix all rllib release tests (justinvyu, Jul 8, 2023)
1ec6f5a - Fix default if env var not set (justinvyu, Jul 10, 2023)
7e62da0 - Fix ref in docs (justinvyu, Jul 10, 2023)
cf31953 - Merge branch 'master' of https://github.com/ray-project/ray into air/… (justinvyu, Jul 10, 2023)
54ab6d3 - Mark modified release tests as affected (justinvyu, Jul 10, 2023)
f572816 - Merge branch 'master' of https://github.com/ray-project/ray into air/… (justinvyu, Jul 10, 2023)
a0c5954 - add to schema + change to string for filtering (justinvyu, Jul 10, 2023)
28d22d8 - Fix typo in rllib test launcher (justinvyu, Jul 11, 2023)
80b57d0 - Merge branch 'master' of https://github.com/ray-project/ray into air/… (justinvyu, Jul 11, 2023)
6ca0839 - typo (justinvyu, Jul 11, 2023)
727b98d - Merge branch 'master' of https://github.com/ray-project/ray into air/… (justinvyu, Jul 11, 2023)
361dafd - Merge branch 'master' of https://github.com/ray-project/ray into air/… (justinvyu, Jul 11, 2023)
e0e6eae - update message to say removal in 2.8 (justinvyu, Jul 11, 2023)
a0c85cc - Revert "add to schema + change to string for filtering" (justinvyu, Jul 11, 2023)
9e9227f - Fix gptj finetuning release test (justinvyu, Jul 11, 2023)
f1ede3b - Merge branch 'master' of https://github.com/ray-project/ray into air/… (justinvyu, Jul 12, 2023)
1d94f05 - 2.8 -> 2.7 in comments (justinvyu, Jul 12, 2023)
2c1d562 - Merge branch 'master' of https://github.com/ray-project/ray into air/… (justinvyu, Jul 12, 2023)
4e7c93d - tentatively deprecate in 2.7 (justinvyu, Jul 12, 2023)
@@ -139,6 +139,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
@@ -321,11 +322,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Fine-tune with LightningTrainer\n",
"\n",
"```{note}\n",
"Here we save the checkpoints to the local file system. You can also upload the checkpoints to cloud storage by setting S3 bucket URI to {class}`air.RunConfig(storage_path=S3_BUCKET_URI) <ray.air.RunConfig>`. See {ref}`train-run-config` for an example.\n",
"```"
"## Fine-tune with LightningTrainer"
]
},
{
@@ -387,6 +384,43 @@
"lightning_config.trainer(callbacks=[DollyV2ProgressBar(num_iters_per_epoch)])"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"```{note}\n",
"Since this example runs with multiple nodes, we need to persist checkpoints\n",
"and other outputs to some external storage for access after training has completed.\n",
"**You should set up cloud storage or NFS, then replace `storage_path` with your own cloud bucket URI or NFS path.**\n",
"\n",
"See the [storage guide](tune-storage-options) for more details.\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"storage_path=\"s3://your-bucket-here\" # TODO: Set up cloud storage\n",
"# storage_path=\"/mnt/path/to/nfs\" # TODO: Alternatively, set up NFS"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"remove-cell"
]
},
"outputs": [],
"source": [
"storage_path = \"/mnt/cluster_storage\""
]
},
{
"cell_type": "code",
"execution_count": 9,
@@ -921,9 +955,10 @@
"from ray.tune.syncer import SyncConfig\n",
"# Save AIR checkpoints according to the performance on validation set\n",
"run_config = RunConfig(\n",
" storage_path=storage_path,\n",
" name=\"finetune_dolly-v2-7b\",\n",
" checkpoint_config=CheckpointConfig(),\n",
" sync_config=SyncConfig(sync_artifacts=False)\n",
" sync_config=SyncConfig(sync_artifacts=False),\n",
")\n",
"\n",
"# Scale the DDP training workload across 16 GPUs\n",
@@ -954,6 +989,7 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
9 changes: 7 additions & 2 deletions doc/source/templates/02_many_model_training/start.ipynb
@@ -75,7 +75,7 @@
"from statsforecast.models import AutoARIMA, AutoETS, MSTL\n",
"\n",
"from ray import tune\n",
"from ray.air import session\n"
"from ray.air import session, RunConfig\n"
]
},
{
@@ -265,7 +265,12 @@
"metadata": {},
"outputs": [],
"source": [
"tuner = tune.Tuner(trainable, param_space=param_space)\n",
"tuner = tune.Tuner(\n",
" trainable,\n",
" param_space=param_space,\n",
" # Experiment results are saved to a shared filesystem available to all nodes.\n",
" run_config=RunConfig(storage_path=\"/mnt/cluster_storage\"),\n",
")\n",
"result_grid = tuner.fit()\n"
]
},
6 changes: 6 additions & 0 deletions python/ray/air/constants.py
@@ -93,10 +93,16 @@
# as Trainable)
DISABLE_LAZY_CHECKPOINTING_ENV = "TRAIN_DISABLE_LAZY_CHECKPOINTING"

# TODO(ml-team): [Deprecation - head node syncing] Remove in 2.7.
# Whether or not the sync-to-head behavior is enabled by default.
# If unset, running AIR on a multi-node cluster with checkpointing will raise
# an error telling the user to switch to cloud/NFS.
REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE = "RAY_AIR_REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE"

# NOTE: When adding a new environment variable, please track it in this list.
# TODO(ml-team): Most env var constants should get moved here.
AIR_ENV_VARS = {
REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE,
COPY_DIRECTORY_CHECKPOINTS_INSTEAD_OF_MOVING_ENV,
DISABLE_LAZY_CHECKPOINTING_ENV,
"RAY_AIR_FULL_TRACEBACKS",
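For illustration, a rough sketch of the escape hatch this flag provides while it lasts; the variable name comes from this diff, but the ray.init call around it is only an assumption about how a user might set it cluster-wide:

import ray

# Hypothetical opt-in: propagate the deprecated sync-to-head flag to all workers
# through the runtime environment (the multi-node test at the bottom of this PR
# does the same when connecting to its test cluster).
ray.init(
    runtime_env={
        "env_vars": {"RAY_AIR_REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE": "1"}
    }
)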
5 changes: 3 additions & 2 deletions python/ray/train/examples/pytorch/torch_linear_example.py
@@ -6,7 +6,7 @@
import torch.nn as nn
import ray.train as train
from ray.train.torch import TorchTrainer, TorchCheckpoint
from ray.air.config import ScalingConfig
from ray.air.config import RunConfig, ScalingConfig


class LinearDataset(torch.utils.data.Dataset):
@@ -85,12 +85,13 @@ def train_func(config):
return results


def train_linear(num_workers=2, use_gpu=False, epochs=3):
def train_linear(num_workers=2, use_gpu=False, epochs=3, storage_path=None):
config = {"lr": 1e-2, "hidden_size": 1, "batch_size": 4, "epochs": epochs}
trainer = TorchTrainer(
train_loop_per_worker=train_func,
train_loop_config=config,
scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
run_config=RunConfig(storage_path=storage_path),
)
result = trainer.fit()

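As a usage note, a hypothetical way to drive the updated example so results land on storage reachable from every node; the call is illustrative and the path is a placeholder:

# Illustrative call, not part of the file: pass a shared location (cloud bucket
# or NFS mount) so results do not rely on the removed head-node syncing.
train_linear(num_workers=2, use_gpu=False, epochs=3, storage_path="/mnt/cluster_storage")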
8 changes: 6 additions & 2 deletions python/ray/train/examples/tf/tensorflow_mnist_example.py
@@ -12,7 +12,7 @@

from ray.train.tensorflow import TensorflowTrainer
from ray.air.integrations.keras import ReportCheckpointCallback
from ray.air.config import ScalingConfig
from ray.air.config import RunConfig, ScalingConfig


def mnist_dataset(batch_size: int) -> tf.data.Dataset:
@@ -79,13 +79,17 @@ def train_func(config: dict):


def train_tensorflow_mnist(
num_workers: int = 2, use_gpu: bool = False, epochs: int = 4
num_workers: int = 2,
use_gpu: bool = False,
epochs: int = 4,
storage_path: str = None,
) -> Result:
config = {"lr": 1e-3, "batch_size": 64, "epochs": epochs}
trainer = TensorflowTrainer(
train_loop_per_worker=train_func,
train_loop_config=config,
scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
run_config=RunConfig(storage_path=storage_path),
)
results = trainer.fit()
return results
82 changes: 67 additions & 15 deletions python/ray/tune/syncer.py
@@ -40,7 +40,11 @@
delete_at_uri,
is_non_local_path_uri,
)
from ray.air.constants import LAZY_CHECKPOINT_MARKER_FILE, TRAINING_ITERATION
from ray.air.constants import (
LAZY_CHECKPOINT_MARKER_FILE,
REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE,
TRAINING_ITERATION,
)
from ray.exceptions import RayActorError
from ray.tune import TuneError
from ray.tune.callback import Callback
@@ -75,6 +79,29 @@
f"./{LAZY_CHECKPOINT_MARKER_FILE}",
]

_SYNC_TO_HEAD_DEPRECATION_MESSAGE = (
"Ray AIR no longer supports the synchronization of checkpoints and other "
"artifacts from worker nodes to the head node. This means that the "
"checkpoints and artifacts saved by trials scheduled on worker nodes will not be "
"accessible during the run (e.g., resuming from a checkpoint "
"after a failure) or after the run "
"(e.g., loading the checkpoint of a trial that ran on an already "
"terminated worker node).\n\n"
"To fix this issue, configure AIR to use either:\n"
"(1) Cloud storage: `RunConfig(storage_path='s3://your/bucket')`\n"
"(2) A network filesystem mounted on all nodes: "
"`RunConfig(storage_path='/mnt/path/to/nfs_storage')`\n"
"See this Github issue for more details on transitioning to cloud storage/NFS "
"as well as an explanation on why this functionality is "
"being removed: https://github.com/ray-project/ray/issues/37177\n\n"
"Other temporary workarounds:\n"
"- If you want to avoid errors/warnings and continue running with "
"syncing explicitly turned off, set `RunConfig(SyncConfig(syncer=None))`\n"
"- Or, to re-enable the head node syncing behavior, set the "
f"environment variable {REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE}=1\n"
" - **Note that this functionality will be fully removed in Ray 2.7.**"
)

Review thread on the removal-version wording:

Contributor (suggested change):
-    " - **Note that this functionality will be fully removed in Ray 2.7.**"
+    " - **Note that this functionality will be fully removed in Ray 2.78**"

Contributor: Oh no
(suggested change):
-    " - **Note that this functionality will be fully removed in Ray 2.7.**"
+    " - **Note that this functionality will be fully removed in Ray 2.8.**"

Contributor: Actually could we keep these removals in 2.7? cc @pcmoritz

justinvyu (Contributor, Author), Jul 11, 2023: Based on the REP https://github.com/ray-project/enhancements/pull/35/files, I'm going to update this message to say removal in Ray 2.8. cc: @ericl
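For reference, a rough sketch of the configurations the deprecation message above recommends; the bucket and mount paths are placeholders, and the three assignments simply show the alternatives side by side:

from ray.air import RunConfig
from ray.tune.syncer import SyncConfig

# (1) Cloud storage: persist checkpoints and artifacts to a bucket.
run_config = RunConfig(storage_path="s3://your/bucket")

# (2) A network filesystem mounted on every node.
run_config = RunConfig(storage_path="/mnt/path/to/nfs_storage")

# Temporary workaround: keep running with syncing explicitly turned off
# (checkpoints saved on worker nodes stay on those nodes).
run_config = RunConfig(sync_config=SyncConfig(syncer=None))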


@PublicAPI
@dataclass
@@ -790,14 +817,6 @@ def _sync_trial_dir(
if not self._enabled or trial.uses_cloud_checkpointing:
return False

sync_process = self._get_trial_sync_process(trial)

# Always run if force=True
# Otherwise, only run if we should sync (considering sync period)
# and if there is no sync currently still running.
if not force and (not self._should_sync(trial) or sync_process.is_running):
return False

source_ip = self._trial_ips.get(trial.trial_id, None)

if not source_ip:

self._trial_ips[trial.trial_id] = source_ip

if not bool(int(os.environ.get(REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE, "0"))):
# Only log a warning for remote trials, since
# this only affects artifacts that are saved on worker nodes.
if source_ip != ray.util.get_node_ip_address():
if log_once("deprecated_head_node_sync"):
logger.warning(_SYNC_TO_HEAD_DEPRECATION_MESSAGE)
return False

sync_process = self._get_trial_sync_process(trial)

# Always run if force=True
# Otherwise, only run if we should sync (considering sync period)
# and if there is no sync currently still running.
if not force and (not self._should_sync(trial) or sync_process.is_running):
return False

try:
sync_process.wait()
except TuneError as e:
@@ -887,16 +922,33 @@ def on_checkpoint(
checkpoint: _TrackedCheckpoint,
**info,
):
if not self._enabled or trial.uses_cloud_checkpointing:
return

if checkpoint.storage_mode == CheckpointStorage.MEMORY:
return

if self._sync_trial_dir(
trial, force=trial.sync_on_checkpoint, wait=True
) and not os.path.exists(checkpoint.dir_or_data):
raise TuneError(
f"Trial {trial}: Checkpoint path {checkpoint.dir_or_data} not "
"found after successful sync down."
if not bool(int(os.environ.get(REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE, "0"))):
# If we have saved a checkpoint, but it's not accessible on the driver,
# that means that it lives on some other node and would be synced to head
# prior to Ray 2.6.
if not os.path.exists(checkpoint.dir_or_data):
raise DeprecationWarning(_SYNC_TO_HEAD_DEPRECATION_MESSAGE)
# else:
# No need to raise an error about syncing, since the driver can find
# the checkpoint, because either:
# - the checkpoint lives on the head node
# - a shared filesystem is used
else:
# Old head node syncing codepath
synced = self._sync_trial_dir(
trial, force=trial.sync_on_checkpoint, wait=True
)
if synced and not os.path.exists(checkpoint.dir_or_data):
raise TuneError(
f"Trial {trial}: Checkpoint path {checkpoint.dir_or_data} not "
"found after successful sync down."
)

def wait_for_all(self):
# Remove any sync processes as needed, and only wait on the remaining ones.
7 changes: 6 additions & 1 deletion python/ray/tune/tests/test_multinode_sync.py
@@ -8,6 +8,7 @@
import ray
from ray import tune
from ray.air.config import CheckpointConfig
from ray.air.constants import REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE
from ray.air.util.node import _force_on_node
from ray.autoscaler._private.fake_multi_node.node_provider import FAKE_HEAD_NODE_ID
from ray.autoscaler._private.fake_multi_node.test_utils import DockerCluster
@@ -206,7 +207,11 @@ def testCheckpointSync(self):
)
# Connect via Ray client and wait until all nodes are there
self.cluster.start()
self.cluster.connect(client=True, timeout=120)
self.cluster.connect(
client=True,
timeout=120,
runtime_env={"env_vars": {REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE: "1"}},
)
self.cluster.wait_for_resources({"CPU": 12})

# This train function trains for 10 iterations per run