Skip to content

Commit

Permalink
Make API more usable for generic types
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Dec 3, 2022
1 parent 0a1be6a commit a9b84bf
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 69 deletions.
18 changes: 8 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,37 @@ This repository contains a **simple, in-process event dispatcher** to be used to
```go
// Various event types
const EventA = 0x01
const EventB = 0x02

// Event type for testing purposes
type myEvent struct{
kind uint32
Data string
}

// Type returns the event type
func (ev myEvent) Type() uint32 {
return ev.kind
return EventA
}
```

When publishing events, you can create a `Dispatcher[T]` which allows to `Publish()` and `Subscribe()` to various event types.
When publishing events, you can create a `Dispatcher` which is then used as a target of generic `event.Publish[T]()` and `event.Subscribe[T]()` functions to publish and subscribe to various event types respectively.

```go
bus := event.NewDispatcher[Event]()
bus := event.NewDispatcher()

// Subcribe to event A, and automatically unsubscribe at the end
defer bus.Subscribe(EventA, func(e Event) {
defer event.Subscribe(bus, func(e Event) {
println("(consumer 1)", e.Data)
})()

// Subcribe to event A, and automatically unsubscribe at the end
defer bus.Subscribe(EventA, func(e Event) {
defer event.Subscribe(bus, func(e Event) {
println("(consumer 2)", e.Data)
})()

// Publish few events
bus.Publish(newEventA("event 1"))
bus.Publish(newEventA("event 2"))
bus.Publish(newEventA("event 3"))
event.Publish(bus, newEventA("event 1"))
event.Publish(bus, newEventA("event 2"))
event.Publish(bus, newEventA("event 3"))
```

It should output something along these lines, where order is not guaranteed given that both subscribers are processing messages asyncrhonously.
Expand Down
88 changes: 62 additions & 26 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package event

import (
"context"
"fmt"
"reflect"
"strings"
"sync"
)

Expand All @@ -16,56 +19,65 @@ type Event interface {
// ------------------------------------- Dispatcher -------------------------------------

// Dispatcher represents an event dispatcher.
type Dispatcher[T Event] struct {
type Dispatcher struct {
subs sync.Map
}

// NewDispatcher creates a new dispatcher of events.
func NewDispatcher[T Event]() *Dispatcher[T] {
return &Dispatcher[T]{}
func NewDispatcher() *Dispatcher {
return &Dispatcher{}
}

// loadOrStore finds a subscriber group or creates a new one
func (d *Dispatcher[T]) loadOrStore(key uint32) *group[T] {
s, _ := d.subs.LoadOrStore(key, new(group[T]))
return s.(*group[T])
// Subscribe subscribes to an event, the type of the event will be automatically
// inferred from the provided type. Must be constant for this to work.
func Subscribe[T Event](broker *Dispatcher, handler func(T)) context.CancelFunc {
var event T
return SubscribeTo(broker, event.Type(), handler)
}

// Publish writes an event into the dispatcher
func (d *Dispatcher[T]) Publish(ev T) {
if g, ok := d.subs.Load(ev.Type()); ok {
g.(*group[T]).Broadcast(ev)
}
}

// Subscribe subscribes to an callback event
func (d *Dispatcher[T]) Subscribe(eventType uint32, handler func(T)) context.CancelFunc {
// SubscribeTo subscribes to an event with the specified event type.
func SubscribeTo[T Event](broker *Dispatcher, eventType uint32, handler func(T)) context.CancelFunc {
ctx, cancel := context.WithCancel(context.Background())
sub := &consumer[T]{
queue: make(chan T, 1024),
exec: handler,
}

// Add to consumer group, if it doesn't exist it will create one
group := d.loadOrStore(eventType)
s, _ := broker.subs.LoadOrStore(eventType, new(group[T]))
group := groupOf[T](eventType, s)
group.Add(ctx, sub)

// Return unsubscribe function
return func() {
d.unsubscribe(eventType, sub) // Remove from the list
cancel() // Stop async processing
group.Del(sub)
cancel() // Stop async processing
}
}

// Count counts the number of subscribers
func (d *Dispatcher[T]) count(eventType uint32) int {
return len(d.loadOrStore(eventType).subs)
// Publish writes an event into the dispatcher
func Publish[T Event](broker *Dispatcher, ev T) {
if s, ok := broker.subs.Load(ev.Type()); ok {
group := groupOf[T](ev.Type(), s)
group.Broadcast(ev)
}
}

// Count counts the number of subscribers, this is for testing only.
func (d *Dispatcher) count(eventType uint32) int {
if group, ok := d.subs.Load(eventType); ok {
return group.(interface{ Count() int }).Count()
}
return 0
}

// unsubscribe removes the subscriber from the list of subscribers
func (d *Dispatcher[T]) unsubscribe(eventType uint32, sub *consumer[T]) {
group := d.loadOrStore(eventType)
group.Del(sub)
// groupOf casts the subscriber group to the specified generic type
func groupOf[T Event](eventType uint32, subs any) *group[T] {
if group, ok := subs.(*group[T]); ok {
return group
}

panic(errConflict[T](eventType, subs))
}

// ------------------------------------- Subscriber List -------------------------------------
Expand Down Expand Up @@ -127,3 +139,27 @@ func (s *group[T]) Del(sub *consumer[T]) {
}
s.subs = subs
}

// ------------------------------------- Debugging -------------------------------------

// Count returns the number of subscribers in this group
func (s *group[T]) Count() int {
return len(s.subs)
}

// String returns string representation of the type
func (s *group[T]) String() string {
typ := reflect.TypeOf(s).String()
idx := strings.LastIndex(typ, "/")
typ = typ[idx+1 : len(typ)-1]
return typ
}

// errConflict returns a conflict message
func errConflict[T any](eventType uint32, existing any) string {
var want T
return fmt.Sprintf(
"conflicting event type, want=<%T>, registered=<%s>, event=0x%v",
want, existing, eventType,
)
}
72 changes: 49 additions & 23 deletions event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,61 +11,62 @@ import (

/*
cpu: Intel(R) Core(TM) i7-9700K CPU @ 3.60GHz
BenchmarkEvent/1-consumers-8 10021444 119.1 ns/op 10021301 msg 0 B/op 0 allocs/op
BenchmarkEvent/10-consumers-8 799999 1595 ns/op 7999915 msg 0 B/op 0 allocs/op
BenchmarkEvent/100-consumers-8 99048 14308 ns/op 9904769 msg 0 B/op 0 allocs/op
BenchmarkEvent/1-consumers-8 10240418 116.8 ns/op 10240023 msg 0 B/op 0 allocs/op
BenchmarkEvent/10-consumers-8 923197 1396 ns/op 9231961 msg 0 B/op 0 allocs/op
BenchmarkEvent/100-consumers-8 97951 12699 ns/op 9795055 msg 0 B/op 0 allocs/op
*/
func BenchmarkEvent(b *testing.B) {
for _, subs := range []int{1, 10, 100} {
b.Run(fmt.Sprintf("%d-consumers", subs), func(b *testing.B) {
var count uint64
d := NewDispatcher[testEvent]()
d := NewDispatcher()
for i := 0; i < subs; i++ {
defer d.Subscribe(TestEventType, func(ev testEvent) {
defer Subscribe(d, func(ev MyEvent1) {
atomic.AddUint64(&count, 1)
})()
}

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
d.Publish(testEvent{})
Publish(d, MyEvent1{})
}
b.ReportMetric(float64(count), "msg")
})
}
}

func TestPublish(t *testing.T) {
d := NewDispatcher[testEvent]()
d := NewDispatcher()
var wg sync.WaitGroup

// Subscribe
var count int64
defer d.Subscribe(TestEventType, func(ev testEvent) {
defer Subscribe(d, func(ev MyEvent1) {
atomic.AddInt64(&count, 1)
wg.Done()
})()

// Publish
wg.Add(2)
d.Publish(testEvent{})
d.Publish(testEvent{})
Publish(d, MyEvent1{})
Publish(d, MyEvent1{})

// Wait and check
wg.Wait()
assert.Equal(t, int64(2), count)
}

func TestUnsubscribe(t *testing.T) {
d := NewDispatcher[testEvent]()
unsubscribe := d.Subscribe(TestEventType, func(ev testEvent) {
d := NewDispatcher()
assert.Equal(t, 0, d.count(TypeEvent1))
unsubscribe := Subscribe(d, func(ev MyEvent1) {
// Nothing
})

assert.Equal(t, 1, d.count(TestEventType))
assert.Equal(t, 1, d.count(TypeEvent1))
unsubscribe()
assert.Equal(t, 0, d.count(TestEventType))
assert.Equal(t, 0, d.count(TypeEvent1))
}

func TestConcurrent(t *testing.T) {
Expand All @@ -74,8 +75,8 @@ func TestConcurrent(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)

d := NewDispatcher[testEvent]()
defer d.Subscribe(TestEventType, func(ev testEvent) {
d := NewDispatcher()
defer Subscribe(d, func(ev MyEvent1) {
if current := atomic.AddInt64(&count, 1); current == max {
wg.Done()
}
Expand All @@ -84,24 +85,49 @@ func TestConcurrent(t *testing.T) {
// Asynchronously publish
go func() {
for i := 0; i < max; i++ {
d.Publish(testEvent{})
Publish(d, MyEvent1{})
}
}()

defer d.Subscribe(TestEventType, func(ev testEvent) {
defer Subscribe(d, func(ev MyEvent1) {
// Subscriber that does nothing
})()

wg.Wait()
assert.Equal(t, max, int(count))
}

// ------------------------------------- Test Event -------------------------------------
func TestSubscribeDifferentType(t *testing.T) {
d := NewDispatcher()
assert.Panics(t, func() {
SubscribeTo(d, TypeEvent1, func(ev MyEvent1) {})
SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})
})
}

func TestPublishDifferentType(t *testing.T) {
d := NewDispatcher()
assert.Panics(t, func() {
SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})
Publish(d, MyEvent1{})
})
}

const TestEventType = 0xff
// ------------------------------------- Test Events -------------------------------------

type testEvent struct{}
const (
TypeEvent1 = 0x1
TypeEvent2 = 0x2
)

func (testEvent) Type() uint32 {
return TestEventType
type MyEvent1 struct {
Number int
}

func (t MyEvent1) Type() uint32 { return TypeEvent1 }

type MyEvent2 struct {
Text string
}

func (t MyEvent2) Type() uint32 { return TypeEvent2 }
18 changes: 8 additions & 10 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,39 @@ import (

// Various event types
const EventA = 0x01
const EventB = 0x02

// Event type for testing purposes
type Event struct {
kind uint32
Data string
}

// Type returns the event type
func (ev Event) Type() uint32 {
return ev.kind
return EventA
}

// newEventA creates a new instance of an event
func newEventA(data string) Event {
return Event{kind: EventA, Data: data}
return Event{Data: data}
}

func main() {
bus := event.NewDispatcher[Event]()
bus := event.NewDispatcher()

// Subcribe to event A, and automatically unsubscribe at the end
defer bus.Subscribe(EventA, func(e Event) {
defer event.SubscribeTo(bus, EventA, func(e Event) {
println("(consumer 1)", e.Data)
})()

// Subcribe to event A, and automatically unsubscribe at the end
defer bus.Subscribe(EventA, func(e Event) {
defer event.SubscribeTo(bus, EventA, func(e Event) {
println("(consumer 2)", e.Data)
})()

// Publish few events
bus.Publish(newEventA("event 1"))
bus.Publish(newEventA("event 2"))
bus.Publish(newEventA("event 3"))
event.Publish(bus, newEventA("event 1"))
event.Publish(bus, newEventA("event 2"))
event.Publish(bus, newEventA("event 3"))

time.Sleep(10 * time.Millisecond)
}

0 comments on commit a9b84bf

Please sign in to comment.