-
Notifications
You must be signed in to change notification settings - Fork 2
/
context.go
49 lines (44 loc) · 1.59 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package kafkaconsumer
import (
"context"
"github.com/sabariramc/goserverbase/v5/instrumentation/span"
"github.com/sabariramc/goserverbase/v5/kafka"
"github.com/sabariramc/goserverbase/v5/log"
)
// GetCorrelationParams builds the correlation parameters for a message from
// its headers. When the headers do not yield a correlation ID, the default
// correlation parameters for the configured service name are returned instead.
func (k *KafkaConsumerServer) GetCorrelationParams(headers map[string]string) *log.CorrelationParam {
	param := new(log.CorrelationParam)
	param.LoadFromHeader(headers)
	if param.CorrelationID != "" {
		return param
	}
	// No correlation ID supplied by the producer: fall back to the service default.
	return log.GetDefaultCorrelationParam(k.c.ServiceName)
}
// GetCustomerID returns a customer identifier populated from the given
// message headers. A non-nil identifier is always returned.
func (k *KafkaConsumerServer) GetCustomerID(headers map[string]string) *log.CustomerIdentifier {
	var customer log.CustomerIdentifier
	customer.LoadFromHeader(headers)
	return &customer
}
// GetMessageContext creates the context used to process a single Kafka message.
//
// Starting from context.Background(), it attaches the correlation parameters
// and the customer identifier derived from the message headers. When a tracer
// is configured, it also starts a Kafka span from the message and annotates it
// with the correlation ID, the message coordinates (topic, partition, offset,
// key, timestamp) and every non-empty customer identifier field under the
// "user." prefix.
//
// NOTE(review): the span started here is not finished in this function;
// presumably the caller ends it via the returned context — confirm.
func (k *KafkaConsumerServer) GetMessageContext(msg *kafka.Message) context.Context {
	// Read the headers once and derive both identities from the same snapshot.
	headers := msg.GetHeaders()
	corr := k.GetCorrelationParams(headers)
	identity := k.GetCustomerID(headers)

	// Reuse corr rather than calling GetCorrelationParams again: when the
	// headers carry no correlation ID a second call goes through the default
	// path again, and the ID stored in the context could then differ from the
	// one recorded on the span below.
	msgCtx := k.GetContextWithCorrelation(context.Background(), corr)
	msgCtx = k.GetContextWithCustomerId(msgCtx, identity)

	if k.tracer == nil {
		return msgCtx
	}

	// Named sp so the local does not shadow the imported span package.
	var sp span.Span
	msgCtx, sp = k.tracer.StartKafkaSpanFromMessage(msgCtx, msg.Message)
	sp.SetAttribute("correlationId", corr.CorrelationID)
	sp.SetAttribute("messaging.kafka.topic", msg.Topic)
	sp.SetAttribute("messaging.kafka.partition", msg.Partition)
	sp.SetAttribute("messaging.kafka.offset", msg.Offset)
	sp.SetAttribute("messaging.kafka.key", string(msg.Key))
	sp.SetAttribute("messaging.kafka.timestamp", msg.Time.UnixMilli())
	for key, value := range identity.GetPayload() {
		// Skip empty identifier fields so the span only carries populated values.
		if value != "" {
			sp.SetAttribute("user."+key, value)
		}
	}
	return msgCtx
}