diff --git a/apis/kueue/v1beta1/clusterqueue_types.go b/apis/kueue/v1beta1/clusterqueue_types.go index c6532f526d..aed421b558 100644 --- a/apis/kueue/v1beta1/clusterqueue_types.go +++ b/apis/kueue/v1beta1/clusterqueue_types.go @@ -158,14 +158,6 @@ const ( BestEffortFIFO QueueingStrategy = "BestEffortFIFO" ) -type StopPolicy string - -const ( - None StopPolicy = "None" - HoldAndDrain StopPolicy = "HoldAndDrain" - Hold StopPolicy = "Hold" -) - // +kubebuilder:validation:XValidation:rule="self.flavors.all(x, size(x.resources) == size(self.coveredResources))", message="flavors must have the same number of resources as the coveredResources" type ResourceGroup struct { // coveredResources is the list of resources covered by the flavors in this diff --git a/apis/kueue/v1beta1/constants.go b/apis/kueue/v1beta1/constants.go index e1f6fa2a7d..fb6fa78821 100644 --- a/apis/kueue/v1beta1/constants.go +++ b/apis/kueue/v1beta1/constants.go @@ -18,6 +18,13 @@ package v1beta1 const ( ResourceInUseFinalizerName = "kueue.x-k8s.io/resource-in-use" + DefaultPodSetName = "main" +) + +type StopPolicy string - DefaultPodSetName = "main" +const ( + None StopPolicy = "None" + HoldAndDrain StopPolicy = "HoldAndDrain" + Hold StopPolicy = "Hold" ) diff --git a/apis/kueue/v1beta1/localqueue_types.go b/apis/kueue/v1beta1/localqueue_types.go index 63ed9d6464..e05d3947fc 100644 --- a/apis/kueue/v1beta1/localqueue_types.go +++ b/apis/kueue/v1beta1/localqueue_types.go @@ -27,6 +27,20 @@ type LocalQueueSpec struct { // clusterQueue is a reference to a clusterQueue that backs this localQueue. // +kubebuilder:validation:XValidation:rule="self == oldSelf", message="field is immutable" ClusterQueue ClusterQueueReference `json:"clusterQueue,omitempty"` + + // stopPolicy - if set to a value different from None, the LocalQueue is considered Inactive, + // no new reservation being made. + // + // Depending on its value, its associated workloads will: + // + // - None - Workloads are admitted + // - HoldAndDrain - Admitted workloads are evicted and Reserving workloads will cancel the reservation. + // - Hold - Admitted workloads will run to completion and Reserving workloads will cancel the reservation. + // + // +optional + // +kubebuilder:validation:Enum=None;Hold;HoldAndDrain + // +kubebuilder:default="None" + StopPolicy *StopPolicy `json:"stopPolicy,omitempty"` } // ClusterQueueReference is the name of the ClusterQueue. diff --git a/apis/kueue/v1beta1/workload_types.go b/apis/kueue/v1beta1/workload_types.go index d5894f0119..063e1d9f38 100644 --- a/apis/kueue/v1beta1/workload_types.go +++ b/apis/kueue/v1beta1/workload_types.go @@ -330,6 +330,10 @@ const ( ) const ( + // WorkloadInadmissible means that the Workload can't reserve quota + // due to LocalQueue or ClusterQueue doesn't exist or inactive. + WorkloadInadmissible = "Inadmissible" + // WorkloadEvictedByPreemption indicates that the workload was evicted // in order to free resources for a workload with a higher priority. WorkloadEvictedByPreemption = "Preempted" @@ -346,6 +350,10 @@ const ( // because the ClusterQueue is Stopped. WorkloadEvictedByClusterQueueStopped = "ClusterQueueStopped" + // WorkloadEvictedByLocalQueueStopped indicates that the workload was evicted + // because the LocalQueue is Stopped. + WorkloadEvictedByLocalQueueStopped = "LocalQueueStopped" + // WorkloadEvictedByDeactivation indicates that the workload was evicted // because spec.active is set to false. 
WorkloadEvictedByDeactivation = "InactiveWorkload" @@ -361,6 +369,10 @@ const ( // WorkloadClusterQueueRestarted indicates that the workload was requeued because // cluster queue was restarted after being stopped. WorkloadClusterQueueRestarted = "ClusterQueueRestarted" + + // WorkloadLocalQueueRestarted indicates that the workload was requeued because + // local queue was restarted after being stopped. + WorkloadLocalQueueRestarted = "LocalQueueRestarted" ) const ( diff --git a/apis/kueue/v1beta1/zz_generated.deepcopy.go b/apis/kueue/v1beta1/zz_generated.deepcopy.go index b81f581738..accb330543 100644 --- a/apis/kueue/v1beta1/zz_generated.deepcopy.go +++ b/apis/kueue/v1beta1/zz_generated.deepcopy.go @@ -551,7 +551,7 @@ func (in *LocalQueue) DeepCopyInto(out *LocalQueue) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) } @@ -646,6 +646,11 @@ func (in *LocalQueueResourceUsage) DeepCopy() *LocalQueueResourceUsage { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LocalQueueSpec) DeepCopyInto(out *LocalQueueSpec) { *out = *in + if in.StopPolicy != nil { + in, out := &in.StopPolicy, &out.StopPolicy + *out = new(StopPolicy) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LocalQueueSpec. diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_localqueues.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_localqueues.yaml index d944b7334b..7511817247 100644 --- a/charts/kueue/templates/crd/kueue.x-k8s.io_localqueues.yaml +++ b/charts/kueue/templates/crd/kueue.x-k8s.io_localqueues.yaml @@ -79,6 +79,24 @@ spec: x-kubernetes-validations: - message: field is immutable rule: self == oldSelf + stopPolicy: + default: None + description: |- + stopPolicy - if set to a value different from None, the LocalQueue is considered Inactive, + no new reservation being made. + + + Depending on its value, its associated workloads will: + + + - None - Workloads are admitted + - HoldAndDrain - Admitted workloads are evicted and Reserving workloads will cancel the reservation. + - Hold - Admitted workloads will run to completion and Reserving workloads will cancel the reservation. + enum: + - None + - Hold + - HoldAndDrain + type: string type: object status: description: LocalQueueStatus defines the observed state of LocalQueue diff --git a/client-go/applyconfiguration/kueue/v1beta1/localqueuespec.go b/client-go/applyconfiguration/kueue/v1beta1/localqueuespec.go index f4ce095af0..724b31cd3c 100644 --- a/client-go/applyconfiguration/kueue/v1beta1/localqueuespec.go +++ b/client-go/applyconfiguration/kueue/v1beta1/localqueuespec.go @@ -25,6 +25,7 @@ import ( // with apply. type LocalQueueSpecApplyConfiguration struct { ClusterQueue *v1beta1.ClusterQueueReference `json:"clusterQueue,omitempty"` + StopPolicy *v1beta1.StopPolicy `json:"stopPolicy,omitempty"` } // LocalQueueSpecApplyConfiguration constructs an declarative configuration of the LocalQueueSpec type for use with @@ -40,3 +41,11 @@ func (b *LocalQueueSpecApplyConfiguration) WithClusterQueue(value v1beta1.Cluste b.ClusterQueue = &value return b } + +// WithStopPolicy sets the StopPolicy field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. 
+// If called multiple times, the StopPolicy field is set to the value of the last call. +func (b *LocalQueueSpecApplyConfiguration) WithStopPolicy(value v1beta1.StopPolicy) *LocalQueueSpecApplyConfiguration { + b.StopPolicy = &value + return b +} diff --git a/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml index 5edff1cac7..c70273f9cd 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml @@ -64,6 +64,24 @@ spec: x-kubernetes-validations: - message: field is immutable rule: self == oldSelf + stopPolicy: + default: None + description: |- + stopPolicy - if set to a value different from None, the LocalQueue is considered Inactive, + no new reservation being made. + + + Depending on its value, its associated workloads will: + + + - None - Workloads are admitted + - HoldAndDrain - Admitted workloads are evicted and Reserving workloads will cancel the reservation. + - Hold - Admitted workloads will run to completion and Reserving workloads will cancel the reservation. + enum: + - None + - Hold + - HoldAndDrain + type: string type: object status: description: LocalQueueStatus defines the observed state of LocalQueue diff --git a/pkg/controller/core/localqueue_controller.go b/pkg/controller/core/localqueue_controller.go index d2a8441ad1..e860983e6a 100644 --- a/pkg/controller/core/localqueue_controller.go +++ b/pkg/controller/core/localqueue_controller.go @@ -44,11 +44,13 @@ import ( ) const ( - queueIsInactiveMsg = "Can't submit new workloads to clusterQueue" - failedUpdateLqStatusMsg = "Failed to retrieve localQueue status" + localQueueIsInactiveMsg = "Can't submit new workloads to localQueue" + clusterQueueIsInactiveMsg = "Can't submit new workloads to clusterQueue" + failedUpdateLqStatusMsg = "Failed to retrieve localQueue status" ) const ( + localQueueIsInactiveReason = "LocalQueueIsInactive" clusterQueueIsInactiveReason = "ClusterQueueIsInactive" ) @@ -99,11 +101,17 @@ func (r *LocalQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) ctx = ctrl.LoggerInto(ctx, log) log.V(2).Info("Reconciling LocalQueue") + var err error + if ptr.Deref(queueObj.Spec.StopPolicy, kueue.None) != kueue.None { + err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, localQueueIsInactiveReason, localQueueIsInactiveMsg) + return ctrl.Result{}, client.IgnoreNotFound(err) + } + var cq kueue.ClusterQueue - err := r.client.Get(ctx, client.ObjectKey{Name: string(queueObj.Spec.ClusterQueue)}, &cq) + err = r.client.Get(ctx, client.ObjectKey{Name: string(queueObj.Spec.ClusterQueue)}, &cq) if err != nil { if apierrors.IsNotFound(err) { - err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, "ClusterQueueDoesNotExist", queueIsInactiveMsg) + err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, "ClusterQueueDoesNotExist", clusterQueueIsInactiveMsg) } return ctrl.Result{}, client.IgnoreNotFound(err) } @@ -111,7 +119,7 @@ func (r *LocalQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionTrue, "Ready", "Can submit new workloads to clusterQueue") return ctrl.Result{}, client.IgnoreNotFound(err) } - err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, clusterQueueIsInactiveReason, queueIsInactiveMsg) + err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, clusterQueueIsInactiveReason, clusterQueueIsInactiveMsg) 
return ctrl.Result{}, client.IgnoreNotFound(err) } @@ -123,6 +131,9 @@ func (r *LocalQueueReconciler) Create(e event.CreateEvent) bool { } log := r.log.WithValues("localQueue", klog.KObj(q)) log.V(2).Info("LocalQueue create event") + if ptr.Deref(q.Spec.StopPolicy, kueue.None) != kueue.None { + return true + } ctx := logr.NewContext(context.Background(), log) if err := r.queues.AddLocalQueue(ctx, q); err != nil { log.Error(err, "Failed to add localQueue to the queueing system") @@ -146,20 +157,45 @@ func (r *LocalQueueReconciler) Delete(e event.DeleteEvent) bool { } func (r *LocalQueueReconciler) Update(e event.UpdateEvent) bool { - q, match := e.ObjectNew.(*kueue.LocalQueue) - if !match { + oldLq, oldIsLq := e.ObjectOld.(*kueue.LocalQueue) + newLq, newIsLq := e.ObjectNew.(*kueue.LocalQueue) + if !oldIsLq || !newIsLq { // No need to interact with the queue manager for other objects. return true } - log := r.log.WithValues("localQueue", klog.KObj(q)) + log := r.log.WithValues("localQueue", klog.KObj(newLq)) log.V(2).Info("Queue update event") - if err := r.queues.UpdateLocalQueue(q); err != nil { - log.Error(err, "Failed to update queue in the queueing system") + + oldStopPolicy := ptr.Deref(oldLq.Spec.StopPolicy, kueue.None) + newStopPolicy := ptr.Deref(newLq.Spec.StopPolicy, kueue.None) + + if oldStopPolicy == kueue.None && newStopPolicy != kueue.None { + r.queues.DeleteLocalQueue(oldLq) + r.cache.DeleteLocalQueue(oldLq) + return true + } + + if oldStopPolicy != kueue.None && newStopPolicy == kueue.None { + ctx := logr.NewContext(context.Background(), log) + if err := r.queues.AddLocalQueue(ctx, newLq); err != nil { + log.Error(err, "Failed to add localQueue to the queueing system") + } + if err := r.cache.AddLocalQueue(newLq); err != nil { + log.Error(err, "Failed to add localQueue to the cache") + } + return true } - oldQ := e.ObjectOld.(*kueue.LocalQueue) - if err := r.cache.UpdateLocalQueue(oldQ, q); err != nil { - log.Error(err, "Failed to update localQueue in the cache") + + if newStopPolicy == kueue.None { + if err := r.queues.UpdateLocalQueue(newLq); err != nil { + log.Error(err, "Failed to update queue in the queueing system") + } + oldQ := e.ObjectOld.(*kueue.LocalQueue) + if err := r.cache.UpdateLocalQueue(oldQ, newLq); err != nil { + log.Error(err, "Failed to update localQueue in the cache") + } } + return true } @@ -275,21 +311,29 @@ func (r *LocalQueueReconciler) UpdateStatusIfChanged( reason, msg string, ) error { oldStatus := queue.Status.DeepCopy() - pendingWls, err := r.queues.PendingWorkloads(queue) - if err != nil { - r.log.Error(err, failedUpdateLqStatusMsg) - return err - } - stats, err := r.cache.LocalQueueUsage(queue) - if err != nil { - r.log.Error(err, failedUpdateLqStatusMsg) - return err + if ptr.Deref(queue.Spec.StopPolicy, kueue.None) == kueue.None { + pendingWls, err := r.queues.PendingWorkloads(queue) + if err != nil { + r.log.Error(err, failedUpdateLqStatusMsg) + return err + } + stats, err := r.cache.LocalQueueUsage(queue) + if err != nil { + r.log.Error(err, failedUpdateLqStatusMsg) + return err + } + queue.Status.PendingWorkloads = pendingWls + queue.Status.ReservingWorkloads = int32(stats.ReservingWorkloads) + queue.Status.AdmittedWorkloads = int32(stats.AdmittedWorkloads) + queue.Status.FlavorsReservation = stats.ReservedResources + queue.Status.FlavorUsage = stats.AdmittedResources + } else { + queue.Status.PendingWorkloads = 0 + queue.Status.ReservingWorkloads = 0 + queue.Status.AdmittedWorkloads = 0 + queue.Status.FlavorsReservation = nil + 
queue.Status.FlavorUsage = nil } - queue.Status.PendingWorkloads = pendingWls - queue.Status.ReservingWorkloads = int32(stats.ReservingWorkloads) - queue.Status.AdmittedWorkloads = int32(stats.AdmittedWorkloads) - queue.Status.FlavorsReservation = stats.ReservedResources - queue.Status.FlavorUsage = stats.AdmittedResources if len(conditionStatus) != 0 && len(reason) != 0 && len(msg) != 0 { meta.SetStatusCondition(&queue.Status.Conditions, metav1.Condition{ Type: kueue.LocalQueueActive, diff --git a/pkg/controller/core/localqueue_controller_test.go b/pkg/controller/core/localqueue_controller_test.go index 3807b14cdd..5ce28f7d3a 100644 --- a/pkg/controller/core/localqueue_controller_test.go +++ b/pkg/controller/core/localqueue_controller_test.go @@ -42,6 +42,54 @@ func TestLocalQueueReconcile(t *testing.T) { wantLocalQueue *kueue.LocalQueue wantError error }{ + "local queue with Hold StopPolicy": { + clusterQueue: utiltesting.MakeClusterQueue("test-cluster-queue"). + Obj(), + localQueue: utiltesting.MakeLocalQueue("test-queue", "default"). + ClusterQueue("test-cluster-queue"). + PendingWorkloads(1). + StopPolicy(kueue.Hold). + Generation(1). + Obj(), + wantLocalQueue: utiltesting.MakeLocalQueue("test-queue", "default"). + ClusterQueue("test-cluster-queue"). + PendingWorkloads(0). + StopPolicy(kueue.Hold). + Generation(1). + Condition( + kueue.LocalQueueActive, + metav1.ConditionFalse, + localQueueIsInactiveReason, + localQueueIsInactiveMsg, + 1, + ). + Obj(), + wantError: nil, + }, + "local queue with HoldAndDrain StopPolicy": { + clusterQueue: utiltesting.MakeClusterQueue("test-cluster-queue"). + Obj(), + localQueue: utiltesting.MakeLocalQueue("test-queue", "default"). + ClusterQueue("test-cluster-queue"). + PendingWorkloads(1). + StopPolicy(kueue.HoldAndDrain). + Generation(1). + Obj(), + wantLocalQueue: utiltesting.MakeLocalQueue("test-queue", "default"). + ClusterQueue("test-cluster-queue"). + PendingWorkloads(0). + StopPolicy(kueue.HoldAndDrain). + Generation(1). + Condition( + kueue.LocalQueueActive, + metav1.ConditionFalse, + localQueueIsInactiveReason, + localQueueIsInactiveMsg, + 1, + ). + Obj(), + wantError: nil, + }, "cluster queue is inactive": { clusterQueue: utiltesting.MakeClusterQueue("test-cluster-queue"). Obj(), @@ -58,7 +106,7 @@ func TestLocalQueueReconcile(t *testing.T) { kueue.LocalQueueActive, metav1.ConditionFalse, clusterQueueIsInactiveReason, - queueIsInactiveMsg, + clusterQueueIsInactiveMsg, 1, ). 
Obj(), diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index 469426dc11..f6e4fcdb7d 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -205,6 +205,19 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, nil } + lq := kueue.LocalQueue{} + err := r.client.Get(ctx, types.NamespacedName{Namespace: wl.Namespace, Name: wl.Spec.QueueName}, &lq) + if client.IgnoreNotFound(err) != nil { + return ctrl.Result{}, err + } + + lqExists := err == nil + lqActive := ptr.Deref(lq.Spec.StopPolicy, kueue.None) == kueue.None + if lqExists && lqActive && isDisabledRequeuedByLocalQueueStopped(&wl) { + workload.SetRequeuedCondition(&wl, kueue.WorkloadLocalQueueRestarted, "The LocalQueue was restarted after being stopped", true) + return ctrl.Result{}, workload.ApplyAdmissionStatus(ctx, r.client, &wl, true) + } + cqName, cqOk := r.queues.ClusterQueueForWorkload(&wl) if cqOk { // because we need to react to API cluster cq events, the list of checks from a cache can lead to race conditions @@ -244,6 +257,10 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, err } + if updated, err := r.reconcileOnLocalQueueActiveState(ctx, &wl, &lq); updated || err != nil { + return ctrl.Result{}, err + } + if updated, err := r.reconcileOnClusterQueueActiveState(ctx, &wl, cqName); updated || err != nil { return ctrl.Result{}, err } @@ -274,21 +291,27 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c } switch { - case !r.queues.QueueForWorkloadExists(&wl): + case !lqExists: log.V(3).Info("Workload is inadmissible because of missing LocalQueue", "localQueue", klog.KRef(wl.Namespace, wl.Spec.QueueName)) - if workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName)) { + if workload.UnsetQuotaReservationWithCondition(&wl, kueue.WorkloadInadmissible, fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName)) { + err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true) + return ctrl.Result{}, client.IgnoreNotFound(err) + } + case !lqActive: + log.V(3).Info("Workload is inadmissible because of stopped LocalQueue", "localQueue", klog.KRef(wl.Namespace, wl.Spec.QueueName)) + if workload.UnsetQuotaReservationWithCondition(&wl, kueue.WorkloadInadmissible, fmt.Sprintf("LocalQueue %s is inactive", wl.Spec.QueueName)) { err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true) return ctrl.Result{}, client.IgnoreNotFound(err) } case !cqOk: log.V(3).Info("Workload is inadmissible because of missing ClusterQueue", "clusterQueue", klog.KRef("", cqName)) - if workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s doesn't exist", cqName)) { + if workload.UnsetQuotaReservationWithCondition(&wl, kueue.WorkloadInadmissible, fmt.Sprintf("ClusterQueue %s doesn't exist", cqName)) { err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true) return ctrl.Result{}, client.IgnoreNotFound(err) } case !r.cache.ClusterQueueActive(cqName): log.V(3).Info("Workload is inadmissible because ClusterQueue is inactive", "clusterQueue", klog.KRef("", cqName)) - if workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is inactive", cqName)) { + if workload.UnsetQuotaReservationWithCondition(&wl, kueue.WorkloadInadmissible, fmt.Sprintf("ClusterQueue %s is inactive", 
cqName)) { err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true) return ctrl.Result{}, client.IgnoreNotFound(err) } @@ -299,8 +322,18 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c // isDisabledRequeuedByClusterQueueStopped returns true if the workload is unset requeued by cluster queue stopped. func isDisabledRequeuedByClusterQueueStopped(w *kueue.Workload) bool { + return isDisabledRequeuedByReason(w, kueue.WorkloadEvictedByClusterQueueStopped) +} + +// isDisabledRequeuedByLocalQueueStopped returns true if the workload is unset requeued by local queue stopped. +func isDisabledRequeuedByLocalQueueStopped(w *kueue.Workload) bool { + return isDisabledRequeuedByReason(w, kueue.WorkloadEvictedByLocalQueueStopped) +} + +// isDisabledRequeuedByReason returns true if the workload is unset requeued by reason. +func isDisabledRequeuedByReason(w *kueue.Workload, reason string) bool { cond := apimeta.FindStatusCondition(w.Status.Conditions, kueue.WorkloadRequeued) - return cond != nil && cond.Status == metav1.ConditionFalse && cond.Reason == kueue.WorkloadEvictedByClusterQueueStopped + return cond != nil && cond.Status == metav1.ConditionFalse && cond.Reason == reason } func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl *kueue.Workload) (bool, error) { @@ -331,6 +364,46 @@ func (r *WorkloadReconciler) reconcileSyncAdmissionChecks(ctx context.Context, w return false, nil } +func (r *WorkloadReconciler) reconcileOnLocalQueueActiveState(ctx context.Context, wl *kueue.Workload, lq *kueue.LocalQueue) (bool, error) { + queueStopPolicy := ptr.Deref(lq.Spec.StopPolicy, kueue.None) + + log := ctrl.LoggerFrom(ctx) + + if workload.IsAdmitted(wl) { + if queueStopPolicy != kueue.HoldAndDrain { + return false, nil + } + if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) { + log.V(3).Info("Workload is already evicted due to LocalQueue stopped.", "localQueue", klog.KRef("", wl.Spec.QueueName)) + return false, nil + } + log.V(3).Info("Workload is evicted because the LocalQueue is stopped", "localQueue", klog.KRef(wl.Namespace, wl.Spec.QueueName)) + workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByLocalQueueStopped, "The LocalQueue is stopped") + err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true) + if err == nil { + cqName, cqExist := r.queues.ClusterQueueForWorkload(wl) + if cqExist { + metrics.ReportEvictedWorkloads(cqName, kueue.WorkloadEvictedByLocalQueueStopped) + } + } + return true, client.IgnoreNotFound(err) + } + + if lq.Name == "" || !lq.DeletionTimestamp.IsZero() { + log.V(3).Info("Workload is inadmissible because the LocalQueue is terminating or missing", "localQueue", klog.KRef("", wl.Spec.QueueName)) + _ = workload.UnsetQuotaReservationWithCondition(wl, kueue.WorkloadInadmissible, fmt.Sprintf("LocalQueue %s is terminating or missing", wl.Spec.QueueName)) + return true, workload.ApplyAdmissionStatus(ctx, r.client, wl, true) + } + + if queueStopPolicy != kueue.None { + log.V(3).Info("Workload is inadmissible because the LocalQueue is stopped", "localQueue", klog.KRef("", wl.Spec.QueueName)) + _ = workload.UnsetQuotaReservationWithCondition(wl, kueue.WorkloadInadmissible, fmt.Sprintf("LocalQueue %s is stopped", wl.Spec.QueueName)) + return true, workload.ApplyAdmissionStatus(ctx, r.client, wl, true) + } + + return false, nil +} + func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Context, wl *kueue.Workload, cqName string) (bool, error) { queue := 
kueue.ClusterQueue{} err := r.client.Get(ctx, types.NamespacedName{Name: cqName}, &queue) @@ -360,13 +433,13 @@ func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Cont if err != nil || !queue.DeletionTimestamp.IsZero() { log.V(3).Info("Workload is inadmissible because the ClusterQueue is terminating or missing", "clusterQueue", klog.KRef("", cqName)) - _ = workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is terminating or missing", cqName)) + _ = workload.UnsetQuotaReservationWithCondition(wl, kueue.WorkloadInadmissible, fmt.Sprintf("ClusterQueue %s is terminating or missing", cqName)) return true, workload.ApplyAdmissionStatus(ctx, r.client, wl, true) } if queueStopPolicy != kueue.None { log.V(3).Info("Workload is inadmissible because the ClusterQueue is stopped", "clusterQueue", klog.KRef("", cqName)) - _ = workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is stopped", cqName)) + _ = workload.UnsetQuotaReservationWithCondition(wl, kueue.WorkloadInadmissible, fmt.Sprintf("ClusterQueue %s is stopped", cqName)) return true, workload.ApplyAdmissionStatus(ctx, r.client, wl, true) } @@ -496,7 +569,7 @@ func (r *WorkloadReconciler) Create(e event.CreateEvent) bool { if !workload.HasQuotaReservation(wl) { if !r.queues.AddOrUpdateWorkload(wlCopy) { - log.V(2).Info("Queue for workload didn't exist; ignored for now") + log.V(2).Info("LocalQueue for workload didn't exist or not active; ignored for now") } return true } @@ -626,7 +699,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { // function. if immediate { if !r.queues.AddOrUpdateWorkloadWithoutLock(wlCopy) { - log.V(2).Info("Queue for workload didn't exist; ignored for now") + log.V(2).Info("LocalQueue for workload didn't exist or not active; ignored for now") } } }) @@ -638,7 +711,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { err := r.client.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWl) if err == nil && workloadStatus(&updatedWl) == pending { if !r.queues.AddOrUpdateWorkload(wlCopy) { - log.V(2).Info("Queue for workload didn't exist; ignored for now") + log.V(2).Info("LocalQueue for workload didn't exist or not active; ignored for now") } else { log.V(3).Info("Workload requeued after backoff") } @@ -680,15 +753,15 @@ func (r *WorkloadReconciler) notifyWatchers(oldWl, newWl *kueue.Workload) { // SetupWithManager sets up the controller with the Manager. func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error { - ruh := &resourceUpdatesHandler{ - r: r, - } + ruh := &resourceUpdatesHandler{r: r} + wqh := &workloadQueueHandler{r: r} return ctrl.NewControllerManagedBy(mgr). For(&kueue.Workload{}). WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). Watches(&corev1.LimitRange{}, ruh). Watches(&nodev1.RuntimeClass{}, ruh). - Watches(&kueue.ClusterQueue{}, &workloadCqHandler{client: r.client}). + Watches(&kueue.ClusterQueue{}, wqh). + Watches(&kueue.LocalQueue{}, wqh). WithEventFilter(r). Complete(WithLeadingManager(mgr, r, &kueue.Workload{}, cfg)) } @@ -801,70 +874,90 @@ func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, _ } } -type workloadCqHandler struct { - client client.Client +type workloadQueueHandler struct { + r *WorkloadReconciler } -var _ handler.EventHandler = (*workloadCqHandler)(nil) +var _ handler.EventHandler = (*workloadQueueHandler)(nil) // Create is called in response to a create event. 
-func (w *workloadCqHandler) Create(ctx context.Context, ev event.CreateEvent, wq workqueue.RateLimitingInterface) { - if cq, isQueue := ev.Object.(*kueue.ClusterQueue); isQueue { - w.queueReconcileForWorkloads(ctx, cq.Name, wq) +func (w *workloadQueueHandler) Create(ctx context.Context, ev event.CreateEvent, wq workqueue.RateLimitingInterface) { + if cq, isCq := ev.Object.(*kueue.ClusterQueue); isCq { + w.queueReconcileForWorkloadsOfClusterQueue(ctx, cq.Name, wq) + } + if lq, isLq := ev.Object.(*kueue.LocalQueue); isLq { + log := ctrl.LoggerFrom(ctx).WithValues("localQueue", klog.KObj(lq)) + ctx = ctrl.LoggerInto(ctx, log) + w.queueReconcileForWorkloadsOfLocalQueue(ctx, lq, wq) } } // Update is called in response to an update event. -func (w *workloadCqHandler) Update(ctx context.Context, ev event.UpdateEvent, wq workqueue.RateLimitingInterface) { - log := ctrl.LoggerFrom(ctx).WithValues("clusterQueue", klog.KObj(ev.ObjectNew)) - ctx = ctrl.LoggerInto(ctx, log) - log.V(5).Info("Workload cluster queue update event") - oldCq, oldIsQueue := ev.ObjectOld.(*kueue.ClusterQueue) - newCq, newIsQueue := ev.ObjectNew.(*kueue.ClusterQueue) +func (w *workloadQueueHandler) Update(ctx context.Context, ev event.UpdateEvent, wq workqueue.RateLimitingInterface) { + oldCq, oldIsCq := ev.ObjectOld.(*kueue.ClusterQueue) + newCq, newIsCq := ev.ObjectNew.(*kueue.ClusterQueue) + if oldIsCq && newIsCq { + log := ctrl.LoggerFrom(ctx).WithValues("clusterQueue", klog.KObj(ev.ObjectNew)) + ctx = ctrl.LoggerInto(ctx, log) + log.V(5).Info("Workload cluster queue update event") - if !oldIsQueue || !newIsQueue { - return + if !newCq.DeletionTimestamp.IsZero() || + !utilslices.CmpNoOrder(oldCq.Spec.AdmissionChecks, newCq.Spec.AdmissionChecks) || + !gocmp.Equal(oldCq.Spec.AdmissionChecksStrategy, newCq.Spec.AdmissionChecksStrategy) || + !ptr.Equal(oldCq.Spec.StopPolicy, newCq.Spec.StopPolicy) { + w.queueReconcileForWorkloadsOfClusterQueue(ctx, newCq.Name, wq) + } } - if !newCq.DeletionTimestamp.IsZero() || - !utilslices.CmpNoOrder(oldCq.Spec.AdmissionChecks, newCq.Spec.AdmissionChecks) || - !gocmp.Equal(oldCq.Spec.AdmissionChecksStrategy, newCq.Spec.AdmissionChecksStrategy) || - !ptr.Equal(oldCq.Spec.StopPolicy, newCq.Spec.StopPolicy) { - w.queueReconcileForWorkloads(ctx, newCq.Name, wq) + oldLq, oldIsLq := ev.ObjectOld.(*kueue.LocalQueue) + newLq, newIsLq := ev.ObjectNew.(*kueue.LocalQueue) + if oldIsLq && newIsLq { + log := ctrl.LoggerFrom(ctx).WithValues("localQueue", klog.KObj(ev.ObjectNew)) + ctx = ctrl.LoggerInto(ctx, log) + log.V(5).Info("Workload cluster queue update event") + + if !newLq.DeletionTimestamp.IsZero() || !ptr.Equal(oldLq.Spec.StopPolicy, newLq.Spec.StopPolicy) { + w.queueReconcileForWorkloadsOfLocalQueue(ctx, newLq, wq) + } } } // Delete is called in response to a delete event. 
-func (w *workloadCqHandler) Delete(ctx context.Context, ev event.DeleteEvent, wq workqueue.RateLimitingInterface) { - if cq, isQueue := ev.Object.(*kueue.ClusterQueue); isQueue { - w.queueReconcileForWorkloads(ctx, cq.Name, wq) +func (w *workloadQueueHandler) Delete(ctx context.Context, ev event.DeleteEvent, wq workqueue.RateLimitingInterface) { + if cq, isCq := ev.Object.(*kueue.ClusterQueue); isCq { + w.queueReconcileForWorkloadsOfClusterQueue(ctx, cq.Name, wq) + } + if lq, isLq := ev.Object.(*kueue.LocalQueue); isLq { + log := ctrl.LoggerFrom(ctx).WithValues("localQueue", klog.KObj(lq)) + ctx = ctrl.LoggerInto(ctx, log) + w.queueReconcileForWorkloadsOfLocalQueue(ctx, lq, wq) } } // Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or // external trigger request. -func (w *workloadCqHandler) Generic(_ context.Context, _ event.GenericEvent, _ workqueue.RateLimitingInterface) { +func (w *workloadQueueHandler) Generic(_ context.Context, _ event.GenericEvent, _ workqueue.RateLimitingInterface) { // nothing to do here } -func (w *workloadCqHandler) queueReconcileForWorkloads(ctx context.Context, cqName string, wq workqueue.RateLimitingInterface) { +func (w *workloadQueueHandler) queueReconcileForWorkloadsOfClusterQueue(ctx context.Context, cqName string, wq workqueue.RateLimitingInterface) { log := ctrl.LoggerFrom(ctx) lst := kueue.LocalQueueList{} - err := w.client.List(ctx, &lst, client.MatchingFields{indexer.QueueClusterQueueKey: cqName}) + err := w.r.client.List(ctx, &lst, client.MatchingFields{indexer.QueueClusterQueueKey: cqName}) if err != nil { log.Error(err, "Could not list cluster queues local queues") } for _, lq := range lst.Items { log := log.WithValues("localQueue", klog.KObj(&lq)) ctx = ctrl.LoggerInto(ctx, log) - w.queueReconcileForWorkloadsOfLocalQueue(ctx, lq.Namespace, lq.Name, wq) + w.queueReconcileForWorkloadsOfLocalQueue(ctx, &lq, wq) } } -func (w *workloadCqHandler) queueReconcileForWorkloadsOfLocalQueue(ctx context.Context, namespace string, name string, wq workqueue.RateLimitingInterface) { +func (w *workloadQueueHandler) queueReconcileForWorkloadsOfLocalQueue(ctx context.Context, lq *kueue.LocalQueue, wq workqueue.RateLimitingInterface) { log := ctrl.LoggerFrom(ctx) lst := kueue.WorkloadList{} - err := w.client.List(ctx, &lst, &client.ListOptions{Namespace: namespace}, client.MatchingFields{indexer.WorkloadQueueKey: name}) + err := w.r.client.List(ctx, &lst, &client.ListOptions{Namespace: lq.Namespace}, client.MatchingFields{indexer.WorkloadQueueKey: lq.Name}) if err != nil { log.Error(err, "Could not list cluster queues workloads") } diff --git a/pkg/controller/core/workload_controller_test.go b/pkg/controller/core/workload_controller_test.go index c900dfd1a2..1965a25021 100644 --- a/pkg/controller/core/workload_controller_test.go +++ b/pkg/controller/core/workload_controller_test.go @@ -714,6 +714,30 @@ func TestReconcile(t *testing.T) { }). Obj(), }, + "should set the WorkloadRequeued condition to true on LocalQueue started": { + cq: utiltesting.MakeClusterQueue("cq").Obj(), + lq: utiltesting.MakeLocalQueue("lq", "ns").ClusterQueue("cq").Obj(), + workload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + Queue("lq"). + Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadEvictedByLocalQueueStopped, + Message: "The LocalQueue is stopped", + }). + Obj(), + wantWorkload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + Queue("lq"). 
+ Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadLocalQueueRestarted, + Message: "The LocalQueue was restarted after being stopped", + }). + Obj(), + }, "should set the Evicted condition with InactiveWorkload reason when the .spec.active=False": { workload: utiltesting.MakeWorkload("wl", "ns").Active(false).Obj(), wantWorkload: utiltesting.MakeWorkload("wl", "ns"). @@ -792,6 +816,135 @@ func TestReconcile(t *testing.T) { }). Obj(), }, + "should set the Evicted condition with ClusterQueueStopped reason when the StopPolicy is HoldAndDrain": { + cq: utiltesting.MakeClusterQueue("cq").StopPolicy(kueue.HoldAndDrain).Obj(), + lq: utiltesting.MakeLocalQueue("lq", "ns").ClusterQueue("cq").Obj(), + workload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + ReserveQuota(utiltesting.MakeAdmission("cq").Obj()). + Admitted(true). + Queue("lq"). + Obj(), + wantWorkload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + ReserveQuota(utiltesting.MakeAdmission("cq").Obj()). + Admitted(true). + Queue("lq"). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadEvictedByClusterQueueStopped, + Message: "The ClusterQueue is stopped", + }). + Obj(), + }, + "should set the Evicted condition with LocalQueueStopped reason when the StopPolicy is HoldAndDrain": { + cq: utiltesting.MakeClusterQueue("cq").Obj(), + lq: utiltesting.MakeLocalQueue("lq", "ns").ClusterQueue("cq").StopPolicy(kueue.HoldAndDrain).Obj(), + workload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + ReserveQuota(utiltesting.MakeAdmission("cq").Obj()). + Admitted(true). + Queue("lq"). + Obj(), + wantWorkload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + ReserveQuota(utiltesting.MakeAdmission("cq").Obj()). + Admitted(true). + Queue("lq"). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadEvictedByLocalQueueStopped, + Message: "The LocalQueue is stopped", + }). + Obj(), + }, + "should set status QuotaReserved conditions to False with reason Inadmissible if quota not reserved LocalQueue is not created": { + workload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + Queue("lq"). + Obj(), + wantWorkload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + Queue("lq"). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadInadmissible, + Message: "LocalQueue lq doesn't exist", + }). + Obj(), + }, + "should set status QuotaReserved conditions to False with reason Inadmissible if quota not reserved LocalQueue StopPolicy=Hold": { + lq: utiltesting.MakeLocalQueue("lq", "ns").StopPolicy(kueue.Hold).Obj(), + workload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + Queue("lq"). + Obj(), + wantWorkload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + Queue("lq"). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadInadmissible, + Message: "LocalQueue lq is inactive", + }). + Obj(), + }, + "should set status QuotaReserved conditions to False with reason Inadmissible if quota not reserved LocalQueue StopPolicy=HoldAndDrain": { + lq: utiltesting.MakeLocalQueue("lq", "ns").StopPolicy(kueue.HoldAndDrain).Obj(), + workload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + Queue("lq"). + Obj(), + wantWorkload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). 
+ Queue("lq"). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadInadmissible, + Message: "LocalQueue lq is inactive", + }). + Obj(), + }, + "should set status QuotaReserved conditions to False with reason Inadmissible if quota not reserved ClusterQueue is not created": { + lq: utiltesting.MakeLocalQueue("lq", "ns").ClusterQueue("cq").StopPolicy(kueue.None).Obj(), + workload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + Queue("lq"). + Obj(), + wantWorkload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + Queue("lq"). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadInadmissible, + Message: "ClusterQueue cq doesn't exist", + }). + Obj(), + }, + "should set status QuotaReserved conditions to False with reason Inadmissible if quota not reserved ClusterQueue StopPolicy=Hold": { + lq: utiltesting.MakeLocalQueue("lq", "ns").ClusterQueue("cq").StopPolicy(kueue.None).Obj(), + cq: utiltesting.MakeClusterQueue("cq").StopPolicy(kueue.Hold).Obj(), + workload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + Queue("lq"). + Obj(), + wantWorkload: utiltesting.MakeWorkload("wl", "ns"). + Active(true). + Queue("lq"). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadInadmissible, + Message: "ClusterQueue cq is inactive", + }). + Obj(), + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { @@ -817,6 +970,12 @@ func TestReconcile(t *testing.T) { if err := qManager.AddClusterQueue(ctx, tc.cq); err != nil { t.Errorf("couldn't add the cluster queue to the cache: %v", err) } + } + + if tc.lq != nil { + if err := cl.Create(ctx, tc.lq); err != nil { + t.Errorf("couldn't create the local queue: %v", err) + } if err := qManager.AddLocalQueue(ctx, tc.lq); err != nil { t.Errorf("couldn't add the local queue to the cache: %v", err) } diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index ec8b53363a..c5fe6f96c0 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -417,13 +417,9 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques if workload.HasQuotaReservation(wl) { if !job.IsActive() { log.V(6).Info("The job is no longer active, clear the workloads admission") - if evCond.Reason == kueue.WorkloadEvictedByDeactivation || - evCond.Reason == kueue.WorkloadEvictedByPodsReadyTimeout || - evCond.Reason == kueue.WorkloadEvictedByClusterQueueStopped { - workload.SetRequeuedCondition(wl, evCond.Reason, evCond.Message, false) - } else { - workload.SetRequeuedCondition(wl, evCond.Reason, evCond.Message, true) - } + // The requeued condition status set to true only on EvictedByPreemption or EvictedByAdmissionCheck + setRequeued := evCond.Reason == kueue.WorkloadEvictedByPreemption || evCond.Reason == kueue.WorkloadEvictedByAdmissionCheck + workload.SetRequeuedCondition(wl, evCond.Reason, evCond.Message, setRequeued) _ = workload.UnsetQuotaReservationWithCondition(wl, "Pending", evCond.Message) err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true) if err != nil { diff --git a/pkg/controller/jobs/job/job_controller_test.go b/pkg/controller/jobs/job/job_controller_test.go index 3b5c287325..87dd6f45f2 100644 --- a/pkg/controller/jobs/job/job_controller_test.go +++ b/pkg/controller/jobs/job/job_controller_test.go @@ -840,6 +840,86 @@ 
func TestReconciler(t *testing.T) { }, }, }, + "when workload is evicted due to local queue stopped, job gets suspended": { + job: *baseJobWrapper.Clone(). + Suspend(false). + Obj(), + wantJob: *baseJobWrapper.Clone(). + Suspend(true). + Obj(), + workloads: []kueue.Workload{ + *baseWorkloadWrapper.Clone(). + Admitted(true). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadEvictedByLocalQueueStopped, + Message: "The LocalQueue is stopped", + }). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "check", + State: kueue.CheckStateReady, + PodSetUpdates: []kueue.PodSetUpdate{ + { + Name: "main", + Labels: map[string]string{ + "ac-key": "ac-value", + }, + }, + }, + }). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *baseWorkloadWrapper.Clone(). + Admitted(true). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "NoReservation", + Message: "The workload has no reservation", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: "Pending", + Message: "The LocalQueue is stopped", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadRequeued, + Status: metav1.ConditionFalse, + Reason: kueue.WorkloadEvictedByLocalQueueStopped, + Message: "The LocalQueue is stopped", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadEvictedByLocalQueueStopped, + Message: "The LocalQueue is stopped", + }). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "check", + State: kueue.CheckStateReady, + PodSetUpdates: []kueue.PodSetUpdate{ + { + Name: "main", + Labels: map[string]string{ + "ac-key": "ac-value", + }, + }, + }, + }). + Obj(), + }, + wantEvents: []utiltesting.EventRecord{ + { + Key: types.NamespacedName{Name: "job", Namespace: "ns"}, + EventType: "Normal", + Reason: "Stopped", + Message: "The LocalQueue is stopped", + }, + }, + }, "when workload is evicted due to preemption, job gets suspended": { job: *baseJobWrapper.Clone(). Suspend(false). diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index 9173043498..382323e0a7 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -543,6 +543,12 @@ func (q *LocalQueueWrapper) ClusterQueue(c string) *LocalQueueWrapper { return q } +// StopPolicy sets the stop policy. +func (q *LocalQueueWrapper) StopPolicy(p kueue.StopPolicy) *LocalQueueWrapper { + q.Spec.StopPolicy = &p + return q +} + // PendingWorkloads updates the pendingWorkloads in status. func (q *LocalQueueWrapper) PendingWorkloads(n int32) *LocalQueueWrapper { q.Status.PendingWorkloads = n @@ -675,6 +681,12 @@ func (c *ClusterQueueWrapper) StopPolicy(p kueue.StopPolicy) *ClusterQueueWrappe return c } +// DeletionTimestamp sets a deletion timestamp for the cluster queue. +func (c *ClusterQueueWrapper) DeletionTimestamp(t time.Time) *ClusterQueueWrapper { + c.ClusterQueue.DeletionTimestamp = ptr.To(metav1.NewTime(t).Rfc3339Copy()) + return c +} + func (c *ClusterQueueWrapper) Label(k, v string) *ClusterQueueWrapper { if c.Labels == nil { c.Labels = make(map[string]string) diff --git a/site/content/en/docs/reference/kueue.v1beta1.md b/site/content/en/docs/reference/kueue.v1beta1.md index baa7d9bf6f..5f9d3e3499 100644 --- a/site/content/en/docs/reference/kueue.v1beta1.md +++ b/site/content/en/docs/reference/kueue.v1beta1.md @@ -1170,6 +1170,20 @@ There could be up to 16 resources.

   <p>clusterQueue is a reference to a clusterQueue that backs this localQueue.</p>
 </td>
 </tr>
+<tr><td><code>stopPolicy</code><br/>
+<a href="#kueue-x-k8s-io-v1beta1-StopPolicy"><code>StopPolicy</code></a>
+</td>
+<td>
+   <p>stopPolicy - if set to a value different from None, the LocalQueue is considered Inactive,
+no new reservation being made.</p>
+<p>Depending on its value, its associated workloads will:</p>
+<ul>
+<li>None - Workloads are admitted</li>
+<li>HoldAndDrain - Admitted workloads are evicted and Reserving workloads will cancel the reservation.</li>
+<li>Hold - Admitted workloads will run to completion and Reserving workloads will cancel the reservation.</li>
+</ul>
+</td>
+</tr>
@@ -1792,6 +1806,8 @@ words, it's the used quota that is over the nominalQuota.

- [ClusterQueueSpec](#kueue-x-k8s-io-v1beta1-ClusterQueueSpec) +- [LocalQueueSpec](#kueue-x-k8s-io-v1beta1-LocalQueueSpec) + diff --git a/test/integration/controller/core/workload_controller_test.go b/test/integration/controller/core/workload_controller_test.go index f8ede73b63..7643c8fd87 100644 --- a/test/integration/controller/core/workload_controller_test.go +++ b/test/integration/controller/core/workload_controller_test.go @@ -90,7 +90,7 @@ var _ = ginkgo.Describe("Workload controller", ginkgo.Ordered, ginkgo.ContinueOn gomega.BeComparableTo(metav1.Condition{ Type: kueue.WorkloadQuotaReserved, Status: metav1.ConditionFalse, - Reason: "Inadmissible", + Reason: kueue.WorkloadInadmissible, Message: message, }, util.IgnoreConditionTimestampsAndObservedGeneration), ) @@ -113,7 +113,7 @@ var _ = ginkgo.Describe("Workload controller", ginkgo.Ordered, ginkgo.ContinueOn gomega.BeComparableTo(metav1.Condition{ Type: kueue.WorkloadQuotaReserved, Status: metav1.ConditionFalse, - Reason: "Inadmissible", + Reason: kueue.WorkloadInadmissible, Message: message, }, util.IgnoreConditionTimestampsAndObservedGeneration), ) @@ -140,7 +140,7 @@ var _ = ginkgo.Describe("Workload controller", ginkgo.Ordered, ginkgo.ContinueOn gomega.BeComparableTo(metav1.Condition{ Type: kueue.WorkloadQuotaReserved, Status: metav1.ConditionFalse, - Reason: "Inadmissible", + Reason: kueue.WorkloadInadmissible, Message: message, }, util.IgnoreConditionTimestampsAndObservedGeneration), ) diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 10119107b1..0f26191842 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -96,6 +96,7 @@ var _ = ginkgo.Describe("Scheduler", func() { preemptionQueue *kueue.LocalQueue admissionCheckQueue *kueue.LocalQueue cqsStopPolicy *kueue.StopPolicy + lqsStopPolicy *kueue.StopPolicy ) ginkgo.JustBeforeEach(func() { @@ -103,6 +104,7 @@ var _ = ginkgo.Describe("Scheduler", func() { gomega.Expect(k8sClient.Create(ctx, spotTaintedFlavor)).To(gomega.Succeed()) gomega.Expect(k8sClient.Create(ctx, spotUntaintedFlavor)).To(gomega.Succeed()) cqsStopPolicy := ptr.Deref(cqsStopPolicy, kueue.None) + lqsStopPolicy := ptr.Deref(lqsStopPolicy, kueue.None) admissionCheck1 = testing.MakeAdmissionCheck("check1").ControllerName("ctrl").Obj() gomega.Expect(k8sClient.Create(ctx, admissionCheck1)).Should(gomega.Succeed()) @@ -172,22 +174,40 @@ var _ = ginkgo.Describe("Scheduler", func() { Obj() gomega.Expect(k8sClient.Create(ctx, admissionCheckClusterQ)).Should(gomega.Succeed()) - prodQueue = testing.MakeLocalQueue("prod-queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj() + prodQueue = testing.MakeLocalQueue("prod-queue", ns.Name). + StopPolicy(lqsStopPolicy). + ClusterQueue(prodClusterQ.Name). + Obj() gomega.Expect(k8sClient.Create(ctx, prodQueue)).Should(gomega.Succeed()) - devQueue = testing.MakeLocalQueue("dev-queue", ns.Name).ClusterQueue(devClusterQ.Name).Obj() + devQueue = testing.MakeLocalQueue("dev-queue", ns.Name). + ClusterQueue(devClusterQ.Name). + StopPolicy(lqsStopPolicy). + Obj() gomega.Expect(k8sClient.Create(ctx, devQueue)).Should(gomega.Succeed()) - podsCountQueue = testing.MakeLocalQueue("pods-count-queue", ns.Name).ClusterQueue(podsCountClusterQ.Name).Obj() + podsCountQueue = testing.MakeLocalQueue("pods-count-queue", ns.Name). + ClusterQueue(podsCountClusterQ.Name). + StopPolicy(lqsStopPolicy). 
+ Obj() gomega.Expect(k8sClient.Create(ctx, podsCountQueue)).Should(gomega.Succeed()) - podsCountOnlyQueue = testing.MakeLocalQueue("pods-count-only-queue", ns.Name).ClusterQueue(podsCountOnlyClusterQ.Name).Obj() + podsCountOnlyQueue = testing.MakeLocalQueue("pods-count-only-queue", ns.Name). + ClusterQueue(podsCountOnlyClusterQ.Name). + StopPolicy(lqsStopPolicy). + Obj() gomega.Expect(k8sClient.Create(ctx, podsCountOnlyQueue)).Should(gomega.Succeed()) - preemptionQueue = testing.MakeLocalQueue("preemption-queue", ns.Name).ClusterQueue(preemptionClusterQ.Name).Obj() + preemptionQueue = testing.MakeLocalQueue("preemption-queue", ns.Name). + ClusterQueue(preemptionClusterQ.Name). + StopPolicy(lqsStopPolicy). + Obj() gomega.Expect(k8sClient.Create(ctx, preemptionQueue)).Should(gomega.Succeed()) - admissionCheckQueue = testing.MakeLocalQueue("admission-check-queue", ns.Name).ClusterQueue(admissionCheckClusterQ.Name).Obj() + admissionCheckQueue = testing.MakeLocalQueue("admission-check-queue", ns.Name). + ClusterQueue(admissionCheckClusterQ.Name). + StopPolicy(lqsStopPolicy). + Obj() gomega.Expect(k8sClient.Create(ctx, admissionCheckQueue)).Should(gomega.Succeed()) }) @@ -430,12 +450,14 @@ var _ = ginkgo.Describe("Scheduler", func() { }) }) - ginkgo.When("Hold at startup", func() { + ginkgo.When("Hold ClusterQueue at startup", func() { ginkgo.BeforeEach(func() { cqsStopPolicy = ptr.To(kueue.Hold) + lqsStopPolicy = nil }) ginkgo.AfterEach(func() { cqsStopPolicy = nil + lqsStopPolicy = nil }) ginkgo.It("Should admit workloads according to their priorities", func() { const lowPrio, midPrio, highPrio = 0, 10, 100 @@ -454,7 +476,57 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 5) util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 0) - util.UnholdQueue(ctx, k8sClient, prodClusterQ) + util.UnholdClusterQueue(ctx, k8sClient, prodClusterQ) + + ginkgo.By("checking the workloads with lower priority do not get admitted") + util.ExpectWorkloadsToBePending(ctx, k8sClient, wlLow, wlMid1, wlMid2) + + ginkgo.By("checking the workloads with high priority get admitted") + util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, wlHigh1, wlHigh2) + + util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 3) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 2) + util.ExpectQuotaReservedWorkloadsTotalMetric(prodClusterQ, 2) + util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 2) + + ginkgo.By("after the high priority workloads finish, only the mid priority workloads should be admitted") + util.FinishWorkloads(ctx, k8sClient, wlHigh1, wlHigh2) + + util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, prodClusterQ.Name, wlMid1, wlMid2) + util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 1) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 2) + util.ExpectQuotaReservedWorkloadsTotalMetric(prodClusterQ, 4) + util.ExpectAdmittedWorkloadsTotalMetric(prodClusterQ, 4) + }) + }) + + ginkgo.When("Hold LocalQueue at startup", func() { + ginkgo.BeforeEach(func() { + cqsStopPolicy = nil + lqsStopPolicy = ptr.To(kueue.Hold) + }) + ginkgo.AfterEach(func() { + cqsStopPolicy = nil + lqsStopPolicy = nil + }) + ginkgo.It("Should admit workloads according to their priorities", func() { + const lowPrio, midPrio, highPrio = 0, 10, 100 + + wlLow := testing.MakeWorkload("wl-low-priority", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "2").Priority(lowPrio).Obj() + gomega.Expect(k8sClient.Create(ctx, 
wlLow)).Should(gomega.Succeed()) + wlMid1 := testing.MakeWorkload("wl-mid-priority-1", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "2").Priority(midPrio).Obj() + gomega.Expect(k8sClient.Create(ctx, wlMid1)).Should(gomega.Succeed()) + wlMid2 := testing.MakeWorkload("wl-mid-priority-2", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "2").Priority(midPrio).Obj() + gomega.Expect(k8sClient.Create(ctx, wlMid2)).Should(gomega.Succeed()) + wlHigh1 := testing.MakeWorkload("wl-high-priority-1", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "2").Priority(highPrio).Obj() + gomega.Expect(k8sClient.Create(ctx, wlHigh1)).Should(gomega.Succeed()) + wlHigh2 := testing.MakeWorkload("wl-high-priority-2", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "2").Priority(highPrio).Obj() + gomega.Expect(k8sClient.Create(ctx, wlHigh2)).Should(gomega.Succeed()) + + util.ExpectPendingWorkloadsMetric(prodClusterQ, 0, 0) + util.ExpectReservingActiveWorkloadsMetric(prodClusterQ, 0) + + util.UnholdLocalQueue(ctx, k8sClient, prodQueue) ginkgo.By("checking the workloads with lower priority do not get admitted") util.ExpectWorkloadsToBePending(ctx, k8sClient, wlLow, wlMid1, wlMid2) diff --git a/test/util/util.go b/test/util/util.go index d33ff2b723..eb7241d65e 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -211,7 +211,7 @@ func DeleteRuntimeClass(ctx context.Context, c client.Client, runtimeClass *node return nil } -func UnholdQueue(ctx context.Context, k8sClient client.Client, cq *kueue.ClusterQueue) { +func UnholdClusterQueue(ctx context.Context, k8sClient client.Client, cq *kueue.ClusterQueue) { gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { var cqCopy kueue.ClusterQueue g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), &cqCopy)).To(gomega.Succeed()) @@ -223,6 +223,18 @@ func UnholdQueue(ctx context.Context, k8sClient client.Client, cq *kueue.Cluster }, Timeout, Interval).Should(gomega.Succeed()) } +func UnholdLocalQueue(ctx context.Context, k8sClient client.Client, lq *kueue.LocalQueue) { + gomega.EventuallyWithOffset(1, func(g gomega.Gomega) { + var lqCopy kueue.LocalQueue + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(lq), &lqCopy)).To(gomega.Succeed()) + if ptr.Deref(lqCopy.Spec.StopPolicy, kueue.None) == kueue.None { + return + } + lqCopy.Spec.StopPolicy = ptr.To(kueue.None) + g.Expect(k8sClient.Update(ctx, &lqCopy)).To(gomega.Succeed()) + }, Timeout, Interval).Should(gomega.Succeed()) +} + func FinishWorkloads(ctx context.Context, k8sClient client.Client, workloads ...*kueue.Workload) { for _, w := range workloads { gomega.EventuallyWithOffset(1, func() error {
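
Usage note (not part of the patch): a minimal sketch of a LocalQueue manifest exercising the new stopPolicy field added by this change. The queue, namespace, and ClusterQueue names are placeholders; only the field names and the None/Hold/HoldAndDrain enum values come from the CRD changes in this diff.

```yaml
# Illustrative only: names are placeholders, not part of this patch.
apiVersion: kueue.x-k8s.io/v1beta1
kind: LocalQueue
metadata:
  name: team-a-queue
  namespace: team-a
spec:
  clusterQueue: cluster-queue
  # HoldAndDrain marks the LocalQueue Inactive, evicts already-admitted
  # workloads, and blocks new reservations; Hold lets admitted workloads
  # run to completion while blocking new reservations.
  stopPolicy: HoldAndDrain
```

Setting the field back to None (the default) reactivates the LocalQueue; the workload controller then requeues affected workloads with the LocalQueueRestarted reason introduced above.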