Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change factorization of listers to make them easier to add #32877

Merged
merged 1 commit into from Sep 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
117 changes: 40 additions & 77 deletions pkg/client/cache/listers.go
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
Expand All @@ -31,104 +32,66 @@ import (
"k8s.io/kubernetes/pkg/labels"
)

// TODO: generate these classes and methods for all resources of interest using
// a script. Can use "go generate" once 1.4 is supported by all users.

// StoreToPodLister makes a Store have the List method of the client.PodInterface
// The Store must contain (only) Pods.
//
// Example:
// s := cache.NewStore()
// lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"}
// r := cache.NewReflector(lw, &api.Pod{}, s).Run()
// l := StoreToPodLister{s}
// l.List()
type StoreToPodLister struct {
Indexer
}
type AppendFunc func(interface{})

// Please note that selector is filtering among the pods that have gotten into
// the store; there may have been some filtering that already happened before
// that.
// We explicitly don't return api.PodList, to avoid expensive allocations, which
// in most cases are unnecessary.
func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) {
for _, m := range s.Indexer.List() {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error {
for _, m := range store.List() {
metadata, err := meta.Accessor(m)
if err != nil {
return err
}
if selector.Matches(labels.Set(metadata.GetLabels())) {
appendFn(m)
}
}
return pods, nil
}

// Pods is taking baby steps to be more like the api in pkg/client
func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer {
return storePodsNamespacer{s.Indexer, namespace}
return nil
}

type storePodsNamespacer struct {
indexer Indexer
namespace string
}

// Please note that selector is filtering among the pods that have gotten into
// the store; there may have been some filtering that already happened before
// that.
// We explicitly don't return api.PodList, to avoid expensive allocations, which
// in most cases are unnecessary.
func (s storePodsNamespacer) List(selector labels.Selector) (pods []*api.Pod, err error) {
if s.namespace == api.NamespaceAll {
for _, m := range s.indexer.List() {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error {
if namespace == api.NamespaceAll {
for _, m := range indexer.List() {
metadata, err := meta.Accessor(m)
if err != nil {
return err
}
if selector.Matches(labels.Set(metadata.GetLabels())) {
appendFn(m)
}
}
return pods, nil
return nil
}

key := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
items, err := s.indexer.Index(NamespaceIndex, key)
items, err := indexer.Index(NamespaceIndex, api.ObjectMeta{Namespace: namespace})
if err != nil {
// Ignore error; do slow search without index.
glog.Warningf("can not retrieve list of objects using index : %v", err)
for _, m := range s.indexer.List() {
pod := m.(*api.Pod)
if s.namespace == pod.Namespace && selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
for _, m := range indexer.List() {
metadata, err := meta.Accessor(m)
if err != nil {
return err
}
if metadata.GetNamespace() == namespace && selector.Matches(labels.Set(metadata.GetLabels())) {
appendFn(m)
}

}
return pods, nil
return nil
}
for _, m := range items {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
metadata, err := meta.Accessor(m)
if err != nil {
return err
}
if selector.Matches(labels.Set(metadata.GetLabels())) {
appendFn(m)
}
}
return pods, nil
}

func (s storePodsNamespacer) Get(name string) (*api.Pod, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(api.Resource("pod"), name)
}
return obj.(*api.Pod), nil
return nil
}

// Exists returns true if a pod matching the namespace/name of the given pod exists in the store.
func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) {
_, exists, err := s.Indexer.Get(pod)
if err != nil {
return false, err
}
return exists, nil
}
// TODO: generate these classes and methods for all resources of interest using
// a script. Can use "go generate" once 1.4 is supported by all users.

// NodeConditionPredicate is a function that indicates whether the given node's conditions meet
// some set of criteria defined by the function.
Expand Down
73 changes: 73 additions & 0 deletions pkg/client/cache/listers_core.go
@@ -0,0 +1,73 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/labels"
)

// TODO: generate these classes and methods for all resources of interest using
// a script. Can use "go generate" once 1.4 is supported by all users.

// StoreToPodLister makes a Store have the List method of the client.PodInterface
// The Store must contain (only) Pods.
//
// Example:
// s := cache.NewStore()
// lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"}
// r := cache.NewReflector(lw, &api.Pod{}, s).Run()
// l := StoreToPodLister{s}
// l.List()
type StoreToPodLister struct {
Indexer Indexer
}

func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) {
err = ListAll(s.Indexer, selector, func(m interface{}) {
pods = append(pods, m.(*api.Pod))
})
return pods, err
}

func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer {
return storePodsNamespacer{Indexer: s.Indexer, namespace: namespace}
}

type storePodsNamespacer struct {
Indexer Indexer
namespace string
}

func (s storePodsNamespacer) List(selector labels.Selector) (pods []*api.Pod, err error) {
err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) {
pods = append(pods, m.(*api.Pod))
})
return pods, err
}

func (s storePodsNamespacer) Get(name string) (*api.Pod, error) {
obj, exists, err := s.Indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(api.Resource("pod"), name)
}
return obj.(*api.Pod), nil
}
17 changes: 6 additions & 11 deletions pkg/client/cache/listers_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
Expand Down Expand Up @@ -700,8 +701,9 @@ func TestStoreToPodLister(t *testing.T) {
for _, id := range ids {
store.Add(&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: id,
Labels: map[string]string{"name": id},
Namespace: "other",
Name: id,
Labels: map[string]string{"name": id},
},
})
}
Expand Down Expand Up @@ -739,20 +741,13 @@ func TestStoreToPodLister(t *testing.T) {
continue
}

exists, err := spl.Exists(&api.Pod{ObjectMeta: api.ObjectMeta{Name: id}})
_, err = spl.Pods("other").Get(id)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !exists {
t.Errorf("exists returned false for %v", id)
}
}

exists, err := spl.Exists(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "qux"}})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if exists {
if _, err := spl.Pods("").Get("qux"); !apierrors.IsNotFound(err) {
t.Error("Unexpected pod exists")
}
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/controller/daemon/daemoncontroller_test.go
Expand Up @@ -251,7 +251,7 @@ func TestInsufficentCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
node := newNode("too-much-mem", nil)
node.Status.Allocatable = allocatableResources("100M", "200m")
manager.nodeStore.Add(node)
manager.podStore.Add(&api.Pod{
manager.podStore.Indexer.Add(&api.Pod{
Spec: podSpec,
})
ds := newDaemonSet("foo")
Expand All @@ -266,7 +266,7 @@ func TestSufficentCapacityWithTerminatedPodsDaemonLaunchesPod(t *testing.T) {
node := newNode("too-much-mem", nil)
node.Status.Allocatable = allocatableResources("100M", "200m")
manager.nodeStore.Add(node)
manager.podStore.Add(&api.Pod{
manager.podStore.Indexer.Add(&api.Pod{
Spec: podSpec,
Status: api.PodStatus{Phase: api.PodSucceeded},
})
Expand All @@ -283,7 +283,7 @@ func TestSufficentCapacityNodeDaemonLaunchesPod(t *testing.T) {
node := newNode("not-too-much-mem", nil)
node.Status.Allocatable = allocatableResources("200M", "200m")
manager.nodeStore.Add(node)
manager.podStore.Add(&api.Pod{
manager.podStore.Indexer.Add(&api.Pod{
Spec: podSpec,
})
ds := newDaemonSet("foo")
Expand All @@ -299,7 +299,7 @@ func TestDontDoAnythingIfBeingDeleted(t *testing.T) {
node := newNode("not-too-much-mem", nil)
node.Status.Allocatable = allocatableResources("200M", "200m")
manager.nodeStore.Add(node)
manager.podStore.Add(&api.Pod{
manager.podStore.Indexer.Add(&api.Pod{
Spec: podSpec,
})
ds := newDaemonSet("foo")
Expand All @@ -323,7 +323,7 @@ func TestPortConflictNodeDaemonDoesNotLaunchPod(t *testing.T) {
manager, podControl := newTestController()
node := newNode("port-conflict", nil)
manager.nodeStore.Add(node)
manager.podStore.Add(&api.Pod{
manager.podStore.Indexer.Add(&api.Pod{
Spec: podSpec,
})

Expand All @@ -349,7 +349,7 @@ func TestPortConflictWithSameDaemonPodDoesNotDeletePod(t *testing.T) {
manager, podControl := newTestController()
node := newNode("port-conflict", nil)
manager.nodeStore.Add(node)
manager.podStore.Add(&api.Pod{
manager.podStore.Indexer.Add(&api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: simpleDaemonSetLabel,
Namespace: api.NamespaceDefault,
Expand Down Expand Up @@ -383,7 +383,7 @@ func TestNoPortConflictNodeDaemonLaunchesPod(t *testing.T) {
manager, podControl := newTestController()
node := newNode("no-port-conflict", nil)
manager.nodeStore.Add(node)
manager.podStore.Add(&api.Pod{
manager.podStore.Indexer.Add(&api.Pod{
Spec: podSpec1,
})
ds := newDaemonSet("foo")
Expand All @@ -399,7 +399,7 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) {
manager, podControl := newTestController()
manager.nodeStore.Store.Add(newNode("node1", nil))
// Create pod not controlled by a daemonset.
manager.podStore.Add(&api.Pod{
manager.podStore.Indexer.Add(&api.Pod{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{"bang": "boom"},
Namespace: api.NamespaceDefault,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/replicaset/replica_set_test.go
Expand Up @@ -779,7 +779,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// has exactly one expectation at the end, to verify that we
// don't double delete.
for i := range podsToDelete[1:] {
manager.podStore.Delete(podsToDelete[i])
manager.podStore.Indexer.Delete(podsToDelete[i])
manager.deletePod(podsToDelete[i])
}
podExp, exists, err := manager.expectations.GetExpectations(rsKey)
Expand Down