From c54df4e9c61d1814efbc69a275094f65ec2cfb3b Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Fri, 15 Dec 2023 13:18:25 +0100 Subject: [PATCH] Preemption while borrowing WIP --- apis/kueue/v1beta1/clusterqueue_types.go | 32 ++++ apis/kueue/v1beta1/zz_generated.deepcopy.go | 27 ++- .../crd/kueue.x-k8s.io_clusterqueues.yaml | 24 +++ .../kueue/v1beta1/borrowwithincohort.go | 51 +++++ .../kueue/v1beta1/clusterqueuepreemption.go | 13 +- client-go/applyconfiguration/utils.go | 2 + .../bases/kueue.x-k8s.io_clusterqueues.yaml | 24 +++ .../flavorassigner/flavorassigner.go | 7 + pkg/scheduler/preemption/preemption.go | 32 +++- pkg/scheduler/preemption/preemption_test.go | 2 +- pkg/scheduler/scheduler.go | 8 +- test/integration/scheduler/preemption_test.go | 181 ++++++++++++++++++ 12 files changed, 385 insertions(+), 18 deletions(-) create mode 100644 client-go/applyconfiguration/kueue/v1beta1/borrowwithincohort.go diff --git a/apis/kueue/v1beta1/clusterqueue_types.go b/apis/kueue/v1beta1/clusterqueue_types.go index bea80090dd..6c4017b8b3 100644 --- a/apis/kueue/v1beta1/clusterqueue_types.go +++ b/apis/kueue/v1beta1/clusterqueue_types.go @@ -366,6 +366,10 @@ type ClusterQueuePreemption struct { // +kubebuilder:validation:Enum=Never;LowerPriority;Any ReclaimWithinCohort PreemptionPolicy `json:"reclaimWithinCohort,omitempty"` + // borrowWithinCohort provides configuration to allow preemption within + // cohort while borrowing. + BorrowWithinCohort *BorrowWithinCohort `json:"borrowFromCohortConfig,omitempty"` + // withinClusterQueue determines whether a pending Workload that doesn't fit // within the nominal quota for its ClusterQueue, can preempt active Workloads in // the ClusterQueue. 
The possible values are: @@ -382,6 +386,34 @@ type ClusterQueuePreemption struct { WithinClusterQueue PreemptionPolicy `json:"withinClusterQueue,omitempty"` } +type BorrowWithinCohortPolicy string + +const ( + BorrowWithinCohortPolicyNever BorrowWithinCohortPolicy = "Never" + BorrowWithinCohortPolicyLowerPriority BorrowWithinCohortPolicy = "LowerPriority" +) + +// BorrowWithinCohort contains configuration that allows preempting workloads +// within the cohort while borrowing. +type BorrowWithinCohort struct { + // policy determines the policy for preemption to reclaim quota within cohort while borrowing. + // Possible values are: + // - `Never` (default): do not reclaim quota within ClusterQueue while borrowing. + // - `LowerPriority`: allow preempting workloads to reclaim quota within the cohort, + // but only by preempting lower-priority workloads. + // + // +kubebuilder:default=Never + // +kubebuilder:validation:Enum=Never;LowerPriority + Policy BorrowWithinCohortPolicy `json:"policy,omitempty"` + + // maxPriorityThreshold allows to restrict the set of workloads for preemption + // to reclaim quota within the cohort, to only workloads with priority below + // or equal the specified level. + // + // +optional + MaxPriorityThreshold *int32 `json:"maxPriorityThreshold,omitempty"` +} + //+genclient //+genclient:nonNamespaced //+kubebuilder:object:root=true diff --git a/apis/kueue/v1beta1/zz_generated.deepcopy.go b/apis/kueue/v1beta1/zz_generated.deepcopy.go index 77a8c4bc00..3be88528da 100644 --- a/apis/kueue/v1beta1/zz_generated.deepcopy.go +++ b/apis/kueue/v1beta1/zz_generated.deepcopy.go @@ -193,6 +193,26 @@ func (in *AdmissionCheckStatus) DeepCopy() *AdmissionCheckStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *BorrowWithinCohort) DeepCopyInto(out *BorrowWithinCohort) { + *out = *in + if in.MaxPriorityThreshold != nil { + in, out := &in.MaxPriorityThreshold, &out.MaxPriorityThreshold + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BorrowWithinCohort. +func (in *BorrowWithinCohort) DeepCopy() *BorrowWithinCohort { + if in == nil { + return nil + } + out := new(BorrowWithinCohort) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClusterQueue) DeepCopyInto(out *ClusterQueue) { *out = *in @@ -291,6 +311,11 @@ func (in *ClusterQueuePendingWorkloadsStatus) DeepCopy() *ClusterQueuePendingWor // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClusterQueuePreemption) DeepCopyInto(out *ClusterQueuePreemption) { *out = *in + if in.BorrowWithinCohort != nil { + in, out := &in.BorrowWithinCohort, &out.BorrowWithinCohort + *out = new(BorrowWithinCohort) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterQueuePreemption. 
@@ -326,7 +351,7 @@ func (in *ClusterQueueSpec) DeepCopyInto(out *ClusterQueueSpec) { if in.Preemption != nil { in, out := &in.Preemption, &out.Preemption *out = new(ClusterQueuePreemption) - **out = **in + (*in).DeepCopyInto(*out) } if in.AdmissionChecks != nil { in, out := &in.AdmissionChecks, &out.AdmissionChecks diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml index 30ba4efbf1..ee9b5084c6 100644 --- a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml +++ b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml @@ -176,6 +176,30 @@ spec: of Workloads to preempt to accomomdate the pending Workload, preempting Workloads with lower priority first." properties: + borrowFromCohortConfig: + description: borrowWithinCohort provides configuration to allow + preemption within cohort while borrowing. + properties: + maxPriorityThreshold: + description: maxPriorityThreshold allows to restrict the set + of workloads for preemption to reclaim quota within the + cohort, to only workloads with priority below or equal the + specified level. + format: int32 + type: integer + policy: + default: Never + description: 'policy determines the policy for preemption + to reclaim quota within cohort while borrowing. Possible + values are: - `Never` (default): do not reclaim quota within + ClusterQueue while borrowing. - `LowerPriority`: allow preempting + workloads to reclaim quota within the cohort, but only by + preempting lower-priority workloads.' 
+ enum: + - Never + - LowerPriority + type: string + type: object reclaimWithinCohort: default: Never description: "reclaimWithinCohort determines whether a pending diff --git a/client-go/applyconfiguration/kueue/v1beta1/borrowwithincohort.go b/client-go/applyconfiguration/kueue/v1beta1/borrowwithincohort.go new file mode 100644 index 0000000000..76333b71e6 --- /dev/null +++ b/client-go/applyconfiguration/kueue/v1beta1/borrowwithincohort.go @@ -0,0 +1,51 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1beta1 + +import ( + v1beta1 "sigs.k8s.io/kueue/apis/kueue/v1beta1" +) + +// BorrowWithinCohortApplyConfiguration represents an declarative configuration of the BorrowWithinCohort type for use +// with apply. +type BorrowWithinCohortApplyConfiguration struct { + Policy *v1beta1.BorrowWithinCohortPolicy `json:"policy,omitempty"` + MaxPriorityThreshold *int32 `json:"maxPriorityThreshold,omitempty"` +} + +// BorrowWithinCohortApplyConfiguration constructs an declarative configuration of the BorrowWithinCohort type for use with +// apply. +func BorrowWithinCohort() *BorrowWithinCohortApplyConfiguration { + return &BorrowWithinCohortApplyConfiguration{} +} + +// WithPolicy sets the Policy field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. 
+// If called multiple times, the Policy field is set to the value of the last call. +func (b *BorrowWithinCohortApplyConfiguration) WithPolicy(value v1beta1.BorrowWithinCohortPolicy) *BorrowWithinCohortApplyConfiguration { + b.Policy = &value + return b +} + +// WithMaxPriorityThreshold sets the MaxPriorityThreshold field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the MaxPriorityThreshold field is set to the value of the last call. +func (b *BorrowWithinCohortApplyConfiguration) WithMaxPriorityThreshold(value int32) *BorrowWithinCohortApplyConfiguration { + b.MaxPriorityThreshold = &value + return b +} diff --git a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuepreemption.go b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuepreemption.go index 0b267cfd05..1d6fc57b3c 100644 --- a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuepreemption.go +++ b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuepreemption.go @@ -24,8 +24,9 @@ import ( // ClusterQueuePreemptionApplyConfiguration represents an declarative configuration of the ClusterQueuePreemption type for use // with apply. 
type ClusterQueuePreemptionApplyConfiguration struct { - ReclaimWithinCohort *v1beta1.PreemptionPolicy `json:"reclaimWithinCohort,omitempty"` - WithinClusterQueue *v1beta1.PreemptionPolicy `json:"withinClusterQueue,omitempty"` + ReclaimWithinCohort *v1beta1.PreemptionPolicy `json:"reclaimWithinCohort,omitempty"` + BorrowWithinCohort *BorrowWithinCohortApplyConfiguration `json:"borrowFromCohortConfig,omitempty"` + WithinClusterQueue *v1beta1.PreemptionPolicy `json:"withinClusterQueue,omitempty"` } // ClusterQueuePreemptionApplyConfiguration constructs an declarative configuration of the ClusterQueuePreemption type for use with @@ -42,6 +43,14 @@ func (b *ClusterQueuePreemptionApplyConfiguration) WithReclaimWithinCohort(value return b } +// WithBorrowWithinCohort sets the BorrowWithinCohort field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the BorrowWithinCohort field is set to the value of the last call. +func (b *ClusterQueuePreemptionApplyConfiguration) WithBorrowWithinCohort(value *BorrowWithinCohortApplyConfiguration) *ClusterQueuePreemptionApplyConfiguration { + b.BorrowWithinCohort = value + return b +} + // WithWithinClusterQueue sets the WithinClusterQueue field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the WithinClusterQueue field is set to the value of the last call. 
diff --git a/client-go/applyconfiguration/utils.go b/client-go/applyconfiguration/utils.go index d160d50a1e..bd6fa7b056 100644 --- a/client-go/applyconfiguration/utils.go +++ b/client-go/applyconfiguration/utils.go @@ -42,6 +42,8 @@ func ForKind(kind schema.GroupVersionKind) interface{} { return &kueuev1beta1.AdmissionCheckStateApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("AdmissionCheckStatus"): return &kueuev1beta1.AdmissionCheckStatusApplyConfiguration{} + case v1beta1.SchemeGroupVersion.WithKind("BorrowWithinCohort"): + return &kueuev1beta1.BorrowWithinCohortApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("ClusterQueue"): return &kueuev1beta1.ClusterQueueApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("ClusterQueuePendingWorkload"): diff --git a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml index 457e51a63a..0b8cd40967 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml @@ -163,6 +163,30 @@ spec: of Workloads to preempt to accomomdate the pending Workload, preempting Workloads with lower priority first." properties: + borrowFromCohortConfig: + description: borrowWithinCohort provides configuration to allow + preemption within cohort while borrowing. + properties: + maxPriorityThreshold: + description: maxPriorityThreshold allows to restrict the set + of workloads for preemption to reclaim quota within the + cohort, to only workloads with priority below or equal the + specified level. + format: int32 + type: integer + policy: + default: Never + description: 'policy determines the policy for preemption + to reclaim quota within cohort while borrowing. Possible + values are: - `Never` (default): do not reclaim quota within + ClusterQueue while borrowing. 
- `LowerPriority`: allow preempting + workloads to reclaim quota within the cohort, but only by + preempting lower-priority workloads.' + enum: + - Never + - LowerPriority + type: string + type: object reclaimWithinCohort: default: Never description: "reclaimWithinCohort determines whether a pending diff --git a/pkg/scheduler/flavorassigner/flavorassigner.go b/pkg/scheduler/flavorassigner/flavorassigner.go index c653fccb79..0db148bfd4 100644 --- a/pkg/scheduler/flavorassigner/flavorassigner.go +++ b/pkg/scheduler/flavorassigner/flavorassigner.go @@ -558,6 +558,13 @@ func fitsResourceQuota(fName kueue.ResourceFlavorReference, rName corev1.Resourc // ClusterQueue are preempted. mode = Preempt } + if cq.Preemption.BorrowWithinCohort != nil && cq.Preemption.BorrowWithinCohort.Policy != kueue.BorrowWithinCohortPolicyNever { + // When preemption with borrowing is enabled, we can succeed in admitting the + // workload if preemption is used. + if rQuota.BorrowingLimit != nil && val <= rQuota.Nominal+*rQuota.BorrowingLimit { + mode = Preempt + } + } if rQuota.BorrowingLimit != nil && used+val > rQuota.Nominal+*rQuota.BorrowingLimit { status.append(fmt.Sprintf("borrowing limit for %s in flavor %s exceeded", rName, fName)) return mode, 0, &status diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index 788e99fdf3..954286838f 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -29,6 +29,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -74,7 +75,7 @@ func candidatesOnlyFromQueue(candidates []*workload.Info, clusterQueue string) [ } // GetTargets returns the list of workloads that should be evicted in order to make room for wl. 
-func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*workload.Info { +func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, borrowWithinCohort *kueue.BorrowWithinCohort) []*workload.Info { resPerFlv := resourcesRequiringPreemption(assignment) cq := snapshot.ClusterQueues[wl.ClusterQueue] @@ -92,20 +93,27 @@ func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assig // if not borrowing at the same time. Kueue prioritizes preemption of // workloads from the other queues (that borrowed resources) first, before // trying to preempt more own workloads and borrow at the same time. - if len(sameQueueCandidates) == len(candidates) { // There is no risk of preemption of workloads from the other queue, // so we can try borrowing. - targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, true) + targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, true, nil) } else { // There is a risk of preemption of workloads from the other queue in the // cohort, proceeding without borrowing. - targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, false) - if len(targets) == 0 { + var belowPriority *int32 + allowBorrowing := borrowWithinCohort != nil && borrowWithinCohort.Policy != kueue.BorrowWithinCohortPolicyNever + if allowBorrowing { + belowPriority = ptr.To(priority.Priority(wl.Obj)) + if borrowWithinCohort.MaxPriorityThreshold != nil && *borrowWithinCohort.MaxPriorityThreshold < *belowPriority { + belowPriority = ptr.To(*borrowWithinCohort.MaxPriorityThreshold + 1) + } + } + targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, allowBorrowing, belowPriority) + if len(targets) == 0 && !allowBorrowing { // Another attempt. This time only candidates from the same queue, but // with borrowing. 
The previous attempt didn't try borrowing and had broader // scope of preemption. - targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, sameQueueCandidates, true) + targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, sameQueueCandidates, true, nil) } } @@ -156,7 +164,7 @@ func (p *Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workloa // Once the Workload fits, the heuristic tries to add Workloads back, in the // reverse order in which they were removed, while the incoming Workload still // fits. -func minimalPreemptions(wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool) []*workload.Info { +func minimalPreemptions(wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, belowPriority *int32) []*workload.Info { wlReq := totalRequestsForAssignment(wl, assignment) cq := snapshot.ClusterQueues[wl.ClusterQueue] // Simulate removing all candidates from the ClusterQueue and cohort. @@ -167,6 +175,9 @@ func minimalPreemptions(wl *workload.Info, assignment flavorassigner.Assignment, if cq != candCQ && !cqIsBorrowing(candCQ, resPerFlv) { continue } + if cq != candCQ && belowPriority != nil && priority.Priority(candWl.Obj) >= *belowPriority { + continue + } snapshot.RemoveWorkload(candWl) targets = append(targets, candWl) if workloadFits(wlReq, cq, allowBorrowing) { @@ -253,9 +264,9 @@ func findCandidates(wl *kueue.Workload, cq *cache.ClusterQueue, resPerFlv resour // Can't reclaim quota from itself or ClusterQueues that are not borrowing. 
continue } - onlyLowerPrio := true - if cq.Preemption.ReclaimWithinCohort == kueue.PreemptionPolicyAny { - onlyLowerPrio = false + onlyLowerPrio := false + if cq.Preemption.ReclaimWithinCohort != kueue.PreemptionPolicyAny { + onlyLowerPrio = true } for _, candidateWl := range cohortCQ.Workloads { if onlyLowerPrio && priority.Priority(candidateWl.Obj) >= priority.Priority(wl) { @@ -268,6 +279,7 @@ func findCandidates(wl *kueue.Workload, cq *cache.ClusterQueue, resPerFlv resour } } } + return candidates } diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go index a0ef325b4f..c1b35a76e0 100644 --- a/pkg/scheduler/preemption/preemption_test.go +++ b/pkg/scheduler/preemption/preemption_test.go @@ -790,7 +790,7 @@ func TestPreemption(t *testing.T) { snapshot := cqCache.Snapshot() wlInfo := workload.NewInfo(tc.incoming) wlInfo.ClusterQueue = tc.targetCQ - targets := preemptor.GetTargets(*wlInfo, tc.assignment, &snapshot) + targets := preemptor.GetTargets(*wlInfo, tc.assignment, &snapshot, nil) preempted, err := preemptor.IssuePreemptions(ctx, targets, snapshot.ClusterQueues[wlInfo.ClusterQueue]) if err != nil { t.Fatalf("Failed doing preemption") diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 95e127910f..088a8ad123 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -313,7 +313,7 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna } else if err := s.validateLimitRange(ctx, &w); err != nil { e.inadmissibleMsg = err.Error() } else { - e.assignment, e.preemptionTargets = s.getAssignments(log, &e.Info, &snap) + e.assignment, e.preemptionTargets = s.getAssignments(log, &e.Info, &snap, cq.Preemption.BorrowWithinCohort) e.inadmissibleMsg = e.assignment.Message() e.Info.LastAssignment = &e.assignment.LastState } @@ -327,7 +327,7 @@ type partialAssignment struct { preemptionTargets []*workload.Info } -func (s *Scheduler) getAssignments(log 
logr.Logger, wl *workload.Info, snap *cache.Snapshot) (flavorassigner.Assignment, []*workload.Info) { +func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cache.Snapshot, borrowWithinCohort *kueue.BorrowWithinCohort) (flavorassigner.Assignment, []*workload.Info) { cq := snap.ClusterQueues[wl.ClusterQueue] fullAssignment := flavorassigner.AssignFlavors(log, wl, snap.ResourceFlavors, cq, nil) var fullAssignmentTargets []*workload.Info @@ -338,7 +338,7 @@ func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cac } if arm == flavorassigner.Preempt { - fullAssignmentTargets = s.preemptor.GetTargets(*wl, fullAssignment, snap) + fullAssignmentTargets = s.preemptor.GetTargets(*wl, fullAssignment, snap, borrowWithinCohort) } // if the feature gate is not enabled or we can preempt @@ -352,7 +352,7 @@ func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cac if assignment.RepresentativeMode() == flavorassigner.Fit { return &partialAssignment{assignment: assignment}, true } - preemptionTargets := s.preemptor.GetTargets(*wl, assignment, snap) + preemptionTargets := s.preemptor.GetTargets(*wl, assignment, snap, borrowWithinCohort) if len(preemptionTargets) > 0 { return &partialAssignment{assignment: assignment, preemptionTargets: preemptionTargets}, true diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go index a7673b893b..e8bd03b8f4 100644 --- a/test/integration/scheduler/preemption_test.go +++ b/test/integration/scheduler/preemption_test.go @@ -22,7 +22,10 @@ import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" + apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/util/testing" @@ -33,6 +36,7 @@ const ( lowPriority int32 = iota - 1 midPriority 
highPriority + veryHighPriority ) var _ = ginkgo.Describe("Preemption", func() { @@ -355,4 +359,181 @@ var _ = ginkgo.Describe("Preemption", func() { }) }) + ginkgo.Context("When most quota is in a shared ClusterQueue in a cohort", func() { + var ( + aStandardCQ, aBestEffortCQ, bStandardCQ, sharedCQ *kueue.ClusterQueue + aStandardLQ, aBestEffortLQ, bStandardLQ *kueue.LocalQueue + ) + + ginkgo.BeforeEach(func() { + aStandardCQ = testing.MakeClusterQueue("a-standard-cq"). + Cohort("all"). + ResourceGroup(*testing.MakeFlavorQuotas("alpha").Resource(corev1.ResourceCPU, "1", "5").Obj()). + Preemption(kueue.ClusterQueuePreemption{ + ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority, + BorrowWithinCohort: &kueue.BorrowWithinCohort{ + Policy: kueue.BorrowWithinCohortPolicyLowerPriority, + MaxPriorityThreshold: ptr.To(midPriority), + }, + }). + Obj() + gomega.Expect(k8sClient.Create(ctx, aStandardCQ)).To(gomega.Succeed()) + aStandardLQ = testing.MakeLocalQueue("a-standard-lq", ns.Name).ClusterQueue(aStandardCQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, aStandardLQ)).To(gomega.Succeed()) + + aBestEffortCQ = testing.MakeClusterQueue("a-best-effort-cq"). + Cohort("all"). + ResourceGroup(*testing.MakeFlavorQuotas("alpha").Resource(corev1.ResourceCPU, "1", "5").Obj()). + Preemption(kueue.ClusterQueuePreemption{ + ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority, + BorrowWithinCohort: &kueue.BorrowWithinCohort{ + Policy: kueue.BorrowWithinCohortPolicyLowerPriority, + }, + }). + Obj() + gomega.Expect(k8sClient.Create(ctx, aBestEffortCQ)).To(gomega.Succeed()) + aBestEffortLQ = testing.MakeLocalQueue("a-best-effort-lq", ns.Name).ClusterQueue(aBestEffortCQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, aBestEffortLQ)).To(gomega.Succeed()) + + bStandardCQ = testing.MakeClusterQueue("b-standard-cq"). + Cohort("all"). + ResourceGroup(*testing.MakeFlavorQuotas("alpha").Resource(corev1.ResourceCPU, "1", "5").Obj()). 
+ Preemption(kueue.ClusterQueuePreemption{ + ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority, + BorrowWithinCohort: &kueue.BorrowWithinCohort{ + Policy: kueue.BorrowWithinCohortPolicyLowerPriority, + MaxPriorityThreshold: ptr.To(midPriority), + }, + }). + Obj() + gomega.Expect(k8sClient.Create(ctx, bStandardCQ)).To(gomega.Succeed()) + bStandardLQ = testing.MakeLocalQueue("b-standard-lq", ns.Name).ClusterQueue(bStandardCQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, bStandardLQ)).To(gomega.Succeed()) + + sharedCQ = testing.MakeClusterQueue("shared-cq"). + Cohort("all"). + ResourceGroup(*testing.MakeFlavorQuotas("alpha").Resource(corev1.ResourceCPU, "5").Obj()). + Obj() + gomega.Expect(k8sClient.Create(ctx, sharedCQ)).To(gomega.Succeed()) + }) + + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteWorkloadsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, aStandardCQ, true) + util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, aBestEffortCQ, true) + util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, bStandardCQ, true) + util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, sharedCQ, true) + }) + + ginkgo.It("Should allow a higher-priority workload to preempt a lower priority workload", func() { + ginkgo.By("Create a low priority workload which requires borrowing") + aBestEffortLowWl := testing.MakeWorkload("a-best-effort-low", ns.Name). + Queue(aBestEffortLQ.Name). + Priority(lowPriority). + Request(corev1.ResourceCPU, "5"). + Obj() + gomega.Expect(k8sClient.Create(ctx, aBestEffortLowWl)).To(gomega.Succeed()) + + ginkgo.By("Await for the a-best-effort-low workload to be admitted") + util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, aBestEffortCQ.Name, aBestEffortLowWl) + + ginkgo.By("Create the a-standard-high workload") + aStandardHighWl := testing.MakeWorkload("a-standard-high", ns.Name). + Queue(aStandardLQ.Name). + Priority(highPriority). + Request(corev1.ResourceCPU, "5"). 
+ Obj() + gomega.Expect(k8sClient.Create(ctx, aStandardHighWl)).To(gomega.Succeed()) + + ginkgo.By("Finish eviction fo the a-best-effort-low workload") + util.FinishEvictionForWorkloads(ctx, k8sClient, aBestEffortLowWl) + + ginkgo.By("Verify the a-standard-high workload is admitted") + util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, aStandardCQ.Name, aStandardHighWl) + + ginkgo.By("Verify the a-best-effort-low workload is pending") + util.ExpectWorkloadsToBePending(ctx, k8sClient, aBestEffortLowWl) + }) + + ginkgo.It("Should not allow a lower-priority workload to preempt a higher priority workload", func() { + ginkgo.By("Create a high priority workload which requires borrowing") + aStandardHighWl := testing.MakeWorkload("a-standard-high", ns.Name). + Queue(aStandardLQ.Name). + Priority(highPriority). + Request(corev1.ResourceCPU, "5"). + Obj() + gomega.Expect(k8sClient.Create(ctx, aStandardHighWl)).To(gomega.Succeed()) + + ginkgo.By("Await for the a-standard-high workload to be admitted") + util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, aStandardCQ.Name, aStandardHighWl) + + aBestEffortLowWl := testing.MakeWorkload("a-best-effort-low", ns.Name). + Queue(aBestEffortLQ.Name). + Priority(lowPriority). + Request(corev1.ResourceCPU, "5"). 
+ Obj() + gomega.Expect(k8sClient.Create(ctx, aBestEffortLowWl)).To(gomega.Succeed()) + + ginkgo.By("Verify the a-standard-high workload remains admitted") + gomega.Consistently(func() bool { + gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(aStandardHighWl), aStandardHighWl)).To(gomega.Succeed()) + return apimeta.IsStatusConditionTrue(aStandardHighWl.Status.Conditions, kueue.WorkloadEvicted) + }, util.ConsistentDuration, util.Interval).Should(gomega.BeFalse()) + }) + + ginkgo.It("Should not allow a higher-priority workload to preempt a lower-priority workload, if above the threshold", func() { + ginkgo.By("Create a high priority workload which requires borrowing") + aStandardHighWl := testing.MakeWorkload("a-standard-high", ns.Name). + Queue(aStandardLQ.Name). + Priority(highPriority). + Request(corev1.ResourceCPU, "5"). + Obj() + gomega.Expect(k8sClient.Create(ctx, aStandardHighWl)).To(gomega.Succeed()) + + ginkgo.By("Await for the a-standard-high workload to be admitted") + util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, aStandardCQ.Name, aStandardHighWl) + + ginkgo.By("Create the b-standard-very-high workload") + bStandardVeryHighWl := testing.MakeWorkload("b-standard-very-high", ns.Name). + Queue(bStandardLQ.Name). + Priority(veryHighPriority). + Request(corev1.ResourceCPU, "5"). 
+ Obj() + gomega.Expect(k8sClient.Create(ctx, bStandardVeryHighWl)).To(gomega.Succeed()) + + ginkgo.By("Verify the a-standard-high workload remains admitted") + gomega.Consistently(func() bool { + gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(aStandardHighWl), aStandardHighWl)).To(gomega.Succeed()) + return apimeta.IsStatusConditionTrue(aStandardHighWl.Status.Conditions, kueue.WorkloadEvicted) + }, util.ConsistentDuration, util.Interval).Should(gomega.BeFalse()) + }) + + ginkgo.It("Should not allow a higher-priority workload to preempt a lower-priority workload, if the low priority one is not borrowing", func() { + ginkgo.By("Create a low priority workload which does not borrowing") + aBestEffortLowWl := testing.MakeWorkload("a-best-effort-low", ns.Name). + Queue(aBestEffortLQ.Name). + Priority(lowPriority). + Request(corev1.ResourceCPU, "1"). + Obj() + gomega.Expect(k8sClient.Create(ctx, aBestEffortLowWl)).To(gomega.Succeed()) + + ginkgo.By("Await for the a-best-effort-low workload to be admitted") + util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, aBestEffortCQ.Name, aBestEffortLowWl) + + ginkgo.By("Create the a-standard-high workload") + aStandardHighWl := testing.MakeWorkload("a-standard-high", ns.Name). + Queue(aStandardLQ.Name). + Priority(veryHighPriority). + Request(corev1.ResourceCPU, "5"). + Obj() + gomega.Expect(k8sClient.Create(ctx, aStandardHighWl)).To(gomega.Succeed()) + + ginkgo.By("Verify the a-best-effort-low workload remains admitted") + gomega.Consistently(func() bool { + gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(aBestEffortLowWl), aBestEffortLowWl)).To(gomega.Succeed()) + return apimeta.IsStatusConditionTrue(aBestEffortLowWl.Status.Conditions, kueue.WorkloadEvicted) + }, util.ConsistentDuration, util.Interval).Should(gomega.BeFalse()) + }) + }) + })