/
resources.go
106 lines (90 loc) · 3.34 KB
/
resources.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package main
import (
"sync"
"github.com/openhistogram/circonusllhist"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/test-infra/prow/pjutil"
pod_scaler "github.com/openshift/ci-tools/pkg/pod-scaler"
)
// newResourceServer builds a resourceServer with an empty requirements cache and
// passes its CPU and memory digest methods to digestAll, keyed by metric name,
// along with the given loaders and health handle. The server is returned once
// digestAll has returned.
func newResourceServer(loaders map[string][]*cacheReloader, health *pjutil.Health) *resourceServer {
	log := logrus.WithField("component", "pod-scaler request server")

	// The lock field is left at its zero value, which is a ready-to-use RWMutex.
	srv := &resourceServer{
		logger:     log,
		byMetaData: make(map[pod_scaler.FullMetadata]corev1.ResourceRequirements),
	}

	digesters := map[string]digester{
		MetricNameCPUUsage:         srv.digestCPU,
		MetricNameMemoryWorkingSet: srv.digestMemory,
	}
	digestAll(loaders, digesters, health, log)

	return srv
}
// resourceServer keeps the most recently computed resource requirements for
// each set of metadata labels and serves them on lookup.
type resourceServer struct {
	// logger is the base entry from which per-resource loggers are derived.
	logger *logrus.Entry
	// lock guards byMetaData: writers take it exclusively, readers take it shared.
	lock sync.RWMutex
	// byMetaData holds the resource requirements computed so far, keyed by the
	// complete set of metadata labels they were computed for.
	byMetaData map[pod_scaler.FullMetadata]corev1.ResourceRequirements
}
// cpuRequestQuantile selects which quantile of the observed CPU core usage is
// used as the CPU request.
const cpuRequestQuantile = 0.8
// formatCPU returns a converter that scales its input by 1000, truncates the
// result to an integer and wraps it as a milli-quantity in decimal SI format.
func formatCPU() toQuantity {
	return func(cores float64) *resource.Quantity {
		milliCores := int64(cores * 1000)
		return resource.NewMilliQuantity(milliCores, resource.DecimalSI)
	}
}
// digestCPU feeds data through digestData to refresh the cached CPU requests,
// using cpuRequestQuantile and the formatCPU converter.
func (s *resourceServer) digestCPU(data *pod_scaler.CachedQuery) {
	s.logger.Debugf("Digesting new CPU consumption metrics.")
	asQuantity := formatCPU()
	s.digestData(data, cpuRequestQuantile, corev1.ResourceCPU, asQuantity)
}
// memRequestQuantile selects which quantile of the observed memory usage is
// used as the memory request.
const memRequestQuantile = 0.8
// formatMemory returns a converter that truncates its input to an integer and
// wraps it as a quantity in binary SI format.
func formatMemory() toQuantity {
	return func(usage float64) *resource.Quantity {
		whole := int64(usage)
		return resource.NewQuantity(whole, resource.BinarySI)
	}
}
// digestMemory feeds data through digestData to refresh the cached memory
// requests, using memRequestQuantile and the formatMemory converter.
func (s *resourceServer) digestMemory(data *pod_scaler.CachedQuery) {
	s.logger.Debugf("Digesting new memory consumption metrics.")
	asQuantity := formatMemory()
	s.digestData(data, memRequestQuantile, corev1.ResourceMemory, asQuantity)
}
// toQuantity converts the value read from a histogram at the chosen quantile
// into a resource quantity.
type toQuantity func(float64) *resource.Quantity
// digestData recomputes the cached request for a single resource from data.
//
// For each metadata key in data.DataByMetaData, the histograms of all of its
// fingerprints are merged into one, the value at the given quantile is read
// from the merged histogram, converted with quantity, and stored under request
// in the cached Requests for that key. The first time a key is seen, an entry
// with empty Requests and Limits is created for it. The write lock is taken
// and released once per key rather than held across the whole pass.
func (s *resourceServer) digestData(data *pod_scaler.CachedQuery, quantile float64, request corev1.ResourceName, quantity toQuantity) {
	log := s.logger.WithField("resource", request)
	log.Debugf("Digesting %d identifiers.", len(data.DataByMetaData))

	for meta, fingerprints := range data.DataByMetaData {
		entryLog := log.WithField("meta", meta)
		merged := circonusllhist.New()

		entryLog.Tracef("digesting %d fingerprints", len(fingerprints))
		for _, fp := range fingerprints {
			merged.Merge(data.Data[fp].Histogram())
		}
		entryLog.Trace("merged all fingerprints")

		atQuantile := merged.ValueAtQuantile(quantile)

		entryLog.Trace("locking for value update")
		s.lock.Lock()
		requirements, known := s.byMetaData[meta]
		if !known {
			requirements = corev1.ResourceRequirements{
				Requests: corev1.ResourceList{},
				Limits:   corev1.ResourceList{},
			}
			s.byMetaData[meta] = requirements
		}
		converted := quantity(atQuantile)
		// requirements is a copy of the cached struct, but its Requests map is
		// the same map the cache holds, so this write updates the cached entry.
		requirements.Requests[request] = *converted
		entryLog.Trace("unlocking for meta")
		s.lock.Unlock()
	}

	log.Debug("Finished digesting new data.")
}
// recommendedRequestFor returns the cached resource requirements for meta and
// reports whether an entry exists. On a miss it returns the zero
// ResourceRequirements and false.
//
// The result is a deep copy made while the read lock is held. The cached value
// stores Requests and Limits as maps, and digestData writes into those maps
// under the write lock. Returning the cached struct directly would hand the
// caller the very same maps, which would then be read after the lock is
// released and could race with a concurrent digest.
func (s *resourceServer) recommendedRequestFor(meta pod_scaler.FullMetadata) (corev1.ResourceRequirements, bool) {
	s.lock.RLock()
	defer s.lock.RUnlock()

	cached, ok := s.byMetaData[meta]
	if !ok {
		return corev1.ResourceRequirements{}, false
	}
	// cached is a local copy of the struct, so taking its address for DeepCopy
	// is safe; DeepCopy allocates fresh maps for the caller.
	return *cached.DeepCopy(), true
}