Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kubelet watching only its own Node object #6349

Merged
merged 1 commit into from
Apr 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/api/v1beta1/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -1545,6 +1545,19 @@ func init() {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "Minion",
func(label, value string) (string, string, error) {
switch label {
case "name":
return "name", value, nil
default:
return "", "", fmt.Errorf("field label not supported: %s", label)
}
})
if err != nil {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "ReplicationController",
func(label, value string) (string, string, error) {
switch label {
Expand Down
13 changes: 13 additions & 0 deletions pkg/api/v1beta2/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -1470,6 +1470,19 @@ func init() {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta2", "Minion",
func(label, value string) (string, string, error) {
switch label {
case "name":
return "name", value, nil
default:
return "", "", fmt.Errorf("field label not supported: %s", label)
}
})
if err != nil {
// if one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta2", "ReplicationController",
func(label, value string) (string, string, error) {
switch label {
Expand Down
13 changes: 13 additions & 0 deletions pkg/api/v1beta3/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ func init() {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta3", "Minion",
func(label, value string) (string, string, error) {
switch label {
case "name":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewers: just name by itself is incorrect. The field labels should match the access you would do to get the field in the object. "name" in v1beta3 is stored in .metadata.name. This is an api error.

return label, value, nil
default:
return "", "", fmt.Errorf("field label not supported: %s", label)
}
})
if err != nil {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta3", "ReplicationController",
func(label, value string) (string, string, error) {
switch label {
Expand Down
9 changes: 3 additions & 6 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,17 +179,14 @@ func NewMainKubelet(
if kubeClient != nil {
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
// than an interface. There is no way to construct a list+watcher using resource name.
fieldSelector := fields.Set{"name": hostname}.AsSelector()
listWatch := &cache.ListWatch{
// TODO: currently, we are watching all nodes. To make it more efficient,
// we should be watching only a node with Name equal to kubelet's Hostname.
// To make it possible, we need to add field selector to ListFunc and WatchFunc,
// and selection by field needs to be implemented in WatchMinions function in pkg/registry/etcd.
ListFunc: func() (runtime.Object, error) {
// TODO: Use List() with fieldSelector when it is supported.
return kubeClient.Nodes().List()
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.Nodes().Watch(
labels.Everything(), fields.Everything(), resourceVersion)
return kubeClient.Nodes().Watch(labels.Everything(), fieldSelector, resourceVersion)
},
}
cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
Expand Down
20 changes: 17 additions & 3 deletions pkg/registry/generic/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ type Etcd struct {
// Called for Create/Update/Get/Delete
KeyFunc func(ctx api.Context, name string) (string, error)

// If field.Selector of Watch contains a label with such name, this will be
// translated to watching a single object (not all objects of that type).
WatchSingleFieldName string

// Called to get the name of an object
ObjectNameFunc func(obj runtime.Object) (string, error)

Expand Down Expand Up @@ -404,19 +408,29 @@ func (e *Etcd) finalizeDelete(obj runtime.Object, runHooks bool) (runtime.Object
}

// WatchPredicate starts a watch for the items that m matches.
// TODO: Detect if m references a single object instead of a list.
func (e *Etcd) Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion)
if value, found := field.RequiresExactMatch(e.WatchSingleFieldName); found && len(e.WatchSingleFieldName) > 0 {
key, err := e.KeyFunc(ctx, value)
if err != nil {
return nil, err
}
return e.watchPredicate(key, e.PredicateFunc(label, field), resourceVersion)
}
return e.watchPredicate(e.KeyRootFunc(ctx), e.PredicateFunc(label, field), resourceVersion)
}

// WatchPredicate starts a watch for the items that m matches.
// TODO: Detect if m references a single object instead of a list.
func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
return e.watchPredicate(e.KeyRootFunc(ctx), m, resourceVersion)
}

func (e *Etcd) watchPredicate(key string, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName)
if err != nil {
return nil, err
}
return e.Helper.WatchList(e.KeyRootFunc(ctx), version, func(obj runtime.Object) bool {
return e.Helper.WatchList(key, version, func(obj runtime.Object) bool {
matches, err := m.Matches(obj)
if err != nil {
glog.Errorf("unable to match watch: %v", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/registry/minion/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func NewStorage(h tools.EtcdHelper, connection client.ConnectionInfoGetter) *RES
KeyFunc: func(ctx api.Context, name string) (string, error) {
return prefix + "/" + name, nil
},
WatchSingleFieldName: "name",
ObjectNameFunc: func(obj runtime.Object) (string, error) {
return obj.(*api.Node).Name, nil
},
Expand Down
58 changes: 58 additions & 0 deletions pkg/registry/minion/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ import (
"github.com/coreos/go-etcd/etcd"
)

const (
PASS = iota
FAIL
)

type fakeConnectionInfoGetter struct {
}

Expand Down Expand Up @@ -342,6 +347,59 @@ func TestEtcdWatchNodesMatch(t *testing.T) {
watching.Stop()
}

func TestEtcdWatchNodesFields(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
node := validNewNode()
nodeBytes, _ := latest.Codec.Encode(node)

testFieldMap := map[int][]fields.Set{
PASS: {
{"name": "foo"},
},
FAIL: {
{"name": "bar"},
},
}

for _, singleWatchField := range []string{"", "name"} {
storage.WatchSingleFieldName = singleWatchField
for expectedResult, fieldSet := range testFieldMap {
for _, field := range fieldSet {
watching, err := storage.Watch(ctx,
labels.Everything(),
field.AsSelector(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
fakeClient.WatchResponse <- &etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(nodeBytes),
},
}
select {
case r, ok := <-watching.ResultChan():
if expectedResult == FAIL {
t.Errorf("unexpected result from channel %#v", r)
}
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
if expectedResult == PASS {
t.Errorf("unexpected timeout from result channel")
}
}
watching.Stop()
}
}
}
}

func TestEtcdWatchNodesNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
Expand Down
11 changes: 9 additions & 2 deletions pkg/registry/minion/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,22 @@ type ResourceGetter interface {
Get(api.Context, string) (runtime.Object, error)
}

// NodeToSelectableFields returns a label set that represents the object.
func NodeToSelectableFields(node *api.Node) labels.Set {
return labels.Set{
"name": node.Name,
}
}

// MatchNode returns a generic matcher for a given label and field selector.
func MatchNode(label labels.Selector, field fields.Selector) generic.Matcher {
return generic.MatcherFunc(func(obj runtime.Object) (bool, error) {
nodeObj, ok := obj.(*api.Node)
if !ok {
return false, fmt.Errorf("not a node")
}
// TODO: Add support for filtering based on field, once NodeStatus is defined.
return label.Matches(labels.Set(nodeObj.Labels)), nil
fields := NodeToSelectableFields(nodeObj)
return label.Matches(labels.Set(nodeObj.Labels)) && field.Matches(fields), nil
})
}

Expand Down