From 205f3b931cd690a7aecc7f69425925d4bcf8d989 Mon Sep 17 00:00:00 2001 From: "W. Trevor King" Date: Tue, 9 Aug 2022 15:15:10 -0700 Subject: [PATCH] pkg/cvo/sync_worker: Trigger new sync round on ClusterOperator versions[name=operator] changes David and Stephen identified an uneccessary delay [1]: * 9:42:00, CVO gives up on Kube API server ClusterOperator [2] * 9:42:47, Kube API server operator achieves 4.12 [3] * 9:46:22, after a cool-off sleep, the CVO starts in on a new manifest graph-walk attempt [4] * 9:46:34, CVO notices that the Kube API server ClusterOperator is happy [5] The 3+ minute delay from 9:42:47 to 9:46:22 is not helpful, and we've probably had delays like this since my old e02d1489a5 (pkg/cvo/internal/operatorstatus: Replace wait-for with single-shot "is it alive now?", 2021-05-13, #560), which landed in 4.6. This commit introduces a "ClusterOperator bumped versions[name=operator]" trigger to break out of the cool-off sleep. There's plenty of room to be more precise here. For example, you could currently have a versions[name=operator] bump during the sync loop that the CVO did notice, and that queued notification will break from the sleep and trigger a possible useless reconciliation round while we wait on some other resource. You could drain the notification queue before the sleep to avoid that, but you wouldn't want to drain new-work notifications, and I haven't done the work required to be able to split those apart. I'm only looking at ClusterOperator at the moment, because of the many types the CVO manages, ClusterOperator is the one we most frequently wait on, as large cluster components take their time updating. It's possible but less likely that we'd want similar triggers for additional types in the future (Deployment, etc.), if/when those types develop more elaborate "is the in-cluster resource sufficient happy?" checks. The panic-backed type casting in clusterOperatorInterfaceVersionOrDie also feel like a hack, but I wasn't able to find a cleaner way to get at the structured information I want. Improvements welcome :) [1]: https://bugzilla.redhat.com/show_bug.cgi?id=2117033#c1 [2]: From Loki: E0808 09:42:00.022500 1 task.go:117] error running apply for clusteroperator "kube-apiserver" (107 of 806): Cluster operator kube-apiserver is updating versions [3]: $ curl -s https://gcsweb-ci.apps.ci.l2s4.p1.openshiftapps.com/gcs/origin-ci-test/logs/periodic-ci-openshift-release-master-ci-4.12-upgrade-from-stable-4.11-e2e-gcp-sdn-upgrade/1556564581915037696/artifacts/e2e-gcp-sdn-upgrade/openshift-e2e-test/build-log.txt | grep 'clusteroperator/kube-apiserver versions:' Aug 08 09:33:48.603 I clusteroperator/kube-apiserver versions: raw-internal 4.11.0-rc.7 -> 4.12.0-0.ci-2022-08-07-192220 Aug 08 09:42:47.917 I clusteroperator/kube-apiserver versions: operator 4.11.0-rc.7 -> 4.12.0-0.ci-2022-08-07-192220 [4]: From Loki: I0808 09:46:22.998344 1 sync_worker.go:850] apply: 4.12.0-0.ci-2022-08-07-192220 on generation 3 in state Updating at attempt 5 [5]: From Loki: I0808 09:46:34.556374 1 sync_worker.go:973] Done syncing for clusteroperator "kube-apiserver" (107 of 806) --- pkg/cvo/cvo.go | 37 ++++++++++++++++++++++++++++++++++--- pkg/cvo/sync_test.go | 4 ++++ pkg/cvo/sync_worker.go | 24 +++++++++++++++++++----- 3 files changed, 57 insertions(+), 8 deletions(-) diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index fda11e5a3..28f7476b3 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -197,10 +197,11 @@ func New( clusterProfile: clusterProfile, } - cvInformer.Informer().AddEventHandler(optr.eventHandler()) + cvInformer.Informer().AddEventHandler(optr.clusterVersionEventHandler()) cmConfigInformer.Informer().AddEventHandler(optr.adminAcksEventHandler()) cmConfigManagedInformer.Informer().AddEventHandler(optr.adminGatesEventHandler()) + coInformer.Informer().AddEventHandler(optr.clusterOperatorEventHandler()) optr.coLister = coInformer.Lister() optr.cacheSynced = append(optr.cacheSynced, coInformer.Informer().HasSynced) @@ -457,9 +458,9 @@ func (optr *Operator) queueKey() string { return fmt.Sprintf("%s/%s", optr.namespace, optr.name) } -// eventHandler queues an update for the cluster version on any change to the given object. +// clusterVersionEventHandler queues an update for the cluster version on any change to the given object. // Callers should use this with a scoped informer. -func (optr *Operator) eventHandler() cache.ResourceEventHandler { +func (optr *Operator) clusterVersionEventHandler() cache.ResourceEventHandler { workQueueKey := optr.queueKey() return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -478,6 +479,36 @@ func (optr *Operator) eventHandler() cache.ResourceEventHandler { } } +// clusterOperatorEventHandler queues an update for the cluster version on any change to the given object. +// Callers should use this with an informer. +func (optr *Operator) clusterOperatorEventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(old, new interface{}) { + versionName := "operator" + _, oldVersion := clusterOperatorInterfaceVersionOrDie(old, versionName) + newStruct, newVersion := clusterOperatorInterfaceVersionOrDie(new, versionName) + if optr.configSync != nil && oldVersion != newVersion { + msg := fmt.Sprintf("Cluster operator %s changed versions[name=%q] from %q to %q", newStruct.ObjectMeta.Name, versionName, oldVersion, newVersion) + optr.configSync.NotifyAboutManagedResourceActivity(new, msg) + } + }, + } +} + +func clusterOperatorInterfaceVersionOrDie(obj interface{}, name string) (*configv1.ClusterOperator, string) { + co, ok := obj.(*configv1.ClusterOperator) + if !ok { + panic(fmt.Sprintf("%v is %T, not a ClusterOperator", obj, obj)) + } + + for _, version := range co.Status.Versions { + if version.Name == name { + return co, version.Version + } + } + return co, "" +} + func (optr *Operator) worker(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(context.Context, string) error) { for processNextWorkItem(ctx, queue, syncHandler, optr.syncFailingStatus) { } diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go index 721042f31..2e1403e35 100644 --- a/pkg/cvo/sync_test.go +++ b/pkg/cvo/sync_test.go @@ -476,6 +476,10 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus { return ch } +// Inform the sync worker about activity for a managed resource. +func (r *fakeSyncRecorder) NotifyAboutManagedResourceActivity(obj interface{}, message string) { +} + func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) { } diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index edc65f093..c7010efec 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -34,6 +34,9 @@ type ConfigSyncWorker interface { Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) Update(ctx context.Context, generation int64, desired configv1.Update, config *configv1.ClusterVersion, state payload.State, cvoOptrName string) *SyncWorkerStatus StatusCh() <-chan SyncWorkerStatus + + // Inform the sync worker about activity for a managed resource. + NotifyAboutManagedResourceActivity(obj interface{}, msg string) } // PayloadInfo returns details about the payload when it was retrieved. @@ -163,7 +166,7 @@ type SyncWorker struct { minimumReconcileInterval time.Duration // coordination between the sync loop and external callers - notify chan struct{} + notify chan string report chan SyncWorkerStatus // lock guards changes to these fields @@ -197,7 +200,7 @@ func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, minimumReconcileInterval: reconcileInterval, - notify: make(chan struct{}, 1), + notify: make(chan string, 1), // report is a large buffered channel to improve local testing - most consumers should invoke // Status() or use the result of calling Update() instead because the channel can be out of date // if the reader is not fast enough. @@ -225,6 +228,16 @@ func (w *SyncWorker) StatusCh() <-chan SyncWorkerStatus { return w.report } +// Inform the sync worker about activity for a managed resource. +func (w *SyncWorker) NotifyAboutManagedResourceActivity(obj interface{}, message string) { + select { + case w.notify <- message: + klog.V(2).Infof("Notify the sync worker: %s", message) + default: + klog.V(2).Info("The sync worker already has a pending notification, so do not notify about:no need to inform about: %s", message) + } +} + // syncPayload retrieves, loads, and verifies the specified payload, aka sync's the payload, whenever there is no current // payload or the current payload differs from the desired payload. Whenever a payload is sync'ed a check is made for // implicitly enabled capabilities. For the purposes of the check made here, implicitly enabled capabilities are @@ -503,8 +516,9 @@ func (w *SyncWorker) Update(ctx context.Context, generation int64, desired confi w.cancelFn() w.cancelFn = nil } + msg := "new work is available" select { - case w.notify <- struct{}{}: + case w.notify <- msg: klog.V(2).Info("Notify the sync worker that new work is available") default: klog.V(2).Info("The sync worker has already been notified that new work is available") @@ -535,8 +549,8 @@ func (w *SyncWorker) Start(ctx context.Context, maxWorkers int, cvoOptrName stri case <-next: waitingToReconcile = false klog.V(2).Infof("Wait finished") - case <-w.notify: - klog.V(2).Infof("Work updated") + case msg := <-w.notify: + klog.V(2).Info(msg) } // determine whether we need to do work