-
Notifications
You must be signed in to change notification settings - Fork 1
/
proc_event_publisher.go
131 lines (113 loc) · 3.49 KB
/
proc_event_publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// This file was automatically generated by genny.
// Any changes will be lost if this file is regenerated.
// see https://github.com/cheekybits/genny
package pm // import "github.com/telnet2/pm/pubsub"
import (
"sync"
"time"
)
var wgProcEventPool = sync.Pool{New: func() interface{} { return new(sync.WaitGroup) }}
// NewProcEventPublisher creates a new pub/sub publisher to broadcast messages.
// The duration is used as the send timeout as to not block the publisher publishing
// messages to other clients if one client is slow or unresponsive.
// The buffer is used when creating new channels for subscribers.
func NewProcEventPublisher(publishTimeout time.Duration, buffer int) *ProcEventPublisher {
return &ProcEventPublisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[_ProcEventSubscriber]_ProcEventTopicFunc),
}
}
type _ProcEventSubscriber chan *ProcEvent
type _ProcEventTopicFunc func(v *ProcEvent) bool
// ProcEventPublisher is basic pub/sub structure. Allows to send events and subscribe
// to them. Can be safely used from multiple goroutines.
type ProcEventPublisher struct {
m sync.RWMutex
buffer int
timeout time.Duration
subscribers map[_ProcEventSubscriber]_ProcEventTopicFunc
}
// Len returns the number of subscribers for the publisher
func (p *ProcEventPublisher) Len() int {
p.m.RLock()
i := len(p.subscribers)
p.m.RUnlock()
return i
}
// Subscribe adds a new subscriber to the publisher returning the channel.
func (p *ProcEventPublisher) Subscribe() chan *ProcEvent {
return p.SubscribeTopic(nil)
}
// SubscribeTopic adds a new subscriber that filters messages sent by a topic.
func (p *ProcEventPublisher) SubscribeTopic(topic _ProcEventTopicFunc) chan *ProcEvent {
ch := make(chan *ProcEvent, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// SubscribeTopicWithBuffer adds a new subscriber that filters messages sent by a topic.
// The returned channel has a buffer of the specified size.
func (p *ProcEventPublisher) SubscribeTopicWithBuffer(topic _ProcEventTopicFunc, buffer int) chan *ProcEvent {
ch := make(chan *ProcEvent, buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// Evict removes the specified subscriber from receiving any more messages.
func (p *ProcEventPublisher) Evict(sub chan *ProcEvent) {
p.m.Lock()
_, exists := p.subscribers[sub]
if exists {
delete(p.subscribers, sub)
close(sub)
}
p.m.Unlock()
}
// Publish sends the data in v to all subscribers currently registered with the publisher.
func (p *ProcEventPublisher) Publish(v *ProcEvent) {
p.m.RLock()
if len(p.subscribers) == 0 {
p.m.RUnlock()
return
}
wg := wgProcEventPool.Get().(*sync.WaitGroup)
for sub, topic := range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, topic, v, wg)
}
wg.Wait()
wgProcEventPool.Put(wg)
p.m.RUnlock()
}
// Close closes the channels to all subscribers registered with the publisher.
func (p *ProcEventPublisher) Close() {
p.m.Lock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
p.m.Unlock()
}
func (p *ProcEventPublisher) sendTopic(sub _ProcEventSubscriber, topic _ProcEventTopicFunc, v *ProcEvent, wg *sync.WaitGroup) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}
// send under a select as to not block if the receiver is unavailable
if p.timeout > 0 {
timeout := time.NewTimer(p.timeout)
defer timeout.Stop()
select {
case sub <- v:
case <-timeout.C:
}
return
}
select {
case sub <- v:
default:
}
}