forked from influxdata/telegraf
/
kafka_consumer.go
166 lines (144 loc) · 3.87 KB
/
kafka_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package kafka_consumer
import (
"log"
"strings"
"sync"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/telegraf/plugins/inputs"
"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup"
)
type Kafka struct {
ConsumerGroup string
Topics []string
ZookeeperPeers []string
Consumer *consumergroup.ConsumerGroup
PointBuffer int
Offset string
sync.Mutex
// channel for all incoming kafka messages
in <-chan *sarama.ConsumerMessage
// channel for all kafka consumer errors
errs <-chan *sarama.ConsumerError
// channel for all incoming parsed kafka points
pointChan chan models.Point
done chan struct{}
// doNotCommitMsgs tells the parser not to call CommitUpTo on the consumer
// this is mostly for test purposes, but there may be a use-case for it later.
doNotCommitMsgs bool
}
var sampleConfig = `
# topic(s) to consume
topics = ["telegraf"]
# an array of Zookeeper connection strings
zookeeper_peers = ["localhost:2181"]
# the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
# Maximum number of points to buffer between collection intervals
point_buffer = 100000
# Offset (must be either "oldest" or "newest")
offset = "oldest"
`
func (k *Kafka) SampleConfig() string {
return sampleConfig
}
func (k *Kafka) Description() string {
return "Read line-protocol metrics from Kafka topic(s)"
}
func (k *Kafka) Start() error {
k.Lock()
defer k.Unlock()
var consumerErr error
config := consumergroup.NewConfig()
switch strings.ToLower(k.Offset) {
case "oldest", "":
config.Offsets.Initial = sarama.OffsetOldest
case "newest":
config.Offsets.Initial = sarama.OffsetNewest
default:
log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
k.Offset)
config.Offsets.Initial = sarama.OffsetOldest
}
if k.Consumer == nil || k.Consumer.Closed() {
k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
k.ConsumerGroup,
k.Topics,
k.ZookeeperPeers,
config,
)
if consumerErr != nil {
return consumerErr
}
// Setup message and error channels
k.in = k.Consumer.Messages()
k.errs = k.Consumer.Errors()
}
k.done = make(chan struct{})
if k.PointBuffer == 0 {
k.PointBuffer = 100000
}
k.pointChan = make(chan models.Point, k.PointBuffer)
// Start the kafka message reader
go k.parser()
log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n",
k.ZookeeperPeers, k.Topics)
return nil
}
// parser() reads all incoming messages from the consumer, and parses them into
// influxdb metric points.
func (k *Kafka) parser() {
for {
select {
case <-k.done:
return
case err := <-k.errs:
log.Printf("Kafka Consumer Error: %s\n", err.Error())
case msg := <-k.in:
points, err := models.ParsePoints(msg.Value)
if err != nil {
log.Printf("Could not parse kafka message: %s, error: %s",
string(msg.Value), err.Error())
}
for _, point := range points {
select {
case k.pointChan <- point:
continue
default:
log.Printf("Kafka Consumer buffer is full, dropping a point." +
" You may want to increase the point_buffer setting")
}
}
if !k.doNotCommitMsgs {
// TODO(cam) this locking can be removed if this PR gets merged:
// https://github.com/wvanbergen/kafka/pull/84
k.Lock()
k.Consumer.CommitUpto(msg)
k.Unlock()
}
}
}
}
func (k *Kafka) Stop() {
k.Lock()
defer k.Unlock()
close(k.done)
if err := k.Consumer.Close(); err != nil {
log.Printf("Error closing kafka consumer: %s\n", err.Error())
}
}
func (k *Kafka) Gather(acc inputs.Accumulator) error {
k.Lock()
defer k.Unlock()
npoints := len(k.pointChan)
for i := 0; i < npoints; i++ {
point := <-k.pointChan
acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time())
}
return nil
}
func init() {
inputs.Add("kafka_consumer", func() inputs.Input {
return &Kafka{}
})
}