Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcd3/watcher: cancelling context shouldn't return error #24638

Merged
merged 1 commit into from
Apr 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 15 additions & 2 deletions pkg/storage/etcd3/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)

const (
Expand Down Expand Up @@ -78,6 +80,12 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bo
if recursive && !strings.HasSuffix(key, "/") {
key += "/"
}
wc := w.createWatchChan(ctx, key, rev, recursive, filter)
go wc.run()
return wc, nil
}

func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) *watchChan {
wc := &watchChan{
watcher: w,
key: key,
Expand All @@ -89,8 +97,7 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bo
errChan: make(chan error, 1),
}
wc.ctx, wc.cancel = context.WithCancel(ctx)
go wc.run()
return wc, nil
return wc
}

func (wc *watchChan) run() {
Expand Down Expand Up @@ -276,6 +283,12 @@ func parseError(err error) *watch.Event {
}

func (wc *watchChan) sendError(err error) {
// Context.canceled is an expected behavior.
// We should just stop all goroutines in watchChan without returning error.
// TODO: etcd client should return context.Canceled instead of grpc specific error.
if grpc.Code(err) == codes.Canceled || err == context.Canceled {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need grpc dependency?
We are still not using grpc anywhere in kubernetes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment.

Currently etcd client returns grpc error. It shouldn't though. We will fix it later by return "context.Canceled"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment explains the problem (which I understand) not why you are using grpc.

Currently etcd client returns grpc error. It shouldn't though. We will fix it later by return "context.Canceled"

ok - that makes sense.

return
}
select {
case wc.errChan <- err:
case <-wc.ctx.Done():
Expand Down
19 changes: 19 additions & 0 deletions pkg/storage/etcd3/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,25 @@ func TestWatchError(t *testing.T) {
testCheckResult(t, 0, watch.Error, w, nil)
}

func TestWatchContextCancel(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
canceledCtx, cancel := context.WithCancel(ctx)
cancel()
w := store.watcher.createWatchChan(canceledCtx, "/abc", 0, false, storage.Everything)
// When we do a client.Get with a canceled context, it will return error.
// Nonetheless, when we try to send it over internal errChan, we should detect
// it's context canceled and not send it.
err := w.sync()
w.ctx = ctx
w.sendError(err)
select {
case err := <-w.errChan:
t.Errorf("cancelling context shouldn't return any error. Err: %v", err)
default:
}
}

type testWatchStruct struct {
obj *api.Pod
expectEvent bool
Expand Down