Fix: check if RGs is updated #1356

Closed
wants to merge 1 commit
6 changes: 5 additions & 1 deletion pkg/cache/clusterqueue.go
@@ -5,6 +5,7 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -200,6 +201,7 @@ func filterQuantities(orig FlavorResourceQuantities, resourceGroups []kueue.Reso
}

func (c *ClusterQueue) updateResourceGroups(in []kueue.ResourceGroup) {
oldRG := c.ResourceGroups
c.ResourceGroups = make([]ResourceGroup, len(in))
for i, rgIn := range in {
rg := &c.ResourceGroups[i]
@@ -225,7 +227,9 @@ func (c *ClusterQueue) updateResourceGroups(in []kueue.ResourceGroup) {
rg.Flavors = append(rg.Flavors, fQuotas)
}
}
-c.AllocatableResourceGeneration++
+if !equality.Semantic.DeepEqual(oldRG, c.ResourceGroups) {
+	c.AllocatableResourceGeneration++
+}
c.UpdateRGByResource()
}

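For context, here is a minimal, self-contained sketch of the guard this hunk introduces, using hypothetical simplified types rather than the real kueue cache structures: the resource groups are rebuilt on every update, but the generation counter is bumped only when the rebuilt value is semantically different from the previous one.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/equality"
)

// Simplified stand-ins for the cached ClusterQueue types (hypothetical,
// not the actual kueue definitions).
type ResourceGroup struct {
	Flavors []string
	Quotas  map[string]int64
}

type ClusterQueue struct {
	ResourceGroups                []ResourceGroup
	AllocatableResourceGeneration int64
}

// updateResourceGroups rebuilds the resource groups and bumps the
// generation only when the result differs from the previous value, so
// no-op ClusterQueue updates do not invalidate state keyed on
// AllocatableResourceGeneration.
func (c *ClusterQueue) updateResourceGroups(in []ResourceGroup) {
	oldRG := c.ResourceGroups
	c.ResourceGroups = make([]ResourceGroup, len(in))
	copy(c.ResourceGroups, in)
	if !equality.Semantic.DeepEqual(oldRG, c.ResourceGroups) {
		c.AllocatableResourceGeneration++
	}
}

func main() {
	cq := &ClusterQueue{}
	rgs := []ResourceGroup{{Flavors: []string{"on-demand"}, Quotas: map[string]int64{"cpu": 50}}}

	cq.updateResourceGroups(rgs)                  // first update: generation 0 -> 1
	cq.updateResourceGroups(rgs)                  // identical update: generation stays 1
	fmt.Println(cq.AllocatableResourceGeneration) // prints 1
}
```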
184 changes: 182 additions & 2 deletions pkg/scheduler/scheduler_test.go
@@ -175,8 +175,9 @@ func TestSchedule(t *testing.T) {
},
}
cases := map[string]struct {
-workloads []kueue.Workload
-admissionError error
+workloads []kueue.Workload
+needScheduleTwice bool
+admissionError error
// wantAssignments is a summary of all the admissions in the cache after this cycle.
wantAssignments map[string]kueue.Admission
// wantScheduled is the subset of workloads that got scheduled/admitted in this cycle.
@@ -1110,6 +1111,9 @@ func TestSchedule(t *testing.T) {
defer cancel()

scheduler.schedule(ctx)
if tc.needScheduleTwice {
scheduler.schedule(ctx)
}
wg.Wait()

wantScheduled := make(map[string]kueue.Admission)
@@ -1284,6 +1288,85 @@ func TestEntryOrdering(t *testing.T) {
}
}

func TestClusterQueueUpdate(t *testing.T) {
resourceFlavors := []*kueue.ResourceFlavor{
{ObjectMeta: metav1.ObjectMeta{Name: "on-demand"}},
{ObjectMeta: metav1.ObjectMeta{Name: "spot"}},
}
clusterQueue :=
*utiltesting.MakeClusterQueue("eng-alpha").
QueueingStrategy(kueue.StrictFIFO).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
FlavorFungibility(kueue.FlavorFungibility{
WhenCanPreempt: kueue.Preempt,
}).
ResourceGroup(
*utiltesting.MakeFlavorQuotas("on-demand").
Resource(corev1.ResourceCPU, "50", "50").Obj(),
*utiltesting.MakeFlavorQuotas("spot").
Resource(corev1.ResourceCPU, "100", "0").Obj(),
).Obj()
newClusterQueue2 :=
*utiltesting.MakeClusterQueue("eng-alpha").
QueueingStrategy(kueue.StrictFIFO).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
FlavorFungibility(kueue.FlavorFungibility{
WhenCanPreempt: kueue.Preempt,
}).
ResourceGroup(
*utiltesting.MakeFlavorQuotas("on-demand").
Resource(corev1.ResourceCPU, "100", "50").Obj(),
*utiltesting.MakeFlavorQuotas("spot").
Resource(corev1.ResourceCPU, "100", "0").Obj(),
).Obj()
cases := []struct {
name string
cqs *kueue.ClusterQueue
newcq *kueue.ClusterQueue
wantLastAssignmentGeneration int64
}{
{
name: "RGs not changed",
cqs: &clusterQueue,
newcq: clusterQueue.DeepCopy(),
wantLastAssignmentGeneration: 1,
},
{
name: "RGs changed",
cqs: &clusterQueue,
newcq: &newClusterQueue2,
wantLastAssignmentGeneration: 2,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
clientBuilder := utiltesting.NewClientBuilder().
WithObjects(
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}},
tc.cqs,
)
cl := clientBuilder.Build()
cqCache := cache.New(cl)
// Workloads are loaded into queues or clusterQueues as we add them.
for _, rf := range resourceFlavors {
cqCache.AddOrUpdateResourceFlavor(rf)
}
cqCache.AddClusterQueue(context.Background(), tc.cqs)
cqCache.UpdateClusterQueue(tc.newcq)
snapshot := cqCache.Snapshot()
if diff := cmp.Diff(
tc.wantLastAssignmentGeneration,
snapshot.ClusterQueues["eng-alpha"].AllocatableResourceGeneration); diff != "" {
t.Errorf("Unexpected AllocatableResourceGeneration in cache (-want,+got):\n%s", diff)
}
})
}
}

func TestLastSchedulingContext(t *testing.T) {
resourceFlavors := []*kueue.ResourceFlavor{
{ObjectMeta: metav1.ObjectMeta{Name: "on-demand"}},
@@ -1357,6 +1440,22 @@ func TestLastSchedulingContext(t *testing.T) {
*utiltesting.MakeFlavorQuotas("spot").
Resource(corev1.ResourceCPU, "100", "0").Obj(),
).Obj(),
*utiltesting.MakeClusterQueue("cq-with-2-flavors").
QueueingStrategy(kueue.StrictFIFO).
Preemption(kueue.ClusterQueuePreemption{
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
WithinClusterQueue: kueue.PreemptionPolicyLowerOrNewerEqualPriority,
}).
FlavorFungibility(kueue.FlavorFungibility{
WhenCanBorrow: kueue.Borrow,
WhenCanPreempt: kueue.Preempt,
}).
ResourceGroup(
*utiltesting.MakeFlavorQuotas("on-demand").
Resource(corev1.ResourceCPU, "2").Obj(),
*utiltesting.MakeFlavorQuotas("spot").
Resource(corev1.ResourceCPU, "2").Obj()).
Obj(),
}

queues := []kueue.LocalQueue{
@@ -1396,6 +1495,15 @@ func TestLastSchedulingContext(t *testing.T) {
ClusterQueue: "eng-cohort-theta",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "sales",
Name: "local-queue-cq-with-2-flavors",
},
Spec: kueue.LocalQueueSpec{
ClusterQueue: "cq-with-2-flavors",
},
},
}
wl := utiltesting.MakeWorkload("low-1", "default").
Request(corev1.ResourceCPU, "50").
@@ -1412,6 +1520,78 @@ func TestLastSchedulingContext(t *testing.T) {
wantAdmissionsOnFirstSchedule map[string]kueue.Admission
wantAdmissionsOnSecondSchedule map[string]kueue.Admission
}{
{
name: "two flavors to schedule",
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("sample-job1", "sales").
Queue("local-queue-cq-with-2-flavors").
PodSets(*utiltesting.MakePodSet("main", 1).
Request(corev1.ResourceCPU, "1").
Obj()).
ReserveQuota(utiltesting.MakeAdmission("cq-with-2-flavors").Assignment(corev1.ResourceCPU, "on-demand", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("sample-job2", "sales").
Queue("local-queue-cq-with-2-flavors").
PodSets(*utiltesting.MakePodSet("main", 1).
Request(corev1.ResourceCPU, "1").
Obj()).
ReserveQuota(utiltesting.MakeAdmission("cq-with-2-flavors").Assignment(corev1.ResourceCPU, "on-demand", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("sample-job3", "sales").
Queue("local-queue-cq-with-2-flavors").
PodSets(*utiltesting.MakePodSet("main", 1).
Request(corev1.ResourceCPU, "1").
Obj()).
Obj(),
},
wantAdmissionsOnSecondSchedule: map[string]kueue.Admission{
"sales/sample-job1": {
ClusterQueue: "cq-with-2-flavors",
PodSetAssignments: []kueue.PodSetAssignment{
{
Name: "main",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "on-demand",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1000m"),
},
Count: ptr.To[int32](1),
},
},
},
"sales/sample-job2": {
ClusterQueue: "cq-with-2-flavors",
PodSetAssignments: []kueue.PodSetAssignment{
{
Name: "main",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "on-demand",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1000m"),
},
Count: ptr.To[int32](1),
},
},
},
"sales/sample-job3": {
ClusterQueue: "cq-with-2-flavors",
PodSetAssignments: []kueue.PodSetAssignment{
{
Name: "main",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "spot",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1000m"),
},
Count: ptr.To[int32](1),
},
},
},
},
},
{
name: "scheduling context not changed",
cqs: clusterQueue,
30 changes: 30 additions & 0 deletions test/integration/scheduler/scheduler_test.go
@@ -1075,6 +1075,36 @@ var _ = ginkgo.Describe("Scheduler", func() {
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, devCQ.Name, dWl1)
})

ginkgo.It("Should try next flavor instead of pending", func() {
[Contributor comment] @nstogner any other integration test you would like to see?

prodCQ = testing.MakeClusterQueue("prod-cq").
Cohort("all").
ResourceGroup(
*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "2").Obj(),
*testing.MakeFlavorQuotas("spot-untainted").Resource(corev1.ResourceCPU, "2").Obj()).
[@alculquicondor (Contributor), Nov 23, 2023] I just noticed that this doesn't have fungibility or preemption policies set. Was it failing even without fungibility policies?

Obj()
gomega.Expect(k8sClient.Create(ctx, prodCQ)).Should(gomega.Succeed())

prodQueue := testing.MakeLocalQueue("prod-queue", ns.Name).ClusterQueue(prodCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, prodQueue)).Should(gomega.Succeed())

ginkgo.By("Creating 3 workloads")
wl1 := testing.MakeWorkload("wl-1", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "1").Obj()
wl2 := testing.MakeWorkload("wl-2", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "1").Obj()
wl3 := testing.MakeWorkload("wl-3", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "1").Obj()
gomega.Expect(k8sClient.Create(ctx, wl1)).Should(gomega.Succeed())
gomega.Expect(k8sClient.Create(ctx, wl2)).Should(gomega.Succeed())
gomega.Expect(k8sClient.Create(ctx, wl3)).Should(gomega.Succeed())
util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl1,
testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "1").Obj())
util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl2,
testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "1").Obj())
util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl3,
testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "spot-untainted", "1").Obj())
util.ExpectPendingWorkloadsMetric(prodCQ, 0, 0)
util.ExpectReservingActiveWorkloadsMetric(prodCQ, 3)
util.ExpectAdmittedWorkloadsTotalMetric(prodCQ, 3)
})

ginkgo.It("Should try next flavor instead of borrowing", func() {
prodCQ = testing.MakeClusterQueue("prod-cq").
Cohort("all").