core/config: refactor change dispatcher #4657
Conversation
//
// The channels to the main goroutine and to the listener goroutines have a size of 1 so typically
// methods and dispatches will return immediately. However a slow listener will cause the next event
// dispatch to block. This is the opposite behavior from Manager.
For follow-up: this might be a great place to inject telemetry instrumentation, in order to automatically calculate delays, especially if we give human-readable names to subscribers.
internal/events/target.go
Outdated
t.cancel(errors.New("target closed"))
}

// Dispatch dispatches an event to any listeners.
Suggested change:
- // Dispatch dispatches an event to any listeners.
+ // Dispatch dispatches an event to all listeners.
internal/events/target.go
Outdated
type (
	// A Listener is a function that listens for events of type T.
	Listener[T any] func(T)
The contract with this function is that we allow the listener to run for some time; we do not expect it to return almost immediately, correct?
If that's the case, do we maybe want to add a context here for the sake of telemetry context propagation?
And also add the context to Dispatch(), so that if you're sending an event that's part of some operation in telemetry terms, you could collect spans from listeners as well?
We probably don't want a trace span that contains the entire execution stack of Pomerium, but sure, I can add context to more places.
Personally I don't understand how context should be used with this package. I suspect naive usage will result in unexpected behavior, like canceling the context in process A leading to process B stopping unexpectedly. The purpose of events was to de-couple systems; we've now re-coupled them.
go func() {
	for {
		select {
		case <-t.ctx.Done():
I think this case is missing a return, to stop the goroutine from looping.
select {
case <-t.ctx.Done():
case evt := <-ch:
	listener(evt.ctx, evt.event)
Do we also need some way to break out of the loop when ch is closed? It looks like calling RemoveListener() will result in the listener being called with a zero event value.
Summary
Refactor the config Change Dispatcher to use an events.Target. events.Target is a new events implementation that triggers events in separate goroutines so that config source subscribers can be called concurrently.
For example, if (1) generating envoyconfig takes 30 seconds, (2) rebuilding authorization takes 45 seconds and (3) updating XDS takes 60 seconds, previously we would have done 1+2+3 sequentially, resulting in 135 seconds total time. Now we do the authorization and XDS updates concurrently, 1+max(2,3), resulting in 90 seconds total time (assuming cores are available).
In addition, since updates are each handled in their own goroutine, the next call to 1 can happen while the previous calls to 2 and 3 are still completing, further reducing the total time taken: (1b-1a)+max(2,3).
Related issues
Checklist