Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor change: move watch.ListWatchUntil to its own package #35673

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
76 changes: 76 additions & 0 deletions pkg/client/cache/listwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,22 @@ import (
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options api.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options api.ListOptions) (watch.Interface, error)
}

// ListFunc knows how to list resources
type ListFunc func(options api.ListOptions) (runtime.Object, error)

Expand Down Expand Up @@ -84,3 +94,69 @@ func (lw *ListWatch) List(options api.ListOptions) (runtime.Object, error) {
func (lw *ListWatch) Watch(options api.ListOptions) (watch.Interface, error) {
return lw.WatchFunc(options)
}

// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
if len(conditions) == 0 {
return nil, nil
}

list, err := lw.List(api.ListOptions{})
if err != nil {
return nil, err
}
initialItems, err := meta.ExtractList(list)
if err != nil {
return nil, err
}

// use the initial items as simulated "adds"
var lastEvent *watch.Event
currIndex := 0
passedConditions := 0
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
continue
}
}

ConditionSucceeded:
for currIndex < len(initialItems) {
lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
currIndex++

done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
break ConditionSucceeded
}
}
}
if passedConditions == len(conditions) {
return lastEvent, nil
}
remainingConditions := conditions[passedConditions:]

metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
currResourceVersion := metaObj.GetResourceVersion()

watchInterface, err := lw.Watch(api.ListOptions{ResourceVersion: currResourceVersion})
if err != nil {
return nil, err
}

return watch.Until(timeout, watchInterface, remainingConditions...)
}
54 changes: 54 additions & 0 deletions pkg/client/cache/listwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http/httptest"
"net/url"
"testing"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
Expand All @@ -28,7 +29,9 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/watch"
)

func parseSelectorOrDie(s string) fields.Selector {
Expand Down Expand Up @@ -171,3 +174,54 @@ func TestListWatchesCanWatch(t *testing.T) {
handler.ValidateRequest(t, item.location, "GET", nil)
}
}

type lw struct {
list runtime.Object
watch watch.Interface
}

func (w lw) List(options api.ListOptions) (runtime.Object, error) {
return w.list, nil
}

func (w lw) Watch(options api.ListOptions) (watch.Interface, error) {
return w.watch, nil
}

func TestListWatchUntil(t *testing.T) {
fw := watch.NewFake()
go func() {
var obj *api.Pod
fw.Modify(obj)
}()
listwatch := lw{
list: &api.PodList{Items: []api.Pod{{}}},
watch: fw,
}

conditions := []watch.ConditionFunc{
func(event watch.Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == watch.Added, nil
},
func(event watch.Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == watch.Modified, nil
},
}

timeout := 10 * time.Second
lastEvent, err := ListWatchUntil(timeout, listwatch, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != watch.Modified {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}
9 changes: 0 additions & 9 deletions pkg/client/cache/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,6 @@ import (
"k8s.io/kubernetes/pkg/watch"
)

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options api.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options api.ListOptions) (watch.Interface, error)
}

// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (b SAControllerClientBuilder) Config(name string) (*restclient.Config, erro
return b.CoreClient.Secrets(b.Namespace).Watch(options)
},
}
_, err = watch.ListWatchUntil(30*time.Second, lw,
_, err = cache.ListWatchUntil(30*time.Second, lw,
func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:
Expand Down
2 changes: 0 additions & 2 deletions pkg/watch/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/meta:go_default_library",
"//pkg/api/unversioned:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/util/net:go_default_library",
Expand Down
78 changes: 0 additions & 78 deletions pkg/watch/until.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ package watch
import (
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)

Expand Down Expand Up @@ -84,78 +81,3 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
}
return lastEvent, nil
}

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options api.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options api.ListOptions) (Interface, error)
}

// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...ConditionFunc) (*Event, error) {
if len(conditions) == 0 {
return nil, nil
}

list, err := lw.List(api.ListOptions{})
if err != nil {
return nil, err
}
initialItems, err := meta.ExtractList(list)
if err != nil {
return nil, err
}

// use the initial items as simulated "adds"
var lastEvent *Event
currIndex := 0
passedConditions := 0
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
continue
}
}

ConditionSucceeded:
for currIndex < len(initialItems) {
lastEvent = &Event{Type: Added, Object: initialItems[currIndex]}
currIndex++

done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
break ConditionSucceeded
}
}
}
if passedConditions == len(conditions) {
return lastEvent, nil
}
remainingConditions := conditions[passedConditions:]

metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
currResourceVersion := metaObj.GetResourceVersion()

watch, err := lw.Watch(api.ListOptions{ResourceVersion: currResourceVersion})
if err != nil {
return nil, err
}

return Until(timeout, watch, remainingConditions...)
}
52 changes: 0 additions & 52 deletions pkg/watch/until_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
)

Expand Down Expand Up @@ -163,54 +162,3 @@ func TestUntilErrorCondition(t *testing.T) {
t.Fatalf("expected %q in error string, got %q", expected, err.Error())
}
}

type lw struct {
list runtime.Object
watch Interface
}

func (w lw) List(options api.ListOptions) (runtime.Object, error) {
return w.list, nil
}

func (w lw) Watch(options api.ListOptions) (Interface, error) {
return w.watch, nil
}

func TestListWatchUntil(t *testing.T) {
fw := NewFake()
go func() {
var obj *api.Pod
fw.Modify(obj)
}()
listwatch := lw{
list: &api.PodList{Items: []api.Pod{{}}},
watch: fw,
}

conditions := []ConditionFunc{
func(event Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == Added, nil
},
func(event Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == Modified, nil
},
}

timeout := 10 * time.Second
lastEvent, err := ListWatchUntil(timeout, listwatch, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != Modified {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}
23 changes: 10 additions & 13 deletions test/test_owners.csv
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,13 @@ Opaque resources should account opaque integer resources in pods with multiple c
Opaque resources should not break pods that do not consume opaque integer resources.,ConnorDoyle,0
Opaque resources should not schedule pods that exceed the available amount of opaque integer resource.,ConnorDoyle,0
Opaque resources should schedule pods that do consume opaque integer resources.,ConnorDoyle,0
PersistentVolumes with Single PV - PVC pairs create a PV and a pre-bound PVC: test write access,roberthbailey,1
PersistentVolumes with Single PV - PVC pairs create a PVC and a pre-bound PV: test write access,piosz,1
PersistentVolumes with Single PV - PVC pairs create a PVC and non-pre-bound PV: test write access,sttts,1
PersistentVolumes with Single PV - PVC pairs should create a non-pre-bound PV and PVC: test write access,yujuhong,1
PersistentVolumes with multiple PVs and PVCs all in same ns should create 2 PVs and 4 PVCs: test write access,brendandburns,1
PersistentVolumes with multiple PVs and PVCs all in same ns should create 3 PVs and 3 PVCs: test write access,hurf,1
PersistentVolumes with multiple PVs and PVCs all in same ns should create 4 PVs and 2 PVCs: test write access,jlowdermilk,1
PersistentVolumes with Single PV - PVC pairs create a PV and a pre-bound PVC: test write access,caesarxuchao,1
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rmmh do you know what caused these changes? I didn't work on PersistenVolumes and this PR doesn't have anything to do with it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@caesarxuchao no, this is unexpected. What environment (OS, Go version, Python version) did you run update_owners on?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OSX. go 1.6.3. python 2.7.10.

PersistentVolumes with Single PV - PVC pairs create a PVC and a pre-bound PV: test write access,caesarxuchao,1
PersistentVolumes with Single PV - PVC pairs create a PVC and non-pre-bound PV: test write access,caesarxuchao,1
PersistentVolumes with Single PV - PVC pairs should create a non-pre-bound PV and PVC: test write access,caesarxuchao,1
PersistentVolumes with multiple PVs and PVCs all in same ns should create 2 PVs and 4 PVCs: test write access,caesarxuchao,1
PersistentVolumes with multiple PVs and PVCs all in same ns should create 3 PVs and 3 PVCs: test write access,caesarxuchao,1
PersistentVolumes with multiple PVs and PVCs all in same ns should create 4 PVs and 2 PVCs: test write access,caesarxuchao,1
Pet Store should scale to persist a nominal number ( * ) of transactions in * seconds,xiang90,1
Pet set recreate should recreate evicted statefulset,roberthbailey,1
"Pod Disks Should schedule a pod w/ a RW PD, gracefully remove it, then schedule it on another host",alex-mohr,1
Expand Down Expand Up @@ -453,10 +453,7 @@ k8s.io/kubernetes/cmd/kube-apiserver/app,nikhiljindal,0
k8s.io/kubernetes/cmd/kube-apiserver/app/options,nikhiljindal,0
k8s.io/kubernetes/cmd/kube-discovery/app,pmorie,1
k8s.io/kubernetes/cmd/kube-proxy/app,luxas,1
k8s.io/kubernetes/cmd/kubeadm/app/cmd,davidopp,1
k8s.io/kubernetes/cmd/kubeadm/app/images,saad-ali,1
k8s.io/kubernetes/cmd/kubeadm/app/util,eparis,1
k8s.io/kubernetes/cmd/kubeadm/app/cmd,vishh,1
k8s.io/kubernetes/cmd/kubeadm/app/cmd,caesarxuchao,1
k8s.io/kubernetes/cmd/kubeadm/app/images,davidopp,1
k8s.io/kubernetes/cmd/kubeadm/app/preflight,apprenda,0
k8s.io/kubernetes/cmd/kubeadm/app/util,krousey,1
Expand Down Expand Up @@ -631,9 +628,9 @@ k8s.io/kubernetes/pkg/kubelet/qos,vishh,0
k8s.io/kubernetes/pkg/kubelet/rkt,apelisse,1
k8s.io/kubernetes/pkg/kubelet/rktshim,mml,1
k8s.io/kubernetes/pkg/kubelet/server,timstclair,0
k8s.io/kubernetes/pkg/kubelet/server/portforward,saad-ali,1
k8s.io/kubernetes/pkg/kubelet/server/portforward,caesarxuchao,1
k8s.io/kubernetes/pkg/kubelet/server/stats,timstclair,0
k8s.io/kubernetes/pkg/kubelet/server/streaming,derekwaynecarr,1
k8s.io/kubernetes/pkg/kubelet/server/streaming,caesarxuchao,1
k8s.io/kubernetes/pkg/kubelet/status,mwielgus,1
k8s.io/kubernetes/pkg/kubelet/sysctl,piosz,1
k8s.io/kubernetes/pkg/kubelet/types,jlowdermilk,1
Expand Down