Skip to content

Commit

Permalink
memory advisor plugins (#143)
Browse files Browse the repository at this point in the history
* feat(sysadvisor): introduce memadvisor plugin cache_reaper

Signed-off-by: linzhecheng <linzhecheng@bytedance.com>

* feat(sysadvisor): introduce memory guard plugin

Signed-off-by: linzhecheng <linzhecheng@bytedance.com>

* feat(sysadvisor): introduce memset binder

Signed-off-by: linzhecheng <linzhecheng@bytedance.com>

---------

Signed-off-by: linzhecheng <linzhecheng@bytedance.com>
  • Loading branch information
cheney-lin committed Jul 18, 2023
1 parent 20bae0a commit d408f67
Show file tree
Hide file tree
Showing 18 changed files with 1,184 additions and 203 deletions.
168 changes: 8 additions & 160 deletions pkg/agent/evictionmanager/plugin/memory/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,16 @@ limitations under the License.
package memory

import (
"fmt"
"strconv"

v1 "k8s.io/api/core/v1"

"github.com/kubewharf/katalyst-core/pkg/config"
evictionconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic/adminqos/eviction"
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/helper"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/metric"
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

Expand All @@ -52,16 +50,12 @@ const (
metricsNameThresholdMet = "threshold_met_count"
metricsNameNumaMetric = "numa_metric_raw"
metricsNameSystemMetric = "system_metric_raw"
metricsNameContainerMetric = "container_metric_raw"
metricsNamePodMetric = "pod_metric_raw"

metricsTagKeyEvictionScope = "eviction_scope"
metricsTagKeyDetectionLevel = "detection_level"
metricsTagKeyNumaID = "numa_id"
metricsTagKeyAction = "action"
metricsTagKeyMetricName = "metric_name"
metricsTagKeyPodUID = "pod_uid"
metricsTagKeyContainerName = "container_name"

metricsTagValueDetectionLevelNuma = "numa"
metricsTagValueDetectionLevelSystem = "system"
Expand All @@ -73,113 +67,24 @@ const (
)

const (
errMsgGetSystemMetrics = "failed to get system metric, metric name: %s, err: %v"
errMsgGetNumaMetrics = "failed to get numa metric, metric name: %s, numa id: %d, err: %v"
errMsgGetContainerNumaMetrics = "failed to get container numa metric, metric name: %s, pod uid: %s, container name: %s, numa id: %d, err: %v"
errMsgGetContainerSystemMetrics = "failed to get container system metric, metric name: %s, pod uid: %s, container name: %s, err: %v"
errMsgCheckReclaimedPodFailed = "failed to check reclaimed pod, pod: %s/%s, err: %v"
errMsgCheckReclaimedPodFailed = "failed to check reclaimed pod, pod: %s/%s, err: %v"
)

// EvictionHelper is a general tool collection for all memory eviction plugin
type EvictionHelper struct {
emitter metrics.MetricEmitter
metaServer *metaserver.MetaServer
emitter metrics.MetricEmitter
reclaimedPodFilter func(pod *v1.Pod) (bool, error)
}

func NewEvictionHelper(emitter metrics.MetricEmitter, metaServer *metaserver.MetaServer, conf *config.Configuration) *EvictionHelper {
return &EvictionHelper{
emitter: emitter,
metaServer: metaServer,
emitter: emitter,
reclaimedPodFilter: conf.CheckReclaimedQoSForPod,
}
}

// getWatermarkMetrics returns system-water mark related metrics (config)
// if numa node is specified, return config in this numa; otherwise return system-level config
func (e *EvictionHelper) getWatermarkMetrics(numaID int) (free, total, scaleFactor float64, err error) {
var m metric.MetricData
if numaID >= 0 {
m, err = e.metaServer.GetNumaMetric(numaID, consts.MetricMemFreeNuma)
if err != nil {
return 0, 0, 0, fmt.Errorf(errMsgGetNumaMetrics, consts.MetricMemFreeNuma, numaID, err)
}

free = m.Value
_ = e.emitter.StoreFloat64(metricsNameNumaMetric, free, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyNumaID: strconv.Itoa(numaID),
metricsTagKeyMetricName: consts.MetricMemFreeNuma,
})...)

m, err = e.metaServer.GetNumaMetric(numaID, consts.MetricMemTotalNuma)
if err != nil {
return 0, 0, 0, fmt.Errorf(errMsgGetNumaMetrics, consts.MetricMemTotalNuma, numaID, err)
}

total = m.Value
_ = e.emitter.StoreFloat64(metricsNameNumaMetric, total, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyNumaID: strconv.Itoa(numaID),
metricsTagKeyMetricName: consts.MetricMemTotalNuma,
})...)
} else {
m, err = e.metaServer.GetNodeMetric(consts.MetricMemFreeSystem)
if err != nil {
return 0, 0, 0, fmt.Errorf(errMsgGetSystemMetrics, consts.MetricMemFreeSystem, err)
}

free = m.Value
_ = e.emitter.StoreFloat64(metricsNameSystemMetric, free, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyMetricName: consts.MetricMemFreeSystem,
})...)

m, err = e.metaServer.GetNodeMetric(consts.MetricMemTotalSystem)
if err != nil {
return 0, 0, 0, fmt.Errorf(errMsgGetSystemMetrics, consts.MetricMemTotalSystem, err)
}

total = m.Value
_ = e.emitter.StoreFloat64(metricsNameSystemMetric, total, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyMetricName: consts.MetricMemTotalSystem,
})...)
}

m, err = e.metaServer.GetNodeMetric(consts.MetricMemScaleFactorSystem)
if err != nil {
return 0, 0, 0, fmt.Errorf(errMsgGetSystemMetrics, consts.MetricMemScaleFactorSystem, err)
}

scaleFactor = m.Value
_ = e.emitter.StoreFloat64(metricsNameSystemMetric, scaleFactor, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyMetricName: consts.MetricMemScaleFactorSystem,
})...)

return free, total, scaleFactor, nil
}

func (e *EvictionHelper) getSystemKswapdStealMetrics() (metric.MetricData, error) {
m, err := e.metaServer.GetNodeMetric(consts.MetricMemKswapdstealSystem)
if err != nil {
return m, fmt.Errorf(errMsgGetSystemMetrics, consts.MetricMemKswapdstealSystem, err)
}

if m.Time == nil {
return m, fmt.Errorf(errMsgGetSystemMetrics, consts.MetricMemKswapdstealSystem, fmt.Errorf("metric timestamp is nil"))
}

kswapdSteal := m.Value
_ = e.emitter.StoreFloat64(metricsNameSystemMetric, kswapdSteal, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyMetricName: consts.MetricMemKswapdstealSystem,
})...)

return m, nil
}

func (e *EvictionHelper) selectTopNPodsToEvictByMetrics(activePods []*v1.Pod, topN uint64, numaID,
action int, rankingMetrics []string, podToEvictMap map[string]*v1.Pod) {
filteredPods := e.filterPods(activePods, action)
Expand All @@ -206,8 +111,8 @@ func (e *EvictionHelper) filterPods(pods []*v1.Pod, action int) []*v1.Pod {
func (e *EvictionHelper) getEvictionCmpFuncs(rankingMetrics []string, numaID int) []general.CmpFunc {
cmpFuncs := make([]general.CmpFunc, 0, len(rankingMetrics))

for _, metric := range rankingMetrics {
currentMetric := metric
for _, m := range rankingMetrics {
currentMetric := m
cmpFuncs = append(cmpFuncs, func(s1, s2 interface{}) int {
p1, p2 := s1.(*v1.Pod), s2.(*v1.Pod)
switch currentMetric {
Expand All @@ -233,8 +138,8 @@ func (e *EvictionHelper) getEvictionCmpFuncs(rankingMetrics []string, numaID int
// prioritize evicting the pod whose priority is lower
return general.ReverseCmpFunc(native.PodPriorityCmpFunc)(p1, p2)
default:
p1Metric, p1Found := e.getPodMetric(p1, currentMetric, numaID)
p2Metric, p2Found := e.getPodMetric(p2, currentMetric, numaID)
p1Metric, p1Found := helper.GetPodMetric(e.metaServer.MetricsFetcher, e.emitter, p1, currentMetric, numaID)
p2Metric, p2Found := helper.GetPodMetric(e.metaServer.MetricsFetcher, e.emitter, p2, currentMetric, numaID)
if !p1Found || !p2Found {
_ = e.emitter.StoreInt64(metricsNameFetchMetricError, 1, metrics.MetricTypeNameCount,
metrics.ConvertMapToTags(map[string]string{
Expand All @@ -253,60 +158,3 @@ func (e *EvictionHelper) getEvictionCmpFuncs(rankingMetrics []string, numaID int
return cmpFuncs

}

// getPodMetric returns the value of a pod-level metric.
// And the value of a pod-level metric is calculated by summing the metric values for all containers in that pod.
func (e *EvictionHelper) getPodMetric(pod *v1.Pod, metricName string, numaID int) (float64, bool) {
if pod == nil {
return 0, false
}

var m metric.MetricData
var podMetricValue float64
for _, container := range pod.Spec.Containers {
var containerMetricValue float64
var err error
if numaID >= 0 {
m, err = e.metaServer.GetContainerNumaMetric(string(pod.UID), container.Name, strconv.Itoa(numaID), metricName)
if err != nil {
general.Errorf(errMsgGetContainerNumaMetrics, metricName, pod.UID, container.Name, numaID, err)
return 0, false
}

containerMetricValue = m.Value
_ = e.emitter.StoreFloat64(metricsNameContainerMetric, containerMetricValue, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyPodUID: string(pod.UID),
metricsTagKeyContainerName: container.Name,
metricsTagKeyNumaID: strconv.Itoa(numaID),
metricsTagKeyMetricName: metricName,
})...)
} else {
m, err = e.metaServer.GetContainerMetric(string(pod.UID), container.Name, metricName)
if err != nil {
general.Errorf(errMsgGetContainerSystemMetrics, metricName, pod.UID, container.Name, err)
return 0, false
}

containerMetricValue = m.Value
_ = e.emitter.StoreFloat64(metricsNameContainerMetric, containerMetricValue, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyPodUID: string(pod.UID),
metricsTagKeyContainerName: container.Name,
metricsTagKeyNumaID: strconv.Itoa(numaID),
metricsTagKeyMetricName: metricName,
})...)
}

podMetricValue += containerMetricValue
}

_ = e.emitter.StoreFloat64(metricsNamePodMetric, podMetricValue, metrics.MetricTypeNameRaw,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyPodUID: string(pod.UID),
metricsTagKeyNumaID: strconv.Itoa(numaID),
metricsTagKeyMetricName: metricName,
})...)

return podMetricValue, true
}
15 changes: 6 additions & 9 deletions pkg/agent/evictionmanager/plugin/memory/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
Expand All @@ -34,9 +35,8 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

func makeHelper() (*EvictionHelper, error) {
func makeHelper(metaServer *metaserver.MetaServer) (*EvictionHelper, error) {
conf := makeConf()
metaServer := makeMetaServer()

cpuTopology, err := machine.GenerateDummyCPUTopology(16, 1, 2)
if err != nil {
Expand All @@ -48,23 +48,20 @@ func makeHelper() (*EvictionHelper, error) {
}
metaServer.MetricsFetcher = metric.NewFakeMetricsFetcher(metrics.DummyMetrics{})

return &EvictionHelper{
emitter: metrics.DummyMetrics{},
metaServer: metaServer,
reclaimedPodFilter: conf.QoSConfiguration.CheckReclaimedQoSForPod,
}, nil
return NewEvictionHelper(&metrics.DummyMetrics{}, metaServer, conf), nil
}

func TestEvictionHelper_getEvictionCmpFuncs(t *testing.T) {
t.Parallel()

helper, err := makeHelper()
metaServer := makeMetaServer()
helper, err := makeHelper(metaServer)
assert.NoError(t, err)
assert.NotNil(t, helper)

conf := makeConf()

fakeMetricsFetcher := helper.metaServer.MetricsFetcher.(*metric.FakeMetricsFetcher)
fakeMetricsFetcher := metaServer.MetricsFetcher.(*metric.FakeMetricsFetcher)
assert.NotNil(t, fakeMetricsFetcher)

pods := []*v1.Pod{
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/evictionmanager/plugin/memory/numa_pressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/helper"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
Expand Down Expand Up @@ -123,7 +124,7 @@ func (n *NumaMemoryPressurePlugin) detectNumaPressures() {
}

func (n *NumaMemoryPressurePlugin) detectNumaWatermarkPressure(numaID int) error {
free, total, scaleFactor, err := n.evictionHelper.getWatermarkMetrics(numaID)
free, total, scaleFactor, err := helper.GetWatermarkMetrics(n.metaServer.MetricsFetcher, n.emitter, numaID)
if err != nil {
general.Errorf("failed to getWatermarkMetrics for numa %d, err: %v", numaID, err)
_ = n.emitter.StoreInt64(metricsNameFetchMetricError, 1, metrics.MetricTypeNameCount,
Expand Down
13 changes: 6 additions & 7 deletions pkg/agent/evictionmanager/plugin/memory/numa_pressure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,19 @@ var (
)

func makeNumaPressureEvictionPlugin(conf *config.Configuration) (*NumaMemoryPressurePlugin, error) {
metaServer := makeMetaServer()

plugin := NewNumaMemoryPressureEvictionPlugin(nil, nil, metaServer, metrics.DummyMetrics{}, conf)
res := plugin.(*NumaMemoryPressurePlugin)

cpuTopology, err := machine.GenerateDummyCPUTopology(16, 1, 2)
if err != nil {
return nil, err
}

res.metaServer.KatalystMachineInfo = &machine.KatalystMachineInfo{
metaServer := makeMetaServer()
metaServer.KatalystMachineInfo = &machine.KatalystMachineInfo{
CPUTopology: cpuTopology,
}
res.metaServer.MetricsFetcher = metric.NewFakeMetricsFetcher(metrics.DummyMetrics{})
metaServer.MetricsFetcher = metric.NewFakeMetricsFetcher(metrics.DummyMetrics{})

plugin := NewNumaMemoryPressureEvictionPlugin(nil, nil, metaServer, metrics.DummyMetrics{}, conf)
res := plugin.(*NumaMemoryPressurePlugin)

return res, nil
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/agent/evictionmanager/plugin/memory/rss_overuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/config/generic"
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/helper"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/process"
Expand All @@ -57,7 +58,6 @@ func NewRssOveruseEvictionPlugin(_ *client.GenericClientSet, _ events.EventRecor
reclaimedPodFilter: conf.CheckReclaimedQoSForPod,
pluginName: EvictionPluginNameRssOveruse,
metaServer: metaServer,
evictionHelper: NewEvictionHelper(emitter, metaServer, conf),
supportedQosLevels: sets.NewString(apiconsts.PodAnnotationQoSLevelReclaimedCores, apiconsts.PodAnnotationQoSLevelSharedCores),

dynamicConfig: conf.DynamicAgentConfiguration,
Expand All @@ -77,7 +77,6 @@ type RssOveruseEvictionPlugin struct {
reclaimedPodFilter func(pod *v1.Pod) (bool, error)
pluginName string
metaServer *metaserver.MetaServer
evictionHelper *EvictionHelper
supportedQosLevels sets.String

dynamicConfig *dynamic.DynamicAgentConfiguration
Expand Down Expand Up @@ -164,7 +163,7 @@ func (r *RssOveruseEvictionPlugin) GetEvictPods(_ context.Context, request *plug
continue
}

podRss, found := r.evictionHelper.getPodMetric(pod, consts.MetricMemRssContainer, nonExistNumaID)
podRss, found := helper.GetPodMetric(r.metaServer.MetricsFetcher, r.emitter, pod, consts.MetricMemRssContainer, nonExistNumaID)
if !found {
_ = r.emitter.StoreInt64(metricsNameFetchMetricError, 1, metrics.MetricTypeNameCount,
metrics.ConvertMapToTags(map[string]string{
Expand Down
5 changes: 3 additions & 2 deletions pkg/agent/evictionmanager/plugin/memory/system_pressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/helper"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/native"
Expand Down Expand Up @@ -160,7 +161,7 @@ func (s *SystemPressureEvictionPlugin) detectSystemPressures(_ context.Context)
}

func (s *SystemPressureEvictionPlugin) detectSystemWatermarkPressure() {
free, total, scaleFactor, err := s.evictionHelper.getWatermarkMetrics(nonExistNumaID)
free, total, scaleFactor, err := helper.GetWatermarkMetrics(s.metaServer.MetricsFetcher, s.emitter, nonExistNumaID)
if err != nil {
_ = s.emitter.StoreInt64(metricsNameFetchMetricError, 1, metrics.MetricTypeNameCount,
metrics.ConvertMapToTags(map[string]string{
Expand All @@ -181,7 +182,7 @@ func (s *SystemPressureEvictionPlugin) detectSystemWatermarkPressure() {
}

func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() {
kswapdSteal, err := s.evictionHelper.getSystemKswapdStealMetrics()
kswapdSteal, err := helper.GetSystemKswapdStealMetrics(s.metaServer.MetricsFetcher, s.emitter)
if err != nil {
s.kswapdStealPreviousCycle = kswapdStealPreviousCycleMissing
s.kswapdStealPreviousCycleTime = time.Now()
Expand Down

0 comments on commit d408f67

Please sign in to comment.