diff --git a/apis/extension/node.go b/apis/extension/node.go index 6f2a7225c..aae8af659 100644 --- a/apis/extension/node.go +++ b/apis/extension/node.go @@ -32,8 +32,6 @@ const ( // AnnotationNodeCPUSharedPools describes the CPU Shared Pool defined by Koordinator. // The shared pool is mainly used by Koordinator LS Pods or K8s Burstable Pods. AnnotationNodeCPUSharedPools = NodeDomainPrefix + "/cpu-shared-pools" - // AnnotationNodeCPUManagerPolicy describes the cpu manager policy options of kubelet - AnnotationNodeCPUManagerPolicy = NodeDomainPrefix + "/cpu-manager-policy" // LabelNodeCPUBindPolicy constrains how to bind CPU logical CPUs when scheduling. LabelNodeCPUBindPolicy = NodeDomainPrefix + "/cpu-bind-policy" @@ -42,9 +40,6 @@ const ( ) const ( - // cpu manager policy options of kubelet - NodeCPUManagerPolicyStatic = "static" - NodeCPUManagerPolicyNone = "none" // NodeCPUBindPolicyFullPCPUsOnly requires that the scheduler must allocate full physical cores. // Equivalent to kubelet CPU manager policy option full-pcpus-only=true. NodeCPUBindPolicyFullPCPUsOnly = "FullPCPUsOnly" @@ -55,6 +50,16 @@ const ( NodeNUMAAllocateStrategyMostAllocated = string(schedulingconfig.NUMAMostAllocated) ) +const ( + // AnnotationKubeletCPUManagerPolicy describes the cpu manager policy options of kubelet + AnnotationKubeletCPUManagerPolicy = "kubelet.koordinator.sh/cpu-manager-policy" + + KubeletCPUManagerPolicyStatic = "static" + KubeletCPUManagerPolicyNone = "none" + KubeletCPUManagerPolicyFullPCPUsOnlyOption = "full-pcpus-only" + KubeletCPUManagerPolicyDistributeCPUsAcrossNUMAOption = "distribute-cpus-across-numa" +) + type CPUTopology struct { Detail []CPUInfo `json:"detail,omitempty"` } @@ -76,7 +81,7 @@ type PodCPUAlloc struct { type PodCPUAllocs []PodCPUAlloc -type CPUManagerPolicy struct { +type KubeletCPUManagerPolicy struct { Policy string `json:"policy,omitempty"` Options map[string]string `json:"options,omitempty"` } @@ -120,9 +125,9 @@ func GetNodeCPUSharePools(nodeTopoAnnotations map[string]string) ([]CPUSharedPoo return cpuSharePools, nil } -func GetCPUManagerPolicy(annotations map[string]string) (*CPUManagerPolicy, error) { - cpuManagerPolicy := &CPUManagerPolicy{} - data, ok := annotations[AnnotationNodeCPUManagerPolicy] +func GetKubeletCPUManagerPolicy(annotations map[string]string) (*KubeletCPUManagerPolicy, error) { + cpuManagerPolicy := &KubeletCPUManagerPolicy{} + data, ok := annotations[AnnotationKubeletCPUManagerPolicy] if !ok { return cpuManagerPolicy, nil } diff --git a/docs/proposals/scheduling/20220530-fine-grained-cpu-orchestration.md b/docs/proposals/scheduling/20220530-fine-grained-cpu-orchestration.md index 5659dd04b..00c697102 100644 --- a/docs/proposals/scheduling/20220530-fine-grained-cpu-orchestration.md +++ b/docs/proposals/scheduling/20220530-fine-grained-cpu-orchestration.md @@ -349,7 +349,7 @@ The koordlet creates or updates NodeResourceTopology periodically. The name of N At present, the NodeResourceTopology lacks some information, and it is temporarily written in the NodeResourceTopology in the form of annotations or labels: -- The annotation `kubelet.koordinator.sh/cpu-manger-policy` describes the kubelet CPU manager policy and options. The scheme is defined as follows: +- The annotation `kubelet.koordinator.sh/cpu-manager-policy` describes the kubelet CPU manager policy and options. 
The scheme is defined as follows: ```go const ( @@ -423,7 +423,7 @@ apiVersion: topology.node.k8s.io/v1alpha1 kind: NodeResourceTopology metadata: annotations: - kubelet.koordinator.sh/cpu-manger-policy: |- + kubelet.koordinator.sh/cpu-manager-policy: |- { "policy": "static", "options": { diff --git a/pkg/koordlet/runtimehooks/hooks/cpuset/rule.go b/pkg/koordlet/runtimehooks/hooks/cpuset/rule.go index 35640c884..86b1ab0b5 100644 --- a/pkg/koordlet/runtimehooks/hooks/cpuset/rule.go +++ b/pkg/koordlet/runtimehooks/hooks/cpuset/rule.go @@ -22,7 +22,6 @@ import ( "strings" topov1alpha1 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1" - "github.com/koordinator-sh/koordinator/pkg/util" corev1 "k8s.io/api/core/v1" "k8s.io/klog/v2" "k8s.io/utils/pointer" @@ -30,10 +29,11 @@ import ( ext "github.com/koordinator-sh/koordinator/apis/extension" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol" "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" + "github.com/koordinator-sh/koordinator/pkg/util" ) type cpusetRule struct { - kubeletPolicy ext.CPUManagerPolicy + kubeletPolicy ext.KubeletCPUManagerPolicy sharePools []ext.CPUSharedPool } @@ -84,7 +84,7 @@ func (r *cpusetRule) getContainerCPUSet(containerReq *protocol.ContainerRequest) return pointer.String(""), nil } - if r.kubeletPolicy.Policy == ext.NodeCPUManagerPolicyStatic { + if r.kubeletPolicy.Policy == ext.KubeletCPUManagerPolicyStatic { return nil, nil } else { // none policy @@ -102,7 +102,7 @@ func (p *cpusetPlugin) parseRule(nodeTopoIf interface{}) (bool, error) { if err != nil { return false, err } - cpuManagerPolicy, err := ext.GetCPUManagerPolicy(nodeTopo.Annotations) + cpuManagerPolicy, err := ext.GetKubeletCPUManagerPolicy(nodeTopo.Annotations) if err != nil { return false, err } diff --git a/pkg/koordlet/runtimehooks/hooks/cpuset/rule_test.go b/pkg/koordlet/runtimehooks/hooks/cpuset/rule_test.go index 941231c62..8ead3340e 100644 --- a/pkg/koordlet/runtimehooks/hooks/cpuset/rule_test.go +++ b/pkg/koordlet/runtimehooks/hooks/cpuset/rule_test.go @@ -142,7 +142,7 @@ func Test_cpusetRule_getContainerCPUSet(t *testing.T) { { name: "get all share pools for origin burstable pod under none policy", fields: fields{ - kubeletPoicy: ext.NodeCPUManagerPolicyNone, + kubeletPoicy: ext.KubeletCPUManagerPolicyNone, sharePools: []ext.CPUSharedPool{ { Socket: 0, @@ -171,7 +171,7 @@ func Test_cpusetRule_getContainerCPUSet(t *testing.T) { { name: "do nothing for origin burstable pod under static policy", fields: fields{ - kubeletPoicy: ext.NodeCPUManagerPolicyStatic, + kubeletPoicy: ext.KubeletCPUManagerPolicyStatic, sharePools: []ext.CPUSharedPool{ { Socket: 0, @@ -229,7 +229,7 @@ func Test_cpusetRule_getContainerCPUSet(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := &cpusetRule{ - kubeletPolicy: ext.CPUManagerPolicy{ + kubeletPolicy: ext.KubeletCPUManagerPolicy{ Policy: tt.fields.kubeletPoicy, }, sharePools: tt.fields.sharePools, @@ -328,7 +328,7 @@ func Test_cpusetPlugin_parseRule(t *testing.T) { } type args struct { nodeTopo *topov1alpha1.NodeResourceTopology - cpuPolicy *ext.CPUManagerPolicy + cpuPolicy *ext.KubeletCPUManagerPolicy sharePools []ext.CPUSharedPool } tests := []struct { @@ -449,8 +449,8 @@ func Test_cpusetPlugin_parseRule(t *testing.T) { Name: "test-node", }, }, - cpuPolicy: &ext.CPUManagerPolicy{ - Policy: ext.NodeCPUManagerPolicyNone, + cpuPolicy: &ext.KubeletCPUManagerPolicy{ + Policy: ext.KubeletCPUManagerPolicyNone, }, 
sharePools: []ext.CPUSharedPool{ { @@ -467,8 +467,8 @@ func Test_cpusetPlugin_parseRule(t *testing.T) { }, wantUpdated: true, wantRule: &cpusetRule{ - kubeletPolicy: ext.CPUManagerPolicy{ - Policy: ext.NodeCPUManagerPolicyNone, + kubeletPolicy: ext.KubeletCPUManagerPolicy{ + Policy: ext.KubeletCPUManagerPolicyNone, }, sharePools: []ext.CPUSharedPool{ { @@ -496,7 +496,7 @@ func Test_cpusetPlugin_parseRule(t *testing.T) { } if tt.args.cpuPolicy != nil { cpuPolicyJson := util.DumpJSON(tt.args.cpuPolicy) - tt.args.nodeTopo.Annotations[ext.AnnotationNodeCPUManagerPolicy] = cpuPolicyJson + tt.args.nodeTopo.Annotations[ext.AnnotationKubeletCPUManagerPolicy] = cpuPolicyJson } if len(tt.args.sharePools) != 0 { sharePoolJson := util.DumpJSON(tt.args.sharePools) diff --git a/pkg/koordlet/statesinformer/states_noderesourcetopology.go b/pkg/koordlet/statesinformer/states_noderesourcetopology.go index e0a7d293b..927173fc0 100644 --- a/pkg/koordlet/statesinformer/states_noderesourcetopology.go +++ b/pkg/koordlet/statesinformer/states_noderesourcetopology.go @@ -161,7 +161,7 @@ func (s *statesInformer) reportNodeTopology() { if err != nil { klog.Errorf("failed to guess kubelet cpu manager opt, err: %v", err) } - cpuManagerPolicy := extension.CPUManagerPolicy{ + cpuManagerPolicy := extension.KubeletCPUManagerPolicy{ Policy: cpuPolicy, Options: cpuManagerOpt, } @@ -211,7 +211,7 @@ func (s *statesInformer) reportNodeTopology() { s.updateNodeTopo(topology) topology.Annotations[extension.AnnotationNodeCPUTopology] = string(cpuTopologyJson) topology.Annotations[extension.AnnotationNodeCPUSharedPools] = string(cpuSharePoolsJson) - topology.Annotations[extension.AnnotationNodeCPUManagerPolicy] = string(cpuManagerPolicyJson) + topology.Annotations[extension.AnnotationKubeletCPUManagerPolicy] = string(cpuManagerPolicyJson) if len(podAllocsJson) != 0 { topology.Annotations[extension.AnnotationNodeCPUAllocs] = string(podAllocsJson) } diff --git a/pkg/scheduler/plugins/nodenumaresource/node_numa_info.go b/pkg/scheduler/plugins/nodenumaresource/node_numa_info.go index 78bc46985..fda326c47 100644 --- a/pkg/scheduler/plugins/nodenumaresource/node_numa_info.go +++ b/pkg/scheduler/plugins/nodenumaresource/node_numa_info.go @@ -36,6 +36,7 @@ type nodeNUMAInfo struct { cpuTopology *CPUTopology allocatedPods map[types.UID]struct{} allocatedCPUs CPUDetails + *extension.KubeletCPUManagerPolicy } type nodeNumaInfoCache struct { @@ -45,10 +46,11 @@ type nodeNumaInfoCache struct { func newNodeNUMAInfo(nodeName string, cpuTopology *CPUTopology) *nodeNUMAInfo { return &nodeNUMAInfo{ - nodeName: nodeName, - cpuTopology: cpuTopology, - allocatedPods: map[types.UID]struct{}{}, - allocatedCPUs: NewCPUDetails(), + nodeName: nodeName, + cpuTopology: cpuTopology, + allocatedPods: map[types.UID]struct{}{}, + allocatedCPUs: NewCPUDetails(), + KubeletCPUManagerPolicy: &extension.KubeletCPUManagerPolicy{}, } } @@ -149,6 +151,11 @@ func (c *nodeNumaInfoCache) setNodeResourceTopology(oldNodeResTopology, nodeResT if err != nil { klog.Errorf("Failed to GetPodCPUAllocs from new NodeResourceTopology %s, err: %v", nodeResTopology.Name, err) } + kubeletCPUManagerPolicy, err := extension.GetKubeletCPUManagerPolicy(nodeResTopology.Annotations) + if err != nil { + klog.Errorf("Failed to GetKubeletCPUManagerPolicy from NodeResourceTopology %s, err: %v", nodeResTopology.Name, err) + } + var oldPodCPUAllocs extension.PodCPUAllocs if oldNodeResTopology != nil { oldPodCPUAllocs, err = extension.GetPodCPUAllocs(oldNodeResTopology.Annotations) @@ -170,6 +177,7 
@@ func (c *nodeNumaInfoCache) setNodeResourceTopology(oldNodeResTopology, nodeResT numaInfo.lock.Lock() defer numaInfo.lock.Unlock() numaInfo.updateCPUTopology(cpuTopology) + numaInfo.updateKubeletCPUManagerPolicy(kubeletCPUManagerPolicy) numaInfo.releaseCPUsManagedByKubelet(oldPodCPUAllocs) numaInfo.updateCPUsManagedByKubelet(podCPUAllocs) } @@ -243,6 +251,13 @@ func (n *nodeNUMAInfo) updateCPUTopology(topology *CPUTopology) { n.cpuTopology = topology } +func (n *nodeNUMAInfo) updateKubeletCPUManagerPolicy(policy *extension.KubeletCPUManagerPolicy) { + if policy == nil { + policy = &extension.KubeletCPUManagerPolicy{} + } + n.KubeletCPUManagerPolicy = policy +} + func (n *nodeNUMAInfo) updateCPUsManagedByKubelet(podCPUAllocs extension.PodCPUAllocs) { for _, v := range podCPUAllocs { if !v.ManagedByKubelet || v.UID == "" || v.CPUSet == "" { diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin.go b/pkg/scheduler/plugins/nodenumaresource/plugin.go index 204cf242c..4377876d7 100644 --- a/pkg/scheduler/plugins/nodenumaresource/plugin.go +++ b/pkg/scheduler/plugins/nodenumaresource/plugin.go @@ -207,7 +207,9 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidCPUTopology) } - if node.Labels[extension.LabelNodeCPUBindPolicy] == extension.NodeCPUBindPolicyFullPCPUsOnly { + if node.Labels[extension.LabelNodeCPUBindPolicy] == extension.NodeCPUBindPolicyFullPCPUsOnly || + (numaInfo.KubeletCPUManagerPolicy.Policy == extension.KubeletCPUManagerPolicyStatic && + numaInfo.KubeletCPUManagerPolicy.Options[extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption] == "true") { if state.numCPUsNeeded%numaInfo.cpuTopology.CPUsPerCore() != 0 { return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError) } diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go index 069ee42f7..8707e311f 100644 --- a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go @@ -416,12 +416,13 @@ func TestPlugin_PreFilter(t *testing.T) { func TestPlugin_Filter(t *testing.T) { tests := []struct { - name string - nodeLabels map[string]string - state *preFilterState - pod *corev1.Pod - numaInfo *nodeNUMAInfo - want *framework.Status + name string + nodeLabels map[string]string + kubeletPolicy *extension.KubeletCPUManagerPolicy + state *preFilterState + pod *corev1.Pod + numaInfo *nodeNUMAInfo + want *framework.Status }{ { name: "error with missing preFilterState", @@ -492,6 +493,42 @@ func TestPlugin_Filter(t *testing.T) { pod: &corev1.Pod{}, want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy), }, + { + name: "verify Kubelet FullPCPUsOnly with SMTAlignmentError", + state: &preFilterState{ + skip: false, + resourceSpec: &extension.ResourceSpec{}, + preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, + numCPUsNeeded: 5, + }, + numaInfo: newNodeNUMAInfo("test-node-1", buildCPUTopologyForTest(2, 1, 4, 2)), + kubeletPolicy: &extension.KubeletCPUManagerPolicy{ + Policy: extension.KubeletCPUManagerPolicyStatic, + Options: map[string]string{ + extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true", + }, + }, + pod: &corev1.Pod{}, + want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError), + }, + { + name: "verify Kubelet FullPCPUsOnly with RequiredFullPCPUsPolicy", + state: &preFilterState{ 
+ skip: false, + resourceSpec: &extension.ResourceSpec{}, + preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs, + numCPUsNeeded: 4, + }, + numaInfo: newNodeNUMAInfo("test-node-1", buildCPUTopologyForTest(2, 1, 4, 2)), + kubeletPolicy: &extension.KubeletCPUManagerPolicy{ + Policy: extension.KubeletCPUManagerPolicyStatic, + Options: map[string]string{ + extension.KubeletCPUManagerPolicyFullPCPUsOnlyOption: "true", + }, + }, + pod: &corev1.Pod{}, + want: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -520,6 +557,9 @@ func TestPlugin_Filter(t *testing.T) { plg := p.(*Plugin) if tt.numaInfo != nil { + if tt.kubeletPolicy != nil { + tt.numaInfo.KubeletCPUManagerPolicy = tt.kubeletPolicy + } plg.nodeInfoCache.nodes[tt.numaInfo.nodeName] = tt.numaInfo }
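
The scheduler side of this patch ties the new `kubelet.koordinator.sh/cpu-manager-policy` annotation to the SMT-alignment check in `Filter`: when the reported kubelet policy is `static` with the `full-pcpus-only` option set to `"true"`, pods whose CPU request is not a multiple of the logical CPUs per physical core are rejected with `ErrSMTAlignmentError`. The standalone sketch below illustrates that behavior; the local `kubeletCPUManagerPolicy` struct, the hard-coded `cpusPerCore`, and the sample annotation value are illustrative stand-ins, not code from the repository.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirror of the KubeletCPUManagerPolicy struct added in apis/extension/node.go,
// reproduced here only so the sketch is self-contained.
type kubeletCPUManagerPolicy struct {
	Policy  string            `json:"policy,omitempty"`
	Options map[string]string `json:"options,omitempty"`
}

func main() {
	// Example value of the kubelet.koordinator.sh/cpu-manager-policy annotation
	// that koordlet reports on the NodeResourceTopology object.
	annotation := `{"policy":"static","options":{"full-pcpus-only":"true"}}`

	var policy kubeletCPUManagerPolicy
	if err := json.Unmarshal([]byte(annotation), &policy); err != nil {
		panic(err)
	}

	// Mirrors the new Filter branch: with the static policy and full-pcpus-only=true,
	// the requested CPU count must be a multiple of the logical CPUs per physical core.
	const cpusPerCore = 2 // hyper-threaded node assumed for illustration
	numCPUsNeeded := 5    // matches the "SMTAlignmentError" test case in plugin_test.go

	fullPCPUsOnly := policy.Policy == "static" && policy.Options["full-pcpus-only"] == "true"
	if fullPCPUsOnly && numCPUsNeeded%cpusPerCore != 0 {
		fmt.Println("unschedulable: SMT alignment error (5 CPUs on a 2-way SMT node)")
	}
}
```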