Skip to content

Commit

Permalink
Merge branch 'master' into log-deployment-id
Browse files Browse the repository at this point in the history
  • Loading branch information
edoakes committed Mar 5, 2024
2 parents 4dfd182 + 94bbf99 commit c74d70a
Show file tree
Hide file tree
Showing 46 changed files with 673 additions and 2,108 deletions.
26 changes: 22 additions & 4 deletions python/ray/air/config.py
Expand Up @@ -16,6 +16,7 @@

import pyarrow.fs

from ray._private.storage import _get_storage_uri
from ray._private.thirdparty.tabulate.tabulate import tabulate
from ray.util.annotations import PublicAPI, Deprecated
from ray.widgets import Template, make_table_html_repr
Expand Down Expand Up @@ -581,10 +582,14 @@ class RunConfig:
Args:
name: Name of the trial or experiment. If not provided, will be deduced
from the Trainable.
storage_path: [Beta] Path to store results at. Can be a local directory or
a destination on cloud storage. If Ray storage is set up,
defaults to the storage location. Otherwise, this defaults to
the local ``~/ray_results`` directory.
storage_path: [Beta] Path where all results and checkpoints are persisted.
Can be a local directory or a destination on cloud storage.
For multi-node training/tuning runs, this must be set to a
shared storage location (e.g., S3, NFS).
This defaults to the local ``~/ray_results`` directory.
storage_filesystem: [Beta] A custom filesystem to use for storage.
If this is provided, `storage_path` should be a path with its
prefix stripped (e.g., `s3://bucket/path` -> `bucket/path`).
failure_config: Failure mode configuration.
checkpoint_config: Checkpointing configuration.
sync_config: Configuration object for syncing. See train.SyncConfig.
Expand Down Expand Up @@ -634,8 +639,21 @@ class RunConfig:

def __post_init__(self):
from ray.train import SyncConfig
from ray.train.constants import DEFAULT_STORAGE_PATH
from ray.tune.experimental.output import AirVerbosity, get_air_verbosity

if self.storage_path is None:
self.storage_path = DEFAULT_STORAGE_PATH

# If no remote path is set, try to get Ray Storage URI
ray_storage_uri: Optional[str] = _get_storage_uri()
if ray_storage_uri is not None:
logger.info(
"Using configured Ray Storage URI as the `storage_path`: "
f"{ray_storage_uri}"
)
self.storage_path = ray_storage_uri

if not self.failure_config:
self.failure_config = FailureConfig()

Expand Down
10 changes: 5 additions & 5 deletions python/ray/air/integrations/mlflow.py
Expand Up @@ -3,7 +3,6 @@
from typing import Dict, Optional, Union

import ray
from ray.air import session
from ray.air._internal.mlflow import _MLflowLoggerUtil
from ray.air._internal import usage as air_usage
from ray.air.constants import TRAINING_ITERATION
Expand Down Expand Up @@ -148,13 +147,14 @@ def train_fn(config):
)

try:
train_context = ray.train.get_context()

# Do a try-catch here if we are not in a train session
_session = session._get_session(warn=False)
if _session and rank_zero_only and session.get_world_rank() != 0:
if rank_zero_only and train_context.get_world_rank() != 0:
return _NoopModule()

default_trial_id = session.get_trial_id()
default_trial_name = session.get_trial_name()
default_trial_id = train_context.get_trial_id()
default_trial_name = train_context.get_trial_name()

except RuntimeError:
default_trial_id = None
Expand Down
3 changes: 1 addition & 2 deletions python/ray/air/tests/test_configs.py
Expand Up @@ -34,8 +34,7 @@ def test_repr(config):

def test_storage_filesystem_repr():
config = RunConfig(storage_filesystem=pyarrow.fs.S3FileSystem())
representation = repr(config)
assert len(representation) < MAX_REPR_LENGTH
repr(config)


def test_failure_config_init():
Expand Down
28 changes: 8 additions & 20 deletions python/ray/air/tests/test_integration_mlflow.py
Expand Up @@ -3,13 +3,11 @@
import tempfile
import unittest
from collections import namedtuple
from unittest.mock import patch
from unittest.mock import patch, MagicMock

from mlflow.tracking import MlflowClient

from ray._private.dict import flatten_dict
from ray.train._internal.session import init_session, shutdown_session
from ray.train._internal.storage import StorageContext
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow, _NoopModule
from ray.air._internal.mlflow import _MLflowLoggerUtil

Expand Down Expand Up @@ -49,8 +47,7 @@ def setUp(self):
assert client.get_experiment_by_name("existing_experiment").experiment_id == "1"

def tearDown(self) -> None:
# Shutdown session to clean up for next test
shutdown_session()
pass

def testMlFlowLoggerCallbackConfig(self):
# Explicitly pass in all args.
Expand Down Expand Up @@ -225,23 +222,14 @@ def testMlFlowSetupExplicit(self):
)
mlflow.end_run()

def testMlFlowSetupRankNonRankZero(self):
@patch("ray.train.get_context")
def testMlFlowSetupRankNonRankZero(self, mock_get_context):
"""Assert that non-rank-0 workers get a noop module"""
storage = StorageContext(
storage_path=tempfile.mkdtemp(),
experiment_dir_name="exp_name",
trial_dir_name="trial_name",
)
mock_context = MagicMock()
mock_context.get_world_rank.return_value = 1

mock_get_context.return_value = mock_context

init_session(
training_func=None,
world_rank=1,
local_rank=1,
node_rank=1,
local_world_size=2,
world_size=2,
storage=storage,
)
mlflow = setup_mlflow({})
assert isinstance(mlflow, _NoopModule)

Expand Down
11 changes: 5 additions & 6 deletions python/ray/train/_internal/session.py
Expand Up @@ -222,15 +222,14 @@ def reset(
self.training_started = False
self._first_report = True

# Change the working directory to the local trial directory.
# -> All workers on the same node share a working directory.
os.makedirs(storage.trial_local_path, exist_ok=True)
# Change the working directory to a special trial folder.
# This is to ensure that all Ray Train workers have a common working directory.
os.makedirs(storage.trial_working_directory, exist_ok=True)
if bool(int(os.environ.get(RAY_CHDIR_TO_TRIAL_DIR, "1"))):
logger.debug(
"Switching the working directory to the trial directory: "
f"{storage.trial_local_path}"
f"Changing the working directory to: {storage.trial_working_directory}"
)
os.chdir(storage.trial_local_path)
os.chdir(storage.trial_working_directory)

def pause_reporting(self):
"""Ignore all future ``session.report()`` calls."""
Expand Down

0 comments on commit c74d70a

Please sign in to comment.