Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[train] New persistence mode: Add storage type telemetry #39286

Merged
merged 6 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions python/ray/air/_internal/usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

if TYPE_CHECKING:
from ray.train.trainer import BaseTrainer
from ray.train._internal.storage import StorageContext
from ray.tune.schedulers import TrialScheduler
from ray.tune.search import BasicVariantGenerator, Searcher
from ray.tune import Callback
Expand Down Expand Up @@ -226,6 +227,31 @@ def _get_tag_for_remote_path(remote_path: str) -> str:
return tag


def tag_storage_type(storage: "StorageContext"):
    """Records the storage configuration of an experiment.

    The storage configuration is set by
    `RunConfig(storage_path, storage_filesystem)`.

    Possible storage types (defined by `pyarrow.fs.FileSystem.type_name`):
    - 'local': pyarrow.fs.LocalFileSystem. This includes NFS usage.
    - 'mock': pyarrow.fs._MockFileSystem. This is used for testing.
    - 's3' / 'gcs' / 'abfs' / 'hdfs': remote storage schemes with
      default implementations in pyarrow.
    - 'custom': all other storage schemes, including ALL cases where a
      custom `storage_filesystem` is provided by the user.
    """
    recognized_types = ("local", "mock", "s3", "gcs", "abfs", "hdfs")

    if storage.custom_fs_provided:
        # A user-supplied filesystem is always reported as custom,
        # regardless of its underlying pyarrow type name.
        tag = "custom"
    else:
        fs_type = storage.storage_filesystem.type_name
        tag = fs_type if fs_type in recognized_types else None

    # Anything unrecognized (and not custom) is simply not recorded.
    if tag is not None:
        record_extra_usage_tag(TagKey.AIR_STORAGE_CONFIGURATION, tag)


def tag_ray_air_storage_config(
local_path: str, remote_path: Optional[str], sync_config: "SyncConfig"
) -> None:
Expand Down
80 changes: 29 additions & 51 deletions python/ray/air/tests/test_air_usage.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Unit tests for AIR telemetry."""

from collections import namedtuple
import json
import os
from packaging.version import Version

import pyarrow.fs
import pytest
from unittest.mock import MagicMock, patch

Expand All @@ -12,6 +13,7 @@
from ray.air._internal import usage as air_usage
from ray.air._internal.usage import AirEntrypoint
from ray.air.integrations import wandb, mlflow, comet
from ray.train._internal.storage import StorageContext
from ray.tune.callback import Callback
from ray.tune.experiment.experiment import Experiment
from ray.tune.logger import LoggerCallback
Expand Down Expand Up @@ -68,59 +70,35 @@ def ray_start_4_cpus():
ray.shutdown()


# (nfs: bool, remote_path: str | None, syncing_disabled: bool, expected: str)
_StorageTestConfig = namedtuple(
"StorageTestConfig", ["nfs", "remote_path", "syncing_disabled", "expected"]
)

_storage_test_configs = [
# Local
_StorageTestConfig(False, None, False, "driver"),
_StorageTestConfig(False, None, True, "local"),
# Remote
_StorageTestConfig(False, "s3://mock/bucket?param=1", False, "s3"),
_StorageTestConfig(False, "gs://mock/bucket?param=1", False, "gs"),
_StorageTestConfig(False, "hdfs://mock/bucket?param=1", False, "hdfs"),
_StorageTestConfig(False, "file://mock/bucket?param=1", False, "local_uri"),
_StorageTestConfig(False, "memory://mock/bucket?param=1", False, "memory"),
_StorageTestConfig(
False, "custom://mock/bucket?param=1", False, "custom_remote_storage"
),
# NFS
_StorageTestConfig(True, None, True, "nfs"),
]


@pytest.mark.parametrize(
    "storage_path_filesystem_expected",
    [
        ("/tmp/test", None, "local"),
        ("s3://test", None, "s3"),
        ("gs://test", None, "gcs"),
        ("mock://test", None, "mock"),
        ("test", pyarrow.fs.LocalFileSystem(), "custom"),
    ],
)
def test_tag_storage_type(storage_path_filesystem_expected, mock_record, monkeypatch):
    """The recorded storage tag should match the configured filesystem type."""
    # Skip filesystem validation so the test never touches real storage.
    for method_name in ("_create_validation_file", "_check_validation_file"):
        monkeypatch.setattr(StorageContext, method_name, lambda _: None)

    path, filesystem, expected_tag = storage_path_filesystem_expected

    # pyarrow only ships a GCS filesystem implementation from 9.0.0 on.
    if Version(pyarrow.__version__) < Version("9.0.0") and path.startswith("gs://"):
        pytest.skip("GCS support requires pyarrow >= 9.0.0")

    storage = StorageContext(
        storage_path=path,
        experiment_dir_name="test",
        storage_filesystem=filesystem,
    )
    air_usage.tag_storage_type(storage)
    assert mock_record[TagKey.AIR_STORAGE_CONFIGURATION] == expected_tag


class _CustomLoggerCallback(LoggerCallback):
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/test_usage_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,7 @@ def run_usage_stats_server(reporter):
expected_payload["tune_scheduler"] = "FIFOScheduler"
expected_payload["tune_searcher"] = "BasicVariantGenerator"
expected_payload["air_entrypoint"] = "Tuner.fit"
expected_payload["air_storage_configuration"] = "local"
assert payload["extra_usage_tags"] == expected_payload
assert payload["total_num_nodes"] == 1
assert payload["total_num_running_jobs"] == 1
Expand Down
4 changes: 2 additions & 2 deletions python/ray/train/_internal/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ def __init__(
trial_dir_name: Optional[str] = None,
current_checkpoint_index: int = 0,
):
custom_fs_provided = storage_filesystem is not None
self.custom_fs_provided = storage_filesystem is not None

self.storage_local_path = _get_defaults_results_dir()

Expand Down Expand Up @@ -459,7 +459,7 @@ def __init__(
# Otherwise, syncing is only needed if storage_local_path
# and storage_fs_path point to different locations.
syncing_needed = (
custom_fs_provided or self.storage_fs_path != self.storage_local_path
self.custom_fs_provided or self.storage_fs_path != self.storage_local_path
)
self.syncer: Optional[Syncer] = (
_FilesystemSyncer(
Expand Down
4 changes: 3 additions & 1 deletion python/ray/tune/tune.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,6 @@ class and registered trainables.
if _use_storage_context():
local_path, remote_path = None, None
sync_config = sync_config or SyncConfig()
# TODO(justinvyu): Fix telemetry for the new persistence.

# TODO(justinvyu): Finalize the local_dir vs. env var API in 2.8.
# For now, keep accepting both options.
Expand Down Expand Up @@ -953,6 +952,9 @@ class and registered trainables.

progress_metrics = _detect_progress_metrics(_get_trainable(run_or_experiment))

if _use_storage_context():
air_usage.tag_storage_type(experiments[0].storage)

Comment on lines +955 to +957
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come this is tracked down here rather than where the TODO was?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need the Experiment object to be initialized to access the StorageContext

# NOTE: Report callback telemetry before populating the list with default callbacks.
# This tracks user-specified callback usage.
air_usage.tag_callbacks(callbacks)
Expand Down
Loading