Skip to content

Commit

Permalink
Add apimachinery tools to deal with broken watches:
Browse files Browse the repository at this point in the history
 - watch.RetryWatcher
 - retry.RetryWatcher
 - informer.NewInformerWatcher
 - watch.UntilWithRetry
 - watch.UntilWithInformer
 - moves watch.Until to watch.UntilWithoutRetry with a big warning not to use it

 Remove cache.ListWatchUntil because there is no use for it anymore
  • Loading branch information
tnozicka committed Jul 31, 2018
1 parent 31982a3 commit a241de7
Show file tree
Hide file tree
Showing 14 changed files with 1,477 additions and 247 deletions.
40 changes: 0 additions & 40 deletions pkg/client/tests/listwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ import (
"net/http/httptest"
"net/url"
"testing"
"time"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -189,41 +187,3 @@ func (w lw) List(options metav1.ListOptions) (runtime.Object, error) {
func (w lw) Watch(options metav1.ListOptions) (watch.Interface, error) {
return w.watch, nil
}

func TestListWatchUntil(t *testing.T) {
fw := watch.NewFake()
go func() {
var obj *v1.Pod
fw.Modify(obj)
}()
listwatch := lw{
list: &v1.PodList{Items: []v1.Pod{{}}},
watch: fw,
}

conditions := []watch.ConditionFunc{
func(event watch.Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == watch.Added, nil
},
func(event watch.Event) (bool, error) {
t.Logf("got %#v", event)
return event.Type == watch.Modified, nil
},
}

timeout := 10 * time.Second
lastEvent, err := ListWatchUntil(timeout, listwatch, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != watch.Modified {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*v1.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}
9 changes: 8 additions & 1 deletion staging/src/k8s.io/apimachinery/pkg/watch/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,17 @@ var _ Interface = &Recorder{}
// NewRecorder wraps an Interface and records any changes sent across it.
func NewRecorder(w Interface) *Recorder {
r := &Recorder{}
r.Interface = Filter(w, r.record)
r.SetInterface(w)
return r
}

// SetInterface sets the interface for the Recorder.
// Allows setting the Interface after the Recorder is created but before it'd be used.
// Not thread safe.
func (r *Recorder) SetInterface(w Interface) {
r.Interface = Filter(w, r.record)
}

// record is a FilterFunc and tracks each received event.
func (r *Recorder) record(in Event) (Event, bool) {
r.lock.Lock()
Expand Down
87 changes: 0 additions & 87 deletions staging/src/k8s.io/apimachinery/pkg/watch/until.go

This file was deleted.

46 changes: 46 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,49 @@ func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
}
}
}

// ProxyWatcher lets you wrap your channel in watch Interface. Threadsafe.
type ProxyWatcher struct {
result chan Event
stopCh chan struct{}
stopped bool
mutex sync.Mutex
}

var _ Interface = &ProxyWatcher{}

// NewProxyWatcher creates new ProxyWatcher by wrapping a channel
func NewProxyWatcher(ch chan Event) *ProxyWatcher {
return &ProxyWatcher{
result: ch,
stopCh: make(chan struct{}),
stopped: false,
}
}

// Stop implements Interface
func (pw *ProxyWatcher) Stop() {
pw.mutex.Lock()
defer pw.mutex.Unlock()
if !pw.stopped {
pw.stopped = true
close(pw.stopCh)
}
}

// Stopping returns true if Stop() has been called
func (pw *ProxyWatcher) Stopping() bool {
pw.mutex.Lock()
defer pw.mutex.Unlock()
return pw.stopped == true
}

// ResultChan implements Interface
func (pw *ProxyWatcher) ResultChan() <-chan Event {
return pw.result
}

// StopChan returns stop channel
func (pw *ProxyWatcher) StopChan() <-chan struct{} {
return pw.stopCh
}
38 changes: 38 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/watch/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package watch_test

import (
"reflect"
"testing"

"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -135,3 +136,40 @@ func TestEmpty(t *testing.T) {
t.Errorf("unexpected result channel result")
}
}

func TestProxyWatcher(t *testing.T) {
events := []Event{
{Added, testType("foo")},
{Modified, testType("qux")},
{Modified, testType("bar")},
{Deleted, testType("bar")},
{Error, testType("error: blah")},
}

ch := make(chan Event, len(events))
w := NewProxyWatcher(ch)

for _, e := range events {
ch <- e
}

for _, e := range events {
g := <-w.ResultChan()
if !reflect.DeepEqual(e, g) {
t.Errorf("Expected %#v, got %#v", e, g)
continue
}
}

w.Stop()

select {
// Closed channel always reads immediately
case <-w.StopChan():
default:
t.Error("Channel isn't closed")
}

// Test double close
w.Stop()
}
101 changes: 14 additions & 87 deletions staging/src/k8s.io/client-go/tools/cache/listwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,34 @@ package cache

import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/pager"
)

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// Lister is any object that knows how to perform an initial list.
type Lister interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
}

// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
Lister
Watcher
}

// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

Expand Down Expand Up @@ -93,95 +100,15 @@ func NewFilteredListWatchFromClient(c Getter, resource string, namespace string,
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

func timeoutFromListOptions(options metav1.ListOptions) time.Duration {
if options.TimeoutSeconds != nil {
return time.Duration(*options.TimeoutSeconds) * time.Second
}
return 0
}

// List a set of apiserver resources
// List lists a set of apiserver resources
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
if !lw.DisableChunking {
return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)
}
return lw.ListFunc(options)
}

// Watch a set of apiserver resources
// Watch watches a set of apiserver resources
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.WatchFunc(options)
}

// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
if len(conditions) == 0 {
return nil, nil
}

list, err := lw.List(metav1.ListOptions{})
if err != nil {
return nil, err
}
initialItems, err := meta.ExtractList(list)
if err != nil {
return nil, err
}

// use the initial items as simulated "adds"
var lastEvent *watch.Event
currIndex := 0
passedConditions := 0
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
continue
}
}

ConditionSucceeded:
for currIndex < len(initialItems) {
lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
currIndex++

done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
break ConditionSucceeded
}
}
}
if passedConditions == len(conditions) {
return lastEvent, nil
}
remainingConditions := conditions[passedConditions:]

metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
currResourceVersion := metaObj.GetResourceVersion()

watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion})
if err != nil {
return nil, err
}

evt, err := watch.Until(timeout, watchInterface, remainingConditions...)
if err == watch.ErrWatchClosed {
// present a consistent error interface to callers
err = wait.ErrWaitTimeout
}
return evt, err
}

0 comments on commit a241de7

Please sign in to comment.