-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub.go
111 lines (99 loc) · 1.51 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package pubsub
import (
"sync"
)
// PubSub distributes published values to subscriptions through a chain of
// buffered channels: each published item carries the channel on which the item
// after it will arrive.
type PubSub struct {
	// mu guards next and closed.
	mu sync.Mutex
	// next is the channel the next published item will be sent on. It is nil
	// until it is first allocated.
	next chan item
	// closed is set by Close; once true, Publish drops its value.
	closed bool
}
// item is one published value plus the link to its successor in the chain.
// Field order matters: Publish builds items with a positional literal.
type item struct {
	// value is the payload that was passed to Publish.
	value interface{}
	// next is the channel on which the item published after this one is sent.
	next chan item
}
// Subscription is a single consumer's position in a PubSub's chain of items.
// It is created by PubSub.Subscribe, which also starts its runner goroutine.
type Subscription struct {
	// next is the channel the runner reads its next item from; the runner
	// advances it after every item.
	next chan item
	// Values delivers published values to the consumer. It is unbuffered and is
	// closed when the runner exits.
	Values chan interface{}
	// mu serializes Close so that closed is closed at most once.
	mu sync.Mutex
	// closed is closed by Close to tell the runner to stop.
	closed chan struct{}
}
// NewPubSub returns a pointer to a zero-valued PubSub. The internal channel is
// not allocated here; Publish and Subscribe allocate it on demand.
func NewPubSub() (ret *PubSub) {
	ret = &PubSub{}
	return ret
}
// init allocates the first channel of the chain. The channel has a buffer of
// one so that a single item can be parked in it. lazyInit calls this while
// holding me.mu.
func (me *PubSub) init() {
	head := make(chan item, 1)
	me.next = head
}
// lazyInit allocates the chain's first channel if that has not happened yet.
// It does nothing once the PubSub has been closed.
func (me *PubSub) lazyInit() {
	me.mu.Lock()
	defer me.mu.Unlock()

	if !me.closed && me.next == nil {
		me.init()
	}
}
// Publish appends v to the chain: it sends an item holding v on the current
// tail channel and makes a fresh channel the new tail. If the PubSub has been
// closed, v is dropped.
func (me *PubSub) Publish(v interface{}) {
	me.lazyInit()

	// The successor channel is where the item after this one will be sent.
	successor := make(chan item, 1)

	me.mu.Lock()
	defer me.mu.Unlock()
	if me.closed {
		return
	}
	// The tail is empty and has room for one item, so this send does not block
	// while the lock is held.
	me.next <- item{value: v, next: successor}
	me.next = successor
}
// Close signals the runner to stop by closing me.closed. Calling it more than
// once is safe: the mutex and the non-blocking receive ensure the channel is
// closed only on the first call.
func (me *Subscription) Close() {
	me.mu.Lock()
	defer me.mu.Unlock()

	select {
	case <-me.closed:
		// Already closed; closing again would panic.
		return
	default:
	}
	close(me.closed)
}
// runner is the subscription's goroutine. It walks the chain of items,
// forwarding each value to me.Values, until the subscription is closed or the
// channel it is waiting on is closed. me.Values is closed on exit.
func (me *Subscription) runner() {
	defer close(me.Values)

	for {
		var cur item

		// Wait for the next item, or for a stop signal.
		select {
		case <-me.closed:
			return
		case got, open := <-me.next:
			if !open {
				// The channel was closed by PubSub.Close; close this
				// subscription as well.
				me.Close()
				return
			}
			cur = got
		}

		// Put the item back so other subscriptions reading the same channel
		// can see it too, then step to the channel of the following item.
		me.next <- cur
		me.next = cur.next

		// Hand the value to the consumer unless the subscription is closed
		// first.
		select {
		case <-me.closed:
			return
		case me.Values <- cur.value:
		}
	}
}
// Subscribe returns a new Subscription positioned at the current tail of the
// chain and starts its runner goroutine. The subscription receives values
// published after this point on its Values channel.
func (me *PubSub) Subscribe() (ret *Subscription) {
	me.lazyInit()

	// Snapshot the tail under the lock; Publish replaces it concurrently.
	me.mu.Lock()
	tail := me.next
	me.mu.Unlock()

	ret = &Subscription{
		next:   tail,
		Values: make(chan interface{}),
		closed: make(chan struct{}),
	}
	go ret.runner()
	return ret
}
// Close shuts the PubSub down. It closes the current tail channel, which
// subscription runners waiting on it observe as end-of-stream, and marks the
// PubSub closed so that later Publish calls drop their values. Calling Close
// more than once is safe; only the first call has any effect.
func (me *PubSub) Close() {
	me.mu.Lock()
	defer me.mu.Unlock()

	if me.closed {
		return
	}
	if me.next == nil {
		// Close was called before any Publish or Subscribe. Allocate the tail
		// anyway so there is a channel to close: otherwise a later Subscribe
		// would give its runner a nil channel, and a receive on a nil channel
		// blocks forever instead of reporting that the PubSub is closed.
		me.next = make(chan item, 1)
	}
	close(me.next)
	me.closed = true
}