forked from opentracing-contrib/go-amqp
/
tracer.go
57 lines (54 loc) · 1.56 KB
/
tracer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package amqptracer
import (
opentracing "github.com/opentracing/opentracing-go"
amqp "github.com/rabbitmq/amqp091-go"
)
// Inject injects the span context into the AMQP header.
//
// Example:
//
// func PublishMessage(
// ctx context.Context,
// ch *amqp.Channel,
// exchange, key string,
// mandatory, immediate bool,
// msg *amqp.Publishing,
// ) error {
// sp := opentracing.SpanFromContext(ctx)
// defer sp.Finish()
//
// // Inject the span context into the AMQP header.
// if err := amqptracer.Inject(sp, msg.Headers); err != nil {
// return err
// }
//
// // Publish the message with the span context.
// return ch.Publish(exchange, key, mandatory, immediate, msg)
// }
func Inject(span opentracing.Span, hdrs amqp.Table) error {
c := amqpHeadersCarrier(hdrs)
return span.Tracer().Inject(span.Context(), opentracing.TextMap, c)
}
// Extract extracts the span context out of the AMQP header.
//
// Example:
//
// func ConsumeMessage(ctx context.Context, msg *amqp.Delivery) error {
// // Extract the span context out of the AMQP header.
// spCtx, _ := amqptracer.Extract(msg.Headers)
// sp := opentracing.StartSpan(
// "ConsumeMessage",
// opentracing.FollowsFrom(spCtx),
// )
// defer sp.Finish()
//
// // Update the context with the span for the subsequent reference.
// ctx = opentracing.ContextWithSpan(ctx, sp)
//
// // Actual message processing.
// return ProcessMessage(ctx, msg)
// }
func Extract(hdrs amqp.Table) (opentracing.SpanContext, error) {
c := amqpHeadersCarrier(hdrs)
return opentracing.GlobalTracer().Extract(opentracing.TextMap, c)
}