Update strategy used to reuse CPUs from init containers in CPUManager #90419

Merged
9 changes: 9 additions & 0 deletions pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -503,6 +503,8 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
testCase.expInitCSets,
testCase.expCSets...)

+ cumCSet := cpuset.NewCPUSet()

for i := range containers {
err := mgr.Allocate(testCase.pod, &containers[i])
if err != nil {
@@ -525,6 +527,13 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v for container %v but got %v",
testCase.description, expCSets[i], containers[i].Name, cset)
}

+ cumCSet = cumCSet.Union(cset)
}

+ if !testCase.stDefaultCPUSet.Difference(cumCSet).Equals(state.defaultCPUSet) {
+ t.Errorf("StaticPolicy error (%v). expected final state for defaultCPUSet %v but got %v",
+ testCase.description, testCase.stDefaultCPUSet.Difference(cumCSet), state.defaultCPUSet)
+ }
}
}
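The new assertion above enforces a conservation property on the shared pool: after every container in the test case has been allocated, the CPUs missing from state.defaultCPUSet must be exactly the union of the cpusets handed out, with reused init-container CPUs counted only once. A minimal sketch of that arithmetic with hypothetical CPU IDs (not values from the existing test table), written as if it lived next to these tests in pkg/kubelet/cm/cpumanager where the cpuset package is already imported:

// Hypothetical scenario: 8 CPUs, an init container that was given CPUs 4-5,
// and an app container that was given CPUs 4-7 (reusing 4 and 5).
func TestDefaultCPUSetAccounting(t *testing.T) {
	defaultBefore := cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7)
	initCSet := cpuset.NewCPUSet(4, 5)
	appCSet := cpuset.NewCPUSet(4, 5, 6, 7)

	// Accumulate everything handed out, the way the new check builds cumCSet.
	cumCSet := cpuset.NewCPUSet().Union(initCSet).Union(appCSet) // CPUs 4-7

	// The shared pool should shrink by exactly cumCSet and nothing more.
	gotDefaultAfter := cpuset.NewCPUSet(0, 1, 2, 3) // what state.defaultCPUSet would report
	if !defaultBefore.Difference(cumCSet).Equals(gotDefaultAfter) {
		t.Errorf("expected defaultCPUSet %v but got %v",
			defaultBefore.Difference(cumCSet), gotDefaultAfter)
	}
}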
57 changes: 39 additions & 18 deletions pkg/kubelet/cm/cpumanager/policy_static.go
@@ -77,6 +77,8 @@ type staticPolicy struct {
reserved cpuset.CPUSet
// topology manager reference to get container Topology affinity
affinity topologymanager.Store
+ // set of CPUs to reuse across allocations in a pod
+ cpusToReuse map[string]cpuset.CPUSet
}

// Ensure staticPolicy implements Policy interface
@@ -107,9 +109,10 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv
klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved)

return &staticPolicy{
- topology: topology,
- reserved: reserved,
- affinity: affinity,
+ topology: topology,
+ reserved: reserved,
+ affinity: affinity,
+ cpusToReuse: make(map[string]cpuset.CPUSet),
}, nil
}

@@ -188,12 +191,37 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
return s.GetDefaultCPUSet().Difference(p.reserved)
}

+ func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, cset cpuset.CPUSet) {
+ // If entries for pods other than the current pod exist in p.cpusToReuse, delete them.
+ for podUID := range p.cpusToReuse {
+ if podUID != string(pod.UID) {
+ delete(p.cpusToReuse, podUID)
+ }
+ }
+ // If no cpuset exists in cpusToReuse for this pod yet, create one.
+ if _, ok := p.cpusToReuse[string(pod.UID)]; !ok {
+ p.cpusToReuse[string(pod.UID)] = cpuset.NewCPUSet()
+ }
+ // Check if the container is an init container.
+ // If so, add its cpuset to the cpuset of reusable CPUs for any new allocations.
+ for _, initContainer := range pod.Spec.InitContainers {
+ if container.Name == initContainer.Name {
+ p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Union(cset)
+ return
+ }
+ }
+ // Otherwise it is an app container.
+ // Remove its cpuset from the cpuset of reusable CPUs for any new allocations.
+ p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Difference(cset)
+ }

func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
klog.Infof("[cpumanager] static policy: Allocate (pod: %s, container: %s)", pod.Name, container.Name)
// container belongs in an exclusively allocated pool

- if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+ if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
+ p.updateCPUsToReuse(pod, container, cpuset)
klog.Infof("[cpumanager] static policy: container already present in state, skipping (pod: %s, container: %s)", pod.Name, container.Name)
return nil
}
@@ -203,23 +231,14 @@ func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Contai
klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint)

// Allocate CPUs according to the NUMA affinity contained in the hint.
- cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity)
+ cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
if err != nil {
klog.Errorf("[cpumanager] unable to allocate %d CPUs (pod: %s, container: %s, error: %v)", numCPUs, pod.Name, container.Name, err)
return err
}
s.SetCPUSet(string(pod.UID), container.Name, cpuset)
+ p.updateCPUsToReuse(pod, container, cpuset)

- // Check if the container that has just been allocated resources is an init container.
- // If so, release its CPUs back into the shared pool so they can be reallocated.
- for _, initContainer := range pod.Spec.InitContainers {
- if container.Name == initContainer.Name {
- if toRelease, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
- // Mutate the shared pool, adding released cpus.
- s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
- }
- }
- }
}
// container belongs in the shared pool (nothing to do; use default cpuset)
return nil
@@ -235,15 +254,17 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
return nil
}

- func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask) (cpuset.CPUSet, error) {
+ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, numaAffinity)

+ assignableCPUs := p.assignableCPUs(s).Union(reusableCPUs)

// If there are aligned CPUs in numaAffinity, attempt to take those first.
result := cpuset.NewCPUSet()
if numaAffinity != nil {
alignedCPUs := cpuset.NewCPUSet()
for _, numaNodeID := range numaAffinity.GetBits() {
- alignedCPUs = alignedCPUs.Union(p.assignableCPUs(s).Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
+ alignedCPUs = alignedCPUs.Union(assignableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
}

numAlignedToAlloc := alignedCPUs.Size()
@@ -260,7 +281,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
}

// Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
- remainingCPUs, err := takeByTopology(p.topology, p.assignableCPUs(s).Difference(result), numCPUs-result.Size())
+ remainingCPUs, err := takeByTopology(p.topology, assignableCPUs.Difference(result), numCPUs-result.Size())
if err != nil {
return cpuset.NewCPUSet(), err
}
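Taken together, the policy_static.go changes swap the old strategy (release an init container's CPUs straight back into the shared pool) for per-pod reuse bookkeeping: CPUs given to an init container are recorded as reusable, the next allocation for the same pod draws from the shared pool plus that reusable set, and CPUs claimed by an app container are struck from the reusable set so they are never handed out twice. The sketch below restates that bookkeeping outside the policy purely for illustration; the package-level map, noteAllocation, and candidateCPUs are hypothetical stand-ins, and it assumes only the cpuset operations already used in the diff (NewCPUSet, Union, Difference).

// cpusToReuse maps a pod UID to the CPUs freed up by that pod's init
// containers, which later containers of the same pod may claim. (Sketch only.)
var cpusToReuse = make(map[string]cpuset.CPUSet)

// noteAllocation mirrors the Union/Difference bookkeeping of updateCPUsToReuse.
func noteAllocation(podUID string, isInitContainer bool, cset cpuset.CPUSet) {
	// Allocations are handled one pod at a time, so drop entries for other pods.
	for uid := range cpusToReuse {
		if uid != podUID {
			delete(cpusToReuse, uid)
		}
	}
	if _, ok := cpusToReuse[podUID]; !ok {
		cpusToReuse[podUID] = cpuset.NewCPUSet()
	}
	if isInitContainer {
		// Init-container CPUs become reusable by later containers of the pod.
		cpusToReuse[podUID] = cpusToReuse[podUID].Union(cset)
		return
	}
	// App-container CPUs stay exclusively assigned; stop offering them for reuse.
	cpusToReuse[podUID] = cpusToReuse[podUID].Difference(cset)
}

// candidateCPUs mirrors how allocateCPUs now builds its working set: the
// shared pool minus reserved CPUs, plus whatever this pod may reuse.
func candidateCPUs(sharedPool, reserved, reusable cpuset.CPUSet) cpuset.CPUSet {
	return sharedPool.Difference(reserved).Union(reusable)
}

The practical difference from the removed block in Allocate is that reusable CPUs no longer transit through defaultCPUSet: they stay out of the shared pool in the state, yet remain available to the pod's subsequent containers, which is exactly the accounting the updated cpu_manager_test.go assertion checks.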
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/cpumanager/policy_static_test.go
@@ -639,7 +639,7 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) {
continue
}

- cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask)
+ cset, err := policy.allocateCPUs(st, tc.numRequested, tc.socketMask, cpuset.NewCPUSet())
if err != nil {
t.Errorf("StaticPolicy allocateCPUs() error (%v). expected CPUSet %v not error %v",
tc.description, tc.expCSet, err)
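For the existing topology test above, passing cpuset.NewCPUSet() as the new reusableCPUs argument keeps the prior behavior, since the empty set is the identity for Union and the working set collapses back to the plain assignable pool. A trivial illustration (hypothetical variable names):

	assignable := cpuset.NewCPUSet(1, 2, 3)
	withNothingToReuse := assignable.Union(cpuset.NewCPUSet())
	// withNothingToReuse.Equals(assignable) is true, so allocateCPUs picks
	// from the same candidates it did before the extra parameter existed.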