[train] New persistence mode cleanup: Remainders in ray.air/ray.train #40168

Merged (9 commits) on Oct 6, 2023
60 changes: 0 additions & 60 deletions python/ray/air/config.py
@@ -1,7 +1,4 @@
import copy
import logging
import os
import warnings
from collections import defaultdict
from dataclasses import _MISSING_TYPE, dataclass, fields
from pathlib import Path
@@ -19,7 +16,6 @@

import pyarrow.fs

from ray._private.storage import _get_storage_uri
from ray._private.thirdparty.tabulate.tabulate import tabulate
from ray.util.annotations import PublicAPI, Deprecated
from ray.widgets import Template, make_table_html_repr
@@ -657,62 +653,6 @@ def __post_init__(self):
if isinstance(self.storage_path, Path):
self.storage_path = self.storage_path.as_posix()

# TODO(justinvyu): [code_removal] Legacy stuff below.
from ray.tune.utils.util import _resolve_storage_path
from ray.train._internal.storage import _use_storage_context
from ray.train._internal.syncer import Syncer

if _use_storage_context():
return

local_path, remote_path = _resolve_storage_path(
self.storage_path, self.local_dir, self.sync_config.upload_dir
)

if self.sync_config.upload_dir:
assert remote_path == self.sync_config.upload_dir
warnings.warn(
"Setting a `SyncConfig.upload_dir` is deprecated and will be removed "
"in the future. Pass `RunConfig.storage_path` instead."
)
# Set upload_dir to None to avoid further downstream resolution.
# Copy object first to not alter user input.
self.sync_config = copy.copy(self.sync_config)
self.sync_config.upload_dir = None

if self.local_dir:
assert local_path == self.local_dir
warnings.warn(
"Setting a `RunConfig.local_dir` is deprecated and will be removed "
"in the future. If you are not using remote storage,"
"set the `RunConfig.storage_path` instead. Otherwise, set the"
"`RAY_AIR_LOCAL_CACHE_DIR` environment variable to control "
"the local cache location."
)
self.local_dir = None

if not remote_path:
remote_path = _get_storage_uri()
if remote_path:
logger.info(
"Using configured Ray storage URI as storage path: "
f"{remote_path}"
)

if remote_path:
self.storage_path = remote_path
if local_path:
# If storage_path is a remote path set by SyncConfig.upload_dir,
# this may not have been set in the previous if clause.
os.environ["RAY_AIR_LOCAL_CACHE_DIR"] = local_path
elif local_path:
self.storage_path = local_path

if isinstance(self.sync_config.syncer, Syncer) and not remote_path:
raise ValueError(
"Must specify a remote `storage_path` to use a custom `syncer`."
)

def __repr__(self):
from ray.train import SyncConfig

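The resolution logic deleted above (legacy `local_dir` and `SyncConfig.upload_dir` handling) is replaced by configuring storage directly on the run config, as the removed deprecation warnings already advised. A minimal sketch of the new-style configuration, assuming a hypothetical S3 bucket and experiment name:

from ray.train import RunConfig, SyncConfig

# New persistence mode: a single storage_path replaces the old
# local_dir / SyncConfig.upload_dir pair. The bucket URI and the
# experiment name below are placeholders.
run_config = RunConfig(
    name="my_experiment",
    storage_path="s3://my-bucket/ray-results",
    sync_config=SyncConfig(sync_period=300),  # optional: tune the sync cadence
)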
14 changes: 5 additions & 9 deletions python/ray/air/integrations/comet.py
@@ -3,7 +3,6 @@

import pyarrow.fs

from ray.train._internal.storage import _use_storage_context
from ray.tune.logger import LoggerCallback
from ray.tune.experiment import Trial
from ray.tune.utils import flatten_dict
@@ -228,14 +227,11 @@ def log_trial_save(self, trial: "Trial"):

checkpoint_root = None

if _use_storage_context():
if isinstance(trial.checkpoint.filesystem, pyarrow.fs.LocalFileSystem):
checkpoint_root = trial.checkpoint.path
# Todo: For other filesystems, we may want to use
# artifact.add_remote() instead. However, this requires a full
# URI. We can add this once we have a way to retrieve it.
else:
checkpoint_root = trial.checkpoint.dir_or_data
if isinstance(trial.checkpoint.filesystem, pyarrow.fs.LocalFileSystem):
checkpoint_root = trial.checkpoint.path
# Todo: For other filesystems, we may want to use
# artifact.add_remote() instead. However, this requires a full
# URI. We can add this once we have a way to retrieve it.

# Walk through checkpoint directory and add all files to artifact
if checkpoint_root:
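With the storage-context guard gone, `log_trial_save` only walks and uploads checkpoints stored on a local filesystem. A hedged usage sketch; `save_checkpoints` is assumed to be the constructor flag that enables checkpoint upload, and the project name is a placeholder:

from ray import train, tune
from ray.air.integrations.comet import CometLoggerCallback

def trainable(config):
    # Minimal trainable so the callback has something to log.
    train.report({"score": 1.0})

tuner = tune.Tuner(
    trainable,
    run_config=train.RunConfig(
        # save_checkpoints assumed to turn on checkpoint upload; only
        # local-filesystem checkpoints are walked and added to the artifact.
        callbacks=[CometLoggerCallback(project_name="my-project", save_checkpoints=True)],
    ),
)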
8 changes: 2 additions & 6 deletions python/ray/air/integrations/wandb.py
@@ -18,7 +18,6 @@
from ray.air._internal import usage as air_usage
from ray.air.util.node import _force_on_current_node

from ray.train._internal.storage import _use_storage_context
from ray.tune.logger import LoggerCallback
from ray.tune.utils import flatten_dict
from ray.tune.experiment import Trial
@@ -675,11 +674,8 @@ def log_trial_result(self, iteration: int, trial: "Trial", result: Dict):
def log_trial_save(self, trial: "Trial"):
if self.upload_checkpoints and trial.checkpoint:
checkpoint_root = None
if _use_storage_context():
if isinstance(trial.checkpoint.filesystem, pyarrow.fs.LocalFileSystem):
checkpoint_root = trial.checkpoint.path
else:
checkpoint_root = trial.checkpoint.dir_or_data
if isinstance(trial.checkpoint.filesystem, pyarrow.fs.LocalFileSystem):
checkpoint_root = trial.checkpoint.path

if checkpoint_root:
self._trial_queues[trial].put((_QueueItem.CHECKPOINT, checkpoint_root))
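The W&B callback gets the same simplification: only local-filesystem checkpoints are queued for upload in `log_trial_save`. A short sketch of enabling that path, assuming `upload_checkpoints` is the relevant constructor flag (the project name is a placeholder):

from ray.air.integrations.wandb import WandbLoggerCallback

# upload_checkpoints=True makes log_trial_save() queue the checkpoint directory;
# after this change, checkpoints on non-local filesystems are simply skipped.
wandb_callback = WandbLoggerCallback(project="my-project", upload_checkpoints=True)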
9 changes: 2 additions & 7 deletions python/ray/train/_internal/session.py
@@ -24,7 +24,7 @@
from ray.data import Dataset, DatasetPipeline
from ray.train import Checkpoint
from ray.train._internal.accelerator import Accelerator
from ray.train._internal.storage import StorageContext, _use_storage_context
from ray.train._internal.storage import StorageContext
from ray.train.constants import (
CHECKPOINT_DIR_NAME,
DETAILED_AUTOFILLED_KEYS,
@@ -148,10 +148,6 @@ def __init__(
self.local_world_size = local_world_size
self.world_size = world_size

# Checkpoint configurations
# Only used if checkpoint_upload_from_workers is True.
self.legacy_checkpoint_uri = None

assert storage
logger.debug(f"StorageContext on SESSION (rank={world_rank}):\n{storage}")

@@ -245,8 +241,7 @@ def finish(self, timeout: Optional[float] = None):
self.continue_lock.release()

# Force a final (blocking) sync of artifacts in the trial path to storage.
if _use_storage_context():
self.storage.persist_artifacts(force=True)
self.storage.persist_artifacts(force=True)

# Wait for training to finish.
# This will raise any errors that occur during training, including
101 changes: 7 additions & 94 deletions python/ray/train/_internal/syncer.py
@@ -8,13 +8,6 @@
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from ray._private.thirdparty.tabulate.tabulate import tabulate
from ray.air._internal.remote_storage import (
delete_at_uri,
download_from_uri,
fs_hint,
is_non_local_path_uri,
upload_to_uri,
)
from ray.train.constants import _DEPRECATED_VALUE
from ray.util import log_once
from ray.util.annotations import DeveloperAPI, PublicAPI
@@ -34,21 +27,20 @@
class SyncConfig:
"""Configuration object for Train/Tune file syncing to `RunConfig(storage_path)`.

See :ref:`tune-persisted-experiment-data` for an overview of what data is
synchronized.

In Ray Train/Tune, here is where syncing (mainly uploading) happens:

The experiment driver (on the head node) syncs the experiment directory to storage
(which includes experiment state such as searcher state, the list of trials
and their statuses, and trial metadata).

It's also possible to sync artifacts from the trial directory to storage
by setting `sync_artifacts=True`.
For a Ray Tune run with many trials, each trial will upload its trial directory
to storage, which includes arbitrary files that you dumped during the run.
For a Ray Train run doing distributed training, each remote worker will similarly
upload its trial directory to storage.

See :ref:`tune-storage-options` for more details and examples.
See :ref:`persistent-storage-guide` for more details and examples.

Args:
sync_period: Minimum time in seconds to wait between two sync operations.
@@ -91,28 +83,17 @@ def _deprecation_warning(self, attr_name: str, extra_msg: str):
def __post_init__(self):
for (attr_name, extra_msg) in [
("upload_dir", "\nPlease specify `train.RunConfig(storage_path)` instead."),
# TODO(justinvyu): Point users to some user guide for custom fs.
(
"syncer",
"\nPlease implement custom syncing logic with a custom "
"`pyarrow.fs.FileSystem` instead, and pass it into "
"`train.RunConfig(storage_filesystem)`.",
"`train.RunConfig(storage_filesystem)`. "
"See here: https://docs.ray.io/en/latest/train/user-guides/persistent-storage.html#custom-storage", # noqa: E501
),
("sync_on_checkpoint", ""),
]:
self._deprecation_warning(attr_name, extra_msg)

# TODO(justinvyu): [code_removal]
from ray.train._internal.storage import _use_storage_context

if not _use_storage_context():
if self.upload_dir == _DEPRECATED_VALUE:
self.upload_dir = None
if self.syncer == _DEPRECATED_VALUE:
self.syncer = "auto"
if self.sync_on_checkpoint == _DEPRECATED_VALUE:
self.sync_on_checkpoint = True

def _repr_html_(self) -> str:
"""Generate an HTML representation of the SyncConfig."""
return Template("scrollableTable.html.j2").render(
@@ -391,29 +372,6 @@ def close(self):
def _repr_html_(self) -> str:
return

@classmethod
def validate_upload_dir(cls, upload_dir: str) -> bool:
"""Checks if ``upload_dir`` is supported by the Syncer.

Returns True if ``upload_dir`` is valid, otherwise raises
``ValueError``.

Args:
upload_dir: Path to validate.
"""
if not upload_dir:
return True

if upload_dir.startswith("file://"):
return True

if not is_non_local_path_uri(upload_dir):
raise ValueError(
f"Could not identify external storage filesystem for "
f"upload dir `{upload_dir}`. "
f"Hint: {fs_hint(upload_dir)}"
)


class _BackgroundSyncer(Syncer):
"""Syncer using a background process for asynchronous file transfer."""
@@ -477,10 +435,6 @@ def _sync_up_command(
def sync_down(
self, remote_dir: str, local_dir: str, exclude: Optional[List] = None
) -> bool:
from ray.train._internal.storage import _use_storage_context

assert not _use_storage_context(), "Should never be used in this mode."

if self._should_continue_existing_sync():
logger.warning(
f"Last sync still in progress, "
@@ -535,50 +489,9 @@ def __getstate__(self):
return state


class _DefaultSyncer(_BackgroundSyncer):
"""Default syncer between local and remote storage, using `pyarrow.fs.copy_files`"""

def _sync_up_command(
self, local_path: str, uri: str, exclude: Optional[List] = None
) -> Tuple[Callable, Dict]:
return (
upload_to_uri,
dict(local_path=local_path, uri=uri, exclude=exclude),
)

def _sync_down_command(self, uri: str, local_path: str) -> Tuple[Callable, Dict]:
return (
download_from_uri,
dict(uri=uri, local_path=local_path),
)

def _delete_command(self, uri: str) -> Tuple[Callable, Dict]:
return delete_at_uri, dict(uri=uri)


# TODO(justinvyu): [code_removal]
@DeveloperAPI
def get_node_to_storage_syncer(
sync_config: SyncConfig, upload_dir: Optional[str] = None
) -> Optional[Syncer]:
""""""
if sync_config.syncer is None:
return None

if not sync_config.upload_dir and not upload_dir:
return None

if sync_config.syncer == "auto":
return _DefaultSyncer(
sync_period=sync_config.sync_period, sync_timeout=sync_config.sync_timeout
)

if isinstance(sync_config.syncer, Syncer):
return sync_config.syncer

raise ValueError(
f"Unknown syncer type passed in SyncConfig: {type(sync_config.syncer)}. "
f"Note that custom sync functions and templates have been deprecated. "
f"Instead you can implement you own `Syncer` class. "
f"Please leave a comment on GitHub if you run into any issues with this: "
f"https://github.com/ray-project/ray/issues"
)
raise DeprecationWarning
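As the updated deprecation message in `SyncConfig.__post_init__` spells out, custom `Syncer` subclasses give way to passing a `pyarrow.fs.FileSystem` to the run config. A hedged sketch of that replacement, assuming an S3 bucket and default credentials:

import pyarrow.fs
from ray.train import RunConfig

# Instead of subclassing Syncer, hand Train/Tune a filesystem plus a path
# relative to it; file transfer is then handled through pyarrow, with no
# Ray-specific syncer in the loop. Region and bucket are placeholders.
fs = pyarrow.fs.S3FileSystem(region="us-west-2")
run_config = RunConfig(
    storage_filesystem=fs,
    storage_path="my-bucket/ray-results",  # bucket + prefix, no scheme when a filesystem is given
)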
14 changes: 0 additions & 14 deletions python/ray/train/lightning/lightning_trainer.py
@@ -501,20 +501,6 @@ def _unify_checkpoint_configs(

def _lightning_train_loop_per_worker(config):
"""Per-worker training loop for a Lightning Trainer."""
from ray.train._internal.storage import _use_storage_context

# TODO(justinvyu)/NOTE: This is no longer needed, because we do not switch to
# a rank-specific working directory in the new persistence mode.
# Lightning requires each worker to be in the same working directory.
if not _use_storage_context():
# Change the working directory for all workers to the same directory.
# This aligns with Lightning's settings and avoids inconsistency. Otherwise,
# each worker will have a different log and checkpoint directory if they are
# using relative paths.
working_dir = os.path.join(session.get_trial_dir(), "rank_all")
os.makedirs(working_dir, exist_ok=True)
os.chdir(working_dir)

if not config["lightning_config"]:
raise RuntimeError("'lightning_config' not specified in LightningTrainer!")
