-
Notifications
You must be signed in to change notification settings - Fork 1
/
metrics.go
133 lines (104 loc) · 3.52 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"log"
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// serveMetrics exposes the Prometheus metrics handler at path and serves
// HTTP on addr. The handler is registered on http.DefaultServeMux, so any
// other handlers registered there are served by the same listener.
//
// The call blocks for the lifetime of the server. ListenAndServe only
// returns on failure, at which point the error is logged and the process
// exits via log.Fatalf.
func serveMetrics(addr, path string) {
	const me = "serveMetrics"

	log.Printf("%s: starting metrics server at: %s %s", me, addr, path)

	http.Handle(path, promhttp.Handler())

	// A nil Handler makes the server dispatch through http.DefaultServeMux.
	// ReadHeaderTimeout bounds how long a client may take to send its request
	// headers, so a stalled or malicious peer cannot hold a connection open
	// indefinitely; it does not limit how long a response may take.
	server := &http.Server{
		Addr:              addr,
		ReadHeaderTimeout: 10 * time.Second,
	}

	err := server.ListenAndServe()
	log.Fatalf("%s: ListenAndServe error: %v", me, err)
}
// metrics bundles every Prometheus collector used by this program. All
// collectors are vectors partitioned by a single "queue" label.
type metrics struct {
	// buffer gauges how many SQS messages are currently buffered.
	buffer *prometheus.GaugeVec

	// Counters, in order: SQS receive calls, receive errors, empty
	// receives, messages received, SNS publish errors, SQS delete errors,
	// and deliveries fully processed.
	receive, receiveError, receiveEmpty, receiveMessages *prometheus.CounterVec
	publishError, deleteError, delivery                  *prometheus.CounterVec

	// latency observes how long a delivery took to be fully processed,
	// in seconds.
	latency *prometheus.HistogramVec
}
// Metric names, before the namespace is prepended. Every counter name ends
// in countSuffix.
const (
	// countSuffix is the common ending of all counter metric names.
	countSuffix = "_total"

	// Gauge and histogram names.
	bufferName  = "buffer"
	latencyName = "delivery_duration_seconds"

	// Counters for the receive path.
	receiveName         = "receive" + countSuffix
	receiveErrorName    = "receive_error" + countSuffix
	receiveEmptyName    = "receive_empty" + countSuffix
	receiveMessagesName = "receive_messages" + countSuffix

	// Counters for the publish/delete/delivery path.
	publishErrorName = "publish_error" + countSuffix
	deleteErrorName  = "delete_error" + countSuffix
	deliveryName     = "delivery" + countSuffix
)
// newCounter creates a counter vector partitioned by the "queue" label and
// registers it with the default Prometheus registerer.
//
// namespace and name together form the metric name; desc becomes its help
// text. If registration fails (for example because the name is already
// taken), the error is logged and the process exits via log.Fatalf, so a
// returned counter is always registered.
func newCounter(namespace, name, desc string) *prometheus.CounterVec {
	const me = "newCounter"

	c := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: namespace,
			Name:      name,
			Help:      desc,
		},
		[]string{"queue"},
	)

	if err := prometheus.Register(c); err != nil {
		// Name the counter that actually failed; this helper is shared by
		// every counter, so a fixed label in the message would be misleading.
		log.Fatalf("%s: %s was not registered: %s", me, name, err)
	}

	return c
}
// newMetrics creates and registers every collector in the metrics struct
// under the given namespace. latencyBuckets is passed through as the bucket
// boundaries of the delivery latency histogram.
//
// Collectors are registered in a fixed order: buffer gauge, latency
// histogram, then the counters. Any registration failure is fatal to the
// process (log.Fatalf), so the returned value is always fully populated.
func newMetrics(namespace string, latencyBuckets []float64) *metrics {
	const me = "newMetrics"

	// Gauge: messages currently held in our buffer.
	bufferGauge := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Name:      bufferName,
			Help:      "How many SQS messages are buffered with us, partitioned by queue.",
		},
		[]string{"queue"},
	)
	if err := prometheus.Register(bufferGauge); err != nil {
		log.Fatalf("%s: buffer was not registered: %s", me, err)
	}

	// Histogram: end-to-end processing time of a delivery.
	latencyHistogram := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Name:      latencyName,
			Help:      "How long it took to fully process the delivery, partitioned by queue.",
			Buckets:   latencyBuckets,
		},
		[]string{"queue"},
	)
	if err := prometheus.Register(latencyHistogram); err != nil {
		log.Fatalf("%s: latency was not registered: %s", me, err)
	}

	// counter binds the namespace so the literal below stays compact.
	counter := func(name, help string) *prometheus.CounterVec {
		return newCounter(namespace, name, help)
	}

	// The counters are created (and registered) in the order they appear.
	return &metrics{
		buffer:          bufferGauge,
		receive:         counter(receiveName, "How many SQS receives called, partitioned by queue."),
		receiveError:    counter(receiveErrorName, "How many SQS receives errored, partitioned by queue."),
		receiveEmpty:    counter(receiveEmptyName, "How many SQS empty receives, partitioned by queue."),
		receiveMessages: counter(receiveMessagesName, "How many SQS messages received, partitioned by queue."),
		publishError:    counter(publishErrorName, "How many SNS publishes errored, partitioned by queue."),
		deleteError:     counter(deleteErrorName, "How many SQS deletes errored, partitioned by queue."),
		delivery:        counter(deliveryName, "How many SQS deliveries fully processed, partitioned by queue."),
		latency:         latencyHistogram,
	}
}
// recordDelivery accounts for one fully processed delivery on queue: it
// increments the delivery counter and observes latency, converted to
// seconds, in the latency histogram.
func (m *metrics) recordDelivery(queue string, latency time.Duration) {
	// Duration counts nanoseconds; the histogram is expressed in seconds.
	elapsedSeconds := float64(latency) / float64(time.Second)

	m.delivery.WithLabelValues(queue).Inc()
	m.latency.WithLabelValues(queue).Observe(elapsedSeconds)
}