[train+tune] Local directory refactor (1/n): Write launcher state files (tuner.pkl, trainer.pkl) directly to storage #43369

Merged 32 commits on Mar 1, 2024
Changes from 19 commits
Commits
41ef191
storage_path default = ~/ray_results
justinvyu Feb 22, 2024
bf323fa
upload trainer pkl directly
justinvyu Feb 22, 2024
dee3249
upload tuner pkl directly
justinvyu Feb 22, 2024
bdd58b3
revert storage path default
justinvyu Feb 22, 2024
0bd1a58
fix optional storage path dependencies for now
justinvyu Feb 22, 2024
24a9fc0
remove todo
justinvyu Feb 22, 2024
90be9ca
small correction...
justinvyu Feb 23, 2024
c18384a
remove ipdb
justinvyu Feb 23, 2024
cab5e12
use converted trainable in tuner entrypoint
justinvyu Feb 23, 2024
09e0273
use non-optional run config
justinvyu Feb 23, 2024
c0d0ba0
remove local restoration test
justinvyu Feb 23, 2024
011ac92
keep base trainer and tuner exp dir name resolution consistent
justinvyu Feb 23, 2024
c3c03ac
add test case for restoration with default RunConfig(name)
justinvyu Feb 23, 2024
25cccb5
Merge branch 'master' of https://github.com/ray-project/ray into uplo…
justinvyu Feb 23, 2024
b367413
centralize on storage context for path handling in the tuner/trainer …
justinvyu Feb 23, 2024
417a7da
fix errors caused by syncing being enabled to the same dir
justinvyu Feb 23, 2024
6b10a13
key concepts small fix
justinvyu Feb 23, 2024
c218937
separate exp folders for doc code
justinvyu Feb 23, 2024
805757b
Merge branch 'master' of https://github.com/ray-project/ray into uplo…
justinvyu Feb 23, 2024
55e069a
Merge branch 'master' of https://github.com/ray-project/ray into uplo…
justinvyu Feb 26, 2024
39eaf81
fix trainer._save test usage
justinvyu Feb 26, 2024
75b80a6
use unique exp names for test
justinvyu Feb 27, 2024
876aad2
fix run config validation test
justinvyu Feb 27, 2024
85c2acd
Merge branch 'master' of https://github.com/ray-project/ray into uplo…
justinvyu Feb 27, 2024
df25a79
Merge branch 'master' of https://github.com/ray-project/ray into uplo…
justinvyu Feb 27, 2024
43d53a0
remove tuner try catch that relies on get_exp_ckpt_dir
justinvyu Feb 27, 2024
533c807
add comment about storage context at the top entrypoint layers
justinvyu Feb 27, 2024
185f57c
fix bug where new storage filesystem is not used on restoration (the …
justinvyu Feb 27, 2024
6f5277a
Merge branch 'master' of https://github.com/ray-project/ray into uplo…
justinvyu Feb 28, 2024
cdeaa44
fix test_errors
justinvyu Feb 28, 2024
f0ead9a
Merge branch 'master' of https://github.com/ray-project/ray into uplo…
justinvyu Feb 28, 2024
9c896f6
Merge branch 'master' of https://github.com/ray-project/ray into uplo…
justinvyu Feb 29, 2024
2 changes: 1 addition & 1 deletion doc/source/train/doc_code/key_concepts.py
@@ -129,7 +129,7 @@ def train_fn(config):
result_path: str = result.path
result_filesystem: pyarrow.fs.FileSystem = result.filesystem

print("Results location (fs, path) = ({result_filesystem}, {result_path})")
print(f"Results location (fs, path) = ({result_filesystem}, {result_path})")
# __result_path_end__


2 changes: 1 addition & 1 deletion doc/source/train/doc_code/tuner.py
@@ -68,7 +68,7 @@

tuner = Tuner(
trainable=trainer,
run_config=RunConfig(name="test_tuner"),
run_config=RunConfig(name="test_tuner_xgboost"),
param_space=param_space,
tune_config=tune.TuneConfig(
mode="min", metric="train-logloss", num_samples=2, max_concurrent_trials=2
36 changes: 22 additions & 14 deletions python/ray/train/base_trainer.py
@@ -21,7 +21,11 @@
from ray.air.result import Result
from ray.train import Checkpoint
from ray.train._internal.session import _get_session
from ray.train._internal.storage import _exists_at_fs_path, get_fs_and_path
from ray.train._internal.storage import (
StorageContext,
_exists_at_fs_path,
get_fs_and_path,
)
from ray.util import PublicAPI
from ray.util.annotations import DeveloperAPI

@@ -226,7 +230,7 @@ def __init__(
self.scaling_config = (
scaling_config if scaling_config is not None else ScalingConfig()
)
self.run_config = run_config if run_config is not None else RunConfig()
self.run_config = copy.copy(run_config) or RunConfig()
self.metadata = metadata
self.datasets = datasets if datasets is not None else {}
self.starting_checkpoint = resume_from_checkpoint
@@ -569,11 +573,20 @@ def fit(self) -> Result:
``self.as_trainable()``, or during the Tune execution loop.
"""
from ray.tune import ResumeConfig, TuneError
from ray.tune.tuner import Tuner, TunerInternal
from ray.tune.tuner import Tuner

trainable = self.as_trainable()
param_space = self._extract_fields_for_tuner_param_space()

self.run_config.name = (
self.run_config.name or StorageContext.get_experiment_dir_name(trainable)
)
storage = StorageContext(
storage_path=self.run_config.storage_path,
experiment_dir_name=self.run_config.name,
storage_filesystem=self.run_config.storage_filesystem,
)

if self._restore_path:
tuner = Tuner.restore(
path=self._restore_path,
@@ -594,16 +607,11 @@
_entrypoint=AirEntrypoint.TRAINER,
)

experiment_local_path, _ = TunerInternal.setup_create_experiment_checkpoint_dir(
trainable, self.run_config
)

experiment_local_path = Path(experiment_local_path)
self._save(experiment_local_path)
self._save(storage.storage_filesystem, storage.experiment_fs_path)

restore_msg = TrainingFailedError._RESTORE_MSG.format(
trainer_cls_name=self.__class__.__name__,
path=str(experiment_local_path),
path=str(storage.experiment_fs_path),
)

try:
@@ -627,7 +635,7 @@
) from result.error
return result

def _save(self, experiment_path: Union[str, Path]):
def _save(self, fs: pyarrow.fs.FileSystem, experiment_path: Union[str, Path]):
"""Saves the current trainer's class along with the `param_dict` of
parameters passed to this trainer's constructor.

@@ -656,9 +664,9 @@ def raise_fn():

cls_and_param_dict = (self.__class__, param_dict)

experiment_path = Path(experiment_path)
with open(experiment_path / _TRAINER_PKL, "wb") as fp:
pickle.dump(cls_and_param_dict, fp)
fs.create_dir(experiment_path)
with fs.open_output_stream(Path(experiment_path, _TRAINER_PKL).as_posix()) as f:
f.write(pickle.dumps(cls_and_param_dict))

def _extract_fields_for_tuner_param_space(self) -> Dict:
"""Extracts fields to be included in `Tuner.param_space`.
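The `_save` change above is the core of this PR on the trainer side: instead of writing `trainer.pkl` into a local experiment directory and relying on syncing, it streams the pickle through the run's pyarrow filesystem. A minimal, self-contained sketch of that write pattern outside Ray (the helper name and example paths are illustrative, not part of the PR):

import pickle
from pathlib import Path

import pyarrow.fs


def write_state_file(fs: pyarrow.fs.FileSystem, experiment_fs_path: str, filename: str, obj) -> None:
    # Create the experiment directory on whichever filesystem backs the run
    # (local disk, S3, ...) and stream the pickled payload directly to it,
    # with no local staging directory or separate sync step.
    fs.create_dir(experiment_fs_path)
    with fs.open_output_stream(Path(experiment_fs_path, filename).as_posix()) as f:
        f.write(pickle.dumps(obj))


# Local-disk example; an S3-backed run would pass pyarrow.fs.S3FileSystem()
# and a bucket-relative path instead.
write_state_file(pyarrow.fs.LocalFileSystem(), "/tmp/ray_results/my_experiment", "trainer.pkl", {"demo": True})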
25 changes: 12 additions & 13 deletions python/ray/train/tests/test_trainer_restore.py
@@ -3,6 +3,7 @@
from pathlib import Path
from typing import Dict, List

import pyarrow.fs
import pytest

import ray
@@ -187,26 +188,24 @@ def test_gbdt_trainer_restore(ray_start_6_cpus, tmp_path, trainer_cls, monkeypat
assert tmp_path / exp_name in Path(result.path).parents


@pytest.mark.parametrize("name", [None, "restore_from_uri"])
def test_restore_from_uri_s3(
ray_start_4_cpus, tmp_path, monkeypatch, mock_s3_bucket_uri
ray_start_4_cpus, tmp_path, monkeypatch, mock_s3_bucket_uri, name
):
"""Restoration from S3 should work."""
monkeypatch.setenv("RAY_AIR_LOCAL_CACHE_DIR", str(tmp_path))
trainer = DataParallelTrainer(
train_loop_per_worker=lambda config: train.report({"score": 1}),
scaling_config=ScalingConfig(num_workers=2),
run_config=RunConfig(name="restore_from_uri", storage_path=mock_s3_bucket_uri),
run_config=RunConfig(name=name, storage_path=mock_s3_bucket_uri),
)
trainer.fit()
result = trainer.fit()

# Restore from local dir
DataParallelTrainer.restore(str(tmp_path / "restore_from_uri"))
if name is None:
name = Path(result.path).parent.name

# Restore from S3
assert DataParallelTrainer.can_restore(
str(URI(mock_s3_bucket_uri) / "restore_from_uri")
)
DataParallelTrainer.restore(str(URI(mock_s3_bucket_uri) / "restore_from_uri"))
assert DataParallelTrainer.can_restore(str(URI(mock_s3_bucket_uri) / name))
DataParallelTrainer.restore(str(URI(mock_s3_bucket_uri) / name))


def test_restore_with_datasets(ray_start_4_cpus, tmpdir):
@@ -222,7 +221,7 @@ def test_restore_with_datasets(ray_start_4_cpus, tmpdir):
scaling_config=ScalingConfig(num_workers=2),
run_config=RunConfig(name="datasets_respecify_test", local_dir=tmpdir),
)
trainer._save(tmpdir)
trainer._save(pyarrow.fs.LocalFileSystem(), tmpdir)

# Restore should complain, if all the datasets don't get passed in again
with pytest.raises(ValueError):
@@ -248,7 +247,7 @@ def test_restore_with_different_trainer(tmpdir):
scaling_config=ScalingConfig(num_workers=1),
run_config=RunConfig(name="restore_with_diff_trainer"),
)
trainer._save(tmpdir)
trainer._save(pyarrow.fs.LocalFileSystem(), tmpdir)

def attempt_restore(trainer_cls, should_warn: bool, should_raise: bool):
def check_for_raise():
@@ -301,7 +300,7 @@ def test_trainer_can_restore_utility(tmp_path):
scaling_config=ScalingConfig(num_workers=1),
)
(tmp_path / name).mkdir(exist_ok=True)
trainer._save(tmp_path / name)
trainer._save(pyarrow.fs.LocalFileSystem(), tmp_path / name)

assert DataParallelTrainer.can_restore(path)

3 changes: 1 addition & 2 deletions python/ray/train/tests/test_tune.py
@@ -284,7 +284,7 @@ def test_run_config_in_trainer_and_tuner(
run_config=trainer_run_config,
)
with caplog.at_level(logging.INFO, logger="ray.tune.impl.tuner_internal"):
tuner = Tuner(trainer, run_config=tuner_run_config)
Tuner(trainer, run_config=tuner_run_config)

both_msg = (
"`RunConfig` was passed to both the `Tuner` and the `DataParallelTrainer`"
@@ -302,7 +302,6 @@
assert not (tmp_path / "trainer").exists()
assert both_msg not in caplog.text
else:
assert tuner._local_tuner.get_run_config() == RunConfig()
Member: Is it because we inject a default storage context into the run config if not specified?

Contributor Author (justinvyu): This is because I set the run_config.name in the trainer, then pass that along to the tuner. So, the tuner's run config is no longer the default RunConfig. This is so that we don't generate the experiment name multiple times (which could lead to different folders being used by the trainer vs. tuner).

assert both_msg not in caplog.text


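To illustrate the point from the review discussion above about resolving the experiment name only once: auto-generated experiment directory names embed a timestamp, so computing the name independently in the trainer and in the tuner can yield two different folders. A small illustrative sketch (the helper below is a hypothetical stand-in for StorageContext.get_experiment_dir_name, not Ray's implementation):

import time
from datetime import datetime


def make_experiment_dir_name(trainable_name: str) -> str:
    # Hypothetical stand-in: auto-generated names embed the current time,
    # so two independent calls can disagree.
    return f"{trainable_name}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"


name_in_trainer = make_experiment_dir_name("DataParallelTrainer")
time.sleep(1)
name_in_tuner = make_experiment_dir_name("DataParallelTrainer")
assert name_in_trainer != name_in_tuner  # resolved twice -> two different folders

Setting run_config.name once in the trainer and handing that populated RunConfig to the tuner avoids this, which is also why the test above no longer expects the tuner's run config to equal a default RunConfig().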
68 changes: 23 additions & 45 deletions python/ray/tune/impl/tuner_internal.py
@@ -1,6 +1,5 @@
import copy
import io
import os
import math
import logging
from pathlib import Path
@@ -27,7 +26,6 @@
from ray.tune import Experiment, ExperimentAnalysis, ResumeConfig, TuneError
from ray.tune.tune import _Config
from ray.tune.registry import is_function_trainable
from ray.tune.result import _get_defaults_results_dir
from ray.tune.result_grid import ResultGrid
from ray.tune.trainable import Trainable
from ray.tune.tune import run
@@ -102,7 +100,7 @@ def __init__(
)

self._tune_config = tune_config or TuneConfig()
self._run_config = run_config or RunConfig()
self._run_config = copy.copy(run_config) or RunConfig()
self._entrypoint = _entrypoint

# Restore from Tuner checkpoint.
@@ -129,12 +127,6 @@ def __init__(
self._resume_config = None
self._is_restored = False
self._tuner_kwargs = copy.deepcopy(_tuner_kwargs) or {}
(
self._local_experiment_dir,
self._experiment_dir_name,
) = self.setup_create_experiment_checkpoint_dir(
self.converted_trainable, self._run_config
)
self._experiment_analysis = None

# This needs to happen before `tune.run()` is kicked in.
@@ -143,9 +135,24 @@
# without allowing for checkpointing tuner and trainable.
# Thus this has to happen before tune.run() so that we can have something
# to restore from.
experiment_checkpoint_path = Path(self._local_experiment_dir, _TUNER_PKL)
with open(experiment_checkpoint_path, "wb") as fp:
pickle.dump(self.__getstate__(), fp)
self._run_config.name = (
self._run_config.name
or StorageContext.get_experiment_dir_name(self.converted_trainable)
)
storage = StorageContext(
Contributor: Should we be trying to instantiate this StorageContext only in one place and then pass it around?

Contributor Author (justinvyu, Feb 26, 2024): This is hard to do since we'd need to pass the context through from BaseTrainer.fit to the public Tuner interface.

One alternative we discussed is having a global storage context, but that'd also require a get_or_create_storage_context logic to account for users coming in through any of the 3 entrypoints (tuner, trainer, tune.run).

I will clarify that the usage here is just to access the path and the filesystem, so that we don't re-implement that logic.

storage_path=self._run_config.storage_path,
experiment_dir_name=self._run_config.name,
storage_filesystem=self._run_config.storage_filesystem,
)
# TODO(justinvyu): Temporary fix for `get_experiment_checkpoint_dir`
self._experiment_fs_path = storage.experiment_fs_path

fs = storage.storage_filesystem
fs.create_dir(storage.experiment_fs_path)
with fs.open_output_stream(
Path(storage.experiment_fs_path, _TUNER_PKL).as_posix()
) as f:
f.write(pickle.dumps(self.__getstate__()))

def get_run_config(self) -> RunConfig:
return self._run_config
@@ -350,12 +357,8 @@ def _restore_from_path_or_uri(
self._run_config.name = path_or_uri_obj.name
self._run_config.storage_path = str(path_or_uri_obj.parent)

(
self._local_experiment_dir,
self._experiment_dir_name,
) = self.setup_create_experiment_checkpoint_dir(
self.converted_trainable, self._run_config
)
# TODO(justinvyu): Temporary fix for `get_experiment_checkpoint_dir`
self._experiment_fs_path = path_or_uri

# Load the experiment results at the point where it left off.
try:
@@ -426,34 +429,9 @@ def _process_scaling_config(self) -> None:
return
self._param_space["scaling_config"] = scaling_config.__dict__.copy()

@classmethod
def setup_create_experiment_checkpoint_dir(
cls, trainable: TrainableType, run_config: Optional[RunConfig]
) -> Tuple[str, str]:
"""Sets up and creates the local experiment checkpoint dir.
This is so that the `tuner.pkl` file gets stored in the same directory
and gets synced with other experiment results.

Returns:
Tuple: (experiment_path, experiment_dir_name)
"""
# TODO(justinvyu): Move this logic into StorageContext somehow
experiment_dir_name = run_config.name or StorageContext.get_experiment_dir_name(
trainable
)
storage_local_path = _get_defaults_results_dir()
experiment_path = (
Path(storage_local_path).joinpath(experiment_dir_name).as_posix()
)

os.makedirs(experiment_path, exist_ok=True)
return experiment_path, experiment_dir_name

# This has to be done through a function signature (@property won't do).
def get_experiment_checkpoint_dir(self) -> str:
# TODO(justinvyu): This is used to populate an error message.
# This should point to the storage path experiment dir instead.
return self._local_experiment_dir
return self._experiment_fs_path

@property
def trainable(self) -> TrainableTypeOrTrainer:
@@ -583,7 +561,7 @@ def _get_tune_run_arguments(self, trainable: TrainableType) -> Dict[str, Any]:
return dict(
storage_path=self._run_config.storage_path,
storage_filesystem=self._run_config.storage_filesystem,
name=self._experiment_dir_name,
name=self._run_config.name,
mode=self._tune_config.mode,
metric=self._tune_config.metric,
callbacks=self._run_config.callbacks,
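For context on the "global storage context" alternative raised in the review discussion above (and not adopted by this PR): a hedged sketch of what a get_or_create_storage_context helper could look like, so that any of the three entrypoints (Tuner, Trainer, tune.run) would resolve the experiment path and filesystem exactly once. The module-level variable and helper name are hypothetical; only the StorageContext constructor arguments come from the diff above.

from typing import Optional

from ray.train._internal.storage import StorageContext

_GLOBAL_STORAGE_CONTEXT: Optional[StorageContext] = None


def get_or_create_storage_context(
    storage_path, experiment_dir_name, storage_filesystem=None
) -> StorageContext:
    # Return the process-wide StorageContext, creating it on first use so the
    # experiment path and filesystem are only ever resolved by one entrypoint.
    global _GLOBAL_STORAGE_CONTEXT
    if _GLOBAL_STORAGE_CONTEXT is None:
        _GLOBAL_STORAGE_CONTEXT = StorageContext(
            storage_path=storage_path,
            experiment_dir_name=experiment_dir_name,
            storage_filesystem=storage_filesystem,
        )
    return _GLOBAL_STORAGE_CONTEXT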
4 changes: 2 additions & 2 deletions python/ray/tune/tune.py
@@ -512,9 +512,9 @@ def run(

if _entrypoint == AirEntrypoint.TRAINER:
error_message_map = {
"entrypoint": "Trainer(...)",
"entrypoint": "<FrameworkTrainer>(...)",
"search_space_arg": "param_space",
"restore_entrypoint": 'Trainer.restore(path="{path}", ...)',
"restore_entrypoint": '<FrameworkTrainer>.restore(path="{path}", ...)',
}
elif _entrypoint == AirEntrypoint.TUNER:
error_message_map = {