koord-scheduler: support kubelet cpu manager policy (#434)
Signed-off-by: Joseph <joseph.t.lee@outlook.com>
eahydra committed Aug 3, 2022
1 parent 9eb7b7d commit 1e77f1f
Showing 8 changed files with 99 additions and 37 deletions.
23 changes: 14 additions & 9 deletions apis/extension/node.go
@@ -32,8 +32,6 @@ const (
// AnnotationNodeCPUSharedPools describes the CPU Shared Pool defined by Koordinator.
// The shared pool is mainly used by Koordinator LS Pods or K8s Burstable Pods.
AnnotationNodeCPUSharedPools = NodeDomainPrefix + "/cpu-shared-pools"
// AnnotationNodeCPUManagerPolicy describes the cpu manager policy options of kubelet
AnnotationNodeCPUManagerPolicy = NodeDomainPrefix + "/cpu-manager-policy"

// LabelNodeCPUBindPolicy constrains how to bind CPU logical CPUs when scheduling.
LabelNodeCPUBindPolicy = NodeDomainPrefix + "/cpu-bind-policy"
@@ -42,9 +40,6 @@ const (
)

const (
// cpu manager policy options of kubelet
NodeCPUManagerPolicyStatic = "static"
NodeCPUManagerPolicyNone = "none"
// NodeCPUBindPolicyFullPCPUsOnly requires that the scheduler must allocate full physical cores.
// Equivalent to kubelet CPU manager policy option full-pcpus-only=true.
NodeCPUBindPolicyFullPCPUsOnly = "FullPCPUsOnly"
@@ -55,6 +50,16 @@
NodeNUMAAllocateStrategyMostAllocated = string(schedulingconfig.NUMAMostAllocated)
)

const (
// AnnotationKubeletCPUManagerPolicy describes the cpu manager policy options of kubelet
AnnotationKubeletCPUManagerPolicy = "kubelet.koordinator.sh/cpu-manager-policy"

KubeletCPUManagerPolicyStatic = "static"
KubeletCPUManagerPolicyNone = "none"
KubeletCPUManagerPolicyFullPCPUsOnlyOption = "full-pcpus-only"
KubeletCPUManagerPolicyDistributeCPUsAcrossNUMAOption = "distribute-cpus-across-numa"
)

type CPUTopology struct {
Detail []CPUInfo `json:"detail,omitempty"`
}
@@ -76,7 +81,7 @@ type PodCPUAlloc struct {

type PodCPUAllocs []PodCPUAlloc

type CPUManagerPolicy struct {
type KubeletCPUManagerPolicy struct {
Policy string `json:"policy,omitempty"`
Options map[string]string `json:"options,omitempty"`
}
@@ -120,9 +125,9 @@ func GetNodeCPUSharePools(nodeTopoAnnotations map[string]string) ([]CPUSharedPoo
return cpuSharePools, nil
}

func GetCPUManagerPolicy(annotations map[string]string) (*CPUManagerPolicy, error) {
cpuManagerPolicy := &CPUManagerPolicy{}
data, ok := annotations[AnnotationNodeCPUManagerPolicy]
func GetKubeletCPUManagerPolicy(annotations map[string]string) (*KubeletCPUManagerPolicy, error) {
cpuManagerPolicy := &KubeletCPUManagerPolicy{}
data, ok := annotations[AnnotationKubeletCPUManagerPolicy]
if !ok {
return cpuManagerPolicy, nil
}
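The constants and the `GetKubeletCPUManagerPolicy` helper above define how the kubelet CPU manager policy is carried as an annotation. Below is a minimal, self-contained sketch — not the repository code — of how that annotation value is expected to round-trip; the local struct mirrors `KubeletCPUManagerPolicy`, and the JSON handling is an assumption based on the getter returning an empty policy when the annotation is absent.

```go
// Illustrative sketch (not part of the commit) of the
// kubelet.koordinator.sh/cpu-manager-policy annotation scheme.
package main

import (
	"encoding/json"
	"fmt"
)

// KubeletCPUManagerPolicy mirrors the struct added in apis/extension/node.go.
type KubeletCPUManagerPolicy struct {
	Policy  string            `json:"policy,omitempty"`
	Options map[string]string `json:"options,omitempty"`
}

func main() {
	annotations := map[string]string{
		"kubelet.koordinator.sh/cpu-manager-policy": `{"policy":"static","options":{"full-pcpus-only":"true"}}`,
	}

	// Missing annotation yields an empty policy; present annotation is JSON-decoded.
	policy := &KubeletCPUManagerPolicy{}
	if data, ok := annotations["kubelet.koordinator.sh/cpu-manager-policy"]; ok {
		if err := json.Unmarshal([]byte(data), policy); err != nil {
			panic(err)
		}
	}
	fmt.Println(policy.Policy, policy.Options["full-pcpus-only"]) // static true
}
```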
@@ -349,7 +349,7 @@ The koordlet creates or updates NodeResourceTopology periodically. The name of N

At present, the NodeResourceTopology lacks some information, and it is temporarily written in the NodeResourceTopology in the form of annotations or labels:

- The annotation `kubelet.koordinator.sh/cpu-manger-policy` describes the kubelet CPU manager policy and options. The scheme is defined as follows:
- The annotation `kubelet.koordinator.sh/cpu-manager-policy` describes the kubelet CPU manager policy and options. The scheme is defined as follows:

```go
const (
@@ -423,7 +423,7 @@ apiVersion: topology.node.k8s.io/v1alpha1
kind: NodeResourceTopology
metadata:
annotations:
kubelet.koordinator.sh/cpu-manger-policy: |-
kubelet.koordinator.sh/cpu-manager-policy: |-
{
"policy": "static",
"options": {
8 changes: 4 additions & 4 deletions pkg/koordlet/runtimehooks/hooks/cpuset/rule.go
@@ -22,18 +22,18 @@ import (
"strings"

topov1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/util"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

ext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
"github.com/koordinator-sh/koordinator/pkg/util"
)

type cpusetRule struct {
kubeletPolicy ext.CPUManagerPolicy
kubeletPolicy ext.KubeletCPUManagerPolicy
sharePools []ext.CPUSharedPool
}

@@ -84,7 +84,7 @@ func (r *cpusetRule) getContainerCPUSet(containerReq *protocol.ContainerRequest)
return pointer.String(""), nil
}

if r.kubeletPolicy.Policy == ext.NodeCPUManagerPolicyStatic {
if r.kubeletPolicy.Policy == ext.KubeletCPUManagerPolicyStatic {
return nil, nil
} else {
// none policy
@@ -102,7 +102,7 @@ func (p *cpusetPlugin) parseRule(nodeTopoIf interface{}) (bool, error) {
if err != nil {
return false, err
}
cpuManagerPolicy, err := ext.GetCPUManagerPolicy(nodeTopo.Annotations)
cpuManagerPolicy, err := ext.GetKubeletCPUManagerPolicy(nodeTopo.Annotations)
if err != nil {
return false, err
}
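The hunk above makes the cpuset hook branch on the reported kubelet policy: for original Burstable pods it leaves the cpuset untouched under the `static` policy and assigns all LS shared pools under `none`. A rough standalone sketch of that decision follows — function and parameter names are illustrative, only the policy strings come from the commit.

```go
// Rough sketch of the branch added above for original Burstable pods.
package main

import (
	"fmt"
	"strings"
)

// cpuSetForBurstable returns nil when kubelet's static CPU manager already owns
// cpuset assignment, and otherwise pins the container to every shared-pool CPU.
func cpuSetForBurstable(kubeletPolicy string, sharePoolCPUs []string) *string {
	if kubeletPolicy == "static" {
		return nil // do nothing; kubelet manages exclusive CPUs itself
	}
	// none policy: bind to all LS shared pools
	all := strings.Join(sharePoolCPUs, ",")
	return &all
}

func main() {
	cpus := []string{"0-7", "16-23"}
	if s := cpuSetForBurstable("none", cpus); s != nil {
		fmt.Println(*s) // 0-7,16-23
	}
	fmt.Println(cpuSetForBurstable("static", cpus)) // <nil>
}
```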
18 changes: 9 additions & 9 deletions pkg/koordlet/runtimehooks/hooks/cpuset/rule_test.go
@@ -142,7 +142,7 @@ func Test_cpusetRule_getContainerCPUSet(t *testing.T) {
{
name: "get all share pools for origin burstable pod under none policy",
fields: fields{
kubeletPoicy: ext.NodeCPUManagerPolicyNone,
kubeletPoicy: ext.KubeletCPUManagerPolicyNone,
sharePools: []ext.CPUSharedPool{
{
Socket: 0,
@@ -171,7 +171,7 @@ func Test_cpusetRule_getContainerCPUSet(t *testing.T) {
{
name: "do nothing for origin burstable pod under static policy",
fields: fields{
kubeletPoicy: ext.NodeCPUManagerPolicyStatic,
kubeletPoicy: ext.KubeletCPUManagerPolicyStatic,
sharePools: []ext.CPUSharedPool{
{
Socket: 0,
@@ -229,7 +229,7 @@ func Test_cpusetRule_getContainerCPUSet(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &cpusetRule{
kubeletPolicy: ext.CPUManagerPolicy{
kubeletPolicy: ext.KubeletCPUManagerPolicy{
Policy: tt.fields.kubeletPoicy,
},
sharePools: tt.fields.sharePools,
@@ -328,7 +328,7 @@ func Test_cpusetPlugin_parseRule(t *testing.T) {
}
type args struct {
nodeTopo *topov1alpha1.NodeResourceTopology
cpuPolicy *ext.CPUManagerPolicy
cpuPolicy *ext.KubeletCPUManagerPolicy
sharePools []ext.CPUSharedPool
}
tests := []struct {
@@ -449,8 +449,8 @@ func Test_cpusetPlugin_parseRule(t *testing.T) {
Name: "test-node",
},
},
cpuPolicy: &ext.CPUManagerPolicy{
Policy: ext.NodeCPUManagerPolicyNone,
cpuPolicy: &ext.KubeletCPUManagerPolicy{
Policy: ext.KubeletCPUManagerPolicyNone,
},
sharePools: []ext.CPUSharedPool{
{
@@ -467,8 +467,8 @@
},
wantUpdated: true,
wantRule: &cpusetRule{
kubeletPolicy: ext.CPUManagerPolicy{
Policy: ext.NodeCPUManagerPolicyNone,
kubeletPolicy: ext.KubeletCPUManagerPolicy{
Policy: ext.KubeletCPUManagerPolicyNone,
},
sharePools: []ext.CPUSharedPool{
{
@@ -496,7 +496,7 @@ func Test_cpusetPlugin_parseRule(t *testing.T) {
}
if tt.args.cpuPolicy != nil {
cpuPolicyJson := util.DumpJSON(tt.args.cpuPolicy)
tt.args.nodeTopo.Annotations[ext.AnnotationNodeCPUManagerPolicy] = cpuPolicyJson
tt.args.nodeTopo.Annotations[ext.AnnotationKubeletCPUManagerPolicy] = cpuPolicyJson
}
if len(tt.args.sharePools) != 0 {
sharePoolJson := util.DumpJSON(tt.args.sharePools)
4 changes: 2 additions & 2 deletions pkg/koordlet/statesinformer/states_noderesourcetopology.go
@@ -161,7 +161,7 @@ func (s *statesInformer) reportNodeTopology() {
if err != nil {
klog.Errorf("failed to guess kubelet cpu manager opt, err: %v", err)
}
cpuManagerPolicy := extension.CPUManagerPolicy{
cpuManagerPolicy := extension.KubeletCPUManagerPolicy{
Policy: cpuPolicy,
Options: cpuManagerOpt,
}
@@ -211,7 +211,7 @@ func (s *statesInformer) reportNodeTopology() {
s.updateNodeTopo(topology)
topology.Annotations[extension.AnnotationNodeCPUTopology] = string(cpuTopologyJson)
topology.Annotations[extension.AnnotationNodeCPUSharedPools] = string(cpuSharePoolsJson)
topology.Annotations[extension.AnnotationNodeCPUManagerPolicy] = string(cpuManagerPolicyJson)
topology.Annotations[extension.AnnotationKubeletCPUManagerPolicy] = string(cpuManagerPolicyJson)
if len(podAllocsJson) != 0 {
topology.Annotations[extension.AnnotationNodeCPUAllocs] = string(podAllocsJson)
}
23 changes: 19 additions & 4 deletions pkg/scheduler/plugins/nodenumaresource/node_numa_info.go
@@ -36,6 +36,7 @@ type nodeNUMAInfo struct {
cpuTopology *CPUTopology
allocatedPods map[types.UID]struct{}
allocatedCPUs CPUDetails
*extension.KubeletCPUManagerPolicy
}

type nodeNumaInfoCache struct {
@@ -45,10 +46,11 @@

func newNodeNUMAInfo(nodeName string, cpuTopology *CPUTopology) *nodeNUMAInfo {
return &nodeNUMAInfo{
nodeName: nodeName,
cpuTopology: cpuTopology,
allocatedPods: map[types.UID]struct{}{},
allocatedCPUs: NewCPUDetails(),
nodeName: nodeName,
cpuTopology: cpuTopology,
allocatedPods: map[types.UID]struct{}{},
allocatedCPUs: NewCPUDetails(),
KubeletCPUManagerPolicy: &extension.KubeletCPUManagerPolicy{},
}
}

@@ -149,6 +151,11 @@ func (c *nodeNumaInfoCache) setNodeResourceTopology(oldNodeResTopology, nodeResT
if err != nil {
klog.Errorf("Failed to GetPodCPUAllocs from new NodeResourceTopology %s, err: %v", nodeResTopology.Name, err)
}
kubeletCPUManagerPolicy, err := extension.GetKubeletCPUManagerPolicy(nodeResTopology.Annotations)
if err != nil {
klog.Errorf("Failed to GetKubeletCPUManagerPolicy from NodeResourceTopology %s, err: %v", nodeResTopology.Name, err)
}

var oldPodCPUAllocs extension.PodCPUAllocs
if oldNodeResTopology != nil {
oldPodCPUAllocs, err = extension.GetPodCPUAllocs(oldNodeResTopology.Annotations)
@@ -170,6 +177,7 @@
numaInfo.lock.Lock()
defer numaInfo.lock.Unlock()
numaInfo.updateCPUTopology(cpuTopology)
numaInfo.updateKubeletCPUManagerPolicy(kubeletCPUManagerPolicy)
numaInfo.releaseCPUsManagedByKubelet(oldPodCPUAllocs)
numaInfo.updateCPUsManagedByKubelet(podCPUAllocs)
}
@@ -243,6 +251,13 @@ func (n *nodeNUMAInfo) updateCPUTopology(topology *CPUTopology) {
n.cpuTopology = topology
}

func (n *nodeNUMAInfo) updateKubeletCPUManagerPolicy(policy *extension.KubeletCPUManagerPolicy) {
if policy == nil {
policy = &extension.KubeletCPUManagerPolicy{}
}
n.KubeletCPUManagerPolicy = policy
}

func (n *nodeNUMAInfo) updateCPUsManagedByKubelet(podCPUAllocs extension.PodCPUAllocs) {
for _, v := range podCPUAllocs {
if !v.ManagedByKubelet || v.UID == "" || v.CPUSet == "" {
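The scheduler-side NUMA cache now also carries the node's kubelet policy, falling back to an empty value when the annotation is missing or unparsable so later reads never hit a nil pointer. A tiny sketch of that defaulting behaviour, with types heavily simplified and names illustrative:

```go
// Sketch of the nil-safe defaulting added above; the real cache holds far more state.
package main

import "fmt"

type KubeletCPUManagerPolicy struct {
	Policy  string
	Options map[string]string
}

type nodeInfo struct {
	policy *KubeletCPUManagerPolicy
}

// updatePolicy mirrors updateKubeletCPUManagerPolicy: a nil policy is replaced by
// an empty one so callers can read Policy/Options without nil checks.
func (n *nodeInfo) updatePolicy(p *KubeletCPUManagerPolicy) {
	if p == nil {
		p = &KubeletCPUManagerPolicy{}
	}
	n.policy = p
}

func main() {
	var n nodeInfo
	n.updatePolicy(nil)
	fmt.Println(n.policy.Policy == "", len(n.policy.Options)) // true 0
}
```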
4 changes: 3 additions & 1 deletion pkg/scheduler/plugins/nodenumaresource/plugin.go
@@ -207,7 +207,9 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidCPUTopology)
}

if node.Labels[extension.LabelNodeCPUBindPolicy] == extension.NodeCPUBindPolicyFullPCPUsOnly {
if node.Labels[extension.LabelNodeCPUBindPolicy] == extension.NodeCPUBindPolicyFullPCPUsOnly ||
(numaInfo.KubeletCPUManagerPolicy.Policy == extension.KubeletCPUManagerPolicyStatic &&
numaInfo.KubeletCPUManagerPolicy.Options[extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption] == "true") {
if state.numCPUsNeeded%numaInfo.cpuTopology.CPUsPerCore() != 0 {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError)
}
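With this change, the Filter gate that previously depended only on the `FullPCPUsOnly` node label also fires when kubelet itself runs the `static` policy with `full-pcpus-only=true`, rejecting pods whose CPU request is not a multiple of the CPUs per physical core. A simplified sketch of the combined condition — the label key and function names here are illustrative; only the policy, option, and label values come from the commit:

```go
// Simplified sketch of the Filter-time gate added above.
package main

import "fmt"

// requiresFullPCPUs reports whether full-physical-core allocation is mandatory,
// either via the node label or via kubelet's static policy with full-pcpus-only=true.
// The label key string is an assumption for illustration.
func requiresFullPCPUs(nodeLabels map[string]string, kubeletPolicy string, kubeletOptions map[string]string) bool {
	if nodeLabels["node.koordinator.sh/cpu-bind-policy"] == "FullPCPUsOnly" {
		return true
	}
	return kubeletPolicy == "static" && kubeletOptions["full-pcpus-only"] == "true"
}

// smtAligned reports whether the request can be satisfied with whole physical cores.
func smtAligned(numCPUsNeeded, cpusPerCore int) bool {
	return numCPUsNeeded%cpusPerCore == 0
}

func main() {
	labels := map[string]string{}
	opts := map[string]string{"full-pcpus-only": "true"}
	fmt.Println(requiresFullPCPUs(labels, "static", opts)) // true
	fmt.Println(smtAligned(5, 2), smtAligned(4, 2))        // false true
}
```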
52 changes: 46 additions & 6 deletions pkg/scheduler/plugins/nodenumaresource/plugin_test.go
@@ -416,12 +416,13 @@ func TestPlugin_PreFilter(t *testing.T) {

func TestPlugin_Filter(t *testing.T) {
tests := []struct {
name string
nodeLabels map[string]string
state *preFilterState
pod *corev1.Pod
numaInfo *nodeNUMAInfo
want *framework.Status
name string
nodeLabels map[string]string
kubeletPolicy *extension.KubeletCPUManagerPolicy
state *preFilterState
pod *corev1.Pod
numaInfo *nodeNUMAInfo
want *framework.Status
}{
{
name: "error with missing preFilterState",
@@ -492,6 +493,42 @@
pod: &corev1.Pod{},
want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy),
},
{
name: "verify Kubelet FullPCPUsOnly with SMTAlignmentError",
state: &preFilterState{
skip: false,
resourceSpec: &extension.ResourceSpec{},
preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
numCPUsNeeded: 5,
},
numaInfo: newNodeNUMAInfo("test-node-1", buildCPUTopologyForTest(2, 1, 4, 2)),
kubeletPolicy: &extension.KubeletCPUManagerPolicy{
Policy: extension.KubeletCPUManagerPolicyStatic,
Options: map[string]string{
extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true",
},
},
pod: &corev1.Pod{},
want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError),
},
{
name: "verify Kubelet FullPCPUsOnly with RequiredFullPCPUsPolicy",
state: &preFilterState{
skip: false,
resourceSpec: &extension.ResourceSpec{},
preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
numCPUsNeeded: 4,
},
numaInfo: newNodeNUMAInfo("test-node-1", buildCPUTopologyForTest(2, 1, 4, 2)),
kubeletPolicy: &extension.KubeletCPUManagerPolicy{
Policy: extension.KubeletCPUManagerPolicyStatic,
Options: map[string]string{
extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true",
},
},
pod: &corev1.Pod{},
want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -520,6 +557,9 @@ func TestPlugin_Filter(t *testing.T) {

plg := p.(*Plugin)
if tt.numaInfo != nil {
if tt.kubeletPolicy != nil {
tt.numaInfo.KubeletCPUManagerPolicy = tt.kubeletPolicy
}
plg.nodeInfoCache.nodes[tt.numaInfo.nodeName] = tt.numaInfo
}

