Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

complete the controller context for init funcs #46783

Merged
merged 1 commit into from
Jun 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
281 changes: 89 additions & 192 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"k8s.io/apimachinery/pkg/util/wait"

"k8s.io/apiserver/pkg/server/healthz"
utilfeature "k8s.io/apiserver/pkg/util/feature"

"k8s.io/client-go/discovery"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -55,13 +54,7 @@ import (
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
routecontroller "k8s.io/kubernetes/pkg/controller/route"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util/configz"

Expand Down Expand Up @@ -175,10 +168,19 @@ func Run(s *options.CMServer) error {
} else {
clientBuilder = rootClientBuilder
}
ctx, err := CreateControllerContext(s, rootClientBuilder, clientBuilder, stop)
if err != nil {
glog.Fatalf("error building controller context: %v", err)
}
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

err := StartControllers(NewControllerInitializers(), s, rootClientBuilder, clientBuilder, stop)
glog.Fatalf("error running controllers: %v", err)
panic("unreachable")
if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers()); err != nil {
glog.Fatalf("error starting controllers: %v", err)
}

ctx.InformerFactory.Start(ctx.Stop)

select {}
}

if !s.LeaderElection.LeaderElect {
Expand Down Expand Up @@ -231,6 +233,10 @@ type ControllerContext struct {
// AvailableResources is a map listing currently available resources
AvailableResources map[schema.GroupVersionResource]bool

// Cloud is the cloud provider interface for the controllers to use.
// It must be initialized and ready to use.
Cloud cloudprovider.Interface

// Stop is the stop channel
Stop <-chan struct{}
}
Expand Down Expand Up @@ -272,16 +278,14 @@ type InitFunc func(ctx ControllerContext) (bool, error)
func KnownControllers() []string {
ret := sets.StringKeySet(NewControllerInitializers())

// add "special" controllers that aren't initialized normally. These controllers cannot be initialized
// using a normal function. The only known special case is the SA token controller which *must* be started
// first to ensure that the SA tokens for future controllers will exist. Think very carefully before adding
// to this list.
ret.Insert(
saTokenControllerName,
nodeControllerName,
serviceControllerName,
routeControllerName,
pvBinderControllerName,
attachDetachControllerName,
)

// add "special" controllers that aren't initialized normally
return ret.List()
}

Expand All @@ -290,6 +294,10 @@ var ControllersDisabledByDefault = sets.NewString(
"tokencleaner",
)

const (
saTokenControllerName = "serviceaccount-token"
)

// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc. This allows for structured downstream composition and subdivision.
func NewControllerInitializers() map[string]InitFunc {
Expand All @@ -314,6 +322,11 @@ func NewControllerInitializers() map[string]InitFunc {
controllers["ttl"] = startTTLController
controllers["bootstrapsigner"] = startBootstrapSignerController
controllers["tokencleaner"] = startTokenCleanerController
controllers["service"] = startServiceController
controllers["node"] = startNodeController
controllers["route"] = startRouteController
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
controllers["attachdetach"] = startAttachDetachController

return controllers
}
Expand Down Expand Up @@ -366,78 +379,49 @@ func GetAvailableResources(clientBuilder controller.ControllerClientBuilder) (ma
return allResources, nil
}

const (
saTokenControllerName = "serviceaccount-token"
nodeControllerName = "node"
serviceControllerName = "service"
routeControllerName = "route"
pvBinderControllerName = "persistentvolume-binder"
attachDetachControllerName = "attachdetach"
)

func StartControllers(controllers map[string]InitFunc, s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
func CreateControllerContext(s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())

// always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
if len(s.ServiceAccountKeyFile) > 0 && IsControllerEnabled(saTokenControllerName, ControllersDisabledByDefault, s.Controllers...) {
privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile)
if err != nil {
return fmt.Errorf("error reading key for service account token controller: %v", err)
} else {
var rootCA []byte
if s.RootCAFile != "" {
rootCA, err = ioutil.ReadFile(s.RootCAFile)
if err != nil {
return fmt.Errorf("error reading root-ca-file at %s: %v", s.RootCAFile, err)
}
if _, err := certutil.ParseCertsPEM(rootCA); err != nil {
return fmt.Errorf("error parsing root-ca-file at %s: %v", s.RootCAFile, err)
}
} else {
rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData
}

controller := serviceaccountcontroller.NewTokensController(
sharedInformers.Core().V1().ServiceAccounts(),
sharedInformers.Core().V1().Secrets(),
rootClientBuilder.ClientOrDie("tokens-controller"),
serviceaccountcontroller.TokensControllerOptions{
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
RootCA: rootCA,
},
)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
go controller.Run(int(s.ConcurrentSATokenSyncs), stop)

// start the first set of informers now so that other controllers can start
sharedInformers.Start(stop)
}

} else {
glog.Warningf("%q is disabled", saTokenControllerName)
availableResources, err := GetAvailableResources(rootClientBuilder)
if err != nil {
return ControllerContext{}, err
}

availableResources, err := GetAvailableResources(clientBuilder)
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
return err
return ControllerContext{}, fmt.Errorf("cloud provider could not be initialized: %v", err)
}
if cloud != nil {
// Initialize the cloud provider with a reference to the clientBuilder
cloud.Initialize(rootClientBuilder)
}

ctx := ControllerContext{
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
Options: *s,
AvailableResources: availableResources,
Cloud: cloud,
Stop: stop,
}
return ctx, nil
}

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
// If this fails, just return here and fail since other controllers won't be able to get credentials.
if _, err := startSATokenController(ctx); err != nil {
return err
}

for controllerName, initFn := range controllers {
if !ctx.IsControllerEnabled(controllerName) {
glog.Warningf("%q is disabled", controllerName)
continue
}

time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
time.Sleep(wait.Jitter(ctx.Options.ControllerStartInterval.Duration, ControllerStartJitter))

glog.V(1).Infof("Starting %q", controllerName)
started, err := initFn(ctx)
Expand All @@ -452,144 +436,57 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
glog.Infof("Started %q", controllerName)
}

// all the remaining plugins want this cloud variable
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
return fmt.Errorf("cloud provider could not be initialized: %v", err)
}
return nil
}

if cloud != nil {
// Initialize the cloud provider with a reference to the clientBuilder
cloud.Initialize(clientBuilder)
}
// serviceAccountTokenControllerStarter is special because it must run first to set up permissions for other controllers.
// It cannot use the "normal" client builder, so it tracks its own. It must also avoid being included in the "normal"
// init map so that it can always run first.
type serviceAccountTokenControllerStarter struct {
rootClientBuilder controller.ControllerClientBuilder
}

if ctx.IsControllerEnabled(nodeControllerName) {
_, clusterCIDR, err := net.ParseCIDR(s.ClusterCIDR)
if err != nil {
glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", s.ClusterCIDR, err)
}
_, serviceCIDR, err := net.ParseCIDR(s.ServiceCIDR)
if err != nil {
glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
}
nodeController, err := nodecontroller.NewNodeController(
sharedInformers.Core().V1().Pods(),
sharedInformers.Core().V1().Nodes(),
sharedInformers.Extensions().V1beta1().DaemonSets(),
cloud,
clientBuilder.ClientOrDie("node-controller"),
s.PodEvictionTimeout.Duration,
s.NodeEvictionRate,
s.SecondaryNodeEvictionRate,
s.LargeClusterSizeThreshold,
s.UnhealthyZoneThreshold,
s.NodeMonitorGracePeriod.Duration,
s.NodeStartupGracePeriod.Duration,
s.NodeMonitorPeriod.Duration,
clusterCIDR,
serviceCIDR,
int(s.NodeCIDRMaskSize),
s.AllocateNodeCIDRs,
nodecontroller.CIDRAllocatorType(s.CIDRAllocatorType),
s.EnableTaintManager,
utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
)
if err != nil {
return fmt.Errorf("failed to initialize nodecontroller: %v", err)
}
go nodeController.Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} else {
glog.Warningf("%q is disabled", nodeControllerName)
func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx ControllerContext) (bool, error) {
if !ctx.IsControllerEnabled(saTokenControllerName) {
glog.Warningf("%q is disabled", saTokenControllerName)
return false, nil
}

if ctx.IsControllerEnabled(serviceControllerName) {
serviceController, err := servicecontroller.New(
cloud,
clientBuilder.ClientOrDie("service-controller"),
sharedInformers.Core().V1().Services(),
sharedInformers.Core().V1().Nodes(),
s.ClusterName,
)
if err != nil {
glog.Errorf("Failed to start service controller: %v", err)
} else {
go serviceController.Run(stop, int(s.ConcurrentServiceSyncs))
}
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} else {
glog.Warningf("%q is disabled", serviceControllerName)
if len(ctx.Options.ServiceAccountKeyFile) == 0 {
glog.Warningf("%q is disabled because there is no private key", saTokenControllerName)
return false, nil
}

if ctx.IsControllerEnabled(routeControllerName) {
_, clusterCIDR, err := net.ParseCIDR(s.ClusterCIDR)
if err != nil {
glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", s.ClusterCIDR, err)
}
if s.AllocateNodeCIDRs && s.ConfigureCloudRoutes {
if cloud == nil {
glog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
} else if routes, ok := cloud.Routes(); !ok {
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
} else {
routeController := routecontroller.New(routes, clientBuilder.ClientOrDie("route-controller"), sharedInformers.Core().V1().Nodes(), s.ClusterName, clusterCIDR)
go routeController.Run(stop, s.RouteReconciliationPeriod.Duration)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
} else {
glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes)
}
} else {
glog.Warningf("%q is disabled", routeControllerName)
privateKey, err := serviceaccount.ReadPrivateKey(ctx.Options.ServiceAccountKeyFile)
if err != nil {
return true, fmt.Errorf("error reading key for service account token controller: %v", err)
}

if ctx.IsControllerEnabled(pvBinderControllerName) {
params := persistentvolumecontroller.ControllerParameters{
KubeClient: clientBuilder.ClientOrDie("persistent-volume-binder"),
SyncPeriod: s.PVClaimBinderSyncPeriod.Duration,
VolumePlugins: ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
Cloud: cloud,
ClusterName: s.ClusterName,
VolumeInformer: sharedInformers.Core().V1().PersistentVolumes(),
ClaimInformer: sharedInformers.Core().V1().PersistentVolumeClaims(),
ClassInformer: sharedInformers.Storage().V1().StorageClasses(),
EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning,
var rootCA []byte
if ctx.Options.RootCAFile != "" {
rootCA, err = ioutil.ReadFile(ctx.Options.RootCAFile)
if err != nil {
return true, fmt.Errorf("error reading root-ca-file at %s: %v", ctx.Options.RootCAFile, err)
}
volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
if volumeControllerErr != nil {
return fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
if _, err := certutil.ParseCertsPEM(rootCA); err != nil {
return true, fmt.Errorf("error parsing root-ca-file at %s: %v", ctx.Options.RootCAFile, err)
}
go volumeController.Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} else {
glog.Warningf("%q is disabled", pvBinderControllerName)
rootCA = c.rootClientBuilder.ConfigOrDie("tokens-controller").CAData
}

if ctx.IsControllerEnabled(attachDetachControllerName) {
if s.ReconcilerSyncLoopPeriod.Duration < time.Second {
return fmt.Errorf("Duration time must be greater than one second as set via command line option reconcile-sync-loop-period.")
}
attachDetachController, attachDetachControllerErr :=
attachdetach.NewAttachDetachController(
clientBuilder.ClientOrDie("attachdetach-controller"),
sharedInformers.Core().V1().Pods(),
sharedInformers.Core().V1().Nodes(),
sharedInformers.Core().V1().PersistentVolumeClaims(),
sharedInformers.Core().V1().PersistentVolumes(),
cloud,
ProbeAttachableVolumePlugins(s.VolumeConfiguration),
s.DisableAttachDetachReconcilerSync,
s.ReconcilerSyncLoopPeriod.Duration)
if attachDetachControllerErr != nil {
return fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr)
}
go attachDetachController.Run(stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} else {
glog.Warningf("%q is disabled", attachDetachControllerName)
}
controller := serviceaccountcontroller.NewTokensController(
ctx.InformerFactory.Core().V1().ServiceAccounts(),
ctx.InformerFactory.Core().V1().Secrets(),
c.rootClientBuilder.ClientOrDie("tokens-controller"),
serviceaccountcontroller.TokensControllerOptions{
TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
RootCA: rootCA,
},
)
go controller.Run(int(ctx.Options.ConcurrentSATokenSyncs), ctx.Stop)

sharedInformers.Start(stop)
// start the first set of informers now so that other controllers can start
ctx.InformerFactory.Start(ctx.Stop)
Copy link
Member

@liggitt liggitt Jun 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it odd to tie sharedInformers to this controller? if you don't provide a private key and this never starts, doesn't it block other controllers? if you're not running with --use-service-account-credentials, what else in the controller manager depends on this controller?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it odd to tie sharedInformers to this controller?

Not really, this is the logical order from before, it's just slightly more obvious

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what else in the controller manager depends on this controller?

Not completely certain. I think the future is subdivision.


select {}
return true, nil
}