From daa8d260d1eba956d886c3746247ce9b980a0776 Mon Sep 17 00:00:00 2001 From: Vicente Ferrara Date: Thu, 8 Feb 2024 21:21:03 +0000 Subject: [PATCH] scale down n m patch error field not declared in schema commented out podSet immutability from workload webhook to be able to update that field added more comments clean code nit debuggin n m patch error field not declared in schema clean code n m patch error field not declared in schema commented out podSet immutability from workload webhook to be able to update that field added more comments clean code nit a cluster queue reconciliation fixed, it had to do with the infot totalrequests from admission inside the worklad go file working with scheduler cleaning code cleaning code cleaning cleaning cleaning integation test, but it messes up with parallelism test which should be expected updated parallelism it test updated wrappers kep removed Kep removed log lines clean code added a better conditional for updating the resize if the job is a RayCluster added Kind condition updated test and equivalentToWorkload condition added podset assigments check --- pkg/controller/core/workload_controller.go | 21 ++++++ pkg/controller/jobframework/reconciler.go | 25 +++++-- pkg/features/kube_features.go | 5 ++ pkg/scheduler/scheduler.go | 65 ++++++++++++++++++- pkg/util/testingjobs/raycluster/wrappers.go | 6 ++ pkg/webhooks/workload_webhook.go | 11 ---- .../raycluster/raycluster_controller_test.go | 58 ++++++++++++++++- 7 files changed, 173 insertions(+), 18 deletions(-) diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index 5ce8c8a43b..f29dd08895 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -48,6 +48,7 @@ import ( "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/constants" "sigs.k8s.io/kueue/pkg/controller/core/indexer" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/queue" "sigs.k8s.io/kueue/pkg/util/slices" "sigs.k8s.io/kueue/pkg/workload" @@ -587,11 +588,31 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { if err := r.cache.UpdateWorkload(oldWl, wlCopy); err != nil { log.Error(err, "Updating workload in cache") } + // This forces you to go through the scheduler to update PodSetAssignments if there's a difference between the + // worker group podSetAssignments.Count of the old and new workload + updatePsa := compareAdmissionPodSetAssignmentCount(oldWl.Status.Admission, wlCopy.Status.Admission) + if features.Enabled(features.DynamicallySizedJobs) && updatePsa && !r.queues.UpdateWorkload(oldWl, wlCopy) { + log.V(2).Info("Queue for updated workload didn't exist; ignoring for now") + } } return true } +func compareAdmissionPodSetAssignmentCount(oldWlAdmission *kueue.Admission, newWlAdmisson *kueue.Admission) bool { + // this is specific to RayClusters, it contains a PodSet of length 2, containing head and workers + oldWlPsa := len(oldWlAdmission.PodSetAssignments) + newWlPsa := len(newWlAdmisson.PodSetAssignments) + if oldWlPsa == 2 && oldWlPsa == newWlPsa { + if oldWlAdmission.PodSetAssignments[1].Name == newWlAdmisson.PodSetAssignments[1].Name { + if oldWlAdmission.PodSetAssignments[1].Count != newWlAdmisson.PodSetAssignments[1].Count { + return true + } + } + } + return false +} + func (r *WorkloadReconciler) Generic(e event.GenericEvent) bool { r.log.V(3).Info("Ignore generic event", "obj", klog.KObj(e.Object), "kind", e.Object.GetObjectKind().GroupVersionKind()) return false diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index 865bc0c446..2904cce145 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -348,6 +348,21 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques } } + // 4.1 update podSetCount for RayCluster resize + if features.Enabled(features.DynamicallySizedJobs) && wl != nil && workload.IsAdmitted(wl) && job.GVK().Kind == "RayCluster" { + podSets := job.PodSets() + jobPodSetCount := podSets[1].Count + workloadPodSetCount := wl.Spec.PodSets[1].Count + if workloadPodSetCount > jobPodSetCount { + toUpdate := wl + _, err := r.updateWorkloadToMatchJob(ctx, job, object, toUpdate, "Updated Workload due to resize: %v") + if err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + } + // 5. handle WaitForPodsReady only for a standalone job. // handle a job when waitForPodsReady is enabled, and it is the main job if r.waitForPodsReady { @@ -574,7 +589,7 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o } if toUpdate != nil { - return r.updateWorkloadToMatchJob(ctx, job, object, toUpdate) + return r.updateWorkloadToMatchJob(ctx, job, object, toUpdate, "Updated not matching Workload for suspended job: %v") } return match, nil @@ -679,7 +694,9 @@ func equivalentToWorkload(ctx context.Context, c client.Client, job GenericJob, jobPodSets := clearMinCountsIfFeatureDisabled(job.PodSets()) if runningPodSets := expectedRunningPodSets(ctx, c, wl); runningPodSets != nil { - if equality.ComparePodSetSlices(jobPodSets, runningPodSets) { + jobPodSetCount := job.PodSets() + workloadPodSetCount := wl.Spec.PodSets[1].Count + if equality.ComparePodSetSlices(jobPodSets, runningPodSets) || (features.Enabled(features.DynamicallySizedJobs) && job.GVK().Kind == "RayCluster" && jobPodSetCount[1].Count < workloadPodSetCount) { return true } // If the workload is admitted but the job is suspended, do the check @@ -692,7 +709,7 @@ func equivalentToWorkload(ctx context.Context, c client.Client, job GenericJob, return equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets) } -func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload) (*kueue.Workload, error) { +func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload, message string) (*kueue.Workload, error) { newWl, err := r.constructWorkload(ctx, job, object) if err != nil { return nil, fmt.Errorf("can't construct workload for update: %w", err) @@ -707,7 +724,7 @@ func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job Generi } r.record.Eventf(object, corev1.EventTypeNormal, ReasonUpdatedWorkload, - "Updated not matching Workload for suspended job: %v", klog.KObj(wl)) + message, klog.KObj(wl)) return newWl, nil } diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index b9d75db47d..e9c2ca7261 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -83,6 +83,10 @@ const ( // // Enables lending limit. LendingLimit featuregate.Feature = "LendingLimit" + // owner: @vicenteferrara + // kep: + // alpha: v0.7 + DynamicallySizedJobs featuregate.Feature = "DynamicallySizedJobs" ) func init() { @@ -104,6 +108,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ PrioritySortingWithinCohort: {Default: true, PreRelease: featuregate.Beta}, MultiKueue: {Default: false, PreRelease: featuregate.Alpha}, LendingLimit: {Default: false, PreRelease: featuregate.Alpha}, + DynamicallySizedJobs: {Default: true, PreRelease: featuregate.Alpha}, } func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) func() { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 9d61ad2ea5..f8e3c94354 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -258,6 +258,9 @@ func (s *Scheduler) schedule(ctx context.Context) { s.cache.WaitForPodsReady(ctx) log.V(5).Info("Finished waiting for all admitted workloads to be in the PodsReady condition") } + if e.status == assumed { + continue + } e.status = nominated if err := s.admit(ctx, e, cq.AdmissionChecks); err != nil { e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err) @@ -313,8 +316,21 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna ns := corev1.Namespace{} e := entry{Info: w} if s.cache.IsAssumedOrAdmittedWorkload(w) { - log.Info("Workload skipped from admission because it's already assumed or admitted", "workload", klog.KObj(w.Obj)) - continue + if features.Enabled(features.DynamicallySizedJobs) { + // here we want to get the flavors and resources assigned again so that we can update PodSetAssignments + e.assignment, e.preemptionTargets = s.getResizeAssignment(log, &e.Info, &snap) + e.inadmissibleMsg = e.assignment.Message() + e.Info.LastAssignment = &e.assignment.LastState + e.status = assumed + if err := s.updateResizePodSetAssignments(ctx, e); err != nil { + log.Error(err, "Could not apploy admission to assumed workload") + continue + } + + } else { + log.Info("Workload skipped from admission because it's already assumed or admitted", "workload", klog.KObj(w.Obj)) + continue + } } else if workload.HasRetryOrRejectedChecks(w.Obj) { e.inadmissibleMsg = "The workload has failed admission checks" } else if snap.InactiveClusterQueueSets.Has(w.ClusterQueue) { @@ -377,6 +393,29 @@ type partialAssignment struct { preemptionTargets []*workload.Info } +func (s *Scheduler) getResizeAssignment(log logr.Logger, wl *workload.Info, snap *cache.Snapshot) (flavorassigner.Assignment, []*workload.Info) { + cq := snap.ClusterQueues[wl.ClusterQueue] + flvAssigner := flavorassigner.New(wl, cq, snap.ResourceFlavors) + resizeAssignment := flvAssigner.Assign(log, nil) + var faPreemtionTargets []*workload.Info + + arm := resizeAssignment.RepresentativeMode() + if arm == flavorassigner.Fit { + return resizeAssignment, nil + } + + if arm == flavorassigner.Preempt { + faPreemtionTargets = s.preemptor.GetTargets(*wl, resizeAssignment, snap) + } + + // if the feature gate is not enabled or we can preempt + if !features.Enabled(features.PartialAdmission) || len(faPreemtionTargets) > 0 { + return resizeAssignment, faPreemtionTargets + } + + return resizeAssignment, nil +} + func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cache.Snapshot) (flavorassigner.Assignment, []*workload.Info) { cq := snap.ClusterQueues[wl.ClusterQueue] flvAssigner := flavorassigner.New(wl, cq, snap.ResourceFlavors) @@ -477,6 +516,28 @@ func (s *Scheduler) validateLimitRange(ctx context.Context, wi *workload.Info) e return nil } +// admit sets the admitting clusterQueue and flavors into the workload of +// the entry, and asynchronously updates the object in the apiserver after +// assuming it in the cache. +func (s *Scheduler) updateResizePodSetAssignments(ctx context.Context, e entry) error { + newWorkload := e.Obj.DeepCopy() + admission := &kueue.Admission{ + ClusterQueue: kueue.ClusterQueueReference(e.ClusterQueue), + PodSetAssignments: e.assignment.ToAPI(), + } + + workload.SetQuotaReservation(newWorkload, admission) + _ = workload.SyncAdmittedCondition(newWorkload) + + if e.status == assumed { + // Apply admission means to update the workload with the new admission status, this is for the case of a scale down + // we shouldn't requeue a scale down we should only update the workload + return s.applyAdmission(ctx, newWorkload) + } + + return nil +} + // admit sets the admitting clusterQueue and flavors into the workload of // the entry, and asynchronously updates the object in the apiserver after // assuming it in the cache. diff --git a/pkg/util/testingjobs/raycluster/wrappers.go b/pkg/util/testingjobs/raycluster/wrappers.go index 7d55e93bdf..e573d4974b 100644 --- a/pkg/util/testingjobs/raycluster/wrappers.go +++ b/pkg/util/testingjobs/raycluster/wrappers.go @@ -80,6 +80,12 @@ func (j *ClusterWrapper) NodeSelectorHeadGroup(k, v string) *ClusterWrapper { return j } +// Set replica count +func (j *ClusterWrapper) SetReplicaCount(c int32) *ClusterWrapper { + j.Spec.WorkerGroupSpecs[0].Replicas = ptr.To(c) + return j +} + // Obj returns the inner Job. func (j *ClusterWrapper) Obj() *rayv1.RayCluster { return &j.RayCluster diff --git a/pkg/webhooks/workload_webhook.go b/pkg/webhooks/workload_webhook.go index a4527f693c..18d85a509e 100644 --- a/pkg/webhooks/workload_webhook.go +++ b/pkg/webhooks/workload_webhook.go @@ -347,7 +347,6 @@ func ValidateWorkloadUpdate(newObj, oldObj *kueue.Workload) field.ErrorList { allErrs = append(allErrs, ValidateWorkload(newObj)...) if workload.HasQuotaReservation(oldObj) { - allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PodSets, oldObj.Spec.PodSets, specPath.Child("podSets"))...) allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PriorityClassSource, oldObj.Spec.PriorityClassSource, specPath.Child("priorityClassSource"))...) allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PriorityClassName, oldObj.Spec.PriorityClassName, specPath.Child("priorityClassName"))...) } @@ -355,21 +354,11 @@ func ValidateWorkloadUpdate(newObj, oldObj *kueue.Workload) field.ErrorList { allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.QueueName, oldObj.Spec.QueueName, specPath.Child("queueName"))...) allErrs = append(allErrs, validateReclaimablePodsUpdate(newObj, oldObj, field.NewPath("status", "reclaimablePods"))...) } - allErrs = append(allErrs, validateAdmissionUpdate(newObj.Status.Admission, oldObj.Status.Admission, field.NewPath("status", "admission"))...) allErrs = append(allErrs, validateImmutablePodSetUpdates(newObj, oldObj, statusPath.Child("admissionChecks"))...) return allErrs } -// validateAdmissionUpdate validates that admission can be set or unset, but the -// fields within can't change. -func validateAdmissionUpdate(new, old *kueue.Admission, path *field.Path) field.ErrorList { - if old == nil || new == nil { - return nil - } - return apivalidation.ValidateImmutableField(new, old, path) -} - // validateReclaimablePodsUpdate validates that the reclaimable counts do not decrease, this should be checked // while the workload is admitted. func validateReclaimablePodsUpdate(newObj, oldObj *kueue.Workload, basePath *field.Path) field.ErrorList { diff --git a/test/integration/controller/jobs/raycluster/raycluster_controller_test.go b/test/integration/controller/jobs/raycluster/raycluster_controller_test.go index 26d7c0ce55..a72d0ddf47 100644 --- a/test/integration/controller/jobs/raycluster/raycluster_controller_test.go +++ b/test/integration/controller/jobs/raycluster/raycluster_controller_test.go @@ -205,7 +205,7 @@ var _ = ginkgo.Describe("RayCluster controller", ginkgo.Ordered, ginkgo.Continue return apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadQuotaReserved) }, util.Timeout, util.Interval).Should(gomega.BeTrue()) - ginkgo.By("checking the job gets suspended when parallelism changes and the added node selectors are removed") + ginkgo.By("checking the job is suspended when parallelism increases and the added node selectors are removed") parallelism := ptr.Deref(job.Spec.WorkerGroupSpecs[0].Replicas, 1) newParallelism := parallelism + 1 createdJob.Spec.WorkerGroupSpecs[0].Replicas = &newParallelism @@ -632,6 +632,62 @@ var _ = ginkgo.Describe("RayCluster Job controller interacting with scheduler", util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) }) + + ginkgo.It("Should not suspend job when there's a scale down", func() { + ginkgo.By("creating localQueue") + localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed()) + + ginkgo.By("checking a dev job starts") + job := testingraycluster.MakeCluster("dev-job", ns.Name).SetReplicaCount(4).Queue(localQueue.Name). + RequestHead(corev1.ResourceCPU, "1"). + RequestWorkerGroup(corev1.ResourceCPU, "1"). + Obj() + gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) + createdJob := &rayv1.RayCluster{} + gomega.Eventually(func() bool { + gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, createdJob)). + Should(gomega.Succeed()) + return *createdJob.Spec.Suspend + }, util.Timeout, util.Interval).Should(gomega.BeFalse()) + gomega.Expect(createdJob.Spec.HeadGroupSpec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) + gomega.Expect(createdJob.Spec.WorkerGroupSpecs[0].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name)) + util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) + util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1) + + ginkgo.By("reduce the number of replicas, check the job is not suspended") + replicaCount := ptr.Deref(job.Spec.WorkerGroupSpecs[0].Replicas, 1) + newReplicaCount := replicaCount - 2 + createdJob.Spec.WorkerGroupSpecs[0].Replicas = &newReplicaCount + gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed()) + gomega.Eventually(func() bool { + gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, createdJob)). + Should(gomega.Succeed()) + return *createdJob.Spec.Suspend + }, util.Timeout, util.Interval).Should(gomega.BeFalse()) + + ginkgo.By("checking the workload is updated with new count") + createdWorkload := &kueue.Workload{} + wlLookupKey := types.NamespacedName{Name: workloadraycluster.GetWorkloadNameForRayCluster(job.Name, job.UID), Namespace: ns.Name} + + gomega.Eventually(func() error { + return k8sClient.Get(ctx, wlLookupKey, createdWorkload) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { + return false + } + return createdWorkload.Spec.PodSets[1].Count == newReplicaCount + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { + return false + } + return *createdWorkload.Status.Admission.PodSetAssignments[1].Count == newReplicaCount + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + + }) + }) var _ = ginkgo.Describe("Job controller with preemption enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {