Skip to content

Commit

Permalink
Preemption while borrowing WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
mimowo committed Dec 15, 2023
1 parent a68c958 commit c54df4e
Show file tree
Hide file tree
Showing 12 changed files with 385 additions and 18 deletions.
32 changes: 32 additions & 0 deletions apis/kueue/v1beta1/clusterqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ type ClusterQueuePreemption struct {
// +kubebuilder:validation:Enum=Never;LowerPriority;Any
ReclaimWithinCohort PreemptionPolicy `json:"reclaimWithinCohort,omitempty"`

// borrowWithinCohort provides configuration to allow preemption within
// cohort while borrowing.
BorrowWithinCohort *BorrowWithinCohort `json:"borrowFromCohortConfig,omitempty"`

// withinClusterQueue determines whether a pending Workload that doesn't fit
// within the nominal quota for its ClusterQueue, can preempt active Workloads in
// the ClusterQueue. The possible values are:
Expand All @@ -382,6 +386,34 @@ type ClusterQueuePreemption struct {
WithinClusterQueue PreemptionPolicy `json:"withinClusterQueue,omitempty"`
}

// BorrowWithinCohortPolicy is the string-valued policy stored in
// BorrowWithinCohort.Policy.
type BorrowWithinCohortPolicy string

const (
// BorrowWithinCohortPolicyNever is the "Never" policy value. It is the
// kubebuilder default for BorrowWithinCohort.Policy.
BorrowWithinCohortPolicyNever BorrowWithinCohortPolicy = "Never"
// BorrowWithinCohortPolicyLowerPriority is the "LowerPriority" policy value.
BorrowWithinCohortPolicyLowerPriority BorrowWithinCohortPolicy = "LowerPriority"
)

// BorrowWithinCohort contains configuration which allows preempting workloads
// within the cohort while borrowing.
type BorrowWithinCohort struct {
// policy determines the policy for preemption to reclaim quota within the cohort while borrowing.
// Possible values are:
// - `Never` (default): do not reclaim quota within the cohort while borrowing.
// - `LowerPriority`: allow preempting workloads to reclaim quota within the cohort,
// but only by preempting lower-priority workloads.
//
// +kubebuilder:default=Never
// +kubebuilder:validation:Enum=Never;LowerPriority
Policy BorrowWithinCohortPolicy `json:"policy,omitempty"`

// maxPriorityThreshold restricts the set of workloads eligible for preemption
// to reclaim quota within the cohort to only those workloads with priority
// lower than or equal to the specified level.
//
// +optional
MaxPriorityThreshold *int32 `json:"maxPriorityThreshold,omitempty"`
}

//+genclient
//+genclient:nonNamespaced
//+kubebuilder:object:root=true
Expand Down
27 changes: 26 additions & 1 deletion apis/kueue/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,30 @@ spec:
of Workloads to preempt to accommodate the pending Workload, preempting
Workloads with lower priority first."
properties:
borrowFromCohortConfig:
description: borrowWithinCohort provides configuration to allow
preemption within cohort while borrowing.
properties:
maxPriorityThreshold:
description: maxPriorityThreshold allows to restrict the set
of workloads for preemption to reclaim quota within the
cohort, to only workloads with priority below or equal the
specified level.
format: int32
type: integer
policy:
default: Never
description: 'policy determines the policy for preemption
to reclaim quota within cohort while borrowing. Possible
values are: - `Never` (default): do not reclaim quota within
the cohort while borrowing. - `LowerPriority`: allow preempting
workloads to reclaim quota within the cohort, but only by
preempting lower-priority workloads.'
enum:
- Never
- LowerPriority
type: string
type: object
reclaimWithinCohort:
default: Never
description: "reclaimWithinCohort determines whether a pending
Expand Down
51 changes: 51 additions & 0 deletions client-go/applyconfiguration/kueue/v1beta1/borrowwithincohort.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client-go/applyconfiguration/utils.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,30 @@ spec:
of Workloads to preempt to accommodate the pending Workload, preempting
Workloads with lower priority first."
properties:
borrowFromCohortConfig:
description: borrowWithinCohort provides configuration to allow
preemption within cohort while borrowing.
properties:
maxPriorityThreshold:
description: maxPriorityThreshold allows to restrict the set
of workloads for preemption to reclaim quota within the
cohort, to only workloads with priority below or equal the
specified level.
format: int32
type: integer
policy:
default: Never
description: 'policy determines the policy for preemption
to reclaim quota within cohort while borrowing. Possible
values are: - `Never` (default): do not reclaim quota within
the cohort while borrowing. - `LowerPriority`: allow preempting
workloads to reclaim quota within the cohort, but only by
preempting lower-priority workloads.'
enum:
- Never
- LowerPriority
type: string
type: object
reclaimWithinCohort:
default: Never
description: "reclaimWithinCohort determines whether a pending
Expand Down
7 changes: 7 additions & 0 deletions pkg/scheduler/flavorassigner/flavorassigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,13 @@ func fitsResourceQuota(fName kueue.ResourceFlavorReference, rName corev1.Resourc
// ClusterQueue are preempted.
mode = Preempt
}
if cq.Preemption.BorrowWithinCohort != nil && cq.Preemption.BorrowWithinCohort.Policy != kueue.BorrowWithinCohortPolicyNever {
// when preemption with borrowing is enabled, we can succeed in admitting the
// workload if preemption is used.
if rQuota.BorrowingLimit != nil && val <= rQuota.Nominal+*rQuota.BorrowingLimit {
mode = Preempt
}
}
if rQuota.BorrowingLimit != nil && used+val > rQuota.Nominal+*rQuota.BorrowingLimit {
status.append(fmt.Sprintf("borrowing limit for %s in flavor %s exceeded", rName, fName))
return mode, 0, &status
Expand Down
32 changes: 22 additions & 10 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -74,7 +75,7 @@ func candidatesOnlyFromQueue(candidates []*workload.Info, clusterQueue string) [
}

// GetTargets returns the list of workloads that should be evicted in order to make room for wl.
func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot) []*workload.Info {
func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, borrowWithinCohort *kueue.BorrowWithinCohort) []*workload.Info {
resPerFlv := resourcesRequiringPreemption(assignment)
cq := snapshot.ClusterQueues[wl.ClusterQueue]

Expand All @@ -92,20 +93,27 @@ func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assig
// if not borrowing at the same time. Kueue prioritizes preemption of
// workloads from the other queues (that borrowed resources) first, before
// trying to preempt more own workloads and borrow at the same time.

if len(sameQueueCandidates) == len(candidates) {
// There is no risk of preemption of workloads from the other queue,
// so we can try borrowing.
targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, true)
targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, true, nil)
} else {
// There is a risk of preemption of workloads from the other queue in the
// cohort, proceeding without borrowing.
targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, false)
if len(targets) == 0 {
var belowPriority *int32
allowBorrowing := borrowWithinCohort != nil && borrowWithinCohort.Policy != kueue.BorrowWithinCohortPolicyNever
if allowBorrowing {
belowPriority = ptr.To(priority.Priority(wl.Obj))
if borrowWithinCohort.MaxPriorityThreshold != nil && *borrowWithinCohort.MaxPriorityThreshold < *belowPriority {
belowPriority = ptr.To(*borrowWithinCohort.MaxPriorityThreshold + 1)
}
}
targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, allowBorrowing, belowPriority)
if len(targets) == 0 && !allowBorrowing {
// Another attempt. This time only candidates from the same queue, but
// with borrowing. The previous attempt didn't try borrowing and had broader
// scope of preemption.
targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, sameQueueCandidates, true)
targets = minimalPreemptions(&wl, assignment, snapshot, resPerFlv, sameQueueCandidates, true, nil)
}
}

Expand Down Expand Up @@ -156,7 +164,7 @@ func (p *Preemptor) applyPreemptionWithSSA(ctx context.Context, w *kueue.Workloa
// Once the Workload fits, the heuristic tries to add Workloads back, in the
// reverse order in which they were removed, while the incoming Workload still
// fits.
func minimalPreemptions(wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool) []*workload.Info {
func minimalPreemptions(wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowing bool, belowPriority *int32) []*workload.Info {
wlReq := totalRequestsForAssignment(wl, assignment)
cq := snapshot.ClusterQueues[wl.ClusterQueue]
// Simulate removing all candidates from the ClusterQueue and cohort.
Expand All @@ -167,6 +175,9 @@ func minimalPreemptions(wl *workload.Info, assignment flavorassigner.Assignment,
if cq != candCQ && !cqIsBorrowing(candCQ, resPerFlv) {
continue
}
if cq != candCQ && belowPriority != nil && priority.Priority(candWl.Obj) >= *belowPriority {
continue
}
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
if workloadFits(wlReq, cq, allowBorrowing) {
Expand Down Expand Up @@ -253,9 +264,9 @@ func findCandidates(wl *kueue.Workload, cq *cache.ClusterQueue, resPerFlv resour
// Can't reclaim quota from itself or ClusterQueues that are not borrowing.
continue
}
onlyLowerPrio := true
if cq.Preemption.ReclaimWithinCohort == kueue.PreemptionPolicyAny {
onlyLowerPrio = false
onlyLowerPrio := false
if cq.Preemption.ReclaimWithinCohort != kueue.PreemptionPolicyAny {
onlyLowerPrio = true
}
for _, candidateWl := range cohortCQ.Workloads {
if onlyLowerPrio && priority.Priority(candidateWl.Obj) >= priority.Priority(wl) {
Expand All @@ -268,6 +279,7 @@ func findCandidates(wl *kueue.Workload, cq *cache.ClusterQueue, resPerFlv resour
}
}
}

return candidates
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/preemption/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ func TestPreemption(t *testing.T) {
snapshot := cqCache.Snapshot()
wlInfo := workload.NewInfo(tc.incoming)
wlInfo.ClusterQueue = tc.targetCQ
targets := preemptor.GetTargets(*wlInfo, tc.assignment, &snapshot)
targets := preemptor.GetTargets(*wlInfo, tc.assignment, &snapshot, nil)
preempted, err := preemptor.IssuePreemptions(ctx, targets, snapshot.ClusterQueues[wlInfo.ClusterQueue])
if err != nil {
t.Fatalf("Failed doing preemption")
Expand Down
8 changes: 4 additions & 4 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna
} else if err := s.validateLimitRange(ctx, &w); err != nil {
e.inadmissibleMsg = err.Error()
} else {
e.assignment, e.preemptionTargets = s.getAssignments(log, &e.Info, &snap)
e.assignment, e.preemptionTargets = s.getAssignments(log, &e.Info, &snap, cq.Preemption.BorrowWithinCohort)
e.inadmissibleMsg = e.assignment.Message()
e.Info.LastAssignment = &e.assignment.LastState
}
Expand All @@ -327,7 +327,7 @@ type partialAssignment struct {
preemptionTargets []*workload.Info
}

func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cache.Snapshot) (flavorassigner.Assignment, []*workload.Info) {
func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cache.Snapshot, borrowWithinCohort *kueue.BorrowWithinCohort) (flavorassigner.Assignment, []*workload.Info) {
cq := snap.ClusterQueues[wl.ClusterQueue]
fullAssignment := flavorassigner.AssignFlavors(log, wl, snap.ResourceFlavors, cq, nil)
var fullAssignmentTargets []*workload.Info
Expand All @@ -338,7 +338,7 @@ func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cac
}

if arm == flavorassigner.Preempt {
fullAssignmentTargets = s.preemptor.GetTargets(*wl, fullAssignment, snap)
fullAssignmentTargets = s.preemptor.GetTargets(*wl, fullAssignment, snap, borrowWithinCohort)
}

// if the feature gate is not enabled or we can preempt
Expand All @@ -352,7 +352,7 @@ func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cac
if assignment.RepresentativeMode() == flavorassigner.Fit {
return &partialAssignment{assignment: assignment}, true
}
preemptionTargets := s.preemptor.GetTargets(*wl, assignment, snap)
preemptionTargets := s.preemptor.GetTargets(*wl, assignment, snap, borrowWithinCohort)
if len(preemptionTargets) > 0 {

return &partialAssignment{assignment: assignment, preemptionTargets: preemptionTargets}, true
Expand Down

0 comments on commit c54df4e

Please sign in to comment.