Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cpu manager: handle reduced cpuset in root cgroup. #87522

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/kubelet/cm/container_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,13 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I

// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
cpusetMountPoint, found := subsystems.MountPoints["cpuset"]
if !found || len(cpusetMountPoint) == 0 {
return nil, fmt.Errorf("cpuset cgroup mount point not found")
}

cpusetPath := path.Join(cpusetMountPoint, cgroupManager.Name(cgroupRoot))

cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.ExperimentalCPUManagerPolicy,
nodeConfig.ExperimentalCPUManagerReconcilePeriod,
Expand All @@ -325,6 +332,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
cm.topologyManager,
cpusetPath,
)
if err != nil {
klog.Errorf("failed to initialize cpu manager: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/cm/cpumanager/cpu_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool { return true }

// NewManager creates new cpu manager based on provided policy
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, numaNodeInfo topology.NUMANodeInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, numaNodeInfo topology.NUMANodeInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store, cgroupPath string) (Manager, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking — would cpusetPath or a similar name be a better fit for this argument?

var topo *topology.CPUTopology
var policy Policy

Expand Down Expand Up @@ -155,7 +155,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
// exclusively allocated.
reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cgroupPath)
if err != nil {
return nil, fmt.Errorf("new static policy error: %v", err)
}
Expand Down
49 changes: 39 additions & 10 deletions pkg/kubelet/cm/cpumanager/policy_static.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ package cpumanager

import (
"fmt"
"io/ioutil"
"path"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/klog"
Expand Down Expand Up @@ -75,18 +78,44 @@ type staticPolicy struct {
topology *topology.CPUTopology
// set of CPUs that is not available for exclusive assignment
reserved cpuset.CPUSet
// set of CPUs that is available in the top-level cpuset cgroup
available cpuset.CPUSet
// topology manager reference to get container Topology affinity
affinity topologymanager.Store
}

// Ensure staticPolicy implements Policy interface
var _ Policy = &staticPolicy{}

// readCPUSet reads and parses the "cpuset.cpus" file directly under the
// given cgroup root directory, returning the set of CPUs configured there.
// An empty CPUSet is returned alongside any read or parse error.
func readCPUSet(cgroupRoot string) (cpuset.CPUSet, error) {
	cpusFile := path.Join(cgroupRoot, "cpuset.cpus")

	raw, err := ioutil.ReadFile(cpusFile)
	if err != nil {
		return cpuset.NewCPUSet(), err
	}

	parsed, err := cpuset.Parse(strings.TrimSpace(string(raw)))
	if err != nil {
		return cpuset.NewCPUSet(), err
	}
	return parsed, nil
}

// NewStaticPolicy returns a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store) (Policy, error) {
allCPUs := topology.CPUDetails.CPUs()
func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store, cgroupPath string) (Policy, error) {
// It can be that the cgroup root (provided with --cgroup-root kubelet
// command line option) has an already restricted set of CPUs. This means
// that the "reserved" set needs to contain the CPUs on which we can't
// run any containers.
available, err := readCPUSet(cgroupPath)
if err != nil {
return nil, err
}

var reserved cpuset.CPUSet
if reservedCPUs.Size() > 0 {
reserved = reservedCPUs
Expand All @@ -96,7 +125,7 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
//
// For example: Given a system with 8 CPUs available and HT enabled,
// if numReservedCPUs=2, then reserved={0,4}
reserved, _ = takeByTopology(topology, allCPUs, numReservedCPUs)
reserved, _ = takeByTopology(topology, available, numReservedCPUs)
}

if reserved.Size() != numReservedCPUs {
Expand All @@ -107,9 +136,10 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)

return &staticPolicy{
topology: topology,
reserved: reserved,
affinity: affinity,
topology: topology,
reserved: reserved,
available: available,
affinity: affinity,
}, nil
}

Expand All @@ -135,8 +165,7 @@ func (p *staticPolicy) validateState(s state.State) error {
return fmt.Errorf("default cpuset cannot be empty")
}
// state is empty initialize
allCPUs := p.topology.CPUDetails.CPUs()
s.SetDefaultCPUSet(allCPUs)
s.SetDefaultCPUSet(p.available)
return nil
}

Expand Down Expand Up @@ -175,9 +204,9 @@ func (p *staticPolicy) validateState(s state.State) error {
}
}
totalKnownCPUs = totalKnownCPUs.UnionAll(tmpCPUSets)
if !totalKnownCPUs.Equals(p.topology.CPUDetails.CPUs()) {
if !totalKnownCPUs.Equals(p.available) {
return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"",
p.topology.CPUDetails.CPUs().String(), totalKnownCPUs.String())
p.available, totalKnownCPUs.String())
}

return nil
Expand Down