Skip to content

Commit

Permalink
Merge pull request #517 from openshift-cherrypick-robot/cherry-pick-5…
Browse files Browse the repository at this point in the history
…01-to-release-4.6

Bug 1927515: pkg/cvo: Use shutdownContext for final status synchronization
  • Loading branch information
openshift-merge-robot committed Feb 19, 2021
2 parents 934b12b + c4ddf03 commit 0932104
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
29 changes: 17 additions & 12 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,38 +289,43 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder s
return verifier, persister, nil
}

// Run runs the cluster version operator until stopCh is completed. Workers is ignored for now.
func (optr *Operator) Run(ctx context.Context, workers int) error {
// Run runs the cluster version operator until runContext.Done() and
// then attempts a clean shutdown limited by shutdownContext.Done().
// Assumes runContext.Done() occurs before or simultaneously with
// shutdownContext.Done().
func (optr *Operator) Run(runContext context.Context, shutdownContext context.Context, workers int) error {
defer optr.queue.ShutDown()
stopCh := ctx.Done()
stopCh := runContext.Done()
workerStopCh := make(chan struct{})

klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
defer klog.Info("Shutting down ClusterVersionOperator")

if !cache.WaitForCacheSync(stopCh, optr.cacheSynced...) {
return fmt.Errorf("caches never synchronized: %w", ctx.Err())
return fmt.Errorf("caches never synchronized: %w", runContext.Err())
}

// trigger the first cluster version reconcile always
optr.queue.Add(optr.queueKey())

// start the config sync loop, and have it notify the queue when new status is detected
go runThrottledStatusNotifier(ctx, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
go optr.configSync.Start(ctx, 16, optr.name, optr.cvLister)
go wait.UntilWithContext(ctx, func(ctx context.Context) { optr.worker(ctx, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second)
go wait.UntilWithContext(ctx, func(ctx context.Context) { optr.worker(ctx, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
go wait.UntilWithContext(ctx, func(ctx context.Context) {
go runThrottledStatusNotifier(runContext, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
go optr.configSync.Start(runContext, 16, optr.name, optr.cvLister)
go wait.UntilWithContext(runContext, func(runContext context.Context) {
optr.worker(runContext, optr.availableUpdatesQueue, optr.availableUpdatesSync)
}, time.Second)
go wait.UntilWithContext(runContext, func(runContext context.Context) { optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
go wait.UntilWithContext(runContext, func(runContext context.Context) {
defer close(workerStopCh)

// run the worker, then when the queue is closed sync one final time to flush any pending status
optr.worker(ctx, optr.queue, func(ctx context.Context, key string) error { return optr.sync(ctx, key) })
if err := optr.sync(ctx, optr.queueKey()); err != nil {
optr.worker(runContext, optr.queue, func(runContext context.Context, key string) error { return optr.sync(runContext, key) })
if err := optr.sync(shutdownContext, optr.queueKey()); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
}
}, time.Second)
if optr.signatureStore != nil {
go optr.signatureStore.Run(ctx, optr.minimumUpdateCheckInterval*2)
go optr.signatureStore.Run(runContext, optr.minimumUpdateCheckInterval*2)
}

<-stopCh
Expand Down
2 changes: 1 addition & 1 deletion pkg/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
resultChannelCount++
go func() {
defer utilruntime.HandleCrash()
err := controllerCtx.CVO.Run(runContext, 2)
err := controllerCtx.CVO.Run(runContext, shutdownContext, 2)
resultChannel <- asyncResult{name: "main operator", error: err}
}()

Expand Down

0 comments on commit 0932104

Please sign in to comment.