Address comments and test with BestEffortFIFO
Change-Id: Ib5d6109878510cc0ce6cf3f15c43052cd8338cce
alculquicondor committed Nov 23, 2023
1 parent 0f528e9 commit 0cef5b3
Showing 4 changed files with 60 additions and 125 deletions.
4 changes: 2 additions & 2 deletions pkg/cache/clusterqueue.go
@@ -3,9 +3,9 @@ package cache
import (
"errors"
"fmt"
"reflect"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -227,7 +227,7 @@ func (c *ClusterQueue) updateResourceGroups(in []kueue.ResourceGroup) {
rg.Flavors = append(rg.Flavors, fQuotas)
}
}
if !reflect.DeepEqual(oldRG, c.ResourceGroups) {
if !equality.Semantic.DeepEqual(oldRG, c.ResourceGroups) {
c.AllocatableResourceGeneration++
}
c.UpdateRGByResource()
28 changes: 12 additions & 16 deletions pkg/scheduler/flavorassigner/flavorassigner.go
@@ -237,24 +237,20 @@ func lastAssignmentOutdated(wl *workload.Info, cq *cache.ClusterQueue) bool {
// FlavorAssignmentMode.
func AssignFlavors(log logr.Logger, wl *workload.Info, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, cq *cache.ClusterQueue, counts []int32) Assignment {
if wl.LastAssignment != nil && lastAssignmentOutdated(wl, cq) {
log.V(6).Info("workload's last assignment is outdated, set wl.LastAssignment to nil",
"cq.AllocatableResourceGeneration", cq.AllocatableResourceGeneration,
"wl.LastAssignment.ClusterQueueGeneration", wl.LastAssignment.ClusterQueueGeneration)
if cq.Cohort != nil {
log.V(6).Info("", "cq.Cohort.AllocatableResourceGeneration", cq.Cohort.AllocatableResourceGeneration,
"wl.LastAssignment.CohortGeneration", wl.LastAssignment.CohortGeneration)
}
wl.LastAssignment = nil
} else if wl.LastAssignment != nil {
log.V(6).Info("workload's last assignment is up to date",
"cq.AllocatableResourceGeneration", cq.AllocatableResourceGeneration,
"wl.LastAssignment.ClusterQueueGeneration", wl.LastAssignment.ClusterQueueGeneration)
if cq.Cohort != nil {
log.V(6).Info("", "cq.Cohort.AllocatableResourceGeneration", cq.Cohort.AllocatableResourceGeneration,
"wl.LastAssignment.CohortGeneration", wl.LastAssignment.CohortGeneration)
if logV := log.V(6); logV.Enabled() {
keysValues := []any{
"cq.AllocatableResourceGeneration", cq.AllocatableResourceGeneration,
"wl.LastAssignment.ClusterQueueGeneration", wl.LastAssignment.ClusterQueueGeneration,
}
if cq.Cohort != nil {
keysValues = append(keysValues,
"cq.Cohort.AllocatableResourceGeneration", cq.Cohort.AllocatableResourceGeneration,
"wl.LastAssignment.CohortGeneration", wl.LastAssignment.CohortGeneration,
)
}
logV.Info("Cleared Worload's last assignment becaused it was outdated", keysValues...)
}
} else {
log.V(4).Info("workload's last assignment is nil")
}

if len(counts) == 0 {
127 changes: 29 additions & 98 deletions pkg/scheduler/scheduler_test.go
@@ -200,9 +200,8 @@ func TestSchedule(t *testing.T) {
},
}
cases := map[string]struct {
workloads []kueue.Workload
needScheduleTwice bool
admissionError error
workloads []kueue.Workload
admissionError error
// wantAssignments is a summary of all the admissions in the cache after this cycle.
wantAssignments map[string]kueue.Admission
// wantScheduled is the subset of workloads that got scheduled/admitted in this cycle.
@@ -221,79 +220,6 @@
// disable partial admission
disablePartialAdmission bool
}{
"two flavors to schedule": {
needScheduleTwice: true,
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("sample-job1", "sales").
Queue("local-queue-cq-with-2-flavor").
PodSets(*utiltesting.MakePodSet("main", 1).
Request(corev1.ResourceCPU, "1").
Obj()).
ReserveQuota(utiltesting.MakeAdmission("cq-with-2-flavor").Assignment(corev1.ResourceCPU, "on-demand", "1000m").Obj()).
Obj(),
*utiltesting.MakeWorkload("sample-job2", "sales").
Queue("local-queue-cq-with-2-flavor").
PodSets(*utiltesting.MakePodSet("main", 1).
Request(corev1.ResourceCPU, "1").
Obj()).
ReserveQuota(utiltesting.MakeAdmission("cq-with-2-flavor").Assignment(corev1.ResourceCPU, "on-demand", "1000m").Obj()).
Obj(),
*utiltesting.MakeWorkload("sample-job3", "sales").
Queue("local-queue-cq-with-2-flavor").
PodSets(*utiltesting.MakePodSet("main", 1).
Request(corev1.ResourceCPU, "1").
Obj()).
Obj(),
},
wantScheduled: []string{"sales/sample-job3"},
wantAssignments: map[string]kueue.Admission{
"sales/sample-job1": {
ClusterQueue: "cq-with-2-flavor",
PodSetAssignments: []kueue.PodSetAssignment{
{
Name: "main",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "on-demand",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1000m"),
},
Count: ptr.To[int32](1),
},
},
},
"sales/sample-job2": {
ClusterQueue: "cq-with-2-flavor",
PodSetAssignments: []kueue.PodSetAssignment{
{
Name: "main",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "on-demand",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1000m"),
},
Count: ptr.To[int32](1),
},
},
},
"sales/sample-job3": {
ClusterQueue: "cq-with-2-flavor",
PodSetAssignments: []kueue.PodSetAssignment{
{
Name: "main",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "spot",
},
ResourceUsage: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1000m"),
},
Count: ptr.To[int32](1),
},
},
},
},
},
"workload fits in single clusterQueue": {
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("foo", "sales").
@@ -1209,9 +1135,6 @@ func TestSchedule(t *testing.T) {
defer cancel()

scheduler.schedule(ctx)
if tc.needScheduleTwice {
scheduler.schedule(ctx)
}
wg.Wait()

wantScheduled := make(map[string]kueue.Admission)
@@ -1406,22 +1329,7 @@ func TestClusterQueueUpdate(t *testing.T) {
*utiltesting.MakeFlavorQuotas("spot").
Resource(corev1.ResourceCPU, "100", "0").Obj(),
).Obj()
newClusterQueue1 :=
*utiltesting.MakeClusterQueue("eng-alpha").
QueueingStrategy(kueue.StrictFIFO).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
FlavorFungibility(kueue.FlavorFungibility{
WhenCanPreempt: kueue.Preempt,
}).
ResourceGroup(
*utiltesting.MakeFlavorQuotas("on-demand").
Resource(corev1.ResourceCPU, "50", "50").Obj(),
*utiltesting.MakeFlavorQuotas("spot").
Resource(corev1.ResourceCPU, "100", "0").Obj(),
).Obj()
newClusterQueue2 :=
newClusterQueue :=
*utiltesting.MakeClusterQueue("eng-alpha").
QueueingStrategy(kueue.StrictFIFO).
Preemption(kueue.ClusterQueuePreemption{
Expand All @@ -1445,13 +1353,13 @@ func TestClusterQueueUpdate(t *testing.T) {
{
name: "RGs not change",
cqs: &clusterQueue,
newcq: &newClusterQueue1,
newcq: clusterQueue.DeepCopy(),
wantLastAssignmentGeneration: 1,
},
{
name: "RGs changed",
cqs: &clusterQueue,
newcq: &newClusterQueue2,
newcq: &newClusterQueue,
wantLastAssignmentGeneration: 2,
},
}
@@ -1487,7 +1395,7 @@ func TestLastSchedulingContext(t *testing.T) {
}
clusterQueue := []kueue.ClusterQueue{
*utiltesting.MakeClusterQueue("eng-alpha").
QueueingStrategy(kueue.StrictFIFO).
QueueingStrategy(kueue.BestEffortFIFO).

alculquicondor (author) commented on Nov 23, 2023:

The unit tests pass with StrictFIFO.

We probably should put the workload back into the queue immediately while we haven't tried all the flavors.
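A minimal sketch of that idea, using simplified stand-in types rather than Kueue's actual workload.Info and queue manager (every name here is hypothetical and only illustrates the comment above): requeue the workload right away when the previous attempt did not exhaust all candidate flavors.

```go
package sketch

// Assignment stands in for a flavor assignment; TriedFlavors and TotalFlavors
// are illustrative counters, not fields that exist in Kueue.
type Assignment struct {
	TriedFlavors, TotalFlavors int
}

// Workload stands in for workload.Info.
type Workload struct {
	LastAssignment *Assignment
}

// Queue stands in for the scheduler's queue manager; RequeueImmediately is a
// hypothetical method meaning "push back without waiting for the next event".
type Queue interface {
	RequeueImmediately(wl *Workload)
}

// maybeRequeue puts the workload back into the queue right away when the last
// scheduling attempt still has flavors it has not tried.
func maybeRequeue(q Queue, wl *Workload) {
	if wl.LastAssignment != nil && wl.LastAssignment.TriedFlavors < wl.LastAssignment.TotalFlavors {
		q.RequeueImmediately(wl)
	}
}
```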

Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
@@ -1649,6 +1557,29 @@ func TestLastSchedulingContext(t *testing.T) {
"default/preemptor": *utiltesting.MakeAdmission("eng-alpha").Assignment(corev1.ResourceCPU, "on-demand", "20").Obj(),
},
},
{
name: "use next flavor when can't preempt",
cqs: clusterQueue,
admittedWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("running", "default").
Request(corev1.ResourceCPU, "50").
ReserveQuota(utiltesting.MakeAdmission("eng-alpha").Assignment(corev1.ResourceCPU, "on-demand", "50").Obj()).
Admitted(true).
Obj(),
},
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("new", "default").
Queue("main").
Request(corev1.ResourceCPU, "20").
Obj(),
},
wantPreempted: sets.Set[string]{},
wantAdmissionsOnFirstSchedule: map[string]kueue.Admission{},
wantAdmissionsOnSecondSchedule: map[string]kueue.Admission{
"default/running": *utiltesting.MakeAdmission("eng-alpha").Assignment(corev1.ResourceCPU, "on-demand", "50").Obj(),
"default/new": *utiltesting.MakeAdmission("eng-alpha").Assignment(corev1.ResourceCPU, "spot", "20").Obj(),
},
},
{
name: "borrow before next flavor",
cqs: clusterQueue_cohort,
26 changes: 17 additions & 9 deletions test/integration/scheduler/scheduler_test.go
@@ -1075,31 +1075,39 @@ var _ = ginkgo.Describe("Scheduler", func() {
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, devCQ.Name, dWl1)
})

ginkgo.It("Should try next flavor instead of pending", func() {
ginkgo.FIt("Should try next flavor if can't preempt on first", func() {
prodCQ = testing.MakeClusterQueue("prod-cq").
Cohort("all").

alculquicondor (author) commented on Nov 23, 2023:

The integration test passes if I set the queueingStrategy to StrictFIFO.
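For reference, the variant described in the comment would only add an explicit QueueingStrategy call to the same builder chain used in this test; a sketch of how that would read (the rest of the chain mirrors the test below):

```go
// Hypothetical StrictFIFO variant of the ClusterQueue built in this test;
// the only change is the added QueueingStrategy call.
prodCQ = testing.MakeClusterQueue("prod-cq").
	Cohort("all").
	QueueingStrategy(kueue.StrictFIFO).
	ResourceGroup(
		*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "2").Obj(),
		*testing.MakeFlavorQuotas("spot-untainted").Resource(corev1.ResourceCPU, "2").Obj()).
	Preemption(kueue.ClusterQueuePreemption{
		WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
	}).
	FlavorFungibility(kueue.FlavorFungibility{
		WhenCanPreempt: kueue.Preempt,
	}).
	Obj()
```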

ResourceGroup(
*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "2").Obj(),
*testing.MakeFlavorQuotas("spot-untainted").Resource(corev1.ResourceCPU, "2").Obj()).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
FlavorFungibility(kueue.FlavorFungibility{
WhenCanPreempt: kueue.Preempt,
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, prodCQ)).Should(gomega.Succeed())

prodQueue := testing.MakeLocalQueue("prod-queue", ns.Name).ClusterQueue(prodCQ.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, prodQueue)).Should(gomega.Succeed())

ginkgo.By("Creating 3 workloads")
ginkgo.By("Creating 2 workloads and ensuring they are admitted")
wl1 := testing.MakeWorkload("wl-1", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "1").Obj()
wl2 := testing.MakeWorkload("wl-2", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "1").Obj()
wl3 := testing.MakeWorkload("wl-3", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "1").Obj()
gomega.Expect(k8sClient.Create(ctx, wl1)).Should(gomega.Succeed())
gomega.Expect(k8sClient.Create(ctx, wl2)).Should(gomega.Succeed())
util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl1,
testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "1").Obj())
util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl2,
testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "1").Obj())

ginkgo.By("Creating an additional workload that can't fit in the first flavor")
wl3 := testing.MakeWorkload("wl-3", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "1").Obj()
gomega.Expect(k8sClient.Create(ctx, wl3)).Should(gomega.Succeed())
prodWl1Admission := testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "1").Obj()
prodWl2Admission := testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "1").Obj()
prodWl3Admission := testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "spot-untainted", "1").Obj()
util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl1, prodWl1Admission)
util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl2, prodWl2Admission)
util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl3, prodWl3Admission)
util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl3,
testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "spot-untainted", "1").Obj())
util.ExpectPendingWorkloadsMetric(prodCQ, 0, 0)
util.ExpectReservingActiveWorkloadsMetric(prodCQ, 3)
util.ExpectAdmittedWorkloadsTotalMetric(prodCQ, 3)
