[release-4.12] WIP: DNM: kubelet: devices: skip allocation for running pods #1636

Closed
30 changes: 6 additions & 24 deletions pkg/kubelet/cm/container_manager_linux.go
@@ -21,6 +21,7 @@ package cm

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"os"
@@ -50,7 +51,6 @@ import (
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
@@ -557,9 +557,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
runtimeService internalapi.RuntimeService,
localStorageCapacityIsolation bool) error {

ctx := context.Background()
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)

// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
containerMap := buildContainerMapFromRuntime(runtimeService)
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start cpu manager error: %v", err)
@@ -568,7 +570,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,

// Initialize memory manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
containerMap := buildContainerMapFromRuntime(runtimeService)
containerMap, _ := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
err := cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start memory manager error: %v", err)
@@ -632,7 +634,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
}

// Starts device manager.
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
return err
}

@@ -726,26 +728,6 @@ func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
}
}

func buildContainerMapFromRuntime(runtimeService internalapi.RuntimeService) containermap.ContainerMap {
podSandboxMap := make(map[string]string)
podSandboxList, _ := runtimeService.ListPodSandbox(nil)
for _, p := range podSandboxList {
podSandboxMap[p.Id] = p.Metadata.Uid
}

containerMap := containermap.NewContainerMap()
containerList, _ := runtimeService.ListContainers(nil)
for _, c := range containerList {
if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
klog.InfoS("no PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id)
continue
}
containerMap.Add(podSandboxMap[c.PodSandboxId], c.Metadata.Name, c.Id)
}

return containerMap
}

func isProcessRunningInHost(pid int) (bool, error) {
// Get init pid namespace.
initPidNs, err := os.Readlink("/proc/1/ns/pid")
6 changes: 5 additions & 1 deletion pkg/kubelet/cm/container_manager_windows.go
@@ -23,6 +23,7 @@ limitations under the License.
package cm

import (
"context"
"fmt"

"k8s.io/klog/v2"
@@ -84,8 +85,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
}
}

ctx := context.Background()
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)

// Starts device manager.
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
return err
}

68 changes: 66 additions & 2 deletions pkg/kubelet/cm/devicemanager/manager.go
@@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -99,6 +100,15 @@ type ManagerImpl struct {

// pendingAdmissionPod contain the pod during the admission phase
pendingAdmissionPod *v1.Pod

// containerMap provides a mapping from (pod, container) -> containerID
// for all containers in a pod. Used to detect pods running across a restart
containerMap containermap.ContainerMap

// containerRunningSet identifies which containers among those present in `containerMap`
// were reported running by the container runtime when `containerMap` was computed.
// Used to detect pods running across a restart
containerRunningSet sets.String
}

type endpointInfo struct {
@@ -214,6 +224,7 @@ func (m *ManagerImpl) PluginConnected(resourceName string, p plugin.DevicePlugin
defer m.mutex.Unlock()
m.endpoints[resourceName] = endpointInfo{e, options}

klog.V(2).InfoS("Device plugin connected", "resourceName", resourceName)
return nil
}

@@ -238,6 +249,7 @@ func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *plug
}

func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
healthyCount := 0
m.mutex.Lock()
m.healthyDevices[resourceName] = sets.NewString()
m.unhealthyDevices[resourceName] = sets.NewString()
@@ -246,6 +258,7 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [
m.allDevices[resourceName][dev.ID] = dev
if dev.Health == pluginapi.Healthy {
m.healthyDevices[resourceName].Insert(dev.ID)
healthyCount++
} else {
m.unhealthyDevices[resourceName].Insert(dev.ID)
}
@@ -254,6 +267,7 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [
if err := m.writeCheckpoint(); err != nil {
klog.ErrorS(err, "Writing checkpoint encountered")
}
klog.V(2).InfoS("Processed device updates for resource", "resourceName", resourceName, "totalCount", len(devices), "healthyCount", healthyCount)
}

// GetWatcherHandler returns the plugin handler
@@ -269,11 +283,13 @@ func (m *ManagerImpl) checkpointFile() string {
// Start starts the Device Plugin Manager and start initialization of
// podDevices and allocatedDevices information from checkpointed state and
// starts device plugin registration service.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error {
klog.V(2).InfoS("Starting Device Plugin manager")

m.activePods = activePods
m.sourcesReady = sourcesReady
m.containerMap = initialContainers
m.containerRunningSet = initialContainerRunningSet

// Loads in allocatedDevices information from disk.
err := m.readCheckpoint()
@@ -537,10 +553,35 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
}
}

// We have 3 major flows to handle:
// 1. kubelet running, normal allocation (needed > 0, container being [re]created). Steady state and by far the most common case.
// 2. kubelet restart. In this scenario every other component of the stack (device plugins, app container, runtime) is still running.
// 3. node reboot. In this scenario device plugins may not be running yet when we try to allocate devices.
// note: if we get this far the runtime is surely running. This is usually enforced at the OS level by startup service dependencies.

// First we take care of the exceptional flow (scenarios 2 and 3). In both flows, kubelet is reinitializing, and while kubelet is initializing, sources are NOT all ready.
// Is this a simple kubelet restart (scenario 2)? To distinguish, we use the information we got from the runtime. If we are asked to allocate devices for containers reported
// running, then it can only be a kubelet restart. On node reboot the runtime and the containers were also shut down. Then, if the container was running, it can only be
// because it already has access to all the required devices, so we have nothing to do and we can bail out.

relaxedCheck := isRelaxedCheckEnabled()
klog.V(2).InfoS("container running check", "relaxed", relaxedCheck)

if !relaxedCheck && !m.sourcesReady.AllReady() && m.isContainerAlreadyRunning(podUID, contName) {
klog.V(3).InfoS("container detected running, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
return nil, nil
}

// We dealt with scenario 2. If we got this far it's either scenario 3 (node reboot) or scenario 1 (steady state, normal flow).
klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
healthyDevices, hasRegistered := m.healthyDevices[resource]

// Check if resource registered with devicemanager
// The following checks are expected to fail only in scenario 3 (node reboot).
// The kubelet is reinitializing and got a container from its sources. But there is no ordering guarantee, so an app container may attempt allocation _before_ the device plugin
// has been created, registered, and reported its devices back to the kubelet.
// This can only happen in scenario 3, because at steady state (scenario 1) the scheduler prevents pods from being scheduled on nodes which don't report enough devices.
// Note: we need to check the device health and registration status *before* we check how many devices are needed; doing otherwise caused issue #109595
// Note: if the scheduler is bypassed, we fall back to scenario 1, so we still need these checks.
if !hasRegistered {
return nil, fmt.Errorf("cannot allocate unregistered device %s", resource)
}
@@ -555,7 +596,10 @@
return nil, fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resource)
}

// We handled the known error paths in scenario 3 (node reboot), so from now on we can fall back to a common path.
// We cover container restart in kubelet steady state with the same flow.
if needed == 0 {
klog.V(3).InfoS("no devices needed, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", string(podUID), "containerName", contName)
// No change, no work.
return nil, nil
}
@@ -1035,3 +1079,23 @@ func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {

m.pendingAdmissionPod = pod
}

func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool {
cntID, err := m.containerMap.GetContainerID(podUID, cntName)
if err != nil {
klog.V(4).InfoS("container not found in the initial map, assumed NOT running", "podUID", podUID, "containerName", cntName, "err", err)
return false
}

// note that if the container runtime is down when the kubelet restarts, this set will be empty,
// so on kubelet restart containers will again fail admission, hitting https://github.com/kubernetes/kubernetes/issues/118559 again.
// This scenario should however be rare enough.
if !m.containerRunningSet.Has(cntID) {
klog.V(4).InfoS("container not present in the initial running set", "podUID", podUID, "containerName", cntName, "containerID", cntID)
return false
}

// Once we make it here we know we have a running container.
klog.V(4).InfoS("container found in the initial set, assumed running", "podUID", podUID, "containerName", cntName, "containerID", cntID)
return true
}
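
For illustration, here is a minimal sketch (not part of this PR) of the decision `isContainerAlreadyRunning` makes, expressed against the same `containermap` and `sets.String` APIs used in this diff. Pod UIDs and container IDs are invented, and note the real gate in `devicesToAllocate` additionally requires the relaxed check to be off and sources to not yet be all ready.

```go
// Sketch: mirrors the isContainerAlreadyRunning decision; not the PR's code.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
)

// alreadyRunning treats a container as running only if it is both present in
// the initial container map and listed in the initial running set.
func alreadyRunning(cm containermap.ContainerMap, running sets.String, podUID, cntName string) bool {
	cntID, err := cm.GetContainerID(podUID, cntName)
	if err != nil {
		// Unknown container: brand-new pod, or node reboot (scenario 3).
		return false
	}
	// Known container: running only if the runtime reported it running
	// when the kubelet (re)started (scenario 2).
	return running.Has(cntID)
}

func main() {
	cm := containermap.NewContainerMap()
	cm.Add("pod-a", "cnt-1", "id-1") // known and reported running
	cm.Add("pod-b", "cnt-2", "id-2") // known but not running
	running := sets.NewString("id-1")

	fmt.Println(alreadyRunning(cm, running, "pod-a", "cnt-1")) // true  -> skip device allocation
	fmt.Println(alreadyRunning(cm, running, "pod-b", "cnt-2")) // false -> allocate as usual
	fmt.Println(alreadyRunning(cm, running, "pod-c", "cnt-3")) // false -> allocate as usual
}
```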
4 changes: 3 additions & 1 deletion pkg/kubelet/cm/devicemanager/manager_stub.go
@@ -18,6 +18,8 @@ package devicemanager

import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -34,7 +36,7 @@ func NewManagerStub() (*ManagerStub, error) {
}

// Start simply returns nil.
func (h *ManagerStub) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
func (h *ManagerStub) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error {
return nil
}

7 changes: 6 additions & 1 deletion pkg/kubelet/cm/devicemanager/manager_test.go
@@ -39,6 +39,7 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -273,7 +274,9 @@ func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitor
return []*v1.Pod{}
}

err = w.Start(activePods, &sourcesReadyStub{})
// test the steady state; initialization, where sourcesReady, containerMap and containerRunningSet
// are relevant, will be tested with a different flow
err = w.Start(activePods, &sourcesReadyStub{}, containermap.NewContainerMap(), sets.NewString())
require.NoError(t, err)

return w, updateChan
@@ -1001,6 +1004,8 @@ func TestPodContainerDeviceToAllocate(t *testing.T) {
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: newPodDevices(),
activePods: func() []*v1.Pod { return []*v1.Pod{} },
sourcesReady: &sourcesReadyStub{},
}

testManager.podDevices.insert("pod1", "con1", resourceName1,
38 changes: 38 additions & 0 deletions pkg/kubelet/cm/devicemanager/strictallocation.go
@@ -0,0 +1,38 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package devicemanager

import "os"

var (
devicemanagerRelaxedCheckEnabled bool
devicemanagerRelaxedCheckFilename = "/etc/kubernetes/openshift-devicemanager-relaxed-check"
)

func init() {
readEnablementFile()
}

func readEnablementFile() {
if _, err := os.Stat(devicemanagerRelaxedCheckFilename); err == nil {
devicemanagerRelaxedCheckEnabled = true
}
}

func isRelaxedCheckEnabled() bool {
return devicemanagerRelaxedCheckEnabled
}
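
The relaxed check is driven entirely by the presence of a marker file, read once at kubelet startup via `init()`; on a node it is enabled by creating `/etc/kubernetes/openshift-devicemanager-relaxed-check` before the kubelet starts. Below is a minimal, hypothetical package-local test sketch (not part of this PR) that exercises the enablement path by pointing the package-level filename variable at a temporary file.

```go
// Sketch of a package-local test for the enablement file; the test name and
// approach are assumptions, not code from this PR.
package devicemanager

import (
	"os"
	"path/filepath"
	"testing"
)

func TestRelaxedCheckEnablementFile(t *testing.T) {
	oldPath := devicemanagerRelaxedCheckFilename
	oldValue := devicemanagerRelaxedCheckEnabled
	defer func() {
		devicemanagerRelaxedCheckFilename = oldPath
		devicemanagerRelaxedCheckEnabled = oldValue
	}()

	// No marker file present: the check stays strict.
	// (readEnablementFile never resets the flag to false, so reset it here.)
	devicemanagerRelaxedCheckFilename = filepath.Join(t.TempDir(), "relaxed-check")
	devicemanagerRelaxedCheckEnabled = false
	readEnablementFile()
	if isRelaxedCheckEnabled() {
		t.Fatalf("expected relaxed check to be disabled without the marker file")
	}

	// Marker file present: the relaxed check is enabled.
	if err := os.WriteFile(devicemanagerRelaxedCheckFilename, nil, 0o644); err != nil {
		t.Fatal(err)
	}
	readEnablementFile()
	if !isRelaxedCheckEnabled() {
		t.Fatalf("expected relaxed check to be enabled with the marker file")
	}
}
```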
4 changes: 3 additions & 1 deletion pkg/kubelet/cm/devicemanager/types.go
@@ -20,6 +20,8 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -31,7 +33,7 @@ import (
// Manager manages all the Device Plugins running on a node.
type Manager interface {
// Start starts device plugin registration service.
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.String) error

// Allocate configures and assigns devices to a container in a pod. From
// the requested device resources, Allocate will communicate with the
32 changes: 32 additions & 0 deletions pkg/kubelet/cm/helpers.go
@@ -17,7 +17,14 @@ limitations under the License.
package cm

import (
"context"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
)

Expand All @@ -44,3 +51,28 @@ func hardEvictionReservation(thresholds []evictionapi.Threshold, capacity v1.Res
}
return ret
}

func buildContainerMapAndRunningSetFromRuntime(ctx context.Context, runtimeService internalapi.RuntimeService) (containermap.ContainerMap, sets.String) {
podSandboxMap := make(map[string]string)
podSandboxList, _ := runtimeService.ListPodSandbox(nil)
for _, p := range podSandboxList {
podSandboxMap[p.Id] = p.Metadata.Uid
}

runningSet := sets.NewString()
containerMap := containermap.NewContainerMap()
containerList, _ := runtimeService.ListContainers(nil)
for _, c := range containerList {
if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
klog.InfoS("No PodSandBox found for the container", "podSandboxId", c.PodSandboxId, "containerName", c.Metadata.Name, "containerId", c.Id)
continue
}
podUID := podSandboxMap[c.PodSandboxId]
containerMap.Add(podUID, c.Metadata.Name, c.Id)
if c.State == runtimeapi.ContainerState_CONTAINER_RUNNING {
klog.V(4).InfoS("Container reported running", "podSandboxId", c.PodSandboxId, "podUID", podUID, "containerName", c.Metadata.Name, "containerId", c.Id)
runningSet.Insert(c.Id)
}
}
return containerMap, runningSet
}
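
For a concrete picture of what this helper returns, here is a minimal sketch (not part of this PR) of the `(containerMap, containerRunningSet)` pair for a node with a single sandbox whose container the runtime reports as running; the pod UID and container ID are invented. This pair is what `Start` now threads into `deviceManager.Start` on both Linux and Windows.

```go
// Sketch: the shape of the data buildContainerMapAndRunningSetFromRuntime
// produces, built by hand instead of from a CRI runtime; values are made up.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
)

func main() {
	// (podUID, containerName) -> containerID, from ListPodSandbox + ListContainers.
	containerMap := containermap.NewContainerMap()
	containerMap.Add("pod-uid-1234", "gpu-workload", "cnt-id-abcdef")

	// Only containers reported in CONTAINER_RUNNING state end up in this set.
	containerRunningSet := sets.NewString("cnt-id-abcdef")

	id, err := containerMap.GetContainerID("pod-uid-1234", "gpu-workload")
	fmt.Println(id, err, containerRunningSet.Has(id)) // cnt-id-abcdef <nil> true
}
```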