This repository has been archived by the owner on Oct 2, 2020. It is now read-only.
/
ackMgr.go
247 lines (229 loc) · 8.43 KB
/
ackMgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package consumer
import (
"fmt"
"sync"
"time"
"github.com/uber-go/kafka-client/internal/list"
"github.com/uber-go/kafka-client/internal/metrics"
"github.com/uber-go/tally"
"go.uber.org/zap"
)
// resetCheckInterval is the period of the ticker that
// threadSafeSortedList.Reset uses while polling for the list to drain.
const resetCheckInterval = time.Second
// ackID is the handle returned by ackManager.GetAckID. It pairs the address
// of the list entry created for a message with that message's seqNum, so
// that Ack / Nack can verify they are removing the entry they expect.
type ackID struct {
	listAddr list.Address
	msgSeq   int64
}

// ackManager tracks unacked message seqNums and derives the commit level
// from them. Outcomes of its operations are reported through tally counters
// and the zap logger.
type ackManager struct {
	unackedSeqList *threadSafeSortedList
	logger         *zap.Logger
	tally          tally.Scope
}

// threadSafeSortedList wraps a list.IntegerList with a mutex and remembers
// the last value that was successfully added. lastValue is -1 when nothing
// has been added yet (and again after Reset).
type threadSafeSortedList struct {
	sync.Mutex
	list      *list.IntegerList
	lastValue int64
	tally     tally.Scope
}
// newAckManager builds an ackManager for a single kafka partition.
//
// A partitioned consumer that hands messages to several goroutines may see
// them complete out of order; the ackManager keeps track of which messages
// are still outstanding so that a safe commit level can be derived for the
// partition. It is only needed when messages are processed in parallel.
//
// Throughout this file the terms seqNum and offset are interchangeable:
// inside the ackManager an offset is simply a message seqNum.
//
// Usage:
//   - Before processing a message, call ackMgr.GetAckID(msgSeqNum)
//   - Once processing is done, call ackMgr.Ack(ackID) to mark it processed
//   - If processing fails, call ackMgr.Nack(ackID) to skip the message;
//     moving it into an error queue is not supported yet
//   - Periodically, call ackMgr.CommitLevel() to retrieve / checkpoint the
//     safe commit level
//
// Assumptions:
//   - The first msgSeq added is considered the beginSeqNum for this ackManager
//   - A seqNum that is not greater than the most recently added seqNum is
//     treated as a duplicate and rejected with an error
//   - There CANNOT be more than maxOutstanding unacked messages at any time
//   - Calling ackMgr.Ack is what allows the commit level to move past the
//     acked message
//
// Implementation notes:
// Unacked seqNums are stored in a list in ascending order. Acking a message
// removes its entry, so the head of the list is always the unacked message
// with the lowest seqNum and any offset less than that is a safe commit
// checkpoint.
//
// Params:
//
//	maxOutstanding - max number of unacked messages at any given point in time
//	scope / logger - metrics / logging client
func newAckManager(maxOutstanding int, scope tally.Scope, logger *zap.Logger) *ackManager {
	mgr := new(ackManager)
	mgr.tally = scope
	mgr.logger = logger
	mgr.unackedSeqList = newThreadSafeSortedList(maxOutstanding, scope)
	return mgr
}
// GetAckID registers msgSeq as unacked and hands back an opaque ackID that
// identifies the message when it is later acked or nacked.
//
// If the seqNum cannot be added, the zero ackID and the underlying error are
// returned and the KafkaPartitionGetAckIDErrors counter is incremented.
func (mgr *ackManager) GetAckID(msgSeq int64) (ackID, error) {
	addr, err := mgr.unackedSeqList.Add(msgSeq)
	if err == nil {
		return ackID{listAddr: addr, msgSeq: msgSeq}, nil
	}

	mgr.tally.Counter(metrics.KafkaPartitionGetAckIDErrors).Inc(1)
	if err == list.ErrCapacity {
		// list.ErrCapacity is handled gracefully so no need to log error.
		return ackID{}, err
	}
	mgr.logger.Error("GetAckID() error", zap.Int64("rcvdSeq", msgSeq), zap.Error(err))
	return ackID{}, err
}
// Ack marks the message identified by id as processed by removing its entry
// from the unacked list.
//
// The returned error is always nil: a failed removal is non-actionable for
// the caller and does not affect correctness, so it is only logged and
// counted (metrics.KafkaPartitionAckErrors). Successful acks are counted
// under metrics.KafkaPartitionAck.
func (mgr *ackManager) Ack(id ackID) error {
	if err := mgr.unackedSeqList.Remove(id.listAddr, id.msgSeq); err != nil {
		mgr.tally.Counter(metrics.KafkaPartitionAckErrors).Inc(1)
		mgr.logger.Error("ack error: list remove failed", zap.Error(err))
		return nil
	}
	mgr.tally.Counter(metrics.KafkaPartitionAck).Inc(1)
	return nil
}
// Nack marks the message identified by id as processed by removing its entry
// from the unacked list. The caller is expected to have moved the message to
// an error queue before calling this.
//
// The returned error is always nil: a failed removal is non-actionable for
// the caller and does not affect correctness, so it is only logged and
// counted (metrics.KafkaPartitionNackErrors). Successful nacks are counted
// under metrics.KafkaPartitionNack.
func (mgr *ackManager) Nack(id ackID) error {
	if err := mgr.unackedSeqList.Remove(id.listAddr, id.msgSeq); err != nil {
		mgr.tally.Counter(metrics.KafkaPartitionNackErrors).Inc(1)
		mgr.logger.Error("nack error: list remove failed", zap.Error(err))
		return nil
	}
	mgr.tally.Counter(metrics.KafkaPartitionNack).Inc(1)
	return nil
}
// CommitLevel returns the seqNum that can be used as a safe commit
// checkpoint. The result is less than zero if there is no safe checkpoint
// yet.
//
// When unacked messages exist, the level is one below the lowest unacked
// seqNum (the list head). When the list is empty, the level is the last
// seqNum that was added.
//
// The head lookup and the lastValue read are done under a single acquisition
// of the list lock. Taking the lock twice would let a concurrent GetAckID
// slip in between the two reads and raise lastValue to a seqNum that is
// still unacked, which would then be reported as safe to commit.
//
// Any error from the list other than list.ErrEmpty is logged at Fatal level.
func (mgr *ackManager) CommitLevel() int64 {
	l := mgr.unackedSeqList
	l.Lock()
	defer l.Unlock()

	// Call the underlying list directly: l.PeekHead() would try to take the
	// lock that is already held here.
	unacked, err := l.list.PeekHead()
	if err == nil {
		return unacked - 1
	}
	if err != list.ErrEmpty {
		mgr.logger.Fatal("commitLevel error: list peekHead failed", zap.Error(err))
	}
	return l.lastValue
}
// Reset delegates to the unacked list's Reset, which blocks until the list
// is empty and then resets the last seen offset.
func (mgr *ackManager) Reset() {
	unacked := mgr.unackedSeqList
	unacked.Reset()
}
// newAckID returns an ackID that binds the list address addr to the message
// seqNum value.
func newAckID(addr list.Address, value int64) ackID {
	var id ackID
	id.listAddr = addr
	id.msgSeq = value
	return id
}
// newThreadSafeSortedList creates a thread safe integer list that expects
// its values to arrive in sorted (ascending) order. The backing IntegerList
// is created with maxOutstanding, and lastValue starts at -1 to signal that
// nothing has been added yet.
func newThreadSafeSortedList(maxOutstanding int, scope tally.Scope) *threadSafeSortedList {
	backing := list.NewIntegerList(maxOutstanding)

	sorted := new(threadSafeSortedList)
	sorted.list = backing
	sorted.lastValue = -1
	sorted.tally = scope
	return sorted
}
// PeekHead returns the value stored at the head of the list, if one exists;
// otherwise the error reported by the underlying list is passed through.
func (l *threadSafeSortedList) PeekHead() (int64, error) {
	l.Lock()
	defer l.Unlock()

	head, err := l.list.PeekHead()
	return head, err
}
// LastValue returns the most recent value that was successfully added to the
// list, or -1 if nothing has been added since creation or the last Reset.
func (l *threadSafeSortedList) LastValue() int64 {
	l.Lock()
	defer l.Unlock()
	return l.lastValue
}
// Remove deletes the entry at addr if, and only if, the value stored there
// equals value. A lookup failure is returned as is; a value mismatch is
// reported as an error and the entry is left in place.
func (l *threadSafeSortedList) Remove(addr list.Address, value int64) error {
	l.Lock()
	defer l.Unlock()

	stored, err := l.list.Get(addr)
	switch {
	case err != nil:
		return err
	case stored != value:
		return fmt.Errorf("address / value mismatch, expected value of %v but got %v", value, stored)
	}
	return l.list.Remove(addr)
}
// Add appends value to the end of the list and returns the address of the
// new entry.
//
// value MUST be greater than the last value added so that the list stays
// sorted; otherwise the KafkaPartitionAckMgrDups counter is incremented and
// an error is returned together with list.Null.
//
// If the underlying list rejects the value, its address / error pair is
// returned unchanged and lastValue is not modified.
//
// The KafkaPartitionAckMgrSkipped counter records the gap between value and
// the previous lastValue. It is emitted only after the add succeeded:
// counting it up front would record a gap for a value that was never stored,
// and would count the same gap again if the caller retries the add
// (e.g. after list.ErrCapacity).
func (l *threadSafeSortedList) Add(value int64) (list.Address, error) {
	l.Lock()
	defer l.Unlock()

	if value <= l.lastValue {
		l.tally.Counter(metrics.KafkaPartitionAckMgrDups).Inc(1)
		return list.Null, fmt.Errorf("new value %v is not greater than last stored value %v", value, l.lastValue)
	}

	addr, err := l.list.Add(value)
	if err != nil {
		return addr, err
	}

	if skipped := value - l.lastValue - 1; skipped > 0 {
		l.tally.Counter(metrics.KafkaPartitionAckMgrSkipped).Inc(skipped)
	}
	l.lastValue = value
	return addr, nil
}
// Reset blocks until the list is empty and then sets lastValue to -1.
//
// The list is checked once immediately and afterwards every
// resetCheckInterval. The emptiness check and the lastValue reset happen
// under the same lock acquisition, so no Add can land between them.
//
// Polling runs in the calling goroutine. Using a helper goroutine that
// signals completion by closing a channel is fragile: once the channel is
// closed, a pending tick can still be selected and lead to a second close,
// which panics.
func (l *threadSafeSortedList) Reset() {
	// tryReset reports whether the list was empty and lastValue was reset.
	tryReset := func() bool {
		l.Lock()
		defer l.Unlock()
		if !l.list.Empty() {
			return false
		}
		l.lastValue = -1
		return true
	}

	// Fast path: nothing outstanding, no need to wait for the first tick.
	if tryReset() {
		return
	}

	ticker := time.NewTicker(resetCheckInterval)
	defer ticker.Stop()
	for range ticker.C {
		if tryReset() {
			return
		}
	}
}