[release blocker][Feature] Only Autoscaler can make decisions to delete Pods #1253

Merged (3 commits) on Jul 20, 2023
6 changes: 6 additions & 0 deletions ray-operator/controllers/ray/common/constant.go

@@ -92,6 +92,12 @@ const (
 	RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV = "RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV"
 	RAYCLUSTER_DEFAULT_REQUEUE_SECONDS = 300
 
+	// This KubeRay operator environment variable is used to determine if random Pod
+	// deletion should be enabled. Note that this only takes effect when autoscaling
+	// is enabled for the RayCluster. This is a feature flag for v0.6.0, and will be
+	// removed if the default behavior is stable enough.
+	ENABLE_RANDOM_POD_DELETE = "ENABLE_RANDOM_POD_DELETE"
+
 	// Ray core default configurations
 	DefaultRedisPassword = "5241590000000000"
 	DefaultWorkerRayGcsReconnectTimeoutS = "600"
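For reference, the new flag is a plain environment variable on the operator process, compared case-insensitively against "true" (see the controller change below). A minimal, self-contained sketch of that parsing, not part of this PR:

package main

import (
	"fmt"
	"os"
	"strings"
)

// Mirrors the constant added above.
const ENABLE_RANDOM_POD_DELETE = "ENABLE_RANDOM_POD_DELETE"

func main() {
	// In practice this would be injected into the operator Pod, e.g. via the
	// Deployment's `env` section; we set it in-process here for illustration.
	os.Setenv(ENABLE_RANDOM_POD_DELETE, "True")

	// The controller lowercases the value, so "True"/"TRUE"/"true" all opt in.
	enabled := strings.ToLower(os.Getenv(ENABLE_RANDOM_POD_DELETE)) == "true"
	fmt.Println("random Pod deletion opt-in:", enabled) // random Pod deletion opt-in: true
}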
49 changes: 35 additions & 14 deletions ray-operator/controllers/ray/raycluster_controller.go

@@ -416,7 +416,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
 	// Reconcile head Pod
 	if len(headPods.Items) == 1 {
 		headPod := headPods.Items[0]
-		r.Log.Info("reconcilePods ", "head pod found", headPod.Name)
+		r.Log.Info("reconcilePods", "head pod found", headPod.Name)
 		if headPod.Status.Phase == corev1.PodRunning || headPod.Status.Phase == corev1.PodPending {
 			r.Log.Info("reconcilePods", "head pod is up and running... checking workers", headPod.Name)
 		} else if headPod.Status.Phase == corev1.PodFailed && strings.Contains(headPod.Status.Reason, "Evicted") {
@@ -431,15 +431,15 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
 	}
 	if len(headPods.Items) == 0 || headPods.Items == nil {
 		// create head pod
-		r.Log.Info("reconcilePods ", "creating head pod for cluster", instance.Name)
+		r.Log.Info("reconcilePods", "creating head pod for cluster", instance.Name)
 		common.CreatedClustersCounterInc(instance.Namespace)
 		if err := r.createHeadPod(ctx, *instance); err != nil {
 			common.FailedClustersCounterInc(instance.Namespace)
 			return err
 		}
 		common.SuccessfulClustersCounterInc(instance.Namespace)
 	} else if len(headPods.Items) > 1 {
-		r.Log.Info("reconcilePods ", "more than 1 head pod found for cluster", instance.Name)
+		r.Log.Info("reconcilePods", "more than 1 head pod found for cluster", instance.Name)
 		itemLength := len(headPods.Items)
 		for index := 0; index < itemLength; index++ {
 			if headPods.Items[index].Status.Phase == corev1.PodRunning || headPods.Items[index].Status.Phase == corev1.PodPending {
@@ -594,19 +594,40 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
 			r.Log.Info("reconcilePods", "all workers already exist for group", worker.GroupName)
 			continue
 		} else {
-			// diff < 0 means that we need to delete some Pods to meet the desired number of replicas.
-			randomlyRemovedWorkers := -diff
-			r.Log.Info("reconcilePods", "Number workers to delete randomly", randomlyRemovedWorkers, "Worker group", worker.GroupName)
-			for i := 0; i < int(randomlyRemovedWorkers); i++ {
-				randomPodToDelete := runningPods.Items[i]
-				r.Log.Info("Randomly deleting Pod", "progress", fmt.Sprintf("%d / %d", i+1, randomlyRemovedWorkers), "with name", randomPodToDelete.Name)
-				if err := r.Delete(ctx, &randomPodToDelete); err != nil {
-					if !errors.IsNotFound(err) {
-						return err
-					}
-					r.Log.Info("reconcilePods", "The worker Pod has already been deleted", randomPodToDelete.Name)
-				}
-				r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted", "Deleted Pod %s", randomPodToDelete.Name)
-			}
+			// diff < 0 indicates the need to delete some Pods to match the desired number of replicas. However,
+			// randomly deleting Pods is certainly not ideal. So, if autoscaling is enabled for the cluster, we
+			// will disable random Pod deletion, making Autoscaler the sole decision-maker for Pod deletions.
+			enableInTreeAutoscaling := (instance.Spec.EnableInTreeAutoscaling != nil) && (*instance.Spec.EnableInTreeAutoscaling)
+
+			// TODO (kevin85421): `enableRandomPodDelete` is a feature flag for KubeRay v0.6.0. If users want to use
+			// the old behavior, they can set the environment variable `ENABLE_RANDOM_POD_DELETE` to `true`. When the
+			// default behavior is stable enough, we can remove this feature flag.
+			enableRandomPodDelete := false
+			if enableInTreeAutoscaling {
+				if s := os.Getenv(common.ENABLE_RANDOM_POD_DELETE); strings.ToLower(s) == "true" {
+					enableRandomPodDelete = true
+				}
+			}
+			// Case 1: If Autoscaler is disabled, we will always enable random Pod deletion no matter the value of the feature flag.
+			// Case 2: If Autoscaler is enabled, we will respect the value of the feature flag. If the feature flag environment variable
+			// is not set, we will disable random Pod deletion by default.
+			if !enableInTreeAutoscaling || enableRandomPodDelete {
+				// diff < 0 means that we need to delete some Pods to meet the desired number of replicas.
+				randomlyRemovedWorkers := -diff
+				r.Log.Info("reconcilePods", "Number workers to delete randomly", randomlyRemovedWorkers, "Worker group", worker.GroupName)

[Review comment, Contributor] nit: rephrase the log statement. "Randomly pick xx workers to delete from worker group xxx."

+				for i := 0; i < int(randomlyRemovedWorkers); i++ {
+					randomPodToDelete := runningPods.Items[i]
+					r.Log.Info("Randomly deleting Pod", "progress", fmt.Sprintf("%d / %d", i+1, randomlyRemovedWorkers), "with name", randomPodToDelete.Name)
+					if err := r.Delete(ctx, &randomPodToDelete); err != nil {
+						if !errors.IsNotFound(err) {
+							return err
+						}
+						r.Log.Info("reconcilePods", "The worker Pod has already been deleted", randomPodToDelete.Name)
+					}
+					r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted", "Deleted Pod %s", randomPodToDelete.Name)
+				}
+			} else {
+				r.Log.Info(fmt.Sprintf("Random Pod deletion is disabled for cluster %s. The only decision-maker for Pod deletions is Autoscaler.", instance.Name))

[Review comment, Contributor] nit: Ray Autoscaler. (if that is what you mean :) )

+			}
 		}
 	}
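To make the Case 1 / Case 2 comments concrete, here is a minimal table-driven test sketch. It is not part of this PR; shouldRandomlyDelete is a hypothetical restatement of the `!enableInTreeAutoscaling || enableRandomPodDelete` condition above.

package ray

import "testing"

// shouldRandomlyDelete restates the controller's gating condition:
// random deletion is always allowed without the Autoscaler, and opt-in with it.
func shouldRandomlyDelete(enableInTreeAutoscaling, enableRandomPodDelete bool) bool {
	return !enableInTreeAutoscaling || enableRandomPodDelete
}

func TestRandomPodDeleteDecision(t *testing.T) {
	cases := []struct {
		autoscaling, flag, want bool
	}{
		{false, false, true}, // Case 1: Autoscaler disabled, flag ignored, random deletion on
		{false, true, true},  // Case 1: Autoscaler disabled, random deletion on
		{true, false, false}, // Case 2: Autoscaler enabled, flag unset/false, deletion off
		{true, true, true},   // Case 2: Autoscaler enabled, flag opts back into old behavior
	}
	for _, c := range cases {
		if got := shouldRandomlyDelete(c.autoscaling, c.flag); got != c.want {
			t.Errorf("autoscaling=%v flag=%v: got %v, want %v", c.autoscaling, c.flag, got, c.want)
		}
	}
}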