koord-scheduler: support CPU exclusive policy #359

Merged 1 commit on Jul 12, 2022
13 changes: 13 additions & 0 deletions apis/extension/resource.go
@@ -49,6 +49,8 @@ var (
type ResourceSpec struct {
// PreferredCPUBindPolicy represents best-effort CPU bind policy.
PreferredCPUBindPolicy CPUBindPolicy `json:"preferredCPUBindPolicy,omitempty"`
// PreferredCPUExclusivePolicy represents best-effort CPU exclusive policy.
PreferredCPUExclusivePolicy CPUExclusivePolicy `json:"preferredCPUExclusivePolicy,omitempty"`
}

// ResourceStatus describes resource allocation result, such as how to bind CPU.
@@ -74,6 +76,17 @@ const (
CPUBindPolicyConstrainedBurst CPUBindPolicy = schedulingconfig.CPUBindPolicyConstrainedBurst
)

type CPUExclusivePolicy = schedulingconfig.CPUExclusivePolicy

const (
// CPUExclusivePolicyNone does not perform any exclusive policy
CPUExclusivePolicyNone CPUExclusivePolicy = schedulingconfig.CPUExclusivePolicyNone
// CPUExclusivePolicyPCPULevel represents mutual exclusion in the physical core dimension
CPUExclusivePolicyPCPULevel CPUExclusivePolicy = schedulingconfig.CPUExclusivePolicyPCPULevel
// CPUExclusivePolicyNUMANodeLevel indicates mutual exclusion in the NUMA topology dimension
CPUExclusivePolicyNUMANodeLevel CPUExclusivePolicy = schedulingconfig.CPUExclusivePolicyNUMANodeLevel
)

type NUMACPUSharedPools []CPUSharedPool

type CPUSharedPool struct {
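Usage note (illustration, not part of this diff): a pod opts in by embedding a JSON-encoded `ResourceSpec` in its resource-spec annotation, typically alongside a `PreferredCPUBindPolicy`. The sketch below is a minimal example of setting the new field; the annotation key `scheduling.koordinator.sh/resource-spec` is an assumption here, since the actual constant lives elsewhere in the extension package and is not shown in this excerpt.

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/koordinator-sh/koordinator/apis/extension"
)

// Assumed annotation key; the real constant is defined in apis/extension
// and may differ from this literal.
const annotationResourceSpec = "scheduling.koordinator.sh/resource-spec"

func main() {
	spec := extension.ResourceSpec{
		// New in this PR: ask for physical-core-level exclusion so that no
		// other pod with the same policy shares this pod's physical cores.
		PreferredCPUExclusivePolicy: extension.CPUExclusivePolicyPCPULevel,
	}
	data, err := json.Marshal(&spec)
	if err != nil {
		panic(err)
	}
	// The encoded spec would be placed under the pod's metadata.annotations.
	fmt.Printf("%s: %s\n", annotationResourceSpec, data)
}
```

As the field comments state, the policy is a best-effort preference rather than a hard requirement.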
11 changes: 11 additions & 0 deletions apis/scheduling/config/types.go
@@ -92,6 +92,17 @@ const (
CPUBindPolicyConstrainedBurst CPUBindPolicy = "ConstrainedBurst"
)

type CPUExclusivePolicy string

const (
// CPUExclusivePolicyNone does not perform any exclusive policy
CPUExclusivePolicyNone CPUExclusivePolicy = "None"
// CPUExclusivePolicyPCPULevel represents mutual exclusion in the physical core dimension
CPUExclusivePolicyPCPULevel CPUExclusivePolicy = "PCPULevel"
// CPUExclusivePolicyNUMANodeLevel indicates mutual exclusion in the NUMA topology dimension
CPUExclusivePolicyNUMANodeLevel CPUExclusivePolicy = "NUMANodeLevel"
)

// NUMAAllocateStrategy indicates how to choose satisfied NUMA Nodes
type NUMAAllocateStrategy string

13 changes: 12 additions & 1 deletion apis/scheduling/config/v1beta2/types.go
@@ -71,7 +71,7 @@ type ScoringStrategy struct {

// NodeNUMAResourceArgs holds arguments used to configure the NodeNUMAResource plugin.
type NodeNUMAResourceArgs struct {
metav1.TypeMeta `json:",inline"`
metav1.TypeMeta

DefaultCPUBindPolicy CPUBindPolicy `json:"defaultCPUBindPolicy,omitempty"`
NUMAAllocateStrategy NUMAAllocateStrategy `json:"numaAllocateStrategy,omitempty"`
@@ -92,6 +92,17 @@ const (
CPUBindPolicyConstrainedBurst CPUBindPolicy = "ConstrainedBurst"
)

type CPUExclusivePolicy string

const (
// CPUExclusivePolicyNone does not perform any exclusive policy
CPUExclusivePolicyNone CPUExclusivePolicy = "None"
// CPUExclusivePolicyPCPULevel represents mutual exclusion in the physical core dimension
CPUExclusivePolicyPCPULevel CPUExclusivePolicy = "PCPULevel"
// CPUExclusivePolicyNUMANodeLevel indicates mutual exclusion in the NUMA topology dimension
CPUExclusivePolicyNUMANodeLevel CPUExclusivePolicy = "NUMANodeLevel"
)

// NUMAAllocateStrategy indicates how to choose satisfied NUMA Nodes
type NUMAAllocateStrategy string

113 changes: 71 additions & 42 deletions pkg/scheduler/plugins/nodenumaresource/cpu_allocator.go
@@ -20,7 +20,8 @@ import (
"fmt"
"sort"

"github.com/koordinator-sh/koordinator/apis/extension"
"k8s.io/apimachinery/pkg/util/sets"

schedulingconfig "github.com/koordinator-sh/koordinator/apis/scheduling/config"
)

@@ -41,11 +42,11 @@ func takeCPUs(
availableCPUs CPUSet,
allocatedCPUs CPUDetails,
numCPUsNeeded int,
cpuBindPolicy extension.CPUBindPolicy,
exclusive bool,
cpuBindPolicy schedulingconfig.CPUBindPolicy,
cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy,
numaAllocatedStrategy schedulingconfig.NUMAAllocateStrategy,
) (CPUSet, error) {
acc := newCPUAccumulator(topology, availableCPUs, allocatedCPUs, numCPUsNeeded, exclusive, numaAllocatedStrategy)
acc := newCPUAccumulator(topology, availableCPUs, allocatedCPUs, numCPUsNeeded, cpuExclusivePolicy, numaAllocatedStrategy)
if acc.isSatisfied() {
return acc.result, nil
}
@@ -58,12 +59,15 @@
// According to the NUMA allocation strategy,
// select the NUMA Node with the most remaining amount or the least amount remaining
// and the total amount of available CPUs in the NUMA Node is greater than or equal to the number of CPUs needed
filterExclusiveArgs := []bool{true, false}
if acc.numCPUsNeeded <= acc.topology.CPUsPerNode() {
freeCPUs := acc.freeCoresInNode(true)
for _, cpus := range freeCPUs {
if len(cpus) >= acc.numCPUsNeeded {
acc.take(cpus[:acc.numCPUsNeeded]...)
return acc.result, nil
for _, filterExclusive := range filterExclusiveArgs {
freeCPUs := acc.freeCoresInNode(true, filterExclusive)
for _, cpus := range freeCPUs {
if len(cpus) >= acc.numCPUsNeeded {
acc.take(cpus[:acc.numCPUsNeeded]...)
return acc.result, nil
}
}
}
}
@@ -181,10 +185,12 @@ func takeCPUs(

type cpuAccumulator struct {
topology *CPUTopology
details CPUDetails
allocatableCPUs CPUDetails
numCPUsNeeded int
exclusive bool
exclusiveInCores map[int]bool
exclusiveInCores sets.Int
exclusiveInNUMANodes sets.Int
exclusivePolicy schedulingconfig.CPUExclusivePolicy
numaAllocateStrategy schedulingconfig.NUMAAllocateStrategy
result CPUSet
}
@@ -194,20 +200,30 @@ func newCPUAccumulator(
availableCPUs CPUSet,
allocatedCPUs CPUDetails,
numCPUsNeeded int,
exclusive bool,
exclusivePolicy schedulingconfig.CPUExclusivePolicy,
numaSortStrategy schedulingconfig.NUMAAllocateStrategy,
) *cpuAccumulator {
exclusiveInCores := make(map[int]bool)
exclusiveInCores := sets.NewInt()
exclusiveInNUMANodes := sets.NewInt()
for _, v := range allocatedCPUs {
exclusiveInCores[v.CoreID] = v.Exclusive
if v.ExclusivePolicy == schedulingconfig.CPUExclusivePolicyPCPULevel {
exclusiveInCores.Insert(v.CoreID)
} else if v.ExclusivePolicy == schedulingconfig.CPUExclusivePolicyNUMANodeLevel {
exclusiveInNUMANodes.Insert(v.NodeID)
}
}
details := topology.CPUDetails.KeepOnly(availableCPUs)
allocatableCPUs := topology.CPUDetails.KeepOnly(availableCPUs)

exclusive := exclusivePolicy == schedulingconfig.CPUExclusivePolicyPCPULevel ||
exclusivePolicy == schedulingconfig.CPUExclusivePolicyNUMANodeLevel

return &cpuAccumulator{
topology: topology,
details: details,
allocatableCPUs: allocatableCPUs,
exclusiveInCores: exclusiveInCores,
exclusiveInNUMANodes: exclusiveInNUMANodes,
exclusive: exclusive,
exclusivePolicy: exclusivePolicy,
numCPUsNeeded: numCPUsNeeded,
numaAllocateStrategy: numaSortStrategy,
result: NewCPUSet(),
@@ -217,10 +233,14 @@ func (a *cpuAccumulator) take(cpus ...int) {
func (a *cpuAccumulator) take(cpus ...int) {
a.result = a.result.UnionSlice(cpus...)
for _, cpu := range cpus {
delete(a.details, cpu)
delete(a.allocatableCPUs, cpu)
if a.exclusive {
cpuInfo := a.topology.CPUDetails[cpu]
a.exclusiveInCores[cpuInfo.CoreID] = true
if a.exclusivePolicy == schedulingconfig.CPUExclusivePolicyPCPULevel {
a.exclusiveInCores.Insert(cpuInfo.CoreID)
} else if a.exclusivePolicy == schedulingconfig.CPUExclusivePolicyNUMANodeLevel {
a.exclusiveInNUMANodes.Insert(cpuInfo.NodeID)
}
}
}
a.numCPUsNeeded -= len(cpus)
Expand All @@ -235,15 +255,21 @@ func (a *cpuAccumulator) isSatisfied() bool {
}

func (a *cpuAccumulator) isFailed() bool {
return a.numCPUsNeeded > len(a.details)
return a.numCPUsNeeded > len(a.allocatableCPUs)
}

func (a *cpuAccumulator) isCPUExclusivePCPULevel(cpuInfo *CPUInfo) bool {
if a.exclusivePolicy != schedulingconfig.CPUExclusivePolicyPCPULevel {
return false
}
return a.exclusiveInCores.Has(cpuInfo.CoreID)
}

func (a *cpuAccumulator) isCPUExclusive(cpuID int) bool {
if !a.exclusive {
func (a *cpuAccumulator) isCPUExclusiveNUMANodeLevel(cpuInfo *CPUInfo) bool {
if a.exclusivePolicy != schedulingconfig.CPUExclusivePolicyNUMANodeLevel {
return false
}
cpuInfo := a.topology.CPUDetails[cpuID]
return a.exclusiveInCores[cpuInfo.CoreID]
return a.exclusiveInNUMANodes.Has(cpuInfo.NodeID)
}

func (a *cpuAccumulator) extractCPU(cpus []int) []int {
@@ -278,12 +304,15 @@ func (a *cpuAccumulator) sortCores(cores []int, cpusInCores map[int][]int) {
}

// freeCoresInNode returns the logical cpus of the free cores in nodes that sorted
func (a *cpuAccumulator) freeCoresInNode(filterFullFreeCore bool) [][]int {
details := a.details
func (a *cpuAccumulator) freeCoresInNode(filterFullFreeCore bool, filterExclusive bool) [][]int {
allocatableCPUs := a.allocatableCPUs

socketFreeScores := make(map[int]int)
cpusInCores := make(map[int][]int)
for _, cpuInfo := range details {
for _, cpuInfo := range allocatableCPUs {
if filterExclusive && a.isCPUExclusiveNUMANodeLevel(&cpuInfo) {
continue
}
cpus := cpusInCores[cpuInfo.CoreID]
if len(cpus) == 0 {
cpus = make([]int, 0, a.topology.CPUsPerCore())
@@ -299,7 +328,7 @@ func (a *cpuAccumulator) freeCoresInNode(filterFullFreeCore bool) [][]int {
if filterFullFreeCore && len(cpus) != a.topology.CPUsPerCore() {
continue
}
info := details[cpus[0]]
info := allocatableCPUs[cpus[0]]
cores := coresInNodes[info.NodeID]
if len(cores) == 0 {
cores = make([]int, 0, a.topology.CPUsPerNode()/a.topology.CPUsPerCore())
@@ -328,8 +357,8 @@ func (a *cpuAccumulator) freeCoresInNode(filterFullFreeCore bool) [][]int {
jCPUs := cpusInNodes[nodeIDs[j]]

// each cpu's socketId and nodeId in same node are same
iCPUInfo := details[iCPUs[0]]
jCPUInfo := details[jCPUs[0]]
iCPUInfo := allocatableCPUs[iCPUs[0]]
jCPUInfo := allocatableCPUs[jCPUs[0]]

iSocket := iCPUInfo.SocketID
jSocket := jCPUInfo.SocketID
@@ -369,10 +398,10 @@

// freeCoresInSocket returns the logical cpus of the free cores in sockets that sorted
func (a *cpuAccumulator) freeCoresInSocket(filterFullFreeCore bool) [][]int {
details := a.details
allocatableCPUs := a.allocatableCPUs

cpusInCores := make(map[int][]int)
for _, cpuInfo := range details {
for _, cpuInfo := range allocatableCPUs {
cpus := cpusInCores[cpuInfo.CoreID]
if len(cpus) == 0 {
cpus = make([]int, 0, a.topology.CPUsPerCore())
@@ -387,7 +416,7 @@ func (a *cpuAccumulator) freeCoresInSocket(filterFullFreeCore bool) [][]int {
if filterFullFreeCore && len(cpus) != a.topology.CPUsPerCore() {
continue
}
info := details[cpus[0]]
info := allocatableCPUs[cpus[0]]
cores := coresInSockets[info.SocketID]
if len(cores) == 0 {
cores = make([]int, 0, a.topology.CPUsPerSocket()/a.topology.CPUsPerCore())
@@ -438,8 +467,8 @@ func (a *cpuAccumulator) freeCPUsInNode(filterExclusive bool) [][]int {
cpusInNodes := make(map[int][]int)
nodeFreeScores := make(map[int]int)
socketFreeScores := make(map[int]int)
for _, cpuInfo := range a.details {
if filterExclusive && a.isCPUExclusive(cpuInfo.CPUID) {
for _, cpuInfo := range a.allocatableCPUs {
if filterExclusive && (a.isCPUExclusivePCPULevel(&cpuInfo) || a.isCPUExclusiveNUMANodeLevel(&cpuInfo)) {
continue
}
cpus := cpusInNodes[cpuInfo.NodeID]
@@ -466,8 +495,8 @@
iCPUs := cpusInNodes[nodeIDs[i]]
jCPUs := cpusInNodes[nodeIDs[j]]

iCPUInfo := a.details[iCPUs[0]]
jCPUInfo := a.details[jCPUs[0]]
iCPUInfo := a.allocatableCPUs[iCPUs[0]]
jCPUInfo := a.allocatableCPUs[jCPUs[0]]

iNode := iCPUInfo.NodeID
jNode := jCPUInfo.NodeID
@@ -510,10 +539,10 @@

// freeCPUsInSocket returns free logical cpus in sockets that sorted in ascending order.
func (a *cpuAccumulator) freeCPUsInSocket(filterExclusive bool) [][]int {
details := a.details
allocatableCPUs := a.allocatableCPUs
cpusInSockets := make(map[int][]int)
for _, cpuInfo := range details {
if filterExclusive && a.isCPUExclusive(cpuInfo.CPUID) {
for _, cpuInfo := range allocatableCPUs {
if filterExclusive && a.isCPUExclusivePCPULevel(&cpuInfo) {
continue
}
cpus := cpusInSockets[cpuInfo.SocketID]
@@ -564,14 +593,14 @@
// - node ID
// - core ID
func (a *cpuAccumulator) freeCPUs(filterExclusive bool) []int {
details := a.details
allocatableCPUs := a.allocatableCPUs
cpusInCores := make(map[int][]int)
coresToSocket := make(map[int]int)
coresToNode := make(map[int]int)
nodeFreeScores := make(map[int]int)
socketFreeScores := make(map[int]int)
for _, cpuInfo := range details {
if filterExclusive && a.isCPUExclusive(cpuInfo.CPUID) {
for _, cpuInfo := range allocatableCPUs {
if filterExclusive && (a.isCPUExclusivePCPULevel(&cpuInfo) || a.isCPUExclusiveNUMANodeLevel(&cpuInfo)) {
continue
}

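To summarize the allocator change: already-allocated CPUs now carry their exclusive policy, `PCPULevel` marks the owning physical core exclusive, `NUMANodeLevel` marks the owning NUMA node exclusive, and candidate CPUs that conflict with the requested policy are skipped on the exclusive-aware pass. The sketch below is a minimal, self-contained illustration of that bookkeeping using simplified stand-in types; the plugin's real `CPUInfo`/`CPUDetails` structs carry more fields.

```go
package main

import "fmt"

// Simplified stand-in for the plugin's CPUInfo; field names mirror the diff.
type cpuInfo struct {
	CPUID, CoreID, NodeID int
	ExclusivePolicy       string // "None", "PCPULevel" or "NUMANodeLevel"
}

// buildExclusiveSets mirrors newCPUAccumulator: allocated CPUs with PCPULevel
// mark their physical core exclusive; NUMANodeLevel marks the whole NUMA node.
func buildExclusiveSets(allocated []cpuInfo) (cores, nodes map[int]bool) {
	cores, nodes = map[int]bool{}, map[int]bool{}
	for _, c := range allocated {
		switch c.ExclusivePolicy {
		case "PCPULevel":
			cores[c.CoreID] = true
		case "NUMANodeLevel":
			nodes[c.NodeID] = true
		}
	}
	return cores, nodes
}

// conflicts reports whether a candidate CPU would violate the requested policy,
// the same check isCPUExclusivePCPULevel / isCPUExclusiveNUMANodeLevel perform.
func conflicts(policy string, c cpuInfo, cores, nodes map[int]bool) bool {
	switch policy {
	case "PCPULevel":
		return cores[c.CoreID]
	case "NUMANodeLevel":
		return nodes[c.NodeID]
	}
	return false
}

func main() {
	allocated := []cpuInfo{{CPUID: 0, CoreID: 0, NodeID: 0, ExclusivePolicy: "PCPULevel"}}
	cores, nodes := buildExclusiveSets(allocated)

	// CPU 1 is the hyperthread sibling of CPU 0 (same physical core), so a
	// PCPULevel request skips it on the exclusive-aware pass.
	candidate := cpuInfo{CPUID: 1, CoreID: 0, NodeID: 0}
	fmt.Println(conflicts("PCPULevel", candidate, cores, nodes)) // true
}
```

This pairs with the two-pass loop added to `takeCPUs` (`filterExclusiveArgs := []bool{true, false}`): the allocator first tries to place the pod while honoring exclusivity and only relaxes the filter if that pass cannot satisfy the request.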