
Merge pull request kubernetes#119706 from ffromani/automated-cherry-pick-of-#119432-upstream-release-1.26-1690878669

[1.26] kubelet: devices: skip allocation for running pods kubernetes#118635
k8s-ci-robot committed Aug 30, 2023
2 parents a5071b9 + 180aa30 commit 77d1f7a
Showing 8 changed files with 316 additions and 99 deletions.
28 changes: 4 additions & 24 deletions pkg/kubelet/cm/container_manager_linux.go
@@ -52,7 +52,6 @@ import (
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/dra"
@@ -569,16 +568,17 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
localStorageCapacityIsolation bool) error {
ctx := context.Background()

containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)

// Initialize CPU manager
containerMap := buildContainerMapFromRuntime(ctx, runtimeService)
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start cpu manager error: %v", err)
}

// Initialize memory manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
containerMap := buildContainerMapFromRuntime(ctx, runtimeService)
containerMap, _ := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
err := cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start memory manager error: %v", err)
@@ -642,7 +642,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
}

// Starts device manager.
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
return err
}

@@ -744,26 +744,6 @@ func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
}
}

func buildContainerMapFromRuntime(ctx context.Context, runtimeService internalapi.RuntimeService) containermap.ContainerMap {
podSandboxMap := make(map[string]string)
podSandboxList, _ := runtimeService.ListPodSandbox(ctx, nil)
for _, p := range podSandboxList {
podSandboxMap[p.Id] = p.Metadata.Uid
}

containerMap := containermap.NewContainerMap()
containerList, _ := runtimeService.ListContainers(ctx, nil)
for _, c := range containerList {
if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
klog.InfoS("no PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id)
continue
}
containerMap.Add(podSandboxMap[c.PodSandboxId], c.Metadata.Name, c.Id)
}

return containerMap
}

func isProcessRunningInHost(pid int) (bool, error) {
// Get init pid namespace.
initPidNs, err := os.Readlink("/proc/1/ns/pid")
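For orientation, a minimal sketch of the startup wiring these hunks establish (names taken from this diff; the surrounding method body and error handling are abbreviated, so this is illustrative rather than the full Start sequence):

```go
// One scan of the CRI runtime at kubelet startup now feeds both managers.
ctx := context.Background()
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)

// The CPU manager keeps consuming only the (pod, container) -> containerID map.
if err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap); err != nil {
	return fmt.Errorf("start cpu manager error: %v", err)
}

// The device manager additionally receives the set of container IDs reported
// running, so it can skip re-allocation for containers that survived a kubelet restart.
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
	return err
}
```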
6 changes: 5 additions & 1 deletion pkg/kubelet/cm/container_manager_windows.go
@@ -23,6 +23,7 @@ limitations under the License.
package cm

import (
"context"
"fmt"

"k8s.io/klog/v2"
@@ -87,8 +88,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
}
}

ctx := context.Background()
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)

// Starts device manager.
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
return err
}

64 changes: 62 additions & 2 deletions pkg/kubelet/cm/devicemanager/manager.go
@@ -36,6 +36,7 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -97,6 +98,15 @@ type ManagerImpl struct {

// pendingAdmissionPod contains the pod during the admission phase
pendingAdmissionPod *v1.Pod

// containerMap provides a mapping from (pod, container) -> containerID
// for all containers in a pod. Used to detect pods running across a kubelet restart.
containerMap containermap.ContainerMap

// containerRunningSet identifies which containers among those present in `containerMap`
// were reported running by the container runtime when `containerMap` was computed.
// Used to detect pods running across a kubelet restart.
containerRunningSet sets.String
}

type endpointInfo struct {
@@ -216,6 +226,7 @@ func (m *ManagerImpl) PluginConnected(resourceName string, p plugin.DevicePlugin
defer m.mutex.Unlock()
m.endpoints[resourceName] = endpointInfo{e, options}

klog.V(2).InfoS("Device plugin connected", "resourceName", resourceName)
return nil
}

@@ -246,6 +257,7 @@ func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *plug
}

func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
healthyCount := 0
m.mutex.Lock()
m.healthyDevices[resourceName] = sets.NewString()
m.unhealthyDevices[resourceName] = sets.NewString()
@@ -254,6 +266,7 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [
m.allDevices[resourceName][dev.ID] = dev
if dev.Health == pluginapi.Healthy {
m.healthyDevices[resourceName].Insert(dev.ID)
healthyCount++
} else {
m.unhealthyDevices[resourceName].Insert(dev.ID)
}
@@ -262,6 +275,7 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [
if err := m.writeCheckpoint(); err != nil {
klog.ErrorS(err, "Writing checkpoint encountered")
}
klog.V(2).InfoS("Processed device updates for resource", "resourceName", resourceName, "totalCount", len(devices), "healthyCount", healthyCount)
}

// GetWatcherHandler returns the plugin handler
@@ -277,11 +291,13 @@ func (m *ManagerImpl) checkpointFile() string {
// Start starts the Device Plugin Manager and start initialization of
// podDevices and allocatedDevices information from checkpointed state and
// starts device plugin registration service.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error {
klog.V(2).InfoS("Starting Device Plugin manager")

m.activePods = activePods
m.sourcesReady = sourcesReady
m.containerMap = initialContainers
m.containerRunningSet = initialContainerRunningSet

// Loads in allocatedDevices information from disk.
err := m.readCheckpoint()
@@ -545,10 +561,31 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
}
}

// We have 3 major flows to handle:
// 1. kubelet running, normal allocation (needed > 0, container being [re]created). Steady state and by far the most common case.
// 2. kubelet restart. In this scenario every other component of the stack (device plugins, app container, runtime) is still running.
// 3. node reboot. In this scenario device plugins may not be running yet when we try to allocate devices.
// note: if we get this far the runtime is surely running; this is usually enforced at the OS level by startup service dependencies.

// First we take care of the exceptional flows (scenarios 2 and 3). In both, the kubelet is reinitializing, and while it is initializing, sources are NOT all ready.
// Is this a simple kubelet restart (scenario 2)? To distinguish, we use the information we got from the runtime. If we are asked to allocate devices for containers
// reported running, it can only be a kubelet restart: on node reboot the runtime and the containers were shut down as well. And if the container is running, it must
// already have access to all the required devices, so there is nothing to do and we can bail out.
if !m.sourcesReady.AllReady() && m.isContainerAlreadyRunning(podUID, contName) {
klog.V(3).InfoS("container detected running, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
return nil, nil
}

// We dealt with scenario 2. If we got this far it's either scenario 3 (node reboot) or scenario 1 (steady state, normal flow).
klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
healthyDevices, hasRegistered := m.healthyDevices[resource]

// Check if the resource is registered with the device manager.
// The following checks are expected to fail only in scenario 3 (node reboot).
// The kubelet is reinitializing and got a container from its sources. But there is no ordering guarantee, so an app container may attempt allocation _before_ the
// device plugin has come up, registered, and reported its devices back to the kubelet.
// This can only happen in scenario 3, because at steady state (scenario 1) the scheduler prevents pods from being sent to nodes which don't report enough devices.
// Note: we need to check the device health and registration status *before* we check how many devices are needed; doing otherwise caused issue #109595.
// Note: if the scheduler is bypassed, we fall back to scenario 1, so we still need these checks.
if !hasRegistered {
return nil, fmt.Errorf("cannot allocate unregistered device %s", resource)
}
@@ -563,7 +600,10 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
return nil, fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resource)
}

// We handled the known error paths in scenario 3 (node reboot), so from now on we can follow a common path.
// Container restarts at kubelet steady state (scenario 1) are covered by the same flow.
if needed == 0 {
klog.V(3).InfoS("no devices needed, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
// No change, no work.
return nil, nil
}
@@ -1040,3 +1080,23 @@ func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {

m.pendingAdmissionPod = pod
}

func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool {
cntID, err := m.containerMap.GetContainerID(podUID, cntName)
if err != nil {
klog.V(4).InfoS("container not found in the initial map, assumed NOT running", "podUID", podUID, "containerName", cntName, "err", err)
return false
}

// Note that if the container runtime was down when the kubelet restarted, this set will be empty,
// so on kubelet restart containers will again fail admission, hitting https://github.com/kubernetes/kubernetes/issues/118559 again.
// This scenario should however be rare enough.
if !m.containerRunningSet.Has(cntID) {
klog.V(4).InfoS("container not present in the initial running set", "podUID", podUID, "containerName", cntName, "containerID", cntID)
return false
}

// Once we make it here we know we have a running container.
klog.V(4).InfoS("container found in the initial set, assumed running", "podUID", podUID, "containerName", cntName, "containerID", cntID)
return true
}
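For readability, the three flows that devicesToAllocate now distinguishes, summarized informally from the comments above:

| Scenario | sourcesReady.AllReady() | Container in initial running set | Outcome |
| --- | --- | --- | --- |
| 1. Steady state | true | n/a | Normal allocation path |
| 2. Kubelet restart | false | yes | Return early; the running container already holds its devices |
| 3. Node reboot | false | no | Fall through; registration and health checks may fail until plugins re-register |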
7 changes: 6 additions & 1 deletion pkg/kubelet/cm/devicemanager/manager_test.go
@@ -38,6 +38,7 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -272,7 +273,9 @@ func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitor
return []*v1.Pod{}
}

err = w.Start(activePods, &sourcesReadyStub{})
// Test the steady state here; initialization, where sourcesReady, containerMap and containerRunningSet
// are relevant, will be tested with a different flow.
err = w.Start(activePods, &sourcesReadyStub{}, containermap.NewContainerMap(), sets.NewString())
require.NoError(t, err)

return w, updateChan
@@ -1000,6 +1003,8 @@ func TestPodContainerDeviceToAllocate(t *testing.T) {
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: newPodDevices(),
activePods: func() []*v1.Pod { return []*v1.Pod{} },
sourcesReady: &sourcesReadyStub{},
}

testManager.podDevices.insert("pod1", "con1", resourceName1,
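As the comment in setupDeviceManager notes, the restart path is exercised by a different flow. A hedged sketch of what such a unit test could look like (pod and container IDs are hypothetical; this test is not part of this commit):

```go
func TestIsContainerAlreadyRunning(t *testing.T) {
	// Hypothetical IDs, for illustration only.
	cm := containermap.NewContainerMap()
	cm.Add("pod-uid-1", "app", "cnt-1")

	testManager := &ManagerImpl{
		containerMap:        cm,
		containerRunningSet: sets.NewString("cnt-1"),
	}

	// A container known to the map and reported running survived the restart:
	// allocation can be skipped for it.
	if !testManager.isContainerAlreadyRunning("pod-uid-1", "app") {
		t.Fatalf("expected container to be detected as running")
	}

	// A container absent from the initial running set must not be skipped.
	if testManager.isContainerAlreadyRunning("pod-uid-1", "sidecar") {
		t.Fatalf("unknown container must not be detected as running")
	}
}
```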
4 changes: 3 additions & 1 deletion pkg/kubelet/cm/devicemanager/types.go
@@ -20,6 +20,8 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -31,7 +33,7 @@ import (
// Manager manages all the Device Plugins running on a node.
type Manager interface {
// Start starts device plugin registration service.
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error

// Allocate configures and assigns devices to a container in a pod. From
// the requested device resources, Allocate will communicate with the
32 changes: 32 additions & 0 deletions pkg/kubelet/cm/helpers.go
@@ -17,7 +17,14 @@ limitations under the License.
package cm

import (
"context"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
)

@@ -44,3 +51,28 @@ func hardEvictionReservation(thresholds []evictionapi.Threshold, capacity v1.Res
}
return ret
}

func buildContainerMapAndRunningSetFromRuntime(ctx context.Context, runtimeService internalapi.RuntimeService) (containermap.ContainerMap, sets.String) {
podSandboxMap := make(map[string]string)
podSandboxList, _ := runtimeService.ListPodSandbox(ctx, nil)
for _, p := range podSandboxList {
podSandboxMap[p.Id] = p.Metadata.Uid
}

runningSet := sets.NewString()
containerMap := containermap.NewContainerMap()
containerList, _ := runtimeService.ListContainers(ctx, nil)
for _, c := range containerList {
if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
klog.InfoS("No PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id)
continue
}
podUID := podSandboxMap[c.PodSandboxId]
containerMap.Add(podUID, c.Metadata.Name, c.Id)
if c.State == runtimeapi.ContainerState_CONTAINER_RUNNING {
klog.V(4).InfoS("Container reported running", "podSandboxId", c.PodSandboxId, "podUID", podUID, "containerName", c.Metadata.Name, "containerId", c.Id)
runningSet.Insert(c.Id)
}
}
return containerMap, runningSet
}
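Concretely, given one sandbox and one running container (hypothetical IDs), the helper above yields:

```go
// Input, as reported by the CRI runtime (IDs are hypothetical):
//   PodSandbox { Id: "sb-1", Metadata.Uid: "pod-uid-1" }
//   Container  { Id: "cnt-1", PodSandboxId: "sb-1", Metadata.Name: "app",
//                State: runtimeapi.ContainerState_CONTAINER_RUNNING }
//
// Output:
//   containerMap: ("pod-uid-1", "app") -> "cnt-1"
//   runningSet:   {"cnt-1"}
```

Note that the errors from ListPodSandbox and ListContainers are discarded: on a runtime failure the helper returns an empty map and set, which degrades to the pre-existing behavior described in isContainerAlreadyRunning (containers fail admission again on kubelet restart).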
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/topologymanager/topology_manager.go
@@ -207,7 +207,7 @@ func (m *manager) RemoveContainer(containerID string) error {
}

func (m *manager) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
klog.InfoS("Topology Admit Handler")
klog.InfoS("Topology Admit Handler", "podUID", attrs.Pod.UID, "podNamespace", attrs.Pod.Namespace, "podName", attrs.Pod.Name)
pod := attrs.Pod

return m.scope.Admit(pod)
