[air] pyarrow.fs persistence: Introduce StorageContext and use it for driver syncing (1/n) #37690

Merged

Conversation

@justinvyu justinvyu commented Jul 23, 2023

Why are these changes needed?

This PR introduces a StorageContext class that holds syncing-related parameters and serves as the source of truth for paths such as the storage, experiment, and trial paths, as well as their local counterparts. The PR mostly consists of piping this StorageContext throughout Tune and using it for driver syncing.

Notes

One major change that the prototype introduces is making it so that syncing always happens whenever a storage_path is set. For example, if specifying an NFS storage_path:

storage_path = "/mnt/nfs"

# Before, this would get resolved as:
# local_path, remote_path = "/mnt/nfs", None
# This is because we view the storage path as a local path and handle it differently than a URI storage path.

# New behavior:
# local_path, remote_path = "~/ray_results", "/mnt/nfs"

So, in many places in the code, the concept of syncing from local_path -> remote_path now becomes syncing from local_cache_path -> storage_path. Whenever a storage path is set, it is now handled by what was previously the "cloud checkpointing" codepath.
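
The change in path resolution described above can be sketched as follows. This is a minimal illustration, not the code in this PR: `resolve_paths_old`, `resolve_paths_new`, and `DEFAULT_CACHE_DIR` are hypothetical names, and URI detection is simplified to a check for `://`.

```python
import os
from typing import Optional, Tuple

# Hypothetical default; stands in for the "~/ray_results" cache directory.
DEFAULT_CACHE_DIR = os.path.expanduser("~/ray_results")


def _is_uri(path: str) -> bool:
    # Simplified check, for illustration only.
    return "://" in path


def resolve_paths_old(storage_path: str) -> Tuple[str, Optional[str]]:
    """Old behavior: a non-URI storage path is treated as the local path."""
    if _is_uri(storage_path):
        return DEFAULT_CACHE_DIR, storage_path
    return storage_path, None


def resolve_paths_new(storage_path: str) -> Tuple[str, str]:
    """New behavior: any storage path is a sync target with a local cache."""
    return DEFAULT_CACHE_DIR, storage_path


print(resolve_paths_old("/mnt/nfs"))
print(resolve_paths_new("/mnt/nfs"))
```

With the new resolution, an NFS path always has a separate local cache directory, which is why two copies of the experiment directory can appear.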

One consequence of this is that specifying a local directory as the storage_path will now create 2 copies of the experiment directory. The way to get around this is to set the cache path instead using the environment variable. I don't think this way of configuring a local / NFS dir is very nice, so maybe it makes sense to re-introduce a "local_dir" and "sync_dir" concept to make this behavior more explicit.

Reference

This PR is mostly based off of the prototype here: #36969

  • I'm splitting the prototype up into two parts to make it easier for review, and to fill in the gaps one component at a time.
  • What are the differences between the prototype and this PR?
    • Still use our ray.air._internal.remote_storage utils, including get_fs_and_path instead of pyarrow.fs.FileSystem.from_uri. Allow ray.air._internal.remote_storage utils to take in a custom storage filesystem.
    • Centralize the paths (experiment path, trial path, etc.) in the storage context.
    • Use the storage context to perform driver syncing.

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Missing arg

Signed-off-by: Justin Yu <justinvyu@anyscale.com>

Fix typehint

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Comment on lines 66 to 69
self.storage_filesystem = storage_filesystem
if is_uri(self.storage_path):
    raise ValueError("TODO")
self.storage_fs_path = self.storage_path
Contributor Author

If you use a custom pyarrow filesystem, you need to pass in an already-stripped path rather than a URI.

(storage_filesystem=CustomS3FileSystem(), storage_path="bucket/a/b/c")
# Cannot use "s3://bucket/a/b/c", since pyarrow.fs.copy_files will complain about a URI being present

The default case uses pyarrow.fs.FileSystem.from_uri to strip the prefix from the path:

(storage_filesystem=None, storage_path="s3://bucket/a/b/c")
# from_uri(storage_path) -> (DefaultS3FileSystem(), "bucket/a/b/c")
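
The two cases above might be handled along these lines. This is a sketch under assumptions: `resolve_storage` and this version of `is_uri` are hypothetical, and the only real pyarrow call is `pyarrow.fs.FileSystem.from_uri`, imported lazily so that the custom-filesystem branch runs without pyarrow installed.

```python
import urllib.parse
from typing import Any, Optional, Tuple


def is_uri(path: str) -> bool:
    # Hypothetical helper: treat anything with a multi-character scheme as a URI.
    # Single-letter schemes are skipped so Windows drive letters are not misread.
    return len(urllib.parse.urlparse(path).scheme) > 1


def resolve_storage(
    storage_path: str, storage_filesystem: Optional[Any] = None
) -> Tuple[Any, str]:
    """Return (filesystem, path on that filesystem)."""
    if storage_filesystem is not None:
        # Custom filesystem: the path must already have its prefix stripped.
        if is_uri(storage_path):
            raise ValueError(
                "Pass a path without a URI prefix when providing a custom "
                f"filesystem, e.g. 'bucket/a/b/c'. Got: {storage_path}"
            )
        return storage_filesystem, storage_path

    # Default: let pyarrow pick the filesystem and strip the prefix.
    import pyarrow.fs

    return pyarrow.fs.FileSystem.from_uri(storage_path)
```

Raising early with a descriptive message keeps the failure close to the user's configuration rather than surfacing later inside a copy operation.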

Contributor Author

Question: Is this the desired way to specify a storage path, in the case of a custom filesystem?

Contributor

Does normalize_path() fix this? https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.normalize_path

Not sure I see any other APIs in arrow that would resolve the URI relative to a filesystem.

Contributor

Another option is we can parse URIs and strip the prefix in the case of a custom storage filesystem being passed in.

Contributor Author

@justinvyu justinvyu Jul 24, 2023

Yeah I tried normalize and it doesn't strip the prefix.

I think stripping the prefix for the user with some urllib parse probably works in most cases, but there are some edge cases (I am worried that this logic is not robust enough). For example, for HDFS:

# for uri="hdfs://48bb8ca83706:8020/test":
# netloc="48bb8ca83706:8020/"
# netloc's information is taken into account from the pyarrow client.
# so path should not include netloc.
# On the other hand, for uri="s3://my_bucket/test":
# netloc="my_bucket/" and path="test/"
# netloc's information is not part of the pyarrow client.
# so path should include netloc information hence the concatenation.
if uri.startswith("hdfs://"):
    path = parsed.path
else:
    path = parsed.netloc + parsed.path
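
To make the edge case concrete, here is a small standard-library sketch of prefix stripping with `urllib.parse`. The function name `strip_uri_prefix` is hypothetical; it mirrors the branch quoted above and only shows why a single rule does not fit every scheme.

```python
from urllib.parse import urlparse


def strip_uri_prefix(uri: str) -> str:
    """Return the path to hand to a filesystem object for the given URI."""
    parsed = urlparse(uri)
    if parsed.scheme == "hdfs":
        # The host:port belongs to the HDFS client, so only the path remains.
        return parsed.path
    # For bucket-style stores, the netloc is the bucket and belongs in the path.
    return parsed.netloc + parsed.path


print(strip_uri_prefix("hdfs://48bb8ca83706:8020/test"))  # /test
print(strip_uri_prefix("s3://my_bucket/test"))  # my_bucket/test
```

Every additional scheme would need its own decision about whether the netloc is part of the path, which is the robustness concern raised in the comment.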

Contributor

I see, it's probably safest not to do any special parsing then. We can document that users should not specify the URI prefix when using a custom storage filesystem.

(
    self.storage_filesystem,
    self.storage_fs_path,
) = get_fs_and_path(self.storage_path)
Contributor Author

@justinvyu justinvyu Jul 24, 2023

I stuck with get_fs_and_path and the utils in ray.air._internal.remote_storage, since they contain many fixes that are worth keeping (e.g., HDFS URI parsing, a workaround for a pyarrow bug that causes uploads of many files to hang forever, and a workaround for poor performance from the pyarrow defaults).

There may be a bug in Windows path parsing that needs to be fixed.

Contributor

Let's discuss. I think we need to take a strong stand here, and simplify how this works.

At a high level, Ray Data is working fine with pyarrow filesystem URI resolution, and I don't see why Train needs to diverge here. I doubt the assumptions made before are necessarily valid now.

Contributor Author

Oh, now that I'm looking at this HDFS URI parsing again, the only reason this is needed is because we cache the pyarrow filesystem and then try to perform the prefix stripping ourselves on the next filesystem operation (using the cached one). We can simplify / remove this.

The one thing that I think we should keep from get_fs_and_path is replacing the default s3/gs syncers with the corresponding fsspec implementations. See #34663

In particular, the default pyarrow S3FileSystem has many issues:

  1. It has a bug that causes uploads of many files to hang forever when using multiple threads to upload. So, we need to manually limit this to use a single thread.
  2. It's slow due to 1 (see the linked PR for performance comparisons).

So, my new imagined get_fs_and_path:

if fsspec installed:
    return fsspec s3fs/gs filesystem from URI

return pyarrow.fs.FileSystem.from_uri(uri)
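
The imagined function could look roughly like the following. This is only a sketch: the name `get_fs_and_path` is reused from the discussion, the helper `prefers_fsspec` and the scheme list are assumptions, and the fsspec branch additionally assumes that the matching backend (for example `s3fs` or `gcsfs`) is installed.

```python
import importlib.util
from urllib.parse import urlparse

# Assumption: schemes for which an fsspec implementation is preferred.
FSSPEC_SCHEMES = {"s3", "gs"}


def prefers_fsspec(uri: str) -> bool:
    """True if the URI scheme is one we would route through fsspec."""
    return urlparse(uri).scheme in FSSPEC_SCHEMES


def get_fs_and_path(uri: str):
    import pyarrow.fs  # imported lazily; requires pyarrow

    if prefers_fsspec(uri) and importlib.util.find_spec("fsspec") is not None:
        import fsspec.core

        # fsspec resolves both the filesystem and the prefix-stripped path.
        fsspec_fs, path = fsspec.core.url_to_fs(uri)
        # Wrap the fsspec filesystem so it exposes the pyarrow.fs interface.
        return pyarrow.fs.PyFileSystem(pyarrow.fs.FSSpecHandler(fsspec_fs)), path

    return pyarrow.fs.FileSystem.from_uri(uri)
```

Because no filesystem is cached between calls here, no scheme-specific prefix stripping is needed afterwards.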

Contributor

@ericl ericl Jul 25, 2023

Yes, I'm aware of the benchmarks. We're talking about what looks like ~40% performance improvement, however, for an operation that is typically infrequent (<1% of training overhead, given reasonably configured checkpoint intervals).

I'd like us to drop the fsspec dependency altogether here and standardize, given that this is also likely to be fixed in the future. Particularly performance sensitive users can pass in a custom filesystem.

Comment on lines 144 to 146
def init_shared_storage_context(storage_context: StorageContext):
    global _storage_context
    _storage_context = storage_context
Contributor Author

A global context is used in a follow-up PR, and it was the easiest way for me to pipe the same storage context from the Tune Trainable to the Trainer.
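
The global-context pattern in the quoted lines can be sketched like this. `StorageContext` is reduced to a stub here, and `get_storage_context` is a hypothetical accessor added for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StorageContext:
    # Stub with a single field; the real class carries more state.
    storage_path: str


_storage_context: Optional[StorageContext] = None


def init_shared_storage_context(storage_context: StorageContext) -> None:
    """Store the context in a module-level global for later lookup."""
    global _storage_context
    _storage_context = storage_context


def get_storage_context() -> StorageContext:
    """Hypothetical accessor: return the shared context or fail loudly."""
    if _storage_context is None:
        raise RuntimeError("Storage context has not been initialized.")
    return _storage_context


init_shared_storage_context(StorageContext(storage_path="/mnt/nfs"))
print(get_storage_context().storage_path)  # /mnt/nfs
```

A module-level global avoids threading the context through every call signature, at the cost of hidden state that each process has to initialize itself.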

Comment on lines 158 to 159
self._local_experiment_path = storage.experiment_cache_path
self._remote_experiment_path = storage.experiment_fs_path
Contributor Author

Note: These concepts of "local" and "remote" should be rebranded as "cache_path" and "storage_fs_path".

Contributor

In the storage case, shall we avoid assigning local attributes like this and keep them in the storage context only?

E.g., set local_experiment_path=None, etc.

I realize this may require some more code changes, but this is probably a necessary refactoring.

# local vs. remote storage paths and should be reworked in a follow-up.
_get_defaults_results_dir()
if _use_storage_context()
else run_config.storage_path,
Contributor Author

Note to self: Even though the old path takes in a storage_path, the get_experiment_checkpoint_dir method resolves the storage path and only takes the "local" version, which is the cache path in the case of cloud storage.

@justinvyu justinvyu requested a review from krfricke July 24, 2023 07:10
@justinvyu justinvyu marked this pull request as ready for review July 24, 2023 18:06
@ericl ericl self-assigned this Jul 24, 2023

self,
storage_path: str,
sync_config: SyncConfig,
experiment_dir_name: str,
Contributor

Do we always know the experiment_dir_name when this context object is first created?

Contributor Author

At the moment, I create this storage context after the experiment dir name has been resolved.

The whole experiment directory name generation code is pretty complex, so I just left that stuff alone for now.


@ericl ericl added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jul 24, 2023
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
@justinvyu justinvyu requested a review from ericl July 26, 2023 21:01
return delete_at_uri, dict(uri=uri)


class _FilesystemSyncer(_BackgroundSyncer):
Contributor Author

Here's the new syncer that doesn't depend on remote_storage.

Contributor

Let's move this class to the storage.py file?

@justinvyu justinvyu removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jul 26, 2023
@justinvyu
Contributor Author

@ericl Thanks for the suggestions! Took another pass, let me know what you think.

Contributor

@ericl ericl left a comment

Ok close enough. Let's merge and iterate. Please revert the changes to remote_storage.py and copy paste any necessary utils to storage.py as commented before merging.

I really don't want to entangle old and new code paths.

Signed-off-by: Justin Yu <justinvyu@anyscale.com>

Remove fs_utils

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
@justinvyu justinvyu added the tests-ok The tagger certifies test failures are unrelated and assumes personal liability. label Jul 27, 2023
@justinvyu
Contributor Author

@ericl Should be good to merge now.

@ericl ericl merged commit 56b5270 into ray-project:master Jul 27, 2023
78 of 83 checks passed
@ericl
Contributor

ericl commented Jul 27, 2023

Merged, thanks!

@justinvyu justinvyu deleted the air/persistence/storage_context_driver branch July 27, 2023 00:37