-
Notifications
You must be signed in to change notification settings - Fork 159
/
acker.go
104 lines (91 loc) · 2.32 KB
/
acker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package xgress
import (
"github.com/ef-ds/deque"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/metrics"
"sync/atomic"
)
var acker *Acker
func InitAcker(forwarder PayloadBufferForwarder, metrics metrics.Registry, closeNotify <-chan struct{}) {
acker = NewAcker(forwarder, metrics, closeNotify)
}
type ackEntry struct {
Address
*Acknowledgement
}
// Note: if altering this struct, be sure to account for 64 bit alignment on 32 bit arm arch
// https://pkg.go.dev/sync/atomic#pkg-note-BUG
// https://github.com/golang/go/issues/36606
type Acker struct {
acksQueueSize int64
forwarder PayloadBufferForwarder
acks *deque.Deque
ackIngest chan *ackEntry
ackSend chan *ackEntry
closeNotify <-chan struct{}
}
func NewAcker(forwarder PayloadBufferForwarder, metrics metrics.Registry, closeNotify <-chan struct{}) *Acker {
result := &Acker{
forwarder: forwarder,
acks: deque.New(),
ackIngest: make(chan *ackEntry, 16),
ackSend: make(chan *ackEntry, 1),
closeNotify: closeNotify,
}
go result.ackIngester()
go result.ackSender()
metrics.FuncGauge("xgress.acks.queue_size", func() int64 {
return atomic.LoadInt64(&result.acksQueueSize)
})
return result
}
func (acker *Acker) ack(ack *Acknowledgement, address Address) {
acker.ackIngest <- &ackEntry{
Acknowledgement: ack,
Address: address,
}
}
func (acker *Acker) ackIngester() {
var next *ackEntry
for {
if next == nil {
if val, _ := acker.acks.PopFront(); val != nil {
next = val.(*ackEntry)
}
}
if next == nil {
select {
case ack := <-acker.ackIngest:
acker.acks.PushBack(ack)
case <-acker.closeNotify:
return
}
} else {
select {
case ack := <-acker.ackIngest:
acker.acks.PushBack(ack)
case acker.ackSend <- next:
next = nil
case <-acker.closeNotify:
return
}
}
atomic.StoreInt64(&acker.acksQueueSize, int64(acker.acks.Len()))
}
}
func (acker *Acker) ackSender() {
logger := pfxlog.Logger()
for {
select {
case nextAck := <-acker.ackSend:
if err := acker.forwarder.ForwardAcknowledgement(nextAck.Address, nextAck.Acknowledgement); err != nil {
logger.WithError(err).Debugf("unexpected error while sending ack from %v", nextAck.Address)
ackFailures.Mark(1)
} else {
ackTxMeter.Mark(1)
}
case <-acker.closeNotify:
return
}
}
}