diff --git a/pkg/controller/core/admissioncheck_controller.go b/pkg/controller/core/admissioncheck_controller.go index 9c53affcf7..15afea138f 100644 --- a/pkg/controller/core/admissioncheck_controller.go +++ b/pkg/controller/core/admissioncheck_controller.go @@ -23,8 +23,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" @@ -220,7 +222,8 @@ func (r *AdmissionCheckReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.AdmissionCheck{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } diff --git a/pkg/controller/core/clusterqueue_controller.go b/pkg/controller/core/clusterqueue_controller.go index 32b5e658a6..5e53494065 100644 --- a/pkg/controller/core/clusterqueue_controller.go +++ b/pkg/controller/core/clusterqueue_controller.go @@ -30,8 +30,10 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -613,13 +615,14 @@ func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.ClusterQueue{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). Watches(&corev1.Namespace{}, &nsHandler). WatchesRawSource(&source.Channel{Source: r.wlUpdateCh}, &wHandler). WatchesRawSource(&source.Channel{Source: r.rfUpdateCh}, &rfHandler). WatchesRawSource(&source.Channel{Source: r.acUpdateCh}, &acHandler). WatchesRawSource(&source.Channel{Source: r.snapUpdateCh}, &snapHandler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } func (r *ClusterQueueReconciler) updateCqStatusIfChanged( diff --git a/pkg/controller/core/leader_aware_reconciler.go b/pkg/controller/core/leader_aware_reconciler.go new file mode 100644 index 0000000000..1b99324af9 --- /dev/null +++ b/pkg/controller/core/leader_aware_reconciler.go @@ -0,0 +1,52 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import ( + "context" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// WithLeadingManager returns a decorating reconcile.Reconciler that skips reconciliation requests +// until the manager has been elected leader. Once the manager is elected leader, the returned +// reconcile.Reconciler simply delegates to the provided reconcile.Reconciler. +func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler) reconcile.Reconciler { + return &leaderAwareReconciler{ + elected: mgr.Elected(), + delegate: reconciler, + } +} + +type leaderAwareReconciler struct { + elected <-chan struct{} + delegate reconcile.Reconciler +} + +var _ reconcile.Reconciler = (*leaderAwareReconciler)(nil) + +func (r *leaderAwareReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + select { + case <-r.elected: + // The manager has been elected leader, delegate reconciliation to the provided reconciler. + return r.delegate.Reconcile(ctx, request) + default: + // The manager hasn't been elected leader yet, skip reconciliation. + return ctrl.Result{}, nil + } +} diff --git a/pkg/controller/core/localqueue_controller.go b/pkg/controller/core/localqueue_controller.go index 0a21da34ae..a3108c864f 100644 --- a/pkg/controller/core/localqueue_controller.go +++ b/pkg/controller/core/localqueue_controller.go @@ -27,8 +27,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -254,10 +256,11 @@ func (r *LocalQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.LocalQueue{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.wlUpdateCh}, &qWorkloadHandler{}). Watches(&kueue.ClusterQueue{}, &queueCQHandler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } func (r *LocalQueueReconciler) UpdateStatusIfChanged( diff --git a/pkg/controller/core/resourceflavor_controller.go b/pkg/controller/core/resourceflavor_controller.go index 47a733bb3a..385026c8e9 100644 --- a/pkg/controller/core/resourceflavor_controller.go +++ b/pkg/controller/core/resourceflavor_controller.go @@ -24,8 +24,10 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -255,9 +257,10 @@ func (r *ResourceFlavorReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.ResourceFlavor{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } func resourceFlavors(cq *kueue.ClusterQueue) sets.Set[kueue.ResourceFlavorReference] { diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index e5457a4f17..9d85c4076d 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -495,11 +496,12 @@ func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.Workload{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). Watches(&corev1.LimitRange{}, ruh). Watches(&nodev1.RuntimeClass{}, ruh). Watches(&kueue.ClusterQueue{}, &workloadCqHandler{client: r.client}). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } // admittedNotReadyWorkload returns as a pair of values. The first boolean determines