Skip to content

Commit

Permalink
kubelet: Handle UID reuse in pod worker
Browse files Browse the repository at this point in the history
If a pod is killed (no longer wanted) and then a subsequent create/
add/update event is seen in the pod worker, assume that a pod UID
was reused (as it could be in static pods) and have the next
SyncKnownPods after the pod terminates remove the worker history so
that the config loop can restart the static pod, as well as return
to the caller the fact that this termination was not final.

The housekeeping loop then reconciles the desired state of the Kubelet
(pods in pod manager that are not in a terminal state, i.e. admitted
pods) with the pod worker by resubmitting those pods. This adds a
small amount of latency (2s) when a pod UID is reused and the pod
is terminated and restarted.
  • Loading branch information
smarterclayton authored and rphillips committed Sep 16, 2021
1 parent a6539a6 commit 2ff2780
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 16 deletions.
5 changes: 5 additions & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2227,6 +2227,8 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
// TODO: move inside syncPod and make reentrant
// https://github.com/kubernetes/kubernetes/issues/105014
kl.probeManager.AddPod(pod)
}
}
Expand Down Expand Up @@ -2261,6 +2263,9 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
if err := kl.deletePod(pod); err != nil {
klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
}
// TODO: move inside syncTerminatingPod|syncTerminatedPod (we should stop probing
// once the pod kill is acknowledged and during eviction)
// https://github.com/kubernetes/kubernetes/issues/105014
kl.probeManager.RemovePod(pod)
}
}
Expand Down
62 changes: 57 additions & 5 deletions pkg/kubelet/kubelet_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,14 +978,37 @@ func (kl *Kubelet) filterOutTerminatedPods(pods []*v1.Pod) []*v1.Pod {
if kl.podWorkers.IsPodKnownTerminated(p.UID) {
continue
}
if p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed {
// terminal pods are considered inactive UNLESS they are actively terminating
if kl.isAdmittedPodTerminal(p) && !kl.podWorkers.IsPodTerminationRequested(p.UID) {
continue
}
filteredPods = append(filteredPods, p)
}
return filteredPods
}

// isAdmittedPodTerminal returns true if the provided config source pod is in
// a terminal phase, or if the Kubelet has already indicated the pod has reached
// a terminal phase but the config source has not accepted it yet. This method
// should only be used within the pod configuration loops that notify the pod
// worker, other components should treat the pod worker as authoritative.
func (kl *Kubelet) isAdmittedPodTerminal(pod *v1.Pod) bool {
	// pods are considered inactive if the config source has observed a
	// terminal phase (if the Kubelet recorded that the pod reached a terminal
	// phase the pod should never be restarted)
	if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
		return true
	}
	// a pod that has been marked terminal within the Kubelet is considered
	// inactive (may have been rejected by Kubelet admission)
	if status, ok := kl.statusManager.GetPodStatus(pod.UID); ok {
		if status.Phase == v1.PodSucceeded || status.Phase == v1.PodFailed {
			return true
		}
	}
	return false
}

// removeOrphanedPodStatuses removes obsolete entries in podStatus where
// the pod is no longer considered bound to this node.
func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Pod) {
Expand Down Expand Up @@ -1067,13 +1090,16 @@ func (kl *Kubelet) HandlePodCleanups() error {
// cleanup of pod cgroups.
runningPods := make(map[types.UID]sets.Empty)
possiblyRunningPods := make(map[types.UID]sets.Empty)
restartablePods := make(map[types.UID]sets.Empty)
for uid, sync := range workingPods {
switch sync {
case SyncPodWork:
case SyncPod:
runningPods[uid] = struct{}{}
possiblyRunningPods[uid] = struct{}{}
case TerminatingPodWork:
case TerminatingPod:
possiblyRunningPods[uid] = struct{}{}
case TerminatedAndRecreatedPod:
restartablePods[uid] = struct{}{}
}
}

Expand All @@ -1089,8 +1115,8 @@ func (kl *Kubelet) HandlePodCleanups() error {
return err
}
for _, runningPod := range runningRuntimePods {
switch workType, ok := workingPods[runningPod.ID]; {
case ok && workType == SyncPodWork, ok && workType == TerminatingPodWork:
switch workerState, ok := workingPods[runningPod.ID]; {
case ok && workerState == SyncPod, ok && workerState == TerminatingPod:
// if the pod worker is already in charge of this pod, we don't need to do anything
continue
default:
Expand Down Expand Up @@ -1157,6 +1183,32 @@ func (kl *Kubelet) HandlePodCleanups() error {
}

kl.backOff.GC()

// If two pods with the same UID are observed in rapid succession, we need to
// resynchronize the pod worker after the first pod completes and decide whether
// to restart the pod. This happens last to avoid confusing the desired state
// in other components and to increase the likelihood transient OS failures during
// container start are mitigated. In general only static pods will ever reuse UIDs
// since the apiserver uses randomly generated UUIDv4 UIDs with a very low
// probability of collision.
for uid := range restartablePods {
pod, ok := allPodsByUID[uid]
if !ok {
continue
}
if kl.isAdmittedPodTerminal(pod) {
klog.V(3).InfoS("Pod is restartable after termination due to UID reuse, but pod phase is terminal", "pod", klog.KObj(pod), "podUID", pod.UID)
continue
}
start := kl.clock.Now()
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
klog.V(3).InfoS("Pod is restartable after termination due to UID reuse", "pod", klog.KObj(pod), "podUID", pod.UID)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
// TODO: move inside syncPod and make reentrant
// https://github.com/kubernetes/kubernetes/issues/105014
kl.probeManager.AddPod(pod)
}

return nil
}

Expand Down
66 changes: 56 additions & 10 deletions pkg/kubelet/pod_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type UpdatePodOptions struct {
type PodWorkType int

const (
// SyncPodSync is when the pod is expected to be started and running.
// SyncPodWork is when the pod is expected to be started and running.
SyncPodWork PodWorkType = iota
// TerminatingPodWork is when the pod is no longer being set up, but some
// containers may be running and are being torn down.
Expand All @@ -101,6 +101,26 @@ const (
TerminatedPodWork
)

// PodWorkerState classifies the status of pod as seen by the pod worker - setup (sync),
// teardown of containers (terminating), cleanup (terminated), or recreated with the
// same UID (kill -> create while terminating).
type PodWorkerState int

const (
	// SyncPod is when the pod is expected to be started and running.
	SyncPod PodWorkerState = iota
	// TerminatingPod is when the pod is no longer being set up, but some
	// containers may be running and are being torn down.
	TerminatingPod
	// TerminatedPod indicates the pod is stopped, can have no more running
	// containers, and any foreground cleanup can be executed.
	TerminatedPod
	// TerminatedAndRecreatedPod indicates that after the pod was terminating a
	// request to recreate the pod was received. The pod is terminated and can
	// now be restarted by sending a create event to the pod worker.
	TerminatedAndRecreatedPod
)

// podWork is the internal changes
type podWork struct {
// WorkType is the type of sync to perform - sync (create), terminating (stop
Expand All @@ -127,8 +147,8 @@ type PodWorkers interface {
// and have been terminated for a significant period of time. Once this method
// has been called once, the workers are assumed to be fully initialized and
// subsequent calls to ShouldPodContentBeRemoved on unknown pods will return
// true.
SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkType
// true. It returns a map describing the state of each known pod worker.
SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerState

// IsPodKnownTerminated returns true if the provided pod UID is known by the pod
// worker to be terminated. If the pod has been force deleted and the pod worker
Expand Down Expand Up @@ -254,6 +274,11 @@ type podSyncStatus struct {
// to remove the pod. A terminal pod (Succeeded/Failed) will have
// termination status until the pod is deleted.
finished bool
// restartRequested is true if the pod worker was informed the pod is
// expected to exist (update type of create, update, or sync) after
// it has been killed. When known pods are synced, any pod that is
// terminated and has restartRequested will have its history cleared.
restartRequested bool
// notifyPostTerminating will be closed once the pod transitions to
// terminated. After the pod is in terminated state, nothing should be
// added to this list.
Expand Down Expand Up @@ -514,6 +539,19 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
p.podSyncStatuses[uid] = status
}

// if an update is received that implies the pod should be running, but we are already terminating a pod by
// that UID, assume that two pods with the same UID were created in close temporal proximity (usually static
// pod but it's possible for an apiserver to extremely rarely do something similar) - flag the sync status
// to indicate that after the pod terminates it should be reset to "not running" to allow a subsequent add/update
// to start the pod worker again
if status.IsTerminationRequested() {
if options.UpdateType == kubetypes.SyncPodCreate {
status.restartRequested = true
klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KObj(pod), "podUID", pod.UID)
return
}
}

// once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
if status.IsFinished() {
klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KObj(pod), "podUID", pod.UID)
Expand Down Expand Up @@ -965,8 +1003,8 @@ func (p *podWorkers) contextForWorker(uid types.UID) context.Context {
// to UpdatePods for new pods. It returns a map of known workers that are not finished
// with a value of SyncPodTerminated, SyncPodKill, or SyncPodSync depending on whether
// the pod is terminated, terminating, or syncing.
func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkType {
workers := make(map[types.UID]PodWorkType)
func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerState {
workers := make(map[types.UID]PodWorkerState)
known := make(map[types.UID]struct{})
for _, pod := range desiredPods {
known[pod.UID] = struct{}{}
Expand All @@ -977,16 +1015,20 @@ func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkT

p.podsSynced = true
for uid, status := range p.podSyncStatuses {
if _, exists := known[uid]; !exists {
if _, exists := known[uid]; !exists || status.restartRequested {
p.removeTerminatedWorker(uid)
}
switch {
case !status.terminatedAt.IsZero():
workers[uid] = TerminatedPodWork
if status.restartRequested {
workers[uid] = TerminatedAndRecreatedPod
} else {
workers[uid] = TerminatedPod
}
case !status.terminatingAt.IsZero():
workers[uid] = TerminatingPodWork
workers[uid] = TerminatingPod
default:
workers[uid] = SyncPodWork
workers[uid] = SyncPod
}
}
return workers
Expand All @@ -1009,7 +1051,11 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
return
}

klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid)
if status.restartRequested {
klog.V(4).InfoS("Pod has been terminated but another pod with the same UID was created, remove history to allow restart", "podUID", uid)
} else {
klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid)
}
delete(p.podSyncStatuses, uid)
delete(p.podUpdates, uid)
delete(p.lastUndeliveredWorkUpdate, uid)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/pod_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) {
}
}

// SyncKnownPods is a test stub; it performs no reconciliation and reports no
// known pod workers (nil map), satisfying the PodWorkers interface.
func (f *fakePodWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerState {
	return nil
}

Expand Down

0 comments on commit 2ff2780

Please sign in to comment.