Skip to content

Commit

Permalink
Fix preemption algorithm to reduce the number of preemptions (kuberne…
Browse files Browse the repository at this point in the history
…tes-sigs#1979)

* Fix preemption algorithm to reduce the number of preemptions

* review

* Update pkg/scheduler/preemption/preemption_test.go

Co-authored-by: Aldo Culquicondor <1299064+alculquicondor@users.noreply.github.com>

* Update pkg/scheduler/preemption/preemption_test.go

Co-authored-by: Aldo Culquicondor <1299064+alculquicondor@users.noreply.github.com>

* Update pkg/scheduler/preemption/preemption_test.go

Co-authored-by: Aldo Culquicondor <1299064+alculquicondor@users.noreply.github.com>

* remarks2

---------

Co-authored-by: Aldo Culquicondor <1299064+alculquicondor@users.noreply.github.com>
  • Loading branch information
2 people authored and vsoch committed Apr 18, 2024
1 parent ba813e4 commit ed77cdc
Show file tree
Hide file tree
Showing 3 changed files with 402 additions and 14 deletions.
48 changes: 34 additions & 14 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,17 @@ func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assig
sort.Slice(candidates, candidatesOrdering(candidates, cq.Name, time.Now()))

sameQueueCandidates := candidatesOnlyFromQueue(candidates, wl.ClusterQueue)
wlReq := totalRequestsForAssignment(&wl, assignment)

// To avoid flapping, Kueue only allows preemption of workloads from the same
// queue if borrowing. Preemption of workloads from other queues can happen only
// if not borrowing at the same time. Kueue prioritizes preemption of
// workloads from the other queues (that borrowed resources) first, before
// trying to preempt more own workloads and borrow at the same time.

if len(sameQueueCandidates) == len(candidates) {
// There is no possible preemption of workloads from other queues,
// so we'll try borrowing.
return minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, true, nil)
return minimalPreemptions(wlReq, cq, assignment, snapshot, resPerFlv, candidates, true, nil)
}

// There is a potential of preemption of workloads from the other queue in the
Expand All @@ -114,16 +114,20 @@ func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assig
if borrowWithinCohort.MaxPriorityThreshold != nil && *borrowWithinCohort.MaxPriorityThreshold < *allowBorrowingBelowPriority {
allowBorrowingBelowPriority = ptr.To(*borrowWithinCohort.MaxPriorityThreshold + 1)
}
return minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, true, allowBorrowingBelowPriority)
return minimalPreemptions(wlReq, cq, assignment, snapshot, resPerFlv, candidates, true, allowBorrowingBelowPriority)
}
targets := minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, false, nil)
if len(targets) == 0 {
// Another attempt. This time only candidates from the same queue, but
// with borrowing. The previous attempt didn't try borrowing and had broader
// scope of preemption.
targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, sameQueueCandidates, true, nil)

// Only try preemptions in the cohort, without borrowing, if the target clusterqueue is still
// under nominal quota for all resources.
if queueUnderNominalInAllRequestedResources(wlReq, cq) {
if targets := minimalPreemptions(wlReq, cq, assignment, snapshot, resPerFlv, candidates, false, nil); len(targets) > 0 {
return targets
}
}
return targets

// Final attempt. This time only candidates from the same queue, but
// with borrowing.
return minimalPreemptions(wlReq, cq, assignment, snapshot, resPerFlv, sameQueueCandidates, true, nil)
}

// IssuePreemptions marks the target workloads as evicted.
Expand Down Expand Up @@ -176,10 +180,7 @@ func (p *Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workloa
// Once the Workload fits, the heuristic tries to add Workloads back, in the
// reverse order in which they were removed, while the incoming Workload still
// fits.
func minimalPreemptions(wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info {
wlReq := totalRequestsForAssignment(wl, assignment)
cq := snapshot.ClusterQueues[wl.ClusterQueue]

func minimalPreemptions(wlReq cache.FlavorResourceQuantities, cq *cache.ClusterQueue, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, allowBorrowingBelowPriority *int32) []*workload.Info {
// Simulate removing all candidates from the ClusterQueue and cohort.
var targets []*workload.Info
fits := false
Expand Down Expand Up @@ -395,6 +396,25 @@ func workloadFits(wlReq cache.FlavorResourceQuantities, cq *cache.ClusterQueue,
return true
}

// queueUnderNominalInAllRequestedResources reports whether cq's recorded usage
// is strictly below its nominal quota for every resource that wlReq requests
// in a flavor configured in one of cq's resource groups.
//
// Flavors configured on cq that do not appear in wlReq are ignored, and
// flavors present only in wlReq are never inspected. The function returns
// false as soon as one requested resource has usage greater than or equal to
// its nominal quota.
func queueUnderNominalInAllRequestedResources(wlReq cache.FlavorResourceQuantities, cq *cache.ClusterQueue) bool {
	for _, group := range cq.ResourceGroups {
		for _, quotas := range group.Flavors {
			requested, requestsFlavor := wlReq[quotas.Name]
			if !requestsFlavor {
				// Nothing is requested from this flavor, so its usage is irrelevant.
				continue
			}
			usedInFlavor := cq.Usage[quotas.Name]
			for resource := range requested {
				// Usage that has reached (or exceeded) nominal means the queue
				// is no longer under its nominal quota for this resource.
				if usedInFlavor[resource] >= quotas.Resources[resource].Nominal {
					return false
				}
			}
		}
	}
	return true
}

// candidatesOrdering criteria:
// 0. Workloads already marked for preemption first.
// 1. Workloads from other ClusterQueues in the cohort before the ones in the
Expand Down
227 changes: 227 additions & 0 deletions pkg/scheduler/preemption/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,34 @@ func TestPreemption(t *testing.T) {
ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority,
}).
Obj(),
utiltesting.MakeClusterQueue("a").
Cohort("cohort-three").
ResourceGroup(*utiltesting.MakeFlavorQuotas("default").
Resource(corev1.ResourceCPU, "2").
Resource(corev1.ResourceMemory, "2").
Obj(),
).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
}).
Obj(),
utiltesting.MakeClusterQueue("b").
Cohort("cohort-three").
ResourceGroup(*utiltesting.MakeFlavorQuotas("default").
Resource(corev1.ResourceCPU, "2").
Resource(corev1.ResourceMemory, "2").
Obj(),
).
Obj(),
utiltesting.MakeClusterQueue("c").
Cohort("cohort-three").
ResourceGroup(*utiltesting.MakeFlavorQuotas("default").
Resource(corev1.ResourceCPU, "2").
Resource(corev1.ResourceMemory, "2").
Obj(),
).
Obj(),
}
cases := map[string]struct {
admitted []kueue.Workload
Expand Down Expand Up @@ -1062,6 +1090,205 @@ func TestPreemption(t *testing.T) {
wantPreempted: nil,
enableLendingLimit: true,
},
"preemptions from cq when target queue is exhausted for the single requested resource": {
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("a1", "").
Priority(-2).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("a").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("a2", "").
Priority(-2).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("a").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("a3", "").
Priority(-1).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("a").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b1", "").
Priority(0).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b2", "").
Priority(0).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b3", "").
Priority(0).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
},
incoming: utiltesting.MakeWorkload("in", "").
Request(corev1.ResourceCPU, "2").
Priority(0).
Obj(),
targetCQ: "a",
assignment: singlePodSetAssignment(flavorassigner.ResourceAssignment{
corev1.ResourceCPU: &flavorassigner.FlavorAssignment{
Name: "default",
Mode: flavorassigner.Preempt,
},
}),
wantPreempted: sets.New("/a1", "/a2"),
},
"preemptions from cq when target queue is exhausted for two requested resources": {
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("a1", "").
Priority(-2).
Request(corev1.ResourceCPU, "1").
Request(corev1.ResourceMemory, "1").
ReserveQuota(utiltesting.MakeAdmission("a").Assignment(corev1.ResourceCPU, "default", "1").Assignment(corev1.ResourceMemory, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("a2", "").
Priority(-2).
Request(corev1.ResourceCPU, "1").
Request(corev1.ResourceMemory, "1").
ReserveQuota(utiltesting.MakeAdmission("a").Assignment(corev1.ResourceCPU, "default", "1").Assignment(corev1.ResourceMemory, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("a3", "").
Priority(-1).
Request(corev1.ResourceCPU, "1").
Request(corev1.ResourceMemory, "1").
ReserveQuota(utiltesting.MakeAdmission("a").Assignment(corev1.ResourceCPU, "default", "1").Assignment(corev1.ResourceMemory, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b1", "").
Priority(0).
Request(corev1.ResourceCPU, "1").
Request(corev1.ResourceMemory, "1").
ReserveQuota(utiltesting.MakeAdmission("b").Assignment(corev1.ResourceCPU, "default", "1").Assignment(corev1.ResourceMemory, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b2", "").
Priority(0).
Request(corev1.ResourceCPU, "1").
Request(corev1.ResourceMemory, "1").
ReserveQuota(utiltesting.MakeAdmission("b").Assignment(corev1.ResourceCPU, "default", "1").Assignment(corev1.ResourceMemory, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b3", "").
Priority(0).
Request(corev1.ResourceCPU, "1").
Request(corev1.ResourceMemory, "1").
ReserveQuota(utiltesting.MakeAdmission("b").Assignment(corev1.ResourceCPU, "default", "1").Assignment(corev1.ResourceMemory, "default", "1").Obj()).
Obj(),
},
incoming: utiltesting.MakeWorkload("in", "").
Request(corev1.ResourceCPU, "2").
Request(corev1.ResourceMemory, "2").
Priority(0).
Obj(),
targetCQ: "a",
assignment: singlePodSetAssignment(flavorassigner.ResourceAssignment{
corev1.ResourceCPU: &flavorassigner.FlavorAssignment{
Name: "default",
Mode: flavorassigner.Preempt,
},
corev1.ResourceMemory: &flavorassigner.FlavorAssignment{
Name: "default",
Mode: flavorassigner.Preempt,
},
}),
wantPreempted: sets.New("/a1", "/a2"),
},
"preemptions from cq when target queue is exhausted for one requested resource, but not the other": {
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("a1", "").
Priority(-2).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("a").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("a2", "").
Priority(-2).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("a").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("a3", "").
Priority(-1).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("a").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b1", "").
Priority(0).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b2", "").
Priority(0).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b3", "").
Priority(0).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
},
incoming: utiltesting.MakeWorkload("in", "").
Request(corev1.ResourceCPU, "2").
Request(corev1.ResourceMemory, "2").
Priority(0).
Obj(),
targetCQ: "a",
assignment: singlePodSetAssignment(flavorassigner.ResourceAssignment{
corev1.ResourceCPU: &flavorassigner.FlavorAssignment{
Name: "default",
Mode: flavorassigner.Preempt,
},
corev1.ResourceMemory: &flavorassigner.FlavorAssignment{
Name: "default",
Mode: flavorassigner.Preempt,
},
}),
wantPreempted: sets.New("/a1", "/a2"),
},
"allow preemption from other cluster queues if target cq is not exhausted for the requested resource": {
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("a1", "").
Priority(-1).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("a").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b1", "").
Priority(0).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b2", "").
Priority(0).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b3", "").
Priority(0).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b4", "").
Priority(0).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
*utiltesting.MakeWorkload("b5", "").
Priority(-1).
Request(corev1.ResourceCPU, "1").
ReserveQuota(utiltesting.MakeAdmission("b").Assignment(corev1.ResourceCPU, "default", "1").Obj()).
Obj(),
},
incoming: utiltesting.MakeWorkload("in", "").
Request(corev1.ResourceCPU, "2").
Obj(),
targetCQ: "a",
assignment: singlePodSetAssignment(flavorassigner.ResourceAssignment{
corev1.ResourceCPU: &flavorassigner.FlavorAssignment{
Name: "default",
Mode: flavorassigner.Preempt,
},
}),
wantPreempted: sets.New("/a1", "/b5"),
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
Loading

0 comments on commit ed77cdc

Please sign in to comment.