-
Notifications
You must be signed in to change notification settings - Fork 3
/
consumer.go
133 lines (111 loc) · 3.22 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"context"
"github.com/Shopify/sarama"
"github.com/signalfx/golib/datapoint"
"github.com/signalfx/golib/event"
"github.com/signalfx/golib/sfxclient"
"log"
"strconv"
"sync/atomic"
)
// parser turns one raw message payload into datapoints and/or events.
// Concrete implementations are not visible in this file; newConsumer obtains
// one from config.getParser.
type parser interface {
// parse receives the raw message bytes together with the datapoint and event
// channels it may publish to. It returns an int and an error; ConsumeClaim in
// this file adds the int to its parsed-metrics counter when the error is nil
// and counts a parse error otherwise.
parse(msg []byte, dps chan *datapoint.Datapoint, events chan *event.Event) (int, error)
}
// consumer bundles a consumer group with the parser and output channels used
// to process its messages, plus counters describing what it has processed.
// Its Setup, Cleanup and ConsumeClaim methods let it be passed as the handler
// to the consumer group's Consume call (see newConsumer).
type consumer struct {
// consumer is the underlying consumer group; close() calls Close on it.
consumer consumerGroup
// config is the configuration this consumer was created from.
config *config
// parser is invoked by ConsumeClaim for every message value.
parser parser
// dps is the datapoint channel handed to parser.parse.
dps chan *datapoint.Datapoint
// evts is the event channel handed to parser.parse.
evts chan *event.Event
// id is reported as the "consumer_id" dimension by Datapoints.
id int
// stats holds counters that are updated with sync/atomic and exposed through
// Datapoints. Keep the fields as int64 so the atomic helpers keep applying.
stats struct {
// numMessages counts messages received by ConsumeClaim.
numMessages int64
// numMetricsParsed accumulates the counts returned by successful parses.
numMetricsParsed int64
// numErrs counts values received from the consumer group's Errors channel.
numErrs int64
// numReplacements is not referenced by the code visible in this file.
numReplacements int64
// numParseErrs counts messages for which parser.parse returned an error.
numParseErrs int64
// numTelegrafParseErrs is not referenced by the code visible in this file.
numTelegrafParseErrs int64
}
// cancel is not assigned by newConsumer in this file.
// NOTE(review): presumably set by the caller to stop the consumer - confirm.
cancel context.CancelFunc
}
// Setup logs the claims held by the given consumer group session.
// It performs no other work and always returns nil.
func (*consumer) Setup(sess sarama.ConsumerGroupSession) error {
	claims := sess.Claims()
	log.Printf("I! Starting consumer for claims %v", claims)
	return nil
}
// Cleanup logs the claims held by the given consumer group session.
// It performs no other work and always returns nil.
func (*consumer) Cleanup(sess sarama.ConsumerGroupSession) error {
	claims := sess.Claims()
	log.Printf("I! Cleaning up consumer for claims %v", claims)
	return nil
}
// ConsumeClaim drains the claim's message channel until it is closed.
//
// Every message is counted, handed to the parser together with the datapoint
// and event channels, and then marked on the session regardless of whether
// parsing succeeded. A successful parse adds the returned count to
// numMetricsParsed; a failed parse increments numParseErrs and is logged.
// The method always returns nil.
func (c *consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	stats := &c.stats
	for msg := range claim.Messages() {
		atomic.AddInt64(&stats.numMessages, 1)

		parsed, parseErr := c.parser.parse(msg.Value, c.dps, c.evts)
		if parseErr != nil {
			atomic.AddInt64(&stats.numParseErrs, 1)
			log.Printf("E! Message Parse Error\nmessage: %v\nerror: %s", msg, parseErr)
		} else {
			atomic.AddInt64(&stats.numMetricsParsed, int64(parsed))
		}

		// Mark even unparsable messages so they are not redelivered.
		sess.MarkMessage(msg, "")
	}
	return nil
}
// close closes the underlying consumer group.
// A failure is logged and returned to the caller; success returns nil.
func (c *consumer) close() error {
	log.Printf("I! Waiting for consumer to stop")

	err := c.consumer.Close()
	if err == nil {
		return nil
	}

	log.Printf("E! Error closing consumer: %s", err.Error())
	return err
}
// Datapoints reports this consumer's counters as cumulative datapoints.
//
// All four datapoints share one dimension map identifying the consumer by its
// id. The datapoints are built from pointers to the live counters rather than
// from copies of their current values.
func (c *consumer) Datapoints() []*datapoint.Datapoint {
	stats := &c.stats
	dims := map[string]string{
		"path":        "kafka_consumer",
		"obj":         "consumer",
		"consumer_id": strconv.Itoa(c.id),
	}

	return []*datapoint.Datapoint{
		sfxclient.CumulativeP("total_messages_received", dims, &stats.numMessages),
		sfxclient.CumulativeP("total_messages_parsed", dims, &stats.numMetricsParsed),
		sfxclient.CumulativeP("total_errors_received", dims, &stats.numErrs),
		sfxclient.CumulativeP("total_parse_errors_received", dims, &stats.numParseErrs),
	}
}
// newConsumer creates a consumer for the given topics and starts its
// background goroutines.
//
// It obtains a consumer group from the config (passing c.offset and the string
// hostname + "-" + id) and a parser from c.getParser. If either step fails the
// error is returned and no goroutine is started; a consumer group that was
// already created is closed first so it is not leaked.
//
// Two goroutines are started on success:
//   - one calls Consume in a loop, with the new consumer as handler, until
//     ctxt is done or Consume reports an error;
//   - one drains the consumer group's Errors channel, counting every value in
//     numErrs and logging the non-nil ones, until that channel is closed.
//
// The returned consumer shares dps and evts with the caller.
func newConsumer(ctxt context.Context, c *config, id int, topics []string, dps chan *datapoint.Datapoint, evts chan *event.Event) (*consumer, error) {
	log.Printf("I! Topics being monitored: %s", topics)

	cc, err := c.getConsumerGroup(c.offset, hostname+"-"+strconv.Itoa(id))
	if err != nil {
		return nil, err
	}

	p, err := c.getParser()
	if err != nil {
		// The group was created successfully but will never be used; release it
		// instead of leaking it.
		if closeErr := cc.Close(); closeErr != nil {
			log.Printf("W! unable to close consumer after parser error: %s", closeErr)
		}
		return nil, err
	}

	cons := &consumer{
		consumer: cc,
		config:   c,
		parser:   p,
		dps:      dps,
		evts:     evts,
		id:       id,
	}

	go func() {
		for {
			if err := cc.Consume(ctxt, topics, cons); err != nil {
				if closeErr := cc.Close(); closeErr != nil {
					log.Printf("W! unable to close consumer on Consume error: %s", closeErr)
				}
				log.Printf("E! Consume returned error: %s", err)
				// The group has just been closed. Calling Consume on it again
				// would fail immediately on every iteration and spin this loop,
				// so stop here.
				// NOTE(review): assumes consumerGroup follows sarama.ConsumerGroup
				// semantics (Consume on a closed group returns an error) - confirm.
				return
			}
			// Consume returned without error; loop again unless we were asked
			// to stop.
			if ctxt.Err() != nil {
				return
			}
		}
	}()

	go func() {
		for err := range cc.Errors() {
			atomic.AddInt64(&cons.stats.numErrs, 1)
			if err != nil {
				log.Printf("E! consumer Error: %s", err.Error())
			}
		}
	}()

	return cons, nil
}