UPSTREAM: 108749: kubelet: Delay writing a terminal phase until the pod is terminated

Other components must know when the Kubelet has released critical
resources for terminal pods. Do not set the phase in the apiserver
to terminal until all containers are stopped and cannot restart.

As a consequence of this change, the Kubelet must explicitly transition
a terminal pod to the terminating state in the pod worker, which is
handled by returning a new isTerminal boolean from syncPod.

Finally, if a pod with init containers has not been initialized yet, don't
default container statuses or not-yet-attempted init containers to the
unknown failure state.
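
In sketch form, the resulting contract looks like this (condensed from the
kubelet.go diff below; error handling and unrelated setup elided):

    func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
        apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
        // If the pod has already reached a terminal phase, record the status
        // and stop: the pod worker, not syncPod, drives the transition into
        // termination.
        if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed {
            kl.statusManager.SetPodStatus(pod, apiPodStatus)
            return true, nil
        }
        // ... normal setup continues; every other exit returns isTerminal == false ...
        return false, nil
    }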
smarterclayton authored and rphillips committed Mar 18, 2022
1 parent 5c84e52 commit 22a3ab6
Showing 10 changed files with 869 additions and 88 deletions.
61 changes: 41 additions & 20 deletions pkg/kubelet/kubelet.go
@@ -1519,23 +1519,36 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
}

// syncPod is the transaction script for the sync of a single pod (setting up)
// a pod. The reverse (teardown) is handled in syncTerminatingPod and
// syncTerminatedPod. If syncPod exits without error, then the pod runtime
// state is in sync with the desired configuration state (pod is running).
// If syncPod exits with a transient error, the next invocation of syncPod
// is expected to make progress towards reaching the runtime state.
// a pod. This method is reentrant and expected to converge a pod towards the
// desired state of the spec. The reverse (teardown) is handled in
// syncTerminatingPod and syncTerminatedPod. If syncPod exits without error,
// then the pod runtime state is in sync with the desired configuration state
// (pod is running). If syncPod exits with a transient error, the next
// invocation of syncPod is expected to make progress towards reaching the
// runtime state. syncPod exits with isTerminal when the pod was detected to
// have reached a terminal lifecycle phase due to container exits (for
// RestartNever or RestartOnFailure) and the next method invoked will be
// syncTerminatingPod.
//
// Arguments:
//
// o - the SyncPodOptions for this invocation
// updateType - whether this is a create (first time) or an update, should
// only be used for metrics since this method must be reentrant
// pod - the pod that is being set up
// mirrorPod - the mirror pod known to the kubelet for this pod, if any
// podStatus - the most recent pod status observed for this pod which can
// be used to determine the set of actions that should be taken during
// this loop of syncPod
//
// The workflow is:
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare a v1.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
// start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running due to soft admission
// * Stop the pod's containers if it should not be running due to soft
// admission
// * Ensure any background tracking for a runnable pod is started
// * Create a mirror pod if the pod is a static pod, and does not
// already have a mirror pod
// * Create the data directories for the pod if they do not exist
@@ -1549,10 +1562,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
//
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
// Callers should not throw an event if this operation returns an error.
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
// Callers should not write an event if this operation returns an error.
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
defer func() {
klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
}()

// Latency measurements for the main workflow are relative to the
// first time the pod was seen by the API server.
@@ -1584,11 +1599,17 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
for _, ipInfo := range apiPodStatus.PodIPs {
podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
}

if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
podStatus.IPs = []string{apiPodStatus.PodIP}
}

// If the pod is terminal, we don't need to continue setting up the pod
if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed {
kl.statusManager.SetPodStatus(pod, apiPodStatus)
isTerminal = true
return isTerminal, nil
}

// If the pod should not be running, we request the pod's containers be stopped. This is not the same
// as termination (we want to stop the pod, but may restart it later if soft admission allows it).
// Set the status and phase appropriately.
@@ -1637,13 +1658,13 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
// Return an error to signal that the sync loop should back off.
syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
}
return syncErr
return false, syncErr
}

// If the network plugin is not ready, only start the pod if it uses the host network
if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
}

// Create Cgroups for the pod and apply resource parameters
@@ -1690,7 +1711,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
}
if err := pcm.EnsureExists(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
}
}
}
@@ -1731,7 +1752,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
if err := kl.makePodDataDirs(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
return err
return false, err
}

// Volume manager will not mount volumes for terminating pods
@@ -1741,7 +1762,7 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
return err
return false, err
}
}

@@ -1756,15 +1777,15 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
for _, r := range result.SyncResults {
if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
// Do not record an event here, as we keep all event logging for sync pod failures
// local to container runtime so we get better errors
return err
// local to container runtime, so we get better errors.
return false, err
}
}

return nil
return false, nil
}

return nil
return false, nil
}

// syncTerminatingPod is expected to terminate all running containers in a pod. Once this method
8 changes: 7 additions & 1 deletion pkg/kubelet/kubelet_pods.go
@@ -925,6 +925,12 @@ func countRunningContainerStatus(status v1.PodStatus) int {
return runningContainers
}

// PodCouldHaveRunningContainers returns true if the pod with the given UID could still have running
// containers. This returns false if the pod has not yet been started or the pod is unknown.
func (kl *Kubelet) PodCouldHaveRunningContainers(pod *v1.Pod) bool {
return kl.podWorkers.CouldHaveRunningContainers(pod.UID)
}
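
A hedged sketch of the intended use; the caller below is hypothetical (the
patch itself only adds the accessor):

    // canReleaseCriticalResources is an illustrative helper, not part of this
    // patch: a component that reclaims node-level resources should treat the
    // pod as live until the pod worker confirms no container is running or
    // can start again.
    func canReleaseCriticalResources(kl *Kubelet, pod *v1.Pod) bool {
        return !kl.PodCouldHaveRunningContainers(pod)
    }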

// PodResourcesAreReclaimed returns true if all required node-level resources that a pod was consuming have
// been reclaimed by the kubelet. Reclaiming resources is a prerequisite to deleting a pod from the API server.
func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
@@ -1440,7 +1446,7 @@ func getPhase(spec *v1.PodSpec, info []v1.ContainerStatus) v1.PodPhase {
}

// generateAPIPodStatus creates the final API pod status for a pod, given the
// internal pod status.
// internal pod status. This method should only be called from within sync*Pod methods.
func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
klog.V(3).InfoS("Generating pod status", "pod", klog.KObj(pod))

28 changes: 20 additions & 8 deletions pkg/kubelet/kubelet_test.go
@@ -490,9 +490,9 @@ func TestDispatchWorkOfCompletedPod(t *testing.T) {
kubelet := testKubelet.kubelet
var got bool
kubelet.podWorkers = &fakePodWorkers{
syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
got = true
return nil
return false, nil
},
cache: kubelet.podCache,
t: t,
@@ -569,9 +569,9 @@ func TestDispatchWorkOfActivePod(t *testing.T) {
kubelet := testKubelet.kubelet
var got bool
kubelet.podWorkers = &fakePodWorkers{
syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
got = true
return nil
return false, nil
},
cache: kubelet.podCache,
t: t,
@@ -1220,8 +1220,11 @@ func TestCreateMirrorPod(t *testing.T) {
pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
pods := []*v1.Pod{pod}
kl.podManager.SetPods(pods)
err := kl.syncPod(context.Background(), updateType, pod, nil, &kubecontainer.PodStatus{})
isTerminal, err := kl.syncPod(context.Background(), updateType, pod, nil, &kubecontainer.PodStatus{})
assert.NoError(t, err)
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
}
podFullName := kubecontainer.GetPodFullName(pod)
assert.True(t, manager.HasPod(podFullName), "Expected mirror pod %q to be created", podFullName)
assert.Equal(t, 1, manager.NumOfPods(), "Expected only 1 mirror pod %q, got %+v", podFullName, manager.GetPods())
@@ -1252,8 +1255,11 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) {

pods := []*v1.Pod{pod, mirrorPod}
kl.podManager.SetPods(pods)
err := kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{})
isTerminal, err := kl.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{})
assert.NoError(t, err)
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
}
name := kubecontainer.GetPodFullName(pod)
creates, deletes := manager.GetCounts(name)
if creates != 1 || deletes != 1 {
@@ -1409,13 +1415,19 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) {
})

kubelet.podManager.SetPods([]*v1.Pod{pod})
err := kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
isTerminal, err := kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
assert.Error(t, err, "expected pod with hostNetwork=false to fail when network in error")
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
}

pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource
pod.Spec.HostNetwork = true
err = kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
isTerminal, err = kubelet.syncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
assert.NoError(t, err, "expected pod with hostNetwork=true to succeed when network in error")
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
}
}

func TestFilterOutInactivePods(t *testing.T) {
Expand Down
48 changes: 43 additions & 5 deletions pkg/kubelet/pod_workers.go
@@ -218,7 +218,7 @@ type PodWorkers interface {
}

// the function to invoke to perform a sync (reconcile the kubelet state to the desired shape of the pod)
type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error
type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error)

// the function to invoke to terminate a pod (ensure no running processes are present)
type syncTerminatingPodFnType func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error
@@ -886,6 +886,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
}

klog.V(4).InfoS("Processing pod event", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
var isTerminal bool
err := func() error {
// The worker is responsible for ensuring the sync method sees the appropriate
// status updates on resyncs (the result of the last sync), transitions to
@@ -932,13 +933,14 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
err = p.syncTerminatingPodFn(ctx, pod, status, update.Options.RunningPod, gracePeriod, podStatusFn)

default:
err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
isTerminal, err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
}

lastSyncTime = time.Now()
return err
}()

var phaseTransition bool
switch {
case err == context.Canceled:
// when the context is cancelled we expect an update to already be queued
@@ -969,10 +971,17 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
}
// otherwise we move to the terminating phase
p.completeTerminating(pod)
phaseTransition = true

case isTerminal:
// if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating
klog.V(4).InfoS("Pod is terminal", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
p.completeSync(pod)
phaseTransition = true
}

// queue a retry for errors if necessary, then put the next event in the channel if any
p.completeWork(pod, err)
// queue a retry if necessary, then put the next event in the channel if any
p.completeWork(pod, phaseTransition, err)
if start := update.Options.StartTime; !start.IsZero() {
metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
}
Expand Down Expand Up @@ -1003,6 +1012,33 @@ func (p *podWorkers) acknowledgeTerminating(pod *v1.Pod) PodStatusFunc {
return nil
}

// completeSync is invoked when syncPod completes successfully and indicates the pod is now terminal and should
// be terminated. This happens when the pod's lifecycle completes naturally: the containers of any pod
// that is not RestartAlways eventually exit. Unnatural completions, such as evictions, API-driven
// deletion, or phase transition, are handled by UpdatePod.
func (p *podWorkers) completeSync(pod *v1.Pod) {
p.podLock.Lock()
defer p.podLock.Unlock()

klog.V(4).InfoS("Pod indicated lifecycle completed naturally and should now terminate", "pod", klog.KObj(pod), "podUID", pod.UID)

if status, ok := p.podSyncStatuses[pod.UID]; ok {
if status.terminatingAt.IsZero() {
status.terminatingAt = time.Now()
} else {
klog.V(4).InfoS("Pod worker attempted to set terminatingAt twice, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
}
status.startedTerminating = true
}

p.lastUndeliveredWorkUpdate[pod.UID] = podWork{
WorkType: TerminatingPodWork,
Options: UpdatePodOptions{
Pod: pod,
},
}
}
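
Combined with the managePodLoop change above, natural completion now follows a
single path: syncPod reports isTerminal, completeSync records terminatingAt and
queues a TerminatingPodWork, and completeWork (below) requeues the pod with
zero delay, so syncTerminatingPod runs on the next iteration instead of waiting
out a resync interval. A standalone toy model of that sequencing, with the real
types, channels, and locking elided:

    package main

    import "fmt"

    type workType int

    const (
        syncWork workType = iota
        terminatingWork
    )

    func main() {
        // Toy queue standing in for the pod worker's update channel.
        queue := []workType{syncWork}
        for len(queue) > 0 {
            w := queue[0]
            queue = queue[1:]
            switch w {
            case syncWork:
                // e.g. a RestartNever pod whose containers have all exited
                isTerminal := true
                if isTerminal {
                    // completeSync queues the terminating work; completeWork
                    // then requeues with zero delay.
                    queue = append(queue, terminatingWork)
                }
            case terminatingWork:
                fmt.Println("syncTerminatingPod: stop containers, then write the terminal phase")
            }
        }
    }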

// completeTerminating is invoked when syncTerminatingPod completes successfully, which means
// no container is running, no container will be started in the future, and we are ready for
// cleanup. This updates the termination state which prevents future syncs and will ensure
@@ -1115,9 +1151,11 @@ func (p *podWorkers) completeUnstartedTerminated(pod *v1.Pod) {

// completeWork requeues on error or the next sync interval and then immediately executes any pending
// work.
func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
func (p *podWorkers) completeWork(pod *v1.Pod, phaseTransition bool, syncErr error) {
// Requeue the last update if the last sync returned error.
switch {
case phaseTransition:
p.workQueue.Enqueue(pod.UID, 0)
case syncErr == nil:
// No error; requeue at the regular resync interval.
p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
