From b2022b071cc0b2639a1d34e3c46cfc27373ca619 Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Thu, 8 Feb 2024 22:21:19 +0900 Subject: [PATCH] WIP Signed-off-by: Yuki Iwai --- apis/config/v1beta1/configuration_types.go | 9 +- apis/kueue/v1beta1/workload_types.go | 2 +- cmd/kueue/main.go | 6 +- pkg/config/config.go | 4 + pkg/config/config_test.go | 38 +++++++- pkg/config/validation.go | 17 ++++ pkg/config/validation_test.go | 29 ++++++ .../core/clusterqueue_controller_test.go | 4 +- pkg/controller/core/core.go | 23 +++-- pkg/controller/core/workload_controller.go | 97 ++++++++++++++----- pkg/queue/cluster_queue_impl.go | 8 +- pkg/queue/cluster_queue_impl_test.go | 8 +- pkg/queue/manager.go | 70 ++++++++++--- pkg/queue/manager_test.go | 34 ++++--- pkg/util/testing/wrappers.go | 17 ++++ .../api/rest/pending_workloads_cq_test.go | 4 +- .../api/rest/pending_workloads_lq_test.go | 4 +- pkg/webhooks/workload_webhook.go | 5 + pkg/webhooks/workload_webhook_test.go | 19 +++- pkg/workload/workload.go | 14 +++ pkg/workload/workload_test.go | 70 +++++++++++++ .../en/docs/reference/kueue-config.v1beta1.md | 12 ++- .../jobs/job/job_controller_test.go | 2 +- 23 files changed, 414 insertions(+), 82 deletions(-) diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go index fea2f98bba..7cdf5048b3 100644 --- a/apis/config/v1beta1/configuration_types.go +++ b/apis/config/v1beta1/configuration_types.go @@ -219,14 +219,19 @@ type MultiKueue struct { type RequeuingStrategy struct { // Timestamp defines the timestamp used for requeuing a Workload - // that was evicted due to Pod readiness. + // that was evicted due to Pod readiness. The possible values are: + // + // - `Eviction` (default): indicates from Workload .metadata.creationTimestamp. + // - `Creation`: indicates from Workload .status.conditions. // - // Defaults to Eviction. // +optional Timestamp *RequeuingTimestamp `json:"timestamp,omitempty"` // BackoffLimitCount defines the maximum number of requeuing retries. // When the number is reached, the workload is deactivated (`.spec.activate`=`false`). + // The duration until the workload is deactivated is calculated by "Rand+SUM[k=0,n](1.41891^n)" + // where the "n" means the "workloadStatus.requeueState.count-1", and the "Rand" means the jitter. + // Given that the "backoffLimitCount" equals "30", the result approximately equals 24 hours. // // Defaults to null. // +optional diff --git a/apis/kueue/v1beta1/workload_types.go b/apis/kueue/v1beta1/workload_types.go index 535a651aa5..0e04cda1c0 100644 --- a/apis/kueue/v1beta1/workload_types.go +++ b/apis/kueue/v1beta1/workload_types.go @@ -151,7 +151,7 @@ type WorkloadStatus struct { // changed once set. Admission *Admission `json:"admission,omitempty"` - // requeueState holds the state of the requeued Workload according to the requeueing strategy. + // requeueState holds the state of the requeued Workload according to the requeuing strategy. // // +optional RequeueState *RequeueState `json:"requeueState,omitempty"` diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index 9e88de816a..78d22f85ce 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -336,11 +336,7 @@ func setupServerVersionFetcher(mgr ctrl.Manager, kubeConfig *rest.Config) *kubev } func blockForPodsReady(cfg *configapi.Configuration) bool { - return waitForPodsReady(cfg) && cfg.WaitForPodsReady.BlockAdmission != nil && *cfg.WaitForPodsReady.BlockAdmission -} - -func waitForPodsReady(cfg *configapi.Configuration) bool { - return cfg.WaitForPodsReady != nil && cfg.WaitForPodsReady.Enable + return config.WaitForPodsReadyIsEnabled(cfg) && cfg.WaitForPodsReady.BlockAdmission != nil && *cfg.WaitForPodsReady.BlockAdmission } func podsReadyRequeuingTimestamp(cfg *configapi.Configuration) configapi.RequeuingTimestamp { diff --git a/pkg/config/config.go b/pkg/config/config.go index 53f6294d04..c618ae5e64 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -168,3 +168,7 @@ func Load(scheme *runtime.Scheme, configFile string) (ctrl.Options, configapi.Co addTo(&options, &cfg) return options, cfg, err } + +func WaitForPodsReadyIsEnabled(cfg *configapi.Configuration) bool { + return cfg.WaitForPodsReady != nil && cfg.WaitForPodsReady.Enable +} diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 6e7a16de1c..109b1c8c96 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -156,6 +156,9 @@ apiVersion: config.kueue.x-k8s.io/v1beta1 kind: Configuration waitForPodsReady: enable: true + requeuingStrategy: + timestamp: Creation + backoffLimitCount: 10 `), os.FileMode(0600)); err != nil { t.Fatal(err) } @@ -543,7 +546,8 @@ multiKueue: BlockAdmission: ptr.To(true), Timeout: &metav1.Duration{Duration: 5 * time.Minute}, RequeuingStrategy: &configapi.RequeuingStrategy{ - Timestamp: ptr.To(configapi.EvictionTimestamp), + Timestamp: ptr.To(configapi.CreationTimestamp), + BackoffLimitCount: ptr.To[int32](10), }, }, ClientConnection: defaultClientConnection, @@ -938,3 +942,35 @@ func TestEncode(t *testing.T) { }) } } + +func TestWaitForPodsReadyIsEnabled(t *testing.T) { + cases := map[string]struct { + cfg *configapi.Configuration + want bool + }{ + "cfg.waitForPodsReady is null": { + cfg: &configapi.Configuration{}, + }, + "cfg.WaitForPodsReadyIsEnabled.enable is false": { + cfg: &configapi.Configuration{ + WaitForPodsReady: &configapi.WaitForPodsReady{}, + }, + }, + "waitForPodsReady is true": { + cfg: &configapi.Configuration{ + WaitForPodsReady: &configapi.WaitForPodsReady{ + Enable: true, + }, + }, + want: true, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + got := WaitForPodsReadyIsEnabled(tc.cfg) + if tc.want != got { + t.Errorf("Unexpected result from WaitForPodsReadyIsEnabled\nwant:\n%v\ngot:%v\n", tc.want, got) + } + }) + } +} diff --git a/pkg/config/validation.go b/pkg/config/validation.go index 0c85c40d16..b0da023f1a 100644 --- a/pkg/config/validation.go +++ b/pkg/config/validation.go @@ -39,11 +39,15 @@ var ( integrationsFrameworksPath = integrationsPath.Child("frameworks") podOptionsPath = integrationsPath.Child("podOptions") namespaceSelectorPath = podOptionsPath.Child("namespaceSelector") + waitForPodsReadyPath = field.NewPath("waitForPodsReady") + requeuingStrategyPath = waitForPodsReadyPath.Child("requeuingStrategy") ) func validate(c *configapi.Configuration) field.ErrorList { var allErrs field.ErrorList + allErrs = append(allErrs, validateWaitForPodsReady(c)...) + allErrs = append(allErrs, validateQueueVisibility(c)...) // Validate PodNamespaceSelector for the pod framework @@ -52,6 +56,19 @@ func validate(c *configapi.Configuration) field.ErrorList { return allErrs } +func validateWaitForPodsReady(c *configapi.Configuration) field.ErrorList { + var allErrs field.ErrorList + if !WaitForPodsReadyIsEnabled(c) { + return allErrs + } + if strategy := c.WaitForPodsReady.RequeuingStrategy; strategy != nil && strategy.Timestamp != nil && + *strategy.Timestamp != configapi.CreationTimestamp && *strategy.Timestamp != configapi.EvictionTimestamp { + allErrs = append(allErrs, field.NotSupported(requeuingStrategyPath.Child("timestamp"), + strategy.Timestamp, []configapi.RequeuingTimestamp{configapi.CreationTimestamp, configapi.EvictionTimestamp})) + } + return allErrs +} + func validateQueueVisibility(cfg *configapi.Configuration) field.ErrorList { var allErrs field.ErrorList if cfg.QueueVisibility != nil { diff --git a/pkg/config/validation_test.go b/pkg/config/validation_test.go index a7abd25a4c..68f308dba4 100644 --- a/pkg/config/validation_test.go +++ b/pkg/config/validation_test.go @@ -213,6 +213,35 @@ func TestValidate(t *testing.T) { }, wantErr: nil, }, + "no supported waitForPodsReady.requeuingStrategy.timestamp": { + cfg: &configapi.Configuration{ + Integrations: defaultIntegrations, + WaitForPodsReady: &configapi.WaitForPodsReady{ + Enable: true, + RequeuingStrategy: &configapi.RequeuingStrategy{ + Timestamp: ptr.To[configapi.RequeuingTimestamp]("NoSupported"), + }, + }, + }, + wantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeNotSupported, + Field: "waitForPodsReady.requeuingStrategy.timestamp", + }, + }, + }, + "supported waitForPodsReady.requeuingStrategy.timestamp": { + cfg: &configapi.Configuration{ + Integrations: defaultIntegrations, + WaitForPodsReady: &configapi.WaitForPodsReady{ + Enable: true, + RequeuingStrategy: &configapi.RequeuingStrategy{ + Timestamp: ptr.To(configapi.CreationTimestamp), + }, + }, + }, + wantErr: nil, + }, } for name, tc := range testCases { diff --git a/pkg/controller/core/clusterqueue_controller_test.go b/pkg/controller/core/clusterqueue_controller_test.go index fa5f1c698e..827d9b50cc 100644 --- a/pkg/controller/core/clusterqueue_controller_test.go +++ b/pkg/controller/core/clusterqueue_controller_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + testingclock "k8s.io/utils/clock/testing" "k8s.io/utils/ptr" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" @@ -47,6 +48,7 @@ func TestUpdateCqStatusIfChanged(t *testing.T) { *utiltesting.MakeWorkload("beta", "").Queue(lqName).Obj(), }, } + fakeClock := testingclock.NewFakeClock(time.Now()) testCases := map[string]struct { cqStatus kueue.ClusterQueueStatus @@ -198,7 +200,7 @@ func TestUpdateCqStatusIfChanged(t *testing.T) { qManager: qManager, } if tc.newWl != nil { - r.qManager.AddOrUpdateWorkload(tc.newWl) + r.qManager.AddOrUpdateWorkload(ctx, tc.newWl, fakeClock) } err := r.updateCqStatusIfChanged(ctx, cq, tc.newConditionStatus, tc.newReason, tc.newMessage) if err != nil { diff --git a/pkg/controller/core/core.go b/pkg/controller/core/core.go index 099623b208..b29eb39aa7 100644 --- a/pkg/controller/core/core.go +++ b/pkg/controller/core/core.go @@ -21,8 +21,9 @@ import ( ctrl "sigs.k8s.io/controller-runtime" - config "sigs.k8s.io/kueue/apis/config/v1beta1" + configapi "sigs.k8s.io/kueue/apis/config/v1beta1" "sigs.k8s.io/kueue/pkg/cache" + "sigs.k8s.io/kueue/pkg/config" "sigs.k8s.io/kueue/pkg/constants" "sigs.k8s.io/kueue/pkg/queue" ) @@ -31,7 +32,7 @@ const updateChBuffer = 10 // SetupControllers sets up the core controllers. It returns the name of the // controller that failed to create and an error, if any. -func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache, cfg *config.Configuration) (string, error) { +func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache, cfg *configapi.Configuration) (string, error) { rfRec := NewResourceFlavorReconciler(mgr.GetClient(), qManager, cc) if err := rfRec.SetupWithManager(mgr, cfg); err != nil { return "ResourceFlavor", err @@ -65,27 +66,35 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache if err := NewWorkloadReconciler(mgr.GetClient(), qManager, cc, mgr.GetEventRecorderFor(constants.WorkloadControllerName), WithWorkloadUpdateWatchers(qRec, cqRec), - WithPodsReadyTimeout(podsReadyTimeout(cfg))).SetupWithManager(mgr, cfg); err != nil { + WithPodsReadyTimeout(podsReadyTimeout(cfg)), + WithRequeuingBackoffLimitCount(requeuingBackoffLimitCount(cfg))).SetupWithManager(mgr, cfg); err != nil { return "Workload", err } return "", nil } -func podsReadyTimeout(cfg *config.Configuration) *time.Duration { - if cfg.WaitForPodsReady != nil && cfg.WaitForPodsReady.Enable && cfg.WaitForPodsReady.Timeout != nil { +func podsReadyTimeout(cfg *configapi.Configuration) *time.Duration { + if config.WaitForPodsReadyIsEnabled(cfg) && cfg.WaitForPodsReady.Timeout != nil { return &cfg.WaitForPodsReady.Timeout.Duration } return nil } -func queueVisibilityUpdateInterval(cfg *config.Configuration) time.Duration { +func requeuingBackoffLimitCount(cfg *configapi.Configuration) *int32 { + if config.WaitForPodsReadyIsEnabled(cfg) && cfg.WaitForPodsReady.RequeuingStrategy != nil { + return cfg.WaitForPodsReady.RequeuingStrategy.BackoffLimitCount + } + return nil +} + +func queueVisibilityUpdateInterval(cfg *configapi.Configuration) time.Duration { if cfg.QueueVisibility != nil { return time.Duration(cfg.QueueVisibility.UpdateIntervalSeconds) * time.Second } return 0 } -func queueVisibilityClusterQueuesMaxCount(cfg *config.Configuration) int32 { +func queueVisibilityClusterQueuesMaxCount(cfg *configapi.Configuration) int32 { if cfg.QueueVisibility != nil && cfg.QueueVisibility.ClusterQueues != nil { return cfg.QueueVisibility.ClusterQueues.MaxCount } diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index 7e9eefaac2..f861fea351 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -63,8 +64,9 @@ var ( ) type options struct { - watchers []WorkloadUpdateWatcher - podsReadyTimeout *time.Duration + watchers []WorkloadUpdateWatcher + podsReadyTimeout *time.Duration + requeuingBackoffLimitCount *int32 } // Option configures the reconciler. @@ -78,6 +80,14 @@ func WithPodsReadyTimeout(value *time.Duration) Option { } } +// WithRequeuingBackoffLimitCount indicates if the controller should deactivate a workload +// if it reaches the limitation. +func WithRequeuingBackoffLimitCount(value *int32) Option { + return func(o *options) { + o.requeuingBackoffLimitCount = value + } +} + // WithWorkloadUpdateWatchers allows to specify the workload update watchers func WithWorkloadUpdateWatchers(value ...WorkloadUpdateWatcher) Option { return func(o *options) { @@ -93,13 +103,14 @@ type WorkloadUpdateWatcher interface { // WorkloadReconciler reconciles a Workload object type WorkloadReconciler struct { - log logr.Logger - queues *queue.Manager - cache *cache.Cache - client client.Client - watchers []WorkloadUpdateWatcher - podsReadyTimeout *time.Duration - recorder record.EventRecorder + log logr.Logger + queues *queue.Manager + cache *cache.Cache + client client.Client + watchers []WorkloadUpdateWatcher + podsReadyTimeout *time.Duration + requeuingBackoffLimitCount *int32 + recorder record.EventRecorder } func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache, recorder record.EventRecorder, opts ...Option) *WorkloadReconciler { @@ -109,13 +120,14 @@ func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *c } return &WorkloadReconciler{ - log: ctrl.Log.WithName("workload-reconciler"), - client: client, - queues: queues, - cache: cache, - watchers: options.watchers, - podsReadyTimeout: options.podsReadyTimeout, - recorder: recorder, + log: ctrl.Log.WithName("workload-reconciler"), + client: client, + queues: queues, + cache: cache, + watchers: options.watchers, + podsReadyTimeout: options.podsReadyTimeout, + requeuingBackoffLimitCount: options.requeuingBackoffLimitCount, + recorder: recorder, } } @@ -329,12 +341,47 @@ func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req c if recheckAfter > 0 { log.V(4).Info("Workload not yet ready and did not exceed its timeout", "recheckAfter", recheckAfter) return ctrl.Result{RequeueAfter: recheckAfter}, nil - } else { - log.V(2).Info("Start the eviction of the workload due to exceeding the PodsReady timeout") - workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByPodsReadyTimeout, fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String())) - err := workload.ApplyAdmissionStatus(ctx, r.client, wl, false) - return ctrl.Result{}, client.IgnoreNotFound(err) } + log.V(2).Info("Start the eviction of the workload due to exceeding the PodsReady timeout") + if deactivated, err := r.backoffRequeuing(ctx, wl, realClock); err != nil || deactivated { + return ctrl.Result{}, err + } + workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByPodsReadyTimeout, fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String())) + err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true) + return ctrl.Result{}, client.IgnoreNotFound(err) +} + +func (r *WorkloadReconciler) backoffRequeuing(ctx context.Context, wl *kueue.Workload, clock clock.Clock) (bool, error) { + if r.queues.RequeuingWorkloads().Has(workload.Key(wl)) || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) { + return false, nil + } + if !workload.HasRequeueState(wl) { + wl.Status.RequeueState = &kueue.RequeueState{} + } + // If requeuingBackoffLimitCount equals to null, the workloads is repeatedly and endless re-queued. + requeuingCount := ptr.Deref(wl.Status.RequeueState.Count, 0) + 1 + if r.requeuingBackoffLimitCount != nil && requeuingCount > *r.requeuingBackoffLimitCount { + wl.Spec.Active = ptr.To(false) + r.recorder.Eventf(wl, corev1.EventTypeNormal, "WorkloadDeactivated", + "Deactivated Workload %q by reached re-queue backoffLimitCount", klog.KObj(wl)) + return true, r.client.Update(ctx, wl) + } + // The duration until the workload is deactivated is calculated by "Rand+SUM[k=0,n](1.41891^n)" + // where the "n" means the "requeuingCount-1", and the "Rand" means the jitter. + // Given that the "backoffLimitCount" equals "30", the result approximately equals 24 hours. + backoff := &wait.Backoff{ + Duration: 1 * time.Second, + Factor: 1.41891, + Jitter: 0.0001, + Steps: int(requeuingCount), + } + var waitTime time.Duration + for i := int32(0); i < requeuingCount; i++ { + waitTime = backoff.Step() + } + wl.Status.RequeueState.RequeueAt = ptr.To(metav1.NewTime(clock.Now().Add(waitTime))) + wl.Status.RequeueState.Count = &requeuingCount + return false, nil } func (r *WorkloadReconciler) Create(e event.CreateEvent) bool { @@ -357,7 +404,7 @@ func (r *WorkloadReconciler) Create(e event.CreateEvent) bool { workload.AdjustResources(ctx, r.client, wlCopy) if !workload.HasQuotaReservation(wl) { - if !r.queues.AddOrUpdateWorkload(wlCopy) { + if !r.queues.AddOrUpdateWorkload(ctx, wlCopy, realClock) { log.V(2).Info("Queue for workload didn't exist; ignored for now") } return true @@ -461,7 +508,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { }) case prevStatus == pending && status == pending: - if !r.queues.UpdateWorkload(oldWl, wlCopy) { + if !r.queues.UpdateWorkload(ctx, oldWl, wlCopy, realClock) { log.V(2).Info("Queue for updated workload didn't exist; ignoring for now") } @@ -480,7 +527,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { log.Error(err, "Failed to delete workload from cache") } }) - if !r.queues.AddOrUpdateWorkload(wlCopy) { + if !r.queues.AddOrUpdateWorkload(ctx, wlCopy, realClock) { log.V(2).Info("Queue for workload didn't exist; ignored for now") } @@ -620,7 +667,7 @@ func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, _ log := log.WithValues("workload", klog.KObj(wlCopy)) log.V(5).Info("Queue reconcile for") workload.AdjustResources(ctrl.LoggerInto(ctx, log), h.r.client, wlCopy) - if !h.r.queues.AddOrUpdateWorkload(wlCopy) { + if !h.r.queues.AddOrUpdateWorkload(ctx, wlCopy, realClock) { log.V(2).Info("Queue for workload didn't exist") } } diff --git a/pkg/queue/cluster_queue_impl.go b/pkg/queue/cluster_queue_impl.go index 1d898da5a6..3652719b18 100644 --- a/pkg/queue/cluster_queue_impl.go +++ b/pkg/queue/cluster_queue_impl.go @@ -46,7 +46,7 @@ type clusterQueueBase struct { inadmissibleWorkloads map[string]*workload.Info // popCycle identifies the last call to Pop. It's incremented when calling Pop. - // popCycle and queueInadmissibleCycle are used to track when there is a requeueing + // popCycle and queueInadmissibleCycle are used to track when there is a requeuing // of inadmissible workloads while a workload is being scheduled. popCycle int64 @@ -116,6 +116,10 @@ func (c *clusterQueueBase) PushOrUpdate(wInfo *workload.Info) { // otherwise move or update in place in the queue. delete(c.inadmissibleWorkloads, key) } + if workload.HasRequeueState(wInfo.Obj) { + c.inadmissibleWorkloads[key] = wInfo + return + } c.heap.PushOrUpdate(wInfo) } @@ -148,7 +152,7 @@ func (c *clusterQueueBase) requeueIfNotPresent(wInfo *workload.Info, immediate b c.rwm.Lock() defer c.rwm.Unlock() key := workload.Key(wInfo.Obj) - if immediate || c.queueInadmissibleCycle >= c.popCycle || wInfo.LastAssignment.PendingFlavors() { + if immediate || c.queueInadmissibleCycle >= c.popCycle || wInfo.LastAssignment.PendingFlavors() || workload.HasRequeueState(wInfo.Obj) { // If the workload was inadmissible, move it back into the queue. inadmissibleWl := c.inadmissibleWorkloads[key] if inadmissibleWl != nil { diff --git a/pkg/queue/cluster_queue_impl_test.go b/pkg/queue/cluster_queue_impl_test.go index c1061e414c..85f01f4e64 100644 --- a/pkg/queue/cluster_queue_impl_test.go +++ b/pkg/queue/cluster_queue_impl_test.go @@ -305,7 +305,7 @@ func TestClusterQueueImpl(t *testing.T) { if test.queueInadmissibleWorkloads { if diff := cmp.Diff(test.wantInadmissibleWorkloadsRequeued, cq.QueueInadmissibleWorkloads(context.Background(), cl)); diff != "" { - t.Errorf("Unexpected requeueing of inadmissible workloads (-want,+got):\n%s", diff) + t.Errorf("Unexpected requeuing of inadmissible workloads (-want,+got):\n%s", diff) } } @@ -340,7 +340,7 @@ func TestQueueInadmissibleWorkloadsDuringScheduling(t *testing.T) { t.Errorf("Unexpected active workloads before events (-want,+got):\n%s", diff) } - // Simulate requeueing during scheduling attempt. + // Simulate requeuing during scheduling attempt. head := cq.Pop() cq.QueueInadmissibleWorkloads(ctx, cl) cq.requeueIfNotPresent(head, false) @@ -348,10 +348,10 @@ func TestQueueInadmissibleWorkloadsDuringScheduling(t *testing.T) { activeWorkloads, _ = cq.Dump() wantActiveWorkloads = []string{workload.Key(wl)} if diff := cmp.Diff(wantActiveWorkloads, activeWorkloads, cmpDump...); diff != "" { - t.Errorf("Unexpected active workloads after scheduling with requeueing (-want,+got):\n%s", diff) + t.Errorf("Unexpected active workloads after scheduling with requeuing (-want,+got):\n%s", diff) } - // Simulating scheduling again without requeueing. + // Simulating scheduling again without requeuing. head = cq.Pop() cq.requeueIfNotPresent(head, false) activeWorkloads, _ = cq.Dump() diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go index f7f0b33db7..798029e3a3 100644 --- a/pkg/queue/manager.go +++ b/pkg/queue/manager.go @@ -25,6 +25,8 @@ import ( "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -64,10 +66,11 @@ type Manager struct { sync.RWMutex cond sync.Cond - client client.Client - statusChecker StatusChecker - clusterQueues map[string]ClusterQueue - localQueues map[string]*LocalQueue + client client.Client + statusChecker StatusChecker + clusterQueues map[string]ClusterQueue + localQueues map[string]*LocalQueue + requeuingWorkloads sets.Set[string] snapshotsMutex sync.RWMutex snapshots map[string][]kueue.ClusterQueuePendingWorkload @@ -84,13 +87,14 @@ func NewManager(client client.Client, checker StatusChecker, opts ...Option) *Ma opt(&options) } m := &Manager{ - client: client, - statusChecker: checker, - localQueues: make(map[string]*LocalQueue), - clusterQueues: make(map[string]ClusterQueue), - cohorts: make(map[string]sets.Set[string]), - snapshotsMutex: sync.RWMutex{}, - snapshots: make(map[string][]kueue.ClusterQueuePendingWorkload, 0), + client: client, + statusChecker: checker, + requeuingWorkloads: sets.New[string](), + localQueues: make(map[string]*LocalQueue), + clusterQueues: make(map[string]ClusterQueue), + cohorts: make(map[string]sets.Set[string]), + snapshotsMutex: sync.RWMutex{}, + snapshots: make(map[string][]kueue.ClusterQueuePendingWorkload, 0), workloadOrdering: workload.Ordering{ PodsReadyRequeuingTimestamp: options.podsReadyRequeuingTimestamp, }, @@ -291,13 +295,13 @@ func (m *Manager) ClusterQueueForWorkload(wl *kueue.Workload) (string, bool) { // AddOrUpdateWorkload adds or updates workload to the corresponding queue. // Returns whether the queue existed. -func (m *Manager) AddOrUpdateWorkload(w *kueue.Workload) bool { +func (m *Manager) AddOrUpdateWorkload(ctx context.Context, w *kueue.Workload, clock clock.Clock) bool { m.Lock() defer m.Unlock() - return m.addOrUpdateWorkload(w) + return m.addOrUpdateWorkload(ctx, w, clock) } -func (m *Manager) addOrUpdateWorkload(w *kueue.Workload) bool { +func (m *Manager) addOrUpdateWorkload(ctx context.Context, w *kueue.Workload, clock clock.Clock) bool { qKey := workload.QueueKey(w) q := m.localQueues[qKey] if q == nil { @@ -311,10 +315,44 @@ func (m *Manager) addOrUpdateWorkload(w *kueue.Workload) bool { } cq.PushOrUpdate(wInfo) m.reportPendingWorkloads(q.ClusterQueue, cq) + wKey := workload.Key(wInfo.Obj) + if workload.HasRequeueState(wInfo.Obj) && !m.requeuingWorkloads.Has(wKey) { + m.requeuingWorkloads.Insert(wKey) + go func() { + m.backoffRequeue(ctx, wInfo, clock) + }() + } m.Broadcast() return true } +func (m *Manager) backoffRequeue(ctx context.Context, wInfo *workload.Info, clock clock.Clock) { + defer func() { + m.Lock() + m.requeuingWorkloads.Delete(workload.Key(wInfo.Obj)) + m.Unlock() + }() + log := ctrl.LoggerFrom(ctx) + select { + case <-ctx.Done(): + log.V(5).Info("Context cancelled when waiting for reached to requeueAt") + + case <-clock.After(clock.Since(wInfo.Obj.Status.RequeueState.RequeueAt.Time)): + added := m.RequeueWorkload(ctx, wInfo, RequeueReasonGeneric) + log.V(2).Info("Workload re-queued", "workload", klog.KObj(wInfo.Obj), + "clusterQueue", klog.KRef("", wInfo.ClusterQueue), + "queue", klog.KRef(wInfo.Obj.Namespace, wInfo.Obj.Spec.QueueName), + "requeueReason", RequeueReasonGeneric, + "added", added) + } +} + +func (m *Manager) RequeuingWorkloads() sets.Set[string] { + m.RLock() + defer m.RUnlock() + return m.requeuingWorkloads +} + // RequeueWorkload requeues the workload ensuring that the queue and the // workload still exist in the client cache and not admitted. It won't // requeue if the workload is already in the queue (possible if the workload was updated). @@ -448,13 +486,13 @@ func (m *Manager) queueAllInadmissibleWorkloadsInCohort(ctx context.Context, cq // UpdateWorkload updates the workload to the corresponding queue or adds it if // it didn't exist. Returns whether the queue existed. -func (m *Manager) UpdateWorkload(oldW, w *kueue.Workload) bool { +func (m *Manager) UpdateWorkload(ctx context.Context, oldW, w *kueue.Workload, clock clock.Clock) bool { m.Lock() defer m.Unlock() if oldW.Spec.QueueName != w.Spec.QueueName { m.deleteWorkloadFromQueueAndClusterQueue(w, workload.QueueKey(oldW)) } - return m.addOrUpdateWorkload(w) + return m.addOrUpdateWorkload(ctx, w, clock) } // CleanUpOnContext tracks the context. When closed, it wakes routines waiting diff --git a/pkg/queue/manager_test.go b/pkg/queue/manager_test.go index 1f2df1f74a..9bd0e0ffc3 100644 --- a/pkg/queue/manager_test.go +++ b/pkg/queue/manager_test.go @@ -30,6 +30,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" + testingclock "k8s.io/utils/clock/testing" "sigs.k8s.io/controller-runtime/pkg/client" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" @@ -256,6 +257,7 @@ func TestUpdateLocalQueue(t *testing.T) { utiltesting.MakeLocalQueue("bar", "").ClusterQueue("cq2").Obj(), } now := time.Now() + fakeclock := testingclock.NewFakeClock(now) workloads := []*kueue.Workload{ utiltesting.MakeWorkload("a", "").Queue("foo").Creation(now.Add(time.Second)).Obj(), utiltesting.MakeWorkload("b", "").Queue("bar").Creation(now).Obj(), @@ -278,7 +280,7 @@ func TestUpdateLocalQueue(t *testing.T) { } } for _, w := range workloads { - manager.AddOrUpdateWorkload(w) + manager.AddOrUpdateWorkload(ctx, w, fakeclock) } // Update cluster queue of first queue. @@ -348,6 +350,8 @@ func TestAddWorkload(t *testing.T) { t.Fatalf("Failed adding queue %s: %v", q.Name, err) } } + now := time.Now() + fakeclock := testingclock.NewFakeClock(now) cases := []struct { workload *kueue.Workload wantAdded bool @@ -392,7 +396,9 @@ func TestAddWorkload(t *testing.T) { } for _, tc := range cases { t.Run(tc.workload.Name, func(t *testing.T) { - if added := manager.AddOrUpdateWorkload(tc.workload); added != tc.wantAdded { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if added := manager.AddOrUpdateWorkload(ctx, tc.workload, fakeclock); added != tc.wantAdded { t.Errorf("AddWorkload returned %t, want %t", added, tc.wantAdded) } }) @@ -406,6 +412,7 @@ func TestStatus(t *testing.T) { t.Fatalf("Failed adding kueue scheme: %s", err) } now := time.Now().Truncate(time.Second) + fakeclock := testingclock.NewFakeClock(now) queues := []kueue.LocalQueue{ { @@ -460,7 +467,7 @@ func TestStatus(t *testing.T) { } for _, wl := range workloads { wl := wl - manager.AddOrUpdateWorkload(&wl) + manager.AddOrUpdateWorkload(ctx, &wl, fakeclock) } cases := map[string]struct { @@ -503,6 +510,7 @@ func TestRequeueWorkloadStrictFIFO(t *testing.T) { utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj(), utiltesting.MakeLocalQueue("bar", "").Obj(), } + fakeclock := testingclock.NewFakeClock(time.Now()) cases := []struct { workload *kueue.Workload inClient bool @@ -580,7 +588,7 @@ func TestRequeueWorkloadStrictFIFO(t *testing.T) { } } if tc.inQueue { - _ = manager.AddOrUpdateWorkload(tc.workload) + _ = manager.AddOrUpdateWorkload(ctx, tc.workload, fakeclock) } info := workload.NewInfo(tc.workload) if requeued := manager.RequeueWorkload(ctx, info, RequeueReasonGeneric); requeued != tc.wantRequeued { @@ -596,6 +604,7 @@ func TestUpdateWorkload(t *testing.T) { t.Fatalf("Failed adding kueue scheme: %s", err) } now := time.Now() + fakeclock := testingclock.NewFakeClock(now) cases := map[string]struct { clusterQueues []*kueue.ClusterQueue queues []*kueue.LocalQueue @@ -732,11 +741,11 @@ func TestUpdateWorkload(t *testing.T) { } } for _, w := range tc.workloads { - manager.AddOrUpdateWorkload(w) + manager.AddOrUpdateWorkload(ctx, w, fakeclock) } wl := tc.workloads[0].DeepCopy() tc.update(wl) - if updated := manager.UpdateWorkload(tc.workloads[0], wl); updated != tc.wantUpdated { + if updated := manager.UpdateWorkload(ctx, tc.workloads[0], wl, fakeclock); updated != tc.wantUpdated { t.Errorf("UpdatedWorkload returned %t, want %t", updated, tc.wantUpdated) } q := manager.localQueues[workload.QueueKey(wl)] @@ -782,6 +791,7 @@ func TestHeads(t *testing.T) { t.Fatalf("Failed adding kueue scheme: %s", err) } now := time.Now().Truncate(time.Second) + fakeclock := testingclock.NewFakeClock(now) clusterQueues := []*kueue.ClusterQueue{ utiltesting.MakeClusterQueue("active-fooCq").Obj(), @@ -849,7 +859,7 @@ func TestHeads(t *testing.T) { go manager.CleanUpOnContext(ctx) for _, wl := range tc.workloads { - manager.AddOrUpdateWorkload(wl) + manager.AddOrUpdateWorkload(ctx, wl, fakeclock) } wlNames := sets.New[string]() @@ -870,6 +880,7 @@ var ignoreTypeMeta = cmpopts.IgnoreTypes(metav1.TypeMeta{}) // asynchronously. func TestHeadsAsync(t *testing.T) { now := time.Now().Truncate(time.Second) + fakeclock := testingclock.NewFakeClock(now) clusterQueues := []*kueue.ClusterQueue{ utiltesting.MakeClusterQueue("fooCq").Obj(), utiltesting.MakeClusterQueue("barCq").Obj(), @@ -907,7 +918,7 @@ func TestHeadsAsync(t *testing.T) { if err := mgr.AddLocalQueue(ctx, &queues[0]); err != nil { t.Errorf("Failed adding queue: %s", err) } - mgr.AddOrUpdateWorkload(&wl) + mgr.AddOrUpdateWorkload(ctx, &wl, fakeclock) go func() { if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil { t.Errorf("Failed adding clusterQueue: %v", err) @@ -949,7 +960,7 @@ func TestHeadsAsync(t *testing.T) { t.Errorf("Failed adding queue: %s", err) } go func() { - mgr.AddOrUpdateWorkload(&wl) + mgr.AddOrUpdateWorkload(ctx, &wl, fakeclock) }() }, wantHeads: []workload.Info{ @@ -970,7 +981,7 @@ func TestHeadsAsync(t *testing.T) { go func() { wlCopy := wl.DeepCopy() wlCopy.ResourceVersion = "old" - mgr.UpdateWorkload(wlCopy, &wl) + mgr.UpdateWorkload(ctx, wlCopy, &wl, fakeclock) }() }, wantHeads: []workload.Info{ @@ -1120,6 +1131,7 @@ func (c *fakeStatusChecker) ClusterQueueActive(name string) bool { func TestGetPendingWorkloadsInfo(t *testing.T) { now := time.Now().Truncate(time.Second) + fakeclock := testingclock.NewFakeClock(now) clusterQueues := []*kueue.ClusterQueue{ utiltesting.MakeClusterQueue("cq").Obj(), @@ -1150,7 +1162,7 @@ func TestGetPendingWorkloadsInfo(t *testing.T) { } } for _, w := range workloads { - manager.AddOrUpdateWorkload(w) + manager.AddOrUpdateWorkload(ctx, w, fakeclock) } cases := map[string]struct { diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index 7190fa8628..1006cf2bb1 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -270,6 +270,23 @@ func (w *WorkloadWrapper) DeletionTimestamp(t time.Time) *WorkloadWrapper { return w } +func (w *WorkloadWrapper) RequeueState(count *int32, requeueAt *metav1.Time) *WorkloadWrapper { + if count == nil && requeueAt == nil { + w.Status.RequeueState = nil + return w + } + if w.Status.RequeueState == nil { + w.Status.RequeueState = &kueue.RequeueState{} + } + if count != nil { + w.Status.RequeueState.Count = count + } + if requeueAt != nil { + w.Status.RequeueState.RequeueAt = requeueAt + } + return w +} + type PodSetWrapper struct{ kueue.PodSet } func MakePodSet(name string, count int) *PodSetWrapper { diff --git a/pkg/visibility/api/rest/pending_workloads_cq_test.go b/pkg/visibility/api/rest/pending_workloads_cq_test.go index d039533cbd..6a474fdcf8 100644 --- a/pkg/visibility/api/rest/pending_workloads_cq_test.go +++ b/pkg/visibility/api/rest/pending_workloads_cq_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + testingclock "k8s.io/utils/clock/testing" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" visibility "sigs.k8s.io/kueue/apis/visibility/v1alpha1" @@ -59,6 +60,7 @@ func TestPendingWorkloadsInCQ(t *testing.T) { } now := time.Now() + fakeclock := testingclock.NewFakeClock(now) cases := map[string]struct { clusterQueues []*kueue.ClusterQueue queues []*kueue.LocalQueue @@ -336,7 +338,7 @@ func TestPendingWorkloadsInCQ(t *testing.T) { } } for _, w := range tc.workloads { - manager.AddOrUpdateWorkload(w) + manager.AddOrUpdateWorkload(ctx, w, fakeclock) } info, err := pendingWorkloadsInCqRest.Get(ctx, tc.req.queueName, tc.req.queryParams) diff --git a/pkg/visibility/api/rest/pending_workloads_lq_test.go b/pkg/visibility/api/rest/pending_workloads_lq_test.go index 008d09e34d..1152583070 100644 --- a/pkg/visibility/api/rest/pending_workloads_lq_test.go +++ b/pkg/visibility/api/rest/pending_workloads_lq_test.go @@ -25,6 +25,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/endpoints/request" + testingclock "k8s.io/utils/clock/testing" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" visibility "sigs.k8s.io/kueue/apis/visibility/v1alpha1" @@ -59,6 +60,7 @@ func TestPendingWorkloadsInLQ(t *testing.T) { } now := time.Now() + fakeclock := testingclock.NewFakeClock(now) cases := map[string]struct { clusterQueues []*kueue.ClusterQueue queues []*kueue.LocalQueue @@ -452,7 +454,7 @@ func TestPendingWorkloadsInLQ(t *testing.T) { } } for _, w := range tc.workloads { - manager.AddOrUpdateWorkload(w) + manager.AddOrUpdateWorkload(ctx, w, fakeclock) } ctx = request.WithNamespace(ctx, tc.req.nsName) diff --git a/pkg/webhooks/workload_webhook.go b/pkg/webhooks/workload_webhook.go index a2c3f51ae8..78fbb85f59 100644 --- a/pkg/webhooks/workload_webhook.go +++ b/pkg/webhooks/workload_webhook.go @@ -75,6 +75,11 @@ func (w *WorkloadWebhook) Default(ctx context.Context, obj runtime.Object) error wl.Spec.PodSets[i].MinCount = nil } } + + // If the deactivated workload is re-activated, we need to reset the RequeueState. + if ptr.Deref(wl.Spec.Active, true) && workload.IsEvictedByDeactivation(wl) && workload.HasRequeueState(wl) { + wl.Status.RequeueState = nil + } return nil } diff --git a/pkg/webhooks/workload_webhook_test.go b/pkg/webhooks/workload_webhook_test.go index 258906aa74..78051be403 100644 --- a/pkg/webhooks/workload_webhook_test.go +++ b/pkg/webhooks/workload_webhook_test.go @@ -78,6 +78,22 @@ func TestWorkloadWebhookDefault(t *testing.T) { }, }, }, + "re-activated workload with re-queue state is reset the re-queue state": { + wl: *testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadEvictedByDeactivation, + }).RequeueState(ptr.To[int32](5), ptr.To(metav1.Now())). + Obj(), + wantWl: *testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadEvictedByDeactivation, + }). + Obj(), + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { @@ -86,7 +102,8 @@ func TestWorkloadWebhookDefault(t *testing.T) { if err := wh.Default(context.Background(), wlCopy); err != nil { t.Fatalf("Could not apply defaults: %v", err) } - if diff := cmp.Diff(tc.wantWl, *wlCopy); diff != "" { + if diff := cmp.Diff(tc.wantWl, *wlCopy, + cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")); diff != "" { t.Errorf("Obtained wrong defaults (-want,+got):\n%s", diff) } }) diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index 47fbe116d4..c66a18f44d 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -375,6 +375,9 @@ func admissionPatch(w *kueue.Workload) *kueue.Workload { wlCopy := BaseSSAWorkload(w) wlCopy.Status.Admission = w.Status.Admission.DeepCopy() + if HasRequeueState(w) { + wlCopy.Status.RequeueState = w.Status.RequeueState.DeepCopy() + } for _, conditionName := range admissionManagedConditions { if existing := apimeta.FindStatusCondition(w.Status.Conditions, conditionName); existing != nil { wlCopy.Status.Conditions = append(wlCopy.Status.Conditions, *existing.DeepCopy()) @@ -443,6 +446,11 @@ func ReclaimablePodsAreEqual(a, b []kueue.ReclaimablePod) bool { return true } +// HasRequeueState returns true if the workload has re-queue state. +func HasRequeueState(w *kueue.Workload) bool { + return w.Status.RequeueState != nil +} + // IsAdmitted returns true if the workload is admitted. func IsAdmitted(w *kueue.Workload) bool { return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadAdmitted) @@ -453,6 +461,12 @@ func IsFinished(w *kueue.Workload) bool { return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadFinished) } +// IsEvictedByDeactivation returns true if the workload is evicted by deactivation. +func IsEvictedByDeactivation(w *kueue.Workload) bool { + cond := apimeta.FindStatusCondition(w.Status.Conditions, kueue.WorkloadEvicted) + return cond != nil && cond.Status == metav1.ConditionTrue && cond.Reason == kueue.WorkloadEvictedByDeactivation +} + func RemoveFinalizer(ctx context.Context, c client.Client, wl *kueue.Workload) error { if controllerutil.RemoveFinalizer(wl, kueue.ResourceInUseFinalizerName) { return c.Update(ctx, wl) diff --git a/pkg/workload/workload_test.go b/pkg/workload/workload_test.go index 4f682f0285..f53739c0a5 100644 --- a/pkg/workload/workload_test.go +++ b/pkg/workload/workload_test.go @@ -456,3 +456,73 @@ func TestAssignmentClusterQueueState(t *testing.T) { }) } } + +func TestHasRequeueState(t *testing.T) { + cases := map[string]struct { + workload *kueue.Workload + want bool + }{ + "workload has requeue state": { + workload: utiltesting.MakeWorkload("test", "test").RequeueState(ptr.To[int32](5), ptr.To(metav1.Now())).Obj(), + want: true, + }, + "workload doesn't have requeue state": { + workload: utiltesting.MakeWorkload("test", "test").RequeueState(nil, nil).Obj(), + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + got := HasRequeueState(tc.workload) + if tc.want != got { + t.Errorf("Unexpected result from HasRequeuState\nwant:%v\ngot:%v\n", tc.want, got) + } + }) + } +} + +func TestIsEvictedByDeactivation(t *testing.T) { + cases := map[string]struct { + workload *kueue.Workload + want bool + }{ + "evicted condition doesn't exist": { + workload: utiltesting.MakeWorkload("test", "test").Obj(), + }, + "evicted condition with false status": { + workload: utiltesting.MakeWorkload("test", "test"). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Reason: kueue.WorkloadEvictedByDeactivation, + Status: metav1.ConditionFalse, + }). + Obj(), + }, + "evicted condition with PodsReadyTimeout reason": { + workload: utiltesting.MakeWorkload("test", "test"). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Reason: kueue.WorkloadEvictedByPodsReadyTimeout, + Status: metav1.ConditionTrue, + }). + Obj(), + }, + "evicted condition with InactiveWorkload reason": { + workload: utiltesting.MakeWorkload("test", "test"). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Reason: kueue.WorkloadEvictedByDeactivation, + Status: metav1.ConditionTrue, + }). + Obj(), + want: true, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + got := IsEvictedByDeactivation(tc.workload) + if tc.want != got { + t.Errorf("Unexpected result from IsEvictedByDeactivation\nwant:%v\ngot:%v\n", tc.want, got) + } + }) + } +} diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md index 29a8e3d6cf..a77eb28f91 100644 --- a/site/content/en/docs/reference/kueue-config.v1beta1.md +++ b/site/content/en/docs/reference/kueue-config.v1beta1.md @@ -599,8 +599,11 @@ Defaults to 5.

Timestamp defines the timestamp used for requeuing a Workload -that was evicted due to Pod readiness.

-

Defaults to Eviction.

+that was evicted due to Pod readiness. The possible values are:

+ backoffLimitCount
@@ -608,7 +611,10 @@ that was evicted due to Pod readiness.

BackoffLimitCount defines the maximum number of requeuing retries. -When the number is reached, the workload is deactivated (.spec.activate=false).

+When the number is reached, the workload is deactivated (.spec.activate=false). +The duration until the workload is deactivated is calculated by "Rand+SUMk=0,n" +where the "n" means the "workloadStatus.requeueState.count-1", and the "Rand" means the jitter. +Given that the "backoffLimitCount" equals "30", the result approximately equals 24 hours.

Defaults to null.

diff --git a/test/integration/controller/jobs/job/job_controller_test.go b/test/integration/controller/jobs/job/job_controller_test.go index a5dafef43f..d270fcd365 100644 --- a/test/integration/controller/jobs/job/job_controller_test.go +++ b/test/integration/controller/jobs/job/job_controller_test.go @@ -1717,7 +1717,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde }) }) - ginkgo.When("Suspend a running Job without requeueing through Workload's spec.active field", func() { + ginkgo.When("Suspend a running Job without requeuing through Workload's spec.active field", func() { ginkgo.It("Should not readmit a job to the queue after Active is changed to false", func() { ginkgo.By("creating localQueue") localQueue := testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()