-
Notifications
You must be signed in to change notification settings - Fork 278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core/config: refactor change dispatcher #4657
Changes from 3 commits
ddb2b41
cf5b009
e18a480
2336612
d1350f0
328d2d5
1b03347
7ea4b6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,160 @@ | ||||||
package events | ||||||
|
||||||
import ( | ||||||
"context" | ||||||
"sync" | ||||||
|
||||||
"github.com/google/uuid" | ||||||
) | ||||||
|
||||||
type ( | ||||||
// A Listener is a function that listens for events of type T. | ||||||
Listener[T any] func(T) | ||||||
// A Handle represents a listener. | ||||||
Handle string | ||||||
|
||||||
addListenerEvent[T any] struct { | ||||||
listener Listener[T] | ||||||
handle Handle | ||||||
} | ||||||
removeListenerEvent[T any] struct { | ||||||
handle Handle | ||||||
} | ||||||
dispatchEvent[T any] struct { | ||||||
event T | ||||||
} | ||||||
) | ||||||
|
||||||
// A Target is a target for events. | ||||||
// | ||||||
// Listeners are added with AddListener with a function to be called when the event occurs. | ||||||
// AddListener returns a Handle which can be used to remove a listener with RemoveListener. | ||||||
// | ||||||
// Dispatch dispatches events to all the registered listeners. | ||||||
// | ||||||
// Target is safe to use in its zero state. | ||||||
// | ||||||
// The first time any method of Target is called a background goroutine is started that handles | ||||||
// any requests and maintains the state of the listeners. Each listener also starts a | ||||||
// separate goroutine so that all listeners can be invoked concurrently. | ||||||
// | ||||||
// The channels to the main goroutine and to the listener goroutines have a size of 1 so typically | ||||||
// methods and dispatches will return immediately. However a slow listener will cause the next event | ||||||
// dispatch to block. This is the opposite behavior from Manager. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for follow-up: that might be a great place to inject telemetry instrumentation to, in order to automatically calculate delays, especially if we give human readable names to subscribers |
||||||
// | ||||||
// Close will cancel all the goroutines. Subsequent calls to AddListener, RemoveListener, Close and | ||||||
// Dispatch are no-ops. | ||||||
type Target[T any] struct { | ||||||
initOnce sync.Once | ||||||
ctx context.Context | ||||||
cancel context.CancelFunc | ||||||
calebdoxsey marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
addListenerCh chan addListenerEvent[T] | ||||||
removeListenerCh chan removeListenerEvent[T] | ||||||
dispatchCh chan dispatchEvent[T] | ||||||
listeners map[Handle]chan T | ||||||
} | ||||||
|
||||||
// AddListener adds a listener to the target. | ||||||
func (t *Target[T]) AddListener(listener Listener[T]) Handle { | ||||||
t.init() | ||||||
|
||||||
// using a handle is necessary because you can't use a function as a map key. | ||||||
handle := Handle(uuid.NewString()) | ||||||
|
||||||
select { | ||||||
case <-t.ctx.Done(): | ||||||
case t.addListenerCh <- addListenerEvent[T]{listener, handle}: | ||||||
} | ||||||
|
||||||
return handle | ||||||
} | ||||||
|
||||||
// Close closes the event target. This can be called multiple times safely. | ||||||
// Once closed the target cannot be used. | ||||||
func (t *Target[T]) Close() { | ||||||
t.init() | ||||||
|
||||||
t.cancel() | ||||||
} | ||||||
|
||||||
// Dispatch dispatches an event to any listeners. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
func (t *Target[T]) Dispatch(evt T) { | ||||||
t.init() | ||||||
|
||||||
select { | ||||||
case <-t.ctx.Done(): | ||||||
case t.dispatchCh <- dispatchEvent[T]{evt}: | ||||||
} | ||||||
} | ||||||
|
||||||
// RemoveListener removes a listener from the target. | ||||||
func (t *Target[T]) RemoveListener(handle Handle) { | ||||||
t.init() | ||||||
|
||||||
select { | ||||||
case <-t.ctx.Done(): | ||||||
case t.removeListenerCh <- removeListenerEvent[T]{handle}: | ||||||
} | ||||||
} | ||||||
|
||||||
func (t *Target[T]) init() { | ||||||
t.initOnce.Do(func() { | ||||||
t.ctx, t.cancel = context.WithCancel(context.Background()) | ||||||
t.addListenerCh = make(chan addListenerEvent[T], 1) | ||||||
t.removeListenerCh = make(chan removeListenerEvent[T], 1) | ||||||
t.dispatchCh = make(chan dispatchEvent[T], 1) | ||||||
t.listeners = map[Handle]chan T{} | ||||||
go t.run() | ||||||
}) | ||||||
} | ||||||
|
||||||
func (t *Target[T]) run() { | ||||||
// listen for add/remove/dispatch events and call functions | ||||||
for { | ||||||
select { | ||||||
case <-t.ctx.Done(): | ||||||
return | ||||||
wasaga marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
case evt := <-t.addListenerCh: | ||||||
t.addListener(evt.listener, evt.handle) | ||||||
case evt := <-t.removeListenerCh: | ||||||
t.removeListener(evt.handle) | ||||||
case evt := <-t.dispatchCh: | ||||||
t.dispatch(evt.event) | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
// these functions are not thread-safe. They are intended to be called only by "run". | ||||||
|
||||||
func (t *Target[T]) addListener(listener Listener[T], handle Handle) { | ||||||
ch := make(chan T, 1) | ||||||
t.listeners[handle] = ch | ||||||
// start a goroutine to send events to the listener | ||||||
go func() { | ||||||
for evt := range ch { | ||||||
listener(evt) | ||||||
} | ||||||
}() | ||||||
} | ||||||
|
||||||
func (t *Target[T]) removeListener(handle Handle) { | ||||||
ch, ok := t.listeners[handle] | ||||||
if !ok { | ||||||
// nothing to do since the listener doesn't exist | ||||||
return | ||||||
} | ||||||
// close the channel to kill the goroutine | ||||||
close(ch) | ||||||
delete(t.listeners, handle) | ||||||
} | ||||||
|
||||||
func (t *Target[T]) dispatch(evt T) { | ||||||
// loop over all the listeners and send the event to them | ||||||
for _, ch := range t.listeners { | ||||||
select { | ||||||
case <-t.ctx.Done(): | ||||||
return | ||||||
case ch <- evt: | ||||||
} | ||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package events_test | ||
|
||
import ( | ||
"sync/atomic" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
|
||
"github.com/pomerium/pomerium/internal/events" | ||
) | ||
|
||
func TestTarget(t *testing.T) { | ||
t.Parallel() | ||
|
||
var target events.Target[int64] | ||
defer target.Close() | ||
|
||
var calls1, calls2, calls3 atomic.Int64 | ||
h1 := target.AddListener(func(i int64) { | ||
calls1.Add(i) | ||
}) | ||
h2 := target.AddListener(func(i int64) { | ||
calls2.Add(i) | ||
}) | ||
h3 := target.AddListener(func(i int64) { | ||
calls3.Add(i) | ||
}) | ||
|
||
shouldBe := func(i1, i2, i3 int64) { | ||
t.Helper() | ||
|
||
assert.Eventually(t, func() bool { return calls1.Load() == i1 }, time.Millisecond*10, time.Microsecond*100) | ||
assert.Eventually(t, func() bool { return calls2.Load() == i2 }, time.Millisecond*10, time.Microsecond*100) | ||
assert.Eventually(t, func() bool { return calls3.Load() == i3 }, time.Millisecond*10, time.Microsecond*100) | ||
} | ||
|
||
target.Dispatch(1) | ||
shouldBe(1, 1, 1) | ||
|
||
target.RemoveListener(h2) | ||
target.Dispatch(2) | ||
shouldBe(3, 1, 3) | ||
|
||
target.RemoveListener(h1) | ||
target.Dispatch(3) | ||
shouldBe(3, 1, 6) | ||
|
||
target.RemoveListener(h3) | ||
target.Dispatch(4) | ||
shouldBe(3, 1, 6) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the contract with this function is that we allow listener to run for some time, we do not expect it to return almost immediatelly, correct?
if that's the case, do we maybe want to add context here for the sake of telemetry context propagation?
And also add the context to Dispatch(), so that if you're sending an event that's part of some operation in telemetry terms, you could collect spans from listeners as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably don't want a trace span that contains the entire execution stack of Pomerium, but sure I can add context to more places.