Skip to content

Commit

Permalink
encryption: move controllers to use context
Browse files Browse the repository at this point in the history
  • Loading branch information
mfojtik committed Mar 24, 2020
1 parent d429589 commit 0b9944e
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 20 deletions.
10 changes: 6 additions & 4 deletions pkg/operator/encryption/controllers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package encryption

import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -19,7 +21,7 @@ import (
)

type runner interface {
Run(stopCh <-chan struct{})
Run(ctx context.Context, workers int)
}

func NewControllers(
Expand Down Expand Up @@ -101,10 +103,10 @@ type Controllers struct {
controllers []runner
}

func (c *Controllers) Run(stopCh <-chan struct{}) {
func (c *Controllers) Run(ctx context.Context, workers int) {
for _, controller := range c.controllers {
con := controller // capture range variable
go con.Run(stopCh)
go con.Run(ctx, workers)
}
<-stopCh
<-ctx.Done()
}
8 changes: 4 additions & 4 deletions pkg/operator/encryption/controllers/key_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,21 +346,21 @@ func needsNewKey(grKeys state.GroupResourceState, currentMode state.Mode, extern
return latestKeyID, "rotation-interval-has-passed", time.Since(latestKey.Migrated.Timestamp) > encryptionSecretMigrationInterval
}

func (c *keyController) Run(stopCh <-chan struct{}) {
func (c *keyController) Run(ctx context.Context, _ int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting EncryptionKeyController")
defer klog.Infof("Shutting down EncryptionKeyController")
if !cache.WaitForCacheSync(stopCh, c.preRunCachesSynced...) {
if !cache.WaitForCacheSync(ctx.Done(), c.preRunCachesSynced...) {
utilruntime.HandleError(fmt.Errorf("caches did not sync"))
return
}

// only start one worker
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())

<-stopCh
<-ctx.Done()
}

func (c *keyController) runWorker() {
Expand Down
8 changes: 4 additions & 4 deletions pkg/operator/encryption/controllers/migration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,21 +310,21 @@ func setResourceMigrated(gr schema.GroupResource, s *corev1.Secret) (bool, error
return true, nil
}

func (c *migrationController) Run(stopCh <-chan struct{}) {
func (c *migrationController) Run(ctx context.Context, _ int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting EncryptionMigrationController")
defer klog.Infof("Shutting down EncryptionMigrationController")
if !cache.WaitForCacheSync(stopCh, c.preRunCachesSynced...) {
if !cache.WaitForCacheSync(ctx.Done(), c.preRunCachesSynced...) {
utilruntime.HandleError(fmt.Errorf("caches did not sync"))
return
}

// only start one worker
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())

<-stopCh
<-ctx.Done()
}

func (c *migrationController) runWorker() {
Expand Down
8 changes: 4 additions & 4 deletions pkg/operator/encryption/controllers/prune_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,21 +187,21 @@ NextEncryptionSecret:
return utilerrors.FilterOut(utilerrors.NewAggregate(deleteErrs), errors.IsNotFound)
}

func (c *pruneController) Run(stopCh <-chan struct{}) {
func (c *pruneController) Run(ctx context.Context, _ int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting EncryptionPruneController")
defer klog.Infof("Shutting down EncryptionPruneController")
if !cache.WaitForCacheSync(stopCh, c.preRunCachesSynced...) {
if !cache.WaitForCacheSync(ctx.Done(), c.preRunCachesSynced...) {
utilruntime.HandleError(fmt.Errorf("caches did not sync"))
return
}

// only start one worker
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())

<-stopCh
<-ctx.Done()
}

func (c *pruneController) runWorker() {
Expand Down
9 changes: 5 additions & 4 deletions pkg/operator/encryption/controllers/state_controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controllers

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -153,21 +154,21 @@ func (c *stateController) applyEncryptionConfigSecret(encryptionConfig *apiserve
return changed, applyErr
}

func (c *stateController) Run(stopCh <-chan struct{}) {
func (c *stateController) Run(ctx context.Context, _ int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

klog.Infof("Starting EncryptionStateController")
defer klog.Infof("Shutting down EncryptionStateController")
if !cache.WaitForCacheSync(stopCh, c.preRunCachesSynced...) {
if !cache.WaitForCacheSync(ctx.Done(), c.preRunCachesSynced...) {
utilruntime.HandleError(fmt.Errorf("caches did not sync for EncryptionStateController"))
return
}

// only start one worker
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())

<-stopCh
<-ctx.Done()
}

func (c *stateController) runWorker() {
Expand Down

0 comments on commit 0b9944e

Please sign in to comment.