Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move the remaining controllers to shared informers #45933

Merged
merged 10 commits into from
May 21, 2017
10 changes: 8 additions & 2 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,14 +397,20 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData
}

go serviceaccountcontroller.NewTokensController(
controller := serviceaccountcontroller.NewTokensController(
sharedInformers.Core().V1().ServiceAccounts(),
sharedInformers.Core().V1().Secrets(),
rootClientBuilder.ClientOrDie("tokens-controller"),
serviceaccountcontroller.TokensControllerOptions{
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
RootCA: rootCA,
},
).Run(int(s.ConcurrentSATokenSyncs), stop)
)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
go controller.Run(int(s.ConcurrentSATokenSyncs), stop)

// start the first set of informers now so that other controllers can start
sharedInformers.Start(stop)
}

} else {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
Expand All @@ -54,7 +55,6 @@ go_library(
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/clock:go_default_library",
"//vendor/k8s.io/client-go/util/integer:go_default_library",
],
)
Expand All @@ -80,13 +80,13 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/client-go/pkg/apis/extensions/v1beta1:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/clock:go_default_library",
"//vendor/k8s.io/client-go/util/testing:go_default_library",
],
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"

"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/clock"
"k8s.io/client-go/util/integer"

"k8s.io/kubernetes/pkg/api"
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/clock"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/garbagecollector/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
Expand All @@ -46,7 +47,6 @@ go_library(
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/clock:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/garbagecollector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"sync"
"time"

"k8s.io/client-go/util/clock"
"k8s.io/apimachinery/pkg/util/clock"

"github.com/prometheus/client_golang/prometheus"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/node/testutil/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1/ref:go_default_library",
"//vendor/k8s.io/client-go/util/clock:go_default_library",
],
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/node/testutil/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import (
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/watch"

"k8s.io/apimachinery/pkg/util/clock"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/api/v1/ref"
"k8s.io/client-go/util/clock"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
Expand Down
4 changes: 0 additions & 4 deletions pkg/controller/serviceaccount/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
Expand All @@ -34,14 +33,11 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
Expand Down
107 changes: 46 additions & 61 deletions pkg/controller/serviceaccount/tokens_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,19 @@ import (
"github.com/golang/glog"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
listersv1 "k8s.io/kubernetes/pkg/client/listers/core/v1"
clientretry "k8s.io/kubernetes/pkg/client/retry"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/registry/core/secret"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util/metrics"
Expand Down Expand Up @@ -71,7 +70,7 @@ type TokensControllerOptions struct {
}

// NewTokensController returns a new *TokensController.
func NewTokensController(cl clientset.Interface, options TokensControllerOptions) *TokensController {
func NewTokensController(serviceAccounts informers.ServiceAccountInformer, secrets informers.SecretInformer, cl clientset.Interface, options TokensControllerOptions) *TokensController {
maxRetries := options.MaxRetries
if maxRetries == 0 {
maxRetries = 10
Expand All @@ -91,44 +90,38 @@ func NewTokensController(cl clientset.Interface, options TokensControllerOptions
metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.Core().RESTClient().GetRateLimiter())
}

e.serviceAccounts, e.serviceAccountController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return e.client.Core().ServiceAccounts(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return e.client.Core().ServiceAccounts(metav1.NamespaceAll).Watch(options)
},
},
&v1.ServiceAccount{},
options.ServiceAccountResync,
e.serviceAccounts = serviceAccounts.Lister()
e.serviceAccountSynced = serviceAccounts.Informer().HasSynced
serviceAccounts.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: e.queueServiceAccountSync,
UpdateFunc: e.queueServiceAccountUpdateSync,
DeleteFunc: e.queueServiceAccountSync,
},
options.ServiceAccountResync,
)

tokenSelector := fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(v1.SecretTypeServiceAccountToken)})
e.secrets, e.secretController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = tokenSelector.String()
return e.client.Core().Secrets(metav1.NamespaceAll).List(options)
secretCache := secrets.Informer().GetIndexer()
e.updatedSecrets = cache.NewIntegerResourceVersionMutationCache(secretCache, secretCache, 60*time.Second, true)
e.secretSynced = secrets.Informer().HasSynced
secrets.Informer().AddEventHandlerWithResyncPeriod(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Secret:
return t.Type == v1.SecretTypeServiceAccountToken
default:
utilruntime.HandleError(fmt.Errorf("object passed to %T that is not expected: %T", e, obj))
return false
}
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = tokenSelector.String()
return e.client.Core().Secrets(metav1.NamespaceAll).Watch(options)
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: e.queueSecretSync,
UpdateFunc: e.queueSecretUpdateSync,
DeleteFunc: e.queueSecretSync,
},
},
&v1.Secret{},
options.SecretResync,
cache.ResourceEventHandlerFuncs{
AddFunc: e.queueSecretSync,
UpdateFunc: e.queueSecretUpdateSync,
DeleteFunc: e.queueSecretSync,
},
cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
)

return e
Expand All @@ -141,12 +134,15 @@ type TokensController struct {

rootCA []byte

serviceAccounts cache.Store
secrets cache.Indexer
serviceAccounts listersv1.ServiceAccountLister
// updatedSecrets is a wrapper around the shared cache which allows us to record
// and return our local mutations (since we're very likely to act on an updated
// secret before the watch reports it).
updatedSecrets cache.MutationCache

// Since we join two objects, we'll watch both of them with controllers.
serviceAccountController cache.Controller
secretController cache.Controller
serviceAccountSynced cache.InformerSynced
secretSynced cache.InformerSynced

// syncServiceAccountQueue handles service account events:
// * ensures a referenced token exists for service accounts which still exist
Expand All @@ -166,29 +162,22 @@ type TokensController struct {

// Runs controller blocks until stopCh is closed
func (e *TokensController) Run(workers int, stopCh <-chan struct{}) {
// Shut down queues
defer utilruntime.HandleCrash()
defer e.syncServiceAccountQueue.ShutDown()
defer e.syncSecretQueue.ShutDown()

// Start controllers (to fill stores, call informers, fill work queues)
go e.serviceAccountController.Run(stopCh)
go e.secretController.Run(stopCh)

// Wait for stores to fill
for !e.serviceAccountController.HasSynced() || !e.secretController.HasSynced() {
time.Sleep(100 * time.Millisecond)
if !controller.WaitForCacheSync("tokens", stopCh, e.serviceAccountSynced, e.secretSynced) {
return
}

// Spawn workers to process work queues
glog.V(5).Infof("Starting workers")
for i := 0; i < workers; i++ {
go wait.Until(e.syncServiceAccount, 0, stopCh)
go wait.Until(e.syncSecret, 0, stopCh)
}

// Block until stop channel is closed
<-stopCh

// Shut down queues
e.syncServiceAccountQueue.ShutDown()
e.syncSecretQueue.ShutDown()
glog.V(1).Infof("Shutting down")
}

func (e *TokensController) queueServiceAccountSync(obj interface{}) {
Expand Down Expand Up @@ -423,7 +412,7 @@ func (e *TokensController) ensureReferencedToken(serviceAccount *v1.ServiceAccou
}
// Manually add the new token to the cache store.
// This prevents the service account update (below) triggering another token creation, if the referenced token couldn't be found in the store
e.secrets.Add(createdToken)
e.updatedSecrets.Mutation(createdToken)

// Try to add a reference to the newly created token to the service account
addedReference := false
Expand Down Expand Up @@ -626,15 +615,11 @@ func (e *TokensController) removeSecretReference(saNamespace string, saName stri

func (e *TokensController) getServiceAccount(ns string, name string, uid types.UID, fetchOnCacheMiss bool) (*v1.ServiceAccount, error) {
// Look up in cache
obj, exists, err := e.serviceAccounts.GetByKey(makeCacheKey(ns, name))
if err != nil {
sa, err := e.serviceAccounts.ServiceAccounts(ns).Get(name)
if err != nil && !apierrors.IsNotFound(err) {
return nil, err
}
if exists {
sa, ok := obj.(*v1.ServiceAccount)
if !ok {
return nil, fmt.Errorf("expected *v1.ServiceAccount, got %#v", sa)
}
if sa != nil {
// Ensure UID matches if given
if len(uid) == 0 || uid == sa.UID {
return sa, nil
Expand All @@ -646,7 +631,7 @@ func (e *TokensController) getServiceAccount(ns string, name string, uid types.U
}

// Live lookup
sa, err := e.client.Core().ServiceAccounts(ns).Get(name, metav1.GetOptions{})
sa, err = e.client.Core().ServiceAccounts(ns).Get(name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return nil, nil
}
Expand All @@ -662,7 +647,7 @@ func (e *TokensController) getServiceAccount(ns string, name string, uid types.U

func (e *TokensController) getSecret(ns string, name string, uid types.UID, fetchOnCacheMiss bool) (*v1.Secret, error) {
// Look up in cache
obj, exists, err := e.secrets.GetByKey(makeCacheKey(ns, name))
obj, exists, err := e.updatedSecrets.GetByKey(makeCacheKey(ns, name))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -699,7 +684,7 @@ func (e *TokensController) getSecret(ns string, name string, uid types.UID, fetc
// listTokenSecrets returns a list of all of the ServiceAccountToken secrets that
// reference the given service account's name and uid
func (e *TokensController) listTokenSecrets(serviceAccount *v1.ServiceAccount) ([]*v1.Secret, error) {
namespaceSecrets, err := e.secrets.ByIndex("namespace", serviceAccount.Namespace)
namespaceSecrets, err := e.updatedSecrets.ByIndex("namespace", serviceAccount.Namespace)
if err != nil {
return nil, err
}
Expand Down