Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ListWatch work with a ListFunc and WatchFunc #4453

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 25 additions & 21 deletions pkg/client/cache/listwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,37 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

// ListFunc knows how to list resources
type ListFunc func() (runtime.Object, error)

// WatchFunc knows how to watch resources
type WatchFunc func(resourceVersion string) (watch.Interface, error)

// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc. Client must not be nil.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
Client *client.Client
FieldSelector labels.Selector
Resource string
Namespace string
ListFunc ListFunc
WatchFunc WatchFunc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Format looks good to me. It just has the two member fields and they are aligned.

}

// ListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector
func NewListWatchFromClient(client *client.Client, resource string, namespace string, fieldSelector labels.Selector) *ListWatch {
listFunc := func() (runtime.Object, error) {
return client.Get().Namespace(namespace).Resource(resource).SelectorParam("fields", fieldSelector).Do().Get()
}
watchFunc := func(resourceVersion string) (watch.Interface, error) {
return client.Get().Prefix("watch").Namespace(namespace).Resource(resource).SelectorParam("fields", fieldSelector).Param("resourceVersion", resourceVersion).Watch()
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

// ListWatch knows how to list and watch a set of apiserver resources.
// List a set of apiserver resources
func (lw *ListWatch) List() (runtime.Object, error) {
return lw.Client.
Get().
Namespace(lw.Namespace).
Resource(lw.Resource).
SelectorParam("fields", lw.FieldSelector).
Do().
Get()
return lw.ListFunc()
}

// Watch a set of apiserver resources
func (lw *ListWatch) Watch(resourceVersion string) (watch.Interface, error) {
return lw.Client.
Get().
Prefix("watch").
Namespace(lw.Namespace).
Resource(lw.Resource).
SelectorParam("fields", lw.FieldSelector).
Param("resourceVersion", resourceVersion).
Watch()
return lw.WatchFunc(resourceVersion)
}
98 changes: 47 additions & 51 deletions pkg/client/cache/listwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,33 +72,31 @@ func buildLocation(resourcePath string, query url.Values) string {

func TestListWatchesCanList(t *testing.T) {
table := []struct {
location string
lw ListWatch
location string
resource string
namespace string
fieldSelector labels.Selector
}{
// Minion
{
location: buildLocation(buildResourcePath("", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, nil)),
lw: ListWatch{
FieldSelector: parseSelectorOrDie(""),
Resource: "minions",
},
location: buildLocation(buildResourcePath("", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, nil)),
resource: "minions",
namespace: api.NamespaceAll,
fieldSelector: parseSelectorOrDie(""),
},
// pod with "assigned" field selector.
{
location: buildLocation(buildResourcePath("", api.NamespaceAll, "pods"), buildQueryValues(api.NamespaceAll, url.Values{"fields": []string{"DesiredState.Host="}})),
lw: ListWatch{
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
Resource: "pods",
},
location: buildLocation(buildResourcePath("", api.NamespaceAll, "pods"), buildQueryValues(api.NamespaceAll, url.Values{"fields": []string{"DesiredState.Host="}})),
resource: "pods",
namespace: api.NamespaceAll,
fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
},
// pod in namespace "foo"
{
location: buildLocation(buildResourcePath("", "foo", "pods"), buildQueryValues("foo", url.Values{"fields": []string{"DesiredState.Host="}})),
lw: ListWatch{
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
Resource: "pods",
Namespace: "foo",
},
location: buildLocation(buildResourcePath("", "foo", "pods"), buildQueryValues("foo", url.Values{"fields": []string{"DesiredState.Host="}})),
resource: "pods",
namespace: "foo",
fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
},
}
for _, item := range table {
Expand All @@ -109,54 +107,52 @@ func TestListWatchesCanList(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
item.lw.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
lw := NewListWatchFromClient(client, item.resource, item.namespace, item.fieldSelector)
// This test merely tests that the correct request is made.
item.lw.List()
lw.List()
handler.ValidateRequest(t, item.location, "GET", nil)
}
}

func TestListWatchesCanWatch(t *testing.T) {
table := []struct {
rv string
location string
lw ListWatch
rv string
location string
resource string
namespace string
fieldSelector labels.Selector
}{
// Minion
{
location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, url.Values{"resourceVersion": []string{""}})),
rv: "",
lw: ListWatch{
FieldSelector: parseSelectorOrDie(""),
Resource: "minions",
},
location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, url.Values{"resourceVersion": []string{""}})),
rv: "",
resource: "minions",
namespace: api.NamespaceAll,
fieldSelector: parseSelectorOrDie(""),
},
{
location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, url.Values{"resourceVersion": []string{"42"}})),
rv: "42",
lw: ListWatch{
FieldSelector: parseSelectorOrDie(""),
Resource: "minions",
},
location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, url.Values{"resourceVersion": []string{"42"}})),
rv: "42",
resource: "minions",
namespace: api.NamespaceAll,
fieldSelector: parseSelectorOrDie(""),
},
// pod with "assigned" field selector.
{
location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "pods"), buildQueryValues(api.NamespaceAll, url.Values{"fields": []string{"DesiredState.Host="}, "resourceVersion": []string{"0"}})),
rv: "0",
lw: ListWatch{
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
Resource: "pods",
},
location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "pods"), buildQueryValues(api.NamespaceAll, url.Values{"fields": []string{"DesiredState.Host="}, "resourceVersion": []string{"0"}})),
rv: "0",
resource: "pods",
namespace: api.NamespaceAll,
fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
},
// pod with namespace foo and assigned field selector
{
location: buildLocation(buildResourcePath("watch", "foo", "pods"), buildQueryValues("foo", url.Values{"fields": []string{"DesiredState.Host="}, "resourceVersion": []string{"0"}})),
rv: "0",
lw: ListWatch{
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
Resource: "pods",
Namespace: "foo",
},
location: buildLocation(buildResourcePath("watch", "foo", "pods"), buildQueryValues("foo", url.Values{"fields": []string{"DesiredState.Host="}, "resourceVersion": []string{"0"}})),
rv: "0",
resource: "pods",
namespace: "foo",
fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
},
}

Expand All @@ -168,10 +164,10 @@ func TestListWatchesCanWatch(t *testing.T) {
}
server := httptest.NewServer(&handler)
defer server.Close()
item.lw.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})

client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
lw := NewListWatchFromClient(client, item.resource, item.namespace, item.fieldSelector)
// This test merely tests that the correct request is made.
item.lw.Watch(item.rv)
lw.Watch(item.rv)
handler.ValidateRequest(t, item.location, "GET", nil)
}
}
6 changes: 1 addition & 5 deletions pkg/kubelet/config/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@ import (

// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(client *client.Client, hostname string, updates chan<- interface{}) {
lw := &cache.ListWatch{
Client: client,
FieldSelector: labels.OneTermEqualSelector("Status.Host", hostname),
Resource: "pods",
}
lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, labels.OneTermEqualSelector("Status.Host", hostname))
newSourceApiserverFromLW(lw, updates)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewMainKubelet(

serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
if kubeClient != nil {
cache.NewReflector(&cache.ListWatch{kubeClient, labels.Everything(), "services", api.NamespaceAll}, &api.Service{}, serviceStore).Run()
cache.NewReflector(cache.NewListWatchFromClient(kubeClient, "services", api.NamespaceAll, labels.Everything()), &api.Service{}, serviceStore).Run()
}
serviceLister := &cache.StoreToServiceLister{serviceStore}

Expand Down
11 changes: 8 additions & 3 deletions plugin/pkg/admission/namespace/autoprovision/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

func init() {
Expand Down Expand Up @@ -79,9 +81,12 @@ func NewProvision(c client.Interface) admission.Interface {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
reflector := cache.NewReflector(
&cache.ListWatch{
Client: c.(*client.Client),
FieldSelector: labels.Everything(),
Resource: "namespaces",
ListFunc: func() (runtime.Object, error) {
return c.Namespaces().List(labels.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.Namespaces().Watch(labels.Everything(), labels.Everything(), resourceVersion)
},
},
&api.Namespace{},
store,
Expand Down
11 changes: 8 additions & 3 deletions plugin/pkg/admission/namespace/exists/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

func init() {
Expand Down Expand Up @@ -83,9 +85,12 @@ func NewExists(c client.Interface) admission.Interface {
// TODO: look into a list/watch that can work with client.Interface, maybe pass it a ListFunc and a WatchFunc
reflector := cache.NewReflector(
&cache.ListWatch{
Client: c.(*client.Client),
FieldSelector: labels.Everything(),
Resource: "namespaces",
ListFunc: func() (runtime.Object, error) {
return c.Namespaces().List(labels.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.Namespaces().Watch(labels.Everything(), labels.Everything(), resourceVersion)
},
},
&api.Namespace{},
store,
Expand Down
24 changes: 4 additions & 20 deletions plugin/pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
// createUnassignedPodLW returns a cache.ListWatch that finds all pods that need to be
// scheduled.
func (factory *ConfigFactory) createUnassignedPodLW() *cache.ListWatch {
return &cache.ListWatch{
Client: factory.Client,
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
Resource: "pods",
}
return cache.NewListWatchFromClient(factory.Client, "pods", api.NamespaceAll, labels.Set{"DesiredState.Host": ""}.AsSelector())
}

func parseSelectorOrDie(s string) labels.Selector {
Expand All @@ -162,20 +158,12 @@ func parseSelectorOrDie(s string) labels.Selector {
// already scheduled.
// TODO: return a ListerWatcher interface instead?
func (factory *ConfigFactory) createAssignedPodLW() *cache.ListWatch {
return &cache.ListWatch{
Client: factory.Client,
FieldSelector: parseSelectorOrDie("DesiredState.Host!="),
Resource: "pods",
}
return cache.NewListWatchFromClient(factory.Client, "pods", api.NamespaceAll, parseSelectorOrDie("DesiredState.Host!="))
}

// createMinionLW returns a cache.ListWatch that gets all changes to minions.
func (factory *ConfigFactory) createMinionLW() *cache.ListWatch {
return &cache.ListWatch{
Client: factory.Client,
FieldSelector: parseSelectorOrDie(""),
Resource: "minions",
}
return cache.NewListWatchFromClient(factory.Client, "minions", api.NamespaceAll, parseSelectorOrDie(""))
}

// pollMinions lists all minions and filter out unhealthy ones, then returns
Expand Down Expand Up @@ -215,11 +203,7 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) {

// createServiceLW returns a cache.ListWatch that gets all changes to services.
func (factory *ConfigFactory) createServiceLW() *cache.ListWatch {
return &cache.ListWatch{
Client: factory.Client,
FieldSelector: parseSelectorOrDie(""),
Resource: "services",
}
return cache.NewListWatchFromClient(factory.Client, "services", api.NamespaceAll, parseSelectorOrDie(""))
}

func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {
Expand Down