/
consumer.go
129 lines (102 loc) · 4.54 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package nrsarama
import (
"context"
"net/http"
"github.com/Shopify/sarama"
"github.com/newrelic/go-agent/v3/internal"
"github.com/newrelic/go-agent/v3/newrelic"
)
func init() { internal.TrackUsage("integration", "messagebroker", "saramakafka") }
type ConsumerWrapper struct {
consumerGroup sarama.ConsumerGroup
}
type ConsumerHandler struct {
app *newrelic.Application
txn *newrelic.Transaction
topic string
clientID string
saramaConfig *sarama.Config
messageHandler func(ctx context.Context, message *sarama.ConsumerMessage)
}
// NOTE: Creates and ends one transaction per claim consumed
// NewConsumerHandlerFromApp takes in a new relic application and creates a transaction using it
func NewConsumerHandlerFromApp(app *newrelic.Application, topic string, clientID string, saramaConfig *sarama.Config, messageHandler func(ctx context.Context, message *sarama.ConsumerMessage)) *ConsumerHandler {
return &ConsumerHandler{
app: app,
topic: topic,
messageHandler: messageHandler,
saramaConfig: saramaConfig,
clientID: clientID,
}
}
// NewConsumerHandlerFromTxn takes in a new relic transaction. No application instance is required
func NewConsumerHandlerFromTxn(txn *newrelic.Transaction, topic string, clientID string, saramaConfig *sarama.Config, messageHandler func(ctx context.Context, message *sarama.ConsumerMessage)) *ConsumerHandler {
return &ConsumerHandler{
txn: txn,
topic: topic,
messageHandler: messageHandler,
saramaConfig: saramaConfig,
clientID: clientID,
}
}
func (cw *ConsumerWrapper) Consume(ctx context.Context, handler *ConsumerHandler) error {
txn := newrelic.FromContext(ctx)
consume := cw.consumerGroup.Consume(ctx, []string{handler.topic}, handler)
if consume != nil {
txn.Application().RecordCustomMetric("MessageBroker/Kafka/Heartbeat/Fail", 1.0)
}
return nil
}
// Setup is ran at the beginning of a new session
func (ch *ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
// Record session timeout/poll timeout intervals
ch.app.RecordCustomMetric("MessageBroker/Kafka/Heartbeat/SessionTimeout", ch.saramaConfig.Consumer.Group.Session.Timeout.Seconds())
ch.app.RecordCustomMetric("MessageBroker/Kafka/Heartbeat/PollTimeout", ch.saramaConfig.Consumer.Group.Heartbeat.Interval.Seconds())
return nil
}
// Cleanup is ran at the end of a new session
func (ch *ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func ClaimIngestion(ch *ConsumerHandler, session sarama.ConsumerGroupSession, message *sarama.ConsumerMessage) {
// if txn exists, make claims segments of that txn otherwise create a new one
txn := ch.txn
if ch.txn == nil {
txn = ch.app.StartTransaction("kafkaconsumer")
}
ctx := newrelic.NewContext(context.Background(), txn)
segment := txn.StartSegment("Message/Kafka/Topic/Consume/Named/" + ch.topic)
// Deserialized key/value
deserializeKeySegment := txn.StartSegment("MessageBroker/Kafka/Topic/Named/" + ch.topic + "/Deserialization/Key")
key := string(message.Key)
deserializeKeySegment.End()
deserializeVaueSegment := txn.StartSegment("MessageBroker/Kafka/Topic/Named/" + ch.topic + "/Deserialization/Value")
value := string(message.Value)
deserializeVaueSegment.End()
ch.processMessage(ctx, message, key, value)
segment.End()
session.MarkMessage(message, "")
// Heartbeat metric to log a new message received successfully
txn.Application().RecordCustomMetric("MessageBroker/Kafka/Heartbeat/Receive", 1.0)
txn.End()
}
func (ch *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
ClaimIngestion(ch, session, message)
}
return nil
}
func (ch *ConsumerHandler) processMessage(ctx context.Context, message *sarama.ConsumerMessage, key string, value string) {
txn := newrelic.FromContext(ctx)
messageHandlingSegment := txn.StartSegment("Message/Kafka/Topic/Consume/Named/" + ch.topic + "/MessageProcessing/")
ch.messageHandler(ctx, message)
byteCount := float64(len(message.Value))
hdrs := http.Header{}
for _, hdr := range message.Headers {
hdrs.Add(string(hdr.Key), string(hdr.Value))
}
txn.InsertDistributedTraceHeaders(hdrs)
txn.AddAttribute("kafka.consume.byteCount", byteCount)
txn.AddAttribute("kafka.consume.ClientID", ch.clientID)
txn.Application().RecordCustomMetric("Message/Kafka/Topic/Named/"+ch.topic+"/Received/Bytes", byteCount)
txn.Application().RecordCustomMetric("Message/Kafka/Topic/Named/"+ch.topic+"/Received/Messages", 1)
messageHandlingSegment.End()
}