Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memory advisor plugins #143

Merged
merged 3 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
168 changes: 8 additions & 160 deletions pkg/agent/evictionmanager/plugin/memory/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,16 @@ limitations under the License.
package memory

import (
"fmt"
"strconv"

v1 "k8s.io/api/core/v1"

"github.com/kubewharf/katalyst-core/pkg/config"
evictionconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/adminqos/eviction"
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/helper"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/metric"
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

Expand All @@ -52,16 +50,12 @@ const (
metricsNameThresholdMet = "threshold_met_count"
metricsNameNumaMetric = "numa_metric_raw"
metricsNameSystemMetric = "system_metric_raw"
metricsNameContainerMetric = "container_metric_raw"
metricsNamePodMetric = "pod_metric_raw"

metricsTagKeyEvictionScope = "eviction_scope"
metricsTagKeyDetectionLevel = "detection_level"
metricsTagKeyNumaID = "numa_id"
metricsTagKeyAction = "action"
metricsTagKeyMetricName = "metric_name"
metricsTagKeyPodUID = "pod_uid"
metricsTagKeyContainerName = "container_name"

metricsTagValueDetectionLevelNuma = "numa"
metricsTagValueDetectionLevelSystem = "system"
Expand All @@ -73,113 +67,24 @@ const (
)

const (
errMsgGetSystemMetrics = "failed to get system metric, metric name: %s, err: %v"
errMsgGetNumaMetrics = "failed to get numa metric, metric name: %s, numa id: %d, err: %v"
errMsgGetContainerNumaMetrics = "failed to get container numa metric, metric name: %s, pod uid: %s, container name: %s, numa id: %d, err: %v"
errMsgGetContainerSystemMetrics = "failed to get container system metric, metric name: %s, pod uid: %s, container name: %s, err: %v"
errMsgCheckReclaimedPodFailed = "failed to check reclaimed pod, pod: %s/%s, err: %v"
errMsgCheckReclaimedPodFailed = "failed to check reclaimed pod, pod: %s/%s, err: %v"
)

// EvictionHelper is a general tool collection for all memory eviction plugin
type EvictionHelper struct {
emitter metrics.MetricEmitter
metaServer *metaserver.MetaServer
emitter metrics.MetricEmitter
reclaimedPodFilter func(pod *v1.Pod) (bool, error)
}

func NewEvictionHelper(emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer, conf *config.Configuration) *EvictionHelper {
return &EvictionHelper{
emitter: emitter,
metaServer: metaServer,
emitter: emitter,
reclaimedPodFilter: conf.CheckReclaimedQoSForPod,
}
}

// getWatermarkMetrics returns system-water mark related metrics (config)
// if numa node is specified, return config in this numa; otherwise return system-level config
func (e *EvictionHelper) getWatermarkMetrics(numaID int) (free, total, scaleFactor float64, err error) {
var m metric.MetricData
if numaID >= 0 {
m, err = e.metaServer.GetNumaMetric(numaID, consts.MetricMemFreeNuma)
if err != nil {
return 0, 0, 0, fmt.Errorf(errMsgGetNumaMetrics, consts.MetricMemFreeNuma, numaID, err)
}

free = m.Value
_ = e.emitter.StoreFloat64(metricsNameNumaMetric, free, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyNumaID: strconv.Itoa(numaID),
metricsTagKeyMetricName: consts.MetricMemFreeNuma,
})...)

m, err = e.metaServer.GetNumaMetric(numaID, consts.MetricMemTotalNuma)
if err != nil {
return 0, 0, 0, fmt.Errorf(errMsgGetNumaMetrics, consts.MetricMemTotalNuma, numaID, err)
}

total = m.Value
_ = e.emitter.StoreFloat64(metricsNameNumaMetric, total, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyNumaID: strconv.Itoa(numaID),
metricsTagKeyMetricName: consts.MetricMemTotalNuma,
})...)
} else {
m, err = e.metaServer.GetNodeMetric(consts.MetricMemFreeSystem)
if err != nil {
return 0, 0, 0, fmt.Errorf(errMsgGetSystemMetrics, consts.MetricMemFreeSystem, err)
}

free = m.Value
_ = e.emitter.StoreFloat64(metricsNameSystemMetric, free, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyMetricName: consts.MetricMemFreeSystem,
})...)

m, err = e.metaServer.GetNodeMetric(consts.MetricMemTotalSystem)
if err != nil {
return 0, 0, 0, fmt.Errorf(errMsgGetSystemMetrics, consts.MetricMemTotalSystem, err)
}

total = m.Value
_ = e.emitter.StoreFloat64(metricsNameSystemMetric, total, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyMetricName: consts.MetricMemTotalSystem,
})...)
}

m, err = e.metaServer.GetNodeMetric(consts.MetricMemScaleFactorSystem)
if err != nil {
return 0, 0, 0, fmt.Errorf(errMsgGetSystemMetrics, consts.MetricMemScaleFactorSystem, err)
}

scaleFactor = m.Value
_ = e.emitter.StoreFloat64(metricsNameSystemMetric, scaleFactor, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyMetricName: consts.MetricMemScaleFactorSystem,
})...)

return free, total, scaleFactor, nil
}

func (e *EvictionHelper) getSystemKswapdStealMetrics() (metric.MetricData, error) {
m, err := e.metaServer.GetNodeMetric(consts.MetricMemKswapdstealSystem)
if err != nil {
return m, fmt.Errorf(errMsgGetSystemMetrics, consts.MetricMemKswapdstealSystem, err)
}

if m.Time == nil {
return m, fmt.Errorf(errMsgGetSystemMetrics, consts.MetricMemKswapdstealSystem, fmt.Errorf("metric timestamp is nil"))
}

kswapdSteal := m.Value
_ = e.emitter.StoreFloat64(metricsNameSystemMetric, kswapdSteal, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyMetricName: consts.MetricMemKswapdstealSystem,
})...)

return m, nil
}

func (e *EvictionHelper) selectTopNPodsToEvictByMetrics(activePods []*v1.Pod, topN uint64, numaID,
action int, rankingMetrics []string, podToEvictMap map[string]*v1.Pod) {
filteredPods := e.filterPods(activePods, action)
Expand All @@ -206,8 +111,8 @@ func (e *EvictionHelper) filterPods(pods []*v1.Pod, action int) []*v1.Pod {
func (e *EvictionHelper) getEvictionCmpFuncs(rankingMetrics []string, numaID int) []general.CmpFunc {
cmpFuncs := make([]general.CmpFunc, 0, len(rankingMetrics))

for _, metric := range rankingMetrics {
currentMetric := metric
for _, m := range rankingMetrics {
currentMetric := m
cmpFuncs = append(cmpFuncs, func(s1, s2 interface{}) int {
p1, p2 := s1.(*v1.Pod), s2.(*v1.Pod)
switch currentMetric {
Expand All @@ -233,8 +138,8 @@ func (e *EvictionHelper) getEvictionCmpFuncs(rankingMetrics []string, numaID int
// prioritize evicting the pod whose priority is lower
return general.ReverseCmpFunc(native.PodPriorityCmpFunc)(p1, p2)
default:
p1Metric, p1Found := e.getPodMetric(p1, currentMetric, numaID)
p2Metric, p2Found := e.getPodMetric(p2, currentMetric, numaID)
p1Metric, p1Found := helper.GetPodMetric(e.metaServer.MetricsFetcher, e.emitter, p1, currentMetric, numaID)
p2Metric, p2Found := helper.GetPodMetric(e.metaServer.MetricsFetcher, e.emitter, p2, currentMetric, numaID)
if !p1Found || !p2Found {
_ = e.emitter.StoreInt64(metricsNameFetchMetricError, 1, metrics.MetricTypeNameCount,
metrics.ConvertMapToTags(map[string]string{
Expand All @@ -253,60 +158,3 @@ func (e *EvictionHelper) getEvictionCmpFuncs(rankingMetrics []string, numaID int
return cmpFuncs

}

// getPodMetric returns the value of a pod-level metric.
// And the value of a pod-level metric is calculated by summing the metric values for all containers in that pod.
func (e *EvictionHelper) getPodMetric(pod *v1.Pod, metricName string, numaID int) (float64, bool) {
if pod == nil {
return 0, false
}

var m metric.MetricData
var podMetricValue float64
for _, container := range pod.Spec.Containers {
var containerMetricValue float64
var err error
if numaID >= 0 {
m, err = e.metaServer.GetContainerNumaMetric(string(pod.UID), container.Name, strconv.Itoa(numaID), metricName)
if err != nil {
general.Errorf(errMsgGetContainerNumaMetrics, metricName, pod.UID, container.Name, numaID, err)
return 0, false
}

containerMetricValue = m.Value
_ = e.emitter.StoreFloat64(metricsNameContainerMetric, containerMetricValue, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyPodUID: string(pod.UID),
metricsTagKeyContainerName: container.Name,
metricsTagKeyNumaID: strconv.Itoa(numaID),
metricsTagKeyMetricName: metricName,
})...)
} else {
m, err = e.metaServer.GetContainerMetric(string(pod.UID), container.Name, metricName)
if err != nil {
general.Errorf(errMsgGetContainerSystemMetrics, metricName, pod.UID, container.Name, err)
return 0, false
}

containerMetricValue = m.Value
_ = e.emitter.StoreFloat64(metricsNameContainerMetric, containerMetricValue, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyPodUID: string(pod.UID),
metricsTagKeyContainerName: container.Name,
metricsTagKeyNumaID: strconv.Itoa(numaID),
metricsTagKeyMetricName: metricName,
})...)
}

podMetricValue += containerMetricValue
}

_ = e.emitter.StoreFloat64(metricsNamePodMetric, podMetricValue, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyPodUID: string(pod.UID),
metricsTagKeyNumaID: strconv.Itoa(numaID),
metricsTagKeyMetricName: metricName,
})...)

return podMetricValue, true
}
15 changes: 6 additions & 9 deletions pkg/agent/evictionmanager/plugin/memory/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
Expand All @@ -34,9 +35,8 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

func makeHelper() (*EvictionHelper, error) {
func makeHelper(metaServer *metaserver.MetaServer) (*EvictionHelper, error) {
conf := makeConf()
metaServer := makeMetaServer()

cpuTopology, err := machine.GenerateDummyCPUTopology(16, 1, 2)
if err != nil {
Expand All @@ -48,23 +48,20 @@ func makeHelper() (*EvictionHelper, error) {
}
metaServer.MetricsFetcher = metric.NewFakeMetricsFetcher(metrics.DummyMetrics{})

return &EvictionHelper{
emitter: metrics.DummyMetrics{},
metaServer: metaServer,
reclaimedPodFilter: conf.QoSConfiguration.CheckReclaimedQoSForPod,
}, nil
return NewEvictionHelper(&metrics.DummyMetrics{}, metaServer, conf), nil
}

func TestEvictionHelper_getEvictionCmpFuncs(t *testing.T) {
t.Parallel()

helper, err := makeHelper()
metaServer := makeMetaServer()
helper, err := makeHelper(metaServer)
assert.NoError(t, err)
assert.NotNil(t, helper)

conf := makeConf()

fakeMetricsFetcher := helper.metaServer.MetricsFetcher.(*metric.FakeMetricsFetcher)
fakeMetricsFetcher := metaServer.MetricsFetcher.(*metric.FakeMetricsFetcher)
assert.NotNil(t, fakeMetricsFetcher)

pods := []*v1.Pod{
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/evictionmanager/plugin/memory/numa_pressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/helper"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
Expand Down Expand Up @@ -123,7 +124,7 @@ func (n *NumaMemoryPressurePlugin) detectNumaPressures() {
}

func (n *NumaMemoryPressurePlugin) detectNumaWatermarkPressure(numaID int) error {
free, total, scaleFactor, err := n.evictionHelper.getWatermarkMetrics(numaID)
free, total, scaleFactor, err := helper.GetWatermarkMetrics(n.metaServer.MetricsFetcher, n.emitter, numaID)
if err != nil {
general.Errorf("failed to getWatermarkMetrics for numa %d, err: %v", numaID, err)
_ = n.emitter.StoreInt64(metricsNameFetchMetricError, 1, metrics.MetricTypeNameCount,
Expand Down
13 changes: 6 additions & 7 deletions pkg/agent/evictionmanager/plugin/memory/numa_pressure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,19 @@ var (
)

func makeNumaPressureEvictionPlugin(conf *config.Configuration) (*NumaMemoryPressurePlugin, error) {
metaServer := makeMetaServer()

plugin := NewNumaMemoryPressureEvictionPlugin(nil, nil, metaServer, metrics.DummyMetrics{}, conf)
res := plugin.(*NumaMemoryPressurePlugin)

cpuTopology, err := machine.GenerateDummyCPUTopology(16, 1, 2)
if err != nil {
return nil, err
}

res.metaServer.KatalystMachineInfo = &machine.KatalystMachineInfo{
metaServer := makeMetaServer()
metaServer.KatalystMachineInfo = &machine.KatalystMachineInfo{
CPUTopology: cpuTopology,
}
res.metaServer.MetricsFetcher = metric.NewFakeMetricsFetcher(metrics.DummyMetrics{})
metaServer.MetricsFetcher = metric.NewFakeMetricsFetcher(metrics.DummyMetrics{})

plugin := NewNumaMemoryPressureEvictionPlugin(nil, nil, metaServer, metrics.DummyMetrics{}, conf)
res := plugin.(*NumaMemoryPressurePlugin)

return res, nil
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/agent/evictionmanager/plugin/memory/rss_overuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/config/generic"
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/helper"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/process"
Expand All @@ -57,7 +58,6 @@ func NewRssOveruseEvictionPlugin(_ *client.GenericClientSet, _ events.EventRecor
reclaimedPodFilter: conf.CheckReclaimedQoSForPod,
pluginName: EvictionPluginNameRssOveruse,
metaServer: metaServer,
evictionHelper: NewEvictionHelper(emitter, metaServer, conf),
supportedQosLevels: sets.NewString(apiconsts.PodAnnotationQoSLevelReclaimedCores, apiconsts.PodAnnotationQoSLevelSharedCores),

dynamicConfig: conf.DynamicAgentConfiguration,
Expand All @@ -77,7 +77,6 @@ type RssOveruseEvictionPlugin struct {
reclaimedPodFilter func(pod *v1.Pod) (bool, error)
pluginName string
metaServer *metaserver.MetaServer
evictionHelper *EvictionHelper
supportedQosLevels sets.String

dynamicConfig *dynamic.DynamicAgentConfiguration
Expand Down Expand Up @@ -164,7 +163,7 @@ func (r *RssOveruseEvictionPlugin) GetEvictPods(_ context.Context, request *plug
continue
}

podRss, found := r.evictionHelper.getPodMetric(pod, consts.MetricMemRssContainer, nonExistNumaID)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like evictionHelper is useless in this plugin, maybe we can delete it.

podRss, found := helper.GetPodMetric(r.metaServer.MetricsFetcher, r.emitter, pod, consts.MetricMemRssContainer, nonExistNumaID)
if !found {
_ = r.emitter.StoreInt64(metricsNameFetchMetricError, 1, metrics.MetricTypeNameCount,
metrics.ConvertMapToTags(map[string]string{
Expand Down
5 changes: 3 additions & 2 deletions pkg/agent/evictionmanager/plugin/memory/system_pressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/helper"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
Expand Down Expand Up @@ -160,7 +161,7 @@ func (s *SystemPressureEvictionPlugin) detectSystemPressures(_ context.Context)
}

func (s *SystemPressureEvictionPlugin) detectSystemWatermarkPressure() {
free, total, scaleFactor, err := s.evictionHelper.getWatermarkMetrics(nonExistNumaID)
free, total, scaleFactor, err := helper.GetWatermarkMetrics(s.metaServer.MetricsFetcher, s.emitter, nonExistNumaID)
if err != nil {
_ = s.emitter.StoreInt64(metricsNameFetchMetricError, 1, metrics.MetricTypeNameCount,
metrics.ConvertMapToTags(map[string]string{
Expand All @@ -181,7 +182,7 @@ func (s *SystemPressureEvictionPlugin) detectSystemWatermarkPressure() {
}

func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() {
kswapdSteal, err := s.evictionHelper.getSystemKswapdStealMetrics()
kswapdSteal, err := helper.GetSystemKswapdStealMetrics(s.metaServer.MetricsFetcher, s.emitter)
if err != nil {
s.kswapdStealPreviousCycle = kswapdStealPreviousCycleMissing
s.kswapdStealPreviousCycleTime = time.Now()
Expand Down