forked from cockroachdb/cockroach
-
Notifications
You must be signed in to change notification settings - Fork 0
/
metric.go
470 lines (399 loc) · 12.9 KB
/
metric.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Tobias Schottdorf (tobias.schottdorf@gmail.com)
package metric
import (
"encoding/json"
"fmt"
"math"
"sync/atomic"
"time"
"github.com/VividCortex/ewma"
"github.com/codahale/hdrhistogram"
"github.com/gogo/protobuf/proto"
prometheusgo "github.com/prometheus/client_model/go"
"github.com/rcrowley/go-metrics"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
const (
// MaxLatency is the maximum value tracked in latency histograms. Higher
// values will be recorded as this value instead.
MaxLatency = 10 * time.Second
// TestSampleInterval is passed to histograms during tests which don't
// want to concern themselves with supplying a "correct" interval.
TestSampleInterval = time.Duration(math.MaxInt64)
// The number of histograms to keep in rolling window.
histWrapNum = 2
)
// Iterable provides a method for synchronized access to interior objects.
type Iterable interface {
// GetName returns the fully-qualified name of the metric.
GetName() string
// GetHelp returns the help text for the metric.
GetHelp() string
// Inspect calls the given closure with each contained item.
Inspect(func(interface{}))
}
// PrometheusExportable is the standard interface for an individual metric
// that can be exported to prometheus.
type PrometheusExportable interface {
// GetName is a method on Metadata
GetName() string
// GetHelp is a method on Metadata
GetHelp() string
// GetType returns the prometheus type enum for this metric.
GetType() *prometheusgo.MetricType
// GetLabels is a method on Metadata
GetLabels() []*prometheusgo.LabelPair
// ToPrometheusMetric returns a filled-in prometheus metric of the right type
// for the given metric. It does not fill in labels.
// The implementation must return thread-safe data to the caller, i.e.
// usually a copy of internal state.
ToPrometheusMetric() *prometheusgo.Metric
}
// Metadata holds metadata about a metric. It must be embedded in
// each metric object.
type Metadata struct {
Name, Help string
labels []*prometheusgo.LabelPair
}
// GetName returns the metric's name.
func (m *Metadata) GetName() string {
return m.Name
}
// GetHelp returns the metric's help string.
func (m *Metadata) GetHelp() string {
return m.Help
}
// GetLabels returns the metric's labels.
func (m *Metadata) GetLabels() []*prometheusgo.LabelPair {
return m.labels
}
// AddLabel adds a label/value pair for this metric.
func (m *Metadata) AddLabel(name, value string) {
m.labels = append(m.labels,
&prometheusgo.LabelPair{
Name: proto.String(exportedLabel(name)),
Value: proto.String(value),
})
}
var _ Iterable = &Gauge{}
var _ Iterable = &GaugeFloat64{}
var _ Iterable = &Counter{}
var _ Iterable = &Histogram{}
var _ json.Marshaler = &Gauge{}
var _ json.Marshaler = &GaugeFloat64{}
var _ json.Marshaler = &Counter{}
var _ json.Marshaler = &Registry{}
var _ PrometheusExportable = &Gauge{}
var _ PrometheusExportable = &GaugeFloat64{}
var _ PrometheusExportable = &Counter{}
var _ PrometheusExportable = &Histogram{}
type periodic interface {
nextTick() time.Time
tick()
}
var _ periodic = &Rate{}
var now = timeutil.Now
// TestingSetNow changes the clock used by the metric system. For use by
// testing to precisely control the clock.
func TestingSetNow(f func() time.Time) func() {
origNow := now
now = f
return func() {
now = origNow
}
}
func cloneHistogram(in *hdrhistogram.Histogram) *hdrhistogram.Histogram {
return hdrhistogram.Import(in.Export())
}
func maybeTick(m periodic) {
for m.nextTick().Before(now()) {
m.tick()
}
}
// A Histogram collects observed values by keeping bucketed counts. For
// convenience, internally two sets of buckets are kept: A cumulative set (i.e.
// data is never evicted) and a windowed set (which keeps only recently
// collected samples).
//
// Top-level methods generally apply to the cumulative buckets; the windowed
// variant is exposed through the Windowed method.
type Histogram struct {
Metadata
maxVal int64
mu struct {
syncutil.Mutex
cumulative *hdrhistogram.Histogram
sliding *slidingHistogram
}
}
// NewHistogram initializes a given Histogram. The contained windowed histogram
// rotates every 'duration'; both the windowed and the cumulative histogram
// track nonnegative values up to 'maxVal' with 'sigFigs' decimal points of
// precision.
func NewHistogram(metadata Metadata, duration time.Duration, maxVal int64, sigFigs int) *Histogram {
dHist := newSlidingHistogram(duration, maxVal, sigFigs)
h := &Histogram{
Metadata: metadata,
maxVal: maxVal,
}
h.mu.cumulative = hdrhistogram.New(0, maxVal, sigFigs)
h.mu.sliding = dHist
return h
}
// NewLatency is a convenience function which returns a histogram with
// suitable defaults for latency tracking. Values are expressed in ns,
// are truncated into the interval [0, MaxLatency] and are recorded
// with one digit of precision (i.e. errors of <10ms at 100ms, <6s at 60s).
//
// The windowed portion of the Histogram retains values for approximately
// histogramWindow.
func NewLatency(metadata Metadata, histogramWindow time.Duration) *Histogram {
return NewHistogram(
metadata, histogramWindow, MaxLatency.Nanoseconds(), 1,
)
}
// Windowed returns a copy of the current windowed histogram data and its
// rotation interval.
func (h *Histogram) Windowed() (*hdrhistogram.Histogram, time.Duration) {
h.mu.Lock()
defer h.mu.Unlock()
return cloneHistogram(h.mu.sliding.Current()), h.mu.sliding.duration
}
// Snapshot returns a copy of the cumulative (i.e. all-time samples) histogram
// data.
func (h *Histogram) Snapshot() *hdrhistogram.Histogram {
h.mu.Lock()
defer h.mu.Unlock()
return cloneHistogram(h.mu.cumulative)
}
// RecordValue adds the given value to the histogram. Recording a value in
// excess of the configured maximum value for that histogram results in
// recording the maximum value instead.
func (h *Histogram) RecordValue(v int64) {
h.mu.Lock()
defer h.mu.Unlock()
if h.mu.sliding.RecordValue(v) != nil {
_ = h.mu.sliding.RecordValue(h.maxVal)
}
if h.mu.cumulative.RecordValue(v) != nil {
_ = h.mu.cumulative.RecordValue(h.maxVal)
}
}
// TotalCount returns the (cumulative) number of samples.
func (h *Histogram) TotalCount() int64 {
h.mu.Lock()
defer h.mu.Unlock()
return h.mu.cumulative.TotalCount()
}
// Min returns the minimum.
func (h *Histogram) Min() int64 {
h.mu.Lock()
defer h.mu.Unlock()
return h.mu.cumulative.Min()
}
// Inspect calls the closure with the empty string and the receiver.
func (h *Histogram) Inspect(f func(interface{})) {
h.mu.Lock()
maybeTick(h.mu.sliding)
h.mu.Unlock()
f(h)
}
// GetType returns the prometheus type enum for this metric.
func (h *Histogram) GetType() *prometheusgo.MetricType {
return prometheusgo.MetricType_HISTOGRAM.Enum()
}
// ToPrometheusMetric returns a filled-in prometheus metric of the right type.
func (h *Histogram) ToPrometheusMetric() *prometheusgo.Metric {
hist := &prometheusgo.Histogram{}
h.mu.Lock()
maybeTick(h.mu.sliding)
bars := h.mu.cumulative.Distribution()
hist.Bucket = make([]*prometheusgo.Bucket, 0, len(bars))
var cumCount uint64
var sum float64
for _, bar := range bars {
if bar.Count == 0 {
// No need to expose trivial buckets.
continue
}
upperBound := float64(bar.To)
sum += upperBound * float64(bar.Count)
cumCount += uint64(bar.Count)
curCumCount := cumCount // need a new alloc thanks to bad proto code
hist.Bucket = append(hist.Bucket, &prometheusgo.Bucket{
CumulativeCount: &curCumCount,
UpperBound: &upperBound,
})
}
hist.SampleCount = &cumCount
hist.SampleSum = &sum // can do better here; we approximate in the loop
h.mu.Unlock()
return &prometheusgo.Metric{
Histogram: hist,
}
}
// A Counter holds a single mutable atomic value.
type Counter struct {
Metadata
metrics.Counter
}
// NewCounter creates a counter.
func NewCounter(metadata Metadata) *Counter {
return &Counter{metadata, metrics.NewCounter()}
}
// GetType returns the prometheus type enum for this metric.
func (c *Counter) GetType() *prometheusgo.MetricType {
return prometheusgo.MetricType_COUNTER.Enum()
}
// Inspect calls the given closure with the empty string and itself.
func (c *Counter) Inspect(f func(interface{})) { f(c) }
// MarshalJSON marshals to JSON.
func (c *Counter) MarshalJSON() ([]byte, error) {
return json.Marshal(c.Counter.Count())
}
// ToPrometheusMetric returns a filled-in prometheus metric of the right type.
func (c *Counter) ToPrometheusMetric() *prometheusgo.Metric {
return &prometheusgo.Metric{
Counter: &prometheusgo.Counter{Value: proto.Float64(float64(c.Counter.Count()))},
}
}
// A Gauge atomically stores a single integer value.
type Gauge struct {
Metadata
value *int64
fn func() int64
}
// NewGauge creates a Gauge.
func NewGauge(metadata Metadata) *Gauge {
return &Gauge{metadata, new(int64), nil}
}
// NewFunctionalGauge creates a Gauge metric whose value is determined when
// asked for by calling the provided function.
// Note that Update, Inc, and Dec should NOT be called on a Gauge returned
// from NewFunctionalGauge.
func NewFunctionalGauge(metadata Metadata, f func() int64) *Gauge {
return &Gauge{metadata, nil, f}
}
// Snapshot returns a read-only copy of the gauge.
func (g *Gauge) Snapshot() metrics.Gauge {
return metrics.GaugeSnapshot(g.Value())
}
// Update updates the gauge's value.
func (g *Gauge) Update(v int64) {
atomic.StoreInt64(g.value, v)
}
// Value returns the gauge's current value.
func (g *Gauge) Value() int64 {
if g.fn != nil {
return g.fn()
}
return atomic.LoadInt64(g.value)
}
// Inc increments the gauge's value.
func (g *Gauge) Inc(i int64) {
atomic.AddInt64(g.value, i)
}
// Dec decrements the gauge's value.
func (g *Gauge) Dec(i int64) {
atomic.AddInt64(g.value, -i)
}
// GetType returns the prometheus type enum for this metric.
func (g *Gauge) GetType() *prometheusgo.MetricType {
return prometheusgo.MetricType_GAUGE.Enum()
}
// Inspect calls the given closure with the empty string and itself.
func (g *Gauge) Inspect(f func(interface{})) { f(g) }
// MarshalJSON marshals to JSON.
func (g *Gauge) MarshalJSON() ([]byte, error) {
return json.Marshal(g.Value())
}
// ToPrometheusMetric returns a filled-in prometheus metric of the right type.
func (g *Gauge) ToPrometheusMetric() *prometheusgo.Metric {
return &prometheusgo.Metric{
Gauge: &prometheusgo.Gauge{Value: proto.Float64(float64(g.Value()))},
}
}
// A GaugeFloat64 atomically stores a single float64 value.
type GaugeFloat64 struct {
Metadata
metrics.GaugeFloat64
}
// NewGaugeFloat64 creates a GaugeFloat64.
func NewGaugeFloat64(metadata Metadata) *GaugeFloat64 {
return &GaugeFloat64{metadata, metrics.NewGaugeFloat64()}
}
// GetType returns the prometheus type enum for this metric.
func (g *GaugeFloat64) GetType() *prometheusgo.MetricType {
return prometheusgo.MetricType_GAUGE.Enum()
}
// Inspect calls the given closure with the empty string and itself.
func (g *GaugeFloat64) Inspect(f func(interface{})) { f(g) }
// MarshalJSON marshals to JSON.
func (g *GaugeFloat64) MarshalJSON() ([]byte, error) {
return json.Marshal(g.Value())
}
// ToPrometheusMetric returns a filled-in prometheus metric of the right type.
func (g *GaugeFloat64) ToPrometheusMetric() *prometheusgo.Metric {
return &prometheusgo.Metric{
Gauge: &prometheusgo.Gauge{Value: proto.Float64(g.Value())},
}
}
// A Rate is a exponential weighted moving average.
type Rate struct {
mu syncutil.Mutex // protects fields below
curSum float64
wrapped ewma.MovingAverage
interval time.Duration
nextT time.Time
}
// NewRate creates an EWMA rate on the given timescale. Timescales at
// or below 2s are illegal and will cause a panic.
func NewRate(timescale time.Duration) *Rate {
const tickInterval = time.Second
if timescale <= 2*time.Second {
panic(fmt.Sprintf("EWMA with per-second ticks makes no sense on timescale %s", timescale))
}
avgAge := float64(timescale) / float64(2*tickInterval)
return &Rate{
interval: tickInterval,
nextT: now(),
wrapped: ewma.NewMovingAverage(avgAge),
}
}
// Value returns the current value of the Rate.
func (e *Rate) Value() float64 {
e.mu.Lock()
defer e.mu.Unlock()
maybeTick(e)
return e.wrapped.Value()
}
func (e *Rate) nextTick() time.Time {
return e.nextT
}
func (e *Rate) tick() {
e.nextT = e.nextT.Add(e.interval)
e.wrapped.Add(e.curSum)
e.curSum = 0
}
// Add adds the given measurement to the Rate.
func (e *Rate) Add(v float64) {
e.mu.Lock()
maybeTick(e)
e.curSum += v
e.mu.Unlock()
}