/
registry.go
118 lines (98 loc) · 2.64 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package internal
import (
"context"
"sync"
"github.com/signalfx/golib/datapoint"
"github.com/signalfx/golib/sfxclient"
"k8s.io/klog"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"
)
// Registry keeps the set of known HPA metrics, indexed by their key, together
// with the SignalFlowJobRunner that job and snapshot operations are delegated to.
// The embedded RWMutex guards metricsByKey: the methods in this file take the
// write lock when mutating the map and the read lock when only reading it.
type Registry struct {
sync.RWMutex
// jobRunner receives the start/stop/snapshot/internal-metrics calls made by the registry.
jobRunner *SignalFlowJobRunner
// metricsByKey maps the value of HPAMetric.Key() to the registered metric.
metricsByKey map[string]*HPAMetric
}
// NewRegistry returns an empty Registry that delegates job handling to the
// given jobRunner. The metric index is allocated up front so it can be
// written to immediately.
func NewRegistry(jobRunner *SignalFlowJobRunner) *Registry {
	reg := new(Registry)
	reg.jobRunner = jobRunner
	reg.metricsByKey = map[string]*HPAMetric{}
	return reg
}
// HandleHPAUpdated records every metric in updatedMetrics under its key
// (overwriting any previous entry with the same key) and asks the job runner
// to replace or start the job for the metric's SignalFlow program.
//
// A failure reported by the job runner is only logged; the metric stays
// registered and processing continues with the next one. The write lock is
// held for the whole call.
func (r *Registry) HandleHPAUpdated(ctx context.Context, updatedMetrics []HPAMetric) {
	r.Lock()
	defer r.Unlock()

	for idx := range updatedMetrics {
		// Copy the element so the stored pointer does not alias the caller's slice.
		metric := updatedMetrics[idx]
		r.metricsByKey[metric.Key()] = &metric

		if err := r.jobRunner.ReplaceOrStartJob(ctx, metric.SignalFlowProgram()); err != nil {
			klog.Errorf("failed to start SignalFlow computation (%s): %v", metric.SignalFlowProgram(), err)
		}
	}
}
// HandleHPADeleted removes every metric in deletedMetrics from the index and
// tells the job runner to stop the job for the metric's SignalFlow program.
// The write lock is held for the whole call. ctx is accepted but not used by
// this method.
func (r *Registry) HandleHPADeleted(ctx context.Context, deletedMetrics []HPAMetric) {
	r.Lock()
	defer r.Unlock()

	for idx := range deletedMetrics {
		// Work on a copy, as ranging by value would.
		metric := deletedMetrics[idx]
		delete(r.metricsByKey, metric.Key())
		r.jobRunner.StopJob(metric.SignalFlowProgram())
	}
}
// AllCustomMetrics returns one CustomMetricInfo per distinct
// (GroupResource, Namespaced, Metric) combination among the registered
// non-external metrics. A metric counts as namespaced when its Namespace is
// non-empty. The order of the result is unspecified, and the result is nil
// when there are no custom metrics.
func (r *Registry) AllCustomMetrics() []provider.CustomMetricInfo {
	r.RLock()
	defer r.RUnlock()

	// Several registered metrics can map to the same info; emit each only once.
	seen := map[provider.CustomMetricInfo]struct{}{}
	var infos []provider.CustomMetricInfo
	for _, hpaMetric := range r.metricsByKey {
		if hpaMetric.IsExternal() {
			continue
		}
		info := provider.CustomMetricInfo{
			GroupResource: hpaMetric.GroupResource,
			Namespaced:    hpaMetric.Namespace != "",
			Metric:        hpaMetric.Metric,
		}
		if _, dup := seen[info]; dup {
			continue
		}
		seen[info] = struct{}{}
		infos = append(infos, info)
	}
	return infos
}
// AllExternalMetrics returns one ExternalMetricInfo per distinct metric name
// among the registered external metrics. The order of the result is
// unspecified, and the result is nil when there are no external metrics.
func (r *Registry) AllExternalMetrics() []provider.ExternalMetricInfo {
	r.RLock()
	defer r.RUnlock()

	var infos []provider.ExternalMetricInfo
	namesSeen := map[string]struct{}{}
	for _, hpaMetric := range r.metricsByKey {
		_, dup := namesSeen[hpaMetric.Metric]
		if !hpaMetric.IsExternal() || dup {
			continue
		}
		namesSeen[hpaMetric.Metric] = struct{}{}
		infos = append(infos, provider.ExternalMetricInfo{Metric: hpaMetric.Metric})
	}
	return infos
}
// HPAMetricMatchingKey looks up the registered metric whose key equals
// m.Key(). It returns nil when no metric is registered under that key.
func (r *Registry) HPAMetricMatchingKey(m *HPAMetric) *HPAMetric {
	r.RLock()
	defer r.RUnlock()

	registered, ok := r.metricsByKey[m.Key()]
	if !ok {
		return nil
	}
	return registered
}
// LatestSnapshot forwards to the job runner's LatestSnapshot for m and
// returns its result and error unchanged. The registry lock is not taken.
func (r *Registry) LatestSnapshot(m *HPAMetric) (MetricSnapshot, error) {
	snapshot, err := r.jobRunner.LatestSnapshot(m)
	return snapshot, err
}
// InternalMetrics reports datapoints describing the registry itself: a
// "custom_metrics" gauge and an "external_metrics" gauge carrying the number
// of entries returned by AllCustomMetrics and AllExternalMetrics, followed by
// whatever datapoints the job runner's InternalMetrics returns.
func (r *Registry) InternalMetrics() []*datapoint.Datapoint {
	externalCount := int64(len(r.AllExternalMetrics()))
	customCount := int64(len(r.AllCustomMetrics()))

	dps := []*datapoint.Datapoint{
		sfxclient.Gauge("custom_metrics", nil, customCount),
		sfxclient.Gauge("external_metrics", nil, externalCount),
	}
	return append(dps, r.jobRunner.InternalMetrics()...)
}