Skip to content

Commit

Permalink
kubelet: Ensure pods that have not started track a pendingUpdate
Browse files Browse the repository at this point in the history
A pod that cannot be started yet (due to static pod fullname
exclusion when UIDs are reused) must be accounted for in the
pod worker since it is considered to have been admitted and will
eventually start.

Due to a bug, we accidentally cleared pendingUpdate for pods that
cannot start yet, which means we can't report the right metric to
users in kubelet_working_pods and, in theory, we might fail to start
the pod in the future (although we have not currently observed
that in tests that should catch such an error). Describe, implement,
and test the invariant that on every path where startPodSync returns,
either activeUpdate OR pendingUpdate is set on the status, but
never both, and that both are nil only when the pod can never start.

This bug was detected by a "programmer error" assertion we added
on metrics that were not being reported, suggesting that we should
be more aggressive about using log assertions and automating their
detection in tests.
  • Loading branch information
smarterclayton committed Apr 14, 2023
1 parent 398e387 commit 861e193
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 3 deletions.
65 changes: 65 additions & 0 deletions pkg/kubelet/kubelet_pods_test.go
Expand Up @@ -5082,6 +5082,71 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
}
},
},
{
name: "pod that could not start still has a pending update and is tracked in metrics",
wantErr: false,
pods: []*v1.Pod{
staticPod(),
},
prepareWorker: func(t *testing.T, w *podWorkers, records map[types.UID][]syncPodRecord) {
// send a create of a static pod
pod := staticPod()
// block startup of the static pod due to full name collision
w.startedStaticPodsByFullname[kubecontainer.GetPodFullName(pod)] = types.UID("2")

w.UpdatePod(UpdatePodOptions{
UpdateType: kubetypes.SyncPodCreate,
StartTime: time.Unix(1, 0).UTC(),
Pod: pod,
})
drainAllWorkers(w)

if _, ok := records[pod.UID]; ok {
t.Fatalf("unexpected records: %#v", records)
}
// the pod worker now tracks the pod, but no sync has been recorded because it cannot start yet
},
wantWorker: func(t *testing.T, w *podWorkers, records map[types.UID][]syncPodRecord) {
uid := types.UID("1")
if len(w.podSyncStatuses) != 1 {
t.Fatalf("unexpected sync statuses: %#v", w.podSyncStatuses)
}
s, ok := w.podSyncStatuses[uid]
if !ok || s.IsTerminationRequested() || s.IsTerminationStarted() || s.IsFinished() || s.IsWorking() || s.IsDeleted() || s.restartRequested || s.activeUpdate != nil || s.pendingUpdate == nil {
t.Errorf("unexpected requested pod termination: %#v", s)
}

// expect that no sync calls are made, since the pod doesn't ever start
if actual, expected := records[uid], []syncPodRecord(nil); !reflect.DeepEqual(expected, actual) {
t.Fatalf("unexpected pod sync records: %s", cmp.Diff(expected, actual, cmp.AllowUnexported(syncPodRecord{})))
}
},
expectMetrics: map[string]string{
metrics.DesiredPodCount.FQName(): `# HELP kubelet_desired_pods [ALPHA] The number of pods the kubelet is being instructed to run. static is true if the pod is not from the apiserver.
# TYPE kubelet_desired_pods gauge
kubelet_desired_pods{static=""} 0
kubelet_desired_pods{static="true"} 1
`,
metrics.WorkingPodCount.FQName(): `# HELP kubelet_working_pods [ALPHA] Number of pods the kubelet is actually running, broken down by lifecycle phase, whether the pod is desired, orphaned, or runtime only (also orphaned), and whether the pod is static. An orphaned pod has been removed from local configuration or force deleted in the API and consumes resources that are not otherwise visible.
# TYPE kubelet_working_pods gauge
kubelet_working_pods{config="desired",lifecycle="sync",static=""} 0
kubelet_working_pods{config="desired",lifecycle="sync",static="true"} 1
kubelet_working_pods{config="desired",lifecycle="terminated",static=""} 0
kubelet_working_pods{config="desired",lifecycle="terminated",static="true"} 0
kubelet_working_pods{config="desired",lifecycle="terminating",static=""} 0
kubelet_working_pods{config="desired",lifecycle="terminating",static="true"} 0
kubelet_working_pods{config="orphan",lifecycle="sync",static=""} 0
kubelet_working_pods{config="orphan",lifecycle="sync",static="true"} 0
kubelet_working_pods{config="orphan",lifecycle="terminated",static=""} 0
kubelet_working_pods{config="orphan",lifecycle="terminated",static="true"} 0
kubelet_working_pods{config="orphan",lifecycle="terminating",static=""} 0
kubelet_working_pods{config="orphan",lifecycle="terminating",static="true"} 0
kubelet_working_pods{config="runtime_only",lifecycle="sync",static="unknown"} 0
kubelet_working_pods{config="runtime_only",lifecycle="terminated",static="unknown"} 0
kubelet_working_pods{config="runtime_only",lifecycle="terminating",static="unknown"} 0
`,
},
},
{
name: "pod that could not start and is not in config is force terminated without runtime during pod cleanup",
wantErr: false,
Expand Down
20 changes: 17 additions & 3 deletions pkg/kubelet/pod_workers.go
Expand Up @@ -1086,6 +1086,10 @@ func (p *podWorkers) cleanupUnstartedPod(pod *v1.Pod, status *podSyncStatus) {
// or can be started, and updates the cached pod state so that downstream components can observe what the
// pod worker goroutine is currently attempting to do. If ok is false, there is no available event. If any
// of the boolean values is false, ensure the appropriate cleanup happens before returning.
//
// This method should ensure that either status.pendingUpdate is cleared and merged into status.activeUpdate,
// or when a pod cannot be started status.pendingUpdate remains the same. Pods that have not been started
// should never have an activeUpdate because that is exposed to downstream components on started pods.
func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update podWork, canStart, canEverStart, ok bool) {
p.podLock.Lock()
defer p.podLock.Unlock()
Expand Down Expand Up @@ -1159,6 +1163,8 @@ func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update
klog.V(4).InfoS("Pod cannot start ever", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType)
return ctx, update, canStart, canEverStart, true
case !canStart:
// this is the only path we don't start the pod, so we need to put the change back in pendingUpdate
status.pendingUpdate = &update.Options
status.working = false
klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(update.Options.Pod), "podUID", podUID)
return ctx, update, canStart, canEverStart, true
Expand Down Expand Up @@ -1545,9 +1551,17 @@ func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorke
State: status.WorkType(),
Orphan: orphan,
}
if status.activeUpdate != nil && status.activeUpdate.Pod != nil {
sync.HasConfig = true
sync.Static = kubetypes.IsStaticPod(status.activeUpdate.Pod)
switch {
case status.activeUpdate != nil:
if status.activeUpdate.Pod != nil {
sync.HasConfig = true
sync.Static = kubetypes.IsStaticPod(status.activeUpdate.Pod)
}
case status.pendingUpdate != nil:
if status.pendingUpdate.Pod != nil {
sync.HasConfig = true
sync.Static = kubetypes.IsStaticPod(status.pendingUpdate.Pod)
}
}
workers[uid] = sync
}
Expand Down

0 comments on commit 861e193

Please sign in to comment.