Don't block etcd client #10427

Merged: 1 commit, Jun 29, 2015
27 changes: 20 additions & 7 deletions pkg/tools/etcd_helper_watch.go
@@ -156,13 +156,22 @@ const watchWaitDuration = 100 * time.Millisecond
 // and a versioner, the versioner must be able to handle the objects that transform creates.
 func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
 	w := &etcdWatcher{
-		encoding:     encoding,
-		versioner:    versioner,
-		transform:    transform,
-		list:         list,
-		include:      include,
-		filter:       filter,
-		etcdIncoming: make(chan *etcd.Response),
+		encoding:  encoding,
+		versioner: versioner,
+		transform: transform,
+		list:      list,
+		include:   include,
+		filter:    filter,
+		// Buffer this channel, so that the etcd client is not forced
+		// to context switch with every object it gets, and so that a
+		// long time spent decoding an object won't block the *next*
+		// object. Basically, we see a lot of "401 window exceeded"
+		// errors from etcd, and that's due to the client not streaming
+		// results but rather getting them one at a time. So we really
+		// want to never block the etcd client, if possible. The 50 is
+		// arbitrary; there's a V(4) log message that prints the length
+		// so we can monitor how much of this buffer is actually used.
+		etcdIncoming: make(chan *etcd.Response, 50),
 		etcdError:    make(chan error, 1),
 		etcdStop:     make(chan bool),
 		outgoing:     make(chan watch.Event),
@@ -250,6 +259,10 @@ func (w *etcdWatcher) translate() {
 			return
 		case res, ok := <-w.etcdIncoming:
 			if ok {
+				if curLen := len(w.etcdIncoming); curLen > 0 {

This is going to cause a logging storm, which will further exacerbate the backlog. Don't you rather want this logging threshold to be greater than zero? Perhaps n/2 for a channel of length n? And you don't want to log more often than once a minute or so.

Member Author

Basically I want to know if it's ever > 0. If so, that justifies this patch; if not, this patch didn't fix anything.

I can adjust to reduce log spam, but the decoding process that is backing things up is much more resource intensive than a log message, so this is "only" a cosmetic problem.

I'd suggest basic edge triggering then. Log once when it crosses from zero to non-zero, and once when it crosses back again.

Member

@quinton-hoole: this is logged at V(4). By default we run at verbosity 2, so this message won't be logged.

Good point. Fair enough. LGTM

+					// Monitor if this gets backed up, and how much.
+					glog.V(4).Infof("watch: %v objects queued in channel.", curLen)
+				}
 				w.sendResult(res)
 			}
 			// If !ok, don't return here-- must wait for etcdError channel
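
For reference, the edge-triggered logging suggested in the review thread was not adopted in this PR, but it could look roughly like the sketch below. `backlogMonitor` and `observe` are hypothetical names introduced for illustration, and `fmt` stands in for `glog` so the example runs on its own. The idea is to emit one message when the queue length goes from zero to non-zero and one when it returns to zero.

```go
package main

import "fmt"

// backlogMonitor is a hypothetical helper (not part of the PR). It remembers
// whether the queue was non-empty at the previous observation.
type backlogMonitor struct {
	backedUp bool
}

// observe returns a message only when the queue length crosses between zero
// and non-zero. It returns "" while the state is unchanged.
func (m *backlogMonitor) observe(queued int) string {
	switch {
	case queued > 0 && !m.backedUp:
		m.backedUp = true
		return fmt.Sprintf("watch: channel backed up (%d objects queued)", queued)
	case queued == 0 && m.backedUp:
		m.backedUp = false
		return "watch: channel drained"
	}
	return ""
}

func main() {
	m := &backlogMonitor{}
	// Simulated values of len(w.etcdIncoming) over successive receives.
	for _, n := range []int{0, 3, 7, 2, 0, 0, 1} {
		if msg := m.observe(n); msg != "" {
			fmt.Println(msg)
		}
	}
}
```

The trade-off is that this reports when a backlog starts and ends but not how deep it gets, whereas the merged V(4) message prints the depth on every receive that finds the channel non-empty.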