From 4870e64ac110d6bf62dbd270f8cef557c4bd6d53 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Mon, 7 Dec 2020 21:43:04 +0100 Subject: [PATCH 1/4] Improve update time of readiness state --- pkg/kubelet/prober/prober_manager.go | 16 ++++++++++++++-- pkg/kubelet/prober/worker.go | 20 ++++++++++++++------ test/e2e/common/node/container_probe.go | 12 +++++++++--- 3 files changed, 37 insertions(+), 11 deletions(-) diff --git a/pkg/kubelet/prober/prober_manager.go b/pkg/kubelet/prober/prober_manager.go index be2779047213..9d5340a88763 100644 --- a/pkg/kubelet/prober/prober_manager.go +++ b/pkg/kubelet/prober/prober_manager.go @@ -17,7 +17,9 @@ limitations under the License. package prober import ( + "k8s.io/apimachinery/pkg/util/clock" "sync" + "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -94,6 +96,8 @@ type manager struct { // prober executes the probe actions. prober *prober + + start time.Time } // NewManager creates a Manager for pod probing. @@ -113,6 +117,7 @@ func NewManager( livenessManager: livenessManager, startupManager: startupManager, workers: make(map[probeKey]*worker), + start: clock.RealClock{}.Now(), } } @@ -253,8 +258,15 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) { ready = result == results.Success } else { // The check whether there is a probe which hasn't run yet. - _, exists := m.getWorker(podUID, c.Name, readiness) - ready = !exists + w, exists := m.getWorker(podUID, c.Name, readiness) + ready = !exists // no readinessProbe -> always ready + if exists { + // Trigger an immediate run of the readinessProbe to update ready state + select { + case w.manualTriggerCh <- struct{}{}: + default: // Non-blocking. + } + } } podStatus.ContainerStatuses[i].Ready = ready } diff --git a/pkg/kubelet/prober/worker.go b/pkg/kubelet/prober/worker.go index cac63117b4cd..ef8339e314dd 100644 --- a/pkg/kubelet/prober/worker.go +++ b/pkg/kubelet/prober/worker.go @@ -38,6 +38,9 @@ type worker struct { // Channel for stopping the probe. stopCh chan struct{} + // Channel for triggering the probe manually. + manualTriggerCh chan struct{} + // The pod containing this probe (read-only) pod *v1.Pod @@ -82,11 +85,12 @@ func newWorker( container v1.Container) *worker { w := &worker{ - stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking. - pod: pod, - container: container, - probeType: probeType, - probeManager: m, + stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking. + manualTriggerCh: make(chan struct{}, 1), // Buffer so prober_manager can do non-blocking calls to doProbe. + pod: pod, + container: container, + probeType: probeType, + probeManager: m, } switch probeType { @@ -130,7 +134,10 @@ func (w *worker) run() { // If kubelet restarted the probes could be started in rapid succession. // Let the worker wait for a random portion of tickerPeriod before probing. - time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod))) + // Do it only if the kubelet has started recently. 
+ if probeTickerPeriod > time.Since(w.probeManager.start) { + time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod))) + } probeTicker := time.NewTicker(probeTickerPeriod) @@ -154,6 +161,7 @@ probeLoop: case <-w.stopCh: break probeLoop case <-probeTicker.C: + case <-w.manualTriggerCh: // continue } } diff --git a/test/e2e/common/node/container_probe.go b/test/e2e/common/node/container_probe.go index c69412c23151..1c0c4cabf220 100644 --- a/test/e2e/common/node/container_probe.go +++ b/test/e2e/common/node/container_probe.go @@ -374,7 +374,8 @@ var _ = SIGDescribe("Probing container", func() { Description: A Pod is created with startup and readiness probes. The Container is started by creating /tmp/startup after 45 seconds, delaying the ready state by this amount of time. This is similar to the "Pod readiness probe, with initial delay" test. */ ginkgo.It("should not be ready until startupProbe succeeds", func() { - cmd := []string{"/bin/sh", "-c", "echo ok >/tmp/health; sleep 45; echo ok >/tmp/startup; sleep 600"} + sleepBeforeStarted := time.Duration(45) + cmd := []string{"/bin/sh", "-c", fmt.Sprintf("echo ok >/tmp/health; sleep %d; echo ok >/tmp/startup; sleep 600", sleepBeforeStarted)} readinessProbe := &v1.Probe{ Handler: v1.Handler{ Exec: &v1.ExecAction{ @@ -382,6 +383,7 @@ var _ = SIGDescribe("Probing container", func() { }, }, InitialDelaySeconds: 0, + PeriodSeconds: 60, } startupProbe := &v1.Probe{ Handler: v1.Handler{ @@ -390,7 +392,8 @@ var _ = SIGDescribe("Probing container", func() { }, }, InitialDelaySeconds: 0, - FailureThreshold: 60, + PeriodSeconds: 1, + FailureThreshold: 600, } p := podClient.Create(startupPodSpec(startupProbe, readinessProbe, nil, cmd)) @@ -414,9 +417,12 @@ var _ = SIGDescribe("Probing container", func() { framework.ExpectNoError(err) framework.Logf("Container started at %v, pod became ready at %v", startedTime, readyTime) - if readyTime.Sub(startedTime) < 40*time.Second { + if readyTime.Sub(startedTime) < sleepBeforeStarted*time.Second { framework.Failf("Pod became ready before startupProbe succeeded") } + if readyTime.Sub(startedTime) > (sleepBeforeStarted+20)*time.Second { + framework.Failf("Pod became ready more than 20s after startupProbe succeeded") + } }) }) From eed218a3a254cd9f38f09d9f6b8d6fee7fe4a34b Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Sat, 16 Jan 2021 13:34:38 +0100 Subject: [PATCH 2/4] Move startupManager updates handling to kubelet --- pkg/kubelet/kubelet.go | 32 +++++++++++++++++----------- pkg/kubelet/prober/prober_manager.go | 9 -------- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 6f640c359a3f..58d72fc754eb 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1902,8 +1902,8 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand // * plegCh: update the runtime cache; sync pod // * syncCh: sync all pods waiting for sync // * housekeepingCh: trigger cleanup of pods -// * liveness manager: sync pods that have failed or in which one or more -// containers have failed liveness checks +// * liveness/startup manager: sync pods that have failed or in which one or more +// containers have failed liveness/startup checks func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { select { @@ -1979,18 +1979,12 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan 
kubetypes.PodUpdate, handle case update := <-kl.livenessManager.Updates(): if update.Result == proberesults.Failure { // The liveness manager detected a failure; sync the pod. - - // We should not use the pod from livenessManager, because it is never updated after - // initialization. - pod, ok := kl.podManager.GetPodByUID(update.PodUID) - if !ok { - // If the pod no longer exists, ignore the update. - klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update) - break - } - klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod)) - handler.HandlePodSyncs([]*v1.Pod{pod}) + syncPod(kl, update, handler) } + case update := <-kl.startupManager.Updates(): + started := update.Result == proberesults.Success + kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started) + syncPod(kl, update, handler) case <-housekeepingCh: if !kl.sourcesReady.AllReady() { // If the sources aren't ready or volume manager has not yet synced the states, @@ -2006,6 +2000,18 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle return true } +func syncPod(kl *Kubelet, update proberesults.Update, handler SyncHandler) { + // We should not use the pod from manager, because it is never updated after initialization. + pod, ok := kl.podManager.GetPodByUID(update.PodUID) + if !ok { + // If the pod no longer exists, ignore the update. + klog.V(4).Infof("SyncLoop: ignore irrelevant update: %#v", update) + return + } + klog.V(1).Infof("SyncLoop: %q", format.Pod(pod)) + handler.HandlePodSyncs([]*v1.Pod{pod}) +} + // dispatchWork starts the asynchronous sync of the pod in a pod worker. // If the pod has completed termination, dispatchWork will perform no action. func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) { diff --git a/pkg/kubelet/prober/prober_manager.go b/pkg/kubelet/prober/prober_manager.go index 9d5340a88763..1d829e62bd10 100644 --- a/pkg/kubelet/prober/prober_manager.go +++ b/pkg/kubelet/prober/prober_manager.go @@ -125,8 +125,6 @@ func NewManager( func (m *manager) Start() { // Start syncing readiness. go wait.Forever(m.updateReadiness, 0) - // Start syncing startup. 
- go wait.Forever(m.updateStartup, 0) } // Key uniquely identifying container probes @@ -309,10 +307,3 @@ func (m *manager) updateReadiness() { ready := update.Result == results.Success m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready) } - -func (m *manager) updateStartup() { - update := <-m.startupManager.Updates() - - started := update.Result == results.Success - m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started) -} From 431e6a704406b27b910de5a2ed6f1437fcff4303 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Mon, 25 Jan 2021 17:51:12 +0100 Subject: [PATCH 3/4] Move readinessManager updates handling to kubelet --- pkg/kubelet/kubelet.go | 32 ++++++++++++------- pkg/kubelet/kubelet_test.go | 1 + .../kuberuntime/kuberuntime_manager.go | 7 ++-- pkg/kubelet/prober/common_test.go | 1 + pkg/kubelet/prober/prober_manager.go | 22 ++----------- pkg/kubelet/prober/prober_manager_test.go | 11 +++++-- test/e2e/common/node/container_probe.go | 28 ++++++---------- 7 files changed, 50 insertions(+), 52 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 58d72fc754eb..d633d6e1d039 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -581,6 +581,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff) klet.livenessManager = proberesults.NewManager() + klet.readinessManager = proberesults.NewManager() klet.startupManager = proberesults.NewManager() klet.podCache = kubecontainer.NewCache() @@ -618,6 +619,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, runtime, err := kuberuntime.NewKubeGenericRuntimeManager( kubecontainer.FilterEventRecorder(kubeDeps.Recorder), klet.livenessManager, + klet.readinessManager, klet.startupManager, seccompProfileRoot, machineInfo, @@ -716,6 +718,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.probeManager = prober.NewManager( klet.statusManager, klet.livenessManager, + klet.readinessManager, klet.startupManager, klet.runner, kubeDeps.Recorder) @@ -924,8 +927,9 @@ type Kubelet struct { // Handles container probing. probeManager prober.Manager // Manages container health check results. - livenessManager proberesults.Manager - startupManager proberesults.Manager + livenessManager proberesults.Manager + readinessManager proberesults.Manager + startupManager proberesults.Manager // How long to keep idle streaming command execution/port forwarding // connections open before terminating them @@ -1438,7 +1442,6 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { // Start component sync loops. kl.statusManager.Start() - kl.probeManager.Start() // Start syncing RuntimeClasses if enabled. 
if kl.runtimeClassManager != nil { @@ -1902,8 +1905,8 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand // * plegCh: update the runtime cache; sync pod // * syncCh: sync all pods waiting for sync // * housekeepingCh: trigger cleanup of pods -// * liveness/startup manager: sync pods that have failed or in which one or more -// containers have failed liveness/startup checks +// * health manager: sync pods that have failed or in which one or more +// containers have failed health checks func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { select { @@ -1978,13 +1981,16 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle handler.HandlePodSyncs(podsToSync) case update := <-kl.livenessManager.Updates(): if update.Result == proberesults.Failure { - // The liveness manager detected a failure; sync the pod. - syncPod(kl, update, handler) + handleProbeSync(kl, update, handler, "liveness", "unhealthy") } + case update := <-kl.readinessManager.Updates(): + ready := update.Result == proberesults.Success + kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready) + handleProbeSync(kl, update, handler, "readiness", map[bool]string{true: "ready", false: ""}[ready]) case update := <-kl.startupManager.Updates(): started := update.Result == proberesults.Success kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started) - syncPod(kl, update, handler) + handleProbeSync(kl, update, handler, "startup", map[bool]string{true: "started", false: "unhealthy"}[started]) case <-housekeepingCh: if !kl.sourcesReady.AllReady() { // If the sources aren't ready or volume manager has not yet synced the states, @@ -2000,15 +2006,19 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle return true } -func syncPod(kl *Kubelet, update proberesults.Update, handler SyncHandler) { +func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) { + probeAndStatus := probe + if len(status) > 0 { + probeAndStatus = fmt.Sprintf("%s (container %s)", probe, status) + } // We should not use the pod from manager, because it is never updated after initialization. pod, ok := kl.podManager.GetPodByUID(update.PodUID) if !ok { // If the pod no longer exists, ignore the update. 
- klog.V(4).Infof("SyncLoop: ignore irrelevant update: %#v", update) + klog.V(4).Infof("SyncLoop %s: ignore irrelevant update: %#v", probeAndStatus, update) return } - klog.V(1).Infof("SyncLoop: %q", format.Pod(pod)) + klog.V(1).Infof("SyncLoop %s: %q", probeAndStatus, format.Pod(pod)) handler.HandlePodSyncs([]*v1.Pod{pod}) } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index b6272dc80467..a32a50b1f299 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -241,6 +241,7 @@ func newTestKubeletWithImageList( kubelet.probeManager = probetest.FakeManager{} kubelet.livenessManager = proberesults.NewManager() + kubelet.readinessManager = proberesults.NewManager() kubelet.startupManager = proberesults.NewManager() fakeContainerManager := cm.NewFakeContainerManager() diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index c1fb52bba4b9..abec903e2dc8 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -101,8 +101,9 @@ type kubeGenericRuntimeManager struct { runtimeHelper kubecontainer.RuntimeHelper // Health check results. - livenessManager proberesults.Manager - startupManager proberesults.Manager + livenessManager proberesults.Manager + readinessManager proberesults.Manager + startupManager proberesults.Manager // If true, enforce container cpu limits with CFS quota support cpuCFSQuota bool @@ -159,6 +160,7 @@ type LegacyLogProvider interface { func NewKubeGenericRuntimeManager( recorder record.EventRecorder, livenessManager proberesults.Manager, + readinessManager proberesults.Manager, startupManager proberesults.Manager, seccompProfileRoot string, machineInfo *cadvisorapi.MachineInfo, @@ -187,6 +189,7 @@ func NewKubeGenericRuntimeManager( cpuCFSQuotaPeriod: cpuCFSQuotaPeriod, seccompProfileRoot: seccompProfileRoot, livenessManager: livenessManager, + readinessManager: readinessManager, startupManager: startupManager, machineInfo: machineInfo, osInterface: osInterface, diff --git a/pkg/kubelet/prober/common_test.go b/pkg/kubelet/prober/common_test.go index 5152bb1fcffc..fb39e69037cb 100644 --- a/pkg/kubelet/prober/common_test.go +++ b/pkg/kubelet/prober/common_test.go @@ -111,6 +111,7 @@ func newTestManager() *manager { status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}), results.NewManager(), results.NewManager(), + results.NewManager(), nil, // runner &record.FakeRecorder{}, ).(*manager) diff --git a/pkg/kubelet/prober/prober_manager.go b/pkg/kubelet/prober/prober_manager.go index 1d829e62bd10..a661d7a1795e 100644 --- a/pkg/kubelet/prober/prober_manager.go +++ b/pkg/kubelet/prober/prober_manager.go @@ -17,14 +17,13 @@ limitations under the License. package prober import ( - "k8s.io/apimachinery/pkg/util/clock" "sync" "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" "k8s.io/component-base/metrics" "k8s.io/klog/v2" @@ -71,9 +70,6 @@ type Manager interface { // UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each // container based on container running status, cached probe results and worker states. UpdatePodStatus(types.UID, *v1.PodStatus) - - // Start starts the Manager sync loops. 
- Start() } type manager struct { @@ -104,12 +100,12 @@ type manager struct { func NewManager( statusManager status.Manager, livenessManager results.Manager, + readinessManager results.Manager, startupManager results.Manager, runner kubecontainer.CommandRunner, recorder record.EventRecorder) Manager { prober := newProber(runner, recorder) - readinessManager := results.NewManager() return &manager{ statusManager: statusManager, prober: prober, @@ -121,12 +117,6 @@ func NewManager( } } -// Start syncing probe status. This should only be called once. -func (m *manager) Start() { - // Start syncing readiness. - go wait.Forever(m.updateReadiness, 0) -} - // Key uniquely identifying container probes type probeKey struct { podUID types.UID @@ -263,6 +253,7 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) { select { case w.manualTriggerCh <- struct{}{}: default: // Non-blocking. + klog.Warningf("Failed to trigger a manual run of %s probe", w.probeType.String()) } } } @@ -300,10 +291,3 @@ func (m *manager) workerCount() int { defer m.workerLock.RUnlock() return len(m.workers) } - -func (m *manager) updateReadiness() { - update := <-m.readinessManager.Updates() - - ready := update.Result == results.Success - m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready) -} diff --git a/pkg/kubelet/prober/prober_manager_test.go b/pkg/kubelet/prober/prober_manager_test.go index cc9c16e769d5..70b8c77a71aa 100644 --- a/pkg/kubelet/prober/prober_manager_test.go +++ b/pkg/kubelet/prober/prober_manager_test.go @@ -319,6 +319,13 @@ func TestUpdatePodStatus(t *testing.T) { } } +func (m *manager) extractedReadinessHandling() { + update := <-m.readinessManager.Updates() + // This code corresponds to an extract from kubelet.syncLoopIteration() + ready := update.Result == results.Success + m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready) +} + func TestUpdateReadiness(t *testing.T) { testPod := getTestPod() setTestProbe(testPod, readiness, v1.Probe{}) @@ -327,10 +334,10 @@ func TestUpdateReadiness(t *testing.T) { // Start syncing readiness without leaking goroutine. stopCh := make(chan struct{}) - go wait.Until(m.updateReadiness, 0, stopCh) + go wait.Until(m.extractedReadinessHandling, 0, stopCh) defer func() { close(stopCh) - // Send an update to exit updateReadiness() + // Send an update to exit extractedReadinessHandling() m.readinessManager.Set(kubecontainer.ContainerID{}, results.Success, &v1.Pod{}) }() diff --git a/test/e2e/common/node/container_probe.go b/test/e2e/common/node/container_probe.go index 1c0c4cabf220..7f5d81ca2fff 100644 --- a/test/e2e/common/node/container_probe.go +++ b/test/e2e/common/node/container_probe.go @@ -373,27 +373,18 @@ var _ = SIGDescribe("Probing container", func() { Testname: Pod readiness probe, delayed by startup probe Description: A Pod is created with startup and readiness probes. The Container is started by creating /tmp/startup after 45 seconds, delaying the ready state by this amount of time. This is similar to the "Pod readiness probe, with initial delay" test. 
*/ - ginkgo.It("should not be ready until startupProbe succeeds", func() { + ginkgo.It("should be ready immediately after startupProbe succeeds", func() { sleepBeforeStarted := time.Duration(45) - cmd := []string{"/bin/sh", "-c", fmt.Sprintf("echo ok >/tmp/health; sleep %d; echo ok >/tmp/startup; sleep 600", sleepBeforeStarted)} + cmd := []string{"/bin/sh", "-c", fmt.Sprintf("sleep %d; echo ok >/tmp/startup; sleep 600", sleepBeforeStarted)} readinessProbe := &v1.Probe{ - Handler: v1.Handler{ - Exec: &v1.ExecAction{ - Command: []string{"cat", "/tmp/health"}, - }, - }, + Handler: execHandler([]string{"/bin/true"}), InitialDelaySeconds: 0, PeriodSeconds: 60, } startupProbe := &v1.Probe{ - Handler: v1.Handler{ - Exec: &v1.ExecAction{ - Command: []string{"cat", "/tmp/startup"}, - }, - }, + Handler: execHandler([]string{"/bin/cat", "/tmp/startup"}), InitialDelaySeconds: 0, - PeriodSeconds: 1, - FailureThreshold: 600, + FailureThreshold: 60, } p := podClient.Create(startupPodSpec(startupProbe, readinessProbe, nil, cmd)) @@ -416,12 +407,13 @@ var _ = SIGDescribe("Probing container", func() { startedTime, err := GetContainerStartedTime(p, "busybox") framework.ExpectNoError(err) - framework.Logf("Container started at %v, pod became ready at %v", startedTime, readyTime) - if readyTime.Sub(startedTime) < sleepBeforeStarted*time.Second { + readyIn := readyTime.Sub(startedTime) - sleepBeforeStarted*time.Second + framework.Logf("Container started at %v, pod became ready at %v, %v after startupProbe succeeded", startedTime, readyTime, readyIn) + if readyIn < 0 { framework.Failf("Pod became ready before startupProbe succeeded") } - if readyTime.Sub(startedTime) > (sleepBeforeStarted+20)*time.Second { - framework.Failf("Pod became ready more than 20s after startupProbe succeeded") + if readyIn > 5*time.Second { + framework.Failf("Pod became ready in %v, more than 5s after startupProbe succeeded. It means that the delay readiness probes were not initiated immediately after startup finished.", readyIn) } }) }) From b203fb0565607f37f907cc89e2d257e93ae4e9b8 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Sat, 6 Mar 2021 11:58:03 +0100 Subject: [PATCH 4/4] Deflake e2e test for startupProbe --- test/e2e/common/node/container_probe.go | 24 ++++++++++++------------ test/e2e/framework/pod/resource.go | 14 ++++++++++++++ test/e2e/framework/pod/wait.go | 5 +++++ 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/test/e2e/common/node/container_probe.go b/test/e2e/common/node/container_probe.go index 7f5d81ca2fff..20aa75b8e872 100644 --- a/test/e2e/common/node/container_probe.go +++ b/test/e2e/common/node/container_probe.go @@ -374,10 +374,9 @@ var _ = SIGDescribe("Probing container", func() { Description: A Pod is created with startup and readiness probes. The Container is started by creating /tmp/startup after 45 seconds, delaying the ready state by this amount of time. This is similar to the "Pod readiness probe, with initial delay" test. 
*/ ginkgo.It("should be ready immediately after startupProbe succeeds", func() { - sleepBeforeStarted := time.Duration(45) - cmd := []string{"/bin/sh", "-c", fmt.Sprintf("sleep %d; echo ok >/tmp/startup; sleep 600", sleepBeforeStarted)} + cmd := []string{"/bin/sh", "-c", "echo ok >/tmp/health; sleep 10; echo ok >/tmp/startup; sleep 600"} readinessProbe := &v1.Probe{ - Handler: execHandler([]string{"/bin/true"}), + Handler: execHandler([]string{"/bin/cat", "/tmp/health"}), InitialDelaySeconds: 0, PeriodSeconds: 60, } @@ -391,7 +390,15 @@ var _ = SIGDescribe("Probing container", func() { p, err := podClient.Get(context.TODO(), p.Name, metav1.GetOptions{}) framework.ExpectNoError(err) - e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, p.Name, f.Namespace.Name, framework.PodStartTimeout) + err = e2epod.WaitForPodContainerStarted(f.ClientSet, f.Namespace.Name, p.Name, 0, framework.PodStartTimeout) + framework.ExpectNoError(err) + startedTime := time.Now() + + // We assume the pod became ready when the container became ready. This + // is true for a single container pod. + err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, p.Name, f.Namespace.Name, framework.PodStartTimeout) + framework.ExpectNoError(err) + readyTime := time.Now() p, err = podClient.Get(context.TODO(), p.Name, metav1.GetOptions{}) framework.ExpectNoError(err) @@ -400,14 +407,7 @@ var _ = SIGDescribe("Probing container", func() { framework.ExpectNoError(err) framework.ExpectEqual(isReady, true, "pod should be ready") - // We assume the pod became ready when the container became ready. This - // is true for a single container pod. - readyTime, err := GetTransitionTimeForReadyCondition(p) - framework.ExpectNoError(err) - startedTime, err := GetContainerStartedTime(p, "busybox") - framework.ExpectNoError(err) - - readyIn := readyTime.Sub(startedTime) - sleepBeforeStarted*time.Second + readyIn := readyTime.Sub(startedTime) framework.Logf("Container started at %v, pod became ready at %v, %v after startupProbe succeeded", startedTime, readyTime, readyIn) if readyIn < 0 { framework.Failf("Pod became ready before startupProbe succeeded") diff --git a/test/e2e/framework/pod/resource.go b/test/e2e/framework/pod/resource.go index 668cf263ad77..23f4c565b9f0 100644 --- a/test/e2e/framework/pod/resource.go +++ b/test/e2e/framework/pod/resource.go @@ -321,6 +321,20 @@ func podContainerFailed(c clientset.Interface, namespace, podName string, contai } } +func podContainerStarted(c clientset.Interface, namespace, podName string, containerIndex int) wait.ConditionFunc { + return func() (bool, error) { + pod, err := c.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + return false, err + } + if containerIndex > len(pod.Status.ContainerStatuses)-1 { + return false, nil + } + containerStatus := pod.Status.ContainerStatuses[containerIndex] + return *containerStatus.Started, nil + } +} + // LogPodStates logs basic info of provided pods for debugging. func LogPodStates(pods []v1.Pod) { // Find maximum widths for pod, node, and phase strings for column printing. 
diff --git a/test/e2e/framework/pod/wait.go b/test/e2e/framework/pod/wait.go index cb107a920b83..6de8e87363e9 100644 --- a/test/e2e/framework/pod/wait.go +++ b/test/e2e/framework/pod/wait.go @@ -542,3 +542,8 @@ func WaitForNRestartablePods(ps *testutils.PodStore, expect int, timeout time.Du func WaitForPodContainerToFail(c clientset.Interface, namespace, podName string, containerIndex int, reason string, timeout time.Duration) error { return wait.PollImmediate(poll, timeout, podContainerFailed(c, namespace, podName, containerIndex, reason)) } + +// WaitForPodContainerStarted waits for the given Pod container to start, after a successful run of the startupProbe. +func WaitForPodContainerStarted(c clientset.Interface, namespace, podName string, containerIndex int, timeout time.Duration) error { + return wait.PollImmediate(poll, timeout, podContainerStarted(c, namespace, podName, containerIndex)) +}
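
A minimal, self-contained sketch of the manual-trigger pattern that PATCH 1/4 adds to the prober worker: UpdatePodStatus does a non-blocking send on a buffered channel of size 1, and the worker's run loop selects on that channel alongside its periodic ticker, so a readiness probe can fire immediately instead of waiting up to a full period. The program below is illustrative Go only; it is not kubelet code, and none of its names beyond the channel idiom appear in the patches.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Buffered so the trigger side can always do a non-blocking send,
	// mirroring manualTriggerCh in pkg/kubelet/prober/worker.go.
	manualTriggerCh := make(chan struct{}, 1)
	ticker := time.NewTicker(10 * time.Second) // stands in for probeTicker
	defer ticker.Stop()
	stopCh := make(chan struct{})
	done := make(chan struct{})

	// Stand-in for worker.run(): a single select loop handles stop, the
	// periodic tick, and the manual trigger the same way.
	go func() {
		defer close(done)
		for {
			select {
			case <-stopCh:
				return
			case <-ticker.C:
				fmt.Println("periodic probe")
			case <-manualTriggerCh:
				fmt.Println("manually triggered probe")
			}
		}
	}()

	// Stand-in for manager.UpdatePodStatus(): request an immediate probe
	// without ever blocking the status-update path.
	select {
	case manualTriggerCh <- struct{}{}:
	default: // a trigger is already pending; coalesce
	}

	time.Sleep(100 * time.Millisecond)
	close(stopCh)
	<-done
}

Buffering the channel at size 1 means repeated UpdatePodStatus calls coalesce into one pending trigger rather than blocking or piling up, the same reasoning the worker already applies to its buffered stopCh.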