convert deployment controller to shared informers #34349

Merged
2 changes: 1 addition & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
@@ -415,7 +415,7 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl

if containsResource(resources, "deployments") {
glog.Infof("Starting deployment controller")
go deployment.NewDeploymentController(client("deployment-controller"), ResyncPeriod(s)).
go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("deployment-controller")).
Run(int(s.ConcurrentDeploymentSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
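For reference, a minimal sketch of how the shared-informer wiring in StartControllers fits together. The factory construction and the "shared-informers" client name here are assumptions; this hunk only shows the deployment controller consuming the shared informers, and the Start call is taken from the factory API used in the test changes below.

	// Sketch only, assuming the informers package API used elsewhere in this PR:
	// one factory is built up front, each controller receives typed informers
	// from it, and the factory is started once so all controllers share caches.
	sharedInformers := informers.NewSharedInformerFactory(client("shared-informers"), ResyncPeriod(s)())

	go deployment.NewDeploymentController(sharedInformers.Deployments(), sharedInformers.ReplicaSets(), sharedInformers.Pods(), client("deployment-controller")).
		Run(int(s.ConcurrentDeploymentSyncs), wait.NeverStop)

	// Start must run after the controllers have registered their event handlers,
	// so they observe the initial list through those handlers.
	sharedInformers.Start(wait.NeverStop)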
132 changes: 33 additions & 99 deletions pkg/controller/deployment/deployment_controller.go
@@ -36,13 +36,12 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)

const (
@@ -67,34 +66,28 @@ type DeploymentController struct {
syncHandler func(dKey string) error

// A store of deployments, populated by the dController
dLister cache.StoreToDeploymentLister
// Watches changes to all deployments
dController *cache.Controller
dLister *cache.StoreToDeploymentLister
// A store of ReplicaSets, populated by the rsController
rsLister cache.StoreToReplicaSetLister
// Watches changes to all ReplicaSets
rsController *cache.Controller
rsLister *cache.StoreToReplicaSetLister
// A store of pods, populated by the podController
podLister cache.StoreToPodLister
// Watches changes to all pods
podController *cache.Controller
podLister *cache.StoreToPodLister

// dListerSynced returns true if the Deployment store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
dListerSynced func() bool
dListerSynced cache.InformerSynced
// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
rsListerSynced func() bool
rsListerSynced cache.InformerSynced
// podListerSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podListerSynced func() bool
podListerSynced cache.InformerSynced

// Deployments that need to be synced
queue workqueue.RateLimitingInterface
}

// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(client clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *DeploymentController {
func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer informers.ReplicaSetInformer, podInformer informers.PodInformer, client clientset.Interface) *DeploymentController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every client has moved to use the clientset.
@@ -109,85 +102,41 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
}

dc.dLister.Indexer, dc.dController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.client.Extensions().Deployments(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dc.client.Extensions().Deployments(api.NamespaceAll).Watch(options)
},
},
&extensions.Deployment{},
FullDeploymentResyncPeriod,
cache.ResourceEventHandlerFuncs{
AddFunc: dc.addDeploymentNotification,
UpdateFunc: dc.updateDeploymentNotification,
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: dc.deleteDeploymentNotification,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

dc.rsLister.Indexer, dc.rsController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.client.Extensions().ReplicaSets(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dc.client.Extensions().ReplicaSets(api.NamespaceAll).Watch(options)
},
},
&extensions.ReplicaSet{},
resyncPeriod(),
cache.ResourceEventHandlerFuncs{
AddFunc: dc.addReplicaSet,
UpdateFunc: dc.updateReplicaSet,
DeleteFunc: dc.deleteReplicaSet,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

dc.podLister.Indexer, dc.podController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.client.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return dc.client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
resyncPeriod(),
cache.ResourceEventHandlerFuncs{
AddFunc: dc.addPod,
UpdateFunc: dc.updatePod,
DeleteFunc: dc.deletePod,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addDeploymentNotification,
UpdateFunc: dc.updateDeploymentNotification,
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: dc.deleteDeploymentNotification,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addReplicaSet,
UpdateFunc: dc.updateReplicaSet,
DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addPod,
UpdateFunc: dc.updatePod,
DeleteFunc: dc.deletePod,
})

dc.syncHandler = dc.syncDeployment
dc.dListerSynced = dc.dController.HasSynced
dc.rsListerSynced = dc.rsController.HasSynced
dc.podListerSynced = dc.podController.HasSynced
dc.dLister = dInformer.Lister()
dc.rsLister = rsInformer.Lister()
dc.podLister = podInformer.Lister()
dc.dListerSynced = dInformer.Informer().HasSynced
dc.rsListerSynced = rsInformer.Informer().HasSynced
dc.podListerSynced = podInformer.Informer().HasSynced
return dc
}

// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dc.queue.ShutDown()

go dc.dController.Run(stopCh)
go dc.rsController.Run(stopCh)
go dc.podController.Run(stopCh)
glog.Infof("Starting deployment controller")

// Wait for the deployment, ReplicaSet, and pod caches to sync before starting any work in this controller.
ready := make(chan struct{})
go dc.waitForSyncedListers(ready, stopCh)
select {
case <-ready:
case <-stopCh:
if !cache.WaitForCacheSync(stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
return
}

Expand All @@ -197,21 +146,6 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {

<-stopCh
glog.Infof("Shutting down deployment controller")
dc.queue.ShutDown()
}

func (dc *DeploymentController) waitForSyncedListers(ready chan<- struct{}, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

for !dc.dListerSynced() || !dc.rsListerSynced() || !dc.podListerSynced() {
select {
case <-time.After(StoreSyncedPollPeriod):
case <-stopCh:
return
}
}

close(ready)
}

func (dc *DeploymentController) addDeploymentNotification(obj interface{}) {
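Reassembled from the hunks above, Run now reads roughly as follows. This is a sketch: the worker loop is not part of this diff, so dc.worker and its one-second polling interval are assumptions.

	// In pkg/controller/deployment/deployment_controller.go (sketch; imports as in the file).
	func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
		defer utilruntime.HandleCrash()
		defer dc.queue.ShutDown()

		glog.Infof("Starting deployment controller")

		// Block until all three shared caches have synced; bail out early if
		// stopCh closes first.
		if !cache.WaitForCacheSync(stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
			return
		}

		for i := 0; i < workers; i++ {
			go wait.Until(dc.worker, time.Second, stopCh) // dc.worker is assumed: it drains dc.queue
		}

		<-stopCh
		glog.Infof("Shutting down deployment controller")
	}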
33 changes: 28 additions & 5 deletions pkg/controller/deployment/deployment_controller_test.go
@@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/uuid"
@@ -168,8 +169,10 @@ func newFixture(t *testing.T) *fixture {

func (f *fixture) run(deploymentName string) {
f.client = fake.NewSimpleClientset(f.objects...)
c := NewDeploymentController(f.client, controller.NoResyncPeriodFunc)
informers := informers.NewSharedInformerFactory(f.client, controller.NoResyncPeriodFunc())
c := NewDeploymentController(informers.Deployments(), informers.ReplicaSets(), informers.Pods(), f.client)
c.eventRecorder = &record.FakeRecorder{}
c.dListerSynced = alwaysReady
c.rsListerSynced = alwaysReady
c.podListerSynced = alwaysReady
for _, d := range f.dLister {
@@ -181,20 +184,35 @@ func (f *fixture) run(deploymentName string) {
for _, pod := range f.podLister {
c.podLister.Indexer.Add(pod)
}
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)

err := c.syncDeployment(deploymentName)
if err != nil {
f.t.Errorf("error syncing deployment: %v", err)
}

actions := f.client.Actions()
informerActions := 0
for i, action := range actions {
if len(f.actions) < i+1 {
if len(action.GetNamespace()) == 0 &&
(action.Matches("list", "pods") ||
action.Matches("list", "replicasets") ||
action.Matches("list", "deployments") ||
action.Matches("watch", "pods") ||
action.Matches("watch", "replicasets") ||
action.Matches("watch", "deployments")) {
informerActions++
continue
}

if len(f.actions)+informerActions < i+1 {
f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), actions[i:])
break
}

expectedAction := f.actions[i]
expectedAction := f.actions[i-informerActions]
if !expectedAction.Matches(action.GetVerb(), action.GetResource().Resource) {
f.t.Errorf("Expected\n\t%#v\ngot\n\t%#v", expectedAction, action)
continue
Expand Down Expand Up @@ -236,12 +254,17 @@ func TestSyncDeploymentDontDoAnythingDuringDeletion(t *testing.T) {
// issue: https://github.com/kubernetes/kubernetes/issues/23218
func TestDeploymentController_dontSyncDeploymentsWithEmptyPodSelector(t *testing.T) {
fake := &fake.Clientset{}
controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc)

informers := informers.NewSharedInformerFactory(fake, controller.NoResyncPeriodFunc())
controller := NewDeploymentController(informers.Deployments(), informers.ReplicaSets(), informers.Pods(), fake)
controller.eventRecorder = &record.FakeRecorder{}
controller.dListerSynced = alwaysReady
controller.rsListerSynced = alwaysReady
controller.podListerSynced = alwaysReady

stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)

d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
empty := unversioned.LabelSelector{}
d.Spec.Selector = &empty
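The namespace-empty list/watch filtering above could be factored into a helper; a hypothetical sketch (filterInformerActions is not part of this PR):

	// Hypothetical helper, not in this PR: drop the cluster-scoped list/watch
	// actions that the shared informers issue on Start, so the test only
	// compares the actions the controller itself produced.
	func filterInformerActions(actions []core.Action) []core.Action {
		ret := []core.Action{}
		for _, action := range actions {
			if len(action.GetNamespace()) == 0 &&
				(action.Matches("list", "pods") ||
					action.Matches("list", "replicasets") ||
					action.Matches("list", "deployments") ||
					action.Matches("watch", "pods") ||
					action.Matches("watch", "replicasets") ||
					action.Matches("watch", "deployments")) {
				continue
			}
			ret = append(ret, action)
		}
		return ret
	}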
9 changes: 8 additions & 1 deletion pkg/controller/deployment/sync_test.go
@@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/util/intstr"
)

@@ -346,15 +347,21 @@ func TestDeploymentController_cleanupDeployment(t *testing.T) {
for i := range tests {
test := tests[i]
fake := &fake.Clientset{}
controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc)
informers := informers.NewSharedInformerFactory(fake, controller.NoResyncPeriodFunc())
controller := NewDeploymentController(informers.Deployments(), informers.ReplicaSets(), informers.Pods(), fake)

controller.eventRecorder = &record.FakeRecorder{}
controller.dListerSynced = alwaysReady
controller.rsListerSynced = alwaysReady
controller.podListerSynced = alwaysReady
for _, rs := range test.oldRSs {
controller.rsLister.Indexer.Add(rs)
}

stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)

d := newDeployment("foo", 1, &test.revisionHistoryLimit, nil, nil, map[string]string{"foo": "bar"})
controller.cleanupDeployment(test.oldRSs, d)

5 changes: 4 additions & 1 deletion pkg/controller/informers/extensions.go
@@ -18,6 +18,7 @@ package informers

import (
"reflect"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
@@ -98,7 +99,9 @@ func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
},
},
&extensions.Deployment{},
f.defaultResync,
// TODO: remove this. It is hardcoded so that the "Waiting for the second deployment to clear overlapping annotation" step
// in the "overlapping deployment should not fight with each other" e2e test keeps working, since that test requires a full resync.
30*time.Second,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer
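Put together with the unchanged list/watch plumbing (visible in the code this PR removes from the deployment controller), the informer construction now reads roughly like the sketch below; the f.client field name is an assumption taken from the informers package context.

	// Sketch of the deployment informer in pkg/controller/informers/extensions.go after this hunk.
	informer = cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return f.client.Extensions().Deployments(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return f.client.Extensions().Deployments(api.NamespaceAll).Watch(options)
			},
		},
		&extensions.Deployment{},
		// Hardcoded instead of f.defaultResync; see the TODO above.
		30*time.Second,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)
	f.informers[informerType] = informer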