Avoid unnecessary preemptions when candidates have the same timestamp
Change-Id: I68155a467880059c65deb75b00939b14551924cd
alculquicondor committed Mar 20, 2024
1 parent ba46285 commit c752e89
Showing 5 changed files with 110 additions and 12 deletions.
14 changes: 13 additions & 1 deletion pkg/scheduler/preemption/preemption.go
@@ -389,6 +389,7 @@ func workloadFits(wlReq cache.FlavorResourceQuantities, cq *cache.ClusterQueue,
}

// candidatesOrdering criteria:
+// 0. Workloads already marked for preemption first.
// 1. Workloads from other ClusterQueues in the cohort before the ones in the
//    same ClusterQueue as the preemptor.
// 2. Workloads with lower priority first.
@@ -397,6 +398,11 @@ func candidatesOrdering(candidates []*workload.Info, cq string, now time.Time) f
	return func(i, j int) bool {
		a := candidates[i]
		b := candidates[j]
+		aEvicted := meta.IsStatusConditionTrue(a.Obj.Status.Conditions, kueue.WorkloadEvicted)
+		bEvicted := meta.IsStatusConditionTrue(b.Obj.Status.Conditions, kueue.WorkloadEvicted)
+		if aEvicted != bEvicted {
+			return aEvicted
+		}
		aInCQ := a.ClusterQueue == cq
		bInCQ := b.ClusterQueue == cq
		if aInCQ != bInCQ {
@@ -407,7 +413,13 @@ func candidatesOrdering(candidates []*workload.Info, cq string, now time.Time) f
		if pa != pb {
			return pa < pb
		}
-		return quotaReservationTime(b.Obj, now).Before(quotaReservationTime(a.Obj, now))
+		timeA := quotaReservationTime(a.Obj, now)
+		timeB := quotaReservationTime(b.Obj, now)
+		if !timeA.Equal(timeB) {
+			return timeA.After(timeB)
+		}
+		// Arbitrary comparison for deterministic sorting.
+		return a.Obj.UID < b.Obj.UID
	}
}
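To illustrate the new comparator outside the scheduler, here is a self-contained sketch. The candidate struct, order function, and its field names are stand-ins invented for this example; Kueue's real code operates on *workload.Info and metav1 conditions. It shows that when reservation timestamps tie, the final UID comparison keeps the sort deterministic:

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// candidate is a simplified stand-in for Kueue's workload.Info.
type candidate struct {
	uid          string
	clusterQueue string
	priority     int32
	evicted      bool
	reservedAt   time.Time
}

// order mirrors the comparator's criteria: evicted workloads first, other
// ClusterQueues before the preemptor's, lower priority first, newer
// reservation first, then UID as an arbitrary but deterministic tie-break.
func order(cands []candidate, cq string) func(i, j int) bool {
	return func(i, j int) bool {
		a, b := cands[i], cands[j]
		if a.evicted != b.evicted {
			return a.evicted
		}
		if inCQA, inCQB := a.clusterQueue == cq, b.clusterQueue == cq; inCQA != inCQB {
			return !inCQA
		}
		if a.priority != b.priority {
			return a.priority < b.priority
		}
		if !a.reservedAt.Equal(b.reservedAt) {
			return a.reservedAt.After(b.reservedAt)
		}
		// Arbitrary comparison for deterministic sorting.
		return a.uid < b.uid
	}
}

func main() {
	now := time.Now()
	cands := []candidate{
		{uid: "old-b", clusterQueue: "self", reservedAt: now},
		{uid: "old-a", clusterQueue: "self", reservedAt: now},
	}
	sort.Slice(cands, order(cands, "self"))
	fmt.Println(cands[0].uid, cands[1].uid) // always "old-a old-b"
}
```

Because every preemptor now ranks same-timestamp candidates identically, concurrent preemptors converge on the same victim instead of each evicting a different one.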

18 changes: 14 additions & 4 deletions pkg/scheduler/preemption/preemption_test.go
@@ -1127,15 +1127,25 @@ func TestCandidatesOrdering(t *testing.T) {
			Obj()),
		workload.NewInfo(utiltesting.MakeWorkload("low", "").
			ReserveQuota(utiltesting.MakeAdmission("self").Obj()).
-			Priority(10).
+			Priority(-10).
			Obj()),
		workload.NewInfo(utiltesting.MakeWorkload("other", "").
			ReserveQuota(utiltesting.MakeAdmission("other").Obj()).
			Priority(10).
			Obj()),
-		workload.NewInfo(utiltesting.MakeWorkload("old", "").
-			ReserveQuota(utiltesting.MakeAdmission("self").Obj()).
+		workload.NewInfo(utiltesting.MakeWorkload("evicted", "").
+			SetOrReplaceCondition(metav1.Condition{
+				Type:   kueue.WorkloadEvicted,
+				Status: metav1.ConditionTrue,
+			}).
+			Obj()),
+		workload.NewInfo(utiltesting.MakeWorkload("old-a", "").
+			UID("old-a").
+			ReserveQuotaAt(utiltesting.MakeAdmission("self").Obj(), now).
+			Obj()),
+		workload.NewInfo(utiltesting.MakeWorkload("old-b", "").
+			UID("old-b").
+			ReserveQuotaAt(utiltesting.MakeAdmission("self").Obj(), now).
			Obj()),
		workload.NewInfo(utiltesting.MakeWorkload("current", "").
			ReserveQuota(utiltesting.MakeAdmission("self").Obj()).
@@ -1151,7 +1161,7 @@
	for i, c := range candidates {
		gotNames[i] = workload.Key(c.Obj)
	}
-	wantCandidates := []string{"/other", "/low", "/current", "/old", "/high"}
+	wantCandidates := []string{"/evicted", "/other", "/low", "/current", "/old-a", "/old-b", "/high"}
	if diff := cmp.Diff(wantCandidates, gotNames); diff != "" {
		t.Errorf("Sorted with wrong order (-want,+got):\n%s", diff)
	}
12 changes: 11 additions & 1 deletion pkg/util/testing/wrappers.go
@@ -82,6 +82,11 @@ func (w *WorkloadWrapper) Clone() *WorkloadWrapper {
	return &WorkloadWrapper{Workload: *w.DeepCopy()}
}

+func (w *WorkloadWrapper) UID(uid types.UID) *WorkloadWrapper {
+	w.Workload.UID = uid
+	return w
+}
+
func (w *WorkloadWrapper) Finalizers(fin ...string) *WorkloadWrapper {
	w.ObjectMeta.Finalizers = fin
	return w
@@ -116,11 +121,16 @@ func (w *WorkloadWrapper) Active(a bool) *WorkloadWrapper {

// ReserveQuota sets workload admission and adds a "QuotaReserved" status condition
func (w *WorkloadWrapper) ReserveQuota(a *kueue.Admission) *WorkloadWrapper {
+	return w.ReserveQuotaAt(a, time.Now())
+}
+
+// ReserveQuotaAt sets workload admission and adds a "QuotaReserved" status condition
+func (w *WorkloadWrapper) ReserveQuotaAt(a *kueue.Admission, now time.Time) *WorkloadWrapper {
	w.Status.Admission = a
	w.Status.Conditions = []metav1.Condition{{
		Type:               kueue.WorkloadQuotaReserved,
		Status:             metav1.ConditionTrue,
-		LastTransitionTime: metav1.Now(),
+		LastTransitionTime: metav1.NewTime(now),
		Reason:             "AdmittedByTest",
		Message:            fmt.Sprintf("Admitted by ClusterQueue %s", w.Status.Admission.ClusterQueue),
	}}
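A usage sketch of the new wrappers (a hypothetical test, not part of this commit; it assumes the sigs.k8s.io/kueue/pkg/util/testing import path). Pinning both reservations to the same instant is what forces the comparator's timestamp tie:

```go
package testing_test

import (
	"testing"
	"time"

	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
)

func TestReserveQuotaAtPinsTimestamp(t *testing.T) {
	now := time.Now()
	// Both workloads reserve quota at the same instant; only their UIDs
	// distinguish them during candidate ordering.
	oldA := utiltesting.MakeWorkload("old-a", "").
		UID("old-a").
		ReserveQuotaAt(utiltesting.MakeAdmission("self").Obj(), now).
		Obj()
	oldB := utiltesting.MakeWorkload("old-b", "").
		UID("old-b").
		ReserveQuotaAt(utiltesting.MakeAdmission("self").Obj(), now).
		Obj()
	a := oldA.Status.Conditions[0].LastTransitionTime
	b := oldB.Status.Conditions[0].LastTransitionTime
	if !a.Equal(&b) {
		t.Errorf("expected equal reservation timestamps, got %v and %v", a, b)
	}
}
```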
52 changes: 50 additions & 2 deletions test/integration/scheduler/preemption_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package scheduler

import (
+	"fmt"
	"time"

	"github.com/onsi/ginkgo/v2"
@@ -306,7 +307,7 @@ var _ = ginkgo.Describe("Preemption", func() {
			util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, betaCQ.Name, betaLowWl)
		})

-		ginkgo.It("Should preempt all necessary workloads in concurrent scheduling", func() {
+		ginkgo.It("Should preempt all necessary workloads in concurrent scheduling with different priorities", func() {
			ginkgo.By("Creating workloads in beta-cq that borrow quota")

			betaMidWl := testing.MakeWorkload("beta-mid", ns.Name).
@@ -345,7 +346,7 @@ var _ = ginkgo.Describe("Preemption", func() {
			util.FinishEvictionForWorkloads(ctx, k8sClient, betaMidWl)

			// one of alpha-mid and gamma-mid should be admitted
-			gomega.Eventually(func() []*kueue.Workload { return util.FilterAdmittedWorkloads(ctx, k8sClient, alphaMidWl, gammaMidWl) }, util.Interval*4, util.Interval).Should(gomega.HaveLen(1))
+			util.ExpectWorkloadsToBeAdmittedCount(ctx, k8sClient, 1, alphaMidWl, gammaMidWl)

			// betaHighWl remains admitted
			util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, betaCQ.Name, betaHighWl)
@@ -356,6 +357,53 @@ var _ = ginkgo.Describe("Preemption", func() {
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, alphaCQ.Name, alphaMidWl)
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, gammaCQ.Name, gammaMidWl)
})

ginkgo.It("Should preempt all necessary workloads in concurrent scheduling with the same priority", func() {
var betaWls []*kueue.Workload
for i := 0; i < 3; i++ {
wl := testing.MakeWorkload(fmt.Sprintf("beta-%d", i), ns.Name).
Queue(betaQ.Name).
Request(corev1.ResourceCPU, "2").
Obj()
gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())
betaWls = append(betaWls, wl)
}
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, betaWls...)

ginkgo.By("Creating preempting pods")

alphaWl := testing.MakeWorkload("alpha", ns.Name).
Queue(alphaQ.Name).
Request(corev1.ResourceCPU, "2").
Obj()
gomega.Expect(k8sClient.Create(ctx, alphaWl)).To(gomega.Succeed())

gammaWl := testing.MakeWorkload("gamma", ns.Name).
Queue(gammaQ.Name).
Request(corev1.ResourceCPU, "2").
Obj()
gomega.Expect(k8sClient.Create(ctx, gammaWl)).To(gomega.Succeed())

var evictedWorkloads []*kueue.Workload
gomega.Eventually(func() int {
evictedWorkloads = util.FilterEvictedWorkloads(ctx, k8sClient, betaWls...)
return len(evictedWorkloads)
}, util.Timeout, util.Interval).Should(gomega.Equal(1), "Number of evicted workloads")

ginkgo.By("Finishing eviction for first set of preempted workloads")
util.FinishEvictionForWorkloads(ctx, k8sClient, evictedWorkloads...)
util.ExpectWorkloadsToBeAdmittedCount(ctx, k8sClient, 1, alphaWl, gammaWl)

gomega.Eventually(func() int {
evictedWorkloads = util.FilterEvictedWorkloads(ctx, k8sClient, betaWls...)
return len(evictedWorkloads)
}, util.Timeout, util.Interval).Should(gomega.Equal(2), "Number of evicted workloads")

ginkgo.By("Finishing eviction for second set of preempted workloads")
util.FinishEvictionForWorkloads(ctx, k8sClient, evictedWorkloads...)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, alphaWl, gammaWl)
util.ExpectWorkloadsToBeAdmittedCount(ctx, k8sClient, 1, betaWls...)
})
})

ginkgo.Context("In a cohort with StrictFIFO", func() {
26 changes: 22 additions & 4 deletions test/util/util.go
@@ -243,11 +243,21 @@ func ExpectWorkloadsToHaveQuotaReservation(ctx context.Context, k8sClient client
}

func FilterAdmittedWorkloads(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) []*kueue.Workload {
+	return filterWorkloads(ctx, k8sClient, workload.HasQuotaReservation, wls...)
+}
+
+func FilterEvictedWorkloads(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) []*kueue.Workload {
+	return filterWorkloads(ctx, k8sClient, func(wl *kueue.Workload) bool {
+		return apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted)
+	}, wls...)
+}
+
+func filterWorkloads(ctx context.Context, k8sClient client.Client, filter func(*kueue.Workload) bool, wls ...*kueue.Workload) []*kueue.Workload {
	ret := make([]*kueue.Workload, 0, len(wls))
	var updatedWorkload kueue.Workload
	for _, wl := range wls {
		err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)
-		if err == nil && workload.HasQuotaReservation(&updatedWorkload) {
+		if err == nil && filter(&updatedWorkload) {
			ret = append(ret, wl)
		}
	}
@@ -273,17 +283,25 @@ func ExpectWorkloadsToBePending(ctx context.Context, k8sClient client.Client, wl
}

func ExpectWorkloadsToBeAdmitted(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) {
-	gomega.EventuallyWithOffset(1, func() int {
+	expectWorkloadsToBeAdmittedCountWithOffset(ctx, 2, k8sClient, len(wls), wls...)
+}
+
+func ExpectWorkloadsToBeAdmittedCount(ctx context.Context, k8sClient client.Client, count int, wls ...*kueue.Workload) {
+	expectWorkloadsToBeAdmittedCountWithOffset(ctx, 2, k8sClient, count, wls...)
+}
+
+func expectWorkloadsToBeAdmittedCountWithOffset(ctx context.Context, offset int, k8sClient client.Client, count int, wls ...*kueue.Workload) {
+	gomega.EventuallyWithOffset(offset, func() int {
		admitted := 0
		var updatedWorkload kueue.Workload
		for _, wl := range wls {
-			gomega.ExpectWithOffset(1, k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)).To(gomega.Succeed())
+			gomega.ExpectWithOffset(offset, k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)).To(gomega.Succeed())
			if apimeta.IsStatusConditionTrue(updatedWorkload.Status.Conditions, kueue.WorkloadAdmitted) {
				admitted++
			}
		}
		return admitted
-	}, Timeout, Interval).Should(gomega.Equal(len(wls)), "Not enough workloads are admitted")
+	}, Timeout, Interval).Should(gomega.Equal(count), "Not enough workloads are admitted")
}

func ExpectWorkloadToFinish(ctx context.Context, k8sClient client.Client, wlKey client.ObjectKey) {
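The offset of 2 threaded through expectWorkloadsToBeAdmittedCountWithOffset keeps gomega failure reports pointing at the integration test that called the exported wrapper rather than at this shared helper. Usage as in the integration test above (a fragment from a Ginkgo block, not self-contained):

```go
// Exactly one of the two racing preemptors is admitted after the first
// victim is evicted; which one wins is intentionally unspecified.
util.ExpectWorkloadsToBeAdmittedCount(ctx, k8sClient, 1, alphaWl, gammaWl)

// FilterEvictedWorkloads narrows a candidate list to workloads with the
// Evicted condition, mirroring FilterAdmittedWorkloads for admission.
evicted := util.FilterEvictedWorkloads(ctx, k8sClient, betaWls...)
util.FinishEvictionForWorkloads(ctx, k8sClient, evicted...)
```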
