-
Notifications
You must be signed in to change notification settings - Fork 0
/
bcast.go
84 lines (74 loc) · 1.58 KB
/
bcast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package dcgm
import (
"fmt"
"sync"
)
// publisher fans values out to a dynamic set of subscribers.
// Values enter through the publish channel and are delivered to every
// subscriber registered at that moment by the broadcast loop.
type publisher struct {
	// publish carries values from send to the broadcast loop.
	publish chan interface{}
	// close receives a value from closePublisher to make broadcast return.
	close chan bool
	// subscribers is the current subscriber set; guarded by subscriberLock.
	subscribers []*subscriber
	// subscriberLock serializes every read and write of subscribers.
	subscriberLock sync.Mutex
}
// subscriber is one receiving endpoint registered with a publisher.
type subscriber struct {
	// read is the channel on which broadcast delivers published values.
	read chan interface{}
	// close is sent true (from a goroutine started by publisher.remove)
	// when the subscriber is removed from its publisher.
	close chan bool
}
// newPublisher returns a publisher with no subscribers. Both of its
// channels are unbuffered, so senders block until the broadcast loop
// receives from them.
func newPublisher() *publisher {
	return &publisher{
		publish: make(chan interface{}),
		close:   make(chan bool),
	}
}
// subscriberList returns a snapshot of the current subscribers.
//
// The returned slice has its own backing array, so the caller may iterate
// it after the lock has been released without racing against add or
// remove. (A plain p.subscribers[:] would share the backing array and
// observe in-place modifications made by later removals.)
//
// A publisher with no subscribers yields a nil slice.
func (p *publisher) subscriberList() []*subscriber {
	p.subscriberLock.Lock()
	defer p.subscriberLock.Unlock()

	// append onto a nil slice allocates fresh storage and copies the elements.
	return append([]*subscriber(nil), p.subscribers...)
}
// add creates a subscriber with unbuffered read and close channels,
// registers it with the publisher, and returns it to the caller.
func (p *publisher) add() *subscriber {
	sub := &subscriber{
		read:  make(chan interface{}),
		close: make(chan bool),
	}

	// Only the registration itself needs the lock.
	p.subscriberLock.Lock()
	p.subscribers = append(p.subscribers, sub)
	p.subscriberLock.Unlock()

	return sub
}
// remove unregisters leaving from the publisher.
//
// It returns an error if leaving is not currently registered. On success
// a goroutine is started that sends true on leaving.close; that goroutine
// blocks until something receives from the channel.
//
// The subscriber list is rebuilt into a new slice rather than shifted in
// place, so slices previously handed out (for example to a broadcast that
// is still iterating) keep seeing a consistent set of subscribers, and
// the old backing array does not retain a stale pointer in its tail slot.
func (p *publisher) remove(leaving *subscriber) error {
	p.subscriberLock.Lock()
	defer p.subscriberLock.Unlock()

	subscriberIndex := -1
	for i, sub := range p.subscribers {
		if sub == leaving {
			subscriberIndex = i
			break
		}
	}
	if subscriberIndex == -1 {
		return fmt.Errorf("Could not find subscriber")
	}

	// Signal asynchronously so remove never blocks on an absent reader
	// while holding the lock.
	go func() { leaving.close <- true }()

	// Copy-on-write removal: never mutate the array other slices may view.
	remaining := make([]*subscriber, 0, len(p.subscribers)-1)
	remaining = append(remaining, p.subscribers[:subscriberIndex]...)
	remaining = append(remaining, p.subscribers[subscriberIndex+1:]...)
	p.subscribers = remaining
	return nil
}
// send submits val for broadcasting by sending it on the publish channel.
// The call blocks until a receiver takes the value; in this file the only
// receiver is the broadcast loop.
func (p *publisher) send(val interface{}) {
	p.publish <- val
}
// broadcast is the publisher's dispatch loop. It runs until a value is
// received on p.close. For every value received on p.publish it takes the
// current subscriber list and starts one goroutine per subscriber to
// deliver the value, so a slow subscriber does not stall the loop or the
// other subscribers. Each delivery goroutine blocks until its subscriber
// receives from read.
func (p *publisher) broadcast() {
	// deliver performs a single blocking send to one subscriber.
	deliver := func(target *subscriber, msg interface{}) {
		target.read <- msg
	}

	for {
		select {
		case <-p.close:
			return
		case msg := <-p.publish:
			for _, target := range p.subscriberList() {
				go deliver(target, msg)
			}
		}
	}
}
// closePublisher asks the broadcast loop to stop by sending true on the
// close channel. The call blocks until a receiver takes the value; in this
// file the only receiver is the broadcast loop.
func (p *publisher) closePublisher() {
	p.close <- true
}