Commit 048a3bd

koord-scheduler: support CPU exclusive policy
Signed-off-by: Joseph <joseph.t.lee@outlook.com>
eahydra committed on Jul 11, 2022
1 parent: 7d46fad · commit: 048a3bd

Showing 9 changed files with 275 additions and 209 deletions.
13 changes: 13 additions & 0 deletions apis/extension/resource.go
@@ -49,6 +49,8 @@ var (
type ResourceSpec struct {
// PreferredCPUBindPolicy represents best-effort CPU bind policy.
PreferredCPUBindPolicy CPUBindPolicy `json:"preferredCPUBindPolicy,omitempty"`
// PreferredCPUExclusivePolicy represents best-effort CPU exclusive policy.
PreferredCPUExclusivePolicy CPUExclusivePolicy `json:"preferredCPUExclusivePolicy,omitempty"`
}

// ResourceStatus describes resource allocation result, such as how to bind CPU.
@@ -74,6 +76,17 @@ const (
CPUBindPolicyConstrainedBurst CPUBindPolicy = schedulingconfig.CPUBindPolicyConstrainedBurst
)

type CPUExclusivePolicy = schedulingconfig.CPUExclusivePolicy

const (
// CPUExclusivePolicyNone does not perform any exclusive policy
CPUExclusivePolicyNone CPUExclusivePolicy = schedulingconfig.CPUExclusivePolicyNone
// CPUExclusivePolicyPCPULevel represents mutual exclusion in the physical core dimension
CPUExclusivePolicyPCPULevel CPUExclusivePolicy = schedulingconfig.CPUExclusivePolicyPCPULevel
// CPUExclusivePolicyNUMANodeLevel indicates mutual exclusion in the NUMA topology dimension
CPUExclusivePolicyNUMANodeLevel CPUExclusivePolicy = schedulingconfig.CPUExclusivePolicyNUMANodeLevel
)

type NUMACPUSharedPools []CPUSharedPool

type CPUSharedPool struct {
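
For illustration, here is a minimal sketch (not part of this commit) of how a pod-level ResourceSpec carrying the new PreferredCPUExclusivePolicy field could be serialized into a pod annotation. The annotation key "scheduling.koordinator.sh/resource-spec" and the bind-policy value "FullPCPUs" are assumptions taken from common koordinator conventions, not from this diff.

// Illustrative sketch only, not code from this commit.
package main

import (
    "encoding/json"
    "fmt"
)

// Local mirror of the extended ResourceSpec for illustration.
type ResourceSpec struct {
    PreferredCPUBindPolicy      string `json:"preferredCPUBindPolicy,omitempty"`
    PreferredCPUExclusivePolicy string `json:"preferredCPUExclusivePolicy,omitempty"`
}

func main() {
    spec := ResourceSpec{
        PreferredCPUBindPolicy:      "FullPCPUs", // example bind policy value (assumed)
        PreferredCPUExclusivePolicy: "PCPULevel", // new field introduced by this commit
    }
    data, err := json.Marshal(spec)
    if err != nil {
        panic(err)
    }
    annotations := map[string]string{
        // Assumed annotation key; see apis/extension for the real constant.
        "scheduling.koordinator.sh/resource-spec": string(data),
    }
    fmt.Println(annotations)
}
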
11 changes: 11 additions & 0 deletions apis/scheduling/config/types.go
@@ -92,6 +92,17 @@ const (
CPUBindPolicyConstrainedBurst CPUBindPolicy = "ConstrainedBurst"
)

type CPUExclusivePolicy string

const (
// CPUExclusivePolicyNone does not perform any exclusive policy
CPUExclusivePolicyNone CPUExclusivePolicy = "None"
// CPUExclusivePolicyPCPULevel represents mutual exclusion in the physical core dimension
CPUExclusivePolicyPCPULevel CPUExclusivePolicy = "PCPULevel"
// CPUExclusivePolicyNUMANodeLevel indicates mutual exclusion in the NUMA topology dimension
CPUExclusivePolicyNUMANodeLevel CPUExclusivePolicy = "NUMANodeLevel"
)

// NUMAAllocateStrategy indicates how to choose satisfied NUMA Nodes
type NUMAAllocateStrategy string

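
The three values differ only in the scope of the exclusion they request: None asks for no exclusivity, PCPULevel asks that no other exclusive workload share the same physical core, and NUMANodeLevel widens that to the whole NUMA node. The following self-contained sketch is illustrative only; the conflictsWith helper is hypothetical and not part of this commit.

// Hypothetical helper showing the intended semantics of the three policies.
package main

import "fmt"

type CPUExclusivePolicy string

const (
    CPUExclusivePolicyNone          CPUExclusivePolicy = "None"
    CPUExclusivePolicyPCPULevel     CPUExclusivePolicy = "PCPULevel"
    CPUExclusivePolicyNUMANodeLevel CPUExclusivePolicy = "NUMANodeLevel"
)

// allocation records where an already-assigned exclusive CPU lives.
type allocation struct {
    coreID int
    nodeID int
    policy CPUExclusivePolicy
}

// conflictsWith reports whether taking a CPU on (coreID, nodeID) would violate
// the exclusivity requested by an existing allocation.
func conflictsWith(existing allocation, coreID, nodeID int) bool {
    switch existing.policy {
    case CPUExclusivePolicyPCPULevel:
        // No other exclusive workload may share the same physical core.
        return existing.coreID == coreID
    case CPUExclusivePolicyNUMANodeLevel:
        // No other exclusive workload may share the same NUMA node.
        return existing.nodeID == nodeID
    default:
        // None requests no exclusivity at all.
        return false
    }
}

func main() {
    prev := allocation{coreID: 3, nodeID: 0, policy: CPUExclusivePolicyNUMANodeLevel}
    fmt.Println(conflictsWith(prev, 7, 0)) // true: same NUMA node
    fmt.Println(conflictsWith(prev, 7, 1)) // false: different NUMA node
}
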
13 changes: 12 additions & 1 deletion apis/scheduling/config/v1beta2/types.go
@@ -71,7 +71,7 @@ type ScoringStrategy struct {

// NodeNUMAResourceArgs holds arguments used to configure the NodeNUMAResource plugin.
type NodeNUMAResourceArgs struct {
metav1.TypeMeta `json:",inline"`
metav1.TypeMeta

DefaultCPUBindPolicy CPUBindPolicy `json:"defaultCPUBindPolicy,omitempty"`
NUMAAllocateStrategy NUMAAllocateStrategy `json:"numaAllocateStrategy,omitempty"`
@@ -92,6 +92,17 @@ const (
CPUBindPolicyConstrainedBurst CPUBindPolicy = "ConstrainedBurst"
)

type CPUExclusivePolicy string

const (
// CPUExclusivePolicyNone does not perform any exclusive policy
CPUExclusivePolicyNone CPUExclusivePolicy = "None"
// CPUExclusivePolicyPCPULevel represents mutual exclusion in the physical core dimension
CPUExclusivePolicyPCPULevel CPUExclusivePolicy = "PCPULevel"
// CPUExclusivePolicyNUMANodeLevel indicates mutual exclusion in the NUMA topology dimension
CPUExclusivePolicyNUMANodeLevel CPUExclusivePolicy = "NUMANodeLevel"
)

// NUMAAllocateStrategy indicates how to choose satisfied NUMA Nodes
type NUMAAllocateStrategy string

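
As a hedged sketch of how the plugin arguments above might be populated programmatically, the snippet below uses local mirror types rather than the real v1beta2 package; the values "FullPCPUs" and "MostAllocated" are example values, not defaults established by this commit.

// Hypothetical sketch, not code from this commit.
package main

import "fmt"

type CPUBindPolicy string
type NUMAAllocateStrategy string

// Mirror of the fields visible in this hunk; the real type carries more.
type NodeNUMAResourceArgs struct {
    DefaultCPUBindPolicy CPUBindPolicy        `json:"defaultCPUBindPolicy,omitempty"`
    NUMAAllocateStrategy NUMAAllocateStrategy `json:"numaAllocateStrategy,omitempty"`
}

func main() {
    args := NodeNUMAResourceArgs{
        DefaultCPUBindPolicy: "FullPCPUs",     // example value
        NUMAAllocateStrategy: "MostAllocated", // example value
    }
    fmt.Printf("%+v\n", args)
}
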
113 changes: 71 additions & 42 deletions pkg/scheduler/plugins/nodenumaresource/cpu_allocator.go
@@ -20,7 +20,8 @@ import (
"fmt"
"sort"

"github.com/koordinator-sh/koordinator/apis/extension"
"k8s.io/apimachinery/pkg/util/sets"

schedulingconfig "github.com/koordinator-sh/koordinator/apis/scheduling/config"
)

@@ -41,11 +42,11 @@ func takeCPUs(
availableCPUs CPUSet,
allocatedCPUs CPUDetails,
numCPUsNeeded int,
cpuBindPolicy extension.CPUBindPolicy,
exclusive bool,
cpuBindPolicy schedulingconfig.CPUBindPolicy,
cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy,
numaAllocatedStrategy schedulingconfig.NUMAAllocateStrategy,
) (CPUSet, error) {
acc := newCPUAccumulator(topology, availableCPUs, allocatedCPUs, numCPUsNeeded, exclusive, numaAllocatedStrategy)
acc := newCPUAccumulator(topology, availableCPUs, allocatedCPUs, numCPUsNeeded, cpuExclusivePolicy, numaAllocatedStrategy)
if acc.isSatisfied() {
return acc.result, nil
}
@@ -58,12 +59,15 @@ func takeCPUs(
// According to the NUMA allocation strategy,
// select the NUMA Node with the most remaining amount or the least amount remaining
// and the total amount of available CPUs in the NUMA Node is greater than or equal to the number of CPUs needed
filterExclusiveArgs := []bool{true, false}
if acc.numCPUsNeeded <= acc.topology.CPUsPerNode() {
freeCPUs := acc.freeCoresInNode(true)
for _, cpus := range freeCPUs {
if len(cpus) >= acc.numCPUsNeeded {
acc.take(cpus[:acc.numCPUsNeeded]...)
return acc.result, nil
for _, filterExclusive := range filterExclusiveArgs {
freeCPUs := acc.freeCoresInNode(true, filterExclusive)
for _, cpus := range freeCPUs {
if len(cpus) >= acc.numCPUsNeeded {
acc.take(cpus[:acc.numCPUsNeeded]...)
return acc.result, nil
}
}
}
}
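
The hunk above turns the free-core search into two passes: the first pass skips cores whose NUMA node is already claimed by a NUMANodeLevel-exclusive workload, and only if that pass cannot satisfy the request does the second pass consider those cores as well. Below is a minimal, self-contained illustration of the same prefer-then-fall-back pattern; pickCPUs and its arguments are hypothetical names, not code from this commit.

// Minimal illustration of the two-pass pattern used in takeCPUs above.
package main

import "fmt"

// pickCPUs returns up to need CPUs, preferring candidates for which
// excluded(cpu) is false; it falls back to the full candidate list.
func pickCPUs(candidates []int, need int, excluded func(int) bool) []int {
    for _, filterExclusive := range []bool{true, false} {
        picked := make([]int, 0, need)
        for _, cpu := range candidates {
            if filterExclusive && excluded(cpu) {
                continue
            }
            picked = append(picked, cpu)
            if len(picked) == need {
                return picked
            }
        }
    }
    return nil // not enough CPUs even without the exclusivity filter
}

func main() {
    // CPUs 0-3 sit on a NUMA node already claimed NUMANodeLevel-exclusive.
    excluded := func(cpu int) bool { return cpu < 4 }
    fmt.Println(pickCPUs([]int{0, 1, 2, 3, 4, 5, 6, 7}, 3, excluded)) // [4 5 6]
    fmt.Println(pickCPUs([]int{0, 1, 2, 3, 4, 5}, 3, excluded))       // falls back: [0 1 2]
}
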
@@ -181,10 +185,12 @@ func takeCPUs(

type cpuAccumulator struct {
topology *CPUTopology
details CPUDetails
allocatableCPUs CPUDetails
numCPUsNeeded int
exclusive bool
exclusiveInCores map[int]bool
exclusiveInCores sets.Int
exclusiveInNUMANodes sets.Int
exclusivePolicy schedulingconfig.CPUExclusivePolicy
numaAllocateStrategy schedulingconfig.NUMAAllocateStrategy
result CPUSet
}
@@ -194,20 +200,30 @@ func newCPUAccumulator(
availableCPUs CPUSet,
allocatedCPUs CPUDetails,
numCPUsNeeded int,
exclusive bool,
exclusivePolicy schedulingconfig.CPUExclusivePolicy,
numaSortStrategy schedulingconfig.NUMAAllocateStrategy,
) *cpuAccumulator {
exclusiveInCores := make(map[int]bool)
exclusiveInCores := sets.NewInt()
exclusiveInNUMANodes := sets.NewInt()
for _, v := range allocatedCPUs {
exclusiveInCores[v.CoreID] = v.Exclusive
if v.ExclusivePolicy == schedulingconfig.CPUExclusivePolicyPCPULevel {
exclusiveInCores.Insert(v.CoreID)
} else if v.ExclusivePolicy == schedulingconfig.CPUExclusivePolicyNUMANodeLevel {
exclusiveInNUMANodes.Insert(v.NodeID)
}
}
details := topology.CPUDetails.KeepOnly(availableCPUs)
allocatableCPUs := topology.CPUDetails.KeepOnly(availableCPUs)

exclusive := exclusivePolicy == schedulingconfig.CPUExclusivePolicyPCPULevel ||
exclusivePolicy == schedulingconfig.CPUExclusivePolicyNUMANodeLevel

return &cpuAccumulator{
topology: topology,
details: details,
allocatableCPUs: allocatableCPUs,
exclusiveInCores: exclusiveInCores,
exclusiveInNUMANodes: exclusiveInNUMANodes,
exclusive: exclusive,
exclusivePolicy: exclusivePolicy,
numCPUsNeeded: numCPUsNeeded,
numaAllocateStrategy: numaSortStrategy,
result: NewCPUSet(),
@@ -217,10 +233,14 @@ func newCPUAccumulator(
func (a *cpuAccumulator) take(cpus ...int) {
a.result = a.result.UnionSlice(cpus...)
for _, cpu := range cpus {
delete(a.details, cpu)
delete(a.allocatableCPUs, cpu)
if a.exclusive {
cpuInfo := a.topology.CPUDetails[cpu]
a.exclusiveInCores[cpuInfo.CoreID] = true
if a.exclusivePolicy == schedulingconfig.CPUExclusivePolicyPCPULevel {
a.exclusiveInCores.Insert(cpuInfo.CoreID)
} else if a.exclusivePolicy == schedulingconfig.CPUExclusivePolicyNUMANodeLevel {
a.exclusiveInNUMANodes.Insert(cpuInfo.NodeID)
}
}
}
a.numCPUsNeeded -= len(cpus)
@@ -235,15 +255,21 @@ func (a *cpuAccumulator) isSatisfied() bool {
}

func (a *cpuAccumulator) isFailed() bool {
return a.numCPUsNeeded > len(a.details)
return a.numCPUsNeeded > len(a.allocatableCPUs)
}

func (a *cpuAccumulator) isCPUExclusivePCPULevel(cpuInfo *CPUInfo) bool {
if a.exclusivePolicy != schedulingconfig.CPUExclusivePolicyPCPULevel {
return false
}
return a.exclusiveInCores.Has(cpuInfo.CoreID)
}

func (a *cpuAccumulator) isCPUExclusive(cpuID int) bool {
if !a.exclusive {
func (a *cpuAccumulator) isCPUExclusiveNUMANodeLevel(cpuInfo *CPUInfo) bool {
if a.exclusivePolicy != schedulingconfig.CPUExclusivePolicyNUMANodeLevel {
return false
}
cpuInfo := a.topology.CPUDetails[cpuID]
return a.exclusiveInCores[cpuInfo.CoreID]
return a.exclusiveInNUMANodes.Has(cpuInfo.NodeID)
}

func (a *cpuAccumulator) extractCPU(cpus []int) []int {
@@ -278,12 +304,15 @@ func (a *cpuAccumulator) sortCores(cores []int, cpusInCores map[int][]int) {
}

// freeCoresInNode returns the logical cpus of the free cores in nodes that sorted
func (a *cpuAccumulator) freeCoresInNode(filterFullFreeCore bool) [][]int {
details := a.details
func (a *cpuAccumulator) freeCoresInNode(filterFullFreeCore bool, filterExclusive bool) [][]int {
allocatableCPUs := a.allocatableCPUs

socketFreeScores := make(map[int]int)
cpusInCores := make(map[int][]int)
for _, cpuInfo := range details {
for _, cpuInfo := range allocatableCPUs {
if filterExclusive && a.isCPUExclusiveNUMANodeLevel(&cpuInfo) {
continue
}
cpus := cpusInCores[cpuInfo.CoreID]
if len(cpus) == 0 {
cpus = make([]int, 0, a.topology.CPUsPerCore())
@@ -299,7 +328,7 @@ func (a *cpuAccumulator) freeCoresInNode(filterFullFreeCore bool) [][]int {
if filterFullFreeCore && len(cpus) != a.topology.CPUsPerCore() {
continue
}
info := details[cpus[0]]
info := allocatableCPUs[cpus[0]]
cores := coresInNodes[info.NodeID]
if len(cores) == 0 {
cores = make([]int, 0, a.topology.CPUsPerNode()/a.topology.CPUsPerCore())
@@ -328,8 +357,8 @@ func (a *cpuAccumulator) freeCoresInNode(filterFullFreeCore bool) [][]int {
jCPUs := cpusInNodes[nodeIDs[j]]

// each cpu's socketId and nodeId in same node are same
iCPUInfo := details[iCPUs[0]]
jCPUInfo := details[jCPUs[0]]
iCPUInfo := allocatableCPUs[iCPUs[0]]
jCPUInfo := allocatableCPUs[jCPUs[0]]

iSocket := iCPUInfo.SocketID
jSocket := jCPUInfo.SocketID
@@ -369,10 +398,10 @@ func (a *cpuAccumulator) freeCoresInNode(filterFullFreeCore bool) [][]int {

// freeCoresInSocket returns the logical cpus of the free cores in sockets that sorted
func (a *cpuAccumulator) freeCoresInSocket(filterFullFreeCore bool) [][]int {
details := a.details
allocatableCPUs := a.allocatableCPUs

cpusInCores := make(map[int][]int)
for _, cpuInfo := range details {
for _, cpuInfo := range allocatableCPUs {
cpus := cpusInCores[cpuInfo.CoreID]
if len(cpus) == 0 {
cpus = make([]int, 0, a.topology.CPUsPerCore())
@@ -387,7 +416,7 @@ func (a *cpuAccumulator) freeCoresInSocket(filterFullFreeCore bool) [][]int {
if filterFullFreeCore && len(cpus) != a.topology.CPUsPerCore() {
continue
}
info := details[cpus[0]]
info := allocatableCPUs[cpus[0]]
cores := coresInSockets[info.SocketID]
if len(cores) == 0 {
cores = make([]int, 0, a.topology.CPUsPerSocket()/a.topology.CPUsPerCore())
@@ -438,8 +467,8 @@ func (a *cpuAccumulator) freeCPUsInNode(filterExclusive bool) [][]int {
cpusInNodes := make(map[int][]int)
nodeFreeScores := make(map[int]int)
socketFreeScores := make(map[int]int)
for _, cpuInfo := range a.details {
if filterExclusive && a.isCPUExclusive(cpuInfo.CPUID) {
for _, cpuInfo := range a.allocatableCPUs {
if filterExclusive && (a.isCPUExclusivePCPULevel(&cpuInfo) || a.isCPUExclusiveNUMANodeLevel(&cpuInfo)) {
continue
}
cpus := cpusInNodes[cpuInfo.NodeID]
@@ -466,8 +495,8 @@ func (a *cpuAccumulator) freeCPUsInNode(filterExclusive bool) [][]int {
iCPUs := cpusInNodes[nodeIDs[i]]
jCPUs := cpusInNodes[nodeIDs[j]]

iCPUInfo := a.details[iCPUs[0]]
jCPUInfo := a.details[jCPUs[0]]
iCPUInfo := a.allocatableCPUs[iCPUs[0]]
jCPUInfo := a.allocatableCPUs[jCPUs[0]]

iNode := iCPUInfo.NodeID
jNode := jCPUInfo.NodeID
@@ -510,10 +539,10 @@ func (a *cpuAccumulator) freeCPUsInNode(filterExclusive bool) [][]int {

// freeCPUsInSocket returns free logical cpus in sockets that sorted in ascending order.
func (a *cpuAccumulator) freeCPUsInSocket(filterExclusive bool) [][]int {
details := a.details
allocatableCPUs := a.allocatableCPUs
cpusInSockets := make(map[int][]int)
for _, cpuInfo := range details {
if filterExclusive && a.isCPUExclusive(cpuInfo.CPUID) {
for _, cpuInfo := range allocatableCPUs {
if filterExclusive && a.isCPUExclusivePCPULevel(&cpuInfo) {
continue
}
cpus := cpusInSockets[cpuInfo.SocketID]
@@ -564,14 +593,14 @@ func (a *cpuAccumulator) freeCPUsInSocket(filterExclusive bool) [][]int {
// - node ID
// - core ID
func (a *cpuAccumulator) freeCPUs(filterExclusive bool) []int {
details := a.details
allocatableCPUs := a.allocatableCPUs
cpusInCores := make(map[int][]int)
coresToSocket := make(map[int]int)
coresToNode := make(map[int]int)
nodeFreeScores := make(map[int]int)
socketFreeScores := make(map[int]int)
for _, cpuInfo := range details {
if filterExclusive && a.isCPUExclusive(cpuInfo.CPUID) {
for _, cpuInfo := range allocatableCPUs {
if filterExclusive && (a.isCPUExclusivePCPULevel(&cpuInfo) || a.isCPUExclusiveNUMANodeLevel(&cpuInfo)) {
continue
}

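
To summarize the bookkeeping this file introduces, the following self-contained sketch (not the real accumulator) shows how the two exclusivity sets, exclusiveInCores and exclusiveInNUMANodes, are consulted while collecting free CPUs. It uses sets.Int from k8s.io/apimachinery, which this commit starts importing; unlike the real code, it does not additionally gate each check on the requesting workload's own policy.

// Self-contained sketch of the exclusivity filtering introduced in this commit.
package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/util/sets"
)

type cpuInfo struct {
    cpuID, coreID, nodeID int
}

type accumulator struct {
    exclusiveInCores     sets.Int // physical cores claimed PCPULevel-exclusive
    exclusiveInNUMANodes sets.Int // NUMA nodes claimed NUMANodeLevel-exclusive
}

// excluded reports whether a CPU must be skipped when filtering exclusively.
func (a *accumulator) excluded(c cpuInfo) bool {
    return a.exclusiveInCores.Has(c.coreID) || a.exclusiveInNUMANodes.Has(c.nodeID)
}

func main() {
    acc := &accumulator{
        exclusiveInCores:     sets.NewInt(1), // core 1 already taken PCPULevel-exclusively
        exclusiveInNUMANodes: sets.NewInt(),  // no NUMA node is exclusive yet
    }
    cpus := []cpuInfo{
        {cpuID: 0, coreID: 0, nodeID: 0},
        {cpuID: 1, coreID: 1, nodeID: 0},
        {cpuID: 2, coreID: 1, nodeID: 0},
        {cpuID: 3, coreID: 2, nodeID: 1},
    }
    var free []int
    for _, c := range cpus {
        if acc.excluded(c) {
            continue
        }
        free = append(free, c.cpuID)
    }
    fmt.Println(free) // [0 3]: both CPUs on core 1 are filtered out
}
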
(Diff for the remaining changed files not shown.)
