Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add error type to watch. #1404

Merged
merged 4 commits into from
Sep 23, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/api/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
)

// statusError is an error intended for consumption by a REST API server.
Expand All @@ -38,6 +39,16 @@ func (e *statusError) Status() api.Status {
return e.status
}

// FromObject generates an statusError from an api.Status, if that is the type of obj; otherwise,
// returns an error created by fmt.Errorf.
func FromObject(obj runtime.Object) error {
switch t := obj.(type) {
case *api.Status:
return &statusError{*t}
}
return fmt.Errorf("unexpected object: %v", obj)
}

// NewNotFound returns a new error which indicates that the resource of the kind and the name was not found.
func NewNotFound(kind, name string) error {
return &statusError{api.Status{
Expand Down
21 changes: 21 additions & 0 deletions pkg/api/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
)

func TestErrorNew(t *testing.T) {
Expand Down Expand Up @@ -131,3 +132,23 @@ func Test_reasonForError(t *testing.T) {
t.Errorf("unexpected reason type: %#v", a)
}
}

type TestType struct{}

func (*TestType) IsAnAPIObject() {}

func TestFromObject(t *testing.T) {
table := []struct {
obj runtime.Object
message string
}{
{&api.Status{Message: "foobar"}, "foobar"},
{&TestType{}, "unexpected object: &{}"},
}

for _, item := range table {
if e, a := item.message, FromObject(item.obj).Error(); e != a {
t.Errorf("Expected %v, got %v", e, a)
}
}
}
8 changes: 6 additions & 2 deletions pkg/client/cache/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"reflect"
"time"

apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
Expand Down Expand Up @@ -101,7 +102,7 @@ func (r *Reflector) listAndWatch() {
return
}
if err := r.watchHandler(w, &resourceVersion); err != nil {
glog.Errorf("failed to watch %v: %v", r.expectedType, err)
glog.Errorf("watch of %v ended with error: %v", r.expectedType, err)
return
}
}
Expand Down Expand Up @@ -131,6 +132,9 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) err
if !ok {
break
}
if event.Type == watch.Error {
return apierrs.FromObject(event.Object)
}
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
continue
Expand Down Expand Up @@ -162,6 +166,6 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) err
glog.Errorf("unexpected watch close - watch lasted less than a second and no items received")
return errors.New("very short watch")
}
glog.Infof("unexpected watch close - %v total items received", eventCount)
glog.Infof("watch close - %v total items received", eventCount)
return nil
}
12 changes: 6 additions & 6 deletions pkg/kubelet/config/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,18 @@ func NewSourceEtcd(key string, client tools.EtcdClient, updates chan<- interface
}

func (s *SourceEtcd) run() {
watching, err := s.helper.Watch(s.key, 0)
if err != nil {
glog.Errorf("Failed to initialize etcd watch: %v", err)
return
}
watching := s.helper.Watch(s.key, 0)
for {
select {
case event, ok := <-watching.ResultChan():
if !ok {
return
}

if event.Type == watch.Error {
glog.Errorf("Watch error: %v", event.Object)
watching.Stop()
return
}
pods, err := eventToPods(event)
if err != nil {
glog.Errorf("Failed to parse result from etcd watch: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/registry/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion u
return nil, fmt.Errorf("label selectors are not supported on services")
}
if value, found := field.RequiresExactMatch("ID"); found {
return r.Watch(makeServiceKey(value), resourceVersion)
return r.Watch(makeServiceKey(value), resourceVersion), nil
}
if field.Empty() {
return r.WatchList("/registry/services/specs", resourceVersion, tools.Everything)
Expand Down Expand Up @@ -375,7 +375,7 @@ func (r *Registry) WatchEndpoints(label, field labels.Selector, resourceVersion
return nil, fmt.Errorf("label selectors are not supported on endpoints")
}
if value, found := field.RequiresExactMatch("ID"); found {
return r.Watch(makeServiceEndpointsKey(value), resourceVersion)
return r.Watch(makeServiceEndpointsKey(value), resourceVersion), nil
}
if field.Empty() {
return r.WatchList("/registry/services/endpoints", resourceVersion, tools.Everything)
Expand Down
86 changes: 42 additions & 44 deletions pkg/tools/etcd_tools_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
Expand Down Expand Up @@ -48,7 +49,8 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter

// Watch begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface.
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) {
// Errors will be sent down the channel.
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) watch.Interface {
return h.WatchAndTransform(key, resourceVersion, nil)
}

Expand All @@ -67,10 +69,11 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface,
// return value, nil
// })
//
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) (watch.Interface, error) {
// Errors will be sent down the channel.
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
w := newEtcdWatcher(false, Everything, h.Codec, h.ResourceVersioner, transform)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, <-w.immediateError
return w
}

// TransformFunc attempts to convert an object to another object for use with a watcher.
Expand All @@ -86,14 +89,10 @@ type etcdWatcher struct {
filter FilterFunc

etcdIncoming chan *etcd.Response
etcdError chan error
etcdStop chan bool
etcdCallEnded chan struct{}

// etcdWatch will send an error down this channel if the Watch fails.
// Otherwise, a nil will be sent down this channel watchWaitDuration
// after the watch starts.
immediateError chan error

outgoing chan watch.Event
userStop chan struct{}
stopped bool
Expand All @@ -110,64 +109,53 @@ const watchWaitDuration = 100 * time.Millisecond
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, filter FilterFunc, encoding runtime.Codec, versioner runtime.ResourceVersioner, transform TransformFunc) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
etcdStop: make(chan bool),
etcdCallEnded: make(chan struct{}),
immediateError: make(chan error),
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
filter: filter,
etcdIncoming: make(chan *etcd.Response),
etcdError: make(chan error, 1),
etcdStop: make(chan bool),
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
}
w.emit = func(e watch.Event) { w.outgoing <- e }
go w.translate()
return w
}

// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine. Will either send an error over w.immediateError if Watch fails, or in 100ms will
// as a goroutine.
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
defer util.HandleCrash()
defer close(w.etcdCallEnded)
go func() {
// This is racy; assume that Watch will fail within 100ms if it is going to fail.
// It's still more useful than blocking until the first result shows up.
// Trying to detect the 401: watch window expired error.
<-time.After(watchWaitDuration)
w.immediateError <- nil
}()
defer close(w.etcdError)
if resourceVersion == 0 {
latest, ok := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
if !ok {
latest, err := etcdGetInitialWatchState(client, key, w.list, w.etcdIncoming)
if err != nil {
w.etcdError <- err
return
}
resourceVersion = latest + 1
}
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
if err != etcd.ErrWatchStoppedByUser {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, key)
w.immediateError <- err
if err != nil && err != etcd.ErrWatchStoppedByUser {
w.etcdError <- err
}
}

// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, success bool) {
success = true

func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
resp, err := client.Get(key, false, recursive)
if err != nil {
if !IsEtcdNotFound(err) {
glog.Errorf("watch was unable to retrieve the current index for the provided key: %v (%#v)", err, key)
success = false
return
return resourceVersion, err
}
if index, ok := etcdErrorIndex(err); ok {
resourceVersion = index
}
return
return resourceVersion, nil
}
resourceVersion = resp.EtcdIndex
convertRecursiveResponse(resp.Node, resp, incoming)
Expand All @@ -189,24 +177,34 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming
incoming <- &copied
}

// translate pulls stuff from etcd, convert, and push out the outgoing channel. Meant to be
// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
// called as a goroutine.
func (w *etcdWatcher) translate() {
defer close(w.outgoing)
defer util.HandleCrash()

for {
select {
case <-w.etcdCallEnded:
case err := <-w.etcdError:
if err != nil {
w.emit(watch.Event{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set a ResourceVersion on the object for the client to read?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so-- we want to force them to re-list, don't we?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you're right. Ignore that

On Sep 22, 2014, at 8:49 PM, Daniel Smith notifications@github.com wrote:

In pkg/tools/etcd_tools_watch.go:

@@ -197,16 +185,26 @@ func (w *etcdWatcher) translate() {

for {
    select {
  •   case <-w.etcdCallEnded:
    
  •   case err := <-w.etcdError:
    
  •       if err != nil {
    
  •           w.emit(watch.Event{
    
    I don't think so-- we want to force them to re-list, don't we?


Reply to this email directly or view it on GitHub.

watch.Error,
&api.Status{
Status: api.StatusFailure,
Message: err.Error(),
},
})
}
return
case <-w.userStop:
w.etcdStop <- true
return
case res, ok := <-w.etcdIncoming:
if !ok {
return
if ok {
w.sendResult(res)
}
w.sendResult(res)
// If !ok, don't return here-- must wait for etcdError channel
// to give an error or be closed.
}
}
}
Expand Down