Skip to content

Commit

Permalink
Merge pull request #826 from openshift-cherrypick-robot/cherry-pick-8…
Browse files Browse the repository at this point in the history
…18-to-release-4.11

OCPBUGS-306: pkg/cvo/sync_worker: Trigger new sync round on ClusterOperator versions[name=operator] changes
  • Loading branch information
openshift-merge-robot committed Aug 24, 2022
2 parents 0520e51 + 205f3b9 commit bd8aa51
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 8 deletions.
37 changes: 34 additions & 3 deletions pkg/cvo/cvo.go
Expand Up @@ -197,10 +197,11 @@ func New(
clusterProfile: clusterProfile,
}

cvInformer.Informer().AddEventHandler(optr.eventHandler())
cvInformer.Informer().AddEventHandler(optr.clusterVersionEventHandler())
cmConfigInformer.Informer().AddEventHandler(optr.adminAcksEventHandler())
cmConfigManagedInformer.Informer().AddEventHandler(optr.adminGatesEventHandler())

coInformer.Informer().AddEventHandler(optr.clusterOperatorEventHandler())
optr.coLister = coInformer.Lister()
optr.cacheSynced = append(optr.cacheSynced, coInformer.Informer().HasSynced)

Expand Down Expand Up @@ -457,9 +458,9 @@ func (optr *Operator) queueKey() string {
return fmt.Sprintf("%s/%s", optr.namespace, optr.name)
}

// eventHandler queues an update for the cluster version on any change to the given object.
// clusterVersionEventHandler queues an update for the cluster version on any change to the given object.
// Callers should use this with a scoped informer.
func (optr *Operator) eventHandler() cache.ResourceEventHandler {
func (optr *Operator) clusterVersionEventHandler() cache.ResourceEventHandler {
workQueueKey := optr.queueKey()
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand All @@ -478,6 +479,36 @@ func (optr *Operator) eventHandler() cache.ResourceEventHandler {
}
}

// clusterOperatorEventHandler queues an update for the cluster version on any change to the given object.
// Callers should use this with an informer.
func (optr *Operator) clusterOperatorEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
versionName := "operator"
_, oldVersion := clusterOperatorInterfaceVersionOrDie(old, versionName)
newStruct, newVersion := clusterOperatorInterfaceVersionOrDie(new, versionName)
if optr.configSync != nil && oldVersion != newVersion {
msg := fmt.Sprintf("Cluster operator %s changed versions[name=%q] from %q to %q", newStruct.ObjectMeta.Name, versionName, oldVersion, newVersion)
optr.configSync.NotifyAboutManagedResourceActivity(new, msg)
}
},
}
}

func clusterOperatorInterfaceVersionOrDie(obj interface{}, name string) (*configv1.ClusterOperator, string) {
co, ok := obj.(*configv1.ClusterOperator)
if !ok {
panic(fmt.Sprintf("%v is %T, not a ClusterOperator", obj, obj))
}

for _, version := range co.Status.Versions {
if version.Name == name {
return co, version.Version
}
}
return co, ""
}

func (optr *Operator) worker(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(context.Context, string) error) {
for processNextWorkItem(ctx, queue, syncHandler, optr.syncFailingStatus) {
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/cvo/sync_test.go
Expand Up @@ -476,6 +476,10 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus {
return ch
}

// Inform the sync worker about activity for a managed resource.
func (r *fakeSyncRecorder) NotifyAboutManagedResourceActivity(obj interface{}, message string) {
}

func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) {
}

Expand Down
24 changes: 19 additions & 5 deletions pkg/cvo/sync_worker.go
Expand Up @@ -34,6 +34,9 @@ type ConfigSyncWorker interface {
Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister)
Update(ctx context.Context, generation int64, desired configv1.Update, config *configv1.ClusterVersion, state payload.State, cvoOptrName string) *SyncWorkerStatus
StatusCh() <-chan SyncWorkerStatus

// Inform the sync worker about activity for a managed resource.
NotifyAboutManagedResourceActivity(obj interface{}, msg string)
}

// PayloadInfo returns details about the payload when it was retrieved.
Expand Down Expand Up @@ -163,7 +166,7 @@ type SyncWorker struct {
minimumReconcileInterval time.Duration

// coordination between the sync loop and external callers
notify chan struct{}
notify chan string
report chan SyncWorkerStatus

// lock guards changes to these fields
Expand Down Expand Up @@ -197,7 +200,7 @@ func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder,

minimumReconcileInterval: reconcileInterval,

notify: make(chan struct{}, 1),
notify: make(chan string, 1),
// report is a large buffered channel to improve local testing - most consumers should invoke
// Status() or use the result of calling Update() instead because the channel can be out of date
// if the reader is not fast enough.
Expand Down Expand Up @@ -225,6 +228,16 @@ func (w *SyncWorker) StatusCh() <-chan SyncWorkerStatus {
return w.report
}

// Inform the sync worker about activity for a managed resource.
func (w *SyncWorker) NotifyAboutManagedResourceActivity(obj interface{}, message string) {
select {
case w.notify <- message:
klog.V(2).Infof("Notify the sync worker: %s", message)
default:
klog.V(2).Info("The sync worker already has a pending notification, so do not notify about:no need to inform about: %s", message)
}
}

// syncPayload retrieves, loads, and verifies the specified payload, aka sync's the payload, whenever there is no current
// payload or the current payload differs from the desired payload. Whenever a payload is sync'ed a check is made for
// implicitly enabled capabilities. For the purposes of the check made here, implicitly enabled capabilities are
Expand Down Expand Up @@ -503,8 +516,9 @@ func (w *SyncWorker) Update(ctx context.Context, generation int64, desired confi
w.cancelFn()
w.cancelFn = nil
}
msg := "new work is available"
select {
case w.notify <- struct{}{}:
case w.notify <- msg:
klog.V(2).Info("Notify the sync worker that new work is available")
default:
klog.V(2).Info("The sync worker has already been notified that new work is available")
Expand Down Expand Up @@ -535,8 +549,8 @@ func (w *SyncWorker) Start(ctx context.Context, maxWorkers int, cvoOptrName stri
case <-next:
waitingToReconcile = false
klog.V(2).Infof("Wait finished")
case <-w.notify:
klog.V(2).Infof("Work updated")
case msg := <-w.notify:
klog.V(2).Info(msg)
}

// determine whether we need to do work
Expand Down

0 comments on commit bd8aa51

Please sign in to comment.