-
-
Notifications
You must be signed in to change notification settings - Fork 17
/
opentelemetry.go
113 lines (98 loc) · 3.15 KB
/
opentelemetry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package otelcoap
import (
"context"
"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/go-coap/v3/message/codes"
"github.com/plgd-dev/go-coap/v3/message/pool"
tcpCoder "github.com/plgd-dev/go-coap/v3/tcp/coder"
udpCoder "github.com/plgd-dev/go-coap/v3/udp/coder"
pkgMessage "github.com/plgd-dev/hub/v2/coap-gateway/service/message"
"github.com/plgd-dev/hub/v2/pkg/opentelemetry"
"github.com/plgd-dev/kit/v2/codec/json"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
"go.opentelemetry.io/otel/trace"
)
var (
// messageSent and messageReceived are the otelgrpc "sent"/"received"
// message-type attributes converted to MessageType; they back
// MessageSentEvent and MessageReceivedEvent.
messageSent = MessageType(otelgrpc.RPCMessageTypeSent)
messageReceived = MessageType(otelgrpc.RPCMessageTypeReceived)
// COAPStatusCodeKey is the attribute key used by StatusCodeAttr.
COAPStatusCodeKey = attribute.Key("coap.status_code")
// COAPMethodKey is the attribute key under which Start records the method.
COAPMethodKey = attribute.Key("coap.method")
// COAPPathKey is the attribute key under which Start records the path.
COAPPathKey = attribute.Key("coap.path")
// COAPRequest is the attribute key under which SetRequest records the
// request body.
COAPRequest = attribute.Key("coap.request")
)
// MessageType is an attribute.KeyValue identifying the kind of message an
// event describes; its Event method attaches it to the emitted span event.
type MessageType attribute.KeyValue
// Event adds a "message" event to the span associated with ctx. The event
// carries two attributes: the message type m and the estimated uncompressed
// size of msg.
//
// The size is the value reported by the selected coder for the code, token and
// options of msg, plus the body size. If the coder fails, its part counts as 0;
// if the body size cannot be determined, the body is left out of the total.
// The event is emitted in every case.
func (m MessageType) Event(ctx context.Context, msg *pool.Message) {
	span := trace.SpanFromContext(ctx)
	header := message.Message{
		Code:    msg.Code(),
		Token:   msg.Token(),
		Options: msg.Options(),
	}
	var coder interface {
		Size(message.Message) (int, error)
	}
	// A message with no type and a negative message ID is sized with the TCP
	// coder; everything else is sized with the UDP coder.
	// NOTE(review): presumably TCP messages never carry a type or message ID —
	// confirm against go-coap.
	if msg.Type() == message.Unset && msg.MessageID() < 0 {
		coder = tcpCoder.DefaultCoder
	} else {
		coder = udpCoder.DefaultCoder
	}
	size, err := coder.Size(header)
	if err != nil {
		size = 0
	}
	// Only trust the body size when it was obtained without an error. The
	// previous check was inverted and added the value only on failure.
	if bodySize, err := msg.BodySize(); err == nil {
		size += int(bodySize)
	}
	span.AddEvent("message", trace.WithAttributes(
		attribute.KeyValue(m),
		semconv.MessageUncompressedSizeKey.Int(size),
	))
}
// SetRequest stores the body of message on the span associated with ctx under
// the COAPRequest attribute key.
//
// The message is first converted with pkgMessage.ToJson. When the converted
// body is nil no attribute is set. A string body is stored as is; any other
// body is JSON-encoded, and if that encoding fails the attribute is set to the
// empty string.
func SetRequest(ctx context.Context, message *pool.Message) {
	span := trace.SpanFromContext(ctx)
	decoded := pkgMessage.ToJson(message, true, false)
	if decoded.Body == nil {
		return
	}

	var request string
	switch body := decoded.Body.(type) {
	case string:
		request = body
	default:
		if encoded, err := json.Encode(decoded.Body); err == nil {
			request = string(encoded)
		}
	}
	span.SetAttributes(COAPRequest.String(request))
}
// DefaultTransportFormatter returns the span name for a CoAP request to path:
// the literal prefix "COAP " followed by path.
func DefaultTransportFormatter(path string) string {
	const prefix = "COAP "
	return prefix + path
}
// StatusCodeAttr returns the COAPStatusCodeKey attribute holding the numeric
// value of the CoAP code c as an int64.
func StatusCodeAttr(c codes.Code) attribute.KeyValue {
	numeric := int64(c)
	return COAPStatusCodeKey.Int64(numeric)
}
// MessageReceivedEvent records message on the span associated with ctx by
// delegating to the Event method of the messageReceived message type.
func MessageReceivedEvent(ctx context.Context, message *pool.Message) {
	received := messageReceived
	received.Event(ctx, message)
}
// MessageSentEvent records message on the span associated with ctx by
// delegating to the Event method of the messageSent message type.
func MessageSentEvent(ctx context.Context, message *pool.Message) {
	sent := messageSent
	sent.Event(ctx, message)
}
// Start begins a new span named DefaultTransportFormatter(path) and returns
// the context carrying it together with the span itself.
//
// The tracer is obtained from the TracerProvider of the configuration built
// from opts, using InstrumentationName and opentelemetry.SemVersion() as the
// instrumentation version. The span is started with the COAPMethodKey and
// COAPPathKey attributes set to method and path, followed by any
// SpanStartOptions from the configuration.
func Start(ctx context.Context, path, method string, opts ...Option) (context.Context, trace.Span) {
	cfg := newConfig(opts...)
	tracer := cfg.TracerProvider.Tracer(
		InstrumentationName,
		trace.WithInstrumentationVersion(opentelemetry.SemVersion()),
	)

	// The request attributes come first so configured options are applied
	// after them, in the same order as before.
	startOpts := make([]trace.SpanStartOption, 0, 1+len(cfg.SpanStartOptions))
	startOpts = append(startOpts, trace.WithAttributes(
		COAPMethodKey.String(method),
		COAPPathKey.String(path),
	))
	startOpts = append(startOpts, cfg.SpanStartOptions...)

	return tracer.Start(ctx, DefaultTransportFormatter(path), startOpts...)
}