Skip to content

Commit

Permalink
Allow stopping admission from a specific LocalQueue.
Browse files Browse the repository at this point in the history
  • Loading branch information
mbobrovskyi committed May 14, 2024
1 parent 5f455b0 commit 6efdcff
Show file tree
Hide file tree
Showing 19 changed files with 706 additions and 99 deletions.
8 changes: 0 additions & 8 deletions apis/kueue/v1beta1/clusterqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,6 @@ const (
BestEffortFIFO QueueingStrategy = "BestEffortFIFO"
)

type StopPolicy string

const (
None StopPolicy = "None"
HoldAndDrain StopPolicy = "HoldAndDrain"
Hold StopPolicy = "Hold"
)

// +kubebuilder:validation:XValidation:rule="self.flavors.all(x, size(x.resources) == size(self.coveredResources))", message="flavors must have the same number of resources as the coveredResources"
type ResourceGroup struct {
// coveredResources is the list of resources covered by the flavors in this
Expand Down
9 changes: 8 additions & 1 deletion apis/kueue/v1beta1/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ package v1beta1

const (
ResourceInUseFinalizerName = "kueue.x-k8s.io/resource-in-use"
DefaultPodSetName = "main"
)

type StopPolicy string

DefaultPodSetName = "main"
const (
None StopPolicy = "None"
HoldAndDrain StopPolicy = "HoldAndDrain"
Hold StopPolicy = "Hold"
)
14 changes: 14 additions & 0 deletions apis/kueue/v1beta1/localqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ type LocalQueueSpec struct {
// clusterQueue is a reference to a clusterQueue that backs this localQueue.
// +kubebuilder:validation:XValidation:rule="self == oldSelf", message="field is immutable"
ClusterQueue ClusterQueueReference `json:"clusterQueue,omitempty"`

// stopPolicy - if set to a value different from None, the LocalQueue is considered Inactive,
// and no new reservations are made.
//
// Depending on its value, its associated workloads will:
//
// - None - Workloads are admitted
// - HoldAndDrain - Admitted workloads are evicted and Reserving workloads will cancel the reservation.
// - Hold - Admitted workloads will run to completion and Reserving workloads will cancel the reservation.
//
// +optional
// +kubebuilder:validation:Enum=None;Hold;HoldAndDrain
// +kubebuilder:default="None"
StopPolicy *StopPolicy `json:"stopPolicy,omitempty"`
}

// ClusterQueueReference is the name of the ClusterQueue.
Expand Down
12 changes: 12 additions & 0 deletions apis/kueue/v1beta1/workload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ const (
)

const (
// WorkloadInadmissible means that the Workload can't reserve quota
// because its LocalQueue or ClusterQueue doesn't exist or is inactive.
WorkloadInadmissible = "Inadmissible"

// WorkloadEvictedByPreemption indicates that the workload was evicted
// in order to free resources for a workload with a higher priority.
WorkloadEvictedByPreemption = "Preempted"
Expand All @@ -346,6 +350,10 @@ const (
// because the ClusterQueue is Stopped.
WorkloadEvictedByClusterQueueStopped = "ClusterQueueStopped"

// WorkloadEvictedByLocalQueueStopped indicates that the workload was evicted
// because the LocalQueue is Stopped.
WorkloadEvictedByLocalQueueStopped = "LocalQueueStopped"

// WorkloadEvictedByDeactivation indicates that the workload was evicted
// because spec.active is set to false.
WorkloadEvictedByDeactivation = "InactiveWorkload"
Expand All @@ -361,6 +369,10 @@ const (
// WorkloadClusterQueueRestarted indicates that the workload was requeued because
// the cluster queue was restarted after being stopped.
WorkloadClusterQueueRestarted = "ClusterQueueRestarted"

// WorkloadLocalQueueRestarted indicates that the workload was requeued because
// the local queue was restarted after being stopped.
WorkloadLocalQueueRestarted = "LocalQueueRestarted"
)

const (
Expand Down
7 changes: 6 additions & 1 deletion apis/kueue/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions charts/kueue/templates/crd/kueue.x-k8s.io_localqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,24 @@ spec:
x-kubernetes-validations:
- message: field is immutable
rule: self == oldSelf
stopPolicy:
default: None
description: |-
stopPolicy - if set to a value different from None, the LocalQueue is considered Inactive,
no new reservation being made.
Depending on its value, its associated workloads will:
- None - Workloads are admitted
- HoldAndDrain - Admitted workloads are evicted and Reserving workloads will cancel the reservation.
- Hold - Admitted workloads will run to completion and Reserving workloads will cancel the reservation.
enum:
- None
- Hold
- HoldAndDrain
type: string
type: object
status:
description: LocalQueueStatus defines the observed state of LocalQueue
Expand Down
9 changes: 9 additions & 0 deletions client-go/applyconfiguration/kueue/v1beta1/localqueuespec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions config/components/crd/bases/kueue.x-k8s.io_localqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,24 @@ spec:
x-kubernetes-validations:
- message: field is immutable
rule: self == oldSelf
stopPolicy:
default: None
description: |-
stopPolicy - if set to a value different from None, the LocalQueue is considered Inactive,
no new reservation being made.
Depending on its value, its associated workloads will:
- None - Workloads are admitted
- HoldAndDrain - Admitted workloads are evicted and Reserving workloads will cancel the reservation.
- Hold - Admitted workloads will run to completion and Reserving workloads will cancel the reservation.
enum:
- None
- Hold
- HoldAndDrain
type: string
type: object
status:
description: LocalQueueStatus defines the observed state of LocalQueue
Expand Down
98 changes: 71 additions & 27 deletions pkg/controller/core/localqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ import (
)

const (
queueIsInactiveMsg = "Can't submit new workloads to clusterQueue"
failedUpdateLqStatusMsg = "Failed to retrieve localQueue status"
localQueueIsInactiveMsg = "Can't submit new workloads to localQueue"
clusterQueueIsInactiveMsg = "Can't submit new workloads to clusterQueue"
failedUpdateLqStatusMsg = "Failed to retrieve localQueue status"
)

const (
localQueueIsInactiveReason = "LocalQueueIsInactive"
clusterQueueIsInactiveReason = "ClusterQueueIsInactive"
)

Expand Down Expand Up @@ -99,19 +101,25 @@ func (r *LocalQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request)
ctx = ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling LocalQueue")

var err error
if ptr.Deref(queueObj.Spec.StopPolicy, kueue.None) != kueue.None {
err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, localQueueIsInactiveReason, localQueueIsInactiveMsg)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

var cq kueue.ClusterQueue
err := r.client.Get(ctx, client.ObjectKey{Name: string(queueObj.Spec.ClusterQueue)}, &cq)
err = r.client.Get(ctx, client.ObjectKey{Name: string(queueObj.Spec.ClusterQueue)}, &cq)
if err != nil {
if apierrors.IsNotFound(err) {
err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, "ClusterQueueDoesNotExist", queueIsInactiveMsg)
err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, "ClusterQueueDoesNotExist", clusterQueueIsInactiveMsg)
}
return ctrl.Result{}, client.IgnoreNotFound(err)
}
if meta.IsStatusConditionTrue(cq.Status.Conditions, kueue.ClusterQueueActive) {
err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionTrue, "Ready", "Can submit new workloads to clusterQueue")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, clusterQueueIsInactiveReason, queueIsInactiveMsg)
err = r.UpdateStatusIfChanged(ctx, &queueObj, metav1.ConditionFalse, clusterQueueIsInactiveReason, clusterQueueIsInactiveMsg)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

Expand All @@ -123,6 +131,9 @@ func (r *LocalQueueReconciler) Create(e event.CreateEvent) bool {
}
log := r.log.WithValues("localQueue", klog.KObj(q))
log.V(2).Info("LocalQueue create event")
if ptr.Deref(q.Spec.StopPolicy, kueue.None) != kueue.None {
return true
}
ctx := logr.NewContext(context.Background(), log)
if err := r.queues.AddLocalQueue(ctx, q); err != nil {
log.Error(err, "Failed to add localQueue to the queueing system")
Expand All @@ -146,20 +157,45 @@ func (r *LocalQueueReconciler) Delete(e event.DeleteEvent) bool {
}

func (r *LocalQueueReconciler) Update(e event.UpdateEvent) bool {
q, match := e.ObjectNew.(*kueue.LocalQueue)
if !match {
oldLq, oldIsLq := e.ObjectOld.(*kueue.LocalQueue)
newLq, newIsLq := e.ObjectNew.(*kueue.LocalQueue)
if !oldIsLq || !newIsLq {
// No need to interact with the queue manager for other objects.
return true
}
log := r.log.WithValues("localQueue", klog.KObj(q))
log := r.log.WithValues("localQueue", klog.KObj(newLq))
log.V(2).Info("Queue update event")
if err := r.queues.UpdateLocalQueue(q); err != nil {
log.Error(err, "Failed to update queue in the queueing system")

oldStopPolicy := ptr.Deref(oldLq.Spec.StopPolicy, kueue.None)
newStopPolicy := ptr.Deref(newLq.Spec.StopPolicy, kueue.None)

if oldStopPolicy == kueue.None && newStopPolicy != kueue.None {
r.queues.DeleteLocalQueue(oldLq)
r.cache.DeleteLocalQueue(oldLq)
return true
}

if oldStopPolicy != kueue.None && newStopPolicy == kueue.None {
ctx := logr.NewContext(context.Background(), log)
if err := r.queues.AddLocalQueue(ctx, newLq); err != nil {
log.Error(err, "Failed to add localQueue to the queueing system")
}
if err := r.cache.AddLocalQueue(newLq); err != nil {
log.Error(err, "Failed to add localQueue to the cache")
}
return true
}
oldQ := e.ObjectOld.(*kueue.LocalQueue)
if err := r.cache.UpdateLocalQueue(oldQ, q); err != nil {
log.Error(err, "Failed to update localQueue in the cache")

if newStopPolicy == kueue.None {
if err := r.queues.UpdateLocalQueue(newLq); err != nil {
log.Error(err, "Failed to update queue in the queueing system")
}
oldQ := e.ObjectOld.(*kueue.LocalQueue)
if err := r.cache.UpdateLocalQueue(oldQ, newLq); err != nil {
log.Error(err, "Failed to update localQueue in the cache")
}
}

return true
}

Expand Down Expand Up @@ -275,21 +311,29 @@ func (r *LocalQueueReconciler) UpdateStatusIfChanged(
reason, msg string,
) error {
oldStatus := queue.Status.DeepCopy()
pendingWls, err := r.queues.PendingWorkloads(queue)
if err != nil {
r.log.Error(err, failedUpdateLqStatusMsg)
return err
}
stats, err := r.cache.LocalQueueUsage(queue)
if err != nil {
r.log.Error(err, failedUpdateLqStatusMsg)
return err
if ptr.Deref(queue.Spec.StopPolicy, kueue.None) == kueue.None {
pendingWls, err := r.queues.PendingWorkloads(queue)
if err != nil {
r.log.Error(err, failedUpdateLqStatusMsg)
return err
}
stats, err := r.cache.LocalQueueUsage(queue)
if err != nil {
r.log.Error(err, failedUpdateLqStatusMsg)
return err
}
queue.Status.PendingWorkloads = pendingWls
queue.Status.ReservingWorkloads = int32(stats.ReservingWorkloads)
queue.Status.AdmittedWorkloads = int32(stats.AdmittedWorkloads)
queue.Status.FlavorsReservation = stats.ReservedResources
queue.Status.FlavorUsage = stats.AdmittedResources
} else {
queue.Status.PendingWorkloads = 0
queue.Status.ReservingWorkloads = 0
queue.Status.AdmittedWorkloads = 0
queue.Status.FlavorsReservation = nil
queue.Status.FlavorUsage = nil
}
queue.Status.PendingWorkloads = pendingWls
queue.Status.ReservingWorkloads = int32(stats.ReservingWorkloads)
queue.Status.AdmittedWorkloads = int32(stats.AdmittedWorkloads)
queue.Status.FlavorsReservation = stats.ReservedResources
queue.Status.FlavorUsage = stats.AdmittedResources
if len(conditionStatus) != 0 && len(reason) != 0 && len(msg) != 0 {
meta.SetStatusCondition(&queue.Status.Conditions, metav1.Condition{
Type: kueue.LocalQueueActive,
Expand Down
50 changes: 49 additions & 1 deletion pkg/controller/core/localqueue_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,54 @@ func TestLocalQueueReconcile(t *testing.T) {
wantLocalQueue *kueue.LocalQueue
wantError error
}{
"local queue with Hold StopPolicy": {
clusterQueue: utiltesting.MakeClusterQueue("test-cluster-queue").
Obj(),
localQueue: utiltesting.MakeLocalQueue("test-queue", "default").
ClusterQueue("test-cluster-queue").
PendingWorkloads(1).
StopPolicy(kueue.Hold).
Generation(1).
Obj(),
wantLocalQueue: utiltesting.MakeLocalQueue("test-queue", "default").
ClusterQueue("test-cluster-queue").
PendingWorkloads(0).
StopPolicy(kueue.Hold).
Generation(1).
Condition(
kueue.LocalQueueActive,
metav1.ConditionFalse,
localQueueIsInactiveReason,
localQueueIsInactiveMsg,
1,
).
Obj(),
wantError: nil,
},
"local queue with HoldAndDrain StopPolicy": {
clusterQueue: utiltesting.MakeClusterQueue("test-cluster-queue").
Obj(),
localQueue: utiltesting.MakeLocalQueue("test-queue", "default").
ClusterQueue("test-cluster-queue").
PendingWorkloads(1).
StopPolicy(kueue.HoldAndDrain).
Generation(1).
Obj(),
wantLocalQueue: utiltesting.MakeLocalQueue("test-queue", "default").
ClusterQueue("test-cluster-queue").
PendingWorkloads(0).
StopPolicy(kueue.HoldAndDrain).
Generation(1).
Condition(
kueue.LocalQueueActive,
metav1.ConditionFalse,
localQueueIsInactiveReason,
localQueueIsInactiveMsg,
1,
).
Obj(),
wantError: nil,
},
"cluster queue is inactive": {
clusterQueue: utiltesting.MakeClusterQueue("test-cluster-queue").
Obj(),
Expand All @@ -58,7 +106,7 @@ func TestLocalQueueReconcile(t *testing.T) {
kueue.LocalQueueActive,
metav1.ConditionFalse,
clusterQueueIsInactiveReason,
queueIsInactiveMsg,
clusterQueueIsInactiveMsg,
1,
).
Obj(),
Expand Down

0 comments on commit 6efdcff

Please sign in to comment.