/
metrics.go
81 lines (65 loc) · 2 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package nats
import (
"context"
"github.com/nats-io/nats.go"
"github.com/tel-io/tel/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"time"
)
type metrics struct {
counters map[string]syncint64.Counter
valueRecorders map[string]syncfloat64.Histogram
}
func createMeasures(tele tel.Telemetry, meter metric.Meter) *metrics {
counters := make(map[string]syncint64.Counter)
valueRecorders := make(map[string]syncfloat64.Histogram)
counter, err := meter.SyncInt64().Counter(Count)
if err != nil {
tele.Panic("nats mw", tel.String("key", Count))
}
requestBytesCounter, err := meter.SyncInt64().Counter(ContentLength)
if err != nil {
tele.Panic("nats mw", tel.String("key", ContentLength))
}
serverLatencyMeasure, err := meter.SyncFloat64().Histogram(Latency)
if err != nil {
tele.Panic("nats mw", tel.String("key", Latency))
}
counters[Count] = counter
counters[ContentLength] = requestBytesCounter
valueRecorders[Latency] = serverLatencyMeasure
return &metrics{
counters: counters,
valueRecorders: valueRecorders,
}
}
// SubMetrics implement Middleware interface
type SubMetrics struct {
*metrics
}
func NewMetrics(m *metrics) *SubMetrics {
return &SubMetrics{metrics: m}
}
func (t *SubMetrics) apply(next MsgHandler) MsgHandler {
return func(ctx context.Context, msg *nats.Msg) (err error) {
defer func(start time.Time) {
kind := extractBaggageKind(ctx)
if ctx.Err() != nil {
err = ctx.Err()
ctx = tel.FromCtx(ctx).Ctx()
}
attr := []attribute.KeyValue{
IsError.Bool(err != nil),
Subject.String(decreaseSubjectCardinality(msg.Subject)),
Kind.String(kind),
}
t.counters[Count].Add(ctx, 1, attr...)
t.counters[ContentLength].Add(ctx, int64(len(msg.Data)), attr...)
t.valueRecorders[Latency].Record(ctx, float64(time.Since(start).Milliseconds()), attr...)
}(time.Now())
return next(ctx, msg)
}
}