[Train][Tune] Support reading train result from cloud storage #40622

Merged
12 commits · Nov 17, 2023
109 changes: 58 additions & 51 deletions python/ray/air/result.py
@@ -1,4 +1,5 @@
import os
import io
import json
import pandas as pd
import pyarrow
@@ -43,6 +44,11 @@ class Result:
metrics: The latest set of reported metrics.
checkpoint: The latest checkpoint.
error: The execution error of the Trainable run, if the trial finishes in error.
path: Path pointing to the result directory on persistent storage. This can
point to a remote storage location (e.g. S3) or to a local location (path
on the head node). The path is accessible via the result's associated
`filesystem`. For instance, for a result stored in S3 at
``s3://bucket/location``, ``path`` will have the value ``bucket/location``.
metrics_dataframe: The full result dataframe of the Trainable.
The dataframe is indexed by iterations and contains reported
metrics. Note that the dataframe columns are indexed with the
@@ -58,10 +64,9 @@ class Result:
metrics: Optional[Dict[str, Any]]
checkpoint: Optional["Checkpoint"]
error: Optional[Exception]
path: str
metrics_dataframe: Optional["pd.DataFrame"] = None
best_checkpoints: Optional[List[Tuple["Checkpoint", Dict[str, Any]]]] = None
_local_path: Optional[str] = None
_remote_path: Optional[str] = None
_storage_filesystem: Optional[pyarrow.fs.FileSystem] = None
_items_to_repr = ["error", "metrics", "path", "filesystem", "checkpoint"]

@@ -72,19 +77,6 @@ def config(self) -> Optional[Dict[str, Any]]:
return None
return self.metrics.get("config", None)

@property
def path(self) -> str:
"""Path pointing to the result directory on persistent storage.

This can point to a remote storage location (e.g. S3) or to a local
location (path on the head node). The path is accessible via the result's
associated `filesystem`.

For instance, for a result stored in S3 at ``s3://bucket/location``,
``path`` will have the value ``bucket/location``.
"""
return self._remote_path or self._local_path

@property
def filesystem(self) -> pyarrow.fs.FileSystem:
"""Return the filesystem that can be used to access the result path.
@@ -126,44 +118,52 @@ def _repr(self, indent: int = 0) -> str:
def __repr__(self) -> str:
return self._repr(indent=0)

@staticmethod
def _validate_trial_dir(trial_dir: Union[str, os.PathLike]):
"""Check the validity of the local trial folder."""

# TODO(yunxuanx): Add more checks for cloud storage restoration
if not os.path.exists(trial_dir):
raise RuntimeError(f"Trial folder {trial_dir} doesn't exists!")

@classmethod
def from_path(cls, path: Union[str, os.PathLike]) -> "Result":
"""Restore a Result object from local trial directory.
def from_path(
cls,
path: Union[str, os.PathLike],
storage_filesystem: Optional[pyarrow.fs.FileSystem] = None,
) -> "Result":
"""Restore a Result object from local or remote trial directory.

Args:
path: the path to a local trial directory.
path: A storage path or URI of the trial directory. (ex: s3://bucket/path
or /tmp/ray_results)
storage_filesystem: A custom filesystem to use. If not provided,
this will be auto-resolved by pyarrow. If provided, the path
is assumed to be prefix-stripped already, and must be a valid path
on the filesystem.

Returns:
A :py:class:`Result` object of that trial.
"""
# TODO(justinvyu): Fix circular dependency.
from ray.train import Checkpoint
from ray.train.constants import CHECKPOINT_DIR_NAME
from ray.train._internal.storage import (
get_fs_and_path,
_exists_at_fs_path,
_read_file_as_str,
_list_at_fs_path,
)

cls._validate_trial_dir(path)

local_path = Path(path)
# TODO(yunxuanx): restoration from cloud storage
fs, fs_path = get_fs_and_path(path, storage_filesystem)
if not _exists_at_fs_path(fs, fs_path):
raise RuntimeError(f"Trial folder {fs_path} doesn't exist!")

# Restore metrics from result.json
result_json_file = local_path / EXPR_RESULT_FILE
progress_csv_file = local_path / EXPR_PROGRESS_FILE
if result_json_file.exists():
with open(result_json_file, "r") as f:
json_list = [json.loads(line) for line in f if line]
metrics_df = pd.json_normalize(json_list, sep="/")
result_json_file = os.path.join(fs_path, EXPR_RESULT_FILE)
progress_csv_file = os.path.join(fs_path, EXPR_PROGRESS_FILE)
if _exists_at_fs_path(fs, result_json_file):
lines = _read_file_as_str(fs, result_json_file).split("\n")
json_list = [json.loads(line) for line in lines if line]
metrics_df = pd.json_normalize(json_list, sep="/")
latest_metrics = json_list[-1] if json_list else {}
# Fallback to restore from progress.csv
elif progress_csv_file.exists():
metrics_df = pd.read_csv(progress_csv_file)
elif _exists_at_fs_path(fs, progress_csv_file):
metrics_df = pd.read_csv(
io.StringIO(_read_file_as_str(fs, progress_csv_file))
)
latest_metrics = (
metrics_df.iloc[-1].to_dict() if not metrics_df.empty else {}
)
@@ -174,23 +174,30 @@ def from_path(cls, path: Union[str, os.PathLike]) -> "Result":
)

# Restore all checkpoints from the checkpoint folders
checkpoint_dirs = sorted(local_path.glob("checkpoint_*"))
checkpoint_dir_names = sorted(
_list_at_fs_path(
fs,
fs_path,
lambda file_info: file_info.type == pyarrow.fs.FileType.Directory
and file_info.base_name.startswith("checkpoint_"),
)
)

if checkpoint_dirs:
if checkpoint_dir_names:
checkpoints = [
Checkpoint.from_directory(checkpoint_dir)
for checkpoint_dir in checkpoint_dirs
Checkpoint(path=Path(fs_path, checkpoint_dir_name).as_posix(), filesystem=fs)
for checkpoint_dir_name in checkpoint_dir_names
]

metrics = []
for checkpoint_dir in checkpoint_dirs:
for checkpoint_dir_name in checkpoint_dir_names:
metrics_corresponding_to_checkpoint = metrics_df[
metrics_df[CHECKPOINT_DIR_NAME] == checkpoint_dir.name
metrics_df[CHECKPOINT_DIR_NAME] == checkpoint_dir_name
]
if metrics_corresponding_to_checkpoint.empty:
logger.warning(
"Could not find metrics corresponding to "
f"{checkpoint_dir.name}. These will default to an empty dict."
f"{checkpoint_dir_name}. These will default to an empty dict."
)
metrics.append(
{}
@@ -207,16 +214,16 @@ def from_path(cls, path: Union[str, os.PathLike]) -> "Result":

# Restore the trial error if it exists
error = None
error_file_path = local_path / EXPR_ERROR_PICKLE_FILE
if error_file_path.exists():
error = ray.cloudpickle.load(open(error_file_path, "rb"))
error_file_path = os.path.join(fs_path, EXPR_ERROR_PICKLE_FILE)
if _exists_at_fs_path(fs, error_file_path):
with fs.open_input_stream(error_file_path) as f:
error = ray.cloudpickle.load(f)

return Result(
metrics=latest_metrics,
checkpoint=latest_checkpoint,
_local_path=local_path,
_remote_path=None,
_storage_filesystem=pyarrow.fs.LocalFileSystem(),
path=fs_path,
_storage_filesystem=fs,
metrics_dataframe=metrics_df,
best_checkpoints=best_checkpoints,
error=error,
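Usage sketch (not part of the diff): a minimal illustration of the extended Result.from_path API above. The bucket, experiment, and trial directory names are hypothetical, and the explicit S3FileSystem is only needed when passing a prefix-stripped path, as the new docstring describes.

import pyarrow.fs
from ray.train import Result

# Cloud URI: pyarrow auto-resolves the filesystem from the scheme.
result = Result.from_path("s3://my-bucket/ray_results/my_exp/TorchTrainer_abc123")

# A local trial directory works the same way.
result = Result.from_path("/tmp/ray_results/my_exp/TorchTrainer_abc123")

# Custom filesystem plus a prefix-stripped path.
fs = pyarrow.fs.S3FileSystem(region="us-west-2")
result = Result.from_path(
    "my-bucket/ray_results/my_exp/TorchTrainer_abc123",
    storage_filesystem=fs,
)

print(result.path)        # e.g. "my-bucket/ray_results/my_exp/TorchTrainer_abc123"
print(result.metrics)     # latest reported metrics
print(result.checkpoint)  # latest checkpoint, readable via result.filesystem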
45 changes: 30 additions & 15 deletions python/ray/air/tests/test_result.py
@@ -1,15 +1,17 @@
import os
import pyarrow
import pytest

import ray

from ray.air._internal.uri_utils import URI
from ray.air.constants import EXPR_RESULT_FILE
from ray import train
from ray.train import Result, CheckpointConfig, RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer
from ray.train.base_trainer import TrainingFailedError
from ray.tune import TuneConfig, Tuner

from ray.train.tests.conftest import mock_s3_bucket_uri
from ray.train.tests.util import create_dict_checkpoint, load_dict_checkpoint


@@ -30,11 +32,14 @@ def worker_loop(_config):
# Do some random reports in between checkpoints.
train.report({"metric_a": -100, "metric_b": -100})

with create_dict_checkpoint({"iter": i}) as checkpoint:
train.report(
metrics={"metric_a": i, "metric_b": -i},
checkpoint=checkpoint,
)
if ray.train.get_context().get_world_rank() == 0:
with create_dict_checkpoint({"iter": i}) as checkpoint:
train.report(
metrics={"metric_a": i, "metric_b": -i},
checkpoint=checkpoint,
)
else:
train.report(metrics={"metric_a": i, "metric_b": -i})
raise RuntimeError()

trainer = TorchTrainer(
@@ -62,13 +67,20 @@ def build_dummy_tuner(configs):
)


@pytest.mark.parametrize("storage", ["local", "remote"])
@pytest.mark.parametrize("mode", ["trainer", "tuner"])
def test_result_restore(ray_start_4_cpus, monkeypatch, tmpdir, mode):
def test_result_restore(
ray_start_4_cpus, monkeypatch, tmpdir, mock_s3_bucket_uri, storage, mode
):
monkeypatch.setenv("RAY_AIR_LOCAL_CACHE_DIR", str(tmpdir / "ray_results"))

NUM_ITERATIONS = 5
NUM_CHECKPOINTS = 3
storage_path = str(tmpdir)
if storage == "local":
storage_path = str(tmpdir)
elif storage == "remote":
storage_path = str(URI(mock_s3_bucket_uri))

exp_name = "test_result_restore"

configs = {
@@ -87,13 +99,16 @@ def test_result_restore(ray_start_4_cpus, monkeypatch, tmpdir, mode):
tuner.fit()

# Find the trial directory to restore
exp_dir = os.path.join(storage_path, exp_name)
for dirname in os.listdir(exp_dir):
if dirname.startswith("TorchTrainer"):
trial_dir = os.path.join(exp_dir, dirname)
exp_dir = str(URI(storage_path) / exp_name)
fs, fs_exp_dir = pyarrow.fs.FileSystem.from_uri(exp_dir)
for item in fs.get_file_info(pyarrow.fs.FileSelector(fs_exp_dir)):
if item.type == pyarrow.fs.FileType.Directory and item.base_name.startswith(
"TorchTrainer"
):
trial_dir = str(URI(exp_dir) / item.base_name)
break

# [1] Restore from local path
# [1] Restore from path
result = Result.from_path(trial_dir)

# Check if we restored all checkpoints
@@ -124,8 +139,8 @@ def test_result_restore(ray_start_4_cpus, monkeypatch, tmpdir, mode):
# Check that the config is properly formatted in the result metrics
assert result.metrics.get("config") == {"train_loop_config": _PARAM_SPACE}

# [2] Restore from local path without result.json
os.remove(f"{trial_dir}/{EXPR_RESULT_FILE}")
# [2] Restore from path without result.json
fs.delete_file((URI(trial_dir) / EXPR_RESULT_FILE).path)
result = Result.from_path(trial_dir)

# Do the same checks as above
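For orientation, a short sketch of the kind of checks the restored object supports, mirroring what worker_loop reports above. The trial path is hypothetical, and load_dict_checkpoint is assumed to return the dict saved via create_dict_checkpoint.

from ray.train import Result
from ray.train.tests.util import load_dict_checkpoint

result = Result.from_path(
    "s3://my-bucket/ray_results/test_result_restore/TorchTrainer_abc123"
)

# best_checkpoints is a list of (Checkpoint, metrics_dict) tuples.
for checkpoint, metrics in result.best_checkpoints or []:
    data = load_dict_checkpoint(checkpoint)  # assumed to return the saved dict
    assert data["iter"] == metrics["metric_a"]

# The full metrics history is restored as a DataFrame.
assert {"metric_a", "metric_b"}.issubset(result.metrics_dataframe.columns)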
35 changes: 34 additions & 1 deletion python/ray/train/_internal/storage.py
@@ -231,7 +231,11 @@ def _upload_to_uri_with_exclude_fsspec(
)


def _list_at_fs_path(fs: pyarrow.fs.FileSystem, fs_path: str) -> List[str]:
def _list_at_fs_path(
fs: pyarrow.fs.FileSystem,
fs_path: str,
file_filter: Callable[[pyarrow.fs.FileInfo], bool] = lambda x: True,
) -> List[str]:
"""Returns the list of filenames at (fs, fs_path), similar to os.listdir.

If the path doesn't exist, returns an empty list.
@@ -240,6 +244,7 @@ def _list_at_fs_path(fs: pyarrow.fs.FileSystem, fs_path: str) -> List[str]:
return [
os.path.relpath(file_info.path.lstrip("/"), start=fs_path.lstrip("/"))
for file_info in fs.get_file_info(selector)
if file_filter(file_info)
]


@@ -302,6 +307,34 @@ def get_fs_and_path(
return pyarrow.fs.FileSystem.from_uri(storage_path)


def _read_file_as_str(
storage_filesystem: pyarrow.fs.FileSystem,
storage_path: str,
compression: Optional[str] = "detect",
buffer_size: Optional[int] = None,
encoding: Optional[str] = "utf-8",
) -> str:
Reviewer (Contributor): I think I'm okay with adding this helper, but it might be clearer to just do the filesystem operations directly. We can add this in the future if it turns out we need to read a file as text directly from cloud storage very often.

Author (Contributor): I thought it would be cleaner and clearer to do it this way, especially since the logic is used twice (for the JSON and CSV files). I was going to make it an inner function inside from_path; I can instead move it to a static method on Result, like the old _validate_trial_dir.
"""Opens a file as an input stream reading all byte content sequentially and
decoding read bytes as string.

Args:
storage_filesystem: The filesystem to use.
storage_path: The source to open for reading.
compression: The compression algorithm to use for on-the-fly decompression.
If “detect” and source is a file path, then compression will be chosen
based on the file extension. If None, no compression will be applied.
Otherwise, a well-known algorithm name must be supplied (e.g. “gzip”).
buffer_size: If None or 0, no buffering will happen. Otherwise, the size
of the temporary read buffer.
encoding: Encoding of the source bytes.
"""

with storage_filesystem.open_input_stream(
storage_path, compression=compression, buffer_size=buffer_size
) as f:
return f.readall().decode(encoding)


class _FilesystemSyncer(_BackgroundSyncer):
"""Syncer between local filesystem and a `storage_filesystem`."""

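For context, a rough sketch of how these private helpers compose; they are internal APIs and subject to change, and the URI below is hypothetical.

import os

import pyarrow

from ray.air.constants import EXPR_RESULT_FILE
from ray.train._internal.storage import (
    get_fs_and_path,
    _list_at_fs_path,
    _read_file_as_str,
)

fs, fs_path = get_fs_and_path(
    "s3://my-bucket/ray_results/my_exp/TorchTrainer_abc123", None
)

# The new file_filter argument lets callers list only checkpoint directories.
checkpoint_dir_names = _list_at_fs_path(
    fs,
    fs_path,
    lambda info: info.type == pyarrow.fs.FileType.Directory
    and info.base_name.startswith("checkpoint_"),
)

# Read result.json from the same location as a string.
result_json = _read_file_as_str(fs, os.path.join(fs_path, EXPR_RESULT_FILE))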
3 changes: 1 addition & 2 deletions python/ray/tune/result_grid.py
@@ -281,8 +281,7 @@ def _trial_to_result(self, trial: Trial) -> Result:
checkpoint=checkpoint,
metrics=trial.last_result.copy(),
error=self._populate_exception(trial),
_local_path=trial.local_path,
_remote_path=trial.path,
path=trial.path,
_storage_filesystem=(
self._experiment_analysis._fs
if isinstance(self._experiment_analysis, ExperimentAnalysis)
4 changes: 2 additions & 2 deletions python/ray/tune/tests/test_result_grid.py
@@ -116,14 +116,14 @@ class MockExperimentAnalysis:
Result(
metrics={"loss": 1.0},
checkpoint=Checkpoint("/tmp/ckpt1"),
_local_path="log_1",
path="log_1",
error=None,
metrics_dataframe=None,
),
Result(
metrics={"loss": 2.0},
checkpoint=Checkpoint("/tmp/ckpt2"),
_local_path="log_2",
path="log_2",
error=RuntimeError(),
metrics_dataframe=None,
best_checkpoints=None,