diff --git a/pkg/controller/core/admissioncheck_controller.go b/pkg/controller/core/admissioncheck_controller.go index adbdb9a91d..67c81d8f32 100644 --- a/pkg/controller/core/admissioncheck_controller.go +++ b/pkg/controller/core/admissioncheck_controller.go @@ -23,14 +23,17 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/queue" @@ -214,13 +217,14 @@ func (h *acCqHandler) Generic(ctx context.Context, e event.GenericEvent, q workq } // SetupWithManager sets up the controller with the Manager. -func (r *AdmissionCheckReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *AdmissionCheckReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error { handler := acCqHandler{ cache: r.cache, } return ctrl.NewControllerManagedBy(mgr). For(&kueue.AdmissionCheck{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r, &kueue.AdmissionCheck{}, cfg)) } diff --git a/pkg/controller/core/clusterqueue_controller.go b/pkg/controller/core/clusterqueue_controller.go index 0cdb16f8eb..a8148d02dc 100644 --- a/pkg/controller/core/clusterqueue_controller.go +++ b/pkg/controller/core/clusterqueue_controller.go @@ -30,13 +30,16 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/constants" @@ -594,7 +597,7 @@ func (h *cqSnapshotHandler) Generic(_ context.Context, e event.GenericEvent, q w } // SetupWithManager sets up the controller with the Manager. -func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error { wHandler := cqWorkloadHandler{ qManager: r.qManager, } @@ -613,13 +616,14 @@ func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.ClusterQueue{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). Watches(&corev1.Namespace{}, &nsHandler). WatchesRawSource(&source.Channel{Source: r.wlUpdateCh}, &wHandler). WatchesRawSource(&source.Channel{Source: r.rfUpdateCh}, &rfHandler). WatchesRawSource(&source.Channel{Source: r.acUpdateCh}, &acHandler). WatchesRawSource(&source.Channel{Source: r.snapUpdateCh}, &snapHandler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r, &kueue.ClusterQueue{}, cfg)) } func (r *ClusterQueueReconciler) updateCqStatusIfChanged( diff --git a/pkg/controller/core/core.go b/pkg/controller/core/core.go index 0a2b1a89a1..099623b208 100644 --- a/pkg/controller/core/core.go +++ b/pkg/controller/core/core.go @@ -33,15 +33,15 @@ const updateChBuffer = 10 // controller that failed to create and an error, if any. func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache, cfg *config.Configuration) (string, error) { rfRec := NewResourceFlavorReconciler(mgr.GetClient(), qManager, cc) - if err := rfRec.SetupWithManager(mgr); err != nil { + if err := rfRec.SetupWithManager(mgr, cfg); err != nil { return "ResourceFlavor", err } acRec := NewAdmissionCheckReconciler(mgr.GetClient(), qManager, cc) - if err := acRec.SetupWithManager(mgr); err != nil { + if err := acRec.SetupWithManager(mgr, cfg); err != nil { return "AdmissionCheck", err } qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc) - if err := qRec.SetupWithManager(mgr); err != nil { + if err := qRec.SetupWithManager(mgr, cfg); err != nil { return "LocalQueue", err } @@ -59,13 +59,13 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache } rfRec.AddUpdateWatcher(cqRec) acRec.AddUpdateWatchers(cqRec) - if err := cqRec.SetupWithManager(mgr); err != nil { + if err := cqRec.SetupWithManager(mgr, cfg); err != nil { return "ClusterQueue", err } if err := NewWorkloadReconciler(mgr.GetClient(), qManager, cc, mgr.GetEventRecorderFor(constants.WorkloadControllerName), WithWorkloadUpdateWatchers(qRec, cqRec), - WithPodsReadyTimeout(podsReadyTimeout(cfg))).SetupWithManager(mgr); err != nil { + WithPodsReadyTimeout(podsReadyTimeout(cfg))).SetupWithManager(mgr, cfg); err != nil { return "Workload", err } return "", nil diff --git a/pkg/controller/core/leader_aware_reconciler.go b/pkg/controller/core/leader_aware_reconciler.go new file mode 100644 index 0000000000..3d54aad04e --- /dev/null +++ b/pkg/controller/core/leader_aware_reconciler.go @@ -0,0 +1,100 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import ( + "context" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + config "sigs.k8s.io/kueue/apis/config/v1beta1" +) + +// defaultRequeueDuration defaults the duration used by non-leading replicas +// to requeue events, so no events are missed over the period it takes for +// leader election to fail over a new replica. +const defaultRequeueDuration = 15 * time.Second + +// WithLeadingManager returns a decorating reconcile.Reconciler that discards reconciliation requests +// for the controllers that are started with the controller.Options.NeedLeaderElection +// option set to false in non-leading replicas. +// +// Starting controllers in non-leading replicas is needed for these that update the data +// served by the visibility extension API server. +// +// This enables to: +// - Keep the scheduling decisions under the responsibility of the leading replica alone, +// to prevent any concurrency issues. +// - Consume requests from the watch event queues, to prevent them from growing indefinitely +// in the non-leading replicas. +// - Transition to actually reconciling requests in the replica that may acquire +// the leader election lease, in case the previously leading replica failed to renew it. +func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler, obj client.Object, cfg *config.Configuration) reconcile.Reconciler { + // Do not decorate the reconciler if leader election is disabled + if cfg.LeaderElection == nil || !ptr.Deref(cfg.LeaderElection.LeaderElect, false) { + return reconciler + } + + // Default to the recommended lease duration, that's used for core components + requeueDuration := defaultRequeueDuration + // Otherwise used the configured lease duration for the manager + zero := metav1.Duration{} + if duration := cfg.LeaderElection.LeaseDuration; duration != zero { + requeueDuration = duration.Duration + } + + return &leaderAwareReconciler{ + elected: mgr.Elected(), + client: mgr.GetClient(), + delegate: reconciler, + object: obj, + requeueDuration: requeueDuration, + } +} + +type leaderAwareReconciler struct { + elected <-chan struct{} + client client.Client + delegate reconcile.Reconciler + object client.Object + requeueDuration time.Duration +} + +var _ reconcile.Reconciler = (*leaderAwareReconciler)(nil) + +func (r *leaderAwareReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + select { + case <-r.elected: + // The manager has been elected leader, delegate reconciliation to the provided reconciler. + return r.delegate.Reconcile(ctx, request) + default: + if err := r.client.Get(ctx, request.NamespacedName, r.object); err != nil { + // Discard request if not found, to prevent from re-enqueueing indefinitely. + return ctrl.Result{}, client.IgnoreNotFound(err) + } + // The manager hasn't been elected leader yet, requeue the reconciliation request + // to prevent against any missed / discarded events over the period it takes + // to fail over a new leading replica, which can take as much as the configured + // lease duration, for it to acquire leadership. + return ctrl.Result{RequeueAfter: r.requeueDuration}, nil + } +} diff --git a/pkg/controller/core/localqueue_controller.go b/pkg/controller/core/localqueue_controller.go index 5d623ca888..e8eb3d9815 100644 --- a/pkg/controller/core/localqueue_controller.go +++ b/pkg/controller/core/localqueue_controller.go @@ -27,12 +27,15 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/constants" @@ -248,16 +251,17 @@ func (h *qCQHandler) addLocalQueueToWorkQueue(ctx context.Context, cq *kueue.Clu } // SetupWithManager sets up the controller with the Manager. -func (r *LocalQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *LocalQueueReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error { queueCQHandler := qCQHandler{ client: r.client, } return ctrl.NewControllerManagedBy(mgr). For(&kueue.LocalQueue{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.wlUpdateCh}, &qWorkloadHandler{}). Watches(&kueue.ClusterQueue{}, &queueCQHandler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r, &kueue.LocalQueue{}, cfg)) } func (r *LocalQueueReconciler) UpdateStatusIfChanged( diff --git a/pkg/controller/core/resourceflavor_controller.go b/pkg/controller/core/resourceflavor_controller.go index f243b9f006..1e1a0506f9 100644 --- a/pkg/controller/core/resourceflavor_controller.go +++ b/pkg/controller/core/resourceflavor_controller.go @@ -24,13 +24,16 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/queue" @@ -249,15 +252,16 @@ func (h *cqHandler) Generic(_ context.Context, e event.GenericEvent, q workqueue } // SetupWithManager sets up the controller with the Manager. -func (r *ResourceFlavorReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *ResourceFlavorReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error { handler := cqHandler{ cache: r.cache, } return ctrl.NewControllerManagedBy(mgr). For(&kueue.ResourceFlavor{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r, &kueue.ResourceFlavor{}, cfg)) } func resourceFlavors(cq *kueue.ClusterQueue) sets.Set[kueue.ResourceFlavorReference] { diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index 7ca6409a21..96cb8a3caa 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -35,10 +35,12 @@ import ( "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" + config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/constants" @@ -489,17 +491,18 @@ func (r *WorkloadReconciler) notifyWatchers(oldWl, newWl *kueue.Workload) { } // SetupWithManager sets up the controller with the Manager. -func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error { ruh := &resourceUpdatesHandler{ r: r, } return ctrl.NewControllerManagedBy(mgr). For(&kueue.Workload{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). Watches(&corev1.LimitRange{}, ruh). Watches(&nodev1.RuntimeClass{}, ruh). Watches(&kueue.ClusterQueue{}, &workloadCqHandler{client: r.client}). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r, &kueue.Workload{}, cfg)) } // admittedNotReadyWorkload returns as a pair of values. The first boolean determines diff --git a/test/e2e/config/kustomization.yaml b/test/e2e/config/kustomization.yaml index d38e4b7edd..c99cb63585 100644 --- a/test/e2e/config/kustomization.yaml +++ b/test/e2e/config/kustomization.yaml @@ -4,6 +4,10 @@ kind: Kustomization resources: - ../../../config/default +replicas: +- name: kueue-controller-manager + count: 2 + patches: - path: manager_e2e_patch.yaml target: