Skip to content

Commit

Permalink
[train+tune] Make path joining OS-agnostic by using Path.as_posix o…
Browse files Browse the repository at this point in the history
…ver `os.path.join` (#42037)

`os.path.join` uses an OS-dependent separator (`\` on windows and `/` on posix systems), which causes some issues when `os.path.join` is used in conjunction with `Path`. The combination can result in a mix of `/` and `\` when running Ray Train/Tune on windows. This runs into issues when passing these paths into `pyarrow.fs`, leading to issues such as `FileNotFound`.

---------

Signed-off-by: n3011 <mrinal.haloi11@gmail.com>
Signed-off-by: Ishant Mrinal <33053278+n30111@users.noreply.github.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-authored-by: n3011 <mrinal.haloi11@gmail.com>
Co-authored-by: Justin Yu <justinvyu@anyscale.com>
  • Loading branch information
3 people committed Jan 31, 2024
1 parent 3e59df1 commit 30653f1
Show file tree
Hide file tree
Showing 24 changed files with 103 additions and 96 deletions.
5 changes: 3 additions & 2 deletions python/ray/air/integrations/comet.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from pathlib import Path
from typing import Dict, List

import pyarrow.fs
Expand Down Expand Up @@ -238,8 +239,8 @@ def log_trial_save(self, trial: "Trial"):
for root, dirs, files in os.walk(checkpoint_root):
rel_root = os.path.relpath(root, checkpoint_root)
for file in files:
local_file = os.path.join(checkpoint_root, rel_root, file)
logical_path = os.path.join(rel_root, file)
local_file = Path(checkpoint_root, rel_root, file).as_posix()
logical_path = Path(rel_root, file).as_posix()

# Strip leading `./`
if logical_path.startswith("./"):
Expand Down
8 changes: 4 additions & 4 deletions python/ray/air/result.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import os
import io
import json
from pathlib import Path
import pandas as pd
import pyarrow
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

import ray
Expand Down Expand Up @@ -167,8 +167,8 @@ def from_path(
raise RuntimeError(f"Trial folder {fs_path} doesn't exist!")

# Restore metrics from result.json
result_json_file = os.path.join(fs_path, EXPR_RESULT_FILE)
progress_csv_file = os.path.join(fs_path, EXPR_PROGRESS_FILE)
result_json_file = Path(fs_path, EXPR_RESULT_FILE).as_posix()
progress_csv_file = Path(fs_path, EXPR_PROGRESS_FILE).as_posix()
if _exists_at_fs_path(fs, result_json_file):
lines = cls._read_file_as_str(fs, result_json_file).split("\n")
json_list = [json.loads(line) for line in lines if line]
Expand Down Expand Up @@ -232,7 +232,7 @@ def from_path(

# Restore the trial error if it exists
error = None
error_file_path = os.path.join(fs_path, EXPR_ERROR_PICKLE_FILE)
error_file_path = Path(fs_path, EXPR_ERROR_PICKLE_FILE).as_posix()
if _exists_at_fs_path(fs, error_file_path):
with fs.open_input_stream(error_file_path) as f:
error = ray.cloudpickle.load(f)
Expand Down
7 changes: 4 additions & 3 deletions python/ray/train/_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import tempfile
import traceback
import uuid
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Union

import pyarrow.fs
Expand Down Expand Up @@ -149,7 +150,7 @@ def get_metadata(self) -> Dict[str, Any]:
If no metadata is stored, an empty dict is returned.
"""
metadata_path = os.path.join(self.path, _METADATA_FILE_NAME)
metadata_path = Path(self.path, _METADATA_FILE_NAME).as_posix()
if not _exists_at_fs_path(self.filesystem, metadata_path):
return {}

Expand All @@ -161,7 +162,7 @@ def set_metadata(self, metadata: Dict[str, Any]) -> None:
This will overwrite any existing metadata stored with this checkpoint.
"""
metadata_path = os.path.join(self.path, _METADATA_FILE_NAME)
metadata_path = Path(self.path, _METADATA_FILE_NAME).as_posix()
with self.filesystem.open_output_stream(metadata_path) as f:
f.write(json.dumps(metadata).encode("utf-8"))

Expand Down Expand Up @@ -341,7 +342,7 @@ def _get_temporary_checkpoint_dir(self) -> str:
"Couldn't create checkpoint directory due to length "
"constraints. Try specifying a shorter checkpoint path."
)
return os.path.join(tmp_dir_path, checkpoint_dir_name)
return Path(tmp_dir_path, checkpoint_dir_name).as_posix()

def __fspath__(self):
raise TypeError(
Expand Down
21 changes: 11 additions & 10 deletions python/ray/train/_internal/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ def _should_exclude(self, path: str) -> bool:
`self._exclude` patterns."""
path = Path(path)
relative_path = path.relative_to(self._root_path).as_posix()
alt = os.path.join(relative_path, "") if path.is_dir() else None
match_candidates = [relative_path]
if path.is_dir():
# Everything is in posix path format ('/')
match_candidates.append(relative_path + "/")

for excl in self._exclude:
if fnmatch.fnmatch(relative_path, excl):
return True
if alt and fnmatch.fnmatch(alt, excl):
if any(fnmatch.fnmatch(candidate, excl) for candidate in match_candidates):
return True
return False

Expand Down Expand Up @@ -357,7 +358,7 @@ class StorageContext:
There are 2 types of paths:
1. *_fs_path: A path on the `storage_filesystem`. This is a regular path
which has been prefix-stripped by pyarrow.fs.FileSystem.from_uri and
can be joined with `os.path.join`.
can be joined with `Path(...).as_posix()`.
2. *_local_path: The path on the local filesystem where results are saved to
before persisting to storage.
Expand Down Expand Up @@ -412,7 +413,7 @@ class StorageContext:
pyarrow.fs.copy_files(
local_dir,
os.path.join(storage.trial_fs_path, "subdir"),
Path(storage.trial_fs_path, "subdir").as_posix(),
destination_filesystem=storage.filesystem
)
"""
Expand Down Expand Up @@ -490,18 +491,18 @@ def _create_validation_file(self):
storage path to verify that the storage path can be written to.
This validation file is also used to check whether the storage path is
accessible by all nodes in the cluster."""
valid_file = os.path.join(
valid_file = Path(
self.experiment_fs_path, _VALIDATE_STORAGE_MARKER_FILENAME
)
).as_posix()
self.storage_filesystem.create_dir(self.experiment_fs_path)
with self.storage_filesystem.open_output_stream(valid_file):
pass

def _check_validation_file(self):
"""Checks that the validation file exists at the storage path."""
valid_file = os.path.join(
valid_file = Path(
self.experiment_fs_path, _VALIDATE_STORAGE_MARKER_FILENAME
)
).as_posix()
if not _exists_at_fs_path(fs=self.storage_filesystem, fs_path=valid_file):
raise RuntimeError(
f"Unable to set up cluster storage with the following settings:\n{self}"
Expand Down
6 changes: 4 additions & 2 deletions python/ray/train/base_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ def training_loop(self):
"`trainer.fit()`."
)
fs, fs_path = get_fs_and_path(path, storage_filesystem)
with fs.open_input_file(os.path.join(fs_path, _TRAINER_PKL)) as f:
trainer_pkl_path = Path(fs_path, _TRAINER_PKL).as_posix()
with fs.open_input_file(trainer_pkl_path) as f:
trainer_cls, param_dict = pickle.loads(f.readall())

if trainer_cls is not cls:
Expand Down Expand Up @@ -392,7 +393,8 @@ def can_restore(
bool: Whether this path exists and contains the trainer state to resume from
"""
fs, fs_path = get_fs_and_path(path, storage_filesystem)
return _exists_at_fs_path(fs, os.path.join(fs_path, _TRAINER_PKL))
trainer_pkl_path = Path(fs_path, _TRAINER_PKL).as_posix()
return _exists_at_fs_path(fs, trainer_pkl_path)

def __repr__(self):
# A dictionary that maps parameters to their default values.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import os
import shutil
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Iterator, Optional, Type

Expand Down Expand Up @@ -78,7 +78,7 @@ def on_save(self, args, state, control, **kwargs):

# Copy ckpt files and construct a Ray Train Checkpoint
source_ckpt_path = transformers.trainer.get_last_checkpoint(args.output_dir)
target_ckpt_path = os.path.join(tmpdir, self.CHECKPOINT_NAME)
target_ckpt_path = Path(tmpdir, self.CHECKPOINT_NAME).as_posix()
shutil.copytree(source_ckpt_path, target_ckpt_path)
checkpoint = Checkpoint.from_directory(tmpdir)

Expand Down
6 changes: 3 additions & 3 deletions python/ray/train/lightgbm/lightgbm_checkpoint.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Optional

import lightgbm
Expand Down Expand Up @@ -45,7 +45,7 @@ def from_model(
>>> checkpoint = LightGBMCheckpoint.from_model(model.booster_)
"""
tempdir = tempfile.mkdtemp()
booster.save_model(os.path.join(tempdir, cls.MODEL_FILENAME))
booster.save_model(Path(tempdir, cls.MODEL_FILENAME).as_posix())

checkpoint = cls.from_directory(tempdir)
if preprocessor:
Expand All @@ -57,5 +57,5 @@ def get_model(self) -> lightgbm.Booster:
"""Retrieve the LightGBM model stored in this checkpoint."""
with self.as_directory() as checkpoint_path:
return lightgbm.Booster(
model_file=os.path.join(checkpoint_path, self.MODEL_FILENAME)
model_file=Path(checkpoint_path, self.MODEL_FILENAME).as_posix()
)
10 changes: 6 additions & 4 deletions python/ray/train/lightgbm/lightgbm_trainer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os
from pathlib import Path
from typing import Any, Dict, Union

import lightgbm
Expand Down Expand Up @@ -103,9 +103,9 @@ def get_model(checkpoint: Checkpoint) -> lightgbm.Booster:
"""Retrieve the LightGBM model stored in this checkpoint."""
with checkpoint.as_directory() as checkpoint_path:
return lightgbm.Booster(
model_file=os.path.join(
model_file=Path(
checkpoint_path, LightGBMCheckpoint.MODEL_FILENAME
)
).as_posix()
)

def _train(self, **kwargs):
Expand All @@ -115,7 +115,9 @@ def _load_checkpoint(self, checkpoint: Checkpoint) -> lightgbm.Booster:
return self.__class__.get_model(checkpoint)

def _save_model(self, model: lightgbm.LGBMModel, path: str):
model.booster_.save_model(os.path.join(path, LightGBMCheckpoint.MODEL_FILENAME))
model.booster_.save_model(
Path(path, LightGBMCheckpoint.MODEL_FILENAME).as_posix()
)

def _model_iteration(
self, model: Union[lightgbm.LGBMModel, lightgbm.Booster]
Expand Down
7 changes: 4 additions & 3 deletions python/ray/train/lightning/_lightning_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import shutil
import tempfile
from pathlib import Path
from typing import Any, Dict

import torch
Expand Down Expand Up @@ -233,15 +234,15 @@ def __init__(self) -> None:
super().__init__()
self.trial_name = train.get_context().get_trial_name()
self.local_rank = train.get_context().get_local_rank()
self.tmpdir_prefix = os.path.join(tempfile.gettempdir(), self.trial_name)
self.tmpdir_prefix = Path(tempfile.gettempdir(), self.trial_name).as_posix()
if os.path.isdir(self.tmpdir_prefix) and self.local_rank == 0:
shutil.rmtree(self.tmpdir_prefix)

record_extra_usage_tag(TagKey.TRAIN_LIGHTNING_RAYTRAINREPORTCALLBACK, "1")

def on_train_epoch_end(self, trainer, pl_module) -> None:
# Creates a checkpoint dir with fixed name
tmpdir = os.path.join(self.tmpdir_prefix, str(trainer.current_epoch))
tmpdir = Path(self.tmpdir_prefix, str(trainer.current_epoch)).as_posix()
os.makedirs(tmpdir, exist_ok=True)

# Fetch metrics
Expand All @@ -253,7 +254,7 @@ def on_train_epoch_end(self, trainer, pl_module) -> None:
metrics["step"] = trainer.global_step

# Save checkpoint to local
ckpt_path = os.path.join(tmpdir, self.CHECKPOINT_NAME)
ckpt_path = Path(tmpdir, self.CHECKPOINT_NAME).as_posix()
trainer.save_checkpoint(ckpt_path, weights_only=False)

# Report to train session
Expand Down
7 changes: 4 additions & 3 deletions python/ray/train/tensorflow/tensorflow_checkpoint.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import shutil
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Optional

import tensorflow as tf
Expand Down Expand Up @@ -57,7 +58,7 @@ def from_model(
"""
tempdir = tempfile.mkdtemp()
filename = "model.keras"
model.save(os.path.join(tempdir, filename))
model.save(Path(tempdir, filename).as_posix())

checkpoint = cls.from_directory(tempdir)
if preprocessor:
Expand Down Expand Up @@ -90,7 +91,7 @@ def from_h5(
)
tempdir = tempfile.mkdtemp()
filename = os.path.basename(file_path)
new_checkpoint_file = os.path.join(tempdir, filename)
new_checkpoint_file = Path(tempdir, filename).as_posix()
shutil.copy(file_path, new_checkpoint_file)

checkpoint = cls.from_directory(tempdir)
Expand Down Expand Up @@ -150,5 +151,5 @@ def get_model(
)
model_filename = metadata[self.MODEL_FILENAME_KEY]
with self.as_directory() as checkpoint_dir:
model_path = os.path.join(checkpoint_dir, model_filename)
model_path = Path(checkpoint_dir, model_filename).as_posix()
return keras.models.load_model(model_path)
7 changes: 4 additions & 3 deletions python/ray/train/torch/torch_checkpoint.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import tempfile
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional

import torch
Expand Down Expand Up @@ -82,7 +83,7 @@ def create_model() -> nn.Module:
"""
tempdir = tempfile.mkdtemp()

model_path = os.path.join(tempdir, cls.MODEL_FILENAME)
model_path = Path(tempdir, cls.MODEL_FILENAME).as_posix()
stripped_state_dict = consume_prefix_in_state_dict_if_present_not_in_place(
state_dict, "module."
)
Expand Down Expand Up @@ -140,7 +141,7 @@ def from_model(
"""
tempdir = tempfile.mkdtemp()

model_path = os.path.join(tempdir, cls.MODEL_FILENAME)
model_path = Path(tempdir, cls.MODEL_FILENAME).as_posix()
torch.save(model, model_path)

checkpoint = cls.from_directory(tempdir)
Expand All @@ -157,7 +158,7 @@ def get_model(self, model: Optional[torch.nn.Module] = None) -> torch.nn.Module:
``model``. Otherwise, the model will be discarded.
"""
with self.as_directory() as tempdir:
model_path = os.path.join(tempdir, self.MODEL_FILENAME)
model_path = Path(tempdir, self.MODEL_FILENAME).as_posix()
if not os.path.exists(model_path):
raise RuntimeError(
"`model.pt` not found within this checkpoint. Make sure you "
Expand Down
6 changes: 3 additions & 3 deletions python/ray/train/xgboost/xgboost_checkpoint.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Optional

import xgboost
Expand Down Expand Up @@ -51,7 +51,7 @@ def from_model(
"""
tmpdir = tempfile.mkdtemp()
booster.save_model(os.path.join(tmpdir, cls.MODEL_FILENAME))
booster.save_model(Path(tmpdir, cls.MODEL_FILENAME).as_posix())

checkpoint = cls.from_directory(tmpdir)
if preprocessor:
Expand All @@ -62,5 +62,5 @@ def get_model(self) -> xgboost.Booster:
"""Retrieve the XGBoost model stored in this checkpoint."""
with self.as_directory() as checkpoint_path:
booster = xgboost.Booster()
booster.load_model(os.path.join(checkpoint_path, self.MODEL_FILENAME))
booster.load_model(Path(checkpoint_path, self.MODEL_FILENAME).as_posix())
return booster
6 changes: 3 additions & 3 deletions python/ray/tune/analysis/experiment_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ def _fetch_trial_dataframe(self, trial: Trial) -> DataFrame:
if trial.last_result is None:
return DataFrame()

json_fs_path = os.path.join(trial.storage.trial_fs_path, EXPR_RESULT_FILE)
csv_fs_path = os.path.join(trial.storage.trial_fs_path, EXPR_PROGRESS_FILE)
json_fs_path = Path(trial.storage.trial_fs_path, EXPR_RESULT_FILE).as_posix()
csv_fs_path = Path(trial.storage.trial_fs_path, EXPR_PROGRESS_FILE).as_posix()
# Prefer reading the JSON if it exists.
if _exists_at_fs_path(trial.storage.storage_filesystem, json_fs_path):
with trial.storage.storage_filesystem.open_input_stream(json_fs_path) as f:
Expand Down Expand Up @@ -223,7 +223,7 @@ def _find_newest_experiment_checkpoint(
if not matching:
return None
filename = max(matching)
return os.path.join(experiment_fs_path, filename)
return Path(experiment_fs_path, filename).as_posix()

@property
def experiment_path(self) -> str:
Expand Down

0 comments on commit 30653f1

Please sign in to comment.