[air] Don't set rank-specific local directories for Train workers #38007

Merged: 4 commits, Aug 2, 2023
17 changes: 11 additions & 6 deletions python/ray/train/_internal/session.py
@@ -142,12 +142,17 @@ def noop(x):
encode_data_fn = noop
self._encode_data_fn = encode_data_fn

# TODO(xwjiang): Legacy Ray Train trainer clean up!
if trial_info:
# Change the working directory to `logdir`.
logdir = os.path.join(trial_info.logdir, f"rank_{self.world_rank}")
os.makedirs(logdir, exist_ok=True)
os.chdir(logdir)
if _use_storage_context():
# Change the working directory to the local trial directory.
# -> All workers on the same node share a working directory.
os.makedirs(storage.trial_local_path, exist_ok=True)
os.chdir(storage.trial_local_path)
Inline comment from the PR author:
Note: Ray actors don't inherit their parent's working directory. Even though the Trainable actor changes its working directory to the trial directory by default, the worker actor still starts in the original working directory.
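A quick way to see this behavior (a minimal sketch assuming a local Ray installation; the actor name and method are made up for illustration, and a driver process stands in for the Trainable actor here):

```python
import os
import tempfile

import ray


@ray.remote
class CwdReporter:
    """Toy actor that reports the working directory it started in."""

    def cwd(self) -> str:
        return os.getcwd()


ray.init()

os.chdir(tempfile.mkdtemp())  # the parent process changes its working directory
reporter = CwdReporter.remote()

# The actor typically reports the worker process's original start directory,
# not the directory the parent just changed into.
print("parent cwd:", os.getcwd())
print("actor cwd: ", ray.get(reporter.cwd.remote()))
```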

else:
if trial_info:
# Change the working directory to `logdir`.
logdir = os.path.join(trial_info.logdir, f"rank_{self.world_rank}")
os.makedirs(logdir, exist_ok=True)
os.chdir(logdir)

# This lock is used to control the execution of the training thread.
self.continue_lock = threading.Semaphore(0)
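For readers skimming the diff, a rough sketch of how the per-worker working directory changes under the new storage context. The paths are assumptions mirroring the docstring examples added below, and the legacy `trial_info.logdir` is assumed to point at the same trial directory:

```python
import os

# Local trial directory, as exposed by the new `storage.trial_local_path`.
trial_local_path = "/tmp/ray_results/exp_name/trial_dir"

# Legacy behavior: each worker chdir'ed into its own rank-specific directory.
legacy_workdirs = [os.path.join(trial_local_path, f"rank_{rank}") for rank in range(2)]

# New behavior: every worker on the node shares the trial's local directory.
shared_workdir = trial_local_path

print(legacy_workdirs)  # ['/tmp/.../trial_dir/rank_0', '/tmp/.../trial_dir/rank_1']
print(shared_workdir)   # '/tmp/ray_results/exp_name/trial_dir'
```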
18 changes: 18 additions & 0 deletions python/ray/train/_internal/storage.py
@@ -338,6 +338,8 @@ class StorageContext:
>>> storage.trial_dir_name = "trial_dir"
>>> storage.trial_fs_path
'bucket/path/exp_name/trial_dir'
>>> storage.trial_local_path
'/tmp/ray_results/exp_name/trial_dir'
>>> storage.current_checkpoint_index = 1
>>> storage.checkpoint_fs_path
'bucket/path/exp_name/trial_dir/checkpoint_000001'
@@ -358,6 +360,10 @@ class StorageContext:
'/tmp/ray_results'
>>> storage.experiment_path
'/tmp/ray_results/exp_name'
>>> storage.experiment_local_path
'/tmp/ray_results/exp_name'
>>> storage.experiment_fs_path
'/tmp/ray_results/exp_name'
>>> storage.syncer is None
True
>>> storage.storage_filesystem # Auto-resolved # doctest: +ELLIPSIS
@@ -495,6 +501,18 @@ def experiment_local_path(self) -> str:
"""
return os.path.join(self.storage_local_path, self.experiment_dir_name)

@property
def trial_local_path(self) -> str:
"""The local filesystem path to the trial directory.
Raises a RuntimeError if `trial_dir_name` is not set beforehand.
"""
if self.trial_dir_name is None:
raise RuntimeError(
"Should not access `trial_local_path` without setting `trial_dir_name`"
)
return os.path.join(self.experiment_local_path, self.trial_dir_name)

@property
def trial_fs_path(self) -> str:
"""The trial directory path on the `storage_filesystem`.
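A rough equivalent of the new property in plain `os.path`, to show what it resolves to and when it raises. The function and argument names here are illustrative, mirroring the `StorageContext` attributes:

```python
import os
from typing import Optional


def resolve_trial_local_path(
    storage_local_path: str,
    experiment_dir_name: str,
    trial_dir_name: Optional[str],
) -> str:
    """Illustrative mirror of `StorageContext.trial_local_path`."""
    if trial_dir_name is None:
        raise RuntimeError(
            "Should not access `trial_local_path` without setting `trial_dir_name`"
        )
    experiment_local_path = os.path.join(storage_local_path, experiment_dir_name)
    return os.path.join(experiment_local_path, trial_dir_name)


assert (
    resolve_trial_local_path("/tmp/ray_results", "exp_name", "trial_dir")
    == "/tmp/ray_results/exp_name/trial_dir"
)
```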
20 changes: 13 additions & 7 deletions python/ray/train/lightning/lightning_trainer.py
@@ -517,13 +517,19 @@ def restore(

def _lightning_train_loop_per_worker(config):
"""Per-worker training loop for a Lightning Trainer."""
# Change the working directory for all workers to the same directory.
# This aligns with Lightning's settings and avoids inconsistency. Otherwise,
# each worker will have a different log and checkpoint directory if they are
# using relative paths.
working_dir = os.path.join(session.get_trial_dir(), "rank_all")
os.makedirs(working_dir, exist_ok=True)
os.chdir(working_dir)
from ray.train._internal.storage import _use_storage_context

# TODO(justinvyu)/NOTE: This is no longer needed, because we do not switch to
# a rank-specific working directory in the new persistence mode.
# Lightning requires each worker to be in the same working directory.
if not _use_storage_context():
# Change the working directory for all workers to the same directory.
# This aligns with Lightning's settings and avoids inconsistency. Otherwise,
# each worker will have a different log and checkpoint directory if they are
# using relative paths.
working_dir = os.path.join(session.get_trial_dir(), "rank_all")
os.makedirs(working_dir, exist_ok=True)
os.chdir(working_dir)

if not config["lightning_config"]:
raise RuntimeError("'lightning_config' not specified in LightningTrainer!")
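To illustrate why a shared working directory keeps Lightning's relative output paths consistent across workers, a minimal sketch follows. The directory names are placeholders; the only fact relied upon is that relative paths resolve against the writing process's current working directory:

```python
import os
import tempfile


def resolve_relative_output_dir(relative_dir: str) -> str:
    # A relative log/checkpoint directory resolves against the current working
    # directory of the process that writes to it.
    return os.path.abspath(relative_dir)


shared_dir = tempfile.mkdtemp()  # stand-in for the shared trial directory
os.chdir(shared_dir)

# With every worker chdir'ed to the same directory, a relative path such as
# "lightning_logs" resolves to the same location for all workers on a node.
print(resolve_relative_output_dir("lightning_logs"))
```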
2 changes: 2 additions & 0 deletions python/ray/train/tests/test_new_persistence.py
@@ -172,6 +172,8 @@ def dummy_train_fn(config):
assert train_session.storage
assert train_session.storage.checkpoint_fs_path

assert os.getcwd() == train_session.storage.trial_local_path

trainer = DataParallelTrainer(
dummy_train_fn,
scaling_config=train.ScalingConfig(num_workers=2),