pkg/cvo: Use shutdownContext for final status synchronization
Address a bug introduced by cc1921d (pkg/start: Release leader
lease on graceful shutdown, 2020-08-03, openshift#424), where canceling the
Operator.Run context would leave the operator with no time to attempt
the final sync [1]:

  E0119 22:24:15.924216       1 cvo.go:344] unable to perform final sync: context canceled

With this commit, I'm piping through shutdownContext, which gets a
two-minute grace period beyond runContext, to give the operator time
to push out that final status (which may include important information
like the fact that the incoming release image has completed
verification).
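
As a rough sketch of the intended relationship between the two contexts
(hypothetical wiring for illustration only; the actual derivation lives in
pkg/start and is driven by its signal handling):

  package main

  import (
      "context"
      "time"
  )

  func main() {
      runContext, cancelRun := context.WithCancel(context.Background())
      shutdownContext, cancelShutdown := context.WithCancel(context.Background())

      go func() {
          <-runContext.Done()         // run phase is over (e.g. SIGTERM)
          time.Sleep(2 * time.Minute) // grace period for the final status sync
          cancelShutdown()            // after this, even shutdown work must stop
      }()

      // optr.Run(runContext, shutdownContext, 2) would go here: workers drain
      // on runContext, and the final status flush is bounded by shutdownContext.
      _ = shutdownContext
      cancelRun()
  }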

---

This commit picks c4ddf03 (pkg/cvo: Use shutdownContext for final
status synchronization, 2021-01-19, openshift#517) back to 4.5.  It's not a
clean pick, because we're missing changes like:

* b72e843 (Bug 1822844: Block z level upgrades if
  ClusterVersionOverridesSet set, 2020-04-30, openshift#364).
* 1d1de3b (Use context to add timeout to cincinnati HTTP request,
  2019-01-15, openshift#410).

which also touched these lines.  But we've gotten this far without
backporting rhbz#1822844, and openshift#410 was never associated with a bug in
the first place, so instead of pulling back more of 4.6 to get a clean
pick, I've just manually reconciled the pick conflicts.

Removing Start from pkg/start (again) fixes a buggy re-introduction in
the manually-backported 20421b6 (*: Add lots of Context and options
arguments, 2020-07-24, openshift#470).

[1]: https://bugzilla.redhat.com/show_bug.cgi?id=1916384#c10
wking committed Feb 23, 2021
1 parent 0a34ac3 commit 785b7b4
Showing 2 changed files with 18 additions and 26 deletions.
pkg/cvo/cvo.go: 29 changes (17 additions, 12 deletions)
@@ -314,38 +314,43 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder v
 	return nil, nil, nil
 }
 
-// Run runs the cluster version operator until stopCh is completed. Workers is ignored for now.
-func (optr *Operator) Run(ctx context.Context, workers int) error {
+// Run runs the cluster version operator until runContext.Done() and
+// then attempts a clean shutdown limited by shutdownContext.Done().
+// Assumes runContext.Done() occurs before or simultaneously with
+// shutdownContext.Done().
+func (optr *Operator) Run(runContext context.Context, shutdownContext context.Context, workers int) error {
 	defer optr.queue.ShutDown()
-	stopCh := ctx.Done()
+	stopCh := runContext.Done()
 	workerStopCh := make(chan struct{})
 
 	klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
 	defer klog.Info("Shutting down ClusterVersionOperator")
 
 	if !cache.WaitForCacheSync(stopCh, optr.cacheSynced...) {
-		return fmt.Errorf("caches never synchronized: %w", ctx.Err())
+		return fmt.Errorf("caches never synchronized: %w", runContext.Err())
 	}
 
 	// trigger the first cluster version reconcile always
 	optr.queue.Add(optr.queueKey())
 
 	// start the config sync loop, and have it notify the queue when new status is detected
 	go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
-	go optr.configSync.Start(ctx, 16)
-	go wait.Until(func() { optr.worker(ctx, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
-	go wait.Until(func() { optr.worker(ctx, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second, stopCh)
-	go wait.Until(func() {
+	go optr.configSync.Start(runContext, 16)
+	go wait.UntilWithContext(runContext, func(runContext context.Context) {
+		optr.worker(runContext, optr.availableUpdatesQueue, optr.availableUpdatesSync)
+	}, time.Second)
+	go wait.UntilWithContext(runContext, func(runContext context.Context) { optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
+	go wait.UntilWithContext(runContext, func(runContext context.Context) {
 		defer close(workerStopCh)
 
 		// run the worker, then when the queue is closed sync one final time to flush any pending status
-		optr.worker(ctx, optr.queue, func(key string) error { return optr.sync(ctx, key) })
-		if err := optr.sync(ctx, optr.queueKey()); err != nil {
+		optr.worker(runContext, optr.queue, func(key string) error { return optr.sync(runContext, key) })
+		if err := optr.sync(shutdownContext, optr.queueKey()); err != nil {
 			utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
 		}
-	}, time.Second, stopCh)
+	}, time.Second)
 	if optr.signatureStore != nil {
-		go optr.signatureStore.Run(ctx, optr.minimumUpdateCheckInterval*2)
+		go optr.signatureStore.Run(runContext, optr.minimumUpdateCheckInterval*2)
 	}
 
 	<-stopCh
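
The hunk above also migrates the worker loops from the stop-channel based
wait.Until to the context-aware wait.UntilWithContext, so each iteration
receives runContext directly and can pass it down to API calls. As a
self-contained illustration of the two call shapes (the printed "tick"
workers are placeholders, not the operator's workers):

  package main

  import (
      "context"
      "fmt"
      "time"

      "k8s.io/apimachinery/pkg/util/wait"
  )

  func main() {
      runContext, cancel := context.WithCancel(context.Background())
      stopCh := runContext.Done()

      // Old shape: stop-channel based; the worker body cannot see the context.
      go wait.Until(func() { fmt.Println("tick (stop-channel worker)") }, time.Second, stopCh)

      // New shape: the loop and the worker share one context, so cancellation
      // propagates into whatever the worker calls.
      go wait.UntilWithContext(runContext, func(ctx context.Context) {
          fmt.Println("tick (context-aware worker)")
      }, time.Second)

      time.Sleep(3 * time.Second)
      cancel()
      time.Sleep(time.Second) // let both loops observe the cancellation
  }
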
pkg/start/start.go: 15 changes (1 addition, 14 deletions)
@@ -187,7 +187,7 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
 	resultChannelCount++
 	go func() {
 		defer utilruntime.HandleCrash()
-		err := controllerCtx.CVO.Run(runContext, 2)
+		err := controllerCtx.CVO.Run(runContext, shutdownContext, 2)
 		resultChannel <- asyncResult{name: "main operator", error: err}
 	}()
 
@@ -403,16 +403,3 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context {
 	}
 	return ctx
 }
-
-// Start launches the controllers in the provided context and any supporting
-// infrastructure. When ch is closed the controllers will be shut down.
-func (c *Context) Start(ctx context.Context) {
-	ch := ctx.Done()
-	go c.CVO.Run(ctx, 2)
-	if c.AutoUpdate != nil {
-		go c.AutoUpdate.Run(ctx, 2)
-	}
-	c.CVInformerFactory.Start(ch)
-	c.OpenshiftConfigInformerFactory.Start(ch)
-	c.InformerFactory.Start(ch)
-}
