Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move watch filter into storage level #2665

Merged
merged 1 commit into from
Dec 10, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/registry/controller/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package controller

import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

// Registry is an interface for things that know how to store ReplicationControllers.
type Registry interface {
ListControllers(ctx api.Context) (*api.ReplicationControllerList, error)
WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error)
WatchControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error)
GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error)
CreateController(ctx api.Context, controller *api.ReplicationController) error
UpdateController(ctx api.Context, controller *api.ReplicationController) error
Expand Down
36 changes: 1 addition & 35 deletions pkg/registry/controller/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,41 +146,7 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE
// Watch returns ReplicationController events via a watch.Interface.
// It implements apiserver.ResourceWatcher.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
if !field.Empty() {
return nil, fmt.Errorf("no field selector implemented for controllers")
}
incoming, err := rs.registry.WatchControllers(ctx, resourceVersion)
if err != nil {
return nil, err
}
// TODO(lavalamp): remove watch.Filter, which is broken. Implement consistent way of filtering.
// TODO(lavalamp): this watch method needs a test.
return watch.Filter(incoming, func(e watch.Event) (watch.Event, bool) {
controller, ok := e.Object.(*api.ReplicationController)
if !ok {
// must be an error event-- pass it on
return e, true
}
match := label.Matches(labels.Set(controller.Labels))
if match {
rs.fillCurrentState(ctx, controller)
}
return e, match
}), nil
}

func (rs *REST) waitForController(ctx api.Context, controller *api.ReplicationController) (runtime.Object, error) {
for {
pods, err := rs.podLister.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector())
if err != nil {
return controller, err
}
if len(pods.Items) == controller.Spec.Replicas {
break
}
time.Sleep(rs.pollPeriod)
}
return controller, nil
return rs.registry.WatchControllers(ctx, label, field, resourceVersion)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be more appropriate to fillCirrentState here via a filter instead of doing it in the registry. It's less clear of a case than pod status though, and having the registry do it (or via a wrapper to the registry) is also acceptable. I guess I'm fine with the inconsistency here for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've struggled a little bit, but didn't find a better alternative. I'm also fine with it for now.

}

func (rs *REST) fillCurrentState(ctx api.Context, controller *api.ReplicationController) error {
Expand Down
31 changes: 23 additions & 8 deletions pkg/registry/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,20 +122,20 @@ func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool
}

// WatchPods begins watching for new, changed, or deleted pods.
func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) {
func (r *Registry) WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
version, err := tools.ParseWatchResourceVersion(resourceVersion, "pod")
if err != nil {
return nil, err
}
key := makePodListKey(ctx)
return r.WatchList(key, version, func(obj runtime.Object) bool {
switch t := obj.(type) {
case *api.Pod:
return filter(t)
default:
// Must be an error
podObj, ok := obj.(*api.Pod)
if !ok {
// Must be an error: return true to propagate to upper level.
return true
}
fields := pod.PodToSelectableFields(podObj)
return label.Matches(labels.Set(podObj.Labels)) && field.Matches(fields)
})
}

Expand Down Expand Up @@ -327,13 +327,28 @@ func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerL
}

// WatchControllers begins watching for new, changed, or deleted controllers.
func (r *Registry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) {
func (r *Registry) WatchControllers(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
if !field.Empty() {
return nil, fmt.Errorf("field selectors are not supported on replication controllers")
}
version, err := tools.ParseWatchResourceVersion(resourceVersion, "replicationControllers")
if err != nil {
return nil, err
}
key := makeControllerListKey(ctx)
return r.WatchList(key, version, tools.Everything)
return r.WatchList(key, version, func(obj runtime.Object) bool {
controller, ok := obj.(*api.ReplicationController)
if !ok {
// Must be an error: return true to propagate to upper level.
return true
}
match := label.Matches(labels.Set(controller.Labels))
if match {
pods, _ := r.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector())
controller.Status.Replicas = len(pods.Items)
}
return match
})
}

// makeControllerListKey constructs etcd paths to controller directories enforcing namespace rules.
Expand Down
215 changes: 215 additions & 0 deletions pkg/registry/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
Expand Down Expand Up @@ -705,6 +706,112 @@ func TestEtcdListPods(t *testing.T) {
}
}

func TestEtcdWatchPods(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchPods(ctx,
labels.Everything(),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()

select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}

func TestEtcdWatchPodsMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchPods(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()

pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: map[string]string{
"name": "foo",
},
},
}
podBytes, _ := latest.Codec.Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(podBytes),
},
}
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Error("unexpected timeout from result channel")
}
watching.Stop()
}

func TestEtcdWatchPodsNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchPods(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()

pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Labels: map[string]string{
"name": "bar",
},
},
}
podBytes, _ := latest.Codec.Encode(pod)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(podBytes),
},
}

select {
case <-watching.ResultChan():
t.Error("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
}

func TestEtcdListControllersNotFound(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
ctx := api.NewDefaultContext()
Expand Down Expand Up @@ -934,6 +1041,114 @@ func TestEtcdUpdateController(t *testing.T) {
}
}

func TestEtcdWatchController(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchControllers(ctx,
labels.Everything(),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()

select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}

func TestEtcdWatchControllersMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.ExpectNotFoundGet(makePodListKey(ctx))
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchControllers(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()

controller := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: map[string]string{
"name": "foo",
},
},
}
controllerBytes, _ := latest.Codec.Encode(controller)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(controllerBytes),
},
}
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
t.Error("unexpected timeout from result channel")
}
watching.Stop()
}

func TestEtcdWatchControllersNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.ExpectNotFoundGet(makePodListKey(ctx))
registry := NewTestEtcdRegistry(fakeClient)
watching, err := registry.WatchControllers(ctx,
labels.SelectorFromSet(labels.Set{"name": "foo"}),
labels.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()

controller := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Labels: map[string]string{
"name": "bar",
},
},
}
controllerBytes, _ := latest.Codec.Encode(controller)
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(controllerBytes),
},
}

select {
case <-watching.ResultChan():
t.Error("unexpected result from result channel")
case <-time.After(time.Millisecond * 100):
// expected case
}
}

func TestEtcdListServices(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/pod/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Registry interface {
// ListPodsPredicate obtains a list of pods for which filter returns true.
ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool) (*api.PodList, error)
// Watch for new/changed/deleted pods
WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error)
WatchPods(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error)
// Get a specific pod
GetPod(ctx api.Context, podID string) (*api.Pod, error)
// Create a pod based on a specification.
Expand Down
7 changes: 4 additions & 3 deletions pkg/registry/pod/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
return pod, err
}

func (rs *REST) podToSelectableFields(pod *api.Pod) labels.Set {
func PodToSelectableFields(pod *api.Pod) labels.Set {

// TODO we are populating both Status and DesiredState because selectors are not aware of API versions
// see https://github.com/GoogleCloudPlatform/kubernetes/pull/2503
Expand All @@ -158,7 +158,7 @@ func (rs *REST) podToSelectableFields(pod *api.Pod) labels.Set {
// ListPods & WatchPods.
func (rs *REST) filterFunc(label, field labels.Selector) func(*api.Pod) bool {
return func(pod *api.Pod) bool {
fields := rs.podToSelectableFields(pod)
fields := PodToSelectableFields(pod)
return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields)
}
}
Expand All @@ -184,7 +184,8 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj

// Watch begins watching for new, changed, or deleted pods.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.WatchPods(ctx, resourceVersion, rs.filterFunc(label, field))
// TODO: Add pod status to watch command
return rs.registry.WatchPods(ctx, label, field, resourceVersion)
}

func (*REST) New() runtime.Object {
Expand Down