-
-
Notifications
You must be signed in to change notification settings - Fork 273
/
control_flow.go
454 lines (393 loc) Β· 12.5 KB
/
control_flow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
package terminal
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/safing/portbase/formats/varint"
"github.com/safing/portbase/modules"
)
// FlowControl defines the flow control interface.
type FlowControl interface {
	// Deliver submits a message received from upstream for processing.
	Deliver(msg *Msg) *Error
	// Receive returns a channel that delivers messages ready for processing.
	Receive() <-chan *Msg
	// Send queues a message for sending, waiting at most timeout.
	Send(msg *Msg, timeout time.Duration) *Error
	// ReadyToSend returns a channel that is readable when data can be sent.
	ReadyToSend() <-chan struct{}
	// Flush waits until all pending messages are sent or the timeout is reached.
	Flush(timeout time.Duration)
	// StartWorkers starts the workers needed to operate the flow control.
	StartWorkers(m *modules.Module, terminalName string)
	// RecvQueueLen returns the current length of the receive queue.
	RecvQueueLen() int
	// SendQueueLen returns the current length of the send queue.
	SendQueueLen() int
}
// FlowControlType represents a flow control type.
type FlowControlType uint8

// Flow Control Types.
const (
	FlowControlDefault FlowControlType = 0
	FlowControlDFQ     FlowControlType = 1
	FlowControlNone    FlowControlType = 2

	defaultFlowControl = FlowControlDFQ
)

// DefaultSize returns the default flow control size.
func (fct FlowControlType) DefaultSize() uint32 {
	// Resolve the default placeholder to the configured default type.
	if fct == FlowControlDefault {
		fct = defaultFlowControl
	}

	switch fct {
	case FlowControlDFQ:
		return 50000
	case FlowControlNone:
		return 10000
	default:
		// Includes FlowControlDefault, which cannot occur after resolving above.
		return 0
	}
}
// Flow Queue Configuration.
const (
	// DefaultQueueSize is the default flow queue size.
	DefaultQueueSize = 50000
	// MaxQueueSize is the maximum flow queue size.
	MaxQueueSize = 1000000

	// forceReportBelowPercent: when the space already reported to the other
	// end drops below this fraction of the recvQueue capacity, a space
	// report is forced (see shouldReportRecvSpace).
	forceReportBelowPercent = 0.75
)
// DuplexFlowQueue is a duplex flow control mechanism using queues.
type DuplexFlowQueue struct {
	// ctx is the context of the owning terminal; when canceled it stops the
	// flow handler and aborts waiting senders.
	ctx context.Context

	// submitUpstream is used to submit messages to the upstream channel.
	submitUpstream func(msg *Msg, timeout time.Duration)

	// sendQueue holds the messages that are waiting to be sent.
	sendQueue chan *Msg
	// prioMsgs holds the number of messages to send with high priority.
	prioMsgs *int32
	// sendSpace indicates the amount free slots in the recvQueue on the other end.
	sendSpace *int32
	// readyToSend is used to notify sending components that there is free space.
	readyToSend chan struct{}
	// wakeSender is used to wake a sender in case the sendSpace was zero and the
	// sender is waiting for available space.
	wakeSender chan struct{}

	// recvQueue holds the messages that are waiting to be processed.
	recvQueue chan *Msg
	// reportedSpace indicates the amount of free slots that the other end knows
	// about.
	reportedSpace *int32
	// spaceReportLock locks the calculation of space to report.
	spaceReportLock sync.Mutex
	// forceSpaceReport forces the sender to send a space report.
	forceSpaceReport chan struct{}

	// flush is used to send a finish function to the handler, which will write
	// all pending messages and then call the received function.
	flush chan func()
}
// NewDuplexFlowQueue returns a new duplex flow queue.
func NewDuplexFlowQueue(
	ctx context.Context,
	queueSize uint32,
	submitUpstream func(msg *Msg, timeout time.Duration),
) *DuplexFlowQueue {
	// Both sides start out assuming the full queue size is available.
	initialSendSpace := new(int32)
	initialReportedSpace := new(int32)
	atomic.StoreInt32(initialSendSpace, int32(queueSize))
	atomic.StoreInt32(initialReportedSpace, int32(queueSize))

	return &DuplexFlowQueue{
		ctx:              ctx,
		submitUpstream:   submitUpstream,
		sendQueue:        make(chan *Msg, queueSize),
		prioMsgs:         new(int32),
		sendSpace:        initialSendSpace,
		readyToSend:      make(chan struct{}),
		wakeSender:       make(chan struct{}, 1),
		recvQueue:        make(chan *Msg, queueSize),
		reportedSpace:    initialReportedSpace,
		forceSpaceReport: make(chan struct{}, 1),
		flush:            make(chan func()),
	}
}
// StartWorkers starts the necessary workers to operate the flow queue.
func (dfq *DuplexFlowQueue) StartWorkers(m *modules.Module, terminalName string) {
	// Run the combined send/receive handler as a module worker so it is
	// managed (and stopped) by the module system.
	m.StartWorker(terminalName+" flow queue", dfq.FlowHandler)
}
// shouldReportRecvSpace returns whether the receive space should be reported.
func (dfq *DuplexFlowQueue) shouldReportRecvSpace() bool {
	// Report when the space known to the other end falls below the
	// configured fraction of the receive queue capacity.
	threshold := int32(float32(cap(dfq.recvQueue)) * forceReportBelowPercent)
	return atomic.LoadInt32(dfq.reportedSpace) < threshold
}
// decrementReportedRecvSpace decreases the reported recv space by 1 and
// returns if the receive space should be reported.
func (dfq *DuplexFlowQueue) decrementReportedRecvSpace() (shouldReportRecvSpace bool) {
	remaining := atomic.AddInt32(dfq.reportedSpace, -1)
	threshold := int32(float32(cap(dfq.recvQueue)) * forceReportBelowPercent)
	return remaining < threshold
}
// getSendSpace returns the current send space.
func (dfq *DuplexFlowQueue) getSendSpace() int32 {
	space := atomic.LoadInt32(dfq.sendSpace)
	return space
}
// decrementSendSpace decreases the send space by 1 and returns the new value.
func (dfq *DuplexFlowQueue) decrementSendSpace() int32 {
	newSpace := atomic.AddInt32(dfq.sendSpace, -1)
	return newSpace
}
// addToSendSpace increases the send space by n and wakes a possibly waiting
// sender.
func (dfq *DuplexFlowQueue) addToSendSpace(n int32) {
	// Credit the newly reported space.
	atomic.AddInt32(dfq.sendSpace, n)

	// Non-blocking wake in case a sender is waiting for space; the channel
	// is buffered, so a pending wakeup is never lost.
	select {
	case dfq.wakeSender <- struct{}{}:
	default:
	}
}
// reportableRecvSpace returns how much free space can be reported to the other
// end. The returned number must be communicated to the other end and must not
// be ignored.
func (dfq *DuplexFlowQueue) reportableRecvSpace() int32 {
	// Serialize report calculation so the same space is never reported twice.
	// Concurrent changes to recvQueue during the calculation are harmless.
	dfq.spaceReportLock.Lock()
	defer dfq.spaceReportLock.Unlock()

	// The space to report is the actual free space minus what the other end
	// already knows about.
	freeSlots := int32(cap(dfq.recvQueue) - len(dfq.recvQueue))
	alreadyReported := atomic.LoadInt32(dfq.reportedSpace)
	toReport := freeSlots - alreadyReported

	// Never report values below zero: dfq.reportedSpace is decreased only
	// after a message was submitted to dfq.recvQueue by dfq.Deliver(), so this
	// race can only lower the value to report, never raise it. Also skip
	// reporting a value of 1, as the benefit is minimal and it would commonly
	// be triggered due to the buffer of the force report channel.
	if toReport <= 1 {
		return 0
	}

	// Account for the report before returning it.
	atomic.AddInt32(dfq.reportedSpace, toReport)
	return toReport
}
// FlowHandler handles all flow queue internals and must be started as a worker
// in the module where it is used.
func (dfq *DuplexFlowQueue) FlowHandler(_ context.Context) error {
	// The upstreamSender is started by the terminal module, but is tied to the
	// flow owner instead. Make sure that the flow owner's module depends on the
	// terminal module so that it is shut down earlier.

	// sendSpaceDepleted tracks whether the other end has no free recv slots
	// left; while true, sending is paused until new space is reported.
	var sendSpaceDepleted bool
	// flushFinished is called once the send queue drains after a flush
	// request; multiple concurrent requests are stacked into one function.
	var flushFinished func()

	// Drain all queues when shutting down, finishing every pending message.
	defer func() {
		for {
			select {
			case msg := <-dfq.sendQueue:
				msg.Finish()
			case msg := <-dfq.recvQueue:
				msg.Finish()
			default:
				return
			}
		}
	}()

sending:
	for {
		// If the send space is depleted, wait to be woken.
		if sendSpaceDepleted {
			select {
			case <-dfq.wakeSender:
				// Only resume sending if there actually is space now;
				// otherwise keep waiting.
				if dfq.getSendSpace() > 0 {
					sendSpaceDepleted = false
				} else {
					continue sending
				}
			case <-dfq.forceSpaceReport:
				// Forced reporting of space.
				// We do not need to check if there is enough sending space, as there is
				// no data included.
				spaceToReport := dfq.reportableRecvSpace()
				if spaceToReport > 0 {
					msg := NewMsg(varint.Pack64(uint64(spaceToReport)))
					dfq.submitUpstream(msg, 0)
				}
				continue sending
			case <-dfq.ctx.Done():
				return nil
			}
		}

		// Get message from send queue.
		select {
		case dfq.readyToSend <- struct{}{}:
			// Notify that we are ready to send.

		case msg := <-dfq.sendQueue:
			// Send message from queue.
			// If nil, the queue is being shut down.
			if msg == nil {
				return nil
			}

			// Check if we are handling a high priority message or waiting for one.
			// Mark any msgs as high priority, when there is one in the pipeline.
			remainingPrioMsgs := atomic.AddInt32(dfq.prioMsgs, -1)
			switch {
			case remainingPrioMsgs >= 0:
				msg.Unit.MakeHighPriority()
			case remainingPrioMsgs < -30_000:
				// Prevent wrap to positive.
				// Compatible with int16 or bigger.
				atomic.StoreInt32(dfq.prioMsgs, 0)
			}

			// Wait for processing slot.
			msg.Unit.WaitForSlot()

			// Prepend available receiving space, so space reports piggyback
			// on regular data messages.
			msg.Data.Prepend(varint.Pack64(uint64(dfq.reportableRecvSpace())))
			// Submit for sending upstream.
			dfq.submitUpstream(msg, 0)

			// Decrease the send space and set flag if depleted.
			if dfq.decrementSendSpace() <= 0 {
				sendSpaceDepleted = true
			}

			// Check if the send queue is empty now and signal flushers.
			if flushFinished != nil && len(dfq.sendQueue) == 0 {
				flushFinished()
				flushFinished = nil
			}

		case <-dfq.forceSpaceReport:
			// Forced reporting of space.
			// We do not need to check if there is enough sending space, as there is
			// no data included.
			spaceToReport := dfq.reportableRecvSpace()
			if spaceToReport > 0 {
				msg := NewMsg(varint.Pack64(uint64(spaceToReport)))
				dfq.submitUpstream(msg, 0)
			}

		case newFlushFinishedFn := <-dfq.flush:
			// Signal immediately if send queue is empty.
			if len(dfq.sendQueue) == 0 {
				newFlushFinishedFn()
			} else {
				// If there already is a flush finished function, stack them.
				if flushFinished != nil {
					stackedFlushFinishFn := flushFinished
					flushFinished = func() {
						stackedFlushFinishFn()
						newFlushFinishedFn()
					}
				} else {
					flushFinished = newFlushFinishedFn
				}
			}

		case <-dfq.ctx.Done():
			return nil
		}
	}
}
// Flush waits for all waiting data to be sent.
func (dfq *DuplexFlowQueue) Flush(timeout time.Duration) {
	// The flow handler calls notify (closing done) once the send queue has
	// fully drained.
	done := make(chan struct{})
	notify := func() {
		close(done)
	}

	// Hand the notifier to the flow handler; abort on shutdown or timeout.
	select {
	case dfq.flush <- notify:
	case <-dfq.ctx.Done():
		return
	case <-TimedOut(timeout):
		return
	}

	// Wait for the flush to complete; abort on shutdown or timeout.
	select {
	case <-done:
	case <-dfq.ctx.Done():
	case <-TimedOut(timeout):
	}
}
// ready is a closed channel that can always be read from immediately. It is
// handed out when no waiting is required, avoiding a side-effecting init().
var ready = func() chan struct{} {
	c := make(chan struct{})
	close(c)
	return c
}()
// ReadyToSend returns a channel that can be read when data can be sent.
func (dfq *DuplexFlowQueue) ReadyToSend() <-chan struct{} {
	// Without send space, hand out the channel that the flow handler serves
	// while it can accept messages. Otherwise return the always-readable
	// channel so callers do not block.
	if atomic.LoadInt32(dfq.sendSpace) <= 0 {
		return dfq.readyToSend
	}
	return ready
}
// Send adds the given msg to the send queue.
func (dfq *DuplexFlowQueue) Send(msg *Msg, timeout time.Duration) *Error {
	select {
	case <-dfq.ctx.Done():
		msg.Finish()
		return ErrStopping
	case <-TimedOut(timeout):
		msg.Finish()
		return ErrTimeout
	case dfq.sendQueue <- msg:
		// For a high priority message, mark everything currently waiting in
		// the queue - including the message just added - as high priority too.
		if msg.Unit.IsHighPriority() {
			atomic.StoreInt32(dfq.prioMsgs, int32(len(dfq.sendQueue)))
		}
		return nil
	}
}
// Receive returns the channel from which queued incoming messages can be read.
func (dfq *DuplexFlowQueue) Receive() <-chan *Msg {
	// If the space known to the other end is running low, nudge the flow
	// handler to send a space report (non-blocking; channel is buffered).
	if dfq.shouldReportRecvSpace() {
		select {
		case dfq.forceSpaceReport <- struct{}{}:
		default:
		}
	}
	return dfq.recvQueue
}
// Deliver submits a msg for receiving from upstream.
func (dfq *DuplexFlowQueue) Deliver(msg *Msg) *Error {
	// Ignore nil messages and messages without data.
	// Check msg for nil before calling any method on it, as Finish may
	// dereference the receiver.
	if msg == nil {
		return ErrMalformedData.With("no data")
	}
	if msg.Data == nil {
		msg.Finish()
		return ErrMalformedData.With("no data")
	}

	// Get and add new reported space.
	// NOTE(review): the sender packs the space report with varint.Pack64, but
	// it is parsed here with GetNextN16 (max 65535). Reports larger than that
	// would fail to parse - confirm space reports cannot exceed the uint16
	// range on the wire.
	addSpace, err := msg.Data.GetNextN16()
	if err != nil {
		msg.Finish()
		return ErrMalformedData.With("failed to parse reported space: %w", err)
	}
	if addSpace > 0 {
		dfq.addToSendSpace(int32(addSpace))
	}

	// Abort processing if the msg only contained a space update.
	if !msg.Data.HoldsData() {
		msg.Finish()
		return nil
	}

	select {
	case dfq.recvQueue <- msg:
		// If the recv queue accepted the msg, decrement the recv space.
		shouldReportRecvSpace := dfq.decrementReportedRecvSpace()
		// If the reported recv space is nearing its end, force a report, if the
		// sender worker is idle.
		if shouldReportRecvSpace {
			select {
			case dfq.forceSpaceReport <- struct{}{}:
			default:
			}
		}
		return nil
	default:
		// If the recv queue is full, return an error.
		// The whole point of the flow queue is to guarantee that this never happens.
		msg.Finish()
		return ErrQueueOverflow
	}
}
// FlowStats returns a k=v formatted string of internal stats.
func (dfq *DuplexFlowQueue) FlowStats() string {
	sendQueued := len(dfq.sendQueue)
	recvQueued := len(dfq.recvQueue)
	sendSpace := atomic.LoadInt32(dfq.sendSpace)
	reportedSpace := atomic.LoadInt32(dfq.reportedSpace)
	return fmt.Sprintf("sq=%d rq=%d sends=%d reps=%d", sendQueued, recvQueued, sendSpace, reportedSpace)
}
// RecvQueueLen returns the current length of the receive queue.
func (dfq *DuplexFlowQueue) RecvQueueLen() int {
	queued := len(dfq.recvQueue)
	return queued
}
// SendQueueLen returns the current length of the send queue.
func (dfq *DuplexFlowQueue) SendQueueLen() int {
	queued := len(dfq.sendQueue)
	return queued
}