consumer_manager.go
// Package kafkaconsumer is a clone of github.com/wvanbergen/kafka/kafkaconsumer, with the
// sarama references replaced by their gopkg.in import paths instead of the "raw" GitHub paths.
package kafkaconsumer

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/samuel/go-zookeeper/zk"
	"github.com/wvanbergen/kazoo-go"
	"gopkg.in/Shopify/sarama.v1"
	"gopkg.in/tomb.v1"
)
// Consumer represents a consumer instance and is the main interface to work with as a consumer
// of this library.
type Consumer interface {
	// Interrupt initiates the shutdown procedure of the consumer, and returns immediately.
	// When you are done using the consumer, you must call either Close or Interrupt to prevent leaking memory.
	Interrupt()

	// Close starts the shutdown procedure for the consumer and waits for it to complete.
	// When you are done using the consumer, you must call either Close or Interrupt to prevent leaking memory.
	Close() error

	// Messages returns a channel that you can read to obtain messages from Kafka to process.
	// Every message that you receive from this channel should be passed to Ack after it has been processed.
	Messages() <-chan *sarama.ConsumerMessage

	// Errors returns a channel that you can read to obtain errors that occur.
	Errors() <-chan error

	// Ack marks a message as processed, indicating that the message offset can be committed
	// for the message's partition by the offset manager. Note that the offset manager may decide
	// not to commit every offset immediately for efficiency reasons. Calling Close or Interrupt
	// will make sure that the last offset provided to this function will be flushed to storage.
	// You have to provide the messages in the same order as you received them from the Messages
	// channel.
	Ack(*sarama.ConsumerMessage)
}
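
// A minimal usage sketch, for illustration only. The group name, Zookeeper address, the
// subscription value, and the handleMessage function below are placeholders, not part of
// this package:
//
//	consumer, err := kafkaconsumer.Join("example-group", subscription, "zk1:2181,zk2:2181/chroot", nil)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer consumer.Close()
//
//	go func() {
//		for err := range consumer.Errors() {
//			log.Println("consumer error:", err)
//		}
//	}()
//
//	for msg := range consumer.Messages() {
//		handleMessage(msg) // process the message before acknowledging it
//		consumer.Ack(msg)  // acknowledge messages in the order they were received
//	}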

// Join joins a Kafka consumer group, and returns a Consumer instance.
// - `group` is the name of the group this consumer instance will join. All instances that form
//   a consumer group should use the same name. A group name must be unique per Kafka cluster.
// - `subscription` is an object that describes what partitions the group wants to consume.
//   A single instance may end up consuming zero of them, all of them, or any number in between.
//   Every running instance in a group should use the same subscription; the behavior is
//   undefined if that is not the case.
// - `zookeeper` is the Zookeeper connection string, e.g. "zk1:2181,zk2:2181,zk3:2181/chroot"
// - `config` specifies the configuration. If it is nil, a default configuration is used.
func Join(group string, subscription Subscription, zookeeper string, config *Config) (Consumer, error) {
	if group == "" {
		return nil, sarama.ConfigurationError("a group name cannot be empty")
	}

	if config == nil {
		config = NewConfig()
	}

	var zkNodes []string
	zkNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(zookeeper)

	if err := config.Validate(); err != nil {
		return nil, err
	}

	cm := &consumerManager{
		config:            config,
		subscription:      subscription,
		partitionManagers: make(map[string]*partitionManager),
		messages:          make(chan *sarama.ConsumerMessage, config.ChannelBufferSize),
		errors:            make(chan error, config.ChannelBufferSize),
	}

	if kz, err := kazoo.NewKazoo(zkNodes, config.Zookeeper); err != nil {
		return nil, err
	} else {
		cm.kz = kz
	}

	cm.group = cm.kz.Consumergroup(group)
	cm.instance = cm.group.NewInstance()

	// Register the consumer group if it does not exist yet
	if exists, err := cm.group.Exists(); err != nil {
		cm.shutdown()
		return nil, err
	} else if !exists {
		if err := cm.group.Create(); err != nil {
			cm.shutdown()
			return nil, err
		}
	}

	// Register this instance with Zookeeper
	data, err := subscription.JSON()
	if err != nil {
		cm.shutdown()
		return nil, err
	}

	if err := cm.instance.RegisterWithSubscription(data); err != nil {
		cm.shutdown()
		return nil, err
	} else {
		cm.logf("Consumer instance registered (%s).", cm.instance.ID)
	}

	// Discover the Kafka brokers
	brokers, err := cm.kz.BrokerList()
	if err != nil {
		cm.shutdown()
		return nil, err
	} else {
		cm.logf("Discovered Kafka cluster at %s", strings.Join(brokers, ","))
	}

	// Initialize the sarama client
	if client, err := sarama.NewClient(brokers, config.Config); err != nil {
		cm.shutdown()
		return nil, err
	} else {
		cm.client = client
	}

	// Initialize the sarama offset manager
	if offsetManager, err := sarama.NewOffsetManagerFromClient(group, cm.client); err != nil {
		cm.shutdown()
		return nil, err
	} else {
		cm.offsetManager = offsetManager
	}

	// Initialize the sarama consumer
	if consumer, err := sarama.NewConsumerFromClient(cm.client); err != nil {
		cm.shutdown()
		return nil, err
	} else {
		cm.consumer = consumer
	}

	// Start the manager goroutine
	go cm.run()

	return cm, nil
}

// consumerManager implements the Consumer interface, and manages the goroutine that
// is responsible for spawning and terminating partitionManagers.
type consumerManager struct {
	config       *Config
	subscription Subscription

	kz       *kazoo.Kazoo
	group    *kazoo.Consumergroup
	instance *kazoo.ConsumergroupInstance

	client        sarama.Client
	consumer      sarama.Consumer
	offsetManager sarama.OffsetManager

	t                 tomb.Tomb
	m                 sync.RWMutex
	partitionManagers map[string]*partitionManager

	messages chan *sarama.ConsumerMessage
	errors   chan error
}

func (cm *consumerManager) Messages() <-chan *sarama.ConsumerMessage {
	return cm.messages
}

func (cm *consumerManager) Errors() <-chan error {
	return cm.errors
}

func (cm *consumerManager) Interrupt() {
	cm.t.Kill(nil)
}

func (cm *consumerManager) Close() error {
	cm.Interrupt()
	return cm.t.Wait()
}

// Ack will dispatch a message to the right partitionManager's ack
// function, so it can be marked as processed.
func (cm *consumerManager) Ack(msg *sarama.ConsumerMessage) {
	cm.m.RLock()
	defer cm.m.RUnlock()

	partitionKey := fmt.Sprintf("%s/%d", msg.Topic, msg.Partition)
	partitionManager := cm.partitionManagers[partitionKey]
	if partitionManager == nil {
		cm.logf("ERROR: acked message %d for %s, but this partition is not managed by this consumer!", msg.Offset, partitionKey)
	} else {
		partitionManager.ack(msg.Offset)
	}
}

// run implements the main loop of the consumer manager:
// 1. Get the partitions that the group subscribes to.
// 2. Get the currently running instances.
// 3. Distribute the partitions over the instances.
// 4. Run partition consumers for the partitions that are assigned to this instance.
// 5. Watch Zookeeper for changes in 1 & 2; start over when that happens.
func (cm *consumerManager) run() {
	defer cm.shutdown()

	for {
		partitions, partitionsChanged, err := cm.watchSubscription()
		if err != nil {
			cm.t.Kill(err)
			return
		}

		instances, instancesChanged, err := cm.watchConsumerInstances()
		if err != nil {
			cm.t.Kill(err)
			return
		}

		cm.logf("Currently, %d instances are registered, to consume %d partitions in total.", len(instances), len(partitions))

		var (
			partitionDistribution = distributePartitionsBetweenConsumers(instances, retrievePartitionLeaders(partitions))
			assignedPartitions    = make(map[string]*kazoo.Partition)
		)

		for _, partition := range partitionDistribution[cm.instance.ID] {
			assignedPartitions[partition.Key()] = partition
		}

		cm.managePartitionManagers(assignedPartitions)

		select {
		case <-cm.t.Dying():
			cm.logf("Interrupted, shutting down...")
			return

		case <-partitionsChanged:
			cm.logf("Woke up because the subscription reported a change in partitions.")

		case <-instancesChanged:
			cm.logf("Woke up because the list of running instances changed.")
		}
	}
}

// watchSubscription retrieves the list of partitions that the group's subscription currently
// resolves to from Zookeeper, and sets a watch to be notified of changes to this list. It will
// retry for any error that may occur. If the consumer manager is interrupted, the error return
// value will be tomb.ErrDying. Any other error is non-recoverable.
func (cm *consumerManager) watchSubscription() (kazoo.PartitionList, <-chan zk.Event, error) {
	var (
		partitions        kazoo.PartitionList
		partitionsChanged <-chan zk.Event
		err               error
	)

	for {
		partitions, partitionsChanged, err = cm.subscription.WatchPartitions(cm.kz)
		if err != nil {
			cm.logf("Failed to watch subscription: %s. Trying again in 1 second...", err)
			select {
			case <-cm.t.Dying():
				return nil, nil, tomb.ErrDying
			case <-time.After(1 * time.Second):
				continue
			}
		}

		return partitions, partitionsChanged, nil
	}
}

// watchConsumerInstances retrieves the list of currently running consumer instances from Zookeeper,
// and sets a watch to be notified of changes to this list. It will retry for any error that may
// occur. If the consumer manager is interrupted, the error return value will be tomb.ErrDying. Any
// other error is non-recoverable.
func (cm *consumerManager) watchConsumerInstances() (kazoo.ConsumergroupInstanceList, <-chan zk.Event, error) {
	var (
		instances        kazoo.ConsumergroupInstanceList
		instancesChanged <-chan zk.Event
		err              error
	)

	for {
		instances, instancesChanged, err = cm.group.WatchInstances()
		if err != nil {
			cm.logf("Failed to watch consumer group instances: %s. Trying again in 1 second...", err)
			select {
			case <-cm.t.Dying():
				return nil, nil, tomb.ErrDying
			case <-time.After(1 * time.Second):
				continue
			}
		}

		return instances, instancesChanged, err
	}
}

// startPartitionManager starts a new partition manager in a goroutine, and adds
// it to the partitionManagers map.
func (cm *consumerManager) startPartitionManager(partition *kazoo.Partition) {
	pm := &partitionManager{
		parent:             cm,
		partition:          partition,
		lastConsumedOffset: -1,
		processingDone:     make(chan struct{}),
	}

	cm.m.Lock()
	cm.partitionManagers[pm.partition.Key()] = pm
	cm.m.Unlock()

	go pm.run()
}

// stopPartitionManager stops a running partition manager, and removes it
// from the partitionManagers map.
func (cm *consumerManager) stopPartitionManager(pm *partitionManager) {
	if err := pm.close(); err != nil {
		pm.logf("Failed to cleanly shut down consumer: %s", err)
	}

	cm.m.Lock()
	delete(cm.partitionManagers, pm.partition.Key())
	cm.m.Unlock()
}

// managePartitionManagers will compare the currently running partition managers to the list
// of partitions that is assigned to this consumer instance, and will stop and start partition
// managers as appropriate.
func (cm *consumerManager) managePartitionManagers(assignedPartitions map[string]*kazoo.Partition) {
	var wg sync.WaitGroup

	cm.m.RLock()
	cm.logf("This instance is assigned to consume %d partitions, and is currently consuming %d partitions.", len(assignedPartitions), len(cm.partitionManagers))

	// Stop consumers for partitions that are no longer assigned to this instance
	for partitionKey, pm := range cm.partitionManagers {
		if _, ok := assignedPartitions[partitionKey]; !ok {
			wg.Add(1)
			go func(pm *partitionManager) {
				defer wg.Done()
				cm.stopPartitionManager(pm)
			}(pm)
		}
	}

	// Start consumers for partitions that we were not already consuming
	for partitionKey, partition := range assignedPartitions {
		if _, ok := cm.partitionManagers[partitionKey]; !ok {
			wg.Add(1)
			go func(partition *kazoo.Partition) {
				defer wg.Done()
				cm.startPartitionManager(partition)
			}(partition)
		}
	}

	cm.m.RUnlock()

	// Wait until all the interrupted partitionManagers have shut down completely.
	wg.Wait()
}

// shutdown cleanly shuts down the consumer manager:
// 1. Stop all partition managers.
// 2. Close the connection to the Kafka cluster.
// 3. Deregister this running instance in Zookeeper.
// 4. Close the connection to Zookeeper.
// 5. Close the messages and errors channels.
func (cm *consumerManager) shutdown() {
	defer cm.t.Done()

	cm.managePartitionManagers(nil)

	if cm.consumer != nil {
		if err := cm.consumer.Close(); err != nil {
			cm.logf("Failed to close Kafka consumer: %s", err)
		}
	}

	if cm.offsetManager != nil {
		if err := cm.offsetManager.Close(); err != nil {
			cm.logf("Failed to close offset manager: %s", err)
		}
	}

	if cm.client != nil {
		if err := cm.client.Close(); err != nil {
			cm.logf("Failed to close Kafka client: %s", err)
		}
	}

	if cm.instance != nil {
		if err := cm.instance.Deregister(); err != nil {
			cm.logf("Failed to deregister consumer instance: %s", err)
		}
	}

	if cm.kz != nil {
		if err := cm.kz.Close(); err != nil {
			cm.logf("Failed to close Zookeeper connection: %s", err)
		}
	}

	close(cm.messages)
	close(cm.errors)
}

// shortID returns the last 12 characters of this instance's ID, for use in log messages.
func (cm *consumerManager) shortID() string {
	if cm.instance == nil {
		return "(defunct)"
	} else {
		return cm.instance.ID[len(cm.instance.ID)-12:]
	}
}

// logf writes a message to the package's Logger, prefixed with this instance's short ID.
func (cm *consumerManager) logf(format string, arguments ...interface{}) {
	Logger.Printf(fmt.Sprintf("[instance=%s] %s", cm.shortID(), format), arguments...)
}