Make all health checks probing consistent #98376

Merged · 4 commits merged on Mar 6, 2021
50 changes: 33 additions & 17 deletions pkg/kubelet/kubelet.go
@@ -581,6 +581,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

klet.livenessManager = proberesults.NewManager()
klet.readinessManager = proberesults.NewManager()
klet.startupManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache()

@@ -618,6 +619,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
klet.readinessManager,
klet.startupManager,
seccompProfileRoot,
machineInfo,
@@ -716,6 +718,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.probeManager = prober.NewManager(
klet.statusManager,
klet.livenessManager,
klet.readinessManager,
klet.startupManager,
klet.runner,
kubeDeps.Recorder)
@@ -924,8 +927,9 @@ type Kubelet struct {
// Handles container probing.
probeManager prober.Manager
// Manages container health check results.
livenessManager proberesults.Manager
startupManager proberesults.Manager
livenessManager proberesults.Manager
readinessManager proberesults.Manager
startupManager proberesults.Manager

// How long to keep idle streaming command execution/port forwarding
// connections open before terminating them
@@ -1438,7 +1442,6 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {

// Start component sync loops.
kl.statusManager.Start()
kl.probeManager.Start()

// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
@@ -1902,8 +1905,8 @@ func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHand
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * housekeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more
// containers have failed liveness checks
// * health manager: sync pods that have failed or in which one or more
// containers have failed health checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
@@ -1978,19 +1981,16 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
handler.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
// The liveness manager detected a failure; sync the pod.

Member: Comment might not be necessary anymore?

// We should not use the pod from livenessManager, because it is never updated after
// initialization.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
break
}
klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
handler.HandlePodSyncs([]*v1.Pod{pod})
handleProbeSync(kl, update, handler, "liveness", "unhealthy")
}
case update := <-kl.readinessManager.Updates():
ready := update.Result == proberesults.Success
kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
handleProbeSync(kl, update, handler, "readiness", map[bool]string{true: "ready", false: ""}[ready])
case update := <-kl.startupManager.Updates():
Member: Can you explain why it is advantageous to handle these all this way in this sync loop? Why were readinessManager and startupManager not in here before? Would it make sense to have the livenessManager behave like them rather than the other way around? (I wish this was documented in a comment from before!)

Contributor Author: The main benefit, apart from being easier to present/understand, is to allow me to call HandlePodSyncs in a consistent manner after each *Manager update.
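
As an illustration of that consistency argument, here is a minimal runnable toy of the pattern (names and types are invented for the sketch, not the real kubelet APIs): all three probe-result channels become cases of one select loop, and each case funnels into the same sync helper. Before this PR, readiness and startup results were instead drained by the prober manager's own goroutines and never triggered a pod sync.

package main

import "fmt"

// Toy stand-in for the kubelet's probe result type; illustrative only.
type update struct {
	probe   string
	success bool
}

// handleProbeSync mirrors the role of the new helper in kubelet.go:
// every probe outcome funnels into the same pod-sync path.
func handleProbeSync(u update) {
	fmt.Printf("SyncLoop %s: syncing pod (success=%v)\n", u.probe, u.success)
}

func main() {
	liveness := make(chan update, 1)
	readiness := make(chan update, 1)
	startup := make(chan update, 1)

	liveness <- update{"liveness", false}
	readiness <- update{"readiness", true}
	startup <- update{"startup", true}

	for i := 0; i < 3; i++ {
		select {
		case u := <-liveness:
			if !u.success { // liveness only needs a sync on failure
				handleProbeSync(u)
			}
		case u := <-readiness:
			handleProbeSync(u) // the real code also updates the status manager first
		case u := <-startup:
			handleProbeSync(u)
		}
	}
}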

started := update.Result == proberesults.Success
kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
handleProbeSync(kl, update, handler, "startup", map[bool]string{true: "started", false: "unhealthy"}[started])
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,
@@ -2006,6 +2006,22 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
return true
}

func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
probeAndStatus := probe
if len(status) > 0 {
probeAndStatus = fmt.Sprintf("%s (container %s)", probe, status)
}
// We should not use the pod from manager, because it is never updated after initialization.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
klog.V(4).Infof("SyncLoop %s: ignore irrelevant update: %#v", probeAndStatus, update)
return
}
klog.V(1).Infof("SyncLoop %s: %q", probeAndStatus, format.Pod(pod))
handler.HandlePodSyncs([]*v1.Pod{pod})
}

// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
1 change: 1 addition & 0 deletions pkg/kubelet/kubelet_test.go
@@ -241,6 +241,7 @@ func newTestKubeletWithImageList(

kubelet.probeManager = probetest.FakeManager{}
kubelet.livenessManager = proberesults.NewManager()
kubelet.readinessManager = proberesults.NewManager()
kubelet.startupManager = proberesults.NewManager()

fakeContainerManager := cm.NewFakeContainerManager()
7 changes: 5 additions & 2 deletions pkg/kubelet/kuberuntime/kuberuntime_manager.go
@@ -101,8 +101,9 @@ type kubeGenericRuntimeManager struct {
runtimeHelper kubecontainer.RuntimeHelper

// Health check results.
livenessManager proberesults.Manager
startupManager proberesults.Manager
livenessManager proberesults.Manager
readinessManager proberesults.Manager
startupManager proberesults.Manager

// If true, enforce container cpu limits with CFS quota support
cpuCFSQuota bool
@@ -159,6 +160,7 @@ type LegacyLogProvider interface {
func NewKubeGenericRuntimeManager(
recorder record.EventRecorder,
livenessManager proberesults.Manager,
readinessManager proberesults.Manager,
startupManager proberesults.Manager,
seccompProfileRoot string,
machineInfo *cadvisorapi.MachineInfo,
@@ -187,6 +189,7 @@ func NewKubeGenericRuntimeManager(
cpuCFSQuotaPeriod: cpuCFSQuotaPeriod,
seccompProfileRoot: seccompProfileRoot,
livenessManager: livenessManager,
readinessManager: readinessManager,
startupManager: startupManager,
machineInfo: machineInfo,
osInterface: osInterface,
1 change: 1 addition & 0 deletions pkg/kubelet/prober/common_test.go
@@ -111,6 +111,7 @@ func newTestManager() *manager {
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}),
results.NewManager(),
results.NewManager(),
results.NewManager(),
nil, // runner
&record.FakeRecorder{},
).(*manager)
45 changes: 16 additions & 29 deletions pkg/kubelet/prober/prober_manager.go
@@ -18,11 +18,12 @@ package prober

import (
"sync"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics"
"k8s.io/klog/v2"
@@ -69,9 +70,6 @@ type Manager interface {
// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
// container based on container running status, cached probe results and worker states.
UpdatePodStatus(types.UID, *v1.PodStatus)

// Start starts the Manager sync loops.
Start()
}

type manager struct {
@@ -94,36 +92,31 @@ type manager struct {

// prober executes the probe actions.
prober *prober

start time.Time
}

// NewManager creates a Manager for pod probing.
func NewManager(
statusManager status.Manager,
livenessManager results.Manager,
readinessManager results.Manager,
startupManager results.Manager,
runner kubecontainer.CommandRunner,
recorder record.EventRecorder) Manager {

prober := newProber(runner, recorder)
readinessManager := results.NewManager()
return &manager{
statusManager: statusManager,
prober: prober,
readinessManager: readinessManager,
livenessManager: livenessManager,
startupManager: startupManager,
workers: make(map[probeKey]*worker),
start: clock.RealClock{}.Now(),
}
}

// Start syncing probe status. This should only be called once.
func (m *manager) Start() {
// Start syncing readiness.
go wait.Forever(m.updateReadiness, 0)
// Start syncing startup.
go wait.Forever(m.updateStartup, 0)
}

// Key uniquely identifying container probes
type probeKey struct {
podUID types.UID
@@ -253,8 +246,16 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) {
ready = result == results.Success
} else {
// Check whether there is a probe which hasn't run yet.
_, exists := m.getWorker(podUID, c.Name, readiness)
ready = !exists
w, exists := m.getWorker(podUID, c.Name, readiness)
ready = !exists // no readinessProbe -> always ready
if exists {
// Trigger an immediate run of the readinessProbe to update ready state
select {
case w.manualTriggerCh <- struct{}{}:
default: // Non-blocking.
Member: add logging that we failed to manually trigger the update?

Contributor Author: I don't know... does it work putting a log under default?

klog.Warningf("Failed to trigger a manual run of %s probe", w.probeType.String())
}
}
}
podStatus.ContainerStatuses[i].Ready = ready
}
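
For the non-blocking send discussed in the thread above, a standalone sketch (my illustration, not kubelet code): manualTriggerCh is buffered with capacity 1, so at most one manual trigger can ever be pending; when one is already queued, the default branch runs instead of blocking, and that is where the warning gets logged.

package main

import "fmt"

func main() {
	// Capacity 1: a single pending trigger is retained, extras are dropped.
	manualTriggerCh := make(chan struct{}, 1)
	for i := 0; i < 3; i++ {
		select {
		case manualTriggerCh <- struct{}{}:
			fmt.Println("trigger queued") // first iteration only
		default: // Non-blocking: a trigger is already pending.
			fmt.Println("trigger dropped; the kubelet logs a warning here")
		}
	}
}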
@@ -290,17 +291,3 @@ func (m *manager) workerCount() int {
defer m.workerLock.RUnlock()
return len(m.workers)
}

func (m *manager) updateReadiness() {
update := <-m.readinessManager.Updates()

ready := update.Result == results.Success
m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}

func (m *manager) updateStartup() {
update := <-m.startupManager.Updates()

started := update.Result == results.Success
m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
}
11 changes: 9 additions & 2 deletions pkg/kubelet/prober/prober_manager_test.go
@@ -319,6 +319,13 @@ func TestUpdatePodStatus(t *testing.T) {
}
}

func (m *manager) extractedReadinessHandling() {
update := <-m.readinessManager.Updates()
// This code corresponds to an extract from kubelet.syncLoopIteration()
ready := update.Result == results.Success
m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}

func TestUpdateReadiness(t *testing.T) {
testPod := getTestPod()
setTestProbe(testPod, readiness, v1.Probe{})
@@ -327,10 +334,10 @@ func TestUpdateReadiness(t *testing.T) {

// Start syncing readiness without leaking goroutine.
stopCh := make(chan struct{})
go wait.Until(m.updateReadiness, 0, stopCh)
go wait.Until(m.extractedReadinessHandling, 0, stopCh)
defer func() {
close(stopCh)
// Send an update to exit updateReadiness()
// Send an update to exit extractedReadinessHandling()
m.readinessManager.Set(kubecontainer.ContainerID{}, results.Success, &v1.Pod{})
}()

20 changes: 14 additions & 6 deletions pkg/kubelet/prober/worker.go
@@ -38,6 +38,9 @@ type worker struct {
// Channel for stopping the probe.
stopCh chan struct{}

// Channel for triggering the probe manually.
manualTriggerCh chan struct{}

// The pod containing this probe (read-only)
pod *v1.Pod

@@ -82,11 +85,12 @@ func newWorker(
container v1.Container) *worker {

w := &worker{
stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
pod: pod,
container: container,
probeType: probeType,
probeManager: m,
stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
manualTriggerCh: make(chan struct{}, 1), // Buffer so prober_manager can do non-blocking calls to doProbe.
pod: pod,
container: container,
probeType: probeType,
probeManager: m,
}

switch probeType {
@@ -130,7 +134,10 @@ func (w *worker) run() {

// If kubelet restarted the probes could be started in rapid succession.
// Let the worker wait for a random portion of tickerPeriod before probing.
time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
// Do it only if the kubelet has started recently.
if probeTickerPeriod > time.Since(w.probeManager.start) {
time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
}

probeTicker := time.NewTicker(probeTickerPeriod)

@@ -154,6 +161,7 @@
case <-w.stopCh:
break probeLoop
Member: should the manualTriggerCh be closed here? What's the risk of keeping it not closed ever?

Contributor Author: well, we could say the same for the stopCh no?

Member: yes. Not sure what the downside will be to not close it.

Contributor Author: I leave it like that, let's see what the approvers have to say.

case <-probeTicker.C:
case <-w.manualTriggerCh:
// continue
}
}
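
On the unresolved question above about closing manualTriggerCh, a sketch under the assumption that nothing else holds a reference to the channel (my illustration, not PR code): a Go channel does not have to be closed to be reclaimed; once no goroutine references it, it is garbage-collected like any other value, and close() is only a broadcast signal to receivers. Exiting the loop via stopCh is therefore enough, which is the same reasoning the author applies to stopCh itself.

package main

import (
	"fmt"
	"time"
)

// run mimics the worker loop: it exits via stopCh and simply abandons
// manualTriggerCh without ever closing it.
func run(stopCh, manualTriggerCh chan struct{}) {
	for {
		select {
		case <-stopCh:
			fmt.Println("worker stopped; channels abandoned, not closed")
			return
		case <-manualTriggerCh:
			fmt.Println("manual probe")
		}
	}
}

func main() {
	stopCh := make(chan struct{}, 1)
	manualTriggerCh := make(chan struct{}, 1)
	go run(stopCh, manualTriggerCh)
	manualTriggerCh <- struct{}{} // may or may not be observed before stop
	stopCh <- struct{}{}
	time.Sleep(100 * time.Millisecond) // give the goroutine time to exit
	// Both channels are now unreachable and will be garbage-collected.
}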
42 changes: 20 additions & 22 deletions test/e2e/common/node/container_probe.go
@@ -373,22 +373,15 @@ var _ = SIGDescribe("Probing container", func() {
Testname: Pod readiness probe, delayed by startup probe
Description: A Pod is created with startup and readiness probes. The Container is started by creating /tmp/startup after 10 seconds, and the Pod must become ready within a few seconds of the startup probe succeeding. This is similar to the "Pod readiness probe, with initial delay" test.
*/
ginkgo.It("should not be ready until startupProbe succeeds", func() {
cmd := []string{"/bin/sh", "-c", "echo ok >/tmp/health; sleep 45; echo ok >/tmp/startup; sleep 600"}
ginkgo.It("should be ready immediately after startupProbe succeeds", func() {
cmd := []string{"/bin/sh", "-c", "echo ok >/tmp/health; sleep 10; echo ok >/tmp/startup; sleep 600"}
readinessProbe := &v1.Probe{
Handler: v1.Handler{
Exec: &v1.ExecAction{
Command: []string{"cat", "/tmp/health"},
},
},
Handler: execHandler([]string{"/bin/cat", "/tmp/health"}),
InitialDelaySeconds: 0,
PeriodSeconds: 60,
}
startupProbe := &v1.Probe{
Handler: v1.Handler{
Exec: &v1.ExecAction{
Command: []string{"cat", "/tmp/startup"},
},
},
Handler: execHandler([]string{"/bin/cat", "/tmp/startup"}),
InitialDelaySeconds: 0,
FailureThreshold: 60,
}
@@ -397,7 +390,15 @@
p, err := podClient.Get(context.TODO(), p.Name, metav1.GetOptions{})
framework.ExpectNoError(err)

e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, p.Name, f.Namespace.Name, framework.PodStartTimeout)
err = e2epod.WaitForPodContainerStarted(f.ClientSet, f.Namespace.Name, p.Name, 0, framework.PodStartTimeout)
framework.ExpectNoError(err)
startedTime := time.Now()

// We assume the pod became ready when the container became ready. This
// is true for a single container pod.
err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, p.Name, f.Namespace.Name, framework.PodStartTimeout)
framework.ExpectNoError(err)
readyTime := time.Now()

p, err = podClient.Get(context.TODO(), p.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
@@ -406,17 +407,14 @@
framework.ExpectNoError(err)
framework.ExpectEqual(isReady, true, "pod should be ready")

// We assume the pod became ready when the container became ready. This
// is true for a single container pod.
readyTime, err := GetTransitionTimeForReadyCondition(p)
framework.ExpectNoError(err)
startedTime, err := GetContainerStartedTime(p, "busybox")
framework.ExpectNoError(err)

framework.Logf("Container started at %v, pod became ready at %v", startedTime, readyTime)
if readyTime.Sub(startedTime) < 40*time.Second {
readyIn := readyTime.Sub(startedTime)
framework.Logf("Container started at %v, pod became ready at %v, %v after startupProbe succeeded", startedTime, readyTime, readyIn)
if readyIn < 0 {
framework.Failf("Pod became ready before startupProbe succeeded")
}
if readyIn > 5*time.Second {
framework.Failf("Pod became ready in %v, more than 5s after startupProbe succeeded. It means that the delay readiness probes were not initiated immediately after startup finished.", readyIn)
Member: nit: this is a suuuper long line. these are slightly more difficult to review on github.com. consider splitting this up.

}
})
})
