forked from DataDog/dd-trace-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka.go
407 lines (367 loc) · 12.9 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.
// Package kafka provides functions to trace the confluentinc/confluent-kafka-go package (https://github.com/confluentinc/confluent-kafka-go).
package kafka // import "github.com/nowfred/dd-trace-go/contrib/confluentinc/confluent-kafka-go/kafka"
import (
"context"
"math"
"time"
"github.com/nowfred/dd-trace-go/datastreams"
"github.com/nowfred/dd-trace-go/datastreams/options"
"github.com/nowfred/dd-trace-go/ddtrace"
"github.com/nowfred/dd-trace-go/ddtrace/ext"
"github.com/nowfred/dd-trace-go/ddtrace/tracer"
"github.com/nowfred/dd-trace-go/internal/log"
"github.com/nowfred/dd-trace-go/internal/telemetry"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
const (
	// make sure these 3 are updated to V2 for the V2 version.
	// componentName is the value set on the ext.Component span tag.
	componentName = "confluentinc/confluent-kafka-go/kafka"
	// packageName is used as the prefix in debug log lines.
	packageName = "contrib/confluentinc/confluent-kafka-go/kafka"
	// integrationName is reported to the tracer as the imported integration.
	integrationName = "github.com/confluentinc/confluent-kafka-go"
)
// init registers this integration with telemetry and marks it as imported
// so the tracer can report its usage.
func init() {
	telemetry.LoadIntegration(componentName)
	tracer.MarkIntegrationImported(integrationName)
}
// NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer.
func NewConsumer(conf *kafka.ConfigMap, opts ...Option) (*Consumer, error) {
	consumer, err := kafka.NewConsumer(conf)
	if err != nil {
		return nil, err
	}
	// Expose the kafka configuration to the tracing options so settings
	// such as the bootstrap servers and group id can be picked up.
	return WrapConsumer(consumer, append(opts, WithConfig(conf))...), nil
}
// NewProducer calls kafka.NewProducer and wraps the resulting Producer.
func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error) {
	producer, err := kafka.NewProducer(conf)
	if err != nil {
		return nil, err
	}
	// Expose the kafka configuration to the tracing options.
	return WrapProducer(producer, append(opts, WithConfig(conf))...), nil
}
// A Consumer wraps a kafka.Consumer.
type Consumer struct {
	*kafka.Consumer
	// cfg holds the tracing configuration built from the Options.
	cfg *config
	// events is the traced replacement for the underlying Events channel;
	// nil when consuming via the events channel is not enabled.
	events chan kafka.Event
	// prev is the span started for the most recently delivered message;
	// it is finished when the next event is processed (or on Close/Poll).
	prev ddtrace.Span
}
// WrapConsumer wraps a kafka.Consumer so that any consumed events are traced.
func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer {
	w := &Consumer{
		Consumer: c,
		cfg:      newConfig(opts...),
	}
	log.Debug("%s: Wrapping Consumer: %#v", packageName, w.cfg)
	// Replace the events channel with a traced one (nil when the events
	// channel is not enabled).
	w.events = w.traceEventsChannel(c.Events())
	return w
}
// traceEventsChannel forwards events from in to a new channel, starting a
// consumer span for each *kafka.Message and finishing it once the next event
// is delivered. Returns nil when in is nil.
func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
	// in will be nil when consuming via the events channel is not enabled
	if in == nil {
		return nil
	}
	out := make(chan kafka.Event, 1)
	go func() {
		defer close(out)
		for evt := range in {
			var next ddtrace.Span
			// only trace messages
			if msg, ok := evt.(*kafka.Message); ok {
				next = c.startSpan(msg)
				setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
			} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
				commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
			}
			// Forward the event first, then finish the previous message's
			// span: the span for a message covers the time until the next
			// event is handed to the application.
			out <- evt
			if c.prev != nil {
				c.prev.Finish()
			}
			c.prev = next
		}
		// finish any remaining span
		if c.prev != nil {
			c.prev.Finish()
			c.prev = nil
		}
	}()
	return out
}
// startSpan starts a consumer span for msg, extracting any propagated span
// context from the message headers and re-injecting the new span's context.
func (c *Consumer) startSpan(msg *kafka.Message) ddtrace.Span {
	cfg := c.cfg
	opts := []tracer.StartSpanOption{
		tracer.ServiceName(cfg.consumerServiceName),
		tracer.ResourceName("Consume Topic " + *msg.TopicPartition.Topic),
		tracer.SpanType(ext.SpanTypeMessageConsumer),
		tracer.Tag(ext.MessagingKafkaPartition, msg.TopicPartition.Partition),
		tracer.Tag("offset", msg.TopicPartition.Offset),
		tracer.Tag(ext.Component, componentName),
		tracer.Tag(ext.SpanKind, ext.SpanKindConsumer),
		tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka),
		tracer.Measured(),
	}
	if cfg.bootstrapServers != "" {
		opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, cfg.bootstrapServers))
	}
	// Ranging over a nil map is a no-op, so no nil check is needed here.
	for key, tagFn := range cfg.tagFns {
		opts = append(opts, tracer.Tag(key, tagFn(msg)))
	}
	if !math.IsNaN(cfg.analyticsRate) {
		opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate))
	}
	// kafka supports headers, so try to extract a span context
	carrier := NewMessageCarrier(msg)
	if parent, err := tracer.Extract(carrier); err == nil {
		opts = append(opts, tracer.ChildOf(parent))
	}
	span, _ := tracer.StartSpanFromContext(cfg.ctx, cfg.consumerSpanName, opts...)
	// reinject the span context so consumers can pick it up
	tracer.Inject(span.Context(), carrier)
	return span
}
// Close calls the underlying Consumer.Close and if polling is enabled, finishes
// any remaining span.
func (c *Consumer) Close() error {
	err := c.Consumer.Close()
	// When the events channel is enabled, the consuming goroutine owns
	// c.prev; touching it here would be a data race, so leave it alone.
	if c.events != nil {
		return err
	}
	if span := c.prev; span != nil {
		span.Finish()
		c.prev = nil
	}
	return err
}
// Events returns the kafka Events channel (if enabled). Message events will be
// traced.
func (c *Consumer) Events() chan kafka.Event {
	return c.events
}
// Poll polls the consumer for messages or events. Message will be
// traced.
func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) {
	// Finish the span started for the previously polled message, if any.
	if span := c.prev; span != nil {
		span.Finish()
		c.prev = nil
	}
	e := c.Consumer.Poll(timeoutMS)
	switch v := e.(type) {
	case *kafka.Message:
		setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, v)
		c.prev = c.startSpan(v)
	case kafka.OffsetsCommitted:
		commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, v.Offsets, v.Error)
	}
	return e
}
// ReadMessage polls the consumer for a message. Message will be traced.
func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
	// Finish the span from the previous read, if any.
	if span := c.prev; span != nil {
		span.Finish()
		c.prev = nil
	}
	msg, err := c.Consumer.ReadMessage(timeout)
	if err != nil {
		return nil, err
	}
	setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
	c.prev = c.startSpan(msg)
	return msg, nil
}
// Commit commits current offsets and tracks the commit offsets if data streams is enabled.
func (c *Consumer) Commit() ([]kafka.TopicPartition, error) {
	tps, err := c.Consumer.Commit()
	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
	return tps, err
}
// CommitMessage commits a message and tracks the commit offsets if data streams is enabled.
func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) {
	tps, err := c.Consumer.CommitMessage(msg)
	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
	return tps, err
}
// CommitOffsets commits provided offsets and tracks the commit offsets if data streams is enabled.
func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) {
	tps, err := c.Consumer.CommitOffsets(offsets)
	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
	return tps, err
}
// commitOffsets reports committed offsets to the data streams monitoring
// backend. It is a no-op when data streams is disabled, the group id is
// unknown, or the commit itself failed.
func commitOffsets(dataStreamsEnabled bool, groupID string, tps []kafka.TopicPartition, err error) {
	if !dataStreamsEnabled || groupID == "" || err != nil {
		return
	}
	for i := range tps {
		tp := &tps[i]
		tracer.TrackKafkaCommitOffset(groupID, *tp.Topic, tp.Partition, int64(tp.Offset))
	}
}
// trackProduceOffsets reports the offset of a successfully produced message
// to data streams monitoring. No-op on produce error, when data streams is
// disabled, or when the message carries no topic.
func trackProduceOffsets(dataStreamsEnabled bool, msg *kafka.Message, err error) {
	if !dataStreamsEnabled || err != nil {
		return
	}
	tp := msg.TopicPartition
	if tp.Topic == nil {
		return
	}
	tracer.TrackKafkaProduceOffset(*tp.Topic, tp.Partition, int64(tp.Offset))
}
// A Producer wraps a kafka.Producer.
type Producer struct {
	*kafka.Producer
	// cfg holds the tracing configuration built from the Options.
	cfg *config
	// produceChannel is the traced replacement for the underlying
	// ProduceChannel; messages sent to it are forwarded to the producer.
	produceChannel chan *kafka.Message
	// events is the underlying Events channel, wrapped for data streams
	// monitoring when that feature is enabled.
	events chan kafka.Event
	// libraryVersion is the librdkafka version, used to decide whether
	// message headers are supported.
	libraryVersion int
}
// WrapProducer wraps a kafka.Producer so requests are traced.
func WrapProducer(p *kafka.Producer, opts ...Option) *Producer {
	// The librdkafka version gates header support (see setProduceCheckpoint).
	version, _ := kafka.LibraryVersion()
	w := &Producer{
		Producer:       p,
		cfg:            newConfig(opts...),
		events:         p.Events(),
		libraryVersion: version,
	}
	log.Debug("%s: Wrapping Producer: %#v", packageName, w.cfg)
	w.produceChannel = w.traceProduceChannel(p.ProduceChannel())
	// Only wrap the events channel when data streams monitoring needs to
	// observe delivery reports.
	if w.cfg.dataStreamsEnabled {
		w.events = w.traceEventsChannel(p.Events())
	}
	return w
}
// Events returns the kafka Events channel (if enabled). Message events will be monitored
// with data streams monitoring (if enabled)
func (p *Producer) Events() chan kafka.Event {
	return p.events
}
// traceProduceChannel returns a channel that forwards messages to out,
// starting a producer span around each forwarded message. Returns out
// unchanged when it is nil.
func (p *Producer) traceProduceChannel(out chan *kafka.Message) chan *kafka.Message {
	if out == nil {
		return out
	}
	wrapped := make(chan *kafka.Message, 1)
	go func() {
		for m := range wrapped {
			// The span covers handing the message to the underlying
			// produce channel.
			s := p.startSpan(m)
			setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, m)
			out <- m
			s.Finish()
		}
	}()
	return wrapped
}
// startSpan starts a producer span for msg, using any span context found in
// the message headers as the parent and injecting the new span's context so
// consumers can pick it up.
func (p *Producer) startSpan(msg *kafka.Message) ddtrace.Span {
	cfg := p.cfg
	opts := []tracer.StartSpanOption{
		tracer.ServiceName(cfg.producerServiceName),
		tracer.ResourceName("Produce Topic " + *msg.TopicPartition.Topic),
		tracer.SpanType(ext.SpanTypeMessageProducer),
		tracer.Tag(ext.Component, componentName),
		tracer.Tag(ext.SpanKind, ext.SpanKindProducer),
		tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka),
		tracer.Tag(ext.MessagingKafkaPartition, msg.TopicPartition.Partition),
	}
	if cfg.bootstrapServers != "" {
		opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, cfg.bootstrapServers))
	}
	if !math.IsNaN(cfg.analyticsRate) {
		opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate))
	}
	// if there's a span context in the headers, use that as the parent
	carrier := NewMessageCarrier(msg)
	if parent, err := tracer.Extract(carrier); err == nil {
		opts = append(opts, tracer.ChildOf(parent))
	}
	span, _ := tracer.StartSpanFromContext(cfg.ctx, cfg.producerSpanName, opts...)
	// inject the span context so consumers can pick it up
	tracer.Inject(span.Context(), carrier)
	return span
}
// Close calls the underlying Producer.Close and also closes the internal
// wrapping producer channel.
func (p *Producer) Close() {
	// Close the wrapping channel first so the forwarding goroutine started
	// in traceProduceChannel stops sending to the underlying channel.
	close(p.produceChannel)
	p.Producer.Close()
}
// Produce calls the underlying Producer.Produce and traces the request.
func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
	span := p.startSpan(msg)
	// if the user has selected a delivery channel, we will wrap it and
	// wait for the delivery event to finish the span
	if deliveryChan != nil {
		oldDeliveryChan := deliveryChan
		// Swap in an intermediate channel; the goroutine below relays the
		// single delivery event back to the caller's channel after
		// finishing the span.
		deliveryChan = make(chan kafka.Event)
		go func() {
			var err error
			evt := <-deliveryChan
			if msg, ok := evt.(*kafka.Message); ok {
				// delivery errors are returned via TopicPartition.Error
				err = msg.TopicPartition.Error
				trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
			}
			span.Finish(tracer.WithError(err))
			// Forward the event to the caller's original channel.
			oldDeliveryChan <- evt
		}()
	}
	setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg)
	err := p.Producer.Produce(msg, deliveryChan)
	// with no delivery channel, finish immediately
	if deliveryChan == nil {
		span.Finish(tracer.WithError(err))
	}
	return err
}
// ProduceChannel returns a channel which can receive kafka Messages and will
// send them to the underlying producer channel.
func (p *Producer) ProduceChannel() chan *kafka.Message {
	return p.produceChannel
}
// traceEventsChannel forwards events from in to a new channel, reporting the
// produce offset of each delivered message to data streams monitoring.
// Returns nil when in is nil.
func (p *Producer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
	if in == nil {
		return nil
	}
	out := make(chan kafka.Event, 1)
	go func() {
		defer close(out)
		for e := range in {
			if m, ok := e.(*kafka.Message); ok {
				// delivery errors are surfaced via TopicPartition.Error
				trackProduceOffsets(p.cfg.dataStreamsEnabled, m, m.TopicPartition.Error)
			}
			out <- e
		}
	}()
	return out
}
// setConsumeCheckpoint records a data streams "consume" checkpoint for msg
// and re-injects the updated pathway context into the message headers.
// No-op when data streams is disabled or msg is nil.
func setConsumeCheckpoint(dataStreamsEnabled bool, groupID string, msg *kafka.Message) {
	if !dataStreamsEnabled || msg == nil {
		return
	}
	edges := []string{"direction:in", "topic:" + *msg.TopicPartition.Topic, "type:kafka"}
	if groupID != "" {
		edges = append(edges, "group:"+groupID)
	}
	carrier := NewMessageCarrier(msg)
	parent := datastreams.ExtractFromBase64Carrier(context.Background(), carrier)
	params := options.CheckpointParams{PayloadSize: getMsgSize(msg)}
	if ctx, ok := tracer.SetDataStreamsCheckpointWithParams(parent, params, edges...); ok {
		datastreams.InjectToBase64Carrier(ctx, carrier)
	}
}
// setProduceCheckpoint records a data streams "produce" checkpoint for msg
// and, when the librdkafka version supports message headers, injects the
// updated pathway context into the message headers. version is the packed
// librdkafka version as returned by kafka.LibraryVersion.
func setProduceCheckpoint(dataStreamsEnabled bool, version int, msg *kafka.Message) {
	if !dataStreamsEnabled || msg == nil {
		return
	}
	edges := []string{"direction:out", "topic:" + *msg.TopicPartition.Topic, "type:kafka"}
	carrier := NewMessageCarrier(msg)
	// The checkpoint is always set; only the header injection below is
	// version-gated.
	ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getMsgSize(msg)}, edges...)
	// 0x000b0400 is the packed form of version 0.11.4.
	if !ok || version < 0x000b0400 {
		// headers not supported before librdkafka >=0.11.4
		return
	}
	datastreams.InjectToBase64Carrier(ctx, carrier)
}
// getMsgSize returns the payload size of msg in bytes: key plus value plus
// every header's key and value.
func getMsgSize(msg *kafka.Message) (size int64) {
	size = int64(len(msg.Key) + len(msg.Value))
	for _, h := range msg.Headers {
		size += int64(len(h.Key) + len(h.Value))
	}
	return size
}