diff --git a/apis/extension/node.go b/apis/extension/node.go
index 6e1a7bb5f..be16c2247 100644
--- a/apis/extension/node.go
+++ b/apis/extension/node.go
@@ -20,6 +20,8 @@ import (
 	"encoding/json"

 	"k8s.io/apimachinery/pkg/types"
+
+	schedulingconfig "github.com/koordinator-sh/koordinator/apis/scheduling/config"
 )

 const (
@@ -30,6 +32,22 @@ const (
 	// AnnotationNodeCPUSharedPools describes the CPU Shared Pool defined by Koordinator.
 	// The shared pool is mainly used by Koordinator LS Pods or K8s Burstable Pods.
 	AnnotationNodeCPUSharedPools = NodeDomainPrefix + "/cpu-shared-pools"
+
+	// LabelNodeCPUBindPolicy constrains how logical CPUs are bound when scheduling.
+	LabelNodeCPUBindPolicy = NodeDomainPrefix + "/cpu-bind-policy"
+	// LabelNodeNUMAAllocateStrategy indicates how to choose among NUMA nodes that satisfy a request when scheduling.
+	LabelNodeNUMAAllocateStrategy = NodeDomainPrefix + "/numa-allocate-strategy"
+)
+
+const (
+	// NodeCPUBindPolicyFullPCPUsOnly requires the scheduler to allocate full physical cores.
+	// Equivalent to kubelet CPU manager policy option full-pcpus-only=true.
+	NodeCPUBindPolicyFullPCPUsOnly = "FullPCPUsOnly"
+)
+
+const (
+	NodeNUMAAllocateStrategyLeastAllocated = string(schedulingconfig.NUMALeastAllocated)
+	NodeNUMAAllocateStrategyMostAllocated  = string(schedulingconfig.NUMAMostAllocated)
 )

 type CPUTopology struct {
diff --git a/apis/scheduling/config/types.go b/apis/scheduling/config/types.go
index 06d2298db..4a83dbd88 100644
--- a/apis/scheduling/config/types.go
+++ b/apis/scheduling/config/types.go
@@ -73,9 +73,8 @@ type ScoringStrategy struct {
 type NodeNUMAResourceArgs struct {
 	metav1.TypeMeta

-	DefaultCPUBindPolicy CPUBindPolicy        `json:"defaultCPUBindPolicy,omitempty"`
-	NUMAAllocateStrategy NUMAAllocateStrategy `json:"numaAllocateStrategy,omitempty"`
-	ScoringStrategy      *ScoringStrategy     `json:"scoringStrategy,omitempty"`
+	DefaultCPUBindPolicy CPUBindPolicy    `json:"defaultCPUBindPolicy,omitempty"`
+	ScoringStrategy      *ScoringStrategy `json:"scoringStrategy,omitempty"`
 }

 // CPUBindPolicy defines the CPU binding policy
diff --git a/apis/scheduling/config/v1beta2/defaults.go b/apis/scheduling/config/v1beta2/defaults.go
index 58ad5d865..78f4c0eaa 100644
--- a/apis/scheduling/config/v1beta2/defaults.go
+++ b/apis/scheduling/config/v1beta2/defaults.go
@@ -41,7 +41,6 @@ var (
 	}

 	defaultPreferredCPUBindPolicy = CPUBindPolicyFullPCPUs
-	defaultNUMAAllocateStrategy   = NUMAMostAllocated
 	defaultNodeNUMAResourceScoringStrategy = &ScoringStrategy{
 		Type: MostAllocated,
 		Resources: []schedconfig.ResourceSpec{
@@ -77,9 +76,6 @@ func SetDefaults_NodeNUMAResourceArgs(obj *NodeNUMAResourceArgs) {
 	if obj.DefaultCPUBindPolicy == "" {
 		obj.DefaultCPUBindPolicy = defaultPreferredCPUBindPolicy
 	}
-	if obj.NUMAAllocateStrategy == "" {
-		obj.NUMAAllocateStrategy = defaultNUMAAllocateStrategy
-	}
 	if obj.ScoringStrategy == nil {
 		obj.ScoringStrategy = defaultNodeNUMAResourceScoringStrategy
 	}
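For orientation, here is a minimal, self-contained sketch of how the new node labels are meant to be consumed. The constants are inlined stand-ins so the snippet runs on its own; the `NodeDomainPrefix` value is an assumption, not quoted from the diff:

```go
package main

import "fmt"

// Inlined stand-ins for the constants introduced above. The NodeDomainPrefix
// value is assumed for illustration.
const (
	NodeDomainPrefix               = "node.koordinator.sh"
	LabelNodeCPUBindPolicy         = NodeDomainPrefix + "/cpu-bind-policy"
	LabelNodeNUMAAllocateStrategy  = NodeDomainPrefix + "/numa-allocate-strategy"
	NodeCPUBindPolicyFullPCPUsOnly = "FullPCPUsOnly"
)

func main() {
	// Labels as a cluster operator might set them on a Node object.
	nodeLabels := map[string]string{
		LabelNodeCPUBindPolicy:        NodeCPUBindPolicyFullPCPUsOnly,
		LabelNodeNUMAAllocateStrategy: "LeastAllocated",
	}
	if nodeLabels[LabelNodeCPUBindPolicy] == NodeCPUBindPolicyFullPCPUsOnly {
		fmt.Println("scheduler must allocate whole physical cores on this node")
	}
}
```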
`json:"defaultCPUBindPolicy,omitempty"` + ScoringStrategy *ScoringStrategy `json:"scoringStrategy,omitempty"` } // CPUBindPolicy defines the CPU binding policy diff --git a/apis/scheduling/config/v1beta2/zz_generated.conversion.go b/apis/scheduling/config/v1beta2/zz_generated.conversion.go index 9b26ed4ff..02f495529 100644 --- a/apis/scheduling/config/v1beta2/zz_generated.conversion.go +++ b/apis/scheduling/config/v1beta2/zz_generated.conversion.go @@ -101,7 +101,6 @@ func Convert_config_LoadAwareSchedulingArgs_To_v1beta2_LoadAwareSchedulingArgs(i func autoConvert_v1beta2_NodeNUMAResourceArgs_To_config_NodeNUMAResourceArgs(in *NodeNUMAResourceArgs, out *config.NodeNUMAResourceArgs, s conversion.Scope) error { out.DefaultCPUBindPolicy = config.CPUBindPolicy(in.DefaultCPUBindPolicy) - out.NUMAAllocateStrategy = config.NUMAAllocateStrategy(in.NUMAAllocateStrategy) out.ScoringStrategy = (*config.ScoringStrategy)(unsafe.Pointer(in.ScoringStrategy)) return nil } @@ -113,7 +112,6 @@ func Convert_v1beta2_NodeNUMAResourceArgs_To_config_NodeNUMAResourceArgs(in *Nod func autoConvert_config_NodeNUMAResourceArgs_To_v1beta2_NodeNUMAResourceArgs(in *config.NodeNUMAResourceArgs, out *NodeNUMAResourceArgs, s conversion.Scope) error { out.DefaultCPUBindPolicy = CPUBindPolicy(in.DefaultCPUBindPolicy) - out.NUMAAllocateStrategy = NUMAAllocateStrategy(in.NUMAAllocateStrategy) out.ScoringStrategy = (*ScoringStrategy)(unsafe.Pointer(in.ScoringStrategy)) return nil } diff --git a/docs/proposals/scheduling/20220530-fine-grained-cpu-orchestration.md b/docs/proposals/scheduling/20220530-fine-grained-cpu-orchestration.md index b2a3a7ea2..bbca4886e 100644 --- a/docs/proposals/scheduling/20220530-fine-grained-cpu-orchestration.md +++ b/docs/proposals/scheduling/20220530-fine-grained-cpu-orchestration.md @@ -648,7 +648,6 @@ type CPUOrchestrationPluginArgs struct { DefaultCPUBindPolicy CPUBindPolicy `json:"defaultCPUBindPolicy,omitempty"` NUMATopologyAlignmentPolicy NUMATopologyAlignmentPolicy `json:"numaTopologyAlignmentPolicy,omitempty"` - NUMAAllocateStrategy NUMAAllocateStrategy `json:"numaAllocateStrategy,omitempty"` ScoringStrategy ScoringStrategy `json:"scoringStrategy,omitempty"` } @@ -678,7 +677,6 @@ type ScoringStrategy struct { - `DefaultCPUBindPolicy` represents the default bind policy. If not set, use `FullPCPUs` as default value. - `NUMATopologyAlignmentPolicy` represents the default NUMA topology alignment policy, If not set, use `BestEffort` as default value. -- `NUMAAllocateStrategy` represents the default NUMA allocate strategy. If not set, use `MostAllocated` as default value. - `ScoringStrategy` represents the node resource scoring strategy. If not set, use `MostAllocated` as default value. 
diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_allocator.go b/pkg/scheduler/plugins/nodenumaresource/cpu_allocator.go
index 2d44dfb8c..3079e780e 100644
--- a/pkg/scheduler/plugins/nodenumaresource/cpu_allocator.go
+++ b/pkg/scheduler/plugins/nodenumaresource/cpu_allocator.go
@@ -201,7 +201,7 @@ func newCPUAccumulator(
 	allocatedCPUs CPUDetails,
 	numCPUsNeeded int,
 	exclusivePolicy schedulingconfig.CPUExclusivePolicy,
-	numaSortStrategy schedulingconfig.NUMAAllocateStrategy,
+	numaAllocateStrategy schedulingconfig.NUMAAllocateStrategy,
 ) *cpuAccumulator {
 	exclusiveInCores := sets.NewInt()
 	exclusiveInNUMANodes := sets.NewInt()
@@ -225,7 +225,7 @@ func newCPUAccumulator(
 		exclusive:            exclusive,
 		exclusivePolicy:      exclusivePolicy,
 		numCPUsNeeded:        numCPUsNeeded,
-		numaAllocateStrategy: numaSortStrategy,
+		numaAllocateStrategy: numaAllocateStrategy,
 		result:               NewCPUSet(),
 	}
 }
diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin.go b/pkg/scheduler/plugins/nodenumaresource/plugin.go
index 442c66a23..78a0ab9e1 100644
--- a/pkg/scheduler/plugins/nodenumaresource/plugin.go
+++ b/pkg/scheduler/plugins/nodenumaresource/plugin.go
@@ -53,6 +53,8 @@ const (
 const (
 	ErrMissingNodeResourceTopology = "node(s) missing NodeResourceTopology"
 	ErrInvalidCPUTopology          = "node(s) invalid CPU Topology"
+	ErrSMTAlignmentError           = "node(s) requested cpus not multiple of cpus per core"
+	ErrRequiredFullPCPUsPolicy     = "node(s) required FullPCPUs policy"
 )

 var (
@@ -201,6 +203,15 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
 		return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrInvalidCPUTopology)
 	}

+	if node.Labels[extension.LabelNodeCPUBindPolicy] == extension.NodeCPUBindPolicyFullPCPUsOnly {
+		if state.numCPUsNeeded%numaInfo.cpuTopology.CPUsPerCore() != 0 {
+			return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError)
+		}
+		if state.preferredCPUBindPolicy != schedulingconfig.CPUBindPolicyFullPCPUs {
+			return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy)
+		}
+	}
+
 	return nil
 }
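The Filter change above enforces two conditions on nodes labeled `FullPCPUsOnly`: the request must be SMT-aligned and the pod must use the `FullPCPUs` bind policy. A runnable sketch of the same logic, with the topology reduced to a single assumed constant (`cpusPerCore = 2`):

```go
package main

import "fmt"

// Stand-in for numaInfo.cpuTopology.CPUsPerCore() on an SMT-2 machine.
const cpusPerCore = 2

// fullPCPUsOnlyCheck mirrors the Filter logic added above.
func fullPCPUsOnlyCheck(numCPUsNeeded int, bindPolicy string) error {
	if numCPUsNeeded%cpusPerCore != 0 {
		return fmt.Errorf("node(s) requested cpus not multiple of cpus per core")
	}
	if bindPolicy != "FullPCPUs" {
		return fmt.Errorf("node(s) required FullPCPUs policy")
	}
	return nil
}

func main() {
	fmt.Println(fullPCPUsOnlyCheck(5, "FullPCPUs"))     // SMT alignment error: 5 % 2 != 0
	fmt.Println(fullPCPUsOnlyCheck(4, "SpreadByPCPUs")) // bind policy error
	fmt.Println(fullPCPUsOnlyCheck(4, "FullPCPUs"))     // <nil>: passes both checks
}
```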
@@ -234,11 +245,23 @@ func (p *Plugin) Score(ctx context.Context, cycleState *framework.CycleState, po
 		return 0, nil
 	}

-	score := p.calcScore(state.numCPUsNeeded, state.preferredCPUBindPolicy, state.preferredCPUExclusivePolicy, numaInfo)
+	numaAllocateStrategy := p.getNUMAAllocateStrategy(node)
+	score := p.calcScore(numaInfo, state.numCPUsNeeded, state.preferredCPUBindPolicy, state.preferredCPUExclusivePolicy, numaAllocateStrategy)
 	return score, nil
 }

-func (p *Plugin) calcScore(numCPUsNeeded int, cpuBindPolicy schedulingconfig.CPUBindPolicy, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy, numaInfo *nodeNUMAInfo) int64 {
+func (p *Plugin) getNUMAAllocateStrategy(node *corev1.Node) schedulingconfig.NUMAAllocateStrategy {
+	numaAllocateStrategy := schedulingconfig.NUMAMostAllocated
+	if p.pluginArgs.ScoringStrategy != nil && p.pluginArgs.ScoringStrategy.Type == schedulingconfig.LeastAllocated {
+		numaAllocateStrategy = schedulingconfig.NUMALeastAllocated
+	}
+	if val := schedulingconfig.NUMAAllocateStrategy(node.Labels[extension.LabelNodeNUMAAllocateStrategy]); val != "" {
+		numaAllocateStrategy = val
+	}
+	return numaAllocateStrategy
+}
+
+func (p *Plugin) calcScore(numaInfo *nodeNUMAInfo, numCPUsNeeded int, cpuBindPolicy schedulingconfig.CPUBindPolicy, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy, numaAllocateStrategy schedulingconfig.NUMAAllocateStrategy) int64 {
 	availableCPUs, allocated := getAvailableCPUsFunc(numaInfo)
 	acc := newCPUAccumulator(
 		numaInfo.cpuTopology,
@@ -246,7 +269,7 @@ func (p *Plugin) calcScore(numCPUsNeeded int, cpuBindPolicy schedulingconfig.CPU
 		allocated,
 		numCPUsNeeded,
 		cpuExclusivePolicy,
-		p.pluginArgs.NUMAAllocateStrategy,
+		numaAllocateStrategy,
 	)

 	var freeCPUs [][]int
@@ -265,7 +288,7 @@ func (p *Plugin) calcScore(numCPUsNeeded int, cpuBindPolicy schedulingconfig.CPU
 	}

 	scoreFn := mostRequestedScore
-	if p.pluginArgs.ScoringStrategy != nil && p.pluginArgs.ScoringStrategy.Type == schedulingconfig.LeastAllocated {
+	if numaAllocateStrategy == schedulingconfig.NUMALeastAllocated {
 		scoreFn = leastRequestedScore
 	}

@@ -335,6 +358,15 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState,
 		return nil
 	}

+	nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
+	if err != nil {
+		return framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
+	}
+	node := nodeInfo.Node()
+	if node == nil {
+		return framework.NewStatus(framework.Error, "node not found")
+	}
+
 	// The Pod requires the CPU to be allocated according to CPUBindPolicy,
 	// but the current node does not have a NodeResourceTopology or a valid CPUTopology,
 	// so this error should be exposed to the user
@@ -350,6 +382,7 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState,
 	}

 	availableCPUs, allocated := getAvailableCPUsFunc(numaInfo)
+	numaAllocateStrategy := p.getNUMAAllocateStrategy(node)
 	result, err := takeCPUs(
 		numaInfo.cpuTopology,
 		availableCPUs,
@@ -357,7 +390,7 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState,
 		state.numCPUsNeeded,
 		state.preferredCPUBindPolicy,
 		state.resourceSpec.PreferredCPUExclusivePolicy,
-		p.pluginArgs.NUMAAllocateStrategy,
+		numaAllocateStrategy,
 	)
 	if err != nil {
 		return framework.NewStatus(framework.Error, err.Error())
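The strategy resolution added in `getNUMAAllocateStrategy` has a clear precedence: a default derived from the scoring strategy first, then an explicit node label override. A self-contained sketch with the types and label key inlined as stand-ins:

```go
package main

import "fmt"

// Stand-ins mirroring the scheduling config types; the label key value is an
// assumption inlined for self-containment.
type NUMAAllocateStrategy string

const (
	NUMAMostAllocated  NUMAAllocateStrategy = "MostAllocated"
	NUMALeastAllocated NUMAAllocateStrategy = "LeastAllocated"

	labelNodeNUMAAllocateStrategy = "node.koordinator.sh/numa-allocate-strategy"
)

// resolveStrategy mirrors getNUMAAllocateStrategy: default to MostAllocated,
// flip to LeastAllocated when the scoring strategy says so, and let an
// explicit node label override both.
func resolveStrategy(scoringType string, nodeLabels map[string]string) NUMAAllocateStrategy {
	strategy := NUMAMostAllocated
	if scoringType == "LeastAllocated" {
		strategy = NUMALeastAllocated
	}
	if val := NUMAAllocateStrategy(nodeLabels[labelNodeNUMAAllocateStrategy]); val != "" {
		strategy = val
	}
	return strategy
}

func main() {
	fmt.Println(resolveStrategy("MostAllocated", nil))  // MostAllocated
	fmt.Println(resolveStrategy("LeastAllocated", nil)) // LeastAllocated
	fmt.Println(resolveStrategy("MostAllocated", map[string]string{
		labelNodeNUMAAllocateStrategy: string(NUMALeastAllocated),
	})) // label overrides the scoring-derived default
}
```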
diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go
index e2fb3a6cc..069ee42f7 100644
--- a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go
+++ b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go
@@ -416,11 +416,12 @@ func TestPlugin_PreFilter(t *testing.T) {

 func TestPlugin_Filter(t *testing.T) {
 	tests := []struct {
-		name     string
-		state    *preFilterState
-		pod      *corev1.Pod
-		numaInfo *nodeNUMAInfo
-		want     *framework.Status
+		name       string
+		nodeLabels map[string]string
+		state      *preFilterState
+		pod        *corev1.Pod
+		numaInfo   *nodeNUMAInfo
+		want       *framework.Status
 	}{
 		{
 			name: "error with missing preFilterState",
@@ -461,13 +462,44 @@ func TestPlugin_Filter(t *testing.T) {
 			pod:  &corev1.Pod{},
 			want: nil,
 		},
+		{
+			name: "verify FullPCPUsOnly with SMTAlignmentError",
+			nodeLabels: map[string]string{
+				extension.LabelNodeCPUBindPolicy: extension.NodeCPUBindPolicyFullPCPUsOnly,
+			},
+			state: &preFilterState{
+				skip:                   false,
+				resourceSpec:           &extension.ResourceSpec{},
+				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
+				numCPUsNeeded:          5,
+			},
+			numaInfo: newNodeNUMAInfo("test-node-1", buildCPUTopologyForTest(2, 1, 4, 2)),
+			pod:      &corev1.Pod{},
+			want:     framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrSMTAlignmentError),
+		},
+		{
+			name: "verify FullPCPUsOnly with RequiredFullPCPUsPolicy",
+			nodeLabels: map[string]string{
+				extension.LabelNodeCPUBindPolicy: extension.NodeCPUBindPolicyFullPCPUsOnly,
+			},
+			state: &preFilterState{
+				skip:                   false,
+				resourceSpec:           &extension.ResourceSpec{},
+				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
+				numCPUsNeeded:          4,
+			},
+			numaInfo: newNodeNUMAInfo("test-node-1", buildCPUTopologyForTest(2, 1, 4, 2)),
+			pod:      &corev1.Pod{},
+			want:     framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrRequiredFullPCPUsPolicy),
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			nodes := []*corev1.Node{
 				{
 					ObjectMeta: metav1.ObjectMeta{
-						Name: "test-node-1",
+						Name:   "test-node-1",
+						Labels: map[string]string{},
 					},
 					Status: corev1.NodeStatus{
 						Allocatable: corev1.ResourceList{
@@ -477,6 +509,10 @@ func TestPlugin_Filter(t *testing.T) {
 				},
 			}
+			for k, v := range tt.nodeLabels {
+				nodes[0].Labels[k] = v
+			}
+
 			suit := newPluginTestSuit(t, nodes)
 			p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
 			assert.NotNil(t, p)
@@ -507,12 +543,13 @@ func TestPlugin_Filter(t *testing.T) {

 func TestPlugin_Score(t *testing.T) {
 	tests := []struct {
-		name      string
-		state     *preFilterState
-		pod       *corev1.Pod
-		numaInfo  *nodeNUMAInfo
-		want      *framework.Status
-		wantScore int64
+		name       string
+		nodeLabels map[string]string
+		state      *preFilterState
+		pod        *corev1.Pod
+		numaInfo   *nodeNUMAInfo
+		want       *framework.Status
+		wantScore  int64
 	}{
 		{
 			name: "error with missing preFilterState",
@@ -637,6 +674,24 @@ func TestPlugin_Score(t *testing.T) {
 			want:      nil,
 			wantScore: 100,
 		},
+		{
+			name: "score with Node NUMA Allocate Strategy",
+			nodeLabels: map[string]string{
+				extension.LabelNodeNUMAAllocateStrategy: extension.NodeNUMAAllocateStrategyLeastAllocated,
+			},
+			state: &preFilterState{
+				skip: false,
+				resourceSpec: &extension.ResourceSpec{
+					PreferredCPUBindPolicy: extension.CPUBindPolicySpreadByPCPUs,
+				},
+				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
+				numCPUsNeeded:          2,
+			},
+			numaInfo:  newNodeNUMAInfo("test-node-1", buildCPUTopologyForTest(2, 1, 4, 2)),
+			pod:       &corev1.Pod{},
+			want:      nil,
+			wantScore: 50,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -644,7 +699,8 @@ func TestPlugin_Score(t *testing.T) {
 			nodes := []*corev1.Node{
 				{
 					ObjectMeta: metav1.ObjectMeta{
-						Name: "test-node-1",
+						Name:   "test-node-1",
+						Labels: map[string]string{},
 					},
 					Status: corev1.NodeStatus{
 						Allocatable: corev1.ResourceList{
@@ -654,6 +710,10 @@ func TestPlugin_Score(t *testing.T) {
 				},
 			}
+			for k, v := range tt.nodeLabels {
+				nodes[0].Labels[k] = v
+			}
+
 			suit := newPluginTestSuit(t, nodes)
 			p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
 			assert.NotNil(t, p)
@@ -689,12 +749,14 @@ func TestPlugin_Score(t *testing.T) {

 func TestPlugin_Reserve(t *testing.T) {
 	tests := []struct {
-		name       string
-		state      *preFilterState
-		pod        *corev1.Pod
-		numaInfo   *nodeNUMAInfo
-		want       *framework.Status
-		wantCPUSet CPUSet
+		name          string
+		nodeLabels    map[string]string
+		state         *preFilterState
+		pod           *corev1.Pod
+		numaInfo      *nodeNUMAInfo
+		allocatedCPUs []int
+		want          *framework.Status
+		wantCPUSet    CPUSet
 	}{
 		{
 			name: "error with missing preFilterState",
@@ -754,13 +816,52 @@ func TestPlugin_Reserve(t *testing.T) {
 			pod:  &corev1.Pod{},
 			want: framework.NewStatus(framework.Error, "not enough cpus available to satisfy request"),
 		},
+		{
+			name: "succeed with valid cpu topology and node numa least allocate strategy",
+			nodeLabels: map[string]string{
+				extension.LabelNodeNUMAAllocateStrategy: extension.NodeNUMAAllocateStrategyLeastAllocated,
+			},
+			state: &preFilterState{
+				skip:          false,
+				numCPUsNeeded: 4,
+				resourceSpec: &extension.ResourceSpec{
+					PreferredCPUBindPolicy: extension.CPUBindPolicyFullPCPUs,
+				},
+				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
+			},
+			numaInfo:      newNodeNUMAInfo("test-node-1", buildCPUTopologyForTest(2, 1, 8, 2)),
+			allocatedCPUs: []int{0, 1, 2, 3},
+			pod:           &corev1.Pod{},
+			want:          nil,
+			wantCPUSet:    NewCPUSet(16, 17, 18, 19),
+		},
+		{
+			name: "succeed with valid cpu topology and node numa most allocate strategy",
+			nodeLabels: map[string]string{
+				extension.LabelNodeNUMAAllocateStrategy: extension.NodeNUMAAllocateStrategyMostAllocated,
+			},
+			state: &preFilterState{
+				skip:          false,
+				numCPUsNeeded: 4,
+				resourceSpec: &extension.ResourceSpec{
+					PreferredCPUBindPolicy: extension.CPUBindPolicyFullPCPUs,
+				},
+				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
+			},
+			numaInfo:      newNodeNUMAInfo("test-node-1", buildCPUTopologyForTest(2, 1, 8, 2)),
+			allocatedCPUs: []int{0, 1, 2, 3},
+			pod:           &corev1.Pod{},
+			want:          nil,
+			wantCPUSet:    NewCPUSet(4, 5, 6, 7),
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			nodes := []*corev1.Node{
 				{
 					ObjectMeta: metav1.ObjectMeta{
-						Name: "test-node-1",
+						Name:   "test-node-1",
+						Labels: map[string]string{},
 					},
 					Status: corev1.NodeStatus{
 						Allocatable: corev1.ResourceList{
@@ -770,6 +871,10 @@ func TestPlugin_Reserve(t *testing.T) {
 				},
 			}
+			for k, v := range tt.nodeLabels {
+				nodes[0].Labels[k] = v
+			}
+
 			suit := newPluginTestSuit(t, nodes)
 			p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
 			assert.NotNil(t, p)
@@ -777,6 +882,9 @@ func TestPlugin_Reserve(t *testing.T) {
 			assert.Nil(t, err)

 			plg := p.(*Plugin)
 			if tt.numaInfo != nil {
+				if len(tt.allocatedCPUs) > 0 {
+					tt.numaInfo.allocateCPUs(uuid.NewUUID(), NewCPUSet(tt.allocatedCPUs...), schedulingconfig.CPUExclusivePolicyNone)
+				}
 				plg.nodeInfoCache.nodes[tt.numaInfo.nodeName] = tt.numaInfo
 			}
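As an annotation on the two new Reserve cases: with CPUs 0-3 already allocated on NUMA node 0, `LeastAllocated` steers the allocation to the untouched NUMA node (CPUs 16-19) while `MostAllocated` packs onto the partially used one (CPUs 4-7). A toy model of that choice; the 16-logical-CPUs-per-NUMA-node layout is an assumption about `buildCPUTopologyForTest(2, 1, 8, 2)`:

```go
package main

import "fmt"

// Toy model of a NUMA node's occupancy for the two Reserve cases above.
type numaNode struct {
	id        int
	allocated int
	capacity  int
}

// pickNUMANode mirrors the strategy's intent: LeastAllocated spreads onto the
// emptiest NUMA node; MostAllocated bin-packs onto the fullest one that fits.
func pickNUMANode(nodes []numaNode, leastAllocated bool) numaNode {
	best := nodes[0]
	for _, n := range nodes[1:] {
		if leastAllocated && n.allocated < best.allocated {
			best = n // spread: prefer the emptiest NUMA node
		}
		if !leastAllocated && n.allocated > best.allocated {
			best = n // bin-pack: prefer the already-used NUMA node
		}
	}
	return best
}

func main() {
	nodes := []numaNode{
		{id: 0, allocated: 4, capacity: 16}, // CPUs 0-3 taken
		{id: 1, allocated: 0, capacity: 16}, // fully free
	}
	fmt.Println(pickNUMANode(nodes, true).id)  // 1 -> free cores start at CPU 16 (want 16-19)
	fmt.Println(pickNUMANode(nodes, false).id) // 0 -> next free cores are CPUs 4-7 (want 4-7)
}
```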