Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make watch actually return an error when there's an error #1389

Merged
merged 2 commits into from
Sep 22, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 18 additions & 4 deletions pkg/client/cache/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cache

import (
"errors"
"fmt"
"reflect"
"time"
Expand Down Expand Up @@ -99,7 +100,10 @@ func (r *Reflector) listAndWatch() {
glog.Errorf("failed to watch %v: %v", r.expectedType, err)
return
}
r.watchHandler(w, &resourceVersion)
if err := r.watchHandler(w, &resourceVersion); err != nil {
glog.Errorf("failed to watch %v: %v", r.expectedType, err)
return
}
}
}

Expand All @@ -119,12 +123,13 @@ func (r *Reflector) syncWith(items []runtime.Object) error {
}

// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) {
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) error {
start := time.Now()
eventCount := 0
for {
event, ok := <-w.ResultChan()
if !ok {
glog.Errorf("unexpected watch close")
return
break
}
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
Expand All @@ -149,5 +154,14 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) {
glog.Errorf("unable to understand watch event %#v", event)
}
*resourceVersion = jsonBase.ResourceVersion() + 1
eventCount++
}

watchDuration := time.Now().Sub(start)
if watchDuration < 1*time.Second && eventCount == 0 {
glog.Errorf("unexpected watch close - watch lasted less than a second and no items received")
return errors.New("very short watch")
}
glog.Infof("unexpected watch close - %v total items received", eventCount)
return nil
}
19 changes: 18 additions & 1 deletion pkg/client/cache/reflector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ func (t *testLW) Watch(resourceVersion uint64) (watch.Interface, error) {
return t.WatchFunc(resourceVersion)
}

func TestReflector_watchHandlerError(t *testing.T) {
s := NewStore()
g := NewReflector(&testLW{}, &api.Pod{}, s)
fw := watch.NewFake()
go func() {
fw.Stop()
}()
var resumeRV uint64
err := g.watchHandler(fw, &resumeRV)
if err == nil {
t.Errorf("unexpected non-error")
}
}

func TestReflector_watchHandler(t *testing.T) {
s := NewStore()
g := NewReflector(&testLW{}, &api.Pod{}, s)
Expand All @@ -49,7 +63,10 @@ func TestReflector_watchHandler(t *testing.T) {
fw.Stop()
}()
var resumeRV uint64
g.watchHandler(fw, &resumeRV)
err := g.watchHandler(fw, &resumeRV)
if err != nil {
t.Errorf("unexpected error %v", err)
}

table := []struct {
ID string
Expand Down
42 changes: 30 additions & 12 deletions pkg/tools/etcd_tools_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package tools

import (
"sync"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface,
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) {
w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
return w, <-w.immediateError
}

// TransformFunc attempts to convert an object to another object for use with a watcher.
Expand All @@ -88,6 +89,11 @@ type etcdWatcher struct {
etcdStop chan bool
etcdCallEnded chan struct{}

// etcdWatch will send an error down this channel if the Watch fails.
// Otherwise, a nil will be sent down this channel watchWaitDuration
// after the watch starts.
immediateError chan error

outgoing chan watch.Event
userStop chan struct{}
stopped bool
Expand All @@ -97,31 +103,42 @@ type etcdWatcher struct {
emit func(watch.Event)
}

// watchWaitDuration is the amount of time to wait for an error from watch.
const watchWaitDuration = 100 * time.Millisecond

// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
etcdStop: make(chan bool),
etcdCallEnded: make(chan struct{}),
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
etcdStop: make(chan bool),
etcdCallEnded: make(chan struct{}),
immediateError: make(chan error),
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
}
w.emit = func(e watch.Event) { w.outgoing <- e }
go w.translate()
return w
}

// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine.
// as a goroutine. Will either send an error over w.immediateError if Watch fails, or in 100ms will
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
defer util.HandleCrash()
defer close(w.etcdCallEnded)
go func() {
// This is racy; assume that Watch will fail within 100ms if it is going to fail.
// It's still more useful than blocking until the first result shows up.
// Trying to detect the 401: watch window expired error.
<-time.After(watchWaitDuration)
w.immediateError <- nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we're just dancing around not adding an "error" result to the watch stream. Is it really simpler to eat a real error (at any time during a watch) than to just build in the possibility that errors can be returned by the watch interface?

I.e.:

chan <- watch.Event{
  Type: "ERROR",
  Object: &api.Status{
    ...
  },

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm convinced--if you have a preference for this, I can make it happen. Can I do it in a subsequent PR, though? This one at least prevents a nasty errorloop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

----- Original Message -----

func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string,
resourceVersion uint64) {
defer util.HandleCrash()
defer close(w.etcdCallEnded)

  • go func() {
  •   // This is racy; assume that Watch will fail within 100ms if it is going
    
    to fail.
  •   // It's still more useful than blocking until the first result shows up.
    
  •   // Trying to detect the 401: watch window expired error.
    
  •   <-time.After(watchWaitDuration)
    
  •   w.immediateError <- nil
    

Yeah, I'm convinced--if you have a preference for this, I can make it happen.
Can I do it in a subsequent PR, though? This one at least prevents a nasty
errorloop.

Absolutely.

}()
if resourceVersion == 0 {
latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
if !ok {
Expand All @@ -132,6 +149,7 @@ func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion u
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
if err != etcd.ErrWatchStoppedByUser {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key)
w.immediateError <- err
}
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/tools/etcd_tools_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,19 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
}
}

func TestWatchEtcdError(t *testing.T) {
codec := latest.Codec
fakeClient := NewFakeEtcdClient(t)
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
h := EtcdHelper{fakeClient, codec, versioner}

_, err := h.Watch("/some/key", 0)
if err == nil {
t.Fatalf("Unexpected non-error")
}
}

func TestWatch(t *testing.T) {
codec := latest.Codec
fakeClient := NewFakeEtcdClient(t)
Expand Down
5 changes: 5 additions & 0 deletions pkg/tools/fake_etcd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type FakeEtcdClient struct {
// Write to this to prematurely stop a Watch that is running in a goroutine.
WatchInjectError chan<- error
WatchStop chan<- bool
// If non-nil, will be returned immediately when Watch is called.
WatchImmediateError error
}

func NewFakeEtcdClient(t TestLogger) *FakeEtcdClient {
Expand Down Expand Up @@ -250,6 +252,9 @@ func (f *FakeEtcdClient) WaitForWatchCompletion() {
}

func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
if f.WatchImmediateError != nil {
return nil, f.WatchImmediateError
}
f.WatchResponse = receiver
f.WatchStop = stop
f.WatchIndex = waitIndex
Expand Down