
Prioritize WorkersToDelete #208

Merged

Conversation

sriram-anyscale
Collaborator

sriram-anyscale commented Mar 22, 2022

Why are these changes needed?

There are multiple race conditions between the Ray Autoscaler and the Kuberay reconciliation loop that this PR addresses. For example, suppose the Autoscaler requests a downscale by reducing the number of replicas and specifying the workers to delete, and a worker pod independently dies before the Kuberay reconciliation loop runs. The current code will delete a random set of pods to meet the Replicas count and ignore WorkersToDelete.

This PR makes the reconciliation loop first delete all the pods named in WorkersToDelete and then reconcile the remaining running pods to match Replicas (in either direction - scale up or down).
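
For readers skimming the PR, here is a minimal sketch of the intended ordering. It is illustrative only: the real logic lives in reconcilePods in ray-operator/controllers/raycluster_controller.go, and the function name, client wiring, and parameters below are assumptions rather than the code in this diff.

```go
package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// reconcileWorkerGroup sketches the new ordering: first delete every pod named
// in WorkersToDelete, then reconcile whatever is still running against Replicas.
func reconcileWorkerGroup(ctx context.Context, c client.Client, namespace string,
	workersToDelete []string, replicas int32, runningPods []corev1.Pod) error {
	deleted := map[string]bool{}
	for _, name := range workersToDelete {
		pod := corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace}}
		// A worker that already died is fine: IsNotFound is not treated as an error.
		if err := c.Delete(ctx, &pod); err != nil && !errors.IsNotFound(err) {
			return err
		}
		deleted[name] = true
	}
	// Count the workers that survive the explicit deletions.
	surviving := int32(0)
	for _, p := range runningPods {
		if !deleted[p.Name] {
			surviving++
		}
	}
	// Scale up or down to match Replicas, whichever direction is needed.
	diff := replicas - surviving
	switch {
	case diff > 0:
		// create `diff` new worker pods (omitted)
	case diff < 0:
		// pick `-diff` surviving pods and delete them (omitted)
	}
	return nil
}
```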

To verify that this change is compatible with all other components that work with Kuberay (e.g., the Ray Autoscaler), the change is currently guarded by a feature flag, which needs to be set when Kuberay is started. This way we can test version compatibility.

There is a matching change in the Ray Autoscaler (ray-project/ray#23428). During testing we have to make sure that the before/after Ray Autoscaler works with the before/after Kuberay (essentially 4 combinations).

Related issue number

None

Checks

This PR is not ready to be merged right now. I need help on how to do the feature flag as well as how to run tests. Once I get past this, I will update the PR appropriately.

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

…then adjust the total number of running pods to match Replicas
…then adjust the total number of running pods to match Replicas
…cale/kuberay into prioritize-workers-to-delete
@pcmoritz requested a review from Jeffwan March 22, 2022 19:05
@pcmoritz
Collaborator

@Jeffwan Can you have a look at the PR and help resolve the questions?

@Jeffwan
Collaborator

Jeffwan commented Mar 22, 2022

Sure. I will help review the change today.

@asm582
Contributor

asm582 commented Mar 23, 2022

Please excuse my ignorance, but do we know how the WorkersToDelete list is obtained? Is it generated or supplied by the user?

@sriram-anyscale
Collaborator Author

sriram-anyscale commented Mar 23, 2022 via email

@Jeffwan
Collaborator

Jeffwan commented Mar 23, 2022

We will need to test version compatibility for all four cases

Please help list the cases. You mean autoscaler scale up/down with unexpected new/removed pods? (2 * 2)?

@@ -248,6 +248,35 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
}
}
diff := *worker.Replicas - int32(len(runningPods.Items))

//// SriramQ: How do I create a feature flag to guard the new functionality?
featureFlag := true
Collaborator

This is a good question. We don't have a global feature gate mechanism at the moment. Let's use this temporarily.

I created #211 for the feature gate implementation and discussion.

Collaborator Author

I would like to do better - we should be able to provide this as a startup option. I think I know how to do this - please wait for my next code update.
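
For what it's worth, a startup option along these lines is usually just a command-line flag parsed in the operator's main. A minimal sketch follows; the flag name prioritize-workers-to-delete is a placeholder and not necessarily what this PR ends up using.

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	// Hypothetical flag; the actual name and default used in this PR may differ.
	prioritizeWorkersToDelete := flag.Bool("prioritize-workers-to-delete", false,
		"If set, delete the pods named in WorkersToDelete before reconciling Replicas.")
	flag.Parse()
	fmt.Println("prioritize-workers-to-delete =", *prioritizeWorkersToDelete)
	// ... pass the value into the RayClusterReconciler so reconcilePods can branch on it ...
}
```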

@@ -262,7 +291,7 @@ func (r *RayClusterReconciler) reconcilePods(instance *rayiov1alpha1.RayCluster)
} else if diff == 0 {
log.Info("reconcilePods", "all workers already exist for group", worker.GroupName)
continue
} else if int32(len(runningPods.Items)) == (*worker.Replicas + int32(len(worker.ScaleStrategy.WorkersToDelete))) {
} else if -diff == int32(len(worker.ScaleStrategy.WorkersToDelete)) {
Collaborator

worker.ScaleStrategy.WorkersToDelete has been set to 0 in line 274 if the flag is true. I think you may want to assign the value to a different variable?

Collaborator Author

Actually no. You are correct that this is dead code when featureFlag is true. However, we need to retain existing behavior when featureFlag is false. Once we have finished testing and commit to the new logic, we will remove the featureFlag check and also delete this case from the if statement (as a followup PR).

// we need to scale down
workersToRemove := int32(len(runningPods.Items)) - *worker.Replicas
//// SriramQ: Isn't this too early? This does not consider the IsNotFound case (see below)
Collaborator

In the code block at lines 255-275, you can probably track the number of pods actually deleted (excluding NOT_FOUND).
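
A sketch of that bookkeeping, assuming the surrounding reconcilePods scope from the diff above; the only new name introduced here is actuallyDeleted, everything else mirrors identifiers already in the diff.

```go
// Delete the named workers and count only the ones that actually existed;
// pods that are already gone (IsNotFound) should not reduce the scale-down target.
actuallyDeleted := int32(0)
for _, name := range worker.ScaleStrategy.WorkersToDelete {
	pod := corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: instance.Namespace}}
	if err := r.Delete(context.TODO(), &pod); err == nil {
		actuallyDeleted++
	} else if !errors.IsNotFound(err) {
		return err
	}
}
// Only the remaining surplus needs to be removed at random.
randomlyRemovedWorkers := int32(len(runningPods.Items)) - actuallyDeleted - *worker.Replicas
```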

Collaborator Author

My question relates to when featureFlag is false (i.e., the existing behavior). If you consider my scenario from an earlier comment where 5 of the 10 entries in WorkersToDelete are missing, randomlyRemovedWorkers is off by 5.

I do not want to change the existing behavior (when featureFlag is false) - I am just asking a clarifying question. When featureFlag is true, WorkersToDelete is empty - so everything will work fine.

r.Recorder.Eventf(instance, v1.EventTypeNormal, "Deleted", "Deleted pod %s", pod.Name)
}
//// SriramQ: Any difference between this and "worker.ScaleStrategy.WorkesToDelete = ..."
//// SriramQ: I assume this means that the operator is clearing WorkersToDelete in
//// UpdateStatus() - which means the clearing in the Autoscaler is redundant
Collaborator

Hmm. How does the autoscaler get accurate data to clear WorkersToDelete?

Collaborator Author

You are correct here - the current Autoscaler logic is not perfect. I am removing that code to clear WorkersToDelete in the Autoscaler as a separate Ray PR.

I had actually assumed that Kuberay was not clearing WorkersToDelete when I saw the Autoscaler code, but then saw that it was in fact doing it. This is the right approach - glad that it is this way.

@sriram-anyscale
Collaborator Author

We will need to test version compatibility for all four cases

Please help list the cases. You mean autoscaler scale up/down with unexpected new/removed pods? (2 * 2)?

I mean featureFlag = true/false combined with before/after the Ray Autoscaler change in a PR I will have ready today.

instance.Spec.WorkerGroupSpecs[index].ScaleStrategy.WorkersToDelete = []string{}

// remove the remaining pods not part of the scaleStrategy
i := 0
if int(randomlyRemovedWorkers) > 0 {
for _, randomPodToDelete := range runningPods.Items {
found := false
//// SriramQ: Isn't the following loop dead code - see my previous question
Collaborator Author

I'm going to leave this code here - and will delete it when we commit to the new logic and remove featureFlag. It's clearly dead code, though.

Contributor

Do we expect nodes in WorkersToDelete to be reused? Here is a scenario: say the user downscaled the cluster and the autoscaler, with the help of GCS, drained the nodes; the nodes are idle, but the user immediately upscaled the cluster again. In such a scenario, do we intend to remove a few workers from WorkersToDelete?

sriram-anyscale added a commit to sriram-anyscale/ray that referenced this pull request Mar 23, 2022
There are situations in which the coordination between the Autoscaler and Kuberay can get confused. This PR, along with a Kuberay PR (ray-project/kuberay#208), addresses these situations.

Examples:

- The Autoscaler requests that Kuberay delete a specific set of nodes, but before the Kuberay reconciler kicks in, a node dies. This causes Kuberay to delete a random set of nodes instead of the ones specified. This issue gets fixed in the Kuberay PR.

- The Autoscaler requests creation or termination of nodes, but simultaneously there is another request that changes the number of replicas (e.g., through the Kuberay API server). In this case, the _wait_for_pods method will never terminate, causing the Autoscaler to get stuck. This PR fixes this issue.

Details on the code changes:

The Autoscaler no longer waits for Kuberay to complete the request (through waiting in _wait_for_pods). Instead it makes sure the previous request has been completed each time before it submits a new request.

Instead of ensuring that the number of replicas is correct (as _wait_for_pods was doing), which is error prone, we now check that Kuberay has cleared workersToDelete as the indication that the previous request has completed.

The Autoscaler no longer clears workersToDelete.

The Autoscaler adds a dummy entry into workersToDelete even for createNode requests (which Kuberay will eventually clear) so future requests can ensure the createNode request has been completed.
@pcmoritz
Collaborator

The PR looks great -- I don't know as much about the code as other people in this thread, so I don't feel like I can approve it. @Jeffwan, can you have another look and approve if it looks good to you?

I also convinced myself that the code is equivalent if the feature flag is not set, so I'm a bit confused that the CI is failing on the latest commit. It would be great to dig into that a bit more :)

@chenk008
Contributor

chenk008 commented Mar 24, 2022

I'm still confused by these changes, please hold on. @pcmoritz @Jeffwan

@chenk008
Contributor

We should follow the Kubernetes best practices (https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/); here the desired state is the replica count.

@Jeffwan
Collaborator

Jeffwan commented Mar 24, 2022

so I'm a bit confused that the CI is failing on the latest commit

I think the test is flaky and I will put some time into fixing it. We can rerun the tests later. I will have another check, and let's also address the comments from @chenk008.

@pcmoritz
Collaborator

@chenk008 My thoughts here are the following, and this is very similar to what @sriram-anyscale talked about in the last community meeting: Kubernetes and its autoscalers use replicas a lot, since K8s is often used for stateless computation with components that have no identity (all the replicas are interchangeable). This, however, is not true for the pods that Ray is creating: they have different actors in them and cannot be treated interchangeably.

Now, the pod is still a great abstraction for that, but collections of interchangeable pods (replicas) are not. That's why it makes sense to move away from the replicas concept. Instead of replicas, the much more natural abstraction for Ray is just a list of pods that have identity -- this is essentially what the Ray autoscaler operates on.

Is that appropriately addressing your comment about the Kubernetes best practices or did you have something more specific or different in mind?

// Essentially WorkersToDelete has to be deleted to meet the expectations of the Autoscaler.
log.Info("reconcilePods", "removing the pods in the scaleStrategy of", worker.GroupName)
for _, podsToDelete := range worker.ScaleStrategy.WorkersToDelete {
if diff >= 0 {
Collaborator Author

@chenk008 - this if statement may address your concerns. However, I do not think it is a good idea, but we can use it as a starting point for a discussion. I fully agree that we should be deleting the nodes directly (and with the concern about not being declarative). However, my PR did not introduce this problem: the node deletion and non-declarative aspects already existed before my PR. This change with this if statement makes the code strictly better than it was (I hope this part is obvious). I argue that the code will be even better without the if statement (which is the point to discuss).

The reason we cannot delete the nodes directly is how the CRD and its associated logic have been designed. If we delete a node directly, Kuberay will go ahead and add a new one (which is not desirable in most cases). What we need to do in the current setup is to atomically decrease the number of replicas and remove the nodes.

There are multiple scenarios where the scheduler/autoscaler needs to remove one or more nodes without maintaining the current number of replicas. We really need to revisit the CRD design to address this, but this PR is attempting to improve the implementation given the current CRD.
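
To make the "atomic" part concrete: from a client's point of view the downscale is a single update to the RayCluster spec that changes Replicas and WorkersToDelete together, so the reconciler never observes one without the other. A sketch, assuming a controller-runtime client and the field names used in this repo; the function itself is hypothetical and imports (including the rayiov1alpha1 package path) are omitted.

```go
// requestDownscale decrements Replicas and names the pods to remove in the same
// spec update, so both changes reach the API server in one call.
func requestDownscale(ctx context.Context, c client.Client,
	rc *rayiov1alpha1.RayCluster, groupIndex int, victims []string) error {
	group := &rc.Spec.WorkerGroupSpecs[groupIndex]
	newReplicas := *group.Replicas - int32(len(victims))
	group.Replicas = &newReplicas
	group.ScaleStrategy.WorkersToDelete = victims
	// One Update call carries both the new replica count and the victim list.
	return c.Update(ctx, rc)
}
```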

@chenk008
Contributor

chenk008 commented Mar 25, 2022

@pcmoritz @sriram-anyscale I agree that Ray is a stateful workload.

Maybe there is some gap in our discussion: we start Ray with --block, and the container entrypoint is ray start. When the Ray worker (raylet) dies, the container exits, the kubelet restarts the container, and the Ray worker comes back. The Kuberay reconciliation loop is not involved in this flow.

Consider the cases:

  1. Scale down with specified nodes: I think that covers most of the cases. It is a little hard to atomically decrease the number of replicas and remove the nodes.
  2. Scale down with random nodes: we rarely use this. Just adjust the replica count.
  3. Scale up: it is easy to adjust the replica count.

We should have consensus on the default behavior of ray-operator and autoscaler.

@Jeffwan I think we can merge it, but maybe we should discuss this in the other issue.

@sriram-anyscale
Collaborator Author

sriram-anyscale commented Mar 25, 2022 via email

@sriram-anyscale
Collaborator Author

To summarize, I hope it is OK to merge after removing the "if statement" I just added. Please comment either way - thanks!

… we have verified that the tests pass with the flag set to true)
@pcmoritz
Collaborator

pcmoritz commented Mar 25, 2022

@chenk008 The problem is that Ray needs to know that the nodes/pods came back so it can restart the actors (the actors won't be restarted if Kubernetes just re-runs the pod with the ray start entrypoint). This is how Ray fault tolerance is designed today.

My proposal here is to remove the if statement right now and discuss this more in the next Kuberay meeting (I think this is better discussed in person). We won't switch the feature flag to true before we have discussed this question and agreed on it.

Given how the Ray autoscaler is designed today, the code without the if statement makes the most sense, so let's merge the PR with that now, so we can fix the bugs with the Ray Autoscaler <> KubeRay integration. Note it won't have an impact on existing KubeRay users since it is behind a feature flag.

@Jeffwan @chenk008 Does this course of action sound good to you?

@chenk008
Contributor

LGTM! We should merge it to fix the bugs with the integration.

@pcmoritz merged commit a46ba3f into ray-project:master Mar 26, 2022
@pcmoritz
Collaborator

Thanks everybody for their efforts to help with this :)

@Jeffwan
Collaborator

Jeffwan commented Mar 26, 2022

I was busy with some internal stuff and just got a chance to check the newly updated threads. Overall it looks good to me. Since we already have an MVP version out (in v0.2.0), we can iterate quickly on master, given that it's protected by a feature flag. For further design improvements, let's create separate issues and discuss them in the community meeting.

@sriram-anyscale deleted the prioritize-workers-to-delete branch April 3, 2022 15:03
@DmitriGekhtman mentioned this pull request Jul 8, 2022
DmitriGekhtman added a commit that referenced this pull request Jul 15, 2022
This PR

Flips the flag introduced in Prioritize WorkersToDelete #208. This allows the autoscaler to function properly without additional configuration of the operator deployment.

Updates the docs accordingly.

Makes minor tweaks to the autoscaling documentation, including documenting recently added fields to the sample config.

Updates the default autoscaler image with changes from Ray upstream, to include the bug fix from [KubeRay][Autoscaler][Core] Add a flag to disable ray status version check ray#26584.

Signed-off-by: Dmitri Gekhtman <dmitri.m.gekhtman@gmail.com>
lowang-bh pushed a commit to lowang-bh/kuberay that referenced this pull request Sep 24, 2023
* Modifies the reconciliation loop to act on WorkersToDelete first and then adjust the total number of running pods to match Replicas

* Modifies the reconciliation loop to act on WorkersToDelete first and then adjust the total number of running pods to match Replicas

* Removed my questions that were comments in the source code and added the featureFlag as a command line flag.

* Added a change as a potential solution to issues raised in the PR

* fixed location of if statement

* Removed the if statement and set feature flag back to false (now that we have verified that the tests pass with the flag set to true)
lowang-bh pushed a commit to lowang-bh/kuberay that referenced this pull request Sep 24, 2023
…ect#379)

This PR

Flips the flag introduced in Prioritize WorkersToDelete ray-project#208. This allows the autoscaler to function properly without additional configuration of the operator deployment.

Updates the docs accordingly.

Makes minor tweaks to the autoscaling documentation, including documenting recently added fields to the sample config.

Updates the default autoscaler image with changes from Ray upstream, to include the bug fix from [KubeRay][Autoscaler][Core] Add a flag to disable ray status version check ray#26584.

Signed-off-by: Dmitri Gekhtman <dmitri.m.gekhtman@gmail.com>