koord-scheduler: support kubelet cpu manager policy #434

Merged
23 changes: 14 additions & 9 deletions apis/extension/node.go
@@ -32,8 +32,6 @@ const (
// AnnotationNodeCPUSharedPools describes the CPU Shared Pool defined by Koordinator.
// The shared pool is mainly used by Koordinator LS Pods or K8s Burstable Pods.
AnnotationNodeCPUSharedPools = NodeDomainPrefix + "/cpu-shared-pools"
// AnnotationNodeCPUManagerPolicy describes the cpu manager policy options of kubelet
AnnotationNodeCPUManagerPolicy = NodeDomainPrefix + "/cpu-manager-policy"

// LabelNodeCPUBindPolicy constrains how to bind CPU logical CPUs when scheduling.
LabelNodeCPUBindPolicy = NodeDomainPrefix + "/cpu-bind-policy"
@@ -42,9 +40,6 @@ const (
)

const (
// cpu manager policy options of kubelet
NodeCPUManagerPolicyStatic = "static"
NodeCPUManagerPolicyNone = "none"
// NodeCPUBindPolicyFullPCPUsOnly requires that the scheduler must allocate full physical cores.
// Equivalent to kubelet CPU manager policy option full-pcpus-only=true.
NodeCPUBindPolicyFullPCPUsOnly = "FullPCPUsOnly"
@@ -55,6 +50,16 @@ const (
NodeNUMAAllocateStrategyMostAllocated = string(schedulingconfig.NUMAMostAllocated)
)

const (
// AnnotationKubeletCPUManagerPolicy describes the cpu manager policy options of kubelet
AnnotationKubeletCPUManagerPolicy = "kubelet.koordinator.sh/cpu-manager-policy"

KubeletCPUManagerPolicyStatic = "static"
KubeletCPUManagerPolicyNone = "none"
KubeletCPUManagerPolicyFullPCPUsOnlyOption = "full-pcpus-only"
KubeletCPUManagerPolicyDistributeCPUsAcrossNUMAOption = "distribute-cpus-across-numa"
)

type CPUTopology struct {
Detail []CPUInfo `json:"detail,omitempty"`
}
@@ -76,7 +81,7 @@ type PodCPUAlloc struct {

type PodCPUAllocs []PodCPUAlloc

type CPUManagerPolicy struct {
type KubeletCPUManagerPolicy struct {
Policy string `json:"policy,omitempty"`
Options map[string]string `json:"options,omitempty"`
}
@@ -120,9 +125,9 @@ func GetNodeCPUSharePools(nodeTopoAnnotations map[string]string) ([]CPUSharedPoo
return cpuSharePools, nil
}

func GetCPUManagerPolicy(annotations map[string]string) (*CPUManagerPolicy, error) {
cpuManagerPolicy := &CPUManagerPolicy{}
data, ok := annotations[AnnotationNodeCPUManagerPolicy]
func GetKubeletCPUManagerPolicy(annotations map[string]string) (*KubeletCPUManagerPolicy, error) {
cpuManagerPolicy := &KubeletCPUManagerPolicy{}
data, ok := annotations[AnnotationKubeletCPUManagerPolicy]
if !ok {
return cpuManagerPolicy, nil
}
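As context for the API change above, a minimal sketch (not part of this PR) of how a consumer could read the new annotation through the renamed helper. The annotation value below is illustrative; it simply mirrors the `KubeletCPUManagerPolicy` struct defined in this file:

```go
package main

import (
	"fmt"

	ext "github.com/koordinator-sh/koordinator/apis/extension"
)

func main() {
	// Hypothetical NodeResourceTopology annotations; the JSON value follows the
	// {"policy": ..., "options": ...} scheme from apis/extension/node.go.
	annotations := map[string]string{
		ext.AnnotationKubeletCPUManagerPolicy: `{"policy":"static","options":{"full-pcpus-only":"true"}}`,
	}

	policy, err := ext.GetKubeletCPUManagerPolicy(annotations)
	if err != nil {
		fmt.Println("failed to parse kubelet cpu manager policy:", err)
		return
	}

	// The scheduler-side check added in this PR keys off exactly these two fields.
	if policy.Policy == ext.KubeletCPUManagerPolicyStatic &&
		policy.Options[ext.KubeletCPUManagerPolicyFullPCPUsOnlyOption] == "true" {
		fmt.Println("node requires full physical cores per container")
	}
}
```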
@@ -349,7 +349,7 @@ The koordlet creates or updates NodeResourceTopology periodically. The name of N

At present, the NodeResourceTopology lacks some information, and it is temporarily written in the NodeResourceTopology in the form of annotations or labels:

- The annotation `kubelet.koordinator.sh/cpu-manger-policy` describes the kubelet CPU manager policy and options. The scheme is defined as follows:
- The annotation `kubelet.koordinator.sh/cpu-manager-policy` describes the kubelet CPU manager policy and options. The scheme is defined as follows:

```go
const (
@@ -423,7 +423,7 @@ apiVersion: topology.node.k8s.io/v1alpha1
kind: NodeResourceTopology
metadata:
annotations:
kubelet.koordinator.sh/cpu-manger-policy: |-
kubelet.koordinator.sh/cpu-manager-policy: |-
{
"policy": "static",
"options": {
8 changes: 4 additions & 4 deletions pkg/koordlet/runtimehooks/hooks/cpuset/rule.go
@@ -22,18 +22,18 @@ import (
"strings"

topov1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/util"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

ext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
"github.com/koordinator-sh/koordinator/pkg/util"
)

type cpusetRule struct {
kubeletPolicy ext.CPUManagerPolicy
kubeletPolicy ext.KubeletCPUManagerPolicy
sharePools []ext.CPUSharedPool
}

@@ -84,7 +84,7 @@ func (r *cpusetRule) getContainerCPUSet(containerReq *protocol.ContainerRequest)
return pointer.String(""), nil
}

if r.kubeletPolicy.Policy == ext.NodeCPUManagerPolicyStatic {
if r.kubeletPolicy.Policy == ext.KubeletCPUManagerPolicyStatic {
return nil, nil
} else {
// none policy
@@ -102,7 +102,7 @@ func (p *cpusetPlugin) parseRule(nodeTopoIf interface{}) (bool, error) {
if err != nil {
return false, err
}
cpuManagerPolicy, err := ext.GetCPUManagerPolicy(nodeTopo.Annotations)
cpuManagerPolicy, err := ext.GetKubeletCPUManagerPolicy(nodeTopo.Annotations)
if err != nil {
return false, err
}
18 changes: 9 additions & 9 deletions pkg/koordlet/runtimehooks/hooks/cpuset/rule_test.go
@@ -142,7 +142,7 @@ func Test_cpusetRule_getContainerCPUSet(t *testing.T) {
{
name: "get all share pools for origin burstable pod under none policy",
fields: fields{
kubeletPoicy: ext.NodeCPUManagerPolicyNone,
kubeletPoicy: ext.KubeletCPUManagerPolicyNone,
sharePools: []ext.CPUSharedPool{
{
Socket: 0,
@@ -171,7 +171,7 @@
{
name: "do nothing for origin burstable pod under static policy",
fields: fields{
kubeletPoicy: ext.NodeCPUManagerPolicyStatic,
kubeletPoicy: ext.KubeletCPUManagerPolicyStatic,
sharePools: []ext.CPUSharedPool{
{
Socket: 0,
@@ -229,7 +229,7 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &cpusetRule{
kubeletPolicy: ext.CPUManagerPolicy{
kubeletPolicy: ext.KubeletCPUManagerPolicy{
Policy: tt.fields.kubeletPoicy,
},
sharePools: tt.fields.sharePools,
@@ -328,7 +328,7 @@ func Test_cpusetPlugin_parseRule(t *testing.T) {
}
type args struct {
nodeTopo *topov1alpha1.NodeResourceTopology
cpuPolicy *ext.CPUManagerPolicy
cpuPolicy *ext.KubeletCPUManagerPolicy
sharePools []ext.CPUSharedPool
}
tests := []struct {
@@ -449,8 +449,8 @@
Name: "test-node",
},
},
cpuPolicy: &ext.CPUManagerPolicy{
Policy: ext.NodeCPUManagerPolicyNone,
cpuPolicy: &ext.KubeletCPUManagerPolicy{
Policy: ext.KubeletCPUManagerPolicyNone,
},
sharePools: []ext.CPUSharedPool{
{
@@ -467,8 +467,8 @@
},
wantUpdated: true,
wantRule: &cpusetRule{
kubeletPolicy: ext.CPUManagerPolicy{
Policy: ext.NodeCPUManagerPolicyNone,
kubeletPolicy: ext.KubeletCPUManagerPolicy{
Policy: ext.KubeletCPUManagerPolicyNone,
},
sharePools: []ext.CPUSharedPool{
{
@@ -496,7 +496,7 @@ func Test_cpusetPlugin_parseRule(t *testing.T) {
}
if tt.args.cpuPolicy != nil {
cpuPolicyJson := util.DumpJSON(tt.args.cpuPolicy)
tt.args.nodeTopo.Annotations[ext.AnnotationNodeCPUManagerPolicy] = cpuPolicyJson
tt.args.nodeTopo.Annotations[ext.AnnotationKubeletCPUManagerPolicy] = cpuPolicyJson
}
if len(tt.args.sharePools) != 0 {
sharePoolJson := util.DumpJSON(tt.args.sharePools)
4 changes: 2 additions & 2 deletions pkg/koordlet/statesinformer/states_noderesourcetopology.go
@@ -161,7 +161,7 @@ func (s *statesInformer) reportNodeTopology() {
if err != nil {
klog.Errorf("failed to guess kubelet cpu manager opt, err: %v", err)
}
cpuManagerPolicy := extension.CPUManagerPolicy{
cpuManagerPolicy := extension.KubeletCPUManagerPolicy{
Policy: cpuPolicy,
Options: cpuManagerOpt,
}
@@ -211,7 +211,7 @@ func (s *statesInformer) reportNodeTopology() {
s.updateNodeTopo(topology)
topology.Annotations[extension.AnnotationNodeCPUTopology] = string(cpuTopologyJson)
topology.Annotations[extension.AnnotationNodeCPUSharedPools] = string(cpuSharePoolsJson)
topology.Annotations[extension.AnnotationNodeCPUManagerPolicy] = string(cpuManagerPolicyJson)
topology.Annotations[extension.AnnotationKubeletCPUManagerPolicy] = string(cpuManagerPolicyJson)
if len(podAllocsJson) != 0 {
topology.Annotations[extension.AnnotationNodeCPUAllocs] = string(podAllocsJson)
}
23 changes: 19 additions & 4 deletions pkg/scheduler/plugins/nodenumaresource/node_numa_info.go
@@ -36,6 +36,7 @@ type nodeNUMAInfo struct {
cpuTopology *CPUTopology
allocatedPods map[types.UID]struct{}
allocatedCPUs CPUDetails
*extension.KubeletCPUManagerPolicy
}

type nodeNumaInfoCache struct {
@@ -45,10 +46,11 @@

func newNodeNUMAInfo(nodeName string, cpuTopology *CPUTopology) *nodeNUMAInfo {
return &nodeNUMAInfo{
nodeName: nodeName,
cpuTopology: cpuTopology,
allocatedPods: map[types.UID]struct{}{},
allocatedCPUs: NewCPUDetails(),
nodeName: nodeName,
cpuTopology: cpuTopology,
allocatedPods: map[types.UID]struct{}{},
allocatedCPUs: NewCPUDetails(),
KubeletCPUManagerPolicy: &extension.KubeletCPUManagerPolicy{},
}
}

@@ -149,6 +151,11 @@ func (c *nodeNumaInfoCache) setNodeResourceTopology(oldNodeResTopology, nodeResT
if err != nil {
klog.Errorf("Failed to GetPodCPUAllocs from new NodeResourceTopology %s, err: %v", nodeResTopology.Name, err)
}
kubeletCPUManagerPolicy, err := extension.GetKubeletCPUManagerPolicy(nodeResTopology.Annotations)
if err != nil {
klog.Errorf("Failed to GetKubeletCPUManagerPolicy from NodeResourceTopology %s, err: %v", nodeResTopology.Name, err)
}

var oldPodCPUAllocs extension.PodCPUAllocs
if oldNodeResTopology != nil {
oldPodCPUAllocs, err = extension.GetPodCPUAllocs(oldNodeResTopology.Annotations)
@@ -170,6 +177,7 @@
numaInfo.lock.Lock()
defer numaInfo.lock.Unlock()
numaInfo.updateCPUTopology(cpuTopology)
numaInfo.updateKubeletCPUManagerPolicy(kubeletCPUManagerPolicy)
numaInfo.releaseCPUsManagedByKubelet(oldPodCPUAllocs)
numaInfo.updateCPUsManagedByKubelet(podCPUAllocs)
}
@@ -243,6 +251,13 @@ func (n *nodeNUMAInfo) updateCPUTopology(topology *CPUTopology) {
n.cpuTopology = topology
}

func (n *nodeNUMAInfo) updateKubeletCPUManagerPolicy(policy *extension.KubeletCPUManagerPolicy) {
if policy == nil {
policy = &extension.KubeletCPUManagerPolicy{}
}
n.KubeletCPUManagerPolicy = policy
}

func (n *nodeNUMAInfo) updateCPUsManagedByKubelet(podCPUAllocs extension.PodCPUAllocs) {
for _, v := range podCPUAllocs {
if !v.ManagedByKubelet || v.UID == "" || v.CPUSet == "" {
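A brief note on the defaulting above: `newNodeNUMAInfo` starts from an empty `KubeletCPUManagerPolicy`, and `updateKubeletCPUManagerPolicy` swaps a nil policy for an empty one, so the Filter path can dereference the embedded pointer and index its `Options` map without nil checks. A standalone sketch of that invariant, using simplified stand-in types rather than the real ones:

```go
package main

import "fmt"

// kubeletCPUManagerPolicy is a simplified stand-in for the extension type.
type kubeletCPUManagerPolicy struct {
	Policy  string
	Options map[string]string
}

// nodeNUMAInfo embeds the policy by pointer, as the plugin cache does.
type nodeNUMAInfo struct {
	*kubeletCPUManagerPolicy
}

func (n *nodeNUMAInfo) updateKubeletCPUManagerPolicy(p *kubeletCPUManagerPolicy) {
	if p == nil {
		p = &kubeletCPUManagerPolicy{} // never leave the embedded pointer nil
	}
	n.kubeletCPUManagerPolicy = p
}

func main() {
	info := &nodeNUMAInfo{kubeletCPUManagerPolicy: &kubeletCPUManagerPolicy{}}
	info.updateKubeletCPUManagerPolicy(nil)

	// Indexing a nil map is safe in Go; dereferencing a nil struct pointer is not,
	// which is why the empty default matters on the scheduler's read path.
	fmt.Println(info.Policy == "", info.Options["full-pcpus-only"])
}
```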
4 changes: 3 additions & 1 deletion pkg/scheduler/plugins/nodenumaresource/plugin.go
@@ -207,7 +207,9 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidCPUTopology)
}

if node.Labels[extension.LabelNodeCPUBindPolicy] == extension.NodeCPUBindPolicyFullPCPUsOnly {
if node.Labels[extension.LabelNodeCPUBindPolicy] == extension.NodeCPUBindPolicyFullPCPUsOnly ||
(numaInfo.KubeletCPUManagerPolicy.Policy == extension.KubeletCPUManagerPolicyStatic &&
numaInfo.KubeletCPUManagerPolicy.Options[extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption] == "true") {
if state.numCPUsNeeded%numaInfo.cpuTopology.CPUsPerCore() != 0 {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError)
}
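The practical effect of the widened condition in Filter: a node whose kubelet runs the `static` policy with `full-pcpus-only=true` is now treated like a node labeled `FullPCPUsOnly`, so CPU requests must align to whole physical cores. A self-contained sketch of that alignment check; the topology numbers are made up and the real check lives in the plugin above:

```go
package main

import "fmt"

// smtAligned reports whether a request can be satisfied with whole physical
// cores, which is what full-pcpus-only demands on SMT-enabled nodes.
func smtAligned(numCPUsNeeded, cpusPerCore int) bool {
	return numCPUsNeeded%cpusPerCore == 0
}

func main() {
	const cpusPerCore = 2 // e.g. hyper-threading: 2 logical CPUs per physical core
	for _, req := range []int{4, 5} {
		if smtAligned(req, cpusPerCore) {
			fmt.Printf("request of %d CPUs is SMT-aligned\n", req)
		} else {
			// Corresponds to the ErrSMTAlignmentError rejection in Filter.
			fmt.Printf("request of %d CPUs would be rejected (SMT alignment)\n", req)
		}
	}
}
```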
52 changes: 46 additions & 6 deletions pkg/scheduler/plugins/nodenumaresource/plugin_test.go
@@ -416,12 +416,13 @@ func TestPlugin_PreFilter(t *testing.T) {

func TestPlugin_Filter(t *testing.T) {
tests := []struct {
name string
nodeLabels map[string]string
state *preFilterState
pod *corev1.Pod
numaInfo *nodeNUMAInfo
want *framework.Status
name string
nodeLabels map[string]string
kubeletPolicy *extension.KubeletCPUManagerPolicy
state *preFilterState
pod *corev1.Pod
numaInfo *nodeNUMAInfo
want *framework.Status
}{
{
name: "error with missing preFilterState",
@@ -492,6 +493,42 @@
pod: &corev1.Pod{},
want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy),
},
{
name: "verify Kubelet FullPCPUsOnly with SMTAlignmentError",
state: &preFilterState{
skip: false,
resourceSpec: &extension.ResourceSpec{},
preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
numCPUsNeeded: 5,
},
numaInfo: newNodeNUMAInfo("test-node-1", buildCPUTopologyForTest(2, 1, 4, 2)),
kubeletPolicy: &extension.KubeletCPUManagerPolicy{
Policy: extension.KubeletCPUManagerPolicyStatic,
Options: map[string]string{
extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true",
},
},
pod: &corev1.Pod{},
want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError),
},
{
name: "verify Kubelet FullPCPUsOnly with RequiredFullPCPUsPolicy",
state: &preFilterState{
skip: false,
resourceSpec: &extension.ResourceSpec{},
preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
numCPUsNeeded: 4,
},
numaInfo: newNodeNUMAInfo("test-node-1", buildCPUTopologyForTest(2, 1, 4, 2)),
kubeletPolicy: &extension.KubeletCPUManagerPolicy{
Policy: extension.KubeletCPUManagerPolicyStatic,
Options: map[string]string{
extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true",
},
},
pod: &corev1.Pod{},
want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -520,6 +557,9 @@

plg := p.(*Plugin)
if tt.numaInfo != nil {
if tt.kubeletPolicy != nil {
tt.numaInfo.KubeletCPUManagerPolicy = tt.kubeletPolicy
}
plg.nodeInfoCache.nodes[tt.numaInfo.nodeName] = tt.numaInfo
}
