/
client_workers.go
259 lines (214 loc) · 6.14 KB
/
client_workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
package sync
import (
"fmt"
"strconv"
"time"
"go.uber.org/zap"
)
// barrierWorker is a long-running goroutine that tracks all pending barriers
// for this Client. It receives new barriers over c.barrierCh, polls their
// Redis counters once per second via MGET, and notifies waiters on each
// barrier's C channel when the counter reaches the target, when the barrier's
// context expires, or when the Client's context is cancelled.
//
// Invariant: pending[i] always corresponds to keys[i]; both slices are
// appended to and pruned together so that the MGET result at index i can be
// matched back to pending[i].
func (c *Client) barrierWorker() {
	defer c.wg.Done()
	var (
		pending []*Barrier // barriers currently being monitored
		keys    []string   // Redis keys for pending, index-aligned with it
		log     = c.log.With("process", "barriers")
	)
	// remove deletes b from both the pending and keys slices, preserving the
	// index alignment between them.
	remove := func(b *Barrier) {
		key := b.key
		log.Debugw("stopping to monitor barrier", "key", key)
		// drop this barrier from the active set.
		for i, p := range pending {
			if p == b {
				copy(pending[i:], pending[i+1:])
				pending[len(pending)-1] = nil // zero the tail so the *Barrier can be GC'd
				pending = pending[:len(pending)-1]
				break
			}
		}
		// drop the key as well.
		for i, k := range keys {
			if k == key {
				copy(keys[i:], keys[i+1:])
				keys[len(keys)-1] = ""
				keys = keys[:len(keys)-1]
				break
			}
		}
	}
	tick := time.NewTicker(1 * time.Second)
	defer func() { tick.Stop() }() // this is a closure because tick gets replaced.
	for {
		cnt := len(pending)
		if cnt == 0 {
			// No barriers to poll: disarm the ticker and drain any tick that
			// may already be buffered in its channel, so the select below
			// blocks until a new barrier or cancellation arrives.
			tick.Stop()
			select {
			case <-tick.C:
			default:
			}
		}
		select {
		case newBarrier := <-c.barrierCh:
			b := newBarrier.barrier
			pending = append(pending, b)
			keys = append(keys, b.key)
			log.Debugw("added barrier", "new", b.key, "all", keys)
			// ack the registration to the caller of the barrier API.
			newBarrier.resultCh <- nil
			if cnt == 0 {
				// we need to reactivate the ticker.
				tick = time.NewTicker(1 * time.Second)
			}
		case <-tick.C:
			// Periodic poll; fall through to the MGET below.
			log.Debugw("checking barriers", "keys", keys)
		case <-c.ctx.Done():
			// Client is shutting down: fail all pending barriers with the
			// context error and exit this worker.
			log.Debugw("yielding", "pending_barriers", len(pending))
			for _, b := range pending {
				log.Debugw("cancelling pending barrier", "key", b.key)
				// NOTE(review): blocking send — assumes b.C is buffered or
				// has an active receiver; confirm against Barrier's creation.
				b.C <- c.ctx.Err()
				close(b.C)
			}
			pending = nil
			return
		}
		// Test all contexts and forget the barriers whose contexts have fired.
		var del []*Barrier
		for _, b := range pending {
			if err := b.ctx.Err(); err != nil {
				log.Debugw("barrier context expired; removing", "key", b.key)
				b.C <- err
				close(b.C)
				del = append(del, b)
			}
		}
		// Prune deleted barriers.
		for _, b := range del {
			remove(b)
		}
		if len(pending) == 0 {
			// nothing to do; loop over; the ticker will be disarmed.
			continue
		}
		// Get the values of all pending states at once, under the context of the Client.
		log.Debugw("getting barrier values", "keys", keys)
		vals, err := c.rclient.MGet(keys...).Result()
		if err != nil {
			// Transient Redis failure: skip this iteration and retry on the
			// next tick rather than failing the barriers.
			log.Warnw("failed while getting barriers; iteration skipped", "error", err)
			continue
		}
		// Reuse del's backing array for this iteration's removals.
		del = del[:0]
		for i, v := range vals {
			if v == nil {
				continue // nobody else has INCR the barrier yet; skip.
			}
			// vals is index-aligned with keys, which is index-aligned with pending.
			b := pending[i]
			curr, err := strconv.ParseInt(v.(string), 10, 64)
			if err != nil {
				log.Warnw("failed to parse barrier value", "error", err, "value", v, "key", b.key)
				continue
			}
			// Has the barrier been hit?
			if curr >= b.target {
				log.Debugw("barrier was hit; informing waiters", "key", b.key, "target", b.target, "curr", curr)
				// barrier has been hit; send a nil error on the channel, and close it.
				b.C <- nil
				close(b.C)
				// queue this deletion; otherwise indices won't line up.
				del = append(del, b)
			} else {
				log.Debugw("barrier still unsatisfied", "key", b.key, "target", b.target, "curr", curr)
			}
		}
		for _, b := range del {
			remove(b)
		}
	}
}
// subscriptionWorker is a long-running goroutine that manages the set of
// active topic subscriptions for this Client. It adds subscriptions arriving
// on c.newSubCh, removes ones whose contexts fire (via per-subscription
// monitor goroutines reporting on rmSubCh), and hands the active set to the
// shared subscriptionConsumer, interrupting it before every mutation so the
// consumer never observes the set mid-change.
func (c *Client) subscriptionWorker() {
	defer c.wg.Done()
	var (
		active  = make(map[string]*Subscription)
		rmSubCh = make(chan []*Subscription, 1)
	)
	log := c.log.With("process", "subscriptions")

	// monitorCtx watches a single subscription's context and schedules its
	// removal when that context fires; it yields silently on global closure.
	monitorCtx := func(s *Subscription) {
		select {
		case <-s.ctx.Done():
			log.Debugw("context closure detected; removing subscription", "topic", s.topic.name, "key", s.key)
			rmSubCh <- []*Subscription{s}
		case <-c.ctx.Done():
			log.Debugw("yielding context monitor routine due to global context closure", "topic", s.topic.name, "key", s.key)
		}
	}

	consumer := &subscriptionConsumer{c: c, log: log, rmSubCh: rmSubCh, notifyCh: make(chan struct{}, 1)}

	var finalErr error // error to broadcast to remaining subscribers upon returning.
	defer func() {
		// Notify every still-active subscriber of the shutdown outcome and
		// release their channels.
		for _, s := range active {
			s.doneCh <- finalErr
			close(s.doneCh)
			s.outCh.Close()
		}
	}()

	for {
		// Manage subscriptions.
		select {
		case newSub := <-c.newSubCh:
			s := newSub.sub
			if _, ok := active[s.key]; ok {
				newSub.resultCh <- fmt.Errorf("failed to add duplicate subscription")
				continue
			}
			log.Debugw("adding subscription", "topic", s.topic.name, "key", s.key)
			// interrupt consumer and wait until it yields, before mutating the active set.
			err := consumer.interrupt()
			if err != nil {
				panic(fmt.Sprintf("failed to interrupt consumer when adding subscription; exiting; err: %s", err))
			}
			active[s.key] = s
			go monitorCtx(s)
			newSub.resultCh <- nil

		case subs := <-rmSubCh:
			// interrupt consumer and wait until it yields, before accessing subscriptions.
			err := consumer.interrupt()
			if err != nil {
				panic(fmt.Sprintf("failed to interrupt consumer when removing subscriptions; exiting; err: %s", err))
			}
			if log.Desugar().Core().Enabled(zap.DebugLevel) {
				// only incur in this cost if debug level is enabled.
				// BUGFIX: these slices were previously created with
				// make([]string, len(subs)) and then appended to, which
				// left len(subs) leading empty strings in the logged output.
				topics := make([]string, 0, len(subs))
				keys := make([]string, 0, len(subs))
				ids := make([]string, 0, len(subs))
				for _, s := range subs {
					topics = append(topics, s.topic.name)
					keys = append(keys, s.key)
					ids = append(ids, s.lastid)
				}
				log.Debugw("removing subscriptions", "topics", topics, "keys", keys, "ids", ids)
			}
			for _, s := range subs {
				// this was a planned removal, sending err = nil.
				s.doneCh <- nil
				close(s.doneCh)
				s.outCh.Close()
				delete(active, s.key)
			}

		case <-c.ctx.Done():
			// BUGFIX: this previously used Debugf (printf-style) with
			// structured key/value arguments, producing %!(EXTRA ...) noise;
			// Debugw is the correct structured-logging call.
			if err := consumer.interrupt(); err != nil {
				log.Debugw("failed to interrupt consumer when exiting", "error", err)
				finalErr = err
			}
			return
		}

		if len(rmSubCh)+len(c.newSubCh) > 0 {
			log.Debugf("more subscription control events to consume; looping over")
			// we still have pending items, continue draining before we resume
			// consuming.
			continue
		}
		if len(active) == 0 {
			continue
		}
		log.Debugf("resume consuming")
		// no copy of the active set is needed, as we always interrupt the
		// consumer before mutating the active set.
		consumer.resume(active)
	}
}