Support suspension of RayClusters #1711
Conversation
Some manual testing I did so far: I have a kind cluster locally with a locally built kuberay-operator and a RayCluster. If I set the suspend field, the Pods are deleted; if I remove the suspend field, the Pods are created again.
Will add unit / e2e tests shortly.
Thanks, I've left a couple of general comments, otherwise looks good! I'll get back to you if I have any valuable feedback when I get a chance to test it.
// if RayCluster is suspended, delete all head pods
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
	for _, headPod := range headPods.Items {
		if err := r.Delete(ctx, &headPod); err != nil {
Is it possible to perform a delete collection operation, e.g. with DeleteAllOf
, instead of multiple delete ones?
Yes, great suggestion :)
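For reference, a minimal sketch of what the suggested delete-collection call could look like for the head Pods, reusing the label selector from the existing list call (variable names are illustrative, not necessarily the PR's final code):

// Sketch: replace the per-Pod Delete loop with a single delete-collection
// request using the same label selector that was used to list the head Pods.
headPodLabels := client.MatchingLabels{
	common.RayClusterLabelKey:  instance.Name,
	common.RayNodeTypeLabelKey: string(rayv1.HeadNode),
}
if err := r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(instance.Namespace), headPodLabels); err != nil {
	return err
}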
			return err
		}

	for _, workerPod := range workerPods.Items {
Ditto: could it be possible to perform a single delete collection operation, e.g. with DeleteAllOf, instead of multiple delete ones?
r.Log.Info("reconcilePods", "head Pod", headPod.Name, "shouldDelete", shouldDelete, "reason", reason) | ||
if shouldDelete { | ||
|
||
// if RayCluster is suspended, delete all head pods |
It seems some logic is common to the standard cluster deletion. Maybe there is an opportunity to factorise some of it and avoid duplication / discrepancies?
> It seems some logic is common to the standard cluster deletion.

I might have missed it, but I didn't see any deletion logic for the cluster; my assumption was that this was handled via OwnerReferences and Kubernetes garbage collection.
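For context, a minimal sketch of the mechanism being assumed here: Pods created by the operator carry a controller owner reference to the RayCluster, so deleting the RayCluster lets Kubernetes garbage collection remove the Pods (the variable names pod, instance, and r.Scheme are illustrative, not the operator's actual code):

// controllerutil is sigs.k8s.io/controller-runtime/pkg/controller/controllerutil.
// Setting the RayCluster as the controller owner of the Pod means the Pod is
// garbage-collected automatically when the RayCluster is deleted.
if err := controllerutil.SetControllerReference(instance, &pod, r.Scheme); err != nil {
	return err
}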
It could have been, but I don't think that's the case. I'm referring to this code block in particular:
kuberay/ray-operator/controllers/ray/raycluster_controller.go, lines 202 to 259 in 4c2c046:
r.Log.Info(
	fmt.Sprintf("The RayCluster with GCS enabled, %s, is being deleted. Start to handle the Redis cleanup finalizer %s.",
		instance.Name, common.GCSFaultToleranceRedisCleanupFinalizer),
	"DeletionTimestamp", instance.ObjectMeta.DeletionTimestamp)
// Delete the head Pod if it exists.
headPods := corev1.PodList{}
filterLabels := client.MatchingLabels{common.RayClusterLabelKey: instance.Name, common.RayNodeTypeLabelKey: string(rayv1.HeadNode)}
if err := r.List(ctx, &headPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
	return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
for _, headPod := range headPods.Items {
	if !headPod.DeletionTimestamp.IsZero() {
		r.Log.Info(fmt.Sprintf("The head Pod %s is already being deleted. Skip deleting this Pod.", headPod.Name))
		continue
	}
	r.Log.Info(fmt.Sprintf(
		"Delete the head Pod %s before the Redis cleanup. "+
			"The storage namespace %s in Redis cannot be fully deleted if the GCS process on the head Pod is still writing to it.",
		headPod.Name, headPod.Annotations[common.RayExternalStorageNSAnnotationKey]))
	if err := r.Delete(ctx, &headPod); err != nil {
		return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
	}
}
// Delete all worker Pods if they exist.
for _, workerGroup := range instance.Spec.WorkerGroupSpecs {
	workerPods := corev1.PodList{}
	filterLabels = client.MatchingLabels{common.RayClusterLabelKey: instance.Name, common.RayNodeGroupLabelKey: workerGroup.GroupName}
	if err := r.List(ctx, &workerPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
		return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
	}
	for _, workerPod := range workerPods.Items {
		if !workerPod.DeletionTimestamp.IsZero() {
			r.Log.Info(fmt.Sprintf("The worker Pod %s is already being deleted. Skip deleting this Pod.", workerPod.Name))
			continue
		}
		r.Log.Info(fmt.Sprintf(
			"Delete the worker Pod %s. This step isn't necessary for initiating the Redis cleanup process.", workerPod.Name))
		if err := r.Delete(ctx, &workerPod); err != nil {
			return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
		}
	}
}
// If the number of head Pods is not 0, wait for it to be terminated before initiating the Redis cleanup process.
if len(headPods.Items) != 0 {
	r.Log.Info(fmt.Sprintf(
		"Wait for the head Pod %s to be terminated before initiating the Redis cleanup process. "+
			"The storage namespace %s in Redis cannot be fully deleted if the GCS process on the head Pod is still writing to it.",
		headPods.Items[0].Name, headPods.Items[0].Annotations[common.RayExternalStorageNSAnnotationKey]))
	// Requeue after 10 seconds because it takes much longer than DefaultRequeueDuration (2 seconds) for the head Pod to be terminated.
	return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
// We can start the Redis cleanup process now because the head Pod has been terminated.
I'm going to opt out of updating this code for now, as I don't want to change the logic around the Redis cleanup job. But it might be good to revisit this.
In most delete cases, though, it seems like we defer to garbage collection:
kuberay/ray-operator/controllers/ray/raycluster_controller.go, lines 311 to 314 in 4c2c046:
if instance.DeletionTimestamp != nil && !instance.DeletionTimestamp.IsZero() {
	r.Log.Info("RayCluster is being deleted, just ignore", "cluster name", request.Name)
	return ctrl.Result{}, nil
}
Right, sounds good!
@@ -599,6 +599,12 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
	}
}

// if RayCluster is suspended, delete all pods and skip reconcile
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
	filterLabels = client.MatchingLabels{common.RayClusterLabelKey: instance.Name}
Should the cluster state be checked and the deletion skipped when it has already been suspended, to avoid unnecessary requests?
done
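A rough sketch of that check, assuming the status carries a suspended cluster state (the rayv1.Suspended constant name below is an assumption, not necessarily what the PR uses):

// Sketch only: skip the delete-collection request when the cluster has
// already been marked suspended, to avoid redundant API calls on every reconcile.
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
	if instance.Status.State == rayv1.Suspended { // assumed state constant
		return nil
	}
	filterLabels = client.MatchingLabels{common.RayClusterLabelKey: instance.Name}
	return r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(instance.Namespace), filterLabels)
}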
// if RayCluster is suspended, delete all pods and skip reconcile
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
	filterLabels = client.MatchingLabels{common.RayClusterLabelKey: instance.Name}
	return r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(instance.Namespace), filterLabels)
Would emitting an event be useful, to track suspension events? Or maybe a condition would be better suited?
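For illustration, emitting an event could look roughly like this, assuming the reconciler holds a record.EventRecorder (the r.Recorder field name is an assumption, not confirmed by this PR):

// Sketch, not the PR's implementation: record a normal event when the
// cluster transitions into the suspended state.
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Suspended",
	"Suspended RayCluster %s/%s and deleted its Pods", instance.Namespace, instance.Name)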
@andrewsykim do you think e2e tests would be useful? I can help if needed.
	return nil
}

clusterLabel := client.MatchingLabels{common.RayClusterLabelKey: instance.Name}
@kevin85421 is it safe to use this cluster label to find pods to terminate for suspension? Are there pods we shouldn't delete using this label if a RayCluster is suspended?
Alternatively we can specifically call DeleteCollection for head node and worker group nodes like this:
headPodLabels := client.MatchingLabels{common.RayClusterLabelKey: instance.Name, common.RayNodeTypeLabelKey: string(rayv1.HeadNode)}
if err := r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(instance.Namespace), headPodLabels); err != nil {
	return err
}

for _, worker := range instance.Spec.WorkerGroupSpecs {
	workerGroupLabels := client.MatchingLabels{common.RayClusterLabelKey: instance.Name, common.RayNodeGroupLabelKey: worker.GroupName}
	if err := r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(instance.Namespace), workerGroupLabels); err != nil {
		return err
	}
}
> is it safe to use this cluster label to find pods to terminate for suspension? Are there pods we shouldn't delete using this label if a RayCluster is suspended?

Yes, I think it is safe.
> Alternatively we can specifically call DeleteCollection for head node and worker group nodes like this

SGTM
@astefanutti yes, please feel free to open a PR against mine or a follow-up PR after this one.
@kevin85421 I think this is ready for review, please take a look.
@andrewsykim perfect, will do once yours is merged.
LGTM!
Some questions:

- Currently, we only terminate Pods. Should we also delete other Kubernetes resources like Services, Ingresses, Roles, etc.?
- Do we need to keep reconciling a suspended CR?

Personally I don't think this is necessary because the runtime cost of these is negligible.
> Personally I don't think this is necessary because the runtime cost of these is negligible.

- An edge case that I can come up with is the LoadBalancer service. If we don't delete the service, will the LoadBalancer outside of the Kubernetes cluster be deleted?

> Do we need to keep reconciling a suspended CR?

- Can we only reconcile it when suspend changes back to false?
@andrewsykim Would you mind taking a look at the CI errors? Thanks!
@kevin85421 the CI errors don't seem related to any of my changes. Any ideas? It seems to fail pretty consistently.
Try to rebase with the master branch.
Those LBs would not be deleted if a RayCluster is suspended, and for now I think this is the correct approach. Deleting the LB on suspension would mean that the LB is provisioned a new DNS name or IP when the RayCluster is resumed. Some users may expect the endpoint of the LB to remain consistent even if they temporarily spin down all the Pods.
Originally I didn't add this because of deletion cases, because we still want to allow deletion of suspended clusters, right? But I think we can just add this check after the deletion checks. I updated the PR with a top-level check in the reconcile loop.
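Roughly, the intent is a guard placed after the deletion check so that deleting a suspended cluster still works; a sketch of that ordering (the exact condition and state handling in the PR may differ, and rayv1.Suspended is an assumed status constant):

// RayCluster being deleted: let garbage collection handle the Pods.
if instance.DeletionTimestamp != nil && !instance.DeletionTimestamp.IsZero() {
	r.Log.Info("RayCluster is being deleted, just ignore", "cluster name", request.Name)
	return ctrl.Result{}, nil
}
// Assumed shape of the new top-level check: once the cluster has already been
// suspended (Pods deleted and status updated), skip further reconciliation.
if instance.Spec.Suspend != nil && *instance.Spec.Suspend && instance.Status.State == rayv1.Suspended {
	return ctrl.Result{}, nil
}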
Got it. This makes sense.
I don't quite get the point. What's the relationship between skipping reconciliation and the deletion of a suspended RayCluster CR?
Sorry for the confusion, I just meant that deleting a suspended cluster should still be possible, which is why I didn't skip reconcile for suspended clusters initially. But I realized that deletion is handled by garbage collection via OwnerReference, so I added a check to skip reconcile if the cluster is suspended. See https://github.com/ray-project/kuberay/pull/1711/files#diff-72ecc3ca405f1e828187748d4f1ec8160bccffa2a4f84a364cd7a94a78e1adb9R315-R318
Thanks!
I manually tested it and found a bug. I will open a follow-up PR to fix it.
Thanks @kevin85421
Why are these changes needed?
Support suspending RayClusters. See #1667 for more details on the use-case.
Related issue number
Closes #1667
Checks