pkg/cvo: Use shutdownContext for final status synchronization
Address a bug introduced by cc1921d (pkg/start: Release leader
lease on graceful shutdown, 2020-08-03, openshift#424), where canceling the
Operator.Run context would leave the operator with no time to attempt
the final sync [1]:

  E0119 22:24:15.924216       1 cvo.go:344] unable to perform final sync: context canceled

With this commit, I'm piping through shutdownContext, which gets a
two-minute grace period beyond runContext, to give the operator time
to push out that final status (which may include important information
like the fact that the incoming release image has completed
verification).

[1]: https://bugzilla.redhat.com/show_bug.cgi?id=1916384#c10
wking authored and openshift-cherrypick-robot committed Feb 10, 2021
1 parent 94358e8 commit c4ddf03
Showing 2 changed files with 18 additions and 13 deletions.
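
As background for the diffs below, here is a minimal sketch of the context wiring the commit message describes: a shutdown context that stays live for a grace period after the run context is cancelled. The helper name deriveShutdownContext and the two-second demo value are illustrative assumptions, not this repository's start.go code; the operator's real grace period is the two minutes noted above.

package main

import (
	"context"
	"fmt"
	"time"
)

// deriveShutdownContext is a hypothetical helper (not from this repository)
// showing one way to give shutdown work a grace period beyond the run context.
func deriveShutdownContext(runContext context.Context, grace time.Duration) (context.Context, context.CancelFunc) {
	shutdownContext, shutdownCancel := context.WithCancel(context.Background())
	go func() {
		<-runContext.Done() // the run phase is over (cancelled or finished)
		time.Sleep(grace)   // e.g. two minutes in the operator, per the commit message
		shutdownCancel()    // then stop any remaining shutdown work as well
	}()
	return shutdownContext, shutdownCancel
}

func main() {
	runContext, runCancel := context.WithCancel(context.Background())
	shutdownContext, shutdownCancel := deriveShutdownContext(runContext, 2*time.Second)
	defer shutdownCancel()

	runCancel() // simulate the graceful-shutdown signal
	<-runContext.Done()
	fmt.Println("run context cancelled; shutdown context still live:", shutdownContext.Err() == nil)
	<-shutdownContext.Done()
	fmt.Println("grace period elapsed; shutdown context cancelled too")
}

With wiring like this, runContext.Done() always fires before or at the same time as shutdownContext.Done(), which is the assumption the new Run doc comment records.
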
pkg/cvo/cvo.go (29 changes: 17 additions & 12 deletions)
@@ -289,38 +289,43 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder s
 	return verifier, persister, nil
 }

-// Run runs the cluster version operator until stopCh is completed. Workers is ignored for now.
-func (optr *Operator) Run(ctx context.Context, workers int) error {
+// Run runs the cluster version operator until runContext.Done() and
+// then attempts a clean shutdown limited by shutdownContext.Done().
+// Assumes runContext.Done() occurs before or simultaneously with
+// shutdownContext.Done().
+func (optr *Operator) Run(runContext context.Context, shutdownContext context.Context, workers int) error {
 	defer optr.queue.ShutDown()
-	stopCh := ctx.Done()
+	stopCh := runContext.Done()
 	workerStopCh := make(chan struct{})

 	klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
 	defer klog.Info("Shutting down ClusterVersionOperator")

 	if !cache.WaitForCacheSync(stopCh, optr.cacheSynced...) {
-		return fmt.Errorf("caches never synchronized: %w", ctx.Err())
+		return fmt.Errorf("caches never synchronized: %w", runContext.Err())
 	}

 	// trigger the first cluster version reconcile always
 	optr.queue.Add(optr.queueKey())

 	// start the config sync loop, and have it notify the queue when new status is detected
-	go runThrottledStatusNotifier(ctx, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
-	go optr.configSync.Start(ctx, 16, optr.name, optr.cvLister)
-	go wait.UntilWithContext(ctx, func(ctx context.Context) { optr.worker(ctx, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second)
-	go wait.UntilWithContext(ctx, func(ctx context.Context) { optr.worker(ctx, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
-	go wait.UntilWithContext(ctx, func(ctx context.Context) {
+	go runThrottledStatusNotifier(runContext, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
+	go optr.configSync.Start(runContext, 16, optr.name, optr.cvLister)
+	go wait.UntilWithContext(runContext, func(runContext context.Context) {
+		optr.worker(runContext, optr.availableUpdatesQueue, optr.availableUpdatesSync)
+	}, time.Second)
+	go wait.UntilWithContext(runContext, func(runContext context.Context) { optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
+	go wait.UntilWithContext(runContext, func(runContext context.Context) {
 		defer close(workerStopCh)

 		// run the worker, then when the queue is closed sync one final time to flush any pending status
-		optr.worker(ctx, optr.queue, func(ctx context.Context, key string) error { return optr.sync(ctx, key) })
-		if err := optr.sync(ctx, optr.queueKey()); err != nil {
+		optr.worker(runContext, optr.queue, func(runContext context.Context, key string) error { return optr.sync(runContext, key) })
+		if err := optr.sync(shutdownContext, optr.queueKey()); err != nil {
 			utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
 		}
 	}, time.Second)
 	if optr.signatureStore != nil {
-		go optr.signatureStore.Run(ctx, optr.minimumUpdateCheckInterval*2)
+		go optr.signatureStore.Run(runContext, optr.minimumUpdateCheckInterval*2)
 	}

 	<-stopCh
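
To spell out why the final sync above moves to shutdownContext: once runContext is cancelled the worker loop returns, and any further API call made with runContext fails immediately with context.Canceled, which is exactly the "unable to perform final sync: context canceled" error quoted in the commit message. The following is a hypothetical distillation of that pattern, with made-up names (runThenFlush, work, flush) rather than the operator's actual code:

package sketch

import (
	"context"
	"time"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
)

// runThenFlush distills the pattern above: periodic work is driven by
// runContext, while the single flush performed after cancellation is bounded
// by the longer-lived shutdownContext, so it no longer fails with
// "context canceled".
func runThenFlush(runContext, shutdownContext context.Context, work, flush func(context.Context) error) error {
	// Run the work on a one-second period until runContext is cancelled.
	wait.UntilWithContext(runContext, func(ctx context.Context) {
		if err := work(ctx); err != nil {
			utilruntime.HandleError(err)
		}
	}, time.Second)

	// runContext is already done here, so using it for the flush would fail
	// immediately; shutdownContext still has the grace period remaining.
	return flush(shutdownContext)
}
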
pkg/start/start.go (2 changes: 1 addition & 1 deletion)
@@ -233,7 +233,7 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
 	resultChannelCount++
 	go func() {
 		defer utilruntime.HandleCrash()
-		err := controllerCtx.CVO.Run(runContext, 2)
+		err := controllerCtx.CVO.Run(runContext, shutdownContext, 2)
 		resultChannel <- asyncResult{name: "main operator", error: err}
 	}()

