Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add namespace index for cache #23795

Merged
merged 2 commits into from
May 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
eventBroadcaster.StartRecordingToSink(cl.Events(""))
scheduler.New(schedulerConfig).Run()

podInformer := informers.CreateSharedPodInformer(clientset, controller.NoResyncPeriodFunc())
podInformer := informers.CreateSharedPodIndexInformer(clientset, controller.NoResyncPeriodFunc())

// ensure the service endpoints are sync'd several times within the window that the integration tests wait
go endpointcontroller.NewEndpointController(podInformer, clientset).
Expand Down
4 changes: 2 additions & 2 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ func Run(s *options.CMServer) error {
}

func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error {
podInformer := informers.CreateSharedPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)())
informers := map[reflect.Type]framework.SharedInformer{}
podInformer := informers.CreateSharedPodIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)())
informers := map[reflect.Type]framework.SharedIndexInformer{}
informers[reflect.TypeOf(&api.Pod{})] = podInformer

go endpointcontroller.NewEndpointController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))).
Expand Down
3 changes: 2 additions & 1 deletion contrib/mesos/pkg/service/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController {
},
)

e.podStore.Store, e.podController = framework.NewInformer(
e.podStore.Indexer, e.podController = framework.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return e.client.Core().Pods(api.NamespaceAll).List(options)
Expand All @@ -92,6 +92,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController {
UpdateFunc: e.updatePod,
DeleteFunc: e.deletePod,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
return e
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/client/cache/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Indexer interface {
ListIndexFuncValues(indexName string) []string
// ByIndex lists object that match on the named indexing function with the exact key
ByIndex(indexName, indexKey string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers
}

// IndexFunc knows how to provide an indexed value for an object.
Expand All @@ -53,6 +55,10 @@ func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc {
}
}

const (
NamespaceIndex string = "namespace"
)

// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
Expand Down
84 changes: 64 additions & 20 deletions pkg/client/cache/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
// l := StoreToPodLister{s}
// l.List()
type StoreToPodLister struct {
Store
Indexer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a really good idea. I like it a lot.

}

// Please note that selector is filtering among the pods that have gotten into
Expand All @@ -54,7 +54,7 @@ func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err
// s.Pods(api.NamespaceAll).List(selector), however then we'd have to
// remake the list.Items as a []*api.Pod. So leave this separate for
// now.
for _, m := range s.Store.List() {
for _, m := range s.Indexer.List() {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
Expand All @@ -65,11 +65,11 @@ func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err

// Pods is taking baby steps to be more like the api in pkg/client
func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer {
return storePodsNamespacer{s.Store, namespace}
return storePodsNamespacer{s.Indexer, namespace}
}

type storePodsNamespacer struct {
store Store
indexer Indexer
namespace string
}

Expand All @@ -78,20 +78,41 @@ type storePodsNamespacer struct {
// that.
func (s storePodsNamespacer) List(selector labels.Selector) (pods api.PodList, err error) {
list := api.PodList{}
for _, m := range s.store.List() {
pod := m.(*api.Pod)
if s.namespace == api.NamespaceAll || s.namespace == pod.Namespace {
if s.namespace == api.NamespaceAll {
for _, m := range s.indexer.List() {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
list.Items = append(list.Items, *pod)
}
}
return list, nil
}

key := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
items, err := s.indexer.Index(NamespaceIndex, key)
if err != nil {
glog.Warningf("can not retrieve list of objects using index : %v", err)
for _, m := range s.indexer.List() {
pod := m.(*api.Pod)
if s.namespace == pod.Namespace && selector.Matches(labels.Set(pod.Labels)) {
list.Items = append(list.Items, *pod)
}
}
return list, err
}

for _, m := range items {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
list.Items = append(list.Items, *pod)
}
}
return list, nil
}

// Exists returns true if a pod matching the namespace/name of the given pod exists in the store.
func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) {
_, exists, err := s.Store.Get(pod)
_, exists, err := s.Indexer.Get(pod)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -143,12 +164,12 @@ func (s storeToNodeConditionLister) List() (nodes api.NodeList, err error) {

// StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers.
type StoreToReplicationControllerLister struct {
Store
Indexer
}

// Exists checks if the given rc exists in the store.
func (s *StoreToReplicationControllerLister) Exists(controller *api.ReplicationController) (bool, error) {
_, exists, err := s.Store.Get(controller)
_, exists, err := s.Indexer.Get(controller)
if err != nil {
return false, err
}
Expand All @@ -158,29 +179,49 @@ func (s *StoreToReplicationControllerLister) Exists(controller *api.ReplicationC
// StoreToReplicationControllerLister lists all controllers in the store.
// TODO: converge on the interface in pkg/client
func (s *StoreToReplicationControllerLister) List() (controllers []api.ReplicationController, err error) {
for _, c := range s.Store.List() {
for _, c := range s.Indexer.List() {
controllers = append(controllers, *(c.(*api.ReplicationController)))
}
return controllers, nil
}

func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer {
return storeReplicationControllersNamespacer{s.Store, namespace}
return storeReplicationControllersNamespacer{s.Indexer, namespace}
}

type storeReplicationControllersNamespacer struct {
store Store
indexer Indexer
namespace string
}

func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (controllers []api.ReplicationController, err error) {
for _, c := range s.store.List() {
rc := *(c.(*api.ReplicationController))
if s.namespace == api.NamespaceAll || s.namespace == rc.Namespace {
if s.namespace == api.NamespaceAll {
for _, m := range s.indexer.List() {
rc := *(m.(*api.ReplicationController))
if selector.Matches(labels.Set(rc.Labels)) {
controllers = append(controllers, rc)
}
}
return
}

key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
items, err := s.indexer.Index(NamespaceIndex, key)
if err != nil {
glog.Warningf("can not retrieve list of objects using index : %v", err)
for _, m := range s.indexer.List() {
rc := *(m.(*api.ReplicationController))
if s.namespace == rc.Namespace && selector.Matches(labels.Set(rc.Labels)) {
controllers = append(controllers, rc)
}
}
return
}
for _, m := range items {
rc := *(m.(*api.ReplicationController))
if selector.Matches(labels.Set(rc.Labels)) {
controllers = append(controllers, rc)
}
}
return
}
Expand All @@ -195,11 +236,14 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co
return
}

for _, m := range s.Store.List() {
key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}}
items, err := s.Indexer.Index(NamespaceIndex, key)
if err != nil {
return
}

for _, m := range items {
rc = *m.(*api.ReplicationController)
if rc.Namespace != pod.Namespace {
continue
}
labelSet := labels.Set(rc.Spec.Selector)
selector = labels.Set(rc.Spec.Selector).AsSelector()

Expand Down
4 changes: 2 additions & 2 deletions pkg/client/cache/listers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestStoreToNodeConditionLister(t *testing.T) {
}

func TestStoreToReplicationControllerLister(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc)
store := NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
lister := StoreToReplicationControllerLister{store}
testCases := []struct {
inRCs []*api.ReplicationController
Expand Down Expand Up @@ -645,7 +645,7 @@ func TestStoreToJobLister(t *testing.T) {
}

func TestStoreToPodLister(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc)
store := NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
ids := []string{"foo", "bar", "baz"}
for _, id := range ids {
store.Add(&api.Pod{
Expand Down
5 changes: 5 additions & 0 deletions pkg/client/cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ func (c *cache) ListKeys() []string {
return c.cacheStorage.ListKeys()
}

// GetIndexers returns the indexers of cache
func (c *cache) GetIndexers() Indexers {
return c.cacheStorage.GetIndexers()
}

// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/client/cache/thread_safe_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type ThreadSafeStore interface {
Index(indexName string, obj interface{}) ([]interface{}, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
}

// threadSafeMap implements ThreadSafeStore
Expand Down Expand Up @@ -187,6 +188,10 @@ func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
return names
}

func (c *threadSafeMap) GetIndexers() Indexers {
return c.indexers
}

// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error {
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/daemon/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type DaemonSetsController struct {
queue *workqueue.Type
}

func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
Expand Down Expand Up @@ -183,7 +183,7 @@ func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient cl
UpdateFunc: dsc.updatePod,
DeleteFunc: dsc.deletePod,
})
dsc.podStore.Store = podInformer.GetStore()
dsc.podStore.Indexer = podInformer.GetIndexer()
dsc.podController = podInformer.GetController()
dsc.podStoreSynced = podInformer.HasSynced

Expand All @@ -210,7 +210,7 @@ func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient cl
}

func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize)
dsc.internalPodInformer = podInformer

Expand Down Expand Up @@ -686,7 +686,7 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *exte
newPod.Spec.NodeName = node.Name
pods := []*api.Pod{newPod}

for _, m := range dsc.podStore.Store.List() {
for _, m := range dsc.podStore.Indexer.List() {
pod := m.(*api.Pod)
if pod.Spec.NodeName != node.Name {
continue
Expand Down
32 changes: 16 additions & 16 deletions pkg/controller/daemon/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,10 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) {
func TestDealsWithExistingPods(t *testing.T) {
manager, podControl := newTestController()
addNodes(manager.nodeStore.Store, 0, 5, nil)
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Store, "node-2", simpleDaemonSetLabel, 2)
addPods(manager.podStore.Store, "node-3", simpleDaemonSetLabel, 5)
addPods(manager.podStore.Store, "node-4", simpleDaemonSetLabel2, 2)
addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Indexer, "node-2", simpleDaemonSetLabel, 2)
addPods(manager.podStore.Indexer, "node-3", simpleDaemonSetLabel, 5)
addPods(manager.podStore.Indexer, "node-4", simpleDaemonSetLabel2, 2)
ds := newDaemonSet("foo")
manager.dsStore.Add(ds)
syncAndValidateDaemonSets(t, manager, ds, podControl, 2, 5)
Expand All @@ -444,10 +444,10 @@ func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) {
manager, podControl := newTestController()
addNodes(manager.nodeStore.Store, 0, 5, nil)
addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel)
addPods(manager.podStore.Store, "node-0", simpleDaemonSetLabel2, 2)
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 3)
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel2, 1)
addPods(manager.podStore.Store, "node-4", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Indexer, "node-0", simpleDaemonSetLabel2, 2)
addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel, 3)
addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel2, 1)
addPods(manager.podStore.Indexer, "node-4", simpleDaemonSetLabel, 1)
daemon := newDaemonSet("foo")
daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
manager.dsStore.Add(daemon)
Expand All @@ -459,14 +459,14 @@ func TestSelectorDaemonDealsWithExistingPods(t *testing.T) {
manager, podControl := newTestController()
addNodes(manager.nodeStore.Store, 0, 5, nil)
addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel)
addPods(manager.podStore.Store, "node-0", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 3)
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel2, 2)
addPods(manager.podStore.Store, "node-2", simpleDaemonSetLabel, 4)
addPods(manager.podStore.Store, "node-6", simpleDaemonSetLabel, 13)
addPods(manager.podStore.Store, "node-7", simpleDaemonSetLabel2, 4)
addPods(manager.podStore.Store, "node-9", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Store, "node-9", simpleDaemonSetLabel2, 1)
addPods(manager.podStore.Indexer, "node-0", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel, 3)
addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel2, 2)
addPods(manager.podStore.Indexer, "node-2", simpleDaemonSetLabel, 4)
addPods(manager.podStore.Indexer, "node-6", simpleDaemonSetLabel, 13)
addPods(manager.podStore.Indexer, "node-7", simpleDaemonSetLabel2, 4)
addPods(manager.podStore.Indexer, "node-9", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Indexer, "node-9", simpleDaemonSetLabel2, 1)
ds := newDaemonSet("foo")
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
manager.dsStore.Add(ds)
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/deployment/deployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
},
)

dc.podStore.Store, dc.podController = framework.NewInformer(
dc.podStore.Indexer, dc.podController = framework.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.client.Core().Pods(api.NamespaceAll).List(options)
Expand All @@ -160,6 +160,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
UpdateFunc: dc.updatePod,
DeleteFunc: dc.deletePod,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

dc.syncHandler = dc.syncDeployment
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/deployment/deployment_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func (f *fixture) run(deploymentName string) {
c.rsStore.Store.Add(rs)
}
for _, pod := range f.podStore {
c.podStore.Store.Add(pod)
c.podStore.Indexer.Add(pod)
}

err := c.syncDeployment(deploymentName)
Expand Down