stop etcd watcher when watch chan is closed #33003

Merged 1 commit on Sep 21, 2016
22 changes: 16 additions & 6 deletions pkg/storage/etcd3/watcher.go
@@ -99,7 +99,8 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
 }
 
 func (wc *watchChan) run() {
-	go wc.startWatching()
+	watchClosedCh := make(chan struct{})
+	go wc.startWatching(watchClosedCh)
 
 	var resultChanWG sync.WaitGroup
 	resultChanWG.Add(1)
@@ -108,7 +109,6 @@ func (wc *watchChan) run() {
 	select {
 	case err := <-wc.errChan:
 		if err == context.Canceled {
-			wc.cancel() // just in case
 			break
 		}
 		errResult := parseError(err)
@@ -119,10 +119,15 @@ func (wc *watchChan) run() {
 			case <-wc.ctx.Done(): // user has given up all results
 			}
 		}
-		wc.cancel()
-	case <-wc.ctx.Done():
+	case <-watchClosedCh:
+	case <-wc.ctx.Done(): // user cancel
 	}
-	// we need to wait until resultChan wouldn't be sent to anymore
+
+	// We use wc.ctx to reap all goroutines. Under whatever condition, we should stop them all.
+	// It's fine to double cancel.
+	wc.cancel()
+
+	// we need to wait until resultChan wouldn't be used anymore
 	resultChanWG.Wait()
 	close(wc.resultChan)
 }
@@ -157,7 +162,7 @@ func (wc *watchChan) sync() error {
 // startWatching does:
 // - get current objects if initialRev=0; set initialRev to current rev
 // - watch on given key and send events to process.
-func (wc *watchChan) startWatching() {
+func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
 	if wc.initialRev == 0 {
 		if err := wc.sync(); err != nil {
 			glog.Errorf("failed to sync with latest state: %v", err)
@@ -182,6 +187,11 @@ func (wc *watchChan) startWatching() {
 			wc.sendEvent(parseEvent(e))
 		}
 	}
+	// When we come to this point, it's only possible that client side ends the watch.
+	// e.g. cancel the context, close the client.
+	// If this watch chan is broken and context isn't cancelled, other goroutines will still hang.
+	// We should notify the main thread that this goroutine has exited.
+	close(watchClosedCh)
Member

Do this via defer if it should always happen. Right now there's a case where it isn't called, and I can't tell whether that's a bug or not.

Contributor Author

It shouldn't always happen. It should happen only if the watch chan is closed and no error was returned from it.

Contributor Author

@lavalamp
Any comment on this?
If not, can you remove "requested change"?

Member

It's incomprehensible to me why you'd want to only notify your caller under that condition. It seems the caller should be notified when this function exits for any reason. Please heavily comment or change. It's really confusing right now.

Member

does it do any harm to always close this?

Contributor Author

The answer is yes.
In the error case, we expect to return the error via ResultChan() before closing the chan. If we always close watchClosedCh, we no longer get that guarantee.

Member

ok, I still think this needs a lot of changes to be understandable, but I won't block this any more.

Member

+1.

 }
 
 // processEvent processes events from etcd watcher and sends results to resultChan.