
[air] pyarrow.fs persistence (4/n): Introduce a simplified checkpoint manager #37962

Merged

Conversation

@justinvyu justinvyu commented Aug 1, 2023

Why are these changes needed?

This PR condenses the functionality of ray.air._internal.checkpoint_manager into a few simplified classes:

  1. Introduces a new _TrainingResult in place of the old _TrackedCheckpoint, which holds the (checkpoint, metrics) pair reported by the user.
  2. The logic of _CheckpointManager is stripped down to only handle the top-K heap of checkpoints, including deleting the checkpoints that fall out of the top K.
    • Previously, the old _CheckpointManager also handled "committing" checkpoints to disk. This was mostly for in-memory checkpoints, which don't apply to the new persistence mode. This behavior has been removed.
    • The implementation of delete_fn is now standardized using the pyarrow fs (a rough sketch follows this list).
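
As a rough illustration of the last point, here is a minimal sketch of a pyarrow-based delete helper. The helper name _delete_fs_path and its (fs, fs_path) signature appear later in this discussion; the body below is an assumption for illustration, not the exact internals.

import pyarrow.fs

def _delete_fs_path(fs: pyarrow.fs.FileSystem, fs_path: str) -> None:
    # Checkpoints are directories; only attempt deletion if the path exists
    # on the given filesystem, so local and remote storage behave the same.
    if fs.get_file_info(fs_path).type != pyarrow.fs.FileType.NotFound:
        fs.delete_dir(fs_path)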

This is a prerequisite cleanup for #37888, which shouldn't deal with the old _TrackedCheckpoint interface anymore. The old _TrackedCheckpoint holds onto the node that a checkpoint lives on + implements "commit" / "delete" and has a bunch of other functionality that doesn't apply anymore.

Future cleanup / considerations

  • We should remove ray.air._internal.checkpoint_manager, ray.train._internal.checkpoint, and ray.tune.execution.checkpoint_manager.
  • This checkpoint manager can also be moved from the driver to the trial process. (That is the direction we're trying to head with the experiment layering effort.)

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
@@ -0,0 +1,191 @@
from pathlib import Path
justinvyu (Contributor Author):

This was adapted from ray/air/test_checkpoint_manager.py

@@ -0,0 +1,218 @@
import heapq
justinvyu (Contributor Author):

This was adapted from ray/air/_internal/checkpoint_manager.py

and len(self._top_k_checkpoints) > self._checkpoint_config.num_to_keep
):
worst_checkpoint = heapq.heappop(self._top_k_checkpoints)
self._maybe_delete_checkpoint(worst_checkpoint.tracked_checkpoint)
Contributor:

Why not clean up checkpoints right away instead of deferring?

justinvyu (Contributor Author):

This is behavior from before: we always keep the latest checkpoint around, even if it's not a top-k checkpoint. Fault-tolerance restoration currently restores from the latest checkpoint.

justinvyu (Contributor Author):

@ericl Are you ok with keeping the existing functionality for now? This test walks through an example.

Contributor:

Hmm, I'm talking more about the implementation. I think now's a good time to clean up some of this code. I believe we can replace this class with the following, which is much simpler and unlikely to be much slower:

# Note: typing imports added; CheckpointConfig, _TrackedCheckpoint,
# _delete_fs_path, and _get_checkpoint_score are assumed to come from the
# existing Ray internals.
from typing import List, Optional

class _CheckpointManager:

    def __init__(
        self,
        checkpoint_config: Optional[CheckpointConfig],
        latest_checkpoint_index: int = 0,
    ):
        self._checkpoint_config = checkpoint_config or CheckpointConfig()
        self._checkpoints: List[_TrackedCheckpoint] = []
        self._latest_checkpoint_index = latest_checkpoint_index

    def register_checkpoint(self, tracked_checkpoint: _TrackedCheckpoint):
        self._checkpoints.append(tracked_checkpoint)

        if self._checkpoint_config.num_to_keep:
            # Could cache score as optimization if we wanted
            candidates = sorted(self._checkpoints[:-1], key=self._get_checkpoint_score)
            while candidates and len(self._checkpoints) > self._checkpoint_config.num_to_keep:
                # Drop the worst-scoring checkpoint (never the latest one,
                # which is excluded from the candidates above).
                worst = candidates.pop(0)
                self._checkpoints.remove(worst)
                checkpoint = worst.checkpoint
                _delete_fs_path(fs=checkpoint.filesystem, fs_path=checkpoint.path)

    @property
    def best_checkpoint(self) -> Optional[_TrackedCheckpoint]:
        if not self._checkpoints:
            return None
        return sorted(self._checkpoints, key=self._get_checkpoint_score)[-1]

    @property
    def latest_checkpoint(self) -> Optional[_TrackedCheckpoint]:
        if not self._checkpoints:
            return None
        return self._checkpoints[-1]

justinvyu (Contributor Author):

OK, I simplified it in a slightly different way that doesn't need to sort the list. PTAL

There was also a slight behavioral change from the proposal above: the (K+1)-th checkpoint wouldn't be deleted if the best checkpoint were always excluded from the candidates.
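
For context, here is a minimal sketch of the heap-based variant described above. The class name, method names, and score argument are assumptions for illustration, and the print call stands in for the real pyarrow-based deletion; this is not the merged implementation.

import heapq
import itertools
from typing import List, Optional, Tuple

class _TopKCheckpointManager:
    """Illustrative only: keep the num_to_keep best checkpoints plus the latest one."""

    def __init__(self, num_to_keep: Optional[int] = None):
        self._num_to_keep = num_to_keep
        self._counter = itertools.count()  # tie-breaker for equal scores
        # Min-heap of (score, insertion order, checkpoint path).
        self._heap: List[Tuple[float, int, str]] = []
        self._latest: Optional[str] = None
        self._pending_delete: Optional[str] = None

    def register_checkpoint(self, checkpoint_path: str, score: float) -> None:
        # A previously spared "latest" checkpoint can be deleted now that a
        # newer checkpoint has arrived.
        if self._pending_delete is not None:
            self._delete(self._pending_delete)
            self._pending_delete = None

        self._latest = checkpoint_path
        heapq.heappush(self._heap, (score, next(self._counter), checkpoint_path))

        if self._num_to_keep is not None and len(self._heap) > self._num_to_keep:
            _, _, worst = heapq.heappop(self._heap)
            if worst == self._latest:
                # Never delete the latest checkpoint: fault-tolerance
                # restoration resumes from it. Defer deletion instead.
                self._pending_delete = worst
            else:
                self._delete(worst)

    def _delete(self, checkpoint_path: str) -> None:
        # Placeholder for the real deletion, which goes through the pyarrow fs.
        print(f"Deleting checkpoint at {checkpoint_path}")

In this sketch, the latest checkpoint is only deleted once a newer one supersedes it, matching the restoration behavior discussed earlier in this thread.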

@ericl ericl added the @author-action-required label Aug 1, 2023
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
@justinvyu justinvyu removed the @author-action-required label Aug 1, 2023
@justinvyu justinvyu requested a review from ericl August 1, 2023 20:51
@justinvyu (Contributor Author):

@ericl Any thoughts on a new name for _TrackedCheckpoint? I'm using it in the next PR and aliasing it to _NewTrackedCheckpoint, but I think this name is confusing, and it's clearer if we just choose a new name for it.

I'm thinking of changing it to TrainingResult, since it's an object that holds the (metrics, checkpoint) reported from the user (a rough sketch follows the list below):

  • We can use this in place of ray.train._internal.session.TrainingResult (which is pretty much the same thing).
  • This is in line with unifying the 2 session reports.
  • This is also in line with combining the train and save steps in the Tune control loop.
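
For reference, the object being described is roughly the following. This is an assumed sketch for illustration, not the exact definition from the PR.

from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class _TrainingResult:
    # The checkpoint reported by the user (e.g. a ray.train.Checkpoint);
    # assumed Optional here in case only metrics are reported.
    checkpoint: Optional[Any]
    # The metrics dict reported alongside the checkpoint.
    metrics: Dict[str, Any]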

ericl commented Aug 1, 2023 via email

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
@justinvyu justinvyu added the tests-ok label Aug 2, 2023
@ericl ericl merged commit f19e027 into ray-project:master Aug 2, 2023
48 of 54 checks passed
@justinvyu justinvyu deleted the air/persistence/simplified_ckpt_manager branch August 2, 2023 18:17
ericl pushed a commit that referenced this pull request Aug 3, 2023
…ection (#37888)

This PR:
1. Uses the storage context to upload the new `ray.train.Checkpoint` (from #37925)
directly from the Train worker.
2. Gets checkpoint reporting to work in the save direction, simplifying the checkpoint handling logic to avoid the Train `CheckpointManager` and use a single, simplified checkpoint manager (from #37962).
3. Updates the e2e test to check for worker-uploaded checkpoints.

### Follow-ups needed

1. `Trial` path resolution is still messed up (using the legacy path), causing some issues with the custom fs test case. That test case skips some assertions at the moment. This fix is up next.
2. Trial restoration is explicitly disabled at the moment. This is up next as well.
3. Artifacts are currently being synced by the driver due to the train worker living on the same node, which is why it passes in the test case. This upload should be done from the worker, and the test case should be updated to check that.
4. The `on_checkpoint` hook for `tune.Callback` takes in a `_TrackedCheckpoint`. Currently, I skip invoking the callbacks -- TBD what to expose to the user callbacks here.
5. Checkpoints cannot be ordered based on auto-filled metrics at the moment, only user specified metrics. Ex: `CheckpointConfig(checkpoint_score_attribute="training_iteration", mode="min")`
NripeshN pushed a commit to NripeshN/ray that referenced this pull request Aug 15, 2023
…nt manager (ray-project#37962)

NripeshN pushed a commit to NripeshN/ray that referenced this pull request Aug 15, 2023
…ection (ray-project#37888)

harborn pushed a commit to harborn/ray that referenced this pull request Aug 17, 2023
…nt manager (ray-project#37962)

harborn pushed a commit to harborn/ray that referenced this pull request Aug 17, 2023
…ection (ray-project#37888)

arvind-chandra pushed a commit to lmco/ray that referenced this pull request Aug 31, 2023
…nt manager (ray-project#37962)

arvind-chandra pushed a commit to lmco/ray that referenced this pull request Aug 31, 2023
…ection (ray-project#37888)

vymao pushed a commit to vymao/ray that referenced this pull request Oct 11, 2023
…nt manager (ray-project#37962)

vymao pushed a commit to vymao/ray that referenced this pull request Oct 11, 2023
…ection (ray-project#37888)