This repository has been archived by the owner on Nov 24, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
metrics.go
55 lines (46 loc) · 1.94 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package gpubsub
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
type metricsCollector interface {
onPublish(context.Context, string, error)
onHandle(context.Context, string, time.Duration, error)
}
type noopMetricsCollector struct{}
func (noopMetricsCollector) onPublish(context.Context, string, error) {}
func (noopMetricsCollector) onHandle(context.Context, string, time.Duration, error) {
}
type otelMetricsCollector struct {
metricPublishedMessages metric.Int64Counter
metricPublishMessageErrors metric.Int64Counter
metricHandledMessages metric.Int64Counter
metricHandleMessageErrors metric.Int64Counter
metricHandleMessageDurationMillis metric.Int64Histogram
}
func (c *otelMetricsCollector) onPublish(ctx context.Context, topicName string, err error) {
label := attribute.String("topic", topicName)
c.metricPublishedMessages.Add(ctx, 1, label)
if err != nil {
c.metricPublishMessageErrors.Add(ctx, 1, label)
}
}
func (c *otelMetricsCollector) onHandle(ctx context.Context, topicName string, timeTaken time.Duration, err error) {
label := attribute.String("topic", topicName)
c.metricHandledMessages.Add(ctx, 1, label)
c.metricHandleMessageDurationMillis.Record(ctx, timeTaken.Milliseconds(), label)
if err != nil {
c.metricHandleMessageErrors.Add(ctx, 1, label)
}
}
func (p *PubsubMsgBroker) initMetrics(meter metric.MeterMust) {
p.metrics = &otelMetricsCollector{
metricPublishedMessages: meter.NewInt64Counter("gpubsub_published_messages_total"),
metricPublishMessageErrors: meter.NewInt64Counter("gpubsub_publish_message_errors_total"),
metricHandledMessages: meter.NewInt64Counter("gpubsub_handled_messages_total"),
metricHandleMessageErrors: meter.NewInt64Counter("gpubsub_handle_message_errors_total"),
metricHandleMessageDurationMillis: meter.NewInt64Histogram("gpubsub_handle_message_duration_millis"),
}
}