From 97e42c3f355fbcf4ee59c5289385e40719863bb7 Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Thu, 25 Apr 2024 19:26:52 +0000
Subject: [PATCH 01/10] Add APIs for configuring fair sharing
Change-Id: I74b24ba8715290753c0bd9b966a109b5b01078b4
---
apis/config/v1beta1/configuration_types.go | 27 +++++++++
apis/config/v1beta1/defaults.go | 3 +
apis/config/v1beta1/zz_generated.deepcopy.go | 25 ++++++++
apis/kueue/v1beta1/clusterqueue_types.go | 16 +++++
apis/kueue/v1beta1/zz_generated.deepcopy.go | 25 ++++++++
.../crd/kueue.x-k8s.io_clusterqueues.yaml | 20 +++++++
.../kueue/v1beta1/clusterqueuespec.go | 9 +++
.../kueue/v1beta1/fairsharing.go | 42 +++++++++++++
client-go/applyconfiguration/utils.go | 2 +
.../bases/kueue.x-k8s.io_clusterqueues.yaml | 20 +++++++
pkg/config/validation.go | 28 ++++++++-
pkg/config/validation_test.go | 42 +++++++++++++
.../en/docs/reference/kueue-config.v1beta1.md | 60 +++++++++++++++++++
.../en/docs/reference/kueue.v1beta1.md | 40 +++++++++++++
14 files changed, 358 insertions(+), 1 deletion(-)
create mode 100644 client-go/applyconfiguration/kueue/v1beta1/fairsharing.go
diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go
index 90c4059e37..b9165e4a06 100644
--- a/apis/config/v1beta1/configuration_types.go
+++ b/apis/config/v1beta1/configuration_types.go
@@ -70,6 +70,9 @@ type Configuration struct {
// MultiKueue controls the behaviour of the MultiKueue AdmissionCheck Controller.
MultiKueue *MultiKueue `json:"multiKueue,omitempty"`
+
+ // FairSharing controls the fair sharing semantics across the cluster.
+ FairSharing *FairSharing `json:"fairSharing,omitempty"`
}
type ControllerManager struct {
@@ -352,3 +355,27 @@ type ClusterQueueVisibility struct {
// Defaults to 10.
MaxCount int32 `json:"maxCount,omitempty"`
}
+
+type PreemptionStrategy string
+
+const (
+ LessThanOrEqualToFinalShare PreemptionStrategy = "LessThanOrEqualToFinalShare"
+ LessThanInitialShare PreemptionStrategy = "LessThanInitialShare"
+)
+
+type FairSharing struct {
+ // enable indicates whether to enable fair sharing for all cohorts.
+ // Defaults to false.
+ Enable bool `json:"enable"`
+
+	// preemptionStrategies indicates the constraints that a preemption should satisfy.
+ // The preemption algorithm will only use the next strategy in the list if the
+ // incoming workload (preemptor) doesn't fit after using the previous strategies.
+ // Possible values are:
+ // - LessThanOrEqualToFinalShare: Only preempt if the share of the preemptor CQ
+ // will be less than or equal to the share of the preemptee CQ after the preemption occurs.
+ // - LessThanInitialShare: Only preempt if the share of the preemptor CQ will be strictly less
+ // than the share of the preemptee CQ before the preemption.
+ // The default strategy is ["LessThanOrEqualToFinalShare"].
+ PreemptionStrategies []PreemptionStrategy `json:"preemptionStrategies,omitempty"`
+}
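
As an illustrative sketch (not part of this patch), wiring both strategies into the component config in Go; the order matters because the algorithm only falls back to the next strategy when the preemptor still doesn't fit:

    package example

    import configapi "sigs.k8s.io/kueue/apis/config/v1beta1"

    // fairSharingConfig is a hypothetical helper showing the API added above.
    func fairSharingConfig() *configapi.Configuration {
        return &configapi.Configuration{
            FairSharing: &configapi.FairSharing{
                Enable: true,
                // Strategies are tried in order; the second is consulted only if
                // the preemptor still does not fit after using the first.
                PreemptionStrategies: []configapi.PreemptionStrategy{
                    configapi.LessThanOrEqualToFinalShare,
                    configapi.LessThanInitialShare,
                },
            },
        }
    }
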
diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go
index 059530eb0f..1b3d2d012e 100644
--- a/apis/config/v1beta1/defaults.go
+++ b/apis/config/v1beta1/defaults.go
@@ -184,4 +184,7 @@ func SetDefaults_Configuration(cfg *Configuration) {
if cfg.MultiKueue.WorkerLostTimeout == nil {
cfg.MultiKueue.WorkerLostTimeout = &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout}
}
+ if fs := cfg.FairSharing; fs != nil && fs.Enabled && len(fs.PreemptionStrategies) == 0 {
+ fs.PreemptionStrategies = []PreemptionStrategy{LessThanOrEqualToFinalShare}
+ }
}
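
A sketch of the resulting defaulting behavior, assuming the Enable field spelling from the API type (the fs.Enabled reference above is reconciled in a later patch in this series):

    package example

    import configapi "sigs.k8s.io/kueue/apis/config/v1beta1"

    // defaultedStrategies shows that an enabled FairSharing with no explicit
    // strategies is defaulted to the single LessThanOrEqualToFinalShare entry.
    func defaultedStrategies() []configapi.PreemptionStrategy {
        cfg := &configapi.Configuration{
            FairSharing: &configapi.FairSharing{Enable: true},
        }
        configapi.SetDefaults_Configuration(cfg)
        return cfg.FairSharing.PreemptionStrategies
    }
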
diff --git a/apis/config/v1beta1/zz_generated.deepcopy.go b/apis/config/v1beta1/zz_generated.deepcopy.go
index 8749fcd8af..daa0f3fa8e 100644
--- a/apis/config/v1beta1/zz_generated.deepcopy.go
+++ b/apis/config/v1beta1/zz_generated.deepcopy.go
@@ -107,6 +107,11 @@ func (in *Configuration) DeepCopyInto(out *Configuration) {
*out = new(MultiKueue)
(*in).DeepCopyInto(*out)
}
+ if in.FairSharing != nil {
+ in, out := &in.FairSharing, &out.FairSharing
+ *out = new(FairSharing)
+ (*in).DeepCopyInto(*out)
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Configuration.
@@ -232,6 +237,26 @@ func (in *ControllerWebhook) DeepCopy() *ControllerWebhook {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *FairSharing) DeepCopyInto(out *FairSharing) {
+ *out = *in
+ if in.PreemptionStrategies != nil {
+ in, out := &in.PreemptionStrategies, &out.PreemptionStrategies
+ *out = make([]PreemptionStrategy, len(*in))
+ copy(*out, *in)
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FairSharing.
+func (in *FairSharing) DeepCopy() *FairSharing {
+ if in == nil {
+ return nil
+ }
+ out := new(FairSharing)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Integrations) DeepCopyInto(out *Integrations) {
*out = *in
diff --git a/apis/kueue/v1beta1/clusterqueue_types.go b/apis/kueue/v1beta1/clusterqueue_types.go
index 08126827b9..5bb323a802 100644
--- a/apis/kueue/v1beta1/clusterqueue_types.go
+++ b/apis/kueue/v1beta1/clusterqueue_types.go
@@ -121,6 +121,9 @@ type ClusterQueueSpec struct {
// +kubebuilder:validation:Enum=None;Hold;HoldAndDrain
// +kubebuilder:default="None"
StopPolicy *StopPolicy `json:"stopPolicy,omitempty"`
+
+ // fairSharing defines the properties of the ClusterQueue when participating in fair sharing.
+ FairSharing *FairSharing `json:"fairSharing,omitempty"`
}
// AdmissionCheckStrategy defines a strategy for a AdmissionCheck.
@@ -464,6 +467,19 @@ type BorrowWithinCohort struct {
MaxPriorityThreshold *int32 `json:"maxPriorityThreshold,omitempty"`
}
+// FairSharing contains the properties of the ClusterQueue when participating in fair sharing.
+type FairSharing struct {
+ // weight gives a comparative advantage to this ClusterQueue when competing for unused
+ // resources in the cohort against other ClusterQueues.
+ // The share of a ClusterQueue is based on the dominant resource usage above nominal
+ // quotas for each resource, divided by the weight.
+ // Admission prioritizes scheduling workloads from ClusterQueues with the lowest share
+ // and preempting workloads from the ClusterQueues with the highest share.
+ // A zero weight implies infinite share value, meaning that this ClusterQueue will always
+	// be at a disadvantage against other ClusterQueues.
+ Weight *resource.Quantity `json:"weight,omitempty"`
+}
+
// +genclient
// +genclient:nonNamespaced
// +kubebuilder:object:root=true
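
A rough numeric sketch of the weight semantics in the doc comment (illustrative only; the real share computation lands in the cache later in this series):

    package example

    import (
        "math"

        "k8s.io/apimachinery/pkg/api/resource"
    )

    // illustrativeShare mirrors the doc comment: dominant resource usage above
    // nominal quota, divided by the weight. A zero weight yields an infinite
    // share, so the ClusterQueue always loses the comparison against queues
    // with a positive weight.
    func illustrativeShare(dominantUsageAboveNominal int64, weight resource.Quantity) float64 {
        w := weight.AsApproximateFloat64()
        if w == 0 {
            return math.Inf(1)
        }
        return float64(dominantUsageAboveNominal) / w
    }
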
diff --git a/apis/kueue/v1beta1/zz_generated.deepcopy.go b/apis/kueue/v1beta1/zz_generated.deepcopy.go
index 1e074fa57e..b81f581738 100644
--- a/apis/kueue/v1beta1/zz_generated.deepcopy.go
+++ b/apis/kueue/v1beta1/zz_generated.deepcopy.go
@@ -409,6 +409,11 @@ func (in *ClusterQueueSpec) DeepCopyInto(out *ClusterQueueSpec) {
*out = new(StopPolicy)
**out = **in
}
+ if in.FairSharing != nil {
+ in, out := &in.FairSharing, &out.FairSharing
+ *out = new(FairSharing)
+ (*in).DeepCopyInto(*out)
+ }
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterQueueSpec.
@@ -462,6 +467,26 @@ func (in *ClusterQueueStatus) DeepCopy() *ClusterQueueStatus {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *FairSharing) DeepCopyInto(out *FairSharing) {
+ *out = *in
+ if in.Weight != nil {
+ in, out := &in.Weight, &out.Weight
+ x := (*in).DeepCopy()
+ *out = &x
+ }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FairSharing.
+func (in *FairSharing) DeepCopy() *FairSharing {
+ if in == nil {
+ return nil
+ }
+ out := new(FairSharing)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *FlavorFungibility) DeepCopyInto(out *FlavorFungibility) {
*out = *in
diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
index cdc1cdcbbe..31b8bb74a8 100644
--- a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
+++ b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
@@ -133,6 +133,26 @@ spec:
maxLength: 253
pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$
type: string
+ fairSharing:
+ description: fairSharing defines the properties of the ClusterQueue
+ when participating in fair sharing.
+ properties:
+ weight:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ weight gives a comparative advantage to this ClusterQueue when competing for unused
+ resources in the cohort against other ClusterQueues.
+ The share of a ClusterQueue is based on the dominant resource usage above nominal
+ quotas for each resource, divided by the weight.
+ Admission prioritizes scheduling workloads from ClusterQueues with the lowest share
+ and preempting workloads from the ClusterQueues with the highest share.
+ A zero weight implies infinite share value, meaning that this ClusterQueue will always
+ be at a disadvantage against other ClusterQueues.
+ pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+ x-kubernetes-int-or-string: true
+ type: object
flavorFungibility:
default: {}
description: |-
diff --git a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go
index 1ae653aa53..1794cea5d0 100644
--- a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go
+++ b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go
@@ -34,6 +34,7 @@ type ClusterQueueSpecApplyConfiguration struct {
AdmissionChecks []string `json:"admissionChecks,omitempty"`
AdmissionChecksStrategy *AdmissionChecksStrategyApplyConfiguration `json:"admissionChecksStrategy,omitempty"`
StopPolicy *kueuev1beta1.StopPolicy `json:"stopPolicy,omitempty"`
+ FairSharing *FairSharingApplyConfiguration `json:"fairSharing,omitempty"`
}
// ClusterQueueSpecApplyConfiguration constructs an declarative configuration of the ClusterQueueSpec type for use with
@@ -120,3 +121,11 @@ func (b *ClusterQueueSpecApplyConfiguration) WithStopPolicy(value kueuev1beta1.S
b.StopPolicy = &value
return b
}
+
+// WithFairSharing sets the FairSharing field in the declarative configuration to the given value
+// and returns the receiver, so that objects can be built by chaining "With" function invocations.
+// If called multiple times, the FairSharing field is set to the value of the last call.
+func (b *ClusterQueueSpecApplyConfiguration) WithFairSharing(value *FairSharingApplyConfiguration) *ClusterQueueSpecApplyConfiguration {
+ b.FairSharing = value
+ return b
+}
diff --git a/client-go/applyconfiguration/kueue/v1beta1/fairsharing.go b/client-go/applyconfiguration/kueue/v1beta1/fairsharing.go
new file mode 100644
index 0000000000..e5bba5b4ac
--- /dev/null
+++ b/client-go/applyconfiguration/kueue/v1beta1/fairsharing.go
@@ -0,0 +1,42 @@
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Code generated by applyconfiguration-gen. DO NOT EDIT.
+
+package v1beta1
+
+import (
+ resource "k8s.io/apimachinery/pkg/api/resource"
+)
+
+// FairSharingApplyConfiguration represents an declarative configuration of the FairSharing type for use
+// with apply.
+type FairSharingApplyConfiguration struct {
+ Weight *resource.Quantity `json:"weight,omitempty"`
+}
+
+// FairSharingApplyConfiguration constructs an declarative configuration of the FairSharing type for use with
+// apply.
+func FairSharing() *FairSharingApplyConfiguration {
+ return &FairSharingApplyConfiguration{}
+}
+
+// WithWeight sets the Weight field in the declarative configuration to the given value
+// and returns the receiver, so that objects can be built by chaining "With" function invocations.
+// If called multiple times, the Weight field is set to the value of the last call.
+func (b *FairSharingApplyConfiguration) WithWeight(value resource.Quantity) *FairSharingApplyConfiguration {
+ b.Weight = &value
+ return b
+}
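
A brief usage sketch for the generated builders above (server-side-apply plumbing elided; the helper name is hypothetical):

    package example

    import (
        "k8s.io/apimachinery/pkg/api/resource"

        kueuev1beta1 "sigs.k8s.io/kueue/client-go/applyconfiguration/kueue/v1beta1"
    )

    // clusterQueueSpecWithWeight chains the generated "With" builders to declare
    // a fair-sharing weight of 2 on a ClusterQueue spec.
    func clusterQueueSpecWithWeight() *kueuev1beta1.ClusterQueueSpecApplyConfiguration {
        return kueuev1beta1.ClusterQueueSpec().
            WithFairSharing(kueuev1beta1.FairSharing().
                WithWeight(resource.MustParse("2")))
    }
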
diff --git a/client-go/applyconfiguration/utils.go b/client-go/applyconfiguration/utils.go
index 8bd03f6f73..0716a64185 100644
--- a/client-go/applyconfiguration/utils.go
+++ b/client-go/applyconfiguration/utils.go
@@ -76,6 +76,8 @@ func ForKind(kind schema.GroupVersionKind) interface{} {
return &kueuev1beta1.ClusterQueueSpecApplyConfiguration{}
case v1beta1.SchemeGroupVersion.WithKind("ClusterQueueStatus"):
return &kueuev1beta1.ClusterQueueStatusApplyConfiguration{}
+ case v1beta1.SchemeGroupVersion.WithKind("FairSharing"):
+ return &kueuev1beta1.FairSharingApplyConfiguration{}
case v1beta1.SchemeGroupVersion.WithKind("FlavorFungibility"):
return &kueuev1beta1.FlavorFungibilityApplyConfiguration{}
case v1beta1.SchemeGroupVersion.WithKind("FlavorQuotas"):
diff --git a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
index 7b39187ad6..65b14b5516 100644
--- a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
+++ b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
@@ -118,6 +118,26 @@ spec:
maxLength: 253
pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$
type: string
+ fairSharing:
+ description: fairSharing defines the properties of the ClusterQueue
+ when participating in fair sharing.
+ properties:
+ weight:
+ anyOf:
+ - type: integer
+ - type: string
+ description: |-
+ weight gives a comparative advantage to this ClusterQueue when competing for unused
+ resources in the cohort against other ClusterQueues.
+ The share of a ClusterQueue is based on the dominant resource usage above nominal
+ quotas for each resource, divided by the weight.
+ Admission prioritizes scheduling workloads from ClusterQueues with the lowest share
+ and preempting workloads from the ClusterQueues with the highest share.
+ A zero weight implies infinite share value, meaning that this ClusterQueue will always
+ be at a disadvantage against other ClusterQueues.
+ pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
+ x-kubernetes-int-or-string: true
+ type: object
flavorFungibility:
default: {}
description: |-
diff --git a/pkg/config/validation.go b/pkg/config/validation.go
index 5e58bc5ae5..b42cbbf834 100644
--- a/pkg/config/validation.go
+++ b/pkg/config/validation.go
@@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/util/sets"
apimachineryvalidation "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
@@ -45,6 +46,7 @@ var (
waitForPodsReadyPath = field.NewPath("waitForPodsReady")
requeuingStrategyPath = waitForPodsReadyPath.Child("requeuingStrategy")
multiKueuePath = field.NewPath("multiKueue")
+ fsPreemptionStrategiesPath = field.NewPath("fairSharing", "preemptionStrategies")
)
func validate(c *configapi.Configuration) field.ErrorList {
@@ -58,7 +60,7 @@ func validate(c *configapi.Configuration) field.ErrorList {
allErrs = append(allErrs, validateIntegrations(c)...)
allErrs = append(allErrs, validateMultiKueue(c)...)
-
+ allErrs = append(allErrs, validateFairSharing(c)...)
return allErrs
}
@@ -174,3 +176,27 @@ func validatePodIntegrationOptions(c *configapi.Configuration) field.ErrorList {
return allErrs
}
+
+func validateFairSharing(c *configapi.Configuration) field.ErrorList {
+ fs := c.FairSharing
+ if fs == nil {
+ return nil
+ }
+ var allErrs field.ErrorList
+ if !fs.Enabled && len(fs.PreemptionStrategies) != 0 {
+ allErrs = append(allErrs, field.Invalid(fsPreemptionStrategiesPath, fs.PreemptionStrategies, "Must be empty when fair sharing is disabled"))
+ }
+ strategies := sets.New[configapi.PreemptionStrategy]()
+ validStrategies := []configapi.PreemptionStrategy{configapi.LessThanInitialShare, configapi.LessThanOrEqualToFinalShare}
+ for i, s := range fs.PreemptionStrategies {
+ path := fsPreemptionStrategiesPath.Index(i)
+ if slices.Index(validStrategies, s) == -1 {
+ allErrs = append(allErrs, field.NotSupported(path, s, validStrategies))
+ }
+ if strategies.Has(s) {
+ allErrs = append(allErrs, field.Duplicate(path, s))
+ }
+ strategies.Insert(s)
+ }
+ return allErrs
+}
diff --git a/pkg/config/validation_test.go b/pkg/config/validation_test.go
index 74815b581d..0106056f99 100644
--- a/pkg/config/validation_test.go
+++ b/pkg/config/validation_test.go
@@ -349,6 +349,48 @@ func TestValidate(t *testing.T) {
},
},
},
+ "preemption strategies when fair sharing is disabled": {
+ cfg: &configapi.Configuration{
+ Integrations: defaultIntegrations,
+ FairSharing: &configapi.FairSharing{
+ PreemptionStrategies: []configapi.PreemptionStrategy{configapi.LessThanOrEqualToFinalShare},
+ },
+ },
+ wantErr: field.ErrorList{
+ &field.Error{
+ Type: field.ErrorTypeInvalid,
+ Field: "fairSharing.preemptionStrategies",
+ },
+ },
+ },
+ "Unknown and duplicated preemption strategies": {
+ cfg: &configapi.Configuration{
+ Integrations: defaultIntegrations,
+ FairSharing: &configapi.FairSharing{
+ Enabled: true,
+ PreemptionStrategies: []configapi.PreemptionStrategy{configapi.LessThanOrEqualToFinalShare, "UNKNOWN", configapi.LessThanInitialShare, configapi.LessThanOrEqualToFinalShare},
+ },
+ },
+ wantErr: field.ErrorList{
+ &field.Error{
+ Type: field.ErrorTypeNotSupported,
+ Field: "fairSharing.preemptionStrategies[1]",
+ },
+ &field.Error{
+ Type: field.ErrorTypeDuplicate,
+ Field: "fairSharing.preemptionStrategies[3]",
+ },
+ },
+ },
+ "valid preemption strategy": {
+ cfg: &configapi.Configuration{
+ Integrations: defaultIntegrations,
+ FairSharing: &configapi.FairSharing{
+ Enabled: true,
+ PreemptionStrategies: []configapi.PreemptionStrategy{configapi.LessThanOrEqualToFinalShare, configapi.LessThanInitialShare},
+ },
+ },
+ },
}
for name, tc := range testCases {
diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md
index da4d406598..4122ac8f51 100644
--- a/site/content/en/docs/reference/kueue-config.v1beta1.md
+++ b/site/content/en/docs/reference/kueue-config.v1beta1.md
@@ -163,6 +163,13 @@ pending workloads.
MultiKueue controls the behaviour of the MultiKueue AdmissionCheck Controller.
+fairSharing [Required]
+FairSharing
+
+ FairSharing controls the fair sharing semantics across the cluster.
+
@@ -395,6 +402,47 @@ must be named tls.key and tls.crt, respectively.
+## `FairSharing` {#FairSharing}
+
+**Appears in:**
+
+- [Configuration](#Configuration)
+
+Field | Description |
+
+enable [Required]
+bool
+
+ enable indicates whether to enable fair sharing for all cohorts.
+Defaults to false.
+
+preemptionStrategies [Required]
+[]PreemptionStrategy
+
+ preemptionStrategies indicates the constraints that a preemption should satisfy.
+The preemption algorithm will only use the next strategy in the list if the
+incoming workload (preemptor) doesn't fit after using the previous strategies.
+Possible values are:
+
+- LessThanOrEqualToFinalShare: Only preempt if the share of the preemptor CQ
+will be less than or equal to the share of the preemptee CQ after the preemption occurs.
+- LessThanInitialShare: Only preempt if the share of the preemptor CQ will be strictly less
+than the share of the preemptee CQ before the preemption.
+The default strategy is ["LessThanOrEqualToFinalShare"].
+
## `Integrations` {#Integrations}
@@ -578,6 +626,18 @@ if the connection with its reserving worker cluster is lost.
+## `PreemptionStrategy` {#PreemptionStrategy}
+
+(Alias of `string`)
+
+**Appears in:**
+
+- [FairSharing](#FairSharing)
+
+
+
+
+
## `QueueVisibility` {#QueueVisibility}
diff --git a/site/content/en/docs/reference/kueue.v1beta1.md b/site/content/en/docs/reference/kueue.v1beta1.md
index 069a5cbff7..7a54ea1ed5 100644
--- a/site/content/en/docs/reference/kueue.v1beta1.md
+++ b/site/content/en/docs/reference/kueue.v1beta1.md
@@ -842,6 +842,13 @@ made.
+fairSharing [Required]
+FairSharing
+
+ fairSharing defines the properties of the ClusterQueue when participating in fair sharing.
+
@@ -920,6 +927,39 @@ status of the pending workloads in the cluster queue.
+## `FairSharing` {#kueue-x-k8s-io-v1beta1-FairSharing}
+
+**Appears in:**
+
+- [ClusterQueueSpec](#kueue-x-k8s-io-v1beta1-ClusterQueueSpec)
+
+FairSharing contains the properties of the ClusterQueue when participating in fair sharing.
+
+Field | Description |
+
+weight [Required]
+k8s.io/apimachinery/pkg/api/resource.Quantity
+
+ weight gives a comparative advantage to this ClusterQueue when competing for unused
+resources in the cohort against other ClusterQueues.
+The share of a ClusterQueue is based on the dominant resource usage above nominal
+quotas for each resource, divided by the weight.
+Admission prioritizes scheduling workloads from ClusterQueues with the lowest share
+and preempting workloads from the ClusterQueues with the highest share.
+A zero weight implies infinite share value, meaning that this ClusterQueue will always
+be at a disadvantage against other ClusterQueues.
+
## `FlavorFungibility` {#kueue-x-k8s-io-v1beta1-FlavorFungibility}
From 9f708276726de777373d79b257074e8fbf0a4a90 Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Fri, 26 Apr 2024 18:55:06 +0000
Subject: [PATCH 02/10] Validation and defaulting for fair sharing
Change-Id: If347eabdb17af643d46a1e4ad78b79e73c424011
---
apis/config/v1beta1/defaults.go | 2 +-
apis/config/v1beta1/defaults_test.go | 25 +++++++
apis/kueue/v1beta1/clusterqueue_types.go | 2 +
.../crd/kueue.x-k8s.io_clusterqueues.yaml | 6 +-
.../bases/kueue.x-k8s.io_clusterqueues.yaml | 6 +-
.../manager/controller_manager_config.yaml | 3 +
pkg/config/validation.go | 4 +-
pkg/config/validation_test.go | 4 +-
pkg/webhooks/clusterqueue_webhook.go | 14 ++++
.../en/docs/reference/kueue.v1beta1.md | 3 +-
test/integration/webhook/clusterqueue_test.go | 68 +++++++++++++++++++
11 files changed, 127 insertions(+), 10 deletions(-)
diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go
index 1b3d2d012e..fb8048b9bf 100644
--- a/apis/config/v1beta1/defaults.go
+++ b/apis/config/v1beta1/defaults.go
@@ -184,7 +184,7 @@ func SetDefaults_Configuration(cfg *Configuration) {
if cfg.MultiKueue.WorkerLostTimeout == nil {
cfg.MultiKueue.WorkerLostTimeout = &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout}
}
- if fs := cfg.FairSharing; fs != nil && fs.Enabled && len(fs.PreemptionStrategies) == 0 {
+ if fs := cfg.FairSharing; fs != nil && fs.Enable && len(fs.PreemptionStrategies) == 0 {
fs.PreemptionStrategies = []PreemptionStrategy{LessThanOrEqualToFinalShare}
}
}
diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go
index 3385303c99..f214c56263 100644
--- a/apis/config/v1beta1/defaults_test.go
+++ b/apis/config/v1beta1/defaults_test.go
@@ -550,6 +550,31 @@ func TestSetDefaults_Configuration(t *testing.T) {
},
},
},
+ "add default fair sharing configuration when enabled": {
+ original: &Configuration{
+ InternalCertManagement: &InternalCertManagement{
+ Enable: ptr.To(false),
+ },
+ FairSharing: &FairSharing{
+ Enable: true,
+ },
+ },
+ want: &Configuration{
+ Namespace: ptr.To(DefaultNamespace),
+ ControllerManager: defaultCtrlManagerConfigurationSpec,
+ InternalCertManagement: &InternalCertManagement{
+ Enable: ptr.To(false),
+ },
+ ClientConnection: defaultClientConnection,
+ Integrations: defaultIntegrations,
+ QueueVisibility: defaultQueueVisibility,
+ MultiKueue: defaultMultiKueue,
+ FairSharing: &FairSharing{
+ Enable: true,
+ PreemptionStrategies: []PreemptionStrategy{LessThanOrEqualToFinalShare},
+ },
+ },
+ },
}
for name, tc := range testCases {
diff --git a/apis/kueue/v1beta1/clusterqueue_types.go b/apis/kueue/v1beta1/clusterqueue_types.go
index 5bb323a802..c6532f526d 100644
--- a/apis/kueue/v1beta1/clusterqueue_types.go
+++ b/apis/kueue/v1beta1/clusterqueue_types.go
@@ -123,6 +123,7 @@ type ClusterQueueSpec struct {
StopPolicy *StopPolicy `json:"stopPolicy,omitempty"`
// fairSharing defines the properties of the ClusterQueue when participating in fair sharing.
+ // The values are only relevant if fair sharing is enabled in the Kueue configuration.
FairSharing *FairSharing `json:"fairSharing,omitempty"`
}
@@ -477,6 +478,7 @@ type FairSharing struct {
// and preempting workloads from the ClusterQueues with the highest share.
// A zero weight implies infinite share value, meaning that this ClusterQueue will always
// be at disadvantage against other ClusterQueues.
+ // +kubebuilder:default=1
Weight *resource.Quantity `json:"weight,omitempty"`
}
diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
index 31b8bb74a8..c9c092b231 100644
--- a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
+++ b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
@@ -134,13 +134,15 @@ spec:
pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$
type: string
fairSharing:
- description: fairSharing defines the properties of the ClusterQueue
- when participating in fair sharing.
+ description: |-
+ fairSharing defines the properties of the ClusterQueue when participating in fair sharing.
+ The values are only relevant if fair sharing is enabled in the Kueue configuration.
properties:
weight:
anyOf:
- type: integer
- type: string
+ default: 1
description: |-
weight gives a comparative advantage to this ClusterQueue when competing for unused
resources in the cohort against other ClusterQueues.
diff --git a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
index 65b14b5516..e0f28788d1 100644
--- a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
+++ b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
@@ -119,13 +119,15 @@ spec:
pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$
type: string
fairSharing:
- description: fairSharing defines the properties of the ClusterQueue
- when participating in fair sharing.
+ description: |-
+ fairSharing defines the properties of the ClusterQueue when participating in fair sharing.
+ The values are only relevant if fair sharing is enabled in the Kueue configuration.
properties:
weight:
anyOf:
- type: integer
- type: string
+ default: 1
description: |-
weight gives a comparative advantage to this ClusterQueue when competing for unused
resources in the cohort against other ClusterQueues.
diff --git a/config/components/manager/controller_manager_config.yaml b/config/components/manager/controller_manager_config.yaml
index 13e58ac5df..5e01ceb534 100644
--- a/config/components/manager/controller_manager_config.yaml
+++ b/config/components/manager/controller_manager_config.yaml
@@ -50,3 +50,6 @@ integrations:
# - key: kubernetes.io/metadata.name
# operator: NotIn
# values: [ kube-system, kueue-system ]
+fairSharing:
+ enable: true
+ preemptionStrategies: [LessThanOrEqualToFinalShare]
\ No newline at end of file
diff --git a/pkg/config/validation.go b/pkg/config/validation.go
index b42cbbf834..b4912c814c 100644
--- a/pkg/config/validation.go
+++ b/pkg/config/validation.go
@@ -183,14 +183,14 @@ func validateFairSharing(c *configapi.Configuration) field.ErrorList {
return nil
}
var allErrs field.ErrorList
- if !fs.Enabled && len(fs.PreemptionStrategies) != 0 {
+ if !fs.Enable && len(fs.PreemptionStrategies) != 0 {
allErrs = append(allErrs, field.Invalid(fsPreemptionStrategiesPath, fs.PreemptionStrategies, "Must be empty when fair sharing is disabled"))
}
strategies := sets.New[configapi.PreemptionStrategy]()
validStrategies := []configapi.PreemptionStrategy{configapi.LessThanInitialShare, configapi.LessThanOrEqualToFinalShare}
for i, s := range fs.PreemptionStrategies {
path := fsPreemptionStrategiesPath.Index(i)
- if slices.Index(validStrategies, s) == -1 {
+ if !slices.Contains(validStrategies, s) {
allErrs = append(allErrs, field.NotSupported(path, s, validStrategies))
}
if strategies.Has(s) {
diff --git a/pkg/config/validation_test.go b/pkg/config/validation_test.go
index 0106056f99..10d69c597f 100644
--- a/pkg/config/validation_test.go
+++ b/pkg/config/validation_test.go
@@ -367,7 +367,7 @@ func TestValidate(t *testing.T) {
cfg: &configapi.Configuration{
Integrations: defaultIntegrations,
FairSharing: &configapi.FairSharing{
- Enabled: true,
+ Enable: true,
PreemptionStrategies: []configapi.PreemptionStrategy{configapi.LessThanOrEqualToFinalShare, "UNKNOWN", configapi.LessThanInitialShare, configapi.LessThanOrEqualToFinalShare},
},
},
@@ -386,7 +386,7 @@ func TestValidate(t *testing.T) {
cfg: &configapi.Configuration{
Integrations: defaultIntegrations,
FairSharing: &configapi.FairSharing{
- Enabled: true,
+ Enable: true,
PreemptionStrategies: []configapi.PreemptionStrategy{configapi.LessThanOrEqualToFinalShare, configapi.LessThanInitialShare},
},
},
diff --git a/pkg/webhooks/clusterqueue_webhook.go b/pkg/webhooks/clusterqueue_webhook.go
index cc640a8652..b8793018fe 100644
--- a/pkg/webhooks/clusterqueue_webhook.go
+++ b/pkg/webhooks/clusterqueue_webhook.go
@@ -106,6 +106,9 @@ func ValidateClusterQueue(cq *kueue.ClusterQueue) field.ErrorList {
if cq.Spec.Preemption != nil {
allErrs = append(allErrs, validatePreemption(cq.Spec.Preemption, path.Child("preemption"))...)
}
+ if cq.Spec.FairSharing != nil {
+ allErrs = append(allErrs, validateFairSharing(cq.Spec.FairSharing, path.Child("fairSharing"))...)
+ }
return allErrs
}
@@ -215,3 +218,14 @@ func validateLendingLimit(lend, nominal resource.Quantity, fldPath *field.Path)
}
return allErrs
}
+
+func validateFairSharing(fs *kueue.FairSharing, fldPath *field.Path) field.ErrorList {
+ if fs == nil {
+ return nil
+ }
+ var allErrs field.ErrorList
+ if fs.Weight != nil && fs.Weight.Cmp(resource.Quantity{}) < 0 {
+ allErrs = append(allErrs, field.Invalid(fldPath, fs.Weight.String(), constants.IsNegativeErrorMsg))
+ }
+ return allErrs
+}
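
The check above compares the weight against the zero Quantity; a minimal sketch of that comparison (the sample values mirror the integration cases added below):

    package example

    import "k8s.io/apimachinery/pkg/api/resource"

    // isNegativeWeight reproduces the webhook's comparison: only weights strictly
    // below zero are rejected, so "0" and fractional values such as "0.1" pass.
    func isNegativeWeight(weight resource.Quantity) bool {
        return weight.Cmp(resource.Quantity{}) < 0
    }
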
diff --git a/site/content/en/docs/reference/kueue.v1beta1.md b/site/content/en/docs/reference/kueue.v1beta1.md
index 7a54ea1ed5..baa7d9bf6f 100644
--- a/site/content/en/docs/reference/kueue.v1beta1.md
+++ b/site/content/en/docs/reference/kueue.v1beta1.md
@@ -846,7 +846,8 @@ made.
FairSharing
- fairSharing defines the properties of the ClusterQueue when participating in fair sharing.
+ fairSharing defines the properties of the ClusterQueue when participating in fair sharing.
+The values are only relevant if fair sharing is enabled in the Kueue configuration.
diff --git a/test/integration/webhook/clusterqueue_test.go b/test/integration/webhook/clusterqueue_test.go
index 0f967c06ba..aee24337b5 100644
--- a/test/integration/webhook/clusterqueue_test.go
+++ b/test/integration/webhook/clusterqueue_test.go
@@ -21,6 +21,7 @@ import (
"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -134,6 +135,37 @@ var _ = ginkgo.Describe("ClusterQueue Webhook", func() {
},
},
),
+ ginkgo.Entry("Default fair sharing",
+ kueue.ClusterQueue{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "foo",
+ },
+ Spec: kueue.ClusterQueueSpec{
+ FairSharing: &kueue.FairSharing{},
+ },
+ },
+ kueue.ClusterQueue{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "foo",
+ Finalizers: []string{kueue.ResourceInUseFinalizerName},
+ },
+ Spec: kueue.ClusterQueueSpec{
+ QueueingStrategy: kueue.BestEffortFIFO,
+ StopPolicy: ptr.To(kueue.None),
+ FlavorFungibility: defaultFlavorFungibility,
+ Preemption: &kueue.ClusterQueuePreemption{
+ ReclaimWithinCohort: kueue.PreemptionPolicyNever,
+ WithinClusterQueue: kueue.PreemptionPolicyNever,
+ BorrowWithinCohort: &kueue.BorrowWithinCohort{
+ Policy: kueue.BorrowWithinCohortPolicyNever,
+ },
+ },
+ FairSharing: &kueue.FairSharing{
+ Weight: ptr.To(resource.MustParse("1")),
+ },
+ },
+ },
+ ),
)
ginkgo.It("Should have qualified flavor names when updating", func() {
@@ -532,6 +564,42 @@ var _ = ginkgo.Describe("ClusterQueue Webhook", func() {
},
},
isValid),
+ ginkgo.Entry("Should allow zero fair share weight",
+ &kueue.ClusterQueue{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "cluster-queue",
+ },
+ Spec: kueue.ClusterQueueSpec{
+ FairSharing: &kueue.FairSharing{
+ Weight: ptr.To(resource.MustParse("0")),
+ },
+ },
+ },
+ isValid),
+ ginkgo.Entry("Should allow fractional weight",
+ &kueue.ClusterQueue{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "cluster-queue",
+ },
+ Spec: kueue.ClusterQueueSpec{
+ FairSharing: &kueue.FairSharing{
+ Weight: ptr.To(resource.MustParse("0.1")),
+ },
+ },
+ },
+ isValid),
+ ginkgo.Entry("Should forbid negative weight",
+ &kueue.ClusterQueue{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "cluster-queue",
+ },
+ Spec: kueue.ClusterQueueSpec{
+ FairSharing: &kueue.FairSharing{
+ Weight: ptr.To(resource.MustParse("-1")),
+ },
+ },
+ },
+ isForbidden),
)
})
})
From 5e569377ea49cafce6f0c32a199aea32800f55d2 Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Tue, 30 Apr 2024 15:18:43 +0000
Subject: [PATCH 03/10] Implement multiple fair strategies
Change-Id: I6938d83399c55fecf952a570e2e431ad4ab479b2
---
cmd/kueue/main.go | 1 +
pkg/scheduler/preemption/preemption.go | 61 +++++++++++++++-----
pkg/scheduler/preemption/preemption_test.go | 64 +++++++++++++++++++--
pkg/scheduler/scheduler.go | 21 +++----
pkg/scheduler/scheduler_test.go | 2 +-
5 files changed, 117 insertions(+), 32 deletions(-)
diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go
index fb9ef9671b..b19c7b75ae 100644
--- a/cmd/kueue/main.go
+++ b/cmd/kueue/main.go
@@ -317,6 +317,7 @@ func setupScheduler(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager
mgr.GetClient(),
mgr.GetEventRecorderFor(constants.AdmissionName),
scheduler.WithPodsReadyRequeuingTimestamp(podsReadyRequeuingTimestamp(cfg)),
+ scheduler.WithFairSharing(cfg.FairSharing),
)
if err := mgr.Add(sched); err != nil {
setupLog.Error(err, "Unable to add scheduler to manager")
diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go
index 6cd9d3a0f7..70c569320b 100644
--- a/pkg/scheduler/preemption/preemption.go
+++ b/pkg/scheduler/preemption/preemption.go
@@ -33,6 +33,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
+ config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/scheduler/flavorassigner"
@@ -50,17 +51,19 @@ type Preemptor struct {
workloadOrdering workload.Ordering
enableFairSharing bool
+ fsStrategies []fsStrategy
// stubs
applyPreemption func(context.Context, *kueue.Workload, string, string) error
}
-func New(cl client.Client, workloadOrdering workload.Ordering, recorder record.EventRecorder, enableFairSharing bool) *Preemptor {
+func New(cl client.Client, workloadOrdering workload.Ordering, recorder record.EventRecorder, fs config.FairSharing) *Preemptor {
p := &Preemptor{
client: cl,
recorder: recorder,
workloadOrdering: workloadOrdering,
- enableFairSharing: enableFairSharing,
+ enableFairSharing: fs.Enable,
+ fsStrategies: parseStrategies(fs.PreemptionStrategies),
}
p.applyPreemption = p.applyPreemptionWithSSA
return p
@@ -117,7 +120,7 @@ func (p *Preemptor) GetTargets(wl workload.Info, assignment flavorassigner.Assig
borrowWithinCohort, thresholdPrio := canBorrowWithinCohort(cq, wl.Obj)
if p.enableFairSharing {
- return fairPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, thresholdPrio)
+ return p.fairPreemptions(&wl, assignment, snapshot, resPerFlv, candidates, thresholdPrio)
}
// There is a potential of preemption of workloads from the other queue in the
// cohort. We proceed with borrowing only if the dedicated policy
@@ -269,7 +272,38 @@ func restoreSnapshot(snapshot *cache.Snapshot, targets []*workload.Info) {
}
}
-func fairPreemptions(wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*workload.Info {
+type fsStrategy func(preemptorNewShare, preempteeOldShare, preempteeNewShare int) bool
+
+// lessThanOrEqualToFinalShare implements Rule S2-a in https://sigs.k8s.io/kueue/keps/1714-fair-sharing#choosing-workloads-from-clusterqueues-for-preemption
+func lessThanOrEqualToFinalShare(preemptorNewShare, _, preempteeNewShare int) bool {
+ return preemptorNewShare <= preempteeNewShare
+}
+
+// lessThanInitialShare implements rule S2-b in https://sigs.k8s.io/kueue/keps/1714-fair-sharing#choosing-workloads-from-clusterqueues-for-preemption
+func lessThanInitialShare(preemptorNewShare, preempteeOldShare, _ int) bool {
+ return preemptorNewShare < preempteeOldShare
+}
+
+// parseStrategies converts a list of strategies into the functions to be used by the algorithm.
+// This function takes advantage of the properties of the preemption algorithm and the strategies.
+// The number of functions returned might not match the length of the input slice.
+func parseStrategies(s []config.PreemptionStrategy) []fsStrategy {
+ result := []fsStrategy{lessThanOrEqualToFinalShare}
+ if len(s) == 0 {
+ return result
+ }
+ if s[0] == config.LessThanInitialShare {
+ result[0] = lessThanInitialShare
+		// This rule is a superset of the other rule, so there is no need to check further strategies.
+ return result
+ }
+ if len(s) == 1 {
+ return result
+ }
+ return append(result, lessThanInitialShare)
+}
+
+func (p *Preemptor) fairPreemptions(wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*workload.Info {
cqHeap := cqHeapFromCandidates(candidates, false, snapshot)
nominatedCQ := snapshot.ClusterQueues[wl.ClusterQueue]
wlReq := assignment.TotalRequestsFor(wl)
@@ -299,9 +333,8 @@ func fairPreemptions(wl *workload.Info, assignment flavorassigner.Assignment, sn
for i, candWl := range candCQ.workloads {
belowThreshold := allowBorrowingBelowPriority != nil && priority.Priority(candWl.Obj) < *allowBorrowingBelowPriority
- // Rule S2-a in https://sigs.k8s.io/kueue/keps/1714-fair-sharing#choosing-workloads-from-clusterqueues-for-preemption
newCandShareVal, _ := candCQ.cq.DominantResourceShareWithout(candWl)
- if belowThreshold || newNominatedShareValue <= newCandShareVal {
+ if belowThreshold || p.fsStrategies[0](newNominatedShareValue, candCQ.share, newCandShareVal) {
snapshot.RemoveWorkload(candWl)
targets = append(targets, candWl)
if workloadFits(wlReq, nominatedCQ, true) {
@@ -320,14 +353,13 @@ func fairPreemptions(wl *workload.Info, assignment flavorassigner.Assignment, sn
}
}
}
- if !fits {
- // Try rule S2-b in https://sigs.k8s.io/kueue/keps/1714-fair-sharing#choosing-workloads-from-clusterqueues-for-preemption
- // if rule S2-a was not enough.
+ if !fits && len(p.fsStrategies) > 1 {
+		// Try the next strategy if the previous one wasn't enough.
cqHeap = cqHeapFromCandidates(retryCandidates, true, snapshot)
for cqHeap.Len() > 0 && !fits {
candCQ := cqHeap.Pop()
- if newNominatedShareValue < candCQ.share {
+ if p.fsStrategies[1](newNominatedShareValue, candCQ.share, 0 /* irrelevant */) {
// The criteria doesn't depend on the preempted workload, so just preempt the first candidate.
candWl := candCQ.workloads[0]
snapshot.RemoveWorkload(candWl)
@@ -339,11 +371,10 @@ func fairPreemptions(wl *workload.Info, assignment flavorassigner.Assignment, sn
// it's possible to apply rule S2-b more than once in a CQ.
}
}
-
- if !fits {
- restoreSnapshot(snapshot, targets)
- return nil
- }
+ }
+ if !fits {
+ restoreSnapshot(snapshot, targets)
+ return nil
}
targets = fillBackWorkloads(targets, wlReq, nominatedCQ, snapshot, true)
restoreSnapshot(snapshot, targets)
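
Worked numbers contrasting the two strategy functions above (illustrative shares, not taken from any test in this series): with a preemptor new share of 4 and a preemptee share of 5 before and 3 after the candidate is removed, rule S2-a rejects the candidate while rule S2-b accepts it, which is why LessThanInitialShare admits strictly more preemptions:

    package example

    // demoStrategies evaluates both fair-sharing rules on one hypothetical candidate.
    func demoStrategies() (s2a, s2b bool) {
        preemptorNewShare, preempteeOldShare, preempteeNewShare := 4, 5, 3
        // lessThanOrEqualToFinalShare (rule S2-a): compare against the preemptee's
        // share after the hypothetical preemption: 4 <= 3 is false.
        s2a = preemptorNewShare <= preempteeNewShare
        // lessThanInitialShare (rule S2-b): compare against the preemptee's share
        // before the preemption: 4 < 5 is true.
        s2b = preemptorNewShare < preempteeOldShare
        return s2a, s2b
    }
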
diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go
index 93db8b152b..9b22b8f165 100644
--- a/pkg/scheduler/preemption/preemption_test.go
+++ b/pkg/scheduler/preemption/preemption_test.go
@@ -33,6 +33,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
+ config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
@@ -1402,7 +1403,7 @@ func TestPreemption(t *testing.T) {
broadcaster := record.NewBroadcaster()
scheme := runtime.NewScheme()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: constants.AdmissionName})
- preemptor := New(cl, workload.Ordering{}, recorder, false)
+ preemptor := New(cl, workload.Ordering{}, recorder, config.FairSharing{})
preemptor.applyPreemption = func(ctx context.Context, w *kueue.Workload, _, _ string) error {
lock.Lock()
gotPreempted.Insert(workload.Key(w))
@@ -1439,7 +1440,7 @@ func TestFairPreemptions(t *testing.T) {
flavors := []*kueue.ResourceFlavor{
utiltesting.MakeResourceFlavor("default").Obj(),
}
- clusterQueues := []*kueue.ClusterQueue{
+ baseCQs := []*kueue.ClusterQueue{
utiltesting.MakeClusterQueue("a").
Cohort("all").
ResourceGroup(*utiltesting.MakeFlavorQuotas("default").
@@ -1487,12 +1488,15 @@ func TestFairPreemptions(t *testing.T) {
}
unitWl := *utiltesting.MakeWorkload("unit", "").Request(corev1.ResourceCPU, "1")
cases := map[string]struct {
+ clusterQueues []*kueue.ClusterQueue
+ strategies []config.PreemptionStrategy
admitted []kueue.Workload
incoming *kueue.Workload
targetCQ string
wantPreempted sets.Set[string]
}{
"reclaim nominal from user using the most": {
+ clusterQueues: baseCQs,
admitted: []kueue.Workload{
*unitWl.Clone().Name("a1").SimpleReserveQuota("a", "default", now).Obj(),
*unitWl.Clone().Name("a2").SimpleReserveQuota("a", "default", now).Obj(),
@@ -1509,6 +1513,7 @@ func TestFairPreemptions(t *testing.T) {
wantPreempted: sets.New("/b1"),
},
"can reclaim from queue using less, if taking the latest workload from user using the most isn't enough": {
+ clusterQueues: baseCQs,
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("a1", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("a", "default", now).Obj(),
*utiltesting.MakeWorkload("a2", "").Request(corev1.ResourceCPU, "1").SimpleReserveQuota("a", "default", now).Obj(),
@@ -1520,6 +1525,7 @@ func TestFairPreemptions(t *testing.T) {
wantPreempted: sets.New("/a1"), // attempts to preempt b1, but it's not enough.
},
"reclaim borrowable quota from user using the most": {
+ clusterQueues: baseCQs,
admitted: []kueue.Workload{
*unitWl.Clone().Name("a1").SimpleReserveQuota("a", "default", now).Obj(),
*unitWl.Clone().Name("a2").SimpleReserveQuota("a", "default", now).Obj(),
@@ -1536,6 +1542,7 @@ func TestFairPreemptions(t *testing.T) {
wantPreempted: sets.New("/b1"),
},
"preempt one from each CQ borrowing": {
+ clusterQueues: baseCQs,
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("a1", "").Request(corev1.ResourceCPU, "0.5").SimpleReserveQuota("a", "default", now).Obj(),
*utiltesting.MakeWorkload("a2", "").Request(corev1.ResourceCPU, "0.5").SimpleReserveQuota("a", "default", now).Obj(),
@@ -1549,6 +1556,7 @@ func TestFairPreemptions(t *testing.T) {
wantPreempted: sets.New("/a1", "/b1"),
},
"can't preempt when everyone under nominal": {
+ clusterQueues: baseCQs,
admitted: []kueue.Workload{
*unitWl.Clone().Name("a1").SimpleReserveQuota("a", "default", now).Obj(),
*unitWl.Clone().Name("a2").SimpleReserveQuota("a", "default", now).Obj(),
@@ -1564,6 +1572,7 @@ func TestFairPreemptions(t *testing.T) {
targetCQ: "c",
},
"can't preempt when it would switch the imbalance": {
+ clusterQueues: baseCQs,
admitted: []kueue.Workload{
*unitWl.Clone().Name("a1").SimpleReserveQuota("a", "default", now).Obj(),
*unitWl.Clone().Name("a2").SimpleReserveQuota("a", "default", now).Obj(),
@@ -1578,6 +1587,7 @@ func TestFairPreemptions(t *testing.T) {
targetCQ: "a",
},
"can preempt lower priority workloads from same CQ": {
+ clusterQueues: baseCQs,
admitted: []kueue.Workload{
*unitWl.Clone().Name("a1_low").Priority(-1).SimpleReserveQuota("a", "default", now).Obj(),
*unitWl.Clone().Name("a2_low").Priority(-1).SimpleReserveQuota("a", "default", now).Obj(),
@@ -1594,6 +1604,7 @@ func TestFairPreemptions(t *testing.T) {
wantPreempted: sets.New("/a1_low", "/a2_low"),
},
"can preempt a combination of same CQ and highest user": {
+ clusterQueues: baseCQs,
admitted: []kueue.Workload{
*unitWl.Clone().Name("a_low").Priority(-1).SimpleReserveQuota("a", "default", now).Obj(),
*unitWl.Clone().Name("a2").SimpleReserveQuota("a", "default", now).Obj(),
@@ -1610,6 +1621,7 @@ func TestFairPreemptions(t *testing.T) {
wantPreempted: sets.New("/a_low", "/b1"),
},
"preempt huge workload if there is no other option, as long as the target CQ gets a lower share": {
+ clusterQueues: baseCQs,
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("b1", "").Request(corev1.ResourceCPU, "9").SimpleReserveQuota("b", "default", now).Obj(),
},
@@ -1618,6 +1630,7 @@ func TestFairPreemptions(t *testing.T) {
wantPreempted: sets.New("/b1"),
},
"can't preempt huge workload if the incoming is also huge": {
+ clusterQueues: baseCQs,
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("a1", "").Request(corev1.ResourceCPU, "2").SimpleReserveQuota("a", "default", now).Obj(),
*utiltesting.MakeWorkload("b1", "").Request(corev1.ResourceCPU, "7").SimpleReserveQuota("b", "default", now).Obj(),
@@ -1626,6 +1639,7 @@ func TestFairPreemptions(t *testing.T) {
targetCQ: "a",
},
"can't preempt 2 smaller workloads if the incoming is huge": {
+ clusterQueues: baseCQs,
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("b1", "").Request(corev1.ResourceCPU, "2").SimpleReserveQuota("b", "default", now).Obj(),
*utiltesting.MakeWorkload("b2", "").Request(corev1.ResourceCPU, "2").SimpleReserveQuota("b", "default", now).Obj(),
@@ -1635,9 +1649,11 @@ func TestFairPreemptions(t *testing.T) {
targetCQ: "a",
},
"preempt from target and others even if over nominal": {
+ clusterQueues: baseCQs,
+ strategies: []config.PreemptionStrategy{config.LessThanOrEqualToFinalShare, config.LessThanInitialShare},
admitted: []kueue.Workload{
- *utiltesting.MakeWorkload("a1_low", "").Priority(-1).Request(corev1.ResourceCPU, "2").SimpleReserveQuota("b", "default", now).Obj(),
- *utiltesting.MakeWorkload("a2_low", "").Priority(-1).Request(corev1.ResourceCPU, "1").SimpleReserveQuota("b", "default", now).Obj(),
+ *utiltesting.MakeWorkload("a1_low", "").Priority(-1).Request(corev1.ResourceCPU, "2").SimpleReserveQuota("a", "default", now).Obj(),
+ *utiltesting.MakeWorkload("a2_low", "").Priority(-1).Request(corev1.ResourceCPU, "1").SimpleReserveQuota("a", "default", now).Obj(),
*utiltesting.MakeWorkload("b1", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("b", "default", now).Obj(),
*utiltesting.MakeWorkload("b2", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("b", "default", now).Obj(),
},
@@ -1646,6 +1662,8 @@ func TestFairPreemptions(t *testing.T) {
wantPreempted: sets.New("/a1_low", "/b1"),
},
"prefer to preempt workloads that don't make the target CQ have the biggest share": {
+ clusterQueues: baseCQs,
+ strategies: []config.PreemptionStrategy{config.LessThanOrEqualToFinalShare, config.LessThanInitialShare},
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("b1", "").Request(corev1.ResourceCPU, "2").SimpleReserveQuota("b", "default", now).Obj(),
*utiltesting.MakeWorkload("b2", "").Request(corev1.ResourceCPU, "1").SimpleReserveQuota("b", "default", now).Obj(),
@@ -1658,6 +1676,8 @@ func TestFairPreemptions(t *testing.T) {
wantPreempted: sets.New("/b2"),
},
"preempt from different cluster queues if the end result has a smaller max share": {
+ clusterQueues: baseCQs,
+ strategies: []config.PreemptionStrategy{config.LessThanOrEqualToFinalShare, config.LessThanInitialShare},
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("b1", "").Request(corev1.ResourceCPU, "2").SimpleReserveQuota("b", "default", now).Obj(),
*utiltesting.MakeWorkload("b2", "").Request(corev1.ResourceCPU, "2.5").SimpleReserveQuota("b", "default", now).Obj(),
@@ -1669,6 +1689,8 @@ func TestFairPreemptions(t *testing.T) {
wantPreempted: sets.New("/b1", "/c1"),
},
"scenario above does not flap": {
+ clusterQueues: baseCQs,
+ strategies: []config.PreemptionStrategy{config.LessThanOrEqualToFinalShare, config.LessThanInitialShare},
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("a1", "").Request(corev1.ResourceCPU, "3.5").SimpleReserveQuota("a", "default", now).Obj(),
*utiltesting.MakeWorkload("b2", "").Request(corev1.ResourceCPU, "2.5").SimpleReserveQuota("b", "default", now).Obj(),
@@ -1678,6 +1700,8 @@ func TestFairPreemptions(t *testing.T) {
targetCQ: "b",
},
"cannot preempt if it would make the candidate CQ go under nominal after preempting one element": {
+ clusterQueues: baseCQs,
+ strategies: []config.PreemptionStrategy{config.LessThanOrEqualToFinalShare, config.LessThanInitialShare},
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("b1", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("b", "default", now).Obj(),
*utiltesting.MakeWorkload("b2", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("b", "default", now).Obj(),
@@ -1687,6 +1711,7 @@ func TestFairPreemptions(t *testing.T) {
targetCQ: "a",
},
"workloads under priority threshold can always be preempted": {
+ clusterQueues: baseCQs,
admitted: []kueue.Workload{
*unitWl.Clone().Name("a1").SimpleReserveQuota("a", "default", now).Obj(),
*unitWl.Clone().Name("a2").SimpleReserveQuota("a", "default", now).Obj(),
@@ -1702,6 +1727,30 @@ func TestFairPreemptions(t *testing.T) {
targetCQ: "a",
wantPreempted: sets.New("/preemptible1", "/preemptible2"),
},
+ "preempt lower priority first, even if big": {
+ clusterQueues: baseCQs,
+ strategies: []config.PreemptionStrategy{config.LessThanInitialShare},
+ admitted: []kueue.Workload{
+ *utiltesting.MakeWorkload("a1", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("a", "default", now).Obj(),
+ *utiltesting.MakeWorkload("b_low", "").Priority(0).Request(corev1.ResourceCPU, "5").SimpleReserveQuota("b", "default", now).Obj(),
+ *utiltesting.MakeWorkload("b_high", "").Priority(1).Request(corev1.ResourceCPU, "1").SimpleReserveQuota("b", "default", now).Obj(),
+ },
+ incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "1").Obj(),
+ targetCQ: "a",
+ wantPreempted: sets.New("/b_low"),
+ },
+ "preempt workload that doesn't transfer the imbalance, even if high priority": {
+ clusterQueues: baseCQs,
+ strategies: []config.PreemptionStrategy{config.LessThanOrEqualToFinalShare},
+ admitted: []kueue.Workload{
+ *utiltesting.MakeWorkload("a1", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("a", "default", now).Obj(),
+ *utiltesting.MakeWorkload("b_low", "").Priority(0).Request(corev1.ResourceCPU, "5").SimpleReserveQuota("b", "default", now).Obj(),
+ *utiltesting.MakeWorkload("b_high", "").Priority(1).Request(corev1.ResourceCPU, "1").SimpleReserveQuota("b", "default", now).Obj(),
+ },
+ incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "1").Obj(),
+ targetCQ: "a",
+ wantPreempted: sets.New("/b_high"),
+ },
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
@@ -1717,7 +1766,7 @@ func TestFairPreemptions(t *testing.T) {
for _, flv := range flavors {
cqCache.AddOrUpdateResourceFlavor(flv)
}
- for _, cq := range clusterQueues {
+ for _, cq := range tc.clusterQueues {
if err := cqCache.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Couldn't add ClusterQueue to cache: %v", err)
}
@@ -1726,7 +1775,10 @@ func TestFairPreemptions(t *testing.T) {
broadcaster := record.NewBroadcaster()
scheme := runtime.NewScheme()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: constants.AdmissionName})
- preemptor := New(cl, workload.Ordering{}, recorder, true)
+ preemptor := New(cl, workload.Ordering{}, recorder, config.FairSharing{
+ Enable: true,
+ PreemptionStrategies: tc.strategies,
+ })
snapshot := cqCache.Snapshot()
wlInfo := workload.NewInfo(tc.incoming)
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index d14dbc38e0..8b112ee526 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -59,8 +59,6 @@ const (
)
type Scheduler struct {
- enableFairSharing bool
-
queues *queue.Manager
cache *cache.Cache
client client.Client
@@ -68,6 +66,7 @@ type Scheduler struct {
admissionRoutineWrapper routine.Wrapper
preemptor *preemption.Preemptor
workloadOrdering workload.Ordering
+ fairSharing config.FairSharing
// Stubs.
applyAdmission func(context.Context, *kueue.Workload) error
@@ -75,7 +74,7 @@ type Scheduler struct {
type options struct {
podsReadyRequeuingTimestamp config.RequeuingTimestamp
- enableFairSharing bool
+ fairSharing config.FairSharing
}
// Option configures the reconciler.
@@ -93,9 +92,11 @@ func WithPodsReadyRequeuingTimestamp(ts config.RequeuingTimestamp) Option {
}
}
-func WithFairSharing(enable bool) Option {
+func WithFairSharing(fs *config.FairSharing) Option {
return func(o *options) {
- o.enableFairSharing = enable
+ if fs != nil {
+ o.fairSharing = *fs
+ }
}
}
@@ -108,12 +109,12 @@ func New(queues *queue.Manager, cache *cache.Cache, cl client.Client, recorder r
PodsReadyRequeuingTimestamp: options.podsReadyRequeuingTimestamp,
}
s := &Scheduler{
- enableFairSharing: options.enableFairSharing,
+ fairSharing: options.fairSharing,
queues: queues,
cache: cache,
client: cl,
recorder: recorder,
- preemptor: preemption.New(cl, wo, recorder, options.enableFairSharing),
+ preemptor: preemption.New(cl, wo, recorder, options.fairSharing),
admissionRoutineWrapper: routine.DefaultWrapper,
workloadOrdering: wo,
}
@@ -201,7 +202,7 @@ func (s *Scheduler) schedule(ctx context.Context) {
// 4. Sort entries based on borrowing, priorities (if enabled) and timestamps.
sort.Sort(entryOrdering{
- enableFairSharing: s.enableFairSharing,
+ enableFairSharing: s.fairSharing.Enable,
entries: entries,
workloadOrdering: s.workloadOrdering,
})
@@ -356,7 +357,7 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna
e.assignment, e.preemptionTargets = s.getAssignments(log, &e.Info, &snap)
e.inadmissibleMsg = e.assignment.Message()
e.Info.LastAssignment = &e.assignment.LastState
- if s.enableFairSharing {
+ if s.fairSharing.Enable {
e.dominantResourceShare, e.dominantResourceName = cq.DominantResourceShareWith(e.assignment.TotalRequestsFor(&w))
}
}
@@ -404,7 +405,7 @@ type partialAssignment struct {
func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cache.Snapshot) (flavorassigner.Assignment, []*workload.Info) {
cq := snap.ClusterQueues[wl.ClusterQueue]
- flvAssigner := flavorassigner.New(wl, cq, snap.ResourceFlavors, s.enableFairSharing)
+ flvAssigner := flavorassigner.New(wl, cq, snap.ResourceFlavors, s.fairSharing.Enable)
fullAssignment := flvAssigner.Assign(log, nil)
var faPreemtionTargets []*workload.Info
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index f7e79ea8ef..c0489d347c 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -1697,7 +1697,7 @@ func TestSchedule(t *testing.T) {
t.Errorf("couldn't create the cluster queue: %v", err)
}
}
- scheduler := New(qManager, cqCache, cl, recorder, WithFairSharing(tc.enableFairSharing))
+ scheduler := New(qManager, cqCache, cl, recorder, WithFairSharing(&config.FairSharing{Enable: tc.enableFairSharing}))
gotScheduled := make(map[string]kueue.Admission)
var mu sync.Mutex
scheduler.applyAdmission = func(ctx context.Context, w *kueue.Workload) error {
From 6ca85067c8219f766be4ee2399803267dd0c1b1d Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Fri, 3 May 2024 16:55:18 +0000
Subject: [PATCH 04/10] Implement fair sharing weight
Change-Id: I8b49d21f0b0e7a7d2607d9589ba49a4a914cd2aa
---
pkg/cache/cache_test.go | 66 +++++++++-
pkg/cache/clusterqueue.go | 13 ++
pkg/cache/clusterqueue_test.go | 100 ++++++++++++++++
pkg/cache/snapshot.go | 1 +
pkg/cache/snapshot_test.go | 48 ++++++++
pkg/scheduler/preemption/preemption_test.go | 126 ++++++++++++++++++++
pkg/util/testing/wrappers.go | 8 ++
7 files changed, 359 insertions(+), 3 deletions(-)
diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go
index 77c85dfadb..dc70de7719 100644
--- a/pkg/cache/cache_test.go
+++ b/pkg/cache/cache_test.go
@@ -132,6 +132,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"b": {
Name: "b",
@@ -161,6 +162,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"c": {
Name: "c",
@@ -171,6 +173,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Usage: FlavorResourceQuantities{},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"d": {
Name: "d",
@@ -181,6 +184,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Usage: FlavorResourceQuantities{},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"e": {
Name: "e",
@@ -209,6 +213,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
Status: pending,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"f": {
Name: "f",
@@ -222,6 +227,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
WhenCanBorrow: kueue.TryNextFlavor,
WhenCanPreempt: kueue.TryNextFlavor,
},
+ FairWeight: oneQuantity,
},
},
wantCohorts: map[string]sets.Set[string]{
@@ -252,6 +258,28 @@ func TestCacheClusterQueueOperations(t *testing.T) {
ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority,
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
},
+ FairWeight: oneQuantity,
+ },
+ },
+ },
+ {
+ name: "add ClusterQueue with fair sharing weight",
+ operation: func(cache *Cache) error {
+ cq := utiltesting.MakeClusterQueue("foo").FairWeight(resource.MustParse("2")).Obj()
+ if err := cache.AddClusterQueue(context.Background(), cq); err != nil {
+ return fmt.Errorf("Failed to add ClusterQueue: %w", err)
+ }
+ return nil
+ },
+ wantClusterQueues: map[string]*ClusterQueue{
+ "foo": {
+ Name: "foo",
+ AllocatableResourceGeneration: 1,
+ NamespaceSelector: labels.Everything(),
+ Status: active,
+ FlavorFungibility: defaultFlavorFungibility,
+ Preemption: defaultPreemption,
+ FairWeight: resource.MustParse("2"),
},
},
},
@@ -299,6 +327,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"b": {
Name: "b",
@@ -328,6 +357,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"c": {
Name: "c",
@@ -338,6 +368,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Usage: FlavorResourceQuantities{},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"d": {
Name: "d",
@@ -348,6 +379,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Usage: FlavorResourceQuantities{},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"e": {
Name: "e",
@@ -378,6 +410,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
Status: pending,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"f": {
Name: "f",
@@ -391,6 +424,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
WhenCanBorrow: kueue.TryNextFlavor,
WhenCanPreempt: kueue.TryNextFlavor,
},
+ FairWeight: oneQuantity,
},
},
wantCohorts: map[string]sets.Set[string]{
@@ -465,6 +499,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"b": {
Name: "b",
@@ -475,6 +510,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Usage: FlavorResourceQuantities{},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"c": {
Name: "c",
@@ -485,6 +521,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Usage: FlavorResourceQuantities{},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"d": {
Name: "d",
@@ -495,6 +532,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Usage: FlavorResourceQuantities{},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"e": {
Name: "e",
@@ -531,6 +569,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"f": {
Name: "f",
@@ -544,6 +583,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
WhenCanBorrow: kueue.TryNextFlavor,
WhenCanPreempt: kueue.TryNextFlavor,
},
+ FairWeight: oneQuantity,
},
},
wantCohorts: map[string]sets.Set[string]{
@@ -595,6 +635,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"c": {
Name: "c",
@@ -605,6 +646,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Usage: FlavorResourceQuantities{},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"e": {
Name: "e",
@@ -635,6 +677,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
Status: pending,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"f": {
Name: "f",
@@ -648,6 +691,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
WhenCanBorrow: kueue.TryNextFlavor,
WhenCanPreempt: kueue.TryNextFlavor,
},
+ FairWeight: oneQuantity,
},
},
wantCohorts: map[string]sets.Set[string]{
@@ -697,6 +741,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"b": {
Name: "b",
@@ -726,6 +771,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"c": {
Name: "c",
@@ -736,6 +782,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Usage: FlavorResourceQuantities{},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"d": {
Name: "d",
@@ -746,6 +793,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Usage: FlavorResourceQuantities{},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"e": {
Name: "e",
@@ -774,6 +822,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
Status: active,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"f": {
Name: "f",
@@ -787,6 +836,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
WhenCanBorrow: kueue.TryNextFlavor,
WhenCanPreempt: kueue.TryNextFlavor,
},
+ FairWeight: oneQuantity,
},
},
wantCohorts: map[string]sets.Set[string]{
@@ -902,6 +952,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
},
Status: pending,
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
},
},
@@ -929,6 +980,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
"check1": sets.New[kueue.ResourceFlavorReference](),
"check2": sets.New[kueue.ResourceFlavorReference](),
},
+ FairWeight: oneQuantity,
},
},
wantCohorts: map[string]sets.Set[string]{},
@@ -959,7 +1011,9 @@ func TestCacheClusterQueueOperations(t *testing.T) {
AdmissionChecks: map[string]sets.Set[kueue.ResourceFlavorReference]{
"check1": sets.New[kueue.ResourceFlavorReference](),
"check2": sets.New[kueue.ResourceFlavorReference](),
- }},
+ },
+ FairWeight: oneQuantity,
+ },
},
wantCohorts: map[string]sets.Set[string]{},
},
@@ -990,7 +1044,9 @@ func TestCacheClusterQueueOperations(t *testing.T) {
AdmissionChecks: map[string]sets.Set[kueue.ResourceFlavorReference]{
"check1": sets.New[kueue.ResourceFlavorReference](),
"check2": sets.New[kueue.ResourceFlavorReference](),
- }},
+ },
+ FairWeight: oneQuantity,
+ },
},
wantCohorts: map[string]sets.Set[string]{},
},
@@ -1021,7 +1077,9 @@ func TestCacheClusterQueueOperations(t *testing.T) {
AdmissionChecks: map[string]sets.Set[kueue.ResourceFlavorReference]{
"check1": sets.New[kueue.ResourceFlavorReference](),
"check2": sets.New[kueue.ResourceFlavorReference](),
- }},
+ },
+ FairWeight: oneQuantity,
+ },
},
wantCohorts: map[string]sets.Set[string]{},
},
@@ -1072,6 +1130,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Lendable: map[corev1.ResourceName]int64{
corev1.ResourceCPU: 10_000,
},
+ FairWeight: oneQuantity,
Workloads: map[string]*workload.Info{
"ns/reserving": {
ClusterQueue: "cq1",
@@ -1164,6 +1223,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Preemption: defaultPreemption,
AllocatableResourceGeneration: 1,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
GuaranteedQuota: FlavorResourceQuantities{
"on-demand": {
corev1.ResourceCPU: 2_000,
diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go
index 341465a7fa..4647e2cc16 100644
--- a/pkg/cache/clusterqueue.go
+++ b/pkg/cache/clusterqueue.go
@@ -19,11 +19,13 @@ package cache
import (
"errors"
"fmt"
+ "math"
"strings"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apimeta "k8s.io/apimachinery/pkg/api/meta"
+ "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
@@ -38,6 +40,7 @@ import (
var (
errQueueAlreadyExists = errors.New("queue already exists")
+ oneQuantity = resource.MustParse("1")
)
// ClusterQueue is the internal implementation of kueue.ClusterQueue that
@@ -52,6 +55,7 @@ type ClusterQueue struct {
WorkloadsNotReady sets.Set[string]
NamespaceSelector labels.Selector
Preemption kueue.ClusterQueuePreemption
+ FairWeight resource.Quantity
FlavorFungibility kueue.FlavorFungibility
// Aggregates AdmissionChecks from both .spec.AdmissionChecks and .spec.AdmissionCheckStrategy
// Sets hold ResourceFlavors to which an AdmissionCheck should apply.
@@ -218,6 +222,11 @@ func (c *ClusterQueue) update(in *kueue.ClusterQueue, resourceFlavors map[kueue.
c.FlavorFungibility = defaultFlavorFungibility
}
+ c.FairWeight = oneQuantity
+ if fs := in.Spec.FairSharing; fs != nil && fs.Weight != nil {
+ c.FairWeight = *fs.Weight
+ }
+
if features.Enabled(features.LendingLimit) {
var guaranteedQuota FlavorResourceQuantities
for _, rg := range c.ResourceGroups {
@@ -677,6 +686,9 @@ func (c *ClusterQueue) dominantResourceShare(wlReq FlavorResourceQuantities, m i
if c.Cohort == nil {
return 0, ""
}
+ if c.FairWeight.Cmp(resource.Quantity{}) == 0 {
+ return math.MaxInt, ""
+ }
borrowing := make(map[corev1.ResourceName]int64)
for _, rg := range c.ResourceGroups {
@@ -705,5 +717,6 @@ func (c *ClusterQueue) dominantResourceShare(wlReq FlavorResourceQuantities, m i
}
}
}
+ drs = drs * 1000 / c.FairWeight.MilliValue()
return int(drs), dRes
}
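The weighting above folds FairWeight into the dominant resource share: the raw per-mille share is multiplied by 1000 and divided by the weight's milli-value, so a weight of "2" halves a CQ's reported share and "0.5" doubles it, while a zero weight short-circuits to math.MaxInt (the CQ always compares as the heaviest borrower). A worked sketch using the numbers from the tests below:

    // Borrowing 2 GPUs out of 10 lendable in the cohort.
    share := int64(2) * 1000 / 10 // 200 per-mille, before weighting

    w := resource.MustParse("2")
    _ = share * 1000 / w.MilliValue() // 200*1000/2000 = 100

    half := resource.MustParse("0.5")
    _ = share * 1000 / half.MilliValue() // 200*1000/500 = 400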
diff --git a/pkg/cache/clusterqueue_test.go b/pkg/cache/clusterqueue_test.go
index 427cfa4b25..4973111d1e 100644
--- a/pkg/cache/clusterqueue_test.go
+++ b/pkg/cache/clusterqueue_test.go
@@ -17,10 +17,12 @@ limitations under the License.
package cache
import (
+ "math"
"testing"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
@@ -787,6 +789,7 @@ func TestDominantResourceShare(t *testing.T) {
}{
"no cohort": {
cq: ClusterQueue{
+ FairWeight: oneQuantity,
Usage: FlavorResourceQuantities{
"default": {
corev1.ResourceCPU: 1_000,
@@ -814,6 +817,7 @@ func TestDominantResourceShare(t *testing.T) {
},
"usage below nominal": {
cq: ClusterQueue{
+ FairWeight: oneQuantity,
Usage: FlavorResourceQuantities{
"default": {
corev1.ResourceCPU: 1_000,
@@ -847,6 +851,7 @@ func TestDominantResourceShare(t *testing.T) {
},
"usage above nominal": {
cq: ClusterQueue{
+ FairWeight: oneQuantity,
Usage: FlavorResourceQuantities{
"default": {
corev1.ResourceCPU: 3_000,
@@ -882,6 +887,7 @@ func TestDominantResourceShare(t *testing.T) {
},
"one resource above nominal": {
cq: ClusterQueue{
+ FairWeight: oneQuantity,
Usage: FlavorResourceQuantities{
"default": {
corev1.ResourceCPU: 3_000,
@@ -917,6 +923,7 @@ func TestDominantResourceShare(t *testing.T) {
},
"usage with workload above nominal": {
cq: ClusterQueue{
+ FairWeight: oneQuantity,
Usage: FlavorResourceQuantities{
"default": {
corev1.ResourceCPU: 1_000,
@@ -958,6 +965,7 @@ func TestDominantResourceShare(t *testing.T) {
},
"A resource with zero lendable": {
cq: ClusterQueue{
+ FairWeight: oneQuantity,
Usage: FlavorResourceQuantities{
"default": {
corev1.ResourceCPU: 1_000,
@@ -1000,6 +1008,7 @@ func TestDominantResourceShare(t *testing.T) {
},
"multiple flavors": {
cq: ClusterQueue{
+ FairWeight: oneQuantity,
Usage: FlavorResourceQuantities{
"on-demand": {
corev1.ResourceCPU: 15_000,
@@ -1044,6 +1053,97 @@ func TestDominantResourceShare(t *testing.T) {
wantDRName: corev1.ResourceCPU,
wantDRValue: 25, // ((15+10-20)+0)*1000/200 (spot under nominal)
},
+ "above nominal with integer weight": {
+ cq: ClusterQueue{
+ FairWeight: resource.MustParse("2"),
+ Usage: FlavorResourceQuantities{
+ "default": {
+ "example.com/gpu": 7,
+ },
+ },
+ ResourceGroups: []ResourceGroup{
+ {
+ Flavors: []FlavorQuotas{
+ {
+ Name: "default",
+ Resources: map[corev1.ResourceName]*ResourceQuota{
+ "example.com/gpu": {
+ Nominal: 5,
+ },
+ },
+ },
+ },
+ },
+ },
+ Cohort: &Cohort{
+ Lendable: map[corev1.ResourceName]int64{
+ "example.com/gpu": 10,
+ },
+ },
+ },
+ wantDRName: "example.com/gpu",
+ wantDRValue: 100, // ((7-5)*1000/10)/2
+ },
+ "above nominal with decimal weight": {
+ cq: ClusterQueue{
+ FairWeight: resource.MustParse("0.5"),
+ Usage: FlavorResourceQuantities{
+ "default": {
+ "example.com/gpu": 7,
+ },
+ },
+ ResourceGroups: []ResourceGroup{
+ {
+ Flavors: []FlavorQuotas{
+ {
+ Name: "default",
+ Resources: map[corev1.ResourceName]*ResourceQuota{
+ "example.com/gpu": {
+ Nominal: 5,
+ },
+ },
+ },
+ },
+ },
+ },
+ Cohort: &Cohort{
+ Lendable: map[corev1.ResourceName]int64{
+ "example.com/gpu": 10,
+ },
+ },
+ },
+ wantDRName: "example.com/gpu",
+ wantDRValue: 400, // ((7-5)*1000/10)/(1/2)
+ },
+ "above nominal with zero weight": {
+ cq: ClusterQueue{
+ Usage: FlavorResourceQuantities{
+ "default": {
+ "example.com/gpu": 7,
+ },
+ },
+ ResourceGroups: []ResourceGroup{
+ {
+ Flavors: []FlavorQuotas{
+ {
+ Name: "default",
+ Resources: map[corev1.ResourceName]*ResourceQuota{
+ "example.com/gpu": {
+ Nominal: 5,
+ },
+ },
+ },
+ },
+ },
+ },
+ Cohort: &Cohort{
+ Lendable: map[corev1.ResourceName]int64{
+ "example.com/gpu": 10,
+ },
+ },
+ },
+ wantDRValue: math.MaxInt,
+ },
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
diff --git a/pkg/cache/snapshot.go b/pkg/cache/snapshot.go
index c7da94daaa..a83ade093d 100644
--- a/pkg/cache/snapshot.go
+++ b/pkg/cache/snapshot.go
@@ -133,6 +133,7 @@ func (c *ClusterQueue) snapshot() *ClusterQueue {
ResourceGroups: c.ResourceGroups, // Shallow copy is enough.
RGByResource: c.RGByResource, // Shallow copy is enough.
FlavorFungibility: c.FlavorFungibility,
+ FairWeight: c.FairWeight,
AllocatableResourceGeneration: c.AllocatableResourceGeneration,
Usage: make(FlavorResourceQuantities, len(c.Usage)),
Lendable: maps.Clone(c.Lendable),
diff --git a/pkg/cache/snapshot_test.go b/pkg/cache/snapshot_test.go
index 3a869f3aa5..0fc3c5bb33 100644
--- a/pkg/cache/snapshot_test.go
+++ b/pkg/cache/snapshot_test.go
@@ -23,6 +23,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
@@ -76,6 +77,7 @@ func TestSnapshot(t *testing.T) {
ReserveQuota(&kueue.Admission{ClusterQueue: "a"}).Obj()),
},
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
"b": {
Name: "b",
@@ -89,6 +91,7 @@ func TestSnapshot(t *testing.T) {
ReserveQuota(&kueue.Admission{ClusterQueue: "b"}).Obj()),
},
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
},
},
},
@@ -273,6 +276,7 @@ func TestSnapshot(t *testing.T) {
Obj()),
},
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
NamespaceSelector: labels.Everything(),
Status: active,
},
@@ -340,6 +344,7 @@ func TestSnapshot(t *testing.T) {
Obj()),
},
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
NamespaceSelector: labels.Everything(),
Status: active,
},
@@ -367,6 +372,7 @@ func TestSnapshot(t *testing.T) {
corev1.ResourceCPU: 100_000,
},
Preemption: defaultPreemption,
+ FairWeight: oneQuantity,
NamespaceSelector: labels.Everything(),
Status: active,
},
@@ -400,6 +406,26 @@ func TestSnapshot(t *testing.T) {
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
},
+ FairWeight: oneQuantity,
+ },
+ },
+ },
+ },
+ "clusterQueue with fair sharing weight": {
+ cqs: []*kueue.ClusterQueue{
+ utiltesting.MakeClusterQueue("with-preemption").FairWeight(resource.MustParse("3")).Obj(),
+ },
+ wantSnapshot: Snapshot{
+ ClusterQueues: map[string]*ClusterQueue{
+ "with-preemption": {
+ Name: "with-preemption",
+ NamespaceSelector: labels.Everything(),
+ AllocatableResourceGeneration: 1,
+ Status: active,
+ Workloads: map[string]*workload.Info{},
+ FlavorFungibility: defaultFlavorFungibility,
+ Preemption: defaultPreemption,
+ FairWeight: resource.MustParse("3"),
},
},
},
@@ -508,6 +534,7 @@ func TestSnapshot(t *testing.T) {
},
},
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
Usage: FlavorResourceQuantities{
"arm": {corev1.ResourceCPU: 15_000},
"x86": {corev1.ResourceCPU: 10_000},
@@ -583,6 +610,7 @@ func TestSnapshot(t *testing.T) {
},
},
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
Usage: FlavorResourceQuantities{
"arm": {corev1.ResourceCPU: 0},
"x86": {corev1.ResourceCPU: 0},
@@ -750,6 +778,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["c1"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 0},
@@ -767,6 +796,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["c2"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 0},
@@ -808,6 +838,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) {
AllocatableResourceGeneration: 1,
ResourceGroups: cqCache.clusterQueues["c1"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 0},
"alpha": {corev1.ResourceMemory: utiltesting.Gi},
@@ -827,6 +858,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) {
},
ResourceGroups: cqCache.clusterQueues["c2"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 2_000},
@@ -868,6 +900,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) {
AllocatableResourceGeneration: 1,
ResourceGroups: cqCache.clusterQueues["c1"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 1_000},
"alpha": {corev1.ResourceMemory: 0},
@@ -888,6 +921,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) {
AllocatableResourceGeneration: 1,
ResourceGroups: cqCache.clusterQueues["c2"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 2_000},
},
@@ -1019,6 +1053,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["lend-a"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 0},
@@ -1038,6 +1073,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["lend-b"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 0},
@@ -1077,6 +1113,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["lend-a"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 7_000},
@@ -1096,6 +1133,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["lend-b"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 4_000},
@@ -1135,6 +1173,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["lend-a"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 6_000},
@@ -1154,6 +1193,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["lend-b"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 4_000},
@@ -1193,6 +1233,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["lend-a"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 1_000},
@@ -1212,6 +1253,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["lend-b"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 4_000},
@@ -1252,6 +1294,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["lend-a"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 1_000},
@@ -1271,6 +1314,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["lend-b"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 0},
@@ -1311,6 +1355,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["lend-a"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 6_000},
@@ -1330,6 +1375,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["lend-b"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 0},
@@ -1370,6 +1416,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["lend-a"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 9_000},
@@ -1389,6 +1436,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
Workloads: make(map[string]*workload.Info),
ResourceGroups: cqCache.clusterQueues["lend-b"].ResourceGroups,
FlavorFungibility: defaultFlavorFungibility,
+ FairWeight: oneQuantity,
AllocatableResourceGeneration: 1,
Usage: FlavorResourceQuantities{
"default": {corev1.ResourceCPU: 0},
diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go
index 9b22b8f165..d425da9033 100644
--- a/pkg/scheduler/preemption/preemption_test.go
+++ b/pkg/scheduler/preemption/preemption_test.go
@@ -26,6 +26,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
@@ -1751,6 +1752,131 @@ func TestFairPreemptions(t *testing.T) {
targetCQ: "a",
wantPreempted: sets.New("/b_high"),
},
+ "CQ with higher weight can preempt more": {
+ clusterQueues: []*kueue.ClusterQueue{
+ utiltesting.MakeClusterQueue("a").
+ Cohort("all").
+ ResourceGroup(*utiltesting.MakeFlavorQuotas("default").
+ Resource(corev1.ResourceCPU, "3").Obj()).
+ Preemption(kueue.ClusterQueuePreemption{
+ WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
+ ReclaimWithinCohort: kueue.PreemptionPolicyAny,
+ }).
+ FairWeight(resource.MustParse("2")).
+ Obj(),
+ utiltesting.MakeClusterQueue("b").
+ Cohort("all").
+ ResourceGroup(*utiltesting.MakeFlavorQuotas("default").
+ Resource(corev1.ResourceCPU, "3").Obj()).
+ Preemption(kueue.ClusterQueuePreemption{
+ WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
+ ReclaimWithinCohort: kueue.PreemptionPolicyAny,
+ }).
+ Obj(),
+ utiltesting.MakeClusterQueue("c").
+ Cohort("all").
+ ResourceGroup(*utiltesting.MakeFlavorQuotas("default").
+ Resource(corev1.ResourceCPU, "3").Obj()).
+ Preemption(kueue.ClusterQueuePreemption{
+ WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
+ ReclaimWithinCohort: kueue.PreemptionPolicyAny,
+ }).
+ Obj(),
+ },
+ admitted: []kueue.Workload{
+ *unitWl.Clone().Name("a1").SimpleReserveQuota("a", "default", now).Obj(),
+ *unitWl.Clone().Name("a2").SimpleReserveQuota("a", "default", now).Obj(),
+ *unitWl.Clone().Name("a3").SimpleReserveQuota("a", "default", now).Obj(),
+ *unitWl.Clone().Name("b1").SimpleReserveQuota("b", "default", now).Obj(),
+ *unitWl.Clone().Name("b2").SimpleReserveQuota("b", "default", now).Obj(),
+ *unitWl.Clone().Name("b3").SimpleReserveQuota("b", "default", now).Obj(),
+ *unitWl.Clone().Name("b4").SimpleReserveQuota("b", "default", now).Obj(),
+ *unitWl.Clone().Name("b5").SimpleReserveQuota("b", "default", now).Obj(),
+ *unitWl.Clone().Name("b6").SimpleReserveQuota("b", "default", now).Obj(),
+ },
+ incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "2").Obj(),
+ targetCQ: "a",
+ wantPreempted: sets.New("/b1", "/b2"),
+ },
+ "can preempt anything borrowing from CQ with 0 weight": {
+ clusterQueues: []*kueue.ClusterQueue{
+ utiltesting.MakeClusterQueue("a").
+ Cohort("all").
+ ResourceGroup(*utiltesting.MakeFlavorQuotas("default").
+ Resource(corev1.ResourceCPU, "3").Obj()).
+ Preemption(kueue.ClusterQueuePreemption{
+ WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
+ ReclaimWithinCohort: kueue.PreemptionPolicyAny,
+ }).
+ Obj(),
+ utiltesting.MakeClusterQueue("b").
+ Cohort("all").
+ ResourceGroup(*utiltesting.MakeFlavorQuotas("default").
+ Resource(corev1.ResourceCPU, "3").Obj()).
+ Preemption(kueue.ClusterQueuePreemption{
+ WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
+ ReclaimWithinCohort: kueue.PreemptionPolicyAny,
+ }).
+ FairWeight(resource.MustParse("0")).
+ Obj(),
+ utiltesting.MakeClusterQueue("c").
+ Cohort("all").
+ ResourceGroup(*utiltesting.MakeFlavorQuotas("default").
+ Resource(corev1.ResourceCPU, "3").Obj()).
+ Preemption(kueue.ClusterQueuePreemption{
+ WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
+ ReclaimWithinCohort: kueue.PreemptionPolicyAny,
+ }).
+ Obj(),
+ },
+ admitted: []kueue.Workload{
+ *unitWl.Clone().Name("a1").SimpleReserveQuota("a", "default", now).Obj(),
+ *unitWl.Clone().Name("a2").SimpleReserveQuota("a", "default", now).Obj(),
+ *unitWl.Clone().Name("a3").SimpleReserveQuota("a", "default", now).Obj(),
+ *unitWl.Clone().Name("b1").SimpleReserveQuota("b", "default", now).Obj(),
+ *unitWl.Clone().Name("b2").SimpleReserveQuota("b", "default", now).Obj(),
+ *unitWl.Clone().Name("b3").SimpleReserveQuota("b", "default", now).Obj(),
+ *unitWl.Clone().Name("b4").SimpleReserveQuota("b", "default", now).Obj(),
+ *unitWl.Clone().Name("b5").SimpleReserveQuota("b", "default", now).Obj(),
+ *unitWl.Clone().Name("b6").SimpleReserveQuota("b", "default", now).Obj(),
+ },
+ incoming: utiltesting.MakeWorkload("a_incoming", "").Request(corev1.ResourceCPU, "3").Obj(),
+ targetCQ: "a",
+ wantPreempted: sets.New("/b1", "/b2", "/b3"),
+ },
+ "can't preempt nominal from CQ with 0 weight": {
+ clusterQueues: []*kueue.ClusterQueue{
+ utiltesting.MakeClusterQueue("a").
+ Cohort("all").
+ ResourceGroup(*utiltesting.MakeFlavorQuotas("default").
+ Resource(corev1.ResourceCPU, "3").Obj()).
+ Preemption(kueue.ClusterQueuePreemption{
+ WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
+ ReclaimWithinCohort: kueue.PreemptionPolicyAny,
+ }).
+ Obj(),
+ utiltesting.MakeClusterQueue("b").
+ Cohort("all").
+ ResourceGroup(*utiltesting.MakeFlavorQuotas("default").
+ Resource(corev1.ResourceCPU, "3").Obj()).
+ Preemption(kueue.ClusterQueuePreemption{
+ WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
+ ReclaimWithinCohort: kueue.PreemptionPolicyAny,
+ }).
+ FairWeight(resource.MustParse("0")).
+ Obj(),
+ },
+ admitted: []kueue.Workload{
+ *unitWl.Clone().Name("a1").SimpleReserveQuota("a", "default", now).Obj(),
+ *unitWl.Clone().Name("a2").SimpleReserveQuota("a", "default", now).Obj(),
+ *unitWl.Clone().Name("a3").SimpleReserveQuota("a", "default", now).Obj(),
+ *unitWl.Clone().Name("b1").SimpleReserveQuota("b", "default", now).Obj(),
+ *unitWl.Clone().Name("b2").SimpleReserveQuota("b", "default", now).Obj(),
+ *unitWl.Clone().Name("b3").SimpleReserveQuota("b", "default", now).Obj(),
+ },
+ incoming: unitWl.Clone().Name("a_incoming").Obj(),
+ targetCQ: "a",
+ },
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go
index 6efa611e7e..5879ead54f 100644
--- a/pkg/util/testing/wrappers.go
+++ b/pkg/util/testing/wrappers.go
@@ -661,6 +661,14 @@ func (c *ClusterQueueWrapper) Label(k, v string) *ClusterQueueWrapper {
return c
}
+func (c *ClusterQueueWrapper) FairWeight(w resource.Quantity) *ClusterQueueWrapper {
+ if c.Spec.FairSharing == nil {
+ c.Spec.FairSharing = &kueue.FairSharing{}
+ }
+ c.Spec.FairSharing.Weight = ptr.To(w)
+ return c
+}
+
// Condition sets a condition on the ClusterQueue.
func (c *ClusterQueueWrapper) Condition(conditionType string, status metav1.ConditionStatus, reason, message string) *ClusterQueueWrapper {
apimeta.SetStatusCondition(&c.Status.Conditions, metav1.Condition{
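Usage sketch for the new wrapper, mirroring its use in the cache and snapshot tests above (the queue name is illustrative):

    // Build a test ClusterQueue with twice the default fair sharing weight.
    cq := utiltesting.MakeClusterQueue("team-a").
        FairWeight(resource.MustParse("2")).
        Obj()
    // cq.Spec.FairSharing.Weight now points to the quantity "2".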
From b0b7f7ed9c6821514e0a0517b7f33cf4a6b06366 Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Fri, 3 May 2024 17:10:05 +0000
Subject: [PATCH 05/10] Fix flaky integration test
Change-Id: I68ba0c546ce27d9e6565a30584aacd67fd318ede
---
.../scheduler/fairsharing/fair_sharing_test.go | 8 ++++----
test/integration/scheduler/fairsharing/suite_test.go | 4 +++-
2 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/test/integration/scheduler/fairsharing/fair_sharing_test.go b/test/integration/scheduler/fairsharing/fair_sharing_test.go
index b06877616f..c9a0814975 100644
--- a/test/integration/scheduler/fairsharing/fair_sharing_test.go
+++ b/test/integration/scheduler/fairsharing/fair_sharing_test.go
@@ -55,7 +55,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
})
ginkgo.AfterEach(func() {
- gomega.Expect(util.DeleteResourceFlavor(ctx, k8sClient, defaultFlavor)).To(gomega.Succeed())
+ util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, defaultFlavor, true)
})
ginkgo.When("Preemption is disabled", func() {
@@ -95,9 +95,9 @@ var _ = ginkgo.Describe("Scheduler", func() {
})
ginkgo.AfterEach(func() {
gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
- gomega.Expect(util.DeleteClusterQueue(ctx, k8sClient, cqA)).To(gomega.Succeed())
- gomega.Expect(util.DeleteClusterQueue(ctx, k8sClient, cqB)).To(gomega.Succeed())
- gomega.Expect(util.DeleteClusterQueue(ctx, k8sClient, cqShared)).To(gomega.Succeed())
+ util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, cqA, true)
+ util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, cqB, true)
+ util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, cqShared, true)
})
ginkgo.It("Admits workloads respecting fair share", func() {
diff --git a/test/integration/scheduler/fairsharing/suite_test.go b/test/integration/scheduler/fairsharing/suite_test.go
index 4eb7a9efd7..daec8fb30e 100644
--- a/test/integration/scheduler/fairsharing/suite_test.go
+++ b/test/integration/scheduler/fairsharing/suite_test.go
@@ -88,7 +88,9 @@ func managerAndSchedulerSetup(mgr manager.Manager, ctx context.Context) {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName),
- scheduler.WithFairSharing(true))
+ scheduler.WithFairSharing(&config.FairSharing{
+ Enable: true,
+ }))
err = sched.Start(ctx)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
From 6dce54c2af9c63507f6b9655832f4a90e7cdd4f4 Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Fri, 3 May 2024 20:26:51 +0000
Subject: [PATCH 06/10] Address review feedback
Change-Id: I375166b8c7fdc300eeb43ede25fbcbe73298c7a2
---
pkg/cache/clusterqueue.go | 2 +-
pkg/scheduler/preemption/preemption.go | 4 +++-
2 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go
index 4647e2cc16..d8e15f0bf7 100644
--- a/pkg/cache/clusterqueue.go
+++ b/pkg/cache/clusterqueue.go
@@ -686,7 +686,7 @@ func (c *ClusterQueue) dominantResourceShare(wlReq FlavorResourceQuantities, m i
if c.Cohort == nil {
return 0, ""
}
- if c.FairWeight.Cmp(resource.Quantity{}) == 0 {
+ if c.FairWeight.IsZero() {
return math.MaxInt, ""
}
diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go
index 70c569320b..1b74d28d98 100644
--- a/pkg/scheduler/preemption/preemption.go
+++ b/pkg/scheduler/preemption/preemption.go
@@ -359,7 +359,9 @@ func (p *Preemptor) fairPreemptions(wl *workload.Info, assignment flavorassigner
for cqHeap.Len() > 0 && !fits {
candCQ := cqHeap.Pop()
- if p.fsStrategies[1](newNominatedShareValue, candCQ.share, 0 /* irrelevant */) {
+ // We can only reach here if the second strategy is LessThanInitialShare, in which
+ // case the last parameter for the strategy function is irrelevant.
+ if p.fsStrategies[1](newNominatedShareValue, candCQ.share, 0) {
// The criterion doesn't depend on the preempted workload, so just preempt the first candidate.
candWl := candCQ.workloads[0]
snapshot.RemoveWorkload(candWl)
From a93e785a5b2861d4f8496a3ff113a19c8e297989 Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Mon, 6 May 2024 19:19:08 +0000
Subject: [PATCH 07/10] Update documentation for strategies
Change-Id: I4d56c73f4a949fa5c2b2053fd83a7161553755b0
---
apis/config/v1beta1/configuration_types.go | 15 +++++++++++----
.../en/docs/reference/kueue-config.v1beta1.md | 15 +++++++++++----
2 files changed, 22 insertions(+), 8 deletions(-)
diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go
index b9165e4a06..f0534fb8fd 100644
--- a/apis/config/v1beta1/configuration_types.go
+++ b/apis/config/v1beta1/configuration_types.go
@@ -372,10 +372,17 @@ type FairSharing struct {
// The preemption algorithm will only use the next strategy in the list if the
// incoming workload (preemptor) doesn't fit after using the previous strategies.
// Possible values are:
- // - LessThanOrEqualToFinalShare: Only preempt if the share of the preemptor CQ
- // will be less than or equal to the share of the preemptee CQ after the preemption occurs.
- // - LessThanInitialShare: Only preempt if the share of the preemptor CQ will be strictly less
- // than the share of the preemptee CQ before the preemption.
+ // - LessThanOrEqualToFinalShare: Only preempt a workload if the share of the preemptor CQ
+ // with the preemptor workload is less than or equal to the share of the preemptee CQ
+ // without the workload to be preempted.
+ // This strategy might favor preemption of smaller workloads in the preemptee CQ,
+ // regardless of priority or start time, in an effort to keep the share of the CQ
+ // as high as possible.
+ // - LessThanInitialShare: Only preempt a workload if the share of the preemptor CQ
+ // with the incoming workload is strictly less than the share of the preemptee CQ.
+ // This strategy doesn't depend on the share usage of the workload being preempted.
+ // As a result, the strategy chooses to preempt workloads with the lowest priority and
+ // newest start time first.
// The default strategy is ["LessThanOrEqualToFinalShare"].
PreemptionStrategies []PreemptionStrategy `json:"preemptionStrategies,omitempty"`
}
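To make the two semantics concrete, each strategy reduces to a share predicate. The signature below matches lessThanInitialShare in pkg/scheduler/preemption; the body of the first predicate is an assumption derived from the documented semantics, not a copy of the implementation:

    // Preempt only if the preemptor's share after admission stays at or below
    // the preemptee's share after the victim is removed.
    func lessThanOrEqualToFinalShare(preemptorNewShare, _, preempteeNewShare int) bool {
        return preemptorNewShare <= preempteeNewShare
    }

    // Preempt only if the preemptor's share after admission is strictly below
    // the preemptee's share before any preemption.
    func lessThanInitialShare(preemptorNewShare, preempteeOldShare, _ int) bool {
        return preemptorNewShare < preempteeOldShare
    }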
diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md
index 4122ac8f51..17f715c16e 100644
--- a/site/content/en/docs/reference/kueue-config.v1beta1.md
+++ b/site/content/en/docs/reference/kueue-config.v1beta1.md
@@ -432,10 +432,17 @@ The preemption algorithm will only use the next strategy in the list if the
incoming workload (preemptor) doesn't fit after using the previous strategies.
Possible values are:
-- LessThanOrEqualToFinalShare: Only preempt if the share of the preemptor CQ
-will be less than or equal to the share of the preemptee CQ after the preemption occurs.
-- LessThanInitialShare: Only preempt if the share of the preemptor CQ will be strictly less
-than the share of the preemptee CQ before the preemption.
+
+- LessThanOrEqualToFinalShare: Only preempt a workload if the share of the preemptor CQ
+with the preemptor workload is less than or equal to the share of the preemptee CQ
+without the workload to be preempted.
+This strategy might favor preemption of smaller workloads in the preemptee CQ,
+regardless of priority or start time, in an effort to keep the share of the CQ
+as high as possible.
+- LessThanInitialShare: Only preempt a workload if the share of the preemptor CQ
+with the incoming workload is strictly less than the share of the preemptee CQ.
+This strategy doesn't depend on the share usage of the workload being preempted.
+As a result, the strategy chooses to preempt workloads with the lowest priority and
+newest start time first.
The default strategy is ["LessThanOrEqualToFinalShare"].
From 1adb8e0b573083741f964c9b5a9eb160cd616d92 Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Tue, 7 May 2024 19:31:29 +0000
Subject: [PATCH 08/10] Change default configuration to match KEP
Change-Id: I6893b7854656601e80f157b60caea27e27dfe4ea
---
apis/config/v1beta1/configuration_types.go | 2 +-
apis/config/v1beta1/defaults.go | 2 +-
apis/config/v1beta1/defaults_test.go | 2 +-
pkg/config/validation.go | 45 ++++++++++++++-----
pkg/config/validation_test.go | 8 +---
pkg/scheduler/preemption/preemption.go | 24 +++++-----
pkg/scheduler/preemption/preemption_test.go | 5 ---
.../en/docs/reference/kueue-config.v1beta1.md | 2 +-
8 files changed, 53 insertions(+), 37 deletions(-)
diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go
index f0534fb8fd..2ca5536a52 100644
--- a/apis/config/v1beta1/configuration_types.go
+++ b/apis/config/v1beta1/configuration_types.go
@@ -383,6 +383,6 @@ type FairSharing struct {
// This strategy doesn't depend on the share usage of the workload being preempted.
// As a result, the strategy chooses to preempt workloads with the lowest priority and
// newest start time first.
- // The default strategy is ["LessThanOrEqualToFinalShare"].
+ // The default strategy is ["LessThanOrEqualToFinalShare", "LessThanInitialShare"].
PreemptionStrategies []PreemptionStrategy `json:"preemptionStrategies,omitempty"`
}
diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go
index fb8048b9bf..c8ae7ef637 100644
--- a/apis/config/v1beta1/defaults.go
+++ b/apis/config/v1beta1/defaults.go
@@ -185,6 +185,6 @@ func SetDefaults_Configuration(cfg *Configuration) {
cfg.MultiKueue.WorkerLostTimeout = &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout}
}
if fs := cfg.FairSharing; fs != nil && fs.Enable && len(fs.PreemptionStrategies) == 0 {
- fs.PreemptionStrategies = []PreemptionStrategy{LessThanOrEqualToFinalShare}
+ fs.PreemptionStrategies = []PreemptionStrategy{LessThanOrEqualToFinalShare, LessThanInitialShare}
}
}
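With this defaulting, leaving preemptionStrategies empty while fair sharing is enabled is equivalent to spelling out the KEP default (order matters):

    fs := &FairSharing{
        Enable: true,
        PreemptionStrategies: []PreemptionStrategy{
            LessThanOrEqualToFinalShare,
            LessThanInitialShare,
        },
    }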
diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go
index f214c56263..908c2bc832 100644
--- a/apis/config/v1beta1/defaults_test.go
+++ b/apis/config/v1beta1/defaults_test.go
@@ -571,7 +571,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
MultiKueue: defaultMultiKueue,
FairSharing: &FairSharing{
Enable: true,
- PreemptionStrategies: []PreemptionStrategy{LessThanOrEqualToFinalShare},
+ PreemptionStrategies: []PreemptionStrategy{LessThanOrEqualToFinalShare, LessThanInitialShare},
},
},
},
diff --git a/pkg/config/validation.go b/pkg/config/validation.go
index b4912c814c..2744560020 100644
--- a/pkg/config/validation.go
+++ b/pkg/config/validation.go
@@ -20,12 +20,12 @@ import (
"fmt"
"slices"
"strings"
+ "unsafe"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/util/sets"
apimachineryvalidation "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
@@ -177,6 +177,31 @@ func validatePodIntegrationOptions(c *configapi.Configuration) field.ErrorList {
return allErrs
}
+var (
+ validStrategySets = [][]configapi.PreemptionStrategy{
+ {
+ configapi.LessThanOrEqualToFinalShare,
+ },
+ {
+ configapi.LessThanInitialShare,
+ },
+ {
+ configapi.LessThanOrEqualToFinalShare,
+ configapi.LessThanInitialShare,
+ },
+ }
+
+ validStrategySetsStr = func() []string {
+ var ss []string
+ for _, s := range validStrategySets {
+ // Casting because strings.Join requires a slice of strings
+ strategies := *(*[]string)(unsafe.Pointer(&s))
+ ss = append(ss, strings.Join(strategies, ","))
+ }
+ return ss
+ }()
+)
+
func validateFairSharing(c *configapi.Configuration) field.ErrorList {
fs := c.FairSharing
if fs == nil {
@@ -186,17 +211,17 @@ func validateFairSharing(c *configapi.Configuration) field.ErrorList {
if !fs.Enable && len(fs.PreemptionStrategies) != 0 {
allErrs = append(allErrs, field.Invalid(fsPreemptionStrategiesPath, fs.PreemptionStrategies, "Must be empty when fair sharing is disabled"))
}
- strategies := sets.New[configapi.PreemptionStrategy]()
- validStrategies := []configapi.PreemptionStrategy{configapi.LessThanInitialShare, configapi.LessThanOrEqualToFinalShare}
- for i, s := range fs.PreemptionStrategies {
- path := fsPreemptionStrategiesPath.Index(i)
- if !slices.Contains(validStrategies, s) {
- allErrs = append(allErrs, field.NotSupported(path, s, validStrategies))
+ if len(fs.PreemptionStrategies) > 0 {
+ validStrategy := false
+ for _, s := range validStrategySets {
+ if slices.Equal(s, fs.PreemptionStrategies) {
+ validStrategy = true
+ break
+ }
}
- if strategies.Has(s) {
- allErrs = append(allErrs, field.Duplicate(path, s))
+ if !validStrategy {
+ allErrs = append(allErrs, field.NotSupported(fsPreemptionStrategiesPath, fs.PreemptionStrategies, validStrategySetsStr))
}
- strategies.Insert(s)
}
return allErrs
}
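Validation now compares the configured list as a whole against the three supported orderings, instead of vetting elements individually. An illustration with a hypothetical input (identifiers from this diff):

    // Both elements are valid on their own, but the reversed order is not a
    // supported set, so validation returns field.NotSupported.
    got := []configapi.PreemptionStrategy{
        configapi.LessThanInitialShare,
        configapi.LessThanOrEqualToFinalShare,
    }
    valid := false
    for _, s := range validStrategySets {
        if slices.Equal(s, got) {
            valid = true
            break
        }
    }
    // valid == false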
diff --git a/pkg/config/validation_test.go b/pkg/config/validation_test.go
index 10d69c597f..71357f2ac2 100644
--- a/pkg/config/validation_test.go
+++ b/pkg/config/validation_test.go
@@ -363,7 +363,7 @@ func TestValidate(t *testing.T) {
},
},
},
- "Unknown and duplicated preemption strategies": {
+ "unsupported preemption strategy": {
cfg: &configapi.Configuration{
Integrations: defaultIntegrations,
FairSharing: &configapi.FairSharing{
@@ -374,11 +374,7 @@ func TestValidate(t *testing.T) {
wantErr: field.ErrorList{
&field.Error{
Type: field.ErrorTypeNotSupported,
- Field: "fairSharing.preemptionStrategies[1]",
- },
- &field.Error{
- Type: field.ErrorTypeDuplicate,
- Field: "fairSharing.preemptionStrategies[3]",
+ Field: "fairSharing.preemptionStrategies",
},
},
},
diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go
index 1b74d28d98..5ca6425ab9 100644
--- a/pkg/scheduler/preemption/preemption.go
+++ b/pkg/scheduler/preemption/preemption.go
@@ -288,19 +288,19 @@ func lessThanInitialShare(preemptorNewShare, preempteeOldShare, _ int) bool {
// This function takes advantage of the properties of the preemption algorithm and the strategies.
// The number of functions returned might not match the input slice.
func parseStrategies(s []config.PreemptionStrategy) []fsStrategy {
- result := []fsStrategy{lessThanOrEqualToFinalShare}
if len(s) == 0 {
- return result
+ return []fsStrategy{lessThanOrEqualToFinalShare, lessThanInitialShare}
}
- if s[0] == config.LessThanInitialShare {
- result[0] = lessThanInitialShare
- // This rule is a superset of the other rule, no need to check other strategies.
- return result
- }
- if len(s) == 1 {
- return result
+ strategies := make([]fsStrategy, len(s))
+ for i, strategy := range s {
+ switch strategy {
+ case config.LessThanOrEqualToFinalShare:
+ strategies[i] = lessThanOrEqualToFinalShare
+ case config.LessThanInitialShare:
+ strategies[i] = lessThanInitialShare
+ }
}
- return append(result, lessThanInitialShare)
+ return strategies
}
func (p *Preemptor) fairPreemptions(wl *workload.Info, assignment flavorassigner.Assignment, snapshot *cache.Snapshot, resPerFlv resourcesPerFlavor, candidates []*workload.Info, allowBorrowingBelowPriority *int32) []*workload.Info {
@@ -359,8 +359,8 @@ func (p *Preemptor) fairPreemptions(wl *workload.Info, assignment flavorassigner
for cqHeap.Len() > 0 && !fits {
candCQ := cqHeap.Pop()
- // We can only reach here if the second strategy is LessThanInitialShare, in which
- // case the last parameter for the strategy function is irrelevant.
+ // Due to API validation, we can only reach here if the second strategy is LessThanInitialShare,
+ // in which case the last parameter for the strategy function is irrelevant.
if p.fsStrategies[1](newNominatedShareValue, candCQ.share, 0) {
// The criterion doesn't depend on the preempted workload, so just preempt the first candidate.
candWl := candCQ.workloads[0]
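parseStrategies now maps each configured strategy by name and falls back to the KEP default pair when the list is empty, which is what keeps the fsStrategies[1] access in the loop above in bounds whenever a second strategy is configured. A sketch of the fallback:

    // An empty configuration yields the default pair, in order.
    fns := parseStrategies(nil)
    // len(fns) == 2: fns[0] applies LessThanOrEqualToFinalShare and
    // fns[1] applies LessThanInitialShare.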
diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go
index d425da9033..d6b06cb1c4 100644
--- a/pkg/scheduler/preemption/preemption_test.go
+++ b/pkg/scheduler/preemption/preemption_test.go
@@ -1651,7 +1651,6 @@ func TestFairPreemptions(t *testing.T) {
},
"preempt from target and others even if over nominal": {
clusterQueues: baseCQs,
- strategies: []config.PreemptionStrategy{config.LessThanOrEqualToFinalShare, config.LessThanInitialShare},
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("a1_low", "").Priority(-1).Request(corev1.ResourceCPU, "2").SimpleReserveQuota("a", "default", now).Obj(),
*utiltesting.MakeWorkload("a2_low", "").Priority(-1).Request(corev1.ResourceCPU, "1").SimpleReserveQuota("a", "default", now).Obj(),
@@ -1664,7 +1663,6 @@ func TestFairPreemptions(t *testing.T) {
},
"prefer to preempt workloads that don't make the target CQ have the biggest share": {
clusterQueues: baseCQs,
- strategies: []config.PreemptionStrategy{config.LessThanOrEqualToFinalShare, config.LessThanInitialShare},
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("b1", "").Request(corev1.ResourceCPU, "2").SimpleReserveQuota("b", "default", now).Obj(),
*utiltesting.MakeWorkload("b2", "").Request(corev1.ResourceCPU, "1").SimpleReserveQuota("b", "default", now).Obj(),
@@ -1678,7 +1676,6 @@ func TestFairPreemptions(t *testing.T) {
},
"preempt from different cluster queues if the end result has a smaller max share": {
clusterQueues: baseCQs,
- strategies: []config.PreemptionStrategy{config.LessThanOrEqualToFinalShare, config.LessThanInitialShare},
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("b1", "").Request(corev1.ResourceCPU, "2").SimpleReserveQuota("b", "default", now).Obj(),
*utiltesting.MakeWorkload("b2", "").Request(corev1.ResourceCPU, "2.5").SimpleReserveQuota("b", "default", now).Obj(),
@@ -1691,7 +1688,6 @@ func TestFairPreemptions(t *testing.T) {
},
"scenario above does not flap": {
clusterQueues: baseCQs,
- strategies: []config.PreemptionStrategy{config.LessThanOrEqualToFinalShare, config.LessThanInitialShare},
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("a1", "").Request(corev1.ResourceCPU, "3.5").SimpleReserveQuota("a", "default", now).Obj(),
*utiltesting.MakeWorkload("b2", "").Request(corev1.ResourceCPU, "2.5").SimpleReserveQuota("b", "default", now).Obj(),
@@ -1702,7 +1698,6 @@ func TestFairPreemptions(t *testing.T) {
},
"cannot preempt if it would make the candidate CQ go under nominal after preempting one element": {
clusterQueues: baseCQs,
- strategies: []config.PreemptionStrategy{config.LessThanOrEqualToFinalShare, config.LessThanInitialShare},
admitted: []kueue.Workload{
*utiltesting.MakeWorkload("b1", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("b", "default", now).Obj(),
*utiltesting.MakeWorkload("b2", "").Request(corev1.ResourceCPU, "3").SimpleReserveQuota("b", "default", now).Obj(),
diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md
index 17f715c16e..883721d9e8 100644
--- a/site/content/en/docs/reference/kueue-config.v1beta1.md
+++ b/site/content/en/docs/reference/kueue-config.v1beta1.md
@@ -443,7 +443,7 @@ with the incoming workload is strictly less than the share of the preemptee CQ.
This strategy doesn't depend on the share usage of the workload being preempted.
As a result, the strategy chooses to preempt workloads with the lowest priority and
newest start time first.
-The default strategy is ["LessThanOrEqualToFinalShare"].
+The default strategy is ["LessThanOrEqualToFinalShare", "LessThanInitialShare"].
From a59ed8d4e89d47d69b2367b9440db6f4df107d43 Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Tue, 7 May 2024 20:14:40 +0000
Subject: [PATCH 09/10] Disable fair sharing by default
Change-Id: I6379c4304cfb45a08940d57a24b79c6b1564ccca
---
charts/kueue/values.yaml | 3 +++
config/components/manager/controller_manager_config.yaml | 6 +++---
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/charts/kueue/values.yaml b/charts/kueue/values.yaml
index a382870214..d3f7cb3be1 100644
--- a/charts/kueue/values.yaml
+++ b/charts/kueue/values.yaml
@@ -104,6 +104,9 @@ managerConfig:
# - key: kubernetes.io/metadata.name
# operator: NotIn
# values: [ kube-system, kueue-system ]
+ # fairSharing:
+ # enable: true
+ # preemptionStrategies: [LessThanOrEqualToFinalShare]
# ports definition for metricsService and webhookService.
metricsService:
ports:
diff --git a/config/components/manager/controller_manager_config.yaml b/config/components/manager/controller_manager_config.yaml
index 5e01ceb534..0dff6140b3 100644
--- a/config/components/manager/controller_manager_config.yaml
+++ b/config/components/manager/controller_manager_config.yaml
@@ -50,6 +50,6 @@ integrations:
# - key: kubernetes.io/metadata.name
# operator: NotIn
# values: [ kube-system, kueue-system ]
-fairSharing:
- enable: true
- preemptionStrategies: [LessThanOrEqualToFinalShare]
\ No newline at end of file
+# fairSharing:
+# enable: true
+# preemptionStrategies: [LessThanOrEqualToFinalShare]
From 357b4991a0b2d5ba33bfb2a70da3bf9b333e5c17 Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Wed, 8 May 2024 15:58:40 +0000
Subject: [PATCH 10/10] Relax validation
Change-Id: I1ec9229cb866d31ece5046741ac7686157639758
---
charts/kueue/values.yaml | 2 +-
.../manager/controller_manager_config.yaml | 2 +-
pkg/config/validation.go | 3 ---
pkg/config/validation_test.go | 14 --------------
4 files changed, 2 insertions(+), 19 deletions(-)
diff --git a/charts/kueue/values.yaml b/charts/kueue/values.yaml
index d3f7cb3be1..828c92a7c8 100644
--- a/charts/kueue/values.yaml
+++ b/charts/kueue/values.yaml
@@ -106,7 +106,7 @@ managerConfig:
# values: [ kube-system, kueue-system ]
# fairSharing:
# enable: true
- # preemptionStrategies: [LessThanOrEqualToFinalShare]
+ # preemptionStrategies: [LessThanOrEqualToFinalShare, LessThanInitialShare]
# ports definition for metricsService and webhookService.
metricsService:
ports:
diff --git a/config/components/manager/controller_manager_config.yaml b/config/components/manager/controller_manager_config.yaml
index 0dff6140b3..8570b8bd7b 100644
--- a/config/components/manager/controller_manager_config.yaml
+++ b/config/components/manager/controller_manager_config.yaml
@@ -52,4 +52,4 @@ integrations:
# values: [ kube-system, kueue-system ]
# fairSharing:
# enable: true
-# preemptionStrategies: [LessThanOrEqualToFinalShare]
+# preemptionStrategies: [LessThanOrEqualToFinalShare, LessThanInitialShare]
diff --git a/pkg/config/validation.go b/pkg/config/validation.go
index 2744560020..d88b9e83f9 100644
--- a/pkg/config/validation.go
+++ b/pkg/config/validation.go
@@ -208,9 +208,6 @@ func validateFairSharing(c *configapi.Configuration) field.ErrorList {
return nil
}
var allErrs field.ErrorList
- if !fs.Enable && len(fs.PreemptionStrategies) != 0 {
- allErrs = append(allErrs, field.Invalid(fsPreemptionStrategiesPath, fs.PreemptionStrategies, "Must be empty when fair sharing is disabled"))
- }
if len(fs.PreemptionStrategies) > 0 {
validStrategy := false
for _, s := range validStrategySets {
diff --git a/pkg/config/validation_test.go b/pkg/config/validation_test.go
index 71357f2ac2..3a5e21bd96 100644
--- a/pkg/config/validation_test.go
+++ b/pkg/config/validation_test.go
@@ -349,20 +349,6 @@ func TestValidate(t *testing.T) {
},
},
},
- "preemption strategies when fair sharing is disabled": {
- cfg: &configapi.Configuration{
- Integrations: defaultIntegrations,
- FairSharing: &configapi.FairSharing{
- PreemptionStrategies: []configapi.PreemptionStrategy{configapi.LessThanOrEqualToFinalShare},
- },
- },
- wantErr: field.ErrorList{
- &field.Error{
- Type: field.ErrorTypeInvalid,
- Field: "fairSharing.preemptionStrategies",
- },
- },
- },
"unsupported preemption strategy": {
cfg: &configapi.Configuration{
Integrations: defaultIntegrations,