diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go index 7fe289a09e..949338e7ff 100644 --- a/pkg/cache/clusterqueue.go +++ b/pkg/cache/clusterqueue.go @@ -453,6 +453,29 @@ func updateUsage(wi *workload.Info, flvUsage FlavorResourceQuantities, m int64) } } +func updateCohortUsage(wi *workload.Info, cq *ClusterQueue, m int64) { + for _, ps := range wi.TotalRequests { + for wlRes, wlResFlv := range ps.Flavors { + v, wlResExist := ps.Requests[wlRes] + flv, flvExist := cq.Cohort.Usage[wlResFlv] + if flvExist && wlResExist { + if _, exists := flv[wlRes]; exists { + after := cq.Usage[wlResFlv][wlRes] - cq.guaranteedQuota(wlResFlv, wlRes) + // rollback update cq.Usage + before := after - v*m + if before > 0 { + flv[wlRes] -= before + } + // simulate updating cq.Usage + if after > 0 { + flv[wlRes] += after + } + } + } + } + } +} + func (c *ClusterQueue) addLocalQueue(q *kueue.LocalQueue) error { qKey := queueKey(q) if _, ok := c.localQueues[qKey]; ok { diff --git a/pkg/cache/snapshot.go b/pkg/cache/snapshot.go index fa89788e00..870fa64c39 100644 --- a/pkg/cache/snapshot.go +++ b/pkg/cache/snapshot.go @@ -43,18 +43,26 @@ func (s *Snapshot) RemoveWorkload(wl *workload.Info) { delete(cq.Workloads, workload.Key(wl.Obj)) updateUsage(wl, cq.Usage, -1) if cq.Cohort != nil { - updateUsage(wl, cq.Cohort.Usage, -1) + if features.Enabled(features.LendingLimit) { + updateCohortUsage(wl, cq, -1) + } else { + updateUsage(wl, cq.Cohort.Usage, -1) + } } } -// AddWorkload removes a workload from its corresponding ClusterQueue and +// AddWorkload adds a workload from its corresponding ClusterQueue and // updates resource usage. func (s *Snapshot) AddWorkload(wl *workload.Info) { cq := s.ClusterQueues[wl.ClusterQueue] cq.Workloads[workload.Key(wl.Obj)] = wl updateUsage(wl, cq.Usage, 1) if cq.Cohort != nil { - updateUsage(wl, cq.Cohort.Usage, 1) + if features.Enabled(features.LendingLimit) { + updateCohortUsage(wl, cq, 1) + } else { + updateUsage(wl, cq.Cohort.Usage, 1) + } } } diff --git a/pkg/cache/snapshot_test.go b/pkg/cache/snapshot_test.go index d54ae71dbb..61449039e1 100644 --- a/pkg/cache/snapshot_test.go +++ b/pkg/cache/snapshot_test.go @@ -864,3 +864,447 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { }) } } + +func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) { + _ = features.SetEnable(features.LendingLimit, true) + flavors := []*kueue.ResourceFlavor{ + utiltesting.MakeResourceFlavor("default").Obj(), + } + clusterQueues := []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue("lend-a"). + Cohort("lend"). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "10", "", "4").Obj(), + ). + Preemption(kueue.ClusterQueuePreemption{ + WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, + ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority, + }). + Obj(), + utiltesting.MakeClusterQueue("lend-b"). + Cohort("lend"). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "10", "", "6").Obj(), + ). + Preemption(kueue.ClusterQueuePreemption{ + WithinClusterQueue: kueue.PreemptionPolicyNever, + ReclaimWithinCohort: kueue.PreemptionPolicyAny, + }). + Obj(), + } + workloads := []kueue.Workload{ + *utiltesting.MakeWorkload("lend-a-1", ""). + Request(corev1.ResourceCPU, "1"). + ReserveQuota(utiltesting.MakeAdmission("lend-a").Assignment(corev1.ResourceCPU, "default", "1").Obj()). + Obj(), + *utiltesting.MakeWorkload("lend-a-2", ""). + Request(corev1.ResourceCPU, "9"). + ReserveQuota(utiltesting.MakeAdmission("lend-a").Assignment(corev1.ResourceCPU, "default", "9").Obj()). + Obj(), + *utiltesting.MakeWorkload("lend-a-3", ""). + Request(corev1.ResourceCPU, "6"). + ReserveQuota(utiltesting.MakeAdmission("lend-a").Assignment(corev1.ResourceCPU, "default", "6").Obj()). + Obj(), + *utiltesting.MakeWorkload("lend-b-1", ""). + Request(corev1.ResourceCPU, "4"). + ReserveQuota(utiltesting.MakeAdmission("lend-b").Assignment(corev1.ResourceCPU, "default", "4").Obj()). + Obj(), + } + + ctx := context.Background() + cl := utiltesting.NewClientBuilder().WithLists(&kueue.WorkloadList{Items: workloads}).Build() + + cqCache := New(cl) + for _, flv := range flavors { + cqCache.AddOrUpdateResourceFlavor(flv) + } + for _, cq := range clusterQueues { + if err := cqCache.AddClusterQueue(ctx, cq); err != nil { + t.Fatalf("Couldn't add ClusterQueue to cache: %v", err) + } + } + wlInfos := make(map[string]*workload.Info, len(workloads)) + for _, cq := range cqCache.clusterQueues { + for _, wl := range cq.Workloads { + wlInfos[workload.Key(wl.Obj)] = wl + } + } + initialSnapshot := cqCache.Snapshot() + initialCohortResources := initialSnapshot.ClusterQueues["lend-a"].Cohort.RequestableResources + cases := map[string]struct { + remove []string + add []string + want Snapshot + }{ + "remove all then add all": { + remove: []string{"/lend-a-1", "/lend-a-2", "/lend-a-3", "/lend-b-1"}, + add: []string{"/lend-a-1", "/lend-a-2", "/lend-a-3", "/lend-b-1"}, + want: initialSnapshot, + }, + "remove all": { + remove: []string{"/lend-a-1", "/lend-a-2", "/lend-a-3", "/lend-b-1"}, + want: func() Snapshot { + cohort := &Cohort{ + Name: "lend", + AllocatableResourceGeneration: 2, + RequestableResources: initialCohortResources, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, + } + return Snapshot{ + ClusterQueues: map[string]*ClusterQueue{ + "lend-a": { + Name: "lend-a", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["lend-a"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, + GuaranteedQuota: FlavorResourceQuantities{ + "default": { + corev1.ResourceCPU: 6_000, + }, + }, + }, + "lend-b": { + Name: "lend-b", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["lend-b"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, + GuaranteedQuota: FlavorResourceQuantities{ + "default": { + corev1.ResourceCPU: 4_000, + }, + }, + }, + }, + } + }(), + }, + "remove workload, but still using quota over GuaranteedQuota": { + remove: []string{"/lend-a-2"}, + want: func() Snapshot { + cohort := &Cohort{ + Name: "lend", + AllocatableResourceGeneration: 2, + RequestableResources: initialCohortResources, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 1_000}, + }, + } + return Snapshot{ + ClusterQueues: map[string]*ClusterQueue{ + "lend-a": { + Name: "lend-a", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["lend-a"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 7_000}, + }, + GuaranteedQuota: FlavorResourceQuantities{ + "default": { + corev1.ResourceCPU: 6_000, + }, + }, + }, + "lend-b": { + Name: "lend-b", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["lend-b"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 4_000}, + }, + GuaranteedQuota: FlavorResourceQuantities{ + "default": { + corev1.ResourceCPU: 4_000, + }, + }, + }, + }, + } + }(), + }, + "remove wokload, using same quota as GuaranteedQuota": { + remove: []string{"/lend-a-1", "/lend-a-2"}, + want: func() Snapshot { + cohort := &Cohort{ + Name: "lend", + AllocatableResourceGeneration: 2, + RequestableResources: initialCohortResources, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, + } + return Snapshot{ + ClusterQueues: map[string]*ClusterQueue{ + "lend-a": { + Name: "lend-a", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["lend-a"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 6_000}, + }, + GuaranteedQuota: FlavorResourceQuantities{ + "default": { + corev1.ResourceCPU: 6_000, + }, + }, + }, + "lend-b": { + Name: "lend-b", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["lend-b"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 4_000}, + }, + GuaranteedQuota: FlavorResourceQuantities{ + "default": { + corev1.ResourceCPU: 4_000, + }, + }, + }, + }, + } + }(), + }, + "remove workload, using less quota than GuaranteedQuota": { + remove: []string{"/lend-a-2", "/lend-a-3"}, + want: func() Snapshot { + cohort := &Cohort{ + Name: "lend", + AllocatableResourceGeneration: 2, + RequestableResources: initialCohortResources, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, + } + return Snapshot{ + ClusterQueues: map[string]*ClusterQueue{ + "lend-a": { + Name: "lend-a", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["lend-a"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 1_000}, + }, + GuaranteedQuota: FlavorResourceQuantities{ + "default": { + corev1.ResourceCPU: 6_000, + }, + }, + }, + "lend-b": { + Name: "lend-b", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["lend-b"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 4_000}, + }, + GuaranteedQuota: FlavorResourceQuantities{ + "default": { + corev1.ResourceCPU: 4_000, + }, + }, + }, + }, + } + }(), + }, + "remove all then add workload, using less quota than GuaranteedQuota": { + remove: []string{"/lend-a-1", "/lend-a-2", "/lend-a-3", "/lend-b-1"}, + add: []string{"/lend-a-1"}, + want: func() Snapshot { + cohort := &Cohort{ + Name: "lend", + AllocatableResourceGeneration: 2, + RequestableResources: initialCohortResources, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, + } + return Snapshot{ + ClusterQueues: map[string]*ClusterQueue{ + "lend-a": { + Name: "lend-a", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["lend-a"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 1_000}, + }, + GuaranteedQuota: FlavorResourceQuantities{ + "default": { + corev1.ResourceCPU: 6_000, + }, + }, + }, + "lend-b": { + Name: "lend-b", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["lend-b"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, + GuaranteedQuota: FlavorResourceQuantities{ + "default": { + corev1.ResourceCPU: 4_000, + }, + }, + }, + }, + } + }(), + }, + "remove all then add workload, using same quota as GuaranteedQuota": { + remove: []string{"/lend-a-1", "/lend-a-2", "/lend-a-3", "/lend-b-1"}, + add: []string{"/lend-a-3"}, + want: func() Snapshot { + cohort := &Cohort{ + Name: "lend", + AllocatableResourceGeneration: 2, + RequestableResources: initialCohortResources, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, + } + return Snapshot{ + ClusterQueues: map[string]*ClusterQueue{ + "lend-a": { + Name: "lend-a", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["lend-a"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 6_000}, + }, + GuaranteedQuota: FlavorResourceQuantities{ + "default": { + corev1.ResourceCPU: 6_000, + }, + }, + }, + "lend-b": { + Name: "lend-b", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["lend-b"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, + GuaranteedQuota: FlavorResourceQuantities{ + "default": { + corev1.ResourceCPU: 4_000, + }, + }, + }, + }, + } + }(), + }, + "remove all then add workload, using quota over GuaranteedQuota": { + remove: []string{"/lend-a-1", "/lend-a-2", "/lend-a-3", "/lend-b-1"}, + add: []string{"/lend-a-2"}, + want: func() Snapshot { + cohort := &Cohort{ + Name: "lend", + AllocatableResourceGeneration: 2, + RequestableResources: initialCohortResources, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 3_000}, + }, + } + return Snapshot{ + ClusterQueues: map[string]*ClusterQueue{ + "lend-a": { + Name: "lend-a", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["lend-a"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 9_000}, + }, + GuaranteedQuota: FlavorResourceQuantities{ + "default": { + corev1.ResourceCPU: 6_000, + }, + }, + }, + "lend-b": { + Name: "lend-b", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["lend-b"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, + Usage: FlavorResourceQuantities{ + "default": {corev1.ResourceCPU: 0}, + }, + GuaranteedQuota: FlavorResourceQuantities{ + "default": { + corev1.ResourceCPU: 4_000, + }, + }, + }, + }, + } + }(), + }, + } + cmpOpts := append(snapCmpOpts, + cmpopts.IgnoreFields(ClusterQueue{}, "NamespaceSelector", "Preemption", "Status"), + cmpopts.IgnoreFields(Cohort{}), + cmpopts.IgnoreFields(Snapshot{}, "ResourceFlavors"), + cmpopts.IgnoreTypes(&workload.Info{})) + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + snap := cqCache.Snapshot() + for _, name := range tc.remove { + snap.RemoveWorkload(wlInfos[name]) + } + for _, name := range tc.add { + snap.AddWorkload(wlInfos[name]) + } + if diff := cmp.Diff(tc.want, snap, cmpOpts...); diff != "" { + t.Errorf("Unexpected snapshot state after operations (-want,+got):\n%s", diff) + } + }) + } +} diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go index f4fc733231..5b7e6a4760 100644 --- a/pkg/scheduler/preemption/preemption_test.go +++ b/pkg/scheduler/preemption/preemption_test.go @@ -206,7 +206,6 @@ func TestPreemption(t *testing.T) { Cohort("cohort-lend"). ResourceGroup(*utiltesting.MakeFlavorQuotas("default"). Resource(corev1.ResourceCPU, "6", "", "4"). - Resource(corev1.ResourceMemory, "3Gi", "", "2Gi"). Obj(), ). Preemption(kueue.ClusterQueuePreemption{ @@ -217,13 +216,12 @@ func TestPreemption(t *testing.T) { utiltesting.MakeClusterQueue("lend2"). Cohort("cohort-lend"). ResourceGroup(*utiltesting.MakeFlavorQuotas("default"). - Resource(corev1.ResourceCPU, "6", "", "4"). - Resource(corev1.ResourceMemory, "3Gi", "", "2Gi"). + Resource(corev1.ResourceCPU, "6", "", "2"). Obj(), ). Preemption(kueue.ClusterQueuePreemption{ - WithinClusterQueue: kueue.PreemptionPolicyNever, - ReclaimWithinCohort: kueue.PreemptionPolicyAny, + WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, + ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority, }). Obj(), } @@ -1043,6 +1041,27 @@ func TestPreemption(t *testing.T) { wantPreempted: sets.New("/lend1-low", "/lend2-low"), enableLendingLimit: true, }, + "cannot preempt from other ClusterQueues if exceeds requestable quota including lending limit": { + admitted: []kueue.Workload{ + *utiltesting.MakeWorkload("lend2-low", ""). + Priority(-1). + Request(corev1.ResourceCPU, "10"). + ReserveQuota(utiltesting.MakeAdmission("lend2").Assignment(corev1.ResourceCPU, "default", "10000m").Obj()). + Obj(), + }, + incoming: utiltesting.MakeWorkload("in", ""). + Request(corev1.ResourceCPU, "9"). + Obj(), + targetCQ: "lend1", + assignment: singlePodSetAssignment(flavorassigner.ResourceAssignment{ + corev1.ResourceCPU: &flavorassigner.FlavorAssignment{ + Name: "default", + Mode: flavorassigner.Preempt, + }, + }), + wantPreempted: nil, + enableLendingLimit: true, + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) {