Skip to content

Commit

Permalink
Merge pull request kubernetes#111278 from arpitsardhana/master
Browse files Browse the repository at this point in the history
KEP-3327: Add CPUManager policy option to align CPUs by Socket instead of by NUMA node
  • Loading branch information
k8s-ci-robot committed Aug 2, 2022
2 parents 448e48b + d92fd83 commit 9fb1f67
Show file tree
Hide file tree
Showing 9 changed files with 425 additions and 8 deletions.
27 changes: 27 additions & 0 deletions pkg/kubelet/cm/cpumanager/policy_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,20 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)

const (
FullPCPUsOnlyOption string = "full-pcpus-only"
DistributeCPUsAcrossNUMAOption string = "distribute-cpus-across-numa"
AlignBySocketOption string = "align-by-socket"
)

var (
alphaOptions = sets.NewString(
DistributeCPUsAcrossNUMAOption,
AlignBySocketOption,
)
betaOptions = sets.NewString(
FullPCPUsOnlyOption,
Expand Down Expand Up @@ -69,6 +73,9 @@ type StaticPolicyOptions struct {
// Flag to evenly distribute CPUs across NUMA nodes in cases where more
// than one NUMA node is required to satisfy the allocation.
DistributeCPUsAcrossNUMA bool
// Flag to ensure CPUs are considered aligned at socket boundary rather than
// NUMA boundary
AlignBySocket bool
}

func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOptions, error) {
Expand All @@ -91,6 +98,12 @@ func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOption
return opts, fmt.Errorf("bad value for option %q: %w", name, err)
}
opts.DistributeCPUsAcrossNUMA = optValue
case AlignBySocketOption:
optValue, err := strconv.ParseBool(value)
if err != nil {
return opts, fmt.Errorf("bad value for option %q: %w", name, err)
}
opts.AlignBySocket = optValue
default:
// this should never be reached, we already detect unknown options,
// but we keep it as further safety.
Expand All @@ -99,3 +112,17 @@ func NewStaticPolicyOptions(policyOptions map[string]string) (StaticPolicyOption
}
return opts, nil
}

func ValidateStaticPolicyOptions(opts StaticPolicyOptions, topology *topology.CPUTopology, topologyManager topologymanager.Store) error {
if opts.AlignBySocket {
// Not compatible with topology manager single-numa-node policy option.
if topologyManager.GetPolicy().Name() == topologymanager.PolicySingleNumaNode {
return fmt.Errorf("Topolgy manager %s policy is incompatible with CPUManager %s policy option", topologymanager.PolicySingleNumaNode, AlignBySocketOption)
}
// Not compatible with topology when number of sockets are more than number of NUMA nodes.
if topology.NumSockets > topology.NumNUMANodes {
return fmt.Errorf("Align by socket is not compatible with hardware where number of sockets are more than number of NUMA")
}
}
return nil
}
87 changes: 86 additions & 1 deletion pkg/kubelet/cm/cpumanager/policy_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
pkgfeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)

type optionAvailTest struct {
Expand Down Expand Up @@ -54,7 +56,7 @@ func TestPolicyDefaultsAvailable(t *testing.T) {
}
}

func TestPolicyBetaOptionsAvailable(t *testing.T) {
func TestPolicyOptionsAvailable(t *testing.T) {
testCases := []optionAvailTest{
{
option: "this-option-does-not-exist",
Expand All @@ -80,6 +82,18 @@ func TestPolicyBetaOptionsAvailable(t *testing.T) {
featureGateEnable: false,
expectedAvailable: false,
},
{
option: AlignBySocketOption,
featureGate: pkgfeatures.CPUManagerPolicyAlphaOptions,
featureGateEnable: true,
expectedAvailable: true,
},
{
option: AlignBySocketOption,
featureGate: pkgfeatures.CPUManagerPolicyBetaOptions,
featureGateEnable: true,
expectedAvailable: false,
},
}
for _, testCase := range testCases {
t.Run(testCase.option, func(t *testing.T) {
Expand All @@ -92,3 +106,74 @@ func TestPolicyBetaOptionsAvailable(t *testing.T) {
})
}
}

func TestValidateStaticPolicyOptions(t *testing.T) {
testCases := []struct {
description string
policyOption map[string]string
topology *topology.CPUTopology
topoMgrPolicy string
expectedErr bool
}{
{
description: "Align by socket not enabled",
policyOption: map[string]string{FullPCPUsOnlyOption: "true"},
topology: topoDualSocketMultiNumaPerSocketHT,
topoMgrPolicy: topologymanager.PolicySingleNumaNode,
expectedErr: false,
},
{
description: "Align by socket enabled with topology manager single numa node",
policyOption: map[string]string{AlignBySocketOption: "true"},
topology: topoDualSocketMultiNumaPerSocketHT,
topoMgrPolicy: topologymanager.PolicySingleNumaNode,
expectedErr: true,
},
{
description: "Align by socket enabled with num_sockets > num_numa",
policyOption: map[string]string{AlignBySocketOption: "true"},
topology: fakeTopoMultiSocketDualSocketPerNumaHT,
topoMgrPolicy: topologymanager.PolicyNone,
expectedErr: true,
},
{
description: "Align by socket enabled: with topology manager None policy",
policyOption: map[string]string{AlignBySocketOption: "true"},
topology: topoDualSocketMultiNumaPerSocketHT,
topoMgrPolicy: topologymanager.PolicyNone,
expectedErr: false,
},
{
description: "Align by socket enabled: with topology manager best-effort policy",
policyOption: map[string]string{AlignBySocketOption: "true"},
topology: topoDualSocketMultiNumaPerSocketHT,
topoMgrPolicy: topologymanager.PolicyBestEffort,
expectedErr: false,
},
{
description: "Align by socket enabled: with topology manager restricted policy",
policyOption: map[string]string{AlignBySocketOption: "true"},
topology: topoDualSocketMultiNumaPerSocketHT,
topoMgrPolicy: topologymanager.PolicyRestricted,
expectedErr: false,
},
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
topoMgrPolicy := topologymanager.NewNonePolicy()
if testCase.topoMgrPolicy == topologymanager.PolicySingleNumaNode {
topoMgrPolicy = topologymanager.NewSingleNumaNodePolicy(nil)

}
topoMgrStore := topologymanager.NewFakeManagerWithPolicy(topoMgrPolicy)

defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true)()
policyOpt, _ := NewStaticPolicyOptions(testCase.policyOption)
err := ValidateStaticPolicyOptions(policyOpt, testCase.topology, topoMgrStore)
gotError := (err != nil)
if gotError != testCase.expectedErr {
t.Errorf("testCase %q failed, got %v expected %v", testCase.description, gotError, testCase.expectedErr)
}
})
}
}
49 changes: 45 additions & 4 deletions pkg/kubelet/cm/cpumanager/policy_static.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
if err != nil {
return nil, err
}
err = ValidateStaticPolicyOptions(opts, topology, affinity)
if err != nil {
return nil, err
}

klog.InfoS("Static policy created with configuration", "options", opts)

Expand Down Expand Up @@ -325,10 +329,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
// If there are aligned CPUs in numaAffinity, attempt to take those first.
result := cpuset.NewCPUSet()
if numaAffinity != nil {
alignedCPUs := cpuset.NewCPUSet()
for _, numaNodeID := range numaAffinity.GetBits() {
alignedCPUs = alignedCPUs.Union(allocatableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
}
alignedCPUs := p.getAlignedCPUs(numaAffinity, allocatableCPUs)

numAlignedToAlloc := alignedCPUs.Size()
if numCPUs < numAlignedToAlloc {
Expand Down Expand Up @@ -571,10 +572,50 @@ func (p *staticPolicy) generateCPUTopologyHints(availableCPUs cpuset.CPUSet, reu
// to the minAffinitySize. Only those with an equal number of bits set (and
// with a minimal set of numa nodes) will be considered preferred.
for i := range hints {
if p.options.AlignBySocket && p.isHintSocketAligned(hints[i], minAffinitySize) {
hints[i].Preferred = true
continue
}
if hints[i].NUMANodeAffinity.Count() == minAffinitySize {
hints[i].Preferred = true
}
}

return hints
}

// isHintSocketAligned function return true if numa nodes in hint are socket aligned.
func (p *staticPolicy) isHintSocketAligned(hint topologymanager.TopologyHint, minAffinitySize int) bool {
numaNodesBitMask := hint.NUMANodeAffinity.GetBits()
numaNodesPerSocket := p.topology.NumNUMANodes / p.topology.NumSockets
if numaNodesPerSocket == 0 {
return false
}
// minSockets refers to minimum number of socket required to satify allocation.
// A hint is considered socket aligned if sockets across which numa nodes span is equal to minSockets
minSockets := (minAffinitySize + numaNodesPerSocket - 1) / numaNodesPerSocket
return p.topology.CPUDetails.SocketsInNUMANodes(numaNodesBitMask...).Size() == minSockets
}

// getAlignedCPUs return set of aligned CPUs based on numa affinity mask and configured policy options.
func (p *staticPolicy) getAlignedCPUs(numaAffinity bitmask.BitMask, allocatableCPUs cpuset.CPUSet) cpuset.CPUSet {
alignedCPUs := cpuset.NewCPUSet()
numaBits := numaAffinity.GetBits()

// If align-by-socket policy option is enabled, NUMA based hint is expanded to
// socket aligned hint. It will ensure that first socket aligned available CPUs are
// allocated before we try to find CPUs across socket to satisfy allocation request.
if p.options.AlignBySocket {
socketBits := p.topology.CPUDetails.SocketsInNUMANodes(numaBits...).ToSliceNoSort()
for _, socketID := range socketBits {
alignedCPUs = alignedCPUs.Union(allocatableCPUs.Intersection(p.topology.CPUDetails.CPUsInSockets(socketID)))
}
return alignedCPUs
}

for _, numaNodeID := range numaBits {
alignedCPUs = alignedCPUs.Union(allocatableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
}

return alignedCPUs
}
55 changes: 54 additions & 1 deletion pkg/kubelet/cm/cpumanager/policy_static_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"testing"

v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
pkgfeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
Expand All @@ -39,6 +42,7 @@ type staticPolicyTest struct {
stAssignments state.ContainerCPUAssignments
stDefaultCPUSet cpuset.CPUSet
pod *v1.Pod
topologyHint *topologymanager.TopologyHint
expErr error
expCPUAlloc bool
expCSet cpuset.CPUSet
Expand Down Expand Up @@ -190,6 +194,7 @@ func TestStaticPolicyAdd(t *testing.T) {

// these are the cases which must behave the same regardless the policy options.
// So we will permutate the options to ensure this holds true.

optionsInsensitiveTestCases := []staticPolicyTest{
{
description: "GuPodSingleCore, SingleSocketHT, ExpectError",
Expand Down Expand Up @@ -493,6 +498,42 @@ func TestStaticPolicyAdd(t *testing.T) {
expCSet: cpuset.NewCPUSet(),
},
}
newNUMAAffinity := func(bits ...int) bitmask.BitMask {
affinity, _ := bitmask.NewBitMask(bits...)
return affinity
}
alignBySocketOptionTestCases := []staticPolicyTest{
{
description: "Align by socket: true, cpu's within same socket of numa in hint are part of allocation",
topo: topoDualSocketMultiNumaPerSocketHT,
options: map[string]string{
AlignBySocketOption: "true",
},
numReservedCPUs: 1,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(2, 11, 21, 22),
pod: makePod("fakePod", "fakeContainer2", "2000m", "2000m"),
topologyHint: &topologymanager.TopologyHint{NUMANodeAffinity: newNUMAAffinity(0, 2), Preferred: true},
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(2, 11),
},
{
description: "Align by socket: false, cpu's are taken strictly from NUMA nodes in hint",
topo: topoDualSocketMultiNumaPerSocketHT,
options: map[string]string{
AlignBySocketOption: "false",
},
numReservedCPUs: 1,
stAssignments: state.ContainerCPUAssignments{},
stDefaultCPUSet: cpuset.NewCPUSet(2, 11, 21, 22),
pod: makePod("fakePod", "fakeContainer2", "2000m", "2000m"),
topologyHint: &topologymanager.TopologyHint{NUMANodeAffinity: newNUMAAffinity(0, 2), Preferred: true},
expErr: nil,
expCPUAlloc: true,
expCSet: cpuset.NewCPUSet(2, 21),
},
}

for _, testCase := range optionsInsensitiveTestCases {
for _, options := range []map[string]string{
Expand All @@ -514,10 +555,17 @@ func TestStaticPolicyAdd(t *testing.T) {
for _, testCase := range smtalignOptionTestCases {
runStaticPolicyTestCase(t, testCase)
}
for _, testCase := range alignBySocketOptionTestCases {
runStaticPolicyTestCaseWithFeatureGate(t, testCase)
}
}

func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) {
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager(), testCase.options)
tm := topologymanager.NewFakeManager()
if testCase.topologyHint != nil {
tm = topologymanager.NewFakeManagerWithHint(testCase.topologyHint)
}
policy, _ := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), tm, testCase.options)

st := &mockState{
assignments: testCase.stAssignments,
Expand Down Expand Up @@ -558,6 +606,11 @@ func runStaticPolicyTestCase(t *testing.T, testCase staticPolicyTest) {
}
}

func runStaticPolicyTestCaseWithFeatureGate(t *testing.T, testCase staticPolicyTest) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true)()
runStaticPolicyTestCase(t, testCase)
}

func TestStaticPolicyReuseCPUs(t *testing.T) {
testCases := []struct {
staticPolicyTest
Expand Down

0 comments on commit 9fb1f67

Please sign in to comment.