Skip to content

Commit

Permalink
configobserver: migrate controller to factory
Browse files Browse the repository at this point in the history
  • Loading branch information
mfojtik committed Mar 11, 2020
1 parent 38b127e commit 3f9a525
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 64 deletions.
85 changes: 23 additions & 62 deletions pkg/operator/configobserver/config_observer_controller.go
Expand Up @@ -14,20 +14,17 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/rand"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

operatorv1 "github.com/openshift/api/operator/v1"

"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/condition"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resourcesynccontroller"
"github.com/openshift/library-go/pkg/operator/v1helpers"
)

const configObserverWorkKey = "key"

// Listers is an interface which will be passed to the config observer funcs. It is expected to be hard-cast to the "correct" type
type Listers interface {
// ResourceSyncer can be used to copy content from one namespace to another
Expand All @@ -42,39 +39,32 @@ type Listers interface {
type ObserveConfigFunc func(listers Listers, recorder events.Recorder, existingConfig map[string]interface{}) (observedConfig map[string]interface{}, errs []error)

type ConfigObserver struct {

// observers are called in an undefined order and their results are merged to
// determine the observed configuration.
observers []ObserveConfigFunc

operatorClient v1helpers.OperatorClient
// listers are used by config observers to retrieve necessary resources
listers Listers

queue workqueue.RateLimitingInterface
eventRecorder events.Recorder
}

func NewConfigObserver(
operatorClient v1helpers.OperatorClient,
eventRecorder events.Recorder,
listers Listers,
observers ...ObserveConfigFunc,
) *ConfigObserver {
return &ConfigObserver{
) factory.Controller {
c := &ConfigObserver{
operatorClient: operatorClient,
eventRecorder: eventRecorder.WithComponentSuffix("config-observer"),

queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ConfigObserver"),

observers: observers,
listers: listers,
observers: observers,
listers: listers,
}
return factory.New().ResyncEvery(time.Second).WithSync(c.sync).WithInformers(listersToInformer(listers)...).ToController("ConfigObserver", eventRecorder.WithComponentSuffix("config-observer"))
}

// sync reacts to a change in prereqs by finding information that is required to match another value in the cluster. This
// must be information that is logically "owned" by another component.
func (c ConfigObserver) sync() error {
func (c ConfigObserver) sync(ctx context.Context, syncCtx factory.SyncContext) error {
originalSpec, _, _, err := c.operatorClient.GetOperatorState()
if err != nil {
return err
Expand All @@ -91,7 +81,7 @@ func (c ConfigObserver) sync() error {
var observedConfigs []map[string]interface{}
for _, i := range rand.Perm(len(c.observers)) {
var currErrs []error
observedConfig, currErrs := c.observers[i](c.listers, c.eventRecorder, existingConfig)
observedConfig, currErrs := c.observers[i](c.listers, syncCtx.Recorder(), existingConfig)
observedConfigs = append(observedConfigs, observedConfig)
errs = append(errs, currErrs...)
}
Expand All @@ -115,12 +105,12 @@ func (c ConfigObserver) sync() error {
}

if !equality.Semantic.DeepEqual(existingConfig, mergedObservedConfig) {
c.eventRecorder.Eventf("ObservedConfigChanged", "Writing updated observed config: %v", diff.ObjectDiff(existingConfig, mergedObservedConfig))
syncCtx.Recorder().Eventf("ObservedConfigChanged", "Writing updated observed config: %v", diff.ObjectDiff(existingConfig, mergedObservedConfig))
if _, _, err := v1helpers.UpdateSpec(c.operatorClient, v1helpers.UpdateObservedConfigFn(mergedObservedConfig)); err != nil {
// At this point we failed to write the updated config. If we are permanently broken, do not pile the errors from observers
// but instead reset the errors and only report single error condition.
errs = []error{fmt.Errorf("error writing updated observed config: %v", err)}
c.eventRecorder.Warningf("ObservedConfigWriteError", "Failed to write observed config: %v", err)
syncCtx.Recorder().Warningf("ObservedConfigWriteError", "Failed to write observed config: %v", err)
}
}
configError := v1helpers.NewMultiLineAggregate(errs)
Expand All @@ -142,52 +132,23 @@ func (c ConfigObserver) sync() error {
return configError
}

func (c *ConfigObserver) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting ConfigObserver")
defer klog.Infof("Shutting down ConfigObserver")
if !cache.WaitForCacheSync(ctx.Done(), c.listers.PreRunHasSynced()...) {
utilruntime.HandleError(fmt.Errorf("caches did not sync"))
return
// listersToInformer converts the Listers interface to informer with empty AddEventHandler as we only care about synced caches in the Run.
func listersToInformer(l Listers) []factory.Informer {
result := make([]factory.Informer, len(l.PreRunHasSynced()))
for i := range l.PreRunHasSynced() {
result[i] = &listerInformer{cacheSynced: l.PreRunHasSynced()[i]}
}

// doesn't matter what workers say, only start one.
go wait.UntilWithContext(ctx, c.runWorker, time.Second)

<-ctx.Done()
return result
}

func (c *ConfigObserver) runWorker(_ context.Context) {
for c.processNextWorkItem() {
}
type listerInformer struct {
cacheSynced cache.InformerSynced
}

func (c *ConfigObserver) processNextWorkItem() bool {
dsKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(dsKey)

err := c.sync()
if err == nil {
c.queue.Forget(dsKey)
return true
}

utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
c.queue.AddRateLimited(dsKey)

return true
func (l *listerInformer) AddEventHandler(cache.ResourceEventHandler) {
return
}

// eventHandler queues the operator to check spec and status
func (c *ConfigObserver) EventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.queue.Add(configObserverWorkKey) },
UpdateFunc: func(old, new interface{}) { c.queue.Add(configObserverWorkKey) },
DeleteFunc: func(obj interface{}) { c.queue.Add(configObserverWorkKey) },
}
func (l *listerInformer) HasSynced() bool {
return l.cacheSynced()
}
@@ -1,11 +1,13 @@
package configobserver

import (
"context"
"fmt"
"reflect"
"strings"
"testing"

"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/condition"
"github.com/openshift/library-go/pkg/operator/resourcesynccontroller"

Expand All @@ -20,6 +22,7 @@ import (
"k8s.io/client-go/tools/cache"

operatorv1 "github.com/openshift/api/operator/v1"

"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/v1helpers"
)
Expand Down Expand Up @@ -208,9 +211,8 @@ func TestSyncStatus(t *testing.T) {
listers: &fakeLister{},
operatorClient: operatorConfigClient,
observers: tc.observers,
eventRecorder: events.NewRecorder(eventClient.CoreV1().Events("test"), "test-operator", &corev1.ObjectReference{}),
}
err := configObserver.sync()
err := configObserver.sync(context.TODO(), factory.NewSyncContext("test", events.NewRecorder(eventClient.CoreV1().Events("test"), "test-operator", &corev1.ObjectReference{})))
if tc.expectError && err == nil {
t.Fatal("error expected")
}
Expand Down

0 comments on commit 3f9a525

Please sign in to comment.