[air] Don't set rank-specific local directories for Train workers (ray-project#38007)

Signed-off-by: Victor <vctr.y.m@example.com>
justinvyu authored and Victor committed Oct 11, 2023
1 parent 17fe651 commit 9b7aa07
Showing 4 changed files with 44 additions and 13 deletions.
17 changes: 11 additions & 6 deletions python/ray/train/_internal/session.py
@@ -142,12 +142,17 @@ def noop(x):
                 encode_data_fn = noop
         self._encode_data_fn = encode_data_fn
 
-        # TODO(xwjiang): Legacy Ray Train trainer clean up!
-        if trial_info:
-            # Change the working directory to `logdir`.
-            logdir = os.path.join(trial_info.logdir, f"rank_{self.world_rank}")
-            os.makedirs(logdir, exist_ok=True)
-            os.chdir(logdir)
+        if _use_storage_context():
+            # Change the working directory to the local trial directory.
+            # -> All workers on the same node share a working directory.
+            os.makedirs(storage.trial_local_path, exist_ok=True)
+            os.chdir(storage.trial_local_path)
+        else:
+            if trial_info:
+                # Change the working directory to `logdir`.
+                logdir = os.path.join(trial_info.logdir, f"rank_{self.world_rank}")
+                os.makedirs(logdir, exist_ok=True)
+                os.chdir(logdir)
 
         # This lock is used to control the execution of the training thread.
         self.continue_lock = threading.Semaphore(0)
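Context for this hunk: with the new storage context enabled, every Train worker on a node starts in the shared trial-local directory instead of its own rank_{world_rank} subdirectory. User code that relied on the old per-rank working directory to keep worker outputs separate now has to create its own subdirectory. A minimal sketch of that pattern (illustrative only; the function name is hypothetical, and it assumes the ray.train.get_context() API available in recent Ray releases):

    import os
    from ray import train

    def train_fn_per_worker(config):
        # Under the new persistence mode, all workers on the same node
        # share this working directory (the trial-local path).
        rank = train.get_context().get_world_rank()

        # If a worker needs its own output files, create a per-rank
        # subdirectory explicitly instead of relying on the old cwd.
        rank_dir = os.path.join(os.getcwd(), f"rank_{rank}")
        os.makedirs(rank_dir, exist_ok=True)
        with open(os.path.join(rank_dir, "worker_log.txt"), "w") as f:
            f.write(f"output from worker {rank}\n")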
18 changes: 18 additions & 0 deletions python/ray/train/_internal/storage.py
@@ -338,6 +338,8 @@ class StorageContext:
         >>> storage.trial_dir_name = "trial_dir"
         >>> storage.trial_fs_path
         'bucket/path/exp_name/trial_dir'
+        >>> storage.trial_local_path
+        '/tmp/ray_results/exp_name/trial_dir'
         >>> storage.current_checkpoint_index = 1
         >>> storage.checkpoint_fs_path
         'bucket/path/exp_name/trial_dir/checkpoint_000001'
@@ -358,6 +360,10 @@ class StorageContext:
         '/tmp/ray_results'
         >>> storage.experiment_path
         '/tmp/ray_results/exp_name'
+        >>> storage.experiment_local_path
+        '/tmp/ray_results/exp_name'
+        >>> storage.experiment_fs_path
+        '/tmp/ray_results/exp_name'
         >>> storage.syncer is None
         True
         >>> storage.storage_filesystem  # Auto-resolved  # doctest: +ELLIPSIS
@@ -495,6 +501,18 @@ def experiment_local_path(self) -> str:
         """
         return os.path.join(self.storage_local_path, self.experiment_dir_name)
 
+    @property
+    def trial_local_path(self) -> str:
+        """The local filesystem path to the trial directory.
+
+        Raises a RuntimeError if `trial_dir_name` is not set beforehand.
+        """
+        if self.trial_dir_name is None:
+            raise RuntimeError(
+                "Should not access `trial_local_path` without setting `trial_dir_name`"
+            )
+        return os.path.join(self.experiment_local_path, self.trial_dir_name)
+
     @property
     def trial_fs_path(self) -> str:
         """The trial directory path on the `storage_filesystem`.
20 changes: 13 additions & 7 deletions python/ray/train/lightning/lightning_trainer.py
@@ -517,13 +517,19 @@ def restore(
 
 def _lightning_train_loop_per_worker(config):
     """Per-worker training loop for a Lightning Trainer."""
-    # Change the working directory for all workers to the same directory.
-    # This aligns with Lightning's settings and avoids inconsistency. Otherwise,
-    # each worker will have a different log and checkpoint directory if they are
-    # using relative paths.
-    working_dir = os.path.join(session.get_trial_dir(), "rank_all")
-    os.makedirs(working_dir, exist_ok=True)
-    os.chdir(working_dir)
+    from ray.train._internal.storage import _use_storage_context
+
+    # TODO(justinvyu)/NOTE: This is no longer needed, because we do not switch to
+    # a rank-specific working directory in the new persistence mode.
+    # Lightning requires each worker to be in the same working directory.
+    if not _use_storage_context():
+        # Change the working directory for all workers to the same directory.
+        # This aligns with Lightning's settings and avoids inconsistency. Otherwise,
+        # each worker will have a different log and checkpoint directory if they are
+        # using relative paths.
+        working_dir = os.path.join(session.get_trial_dir(), "rank_all")
+        os.makedirs(working_dir, exist_ok=True)
+        os.chdir(working_dir)
 
     if not config["lightning_config"]:
         raise RuntimeError("'lightning_config' not specified in LightningTrainer!")
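Because workers already share the trial-local working directory in the new mode, Lightning's relative paths resolve to the same location on every worker without the "rank_all" chdir. A hypothetical illustration (not part of this commit; assumes pytorch_lightning is installed):

    import os
    import pytorch_lightning as pl

    def build_trainer():
        # Every worker resolves this relative path against the shared
        # trial-local working directory, so logs and checkpoints from
        # workers on a node land in one place.
        return pl.Trainer(default_root_dir="lightning_logs", max_epochs=1)

    print(os.path.abspath("lightning_logs"))  # same path on all workers of a node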
2 changes: 2 additions & 0 deletions python/ray/train/tests/test_new_persistence.py
@@ -172,6 +172,8 @@ def dummy_train_fn(config):
         assert train_session.storage
         assert train_session.storage.checkpoint_fs_path
 
+        assert os.getcwd() == train_session.storage.trial_local_path
+
     trainer = DataParallelTrainer(
         dummy_train_fn,
         scaling_config=train.ScalingConfig(num_workers=2),
