operator: refactor all static pod controllers to use controller factory
mfojtik committed Jan 27, 2020
1 parent 31f2f15 commit 3816ea7
Showing 20 changed files with 197 additions and 972 deletions.
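At a high level, each refactored controller drops its hand-written work queue, informer event handlers, worker loop, and cache-sync wait, and instead declares its informers and sync function to the controller factory. A minimal sketch of the resulting shape, using a hypothetical sampleController (only the factory calls visible in the diffs below are assumed):

package sample

import (
	"context"

	"github.com/openshift/library-go/pkg/controller/factory"
	"github.com/openshift/library-go/pkg/operator/events"
	operatorv1helpers "github.com/openshift/library-go/pkg/operator/v1helpers"
)

// sampleController is a hypothetical controller, not part of this commit.
type sampleController struct {
	operatorClient operatorv1helpers.OperatorClient
}

// The factory owns the queue, event handlers, and cache syncing; the
// controller contributes only its informers and its sync function.
func NewSampleController(operatorClient operatorv1helpers.OperatorClient, recorder events.Recorder) factory.Controller {
	c := &sampleController{operatorClient: operatorClient}
	return factory.New().
		WithInformers(operatorClient.Informer()).
		WithSync(c.sync).
		ToController("SampleController", recorder)
}

func (c *sampleController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
	// the per-sync recorder replaces the eventRecorder struct field
	syncCtx.Recorder().Eventf("SampleSync", "sync invoked")
	return nil
}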
89 changes: 7 additions & 82 deletions pkg/operator/loglevel/logging_controller.go
@@ -2,53 +2,27 @@ package loglevel
 
 import (
 	"context"
-	"fmt"
-	"time"
 
-	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/util/workqueue"
-	"k8s.io/klog"
-
 	operatorv1 "github.com/openshift/api/operator/v1"
 
+	"github.com/openshift/library-go/pkg/controller/factory"
 	"github.com/openshift/library-go/pkg/operator/events"
 	operatorv1helpers "github.com/openshift/library-go/pkg/operator/v1helpers"
 )
 
-var workQueueKey = "instance"
-
 type LogLevelController struct {
 	operatorClient operatorv1helpers.OperatorClient
-
-	cachesToSync  []cache.InformerSynced
-	queue         workqueue.RateLimitingInterface
-	eventRecorder events.Recorder
 }
 
 // sets the klog level based on desired state
-func NewClusterOperatorLoggingController(
-	operatorClient operatorv1helpers.OperatorClient,
-	recorder events.Recorder,
-) *LogLevelController {
-	c := &LogLevelController{
-		operatorClient: operatorClient,
-		eventRecorder:  recorder.WithComponentSuffix("loglevel-controller"),
-
-		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "LoggingSyncer"),
-	}
-
-	operatorClient.Informer().AddEventHandler(c.eventHandler())
-
-	c.cachesToSync = append(c.cachesToSync, operatorClient.Informer().HasSynced)
-
-	return c
+func NewClusterOperatorLoggingController(operatorClient operatorv1helpers.OperatorClient, recorder events.Recorder) factory.Controller {
+	c := &LogLevelController{operatorClient: operatorClient}
+	return factory.New().WithInformers(operatorClient.Informer()).WithSync(c.sync).ToController("LoggingSyncer", recorder)
 }
 
 // sync reacts to a change in prereqs by finding information that is required to match another value in the cluster. This
 // must be information that is logically "owned" by another component.
-func (c LogLevelController) sync() error {
+func (c LogLevelController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
 	detailedSpec, _, _, err := c.operatorClient.GetOperatorState()
 	if err != nil {
 		return err
@@ -68,59 +42,10 @@ func (c LogLevelController) sync() error {
 
 	// Set the new loglevel if the operator spec changed
 	if err := SetVerbosityValue(desiredLogLevel); err != nil {
-		c.eventRecorder.Warningf("OperatorLoglevelChangeFailed", "Unable to change operator log level from %q to %q: %v", currentLogLevel, desiredLogLevel, err)
+		syncCtx.Recorder().Warningf("OperatorLoglevelChangeFailed", "Unable to change operator log level from %q to %q: %v", currentLogLevel, desiredLogLevel, err)
 		return err
 	}
 
-	c.eventRecorder.Eventf("OperatorLoglevelChange", "Operator log level changed from %q to %q", currentLogLevel, desiredLogLevel)
+	syncCtx.Recorder().Eventf("OperatorLoglevelChange", "Operator log level changed from %q to %q", currentLogLevel, desiredLogLevel)
 	return nil
 }
-
-func (c *LogLevelController) Run(ctx context.Context, workers int) {
-	defer utilruntime.HandleCrash()
-	defer c.queue.ShutDown()
-
-	klog.Infof("Starting LogLevelController")
-	defer klog.Infof("Shutting down LogLevelController")
-	if !cache.WaitForCacheSync(ctx.Done(), c.cachesToSync...) {
-		return
-	}
-
-	// doesn't matter what workers say, only start one.
-	go wait.UntilWithContext(ctx, c.runWorker, time.Second)
-
-	<-ctx.Done()
-}
-
-func (c *LogLevelController) runWorker(ctx context.Context) {
-	for c.processNextWorkItem() {
-	}
-}
-
-func (c *LogLevelController) processNextWorkItem() bool {
-	dsKey, quit := c.queue.Get()
-	if quit {
-		return false
-	}
-	defer c.queue.Done(dsKey)
-
-	err := c.sync()
-	if err == nil {
-		c.queue.Forget(dsKey)
-		return true
-	}
-
-	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
-	c.queue.AddRateLimited(dsKey)
-
-	return true
-}
-
-// eventHandler queues the operator to check spec and loglevel
-func (c *LogLevelController) eventHandler() cache.ResourceEventHandler {
-	return cache.ResourceEventHandlerFuncs{
-		AddFunc:    func(obj interface{}) { c.queue.Add(workQueueKey) },
-		UpdateFunc: func(old, new interface{}) { c.queue.Add(workQueueKey) },
-		DeleteFunc: func(obj interface{}) { c.queue.Add(workQueueKey) },
-	}
-}
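With the constructor returning the generic factory.Controller, callers start the logging controller the same way as any other factory-built controller. A sketch of the consumer side, assuming factory.Controller keeps the Run(ctx, workers) entry point that the deleted hand-rolled Run methods had (the wiring below is hypothetical, not part of this commit):

// Hypothetical starter code; imports for context, loglevel, events, and
// operatorv1helpers are assumed.
func runLoggingController(ctx context.Context, operatorClient operatorv1helpers.OperatorClient, recorder events.Recorder) {
	controller := loglevel.NewClusterOperatorLoggingController(operatorClient, recorder)
	// Run blocks until ctx is cancelled, so start it in a goroutine.
	go controller.Run(ctx, 1)
	<-ctx.Done()
}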
8 changes: 5 additions & 3 deletions pkg/operator/loglevel/logging_controller_test.go
@@ -1,10 +1,12 @@
 package loglevel
 
 import (
+	"context"
 	"testing"
 
 	operatorv1 "github.com/openshift/api/operator/v1"
 
+	"github.com/openshift/library-go/pkg/controller/factory"
 	"github.com/openshift/library-go/pkg/operator/events"
 	"github.com/openshift/library-go/pkg/operator/v1helpers"
 )
@@ -30,7 +32,7 @@ func TestClusterOperatorLoggingController(t *testing.T) {
 
 	// no-op, desired == current
 	// When OperatorLogLevel is "" we assume the loglevel is Normal.
-	if err := controller.sync(); err != nil {
+	if err := controller.Sync(context.TODO(), factory.NewSyncContext("LoggingController", recorder)); err != nil {
 		t.Fatal(err)
 	}
 
@@ -40,7 +42,7 @@
 
 	// change the log level to trace should 1 emit event
 	operatorSpec.OperatorLogLevel = operatorv1.Trace
-	if err := controller.sync(); err != nil {
+	if err := controller.Sync(context.TODO(), factory.NewSyncContext("LoggingController", recorder)); err != nil {
 		t.Fatal(err)
 	}
 
@@ -54,7 +56,7 @@
 	}
 
 	// next sync should not produce any extra event
-	if err := controller.sync(); err != nil {
+	if err := controller.Sync(context.TODO(), factory.NewSyncContext("LoggingController", recorder)); err != nil {
 		t.Fatal(err)
 	}
 
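Because factory.Controller exports Sync, the tests no longer poke an unexported method; they build a factory.SyncContext and drive one sync directly. A condensed, self-contained skeleton of that pattern — the fake operator client and in-memory recorder helpers are assumed to exist in v1helpers and events with the signatures sketched here:

// Hypothetical condensed test, mirroring the pattern in the diff above.
func TestLoggingControllerSyncSkeleton(t *testing.T) {
	operatorSpec := &operatorv1.OperatorSpec{OperatorLogLevel: operatorv1.Trace}
	// NewFakeOperatorClient(spec, status, triggerErr) is assumed from v1helpers.
	operatorClient := v1helpers.NewFakeOperatorClient(operatorSpec, &operatorv1.OperatorStatus{}, nil)
	recorder := events.NewInMemoryRecorder("loglevel-test")

	controller := NewClusterOperatorLoggingController(operatorClient, recorder)
	if err := controller.Sync(context.TODO(), factory.NewSyncContext("LoggingController", recorder)); err != nil {
		t.Fatal(err)
	}
}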
103 changes: 18 additions & 85 deletions pkg/operator/revisioncontroller/revision_controller.go
@@ -5,23 +5,18 @@ import (
 	"fmt"
 	"strconv"
 	"strings"
-	"time"
 
-	"k8s.io/klog"
-
-	operatorv1 "github.com/openshift/api/operator/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
 	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/util/workqueue"
 
+	operatorv1 "github.com/openshift/api/operator/v1"
+
+	"github.com/openshift/library-go/pkg/controller/factory"
 	"github.com/openshift/library-go/pkg/operator/condition"
 	"github.com/openshift/library-go/pkg/operator/events"
 	"github.com/openshift/library-go/pkg/operator/management"
@@ -55,10 +50,6 @@ type RevisionController struct {
 	operatorClient  LatestRevisionClient
 	configMapGetter corev1client.ConfigMapsGetter
 	secretGetter    corev1client.SecretsGetter
-
-	cachesToSync  []cache.InformerSynced
-	queue         workqueue.RateLimitingInterface
-	eventRecorder events.Recorder
 }
 
 type RevisionResource struct {
@@ -76,7 +67,7 @@ func NewRevisionController(
 	configMapGetter corev1client.ConfigMapsGetter,
 	secretGetter corev1client.SecretsGetter,
 	eventRecorder events.Recorder,
-) *RevisionController {
+) factory.Controller {
 	c := &RevisionController{
 		targetNamespace: targetNamespace,
 		configMaps:      configMaps,
@@ -85,25 +76,17 @@
 		operatorClient:  operatorClient,
 		configMapGetter: configMapGetter,
 		secretGetter:    secretGetter,
-		eventRecorder:   eventRecorder.WithComponentSuffix("revision-controller"),
-
-		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "RevisionController"),
 	}
 
-	operatorClient.Informer().AddEventHandler(c.eventHandler())
-	kubeInformersForTargetNamespace.Core().V1().ConfigMaps().Informer().AddEventHandler(c.eventHandler())
-	kubeInformersForTargetNamespace.Core().V1().Secrets().Informer().AddEventHandler(c.eventHandler())
-
-	c.cachesToSync = append(c.cachesToSync, operatorClient.Informer().HasSynced)
-	c.cachesToSync = append(c.cachesToSync, kubeInformersForTargetNamespace.Core().V1().ConfigMaps().Informer().HasSynced)
-	c.cachesToSync = append(c.cachesToSync, kubeInformersForTargetNamespace.Core().V1().Secrets().Informer().HasSynced)
-
-	return c
+	return factory.New().WithInformers(
+		operatorClient.Informer(),
+		kubeInformersForTargetNamespace.Core().V1().ConfigMaps().Informer(),
+		kubeInformersForTargetNamespace.Core().V1().Secrets().Informer()).WithSync(c.sync).ToController("RevisionController", eventRecorder)
 }
 
 // createRevisionIfNeeded takes care of creating content for the static pods to use.
 // returns whether or not requeue and if an error happened when updating status. Normally it updates status itself.
-func (c RevisionController) createRevisionIfNeeded(latestAvailableRevision int32, resourceVersion string) (bool, error) {
+func (c RevisionController) createRevisionIfNeeded(recorder events.Recorder, latestAvailableRevision int32, resourceVersion string) (bool, error) {
 	isLatestRevisionCurrent, reason := c.isLatestRevisionCurrent(latestAvailableRevision)
 
 	// check to make sure that the latestRevision has the exact content we expect. No mutation here, so we start creating the next Revision only when it is required
@@ -112,16 +95,16 @@ func (c RevisionController) createRevisionIfNeeded(latestAvailableRevision int32
 	}
 
 	nextRevision := latestAvailableRevision + 1
-	c.eventRecorder.Eventf("RevisionTriggered", "new revision %d triggered by %q", nextRevision, reason)
-	if err := c.createNewRevision(nextRevision); err != nil {
+	recorder.Eventf("RevisionTriggered", "new revision %d triggered by %q", nextRevision, reason)
+	if err := c.createNewRevision(recorder, nextRevision); err != nil {
 		cond := operatorv1.OperatorCondition{
 			Type:    "RevisionControllerDegraded",
 			Status:  operatorv1.ConditionTrue,
 			Reason:  "ContentCreationError",
 			Message: err.Error(),
 		}
 		if _, _, updateError := v1helpers.UpdateStatus(c.operatorClient, v1helpers.UpdateConditionFn(cond)); updateError != nil {
-			c.eventRecorder.Warningf("RevisionCreateFailed", "Failed to create revision %d: %v", nextRevision, err.Error())
+			recorder.Warningf("RevisionCreateFailed", "Failed to create revision %d: %v", nextRevision, err.Error())
 			return true, updateError
 		}
 		return true, nil
@@ -134,7 +117,7 @@ func (c RevisionController) createRevisionIfNeeded(latestAvailableRevision int32
 	if _, updated, updateError := c.operatorClient.UpdateLatestRevisionOperatorStatus(nextRevision, v1helpers.UpdateConditionFn(cond)); updateError != nil {
 		return true, updateError
 	} else if updated {
-		c.eventRecorder.Eventf("RevisionCreate", "Revision %d created because %s", latestAvailableRevision, reason)
+		recorder.Eventf("RevisionCreate", "Revision %d created because %s", latestAvailableRevision, reason)
 	}
 
 	return false, nil
@@ -207,7 +190,7 @@ func (c RevisionController) isLatestRevisionCurrent(revision int32) (bool, strin
 	return true, ""
 }
 
-func (c RevisionController) createNewRevision(revision int32) error {
+func (c RevisionController) createNewRevision(recorder events.Recorder, revision int32) error {
 	statusConfigMap := &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace: c.targetNamespace,
@@ -218,7 +201,7 @@ func (c RevisionController) createNewRevision(revision int32) error {
 			"revision": fmt.Sprintf("%d", revision),
 		},
 	}
-	statusConfigMap, _, err := resourceapply.ApplyConfigMap(c.configMapGetter, c.eventRecorder, statusConfigMap)
+	statusConfigMap, _, err := resourceapply.ApplyConfigMap(c.configMapGetter, recorder, statusConfigMap)
 	if err != nil {
 		return err
 	}
@@ -230,7 +213,7 @@
 	}}
 
 	for _, cm := range c.configMaps {
-		obj, _, err := resourceapply.SyncConfigMap(c.configMapGetter, c.eventRecorder, c.targetNamespace, cm.Name, c.targetNamespace, nameFor(cm.Name, revision), ownerRefs)
+		obj, _, err := resourceapply.SyncConfigMap(c.configMapGetter, recorder, c.targetNamespace, cm.Name, c.targetNamespace, nameFor(cm.Name, revision), ownerRefs)
 		if err != nil {
 			return err
 		}
@@ -239,7 +222,7 @@
 		}
 	}
 	for _, s := range c.secrets {
-		obj, _, err := resourceapply.SyncSecret(c.secretGetter, c.eventRecorder, c.targetNamespace, s.Name, c.targetNamespace, nameFor(s.Name, revision), ownerRefs)
+		obj, _, err := resourceapply.SyncSecret(c.secretGetter, recorder, c.targetNamespace, s.Name, c.targetNamespace, nameFor(s.Name, revision), ownerRefs)
 		if err != nil {
 			return err
 		}
@@ -277,7 +260,7 @@ func (c RevisionController) getLatestAvailableRevision(operatorStatus *operatorv
 	return latestRevision, nil
 }
 
-func (c RevisionController) sync() error {
+func (c RevisionController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
 	operatorSpec, originalOperatorStatus, latestAvailableRevision, resourceVersion, err := c.operatorClient.GetLatestRevisionState()
 	if err != nil {
 		return err
@@ -304,7 +287,7 @@
 		}
 	}
 
-	requeue, syncErr := c.createRevisionIfNeeded(latestAvailableRevision, resourceVersion)
+	requeue, syncErr := c.createRevisionIfNeeded(syncCtx.Recorder(), latestAvailableRevision, resourceVersion)
 	if requeue && syncErr == nil {
 		return fmt.Errorf("synthetic requeue request (err: %v)", syncErr)
 	}
@@ -328,53 +311,3 @@
 
 	return err
 }
-
-// Run starts the kube-apiserver and blocks until stopCh is closed.
-func (c *RevisionController) Run(ctx context.Context, workers int) {
-	defer utilruntime.HandleCrash()
-	defer c.queue.ShutDown()
-
-	klog.Infof("Starting RevisionController")
-	defer klog.Infof("Shutting down RevisionController")
-	if !cache.WaitForCacheSync(ctx.Done(), c.cachesToSync...) {
-		return
-	}
-
-	// doesn't matter what workers say, only start one.
-	go wait.UntilWithContext(ctx, c.runWorker, time.Second)
-
-	<-ctx.Done()
-}
-
-func (c *RevisionController) runWorker(ctx context.Context) {
-	for c.processNextWorkItem() {
-	}
-}
-
-func (c *RevisionController) processNextWorkItem() bool {
-	dsKey, quit := c.queue.Get()
-	if quit {
-		return false
-	}
-	defer c.queue.Done(dsKey)
-
-	err := c.sync()
-	if err == nil {
-		c.queue.Forget(dsKey)
-		return true
-	}
-
-	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
-	c.queue.AddRateLimited(dsKey)
-
-	return true
-}
-
-// eventHandler queues the operator to check spec and status
-func (c *RevisionController) eventHandler() cache.ResourceEventHandler {
-	return cache.ResourceEventHandlerFuncs{
-		AddFunc:    func(obj interface{}) { c.queue.Add(revisionControllerWorkQueueKey) },
-		UpdateFunc: func(old, new interface{}) { c.queue.Add(revisionControllerWorkQueueKey) },
-		DeleteFunc: func(obj interface{}) { c.queue.Add(revisionControllerWorkQueueKey) },
-	}
-}
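The same recorder-threading pattern repeats throughout: with the eventRecorder field gone, sync pulls the recorder from syncCtx.Recorder() and passes it down to every helper that emits events. A minimal sketch of the idiom, with hypothetical names not taken from this commit:

// Hypothetical illustration of threading the recorder through helpers;
// imports for context, factory, and events are assumed.
type exampleController struct{}

func (c exampleController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
	// the factory supplies a recorder per sync; helpers take it as a
	// parameter instead of reading a struct field
	return c.applyResources(syncCtx.Recorder())
}

func (c exampleController) applyResources(recorder events.Recorder) error {
	recorder.Eventf("ResourcesApplied", "emitted through the threaded recorder")
	return nil
}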
4 changes: 3 additions & 1 deletion pkg/operator/revisioncontroller/revision_controller_test.go
@@ -1,10 +1,12 @@
 package revisioncontroller
 
 import (
+	"context"
 	"strings"
 	"testing"
 	"time"
 
+	"github.com/openshift/library-go/pkg/controller/factory"
 	"github.com/openshift/library-go/pkg/operator/v1helpers"
 
 	v1 "k8s.io/api/core/v1"
@@ -448,7 +450,7 @@ func TestRevisionController(t *testing.T) {
 				kubeClient.CoreV1(),
 				eventRecorder,
 			)
-			syncErr := c.sync()
+			syncErr := c.Sync(context.TODO(), factory.NewSyncContext("RevisionController", eventRecorder))
 			if tc.validateStatus != nil {
 				_, status, _, _ := tc.staticPodOperatorClient.GetStaticPodOperatorState()
 				tc.validateStatus(t, status)
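Each controller sheds roughly fifty lines of identical Run/runWorker/processNextWorkItem plumbing, which now lives once behind the factory. A rough sketch of the worker loop the factory is assumed to run on every controller's behalf, reconstructed from the code this commit deletes (not the actual library-go implementation):

import (
	"context"

	"k8s.io/client-go/util/workqueue"
)

// runWorker mirrors the deleted per-controller loops: pop a key, sync,
// rate-limit on failure, forget on success. The loop exits when the queue
// is shut down, just as the removed processNextWorkItem did.
func runWorker(ctx context.Context, queue workqueue.RateLimitingInterface, sync func(context.Context) error) {
	for {
		key, quit := queue.Get()
		if quit {
			return
		}
		if err := sync(ctx); err != nil {
			queue.AddRateLimited(key) // retry with backoff on error
		} else {
			queue.Forget(key) // success resets the rate limiter for this key
		}
		queue.Done(key)
	}
}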
