Skip to content

Commit

Permalink
Review remarks 2
Browse files Browse the repository at this point in the history
  • Loading branch information
mimowo committed Jan 5, 2024
1 parent f867bbe commit f4d79cf
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 24 deletions.
2 changes: 2 additions & 0 deletions apis/kueue/v1beta1/clusterqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ type BorrowWithinCohort struct {
// maxPriorityThreshold allows to restrict the set of workloads which
// might be preempted by a borrowing workload, to only workloads with
// priority less than or equal to the specified threshold priority.
// When the threshold is not specified, then any workload satisfying the
// policy can be preempted by the borrowing workload.
//
// +optional
MaxPriorityThreshold *int32 `json:"maxPriorityThreshold,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ spec:
description: maxPriorityThreshold allows to restrict the set
of workloads which might be preempted by a borrowing workload,
to only workloads with priority less than or equal to the
specified threshold priority.
specified threshold priority. When the threshold is not
specified, then any workload satisfying the policy can be
preempted by the borrowing workload.
format: int32
type: integer
policy:
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/flavorassigner/flavorassigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func fitsResourceQuota(fName kueue.ResourceFlavorReference, rName corev1.Resourc
if cq.Preemption.BorrowWithinCohort != nil && cq.Preemption.BorrowWithinCohort.Policy != kueue.BorrowWithinCohortPolicyNever {
// when preemption with borrowing is enabled, we can succeeded admitting the
// workload if preemption is used.
if rQuota.BorrowingLimit != nil && val <= rQuota.Nominal+*rQuota.BorrowingLimit && val <= cohortAvailable {
if (rQuota.BorrowingLimit == nil || rQuota.BorrowingLimit != nil && val <= rQuota.Nominal+*rQuota.BorrowingLimit) && val <= cohortAvailable {
mode = Preempt
borrow = val > rQuota.Nominal
}
Expand Down
62 changes: 62 additions & 0 deletions pkg/scheduler/flavorassigner/flavorassigner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1902,6 +1902,68 @@ func TestAssignFlavors(t *testing.T) {
},
},
},
"when borrowing while preemption is needed for flavor one, no borrowingLimit; WhenCanBorrow=Borrow": {
wlPods: []kueue.PodSet{
*utiltesting.MakePodSet("main", 1).
Request(corev1.ResourceCPU, "12").
Obj(),
},
clusterQueue: cache.ClusterQueue{
Preemption: kueue.ClusterQueuePreemption{
ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority,
BorrowWithinCohort: &kueue.BorrowWithinCohort{
Policy: kueue.BorrowWithinCohortPolicyLowerPriority,
},
},
Cohort: &cache.Cohort{
Usage: cache.FlavorResourceQuantities{
"one": {corev1.ResourceCPU: 10000},
},
RequestableResources: cache.FlavorResourceQuantities{
"one": {corev1.ResourceCPU: 12000},
"two": {corev1.ResourceCPU: 12000},
},
},
FlavorFungibility: kueue.FlavorFungibility{
WhenCanBorrow: kueue.Borrow,
WhenCanPreempt: kueue.Preempt,
},
ResourceGroups: []cache.ResourceGroup{{
CoveredResources: sets.New(corev1.ResourceCPU),
Flavors: []cache.FlavorQuotas{{
Name: "one",
Resources: map[corev1.ResourceName]*cache.ResourceQuota{
corev1.ResourceCPU: {Nominal: 0},
},
}, {
Name: "two",
Resources: map[corev1.ResourceName]*cache.ResourceQuota{
corev1.ResourceCPU: {Nominal: 12000},
},
}},
}},
},
wantRepMode: Preempt,
wantAssignment: Assignment{
Borrowing: true,
PodSets: []PodSetAssignment{{
Name: "main",
Flavors: ResourceAssignment{
corev1.ResourceCPU: {Name: "one", Mode: Preempt},
},
Status: &Status{
reasons: []string{"insufficient unused quota in cohort for cpu in flavor one, 10 more needed"},
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("12000m"),
},
Count: 1,
}},
Usage: cache.FlavorResourceQuantities{
"one": {corev1.ResourceCPU: 12000},
},
},
},
"when borrowing while preemption is needed for flavor one; WhenCanBorrow=TryNextFlavor": {
wlPods: []kueue.PodSet{
*utiltesting.MakePodSet("main", 1).
Expand Down
43 changes: 22 additions & 21 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assig
sort.Slice(candidates, candidatesOrdering(candidates, cq.Name, time.Now()))

sameQueueCandidates := candidatesOnlyFromQueue(candidates, wl.ClusterQueue)
var targets []*workload.Info

// To avoid flapping, Kueue only allows preemption of workloads from the same
// queue if borrowing. Preemption of workloads from queues can happen only
Expand All @@ -97,28 +96,30 @@ func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assig
if len(sameQueueCandidates) == len(candidates) {
// There is no possible preemption of workloads from other queues,
// so we'll try borrowing.
targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, true, nil)
} else {
// There is a risk of preemption of workloads from the other queue in the
// cohort, proceeding without borrowing.
var allowBorrowingBelowPriority *int32
borrowWithinCohort := cq.Preemption.BorrowWithinCohort
allowBorrowing := borrowWithinCohort != nil && borrowWithinCohort.Policy != kueue.BorrowWithinCohortPolicyNever
if allowBorrowing {
allowBorrowingBelowPriority = ptr.To(priority.Priority(wl.Obj))
if borrowWithinCohort.MaxPriorityThreshold != nil && *borrowWithinCohort.MaxPriorityThreshold < *allowBorrowingBelowPriority {
allowBorrowingBelowPriority = ptr.To(*borrowWithinCohort.MaxPriorityThreshold + 1)
}
}
targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, allowBorrowing, allowBorrowingBelowPriority)
if len(targets) == 0 && !allowBorrowing {
// Another attempt. This time only candidates from the same queue, but
// with borrowing. The previous attempt didn't try borrowing and had broader
// scope of preemption.
targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, sameQueueCandidates, true, nil)
}
return minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, true, nil)
}

// There is a risk of preemption of workloads from the other queue in the
// cohort. We proceed with borrowing only if the dedicated policy
// (borrowWithinCohort) is enabled. This ensures the preempted workloads
// have lower priority, and so they will not preempt the preemptor when
// requeued.
var allowBorrowingBelowPriority *int32
borrowWithinCohort := cq.Preemption.BorrowWithinCohort
allowBorrowing := borrowWithinCohort != nil && borrowWithinCohort.Policy != kueue.BorrowWithinCohortPolicyNever
if allowBorrowing {
allowBorrowingBelowPriority = ptr.To(priority.Priority(wl.Obj))
if borrowWithinCohort.MaxPriorityThreshold != nil && *borrowWithinCohort.MaxPriorityThreshold < *allowBorrowingBelowPriority {
allowBorrowingBelowPriority = ptr.To(*borrowWithinCohort.MaxPriorityThreshold + 1)
}
}
targets := minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, allowBorrowing, allowBorrowingBelowPriority)
if len(targets) == 0 && !allowBorrowing {
// Another attempt. This time only candidates from the same queue, but
// with borrowing. The previous attempt didn't try borrowing and had broader
// scope of preemption.
targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, sameQueueCandidates, true, nil)
}
return targets
}

Expand Down
4 changes: 3 additions & 1 deletion site/content/en/docs/reference/kueue.v1beta1.md
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,9 @@ the preempted workloads are of lower priority.</li>
<td>
<p>maxPriorityThreshold allows to restrict the set of workloads which
might be preempted by a borrowing workload, to only workloads with
priority less than or equal to the specified threshold priority.</p>
priority less than or equal to the specified threshold priority.
When the threshold is not specified, then any workload satisfying the
policy can be preempted by the borrowing workload.</p>
</td>
</tr>
</tbody>
Expand Down

0 comments on commit f4d79cf

Please sign in to comment.