-
Notifications
You must be signed in to change notification settings - Fork 38.7k
/
metric_recorder.go
157 lines (134 loc) · 4.36 KB
/
metric_recorder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"time"
"k8s.io/component-base/metrics"
)
// MetricRecorder is the set of operations a recorder offers for adjusting a
// metric: Inc(), Dec() and Clear(). PendingPodsRecorder in this file is one
// implementation.
type MetricRecorder interface {
// Inc adjusts the metric upwards; PendingPodsRecorder forwards it to its gauge's Inc().
Inc()
// Dec adjusts the metric downwards; PendingPodsRecorder forwards it to its gauge's Dec().
Dec()
// Clear resets the metric; PendingPodsRecorder sets its gauge to 0.
Clear()
}
// Compile-time assertion that *PendingPodsRecorder satisfies MetricRecorder.
var _ MetricRecorder = (*PendingPodsRecorder)(nil)
// PendingPodsRecorder is an implementation of MetricRecorder that applies
// every operation to a single wrapped gauge.
type PendingPodsRecorder struct {
// recorder is the gauge that Inc, Dec and Clear act on.
recorder metrics.GaugeMetric
}
// NewActivePodsRecorder builds a PendingPodsRecorder backed by the gauge
// returned by ActivePods().
func NewActivePodsRecorder() *PendingPodsRecorder {
	gauge := ActivePods()
	return &PendingPodsRecorder{recorder: gauge}
}
// NewUnschedulablePodsRecorder builds a PendingPodsRecorder backed by the
// gauge returned by UnschedulablePods().
func NewUnschedulablePodsRecorder() *PendingPodsRecorder {
	gauge := UnschedulablePods()
	return &PendingPodsRecorder{recorder: gauge}
}
// NewBackoffPodsRecorder builds a PendingPodsRecorder backed by the gauge
// returned by BackoffPods().
func NewBackoffPodsRecorder() *PendingPodsRecorder {
	gauge := BackoffPods()
	return &PendingPodsRecorder{recorder: gauge}
}
// NewGatedPodsRecorder builds a PendingPodsRecorder backed by the gauge
// returned by GatedPods().
func NewGatedPodsRecorder() *PendingPodsRecorder {
	gauge := GatedPods()
	return &PendingPodsRecorder{recorder: gauge}
}
// Inc forwards to the Inc method of the wrapped gauge, which is expected to
// raise its value by 1.
func (r *PendingPodsRecorder) Inc() {
	gauge := r.recorder
	gauge.Inc()
}
// Dec forwards to the Dec method of the wrapped gauge, which is expected to
// lower its value by 1.
func (r *PendingPodsRecorder) Dec() {
	gauge := r.recorder
	gauge.Dec()
}
// Clear resets the wrapped gauge by setting its value to 0.
func (r *PendingPodsRecorder) Clear() {
	var zero float64
	r.recorder.Set(zero)
}
// metric is the data structure passed in the buffer channel between the main framework thread
// and the metricsRecorder goroutine. It bundles everything needed to record
// one histogram observation later.
type metric struct {
// metric is the histogram vector the observation is recorded against.
metric *metrics.HistogramVec
// labelValues are handed to WithLabelValues to pick the histogram to observe on.
labelValues []string
// value is the sample passed to Observe.
value float64
}
// MetricAsyncRecorder records metric in a separate goroutine to avoid overhead in the critical path.
// Producers enqueue observations on bufferCh without blocking; the goroutine
// started by NewMetricsAsyncRecorder drains them via FlushMetrics.
type MetricAsyncRecorder struct {
// bufferCh is a channel that serves as a metrics buffer before the metricsRecorder goroutine reports it.
bufferCh chan *metric
// bufferSize is the capacity bufferCh is created with and the maximum number
// of metrics a single FlushMetrics call handles. Once the buffer is full,
// incoming metrics are discarded.
bufferSize int
// interval is the pause between consecutive flushes performed by the goroutine.
interval time.Duration
// stopCh is used to stop the goroutine which periodically flushes metrics.
stopCh <-chan struct{}
// IsStoppedCh indicates whether the goroutine is stopped; the goroutine closes
// it when it exits. It's used in tests only to make sure the metric flushing
// goroutine is stopped so that tests can collect metrics for verification.
IsStoppedCh chan struct{}
}
// NewMetricsAsyncRecorder creates a MetricAsyncRecorder whose buffer channel
// has capacity bufferSize, starts its flushing goroutine, and returns it.
//
// bufferSize bounds how many observations can be queued; interval is the
// pause between flushes; stopCh tells the flushing goroutine to exit.
func NewMetricsAsyncRecorder(bufferSize int, interval time.Duration, stopCh <-chan struct{}) *MetricAsyncRecorder {
	r := new(MetricAsyncRecorder)
	r.bufferCh = make(chan *metric, bufferSize)
	r.bufferSize = bufferSize
	r.interval = interval
	r.stopCh = stopCh
	r.IsStoppedCh = make(chan struct{})

	// The recorder is fully initialized before the goroutine can see it.
	go r.run()
	return r
}
// ObservePluginDurationAsync queues an observation of value for the
// PluginExecutionDuration histogram, labeled with pluginName, extensionPoint
// and status (in that order). The observation is only recorded when the
// buffer is later flushed. This call never blocks: if the buffer is full the
// observation is dropped.
func (r *MetricAsyncRecorder) ObservePluginDurationAsync(extensionPoint, pluginName, status string, value float64) {
	sample := new(metric)
	sample.metric = PluginExecutionDuration
	sample.labelValues = []string{pluginName, extensionPoint, status}
	sample.value = value

	select {
	case r.bufferCh <- sample:
		// Queued for the next flush.
	default:
		// Buffer is full; discard rather than stall the caller.
	}
}
// run flushes buffered metrics via FlushMetrics, pausing for r.interval after
// each flush, until stopCh is signaled. On exit it closes IsStoppedCh so that
// observers can tell the goroutine has finished.
//
// The pause is interruptible: a stop signal that arrives while waiting ends
// the goroutine immediately instead of after the rest of the interval.
func (r *MetricAsyncRecorder) run() {
	defer close(r.IsStoppedCh)
	for {
		// Check for stop before flushing so that no flush is started once the
		// stop signal has been observed.
		select {
		case <-r.stopCh:
			return
		default:
		}

		r.FlushMetrics()

		// Wait out the interval, but wake up as soon as stop is signaled.
		select {
		case <-r.stopCh:
			return
		case <-time.After(r.interval):
		}
	}
}
// FlushMetrics drains bufferCh without blocking, recording each buffered
// observation on its histogram. It returns as soon as the channel is empty or
// once bufferSize metrics have been handled, whichever comes first.
func (r *MetricAsyncRecorder) FlushMetrics() {
	for remaining := r.bufferSize; remaining > 0; remaining-- {
		select {
		case pending := <-r.bufferCh:
			observer := pending.metric.WithLabelValues(pending.labelValues...)
			observer.Observe(pending.value)
		default:
			// Nothing left in the buffer.
			return
		}
	}
}