From 6054a7da8258b71d07e4026f12d28377421d24df Mon Sep 17 00:00:00 2001
From: Yuki Iwai
Date: Thu, 8 Feb 2024 22:21:19 +0900
Subject: [PATCH] WIP
Signed-off-by: Yuki Iwai
---
apis/config/v1beta1/configuration_types.go | 9 +-
apis/kueue/v1beta1/workload_types.go | 2 +-
cmd/kueue/main.go | 6 +-
pkg/config/config.go | 4 +
pkg/config/config_test.go | 38 +++++++-
pkg/config/validation.go | 17 ++++
pkg/config/validation_test.go | 29 ++++++
.../core/clusterqueue_controller_test.go | 4 +-
pkg/controller/core/core.go | 23 +++--
pkg/controller/core/workload_controller.go | 97 ++++++++++++++-----
pkg/queue/cluster_queue_impl.go | 8 +-
pkg/queue/cluster_queue_impl_test.go | 8 +-
pkg/queue/manager.go | 73 ++++++++++----
pkg/queue/manager_test.go | 34 ++++---
pkg/util/testing/wrappers.go | 17 ++++
.../api/rest/pending_workloads_cq_test.go | 4 +-
.../api/rest/pending_workloads_lq_test.go | 4 +-
pkg/webhooks/workload_webhook.go | 5 +
pkg/webhooks/workload_webhook_test.go | 19 +++-
pkg/workload/workload.go | 14 +++
pkg/workload/workload_test.go | 70 +++++++++++++
.../en/docs/reference/kueue-config.v1beta1.md | 12 ++-
.../jobs/job/job_controller_test.go | 2 +-
23 files changed, 415 insertions(+), 84 deletions(-)
diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go
index fea2f98bba..7cdf5048b3 100644
--- a/apis/config/v1beta1/configuration_types.go
+++ b/apis/config/v1beta1/configuration_types.go
@@ -219,14 +219,19 @@ type MultiKueue struct {
type RequeuingStrategy struct {
// Timestamp defines the timestamp used for requeuing a Workload
- // that was evicted due to Pod readiness.
+ // that was evicted due to Pod readiness. The possible values are:
+ //
+ // - `Eviction` (default): the eviction time, taken from the Workload's `Evicted=true` condition.
+ // - `Creation`: the creation time, taken from the Workload's `.metadata.creationTimestamp`.
//
- // Defaults to Eviction.
// +optional
Timestamp *RequeuingTimestamp `json:"timestamp,omitempty"`
// BackoffLimitCount defines the maximum number of requeuing retries.
// When the number is reached, the workload is deactivated (`.spec.activate`=`false`).
+ // The total duration until the workload is deactivated is calculated as "Rand+SUM[k=0,n](1.41891^k)"
+ // seconds, where "n" is ".status.requeueState.count-1" and "Rand" is a random jitter.
+ // Given a "backoffLimitCount" of "30", the total is approximately 24 hours.
//
// Defaults to null.
// +optional
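
As a sanity check on the numbers in the comment above, the series can be summed directly. The following standalone sketch (illustrative only, not part of the patch; totalBackoff is a hypothetical helper) reproduces the roughly 24-hour figure for a backoffLimitCount of 30, ignoring jitter:

package main

import (
	"fmt"
	"math"
	"time"
)

// totalBackoff sums the per-retry delays 1.41891^k seconds for k=0..n-1,
// ignoring jitter, to approximate the total time before deactivation.
func totalBackoff(backoffLimitCount int) time.Duration {
	const factor = 1.41891
	var seconds float64
	for k := 0; k < backoffLimitCount; k++ {
		seconds += math.Pow(factor, float64(k))
	}
	return time.Duration(seconds * float64(time.Second))
}

func main() {
	// For backoffLimitCount=30 this prints roughly 24h (~86500s).
	fmt.Println(totalBackoff(30).Round(time.Minute))
}
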
diff --git a/apis/kueue/v1beta1/workload_types.go b/apis/kueue/v1beta1/workload_types.go
index 535a651aa5..0e04cda1c0 100644
--- a/apis/kueue/v1beta1/workload_types.go
+++ b/apis/kueue/v1beta1/workload_types.go
@@ -151,7 +151,7 @@ type WorkloadStatus struct {
// changed once set.
Admission *Admission `json:"admission,omitempty"`
- // requeueState holds the state of the requeued Workload according to the requeueing strategy.
+ // requeueState holds the state of the requeued Workload according to the requeuing strategy.
//
// +optional
RequeueState *RequeueState `json:"requeueState,omitempty"`
diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go
index 9e88de816a..78d22f85ce 100644
--- a/cmd/kueue/main.go
+++ b/cmd/kueue/main.go
@@ -336,11 +336,7 @@ func setupServerVersionFetcher(mgr ctrl.Manager, kubeConfig *rest.Config) *kubev
}
func blockForPodsReady(cfg *configapi.Configuration) bool {
- return waitForPodsReady(cfg) && cfg.WaitForPodsReady.BlockAdmission != nil && *cfg.WaitForPodsReady.BlockAdmission
-}
-
-func waitForPodsReady(cfg *configapi.Configuration) bool {
- return cfg.WaitForPodsReady != nil && cfg.WaitForPodsReady.Enable
+ return config.WaitForPodsReadyIsEnabled(cfg) && cfg.WaitForPodsReady.BlockAdmission != nil && *cfg.WaitForPodsReady.BlockAdmission
}
func podsReadyRequeuingTimestamp(cfg *configapi.Configuration) configapi.RequeuingTimestamp {
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 53f6294d04..c618ae5e64 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -168,3 +168,7 @@ func Load(scheme *runtime.Scheme, configFile string) (ctrl.Options, configapi.Co
addTo(&options, &cfg)
return options, cfg, err
}
+
+func WaitForPodsReadyIsEnabled(cfg *configapi.Configuration) bool {
+ return cfg.WaitForPodsReady != nil && cfg.WaitForPodsReady.Enable
+}
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 6e7a16de1c..109b1c8c96 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -156,6 +156,9 @@ apiVersion: config.kueue.x-k8s.io/v1beta1
kind: Configuration
waitForPodsReady:
enable: true
+ requeuingStrategy:
+ timestamp: Creation
+ backoffLimitCount: 10
`), os.FileMode(0600)); err != nil {
t.Fatal(err)
}
@@ -543,7 +546,8 @@ multiKueue:
BlockAdmission: ptr.To(true),
Timeout: &metav1.Duration{Duration: 5 * time.Minute},
RequeuingStrategy: &configapi.RequeuingStrategy{
- Timestamp: ptr.To(configapi.EvictionTimestamp),
+ Timestamp: ptr.To(configapi.CreationTimestamp),
+ BackoffLimitCount: ptr.To[int32](10),
},
},
ClientConnection: defaultClientConnection,
@@ -938,3 +942,35 @@ func TestEncode(t *testing.T) {
})
}
}
+
+func TestWaitForPodsReadyIsEnabled(t *testing.T) {
+ cases := map[string]struct {
+ cfg *configapi.Configuration
+ want bool
+ }{
+ "cfg.waitForPodsReady is null": {
+ cfg: &configapi.Configuration{},
+ },
+ "cfg.WaitForPodsReadyIsEnabled.enable is false": {
+ cfg: &configapi.Configuration{
+ WaitForPodsReady: &configapi.WaitForPodsReady{},
+ },
+ },
+ "waitForPodsReady is true": {
+ cfg: &configapi.Configuration{
+ WaitForPodsReady: &configapi.WaitForPodsReady{
+ Enable: true,
+ },
+ },
+ want: true,
+ },
+ }
+ for name, tc := range cases {
+ t.Run(name, func(t *testing.T) {
+ got := WaitForPodsReadyIsEnabled(tc.cfg)
+ if tc.want != got {
+ t.Errorf("Unexpected result from WaitForPodsReadyIsEnabled\nwant:\n%v\ngot:%v\n", tc.want, got)
+ }
+ })
+ }
+}
diff --git a/pkg/config/validation.go b/pkg/config/validation.go
index 0c85c40d16..b0da023f1a 100644
--- a/pkg/config/validation.go
+++ b/pkg/config/validation.go
@@ -39,11 +39,15 @@ var (
integrationsFrameworksPath = integrationsPath.Child("frameworks")
podOptionsPath = integrationsPath.Child("podOptions")
namespaceSelectorPath = podOptionsPath.Child("namespaceSelector")
+ waitForPodsReadyPath = field.NewPath("waitForPodsReady")
+ requeuingStrategyPath = waitForPodsReadyPath.Child("requeuingStrategy")
)
func validate(c *configapi.Configuration) field.ErrorList {
var allErrs field.ErrorList
+ allErrs = append(allErrs, validateWaitForPodsReady(c)...)
+
allErrs = append(allErrs, validateQueueVisibility(c)...)
// Validate PodNamespaceSelector for the pod framework
@@ -52,6 +56,19 @@ func validate(c *configapi.Configuration) field.ErrorList {
return allErrs
}
+func validateWaitForPodsReady(c *configapi.Configuration) field.ErrorList {
+ var allErrs field.ErrorList
+ if !WaitForPodsReadyIsEnabled(c) {
+ return allErrs
+ }
+ if strategy := c.WaitForPodsReady.RequeuingStrategy; strategy != nil && strategy.Timestamp != nil &&
+ *strategy.Timestamp != configapi.CreationTimestamp && *strategy.Timestamp != configapi.EvictionTimestamp {
+ allErrs = append(allErrs, field.NotSupported(requeuingStrategyPath.Child("timestamp"),
+ strategy.Timestamp, []configapi.RequeuingTimestamp{configapi.CreationTimestamp, configapi.EvictionTimestamp}))
+ }
+ return allErrs
+}
+
func validateQueueVisibility(cfg *configapi.Configuration) field.ErrorList {
var allErrs field.ErrorList
if cfg.QueueVisibility != nil {
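
For context on the error produced by validateWaitForPodsReady above, field.NotSupported renders both the offending value and the allowed set into the message. A minimal sketch of the resulting shape (illustrative only, not part of the patch; the literal output is an assumption based on apimachinery's standard formatting):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/validation/field"
)

func main() {
	timestampPath := field.NewPath("waitForPodsReady").Child("requeuingStrategy", "timestamp")
	// Mirrors validateWaitForPodsReady rejecting an unknown timestamp value.
	err := field.NotSupported(timestampPath, "Unknown", []string{"Creation", "Eviction"})
	// Prints something like:
	// waitForPodsReady.requeuingStrategy.timestamp: Unsupported value: "Unknown": supported values: "Creation", "Eviction"
	fmt.Println(err.Error())
}
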
diff --git a/pkg/config/validation_test.go b/pkg/config/validation_test.go
index a7abd25a4c..68f308dba4 100644
--- a/pkg/config/validation_test.go
+++ b/pkg/config/validation_test.go
@@ -213,6 +213,35 @@ func TestValidate(t *testing.T) {
},
wantErr: nil,
},
+ "no supported waitForPodsReady.requeuingStrategy.timestamp": {
+ cfg: &configapi.Configuration{
+ Integrations: defaultIntegrations,
+ WaitForPodsReady: &configapi.WaitForPodsReady{
+ Enable: true,
+ RequeuingStrategy: &configapi.RequeuingStrategy{
+ Timestamp: ptr.To[configapi.RequeuingTimestamp]("NoSupported"),
+ },
+ },
+ },
+ wantErr: field.ErrorList{
+ &field.Error{
+ Type: field.ErrorTypeNotSupported,
+ Field: "waitForPodsReady.requeuingStrategy.timestamp",
+ },
+ },
+ },
+ "supported waitForPodsReady.requeuingStrategy.timestamp": {
+ cfg: &configapi.Configuration{
+ Integrations: defaultIntegrations,
+ WaitForPodsReady: &configapi.WaitForPodsReady{
+ Enable: true,
+ RequeuingStrategy: &configapi.RequeuingStrategy{
+ Timestamp: ptr.To(configapi.CreationTimestamp),
+ },
+ },
+ },
+ wantErr: nil,
+ },
}
for name, tc := range testCases {
diff --git a/pkg/controller/core/clusterqueue_controller_test.go b/pkg/controller/core/clusterqueue_controller_test.go
index fa5f1c698e..827d9b50cc 100644
--- a/pkg/controller/core/clusterqueue_controller_test.go
+++ b/pkg/controller/core/clusterqueue_controller_test.go
@@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
+ testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/ptr"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
@@ -47,6 +48,7 @@ func TestUpdateCqStatusIfChanged(t *testing.T) {
*utiltesting.MakeWorkload("beta", "").Queue(lqName).Obj(),
},
}
+ fakeClock := testingclock.NewFakeClock(time.Now())
testCases := map[string]struct {
cqStatus kueue.ClusterQueueStatus
@@ -198,7 +200,7 @@ func TestUpdateCqStatusIfChanged(t *testing.T) {
qManager: qManager,
}
if tc.newWl != nil {
- r.qManager.AddOrUpdateWorkload(tc.newWl)
+ r.qManager.AddOrUpdateWorkload(ctx, tc.newWl, fakeClock)
}
err := r.updateCqStatusIfChanged(ctx, cq, tc.newConditionStatus, tc.newReason, tc.newMessage)
if err != nil {
diff --git a/pkg/controller/core/core.go b/pkg/controller/core/core.go
index 099623b208..b29eb39aa7 100644
--- a/pkg/controller/core/core.go
+++ b/pkg/controller/core/core.go
@@ -21,8 +21,9 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
- config "sigs.k8s.io/kueue/apis/config/v1beta1"
+ configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
+ "sigs.k8s.io/kueue/pkg/config"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/queue"
)
@@ -31,7 +32,7 @@ const updateChBuffer = 10
// SetupControllers sets up the core controllers. It returns the name of the
// controller that failed to create and an error, if any.
-func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache, cfg *config.Configuration) (string, error) {
+func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache, cfg *configapi.Configuration) (string, error) {
rfRec := NewResourceFlavorReconciler(mgr.GetClient(), qManager, cc)
if err := rfRec.SetupWithManager(mgr, cfg); err != nil {
return "ResourceFlavor", err
@@ -65,27 +66,35 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
if err := NewWorkloadReconciler(mgr.GetClient(), qManager, cc,
mgr.GetEventRecorderFor(constants.WorkloadControllerName),
WithWorkloadUpdateWatchers(qRec, cqRec),
- WithPodsReadyTimeout(podsReadyTimeout(cfg))).SetupWithManager(mgr, cfg); err != nil {
+ WithPodsReadyTimeout(podsReadyTimeout(cfg)),
+ WithRequeuingBackoffLimitCount(requeuingBackoffLimitCount(cfg))).SetupWithManager(mgr, cfg); err != nil {
return "Workload", err
}
return "", nil
}
-func podsReadyTimeout(cfg *config.Configuration) *time.Duration {
- if cfg.WaitForPodsReady != nil && cfg.WaitForPodsReady.Enable && cfg.WaitForPodsReady.Timeout != nil {
+func podsReadyTimeout(cfg *configapi.Configuration) *time.Duration {
+ if config.WaitForPodsReadyIsEnabled(cfg) && cfg.WaitForPodsReady.Timeout != nil {
return &cfg.WaitForPodsReady.Timeout.Duration
}
return nil
}
-func queueVisibilityUpdateInterval(cfg *config.Configuration) time.Duration {
+func requeuingBackoffLimitCount(cfg *configapi.Configuration) *int32 {
+ if config.WaitForPodsReadyIsEnabled(cfg) && cfg.WaitForPodsReady.RequeuingStrategy != nil {
+ return cfg.WaitForPodsReady.RequeuingStrategy.BackoffLimitCount
+ }
+ return nil
+}
+
+func queueVisibilityUpdateInterval(cfg *configapi.Configuration) time.Duration {
if cfg.QueueVisibility != nil {
return time.Duration(cfg.QueueVisibility.UpdateIntervalSeconds) * time.Second
}
return 0
}
-func queueVisibilityClusterQueuesMaxCount(cfg *config.Configuration) int32 {
+func queueVisibilityClusterQueuesMaxCount(cfg *configapi.Configuration) int32 {
if cfg.QueueVisibility != nil && cfg.QueueVisibility.ClusterQueues != nil {
return cfg.QueueVisibility.ClusterQueues.MaxCount
}
diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go
index 7e9eefaac2..f861fea351 100644
--- a/pkg/controller/core/workload_controller.go
+++ b/pkg/controller/core/workload_controller.go
@@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
+ "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
@@ -63,8 +64,9 @@ var (
)
type options struct {
- watchers []WorkloadUpdateWatcher
- podsReadyTimeout *time.Duration
+ watchers []WorkloadUpdateWatcher
+ podsReadyTimeout *time.Duration
+ requeuingBackoffLimitCount *int32
}
// Option configures the reconciler.
@@ -78,6 +80,14 @@ func WithPodsReadyTimeout(value *time.Duration) Option {
}
}
+// WithRequeuingBackoffLimitCount sets the maximum number of requeues after which
+// the controller deactivates the workload.
+func WithRequeuingBackoffLimitCount(value *int32) Option {
+ return func(o *options) {
+ o.requeuingBackoffLimitCount = value
+ }
+}
+
// WithWorkloadUpdateWatchers allows to specify the workload update watchers
func WithWorkloadUpdateWatchers(value ...WorkloadUpdateWatcher) Option {
return func(o *options) {
@@ -93,13 +103,14 @@ type WorkloadUpdateWatcher interface {
// WorkloadReconciler reconciles a Workload object
type WorkloadReconciler struct {
- log logr.Logger
- queues *queue.Manager
- cache *cache.Cache
- client client.Client
- watchers []WorkloadUpdateWatcher
- podsReadyTimeout *time.Duration
- recorder record.EventRecorder
+ log logr.Logger
+ queues *queue.Manager
+ cache *cache.Cache
+ client client.Client
+ watchers []WorkloadUpdateWatcher
+ podsReadyTimeout *time.Duration
+ requeuingBackoffLimitCount *int32
+ recorder record.EventRecorder
}
func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache, recorder record.EventRecorder, opts ...Option) *WorkloadReconciler {
@@ -109,13 +120,14 @@ func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *c
}
return &WorkloadReconciler{
- log: ctrl.Log.WithName("workload-reconciler"),
- client: client,
- queues: queues,
- cache: cache,
- watchers: options.watchers,
- podsReadyTimeout: options.podsReadyTimeout,
- recorder: recorder,
+ log: ctrl.Log.WithName("workload-reconciler"),
+ client: client,
+ queues: queues,
+ cache: cache,
+ watchers: options.watchers,
+ podsReadyTimeout: options.podsReadyTimeout,
+ requeuingBackoffLimitCount: options.requeuingBackoffLimitCount,
+ recorder: recorder,
}
}
@@ -329,12 +341,47 @@ func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req c
if recheckAfter > 0 {
log.V(4).Info("Workload not yet ready and did not exceed its timeout", "recheckAfter", recheckAfter)
return ctrl.Result{RequeueAfter: recheckAfter}, nil
- } else {
- log.V(2).Info("Start the eviction of the workload due to exceeding the PodsReady timeout")
- workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByPodsReadyTimeout, fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String()))
- err := workload.ApplyAdmissionStatus(ctx, r.client, wl, false)
- return ctrl.Result{}, client.IgnoreNotFound(err)
}
+ log.V(2).Info("Start the eviction of the workload due to exceeding the PodsReady timeout")
+ if deactivated, err := r.backoffRequeuing(ctx, wl, realClock); err != nil || deactivated {
+ return ctrl.Result{}, err
+ }
+ workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByPodsReadyTimeout, fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String()))
+ err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
+ return ctrl.Result{}, client.IgnoreNotFound(err)
+}
+
+func (r *WorkloadReconciler) backoffRequeuing(ctx context.Context, wl *kueue.Workload, clock clock.Clock) (bool, error) {
+ if r.queues.RequeuingWorkloads().Has(workload.Key(wl)) || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) {
+ return false, nil
+ }
+ if !workload.HasRequeueState(wl) {
+ wl.Status.RequeueState = &kueue.RequeueState{}
+ }
+ // If requeuingBackoffLimitCount is null, the workload is re-queued repeatedly, without limit.
+ requeuingCount := ptr.Deref(wl.Status.RequeueState.Count, 0) + 1
+ if r.requeuingBackoffLimitCount != nil && requeuingCount > *r.requeuingBackoffLimitCount {
+ wl.Spec.Active = ptr.To(false)
+ r.recorder.Eventf(wl, corev1.EventTypeNormal, "WorkloadDeactivated",
+ "Deactivated Workload %q by reached re-queue backoffLimitCount", klog.KObj(wl))
+ return true, r.client.Update(ctx, wl)
+ }
+ // The total duration until the workload is deactivated is calculated as "Rand+SUM[k=0,n](1.41891^k)"
+ // seconds, where "n" is "requeuingCount-1" and "Rand" is a random jitter.
+ // Given a "backoffLimitCount" of "30", the total is approximately 24 hours.
+ backoff := &wait.Backoff{
+ Duration: 1 * time.Second,
+ Factor: 1.41891,
+ Jitter: 0.0001,
+ Steps: int(requeuingCount),
+ }
+ var waitTime time.Duration
+ for i := int32(0); i < requeuingCount; i++ {
+ waitTime = backoff.Step()
+ }
+ wl.Status.RequeueState.RequeueAt = ptr.To(metav1.NewTime(clock.Now().Add(waitTime)))
+ wl.Status.RequeueState.Count = &requeuingCount
+ return false, nil
}
func (r *WorkloadReconciler) Create(e event.CreateEvent) bool {
@@ -357,7 +404,7 @@ func (r *WorkloadReconciler) Create(e event.CreateEvent) bool {
workload.AdjustResources(ctx, r.client, wlCopy)
if !workload.HasQuotaReservation(wl) {
- if !r.queues.AddOrUpdateWorkload(wlCopy) {
+ if !r.queues.AddOrUpdateWorkload(ctx, wlCopy, realClock) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
}
return true
@@ -461,7 +508,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
})
case prevStatus == pending && status == pending:
- if !r.queues.UpdateWorkload(oldWl, wlCopy) {
+ if !r.queues.UpdateWorkload(ctx, oldWl, wlCopy, realClock) {
log.V(2).Info("Queue for updated workload didn't exist; ignoring for now")
}
@@ -480,7 +527,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
log.Error(err, "Failed to delete workload from cache")
}
})
- if !r.queues.AddOrUpdateWorkload(wlCopy) {
+ if !r.queues.AddOrUpdateWorkload(ctx, wlCopy, realClock) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
}
@@ -620,7 +667,7 @@ func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, _
log := log.WithValues("workload", klog.KObj(wlCopy))
log.V(5).Info("Queue reconcile for")
workload.AdjustResources(ctrl.LoggerInto(ctx, log), h.r.client, wlCopy)
- if !h.r.queues.AddOrUpdateWorkload(wlCopy) {
+ if !h.r.queues.AddOrUpdateWorkload(ctx, wlCopy, realClock) {
log.V(2).Info("Queue for workload didn't exist")
}
}
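
A note on the wait.Backoff usage in backoffRequeuing above: Step() advances the backoff by one step per call, so the loop keeps only the last value, i.e. the delay for the current retry rather than the cumulative sum. A standalone sketch of that behavior (illustrative only, not part of the patch; delayForRetry is a hypothetical helper):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// delayForRetry mirrors the controller's loop: Step() is called once per
// past requeue, and only the final value (roughly 1.41891^(n-1) seconds)
// is used as the delay before the n-th requeue.
func delayForRetry(n int32) time.Duration {
	backoff := wait.Backoff{
		Duration: 1 * time.Second,
		Factor:   1.41891,
		Jitter:   0.0001,
		Steps:    int(n),
	}
	var d time.Duration
	for i := int32(0); i < n; i++ {
		d = backoff.Step()
	}
	return d
}

func main() {
	for _, n := range []int32{1, 5, 10} {
		fmt.Printf("requeue #%d waits ~%v\n", n, delayForRetry(n).Round(time.Millisecond))
	}
}
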
diff --git a/pkg/queue/cluster_queue_impl.go b/pkg/queue/cluster_queue_impl.go
index 1d898da5a6..3652719b18 100644
--- a/pkg/queue/cluster_queue_impl.go
+++ b/pkg/queue/cluster_queue_impl.go
@@ -46,7 +46,7 @@ type clusterQueueBase struct {
inadmissibleWorkloads map[string]*workload.Info
// popCycle identifies the last call to Pop. It's incremented when calling Pop.
- // popCycle and queueInadmissibleCycle are used to track when there is a requeueing
+ // popCycle and queueInadmissibleCycle are used to track when there is a requeuing
// of inadmissible workloads while a workload is being scheduled.
popCycle int64
@@ -116,6 +116,10 @@ func (c *clusterQueueBase) PushOrUpdate(wInfo *workload.Info) {
// otherwise move or update in place in the queue.
delete(c.inadmissibleWorkloads, key)
}
+ if workload.HasRequeueState(wInfo.Obj) {
+ c.inadmissibleWorkloads[key] = wInfo
+ return
+ }
c.heap.PushOrUpdate(wInfo)
}
@@ -148,7 +152,7 @@ func (c *clusterQueueBase) requeueIfNotPresent(wInfo *workload.Info, immediate b
c.rwm.Lock()
defer c.rwm.Unlock()
key := workload.Key(wInfo.Obj)
- if immediate || c.queueInadmissibleCycle >= c.popCycle || wInfo.LastAssignment.PendingFlavors() {
+ if immediate || c.queueInadmissibleCycle >= c.popCycle || wInfo.LastAssignment.PendingFlavors() || workload.HasRequeueState(wInfo.Obj) {
// If the workload was inadmissible, move it back into the queue.
inadmissibleWl := c.inadmissibleWorkloads[key]
if inadmissibleWl != nil {
diff --git a/pkg/queue/cluster_queue_impl_test.go b/pkg/queue/cluster_queue_impl_test.go
index c1061e414c..85f01f4e64 100644
--- a/pkg/queue/cluster_queue_impl_test.go
+++ b/pkg/queue/cluster_queue_impl_test.go
@@ -305,7 +305,7 @@ func TestClusterQueueImpl(t *testing.T) {
if test.queueInadmissibleWorkloads {
if diff := cmp.Diff(test.wantInadmissibleWorkloadsRequeued,
cq.QueueInadmissibleWorkloads(context.Background(), cl)); diff != "" {
- t.Errorf("Unexpected requeueing of inadmissible workloads (-want,+got):\n%s", diff)
+ t.Errorf("Unexpected requeuing of inadmissible workloads (-want,+got):\n%s", diff)
}
}
@@ -340,7 +340,7 @@ func TestQueueInadmissibleWorkloadsDuringScheduling(t *testing.T) {
t.Errorf("Unexpected active workloads before events (-want,+got):\n%s", diff)
}
- // Simulate requeueing during scheduling attempt.
+ // Simulate requeuing during scheduling attempt.
head := cq.Pop()
cq.QueueInadmissibleWorkloads(ctx, cl)
cq.requeueIfNotPresent(head, false)
@@ -348,10 +348,10 @@ func TestQueueInadmissibleWorkloadsDuringScheduling(t *testing.T) {
activeWorkloads, _ = cq.Dump()
wantActiveWorkloads = []string{workload.Key(wl)}
if diff := cmp.Diff(wantActiveWorkloads, activeWorkloads, cmpDump...); diff != "" {
- t.Errorf("Unexpected active workloads after scheduling with requeueing (-want,+got):\n%s", diff)
+ t.Errorf("Unexpected active workloads after scheduling with requeuing (-want,+got):\n%s", diff)
}
- // Simulating scheduling again without requeueing.
+ // Simulating scheduling again without requeuing.
head = cq.Pop()
cq.requeueIfNotPresent(head, false)
activeWorkloads, _ = cq.Dump()
diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go
index f7f0b33db7..a3fd9bbcec 100644
--- a/pkg/queue/manager.go
+++ b/pkg/queue/manager.go
@@ -20,13 +20,14 @@ import (
"context"
"errors"
"fmt"
- "sync"
-
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/sets"
+ "k8s.io/klog/v2"
+ "k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
+ "sync"
config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
@@ -64,10 +65,11 @@ type Manager struct {
sync.RWMutex
cond sync.Cond
- client client.Client
- statusChecker StatusChecker
- clusterQueues map[string]ClusterQueue
- localQueues map[string]*LocalQueue
+ client client.Client
+ statusChecker StatusChecker
+ clusterQueues map[string]ClusterQueue
+ localQueues map[string]*LocalQueue
+ requeuingWorkloads sets.Set[string]
snapshotsMutex sync.RWMutex
snapshots map[string][]kueue.ClusterQueuePendingWorkload
@@ -84,13 +86,14 @@ func NewManager(client client.Client, checker StatusChecker, opts ...Option) *Ma
opt(&options)
}
m := &Manager{
- client: client,
- statusChecker: checker,
- localQueues: make(map[string]*LocalQueue),
- clusterQueues: make(map[string]ClusterQueue),
- cohorts: make(map[string]sets.Set[string]),
- snapshotsMutex: sync.RWMutex{},
- snapshots: make(map[string][]kueue.ClusterQueuePendingWorkload, 0),
+ client: client,
+ statusChecker: checker,
+ requeuingWorkloads: sets.New[string](),
+ localQueues: make(map[string]*LocalQueue),
+ clusterQueues: make(map[string]ClusterQueue),
+ cohorts: make(map[string]sets.Set[string]),
+ snapshotsMutex: sync.RWMutex{},
+ snapshots: make(map[string][]kueue.ClusterQueuePendingWorkload, 0),
workloadOrdering: workload.Ordering{
PodsReadyRequeuingTimestamp: options.podsReadyRequeuingTimestamp,
},
@@ -291,13 +294,13 @@ func (m *Manager) ClusterQueueForWorkload(wl *kueue.Workload) (string, bool) {
// AddOrUpdateWorkload adds or updates workload to the corresponding queue.
// Returns whether the queue existed.
-func (m *Manager) AddOrUpdateWorkload(w *kueue.Workload) bool {
+func (m *Manager) AddOrUpdateWorkload(ctx context.Context, w *kueue.Workload, clock clock.Clock) bool {
m.Lock()
defer m.Unlock()
- return m.addOrUpdateWorkload(w)
+ return m.addOrUpdateWorkload(ctx, w, clock)
}
-func (m *Manager) addOrUpdateWorkload(w *kueue.Workload) bool {
+func (m *Manager) addOrUpdateWorkload(ctx context.Context, w *kueue.Workload, clock clock.Clock) bool {
qKey := workload.QueueKey(w)
q := m.localQueues[qKey]
if q == nil {
@@ -311,10 +314,44 @@ func (m *Manager) addOrUpdateWorkload(w *kueue.Workload) bool {
}
cq.PushOrUpdate(wInfo)
m.reportPendingWorkloads(q.ClusterQueue, cq)
+ wKey := workload.Key(wInfo.Obj)
+ if workload.HasRequeueState(wInfo.Obj) && !m.requeuingWorkloads.Has(wKey) {
+ m.requeuingWorkloads.Insert(wKey)
+ go func() {
+ m.backoffRequeue(ctx, wInfo, clock)
+ }()
+ }
m.Broadcast()
return true
}
+func (m *Manager) backoffRequeue(ctx context.Context, wInfo *workload.Info, clock clock.Clock) {
+ defer func() {
+ m.Lock()
+ m.requeuingWorkloads.Delete(workload.Key(wInfo.Obj))
+ m.Unlock()
+ }()
+ log := ctrl.LoggerFrom(ctx)
+ select {
+ case <-ctx.Done():
+ log.V(5).Info("Context cancelled when waiting for reached to requeueAt")
+
+ case <-clock.After(wInfo.Obj.Status.RequeueState.RequeueAt.Time.Sub(clock.Now())):
+ added := m.RequeueWorkload(ctx, wInfo, RequeueReasonGeneric)
+ log.V(2).Info("Workload re-queued", "workload", klog.KObj(wInfo.Obj),
+ "clusterQueue", klog.KRef("", wInfo.ClusterQueue),
+ "queue", klog.KRef(wInfo.Obj.Namespace, wInfo.Obj.Spec.QueueName),
+ "requeueReason", RequeueReasonGeneric,
+ "added", added)
+ }
+}
+
+func (m *Manager) RequeuingWorkloads() sets.Set[string] {
+ m.RLock()
+ defer m.RUnlock()
+ // Return a clone so that callers never read the set concurrently with
+ // writes made while the manager lock is held elsewhere.
+ return m.requeuingWorkloads.Clone()
+}
+
// RequeueWorkload requeues the workload ensuring that the queue and the
// workload still exist in the client cache and not admitted. It won't
// requeue if the workload is already in the queue (possible if the workload was updated).
@@ -448,13 +485,13 @@ func (m *Manager) queueAllInadmissibleWorkloadsInCohort(ctx context.Context, cq
// UpdateWorkload updates the workload to the corresponding queue or adds it if
// it didn't exist. Returns whether the queue existed.
-func (m *Manager) UpdateWorkload(oldW, w *kueue.Workload) bool {
+func (m *Manager) UpdateWorkload(ctx context.Context, oldW, w *kueue.Workload, clock clock.Clock) bool {
m.Lock()
defer m.Unlock()
if oldW.Spec.QueueName != w.Spec.QueueName {
m.deleteWorkloadFromQueueAndClusterQueue(w, workload.QueueKey(oldW))
}
- return m.addOrUpdateWorkload(w)
+ return m.addOrUpdateWorkload(ctx, w, clock)
}
// CleanUpOnContext tracks the context. When closed, it wakes routines waiting
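
Since backoffRequeue waits on the injected clock.Clock rather than the wall clock, tests can drive the wait deterministically. A minimal sketch of that pattern with k8s.io/utils/clock/testing (illustrative only, not part of the patch):

package main

import (
	"fmt"
	"time"

	testingclock "k8s.io/utils/clock/testing"
)

func main() {
	fake := testingclock.NewFakeClock(time.Now())

	// A goroutine blocking the same way Manager.backoffRequeue does.
	done := make(chan struct{})
	go func() {
		<-fake.After(10 * time.Second)
		close(done)
	}()

	// Wait until After has registered its timer, then advance the fake
	// clock past the deadline; the waiter is released with no real sleep.
	for !fake.HasWaiters() {
		time.Sleep(time.Millisecond)
	}
	fake.Step(10 * time.Second)
	<-done
	fmt.Println("requeue wait released deterministically")
}
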
diff --git a/pkg/queue/manager_test.go b/pkg/queue/manager_test.go
index 1f2df1f74a..9bd0e0ffc3 100644
--- a/pkg/queue/manager_test.go
+++ b/pkg/queue/manager_test.go
@@ -30,6 +30,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
+ testingclock "k8s.io/utils/clock/testing"
"sigs.k8s.io/controller-runtime/pkg/client"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
@@ -256,6 +257,7 @@ func TestUpdateLocalQueue(t *testing.T) {
utiltesting.MakeLocalQueue("bar", "").ClusterQueue("cq2").Obj(),
}
now := time.Now()
+ fakeclock := testingclock.NewFakeClock(now)
workloads := []*kueue.Workload{
utiltesting.MakeWorkload("a", "").Queue("foo").Creation(now.Add(time.Second)).Obj(),
utiltesting.MakeWorkload("b", "").Queue("bar").Creation(now).Obj(),
@@ -278,7 +280,7 @@ func TestUpdateLocalQueue(t *testing.T) {
}
}
for _, w := range workloads {
- manager.AddOrUpdateWorkload(w)
+ manager.AddOrUpdateWorkload(ctx, w, fakeclock)
}
// Update cluster queue of first queue.
@@ -348,6 +350,8 @@ func TestAddWorkload(t *testing.T) {
t.Fatalf("Failed adding queue %s: %v", q.Name, err)
}
}
+ now := time.Now()
+ fakeclock := testingclock.NewFakeClock(now)
cases := []struct {
workload *kueue.Workload
wantAdded bool
@@ -392,7 +396,9 @@ func TestAddWorkload(t *testing.T) {
}
for _, tc := range cases {
t.Run(tc.workload.Name, func(t *testing.T) {
- if added := manager.AddOrUpdateWorkload(tc.workload); added != tc.wantAdded {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ if added := manager.AddOrUpdateWorkload(ctx, tc.workload, fakeclock); added != tc.wantAdded {
t.Errorf("AddWorkload returned %t, want %t", added, tc.wantAdded)
}
})
@@ -406,6 +412,7 @@ func TestStatus(t *testing.T) {
t.Fatalf("Failed adding kueue scheme: %s", err)
}
now := time.Now().Truncate(time.Second)
+ fakeclock := testingclock.NewFakeClock(now)
queues := []kueue.LocalQueue{
{
@@ -460,7 +467,7 @@ func TestStatus(t *testing.T) {
}
for _, wl := range workloads {
wl := wl
- manager.AddOrUpdateWorkload(&wl)
+ manager.AddOrUpdateWorkload(ctx, &wl, fakeclock)
}
cases := map[string]struct {
@@ -503,6 +510,7 @@ func TestRequeueWorkloadStrictFIFO(t *testing.T) {
utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj(),
utiltesting.MakeLocalQueue("bar", "").Obj(),
}
+ fakeclock := testingclock.NewFakeClock(time.Now())
cases := []struct {
workload *kueue.Workload
inClient bool
@@ -580,7 +588,7 @@ func TestRequeueWorkloadStrictFIFO(t *testing.T) {
}
}
if tc.inQueue {
- _ = manager.AddOrUpdateWorkload(tc.workload)
+ _ = manager.AddOrUpdateWorkload(ctx, tc.workload, fakeclock)
}
info := workload.NewInfo(tc.workload)
if requeued := manager.RequeueWorkload(ctx, info, RequeueReasonGeneric); requeued != tc.wantRequeued {
@@ -596,6 +604,7 @@ func TestUpdateWorkload(t *testing.T) {
t.Fatalf("Failed adding kueue scheme: %s", err)
}
now := time.Now()
+ fakeclock := testingclock.NewFakeClock(now)
cases := map[string]struct {
clusterQueues []*kueue.ClusterQueue
queues []*kueue.LocalQueue
@@ -732,11 +741,11 @@ func TestUpdateWorkload(t *testing.T) {
}
}
for _, w := range tc.workloads {
- manager.AddOrUpdateWorkload(w)
+ manager.AddOrUpdateWorkload(ctx, w, fakeclock)
}
wl := tc.workloads[0].DeepCopy()
tc.update(wl)
- if updated := manager.UpdateWorkload(tc.workloads[0], wl); updated != tc.wantUpdated {
+ if updated := manager.UpdateWorkload(ctx, tc.workloads[0], wl, fakeclock); updated != tc.wantUpdated {
t.Errorf("UpdatedWorkload returned %t, want %t", updated, tc.wantUpdated)
}
q := manager.localQueues[workload.QueueKey(wl)]
@@ -782,6 +791,7 @@ func TestHeads(t *testing.T) {
t.Fatalf("Failed adding kueue scheme: %s", err)
}
now := time.Now().Truncate(time.Second)
+ fakeclock := testingclock.NewFakeClock(now)
clusterQueues := []*kueue.ClusterQueue{
utiltesting.MakeClusterQueue("active-fooCq").Obj(),
@@ -849,7 +859,7 @@ func TestHeads(t *testing.T) {
go manager.CleanUpOnContext(ctx)
for _, wl := range tc.workloads {
- manager.AddOrUpdateWorkload(wl)
+ manager.AddOrUpdateWorkload(ctx, wl, fakeclock)
}
wlNames := sets.New[string]()
@@ -870,6 +880,7 @@ var ignoreTypeMeta = cmpopts.IgnoreTypes(metav1.TypeMeta{})
// asynchronously.
func TestHeadsAsync(t *testing.T) {
now := time.Now().Truncate(time.Second)
+ fakeclock := testingclock.NewFakeClock(now)
clusterQueues := []*kueue.ClusterQueue{
utiltesting.MakeClusterQueue("fooCq").Obj(),
utiltesting.MakeClusterQueue("barCq").Obj(),
@@ -907,7 +918,7 @@ func TestHeadsAsync(t *testing.T) {
if err := mgr.AddLocalQueue(ctx, &queues[0]); err != nil {
t.Errorf("Failed adding queue: %s", err)
}
- mgr.AddOrUpdateWorkload(&wl)
+ mgr.AddOrUpdateWorkload(ctx, &wl, fakeclock)
go func() {
if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil {
t.Errorf("Failed adding clusterQueue: %v", err)
@@ -949,7 +960,7 @@ func TestHeadsAsync(t *testing.T) {
t.Errorf("Failed adding queue: %s", err)
}
go func() {
- mgr.AddOrUpdateWorkload(&wl)
+ mgr.AddOrUpdateWorkload(ctx, &wl, fakeclock)
}()
},
wantHeads: []workload.Info{
@@ -970,7 +981,7 @@ func TestHeadsAsync(t *testing.T) {
go func() {
wlCopy := wl.DeepCopy()
wlCopy.ResourceVersion = "old"
- mgr.UpdateWorkload(wlCopy, &wl)
+ mgr.UpdateWorkload(ctx, wlCopy, &wl, fakeclock)
}()
},
wantHeads: []workload.Info{
@@ -1120,6 +1131,7 @@ func (c *fakeStatusChecker) ClusterQueueActive(name string) bool {
func TestGetPendingWorkloadsInfo(t *testing.T) {
now := time.Now().Truncate(time.Second)
+ fakeclock := testingclock.NewFakeClock(now)
clusterQueues := []*kueue.ClusterQueue{
utiltesting.MakeClusterQueue("cq").Obj(),
@@ -1150,7 +1162,7 @@ func TestGetPendingWorkloadsInfo(t *testing.T) {
}
}
for _, w := range workloads {
- manager.AddOrUpdateWorkload(w)
+ manager.AddOrUpdateWorkload(ctx, w, fakeclock)
}
cases := map[string]struct {
diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go
index 7190fa8628..1006cf2bb1 100644
--- a/pkg/util/testing/wrappers.go
+++ b/pkg/util/testing/wrappers.go
@@ -270,6 +270,23 @@ func (w *WorkloadWrapper) DeletionTimestamp(t time.Time) *WorkloadWrapper {
return w
}
+func (w *WorkloadWrapper) RequeueState(count *int32, requeueAt *metav1.Time) *WorkloadWrapper {
+ if count == nil && requeueAt == nil {
+ w.Status.RequeueState = nil
+ return w
+ }
+ if w.Status.RequeueState == nil {
+ w.Status.RequeueState = &kueue.RequeueState{}
+ }
+ if count != nil {
+ w.Status.RequeueState.Count = count
+ }
+ if requeueAt != nil {
+ w.Status.RequeueState.RequeueAt = requeueAt
+ }
+ return w
+}
+
type PodSetWrapper struct{ kueue.PodSet }
func MakePodSet(name string, count int) *PodSetWrapper {
diff --git a/pkg/visibility/api/rest/pending_workloads_cq_test.go b/pkg/visibility/api/rest/pending_workloads_cq_test.go
index d039533cbd..6a474fdcf8 100644
--- a/pkg/visibility/api/rest/pending_workloads_cq_test.go
+++ b/pkg/visibility/api/rest/pending_workloads_cq_test.go
@@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
+ testingclock "k8s.io/utils/clock/testing"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
visibility "sigs.k8s.io/kueue/apis/visibility/v1alpha1"
@@ -59,6 +60,7 @@ func TestPendingWorkloadsInCQ(t *testing.T) {
}
now := time.Now()
+ fakeclock := testingclock.NewFakeClock(now)
cases := map[string]struct {
clusterQueues []*kueue.ClusterQueue
queues []*kueue.LocalQueue
@@ -336,7 +338,7 @@ func TestPendingWorkloadsInCQ(t *testing.T) {
}
}
for _, w := range tc.workloads {
- manager.AddOrUpdateWorkload(w)
+ manager.AddOrUpdateWorkload(ctx, w, fakeclock)
}
info, err := pendingWorkloadsInCqRest.Get(ctx, tc.req.queueName, tc.req.queryParams)
diff --git a/pkg/visibility/api/rest/pending_workloads_lq_test.go b/pkg/visibility/api/rest/pending_workloads_lq_test.go
index 008d09e34d..1152583070 100644
--- a/pkg/visibility/api/rest/pending_workloads_lq_test.go
+++ b/pkg/visibility/api/rest/pending_workloads_lq_test.go
@@ -25,6 +25,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/endpoints/request"
+ testingclock "k8s.io/utils/clock/testing"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
visibility "sigs.k8s.io/kueue/apis/visibility/v1alpha1"
@@ -59,6 +60,7 @@ func TestPendingWorkloadsInLQ(t *testing.T) {
}
now := time.Now()
+ fakeclock := testingclock.NewFakeClock(now)
cases := map[string]struct {
clusterQueues []*kueue.ClusterQueue
queues []*kueue.LocalQueue
@@ -452,7 +454,7 @@ func TestPendingWorkloadsInLQ(t *testing.T) {
}
}
for _, w := range tc.workloads {
- manager.AddOrUpdateWorkload(w)
+ manager.AddOrUpdateWorkload(ctx, w, fakeclock)
}
ctx = request.WithNamespace(ctx, tc.req.nsName)
diff --git a/pkg/webhooks/workload_webhook.go b/pkg/webhooks/workload_webhook.go
index a2c3f51ae8..78fbb85f59 100644
--- a/pkg/webhooks/workload_webhook.go
+++ b/pkg/webhooks/workload_webhook.go
@@ -75,6 +75,11 @@ func (w *WorkloadWebhook) Default(ctx context.Context, obj runtime.Object) error
wl.Spec.PodSets[i].MinCount = nil
}
}
+
+ // If a deactivated workload is re-activated, reset the RequeueState so that the backoff count starts over.
+ if ptr.Deref(wl.Spec.Active, true) && workload.IsEvictedByDeactivation(wl) && workload.HasRequeueState(wl) {
+ wl.Status.RequeueState = nil
+ }
return nil
}
diff --git a/pkg/webhooks/workload_webhook_test.go b/pkg/webhooks/workload_webhook_test.go
index 258906aa74..78051be403 100644
--- a/pkg/webhooks/workload_webhook_test.go
+++ b/pkg/webhooks/workload_webhook_test.go
@@ -78,6 +78,22 @@ func TestWorkloadWebhookDefault(t *testing.T) {
},
},
},
+ "re-activated workload with re-queue state is reset the re-queue state": {
+ wl: *testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
+ Condition(metav1.Condition{
+ Type: kueue.WorkloadEvicted,
+ Status: metav1.ConditionTrue,
+ Reason: kueue.WorkloadEvictedByDeactivation,
+ }).RequeueState(ptr.To[int32](5), ptr.To(metav1.Now())).
+ Obj(),
+ wantWl: *testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
+ Condition(metav1.Condition{
+ Type: kueue.WorkloadEvicted,
+ Status: metav1.ConditionTrue,
+ Reason: kueue.WorkloadEvictedByDeactivation,
+ }).
+ Obj(),
+ },
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
@@ -86,7 +102,8 @@ func TestWorkloadWebhookDefault(t *testing.T) {
if err := wh.Default(context.Background(), wlCopy); err != nil {
t.Fatalf("Could not apply defaults: %v", err)
}
- if diff := cmp.Diff(tc.wantWl, *wlCopy); diff != "" {
+ if diff := cmp.Diff(tc.wantWl, *wlCopy,
+ cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")); diff != "" {
t.Errorf("Obtained wrong defaults (-want,+got):\n%s", diff)
}
})
diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go
index 47fbe116d4..c66a18f44d 100644
--- a/pkg/workload/workload.go
+++ b/pkg/workload/workload.go
@@ -375,6 +375,9 @@ func admissionPatch(w *kueue.Workload) *kueue.Workload {
wlCopy := BaseSSAWorkload(w)
wlCopy.Status.Admission = w.Status.Admission.DeepCopy()
+ if HasRequeueState(w) {
+ wlCopy.Status.RequeueState = w.Status.RequeueState.DeepCopy()
+ }
for _, conditionName := range admissionManagedConditions {
if existing := apimeta.FindStatusCondition(w.Status.Conditions, conditionName); existing != nil {
wlCopy.Status.Conditions = append(wlCopy.Status.Conditions, *existing.DeepCopy())
@@ -443,6 +446,11 @@ func ReclaimablePodsAreEqual(a, b []kueue.ReclaimablePod) bool {
return true
}
+// HasRequeueState returns true if the workload has re-queue state.
+func HasRequeueState(w *kueue.Workload) bool {
+ return w.Status.RequeueState != nil
+}
+
// IsAdmitted returns true if the workload is admitted.
func IsAdmitted(w *kueue.Workload) bool {
return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadAdmitted)
@@ -453,6 +461,12 @@ func IsFinished(w *kueue.Workload) bool {
return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadFinished)
}
+// IsEvictedByDeactivation returns true if the workload is evicted by deactivation.
+func IsEvictedByDeactivation(w *kueue.Workload) bool {
+ cond := apimeta.FindStatusCondition(w.Status.Conditions, kueue.WorkloadEvicted)
+ return cond != nil && cond.Status == metav1.ConditionTrue && cond.Reason == kueue.WorkloadEvictedByDeactivation
+}
+
func RemoveFinalizer(ctx context.Context, c client.Client, wl *kueue.Workload) error {
if controllerutil.RemoveFinalizer(wl, kueue.ResourceInUseFinalizerName) {
return c.Update(ctx, wl)
diff --git a/pkg/workload/workload_test.go b/pkg/workload/workload_test.go
index 4f682f0285..f53739c0a5 100644
--- a/pkg/workload/workload_test.go
+++ b/pkg/workload/workload_test.go
@@ -456,3 +456,73 @@ func TestAssignmentClusterQueueState(t *testing.T) {
})
}
}
+
+func TestHasRequeueState(t *testing.T) {
+ cases := map[string]struct {
+ workload *kueue.Workload
+ want bool
+ }{
+ "workload has requeue state": {
+ workload: utiltesting.MakeWorkload("test", "test").RequeueState(ptr.To[int32](5), ptr.To(metav1.Now())).Obj(),
+ want: true,
+ },
+ "workload doesn't have requeue state": {
+ workload: utiltesting.MakeWorkload("test", "test").RequeueState(nil, nil).Obj(),
+ },
+ }
+ for name, tc := range cases {
+ t.Run(name, func(t *testing.T) {
+ got := HasRequeueState(tc.workload)
+ if tc.want != got {
+ t.Errorf("Unexpected result from HasRequeuState\nwant:%v\ngot:%v\n", tc.want, got)
+ }
+ })
+ }
+}
+
+func TestIsEvictedByDeactivation(t *testing.T) {
+ cases := map[string]struct {
+ workload *kueue.Workload
+ want bool
+ }{
+ "evicted condition doesn't exist": {
+ workload: utiltesting.MakeWorkload("test", "test").Obj(),
+ },
+ "evicted condition with false status": {
+ workload: utiltesting.MakeWorkload("test", "test").
+ Condition(metav1.Condition{
+ Type: kueue.WorkloadEvicted,
+ Reason: kueue.WorkloadEvictedByDeactivation,
+ Status: metav1.ConditionFalse,
+ }).
+ Obj(),
+ },
+ "evicted condition with PodsReadyTimeout reason": {
+ workload: utiltesting.MakeWorkload("test", "test").
+ Condition(metav1.Condition{
+ Type: kueue.WorkloadEvicted,
+ Reason: kueue.WorkloadEvictedByPodsReadyTimeout,
+ Status: metav1.ConditionTrue,
+ }).
+ Obj(),
+ },
+ "evicted condition with InactiveWorkload reason": {
+ workload: utiltesting.MakeWorkload("test", "test").
+ Condition(metav1.Condition{
+ Type: kueue.WorkloadEvicted,
+ Reason: kueue.WorkloadEvictedByDeactivation,
+ Status: metav1.ConditionTrue,
+ }).
+ Obj(),
+ want: true,
+ },
+ }
+ for name, tc := range cases {
+ t.Run(name, func(t *testing.T) {
+ got := IsEvictedByDeactivation(tc.workload)
+ if tc.want != got {
+ t.Errorf("Unexpected result from IsEvictedByDeactivation\nwant:%v\ngot:%v\n", tc.want, got)
+ }
+ })
+ }
+}
diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md
index 29a8e3d6cf..a77eb28f91 100644
--- a/site/content/en/docs/reference/kueue-config.v1beta1.md
+++ b/site/content/en/docs/reference/kueue-config.v1beta1.md
@@ -599,8 +599,11 @@ Defaults to 5.
Timestamp defines the timestamp used for requeuing a Workload
-that was evicted due to Pod readiness.
-Defaults to Eviction.
+that was evicted due to Pod readiness. The possible values are:
+
+Eviction (default): the eviction time, taken from the Workload's Evicted=true condition.
+Creation: the creation time, taken from the Workload's .metadata.creationTimestamp.
+
|
backoffLimitCount
@@ -608,7 +611,10 @@ that was evicted due to Pod readiness.
|
BackoffLimitCount defines the maximum number of requeuing retries.
-When the number is reached, the workload is deactivated (.spec.activate =false ).
+When the number is reached, the workload is deactivated (.spec.activate=false).
+The total duration until the workload is deactivated is calculated as "Rand+SUM[k=0,n](1.41891^k)" seconds,
+where "n" is "workload.status.requeueState.count-1" and "Rand" is a random jitter.
+Given a "backoffLimitCount" of "30", the total is approximately 24 hours.
Defaults to null.
|
diff --git a/test/integration/controller/jobs/job/job_controller_test.go b/test/integration/controller/jobs/job/job_controller_test.go
index a5dafef43f..d270fcd365 100644
--- a/test/integration/controller/jobs/job/job_controller_test.go
+++ b/test/integration/controller/jobs/job/job_controller_test.go
@@ -1717,7 +1717,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
})
})
- ginkgo.When("Suspend a running Job without requeueing through Workload's spec.active field", func() {
+ ginkgo.When("Suspend a running Job without requeuing through Workload's spec.active field", func() {
ginkgo.It("Should not readmit a job to the queue after Active is changed to false", func() {
ginkgo.By("creating localQueue")
localQueue := testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()