forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bus.go
102 lines (82 loc) · 1.87 KB
/
bus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package bus
import (
"sync"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
// Event is a single message sent through the bus. It is a common.MapStr;
// listeners select the events they receive by the keys an event contains.
type Event common.MapStr
// Bus provides a common channel to emit and listen for Events.
// Use New to create one.
type Bus interface {
// Publish sends the event to the listeners subscribed to this bus.
Publish(Event)
// Subscribe registers a new Listener on the bus. When filter keys are given,
// the listener only receives events containing *all* of those keys; with no
// keys it receives every event.
Subscribe(filter ...string) Listener
}
// Listener retrieves Events from a Bus subscription until Stop is called.
type Listener interface {
// Events returns the receive-only channel on which the subscription's
// events arrive.
Events() <-chan Event
// Stop ends the subscription: the listener is removed from its bus and its
// Events channel is closed.
Stop()
}
// bus is the Bus implementation returned by New.
type bus struct {
// RWMutex guards listeners: Publish takes the read lock, while Subscribe and
// listener.Stop take the write lock.
sync.RWMutex
// name identifies the bus in debug log output.
name string
// listeners holds the active subscriptions, in subscription order.
listeners []*listener
}
// listener is the Listener implementation handed out by bus.Subscribe.
type listener struct {
// filter lists the keys an event must contain to be delivered; an empty
// filter means every event is delivered.
filter []string
// channel carries the delivered events; Events exposes it as receive-only.
channel chan Event
// bus is the owning bus, used by Stop to unregister the listener.
bus *bus
}
// New creates a bus with no subscribers and returns it as a Bus.
// The given name labels the bus in its debug log output.
func New(name string) Bus {
	b := new(bus)
	b.name = name
	// Start from an empty, non-nil listener list.
	b.listeners = []*listener{}
	return b
}
// Publish logs the event at debug level and delivers it to every listener
// whose filter accepts it, in subscription order.
//
// Delivery is a blocking channel send performed while the bus read lock is
// held, so Publish waits whenever an interested listener's buffer is full.
func (b *bus) Publish(e Event) {
	b.RLock()
	defer b.RUnlock()

	logp.Debug("bus", "%s: %+v", b.name, e)

	for idx := range b.listeners {
		sub := b.listeners[idx]
		if !sub.interested(e) {
			continue
		}
		sub.channel <- e
	}
}
// Subscribe registers a new listener on the bus and returns it.
//
// The listener receives only events containing every key in filter (all
// events when no keys are given). Its events channel is buffered with room
// for 100 events.
func (b *bus) Subscribe(filter ...string) Listener {
	sub := new(listener)
	sub.bus = b
	sub.filter = filter
	sub.channel = make(chan Event, 100)

	// Only the registration itself needs the write lock.
	b.Lock()
	b.listeners = append(b.listeners, sub)
	b.Unlock()

	return sub
}
// Events returns the listener's event channel as a receive-only channel.
func (l *listener) Events() <-chan Event {
	return (<-chan Event)(l.channel)
}
// Stop ends the subscription: the listener is removed from its bus and its
// events channel is closed, so receivers ranging over Events terminate.
//
// Stop is safe to call more than once. The channel is closed only while the
// listener is still registered on the bus; a repeated call finds nothing to
// remove and returns without closing the channel again (closing an already
// closed channel would panic).
func (l *listener) Stop() {
	l.bus.Lock()
	defer l.bus.Unlock()

	registered := l.bus.listeners
	for i, candidate := range registered {
		if candidate != l {
			continue
		}

		// Shift the tail left over the removed slot and clear the vacated last
		// slot so the backing array does not keep a stale *listener alive.
		last := len(registered) - 1
		copy(registered[i:], registered[i+1:])
		registered[last] = nil
		l.bus.listeners = registered[:last]

		// Close under the write lock: Publish sends while holding the read
		// lock, so no send can race with the close.
		close(l.channel)
		return
	}
}
// interested reports whether the event should be delivered to this listener:
// it returns true when the event contains every key in the listener's filter.
// A listener with an empty filter is interested in every event.
func (l *listener) interested(e Event) bool {
	for i := 0; i < len(l.filter); i++ {
		if _, found := e[l.filter[i]]; found {
			continue
		}
		// The first missing key rules the event out.
		return false
	}
	return true
}