Skip to content

Commit

Permalink
Allow stopping admission from a specific LocalQueue.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbobrovskyi committed May 13, 2024
1 parent c55f10c commit 8532514
Show file tree
Hide file tree
Showing 19 changed files with 724 additions and 99 deletions.
8 changes: 0 additions & 8 deletions apis/kueue/v1beta1/clusterqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,6 @@ const (
BestEffortFIFO QueueingStrategy = "BestEffortFIFO"
)

type StopPolicy string

const (
None StopPolicy = "None"
HoldAndDrain StopPolicy = "HoldAndDrain"
Hold StopPolicy = "Hold"
)

// +kubebuilder:validation:XValidation:rule="self.flavors.all(x, size(x.resources) == size(self.coveredResources))", message="flavors must have the same number of resources as the coveredResources"
type ResourceGroup struct {
// coveredResources is the list of resources covered by the flavors in this
Expand Down
14 changes: 14 additions & 0 deletions apis/kueue/v1beta1/localqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ type LocalQueueSpec struct {
// clusterQueue is a reference to a clusterQueue that backs this localQueue.
// +kubebuilder:validation:XValidation:rule="self == oldSelf", message="field is immutable"
ClusterQueue ClusterQueueReference `json:"clusterQueue,omitempty"`

// stopPolicy - if set to a value different from None, the LocalQueue is considered Inactive,
// no new reservation being made.
//
// Depending on its value, its associated workloads will:
//
// - None - Workloads are admitted
// - HoldAndDrain - Admitted workloads are evicted and Reserving workloads will cancel the reservation.
// - Hold - Admitted workloads will run to completion and Reserving workloads will cancel the reservation.
//
// +optional
// +kubebuilder:validation:Enum=None;Hold;HoldAndDrain
// +kubebuilder:default="None"
StopPolicy *StopPolicy `json:"stopPolicy,omitempty"`
}

// ClusterQueueReference is the name of the ClusterQueue.
Expand Down
25 changes: 25 additions & 0 deletions apis/kueue/v1beta1/stop_policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1beta1

// StopPolicy describes whether admission through a queue is stopped and what
// happens to the workloads already associated with that queue.
type StopPolicy string

const (
// None means the queue is not stopped; workloads are admitted.
None StopPolicy = "None"
// HoldAndDrain means admitted workloads are evicted and reserving workloads
// cancel their reservation.
HoldAndDrain StopPolicy = "HoldAndDrain"
// Hold means admitted workloads run to completion and reserving workloads
// cancel their reservation.
Hold StopPolicy = "Hold"
)
12 changes: 12 additions & 0 deletions apis/kueue/v1beta1/workload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ const (
)

const (
// WorkloadInadmissible means that the Workload can't reserve quota
// because its LocalQueue or ClusterQueue doesn't exist or is inactive.
WorkloadInadmissible = "Inadmissible"

// WorkloadEvictedByPreemption indicates that the workload was evicted
// in order to free resources for a workload with a higher priority.
WorkloadEvictedByPreemption = "Preempted"
Expand All @@ -346,6 +350,10 @@ const (
// because the ClusterQueue is Stopped.
WorkloadEvictedByClusterQueueStopped = "ClusterQueueStopped"

// WorkloadEvictedByLocalQueueStopped indicates that the workload was evicted
// because the LocalQueue is Stopped.
WorkloadEvictedByLocalQueueStopped = "LocalQueueStopped"

// WorkloadEvictedByDeactivation indicates that the workload was evicted
// because spec.active is set to false.
WorkloadEvictedByDeactivation = "InactiveWorkload"
Expand All @@ -361,6 +369,10 @@ const (
// WorkloadClusterQueueRestarted indicates that the workload was requeued because
// cluster queue was restarted after being stopped.
WorkloadClusterQueueRestarted = "ClusterQueueRestarted"

// WorkloadLocalQueueRestarted indicates that the workload was requeued because
// local queue was restarted after being stopped.
WorkloadLocalQueueRestarted = "LocalQueueRestarted"
)

const (
Expand Down
7 changes: 6 additions & 1 deletion apis/kueue/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions charts/kueue/templates/crd/kueue.x-k8s.io_localqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,24 @@ spec:
x-kubernetes-validations:
- message: field is immutable
rule: self == oldSelf
stopPolicy:
default: None
description: |-
stopPolicy - if set to a value different from None, the LocalQueue is considered Inactive,
no new reservation being made.
Depending on its value, its associated workloads will:
- None - Workloads are admitted
- HoldAndDrain - Admitted workloads are evicted and Reserving workloads will cancel the reservation.
- Hold - Admitted workloads will run to completion and Reserving workloads will cancel the reservation.
enum:
- None
- Hold
- HoldAndDrain
type: string
type: object
status:
description: LocalQueueStatus defines the observed state of LocalQueue
Expand Down
9 changes: 9 additions & 0 deletions client-go/applyconfiguration/kueue/v1beta1/localqueuespec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,24 @@ spec:
x-kubernetes-validations:
- message: field is immutable
rule: self == oldSelf
stopPolicy:
default: None
description: |-
stopPolicy - if set to a value different from None, the LocalQueue is considered Inactive,
no new reservation being made.
Depending on its value, its associated workloads will:
- None - Workloads are admitted
- HoldAndDrain - Admitted workloads are evicted and Reserving workloads will cancel the reservation.
- Hold - Admitted workloads will run to completion and Reserving workloads will cancel the reservation.
enum:
- None
- Hold
- HoldAndDrain
type: string
type: object
status:
description: LocalQueueStatus defines the observed state of LocalQueue
Expand Down
98 changes: 71 additions & 27 deletions pkg/controller/core/localqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ import (
)

// Messages used by the LocalQueue reconciler, either as the message of the
// LocalQueue Active condition or as log text.
const (
queueIsInactiveMsg = "Can't submit new workloads to clusterQueue"
failedUpdateLqStatusMsg = "Failed to retrieve localQueue status"
// localQueueIsInactiveMsg is the condition message set when the
// LocalQueue's own stopPolicy is not None.
localQueueIsInactiveMsg = "Can't submit new workloads to localQueue"
// clusterQueueIsInactiveMsg is the condition message set when the
// referenced ClusterQueue does not exist or is not active.
clusterQueueIsInactiveMsg = "Can't submit new workloads to clusterQueue"
// failedUpdateLqStatusMsg is logged when the pending workloads or the
// usage of the LocalQueue cannot be read while updating its status.
failedUpdateLqStatusMsg = "Failed to retrieve localQueue status"
)

// Reasons reported on the LocalQueue Active condition when it is set to False.
const (
// localQueueIsInactiveReason is used when the LocalQueue's stopPolicy is
// not None.
localQueueIsInactiveReason = "LocalQueueIsInactive"
// clusterQueueIsInactiveReason is used when the referenced ClusterQueue
// exists but its Active condition is not true.
clusterQueueIsInactiveReason = "ClusterQueueIsInactive"
)

Expand Down Expand Up @@ -99,19 +101,25 @@ func (r *LocalQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request)
ctx = ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling LocalQueue")

var err error
if ptr.Deref(queueObj.Spec.StopPolicy, kueue.None) != kueue.None {
err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, localQueueIsInactiveReason, localQueueIsInactiveMsg)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

var cq kueue.ClusterQueue
err := r.client.Get(ctx, client.ObjectKey{Name: string(queueObj.Spec.ClusterQueue)}, &cq)
err = r.client.Get(ctx, client.ObjectKey{Name: string(queueObj.Spec.ClusterQueue)}, &cq)
if err != nil {
if apierrors.IsNotFound(err) {
err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, "ClusterQueueDoesNotExist", queueIsInactiveMsg)
err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, "ClusterQueueDoesNotExist", clusterQueueIsInactiveMsg)
}
return ctrl.Result{}, client.IgnoreNotFound(err)
}
if meta.IsStatusConditionTrue(cq.Status.Conditions, kueue.ClusterQueueActive) {
err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionTrue, "Ready", "Can submit new workloads to clusterQueue")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, clusterQueueIsInactiveReason, queueIsInactiveMsg)
err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, clusterQueueIsInactiveReason, clusterQueueIsInactiveMsg)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

Expand All @@ -123,6 +131,9 @@ func (r *LocalQueueReconciler) Create(e event.CreateEvent) bool {
}
log := r.log.WithValues("localQueue", klog.KObj(q))
log.V(2).Info("LocalQueue create event")
if ptr.Deref(q.Spec.StopPolicy, kueue.None) != kueue.None {
return true
}
ctx := logr.NewContext(context.Background(), log)
if err := r.queues.AddLocalQueue(ctx, q); err != nil {
log.Error(err, "Failed to add localQueue to the queueing system")
Expand All @@ -146,20 +157,45 @@ func (r *LocalQueueReconciler) Delete(e event.DeleteEvent) bool {
}

// Update handles update events for LocalQueue objects and keeps the queue
// manager (r.queues) and the cache (r.cache) consistent with the queue's
// spec.stopPolicy. Errors from the queue manager or the cache are only logged;
// the function returns true on every path.
func (r *LocalQueueReconciler) Update(e event.UpdateEvent) bool {
q, match := e.ObjectNew.(*kueue.LocalQueue)
if !match {
oldLq, oldIsLq := e.ObjectOld.(*kueue.LocalQueue)
newLq, newIsLq := e.ObjectNew.(*kueue.LocalQueue)
if !oldIsLq || !newIsLq {
// No need to interact with the queue manager for other objects.
return true
}
log := r.log.WithValues("localQueue", klog.KObj(q))
log := r.log.WithValues("localQueue", klog.KObj(newLq))
log.V(2).Info("Queue update event")
if err := r.queues.UpdateLocalQueue(q); err != nil {
log.Error(err, "Failed to update queue in the queueing system")

// A nil stopPolicy is treated as None.
oldStopPolicy := ptr.Deref(oldLq.Spec.StopPolicy, kueue.None)
newStopPolicy := ptr.Deref(newLq.Spec.StopPolicy, kueue.None)

// The queue has just been stopped: remove it from the queue manager and
// from the cache.
if oldStopPolicy == kueue.None && newStopPolicy != kueue.None {
r.queues.DeleteLocalQueue(oldLq)
r.cache.DeleteLocalQueue(oldLq)
return true
}

// The queue has just been resumed: add it back to the queue manager and to
// the cache.
if oldStopPolicy != kueue.None && newStopPolicy == kueue.None {
ctx := logr.NewContext(context.Background(), log)
if err := r.queues.AddLocalQueue(ctx, newLq); err != nil {
log.Error(err, "Failed to add localQueue to the queueing system")
}
if err := r.cache.AddLocalQueue(newLq); err != nil {
log.Error(err, "Failed to add localQueue to the cache")
}
return true
}
oldQ := e.ObjectOld.(*kueue.LocalQueue)
if err := r.cache.UpdateLocalQueue(oldQ, q); err != nil {
log.Error(err, "Failed to update localQueue in the cache")

// The queue was and remains active: propagate the update. A queue that stays
// stopped is not touched in the queue manager or the cache.
if newStopPolicy == kueue.None {
if err := r.queues.UpdateLocalQueue(newLq); err != nil {
log.Error(err, "Failed to update queue in the queueing system")
}
// NOTE(review): oldLq already holds this value from the checked assertion
// above; this second, unchecked assertion looks redundant.
oldQ := e.ObjectOld.(*kueue.LocalQueue)
if err := r.cache.UpdateLocalQueue(oldQ, newLq); err != nil {
log.Error(err, "Failed to update localQueue in the cache")
}
}

return true
}

Expand Down Expand Up @@ -275,21 +311,29 @@ func (r *LocalQueueReconciler) UpdateStatusIfChanged(
reason, msg string,
) error {
oldStatus := queue.Status.DeepCopy()
pendingWls, err := r.queues.PendingWorkloads(queue)
if err != nil {
r.log.Error(err, failedUpdateLqStatusMsg)
return err
}
stats, err := r.cache.LocalQueueUsage(queue)
if err != nil {
r.log.Error(err, failedUpdateLqStatusMsg)
return err
if ptr.Deref(queue.Spec.StopPolicy, kueue.None) == kueue.None {
pendingWls, err := r.queues.PendingWorkloads(queue)
if err != nil {
r.log.Error(err, failedUpdateLqStatusMsg)
return err
}
stats, err := r.cache.LocalQueueUsage(queue)
if err != nil {
r.log.Error(err, failedUpdateLqStatusMsg)
return err
}
queue.Status.PendingWorkloads = pendingWls
queue.Status.ReservingWorkloads = int32(stats.ReservingWorkloads)
queue.Status.AdmittedWorkloads = int32(stats.AdmittedWorkloads)
queue.Status.FlavorsReservation = stats.ReservedResources
queue.Status.FlavorUsage = stats.AdmittedResources
} else {
queue.Status.PendingWorkloads = 0
queue.Status.ReservingWorkloads = 0
queue.Status.AdmittedWorkloads = 0
queue.Status.FlavorsReservation = nil
queue.Status.FlavorUsage = nil
}
queue.Status.PendingWorkloads = pendingWls
queue.Status.ReservingWorkloads = int32(stats.ReservingWorkloads)
queue.Status.AdmittedWorkloads = int32(stats.AdmittedWorkloads)
queue.Status.FlavorsReservation = stats.ReservedResources
queue.Status.FlavorUsage = stats.AdmittedResources
if len(conditionStatus) != 0 && len(reason) != 0 && len(msg) != 0 {
meta.SetStatusCondition(&queue.Status.Conditions, metav1.Condition{
Type: kueue.LocalQueueActive,
Expand Down

0 comments on commit 8532514

Please sign in to comment.