forked from instana/go-sensor
/
partition_consumer.go
65 lines (53 loc) · 1.76 KB
/
partition_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// (c) Copyright IBM Corp. 2021
// (c) Copyright Instana Inc. 2020
package instasarama
import (
"github.com/Shopify/sarama"
instana "github.com/mier85/go-sensor"
ot "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)
// PartitionConsumer is a wrapper for sarama.PartitionConsumer that instruments its calls using
// the provided instana.Sensor. Messages read from the wrapped consumer are traced and then
// re-published on the wrapper's own channel, which is what Messages() returns.
type PartitionConsumer struct {
// Embedded wrapped consumer: every method that is not redefined on the wrapper
// is served by it directly.
sarama.PartitionConsumer
// sensor supplies the tracer used while processing each consumed message.
sensor *instana.Sensor
// messages is the channel returned by Messages(); it receives every message
// after it has been processed by consumeMessage and is closed by consumeMessages.
messages chan *sarama.ConsumerMessage
}
// WrapPartitionConsumer decorates c with tracing performed through sensor and returns
// the resulting wrapper.
//
// A goroutine is started that reads every message from c.Messages(), processes it with
// consumeMessage and forwards it to the channel returned by the wrapper's Messages()
// method. That channel is unbuffered, so forwarding blocks until the caller receives
// the message. The goroutine exits once c.Messages() has been closed.
func WrapPartitionConsumer(c sarama.PartitionConsumer, sensor *instana.Sensor) *PartitionConsumer {
	wrapped := new(PartitionConsumer)
	wrapped.PartitionConsumer = c
	wrapped.sensor = sensor
	wrapped.messages = make(chan *sarama.ConsumerMessage)

	go wrapped.consumeMessages()

	return wrapped
}
// Messages returns the receive-only side of the wrapper's own message channel. Messages
// arrive here after they have been read from the underlying partition consumer and passed
// through consumeMessage; the channel is closed once the underlying channel is closed.
func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
	var out <-chan *sarama.ConsumerMessage = pc.messages

	return out
}
// consumeMessages pumps messages from the wrapped partition consumer through
// consumeMessage until the wrapped consumer's channel is closed, and then closes
// the wrapper's own channel so that readers of Messages() see the end of the stream.
func (pc *PartitionConsumer) consumeMessages() {
	upstream := pc.PartitionConsumer.Messages()

	for {
		msg, open := <-upstream
		if !open {
			break
		}

		pc.consumeMessage(msg)
	}

	close(pc.messages)
}
// consumeMessage starts a "kafka" consumer span for msg, tagged with the message topic
// and the "consume" access type. If a span context can be obtained from the message via
// SpanContextFromConsumerMessage, the new span is created as its child. The context of
// the new span is then injected back into the message before the message is sent to the
// wrapper's channel.
//
// The span is finished when this method returns, i.e. only after the send on the
// wrapper's channel has completed.
func (pc *PartitionConsumer) consumeMessage(msg *sarama.ConsumerMessage) {
	tags := ot.Tags{
		"kafka.service": msg.Topic,
		"kafka.access":  "consume",
	}
	spanOpts := []ot.StartSpanOption{ext.SpanKindConsumer, tags}

	parent, found := SpanContextFromConsumerMessage(msg, pc.sensor)
	if found {
		spanOpts = append(spanOpts, ot.ChildOf(parent))
	}

	span := pc.sensor.Tracer().StartSpan("kafka", spanOpts...)
	defer span.Finish()

	// Store the consumer span context in the message, so that it becomes
	// the parent for any subcalls made while handling the message.
	carrier := ConsumerMessageCarrier{msg}
	pc.sensor.Tracer().Inject(span.Context(), ot.TextMap, carrier)

	pc.messages <- msg
}