-
Notifications
You must be signed in to change notification settings - Fork 35
/
broadcast.go
73 lines (64 loc) · 1.65 KB
/
broadcast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package holster
import "sync"
type Broadcaster interface {
WaitChan(string) chan struct{}
Wait(string)
Broadcast()
Done()
}
// Broadcasts to goroutines a new event has occurred and any waiting go routines should
// stop waiting and do work. The current implementation is limited to 10,0000 unconsumed
// broadcasts. If the user broadcasts more events than can be consumed calls to broadcast()
// will eventually block until the goroutines can catch up. This ensures goroutines will
// receive at least one event per broadcast() call.
type broadcast struct {
clients map[string]chan struct{}
done chan struct{}
mutex sync.Mutex
}
func NewBroadcaster() Broadcaster {
return &broadcast{
clients: make(map[string]chan struct{}),
done: make(chan struct{}),
}
}
// Notify all Waiting goroutines
func (b *broadcast) Broadcast() {
b.mutex.Lock()
for _, channel := range b.clients {
channel <- struct{}{}
}
b.mutex.Unlock()
}
// Cancels any Wait() calls that are currently blocked
func (b *broadcast) Done() {
close(b.done)
}
// Blocks until a broadcast is received
func (b *broadcast) Wait(name string) {
b.mutex.Lock()
channel, ok := b.clients[name]
if !ok {
b.clients[name] = make(chan struct{}, 10000)
channel = b.clients[name]
}
b.mutex.Unlock()
// Wait for a new event or done is closed
select {
case <-channel:
return
case <-b.done:
return
}
}
// Returns a channel the caller can use to wait for a broadcast
func (b *broadcast) WaitChan(name string) chan struct{} {
b.mutex.Lock()
channel, ok := b.clients[name]
if !ok {
b.clients[name] = make(chan struct{}, 10000)
channel = b.clients[name]
}
b.mutex.Unlock()
return channel
}