/
instaamqp091.go
147 lines (114 loc) · 3.71 KB
/
instaamqp091.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// (c) Copyright IBM Corp. 2023
//go:build go1.16
// +build go1.16
package instaamqp091
import (
"net/url"
instana "github.com/instana/go-sensor"
amqp "github.com/rabbitmq/amqp091-go"
ot "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
)
// Values shared by the Publish and Consume instrumentation.
const (
	// operation is the operation name passed to StartSpan for every span
	// created in this file.
	operation = "rabbitmq"

	// publish is the "rabbitmq.sort" tag value set on spans started by Publish.
	publish = "publish"
	// consume is the "rabbitmq.sort" tag value set on spans started for
	// consumed deliveries.
	consume = "consume"
)
// PubCons contains all methods that we want to instrument from the amqp library.
//
// AmqpChannel holds a PubCons and forwards its own Publish and Consume calls
// to it, so any value with these two methods can be wrapped.
type PubCons interface {
	// Publish sends msg to the given exchange with routing key key.
	// AmqpChannel.Publish calls it after injecting trace headers into msg.
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
	// Consume returns the channel of deliveries for queue.
	// AmqpChannel.Consume reads from that channel and re-emits every delivery
	// on a channel of its own.
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
}
// AmqpChannel is a wrapper around the amqp.Channel object and contains all the relevant information to be tracked
type AmqpChannel struct {
	// url is the value reported in the "rabbitmq.address" span tag.
	// WrapChannel sets it to "scheme://host" of the server URL.
	url string
	// pc is the wrapped channel that Publish and Consume delegate to.
	pc PubCons
	// sensor supplies the tracer used to start spans and the logger used for
	// debug output.
	sensor instana.TracerLogger
}
// Publish replaces the original amqp.Channel.Publish method in order to collect the relevant data to be tracked.
//
// It starts a producer span named "rabbitmq" tagged with the exchange, the
// routing key and the channel address, injects the span context into
// msg.Headers (allocating the table when it is nil), and then forwards the
// call to the wrapped channel.
//
// entrySpan is used as the parent of the new span. A nil entrySpan is
// tolerated: the span is then started without a parent reference instead of
// panicking.
//
// The error returned by the wrapped Publish is returned unchanged. When it is
// non-nil its text is also recorded on the span as the "rabbitmq.error" tag
// and as an "error" log field.
func (c AmqpChannel) Publish(entrySpan ot.Span, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
	opts := []ot.StartSpanOption{
		ext.SpanKindProducer,
		ot.Tags{
			"rabbitmq.exchange": exchange,
			"rabbitmq.key":      key,
			"rabbitmq.sort":     publish,
			"rabbitmq.address":  c.url,
		},
	}

	// Calling Context() on a nil span would panic, so only reference the
	// parent when the caller actually supplied one.
	if entrySpan != nil {
		opts = append(opts, ot.ChildOf(entrySpan.Context()))
	}

	logger := c.sensor.Logger()
	tracer := c.sensor.Tracer()

	sp := tracer.StartSpan(operation, opts...)
	// Deferred so the span is finished on every exit path.
	defer sp.Finish()

	// msg is a copy, so allocating the table here does not touch the
	// caller's value; the carrier needs a non-nil table to write into.
	if msg.Headers == nil {
		msg.Headers = amqp.Table{}
	}

	// A failed injection only loses trace propagation; the message is still
	// published.
	if err := tracer.Inject(sp.Context(), ot.TextMap, &messageCarrier{msg.Headers, logger}); err != nil {
		logger.Debug(err)
	}

	err := c.pc.Publish(exchange, key, mandatory, immediate, msg)
	if err != nil {
		errorText := err.Error()
		sp.SetTag("rabbitmq.error", errorText)
		sp.LogFields(otlog.Object("error", errorText))
	}

	return err
}
// Consume replaces the original amqp.Channel.Consume method in order to collect the relevant data to be tracked.
//
// The call is forwarded to the wrapped channel. If that fails, its result is
// returned as is. Otherwise a goroutine reads every delivery from the wrapped
// channel, passes it through consumeMessage, and the caller receives a new
// channel with the same capacity that carries those deliveries. The new
// channel is closed once the wrapped one is closed.
func (c AmqpChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
	source, err := c.pc.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
	if err != nil {
		return source, err
	}

	// traced is what the caller reads from; it mirrors the buffering of the
	// channel returned by the wrapped Consume.
	traced := make(chan amqp.Delivery, cap(source))

	go func() {
		for delivery := range source {
			c.consumeMessage(traced, delivery, queue)
		}
		// The wrapped channel was closed: propagate that to the caller.
		close(traced)
	}()

	return traced, nil
}
// consumeMessage records a consumer span for one delivery and then forwards
// the delivery to pipeCh.
//
// The span is named "rabbitmq" and tagged with the delivery's exchange, the
// queue name (as "rabbitmq.key") and the channel address. Any span context
// extracted from the delivery headers is used as the parent. The context of
// the new span is injected back into the headers before the delivery is sent
// on, and the span is finished before the send.
//
// Extraction and injection failures are only logged at debug level; the
// delivery is forwarded regardless. The send on pipeCh blocks until there is
// room in the channel or a receiver is ready.
func (c AmqpChannel) consumeMessage(pipeCh chan amqp.Delivery, deliveryData amqp.Delivery, queue string) {
	opts := []ot.StartSpanOption{
		ext.SpanKindConsumer,
		ot.Tags{
			"rabbitmq.exchange": deliveryData.Exchange,
			"rabbitmq.key":      queue,
			"rabbitmq.sort":     consume,
			"rabbitmq.address":  c.url,
		},
	}

	logger := c.sensor.Logger()
	tracer := c.sensor.Tracer()

	// Same guard Publish applies before injecting: make sure the carrier is
	// handed a non-nil table. deliveryData is a copy, and it is this copy
	// that is forwarded on pipeCh below.
	if deliveryData.Headers == nil {
		deliveryData.Headers = amqp.Table{}
	}

	sc, err := tracer.Extract(ot.TextMap, &messageCarrier{deliveryData.Headers, logger})
	if err != nil {
		logger.Debug(err)
	}

	// ChildOf is applied even after a failed Extract, as before; a nil
	// context makes the option a no-op.
	opts = append(opts, ot.ChildOf(sc))

	sp := tracer.StartSpan(operation, opts...)

	if err := tracer.Inject(sp.Context(), ot.TextMap, &messageCarrier{deliveryData.Headers, logger}); err != nil {
		logger.Debug(err)
	}

	sp.Finish()

	pipeCh <- deliveryData
}
// WrapChannel returns the AmqpChannel, which is Instana's wrapper around amqp.Channel.
//
// sensor provides the tracer and logger used by the wrapper, and ch is the
// channel whose Publish and Consume calls are instrumented. serverUrl is
// parsed and reduced to "scheme://host", which is the address reported on
// every span; if it cannot be parsed the address is left empty.
func WrapChannel(sensor instana.TracerLogger, ch PubCons, serverUrl string) *AmqpChannel {
	wrapped := &AmqpChannel{
		pc:     ch,
		sensor: sensor,
	}

	if parsed, err := url.Parse(serverUrl); err == nil {
		wrapped.url = parsed.Scheme + "://" + parsed.Host
	}

	return wrapped
}