convert replica set controller to shared informer #34962

Merged
2 changes: 1 addition & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
@@ -422,7 +422,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl

if containsResource(resources, "replicasets") {
glog.Infof("Starting ReplicaSet controller")
-        go replicaset.NewReplicaSetController(sharedInformers.Pods().Informer(), client("replicaset-controller"), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
+        go replicaset.NewReplicaSetController(sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("replicaset-controller"), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
Run(int(s.ConcurrentRSSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
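In short: the controller manager now pulls both informers the ReplicaSet controller needs from the shared informer factory and passes them in, instead of handing it a pod informer plus a resync period and letting it build a private ReplicaSet watcher. A minimal wiring sketch, not part of this diff; the factory constructor, `Start`, and the variable names are assumptions based on the surrounding controller-manager code:

```go
// Sketch only: wiring the controller against shared informers.
// NewSharedInformerFactory and Start are assumed to match the
// informers package of this era; variable names are illustrative.
sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod)

rsc := replicaset.NewReplicaSetController(
    sharedInformers.ReplicaSets(), // typed ReplicaSet informer (new first argument)
    sharedInformers.Pods(),        // shared pod informer, no longer private to the controller
    kubeClient,
    replicaset.BurstReplicas,
    lookupCacheSize,
    enableGarbageCollector,
)

go rsc.Run(workers, wait.NeverStop)   // workers start only after caches sync (see Run below)
sharedInformers.Start(wait.NeverStop) // the factory starts every registered informer exactly once
```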
117 changes: 32 additions & 85 deletions pkg/controller/replicaset/replica_set.go
@@ -38,13 +38,11 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)

const (
@@ -75,13 +73,6 @@ type ReplicaSetController struct {
kubeClient clientset.Interface
podControl controller.PodControlInterface

-    // internalPodInformer is used to hold a personal informer. If we're using
-    // a normal shared informer, then the informer will be started for us. If
-    // we have a personal informer, we must start it ourselves. If you start
-    // the controller using NewReplicationManager(passing SharedInformer), this
-    // will be null
-    internalPodInformer cache.SharedIndexInformer

// A ReplicaSet is temporarily suspended after creating/deleting these many replicas.
// It resumes normal action after observing the watch events for them.
burstReplicas int
@@ -92,16 +83,12 @@ type ReplicaSetController struct {
expectations *controller.UIDTrackingControllerExpectations

// A store of ReplicaSets, populated by the rsController
-    rsStore cache.StoreToReplicaSetLister
-    // Watches changes to all ReplicaSets
-    rsController *cache.Controller
+    rsLister *cache.StoreToReplicaSetLister
    // A store of pods, populated by the podController
-    podStore cache.StoreToPodLister
-    // Watches changes to all pods
-    podController cache.ControllerInterface
-    // podStoreSynced returns true if the pod store has been synced at least once.
+    podLister *cache.StoreToPodLister
+    // podListerSynced returns true if the pod store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
-    podStoreSynced func() bool
+    podListerSynced cache.InformerSynced

lookupCache *controller.MatchingCache

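`podListerSynced` keeps the test seam the old `podStoreSynced` func provided (note the "allow injection for testing" comment above). A hedged sketch of how a test in this package might stub it; `alwaysReady` is illustrative, not part of this diff:

```go
// Test-only sketch: bypass cache-sync gating by stubbing the InformerSynced
// func (cache.InformerSynced is just func() bool). Not from this PR.
var alwaysReady cache.InformerSynced = func() bool { return true }

rsc.podListerSynced = alwaysReady // workers proceed as if the pod cache were synced
```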
@@ -113,84 +100,52 @@ type ReplicaSetController struct {
garbageCollectorEnabled bool
}

-// NewReplicaSetController creates a new ReplicaSetController.
-func NewReplicaSetController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController {
-    eventBroadcaster := record.NewBroadcaster()
-    eventBroadcaster.StartLogging(glog.Infof)
-    eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
-
-    return newReplicaSetController(
-        eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}),
-        podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
-}
-
-// newReplicaSetController configures a replica set controller with the specified event recorder
-func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController {
+// NewReplicaSetController configures a replica set controller with the specified event recorder
+func NewReplicaSetController(rsInformer informers.ReplicaSetInformer, podInformer informers.PodInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController {
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
+    eventBroadcaster := record.NewBroadcaster()
+    eventBroadcaster.StartLogging(glog.Infof)
+    eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})

rsc := &ReplicaSetController{
kubeClient: kubeClient,
podControl: controller.RealPodControl{
KubeClient: kubeClient,
-            Recorder: eventRecorder,
+            Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}),
},
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset"),
garbageCollectorEnabled: garbageCollectorEnabled,
}

-    rsc.rsStore.Indexer, rsc.rsController = cache.NewIndexerInformer(
-        &cache.ListWatch{
-            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-                return rsc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).List(options)
-            },
-            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-                return rsc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).Watch(options)
-            },
-        },
-        &extensions.ReplicaSet{},
-        // TODO: Can we have much longer period here?
-        FullControllerResyncPeriod,
-        cache.ResourceEventHandlerFuncs{
-            AddFunc:    rsc.enqueueReplicaSet,
-            UpdateFunc: rsc.updateRS,
-            // This will enter the sync loop and no-op, because the replica set has been deleted from the store.
-            // Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
-            // way of achieving this is by performing a `stop` operation on the replica set.
-            DeleteFunc: rsc.enqueueReplicaSet,
-        },
-        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-    )
-
-    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc:    rsc.enqueueReplicaSet,
+        UpdateFunc: rsc.updateRS,
+        // This will enter the sync loop and no-op, because the replica set has been deleted from the store.
+        // Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
+        // way of achieving this is by performing a `stop` operation on the replica set.
+        DeleteFunc: rsc.enqueueReplicaSet,
+    })
+    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rsc.addPod,
// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
// local storage, so it should be ok.
UpdateFunc: rsc.updatePod,
DeleteFunc: rsc.deletePod,
})
-    rsc.podStore.Indexer = podInformer.GetIndexer()
-    rsc.podController = podInformer.GetController()

rsc.syncHandler = rsc.syncReplicaSet
-    rsc.podStoreSynced = rsc.podController.HasSynced
+    rsc.rsLister = rsInformer.Lister()
+    rsc.podLister = podInformer.Lister()
+    rsc.podListerSynced = podInformer.Informer().HasSynced
rsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
return rsc
}

-// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
-func NewReplicaSetControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
-    podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
-    garbageCollectorEnabled := false
-    rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
-    rsc.internalPodInformer = podInformer
-    return rsc
-}

// SetEventRecorder replaces the event recorder used by the ReplicaSetController
// with the given recorder. Only used for testing.
func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) {
@@ -204,16 +159,16 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer rsc.queue.ShutDown()

-    go rsc.rsController.Run(stopCh)
-    go rsc.podController.Run(stopCh)
+    glog.Infof("Starting ReplicaSet controller")
+
+    if !cache.WaitForCacheSync(stopCh, rsc.podListerSynced) {
+        return
+    }

for i := 0; i < workers; i++ {
go wait.Until(rsc.worker, time.Second, stopCh)
}

-    if rsc.internalPodInformer != nil {
-        go rsc.internalPodInformer.Run(stopCh)
-    }
<-stopCh
glog.Infof("Shutting down ReplicaSet Controller")
}
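Run no longer spawns the `rsController`/`podController` goroutines; informer lifecycles belong to whoever started the shared informers, and workers are gated behind `cache.WaitForCacheSync`. That gate is also what lets `syncReplicaSet` drop its per-sync `podStoreSynced()` poll-and-requeue dance (see the hunk below). A sketch of the resulting startup contract, with illustrative names:

```go
// Sketch only: the caller, not the controller, runs the informers now.
// If the factory is never started, WaitForCacheSync blocks until stopCh
// closes and Run returns without processing a single key.
stopCh := make(chan struct{})
go rsc.Run(5, stopCh)         // blocks internally until the pod cache syncs
sharedInformers.Start(stopCh) // populates the caches rsc reads via its listers
```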
@@ -236,7 +191,7 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.Repl
}

// if not cached or cached value is invalid, search all the rs to find the matching one, and update cache
-    rss, err := rsc.rsStore.GetPodReplicaSets(pod)
+    rss, err := rsc.rsLister.GetPodReplicaSets(pod)
if err != nil {
glog.V(4).Infof("No ReplicaSets found for pod %v, ReplicaSet controller will avoid syncing", pod.Name)
return nil
@@ -300,7 +255,7 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {

// isCacheValid check if the cache is valid
func (rsc *ReplicaSetController) isCacheValid(pod *api.Pod, cachedRS *extensions.ReplicaSet) bool {
-    _, err := rsc.rsStore.ReplicaSets(cachedRS.Namespace).Get(cachedRS.Name)
+    _, err := rsc.rsLister.ReplicaSets(cachedRS.Namespace).Get(cachedRS.Name)
// rs has been deleted or updated, cache is invalid
if err != nil || !isReplicaSetMatch(pod, cachedRS) {
return false
@@ -582,15 +537,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
glog.V(4).Infof("Finished syncing replica set %q (%v)", key, time.Now().Sub(startTime))
}()

-    if !rsc.podStoreSynced() {
-        // Sleep so we give the pod reflector goroutine a chance to run.
-        time.Sleep(PodStoreSyncedPollPeriod)
-        glog.Infof("Waiting for pods controller to sync, requeuing ReplicaSet %v", key)
-        rsc.queue.Add(key)
-        return nil
-    }
-
-    obj, exists, err := rsc.rsStore.Indexer.GetByKey(key)
+    obj, exists, err := rsc.rsLister.Indexer.GetByKey(key)
if !exists {
glog.V(4).Infof("ReplicaSet has been deleted %v", key)
rsc.expectations.DeleteExpectations(key)
@@ -624,7 +571,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
if rsc.garbageCollectorEnabled {
// list all pods to include the pods that don't match the rs`s selector
// anymore but has the stale controller ref.
-        pods, err := rsc.podStore.Pods(rs.Namespace).List(labels.Everything())
+        pods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
if err != nil {
return err
}
@@ -659,7 +606,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
return aggregate
}
} else {
-        pods, err := rsc.podStore.Pods(rs.Namespace).List(selector)
+        pods, err := rsc.podLister.Pods(rs.Namespace).List(selector)
if err != nil {
return err
}
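The remaining hunks are mechanical renames (`rsStore`/`podStore` become `rsLister`/`podLister`), but the read path they describe is worth spelling out: every Get and List is served from the shared informer's indexed cache, never from the API server. A usage sketch, with illustrative namespace and name values:

```go
// Sketch: cache-backed reads through the listers this PR wires up.
// A Get miss means "not (yet) in the local cache"; no API round-trip occurs.
rs, err := rsc.rsLister.ReplicaSets("default").Get("frontend")
if err != nil {
    // handle cache miss, e.g. the object was deleted or not yet listed
}

// List filters the shared pod cache in memory; labels.Everything() matches all.
pods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
```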