Merge pull request kubernetes#115331 from mimowo/kubelet-fail-pending-deleted-pods

Give terminal phase correctly to all pods that will not be restarted
k8s-ci-robot committed Mar 16, 2023
2 parents 76d3510 + 3d68f36 commit e1c2af3
Showing 12 changed files with 604 additions and 233 deletions.
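The gist of the change, before the per-file diffs: once the kubelet knows a pod is terminal (all containers have stopped and none will be restarted), getPhase now assigns PodSucceeded or PodFailed rather than leaving the phase to the restart-policy rules, and the eviction manager is gated on a pod-worker-backed PodIsFinished check instead of PodResourcesAreReclaimed. The condensed Go sketch below illustrates only the shape of that phase decision; decidePhase and its counters are illustrative names, not the merged kubelet code, which also handles the pending, unknown, and static-pod cases visible in the kubelet_pods.go diff.

// Condensed sketch (not the merged code) of the new terminal-phase decision:
// once no containers are running and the pod worker marks the pod terminal,
// the phase becomes Succeeded or Failed regardless of restart policy.
package main

import "fmt"

type phase string

const (
	podSucceeded phase = "Succeeded"
	podFailed    phase = "Failed"
	podRunning   phase = "Running"
)

// decidePhase mirrors the shape of the new getPhase branch for illustration only.
func decidePhase(running, stopped, succeeded int, podIsTerminal, isStaticPod bool) phase {
	if running > 0 {
		return podRunning
	}
	if stopped > 0 && podIsTerminal && !isStaticPod {
		if stopped == succeeded {
			return podSucceeded // every container exited successfully
		}
		return podFailed // at least one container failed
	}
	// fall through to the existing restart-policy-based rules (omitted here)
	return podRunning
}

func main() {
	fmt.Println(decidePhase(0, 2, 2, true, false)) // Succeeded
	fmt.Println(decidePhase(0, 2, 1, true, false)) // Failed
}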
34 changes: 28 additions & 6 deletions pkg/kubelet/kubelet.go
@@ -1479,7 +1479,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
os.Exit(1)
}
// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.PodIsFinished, evictionMonitoringPeriod)

// container log manager must start after container runtime is up to retrieve information from container runtime
// and inform container to reopen log file after log rotation.
@@ -1689,7 +1689,7 @@ func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType,
}

// Generate final API pod status with pod and status manager status
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
// set pod IP to hostIP directly in runtime.GetPodStatus
@@ -1941,7 +1941,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
klog.V(4).InfoS("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)

apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
if podStatusFn != nil {
podStatusFn(&apiPodStatus)
}
@@ -2014,6 +2014,13 @@ }
}
}

// Compute and update the status in cache once the pods are no longer running.
// The computation is done here to ensure the pod status used for it contains
// information about the container end states (including exit codes) - when
// SyncTerminatedPod is called the containers may already be removed.
apiPodStatus = kl.generateAPIPodStatus(pod, podStatus, true)
kl.statusManager.SetPodStatus(pod, apiPodStatus)

// we have successfully stopped all containers, the pod is terminating, our status is "done"
klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)

@@ -2050,8 +2057,8 @@ func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kube
}

// SyncTerminatedPod cleans up a pod that has terminated (has no running containers).
// The invocations in this call are expected to tear down what PodResourcesAreReclaimed checks (which
// gates pod deletion). When this method exits the pod is expected to be ready for cleanup. This method
// The invocations in this call are expected to tear down all pod resources.
// When this method exits the pod is expected to be ready for cleanup. This method
// reduces the latency of pod cleanup but is not guaranteed to get called in all scenarios.
//
// Because the kubelet has no local store of information, all actions in this method that modify
@@ -2071,7 +2078,7 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus

// generate the final status of the pod
// TODO: should we simply fold this into TerminatePod? that would give a single pod update
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, true)

kl.statusManager.SetPodStatus(pod, apiPodStatus)

@@ -2082,6 +2089,21 @@ }
}
klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)

if !kl.keepTerminatedPodVolumes {
// This waiting loop relies on the background cleanup which starts after pod workers respond
// true for ShouldPodRuntimeBeRemoved, which happens after `SyncTerminatingPod` is completed.
if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
volumesExist := kl.podVolumesExist(pod.UID)
if volumesExist {
klog.V(3).InfoS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod), "podUID", pod.UID)
}
return !volumesExist, nil
}); err != nil {
return err
}
klog.V(3).InfoS("Pod termination cleaned up volume paths", "pod", klog.KObj(pod), "podUID", pod.UID)
}

// After volume unmount is complete, let the secret and configmap managers know we're done with this pod
if kl.secretManager != nil {
kl.secretManager.UnregisterPod(pod)
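The new waiting block in SyncTerminatedPod above polls until the pod's volumes are gone before the pod is reported as finished. Below is a minimal, self-contained sketch of that polling pattern using wait.PollUntilContextCancel from k8s.io/apimachinery; the remaining counter is a stand-in for kl.podVolumesExist(pod.UID), which the real code consults while background cleanup runs.

// Minimal sketch of the polling pattern used to wait for background volume cleanup.
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	remaining := 3 // stand-in for volume directories still being torn down in the background

	err := wait.PollUntilContextCancel(context.Background(), 100*time.Millisecond, true,
		func(ctx context.Context) (bool, error) {
			volumesExist := remaining > 0
			if volumesExist {
				remaining-- // pretend the background cleanup made progress between polls
				return false, nil // keep polling
			}
			return true, nil // done: no volumes left
		})
	if err != nil {
		fmt.Println("context cancelled before volumes were cleaned up:", err)
		return
	}
	fmt.Println("all volumes gone; the pod can be reported as finished")
}

Polling with a short interval keeps SyncTerminatedPod simple: the containers are already stopped by the time it runs, so the only remaining latency before the pod counts as finished is the background volume teardown itself.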
89 changes: 23 additions & 66 deletions pkg/kubelet/kubelet_pods.go
@@ -901,26 +901,6 @@ func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret {
return pullSecrets
}

func countRunningContainerStatus(status v1.PodStatus) int {
var runningContainers int
for _, c := range status.InitContainerStatuses {
if c.State.Running != nil {
runningContainers++
}
}
for _, c := range status.ContainerStatuses {
if c.State.Running != nil {
runningContainers++
}
}
for _, c := range status.EphemeralContainerStatuses {
if c.State.Running != nil {
runningContainers++
}
}
return runningContainers
}

// PodCouldHaveRunningContainers returns true if the pod with the given UID could still have running
// containers. This returns false if the pod has not yet been started or the pod is unknown.
func (kl *Kubelet) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
@@ -941,48 +921,11 @@ func (kl *Kubelet) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
return false
}

// PodResourcesAreReclaimed returns true if all required node-level resources that a pod was consuming have
// been reclaimed by the kubelet. Reclaiming resources is a prerequisite to deleting a pod from the API server.
func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
if kl.podWorkers.CouldHaveRunningContainers(pod.UID) {
// We shouldn't delete pods that still have running containers
klog.V(3).InfoS("Pod is terminated, but some containers are still running", "pod", klog.KObj(pod))
return false
}
if count := countRunningContainerStatus(status); count > 0 {
// We shouldn't delete pods until the reported pod status contains no more running containers (the previous
// check ensures no more status can be generated, this check verifies we have seen enough of the status)
klog.V(3).InfoS("Pod is terminated, but some container status has not yet been reported", "pod", klog.KObj(pod), "running", count)
return false
}
if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes {
// We shouldn't delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
klog.V(3).InfoS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod))
return false
}
if kl.kubeletConfiguration.CgroupsPerQOS {
pcm := kl.containerManager.NewPodContainerManager()
if pcm.Exists(pod) {
klog.V(3).InfoS("Pod is terminated, but pod cgroup sandbox has not been cleaned up", "pod", klog.KObj(pod))
return false
}
}

// Note: we leave pod containers to be reclaimed in the background since dockershim requires the
// container for retrieving logs and we want to make sure logs are available until the pod is
// physically deleted.

klog.V(3).InfoS("Pod is terminated and all resources are reclaimed", "pod", klog.KObj(pod))
return true
}

// podResourcesAreReclaimed simply calls PodResourcesAreReclaimed with the most up-to-date status.
func (kl *Kubelet) podResourcesAreReclaimed(pod *v1.Pod) bool {
status, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok {
status = pod.Status
}
return kl.PodResourcesAreReclaimed(pod, status)
// PodIsFinished returns true if SyncTerminatedPod is finished, ie.
// all required node-level resources that a pod was consuming have
// been reclaimed by the kubelet.
func (kl *Kubelet) PodIsFinished(pod *v1.Pod) bool {
return kl.podWorkers.ShouldPodBeFinished(pod.UID)
}

// filterOutInactivePods returns pods that are not in a terminal phase
@@ -1440,7 +1383,8 @@ func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, con
}

// getPhase returns the phase of a pod given its container info.
func getPhase(spec *v1.PodSpec, info []v1.ContainerStatus) v1.PodPhase {
func getPhase(pod *v1.Pod, info []v1.ContainerStatus, podIsTerminal bool) v1.PodPhase {
spec := pod.Spec
pendingInitialization := 0
failedInitialization := 0
for _, container := range spec.InitContainers {
@@ -1517,6 +1461,19 @@ func getPhase(spec *v1.PodSpec, info []v1.ContainerStatus) v1.PodPhase {
// one container is running
return v1.PodRunning
case running == 0 && stopped > 0 && unknown == 0:
// The pod is terminal so its containers won't be restarted regardless
// of the restart policy.
if podIsTerminal {
// TODO(#116484): Also assign terminal phase to static pods.
if !kubetypes.IsStaticPod(pod) {
// All containers are terminated in success
if stopped == succeeded {
return v1.PodSucceeded
}
// There is at least one failure
return v1.PodFailed
}
}
// All containers are terminated
if spec.RestartPolicy == v1.RestartPolicyAlways {
// All containers are in the process of restarting
@@ -1567,8 +1524,8 @@ func (kl *Kubelet) determinePodResizeStatus(pod *v1.Pod, podStatus *v1.PodStatus

// generateAPIPodStatus creates the final API pod status for a pod, given the
// internal pod status. This method should only be called from within sync*Pod methods.
func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
klog.V(3).InfoS("Generating pod status", "pod", klog.KObj(pod))
func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus, podIsTerminal bool) v1.PodStatus {
klog.V(3).InfoS("Generating pod status", "podIsTerminal", podIsTerminal, "pod", klog.KObj(pod))
// use the previous pod status, or the api status, as the basis for this pod
oldPodStatus, found := kl.statusManager.GetPodStatus(pod.UID)
if !found {
@@ -1580,7 +1537,7 @@ func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.Po
}
// calculate the next phase and preserve reason
allStatus := append(append([]v1.ContainerStatus{}, s.ContainerStatuses...), s.InitContainerStatuses...)
s.Phase = getPhase(&pod.Spec, allStatus)
s.Phase = getPhase(pod, allStatus, podIsTerminal)
klog.V(4).InfoS("Got phase for pod", "pod", klog.KObj(pod), "oldPhase", oldPodStatus.Phase, "phase", s.Phase)

// Perform a three-way merge between the statuses from the status manager,

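Finally, a hypothetical sketch of how a consumer such as the eviction manager can use a PodIsFinished-style callback: instead of re-deriving reclamation state from the reported pod status (what the removed PodResourcesAreReclaimed did), it asks whether the kubelet has completed SyncTerminatedPod for the pod before dropping its bookkeeping. The pod struct, podCleanedUpFunc type, and dropFinishedPods helper are stand-ins invented for illustration, not the kubelet's real interfaces.

// Hypothetical consumer of a PodIsFinished-style callback, with stand-in types.
package main

import "fmt"

// pod is a stand-in for *v1.Pod.
type pod struct {
	UID  string
	Name string
}

// podCleanedUpFunc mirrors the shape of the callback handed to evictionManager.Start
// in the diff above (kl.PodIsFinished); the name here is illustrative only.
type podCleanedUpFunc func(*pod) bool

// dropFinishedPods keeps only pods the kubelet has not yet fully finished.
func dropFinishedPods(pods []*pod, podIsFinished podCleanedUpFunc) []*pod {
	var remaining []*pod
	for _, p := range pods {
		if podIsFinished(p) {
			fmt.Println("dropping finished pod:", p.Name)
			continue
		}
		remaining = append(remaining, p)
	}
	return remaining
}

func main() {
	finished := map[string]bool{"a": true, "b": false}
	isFinished := func(p *pod) bool { return finished[p.UID] }

	tracked := []*pod{{UID: "a", Name: "pod-a"}, {UID: "b", Name: "pod-b"}}
	tracked = dropFinishedPods(tracked, isFinished)
	fmt.Println("still tracked:", len(tracked)) // 1
}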