-
Notifications
You must be signed in to change notification settings - Fork 91
/
resource.go
130 lines (108 loc) · 5.21 KB
/
resource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resource
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/metacache"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource/memory"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metrics"
)
// ResourceAdvisor is a wrapper of different sub resource advisors. It can be registered to
// headroom reporter to give designated resource headroom quantity based on provision result.
type ResourceAdvisor interface {
// Run starts all sub resource advisors
Run(ctx context.Context)
// GetSubAdvisor returns the corresponding sub advisor according to resource name
GetSubAdvisor(resourceName types.QoSResourceName) (SubResourceAdvisor, error)
// GetHeadroom returns the corresponding headroom quantity according to resource name
GetHeadroom(resourceName v1.ResourceName) (resource.Quantity, error)
}
// SubResourceAdvisor updates resource provision of a certain dimension based on the latest
// system and workload snapshot(s), and returns provision advice or resource headroom quantity.
// It should push updated results to the corresponding qrm server.
type SubResourceAdvisor interface {
// Run starts resource provision update based on the latest system and workload snapshot(s)
Run(ctx context.Context)
// GetChannels returns two channels. The first one receives update trigger from qrm server.
// The other one sends the latest internal calculation result to qrm server.
GetChannels() (interface{}, interface{})
// GetHeadroom returns the latest resource headroom quantity for resource reporter
GetHeadroom() (resource.Quantity, error)
}
type resourceAdvisorWrapper struct {
subAdvisorsToRun map[types.QoSResourceName]SubResourceAdvisor
}
// NewResourceAdvisor returns a resource advisor wrapper instance, initializing all required
// sub resource advisor according to config
func NewResourceAdvisor(conf *config.Configuration, extraConf interface{}, metaCache metacache.MetaCache,
metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter) (ResourceAdvisor, error) {
resourceAdvisor := resourceAdvisorWrapper{
subAdvisorsToRun: make(map[types.QoSResourceName]SubResourceAdvisor),
}
for _, resourceNameStr := range conf.ResourceAdvisors {
resourceName := types.QoSResourceName(resourceNameStr)
subAdvisor, err := NewSubResourceAdvisor(resourceName, conf, extraConf, metaCache, metaServer, emitter)
if err != nil {
return nil, fmt.Errorf("new sub resource advisor for %v failed: %v", resourceName, err)
}
resourceAdvisor.subAdvisorsToRun[resourceName] = subAdvisor
}
return &resourceAdvisor, nil
}
// NewSubResourceAdvisor returns a corresponding advisor according to resource name
func NewSubResourceAdvisor(resourceName types.QoSResourceName, conf *config.Configuration, extraConf interface{},
metaCache metacache.MetaCache, metaServer *metaserver.MetaServer, emitter metrics.MetricEmitter) (SubResourceAdvisor, error) {
switch resourceName {
case types.QoSResourceCPU:
return cpu.NewCPUResourceAdvisor(conf, extraConf, metaCache, metaServer, emitter), nil
case types.QoSResourceMemory:
return memory.NewMemoryResourceAdvisor(conf, extraConf, metaCache, metaServer, emitter), nil
default:
return nil, fmt.Errorf("try to new sub resource advisor for unsupported resource %v", resourceName)
}
}
func (ra *resourceAdvisorWrapper) Run(ctx context.Context) {
for _, subAdvisor := range ra.subAdvisorsToRun {
go subAdvisor.Run(ctx)
}
}
func (ra *resourceAdvisorWrapper) GetSubAdvisor(resourceName types.QoSResourceName) (SubResourceAdvisor, error) {
if subAdvisor, ok := ra.subAdvisorsToRun[resourceName]; ok {
return subAdvisor, nil
}
return nil, fmt.Errorf("no sub resource advisor for %v", resourceName)
}
func (ra *resourceAdvisorWrapper) GetHeadroom(resourceName v1.ResourceName) (resource.Quantity, error) {
switch resourceName {
case v1.ResourceCPU:
return ra.getSubAdvisorHeadroom(types.QoSResourceCPU)
case v1.ResourceMemory:
return ra.getSubAdvisorHeadroom(types.QoSResourceMemory)
default:
return resource.Quantity{}, fmt.Errorf("illegal resource %v", resourceName)
}
}
func (ra *resourceAdvisorWrapper) getSubAdvisorHeadroom(resourceName types.QoSResourceName) (resource.Quantity, error) {
subAdvisor, ok := ra.subAdvisorsToRun[resourceName]
if !ok {
return resource.Quantity{}, fmt.Errorf("no sub resource advisor for %v", resourceName)
}
return subAdvisor.GetHeadroom()
}