Skip to content

Commit

Permalink
Use sync.Cond instead of channel for broadcast (#4)
Browse files Browse the repository at this point in the history
This PR removes the usage of channel for broadcast and instead makes use of `sync.Cond` primitive directly. Subscribers are also flushed periodically to avoid lock contention. This results in ~2-4x performance improvement depending on the conditions.
  • Loading branch information
kelindar committed Dec 26, 2022
1 parent 51911ab commit 5ac9b3a
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 79 deletions.
52 changes: 33 additions & 19 deletions default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,47 @@ import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

/*
cpu: Intel(R) Core(TM) i7-9700K CPU @ 3.60GHz
BenchmarkDefault/1-consumers-8 9965634 120.6 ns/op 9965601 msg 0 B/op 0 allocs/op
BenchmarkDefault/10-consumers-8 800031 1365 ns/op 8000305 msg 0 B/op 0 allocs/op
BenchmarkDefault/100-consumers-8 82742 15891 ns/op 8274183 msg 0 B/op 0 allocs/op
BenchmarkEvent/1x1-8 22845045 48.90 ns/op 20399340 ev/s 1 B/op 0 allocs/op
BenchmarkEvent/1x10-8 5448380 212.2 ns/op 46183805 ev/s 47 B/op 0 allocs/op
BenchmarkEvent/1x100-8 631585 1876 ns/op 50592945 ev/s 377 B/op 0 allocs/op
BenchmarkEvent/10x1-8 2284117 503.2 ns/op 19805320 ev/s 11 B/op 0 allocs/op
BenchmarkEvent/10x10-8 521727 2041 ns/op 47717331 ev/s 373 B/op 0 allocs/op
BenchmarkEvent/10x100-8 68062 17626 ns/op 55497296 ev/s 2707 B/op 0 allocs/op
*/
func BenchmarkDefault(b *testing.B) {
for _, subs := range []int{1, 10, 100} {
b.Run(fmt.Sprintf("%d-consumers", subs), func(b *testing.B) {
var count uint64
for i := 0; i < subs; i++ {
defer On(func(ev MyEvent1) {
atomic.AddUint64(&count, 1)
})()
}
func BenchmarkEvent(b *testing.B) {
for _, topics := range []int{1, 10} {
for _, subs := range []int{1, 10, 100} {
b.Run(fmt.Sprintf("%dx%d", topics, subs), func(b *testing.B) {
var count atomic.Int64
for i := 0; i < subs; i++ {
for id := 10; id < 10+topics; id++ {
defer OnType(uint32(id), func(ev MyEvent3) {
count.Add(1)
})()
}
}

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
Emit(MyEvent1{})
}
b.ReportMetric(float64(count), "msg")
})
start := time.Now()
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for id := 10; id < 10+topics; id++ {
Emit(MyEvent3{ID: id})
}
}

elapsed := time.Since(start)
rate := float64(count.Load()) / elapsed.Seconds()
b.ReportMetric(rate, "ev/s")
})
}
}
}

Expand Down
122 changes: 95 additions & 27 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"reflect"
"strings"
"sync"
"time"
)

// Event represents an event contract
Expand All @@ -21,11 +22,32 @@ type Event interface {
// Dispatcher represents an event dispatcher.
type Dispatcher struct {
subs sync.Map
done chan struct{} // Cancellation
df time.Duration // Flush interval
}

// NewDispatcher creates a new dispatcher of events.
func NewDispatcher() *Dispatcher {
return &Dispatcher{}
return &Dispatcher{
df: 500 * time.Microsecond,
done: make(chan struct{}),
}
}

// Close closes the dispatcher
func (d *Dispatcher) Close() error {
close(d.done)
return nil
}

// isClosed returns whether the dispatcher is closed or not
func (d *Dispatcher) isClosed() bool {
select {
case <-d.done:
return true
default:
return false
}
}

// Subscribe subscribes to an event, the type of the event will be automatically
Expand All @@ -37,21 +59,25 @@ func Subscribe[T Event](broker *Dispatcher, handler func(T)) context.CancelFunc

// SubscribeTo subscribes to an event with the specified event type.
func SubscribeTo[T Event](broker *Dispatcher, eventType uint32, handler func(T)) context.CancelFunc {
ctx, cancel := context.WithCancel(context.Background())
sub := &consumer[T]{
queue: make(chan T, 1024),
exec: handler,
if broker.isClosed() {
panic(errClosed)
}

// Add to consumer group, if it doesn't exist it will create one
s, _ := broker.subs.LoadOrStore(eventType, new(group[T]))
s, loaded := broker.subs.LoadOrStore(eventType, &group[T]{
cond: sync.NewCond(new(sync.Mutex)),
})
group := groupOf[T](eventType, s)
group.Add(ctx, sub)
sub := group.Add(handler)

// Start flushing asynchronously if we just created a new group
if !loaded {
go group.Process(broker.df, broker.done)
}

// Return unsubscribe function
return func() {
group.Del(sub)
cancel() // Stop async processing
}
}

Expand Down Expand Up @@ -80,57 +106,97 @@ func groupOf[T Event](eventType uint32, subs any) *group[T] {
panic(errConflict[T](eventType, subs))
}

// ------------------------------------- Subscriber List -------------------------------------
// ------------------------------------- Subscriber -------------------------------------

// consumer represents a consumer with a message queue
type consumer[T Event] struct {
queue chan T // Message buffer
exec func(T) // Process callback
queue []T // Current work queue
stop bool // Stop signal
}

// Listen listens to the event queue and processes events
func (s *consumer[T]) Listen(ctx context.Context) {
func (s *consumer[T]) Listen(c *sync.Cond, fn func(T)) {
pending := make([]T, 0, 128)

for {
select {
case ev := <-s.queue:
s.exec(ev)
case <-ctx.Done():
return
c.L.Lock()
for len(s.queue) == 0 {
switch {
case s.stop:
c.L.Unlock()
return
default:
c.Wait()
}
}

// Swap buffers and reset the current queue
temp := s.queue
s.queue = pending
pending = temp
s.queue = s.queue[:0]
c.L.Unlock()

// Outside of the critical section, process the work
for i := 0; i < len(pending); i++ {
fn(pending[i])
}
}
}

// ------------------------------------- Subscriber Group -------------------------------------

// group represents a consumer group
type group[T Event] struct {
sync.RWMutex
cond *sync.Cond
subs []*consumer[T]
}

// Process periodically broadcasts events
func (s *group[T]) Process(interval time.Duration, done chan struct{}) {
ticker := time.NewTicker(interval)
for {
select {
case <-done:
return
case <-ticker.C:
s.cond.Broadcast()
}
}
}

// Broadcast sends an event to all consumers
func (s *group[T]) Broadcast(ev T) {
s.RLock()
defer s.RUnlock()
s.cond.L.Lock()
for _, sub := range s.subs {
sub.queue <- ev
sub.queue = append(sub.queue, ev)
}
s.cond.L.Unlock()
}

// Add adds a subscriber to the list
func (s *group[T]) Add(ctx context.Context, sub *consumer[T]) {
go sub.Listen(ctx)
func (s *group[T]) Add(handler func(T)) *consumer[T] {
sub := &consumer[T]{
queue: make([]T, 0, 128),
}

// Add the consumer to the list of active consumers
s.Lock()
s.cond.L.Lock()
s.subs = append(s.subs, sub)
s.Unlock()
s.cond.L.Unlock()

// Start listening
go sub.Listen(s.cond, handler)
return sub
}

// Del removes a subscriber from the list
func (s *group[T]) Del(sub *consumer[T]) {
s.Lock()
defer s.Unlock()
s.cond.L.Lock()
defer s.cond.L.Unlock()

// Search and remove the subscriber
sub.stop = true
subs := make([]*consumer[T], 0, len(s.subs))
for _, v := range s.subs {
if v != sub {
Expand All @@ -142,6 +208,8 @@ func (s *group[T]) Del(sub *consumer[T]) {

// ------------------------------------- Debugging -------------------------------------

var errClosed = fmt.Errorf("event dispatcher is closed")

// Count returns the number of subscribers in this group
func (s *group[T]) Count() int {
return len(s.subs)
Expand Down
89 changes: 56 additions & 33 deletions event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,52 +12,26 @@ import (
"github.com/stretchr/testify/assert"
)

/*
cpu: Intel(R) Core(TM) i7-9700K CPU @ 3.60GHz
BenchmarkEvent/1-consumers-8 10240418 116.8 ns/op 10240023 msg 0 B/op 0 allocs/op
BenchmarkEvent/10-consumers-8 923197 1396 ns/op 9231961 msg 0 B/op 0 allocs/op
BenchmarkEvent/100-consumers-8 97951 12699 ns/op 9795055 msg 0 B/op 0 allocs/op
*/
func BenchmarkEvent(b *testing.B) {
for _, subs := range []int{1, 10, 100} {
b.Run(fmt.Sprintf("%d-consumers", subs), func(b *testing.B) {
var count uint64
d := NewDispatcher()
for i := 0; i < subs; i++ {
defer Subscribe(d, func(ev MyEvent1) {
atomic.AddUint64(&count, 1)
})()
}

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
Publish(d, MyEvent1{})
}
b.ReportMetric(float64(count), "msg")
})
}
}

func TestPublish(t *testing.T) {
d := NewDispatcher()
var wg sync.WaitGroup

// Subscribe
// Subscribe, must be received in order
var count int64
defer Subscribe(d, func(ev MyEvent1) {
atomic.AddInt64(&count, 1)
assert.Equal(t, int(atomic.AddInt64(&count, 1)), ev.Number)
wg.Done()
})()

// Publish
wg.Add(2)
Publish(d, MyEvent1{})
Publish(d, MyEvent1{})
wg.Add(3)
Publish(d, MyEvent1{Number: 1})
Publish(d, MyEvent1{Number: 2})
Publish(d, MyEvent1{Number: 3})

// Wait and check
wg.Wait()
assert.Equal(t, int64(2), count)
assert.Equal(t, int64(3), count)
}

func TestUnsubscribe(t *testing.T) {
Expand Down Expand Up @@ -116,6 +90,49 @@ func TestPublishDifferentType(t *testing.T) {
})
}

func TestCloseDispatcher(t *testing.T) {
d := NewDispatcher()
defer SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})()

assert.NoError(t, d.Close())
assert.Panics(t, func() {
SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})
})
}

func TestMatrix(t *testing.T) {
const amount = 1000
for _, subs := range []int{1, 10, 100} {
for _, topics := range []int{1, 10} {
expected := subs * topics * amount
t.Run(fmt.Sprintf("%dx%d", topics, subs), func(t *testing.T) {
var count atomic.Int64
var wg sync.WaitGroup
wg.Add(expected)

d := NewDispatcher()
for i := 0; i < subs; i++ {
for id := 0; id < topics; id++ {
defer SubscribeTo(d, uint32(id), func(ev MyEvent3) {
count.Add(1)
wg.Done()
})()
}
}

for n := 0; n < amount; n++ {
for id := 0; id < topics; id++ {
go Publish(d, MyEvent3{ID: id})
}
}

wg.Wait()
assert.Equal(t, expected, int(count.Load()))
})
}
}
}

// ------------------------------------- Test Events -------------------------------------

const (
Expand All @@ -134,3 +151,9 @@ type MyEvent2 struct {
}

func (t MyEvent2) Type() uint32 { return TypeEvent2 }

type MyEvent3 struct {
ID int
}

func (t MyEvent3) Type() uint32 { return uint32(t.ID) }

0 comments on commit 5ac9b3a

Please sign in to comment.