
[tune] Improve excessive syncing warning and make some deprecations #45210

Merged: 11 commits, May 20, 2024
@@ -702,14 +702,9 @@ def main():
ds_plugin = DeepSpeedPlugin(hf_ds_config=config.get("ds_config"))
config.update(ds_plugin=ds_plugin)

os.environ["RAY_AIR_LOCAL_CACHE_DIR"] = args.output_dir

ray.init(
runtime_env={
"env_vars": {
"HF_HOME": "/mnt/local_storage/.cache/huggingface",
"RAY_AIR_LOCAL_CACHE_DIR": os.environ["RAY_AIR_LOCAL_CACHE_DIR"],
},
"env_vars": {"HF_HOME": "/mnt/local_storage/.cache/huggingface"},
"working_dir": ".",
}
)
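
For context, the removed lines routed `args.output_dir` through the `RAY_AIR_LOCAL_CACHE_DIR` environment variable. A minimal sketch of the replacement pattern, assuming a `RunConfig` is built elsewhere in this script; the experiment name and the stand-in for `args.output_dir` below are illustrative, not part of this diff:

from ray.train import RunConfig

# Hypothetical sketch: pass the output directory explicitly instead of
# exporting RAY_AIR_LOCAL_CACHE_DIR before ray.init().
output_dir = "/mnt/local_storage/results"  # stands in for args.output_dir
run_config = RunConfig(
    name="deepspeed_finetune",  # illustrative experiment name
    storage_path=output_dir,
)
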
16 changes: 10 additions & 6 deletions python/ray/air/config.py
@@ -661,9 +661,17 @@ def __post_init__(self):
from ray.train.constants import DEFAULT_STORAGE_PATH
from ray.tune.experimental.output import AirVerbosity, get_air_verbosity

if self.local_dir is not None:
raise DeprecationWarning(
"The `RunConfig(local_dir)` argument is deprecated. "
"You should set the `RunConfig(storage_path)` instead."
"See the docs: https://docs.ray.io/en/latest/train/user-guides/"
"persistent-storage.html#setting-the-local-staging-directory"
)

if self.storage_path is None:
# TODO(justinvyu): [Deprecated] Remove fallback to local dir.
self.storage_path = self.local_dir or DEFAULT_STORAGE_PATH
# TODO(justinvyu): [Deprecated] Remove in 2.30
self.storage_path = DEFAULT_STORAGE_PATH

# If no remote path is set, try to get Ray Storage URI
ray_storage_uri: Optional[str] = _get_storage_uri()
@@ -690,10 +698,6 @@ def __post_init__(self):
# Todo (krfricke): Currently uses number to pass test_configs::test_repr
self.verbose = get_air_verbosity(AirVerbosity.DEFAULT) or 3

# Convert Paths to strings
if isinstance(self.local_dir, Path):
self.local_dir = self.local_dir.as_posix()

if isinstance(self.storage_path, Path):
self.storage_path = self.storage_path.as_posix()
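
A minimal before/after sketch of what this `__post_init__` change means for user code; the experiment name and path are illustrative:

from ray.train import RunConfig

# Before (now raises DeprecationWarning in RunConfig.__post_init__):
# RunConfig(name="my_exp", local_dir="~/ray_results")

# After: point storage_path at the same location instead.
run_config = RunConfig(name="my_exp", storage_path="~/ray_results")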

16 changes: 7 additions & 9 deletions python/ray/air/tests/test_remote_storage_hdfs.py
@@ -1,4 +1,5 @@
"""Test remote_storage in a ci environment with real hdfs setup."""

import os

import pytest
@@ -39,7 +40,6 @@ def setup_hdfs():
def test_hdfs_train_checkpointing(tmp_path, monkeypatch, setup_hdfs):
"""See `ray.train.tests.test_new_persistence` for details."""
LOCAL_CACHE_DIR = tmp_path / "ray_results"
monkeypatch.setenv("RAY_AIR_LOCAL_CACHE_DIR", str(LOCAL_CACHE_DIR))
exp_name = "trainer_new_persistence"
no_checkpoint_ranks = [0]

@@ -81,14 +81,12 @@ def test_hdfs_train_checkpointing(tmp_path, monkeypatch, setup_hdfs):
restored_trainer = DataParallelTrainer.restore(path=storage_path + exp_name)
result = restored_trainer.fit()

with monkeypatch.context() as m:
# This is so that the `resume_from_checkpoint` run doesn't mess up the
# assertions later for the `storage_path=None` case.
m.setenv("RAY_AIR_LOCAL_CACHE_DIR", str(tmp_path / "resume_from_checkpoint"))
_resume_from_checkpoint(
result.checkpoint,
expected_state={"iter": TestConstants.NUM_ITERATIONS - 1},
)
# This is so that the `resume_from_checkpoint` run doesn't mess up the
# assertions later for the `storage_path=None` case.
_resume_from_checkpoint(
result.checkpoint,
expected_state={"iter": TestConstants.NUM_ITERATIONS - 1},
)

local_inspect_dir, storage_fs_path = _get_local_inspect_dir(
root_local_path=tmp_path,
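
For readers of this test, a hedged sketch of pointing a run at HDFS storage now that the local cache env var is gone; the namenode endpoint and paths are placeholders, and this assumes the `RunConfig(storage_filesystem=...)` API described in the Ray Train persistent-storage docs:

import pyarrow.fs
from ray.train import RunConfig

# Placeholder HDFS endpoint; in the test this comes from the CI HDFS setup.
hdfs_fs = pyarrow.fs.HadoopFileSystem(host="namenode", port=8020)
run_config = RunConfig(
    name="trainer_new_persistence",
    storage_path="/tmp/ray_results",  # path on the HDFS filesystem
    storage_filesystem=hdfs_fs,
)
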
2 changes: 1 addition & 1 deletion python/ray/train/tests/test_trainer_restore.py
@@ -217,7 +217,7 @@ def test_restore_with_datasets(ray_start_4_cpus, tmpdir):
train_loop_per_worker=lambda config: train.report({"score": 1}),
datasets=datasets,
scaling_config=ScalingConfig(num_workers=2),
run_config=RunConfig(name="datasets_respecify_test", local_dir=tmpdir),
run_config=RunConfig(name="datasets_respecify_test"),
)
trainer._save(pyarrow.fs.LocalFileSystem(), str(tmpdir))

9 changes: 6 additions & 3 deletions python/ray/tune/execution/experiment_state.py
@@ -201,20 +201,23 @@ def wait_for_sync():
if (
time_since_last_sync is not None
and time_since_last_sync < self._excessive_sync_threshold
and self._should_force_sync_up
):
logger.warning(
"Experiment state snapshotting has been triggered multiple "
f"times in the last {self._excessive_sync_threshold} seconds. "
f"times in the last {self._excessive_sync_threshold} seconds "
"and may become a bottleneck. "
"A snapshot is forced if `CheckpointConfig(num_to_keep)` is set, "
"and a trial has checkpointed >= `num_to_keep` times "
"since the last snapshot.\n"
"You may want to consider increasing the "
"`CheckpointConfig(num_to_keep)` or decreasing the frequency of "
"saving checkpoints.\n"
"You can suppress this error by setting the environment variable "
"You can suppress this warning by setting the environment variable "
"TUNE_WARN_EXCESSIVE_EXPERIMENT_CHECKPOINT_SYNC_THRESHOLD_S "
"to a smaller value than the current threshold "
f"({self._excessive_sync_threshold})."
f"({self._excessive_sync_threshold}). "
"Set it to 0 to completely suppress this warning."
)

self._last_sync_time = time.monotonic()
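
A short sketch of the two remedies the reworded warning points to; the values are illustrative, not recommendations:

import os

from ray.train import CheckpointConfig, RunConfig

# Remedy 1: keep more checkpoints per trial, so snapshots are forced less often.
run_config = RunConfig(checkpoint_config=CheckpointConfig(num_to_keep=10))

# Remedy 2: suppress the warning entirely, as the updated message now suggests.
os.environ["TUNE_WARN_EXCESSIVE_EXPERIMENT_CHECKPOINT_SYNC_THRESHOLD_S"] = "0"
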
3 changes: 0 additions & 3 deletions python/ray/tune/impl/tuner_internal.py
@@ -569,9 +569,6 @@ def _get_tune_run_arguments(self, trainable: TrainableType) -> Dict[str, Any]:
trial_name_creator=self._tune_config.trial_name_creator,
trial_dirname_creator=self._tune_config.trial_dirname_creator,
_entrypoint=self._entrypoint,
# TODO(justinvyu): Finalize the local_dir vs. env var API in 2.8.
# For now, keep accepting both options.
local_dir=self._run_config.local_dir,
# Deprecated
chdir_to_trial_dir=self._tune_config.chdir_to_trial_dir,
)
2 changes: 1 addition & 1 deletion python/ray/tune/search/searcher.py
@@ -402,7 +402,7 @@ def restore_from_dir(self, checkpoint_dir: str):
cost,
run_config=train.RunConfig(
name=self.experiment_name,
local_dir="~/my_results",
storage_path="~/my_results",
),
tune_config=tune.TuneConfig(
search_alg=search_alg,
21 changes: 7 additions & 14 deletions python/ray/tune/tests/test_api.py
@@ -1851,26 +1851,19 @@ def __init__(self, search_alg=None, scheduler=None, **kwargs):
# TODO(justinvyu): [Deprecated] Remove this test once the configs are removed.
def test_local_dir_deprecation(ray_start_2_cpus, tmp_path, monkeypatch):
monkeypatch.setenv("RAY_AIR_LOCAL_CACHE_DIR", str(tmp_path))
with pytest.warns(None) as record:
result = ray.tune.Tuner(lambda _: None).fit()[0]
assert any("RAY_AIR_LOCAL_CACHE_DIR" in str(r.message) for r in record)
assert not result.path.startswith(str(tmp_path))
with pytest.raises(DeprecationWarning):
ray.tune.Tuner(lambda _: None).fit()
monkeypatch.delenv("RAY_AIR_LOCAL_CACHE_DIR")

monkeypatch.setenv("TUNE_RESULT_DIR", str(tmp_path))
with pytest.warns(None) as record:
result = ray.tune.Tuner(lambda _: None).fit()[0]
assert any("TUNE_RESULT_DIR" in str(r.message) for r in record)
assert not result.path.startswith(str(tmp_path))
with pytest.raises(DeprecationWarning):
ray.tune.Tuner(lambda _: None).fit()
monkeypatch.delenv("TUNE_RESULT_DIR")

with pytest.warns(None) as record:
result = ray.tune.Tuner(
with pytest.raises(DeprecationWarning):
ray.tune.Tuner(
lambda _: None, run_config=ray.train.RunConfig(local_dir=str(tmp_path))
).fit()[0]
assert any("local_dir" in str(r.message) for r in record)
# storage_path should fall back to local_dir during the migration period
assert result.path.startswith(str(tmp_path))
)


if __name__ == "__main__":
2 changes: 0 additions & 2 deletions python/ray/tune/tests/test_commands.py
@@ -44,8 +44,6 @@ def start_ray():


def test_time(start_ray, tmpdir, monkeypatch):
monkeypatch.setenv("RAY_AIR_LOCAL_CACHE_DIR", str(tmpdir))

experiment_name = "test_time"
num_samples = 2

6 changes: 0 additions & 6 deletions python/ray/tune/tests/test_tune_save_restore.py
@@ -5,8 +5,6 @@
import tempfile
import unittest

import pytest

import ray
from ray import tune
from ray.train import CheckpointConfig
@@ -38,10 +36,6 @@ def load_checkpoint(self, checkpoint_dir):
extra_data = pickle.load(f)
self.state.update(extra_data)

@pytest.fixture(autouse=True)
def setLocalDir(self, tmp_path, monkeypatch):
monkeypatch.setenv("RAY_AIR_LOCAL_CACHE_DIR", str(tmp_path / "ray_results"))

def setUp(self):
self.absolute_local_dir = None
ray.init(num_cpus=1, num_gpus=0, local_mode=self.local_mode)
30 changes: 16 additions & 14 deletions python/ray/tune/tune.py
@@ -566,28 +566,30 @@ def run(

del remote_run_kwargs

# TODO(justinvyu): [Deprecated] Raise in 2.20
# TODO(justinvyu): [Deprecated] Remove in 2.30
ENV_VAR_DEPRECATION_MESSAGE = (
"The environment variable "
"`{}` is deprecated and will be removed in the future. "
"They are no longer used and will not have any effect. "
"You should set the `storage_path` instead. "
"The environment variable `{}` is deprecated. "
"It is no longer used and will not have any effect. "
"You should set the `storage_path` instead. Files will no longer be "
"written to `~/ray_results` as long as `storage_path` is set."
"See the docs: https://docs.ray.io/en/latest/train/user-guides/"
"persistent-storage.html#setting-the-local-staging-directory"
)
if os.environ.get("TUNE_RESULT_DIR"):
warnings.warn(ENV_VAR_DEPRECATION_MESSAGE.format("TUNE_RESULT_DIR"))
raise DeprecationWarning(ENV_VAR_DEPRECATION_MESSAGE.format("TUNE_RESULT_DIR"))
Contributor (review comment): My first thought is that it might be odd to raise if an environment variable is set, but I'm not sure what the best practice is, so this is probably fine?

Contributor Author (reply): I was considering keeping it a warning for another release before just removing it.

Pros:
  • No breaking user code

Cons:
  • Implicit behavior change that users may be relying on will happen without warning / with no suggested fix

if os.environ.get("RAY_AIR_LOCAL_CACHE_DIR"):
warnings.warn(ENV_VAR_DEPRECATION_MESSAGE.format("RAY_AIR_LOCAL_CACHE_DIR"))
raise DeprecationWarning(
ENV_VAR_DEPRECATION_MESSAGE.format("RAY_AIR_LOCAL_CACHE_DIR")
)

if local_dir is not None:
warnings.warn(
"The `local_dir` argument is deprecated and will be removed. "
"This will pass-through to set the `storage_path` for now "
"but will raise an error in the future. "
"You should only set the `storage_path` from now on."
raise DeprecationWarning(
"The `local_dir` argument is deprecated. "
"You should set the `storage_path` instead. "
"See the docs: https://docs.ray.io/en/latest/train/user-guides/"
"persistent-storage.html#setting-the-local-staging-directory"
)
# Have `storage_path` fall back to `local_dir` if only `local_dir` is set.
storage_path = storage_path or local_dir

ray._private.usage.usage_lib.record_library_usage("tune")
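
For comparison, a minimal sketch of the softer alternative discussed in the review thread above (keep warning for one more release instead of raising); this is not what the PR merges, and the abbreviated message below stands in for the `ENV_VAR_DEPRECATION_MESSAGE` constant defined in this hunk:

import os
import warnings

# Abbreviated stand-in for the constant defined above.
ENV_VAR_DEPRECATION_MESSAGE = (
    "The environment variable `{}` is deprecated. Set `storage_path` instead."
)

if os.environ.get("RAY_AIR_LOCAL_CACHE_DIR"):
    warnings.warn(
        ENV_VAR_DEPRECATION_MESSAGE.format("RAY_AIR_LOCAL_CACHE_DIR"),
        DeprecationWarning,
        stacklevel=2,
    )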

6 changes: 0 additions & 6 deletions release/train_tests/multinode_persistence/test_persistence.py
@@ -203,8 +203,6 @@ def test_trainer(root_path_storage_filesystem_label, tmp_path, monkeypatch):
are all correct. See `ray.train.test_new_persistence` for the expected filetree.
6. Tests a new run with `resume_from_checkpoint`.
"""
monkeypatch.setenv("RAY_AIR_LOCAL_CACHE_DIR", str(tmp_path / "ray_results"))

ray.init(runtime_env={"working_dir": "."}, ignore_reinit_error=True)

root_path, storage_filesystem, label = root_path_storage_filesystem_label
@@ -328,8 +326,6 @@ def test_trainer(root_path_storage_filesystem_label, tmp_path, monkeypatch):
def test_no_storage_error(tmp_path, monkeypatch):
"""Tests that an error is raised if you do multi-node checkpointing
w/ no persistent storage configured."""
monkeypatch.setenv("RAY_AIR_LOCAL_CACHE_DIR", str(tmp_path / "ray_results"))

ray.init(runtime_env={"working_dir": "."}, ignore_reinit_error=True)

trainer = TorchTrainer(
@@ -353,8 +349,6 @@ def test_no_storage_no_checkpoints(tmp_path, monkeypatch):
def test_no_storage_no_checkpoints(tmp_path, monkeypatch):
"""Tests that it's ok to run multi-node with no persistent storage
if you never report checkpoints."""
monkeypatch.setenv("RAY_AIR_LOCAL_CACHE_DIR", str(tmp_path / "ray_results"))

ray.init(runtime_env={"working_dir": "."}, ignore_reinit_error=True)

trainer = TorchTrainer(