Skip to content

Commit

Permalink
[train] New persistence mode: Add storage type telemetry (ray-project…
Browse files Browse the repository at this point in the history
…#39286)

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Jim Thompson <jimthompson5802@gmail.com>
  • Loading branch information
justinvyu authored and jimthompson5802 committed Sep 12, 2023
1 parent b97ce55 commit c843e50
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 54 deletions.
27 changes: 27 additions & 0 deletions python/ray/air/_internal/usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

if TYPE_CHECKING:
from ray.train.trainer import BaseTrainer
from ray.train._internal.storage import StorageContext
from ray.tune.schedulers import TrialScheduler
from ray.tune.search import BasicVariantGenerator, Searcher
from ray.tune import Callback
Expand Down Expand Up @@ -226,6 +227,32 @@ def _get_tag_for_remote_path(remote_path: str) -> str:
return tag


def tag_storage_type(storage: "StorageContext"):
"""Records the storage configuration of an experiment.
The storage configuration is set by `RunConfig(storage_path, storage_filesystem)`.
The possible storage types (defined by `pyarrow.fs.FileSystem.type_name`) are:
- 'local' = pyarrow.fs.LocalFileSystem. This includes NFS usage.
- 'mock' = pyarrow.fs._MockFileSystem. This is used for testing.
- ('s3', 'gcs', 'abfs', 'hdfs'): Various remote storage schemes
with default implementations in pyarrow.
- 'custom' = All other storage schemes, which includes ALL cases where a
custom `storage_filesystem` is provided.
- 'other' = catches any other cases not explicitly handled above.
"""
whitelist = {"local", "mock", "s3", "gcs", "abfs", "hdfs"}

if storage.custom_fs_provided:
storage_config_tag = "custom"
elif storage.storage_filesystem.type_name in whitelist:
storage_config_tag = storage.storage_filesystem.type_name
else:
storage_config_tag = "other"

record_extra_usage_tag(TagKey.AIR_STORAGE_CONFIGURATION, storage_config_tag)


def tag_ray_air_storage_config(
local_path: str, remote_path: Optional[str], sync_config: "SyncConfig"
) -> None:
Expand Down
80 changes: 29 additions & 51 deletions python/ray/air/tests/test_air_usage.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Unit tests for AIR telemetry."""

from collections import namedtuple
import json
import os
from packaging.version import Version

import pyarrow.fs
import pytest
from unittest.mock import MagicMock, patch

Expand All @@ -12,6 +13,7 @@
from ray.air._internal import usage as air_usage
from ray.air._internal.usage import AirEntrypoint
from ray.air.integrations import wandb, mlflow, comet
from ray.train._internal.storage import StorageContext
from ray.tune.callback import Callback
from ray.tune.experiment.experiment import Experiment
from ray.tune.logger import LoggerCallback
Expand Down Expand Up @@ -68,59 +70,35 @@ def ray_start_4_cpus():
ray.shutdown()


# (nfs: bool, remote_path: str | None, syncing_disabled: bool, expected: str)
_StorageTestConfig = namedtuple(
"StorageTestConfig", ["nfs", "remote_path", "syncing_disabled", "expected"]
)

_storage_test_configs = [
# Local
_StorageTestConfig(False, None, False, "driver"),
_StorageTestConfig(False, None, True, "local"),
# Remote
_StorageTestConfig(False, "s3://mock/bucket?param=1", False, "s3"),
_StorageTestConfig(False, "gs://mock/bucket?param=1", False, "gs"),
_StorageTestConfig(False, "hdfs://mock/bucket?param=1", False, "hdfs"),
_StorageTestConfig(False, "file://mock/bucket?param=1", False, "local_uri"),
_StorageTestConfig(False, "memory://mock/bucket?param=1", False, "memory"),
_StorageTestConfig(
False, "custom://mock/bucket?param=1", False, "custom_remote_storage"
),
# NFS
_StorageTestConfig(True, None, True, "nfs"),
]


@pytest.mark.parametrize(
"storage_test_config",
_storage_test_configs,
ids=[str(config) for config in _storage_test_configs],
"storage_path_filesystem_expected",
[
("/tmp/test", None, "local"),
("s3://test", None, "s3"),
("gs://test", None, "gcs"),
("mock://test", None, "mock"),
("test", pyarrow.fs.LocalFileSystem(), "custom"),
],
)
def test_tag_ray_air_storage_config(
tmp_path, storage_test_config, mock_record, monkeypatch
):
if storage_test_config.nfs:
import ray.air._internal.remote_storage

monkeypatch.setattr(
ray.air._internal.remote_storage,
"_get_network_mounts",
lambda: [str(tmp_path)],
)

local_path = str(tmp_path / "local_path")
sync_config = (
train.SyncConfig(syncer=None)
if storage_test_config.syncing_disabled
else train.SyncConfig()
)

air_usage.tag_ray_air_storage_config(
local_path=local_path,
remote_path=storage_test_config.remote_path,
sync_config=sync_config,
def test_tag_storage_type(storage_path_filesystem_expected, mock_record, monkeypatch):
# Don't write anything to storage for the test.
monkeypatch.setattr(StorageContext, "_create_validation_file", lambda _: None)
monkeypatch.setattr(StorageContext, "_check_validation_file", lambda _: None)

storage_path, storage_filesystem, expected = storage_path_filesystem_expected

if Version(pyarrow.__version__) < Version("9.0.0") and storage_path.startswith(
"gs://"
):
pytest.skip("GCS support requires pyarrow >= 9.0.0")

storage = StorageContext(
storage_path=storage_path,
experiment_dir_name="test",
storage_filesystem=storage_filesystem,
)
assert storage_test_config.expected == mock_record[TagKey.AIR_STORAGE_CONFIGURATION]
air_usage.tag_storage_type(storage)
assert mock_record[TagKey.AIR_STORAGE_CONFIGURATION] == expected


class _CustomLoggerCallback(LoggerCallback):
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/test_usage_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,7 @@ def run_usage_stats_server(reporter):
expected_payload["tune_scheduler"] = "FIFOScheduler"
expected_payload["tune_searcher"] = "BasicVariantGenerator"
expected_payload["air_entrypoint"] = "Tuner.fit"
expected_payload["air_storage_configuration"] = "local"
assert payload["extra_usage_tags"] == expected_payload
assert payload["total_num_nodes"] == 1
assert payload["total_num_running_jobs"] == 1
Expand Down
4 changes: 2 additions & 2 deletions python/ray/train/_internal/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ def __init__(
trial_dir_name: Optional[str] = None,
current_checkpoint_index: int = 0,
):
custom_fs_provided = storage_filesystem is not None
self.custom_fs_provided = storage_filesystem is not None

self.storage_local_path = _get_defaults_results_dir()

Expand Down Expand Up @@ -459,7 +459,7 @@ def __init__(
# Otherwise, syncing is only needed if storage_local_path
# and storage_fs_path point to different locations.
syncing_needed = (
custom_fs_provided or self.storage_fs_path != self.storage_local_path
self.custom_fs_provided or self.storage_fs_path != self.storage_local_path
)
self.syncer: Optional[Syncer] = (
_FilesystemSyncer(
Expand Down
4 changes: 3 additions & 1 deletion python/ray/tune/tune.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,6 @@ class and registered trainables.
if _use_storage_context():
local_path, remote_path = None, None
sync_config = sync_config or SyncConfig()
# TODO(justinvyu): Fix telemetry for the new persistence.

# TODO(justinvyu): Finalize the local_dir vs. env var API in 2.8.
# For now, keep accepting both options.
Expand Down Expand Up @@ -953,6 +952,9 @@ class and registered trainables.

progress_metrics = _detect_progress_metrics(_get_trainable(run_or_experiment))

if _use_storage_context():
air_usage.tag_storage_type(experiments[0].storage)

# NOTE: Report callback telemetry before populating the list with default callbacks.
# This tracks user-specified callback usage.
air_usage.tag_callbacks(callbacks)
Expand Down

0 comments on commit c843e50

Please sign in to comment.