Skip to content

Commit

Permalink
periodic flush
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Dec 26, 2022
1 parent 8db5091 commit 8cbf8f5
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 9 deletions.
12 changes: 6 additions & 6 deletions default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (

/*
cpu: Intel(R) Core(TM) i7-9700K CPU @ 3.60GHz
BenchmarkEvent/1x1-8 21140456 61.46 ns/op 16270375 ev/s 0 B/op 0 allocs/op
BenchmarkEvent/1x10-8 2968579 404.6 ns/op 24713378 ev/s 0 B/op 0 allocs/op
BenchmarkEvent/1x100-8 333204 3591 ns/op 27848968 ev/s 15 B/op 0 allocs/op
BenchmarkEvent/10x1-8 685381 1776 ns/op 5630000 ev/s 0 B/op 0 allocs/op
BenchmarkEvent/10x10-8 115762 12810 ns/op 7806533 ev/s 3 B/op 0 allocs/op
BenchmarkEvent/10x100-8 28773 48305 ns/op 20700046 ev/s 239 B/op 0 allocs/op
BenchmarkEvent/1x1-8 22845045 48.90 ns/op 20399340 ev/s 1 B/op 0 allocs/op
BenchmarkEvent/1x10-8 5448380 212.2 ns/op 46183805 ev/s 47 B/op 0 allocs/op
BenchmarkEvent/1x100-8 631585 1876 ns/op 50592945 ev/s 377 B/op 0 allocs/op
BenchmarkEvent/10x1-8 2284117 503.2 ns/op 19805320 ev/s 11 B/op 0 allocs/op
BenchmarkEvent/10x10-8 521727 2041 ns/op 47717331 ev/s 373 B/op 0 allocs/op
BenchmarkEvent/10x100-8 68062 17626 ns/op 55497296 ev/s 2707 B/op 0 allocs/op
*/
func BenchmarkEvent(b *testing.B) {
for _, topics := range []int{1, 10} {
Expand Down
50 changes: 47 additions & 3 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"reflect"
"strings"
"sync"
"time"
)

// Event represents an event contract
Expand All @@ -21,11 +22,32 @@ type Event interface {
// Dispatcher represents an event dispatcher.
type Dispatcher struct {
subs sync.Map
done chan struct{} // Cancellation
df time.Duration // Flush interval
}

// NewDispatcher creates a new dispatcher of events.
func NewDispatcher() *Dispatcher {
return &Dispatcher{}
return &Dispatcher{
df: 500 * time.Microsecond,
done: make(chan struct{}),
}
}

// Close closes the dispatcher
func (d *Dispatcher) Close() error {
close(d.done)
return nil
}

// isClosed returns whether the dispatcher is closed or not
func (d *Dispatcher) isClosed() bool {
select {
case <-d.done:
return true
default:
return false
}
}

// Subscribe subscribes to an event, the type of the event will be automatically
Expand All @@ -37,14 +59,22 @@ func Subscribe[T Event](broker *Dispatcher, handler func(T)) context.CancelFunc

// SubscribeTo subscribes to an event with the specified event type.
func SubscribeTo[T Event](broker *Dispatcher, eventType uint32, handler func(T)) context.CancelFunc {
if broker.isClosed() {
panic(errClosed)
}

// Add to consumer group, if it doesn't exist it will create one
s, _ := broker.subs.LoadOrStore(eventType, &group[T]{
s, loaded := broker.subs.LoadOrStore(eventType, &group[T]{
cond: sync.NewCond(new(sync.Mutex)),
})
group := groupOf[T](eventType, s)
sub := group.Add(handler)

// Start flushing asynchronously if we just created a new group
if !loaded {
go group.Process(broker.df, broker.done)
}

// Return unsubscribe function
return func() {
group.Del(sub)
Expand Down Expand Up @@ -122,14 +152,26 @@ type group[T Event] struct {
subs []*consumer[T]
}

// Process periodically broadcasts events
func (s *group[T]) Process(interval time.Duration, done chan struct{}) {
ticker := time.NewTicker(interval)
for {
select {
case <-done:
return
case <-ticker.C:
s.cond.Broadcast()
}
}
}

// Broadcast sends an event to all consumers
func (s *group[T]) Broadcast(ev T) {
s.cond.L.Lock()
for _, sub := range s.subs {
sub.queue = append(sub.queue, ev)
}
s.cond.L.Unlock()
s.cond.Broadcast()
}

// Add adds a subscriber to the list
Expand Down Expand Up @@ -166,6 +208,8 @@ func (s *group[T]) Del(sub *consumer[T]) {

// ------------------------------------- Debugging -------------------------------------

var errClosed = fmt.Errorf("event dispatcher is closed")

// Count returns the number of subscribers in this group
func (s *group[T]) Count() int {
return len(s.subs)
Expand Down
10 changes: 10 additions & 0 deletions event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ func TestPublishDifferentType(t *testing.T) {
})
}

func TestCloseDispatcher(t *testing.T) {
d := NewDispatcher()
defer SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})()

assert.NoError(t, d.Close())
assert.Panics(t, func() {
SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})
})
}

func TestMatrix(t *testing.T) {
const amount = 1000
for _, subs := range []int{1, 10, 100} {
Expand Down

0 comments on commit 8cbf8f5

Please sign in to comment.