Skip to content

Commit

Permalink
[Cherry-pick][Bug][GCS FT] Clean up the Redis key before the head Pod…
Browse files Browse the repository at this point in the history
… is deleted (#1989) (#2017)
  • Loading branch information
kevin85421 committed Mar 15, 2024
1 parent e64a9b6 commit c3775bf
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 13 deletions.
18 changes: 9 additions & 9 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,22 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, request ctrl.Reque
return ctrl.Result{}, client.IgnoreNotFound(err)
}

func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, namespace string, filterLabels client.MatchingLabels) (active int, pods corev1.PodList, err error) {
func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, namespace string, filterLabels client.MatchingLabels) (pods corev1.PodList, err error) {
logger := ctrl.LoggerFrom(ctx)
if err = r.List(ctx, &pods, client.InNamespace(namespace), filterLabels); err != nil {
return 0, pods, err
return pods, err
}
active = 0
active := 0
for _, pod := range pods.Items {
if pod.DeletionTimestamp.IsZero() {
active++
}
}
if active > 0 {
logger.Info("Deleting all Pods with labels", "filterLabels", filterLabels, "Number of active Pods", active)
return active, pods, r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(namespace), filterLabels)
return pods, r.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(namespace), filterLabels)
}
return active, pods, nil
return pods, nil
}

func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request ctrl.Request, instance *rayv1.RayCluster) (ctrl.Result, error) {
Expand Down Expand Up @@ -230,21 +230,21 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
"DeletionTimestamp", instance.ObjectMeta.DeletionTimestamp)

// Delete the head Pod if it exists.
numDeletedHeads, headPods, err := r.deleteAllPods(ctx, instance.Namespace, client.MatchingLabels{
headPods, err := r.deleteAllPods(ctx, instance.Namespace, client.MatchingLabels{
utils.RayClusterLabelKey: instance.Name,
utils.RayNodeTypeLabelKey: string(rayv1.HeadNode),
})
if err != nil {
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
// Delete all worker Pods if they exist.
if _, _, err = r.deleteAllPods(ctx, instance.Namespace, client.MatchingLabels{
if _, err = r.deleteAllPods(ctx, instance.Namespace, client.MatchingLabels{
utils.RayClusterLabelKey: instance.Name,
utils.RayNodeTypeLabelKey: string(rayv1.WorkerNode),
}); err != nil {
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
if numDeletedHeads > 0 {
if len(headPods.Items) > 0 {
logger.Info(fmt.Sprintf(
"Wait for the head Pod %s to be terminated before initiating the Redis cleanup process. "+
"The storage namespace %s in Redis cannot be fully deleted if the GCS process on the head Pod is still writing to it.",
Expand Down Expand Up @@ -632,7 +632,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
// if RayCluster is suspended, delete all pods and skip reconcile
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
clusterLabel := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name}
if _, _, err := r.deleteAllPods(ctx, instance.Namespace, clusterLabel); err != nil {
if _, err := r.deleteAllPods(ctx, instance.Namespace, clusterLabel); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2624,15 +2624,13 @@ func TestDeleteAllPods(t *testing.T) {
}
ctx := context.Background()
// The first `deleteAllPods` function call should delete the "alive" Pod.
active, pods, err := testRayClusterReconciler.deleteAllPods(ctx, ns, filter)
pods, err := testRayClusterReconciler.deleteAllPods(ctx, ns, filter)
assert.Nil(t, err)
assert.Equal(t, 1, active)
assert.Equal(t, 2, len(pods.Items))
assert.Subset(t, []string{"alive", "deleted"}, []string{pods.Items[0].Name, pods.Items[1].Name})
// The second `deleteAllPods` function call should delete no Pods because none are active.
active, pods, err = testRayClusterReconciler.deleteAllPods(ctx, ns, filter)
pods, err = testRayClusterReconciler.deleteAllPods(ctx, ns, filter)
assert.Nil(t, err)
assert.Equal(t, 0, active)
assert.Equal(t, 1, len(pods.Items))
assert.Equal(t, "deleted", pods.Items[0].Name)
// Make sure that the above `deleteAllPods` calls didn't remove other Pods.
Expand Down

0 comments on commit c3775bf

Please sign in to comment.