/
sync_producer.go
210 lines (180 loc) · 6.7 KB
/
sync_producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
// (c) Copyright IBM Corp. 2023
//go:build go1.17
// +build go1.17
package instasarama
import (
"bytes"
"github.com/IBM/sarama"
instana "github.com/instana/go-sensor"
ot "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
)
// SyncProducer is a wrapper for sarama.SyncProducer that instruments its calls using
// provided instana.Sensor
type SyncProducer struct {
sarama.SyncProducer
sensor instana.TracerLogger
propageContext bool
}
// NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration, and
// instruments its calls
func NewSyncProducer(addrs []string, config *sarama.Config, sensor instana.TracerLogger) (sarama.SyncProducer, error) {
sp, err := sarama.NewSyncProducer(addrs, config)
if err != nil {
return sp, err
}
return WrapSyncProducer(sp, config, sensor), nil
}
// NewSyncProducerFromClient creates a new SyncProducer using the given client, and instruments its calls
func NewSyncProducerFromClient(client sarama.Client, sensor instana.TracerLogger) (sarama.SyncProducer, error) {
sp, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
return sp, err
}
return WrapSyncProducer(sp, client.Config(), sensor), nil
}
// WrapSyncProducer wraps an existing sarama.SyncProducer instance and instruments its calls. It requires the same
// config that was used to create this producer to detect the Kafka version and whether it's supposed to return
// successes/errors. To initialize a new sync producer instance use instasarama.NewSyncProducer() and
// instasarama.NewSyncProducerFromClient() convenience methods instead
func WrapSyncProducer(sp sarama.SyncProducer, config *sarama.Config, sensor instana.TracerLogger) *SyncProducer {
return &SyncProducer{
SyncProducer: sp,
sensor: sensor,
propageContext: contextPropagationSupported(config.Version),
}
}
// SendMessage picks up the trace context previously added to the message with
// instasarama.ProducerMessageWithSpan(), starts a new child span and injects its
// context into the message headers before sending it to the underlying producer.
// The call will not be traced if there the message does not contain trace context
func (p *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (int32, int64, error) {
sp := startProducerSpan(p.sensor, msg)
if sp != nil {
defer sp.Finish()
carrier := ProducerMessageCarrier{msg}
if p.propageContext {
// forward the trace context, updating the span ids
sp.Tracer().Inject(sp.Context(), ot.TextMap, carrier)
} else {
// remove previously added trace headers
carrier.RemoveAll()
}
}
partition, offset, err := p.SyncProducer.SendMessage(msg)
if err != nil && sp != nil {
sp.SetTag("kafka.error", err)
sp.LogFields(otlog.Error(err))
}
return partition, offset, err
}
// SendMessages starts a new span and injects its context into messages headers before
// sending them with the underlying producer.
//
// This method attempts to use the existing trace context found in message headers.
// There will be NO SPAN CREATED for this call if messages originate from different trace contexts.
// A possible use case that result in such behavior would be if messages resulted from different
// HTTP requests are buffered and later being sent in one batch asynchronously.
// In case you want your batch publish operation to be a part of a specific trace, make sure that
// you inject the parent span of this trace explicitly before calling `SendMessages()`, i.e.
//
// type MessageCollector struct {
// CollectedMessages []*sarama.ProducerMessage
// producer *instasarama.SyncProducer
// // ...
// }
//
// func (c MessageCollector) Flush(ctx context.Context) error {
// // extract the parent span from context and use it to continue the trace
// if parentSpan, ok := instana.SpanFromContext(ctx); ok {
// // start a new span for the batch send job
// sp := parentSpan.Tracer().StartSpan("batch-send", ot.ChilfOf(parentSpan.Context()))
// defer sp.Finish()
//
// // inject the trace context into every collected message, overriding the existing one
// for i, msg := range c.CollectedMessages {
// c.CollectedMessages = instasarama.ProducerMessageWithSpan(msg, sp)
// }
// }
//
// return c.producer.SendMessages(c.CollectedMessages)
// }
func (p *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
if len(msgs) == 0 {
return nil
}
var sp ot.Span
if producerMessagesFromSameContext(msgs) {
sp = startProducerSpan(p.sensor, msgs[0])
}
if sp != nil {
defer sp.Finish()
instana.BatchSize(len(msgs)).Set(sp)
// collect unique topics from the rest of messages and inject trace context in one go
topics := make(map[string]struct{})
for _, msg := range msgs {
if _, ok := topics[msg.Topic]; !ok {
topics[msg.Topic] = struct{}{}
}
// forward the trace context, updating the span id
sp.Tracer().Inject(sp.Context(), ot.TextMap, ProducerMessageCarrier{msg})
}
// send topics as a comma-separated string
buf := bytes.NewBuffer(nil)
for topic := range topics {
buf.WriteString(topic)
buf.WriteByte(',')
}
buf.Truncate(buf.Len() - 1) // truncate trailing comma
sp.SetTag("kafka.service", buf.String())
}
err := p.SyncProducer.SendMessages(msgs)
if err != nil && sp != nil {
sp.SetTag("kafka.error", err)
sp.LogFields(otlog.Error(err))
}
return err
}
// startSpan picks up the existing trace context provided in the message and returns a new child
// span. It returns nil if there is no valid context provided in the message
func startProducerSpan(sensor instana.TracerLogger, msg *sarama.ProducerMessage) ot.Span {
switch sc, err := sensor.Tracer().Extract(ot.TextMap, ProducerMessageCarrier{msg}); err {
case nil:
return sensor.Tracer().StartSpan(
"kafka",
ext.SpanKindProducer,
ot.ChildOf(sc),
ot.Tags{
"kafka.service": msg.Topic,
"kafka.access": "send",
},
)
case ot.ErrSpanContextNotFound:
sensor.Logger().Debug("no span context provided in message to ", msg.Topic, ", skipping the call", msg.Topic)
case ot.ErrUnsupportedFormat:
sensor.Logger().Info("unsupported span context format provided in message to ", msg.Topic, ", skipping the call")
default:
sensor.Logger().Warn("failed to extract span context from producer message headers: ", err)
}
return nil
}
func producerMessagesFromSameContext(msgs []*sarama.ProducerMessage) bool {
if len(msgs) == 0 {
return true
}
firstTraceID, firstSpanID, err := extractTraceSpanID(msgs[0])
if err != nil {
return false
}
for _, msg := range msgs[1:] {
traceID, spanID, err := extractTraceSpanID(msg)
if err != nil {
return false
}
if traceID != firstTraceID || spanID != firstSpanID {
return false
}
}
return true
}