From d061decae1650f0083881a9ba67bc45bec4fcc8f Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Tue, 9 Jan 2024 18:48:00 +0100 Subject: [PATCH 1/5] Add HA support for the visibility API --- .../core/admissioncheck_controller.go | 5 +- .../core/clusterqueue_controller.go | 5 +- .../core/leader_aware_reconciler.go | 63 +++++++++++++++++++ pkg/controller/core/localqueue_controller.go | 5 +- .../core/resourceflavor_controller.go | 5 +- pkg/controller/core/workload_controller.go | 4 +- 6 files changed, 82 insertions(+), 5 deletions(-) create mode 100644 pkg/controller/core/leader_aware_reconciler.go diff --git a/pkg/controller/core/admissioncheck_controller.go b/pkg/controller/core/admissioncheck_controller.go index adbdb9a91d..4c771df780 100644 --- a/pkg/controller/core/admissioncheck_controller.go +++ b/pkg/controller/core/admissioncheck_controller.go @@ -23,8 +23,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" @@ -220,7 +222,8 @@ func (r *AdmissionCheckReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.AdmissionCheck{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } diff --git a/pkg/controller/core/clusterqueue_controller.go b/pkg/controller/core/clusterqueue_controller.go index 0cdb16f8eb..c089874185 100644 --- a/pkg/controller/core/clusterqueue_controller.go +++ b/pkg/controller/core/clusterqueue_controller.go @@ -30,8 +30,10 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -613,13 +615,14 @@ func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.ClusterQueue{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). Watches(&corev1.Namespace{}, &nsHandler). WatchesRawSource(&source.Channel{Source: r.wlUpdateCh}, &wHandler). WatchesRawSource(&source.Channel{Source: r.rfUpdateCh}, &rfHandler). WatchesRawSource(&source.Channel{Source: r.acUpdateCh}, &acHandler). WatchesRawSource(&source.Channel{Source: r.snapUpdateCh}, &snapHandler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } func (r *ClusterQueueReconciler) updateCqStatusIfChanged( diff --git a/pkg/controller/core/leader_aware_reconciler.go b/pkg/controller/core/leader_aware_reconciler.go new file mode 100644 index 0000000000..abcbb2e911 --- /dev/null +++ b/pkg/controller/core/leader_aware_reconciler.go @@ -0,0 +1,63 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import ( + "context" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// WithLeadingManager returns a decorating reconcile.Reconciler that discards reconciliation requests +// for the controllers that are started with the controller.Options.NeedLeaderElection +// option set to false in non-leading replicas. +// +// Starting controllers in non-leading replicas is needed for these that update the data +// served by the visibility extension API server. +// +// This enables to: +// - Keep the scheduling decisions under the responsibility of the leading replica alone, +// to prevent any concurrency issues. +// - Consume requests from the watch event queues, to prevent them from growing indefinitely +// in the non-leading replicas. +// - Transition to actually reconciling requests in the replica that may acquire +// the leader election lease, in case the previously leading replica failed to renew it. +func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler) reconcile.Reconciler { + return &leaderAwareReconciler{ + elected: mgr.Elected(), + delegate: reconciler, + } +} + +type leaderAwareReconciler struct { + elected <-chan struct{} + delegate reconcile.Reconciler +} + +var _ reconcile.Reconciler = (*leaderAwareReconciler)(nil) + +func (r *leaderAwareReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + select { + case <-r.elected: + // The manager has been elected leader, delegate reconciliation to the provided reconciler. + return r.delegate.Reconcile(ctx, request) + default: + // The manager hasn't been elected leader yet, skip reconciliation. + return ctrl.Result{}, nil + } +} diff --git a/pkg/controller/core/localqueue_controller.go b/pkg/controller/core/localqueue_controller.go index 5d623ca888..6351728455 100644 --- a/pkg/controller/core/localqueue_controller.go +++ b/pkg/controller/core/localqueue_controller.go @@ -27,8 +27,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -254,10 +256,11 @@ func (r *LocalQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.LocalQueue{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.wlUpdateCh}, &qWorkloadHandler{}). Watches(&kueue.ClusterQueue{}, &queueCQHandler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } func (r *LocalQueueReconciler) UpdateStatusIfChanged( diff --git a/pkg/controller/core/resourceflavor_controller.go b/pkg/controller/core/resourceflavor_controller.go index f243b9f006..da6405846e 100644 --- a/pkg/controller/core/resourceflavor_controller.go +++ b/pkg/controller/core/resourceflavor_controller.go @@ -24,8 +24,10 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -255,9 +257,10 @@ func (r *ResourceFlavorReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.ResourceFlavor{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } func resourceFlavors(cq *kueue.ClusterQueue) sets.Set[kueue.ResourceFlavorReference] { diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index 7ca6409a21..7c8a391452 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -495,11 +496,12 @@ func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.Workload{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). Watches(&corev1.LimitRange{}, ruh). Watches(&nodev1.RuntimeClass{}, ruh). Watches(&kueue.ClusterQueue{}, &workloadCqHandler{client: r.client}). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } // admittedNotReadyWorkload returns as a pair of values. The first boolean determines From 787ca7942c38ae65a5253fbca51d3584ce54c791 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Wed, 10 Jan 2024 10:24:46 +0100 Subject: [PATCH 2/5] Deploy Kueue in HA mode in e2e tests --- test/e2e/config/kustomization.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/e2e/config/kustomization.yaml b/test/e2e/config/kustomization.yaml index d38e4b7edd..c99cb63585 100644 --- a/test/e2e/config/kustomization.yaml +++ b/test/e2e/config/kustomization.yaml @@ -4,6 +4,10 @@ kind: Kustomization resources: - ../../../config/default +replicas: +- name: kueue-controller-manager + count: 2 + patches: - path: manager_e2e_patch.yaml target: From 923e7bb972840a9ea382830104c199838f6d1a70 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Thu, 11 Jan 2024 11:22:16 +0100 Subject: [PATCH 3/5] Insure against event misses during leader election failover --- .../core/admissioncheck_controller.go | 5 ++- .../core/clusterqueue_controller.go | 5 ++- pkg/controller/core/core.go | 10 ++--- .../core/leader_aware_reconciler.go | 38 +++++++++++++++---- pkg/controller/core/localqueue_controller.go | 5 ++- .../core/resourceflavor_controller.go | 5 ++- pkg/controller/core/workload_controller.go | 5 ++- 7 files changed, 51 insertions(+), 22 deletions(-) diff --git a/pkg/controller/core/admissioncheck_controller.go b/pkg/controller/core/admissioncheck_controller.go index 4c771df780..f64ba2526c 100644 --- a/pkg/controller/core/admissioncheck_controller.go +++ b/pkg/controller/core/admissioncheck_controller.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/queue" @@ -216,7 +217,7 @@ func (h *acCqHandler) Generic(ctx context.Context, e event.GenericEvent, q workq } // SetupWithManager sets up the controller with the Manager. -func (r *AdmissionCheckReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *AdmissionCheckReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error { handler := acCqHandler{ cache: r.cache, } @@ -225,5 +226,5 @@ func (r *AdmissionCheckReconciler) SetupWithManager(mgr ctrl.Manager) error { WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler). WithEventFilter(r). - Complete(WithLeadingManager(mgr, r)) + Complete(WithLeadingManager(mgr, r, cfg)) } diff --git a/pkg/controller/core/clusterqueue_controller.go b/pkg/controller/core/clusterqueue_controller.go index c089874185..3740b2da8a 100644 --- a/pkg/controller/core/clusterqueue_controller.go +++ b/pkg/controller/core/clusterqueue_controller.go @@ -39,6 +39,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/constants" @@ -596,7 +597,7 @@ func (h *cqSnapshotHandler) Generic(_ context.Context, e event.GenericEvent, q w } // SetupWithManager sets up the controller with the Manager. -func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error { wHandler := cqWorkloadHandler{ qManager: r.qManager, } @@ -622,7 +623,7 @@ func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { WatchesRawSource(&source.Channel{Source: r.acUpdateCh}, &acHandler). WatchesRawSource(&source.Channel{Source: r.snapUpdateCh}, &snapHandler). WithEventFilter(r). - Complete(WithLeadingManager(mgr, r)) + Complete(WithLeadingManager(mgr, r, cfg)) } func (r *ClusterQueueReconciler) updateCqStatusIfChanged( diff --git a/pkg/controller/core/core.go b/pkg/controller/core/core.go index 0a2b1a89a1..099623b208 100644 --- a/pkg/controller/core/core.go +++ b/pkg/controller/core/core.go @@ -33,15 +33,15 @@ const updateChBuffer = 10 // controller that failed to create and an error, if any. func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache, cfg *config.Configuration) (string, error) { rfRec := NewResourceFlavorReconciler(mgr.GetClient(), qManager, cc) - if err := rfRec.SetupWithManager(mgr); err != nil { + if err := rfRec.SetupWithManager(mgr, cfg); err != nil { return "ResourceFlavor", err } acRec := NewAdmissionCheckReconciler(mgr.GetClient(), qManager, cc) - if err := acRec.SetupWithManager(mgr); err != nil { + if err := acRec.SetupWithManager(mgr, cfg); err != nil { return "AdmissionCheck", err } qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc) - if err := qRec.SetupWithManager(mgr); err != nil { + if err := qRec.SetupWithManager(mgr, cfg); err != nil { return "LocalQueue", err } @@ -59,13 +59,13 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache } rfRec.AddUpdateWatcher(cqRec) acRec.AddUpdateWatchers(cqRec) - if err := cqRec.SetupWithManager(mgr); err != nil { + if err := cqRec.SetupWithManager(mgr, cfg); err != nil { return "ClusterQueue", err } if err := NewWorkloadReconciler(mgr.GetClient(), qManager, cc, mgr.GetEventRecorderFor(constants.WorkloadControllerName), WithWorkloadUpdateWatchers(qRec, cqRec), - WithPodsReadyTimeout(podsReadyTimeout(cfg))).SetupWithManager(mgr); err != nil { + WithPodsReadyTimeout(podsReadyTimeout(cfg))).SetupWithManager(mgr, cfg); err != nil { return "Workload", err } return "", nil diff --git a/pkg/controller/core/leader_aware_reconciler.go b/pkg/controller/core/leader_aware_reconciler.go index abcbb2e911..3f71d184e8 100644 --- a/pkg/controller/core/leader_aware_reconciler.go +++ b/pkg/controller/core/leader_aware_reconciler.go @@ -18,11 +18,20 @@ package core import ( "context" + "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + config "sigs.k8s.io/kueue/apis/config/v1beta1" ) +// defaultRequeueDuration defaults the duration used by non-leading replicas +// to requeue events, so no events are missed over the period it takes for +// leader election to fail over a new replica. +const defaultRequeueDuration = 15 * time.Second + // WithLeadingManager returns a decorating reconcile.Reconciler that discards reconciliation requests // for the controllers that are started with the controller.Options.NeedLeaderElection // option set to false in non-leading replicas. @@ -37,16 +46,28 @@ import ( // in the non-leading replicas. // - Transition to actually reconciling requests in the replica that may acquire // the leader election lease, in case the previously leading replica failed to renew it. -func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler) reconcile.Reconciler { +func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler, cfg *config.Configuration) reconcile.Reconciler { + // Default to the recommended lease duration, that's used for core components + requeueDuration := defaultRequeueDuration + // Otherwise used the configured lease duration for the manager + if le := cfg.LeaderElection; le != nil { + zero := metav1.Duration{} + if duration := le.LeaseDuration; duration != zero { + requeueDuration = duration.Duration + } + } + return &leaderAwareReconciler{ - elected: mgr.Elected(), - delegate: reconciler, + elected: mgr.Elected(), + delegate: reconciler, + requeueDuration: requeueDuration, } } type leaderAwareReconciler struct { - elected <-chan struct{} - delegate reconcile.Reconciler + elected <-chan struct{} + delegate reconcile.Reconciler + requeueDuration time.Duration } var _ reconcile.Reconciler = (*leaderAwareReconciler)(nil) @@ -57,7 +78,10 @@ func (r *leaderAwareReconciler) Reconcile(ctx context.Context, request reconcile // The manager has been elected leader, delegate reconciliation to the provided reconciler. return r.delegate.Reconcile(ctx, request) default: - // The manager hasn't been elected leader yet, skip reconciliation. - return ctrl.Result{}, nil + // The manager hasn't been elected leader yet, requeue the reconciliation request + // to prevent against any missed / discarded events over the period it takes + // to fail over a new leading replica, which can take as much as the configured + // lease duration, for it to acquire leadership. + return ctrl.Result{RequeueAfter: r.requeueDuration}, nil } } diff --git a/pkg/controller/core/localqueue_controller.go b/pkg/controller/core/localqueue_controller.go index 6351728455..351e208221 100644 --- a/pkg/controller/core/localqueue_controller.go +++ b/pkg/controller/core/localqueue_controller.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/constants" @@ -250,7 +251,7 @@ func (h *qCQHandler) addLocalQueueToWorkQueue(ctx context.Context, cq *kueue.Clu } // SetupWithManager sets up the controller with the Manager. -func (r *LocalQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *LocalQueueReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error { queueCQHandler := qCQHandler{ client: r.client, } @@ -260,7 +261,7 @@ func (r *LocalQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { WatchesRawSource(&source.Channel{Source: r.wlUpdateCh}, &qWorkloadHandler{}). Watches(&kueue.ClusterQueue{}, &queueCQHandler). WithEventFilter(r). - Complete(WithLeadingManager(mgr, r)) + Complete(WithLeadingManager(mgr, r, cfg)) } func (r *LocalQueueReconciler) UpdateStatusIfChanged( diff --git a/pkg/controller/core/resourceflavor_controller.go b/pkg/controller/core/resourceflavor_controller.go index da6405846e..2f4c586c0e 100644 --- a/pkg/controller/core/resourceflavor_controller.go +++ b/pkg/controller/core/resourceflavor_controller.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/queue" @@ -251,7 +252,7 @@ func (h *cqHandler) Generic(_ context.Context, e event.GenericEvent, q workqueue } // SetupWithManager sets up the controller with the Manager. -func (r *ResourceFlavorReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *ResourceFlavorReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error { handler := cqHandler{ cache: r.cache, } @@ -260,7 +261,7 @@ func (r *ResourceFlavorReconciler) SetupWithManager(mgr ctrl.Manager) error { WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler). WithEventFilter(r). - Complete(WithLeadingManager(mgr, r)) + Complete(WithLeadingManager(mgr, r, cfg)) } func resourceFlavors(cq *kueue.ClusterQueue) sets.Set[kueue.ResourceFlavorReference] { diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index 7c8a391452..777d2fb7f8 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -40,6 +40,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" + config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/constants" @@ -490,7 +491,7 @@ func (r *WorkloadReconciler) notifyWatchers(oldWl, newWl *kueue.Workload) { } // SetupWithManager sets up the controller with the Manager. -func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error { ruh := &resourceUpdatesHandler{ r: r, } @@ -501,7 +502,7 @@ func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error { Watches(&nodev1.RuntimeClass{}, ruh). Watches(&kueue.ClusterQueue{}, &workloadCqHandler{client: r.client}). WithEventFilter(r). - Complete(WithLeadingManager(mgr, r)) + Complete(WithLeadingManager(mgr, r, cfg)) } // admittedNotReadyWorkload returns as a pair of values. The first boolean determines From 47efd3707e8a85bcb7d8710b88d1e637f94e6e10 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Thu, 11 Jan 2024 11:40:00 +0100 Subject: [PATCH 4/5] Decorate reconcilers only when leader election is enabled --- pkg/controller/core/leader_aware_reconciler.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/controller/core/leader_aware_reconciler.go b/pkg/controller/core/leader_aware_reconciler.go index 3f71d184e8..6f7ebeac62 100644 --- a/pkg/controller/core/leader_aware_reconciler.go +++ b/pkg/controller/core/leader_aware_reconciler.go @@ -21,6 +21,7 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -47,14 +48,17 @@ const defaultRequeueDuration = 15 * time.Second // - Transition to actually reconciling requests in the replica that may acquire // the leader election lease, in case the previously leading replica failed to renew it. func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler, cfg *config.Configuration) reconcile.Reconciler { + // Do not decorate the reconciler if leader election is disabled + if cfg.LeaderElection == nil || !ptr.Deref(cfg.LeaderElection.LeaderElect, false) { + return reconciler + } + // Default to the recommended lease duration, that's used for core components requeueDuration := defaultRequeueDuration // Otherwise used the configured lease duration for the manager - if le := cfg.LeaderElection; le != nil { - zero := metav1.Duration{} - if duration := le.LeaseDuration; duration != zero { - requeueDuration = duration.Duration - } + zero := metav1.Duration{} + if duration := cfg.LeaderElection.LeaseDuration; duration != zero { + requeueDuration = duration.Duration } return &leaderAwareReconciler{ From b4ae8f06cd92fc019618990cd94c003818e797a0 Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Fri, 12 Jan 2024 13:21:15 +0100 Subject: [PATCH 5/5] Skip requests in non-leading replicas on not found errors --- pkg/controller/core/admissioncheck_controller.go | 2 +- pkg/controller/core/clusterqueue_controller.go | 2 +- pkg/controller/core/leader_aware_reconciler.go | 11 ++++++++++- pkg/controller/core/localqueue_controller.go | 2 +- pkg/controller/core/resourceflavor_controller.go | 2 +- pkg/controller/core/workload_controller.go | 2 +- 6 files changed, 15 insertions(+), 6 deletions(-) diff --git a/pkg/controller/core/admissioncheck_controller.go b/pkg/controller/core/admissioncheck_controller.go index f64ba2526c..67c81d8f32 100644 --- a/pkg/controller/core/admissioncheck_controller.go +++ b/pkg/controller/core/admissioncheck_controller.go @@ -226,5 +226,5 @@ func (r *AdmissionCheckReconciler) SetupWithManager(mgr ctrl.Manager, cfg *confi WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler). WithEventFilter(r). - Complete(WithLeadingManager(mgr, r, cfg)) + Complete(WithLeadingManager(mgr, r, &kueue.AdmissionCheck{}, cfg)) } diff --git a/pkg/controller/core/clusterqueue_controller.go b/pkg/controller/core/clusterqueue_controller.go index 3740b2da8a..a8148d02dc 100644 --- a/pkg/controller/core/clusterqueue_controller.go +++ b/pkg/controller/core/clusterqueue_controller.go @@ -623,7 +623,7 @@ func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config. WatchesRawSource(&source.Channel{Source: r.acUpdateCh}, &acHandler). WatchesRawSource(&source.Channel{Source: r.snapUpdateCh}, &snapHandler). WithEventFilter(r). - Complete(WithLeadingManager(mgr, r, cfg)) + Complete(WithLeadingManager(mgr, r, &kueue.ClusterQueue{}, cfg)) } func (r *ClusterQueueReconciler) updateCqStatusIfChanged( diff --git a/pkg/controller/core/leader_aware_reconciler.go b/pkg/controller/core/leader_aware_reconciler.go index 6f7ebeac62..3d54aad04e 100644 --- a/pkg/controller/core/leader_aware_reconciler.go +++ b/pkg/controller/core/leader_aware_reconciler.go @@ -23,6 +23,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" config "sigs.k8s.io/kueue/apis/config/v1beta1" @@ -47,7 +48,7 @@ const defaultRequeueDuration = 15 * time.Second // in the non-leading replicas. // - Transition to actually reconciling requests in the replica that may acquire // the leader election lease, in case the previously leading replica failed to renew it. -func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler, cfg *config.Configuration) reconcile.Reconciler { +func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler, obj client.Object, cfg *config.Configuration) reconcile.Reconciler { // Do not decorate the reconciler if leader election is disabled if cfg.LeaderElection == nil || !ptr.Deref(cfg.LeaderElection.LeaderElect, false) { return reconciler @@ -63,14 +64,18 @@ func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler, cfg * return &leaderAwareReconciler{ elected: mgr.Elected(), + client: mgr.GetClient(), delegate: reconciler, + object: obj, requeueDuration: requeueDuration, } } type leaderAwareReconciler struct { elected <-chan struct{} + client client.Client delegate reconcile.Reconciler + object client.Object requeueDuration time.Duration } @@ -82,6 +87,10 @@ func (r *leaderAwareReconciler) Reconcile(ctx context.Context, request reconcile // The manager has been elected leader, delegate reconciliation to the provided reconciler. return r.delegate.Reconcile(ctx, request) default: + if err := r.client.Get(ctx, request.NamespacedName, r.object); err != nil { + // Discard request if not found, to prevent from re-enqueueing indefinitely. + return ctrl.Result{}, client.IgnoreNotFound(err) + } // The manager hasn't been elected leader yet, requeue the reconciliation request // to prevent against any missed / discarded events over the period it takes // to fail over a new leading replica, which can take as much as the configured diff --git a/pkg/controller/core/localqueue_controller.go b/pkg/controller/core/localqueue_controller.go index 351e208221..e8eb3d9815 100644 --- a/pkg/controller/core/localqueue_controller.go +++ b/pkg/controller/core/localqueue_controller.go @@ -261,7 +261,7 @@ func (r *LocalQueueReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Co WatchesRawSource(&source.Channel{Source: r.wlUpdateCh}, &qWorkloadHandler{}). Watches(&kueue.ClusterQueue{}, &queueCQHandler). WithEventFilter(r). - Complete(WithLeadingManager(mgr, r, cfg)) + Complete(WithLeadingManager(mgr, r, &kueue.LocalQueue{}, cfg)) } func (r *LocalQueueReconciler) UpdateStatusIfChanged( diff --git a/pkg/controller/core/resourceflavor_controller.go b/pkg/controller/core/resourceflavor_controller.go index 2f4c586c0e..1e1a0506f9 100644 --- a/pkg/controller/core/resourceflavor_controller.go +++ b/pkg/controller/core/resourceflavor_controller.go @@ -261,7 +261,7 @@ func (r *ResourceFlavorReconciler) SetupWithManager(mgr ctrl.Manager, cfg *confi WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler). WithEventFilter(r). - Complete(WithLeadingManager(mgr, r, cfg)) + Complete(WithLeadingManager(mgr, r, &kueue.ResourceFlavor{}, cfg)) } func resourceFlavors(cq *kueue.ClusterQueue) sets.Set[kueue.ResourceFlavorReference] { diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index 777d2fb7f8..96cb8a3caa 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -502,7 +502,7 @@ func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Conf Watches(&nodev1.RuntimeClass{}, ruh). Watches(&kueue.ClusterQueue{}, &workloadCqHandler{client: r.client}). WithEventFilter(r). - Complete(WithLeadingManager(mgr, r, cfg)) + Complete(WithLeadingManager(mgr, r, &kueue.Workload{}, cfg)) } // admittedNotReadyWorkload returns as a pair of values. The first boolean determines