
Commit

feature: report pod alloc of Guaranteed pod and cpu manager policy (koordinator-sh#386)

Signed-off-by: Yue Zhang <huaihuan.zy@alibaba-inc.com>
ZYecho authored and jasonliu747 committed Jul 26, 2022
1 parent de19981 commit fc22b73
Showing 5 changed files with 230 additions and 7 deletions.
apis/extension/node.go: 22 changes (21 additions & 1 deletion)
@@ -27,11 +27,13 @@ import (
const (
	// AnnotationNodeCPUTopology describes the detailed CPU topology.
	AnnotationNodeCPUTopology = NodeDomainPrefix + "/cpu-topology"
	// AnnotationNodeCPUAllocs describes the CPUs allocated by Koordinator LSE/LSR and K8s Guaranteed Pods.
	// AnnotationNodeCPUAllocs describes K8s Guaranteed Pods.
	AnnotationNodeCPUAllocs = NodeDomainPrefix + "/pod-cpu-allocs"
	// AnnotationNodeCPUSharedPools describes the CPU Shared Pool defined by Koordinator.
	// The shared pool is mainly used by Koordinator LS Pods or K8s Burstable Pods.
	AnnotationNodeCPUSharedPools = NodeDomainPrefix + "/cpu-shared-pools"
	// AnnotationNodeCPUManagerPolicy describes the cpu manager policy options of kubelet
	AnnotationNodeCPUManagerPolicy = NodeDomainPrefix + "/cpu-manager-policy"

	// LabelNodeCPUBindPolicy constrains how to bind CPU logical CPUs when scheduling.
	LabelNodeCPUBindPolicy = NodeDomainPrefix + "/cpu-bind-policy"
@@ -68,6 +70,11 @@ type PodCPUAlloc struct {
	CPUSet string `json:"cpuset,omitempty"`
}

type CPUManagerPolicy struct {
	Policy  string            `json:"policy,omitempty"`
	Options map[string]string `json:"options,omitempty"`
}

type PodCPUAllocs []PodCPUAlloc

func GetCPUTopology(annotations map[string]string) (*CPUTopology, error) {
@@ -108,3 +115,16 @@ func GetNodeCPUSharePools(nodeTopoAnnotations map[string]string) ([]CPUSharedPoo
	}
	return cpuSharePools, nil
}

func GetCPUManagerPolicy(annotations map[string]string) (*CPUManagerPolicy, error) {
	cpuManagerPolicy := &CPUManagerPolicy{}
	data, ok := annotations[AnnotationNodeCPUManagerPolicy]
	if !ok {
		return cpuManagerPolicy, nil
	}
	err := json.Unmarshal([]byte(data), cpuManagerPolicy)
	if err != nil {
		return nil, err
	}
	return cpuManagerPolicy, nil
}
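For illustration (not part of this commit), a minimal sketch of how a consumer could read the new annotation with the helper above; the annotation key comes from the constant added in this file, while the JSON value (a "static" policy with the kubelet's "full-pcpus-only" option) is an assumed example.

package main

import (
	"fmt"

	"github.com/koordinator-sh/koordinator/apis/extension"
)

func main() {
	// Hypothetical NodeResourceTopology annotations; only the value is invented here.
	annotations := map[string]string{
		extension.AnnotationNodeCPUManagerPolicy: `{"policy":"static","options":{"full-pcpus-only":"true"}}`,
	}
	policy, err := extension.GetCPUManagerPolicy(annotations)
	if err != nil {
		fmt.Printf("failed to parse cpu manager policy: %v\n", err)
		return
	}
	fmt.Printf("kubelet cpu manager policy=%s options=%v\n", policy.Policy, policy.Options)
}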
pkg/koordlet/statesinformer/states_noderesourcetopology.go: 77 changes (71 additions & 6 deletions)
@@ -19,17 +19,21 @@ package statesinformer
import (
	"context"
	"encoding/json"
	"fmt"
	"os"

	"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/retry"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"

	"github.com/koordinator-sh/koordinator/apis/extension"
	"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
	"github.com/koordinator-sh/koordinator/pkg/util/system"
)

func (s *statesInformer) syncNodeResourceTopology(node *corev1.Node) {
@@ -74,27 +78,87 @@ func (s *statesInformer) syncNodeResourceTopology(node *corev1.Node) {
	}
}

func (s *statesInformer) calGuaranteedCpu(usedCPUs map[int32]*extension.CPUInfo, stateJson string) ([]extension.PodCPUAlloc, error) {
	if stateJson == "" {
		return nil, fmt.Errorf("empty state file")
	}
	checkPoint := &state.CPUManagerCheckpoint{}
	err := json.Unmarshal([]byte(stateJson), checkPoint)
	if err != nil {
		return nil, err
	}
	podAllocs := []extension.PodCPUAlloc{}
	for pod := range checkPoint.Entries {
		cpuSet := cpuset.NewCPUSet()
		for container, cpuString := range checkPoint.Entries[pod] {
			if tmpContainerCPUSet, err := cpuset.Parse(cpuString); err != nil {
				klog.Errorf("could not parse cpuset %q for container %q in pod %q: %v", cpuString, container, pod, err)
				continue
			} else {
				cpuSet = cpuSet.Union(tmpContainerCPUSet)
			}
		}
		podAllocs = append(podAllocs, extension.PodCPUAlloc{
			Name:   pod,
			CPUSet: cpuSet.String(),
		})
		for _, cpuID := range cpuSet.ToSliceNoSort() {
			delete(usedCPUs, int32(cpuID))
		}
	}
	return podAllocs, nil
}

func (s *statesInformer) reportNodeTopology() {
	klog.Info("start to report node topology")
	s.nodeRWMutex.RLock()
	nodeName := s.node.Name
	s.nodeRWMutex.RUnlock()
	ctx := context.TODO()

	cpuTopology, usedCPUs, err := s.calCpuTopology()
	if err != nil {
		return
	}
	// TODO: support https://kubernetes.io/docs/reference/config-api/kubelet-config.v1beta1/
	cpuPolicy, stateFilePath, cpuManagerOpt, err := system.GuessCPUManagerOptFromKubelet()
	if err != nil {
		klog.Errorf("failed to guess kubelet cpu manager opt, err: %v", err)
	}
	cpuManagerPolicy := extension.CPUManagerPolicy{
		Policy:  cpuPolicy,
		Options: cpuManagerOpt,
	}
	cpuManagerPolicyJson, err := json.Marshal(cpuManagerPolicy)
	if err != nil {
		klog.Errorf("failed to marshal cpu manager policy, err: %v", err)
	}
	var podAllocsJson []byte
	if stateFilePath != "" {
		data, err := os.ReadFile(stateFilePath)
		if err != nil {
			klog.Errorf("failed to read file, err: %v", err)
		}
		podAllocs, err := s.calGuaranteedCpu(usedCPUs, string(data))
		if err != nil {
			klog.Errorf("failed to cal GuaranteedCpu, err: %v", err)
		}
		if len(podAllocs) != 0 {
			podAllocsJson, err = json.Marshal(podAllocs)
			if err != nil {
				klog.Errorf("failed to marshal pod allocs, err: %v", err)
			}
		}
	}
	// TODO: report lse/lsr pod from cgroup
	sharePools := s.calCPUSharePools(usedCPUs)

	cpuTopologyJson, err := json.Marshal(cpuTopology)
	if err != nil {
		klog.Errorf("failed to marshal cpu topology of node %s, err: %v", nodeName, err)
		return
	}
	cpuSharePoolsJson, err := json.Marshal(sharePools)
	if err != nil {
		klog.Errorf("failed to marshal cpushare pools of node %s, err: %v", nodeName, err)
		return
	}

	err = retry.OnError(retry.DefaultBackoff, errors.IsTooManyRequests, func() error {
@@ -108,11 +172,12 @@ func (s *statesInformer) reportNodeTopology() {
		}
		// TODO only update if necessary
		s.updateNodeTopo(topology)
		if topology.Annotations[extension.AnnotationNodeCPUTopology] == string(cpuTopologyJson) && topology.Annotations[extension.AnnotationNodeCPUSharedPools] == string(cpuSharePoolsJson) {
			return nil
		}
		topology.Annotations[extension.AnnotationNodeCPUTopology] = string(cpuTopologyJson)
		topology.Annotations[extension.AnnotationNodeCPUSharedPools] = string(cpuSharePoolsJson)
		topology.Annotations[extension.AnnotationNodeCPUManagerPolicy] = string(cpuManagerPolicyJson)
		if len(podAllocsJson) != 0 {
			topology.Annotations[extension.AnnotationNodeCPUAllocs] = string(podAllocsJson)
		}
		_, err = s.topologyClient.TopologyV1alpha1().NodeResourceTopologies().Update(context.TODO(), topology, metav1.UpdateOptions{})
		if err != nil {
			klog.Errorf("failed to update cpu info of node %s, err: %v", nodeName, err)
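For context (not part of this change), a hedged sketch of how another component might read the pod allocations that reportNodeTopology now writes onto the NodeResourceTopology object. The annotation value below is invented, and the "name" JSON tag on PodCPUAlloc is assumed from the field usage above; only the "cpuset" tag is visible in this diff.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/koordinator-sh/koordinator/apis/extension"
)

func main() {
	// Hypothetical annotation value, shaped like the JSON marshaled from
	// []extension.PodCPUAlloc; real content comes from the kubelet's
	// cpu_manager_state checkpoint.
	raw := `[{"name":"pod","cpuset":"1-3"}]`

	var allocs extension.PodCPUAllocs
	if err := json.Unmarshal([]byte(raw), &allocs); err != nil {
		fmt.Printf("failed to parse pod cpu allocs: %v\n", err)
		return
	}
	for _, alloc := range allocs {
		fmt.Printf("pod %s holds cpuset %s\n", alloc.Name, alloc.CPUSet)
	}
}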
pkg/koordlet/statesinformer/states_noderesourcetopology_test.go: 60 changes (60 additions & 0 deletions)
@@ -54,6 +54,66 @@ func Test_syncNodeResourceTopology(t *testing.T) {
assert.Equal(t, "Koordinator", topology.Labels[extension.LabelManagedBy])
}

func Test_calGuaranteedCpu(t *testing.T) {
	testCases := []struct {
		description       string
		checkpointContent string
		expectedError     bool
		expectedPodAllocs []extension.PodCPUAlloc
	}{
		{
			"Restore non-existing checkpoint",
			"",
			true,
			nil,
		},
		{
			"Restore empty entry",
			`{
				"policyName": "none",
				"defaultCPUSet": "4-6",
				"entries": {},
				"checksum": 354655845
			}`,
			false,
			extension.PodCPUAllocs{},
		},
		{
			"Restore checkpoint with invalid JSON",
			`{`,
			true,
			nil,
		},
		{
			"Restore checkpoint with normal assignment entry",
			`{
				"policyName": "none",
				"defaultCPUSet": "1-3",
				"entries": {
					"pod": {
						"container1": "1-2",
						"container2": "2-3"
					}
				},
				"checksum": 962272150
			}`,
			false,
			[]extension.PodCPUAlloc{
				{
					Name:   "pod",
					CPUSet: "1-3",
				},
			},
		},
	}
	s := &statesInformer{}
	for _, c := range testCases {
		podAllocs, err := s.calGuaranteedCpu(map[int32]*extension.CPUInfo{}, c.checkpointContent)
		assert.Equal(t, c.expectedError, err != nil)
		assert.Equal(t, c.expectedPodAllocs, podAllocs)
	}
}

func Test_reportNodeTopology(t *testing.T) {
client := topologyclientsetfake.NewSimpleClientset()
testNode := &corev1.Node{
pkg/util/system/cpu_manager_linux.go: 54 changes (54 additions & 0 deletions)
@@ -0,0 +1,54 @@
//go:build linux
// +build linux

/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package system

import (
	"fmt"
	"path"

	"github.com/spf13/pflag"
	cliflag "k8s.io/component-base/cli/flag"
)

// return cpu policy, cpu state file path and cpu manager opt
func GuessCPUManagerOptFromKubelet() (string, string, map[string]string, error) {
	pids, err := PidOf(Conf.ProcRootDir, "kubelet")
	if err != nil || len(pids) == 0 {
		return "", "", nil, fmt.Errorf("failed to find kubelet's pid, kubelet may stop: %v", err)
	}
	kubeletPid := pids[0]

	kubeletArgs, err := ProcCmdLine(Conf.ProcRootDir, kubeletPid)
	if err != nil || len(kubeletArgs) <= 1 {
		return "", "", nil, fmt.Errorf("failed to get kubelet's args: %v", err)
	}
	var argsRootDir string
	argsCpuManagerOpt := map[string]string{}
	var argsCpuPolicy string
	fs := pflag.NewFlagSet("GuessTest", pflag.ContinueOnError)
	fs.ParseErrorsWhitelist.UnknownFlags = true
	fs.StringVar(&argsRootDir, "root-dir", "/var/lib/kubelet", "")
	fs.StringVar(&argsCpuPolicy, "cpu-manager-policy", "none", "")
	fs.Var(cliflag.NewMapStringStringNoSplit(&argsCpuManagerOpt), "cpu-manager-policy-options", "")
	if err := fs.Parse(kubeletArgs[1:]); err != nil {
		return "", "", nil, fmt.Errorf("failed to parse kubelet's args, kubelet version may not support: %v", err)
	}
	return argsCpuPolicy, path.Join(argsRootDir, "cpu_manager_state"), argsCpuManagerOpt, nil
}
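A small usage sketch (not part of this commit) for the new helper, mirroring what reportNodeTopology does with its results; the printed wording is illustrative only.

package main

import (
	"fmt"
	"os"

	"github.com/koordinator-sh/koordinator/pkg/util/system"
)

func main() {
	// Ask the running kubelet (via its /proc cmdline) which cpu manager policy it was started with.
	policy, stateFilePath, options, err := system.GuessCPUManagerOptFromKubelet()
	if err != nil {
		fmt.Printf("failed to guess kubelet cpu manager options: %v\n", err)
		return
	}
	fmt.Printf("policy=%s options=%v stateFile=%s\n", policy, options, stateFilePath)

	// The state file (<root-dir>/cpu_manager_state) holds the checkpoint that
	// calGuaranteedCpu parses into per-pod CPU allocations.
	if data, err := os.ReadFile(stateFilePath); err == nil {
		fmt.Printf("checkpoint: %s\n", string(data))
	}
}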
pkg/util/system/cpu_manager_unsupported.go: 24 changes (24 additions & 0 deletions)
@@ -0,0 +1,24 @@
//go:build !linux
// +build !linux

/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package system

func GuessCPUManagerOptFromKubelet() (string, string, map[string]string, error) {
	return "", "", nil, nil
}
