kubelet: Clean up a static pod that has been terminated before starting
- Allow a podWorker to start if it is blocked by a pod that has been
  terminated before starting
- When a pod can't start AND has already been terminated, exit cleanly
- Add a unit test that exercises race conditions in pod workers
gjkim42 authored and ehashman committed Feb 10, 2022
1 parent 6f11dc0 commit 9f347da
Showing 2 changed files with 909 additions and 58 deletions.
pkg/kubelet/pod_workers.go (92 additions, 39 deletions)
@@ -392,6 +392,11 @@ type podWorkers struct {
	syncTerminatingPodFn syncTerminatingPodFnType
	syncTerminatedPodFn syncTerminatedPodFnType

+	// workerChannelFn is exposed for testing to allow unit tests to impose delays
+	// in channel communication. The function is invoked once each time a new worker
+	// goroutine starts.
+	workerChannelFn func(uid types.UID, in chan podWork) (out <-chan podWork)
+
	// The EventRecorder to use
	recorder record.EventRecorder

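The new workerChannelFn hook gives tests a seam between UpdatePod and the worker goroutine. The following standalone sketch shows the interposition pattern the hook enables; podWork here is a stand-in struct and delayingChannel a hypothetical helper, not the kubelet's own types.

package main

import (
	"fmt"
	"time"
)

// podWork is a stand-in for the kubelet's internal per-pod update type.
type podWork struct{ seq int }

// delayingChannel relays work from in after a fixed delay, the kind of
// middleware a workerChannelFn test hook could install to widen the race
// window between enqueueing an update and the worker observing it.
func delayingChannel(in chan podWork, d time.Duration) <-chan podWork {
	out := make(chan podWork, 1)
	go func() {
		defer close(out)
		for w := range in {
			time.Sleep(d)
			out <- w
		}
	}()
	return out
}

func main() {
	in := make(chan podWork, 1)
	out := delayingChannel(in, 50*time.Millisecond)
	in <- podWork{seq: 1}
	close(in)
	fmt.Println(<-out) // {1}, delivered after the injected delay
}
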
@@ -699,9 +704,8 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
	}

	// start the pod worker goroutine if it doesn't exist
-	var podUpdates chan podWork
-	var exists bool
-	if podUpdates, exists = p.podUpdates[uid]; !exists {
+	podUpdates, exists := p.podUpdates[uid]
+	if !exists {
		// We need to have a buffer here, because checkForUpdates() method that
		// puts an update into channel is called from the same goroutine where
		// the channel is consumed. However, it is guaranteed that in such case
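The buffer comment above is cut off by the hunk boundary, but its gist is deadlock avoidance: checkForUpdates can enqueue onto the same channel that its own goroutine drains, so the channel needs capacity. A self-contained illustration:

package main

import "fmt"

func main() {
	// With capacity 1 the self-send below succeeds immediately; with an
	// unbuffered channel, sending from the same goroutine that consumes
	// it would block forever.
	ch := make(chan string, 1)
	ch <- "update" // enqueue from the consuming goroutine
	fmt.Println(<-ch)
}
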
@@ -715,13 +719,21 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
				append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
		}

+		// allow testing of delays in the pod update channel
+		var outCh <-chan podWork
+		if p.workerChannelFn != nil {
+			outCh = p.workerChannelFn(uid, podUpdates)
+		} else {
+			outCh = podUpdates
+		}
+
		// Creating a new pod worker either means this is a new pod, or that the
		// kubelet just restarted. In either case the kubelet is willing to believe
		// the status of the pod for the first pod worker sync. See corresponding
		// comment in syncPod.
		go func() {
			defer runtime.HandleCrash()
-			p.managePodLoop(podUpdates)
+			p.managePodLoop(outCh)
		}()
	}

@@ -785,28 +797,31 @@ func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options *
}

// allowPodStart tries to start the pod and returns true if allowed, otherwise
-// it requeues the pod and returns false.
-func (p *podWorkers) allowPodStart(pod *v1.Pod) bool {
+// it requeues the pod and returns false. If the pod will never be able to start
+// because data is missing, or the pod was terminated before start, canEverStart
+// is false.
+func (p *podWorkers) allowPodStart(pod *v1.Pod) (canStart bool, canEverStart bool) {
	if !kubetypes.IsStaticPod(pod) {
-		// TBD: Do we want to allow non-static pods with the same full name?
+		// TODO: Do we want to allow non-static pods with the same full name?
		// Note that it may disable the force deletion of pods.
-		return true
+		return true, true
	}
	p.podLock.Lock()
	defer p.podLock.Unlock()
	status, ok := p.podSyncStatuses[pod.UID]
	if !ok {
-		klog.ErrorS(nil, "Failed to get a valid podSyncStatuses", "pod", klog.KObj(pod), "podUID", pod.UID)
-		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
-		status.working = false
-		return false
+		klog.ErrorS(nil, "Pod sync status does not exist, the worker should not be running", "pod", klog.KObj(pod), "podUID", pod.UID)
+		return false, false
	}
+	if status.IsTerminationRequested() {
+		return false, false
+	}
	if !p.allowStaticPodStart(status.fullname, pod.UID) {
		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
		status.working = false
-		return false
+		return false, true
	}
-	return true
+	return true, true
}

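Read as a contract, the new boolean pair distinguishes three outcomes. The summary below is editorial, not part of the commit, and handleStart is a hypothetical name:

// handleStart sketches the caller-side contract of allowPodStart.
func handleStart(canStart, canEverStart bool) string {
	switch {
	case !canEverStart:
		return "terminated (or unknown) before starting: clean up and exit the worker"
	case !canStart:
		return "requeued with backoff: retry on a later update"
	default:
		return "allowed: proceed with the first sync"
	}
}
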
// allowStaticPodStart tries to start the static pod and returns true if
@@ -819,9 +834,12 @@ func (p *podWorkers) allowStaticPodStart(fullname string, uid types.UID) bool {
	}

	waitingPods := p.waitingToStartStaticPodsByFullname[fullname]
+	// TODO: This is O(N) with respect to the number of updates to static pods
+	// with overlapping full names, and ideally would be O(1).
	for i, waitingUID := range waitingPods {
		// has pod already terminated or been deleted?
-		if _, ok := p.podSyncStatuses[waitingUID]; !ok {
+		status, ok := p.podSyncStatuses[waitingUID]
+		if !ok || status.IsTerminationRequested() || status.IsTerminated() {
			continue
		}
		// another pod is next in line
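The loop above advances a per-fullname queue of waiting static pods, and the fix makes it skip waiters that have already been asked to terminate. A simplified, runnable sketch of that scan, with string UIDs and a plain terminated set standing in for podSyncStatuses:

package main

import "fmt"

// nextAllowed reports whether uid is the first entry in the waiting queue
// that is still eligible to start, skipping terminated entries the way the
// patched loop skips statuses with termination requested.
func nextAllowed(waiting []string, terminated map[string]bool, uid string) bool {
	for _, w := range waiting {
		if terminated[w] {
			continue // never going to start; let the next waiter through
		}
		return w == uid
	}
	return false
}

func main() {
	waiting := []string{"uid-1", "uid-2"}
	terminated := map[string]bool{"uid-1": true} // uid-1 was killed before starting
	fmt.Println(nextAllowed(waiting, terminated, "uid-2")) // true: uid-2 is unblocked
}
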
@@ -847,8 +865,20 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
	var podStarted bool
	for update := range podUpdates {
		pod := update.Options.Pod
+
+		// Decide whether to start the pod. If the pod was terminated prior to the pod being allowed
+		// to start, we have to clean it up and then exit the pod worker loop.
		if !podStarted {
-			if !p.allowPodStart(pod) {
+			canStart, canEverStart := p.allowPodStart(pod)
+			if !canEverStart {
+				p.completeUnstartedTerminated(pod)
+				if start := update.Options.StartTime; !start.IsZero() {
+					metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
+				}
+				klog.V(4).InfoS("Processing pod event done", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
+				return
+			}
+			if !canStart {
				klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(pod), "podUID", pod.UID)
				continue
			}
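The commit message also mentions a unit test that exercises these races. A hedged sketch of the shape such a test can take; newTestPodWorkers, makeStaticPod, and drainWorkers are hypothetical helpers for illustration, and the real fixtures in pod_workers_test.go differ:

func TestStaticPodTerminatedBeforeStart(t *testing.T) {
	w := newTestPodWorkers() // hypothetical fixture
	// Hold each update briefly so termination can overtake startup.
	w.workerChannelFn = func(uid types.UID, in chan podWork) <-chan podWork {
		out := make(chan podWork)
		go func() {
			defer close(out)
			for work := range in {
				time.Sleep(10 * time.Millisecond)
				out <- work
			}
		}()
		return out
	}
	pod := makeStaticPod("pod1", "ns", "uid-1") // hypothetical helper
	w.UpdatePod(UpdatePodOptions{Pod: pod, UpdateType: kubetypes.SyncPodCreate})
	w.UpdatePod(UpdatePodOptions{Pod: pod, UpdateType: kubetypes.SyncPodKill})
	drainWorkers(w) // hypothetical: wait for the worker goroutine to exit
	// After the fix, a second static pod with the same full name must be
	// allowed to start instead of waiting behind uid-1 forever.
}
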
@@ -1027,12 +1057,7 @@ func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) {
		}
	}

-	ch, ok := p.podUpdates[pod.UID]
-	if ok {
-		close(ch)
-	}
-	delete(p.podUpdates, pod.UID)
-	delete(p.lastUndeliveredWorkUpdate, pod.UID)
+	p.cleanupPodUpdates(pod.UID)
}

// completeTerminated is invoked after syncTerminatedPod completes successfully and means we
Expand All @@ -1043,12 +1068,7 @@ func (p *podWorkers) completeTerminated(pod *v1.Pod) {

klog.V(4).InfoS("Pod is complete and the worker can now stop", "pod", klog.KObj(pod), "podUID", pod.UID)

ch, ok := p.podUpdates[pod.UID]
if ok {
close(ch)
}
delete(p.podUpdates, pod.UID)
delete(p.lastUndeliveredWorkUpdate, pod.UID)
p.cleanupPodUpdates(pod.UID)

if status, ok := p.podSyncStatuses[pod.UID]; ok {
if status.terminatingAt.IsZero() {
@@ -1066,6 +1086,33 @@ func (p *podWorkers) completeTerminated(pod *v1.Pod) {
	}
}

+// completeUnstartedTerminated is invoked if a pod that has never been started receives a termination
+// signal before it can be started.
+func (p *podWorkers) completeUnstartedTerminated(pod *v1.Pod) {
+	p.podLock.Lock()
+	defer p.podLock.Unlock()
+
+	klog.V(4).InfoS("Pod never started and the worker can now stop", "pod", klog.KObj(pod), "podUID", pod.UID)
+
+	p.cleanupPodUpdates(pod.UID)
+
+	if status, ok := p.podSyncStatuses[pod.UID]; ok {
+		if status.terminatingAt.IsZero() {
+			klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
+		}
+		if !status.terminatedAt.IsZero() {
+			klog.V(4).InfoS("Pod worker is complete and had terminatedAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
+		}
+		status.finished = true
+		status.working = false
+		status.terminatedAt = time.Now()
+
+		if p.startedStaticPodsByFullname[status.fullname] == pod.UID {
+			delete(p.startedStaticPodsByFullname, status.fullname)
+		}
+	}
+}

// completeWork requeues on error or the next sync interval and then immediately executes any pending
// work.
func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
@@ -1150,10 +1197,10 @@ func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorke
	return workers
}

-// removeTerminatedWorker cleans up and removes the worker status for a worker that
-// has reached a terminal state of "finished" - has successfully exited
-// syncTerminatedPod. This "forgets" a pod by UID and allows another pod to be recreated
-// with the same UID.
+// removeTerminatedWorker cleans up and removes the worker status for a worker
+// that has reached a terminal state of "finished" - has successfully exited
+// syncTerminatedPod. This "forgets" a pod by UID and allows another pod to be
+// recreated with the same UID.
func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
	status, ok := p.podSyncStatuses[uid]
	if !ok {
@@ -1162,11 +1209,6 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
		return
	}

-	if startedUID, started := p.startedStaticPodsByFullname[status.fullname]; started && startedUID != uid {
-		klog.V(4).InfoS("Pod cannot start yet but is no longer known to the kubelet, finish it", "podUID", uid)
-		status.finished = true
-	}
-
	if !status.finished {
		klog.V(4).InfoS("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid)
		return
@@ -1178,8 +1220,7 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
		klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid)
	}
	delete(p.podSyncStatuses, uid)
-	delete(p.podUpdates, uid)
-	delete(p.lastUndeliveredWorkUpdate, uid)
+	p.cleanupPodUpdates(uid)

	if p.startedStaticPodsByFullname[status.fullname] == uid {
		delete(p.startedStaticPodsByFullname, status.fullname)
@@ -1230,3 +1271,15 @@ func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.K
		}
	}
}
+
+// cleanupPodUpdates closes the podUpdates channel and removes it from
+// podUpdates map so that the corresponding pod worker can stop. It also
+// removes any undelivered work. This method must be called holding the
+// pod lock.
+func (p *podWorkers) cleanupPodUpdates(uid types.UID) {
+	if ch, ok := p.podUpdates[uid]; ok {
+		close(ch)
+	}
+	delete(p.podUpdates, uid)
+	delete(p.lastUndeliveredWorkUpdate, uid)
+}
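cleanupPodUpdates stops a worker by closing its per-pod channel, which ends the for range loop in managePodLoop. A minimal standalone demonstration of that shutdown mechanism:

package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	done := make(chan struct{})
	go func() {
		for v := range ch { // exits once ch is closed
			fmt.Println("processed", v)
		}
		close(done) // the worker loop has returned
	}()
	ch <- 1
	close(ch) // what cleanupPodUpdates does to the per-pod channel
	<-done
	fmt.Println("worker stopped")
}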
