Guarantee aligned resources across containers #87759

5 changes: 2 additions & 3 deletions pkg/kubelet/cm/container_manager.go
@@ -25,7 +25,6 @@ import (
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
@@ -111,8 +110,8 @@ type ContainerManager interface {
// due to node recreation.
ShouldResetExtendedResourceCapacity() bool

// GetTopologyPodAdmitHandler returns an instance of the TopologyManager for Pod Admission
GetTopologyPodAdmitHandler() topologymanager.Manager
// GetAllocateResourcesPodAdmitHandler returns an instance of a PodAdmitHandler responsible for allocating pod resources.
GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler

// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
UpdateAllocatedDevices()
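The point of this interface change is that the kubelet now sees only a generic lifecycle.PodAdmitHandler, so the container manager can hand back either the TopologyManager or a fallback allocator without the caller knowing which. A minimal, self-contained sketch of that handler-chain pattern, with toy types standing in for pkg/kubelet/lifecycle (hypothetical names, not code from this PR):

package main

import "fmt"

// Toy stand-ins for the kubelet's lifecycle types (hypothetical,
// simplified from pkg/kubelet/lifecycle).
type PodAdmitAttributes struct{ PodName string }

type PodAdmitResult struct {
    Admit   bool
    Reason  string
    Message string
}

type PodAdmitHandler interface {
    Admit(attrs *PodAdmitAttributes) PodAdmitResult
}

// admitAll stands in for whatever GetAllocateResourcesPodAdmitHandler
// returns (the TopologyManager or the resourceAllocator fallback).
type admitAll struct{}

func (admitAll) Admit(attrs *PodAdmitAttributes) PodAdmitResult {
    return PodAdmitResult{Admit: true}
}

func main() {
    // The kubelet runs every registered admit handler in turn; the
    // first rejection keeps the pod off the node.
    handlers := []PodAdmitHandler{admitAll{}}
    attrs := &PodAdmitAttributes{PodName: "demo"}
    for _, h := range handlers {
        if r := h.Admit(attrs); !r.Admit {
            fmt.Printf("pod %s rejected: %s: %s\n", attrs.PodName, r.Reason, r.Message)
            return
        }
    }
    fmt.Printf("pod %s admitted\n", attrs.PodName)
}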
46 changes: 43 additions & 3 deletions pkg/kubelet/cm/container_manager_linux.go
@@ -672,11 +672,51 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe
}

func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
return cm.deviceManager.Allocate(node, attrs)
return cm.deviceManager.UpdatePluginResources(node, attrs)
}

func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager {
return cm.topologyManager
func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
return cm.topologyManager
}
// TODO: we need to think about a better way to do this. This will work for
// now so long as we have only the cpuManager and deviceManager relying on
// allocations here. However, going forward it is not generalized enough to
// work as we add more and more hint providers that the TopologyManager
needs to call Allocate() on (that may not be directly instantiated
// inside this component).
return &resourceAllocator{cm.cpuManager, cm.deviceManager}
}

type resourceAllocator struct {
cpuManager cpumanager.Manager
deviceManager devicemanager.Manager
}

func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
pod := attrs.Pod

for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
err := m.deviceManager.Allocate(pod, &container)
if err != nil {
return lifecycle.PodAdmitResult{
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
Reason: "UnexpectedAdmissionError",
Admit: false,
}
}

err = m.cpuManager.Allocate(pod, &container)
if err != nil {
return lifecycle.PodAdmitResult{
Message: fmt.Sprintf("Allocate failed due to %v, which is unexpected", err),
Reason: "UnexpectedAdmissionError",
Admit: false,
}
}
}

return lifecycle.PodAdmitResult{Admit: true}
}

func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
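The TODO above names the limitation: resourceAllocator hard-codes exactly two managers. One possible generalization, sketched under the assumption that every future hint provider exposes the same Allocate(pod, container) signature that cpuManager and deviceManager already share here (this is not something the PR implements), would range over a slice of allocators:

package main

import "fmt"

// Hypothetical stand-ins for the kubelet types used above.
type Pod struct{ Name string }
type Container struct{ Name string }

// allocator abstracts the Allocate signature that cpuManager and
// deviceManager already share in this PR.
type allocator interface {
    Allocate(pod *Pod, container *Container) error
}

// generalizedAllocator runs every registered allocator for a container,
// failing on the first error (mirroring resourceAllocator.Admit above).
type generalizedAllocator struct {
    allocators []allocator // cpuManager, deviceManager, future hint providers
}

func (g *generalizedAllocator) allocateContainer(pod *Pod, c *Container) error {
    for _, a := range g.allocators {
        if err := a.Allocate(pod, c); err != nil {
            return fmt.Errorf("allocate failed for container %q: %w", c.Name, err)
        }
    }
    return nil
}

// noopAllocator is a placeholder provider for the demo below.
type noopAllocator struct{}

func (noopAllocator) Allocate(pod *Pod, container *Container) error { return nil }

func main() {
    g := &generalizedAllocator{allocators: []allocator{noopAllocator{}}}
    fmt.Println(g.allocateContainer(&Pod{Name: "demo"}, &Container{Name: "app"}))
}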
4 changes: 2 additions & 2 deletions pkg/kubelet/cm/container_manager_stub.go
@@ -117,8 +117,8 @@ func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool {
return cm.shouldResetExtendedResourceCapacity
}

func (cm *containerManagerStub) GetTopologyPodAdmitHandler() topologymanager.Manager {
return nil
func (cm *containerManagerStub) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
return topologymanager.NewFakeManager()
}

func (cm *containerManagerStub) UpdateAllocatedDevices() {
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/container_manager_windows.go
@@ -178,7 +178,7 @@ func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return false
}

func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager {
func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
return nil
}

46 changes: 19 additions & 27 deletions pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -55,6 +55,11 @@ type Manager interface {
// Start is called during Kubelet initialization.
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error

// Called to trigger the allocation of CPUs to a container. This must be
// called at some point prior to the AddContainer() call for a container,
// e.g. at pod admission time.
Allocate(pod *v1.Pod, container *v1.Container) error

// AddContainer is called between container create and container start
// so that initial CPU affinity settings can be written through to the
// container runtime before the first process begins to execute.
@@ -206,46 +211,41 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
return nil
}

func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
m.Lock()
// Proactively remove CPUs from init containers that have already run.
// They are guaranteed to have run to completion before any other
// container is run.
for _, initContainer := range p.Spec.InitContainers {
if c.Name != initContainer.Name {
err := m.policyRemoveContainerByRef(string(p.UID), initContainer.Name)
if err != nil {
klog.Warningf("[cpumanager] unable to remove init container (pod: %s, container: %s, error: %v)", string(p.UID), initContainer.Name, err)
}
}
}
defer m.Unlock()

// Call down into the policy to assign this container CPUs if required.
err := m.policyAddContainer(p, c, containerID)
[Review thread]
Contributor: Hmm, is the idea here to delay adding the container to the containerMap until the reconciliation loop? It seems the reconciliation loop might not get far enough to actually perform the Add().

Contributor Author: No, it will be added to the containerMap as soon as it can be (i.e. at the top of AddContainer() below).

Contributor: I asked because I only see m.containerMap.Add() being called in the reconciliation loop, but maybe it's abstracted somewhere...

err := m.policy.Allocate(m.state, p, c)
if err != nil {
klog.Errorf("[cpumanager] AddContainer error: %v", err)
m.Unlock()
klog.Errorf("[cpumanager] Allocate error: %v", err)
return err
}

// Get the CPUs just assigned to the container (or fall back to the default
// CPUSet if none were assigned).
return nil
}

func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
m.Lock()
// Get the CPUs assigned to the container during Allocate()
// (or fall back to the default CPUSet if none were assigned).
cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name)
m.Unlock()

if !cpus.IsEmpty() {
err = m.updateContainerCPUSet(containerID, cpus)
err := m.updateContainerCPUSet(containerID, cpus)
if err != nil {
klog.Errorf("[cpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)", p.Name, c.Name, containerID, err)
m.Lock()
err := m.policyRemoveContainerByID(containerID)
err := m.policyRemoveContainerByRef(string(p.UID), c.Name)
if err != nil {
klog.Errorf("[cpumanager] AddContainer rollback state error: %v", err)
}
m.Unlock()
}
return err
}

klog.V(5).Infof("[cpumanager] update container resources is skipped due to cpu set is empty")
return nil
}
@@ -263,14 +263,6 @@ func (m *manager) RemoveContainer(containerID string) error {
return nil
}

func (m *manager) policyAddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
err := m.policy.AddContainer(m.state, p, c)
if err == nil {
m.containerMap.Add(string(p.UID), c.Name, containerID)
}
return err
}

func (m *manager) policyRemoveContainerByID(containerID string) error {
podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
if err != nil {
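The split above gives Allocate() and AddContainer() a strict ordering: Allocate() records a CPU assignment at admission time, keyed by pod UID and container name (no container ID exists yet), and AddContainer() later binds that assignment to a concrete container ID between create and start. A toy sketch of that two-phase contract (hypothetical code, not the manager's actual state handling):

package main

import "fmt"

// twoPhase mimics the Allocate/AddContainer split: phase one records a
// decision keyed by (podUID, containerName); phase two binds it to a
// container ID. Hypothetical sketch, not the real cpumanager state.
type twoPhase struct {
    assignments map[string]string // "podUID/containerName" -> cpuset
}

func (m *twoPhase) Allocate(podUID, containerName string) error {
    // Admission time: decide and record the assignment. There is no
    // container ID yet, so the key is pod UID + container name.
    m.assignments[podUID+"/"+containerName] = "3-4"
    return nil
}

func (m *twoPhase) AddContainer(podUID, containerName, containerID string) error {
    // Between create and start: look up the earlier decision and write
    // it through to the runtime for this concrete container ID.
    cpus, ok := m.assignments[podUID+"/"+containerName]
    if !ok {
        return fmt.Errorf("Allocate() never ran for %s/%s", podUID, containerName)
    }
    fmt.Printf("container %s pinned to cpus %s\n", containerID, cpus)
    return nil
}

func main() {
    m := &twoPhase{assignments: map[string]string{}}
    _ = m.Allocate("pod-uid-1", "app")                     // at pod admission
    _ = m.AddContainer("pod-uid-1", "app", "runtime-id-1") // before container start
}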
104 changes: 64 additions & 40 deletions pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -104,7 +104,7 @@ func (p *mockPolicy) Start(s state.State) error {
return p.err
}

func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
func (p *mockPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
return p.err
}

@@ -223,34 +223,38 @@ func TestCPUManagerAdd(t *testing.T) {
cpuset.NewCPUSet(),
topologymanager.NewFakeManager())
testCases := []struct {
description string
updateErr error
policy Policy
expCPUSet cpuset.CPUSet
expErr error
description string
updateErr error
policy Policy
expCPUSet cpuset.CPUSet
expAllocateErr error
expAddContainerErr error
}{
{
description: "cpu manager add - no error",
updateErr: nil,
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(3, 4),
expErr: nil,
description: "cpu manager add - no error",
updateErr: nil,
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(3, 4),
expAllocateErr: nil,
expAddContainerErr: nil,
},
{
description: "cpu manager add - policy add container error",
updateErr: nil,
policy: &mockPolicy{
err: fmt.Errorf("fake reg error"),
},
expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
expErr: fmt.Errorf("fake reg error"),
expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
expAllocateErr: fmt.Errorf("fake reg error"),
expAddContainerErr: nil,
},
{
description: "cpu manager add - container update error",
updateErr: fmt.Errorf("fake update error"),
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
expErr: fmt.Errorf("fake update error"),
description: "cpu manager add - container update error",
updateErr: fmt.Errorf("fake update error"),
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(1, 2, 3, 4),
expAllocateErr: nil,
expAddContainerErr: fmt.Errorf("fake update error"),
},
}

@@ -271,10 +275,16 @@

pod := makePod("fakePod", "fakeContainer", "2", "2")
container := &pod.Spec.Containers[0]
err := mgr.AddContainer(pod, container, "fakeID")
if !reflect.DeepEqual(err, testCase.expErr) {
err := mgr.Allocate(pod, container)
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
t.Errorf("CPU Manager Allocate() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expAllocateErr, err)
}

err = mgr.AddContainer(pod, container, "fakeID")
if !reflect.DeepEqual(err, testCase.expAddContainerErr) {
t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expErr, err)
testCase.description, testCase.expAddContainerErr, err)
}
if !testCase.expCPUSet.Equals(mgr.state.GetDefaultCPUSet()) {
t.Errorf("CPU Manager AddContainer() error (%v). expected cpuset: %v but got: %v",
@@ -494,7 +504,12 @@ func TestCPUManagerAddWithInitContainers(t *testing.T) {
testCase.expCSets...)

for i := range containers {
err := mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i])
err := mgr.Allocate(testCase.pod, &containers[i])
if err != nil {
t.Errorf("StaticPolicy Allocate() error (%v). unexpected error for container id: %v: %v",
testCase.description, containerIDs[i], err)
}
err = mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i])
if err != nil {
t.Errorf("StaticPolicy AddContainer() error (%v). unexpected error for container id: %v: %v",
testCase.description, containerIDs[i], err)
@@ -970,25 +985,28 @@ func TestCPUManagerAddWithResvList(t *testing.T) {
cpuset.NewCPUSet(0),
topologymanager.NewFakeManager())
testCases := []struct {
description string
updateErr error
policy Policy
expCPUSet cpuset.CPUSet
expErr error
description string
updateErr error
policy Policy
expCPUSet cpuset.CPUSet
expAllocateErr error
expAddContainerErr error
}{
{
description: "cpu manager add - no error",
updateErr: nil,
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(0, 3),
expErr: nil,
description: "cpu manager add - no error",
updateErr: nil,
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(0, 3),
expAllocateErr: nil,
expAddContainerErr: nil,
},
{
description: "cpu manager add - container update error",
updateErr: fmt.Errorf("fake update error"),
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(0, 1, 2, 3),
expErr: fmt.Errorf("fake update error"),
description: "cpu manager add - container update error",
updateErr: fmt.Errorf("fake update error"),
policy: testPolicy,
expCPUSet: cpuset.NewCPUSet(0, 1, 2, 3),
expAllocateErr: nil,
expAddContainerErr: fmt.Errorf("fake update error"),
},
}

@@ -1009,10 +1027,16 @@

pod := makePod("fakePod", "fakeContainer", "2", "2")
container := &pod.Spec.Containers[0]
err := mgr.AddContainer(pod, container, "fakeID")
if !reflect.DeepEqual(err, testCase.expErr) {
err := mgr.Allocate(pod, container)
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
t.Errorf("CPU Manager Allocate() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expAllocateErr, err)
}

err = mgr.AddContainer(pod, container, "fakeID")
if !reflect.DeepEqual(err, testCase.expAddContainerErr) {
t.Errorf("CPU Manager AddContainer() error (%v). expected error: %v but got: %v",
testCase.description, testCase.expErr, err)
testCase.description, testCase.expAddContainerErr, err)
}
if !testCase.expCPUSet.Equals(mgr.state.GetDefaultCPUSet()) {
t.Errorf("CPU Manager AddContainer() error (%v). expected cpuset: %v but got: %v",
5 changes: 5 additions & 0 deletions pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
@@ -40,6 +40,11 @@ func (m *fakeManager) Policy() Policy {
return NewNonePolicy()
}

func (m *fakeManager) Allocate(pod *v1.Pod, container *v1.Container) error {
klog.Infof("[fake cpumanager] Allocate (pod: %s, container: %s", pod.Name, container.Name)
return nil
}

func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
klog.Infof("[fake cpumanager] AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
return nil
4 changes: 2 additions & 2 deletions pkg/kubelet/cm/cpumanager/policy.go
@@ -26,8 +26,8 @@ import (
type Policy interface {
Name() string
Start(s state.State) error
// AddContainer call is idempotent
AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error
// Allocate call is idempotent
Allocate(s state.State, pod *v1.Pod, container *v1.Container) error
// RemoveContainer call is idempotent
RemoveContainer(s state.State, podUID string, containerName string) error
// GetTopologyHints implements the topologymanager.HintProvider Interface
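The "idempotent" note on Allocate carries weight: admission can be retried for the same container, so a repeated Allocate call must not consume resources twice. A toy illustration of that property (hypothetical policy, not the static policy's real bookkeeping):

package main

import "fmt"

// toyPolicy tracks exclusive CPU counts per container. Hypothetical
// sketch of what "Allocate call is idempotent" means in practice.
type toyPolicy struct {
    assigned map[string]int // containerName -> cpus already granted
    free     int
}

func (p *toyPolicy) Allocate(containerName string, request int) error {
    // Idempotent: a repeated call for the same container is a no-op
    // rather than a second grant.
    if _, done := p.assigned[containerName]; done {
        return nil
    }
    if request > p.free {
        return fmt.Errorf("not enough free cpus for %s", containerName)
    }
    p.assigned[containerName] = request
    p.free -= request
    return nil
}

func main() {
    p := &toyPolicy{assigned: map[string]int{}, free: 4}
    fmt.Println(p.Allocate("app", 2), p.free) // <nil> 2
    fmt.Println(p.Allocate("app", 2), p.free) // <nil> 2 -- unchanged on retry
}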
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/cpumanager/policy_none.go
@@ -44,7 +44,7 @@ func (p *nonePolicy) Start(s state.State) error {
return nil
}

func (p *nonePolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error {
func (p *nonePolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
return nil
}

6 changes: 3 additions & 3 deletions pkg/kubelet/cm/cpumanager/policy_none_test.go
@@ -33,7 +33,7 @@ func TestNonePolicyName(t *testing.T) {
}
}

func TestNonePolicyAdd(t *testing.T) {
func TestNonePolicyAllocate(t *testing.T) {
policy := &nonePolicy{}

st := &mockState{
@@ -44,9 +44,9 @@
testPod := makePod("fakePod", "fakeContainer", "1000m", "1000m")

container := &testPod.Spec.Containers[0]
err := policy.AddContainer(st, testPod, container)
err := policy.Allocate(st, testPod, container)
if err != nil {
t.Errorf("NonePolicy AddContainer() error. expected no error but got: %v", err)
t.Errorf("NonePolicy Allocate() error. expected no error but got: %v", err)
}
}
