forked from Dan70402/go-qless
/
events.go
executable file
·120 lines (100 loc) · 1.97 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package qless
import (
"sync"
"github.com/garyburd/redigo/redis"
)
type Events struct {
pool *redis.Pool
// locks following
l sync.Mutex
c *redis.PubSubConn
subscribers []chan<- interface{}
}
var channels = []interface{}{"ql:log", "ql:canceled", "ql:completed", "ql:failed", "ql:popped", "ql:stalled", "ql:put", "ql:track", "ql:untrack"}
func (e *Events) Subscribe(ch chan<- interface{}) error {
e.l.Lock()
defer e.l.Unlock()
e.subscribers = append(e.subscribers, ch)
if e.c == nil {
return e.start()
}
return nil
}
func (e *Events) Unsubscribe(ch chan<- interface{}) {
e.l.Lock()
defer e.l.Unlock()
for i := 0; i < len(e.subscribers); i++ {
if e.subscribers[i] == ch {
close(ch)
l := len(e.subscribers) - 1
e.subscribers[i] = e.subscribers[l]
e.subscribers[l] = nil
e.subscribers = e.subscribers[:l]
}
}
if len(e.subscribers) == 0 {
e.subscribers = nil
}
}
func (e *Events) start() error {
psc := redis.PubSubConn{Conn: e.pool.Get()}
if err := psc.Subscribe(channels...); err != nil {
psc.Conn.Close()
return err
}
e.c = &psc
go e.run(psc)
return nil
}
func (e *Events) Close() {
e.l.Lock()
defer e.l.Unlock()
if e.subscribers != nil {
for _, ch := range e.subscribers {
close(ch)
}
e.subscribers = nil
}
if e.c != nil {
e.c.Unsubscribe()
e.c.Conn.Close()
e.c = nil
}
}
func (e *Events) run(psc redis.PubSubConn) {
defer func() {
e.Close()
}()
running := true
for running {
val := psc.Receive()
if _, ok := val.(error); ok {
running = false
}
e.l.Lock()
sub := e.subscribers
oldLen := len(sub)
for i := 0; i < len(sub); i++ {
ch := sub[i]
select {
case ch <- val:
default:
// blocking channels are removed
close(ch)
e := len(sub) - 1
sub[i] = sub[e]
sub[e] = nil
sub = sub[:e]
}
}
switch len(sub) {
case 0:
running = false
e.subscribers = nil
case oldLen: // do nothing
default:
e.subscribers = sub
}
e.l.Unlock()
}
}