-
Notifications
You must be signed in to change notification settings - Fork 12
/
utils.go
145 lines (125 loc) · 4.75 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package tracing
import (
"context"
"encoding/json"
"github.com/openline-ai/openline-customer-os/packages/server/customer-os-common-module/tracing"
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/constants"
"github.com/openline-ai/openline-customer-os/packages/server/events-processing-platform/eventstore"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"google.golang.org/grpc/metadata"
)
const (
SpanTagTenant = "tenant"
SpanTagUserId = "user-id"
SpanTagComponent = "component"
SpanTagAggregateId = "aggregateID"
SpanTagEntityId = "entity-id"
SpanTagRedundantEventSkipped = "redundantEventSkipped"
)
func StartGrpcServerTracerSpan(ctx context.Context, operationName string) (context.Context, opentracing.Span) {
textMapCarrierFromMetaData := GetTextMapCarrierFromMetaData(ctx)
span, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, textMapCarrierFromMetaData)
if err != nil {
serverSpan := opentracing.GlobalTracer().StartSpan(operationName)
ctx = opentracing.ContextWithSpan(ctx, serverSpan)
return ctx, serverSpan
}
serverSpan := opentracing.GlobalTracer().StartSpan(operationName, ext.RPCServerOption(span))
ctx = opentracing.ContextWithSpan(ctx, serverSpan)
return ctx, serverSpan
}
func StartProjectionTracerSpan(ctx context.Context, operationName string, event eventstore.Event) (context.Context, opentracing.Span) {
textMapCarrierFromMetaData := GetTextMapCarrierFromEvent(event)
span, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, textMapCarrierFromMetaData)
if err != nil {
serverSpan := opentracing.GlobalTracer().StartSpan(operationName)
ctx = opentracing.ContextWithSpan(ctx, serverSpan)
return ctx, serverSpan
}
serverSpan := opentracing.GlobalTracer().StartSpan(operationName, ext.RPCServerOption(span))
ctx = opentracing.ContextWithSpan(ctx, serverSpan)
return ctx, serverSpan
}
func GetTextMapCarrierFromEvent(event eventstore.Event) opentracing.TextMapCarrier {
metadataMap := make(opentracing.TextMapCarrier)
err := json.Unmarshal(event.GetMetadata(), &metadataMap)
if err != nil {
return metadataMap
}
return metadataMap
}
func GetTextMapCarrierFromMetaData(ctx context.Context) opentracing.TextMapCarrier {
metadataMap := make(opentracing.TextMapCarrier)
if md, ok := metadata.FromIncomingContext(ctx); ok {
for key := range md.Copy() {
metadataMap.Set(key, md.Get(key)[0])
}
}
return metadataMap
}
func InjectTextMapCarrier(spanCtx opentracing.SpanContext) (opentracing.TextMapCarrier, error) {
m := make(opentracing.TextMapCarrier)
if err := opentracing.GlobalTracer().Inject(spanCtx, opentracing.TextMap, m); err != nil {
return nil, err
}
return m, nil
}
func ExtractTextMapCarrier(spanCtx opentracing.SpanContext) opentracing.TextMapCarrier {
textMapCarrier, err := InjectTextMapCarrier(spanCtx)
if err != nil {
return make(opentracing.TextMapCarrier)
}
return textMapCarrier
}
func TraceErr(span opentracing.Span, err error, fields ...log.Field) {
tracing.TraceErr(span, err, fields...)
}
func LogObjectAsJson(span opentracing.Span, name string, object any) {
tracing.LogObjectAsJson(span, name, object)
}
func SetNeo4jRepositorySpanTags(ctx context.Context, span opentracing.Span, tenant string) {
setTenantSpanTag(span, tenant)
span.SetTag(SpanTagComponent, constants.ComponentNeo4jRepository)
}
func SetServiceSpanTags(ctx context.Context, span opentracing.Span, tenant, loggedInUserId string) {
setTenantSpanTag(span, tenant)
setUseridSpanTag(span, loggedInUserId)
span.SetTag(SpanTagComponent, constants.ComponentService)
}
func SetCommandHandlerSpanTags(ctx context.Context, span opentracing.Span, tenant, userId string) {
setTenantSpanTag(span, tenant)
setUseridSpanTag(span, userId)
span.SetTag(SpanTagComponent, constants.ComponentService)
}
func setTenantSpanTag(span opentracing.Span, tenant string) {
if tenant != "" {
span.SetTag(SpanTagTenant, tenant)
}
}
func setUseridSpanTag(span opentracing.Span, userId string) {
if userId != "" {
span.SetTag(SpanTagUserId, userId)
}
}
func InjectSpanContextIntoGrpcMetadata(ctx context.Context, span opentracing.Span) context.Context {
if span != nil {
// Inject the span context into the gRPC request metadata.
textMapCarrier := make(opentracing.TextMapCarrier)
err := span.Tracer().Inject(span.Context(), opentracing.TextMap, textMapCarrier)
if err == nil {
// Add the injected metadata to the gRPC context.
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
}
for key, val := range textMapCarrier {
md.Set(key, val)
}
ctx = metadata.NewOutgoingContext(ctx, md)
return ctx
}
}
return ctx
}