/
pubsub.go
92 lines (82 loc) · 1.27 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package pubsub
import (
"sync"
)
type PubSub struct {
mu sync.Mutex
next chan item
closed bool
}
type item struct {
value interface{}
next chan item
}
type Subscription struct {
next chan item
Values chan interface{}
mu sync.Mutex
closed chan struct{}
}
func NewPubSub() (ret *PubSub) {
return &PubSub{
next: make(chan item, 1),
}
}
func (me *PubSub) Publish(v interface{}) {
next := make(chan item, 1)
i := item{v, next}
me.mu.Lock()
me.next <- i
me.next = next
me.mu.Unlock()
}
func (me *Subscription) Close() {
me.mu.Lock()
defer me.mu.Unlock()
select {
case <-me.closed:
default:
close(me.closed)
}
}
func (me *Subscription) runner() {
defer close(me.Values)
for {
select {
case i, ok := <-me.next:
if !ok {
me.Close()
return
}
me.next <- i
me.next = i.next
select {
case me.Values <- i.value:
case <-me.closed:
return
}
case <-me.closed:
return
}
}
}
func (me *PubSub) Subscribe() (ret *Subscription) {
ret = &Subscription{
closed: make(chan struct{}),
Values: make(chan interface{}),
}
me.mu.Lock()
ret.next = me.next
me.mu.Unlock()
go ret.runner()
return
}
func (me *PubSub) Close() {
me.mu.Lock()
defer me.mu.Unlock()
if me.closed {
return
}
close(me.next)
me.closed = true
}