diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go
index b9fd6c0ed6..4c11c27ff0 100644
--- a/pkg/cache/clusterqueue.go
+++ b/pkg/cache/clusterqueue.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/equality"
 	apimeta "k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -200,6 +201,7 @@ func filterQuantities(orig FlavorResourceQuantities, resourceGroups []kueue.Reso
 }
 
 func (c *ClusterQueue) updateResourceGroups(in []kueue.ResourceGroup) {
+	oldRG := c.ResourceGroups
 	c.ResourceGroups = make([]ResourceGroup, len(in))
 	for i, rgIn := range in {
 		rg := &c.ResourceGroups[i]
@@ -225,7 +227,9 @@ func (c *ClusterQueue) updateResourceGroups(in []kueue.ResourceGroup) {
 			rg.Flavors = append(rg.Flavors, fQuotas)
 		}
 	}
-	c.AllocatableResourceGeneration++
+	if !equality.Semantic.DeepEqual(oldRG, c.ResourceGroups) {
+		c.AllocatableResourceGeneration++
+	}
 	c.UpdateRGByResource()
 }
 
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 476cd213fa..a46ade9450 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -175,8 +175,9 @@ func TestSchedule(t *testing.T) {
 		},
 	}
 	cases := map[string]struct {
-		workloads      []kueue.Workload
-		admissionError error
+		workloads         []kueue.Workload
+		needScheduleTwice bool
+		admissionError    error
 		// wantAssignments is a summary of all the admissions in the cache after this cycle.
 		wantAssignments map[string]kueue.Admission
 		// wantScheduled is the subset of workloads that got scheduled/admitted in this cycle.
@@ -1110,6 +1111,9 @@ func TestSchedule(t *testing.T) {
 			defer cancel()
 
 			scheduler.schedule(ctx)
+			if tc.needScheduleTwice {
+				scheduler.schedule(ctx)
+			}
 			wg.Wait()
 
 			wantScheduled := make(map[string]kueue.Admission)
@@ -1284,6 +1288,85 @@ func TestEntryOrdering(t *testing.T) {
 	}
 }
 
+func TestClusterQueueUpdate(t *testing.T) {
+	resourceFlavors := []*kueue.ResourceFlavor{
+		{ObjectMeta: metav1.ObjectMeta{Name: "on-demand"}},
+		{ObjectMeta: metav1.ObjectMeta{Name: "spot"}},
+	}
+	clusterQueue :=
+		*utiltesting.MakeClusterQueue("eng-alpha").
+			QueueingStrategy(kueue.StrictFIFO).
+			Preemption(kueue.ClusterQueuePreemption{
+				WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
+			}).
+			FlavorFungibility(kueue.FlavorFungibility{
+				WhenCanPreempt: kueue.Preempt,
+			}).
+			ResourceGroup(
+				*utiltesting.MakeFlavorQuotas("on-demand").
+					Resource(corev1.ResourceCPU, "50", "50").Obj(),
+				*utiltesting.MakeFlavorQuotas("spot").
+					Resource(corev1.ResourceCPU, "100", "0").Obj(),
+			).Obj()
+	newClusterQueue2 :=
+		*utiltesting.MakeClusterQueue("eng-alpha").
+			QueueingStrategy(kueue.StrictFIFO).
+			Preemption(kueue.ClusterQueuePreemption{
+				WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
+			}).
+			FlavorFungibility(kueue.FlavorFungibility{
+				WhenCanPreempt: kueue.Preempt,
+			}).
+			ResourceGroup(
+				*utiltesting.MakeFlavorQuotas("on-demand").
+					Resource(corev1.ResourceCPU, "100", "50").Obj(),
+				*utiltesting.MakeFlavorQuotas("spot").
+					Resource(corev1.ResourceCPU, "100", "0").Obj(),
+			).Obj()
+	cases := []struct {
+		name                         string
+		cqs                          *kueue.ClusterQueue
+		newcq                        *kueue.ClusterQueue
+		wantLastAssignmentGeneration int64
+	}{
+		{
+			name:                         "RGs unchanged",
+			cqs:                          &clusterQueue,
+			newcq:                        clusterQueue.DeepCopy(),
+			wantLastAssignmentGeneration: 1,
+		},
+		{
+			name:                         "RGs changed",
+			cqs:                          &clusterQueue,
+			newcq:                        &newClusterQueue2,
+			wantLastAssignmentGeneration: 2,
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			clientBuilder := utiltesting.NewClientBuilder().
+				WithObjects(
+					&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}},
+					tc.cqs,
+				)
+			cl := clientBuilder.Build()
+			cqCache := cache.New(cl)
+			// Resource flavors must be in the cache before the cluster queue that references them.
+			for _, rf := range resourceFlavors {
+				cqCache.AddOrUpdateResourceFlavor(rf)
+			}
+			cqCache.AddClusterQueue(context.Background(), tc.cqs)
+			cqCache.UpdateClusterQueue(tc.newcq)
+			snapshot := cqCache.Snapshot()
+			if diff := cmp.Diff(
+				tc.wantLastAssignmentGeneration,
+				snapshot.ClusterQueues["eng-alpha"].AllocatableResourceGeneration); diff != "" {
+				t.Errorf("Unexpected AllocatableResourceGeneration (-want,+got):\n%s", diff)
+			}
+		})
+	}
+}
+
 func TestLastSchedulingContext(t *testing.T) {
 	resourceFlavors := []*kueue.ResourceFlavor{
 		{ObjectMeta: metav1.ObjectMeta{Name: "on-demand"}},
 		{ObjectMeta: metav1.ObjectMeta{Name: "spot"}},
@@ -1357,6 +1440,22 @@ func TestLastSchedulingContext(t *testing.T) {
 				*utiltesting.MakeFlavorQuotas("spot").
 					Resource(corev1.ResourceCPU, "100", "0").Obj(),
 			).Obj(),
+		*utiltesting.MakeClusterQueue("cq-with-2-flavors").
+			QueueingStrategy(kueue.StrictFIFO).
+			Preemption(kueue.ClusterQueuePreemption{
+				ReclaimWithinCohort: kueue.PreemptionPolicyAny,
+				WithinClusterQueue:  kueue.PreemptionPolicyLowerOrNewerEqualPriority,
+			}).
+			FlavorFungibility(kueue.FlavorFungibility{
+				WhenCanBorrow:  kueue.Borrow,
+				WhenCanPreempt: kueue.Preempt,
+			}).
+			ResourceGroup(
+				*utiltesting.MakeFlavorQuotas("on-demand").
+					Resource(corev1.ResourceCPU, "2").Obj(),
+				*utiltesting.MakeFlavorQuotas("spot").
+					Resource(corev1.ResourceCPU, "2").Obj()).
+			Obj(),
 	}
 
 	queues := []kueue.LocalQueue{
@@ -1396,6 +1495,15 @@ func TestLastSchedulingContext(t *testing.T) {
 				ClusterQueue: "eng-cohort-theta",
 			},
 		},
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Namespace: "sales",
+				Name:      "local-queue-cq-with-2-flavors",
+			},
+			Spec: kueue.LocalQueueSpec{
+				ClusterQueue: "cq-with-2-flavors",
+			},
+		},
 	}
 	wl := utiltesting.MakeWorkload("low-1", "default").
 		Request(corev1.ResourceCPU, "50").
@@ -1412,6 +1520,78 @@ func TestLastSchedulingContext(t *testing.T) {
 		wantAdmissionsOnFirstSchedule  map[string]kueue.Admission
 		wantAdmissionsOnSecondSchedule map[string]kueue.Admission
 	}{
+		{
+			name: "two flavors to schedule",
+			workloads: []kueue.Workload{
+				*utiltesting.MakeWorkload("sample-job1", "sales").
+					Queue("local-queue-cq-with-2-flavors").
+					PodSets(*utiltesting.MakePodSet("main", 1).
+						Request(corev1.ResourceCPU, "1").
+						Obj()).
+					ReserveQuota(utiltesting.MakeAdmission("cq-with-2-flavors").Assignment(corev1.ResourceCPU, "on-demand", "1").Obj()).
+					Obj(),
+				*utiltesting.MakeWorkload("sample-job2", "sales").
+					Queue("local-queue-cq-with-2-flavors").
+					PodSets(*utiltesting.MakePodSet("main", 1).
+						Request(corev1.ResourceCPU, "1").
+						Obj()).
+					ReserveQuota(utiltesting.MakeAdmission("cq-with-2-flavors").Assignment(corev1.ResourceCPU, "on-demand", "1").Obj()).
+					Obj(),
+				*utiltesting.MakeWorkload("sample-job3", "sales").
+					Queue("local-queue-cq-with-2-flavors").
+					PodSets(*utiltesting.MakePodSet("main", 1).
+						Request(corev1.ResourceCPU, "1").
+						Obj()).
+					Obj(),
+			},
+			wantAdmissionsOnSecondSchedule: map[string]kueue.Admission{
+				"sales/sample-job1": {
+					ClusterQueue: "cq-with-2-flavors",
+					PodSetAssignments: []kueue.PodSetAssignment{
+						{
+							Name: "main",
+							Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+								corev1.ResourceCPU: "on-demand",
+							},
+							ResourceUsage: corev1.ResourceList{
+								corev1.ResourceCPU: resource.MustParse("1000m"),
+							},
+							Count: ptr.To[int32](1),
+						},
+					},
+				},
+				"sales/sample-job2": {
+					ClusterQueue: "cq-with-2-flavors",
+					PodSetAssignments: []kueue.PodSetAssignment{
+						{
+							Name: "main",
+							Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+								corev1.ResourceCPU: "on-demand",
+							},
+							ResourceUsage: corev1.ResourceList{
+								corev1.ResourceCPU: resource.MustParse("1000m"),
+							},
+							Count: ptr.To[int32](1),
+						},
+					},
+				},
+				"sales/sample-job3": {
+					ClusterQueue: "cq-with-2-flavors",
+					PodSetAssignments: []kueue.PodSetAssignment{
+						{
+							Name: "main",
+							Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+								corev1.ResourceCPU: "spot",
+							},
+							ResourceUsage: corev1.ResourceList{
+								corev1.ResourceCPU: resource.MustParse("1000m"),
+							},
+							Count: ptr.To[int32](1),
+						},
+					},
+				},
+			},
+		},
 		{
 			name: "scheduling context not changed",
 			cqs:  clusterQueue,
diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go
index 0a9c8fb0fb..5719a1f2cc 100644
--- a/test/integration/scheduler/scheduler_test.go
+++ b/test/integration/scheduler/scheduler_test.go
@@ -1075,6 +1075,36 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, devCQ.Name, dWl1)
 		})
 
+		ginkgo.It("Should try next flavor instead of pending", func() {
+			prodCQ = testing.MakeClusterQueue("prod-cq").
+				Cohort("all").
+				ResourceGroup(
+					*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "2").Obj(),
+					*testing.MakeFlavorQuotas("spot-untainted").Resource(corev1.ResourceCPU, "2").Obj()).
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, prodCQ)).Should(gomega.Succeed())
+
+			prodQueue := testing.MakeLocalQueue("prod-queue", ns.Name).ClusterQueue(prodCQ.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, prodQueue)).Should(gomega.Succeed())
+
+			ginkgo.By("Creating 3 workloads")
+			wl1 := testing.MakeWorkload("wl-1", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "1").Obj()
+			wl2 := testing.MakeWorkload("wl-2", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "1").Obj()
+			wl3 := testing.MakeWorkload("wl-3", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "1").Obj()
+			gomega.Expect(k8sClient.Create(ctx, wl1)).Should(gomega.Succeed())
+			gomega.Expect(k8sClient.Create(ctx, wl2)).Should(gomega.Succeed())
+			gomega.Expect(k8sClient.Create(ctx, wl3)).Should(gomega.Succeed())
+			util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl1,
+				testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "1").Obj())
+			util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl2,
+				testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "1").Obj())
+			util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl3,
+				testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "spot-untainted", "1").Obj())
+			util.ExpectPendingWorkloadsMetric(prodCQ, 0, 0)
+			util.ExpectReservingActiveWorkloadsMetric(prodCQ, 3)
+			util.ExpectAdmittedWorkloadsTotalMetric(prodCQ, 3)
+		})
+
 		ginkgo.It("Should try next flavor instead of borrowing", func() {
 			prodCQ = testing.MakeClusterQueue("prod-cq").
 				Cohort("all").