pkg/cvo/sync_worker: Trigger new sync round on ClusterOperator versions[name=operator] changes

David and Stephen identified an unnecessary delay [1]:

* 9:42:00, CVO gives up on Kube API server ClusterOperator [2]
* 9:42:47, Kube API server operator achieves 4.12 [3]
* 9:46:22, after a cool-off sleep, the CVO starts in on a new manifest graph-walk attempt [4]
* 9:46:34, CVO notices that the Kube API server ClusterOperator is happy [5]

The 3+ minute delay from 9:42:47 to 9:46:22 is not helpful, and we've
probably had delays like this since my old e02d148
(pkg/cvo/internal/operatorstatus: Replace wait-for with single-shot
"is it alive now?", 2021-05-13, #560), which landed in 4.6.

This commit introduces a "ClusterOperator bumped
versions[name=operator]" trigger to break out of the cool-off sleep.
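
To make the mechanism concrete, here is a minimal sketch of the
pattern (simplified names, not the exact code from this commit): the
event handler does a non-blocking send into a one-slot notify channel,
and the worker's cool-off sleep becomes a select over the reconcile
timer and that channel.

    package main

    import (
        "fmt"
        "time"
    )

    // notifyActivity performs a non-blocking send.  If a notification is
    // already pending, the new one is dropped: one wake-up is enough.
    func notifyActivity(notify chan<- string, msg string) {
        select {
        case notify <- msg:
        default:
        }
    }

    // worker sleeps until the reconcile interval elapses or a notification
    // arrives, and then would start the next manifest graph-walk.
    func worker(notify <-chan string, interval time.Duration, done <-chan struct{}) {
        for {
            select {
            case <-time.After(interval):
                fmt.Println("wait finished")
            case msg := <-notify:
                fmt.Println("woken early:", msg)
            case <-done:
                return
            }
            // ... apply the manifest graph here ...
        }
    }

    func main() {
        notify := make(chan string, 1) // one-slot buffer coalesces event bursts
        done := make(chan struct{})
        go worker(notify, 4*time.Minute, done)

        notifyActivity(notify, `clusteroperator kube-apiserver bumped versions[name="operator"]`)
        time.Sleep(100 * time.Millisecond) // demo only: give the worker time to wake
        close(done)
    }

The one-slot buffer keeps the handler from ever blocking the informer
thread while still guaranteeing at most one queued wake-up.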

There's plenty of room to be more precise here.  For example, you
could currently have a versions[name=operator] bump during the sync
loop that the CVO did notice, and that queued notification will break
out of the sleep and trigger a possibly useless reconciliation round
while we wait on some other resource.  You could drain the
notification queue before the sleep to avoid that, but you wouldn't
want to drain new-work notifications, and I haven't done the work
required to be able to split those apart.
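
For illustration only, a drain step along those lines might look like
the following hypothetical sketch; it assumes notifications carry a
kind, which they do not today (both this commit and Update() send
plain strings):

    // notification is a hypothetical tagged message; splitting the queue
    // apart would need something like it.
    type notification struct {
        newWork bool // true for Update()-driven "new work is available"
        msg     string
    }

    // drainStaleActivity discards pending resource-activity notifications
    // before the cool-off sleep, but keeps any new-work notification by
    // re-queuing it (safe here: the receive just emptied the one-slot
    // buffer).
    func drainStaleActivity(notify chan notification) {
        for {
            select {
            case n := <-notify:
                if n.newWork {
                    notify <- n
                    return
                }
            default:
                return
            }
        }
    }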

I'm only looking at ClusterOperator at the moment because, of the many
types the CVO manages, ClusterOperator is the one we most frequently
wait on, as large cluster components take their time updating.  It's
possible but less likely that we'd want similar triggers for
additional types in the future (Deployment, etc.), if/when those types
develop more elaborate "is the in-cluster resource sufficiently
happy?" checks.

The panic-backed type casting in clusterOperatorInterfaceVersionOrDie
also feels like a hack, but I wasn't able to find a cleaner way to get
at the structured information I want.  Improvements welcome :)
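
One possible cleanup, sketched here as a suggestion rather than as
what this commit does, would be to trade the panic for an error return
and have the update handler log and skip the event (configv1 is the
import the CVO already uses for these types):

    import (
        "fmt"

        configv1 "github.com/openshift/api/config/v1"
    )

    // clusterOperatorInterfaceVersion is a non-panicking variant: an
    // unexpected type becomes an error for the caller to log instead of a
    // process crash.
    func clusterOperatorInterfaceVersion(obj interface{}, name string) (*configv1.ClusterOperator, string, error) {
        co, ok := obj.(*configv1.ClusterOperator)
        if !ok {
            return nil, "", fmt.Errorf("%v is %T, not a ClusterOperator", obj, obj)
        }
        for _, version := range co.Status.Versions {
            if version.Name == name {
                return co, version.Version, nil
            }
        }
        return co, "", nil // operator exists but has not reported this version name
    }

The trade-off is that a mis-wired informer would fail quietly in the
logs rather than loudly at startup, which may be why the panic felt
acceptable.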

[1]: https://bugzilla.redhat.com/show_bug.cgi?id=2117033#c1
[2]: From Loki: E0808 09:42:00.022500       1 task.go:117] error running apply for clusteroperator "kube-apiserver" (107 of 806): Cluster operator kube-apiserver is updating versions
[3]: $ curl -s https://gcsweb-ci.apps.ci.l2s4.p1.openshiftapps.com/gcs/origin-ci-test/logs/periodic-ci-openshift-release-master-ci-4.12-upgrade-from-stable-4.11-e2e-gcp-sdn-upgrade/1556564581915037696/artifacts/e2e-gcp-sdn-upgrade/openshift-e2e-test/build-log.txt | grep 'clusteroperator/kube-apiserver versions:'
     Aug 08 09:33:48.603 I clusteroperator/kube-apiserver versions: raw-internal 4.11.0-rc.7 -> 4.12.0-0.ci-2022-08-07-192220
     Aug 08 09:42:47.917 I clusteroperator/kube-apiserver versions: operator 4.11.0-rc.7 -> 4.12.0-0.ci-2022-08-07-192220
[4]: From Loki: I0808 09:46:22.998344       1 sync_worker.go:850] apply: 4.12.0-0.ci-2022-08-07-192220 on generation 3 in state Updating at attempt 5
[5]: From Loki: I0808 09:46:34.556374       1 sync_worker.go:973] Done syncing for clusteroperator "kube-apiserver" (107 of 806)
wking authored and openshift-cherrypick-robot committed Aug 18, 2022
1 parent 0520e51 commit 205f3b9
Showing 3 changed files with 57 additions and 8 deletions.
37 changes: 34 additions & 3 deletions pkg/cvo/cvo.go
@@ -197,10 +197,11 @@ func New(
 		clusterProfile: clusterProfile,
 	}
 
-	cvInformer.Informer().AddEventHandler(optr.eventHandler())
+	cvInformer.Informer().AddEventHandler(optr.clusterVersionEventHandler())
 	cmConfigInformer.Informer().AddEventHandler(optr.adminAcksEventHandler())
 	cmConfigManagedInformer.Informer().AddEventHandler(optr.adminGatesEventHandler())
 
+	coInformer.Informer().AddEventHandler(optr.clusterOperatorEventHandler())
 	optr.coLister = coInformer.Lister()
 	optr.cacheSynced = append(optr.cacheSynced, coInformer.Informer().HasSynced)
 
@@ -457,9 +458,9 @@ func (optr *Operator) queueKey() string {
 	return fmt.Sprintf("%s/%s", optr.namespace, optr.name)
 }
 
-// eventHandler queues an update for the cluster version on any change to the given object.
+// clusterVersionEventHandler queues an update for the cluster version on any change to the given object.
 // Callers should use this with a scoped informer.
-func (optr *Operator) eventHandler() cache.ResourceEventHandler {
+func (optr *Operator) clusterVersionEventHandler() cache.ResourceEventHandler {
 	workQueueKey := optr.queueKey()
 	return cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
@@ -478,6 +479,36 @@ func (optr *Operator) eventHandler() {
 	}
 }
 
+// clusterOperatorEventHandler queues an update for the cluster version on any change to the given object.
+// Callers should use this with an informer.
+func (optr *Operator) clusterOperatorEventHandler() cache.ResourceEventHandler {
+	return cache.ResourceEventHandlerFuncs{
+		UpdateFunc: func(old, new interface{}) {
+			versionName := "operator"
+			_, oldVersion := clusterOperatorInterfaceVersionOrDie(old, versionName)
+			newStruct, newVersion := clusterOperatorInterfaceVersionOrDie(new, versionName)
+			if optr.configSync != nil && oldVersion != newVersion {
+				msg := fmt.Sprintf("Cluster operator %s changed versions[name=%q] from %q to %q", newStruct.ObjectMeta.Name, versionName, oldVersion, newVersion)
+				optr.configSync.NotifyAboutManagedResourceActivity(new, msg)
+			}
+		},
+	}
+}
+
+func clusterOperatorInterfaceVersionOrDie(obj interface{}, name string) (*configv1.ClusterOperator, string) {
+	co, ok := obj.(*configv1.ClusterOperator)
+	if !ok {
+		panic(fmt.Sprintf("%v is %T, not a ClusterOperator", obj, obj))
+	}
+
+	for _, version := range co.Status.Versions {
+		if version.Name == name {
+			return co, version.Version
+		}
+	}
+	return co, ""
+}
+
 func (optr *Operator) worker(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(context.Context, string) error) {
 	for processNextWorkItem(ctx, queue, syncHandler, optr.syncFailingStatus) {
 	}
4 changes: 4 additions & 0 deletions pkg/cvo/sync_test.go
@@ -476,6 +476,10 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus {
 	return ch
 }
 
+// NotifyAboutManagedResourceActivity informs the sync worker about activity for a managed resource.
+func (r *fakeSyncRecorder) NotifyAboutManagedResourceActivity(obj interface{}, message string) {
+}
+
 func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) {
 }
 
24 changes: 19 additions & 5 deletions pkg/cvo/sync_worker.go
@@ -34,6 +34,9 @@ type ConfigSyncWorker interface {
 	Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister)
 	Update(ctx context.Context, generation int64, desired configv1.Update, config *configv1.ClusterVersion, state payload.State, cvoOptrName string) *SyncWorkerStatus
 	StatusCh() <-chan SyncWorkerStatus
+
+	// NotifyAboutManagedResourceActivity informs the sync worker about activity for a managed resource.
+	NotifyAboutManagedResourceActivity(obj interface{}, msg string)
 }
 
 // PayloadInfo returns details about the payload when it was retrieved.
@@ -163,7 +166,7 @@ type SyncWorker struct {
 	minimumReconcileInterval time.Duration
 
 	// coordination between the sync loop and external callers
-	notify chan struct{}
+	notify chan string
 	report chan SyncWorkerStatus
 
 	// lock guards changes to these fields
@@ -197,7 +200,7 @@ func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder,
 
 		minimumReconcileInterval: reconcileInterval,
 
-		notify: make(chan struct{}, 1),
+		notify: make(chan string, 1),
 		// report is a large buffered channel to improve local testing - most consumers should invoke
 		// Status() or use the result of calling Update() instead because the channel can be out of date
 		// if the reader is not fast enough.
@@ -225,6 +228,16 @@ func (w *SyncWorker) StatusCh() <-chan SyncWorkerStatus {
 	return w.report
 }
 
+// NotifyAboutManagedResourceActivity informs the sync worker about activity for a managed resource.
+func (w *SyncWorker) NotifyAboutManagedResourceActivity(obj interface{}, message string) {
+	select {
+	case w.notify <- message:
+		klog.V(2).Infof("Notify the sync worker: %s", message)
+	default:
+		klog.V(2).Infof("The sync worker already has a pending notification, so no need to inform about: %s", message)
+	}
+}
+
 // syncPayload retrieves, loads, and verifies the specified payload, aka sync's the payload, whenever there is no current
 // payload or the current payload differs from the desired payload. Whenever a payload is sync'ed a check is made for
 // implicitly enabled capabilities. For the purposes of the check made here, implicitly enabled capabilities are
@@ -503,8 +516,9 @@ func (w *SyncWorker) Update(ctx context.Context, generation int64, desired confi
 		w.cancelFn()
 		w.cancelFn = nil
 	}
+	msg := "new work is available"
 	select {
-	case w.notify <- struct{}{}:
+	case w.notify <- msg:
 		klog.V(2).Info("Notify the sync worker that new work is available")
 	default:
 		klog.V(2).Info("The sync worker has already been notified that new work is available")
@@ -535,8 +549,8 @@ func (w *SyncWorker) Start(ctx context.Context, maxWorkers int, cvoOptrName stri
 			case <-next:
 				waitingToReconcile = false
 				klog.V(2).Infof("Wait finished")
-			case <-w.notify:
-				klog.V(2).Infof("Work updated")
+			case msg := <-w.notify:
+				klog.V(2).Info(msg)
 			}
 
 			// determine whether we need to do work
