Skip to content

Commit

Permalink
koordlet: Other qos features support cgroups v2 (#898)
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube committed Dec 26, 2022
1 parent 9403df4 commit 26d27bf
Show file tree
Hide file tree
Showing 26 changed files with 1,572 additions and 598 deletions.
41 changes: 23 additions & 18 deletions pkg/koordlet/metricsadvisor/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,26 +297,31 @@ func (c *collector) collectContainerResUsed(meta *statesinformer.PodMeta) {
for i := range pod.Status.ContainerStatuses {
containerStat := &pod.Status.ContainerStatuses[i]
collectTime := time.Now()
if len(containerStat.ContainerID) == 0 {
klog.V(5).Infof("container %s/%s/%s id is empty, maybe not ready, skip this round",
pod.Namespace, pod.Name, containerStat.Name)
continue
}

containerCgroupDir, err := koordletutil.GetContainerCgroupPathWithKube(meta.CgroupDir, containerStat)
if err != nil {
// higher verbosity for probably non-running pods
if containerStat.State.Running == nil {
klog.V(6).Infof("failed to collect non-running container usage for %s/%s/%s, err: %s",
pod.Namespace, pod.Name, containerStat.Name, err)
} else {
klog.V(4).Infof("failed to collect container usage for %s/%s/%s, err: %s",
pod.Namespace, pod.Name, containerStat.Name, err)
}
klog.V(4).Infof("failed to collect container usage for %s/%s/%s, cannot get container cgroup, err: %s",
pod.Namespace, pod.Name, containerStat.Name, err)
continue
}

currentCPUUsage, err0 := c.cgroupReader.ReadCPUAcctUsage(containerCgroupDir)
memStat, err1 := c.cgroupReader.ReadMemoryStat(containerCgroupDir)

if err0 != nil || err1 != nil {
klog.V(4).Infof("failed to collect container usage for %s/%s/%s, CPU err: %s, Memory err: %s",
pod.Namespace, pod.Name, containerStat.Name, err0, err1)
// higher verbosity for probably non-running pods
if containerStat.State.Running == nil {
klog.V(6).Infof("failed to collect non-running container usage for %s/%s/%s, CPU err: %s, Memory err: %s",
pod.Namespace, pod.Name, containerStat.Name, err0, err1)
} else {
klog.V(4).Infof("failed to collect container usage for %s/%s/%s, CPU err: %s, Memory err: %s",
pod.Namespace, pod.Name, containerStat.Name, err0, err1)
}
continue
}

Expand Down Expand Up @@ -442,12 +447,19 @@ func (c *collector) collectContainerThrottledInfo(podMeta *statesinformer.PodMet
collectTime := time.Now()
containerStat := &pod.Status.ContainerStatuses[i]
if len(containerStat.ContainerID) == 0 {
klog.V(4).Infof("container %s/%s/%s id is empty, maybe not ready, skip this round",
klog.V(5).Infof("container %s/%s/%s id is empty, maybe not ready, skip this round",
pod.Namespace, pod.Name, containerStat.Name)
continue
}

containerCgroupDir, err := koordletutil.GetContainerCgroupPathWithKube(podMeta.CgroupDir, containerStat)
if err != nil {
klog.V(4).Infof("collect container %s/%s/%s cpu throttled failed, cannot get container cgroup, err: %s",
pod.Namespace, pod.Name, containerStat.Name, err)
continue
}

currentCPUStat, err := c.cgroupReader.ReadCPUStat(containerCgroupDir)
if err != nil {
// higher verbosity for probably non-running pods
if containerStat.State.Running == nil {
Expand All @@ -459,13 +471,6 @@ func (c *collector) collectContainerThrottledInfo(podMeta *statesinformer.PodMet
}
continue
}

currentCPUStat, err := c.cgroupReader.ReadCPUStat(containerCgroupDir)
if err != nil {
klog.V(4).Infof("collect container %s/%s/%s cpu throttled failed, err %v, metric %v",
pod.Namespace, pod.Name, containerStat.Name, err, currentCPUStat)
continue
}
lastCPUThrottledValue, ok := c.context.lastContainerCPUThrottled.Load(containerStat.ContainerID)
c.context.lastContainerCPUThrottled.Store(containerStat.ContainerID, currentCPUStat)
if !ok {
Expand Down
158 changes: 83 additions & 75 deletions pkg/koordlet/resmanager/cgroup_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,17 @@ import (

apiext "github.com/koordinator-sh/koordinator/apis/extension"
slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/koordlet/executor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resmanager/configextensions"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
"github.com/koordinator-sh/koordinator/pkg/util"
)

const (
CgroupResourcesReconcileForceUpdateSeconds int = 60
)

type CgroupResourcesReconcile struct {
resmanager *resmanager
executor *executor.LeveledResourceUpdateExecutor
executor resourceexecutor.ResourceUpdateExecutor
}

// cgroupResourceSummary summarizes values of cgroup resources to update; nil value means not to update
Expand All @@ -57,11 +53,17 @@ type cgroupResourceSummary struct {
memoryOomKillGroup *int64
}

type cgroupResourceUpdaterMeta struct {
resourceType system.ResourceType
value *int64
isMergeable bool
}

func NewCgroupResourcesReconcile(resmanager *resmanager) *CgroupResourcesReconcile {
executor := executor.NewLeveledResourceUpdateExecutor("CgroupResourcesExecutor", CgroupResourcesReconcileForceUpdateSeconds)
e := resourceexecutor.NewResourceUpdateExecutor()
return &CgroupResourcesReconcile{
resmanager: resmanager,
executor: executor,
executor: e,
}
}

Expand Down Expand Up @@ -107,13 +109,13 @@ func (m *CgroupResourcesReconcile) calculateAndUpdateResources(nodeSLO *slov1alp
// to make sure the hierarchical cgroup resources are correctly updated, we simply update the resources by
// cgroup-level order.
// e.g. /kubepods.slice/memory.min, /kubepods.slice-podxxx/memory.min, /kubepods.slice-podxxx/docker-yyy/memory.min
leveledResources := [][]executor.MergeableResourceUpdater{qosResources, podResources, containerResources}
m.executor.LeveledUpdateBatchByCache(leveledResources)
leveledResources := [][]resourceexecutor.ResourceUpdater{qosResources, podResources, containerResources}
m.executor.LeveledUpdateBatch(true, leveledResources)
}

// calculateResources calculates qos-level, pod-level and container-level resources with nodeCfg and podMetas
func (m *CgroupResourcesReconcile) calculateResources(nodeCfg *slov1alpha1.ResourceQOSStrategy, node *corev1.Node,
podMetas []*statesinformer.PodMeta) (qosLevelResources, podLevelResources, containerLevelResources []executor.MergeableResourceUpdater) {
podMetas []*statesinformer.PodMeta) (qosLevelResources, podLevelResources, containerLevelResources []resourceexecutor.ResourceUpdater) {
// TODO: check anolis os version
qosSummary := map[corev1.PodQOSClass]*cgroupResourceSummary{
corev1.PodQOSGuaranteed: {},
Expand Down Expand Up @@ -162,7 +164,7 @@ func (m *CgroupResourcesReconcile) calculateResources(nodeCfg *slov1alpha1.Resou
}

func (m *CgroupResourcesReconcile) calculateQoSResources(summary *cgroupResourceSummary, qos corev1.PodQOSClass,
qosCfg *slov1alpha1.ResourceQOS) []executor.MergeableResourceUpdater {
qosCfg *slov1alpha1.ResourceQOS) []resourceexecutor.ResourceUpdater {
// double-check qosCfg is not nil
if qosCfg == nil {
klog.Warningf("calculateQoSResources aborts since qos config is %v", qosCfg)
Expand All @@ -178,11 +180,11 @@ func (m *CgroupResourcesReconcile) calculateQoSResources(summary *cgroupResource
summary.memoryOomKillGroup = qosCfg.MemoryQOS.OomKillGroup
}

return makeCgroupResources(executor.GroupOwnerRef(string(qos)), qosDir, summary)
return makeCgroupResources(qosDir, summary)
}

func (m *CgroupResourcesReconcile) calculatePodAndContainerResources(podMeta *statesinformer.PodMeta, node *corev1.Node,
podCfg *slov1alpha1.ResourceQOS) (podResources, containerResources []executor.MergeableResourceUpdater) {
podCfg *slov1alpha1.ResourceQOS) (podResources, containerResources []resourceexecutor.ResourceUpdater) {
pod := podMeta.Pod
podDir := koordletutil.GetPodCgroupDirWithKube(podMeta.CgroupDir)

Expand All @@ -208,7 +210,7 @@ func (m *CgroupResourcesReconcile) calculatePodAndContainerResources(podMeta *st
return
}

func (m *CgroupResourcesReconcile) calculatePodResources(pod *corev1.Pod, parentDir string, podCfg *slov1alpha1.ResourceQOS) []executor.MergeableResourceUpdater {
func (m *CgroupResourcesReconcile) calculatePodResources(pod *corev1.Pod, parentDir string, podCfg *slov1alpha1.ResourceQOS) []resourceexecutor.ResourceUpdater {
// double-check qos config is not nil
if podCfg == nil {
klog.V(5).Infof("calculatePodResources aborts since pod-level config is empty, cfg: %v", podCfg)
Expand Down Expand Up @@ -251,11 +253,11 @@ func (m *CgroupResourcesReconcile) calculatePodResources(pod *corev1.Pod, parent
}
}

return makeCgroupResources(executor.PodOwnerRef(pod.Namespace, pod.Name), parentDir, summary)
return makeCgroupResources(parentDir, summary)
}

func (m *CgroupResourcesReconcile) calculateContainerResources(container *corev1.Container, pod *corev1.Pod,
node *corev1.Node, parentDir string, podCfg *slov1alpha1.ResourceQOS) []executor.MergeableResourceUpdater {
node *corev1.Node, parentDir string, podCfg *slov1alpha1.ResourceQOS) []resourceexecutor.ResourceUpdater {
// double-check qos config is not nil
if podCfg == nil {
klog.V(5).Infof("calculateContainerResources aborts since pod-level config is empty, cfg: %v", podCfg)
Expand Down Expand Up @@ -321,7 +323,7 @@ func (m *CgroupResourcesReconcile) calculateContainerResources(container *corev1
}
}

return makeCgroupResources(executor.ContainerOwnerRef(pod.Namespace, pod.Name, container.Name), parentDir, summary)
return makeCgroupResources(parentDir, summary)
}

// getMergedPodResourceQoS returns a merged ResourceQOS for the pod (i.e. a pod-level qos config).
Expand Down Expand Up @@ -460,66 +462,72 @@ func completeCgroupSummaryForQoS(qosSummary map[corev1.PodQOSClass]*cgroupResour
}
}

func makeCgroupResources(owner *executor.OwnerRef, parentDir string, summary *cgroupResourceSummary) []executor.MergeableResourceUpdater {
var resources []executor.MergeableResourceUpdater

anolisResources := makeCgroupResourcesForAnolis(owner, parentDir, summary)
if len(anolisResources) > 0 {
resources = append(resources, anolisResources...)
}
func makeCgroupResources(parentDir string, summary *cgroupResourceSummary) []resourceexecutor.ResourceUpdater {
var resources []resourceexecutor.ResourceUpdater

return resources
}

func makeCgroupResourcesForAnolis(owner *executor.OwnerRef, parentDir string, summary *cgroupResourceSummary) []executor.MergeableResourceUpdater {
var resources []executor.MergeableResourceUpdater
//Memory
// mergeable resources: memory.min, memory.low, memory.high
for _, t := range []cgroupResourceUpdaterMeta{
{
resourceType: system.MemoryMinName,
value: summary.memoryMin,
isMergeable: true,
},
{
resourceType: system.MemoryLowName,
value: summary.memoryLow,
isMergeable: true,
},
{
resourceType: system.MemoryHighName,
value: summary.memoryHigh,
isMergeable: true,
},
{
resourceType: system.MemoryWmarkRatioName,
value: summary.memoryWmarkRatio,
},
{
resourceType: system.MemoryWmarkScaleFactorName,
value: summary.memoryWmarkScaleFactor,
},
{
resourceType: system.MemoryWmarkMinAdjName,
value: summary.memoryWmarkMinAdj,
},
// TBD: handle memory priority and oom group
{
resourceType: system.MemoryPriorityName,
value: summary.memoryPriority,
},
{
resourceType: system.MemoryUsePriorityOomName,
value: summary.memoryUsePriorityOom,
},
{
resourceType: system.MemoryOomGroupName,
value: summary.memoryOomKillGroup,
},
} {
if t.value == nil {
continue
}
valueStr := strconv.FormatInt(*t.value, 10)

if !system.HostSystemInfo.IsAnolisOS {
klog.V(5).Infof("ignored cgroup resources which required non Anolis OS, owner: %v, parentDir: %v",
owner, parentDir)
return nil
}
var r resourceexecutor.ResourceUpdater
var err error
if t.isMergeable {
r, err = resourceexecutor.NewMergeableCgroupUpdaterIfValueLarger(t.resourceType, parentDir, valueStr)
} else {
r, err = resourceexecutor.NewCommonCgroupUpdater(t.resourceType, parentDir, valueStr)
}

//Memory
if v := summary.memoryMin; v != nil && system.ValidateResourceValue(v, parentDir, system.MemoryMin) {
valueStr := strconv.FormatInt(*v, 10)
resources = append(resources, executor.NewMergeableCgroupResourceUpdater(owner, parentDir, system.MemoryMin,
valueStr, executor.MergeFuncUpdateCgroupIfLarger))
}
if v := summary.memoryLow; v != nil && system.ValidateResourceValue(v, parentDir, system.MemoryLow) {
valueStr := strconv.FormatInt(*v, 10)
resources = append(resources, executor.NewMergeableCgroupResourceUpdater(owner, parentDir, system.MemoryLow,
valueStr, executor.MergeFuncUpdateCgroupIfLarger))
}
if v := summary.memoryHigh; v != nil && system.ValidateResourceValue(v, parentDir, system.MemoryHigh) {
valueStr := strconv.FormatInt(*v, 10)
resources = append(resources, executor.NewMergeableCgroupResourceUpdater(owner, parentDir, system.MemoryHigh,
valueStr, executor.MergeFuncUpdateCgroupIfLarger))
}
if v := summary.memoryWmarkRatio; v != nil && system.ValidateResourceValue(v, parentDir, system.MemoryWmarkRatio) {
valueStr := strconv.FormatInt(*v, 10)
resources = append(resources, executor.NewCommonCgroupResourceUpdater(owner, parentDir, system.MemoryWmarkRatio, valueStr))
}
if v := summary.memoryWmarkScaleFactor; v != nil && system.ValidateResourceValue(v, parentDir, system.MemoryWmarkScaleFactor) {
valueStr := strconv.FormatInt(*v, 10)
resources = append(resources, executor.NewCommonCgroupResourceUpdater(owner, parentDir, system.MemoryWmarkScaleFactor, valueStr))
}
if v := summary.memoryWmarkMinAdj; v != nil && system.ValidateResourceValue(v, parentDir, system.MemoryWmarkMinAdj) {
valueStr := strconv.FormatInt(*v, 10)
resources = append(resources, executor.NewCommonCgroupResourceUpdater(owner, parentDir, system.MemoryWmarkMinAdj, valueStr))
}
// TBD: handle memory priority and oom group
if v := summary.memoryPriority; v != nil && system.ValidateResourceValue(v, parentDir, system.MemoryPriority) {
valueStr := strconv.FormatInt(*v, 10)
resources = append(resources, executor.NewCommonCgroupResourceUpdater(owner, parentDir, system.MemoryPriority, valueStr))
}
if v := summary.memoryUsePriorityOom; v != nil && system.ValidateResourceValue(v, parentDir, system.MemoryUsePriorityOom) {
valueStr := strconv.FormatInt(*v, 10)
resources = append(resources, executor.NewCommonCgroupResourceUpdater(owner, parentDir, system.MemoryUsePriorityOom, valueStr))
}
if v := summary.memoryOomKillGroup; v != nil && system.ValidateResourceValue(v, parentDir, system.MemoryOomGroup) {
valueStr := strconv.FormatInt(*v, 10)
resources = append(resources, executor.NewCommonCgroupResourceUpdater(owner, parentDir, system.MemoryOomGroup, valueStr))
if err != nil {
klog.V(5).Infof("skip cgroup resources that may be unsupported, resource %s [parentDir %s, value %v], err: %v",
t.resourceType, parentDir, *t.value, err)
continue
}
resources = append(resources, r)
}

return resources
Expand Down
Loading

0 comments on commit 26d27bf

Please sign in to comment.