/
producer.go
123 lines (101 loc) · 5.34 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package producer
import (
"context"
"fmt"
"time"
"github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/core/metadata"
messageHeader "github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/messaging/message_header"
messageTracing "github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/messaging/otel/tracing"
"github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/messaging/types"
"github.com/mehdihadeli/go-ecommerce-microservices/internal/pkg/otel/tracing"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/baggage"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
)
// https://devandchill.com/posts/2021/12/go-step-by-step-guide-for-implementing-tracing-on-a-microservices-architecture-2/2/
// https://github.com/open-telemetry/opentelemetry-go-contrib/blob/v0.12.0/instrumentation/github.com/Shopify/sarama/otelsarama/producer.go
// https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/messaging/
// https://opentelemetry.io/docs/instrumentation/go/manual/#semantic-attributes
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#messaging-attributes
// https://trstringer.com/otel-part5-propagation/
// StartProducerSpan opens a producer-kind span for a message that is about to
// be sent to the broker.
//
// Steps, in order:
//  1. the correlation id and message id are attached to ctx as baggage;
//  2. a message carrier is built from meta and the global text-map propagator
//     extracts any trace context it holds, which becomes the parent context;
//  3. a span named "<Destination> send" is started with the attributes
//     produced by getTraceOptions and a start event is recorded on it;
//  4. the new span context is injected back into the carrier so consumers can
//     continue the trace.
//
// The returned context is the one produced by the tracer's Start call, and the
// returned span is still open; FinishProducerSpan ends it.
//
// meta and producerTracingOptions are dereferenced unconditionally and must
// be non-nil.
func StartProducerSpan(
	ctx context.Context,
	message types.IMessage,
	meta *metadata.Metadata,
	payload string,
	producerTracingOptions *ProducerTracingOptions,
) (context.Context, trace.Span) {
	baggageCtx := addAfterBaggage(ctx, message, meta)

	// The carrier is what the propagator reads trace headers from and writes
	// them to.
	headerCarrier := messageTracing.NewMessageCarrier(meta)
	propagator := otel.GetTextMapPropagator()

	// If the message already carries a span context, it becomes the parent.
	parentCtx := propagator.Extract(baggageCtx, headerCarrier)

	startOpts := getTraceOptions(meta, message, payload, producerTracingOptions)

	// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#span-name
	// Span name = destination name + operation name.
	spanName := producerTracingOptions.Destination + " send"
	spanCtx, span := messageTracing.MessagingTracer.Start(parentCtx, spanName, startOpts...)

	messageName := messageHeader.GetMessageName(*meta)
	span.AddEvent(fmt.Sprintf("start publishing message '%s' to the broker", messageName))

	// Write the current span context into the carrier so the consumer side can
	// pick it up.
	propagator.Inject(spanCtx, headerCarrier)

	// NOTE(review): an earlier comment here said the follow-up (AfterProduce)
	// span should not become a child of this span and therefore must not use
	// the new context, yet the span-carrying context is what gets returned.
	// Confirm against the callers which behavior is intended.
	return spanCtx, span
}
// FinishProducerSpan records the outcome of a publish attempt on span, ends the
// span, and returns err unchanged.
//
// When err is non-nil a failure event is added and the error is recorded on
// the span through messageTracing.TraceMessagingErrFromSpan. When err is nil a
// success event is added instead. Exactly one of the two events is emitted;
// previously the success event was added even after a failure.
//
// In both cases the span's own trace id and span id are set as attributes
// before the span is ended.
func FinishProducerSpan(span trace.Span, err error) error {
	// The message name is read back from the attributes stored on the span.
	messageName := tracing.GetSpanAttribute(span, messageTracing.MessageName).Value.AsString()

	if err != nil {
		span.AddEvent(fmt.Sprintf("failed to publsih message '%s' to the broker", messageName))
		// The helper's return value is discarded on purpose: the original err
		// is what gets handed back to the caller below.
		_ = messageTracing.TraceMessagingErrFromSpan(span, err)
	}

	spanContext := span.SpanContext()
	span.SetAttributes(
		attribute.Key(tracing.TraceId).String(spanContext.TraceID().String()),
		attribute.Key(tracing.SpanId).String(spanContext.SpanID().String()), // current span id
	)

	// Only report success when the publish actually succeeded.
	if err == nil {
		span.AddEvent(fmt.Sprintf("message '%s' published to the broker succesfully", messageName))
	}

	span.End()

	return err
}
// getTraceOptions assembles the span start options for a producer span.
//
// The attribute set contains the message id, the correlation id (as the
// conversation id), the message type and name, the payload, the headers
// serialized with meta.ToJson, the current time in Unix milliseconds, and the
// destination, messaging system and "send" operation. Any
// producerTracingOptions.OtherAttributes are appended after these. The span
// kind is always producer.
//
// meta and producerTracingOptions are dereferenced unconditionally and must
// be non-nil.
func getTraceOptions(
	meta *metadata.Metadata,
	message types.IMessage,
	payload string,
	producerTracingOptions *ProducerTracingOptions,
) []trace.SpanStartOption {
	// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#topic-with-multiple-consumers
	// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving
	//
	// NOTE(review): semconv.MessageIDKey looks like the RPC "message.id" key
	// rather than the messaging one ("messaging.message.id") — confirm this is
	// the intended attribute name before changing it, since other code may
	// read it.
	spanAttributes := []attribute.KeyValue{
		semconv.MessageIDKey.String(message.GeMessageId()),
		semconv.MessagingMessageConversationID(messageHeader.GetCorrelationId(*meta)),
		attribute.Key(messageTracing.MessageType).String(message.GetEventTypeName()),
		attribute.Key(messageTracing.MessageName).String(messageHeader.GetMessageName(*meta)),
		attribute.Key(messageTracing.Payload).String(payload),
		attribute.String(messageTracing.Headers, meta.ToJson()),
		attribute.Key(tracing.Timestamp).Int64(time.Now().UnixMilli()),
		semconv.MessagingDestinationName(producerTracingOptions.Destination),
		semconv.MessagingSystemKey.String(producerTracingOptions.MessagingSystem),
		semconv.MessagingOperationKey.String("send"),
	}

	// len of a nil slice is 0, so this single check also covers the nil case.
	if extra := producerTracingOptions.OtherAttributes; len(extra) > 0 {
		spanAttributes = append(spanAttributes, extra...)
	}

	return []trace.SpanStartOption{
		trace.WithAttributes(spanAttributes...),
		trace.WithSpanKind(trace.SpanKindProducer),
	}
}
// addAfterBaggage returns a copy of ctx whose baggage holds the message's
// correlation id and message id.
//
// Each entry is built and added on its own. An entry whose value is rejected
// by the baggage package is skipped, and the other entry is still attached;
// previously a single invalid entry made baggage.New fail and both entries
// were dropped without any signal.
//
// The baggage is built from scratch, so the baggage set on the returned
// context contains only these entries and does not carry over members from
// any baggage already on ctx.
//
// meta is dereferenced unconditionally and must be non-nil.
func addAfterBaggage(ctx context.Context, message types.IMessage, meta *metadata.Metadata) context.Context {
	entries := [...]struct {
		key   string
		value string
	}{
		{key: string(semconv.MessagingMessageConversationIDKey), value: messageHeader.GetCorrelationId(*meta)},
		{key: string(semconv.MessageIDKey), value: message.GeMessageId()},
	}

	var bag baggage.Baggage
	for _, entry := range entries {
		member, err := baggage.NewMember(entry.key, entry.value)
		if err != nil {
			// Baggage is best-effort tracing metadata: an unusable value must
			// not stop the message from being published, so skip this entry.
			continue
		}

		updated, err := bag.SetMember(member)
		if err != nil {
			continue
		}
		bag = updated
	}

	// New context including the baggage.
	return baggage.ContextWithBaggage(ctx, bag)
}