Skip to content

Commit

Permalink
[air] pyarrow.fs persistence: Introduce StorageContext and use it…
Browse files Browse the repository at this point in the history
… for driver syncing (1/n) (ray-project#37690)

Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
  • Loading branch information
justinvyu authored and arvind-chandra committed Aug 31, 2023
1 parent e3edf82 commit 3bf2506
Show file tree
Hide file tree
Showing 12 changed files with 1,052 additions and 179 deletions.
19 changes: 17 additions & 2 deletions python/ray/air/_internal/uri_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ class URI:
's3://bucket/a?scheme=http&endpoint_override=localhost%3A900'
>>> s3_uri.parent.name, s3_uri.name
('bucket', 'a')
>>> local_path = URI("/tmp/local")
>>> str(local_path)
'/tmp/local'
>>> str(local_path.parent)
'/tmp'
>>> str(local_path / "b" / "c")
'/tmp/local/b/c'
Args:
uri: The URI to represent.
Expand All @@ -30,8 +37,10 @@ class URI:
def __init__(self, uri: str):
self._parsed = urllib.parse.urlparse(uri)
if not self._parsed.scheme:
raise ValueError(f"Invalid URI: {uri}")
self._path = Path(os.path.normpath(self._parsed.netloc + self._parsed.path))
# Just treat this as a regular path
self._path = Path(uri)
else:
self._path = Path(os.path.normpath(self._parsed.netloc + self._parsed.path))

@property
def name(self) -> str:
Expand Down Expand Up @@ -60,6 +69,8 @@ def __truediv__(self, path_to_append):
def _get_str_representation(
cls, parsed_uri: urllib.parse.ParseResult, path: Union[str, Path]
) -> str:
if not parsed_uri.scheme:
return str(path)
return parsed_uri._replace(netloc=str(path), path="").geturl()

def __repr__(self):
Expand All @@ -69,6 +80,10 @@ def __str__(self):
return self._get_str_representation(self._parsed, self._path)


def is_uri(path: str) -> bool:
return bool(urllib.parse.urlparse(path).scheme)


def _join_path_or_uri(base_path: str, path_to_join: str) -> str:
"""Joins paths to form either a URI (w/ possible URL params) or a local path.
Expand Down
3 changes: 3 additions & 0 deletions python/ray/air/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
Tuple,
)

import pyarrow.fs

from ray._private.storage import _get_storage_uri
from ray._private.thirdparty.tabulate.tabulate import tabulate
from ray.air.constants import WILDCARD_KEY
Expand Down Expand Up @@ -750,6 +752,7 @@ class RunConfig:

name: Optional[str] = None
storage_path: Optional[str] = None
storage_filesystem: Optional[pyarrow.fs.FileSystem] = None
callbacks: Optional[List["Callback"]] = None
stop: Optional[Union[Mapping, "Stopper", Callable[[str, Mapping], bool]]] = None
failure_config: Optional[FailureConfig] = None
Expand Down
8 changes: 8 additions & 0 deletions python/ray/train/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,14 @@ py_test(
deps = [":train_lib"]
)

py_test(
name = "test_new_persistence",
size = "small",
srcs = ["tests/test_new_persistence.py"],
tags = ["team:ml", "exclusive"],
deps = [":train_lib", ":conftest"]
)

py_test(
name = "test_predictor",
size = "small",
Expand Down
Loading

0 comments on commit 3bf2506

Please sign in to comment.