Skip to content

Commit

Permalink
Revamp API, use go 1.18 (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Jun 18, 2022
1 parent ec58c2e commit 0a1be6a
Show file tree
Hide file tree
Showing 10 changed files with 395 additions and 150 deletions.
1 change: 1 addition & 0 deletions .github/FUNDING.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
github: [kelindar]
Binary file added .github/logo.pdn
Binary file not shown.
Binary file added .github/logo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
29 changes: 29 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Test
on: [push, pull_request]
env:
GITHUB_TOKEN: ${{ secrets.COVERALLS_TOKEN }}
GO111MODULE: "on"
jobs:
test:
name: Test with Coverage
runs-on: ubuntu-latest
strategy:
matrix:
go: ["1.18"]
steps:
- name: Set up Go ${{ matrix.go }}
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go }}
- name: Check out code
uses: actions/checkout@v3
- name: Install dependencies
run: |
go mod download
- name: Run Unit Tests
run: |
go test -race -covermode atomic -coverprofile=profile.cov .
- name: Upload Coverage
uses: shogo82148/actions-goveralls@v1
with:
path-to-profile: profile.cov
90 changes: 70 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,70 @@
# Event Bus

This repository contains a simple in-process event bus to be used to decouple internal modules.

```
var ev event.Bus
// Subscribe to events
var count int
cancel := ev.On("event1", func(v interface{}) {
count += v.(int)
})
// Notify events
ev.Notify("event1", 1)
ev.Notify("event1", 1)
// Unsubscribe from "event1"
cancel()
```
<p align="center">
<img width="330" height="110" src=".github/logo.png" border="0" alt="kelindar/event">
<br>
<img src="https://img.shields.io/github/go-mod/go-version/kelindar/event" alt="Go Version">
<a href="https://pkg.go.dev/github.com/kelindar/event"><img src="https://pkg.go.dev/badge/github.com/kelindar/event" alt="PkgGoDev"></a>
<a href="https://opensource.org/licenses/MIT"><img src="https://img.shields.io/badge/License-MIT-blue.svg" alt="License"></a>
<a href="https://coveralls.io/github/kelindar/event"><img src="https://coveralls.io/repos/github/kelindar/event/badge.svg" alt="Coverage"></a>
</p>

## Generic In-Process Pub/Sub

This repository contains a **simple, in-process event dispatcher** to be used to decouple internal modules. It provides a generic way to define events, publish and subscribe to them.

```go
// Various event types
const EventA = 0x01
const EventB = 0x02

// Event type for testing purposes
type myEvent struct{
kind uint32
Data string
}

// Type returns the event type
func (ev myEvent) Type() uint32 {
return ev.kind
}
```

When publishing events, you can create a `Dispatcher[T]` which allows to `Publish()` and `Subscribe()` to various event types.

```go
bus := event.NewDispatcher[Event]()

// Subcribe to event A, and automatically unsubscribe at the end
defer bus.Subscribe(EventA, func(e Event) {
println("(consumer 1)", e.Data)
})()

// Subcribe to event A, and automatically unsubscribe at the end
defer bus.Subscribe(EventA, func(e Event) {
println("(consumer 2)", e.Data)
})()

// Publish few events
bus.Publish(newEventA("event 1"))
bus.Publish(newEventA("event 2"))
bus.Publish(newEventA("event 3"))
```

It should output something along these lines, where order is not guaranteed given that both subscribers are processing messages asyncrhonously.

```
(consumer 2) event 1
(consumer 2) event 2
(consumer 2) event 3
(consumer 1) event 1
(consumer 1) event 2
(consumer 1) event 3
```

## Benchmarks

```
cpu: Intel(R) Core(TM) i7-9700K CPU @ 3.60GHz
BenchmarkEvent/1-consumers-8 10021444 119.1 ns/op 10021301 msg 0 B/op 0 allocs/op
BenchmarkEvent/10-consumers-8 799999 1595 ns/op 7999915 msg 0 B/op 0 allocs/op
BenchmarkEvent/100-consumers-8 99048 14308 ns/op 9904769 msg 0 B/op 0 allocs/op
```
223 changes: 129 additions & 94 deletions event.go
Original file line number Diff line number Diff line change
@@ -1,94 +1,129 @@
// Copyright (c) Roman Atachiants and contributore. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for detaile.

package event

import (
"context"
"sync"
)

// Bus represents an event bus.
type Bus struct {
lock sync.RWMutex
subs map[string][]*handler
}

func (e *Bus) setup() {
if e.subs == nil {
e.subs = make(map[string][]*handler, 8)
}
}

// Notify notifies listeners of an event that happened
func (e *Bus) Notify(event string, value interface{}) {
e.lock.RLock()
defer e.lock.RUnlock()

e.setup()
if handlers, ok := e.subs[event]; ok {
for _, h := range handlers {
h.buffer <- value
}
}
}

// On registers an event listener on a system
func (e *Bus) On(event string, callback func(interface{})) context.CancelFunc {
e.lock.Lock()
defer e.lock.Unlock()

// Create the handler
e.setup()
ctx, cancel := context.WithCancel(context.Background())
subscriber := &handler{
buffer: make(chan interface{}, 1),
callback: &callback,
cancel: cancel,
}

// Add the listener
e.subs[event] = append(e.subs[event], subscriber)
go subscriber.listen(ctx)

return e.unsubscribe(event, &callback)
}

// unsubscribe deregisters an event listener from a system
func (e *Bus) unsubscribe(event string, callback *func(interface{})) context.CancelFunc {
return func() {
e.lock.Lock()
defer e.lock.Unlock()

if handlers, ok := e.subs[event]; ok {
clean := make([]*handler, 0, len(handlers))
for _, h := range handlers {
if h.callback != callback { // Compare address
clean = append(clean, h)
} else {
h.cancel()
}
}
}
}
}

// -------------------------------------------------------------------------------------------

type handler struct {
buffer chan interface{}
callback *func(interface{})
cancel context.CancelFunc
}

// Listen listens on the buffer and invokes the callback
func (h *handler) listen(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case value := <-h.buffer:
(*h.callback)(value)
}
}
}
// Copyright (c) Roman Atachiants and contributore. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for detaile.

package event

import (
"context"
"sync"
)

// Event represents an event contract
type Event interface {
Type() uint32
}

// ------------------------------------- Dispatcher -------------------------------------

// Dispatcher represents an event dispatcher.
type Dispatcher[T Event] struct {
subs sync.Map
}

// NewDispatcher creates a new dispatcher of events.
func NewDispatcher[T Event]() *Dispatcher[T] {
return &Dispatcher[T]{}
}

// loadOrStore finds a subscriber group or creates a new one
func (d *Dispatcher[T]) loadOrStore(key uint32) *group[T] {
s, _ := d.subs.LoadOrStore(key, new(group[T]))
return s.(*group[T])
}

// Publish writes an event into the dispatcher
func (d *Dispatcher[T]) Publish(ev T) {
if g, ok := d.subs.Load(ev.Type()); ok {
g.(*group[T]).Broadcast(ev)
}
}

// Subscribe subscribes to an callback event
func (d *Dispatcher[T]) Subscribe(eventType uint32, handler func(T)) context.CancelFunc {
ctx, cancel := context.WithCancel(context.Background())
sub := &consumer[T]{
queue: make(chan T, 1024),
exec: handler,
}

// Add to consumer group, if it doesn't exist it will create one
group := d.loadOrStore(eventType)
group.Add(ctx, sub)

// Return unsubscribe function
return func() {
d.unsubscribe(eventType, sub) // Remove from the list
cancel() // Stop async processing
}
}

// Count counts the number of subscribers
func (d *Dispatcher[T]) count(eventType uint32) int {
return len(d.loadOrStore(eventType).subs)
}

// unsubscribe removes the subscriber from the list of subscribers
func (d *Dispatcher[T]) unsubscribe(eventType uint32, sub *consumer[T]) {
group := d.loadOrStore(eventType)
group.Del(sub)
}

// ------------------------------------- Subscriber List -------------------------------------

// consumer represents a consumer with a message queue
type consumer[T Event] struct {
queue chan T // Message buffer
exec func(T) // Process callback
}

// Listen listens to the event queue and processes events
func (s *consumer[T]) Listen(ctx context.Context) {
for {
select {
case ev := <-s.queue:
s.exec(ev)
case <-ctx.Done():
return
}
}
}

// group represents a consumer group
type group[T Event] struct {
sync.RWMutex
subs []*consumer[T]
}

// Broadcast sends an event to all consumers
func (s *group[T]) Broadcast(ev T) {
s.RLock()
defer s.RUnlock()
for _, sub := range s.subs {
sub.queue <- ev
}
}

// Add adds a subscriber to the list
func (s *group[T]) Add(ctx context.Context, sub *consumer[T]) {
go sub.Listen(ctx)

// Add the consumer to the list of active consumers
s.Lock()
s.subs = append(s.subs, sub)
s.Unlock()
}

// Del removes a subscriber from the list
func (s *group[T]) Del(sub *consumer[T]) {
s.Lock()
defer s.Unlock()

// Search and remove the subscriber
subs := make([]*consumer[T], 0, len(s.subs))
for _, v := range s.subs {
if v != sub {
subs = append(subs, v)
}
}
s.subs = subs
}
Loading

0 comments on commit 0a1be6a

Please sign in to comment.