[Storage] Adding new storage mode CSYNC #2336
Conversation
merge master
Thanks for adding this feature @landscapepainter! This is very exciting. Could you merge the latest master branch? I will try this PR with our Vicuna examples. Left several questions.
sky/data/skystorage.py
Outdated
def set_s3_sync_cmd(src_path: str, bucketname: str, num_threads: int,
                    delete: bool, no_follow_symlinks: bool):
    """Builds sync command for aws s3"""
    config_cmd = ('aws configure set default.s3.max_concurrent_requests '
This seems to apply not only to the following commands but to other s3-related commands as well. For example, it may affect the mount mode and copy mode. Is there a way to limit the change to the current command only?
In COPY mode, this method remains unaffected because _execute_storage_csync is executed after all files required for COPY mode have been downloaded to the remote node. Our MOUNT mode for s3 uses goofys, and goofys does not use the aws cli config; it only shares the credentials.
Unfortunately, the aws cli does not support a direct argument for increasing concurrency, so I set a safeguard at the run_sync method to set the config back to the default value.
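A minimal sketch of scoping the AWS CLI concurrency setting to a single sync invocation: raise it, run the sync, and always restore the default afterwards. The function name and parameters here are illustrative, not from the PR; the default of 10 for `max_concurrent_requests` is the documented AWS CLI default.

```python
# Hypothetical sketch: build one shell command that raises the AWS CLI
# s3 concurrency, runs the sync, and restores the default even on failure.
def build_scoped_s3_sync_cmd(src_path: str, bucket_name: str,
                             num_threads: int) -> str:
    set_cmd = ('aws configure set default.s3.max_concurrent_requests '
               f'{num_threads}')
    sync_cmd = f'aws s3 sync {src_path} s3://{bucket_name}'
    # 10 is the AWS CLI default for max_concurrent_requests.
    reset_cmd = 'aws configure set default.s3.max_concurrent_requests 10'
    # `;` (not `&&`) before the reset, so the default is restored even
    # when the sync itself fails; other s3 commands (mount/copy mode) on
    # the same machine are then unaffected afterwards.
    return f'{set_cmd} && ({sync_cmd}); {reset_cmd}'
```

The same effect could be achieved in Python with a `try`/`finally` around two `subprocess.run` calls; building one shell string keeps it usable inside a single remote command.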
> In COPY mode, this method remains unaffected because _execute_storage_csync is executed after all files required for the COPY mode have been downloaded to the remote node.

This is not true if a user runs sky launch on the same cluster twice, right? Resetting it after every run_sync sounds good to me.
What will happen if two such commands run simultaneously? IIUC, the second will overwrite the modification made by the first command?
@cblmemo You are right. If there is more than one CSYNC for S3 running in the cluster, and those two happen to have different numbers of threads set and run at the same time, such that the config for the second run is set before the sync is run for the first, the second one's modification will overwrite the first one's. But I don't think there would be any good reason for users to set different numbers of threads for different daemons running in the cluster. Perhaps I can document the expected behavior?
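One way to rule the race out entirely (not part of this PR; purely a sketch) is to serialize the configure-then-sync pair across daemons on the same node with an exclusive file lock, so one daemon's `max_concurrent_requests` setting cannot be overwritten between its `aws configure set` and its `aws s3 sync`. The lock path and function name below are assumptions.

```python
import fcntl
import subprocess

# Assumed lock path; any node-local path shared by the daemons would do.
LOCK_PATH = '/tmp/skypilot_s3_sync.lock'

def run_sync_with_lock(set_cmd: str, sync_cmd: str) -> None:
    """Run `set_cmd` then `sync_cmd` under an exclusive inter-process lock."""
    with open(LOCK_PATH, 'w') as lock_file:
        # Blocks until no other daemon holds the lock.
        fcntl.flock(lock_file, fcntl.LOCK_EX)
        try:
            subprocess.run(set_cmd, shell=True, check=True)
            subprocess.run(sync_cmd, shell=True, check=True)
        finally:
            fcntl.flock(lock_file, fcntl.LOCK_UN)
```

The trade-off is that concurrent CSYNC daemons would sync one at a time, which may be acceptable given they already compete for uplink bandwidth.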
sky/data/skystorage.py
Outdated
except subprocess.CalledProcessError:
    if max_retries > 0:
        # TODO: display the error with remaining # of retries
        wait_time = interval / 2
Why is the wait time set to interval / 2?
This error handling originates from the error gsutil rsync raises when the training script is writing to the mounted directory while gsutil rsync is syncing at the same time. Because the size of the file gsutil rsync is uploading changes during the upload, the error occurs. It is simply resolved by rerunning the sync command. Since the error usually occurs after some progress on the sync, setting wait_time to interval would be too long, so I set it to half of that.
It sounds like a heuristic; can we add some comments in the code here? Also, is it better to use a normal backoff for the retry, or interval / 2?
Ideally, interval should be set to the amount of time it would take for the sync tool to upload the complete set of checkpoints, which keeps the storage and the node synced as closely as possible without overlapping launches of the sync commands. For example, it took ~900 seconds for gsutil rsync to sync 60GB of checkpoints from our original Vicuna example, so it would be nice to keep the interval at 900s in this case.
As the error we are trying to handle with the retry occurs when the training script happens to be writing to the mounted directory while gsutil rsync is syncing, it can take some time for the already running sync process to end. So I updated the code to use a Backoff object from common_utils with interval/2 as the initial backoff.
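A rough sketch of the retry-with-backoff idea discussed above. The `Backoff` class here is a minimal stand-in (I am not reproducing the exact API of sky's `common_utils.Backoff`); the initial delay of `interval / 2` mirrors the comment, and the multiplier of 2 is an assumption.

```python
import subprocess
import time

class Backoff:
    """Minimal exponential backoff: initial delay, doubled on each call."""
    def __init__(self, initial: float, multiplier: float = 2.0):
        self._delay = initial
        self._multiplier = multiplier

    def next_delay(self) -> float:
        delay = self._delay
        self._delay *= self._multiplier
        return delay

def run_sync_with_retries(sync_cmd: str, interval: float,
                          max_retries: int = 3) -> None:
    """Run the sync command, retrying with backoff on transient failures."""
    backoff = Backoff(initial=interval / 2)
    for attempt in range(max_retries + 1):
        try:
            subprocess.run(sync_cmd, shell=True, check=True)
            return
        except subprocess.CalledProcessError:
            if attempt == max_retries:
                raise
            # The transient gsutil rsync error (file changed mid-upload)
            # usually clears once the in-flight write finishes.
            time.sleep(backoff.next_delay())
```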
@cblmemo Finally, this is ready for a look! Majority of the logic updates are added to
For launching, on top of the original sync daemon, there are two FUSE processes launched: the storage mount on the csync_read_path, and
For terminating, we originally checked whether there was a sync process running, and terminated the sync daemon if there wasn't any. Now, on top of that, the termination process sequentially unmounts the FUSE processes, and then terminates the sync daemon.
Update: e2e testing with the checksum vicuna example, #2432, runs correctly.
Thanks for this amazing work!! Left some comments, mostly nits. Will take a look at https://github.com/landscapepainter/libfuse/ soon.
    return storage_mount_script


def _handle_fuse_process(fuse_cmd: str) -> Tuple[int, int, str]:
Add a docstring, especially for the return value.
Actually, why not use sky/utils/subprocess_utils.py::run?
It doesn't seem to have a good way to obtain the pid if used with subprocess.run().
Actually, I'm a little bit confused about why we need the pid here. IIRC, the with subprocess.Popen block will wait for the process to finish and then enter the following statements. Why do we need the pid of a process that has already finished?
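For reference, a small sketch of the `Popen` behavior under discussion: inside the `with` block the process is still running, so its pid is available and usable (e.g. for recording which FUSE process to unmount later); the wait only happens when the block is exited (or when `communicate()` is called). The `sleep` command stands in for a real FUSE mount command.

```python
import subprocess

# Popen exposes the pid immediately after spawn, before the process exits.
with subprocess.Popen(['sleep', '0.1'],
                      stdout=subprocess.PIPE,
                      stderr=subprocess.PIPE) as proc:
    pid = proc.pid                       # valid while the process runs
    stdout, stderr = proc.communicate()  # now explicitly wait for it
returncode = proc.returncode
```

With `subprocess.run()` there is no handle to the child until it has already exited, which is presumably why `Popen` was chosen here.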
Thanks for adding the fuse file!! Left some comments.
sky/data/csync/redirect-fuse.c
Outdated
free(read_path);
free(write_path);
set_destroy(&set);
Why do we need this set? Is it possible that under one dir there are two files with the same name?
It is possible to have two files with the same name, one in each directory.
Sorry, can you elaborate a little bit?
So the xmp_readdir file operation is called when ls is run at the mountpoint. We designed this in a way that ls displays all the files/directories from both the read directory (mounted from cloud storage) and the write directory (the write cache). If there is a file with an identical name in both directories, we only want to display it once.
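A Python analogue of that merge (illustrative only; the PR implements this in C inside xmp_readdir): walk both listings and use a set to emit each name once, which is the role the `set` freed by `set_destroy(&set)` plays.

```python
from typing import Iterable, List

def merged_listing(read_dir_entries: Iterable[str],
                   write_dir_entries: Iterable[str]) -> List[str]:
    """Merge two directory listings, keeping order, deduplicating names."""
    seen = set()           # same role as the set in redirect-fuse.c
    merged = []
    for name in list(read_dir_entries) + list(write_dir_entries):
        if name not in seen:
            seen.add(name)
            merged.append(name)
    return merged
```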
Why not only display the read dir? IIUC it should include every file in the write dir?
Oh is this for the case when there are some new write files in the write dir that have not been synced to the cloud?
This PR is stale because it has been open 120 days with no activity. Remove stale label or comment or this will be closed in 10 days.
This PR was closed because it has been stalled for 10 days with no activity.
This closes #1862
When using a managed spot job for training, it's necessary to save the checkpoints in cloud storage, as we want the training to recover from the checkpoint when preempted. And this requires the application to store the checkpoints in the cloud storage. However, writing the checkpoints to a cloud-storage-mounted directory takes a long time, especially for large models like LLMs, and this stalls the training, becoming the bottleneck.
To mitigate this issue, this PR adds a new storage mode, CSYNC, which runs a daemon to run sync commands periodically at the specified remote path. This way, the training application can write the checkpoints to an unmounted directory and the checkpoints get synced asynchronously, which makes the process much quicker. When a cluster running a sync command launched by csync is attempted to be stopped with sky stop/sky down, it waits until the sync is completed before proceeding to stop/down the cluster.

Note:
- backend_utils.wait_and_terminate_csync() within cli.py/_down_or_stop() can be implemented smoothly with storage info in the cluster table's metadata. Will implement this in a separate issue.
- backend_utils.wait_and_terminate_csync() does not support autostop/autodown yet. Will be implemented after this PR is merged.

TODO:
- _execute_storage_syncs to set up mode: CSYNC
- sky stop/down/autostop/autodown with cluster's metadata
- mode: CSYNC
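The CSYNC behavior described above can be sketched as a small daemon loop. This is a simplified illustration, not the PR's implementation (which lives in sky/data/skystorage.py); the function name and the stop-flag mechanism are assumptions.

```python
import subprocess
import time
from typing import Callable

def csync_daemon(sync_cmd: str, interval: float,
                 should_stop: Callable[[], bool]) -> None:
    """Periodically run `sync_cmd` until `should_stop()` returns True."""
    while not should_stop():
        # Each sync runs to completion; the stop flag is only checked
        # between syncs, so sky stop / sky down never interrupts an
        # in-flight sync of the checkpoints.
        subprocess.run(sync_cmd, shell=True, check=True)
        time.sleep(interval)
```

The key property is that the stop check sits outside the sync call: stopping the cluster waits for the current sync to finish, matching the behavior the description promises for sky stop/sky down.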
Tested (run the relevant ones):
- bash format.sh
- CSYNC mode
- CSYNC mode on a subdirectory of a bucket as a source
- sky stop on a cluster running a sync command launched by csync
- CSYNC mode to confirm if training recovers correctly from checkpoints
- pytest tests/test_smoke.py::TestStorageWithCredentials
- pytest tests/test_smoke.py::test_aws_storage_mounts_with_stop --aws
- pytest tests/test_smoke.py::test_gcp_storage_mounts_with_stop --gcp