UPSTREAM: <carry>: kubelet: fix readiness probes with pod termination
We can drop this patch after the following two PRs merge (or their
equivalent):

* kubernetes#115342
* kubernetes#113145

rphillips authored and bertinatto committed Jun 20, 2023
1 parent 279b387 commit a24922c
Showing 13 changed files with 181 additions and 58 deletions.
6 changes: 3 additions & 3 deletions pkg/kubelet/kubelet.go
@@ -2475,7 +2475,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
}
case update := <-kl.readinessManager.Updates():
ready := update.Result == proberesults.Success
- kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
+ kl.statusManager.SetContainerReadiness(update.Pod, update.ContainerID, ready)

status := ""
if ready {
@@ -2484,7 +2484,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
handleProbeSync(kl, update, handler, "readiness", status)
case update := <-kl.startupManager.Updates():
started := update.Result == proberesults.Success
- kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
+ kl.statusManager.SetContainerStartup(update.Pod, update.ContainerID, started)

status := "unhealthy"
if started {
@@ -2514,7 +2514,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety

func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
// We should not use the pod from manager, because it is never updated after initialization.
- pod, ok := kl.podManager.GetPodByUID(update.PodUID)
+ pod, ok := kl.podManager.GetPodByUID(update.Pod.UID)
if !ok {
// If the pod no longer exists, ignore the update.
klog.V(4).InfoS("SyncLoop (probe): ignore irrelevant update", "probe", probe, "status", status, "update", update)
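
The handleProbeSync hunk above shows the pattern this carry keeps throughout: the pod embedded in a probe update is a snapshot taken when the prober was created, so the handler still resolves the live pod from the pod manager by UID (now update.Pod.UID) before dispatching a sync. A minimal sketch of that lookup, using simplified stand-in types rather than the real kubelet packages:

package main

import "fmt"

type UID string

type Pod struct {
	UID  UID
	Name string
}

// podManager stands in for the kubelet's pod manager, which holds the
// authoritative, up-to-date pod objects.
type podManager struct{ pods map[UID]*Pod }

func (pm podManager) GetPodByUID(uid UID) (*Pod, bool) {
	p, ok := pm.pods[uid]
	return p, ok
}

// Update mirrors the reworked probe update: it carries a pod reference,
// not just a bare UID.
type Update struct{ Pod *Pod }

// handleProbeSync-style flow: prefer the live pod over the snapshot in the
// update, and drop the update if the pod is no longer known to this kubelet.
func handleProbeSync(pm podManager, u Update, probe, status string) {
	pod, ok := pm.GetPodByUID(u.Pod.UID)
	if !ok {
		fmt.Printf("ignoring %s update (%s): pod %s no longer exists\n", probe, status, u.Pod.UID)
		return
	}
	fmt.Printf("syncing pod %s after %s probe became %s\n", pod.Name, probe, status)
}

func main() {
	pm := podManager{pods: map[UID]*Pod{"1": {UID: "1", Name: "live-pod"}}}
	handleProbeSync(pm, Update{Pod: &Pod{UID: "1", Name: "stale-snapshot"}}, "readiness", "ready")
	handleProbeSync(pm, Update{Pod: &Pod{UID: "gone"}}, "readiness", "ready")
}

The design choice is the same as before the patch, namely trusting the pod manager for the current pod object; only the key used for the lookup now travels inside the update as a full pod reference.
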
7 changes: 5 additions & 2 deletions pkg/kubelet/kubelet_pods.go
@@ -973,14 +973,17 @@ func (kl *Kubelet) isAdmittedPodTerminal(pod *v1.Pod) bool {

// removeOrphanedPodStatuses removes obsolete entries in podStatus where
// the pod is no longer considered bound to this node.
- func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Pod) {
+ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Pod, possiblyRunningPods map[types.UID]sets.Empty) {
podUIDs := make(map[types.UID]bool)
for _, pod := range pods {
podUIDs[pod.UID] = true
}
for _, pod := range mirrorPods {
podUIDs[pod.UID] = true
}
+ for uid := range possiblyRunningPods {
+ podUIDs[uid] = true
+ }
kl.statusManager.RemoveOrphanedStatuses(podUIDs)
}

@@ -1083,7 +1086,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {

// Remove orphaned pod statuses not in the total list of known config pods
klog.V(3).InfoS("Clean up orphaned pod statuses")
- kl.removeOrphanedPodStatuses(allPods, mirrorPods)
+ kl.removeOrphanedPodStatuses(allPods, mirrorPods, possiblyRunningPods)

// Remove orphaned pod user namespace allocations (if any).
klog.V(3).InfoS("Clean up orphaned pod user namespace allocations")
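
Passing possiblyRunningPods into removeOrphanedPodStatuses means cached statuses are kept not only for configured and mirror pods but also for pods that may still have running containers, so a terminating pod's status entry is not dropped mid-termination. A rough sketch of the union being built, with plain Go types standing in for the kubelet's pod and set types:

package main

import "fmt"

type UID string

// keepStatusesFor mirrors the shape of removeOrphanedPodStatuses: every UID in
// the returned set survives the RemoveOrphanedStatuses sweep. The possiblyRunning
// argument is the addition carried by this patch.
func keepStatusesFor(pods, mirrorPods []UID, possiblyRunning map[UID]struct{}) map[UID]bool {
	keep := make(map[UID]bool)
	for _, uid := range pods {
		keep[uid] = true
	}
	for _, uid := range mirrorPods {
		keep[uid] = true
	}
	for uid := range possiblyRunning {
		keep[uid] = true // terminating pods retain their status until fully stopped
	}
	return keep
}

func main() {
	keep := keepStatusesFor(
		[]UID{"configured"},
		[]UID{"mirror"},
		map[UID]struct{}{"terminating": {}},
	)
	fmt.Println(keep) // map[configured:true mirror:true terminating:true]
}
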
1 change: 1 addition & 0 deletions pkg/kubelet/pod/mirror_client.go
@@ -103,6 +103,7 @@ func (mc *basicMirrorClient) CreateMirrorPod(pod *v1.Pod) error {
return nil
}
}
klog.V(2).InfoS("Created mirror pod", "static_pod", klog.KObj(pod), "static_pod_uid", pod.UID, "mirror_pod", klog.KObj(apiPod), "mirror_pod_uid", apiPod.UID)
return err
}

2 changes: 1 addition & 1 deletion pkg/kubelet/prober/prober_manager_test.go
@@ -320,7 +320,7 @@ func (m *manager) extractedReadinessHandling() {
update := <-m.readinessManager.Updates()
// This code corresponds to an extract from kubelet.syncLoopIteration()
ready := update.Result == results.Success
- m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
+ m.statusManager.SetContainerReadiness(update.Pod, update.ContainerID, ready)
}

func TestUpdateReadiness(t *testing.T) {
5 changes: 2 additions & 3 deletions pkg/kubelet/prober/results/results_manager.go
@@ -20,7 +20,6 @@ import (
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

@@ -80,7 +79,7 @@ func (r Result) ToPrometheusType() float64 {
type Update struct {
ContainerID kubecontainer.ContainerID
Result Result
- PodUID types.UID
+ Pod *v1.Pod
}

// Manager implementation.
@@ -112,7 +111,7 @@ func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {

func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *v1.Pod) {
if m.setInternal(id, result) {
- m.updates <- Update{id, result, pod.UID}
+ m.updates <- Update{id, result, pod}
}
}

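
Set still pushes an Update onto the channel only when setInternal reports a new or changed result; the difference after this patch is that the Update carries the *v1.Pod itself. A small, self-contained sketch of that de-duplication with simplified stand-in types (the type and field names here are illustrative, not the kubelet's real API):

package main

import "fmt"

type ContainerID string
type Result int

const (
	Failure Result = iota
	Success
)

type Pod struct{ Name string }

// Update mirrors the reworked struct: the pod travels with the result.
type Update struct {
	ContainerID ContainerID
	Result      Result
	Pod         *Pod
}

type manager struct {
	cache   map[ContainerID]Result
	updates chan Update
}

// setInternal records the result and reports whether it changed.
func (m *manager) setInternal(id ContainerID, r Result) bool {
	prev, ok := m.cache[id]
	if ok && prev == r {
		return false
	}
	m.cache[id] = r
	return true
}

// Set only emits an Update when the result is new or changed, and the Update
// now carries the pod itself rather than pod.UID.
func (m *manager) Set(id ContainerID, r Result, pod *Pod) {
	if m.setInternal(id, r) {
		m.updates <- Update{id, r, pod}
	}
}

func main() {
	m := &manager{cache: map[ContainerID]Result{}, updates: make(chan Update, 4)}
	pod := &Pod{Name: "demo"}
	m.Set("c1", Success, pod) // new result: emits
	m.Set("c1", Success, pod) // unchanged: no emit
	m.Set("c1", Failure, pod) // changed: emits
	close(m.updates)
	for u := range m.updates {
		fmt.Printf("%s -> %v (pod %s)\n", u.ContainerID, u.Result, u.Pod.Name)
	}
}

Carrying the pod in the update lets consumers such as the status manager act on it directly even if the pod manager has already forgotten the UID.
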
8 changes: 4 additions & 4 deletions pkg/kubelet/prober/results/results_manager_test.go
@@ -78,10 +78,10 @@ func TestUpdates(t *testing.T) {

// New result should always push an update.
m.Set(fooID, Success, pod)
- expectUpdate(Update{fooID, Success, pod.UID}, "new success")
+ expectUpdate(Update{fooID, Success, pod}, "new success")

m.Set(barID, Failure, pod)
- expectUpdate(Update{barID, Failure, pod.UID}, "new failure")
+ expectUpdate(Update{barID, Failure, pod}, "new failure")

// Unchanged results should not send an update.
m.Set(fooID, Success, pod)
@@ -92,8 +92,8 @@ func TestUpdates(t *testing.T) {

// Changed results should send an update.
m.Set(fooID, Failure, pod)
- expectUpdate(Update{fooID, Failure, pod.UID}, "changed foo")
+ expectUpdate(Update{fooID, Failure, pod}, "changed foo")

m.Set(barID, Success, pod)
- expectUpdate(Update{barID, Success, pod.UID}, "changed bar")
+ expectUpdate(Update{barID, Success, pod}, "changed bar")
}
2 changes: 1 addition & 1 deletion pkg/kubelet/prober/scale_test.go
@@ -168,7 +168,7 @@ func TestTCPPortExhaustion(t *testing.T) {
switch result.Result.String() {
// The test will fail if any of the probes fails
case "Failure":
t.Errorf("Failure %s on contantinerID: %v Pod %v", probeType, result.ContainerID, result.PodUID)
t.Errorf("Failure %s on contantinerID: %v Pod %v", probeType, result.ContainerID, result.Pod.UID)
case "UNKNOWN": // startup probes
t.Logf("UNKNOWN state for %v", result)
default:
4 changes: 2 additions & 2 deletions pkg/kubelet/prober/worker_test.go
@@ -462,7 +462,7 @@ func TestLivenessProbeDisabledByStarted(t *testing.T) {
expectContinue(t, w, w.doProbe(ctx), msg)
expectResult(t, w, results.Success, msg)
// setting started state
- m.statusManager.SetContainerStartup(w.pod.UID, w.containerID, true)
+ m.statusManager.SetContainerStartup(w.pod, w.containerID, true)
// livenessProbe fails
m.prober.exec = fakeExecProber{probe.Failure, nil}
msg = "Started, probe failure, result failure"
@@ -486,7 +486,7 @@ func TestStartupProbeDisabledByStarted(t *testing.T) {
expectContinue(t, w, w.doProbe(ctx), msg)
expectResult(t, w, results.Success, msg)
// setting started state
- m.statusManager.SetContainerStartup(w.pod.UID, w.containerID, true)
+ m.statusManager.SetContainerStartup(w.pod, w.containerID, true)
// startupProbe fails, but is disabled
m.prober.exec = fakeExecProber{probe.Failure, nil}
msg = "Started, probe failure, result success"
4 changes: 2 additions & 2 deletions pkg/kubelet/status/fake_status_manager.go
@@ -43,12 +43,12 @@ func (m *fakeManager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
return
}

- func (m *fakeManager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
+ func (m *fakeManager) SetContainerReadiness(pod *v1.Pod, containerID kubecontainer.ContainerID, ready bool) {
klog.InfoS("SetContainerReadiness()")
return
}

- func (m *fakeManager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
+ func (m *fakeManager) SetContainerStartup(pod *v1.Pod, containerID kubecontainer.ContainerID, started bool) {
klog.InfoS("SetContainerStartup()")
return
}
26 changes: 5 additions & 21 deletions pkg/kubelet/status/status_manager.go
@@ -128,11 +128,11 @@ type Manager interface {

// SetContainerReadiness updates the cached container status with the given readiness, and
// triggers a status update.
- SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
+ SetContainerReadiness(pod *v1.Pod, containerID kubecontainer.ContainerID, ready bool)

// SetContainerStartup updates the cached container status with the given startup, and
// triggers a status update.
- SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
+ SetContainerStartup(pod *v1.Pod, containerID kubecontainer.ContainerID, started bool)

// TerminatePod resets the container status for the provided pod to terminated and triggers
// a status update.
@@ -292,16 +292,10 @@ func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil, false)
}

- func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
+ func (m *manager) SetContainerReadiness(pod *v1.Pod, containerID kubecontainer.ContainerID, ready bool) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()

- pod, ok := m.podManager.GetPodByUID(podUID)
- if !ok {
- klog.V(4).InfoS("Pod has been deleted, no need to update readiness", "podUID", string(podUID))
- return
- }
-
oldStatus, found := m.podStatuses[pod.UID]
if !found {
klog.InfoS("Container readiness changed before pod has synced",
@@ -353,10 +347,11 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
m.updateStatusInternal(pod, status, false, false)
}

- func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
+ func (m *manager) SetContainerStartup(pod *v1.Pod, containerID kubecontainer.ContainerID, started bool) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()

+ podUID := pod.UID
pod, ok := m.podManager.GetPodByUID(podUID)
if !ok {
klog.V(4).InfoS("Pod has been deleted, no need to update startup", "podUID", string(podUID))
@@ -822,17 +817,6 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
return
}

- translatedUID := m.podManager.TranslatePodUID(pod.UID)
- // Type convert original uid just for the purpose of comparison.
- if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) {
- klog.V(2).InfoS("Pod was deleted and then recreated, skipping status update",
- "pod", klog.KObj(pod),
- "oldPodUID", uid,
- "podUID", translatedUID)
- m.deletePodStatus(uid)
- return
- }
-
mergedStatus := mergePodStatus(pod.Status, status.status, m.podDeletionSafety.PodCouldHaveRunningContainers(pod))

newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(context.TODO(), m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus)
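
These status-manager changes carry much of the fix: SetContainerReadiness and SetContainerStartup now take the pod from the caller, SetContainerReadiness no longer bails out when the pod manager has already forgotten the UID, and syncPod drops the "deleted and then recreated" short-circuit, so status updates for terminating pods keep flowing. The sketch below only mirrors the reshaped method signatures; it uses stand-in types and is an illustration, not the real kubelet API:

package main

import "fmt"

// Simplified stand-ins for v1.Pod and kubecontainer.ContainerID; illustrative only.
type Pod struct{ UID, Name string }
type ContainerID struct{ Type, ID string }

// Manager mirrors the two methods whose signatures change in this patch:
// both now take *Pod instead of a bare pod UID.
type Manager interface {
	SetContainerReadiness(pod *Pod, id ContainerID, ready bool)
	SetContainerStartup(pod *Pod, id ContainerID, started bool)
}

// loggingManager is a toy implementation in the spirit of fake_status_manager.go.
type loggingManager struct{}

func (loggingManager) SetContainerReadiness(pod *Pod, id ContainerID, ready bool) {
	// The real manager updates its cached status and queues an API patch; because
	// the pod is supplied by the caller, a terminating pod that has already left
	// the pod manager can still have its readiness recorded.
	fmt.Printf("readiness: pod=%s container=%s ready=%v\n", pod.Name, id.ID, ready)
}

func (loggingManager) SetContainerStartup(pod *Pod, id ContainerID, started bool) {
	fmt.Printf("startup:   pod=%s container=%s started=%v\n", pod.Name, id.ID, started)
}

func main() {
	var m Manager = loggingManager{}
	pod := &Pod{UID: "1234", Name: "terminating-pod"}
	m.SetContainerReadiness(pod, ContainerID{Type: "containerd", ID: "abc"}, false)
	m.SetContainerStartup(pod, ContainerID{Type: "containerd", ID: "abc"}, true)
}
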
23 changes: 12 additions & 11 deletions pkg/kubelet/status/status_manager_test.go
@@ -336,6 +336,7 @@ func TestSyncPodChecksMismatchedUID(t *testing.T) {
pod.UID = "first"
syncer.podManager.(mutablePodManager).AddPod(pod)
differentPod := getTestPod()
+ differentPod.Name = "second pod"
differentPod.UID = "second"
syncer.podManager.(mutablePodManager).AddPod(differentPod)
syncer.kubeClient = fake.NewSimpleClientset(pod)
@@ -376,7 +377,7 @@ func TestSyncPodNoDeadlock(t *testing.T) {
ret.UID = "other_pod"
err = nil
m.SetPodStatus(pod, getRandomPodStatus())
- verifyActions(t, m, []core.Action{getAction()})
+ verifyActions(t, m, []core.Action{getAction(), patchAction()})

t.Logf("Pod not deleted (success case).")
ret = getTestPod()
@@ -1140,7 +1141,7 @@ func TestSetContainerReadiness(t *testing.T) {
m.podManager.(mutablePodManager).AddPod(pod)

t.Log("Setting readiness before status should fail.")
- m.SetContainerReadiness(pod.UID, cID1, true)
+ m.SetContainerReadiness(pod, cID1, true)
verifyUpdates(t, m, 0)
if status, ok := m.GetPodStatus(pod.UID); ok {
t.Errorf("Unexpected PodStatus: %+v", status)
@@ -1153,25 +1154,25 @@
verifyReadiness("initial", &status, false, false, false)

t.Log("Setting unchanged readiness should do nothing.")
- m.SetContainerReadiness(pod.UID, cID1, false)
+ m.SetContainerReadiness(pod, cID1, false)
verifyUpdates(t, m, 0)
status = expectPodStatus(t, m, pod)
verifyReadiness("unchanged", &status, false, false, false)

t.Log("Setting container readiness should generate update but not pod readiness.")
- m.SetContainerReadiness(pod.UID, cID1, true)
+ m.SetContainerReadiness(pod, cID1, true)
verifyUpdates(t, m, 1)
status = expectPodStatus(t, m, pod)
verifyReadiness("c1 ready", &status, true, false, false)

t.Log("Setting both containers to ready should update pod readiness.")
- m.SetContainerReadiness(pod.UID, cID2, true)
+ m.SetContainerReadiness(pod, cID2, true)
verifyUpdates(t, m, 1)
status = expectPodStatus(t, m, pod)
verifyReadiness("all ready", &status, true, true, true)

t.Log("Setting non-existent container readiness should fail.")
- m.SetContainerReadiness(pod.UID, kubecontainer.ContainerID{Type: "test", ID: "foo"}, true)
+ m.SetContainerReadiness(pod, kubecontainer.ContainerID{Type: "test", ID: "foo"}, true)
verifyUpdates(t, m, 0)
status = expectPodStatus(t, m, pod)
verifyReadiness("ignore non-existent", &status, true, true, true)
@@ -1224,7 +1225,7 @@ func TestSetContainerStartup(t *testing.T) {
m.podManager.(mutablePodManager).AddPod(pod)

t.Log("Setting startup before status should fail.")
- m.SetContainerStartup(pod.UID, cID1, true)
+ m.SetContainerStartup(pod, cID1, true)
verifyUpdates(t, m, 0)
if status, ok := m.GetPodStatus(pod.UID); ok {
t.Errorf("Unexpected PodStatus: %+v", status)
@@ -1237,25 +1238,25 @@
verifyStartup("initial", &status, false, false, false)

t.Log("Setting unchanged startup should do nothing.")
- m.SetContainerStartup(pod.UID, cID1, false)
+ m.SetContainerStartup(pod, cID1, false)
verifyUpdates(t, m, 1)
status = expectPodStatus(t, m, pod)
verifyStartup("unchanged", &status, false, false, false)

t.Log("Setting container startup should generate update but not pod startup.")
- m.SetContainerStartup(pod.UID, cID1, true)
+ m.SetContainerStartup(pod, cID1, true)
verifyUpdates(t, m, 1) // Started = nil to false
status = expectPodStatus(t, m, pod)
verifyStartup("c1 ready", &status, true, false, false)

t.Log("Setting both containers to ready should update pod startup.")
- m.SetContainerStartup(pod.UID, cID2, true)
+ m.SetContainerStartup(pod, cID2, true)
verifyUpdates(t, m, 1)
status = expectPodStatus(t, m, pod)
verifyStartup("all ready", &status, true, true, true)

t.Log("Setting non-existent container startup should fail.")
- m.SetContainerStartup(pod.UID, kubecontainer.ContainerID{Type: "test", ID: "foo"}, true)
+ m.SetContainerStartup(pod, kubecontainer.ContainerID{Type: "test", ID: "foo"}, true)
verifyUpdates(t, m, 0)
status = expectPodStatus(t, m, pod)
verifyStartup("ignore non-existent", &status, true, true, true)
16 changes: 8 additions & 8 deletions pkg/kubelet/status/testing/mock_pod_status_provider.go

(Generated mock file; diff not rendered here.)
