operator: adjust controllers to controller factory
mfojtik committed Mar 6, 2020
1 parent bb89c2b commit 9b68628
Showing 5 changed files with 74 additions and 377 deletions.
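This commit moves hand-rolled controllers (own workqueue, event handlers, Run loop) onto library-go's controller factory. A minimal sketch of the resulting pattern, based only on the factory calls visible in this diff; the names exampleController, NewExampleController and "example-config" are illustrative and not part of the commit:

package example

import (
	"context"
	"time"

	"github.com/openshift/library-go/pkg/controller/factory"
	"github.com/openshift/library-go/pkg/operator/events"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	coreinformersv1 "k8s.io/client-go/informers/core/v1"
	corelistersv1 "k8s.io/client-go/listers/core/v1"
)

// exampleController keeps only the listers/clients that sync needs; the queue,
// event handlers and cache-sync bookkeeping move into the factory.
type exampleController struct {
	configMapLister corelistersv1.ConfigMapLister
}

// NewExampleController returns a factory.Controller instead of a concrete type.
func NewExampleController(configMapInformer coreinformersv1.ConfigMapInformer, recorder events.Recorder) factory.Controller {
	c := &exampleController{configMapLister: configMapInformer.Lister()}
	return factory.New().
		WithInformers(configMapInformer.Informer()). // replaces AddEventHandler + cachesToSync
		ResyncEvery(time.Minute).                    // replaces the wait.Until resync goroutine
		WithSync(c.sync).
		ToController("ExampleController", recorder)
}

// sync takes a context and a factory.SyncContext; the recorder is reached via
// syncCtx.Recorder() rather than being stored on the controller.
func (c *exampleController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
	cm, err := c.configMapLister.ConfigMaps("openshift-config").Get("example-config")
	if apierrors.IsNotFound(err) {
		return nil // nothing to do; the factory will resync later
	}
	if err != nil {
		return err
	}
	syncCtx.Recorder().Eventf("ExampleConfigMapSeen", "observed configmap %s", cm.Name)
	return nil
}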
120 changes: 23 additions & 97 deletions pkg/operator/boundsatokensignercontroller/controller.go
@@ -14,24 +14,22 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
errorsutil "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/keyutil"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

"github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/operatorclient"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
"github.com/openshift/library-go/pkg/operator/v1helpers"

"github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/operatorclient"
)

const (
workQueueKey = "key"

operatorNamespace = operatorclient.OperatorNamespace
targetNamespace = operatorclient.TargetNamespace

@@ -53,65 +51,50 @@ type BoundSATokenSignerController struct {
operatorClient v1helpers.StaticPodOperatorClient
secretClient corev1client.SecretsGetter
configMapClient corev1client.ConfigMapsGetter
eventRecorder events.Recorder

cachesSynced []cache.InformerSynced

// queue only ever has one item, but it has nice error handling backoff/retry semantics
queue workqueue.RateLimitingInterface
}

func NewBoundSATokenSignerController(
operatorClient v1helpers.StaticPodOperatorClient,
kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces,
kubeClient kubernetes.Interface,
eventRecorder events.Recorder,
) *BoundSATokenSignerController {
) factory.Controller {

ret := &BoundSATokenSignerController{
operatorClient: operatorClient,
secretClient: v1helpers.CachedSecretGetter(kubeClient.CoreV1(), kubeInformersForNamespaces),
configMapClient: v1helpers.CachedConfigMapGetter(kubeClient.CoreV1(), kubeInformersForNamespaces),
eventRecorder: eventRecorder.WithComponentSuffix("bound-sa-token-signer-controller"),

cachesSynced: []cache.InformerSynced{
kubeInformersForNamespaces.InformersFor(operatorNamespace).Core().V1().Secrets().Informer().HasSynced,
kubeInformersForNamespaces.InformersFor(targetNamespace).Core().V1().Secrets().Informer().HasSynced,
kubeInformersForNamespaces.InformersFor(targetNamespace).Core().V1().ConfigMaps().Informer().HasSynced,
operatorClient.Informer().HasSynced,
},

queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "BoundSATokenSignerController"),
}

kubeInformersForNamespaces.InformersFor(operatorNamespace).Core().V1().Secrets().Informer().AddEventHandler(ret.eventHandler())
kubeInformersForNamespaces.InformersFor(targetNamespace).Core().V1().Secrets().Informer().AddEventHandler(ret.eventHandler())
kubeInformersForNamespaces.InformersFor(targetNamespace).Core().V1().ConfigMaps().Informer().AddEventHandler(ret.eventHandler())

return ret
return factory.New().WithInformers(
kubeInformersForNamespaces.InformersFor(operatorNamespace).Core().V1().Secrets().Informer(),
kubeInformersForNamespaces.InformersFor(targetNamespace).Core().V1().Secrets().Informer(),
kubeInformersForNamespaces.InformersFor(targetNamespace).Core().V1().ConfigMaps().Informer(),
operatorClient.Informer(),
).ResyncEvery(time.Minute).WithSync(ret.sync).ToController("BoundSATokenSignerController", eventRecorder)
}

func (c *BoundSATokenSignerController) sync() bool {
success := true
syncMethods := []func() error{
func (c *BoundSATokenSignerController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
syncMethods := []func(ctx context.Context, syncCtx factory.SyncContext) error{
c.ensureNextOperatorSigningSecret,
c.ensurePublicKeyConfigMap,
c.ensureOperandSigningSecret,
}
errs := []error{}
for _, syncMethod := range syncMethods {
err := syncMethod()
err := syncMethod(ctx, syncCtx)
if err != nil {
utilruntime.HandleError(err)
success = false
errs = append(errs, err)
}
}
return success
return errorsutil.NewAggregate(errs)
}

// ensureNextOperatorSigningSecret ensures the existence of a secret in the operator
// namespace containing an RSA keypair used for signing and validating bound service
// account tokens.
func (c *BoundSATokenSignerController) ensureNextOperatorSigningSecret() error {
func (c *BoundSATokenSignerController) ensureNextOperatorSigningSecret(ctx context.Context, syncCtx factory.SyncContext) error {
// Attempt to retrieve the operator secret
secret, err := c.secretClient.Secrets(operatorNamespace).Get(NextSigningKeySecretName, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
@@ -127,7 +110,7 @@ func (c *BoundSATokenSignerController) ensureNextOperatorSigningSecret() error {
return err
}

secret, _, err = resourceapply.ApplySecret(c.secretClient, c.eventRecorder, newSecret)
secret, _, err = resourceapply.ApplySecret(c.secretClient, syncCtx.Recorder(), newSecret)
if err != nil {
return err
}
@@ -140,7 +123,7 @@ func (c *BoundSATokenSignerController) ensureNextOperatorSigningSecret() error {
// present in the operand configmap. If the configmap is missing, it will be created
// with the current public key. If the configmap exists but does not contain the
// current public key, the key will be added.
func (c *BoundSATokenSignerController) ensurePublicKeyConfigMap() error {
func (c *BoundSATokenSignerController) ensurePublicKeyConfigMap(ctx context.Context, syncCtx factory.SyncContext) error {
// Retrieve the operator secret that contains the current public key
operatorSecret, err := c.secretClient.Secrets(operatorNamespace).Get(NextSigningKeySecretName, metav1.GetOptions{})
if err != nil {
@@ -191,7 +174,7 @@ func (c *BoundSATokenSignerController) ensurePublicKeyConfigMap() error {

// Ensure the configmap is updated with the current public key
configMap.Data[nextKeyKey] = currPublicKey
configMap, _, err = resourceapply.ApplyConfigMap(c.configMapClient, c.eventRecorder, configMap)
configMap, _, err = resourceapply.ApplyConfigMap(c.configMapClient, syncCtx.Recorder(), configMap)
if err != nil {
return err
}
@@ -205,7 +188,7 @@ func (c *BoundSATokenSignerController) ensurePublicKeyConfigMap() error {
// operand secret already exists, it will only be updated once the associated public
// key has been synced to all master nodes to ensure that issued tokens can be
// verified by all apiservers.
func (c *BoundSATokenSignerController) ensureOperandSigningSecret() error {
func (c *BoundSATokenSignerController) ensureOperandSigningSecret(ctx context.Context, syncCtx factory.SyncContext) error {
// Retrieve the operator signing secret
operatorSecret, err := c.secretClient.Secrets(operatorNamespace).Get(NextSigningKeySecretName, metav1.GetOptions{})
if err != nil {
@@ -262,7 +245,7 @@ func (c *BoundSATokenSignerController) ensureOperandSigningSecret() error {
if !syncAllowed {
return nil
}
_, _, err = resourceapply.SyncSecret(c.secretClient, c.eventRecorder,
_, _, err = resourceapply.SyncSecret(c.secretClient, syncCtx.Recorder(),
operatorNamespace, NextSigningKeySecretName,
targetNamespace, SigningKeySecretName, []metav1.OwnerReference{})
return err
@@ -305,63 +288,6 @@ func (c *BoundSATokenSignerController) publicKeySyncedToAllNodes(publicKey strin
return true, nil
}

func (c *BoundSATokenSignerController) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting BoundSATokenSignerController")
defer klog.Infof("Shutting down BoundSATokenSignerController")

if !cache.WaitForCacheSync(ctx.Done(), c.cachesSynced...) {
utilruntime.HandleError(fmt.Errorf("caches did not sync"))
return
}

stopCh := ctx.Done()

// Run only a single worker
go wait.Until(c.runWorker, time.Second, stopCh)

// start a time based thread to ensure we stay up to date
go wait.Until(func() {
c.queue.Add(workQueueKey)
}, time.Minute, stopCh)

<-stopCh
}

func (c *BoundSATokenSignerController) runWorker() {
for c.processNextWorkItem() {
}
}

func (c *BoundSATokenSignerController) processNextWorkItem() bool {
dsKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(dsKey)

success := c.sync()
if success {
c.queue.Forget(dsKey)
return true
}

c.queue.AddRateLimited(dsKey)

return true
}

// eventHandler queues the operator to check spec and status
func (c *BoundSATokenSignerController) eventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.queue.Add(workQueueKey) },
UpdateFunc: func(old, new interface{}) { c.queue.Add(workQueueKey) },
DeleteFunc: func(obj interface{}) { c.queue.Add(workQueueKey) },
}
}

// newNextSigningSecret creates a new secret populated with a new keypair.
func newNextSigningSecret() (*corev1.Secret, error) {
rsaKey, err := rsa.GenerateKey(rand.Reader, keySize)
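The old sync() returned a bool and the hand-rolled worker decided between queue.Forget and queue.AddRateLimited; the factory version returns an error and lets the factory own retry and backoff. A small sketch of the aggregation idiom used in the new sync above; the helper name runSteps is illustrative, not part of the commit:

package example

import (
	"context"

	"github.com/openshift/library-go/pkg/controller/factory"
	errorsutil "k8s.io/apimachinery/pkg/util/errors"
)

// runSteps mirrors BoundSATokenSignerController.sync: every step runs even if an
// earlier one fails, and all failures are reported as a single error so the
// factory's rate-limited queue retries the whole sync.
func runSteps(ctx context.Context, syncCtx factory.SyncContext,
	steps ...func(context.Context, factory.SyncContext) error) error {
	var errs []error
	for _, step := range steps {
		if err := step(ctx, syncCtx); err != nil {
			errs = append(errs, err)
		}
	}
	// NewAggregate returns nil for an empty slice, signalling a clean sync.
	return errorsutil.NewAggregate(errs)
}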
@@ -1,21 +1,18 @@
package certrotationtimeupgradeablecontroller

import (
"context"
"fmt"
"time"

operatorv1 "github.com/openshift/api/operator/v1"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/v1helpers"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformersv1 "k8s.io/client-go/informers/core/v1"
corelistersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)

var (
@@ -26,34 +23,25 @@ var (
type CertRotationTimeUpgradeableController struct {
operatorClient v1helpers.OperatorClient
configMapLister corelistersv1.ConfigMapLister

cachesToSync []cache.InformerSynced
queue workqueue.RateLimitingInterface
eventRecorder events.Recorder
}

func NewCertRotationTimeUpgradeableController(
operatorClient v1helpers.OperatorClient,
configMapInformer coreinformersv1.ConfigMapInformer,
eventRecorder events.Recorder,
) *CertRotationTimeUpgradeableController {
) factory.Controller {
c := &CertRotationTimeUpgradeableController{

operatorClient: operatorClient,
configMapLister: configMapInformer.Lister(),
eventRecorder: eventRecorder.WithComponentSuffix("certRotationTime-upgradeable"),

queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "CertRotationTimeUpgradeableController"),
}

operatorClient.Informer().AddEventHandler(c.eventHandler())
configMapInformer.Informer().AddEventHandler(c.eventHandler())

c.cachesToSync = append(c.cachesToSync, operatorClient.Informer().HasSynced, configMapInformer.Informer().HasSynced)
return c
return factory.New().WithInformers(
operatorClient.Informer(),
configMapInformer.Informer(),
).WithSync(c.sync).ResyncEvery(time.Second).ToController("CertRotationTimeUpgradeableController", eventRecorder.WithComponentSuffix("certRotationTime-upgradeable"))
}

func (c *CertRotationTimeUpgradeableController) sync() error {
func (c *CertRotationTimeUpgradeableController) sync(ctx context.Context, syncContext factory.SyncContext) error {
certRotationTimeConfigMap, err := c.configMapLister.ConfigMaps("openshift-config").Get("unsupported-cert-rotation-config")
if !errors.IsNotFound(err) && err != nil {
return err
@@ -84,53 +72,3 @@ func newUpgradeableCondition(certRotationTimeConfigMap *corev1.ConfigMap) operat
}

}

// Run starts the controller and blocks until stopCh is closed.
func (c *CertRotationTimeUpgradeableController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting CertRotationTimeUpgradeableController")
defer klog.Infof("Shutting down CertRotationTimeUpgradeableController")
if !cache.WaitForCacheSync(stopCh, c.cachesToSync...) {
return
}

// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, stopCh)

<-stopCh
}

func (c *CertRotationTimeUpgradeableController) runWorker() {
for c.processNextWorkItem() {
}
}

func (c *CertRotationTimeUpgradeableController) processNextWorkItem() bool {
dsKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(dsKey)

err := c.sync()
if err == nil {
c.queue.Forget(dsKey)
return true
}

utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
c.queue.AddRateLimited(dsKey)

return true
}

// eventHandler queues the operator to check spec and status
func (c *CertRotationTimeUpgradeableController) eventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.queue.Add(certRotationTimeUpgradeableControllerWorkQueueKey) },
UpdateFunc: func(old, new interface{}) { c.queue.Add(certRotationTimeUpgradeableControllerWorkQueueKey) },
DeleteFunc: func(obj interface{}) { c.queue.Add(certRotationTimeUpgradeableControllerWorkQueueKey) },
}
}
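With both constructors now returning factory.Controller, the operator can start them uniformly. A hedged wiring sketch; the real call site (the operator starter) is not part of this diff, the certrotationtimeupgradeablecontroller import path is inferred from its package name, and the use of the openshift-config informer is an assumption:

package starter

import (
	"context"

	"github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/boundsatokensignercontroller"
	"github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/certrotationtimeupgradeablecontroller"
	"github.com/openshift/library-go/pkg/operator/events"
	"github.com/openshift/library-go/pkg/operator/v1helpers"
	"k8s.io/client-go/kubernetes"
)

// startControllers is an illustrative sketch of launching the two converted
// controllers; it assumes the informer set includes the openshift-config namespace.
func startControllers(
	ctx context.Context,
	operatorClient v1helpers.StaticPodOperatorClient,
	kubeClient kubernetes.Interface,
	kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces,
	eventRecorder events.Recorder,
) {
	signer := boundsatokensignercontroller.NewBoundSATokenSignerController(
		operatorClient, kubeInformersForNamespaces, kubeClient, eventRecorder)
	upgradeable := certrotationtimeupgradeablecontroller.NewCertRotationTimeUpgradeableController(
		operatorClient,
		kubeInformersForNamespaces.InformersFor("openshift-config").Core().V1().ConfigMaps(),
		eventRecorder)

	// factory.Controller.Run blocks until the context is cancelled; it waits for
	// informer caches, runs the worker loop and handles the periodic resync.
	go signer.Run(ctx, 1)
	go upgradeable.Run(ctx, 1)
}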
