Extend watch queue with a timeout and size limit #2285
Conversation
Force-pushed 813ed1a to 14f3120.
The vendoring error is something weird that came up through colliding merges that both vendored different things. We fixed it earlier today on master, so if you rebase, it should resolve that problem. I'm uncertain whether
watch/queue/queue.go
Outdated
    "github.com/docker/go-events"
    )

    func ErrQueueFullCloseFailed(err error) error {
This seems to be unused. If it's meant for external users, it seems like a strange function to expose.
watch/queue/queue.go
Outdated
    	return fmt.Errorf("The queue size reached its limit but couldn't be closed: %s", err)
    }

    var ErrQueueFull = fmt.Errorf("The queue size reached its limit and was closed")
How about `queue closed due to size limit`?
watch/sinks.go
Outdated
    events "github.com/docker/go-events"
    )

    var ErrSinkTimeout = fmt.Errorf("Timeout exceeded, tearing down sink")
Lowercase `timeout` in the message.
watch/sinks.go
Outdated
    select {
    case err := <-errChan:
    	return err
    case <-time.After(s.timeout):
I'd suggest using `time.NewTimer` explicitly here instead of `time.After`, so that the timer can be stopped in the common case that the write succeeds before the timer fires. There's no functional difference, but it's a little more optimal from a resource management perspective.
watch/watch.go
Outdated
    }

    // NewQueueWithOpts creates a new Queue using the full set of options.
    func NewQueueWithOpts(opts QueueOpts) *Queue {
To me this is begging for functional options: https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
watch/watch.go
Outdated
    // WatchWithCtx returns a channel where all items published to the queue will
    // be received. The channel will be closed when the provided context is
    // cancelled.
    func (q *Queue) WatchWithCtx(ctx context.Context) (eventq chan events.Event) {
grpc has a variant of `Dial` which takes a context that they call `DialContext`, so `WatchContext`?
I think we could have something like this in go-events. We may want to back this with a buffered channel, depending on whether or not the runtime does lazy allocation for buffered channels.
watch/watch.go
Outdated
    	q.mu.Unlock()

    	if cancelFunc != nil {
    		cancelFunc()
    	}
    }

    outChan := make(chan events.Event)
    go func() {
I'd rather avoid an extra goroutine for every watch. This will be a large number for swarmkit, and it's already quite hard to read stack dumps. Either it should be opt-in with a special variant of `CallbackWatch`, or we should just return the extra channels and let the caller implement this `select` if it wants to.
    	return nil
    }

    // Full returns a channel that is closed when the queue becomes full for the
What happens when it is no longer full?
Typically, "full" and "empty" act pretty racy for concurrent queues. When would this be used?
If we want to act on full, might want a clamping function.
The channel is closed when a Write causes the queue to reach its limit. The queue can stop being full afterwards, and this channel is not meant as a mechanism for viewing the current fullness state of the queue.
The main use case for this is to notify that at least one `Event` has been dropped, and then it's up to the listener to determine if any action should be taken.
In the case of docker events, all API server implementations can receive a `/events?since` parameter to backfill past events, and the events stream is expected to be reliable in some versions of the CLI (1.13 to 17.03). Therefore, when a slow listener fills up its queue, it's preferred to close their event stream entirely and have them re-establish it with an appropriate `since` parameter, rather than silently dropping events.
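The close-once notification described above can be sketched with plain channels. This is a hedged, single-goroutine illustration of the pattern, not the PR's actual `LimitQueue`; all names are hypothetical.

```go
package main

import "fmt"

// limitQueue is a minimal sketch of a bounded queue that closes its
// full channel exactly once, the first time a write would exceed the
// limit. Not goroutine-safe; the real queue guards this with a mutex.
type limitQueue struct {
	events     chan string
	full       chan struct{}
	fullClosed bool
}

func newLimitQueue(limit int) *limitQueue {
	return &limitQueue{
		events: make(chan string, limit),
		full:   make(chan struct{}),
	}
}

// Write enqueues ev, or signals fullness by closing the full channel
// and reports whether the event was accepted.
func (q *limitQueue) Write(ev string) bool {
	select {
	case q.events <- ev:
		return true
	default:
		if !q.fullClosed {
			q.fullClosed = true
			close(q.full)
		}
		return false
	}
}

func main() {
	q := newLimitQueue(2)
	for _, ev := range []string{"a", "b", "c"} {
		q.Write(ev) // third write overflows the limit of 2
	}
	select {
	case <-q.full:
		fmt.Println("at least one event was dropped; re-establish the stream")
	default:
		fmt.Println("no events dropped")
	}
}
```

A listener would select on the full channel alongside the event channel and tear down or backfill when it fires.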
Couldn't this be handled with a callback on each dropped message? This seems very fragile.
Force-pushed 14f3120 to 1bba22a.
Rebased off of master and added a commit which addresses all review comments up to this point.
    // debug log messages that may be confusing. It is possible that the queue
    // will try to write an event to its destination channel while the queue is
    // being removed from the broadcaster. Since the channel is closed before the
    // queue, there is a narrow window when this is possible. In some event-based
"some event-based systems"?
watch/sinks_test.go
Outdated
    for {
    	<-ch.C
    }
    }()
Looks like this test will leak a goroutine; is `ch.C` ever closed? If not, can you add another channel for this goroutine to `select` on so it will terminate when the test is finished?
watch/sinks_test.go
Outdated
    // Make sure that closing a sink closes the channel
    var errClose error
    go func() {
    	errClose = sink.Close()
Why is this in a separate goroutine?
watch/watch.go
Outdated
    cancelFuncs map[events.Sink]func()

    // closeOutChan indicates whether the watchers' channels should be closed
    // when a watcher queue reaches its limit or when
or when?
watch/watch.go
Outdated
    for _, option := range options {
    	err := option(q)
    	if err != nil {
    		logrus.Warnf("Failed to apply options to queue: %s", err)
I guess you don't want to change the signature of `NewQueue`, so how about a panic here instead?
watch/watch.go
Outdated
    }

    if q.closeOutChan && q.limit == 0 {
    	logrus.Warnf("Unable to create queue with zero size limit and closeOutChan")
Hmm, I know I suggested this, but looking at the code I'm not sure there's any reason it can't be supported. It would just inhibit the optimization of returning `ch.C` (which is fine). Am I missing anything?
@alexmavr Any plans to PR this to go-events? This really belongs there as a primitive.
@stevvooe
watch/watch.go
Outdated
    // NewTimeoutLimitQueue creates a queue with a size limit and a request write
    // timeout.
    func NewTimeoutLimitQueue(timeout time.Duration, limit uint64) *Queue {
Let's instead define functions like:

    func WithTimeout(timeout time.Duration) func(*Queue) error
    func WithLimit(limit uint64) func(*Queue) error
    func WithCloseOutChan(closeOutChan bool) func(*Queue) error

Then you can create a queue with something like:

    NewQueue(WithTimeout(30*time.Second), WithCloseOutChan(true))
It's a bit more flexible that way.
Please sign your commits following these rules:

    $ git clone -b "watch-extensions" git@github.com:alexmavr/swarmkit.git somewhere
    $ cd somewhere
    $ git rebase -i HEAD~842353884240
    # editor opens
    # change each 'pick' to 'edit'
    # save the file and quit
    $ git commit --amend -s --no-edit
    $ git rebase --continue # and repeat the amend for each commit
    $ git push -f

Amending updates the existing PR. You DO NOT need to open a new one.
Codecov Report

    @@            Coverage Diff             @@
    ##           master    #2285      +/-   ##
    ==========================================
    + Coverage   60.96%   61.04%   +0.08%
    ==========================================
      Files         126      128       +2
      Lines       20391    20531     +140
    ==========================================
    + Hits        12431    12534     +103
    - Misses       6589     6632      +43
    + Partials     1371     1365       -6
Force-pushed f043ff7 to 9ce0554.
So, the bounded queue model makes a lot of sense in go-events and I think we can do it with less code (maybe). I understand the time pressure, but I think if we are making a commitment to quality, we should try to land this in the right place, rather than defer to later. Once it is tested and in the code base, there will be little incentive to move it over.
watch/sinks_test.go
Outdated
    go func() {
    	errClose = sink.Close()
    }()
    errClose = sink.Close()
minor: `errClose := sink.Close()` instead of predeclaring
watch/sinks_test.go
Outdated
    <-ch.Done()
    require.NoError(errClose)

    // Close the leaking goroutine
    close(doneChan)
Minor: `defer close(doneChan)` right after the channel is created.
    // If a size of 0 is provided, the LimitQueue is considered limitless.
    type LimitQueue struct {
    	dst    events.Sink
    	events *list.List
I'm not sure how much time you have to make this work, but if you can instrument the entrance and exit of a regular queue, you can avoid having to replicate all of this logic.
    if !eq.fullClosed {
    	eq.fullClosed = true
    	close(eq.full)
    }
I think we can do a callback here. That would avoid spilling the internal channel manipulation outside of the internals.
Make sure to release the locks, then return `ErrQueueFull`. That will allow the writer an out-of-band notification. It also doesn't poison the queue.
I agree that a callback would be a better pattern here. Let's do it this way for the `go-events` followup.
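The drop-callback alternative being agreed on here can be sketched as follows. This is an illustrative pattern under assumed names (`dropQueue`, `onDrop`), not the go-events implementation.

```go
package main

import "fmt"

// dropQueue sketches the callback pattern: instead of closing an
// internal channel, the queue invokes a user-supplied callback for
// each event it has to drop, keeping channel manipulation internal.
type dropQueue struct {
	events chan string
	onDrop func(ev string)
}

func newDropQueue(limit int, onDrop func(string)) *dropQueue {
	return &dropQueue{
		events: make(chan string, limit),
		onDrop: onDrop,
	}
}

// Write enqueues ev; when the queue is full it calls onDrop instead.
// In real code the callback would run with no locks held, so the
// caller can notify out of band without poisoning the queue.
func (q *dropQueue) Write(ev string) {
	select {
	case q.events <- ev:
	default:
		q.onDrop(ev)
	}
}

func main() {
	dropped := 0
	q := newDropQueue(1, func(ev string) { dropped++ })
	q.Write("a")
	q.Write("b") // queue is full; this one hits the callback
	fmt.Println("dropped:", dropped)
}
```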
    }

    if err := eq.dst.Write(event); err != nil {
    	// TODO(aaronl): Dropping events could be bad depending
@aaronlehmann Did we not remove this error message?
It's suppressed by a wrapper sink in swarmkit. Discussion here: docker/go-events#11
watch/queue/queue_test.go
Outdated
    @@ -13,10 +14,13 @@ type mockSink struct {
    	closed   bool
    	holdChan chan struct{}
    	data     []events.Event
    	mutex    *sync.Mutex
just do `mutex sync.Mutex`; then it doesn't need to be initialized.
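The point of the suggestion is that `sync.Mutex`'s zero value is a ready-to-use unlocked mutex, so holding it by value removes the need for a constructor. A small sketch with an illustrative `mockSink` (not the test's actual type):

```go
package main

import (
	"fmt"
	"sync"
)

// mockSink holds its mutex by value: the zero value of sync.Mutex is
// an unlocked mutex, so `var s mockSink` is immediately usable with
// no initialization step.
type mockSink struct {
	mutex sync.Mutex
	data  []string
}

func (s *mockSink) Write(ev string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.data = append(s.data, ev)
}

func (s *mockSink) Len() int {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return len(s.data)
}

func main() {
	var s mockSink // no constructor needed
	s.Write("event")
	fmt.Println(s.Len())
}
```

A pointer field (`*sync.Mutex`) would instead panic with a nil dereference if a constructor forgot to set it.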
watch/queue/queue_test.go
Outdated
    }

    func TestLimitQueueNoLimit(t *testing.T) {
    	require := require.New(t)
That's cool; I didn't know about this feature.
watch/watch_test.go
Outdated
    go func() {
    	closed := false
    	for range events {
    		// After receiving the first event, block indefinitely
Comment is outdated
watch/watch_test.go
Outdated
    doneChan = make(chan struct{})
    go func() {
    	for !eventsClosed {
I think the race detector will consider this a data race. If we make it a channel that gets closed, it avoids the problem. I'd kind of like to unify this goroutine with the `select` below anyway. How about something like this:
    timeoutTimer := time.NewTimer(time.Minute)
    defer timeoutTimer.Stop()

    selectLoop:
    	for {
    		select {
    		case <-eventsClosed:
    			break selectLoop
    		case <-time.After(writerSleepDuration):
    			q.Publish("new event")
    		case <-timeoutTimer.C:
    			require.Fail("Timeout exceeded")
    		}
    	}
That's a great way to restructure this, thanks for the insight.
Didn't look into details, but `sync.Once` may help in certain areas...
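One place `sync.Once` could plausibly help is the close-once logic on the full channel: `once.Do` replaces the `fullClosed` flag and is safe to call from multiple goroutines. A hedged sketch with hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
)

// closeNotifier shows sync.Once standing in for a fullClosed bool:
// MarkFull may be called any number of times from any goroutine, and
// the channel is closed exactly once, with no extra flag bookkeeping.
type closeNotifier struct {
	once sync.Once
	full chan struct{}
}

func newCloseNotifier() *closeNotifier {
	return &closeNotifier{full: make(chan struct{})}
}

func (n *closeNotifier) MarkFull() {
	n.once.Do(func() { close(n.full) })
}

func (n *closeNotifier) Full() <-chan struct{} { return n.full }

func main() {
	n := newCloseNotifier()
	n.MarkFull()
	n.MarkFull() // safe: the channel is only closed once
	<-n.Full()   // returns immediately since the channel is closed
	fmt.Println("full signal received")
}
```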
watch/queue/queue_test.go
Outdated
    @@ -42,6 +48,10 @@ func (s *mockSink) Len() int {
    	return len(s.data)
    }

    func (s *mockSink) String() string {
Do you have to take the lock here to make the race detector happy?
Signed-off-by: Alex Mavrogiannis <alex.mavrogiannis@docker.com>
Force-pushed 8dbfddb to 2d31d73.
Addressed final comments and squashed.

LGTM

1 similar comment

LGTM
- moby/swarmkit#2266 (support for templating Node.Hostname in docker executor)
- moby/swarmkit#2281 (change restore action on objects to be update, not delete/create)
- moby/swarmkit#2285 (extend watch queue with timeout and size limit)
- moby/swarmkit#2253 (version-aware failure tracking in the scheduler)
- moby/swarmkit#2275 (update containerd and port executor to container client library)
- moby/swarmkit#2292 (rename some generic resources)
- moby/swarmkit#2300 (limit the size of the external CA response)
- moby/swarmkit#2301 (delete global tasks when the node running them is deleted)

Minor cleanups, dependency bumps, and vendoring:

- moby/swarmkit#2271
- moby/swarmkit#2279
- moby/swarmkit#2283
- moby/swarmkit#2282
- moby/swarmkit#2274
- moby/swarmkit#2296 (dependency bump of etcd, go-winio)

Signed-off-by: Ying Li <ying.li@docker.com>
Upstream-commit: 4509a00
Component: engine
This PR extends the `watch` package with the following items:

- `LimitQueue`. This is near-identical to the implementation of a Queue from https://github.com/docker/go-events/blob/master/queue.go, with the difference that the queue has an upper size limit. When that limit is reached, a channel is closed and it's up to the user to determine the desired behavior from there.
- `TimeoutSink`, which wraps another sink with a timeout. If the timeout is reached, the wrapped sink is closed and an error is returned to the writer.
- A `ChannelSinkGenerator` interface which can be used to configure the sink-chain that `watch.Queue` creates for each watcher.
- New constructors for `watch.Queue`: `NewTimeoutLimitQueue`, or the more generic `NewQueueWithOpts`. Edit: Feature replaced with a functional options constructor.

To avoid disruption to existing users, the existing `NewQueue` constructor will return the previous configuration of sinks.

Unit test times:

cc @aaronlehmann @nishanttotla

Signed-off-by: Alex Mavrogiannis alex.mavrogiannis@docker.com