Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shared controller informers #23575

Merged
merged 2 commits into from
Apr 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 7 additions & 2 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/framework/informers"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
Expand Down Expand Up @@ -194,14 +195,18 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
eventBroadcaster.StartRecordingToSink(cl.Events(""))
scheduler.New(schedulerConfig).Run()

podInformer := informers.CreateSharedPodInformer(clientset, controller.NoResyncPeriodFunc())

// ensure the service endpoints are sync'd several times within the window that the integration tests wait
go endpointcontroller.NewEndpointController(clientset, controller.NoResyncPeriodFunc).
go endpointcontroller.NewEndpointController(podInformer, clientset).
Run(3, wait.NeverStop)

// TODO: Write an integration test for the replication controllers watch.
go replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096).
go replicationcontroller.NewReplicationManager(podInformer, clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096).
Run(3, wait.NeverStop)

go podInformer.Run(wait.NeverStop)

nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
nodeController.Run(5 * time.Second)
Expand Down
15 changes: 14 additions & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"net/http"
"net/http/pprof"
"os"
"reflect"
"strconv"
"time"

Expand All @@ -48,6 +49,8 @@ import (
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/controller/gc"
"k8s.io/kubernetes/pkg/controller/job"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
Expand Down Expand Up @@ -189,11 +192,16 @@ func Run(s *options.CMServer) error {
}

func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error {
go endpointcontroller.NewEndpointController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller")), ResyncPeriod(s)).
podInformer := informers.CreateSharedPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)())
informers := map[reflect.Type]framework.SharedInformer{}
informers[reflect.TypeOf(&api.Pod{})] = podInformer

go endpointcontroller.NewEndpointController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))).
Run(s.ConcurrentEndpointSyncs, wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

go replicationcontroller.NewReplicationManager(
podInformer,
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")),
ResyncPeriod(s),
replicationcontroller.BurstReplicas,
Expand Down Expand Up @@ -410,6 +418,11 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
).Run()
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

// run the shared informers
for _, informer := range informers {
go informer.Run(wait.NeverStop)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@deads2k I'm creating a new controller so I was looking into using a shared informer. Some questions:

  1. Are shared informers ready for wider use, or are they still a work in progress?
  2. All shared informers (including the podInformer) are started here (Run(...) is called) in the controllermanager, but I see that the individual controllers also start the pod informer passed into them manually (internalPodInformer.Run(...)), won't this result in the podInformer's Run(...) getting called twice?

Copy link
Contributor

@sjenning sjenning Apr 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@saad-ali the internalPodInformer field of the controllers is only set when calling the New_Resource_ControllerFromClient() functions. All of the Run() functions have a check to see if the internalPodInformer != nil before trying to start it.

Said another way, tests use New_Resource_ControllerFromClient() so that the Run() will start the informer. The regular New_Resource_Controller() function, don't set internalPodInformer, and the shared informer is started here.

Also, FYI I have PR #24841 open for converting some of the remaining controllers to using the shared pod informer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sjenning that makes perfect sense. Thanks for the clarification.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are shared informers ready for wider use, or are they still a work in progress?

They are ready for wider use. The initial transitions of controllers to a shared pod informer went very smoothly, so I don't expect any structural problems. Remember not to mess with the object in the cache, its shared.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have called the informer method that your register on
"AddMeAsLongAsIPromiseNotToMutateTheSharedCache"

On Thu, Apr 28, 2016 at 8:54 AM, David Eads notifications@github.com
wrote:

In cmd/kube-controller-manager/app/controllermanager.go
#23575 (comment)
:

@@ -410,6 +418,11 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
).Run()
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

  • // run the shared informers
  • for _, informer := range informers {
  •   go informer.Run(wait.NeverStop)
    

Are shared informers ready for wider use, or are they still a work in
progress?

They are ready for wider use. The initial transitions of controllers to a
shared pod informer went very smoothly, so I don't expect any structural
problems. Remember not to mess with the object in the cache, its shared.


You are receiving this because you were mentioned.
Reply to this email directly or view it on GitHub
https://github.com/kubernetes/kubernetes/pull/23575/files/f0c33d65b6fea84d791059d96c93a15e4b3de4ec#r61421456

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:-)
but I completely agree

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are ready for wider use. The initial transitions of controllers to a shared pod informer went very smoothly, so I don't expect any structural problems. Remember not to mess with the object in the cache, its shared.

Sounds good, thanks!

}

select {}
}

Expand Down
4 changes: 2 additions & 2 deletions contrib/mesos/pkg/controllermanager/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (s *CMServer) Run(_ []string) error {
endpoints := s.createEndpointController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller")))
go endpoints.Run(s.ConcurrentEndpointSyncs, wait.NeverStop)

go replicationcontroller.NewReplicationManager(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas, s.LookupCacheSizeForRC).
go replicationcontroller.NewReplicationManagerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas, s.LookupCacheSizeForRC).
Run(s.ConcurrentRCSyncs, wait.NeverStop)

if s.TerminatedPodGCThreshold > 0 {
Expand Down Expand Up @@ -346,7 +346,7 @@ func (s *CMServer) createEndpointController(client *clientset.Clientset) kmendpo
return kmendpoint.NewEndpointController(client)
}
glog.V(2).Infof("Creating podIP:containerPort endpoint controller")
stockEndpointController := endpointcontroller.NewEndpointController(client, s.resyncPeriod)
stockEndpointController := endpointcontroller.NewEndpointControllerFromClient(client, s.resyncPeriod)
return stockEndpointController
}

Expand Down
52 changes: 32 additions & 20 deletions pkg/controller/endpoint/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
Expand All @@ -58,7 +59,7 @@ var (
)

// NewEndpointController returns a new *EndpointController.
func NewEndpointController(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
func NewEndpointController(podInformer framework.SharedInformer, client *clientset.Clientset) *EndpointController {
e := &EndpointController{
client: client,
queue: workqueue.New(),
Expand All @@ -85,24 +86,23 @@ func NewEndpointController(client *clientset.Clientset, resyncPeriod controller.
},
)

e.podStore.Store, e.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return e.client.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return e.client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: e.addPod,
UpdateFunc: e.updatePod,
DeleteFunc: e.deletePod,
},
)
e.podStoreSynced = e.podController.HasSynced
podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: e.addPod,
UpdateFunc: e.updatePod,
DeleteFunc: e.deletePod,
})
e.podStore.Store = podInformer.GetStore()
e.podController = podInformer.GetController()
e.podStoreSynced = podInformer.HasSynced

return e
}

// NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer.
func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
podInformer := informers.CreateSharedPodInformer(client, resyncPeriod())
e := NewEndpointController(podInformer, client)
e.internalPodInformer = podInformer

return e
}
Expand All @@ -114,6 +114,13 @@ type EndpointController struct {
serviceStore cache.StoreToServiceLister
podStore cache.StoreToPodLister

// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewEndpointController(passing SharedInformer), this
// will be null
internalPodInformer framework.SharedInformer

// Services that need to be updated. A channel is inappropriate here,
// because it allows services with lots of pods to be serviced much
// more often than services with few pods; it also would cause a
Expand All @@ -124,7 +131,7 @@ type EndpointController struct {
// Since we join two objects, we'll watch both of them with
// controllers.
serviceController *framework.Controller
podController *framework.Controller
podController framework.ControllerInterface
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
Expand All @@ -144,6 +151,11 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
time.Sleep(5 * time.Minute) // give time for our cache to fill
e.checkLeftoverEndpoints()
}()

if e.internalPodInformer != nil {
go e.internalPodInformer.Run(stopCh)
}

<-stopCh
e.queue.ShutDown()
}
Expand Down
24 changes: 12 additions & 12 deletions pkg/controller/endpoint/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
endpoints.checkLeftoverEndpoints()

Expand Down Expand Up @@ -172,7 +172,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady

addPods(endpoints.podStore.Store, ns, 1, 1, 0)
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
Expand Down Expand Up @@ -256,7 +256,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
Expand Down Expand Up @@ -295,7 +295,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 0, 1, 1)
endpoints.serviceStore.Store.Add(&api.Service{
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 1)
endpoints.serviceStore.Store.Add(&api.Service{
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
Expand All @@ -440,7 +440,7 @@ func TestSyncEndpointsItems(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found!
Expand Down Expand Up @@ -484,7 +484,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
serviceLabels := map[string]string{"foo": "bar"}
Expand Down Expand Up @@ -546,7 +546,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
serviceLabels := map[string]string{"baz": "blah"}
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/framework/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ type Controller struct {
reflectorMutex sync.RWMutex
}

// TODO make the "Controller" private, and convert all references to use ControllerInterface instead
type ControllerInterface interface {
Run(stopCh <-chan struct{})
HasSynced() bool
}

// New makes a new Controller from the given Config.
func New(c *Config) *Controller {
ctlr := &Controller{
Expand Down
44 changes: 44 additions & 0 deletions pkg/controller/framework/informers/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informers

import (
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)

// CreateSharedPodInformer returns a SharedInformer that lists and watches all pods
func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedInformer {
sharedInformer := framework.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{}, resyncPeriod)

return sharedInformer
}