diff --git a/pkg/controller/core/admissioncheck_controller.go b/pkg/controller/core/admissioncheck_controller.go index 9c53affcf7..4c771df780 100644 --- a/pkg/controller/core/admissioncheck_controller.go +++ b/pkg/controller/core/admissioncheck_controller.go @@ -23,8 +23,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" @@ -65,9 +67,9 @@ func NewAdmissionCheckReconciler( } } -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=admissionchecks,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=admissionchecks/status,verbs=get;update;patch -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=admissionchecks/finalizers,verbs=update +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=admissionchecks,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=admissionchecks/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=admissionchecks/finalizers,verbs=update // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -220,7 +222,8 @@ func (r *AdmissionCheckReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.AdmissionCheck{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } diff --git a/pkg/controller/core/clusterqueue_controller.go b/pkg/controller/core/clusterqueue_controller.go index 32b5e658a6..c089874185 100644 --- a/pkg/controller/core/clusterqueue_controller.go +++ b/pkg/controller/core/clusterqueue_controller.go @@ -30,8 +30,10 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -138,11 +140,11 @@ func NewClusterQueueReconciler( } } -//+kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch -//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues/status,verbs=get;update;patch -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues/finalizers,verbs=update +// +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch +// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues/finalizers,verbs=update func (r *ClusterQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { var cqObj kueue.ClusterQueue @@ -373,7 +375,7 @@ func clearOldResourceQuotas(oldCq, newCq *kueue.ClusterQueue) { if newFlavor, found := newFlavors[flavor.Name]; !found || len(newFlavor.Resources) == 0 { metrics.ClearClusterQueueResourceQuotas(oldCq.Name, string(flavor.Name), "") } else { - //check all resources + // check all resources newResources := slices.ToRefMap(newFlavor.Resources, func(r *kueue.ResourceQuota) corev1.ResourceName { return r.Name }) for ri := range flavor.Resources { rname := flavor.Resources[ri].Name @@ -613,13 +615,14 @@ func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.ClusterQueue{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). Watches(&corev1.Namespace{}, &nsHandler). WatchesRawSource(&source.Channel{Source: r.wlUpdateCh}, &wHandler). WatchesRawSource(&source.Channel{Source: r.rfUpdateCh}, &rfHandler). WatchesRawSource(&source.Channel{Source: r.acUpdateCh}, &acHandler). WatchesRawSource(&source.Channel{Source: r.snapUpdateCh}, &snapHandler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } func (r *ClusterQueueReconciler) updateCqStatusIfChanged( diff --git a/pkg/controller/core/leader_aware_reconciler.go b/pkg/controller/core/leader_aware_reconciler.go new file mode 100644 index 0000000000..1b99324af9 --- /dev/null +++ b/pkg/controller/core/leader_aware_reconciler.go @@ -0,0 +1,52 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import ( + "context" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// WithLeadingManager returns a decorating reconcile.Reconciler that skips reconciliation requests +// until the manager has been elected leader. Once the manager is elected leader, the returned +// reconcile.Reconciler simply delegates to the provided reconcile.Reconciler. +func WithLeadingManager(mgr ctrl.Manager, reconciler reconcile.Reconciler) reconcile.Reconciler { + return &leaderAwareReconciler{ + elected: mgr.Elected(), + delegate: reconciler, + } +} + +type leaderAwareReconciler struct { + elected <-chan struct{} + delegate reconcile.Reconciler +} + +var _ reconcile.Reconciler = (*leaderAwareReconciler)(nil) + +func (r *leaderAwareReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + select { + case <-r.elected: + // The manager has been elected leader, delegate reconciliation to the provided reconciler. + return r.delegate.Reconcile(ctx, request) + default: + // The manager hasn't been elected leader yet, skip reconciliation. + return ctrl.Result{}, nil + } +} diff --git a/pkg/controller/core/localqueue_controller.go b/pkg/controller/core/localqueue_controller.go index 0a21da34ae..6351728455 100644 --- a/pkg/controller/core/localqueue_controller.go +++ b/pkg/controller/core/localqueue_controller.go @@ -27,8 +27,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -77,10 +79,10 @@ func (r *LocalQueueReconciler) NotifyWorkloadUpdate(oldWl, newWl *kueue.Workload } } -//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=localqueues,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=localqueues/status,verbs=get;update;patch -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=localqueues/finalizers,verbs=update +// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=localqueues,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=localqueues/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=localqueues/finalizers,verbs=update func (r *LocalQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { var queueObj kueue.LocalQueue @@ -254,10 +256,11 @@ func (r *LocalQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.LocalQueue{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.wlUpdateCh}, &qWorkloadHandler{}). Watches(&kueue.ClusterQueue{}, &queueCQHandler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } func (r *LocalQueueReconciler) UpdateStatusIfChanged( diff --git a/pkg/controller/core/resourceflavor_controller.go b/pkg/controller/core/resourceflavor_controller.go index 47a733bb3a..da6405846e 100644 --- a/pkg/controller/core/resourceflavor_controller.go +++ b/pkg/controller/core/resourceflavor_controller.go @@ -24,8 +24,10 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -64,8 +66,8 @@ func NewResourceFlavorReconciler( } } -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors,verbs=get;list;watch;update;delete -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors/finalizers,verbs=update +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors,verbs=get;list;watch;update;delete +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors/finalizers,verbs=update func (r *ResourceFlavorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { var flavor kueue.ResourceFlavor @@ -255,9 +257,10 @@ func (r *ResourceFlavorReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.ResourceFlavor{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). WatchesRawSource(&source.Channel{Source: r.cqUpdateCh}, &handler). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } func resourceFlavors(cq *kueue.ClusterQueue) sets.Set[kueue.ResourceFlavorReference] { diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index e5457a4f17..7c8a391452 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -116,12 +117,12 @@ func NewWorkloadReconciler(client client.Client, queues *queue.Manager, cache *c } } -//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch -//+kubebuilder:rbac:groups="",resources=limitranges,verbs=get;list;watch -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/finalizers,verbs=update -//+kubebuilder:rbac:groups=node.k8s.io,resources=runtimeclasses,verbs=get;list;watch +// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch +// +kubebuilder:rbac:groups="",resources=limitranges,verbs=get;list;watch +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/finalizers,verbs=update +// +kubebuilder:rbac:groups=node.k8s.io,resources=runtimeclasses,verbs=get;list;watch func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { var wl kueue.Workload @@ -495,11 +496,12 @@ func (r *WorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&kueue.Workload{}). + WithOptions(controller.Options{NeedLeaderElection: ptr.To(false)}). Watches(&corev1.LimitRange{}, ruh). Watches(&nodev1.RuntimeClass{}, ruh). Watches(&kueue.ClusterQueue{}, &workloadCqHandler{client: r.client}). WithEventFilter(r). - Complete(r) + Complete(WithLeadingManager(mgr, r)) } // admittedNotReadyWorkload returns as a pair of values. The first boolean determines