-
-
Notifications
You must be signed in to change notification settings - Fork 158
/
metadata.go
470 lines (409 loc) · 12.5 KB
/
metadata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
package kgo
import (
"context"
"sync"
"time"
"github.com/twmb/franz-go/pkg/kerr"
)
// metawait encapsulates waiting for a metadata update to complete:
// waiters block on the condition variable until lastUpdate advances.
type metawait struct {
	mu         sync.Mutex
	c          *sync.Cond // broadcast on every completed metadata update; guarded by mu
	lastUpdate time.Time  // time the most recent metadata update completed; guarded by mu
}
// init prepares the condition variable; it must run before any signal or
// wait on the metawait.
func (m *metawait) init() {
	m.c = sync.NewCond(&m.mu)
}
// signal records that a metadata update just completed and wakes every
// goroutine blocked waiting on the condition variable.
func (m *metawait) signal() {
	m.mu.Lock()
	defer m.c.Broadcast() // deferred LIFO: broadcast runs after the unlock below
	defer m.mu.Unlock()
	m.lastUpdate = time.Now()
}
// waitmeta returns immediately if metadata was updated within the last second,
// otherwise this waits for up to wait for a metadata update to complete.
//
// The wait is also cut short if ctx is canceled or the client itself shuts
// down.
func (cl *Client) waitmeta(ctx context.Context, wait time.Duration) {
	now := time.Now()

	// Fast path: a recent-enough update already happened; nothing to wait on.
	cl.metawait.mu.Lock()
	if now.Sub(cl.metawait.lastUpdate) < time.Second {
		cl.metawait.mu.Unlock()
		return
	}
	cl.metawait.mu.Unlock()

	cl.triggerUpdateMetadataNow()

	quit := false
	done := make(chan struct{})
	timeout := time.NewTimer(wait)
	defer timeout.Stop()

	go func() {
		defer close(done)
		cl.metawait.mu.Lock()
		defer cl.metawait.mu.Unlock()

		for !quit {
			// now is fixed at our entry time above; once an update
			// completes, signal sets lastUpdate to a later time, making
			// this subtraction negative and satisfying the check.
			if now.Sub(cl.metawait.lastUpdate) < time.Second {
				return
			}
			cl.metawait.c.Wait()
		}
	}()

	select {
	case <-done:
		return
	case <-timeout.C:
	case <-ctx.Done():
	case <-cl.ctx.Done():
	}

	// Timed out or canceled: flip quit under the same mutex the waiter
	// goroutine reads it under, then broadcast so it rechecks and exits.
	cl.metawait.mu.Lock()
	quit = true
	cl.metawait.mu.Unlock()
	cl.metawait.c.Broadcast()
}
// triggerUpdateMetadata queues a metadata refresh for the update loop.
// The send is non-blocking: if a trigger is already buffered, the pending
// update will cover this request as well.
func (cl *Client) triggerUpdateMetadata() {
	select {
	case cl.updateMetadataCh <- struct{}{}:
	default: // already triggered; drop the duplicate
	}
}
// triggerUpdateMetadataNow queues an immediate metadata refresh, bypassing
// the metadataMinAge wait in the update loop. The send is non-blocking: a
// trigger already buffered makes this one redundant.
func (cl *Client) triggerUpdateMetadataNow() {
	select {
	case cl.updateMetadataNowCh <- struct{}{}:
	default: // already triggered; drop the duplicate
	}
}
// updateMetadataLoop updates metadata whenever the update ticker ticks,
// or whenever deliberately triggered.
//
// Normal triggers are rate limited to metadataMinAge between updates;
// "now" triggers bypass that wait (sleeping only briefly to coalesce
// pile-on triggers) and retry up to 10 times before falling back to a
// normal trigger. Errors back off per the client's retryBackoff.
func (cl *Client) updateMetadataLoop() {
	defer close(cl.metadone)
	var consecutiveErrors int
	var lastAt time.Time // time of the last successful update

	ticker := time.NewTicker(cl.cfg.metadataMaxAge)
	defer ticker.Stop()
	for {
		var now bool // true when triggered via the "now" channel
		select {
		case <-cl.ctx.Done():
			return
		case <-ticker.C:
		case <-cl.updateMetadataCh:
		case <-cl.updateMetadataNowCh:
			now = true
		}

		var nowTries int
	start:
		nowTries++
		if !now {
			// Rate limit non-urgent updates to at most one per
			// metadataMinAge, but let a "now" trigger cut the wait short.
			if wait := cl.cfg.metadataMinAge - time.Since(lastAt); wait > 0 {
				timer := time.NewTimer(wait)
				select {
				case <-cl.ctx.Done():
					timer.Stop()
					return
				case <-cl.updateMetadataNowCh:
					timer.Stop()
				case <-timer.C:
				}
			}
		} else {
			// Even with an "update now", we sleep just a bit to allow some
			// potential pile on now triggers.
			time.Sleep(50 * time.Millisecond)
		}

		// Drain any refires that occurred during our waiting.
		select {
		case <-cl.updateMetadataCh:
		default:
		}
		select {
		case <-cl.updateMetadataNowCh:
		default:
		}

		again, err := cl.updateMetadata()
		if again || err != nil {
			// "Now" updates retry inline (bounded); otherwise we queue a
			// normal trigger and let the next loop iteration handle it.
			if now && nowTries < 10 {
				goto start
			}
			cl.triggerUpdateMetadata()
		}
		if err == nil {
			lastAt = time.Now()
			consecutiveErrors = 0
			continue
		}

		// Back off on consecutive errors before the next iteration.
		consecutiveErrors++
		after := time.NewTimer(cl.cfg.retryBackoff(consecutiveErrors))
		select {
		case <-cl.ctx.Done():
			after.Stop()
			return
		case <-after.C:
		}
	}
}
// updateMetadata updates all of a client's topic's metadata, returning whether
// a new update needs scheduling or if an error occurred.
//
// If any topics or partitions have an error, all record buffers in the topic,
// or the record buffer for each erroring partition, has the first batch's
// try count bumped by one.
func (cl *Client) updateMetadata() (needsRetry bool, err error) {
	// Always wake metadata waiters, even on error, so waitmeta callers
	// are not stuck for the full wait duration.
	defer cl.metawait.signal()

	topics := cl.loadTopics()
	toUpdate := make([]string, 0, len(topics))
	for topic := range topics {
		toUpdate = append(toUpdate, topic)
	}

	meta, all, err := cl.fetchTopicMetadata(toUpdate)
	if err != nil {
		return true, err
	}

	// If we are consuming with regex and thus fetched all topics, the
	// metadata may have returned topics we are not yet tracking.
	// We have to add those topics to our topics map so that we can
	// save their information in the merge just below.
	if all {
		var cloned bool // clone-on-write: only copy the map if we add something
		cl.topicsMu.Lock()
		topics = cl.loadTopics()
		for topic := range meta {
			if _, exists := topics[topic]; !exists {
				if !cloned {
					topics = cl.cloneTopics()
					cloned = true
				}
				topics[topic] = newTopicPartitions(topic)
			}
		}
		if cloned {
			cl.topics.Store(topics)
		}
		cl.topicsMu.Unlock()
	}

	// Merge the producer side of the update.
	for topic, oldParts := range topics {
		newParts, exists := meta[topic]
		if !exists {
			continue
		}
		needsRetry = cl.mergeTopicPartitions(oldParts, newParts) || needsRetry
	}

	// Trigger any consumer updates.
	cl.consumer.doOnMetadataUpdate()

	return needsRetry, nil
}
// fetchTopicMetadata fetches metadata for all reqTopics and returns new
// topicPartitionsData for each topic.
//
// The returned bool reports whether metadata for *all* topics was fetched
// (true when consuming via regex), which the caller uses to add newly seen
// topics to its tracking map.
func (cl *Client) fetchTopicMetadata(reqTopics []string) (map[string]*topicPartitionsData, bool, error) {
	cl.consumer.mu.Lock()
	all := cl.consumer.typ == consumerTypeDirect && cl.consumer.direct.regexTopics ||
		cl.consumer.typ == consumerTypeGroup && cl.consumer.group.regexTopics
	cl.consumer.mu.Unlock()
	meta, err := cl.fetchMetadata(cl.ctx, all, reqTopics)
	if err != nil {
		return nil, all, err
	}

	topics := make(map[string]*topicPartitionsData, len(reqTopics))

	// Hold the broker map read lock for the whole build so sink/source
	// lookups below stay consistent.
	cl.brokersMu.RLock()
	defer cl.brokersMu.RUnlock()

	for i := range meta.Topics {
		topicMeta := &meta.Topics[i]

		parts := &topicPartitionsData{
			loadErr:    kerr.ErrorForCode(topicMeta.ErrorCode),
			isInternal: topicMeta.IsInternal,
			all:        make(map[int32]*topicPartition, len(topicMeta.Partitions)),
			writable:   make(map[int32]*topicPartition, len(topicMeta.Partitions)),
		}
		topics[topicMeta.Topic] = parts

		// A topic-level load error means per-partition data is unusable.
		if parts.loadErr != nil {
			continue
		}

		for i := range topicMeta.Partitions {
			partMeta := &topicMeta.Partitions[i]
			leaderEpoch := partMeta.LeaderEpoch
			// Metadata before v7 does not carry leader epochs; use the
			// -1 sentinel.
			if meta.Version < 7 {
				leaderEpoch = -1
			}

			p := &topicPartition{
				loadErr:     kerr.ErrorForCode(partMeta.ErrorCode),
				leader:      partMeta.Leader,
				leaderEpoch: leaderEpoch,

				records: &recBuf{
					cl: cl,

					topic:     topicMeta.Topic,
					partition: partMeta.Partition,

					recBufsIdx: -1, // -1 marks "not yet added to a sink"
				},

				cursor: &cursor{
					topic:            topicMeta.Topic,
					partition:        partMeta.Partition,
					leader:           partMeta.Leader,
					preferredReplica: -1,

					keepControl: cl.cfg.keepControl,

					cursorsIdx: -1, // -1 marks "not yet added to a source"

					seqOffset: seqOffset{
						offset:             -1, // required to not consume until needed
						currentLeaderEpoch: leaderEpoch,
						lastConsumedEpoch:  -1, // required sentinel
					},
				},
			}

			broker, exists := cl.brokers[p.leader]
			if !exists {
				// Keep any existing partition load error; only record the
				// unknown-broker error when the partition was otherwise fine.
				if p.loadErr == nil {
					p.loadErr = &errUnknownBrokerForPartition{topicMeta.Topic, partMeta.Partition, p.leader}
				}
			} else {
				p.records.sink = broker.sink
				p.cursor.source = broker.source
			}

			parts.partitions = append(parts.partitions, partMeta.Partition)
			parts.all[partMeta.Partition] = p
			if p.loadErr == nil {
				parts.writablePartitions = append(parts.writablePartitions, partMeta.Partition)
				parts.writable[partMeta.Partition] = p
			}
		}
	}

	return topics, all, nil
}
// mergeTopicPartitions merges a new topicPartition into an old and returns
// whether the metadata update that caused this merge needs to be retried.
//
// Retries are necessary if the topic or any partition has a retriable error.
func (cl *Client) mergeTopicPartitions(l *topicPartitions, r *topicPartitionsData) (needsRetry bool) {
	lv := *l.load() // copy so our field writes do not collide with reads
	hadPartitions := len(lv.all) != 0
	// The deferred store publishes whatever state lv holds when we return,
	// including the early-return error paths below.
	defer func() { cl.storePartitionsUpdate(l, &lv, hadPartitions) }()

	lv.loadErr = r.loadErr
	lv.isInternal = r.isInternal

	// A topic-level error applies to all partitions: bump the try count on
	// retriable errors, or fail all buffered records outright otherwise.
	if r.loadErr != nil {
		retriable := kerr.IsRetriable(r.loadErr)
		if retriable {
			for _, topicPartition := range lv.all {
				topicPartition.records.bumpRepeatedLoadErr(lv.loadErr)
			}
		} else {
			for _, topicPartition := range lv.all {
				topicPartition.records.failAllRecords(lv.loadErr)
			}
		}
		return retriable
	}

	lv.partitions = r.partitions
	lv.writablePartitions = r.writablePartitions

	// We should have no deleted partitions, but there are two cases where
	// we could.
	//
	// 1) an admin added partitions, we saw, then we re-fetched metadata
	// from an out of date broker that did not have the new partitions
	// 2) a topic was deleted and recreated with fewer partitions
	//
	// Both of these scenarios should be rare to non-existent. If we see a
	// delete partition, we remove it from sinks / sources and error all
	// buffered records for it. This isn't the best behavior in the first
	// scenario, but it isn't showstopping. The new broker will eventually
	// see the new partitions and we will eventually pick them up. In the
	// latter, we avoid trying to forever produce to a partition that truly
	// does no longer exist.
	var deleted []*topicPartition

	// Migrating topicPartitions is a little tricky because we have to
	// worry about map contents.
	//
	// We update everything appropriately in the new r.all, and after
	// this loop we copy the updated map to lv.all (which is stored
	// atomically after the defer above).
	for part, oldTP := range lv.all {
		newTP, exists := r.all[part]
		if !exists {
			// Individual partitions cannot be deleted, so if this
			// partition does not exist anymore, either the topic
			// was deleted and recreated, which we do not handle
			// yet (and cannot on most Kafka's), or the broker we
			// fetched metadata from is out of date.
			//
			// TODO we should store a "first seen deleted" time
			// and only delete after a given amount of time.
			// It should be rare, but theoretically we can fetch
			// from two brokers in quick succession where one
			// has out of date metadata that will cause us to
			// delete the partition.
			//
			// This is a minor harm (clients can re-publish any
			// records failed due to ErrPartitionDeleted, and
			// consumption will just pause for these partitions),
			// so it is not critical.
			deleted = append(deleted, oldTP)
			continue
		}

		if newTP.loadErr != nil { // partition errors should generally be temporary
			// Keep the old partition state (records, cursor) but carry
			// the new error so the load error is surfaced and retried.
			err := newTP.loadErr
			*newTP = *oldTP
			newTP.loadErr = err
			newTP.records.bumpRepeatedLoadErr(newTP.loadErr)
			needsRetry = true
			continue
		}

		// Update the old's leader epoch before we do any pointer
		// copying. Our epoch should not go backwards, but just in
		// case, we can guard against it.
		if newTP.leaderEpoch < oldTP.leaderEpoch {
			continue
		}

		// If the new sink is the same as the old, we simply copy over
		// the records pointer and maybe begin draining again.
		//
		// We always clear the failing state; migration does this itself.
		if newTP.records.sink == oldTP.records.sink {
			newTP.records = oldTP.records
			newTP.records.clearFailing()
		} else {
			oldTP.migrateProductionTo(newTP)
		}

		// The cursor source could be different because we could be
		// fetching from a preferred replica.
		if newTP.cursor.leader == oldTP.cursor.leader {
			newTP.cursor = oldTP.cursor
			newTP.cursor.clearFailing()
		} else {
			oldTP.migrateCursorTo(newTP)
		}
	}

	// Anything left with a negative recBufsIdx / cursorsIdx is a new topic
	// partition. We use this to add the new tp's records to its sink.
	// Same reasoning applies to the cursor offset.
	for _, newTP := range r.all {
		// If the partition has a load error, even if it is new, we
		// can't do anything with it now. Its record sink and source
		// cursor will be nil.
		if newTP.loadErr != nil {
			continue
		}
		if newTP.records.recBufsIdx == -1 {
			newTP.records.sink.addRecBuf(newTP.records)
			newTP.cursor.source.addCursor(newTP.cursor)
		}
	}

	lv.all = r.all
	lv.writable = r.writable

	// Handle anything deleted. We do this serially so that something can't
	// re-trigger a metadata update and have some logic collide with our
	// deletion cleanup.
	if len(deleted) > 0 {
		cl.handleDeletedPartitions(deleted)
	}

	// The left writable map needs no further updates: all changes above
	// happened to r.all, of which r.writable contains a subset of.
	// Modifications to r.all are seen in r.writable.
	return needsRetry
}
// handleDeletedPartitions fails all buffered records in every deleted
// partition with ErrPartitionDeleted and detaches each partition's record
// buffer and cursor from its sink and source, informing the consumer of
// the deletion as well.
//
// We can encounter a deleted partition if a topic is deleted and recreated
// with fewer partitions. Clearing cursors here ensures that if more
// partitions are reencountered in the future, they will be used.
func (cl *Client) handleDeletedPartitions(deleted []*topicPartition) {
	for _, tp := range deleted {
		tp.records.sink.removeRecBuf(tp.records)
		tp.records.failAllRecords(ErrPartitionDeleted)
		tp.cursor.source.removeCursor(tp.cursor)
		cl.consumer.deletePartition(tp)
	}
}