Commit 323f99e

startupProbe: Kubelet changes

matthyx committed Aug 29, 2019
1 parent e4d26f8
Showing 4 changed files with 125 additions and 12 deletions.
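For orientation before the diffs: these kubelet changes consume the new startupProbe field on containers (alpha at the time, behind the StartupProbe feature gate). Below is a minimal sketch of such a container spec, assuming the k8s.io/api/core/v1 types of this era (v1.Probe still embeds Handler; it was renamed ProbeHandler much later); the image, path, and thresholds are illustrative only.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	probe := func(path string, port int) *v1.Probe {
		return &v1.Probe{
			Handler: v1.Handler{
				HTTPGet: &v1.HTTPGetAction{Path: path, Port: intstr.FromInt(port)},
			},
			PeriodSeconds: 10,
		}
	}

	container := v1.Container{
		Name:  "slow-app", // hypothetical container
		Image: "example.com/slow-app:1.0",
		// The startup probe gates liveness/readiness: they do not run
		// until it succeeds once, and it never runs again after that.
		StartupProbe:  probe("/healthz", 8080),
		LivenessProbe: probe("/healthz", 8080),
	}
	container.StartupProbe.FailureThreshold = 30 // allow up to 30 * 10s for startup

	fmt.Println(container.Name, "declares a startup probe:", container.StartupProbe != nil)
}
```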
pkg/kubelet/prober/prober.go (14 additions, 7 deletions)

@@ -36,23 +36,24 @@ import (
"k8s.io/kubernetes/pkg/probe"
execprobe "k8s.io/kubernetes/pkg/probe/exec"
httpprobe "k8s.io/kubernetes/pkg/probe/http"
tcprobe "k8s.io/kubernetes/pkg/probe/tcp"
tcpprobe "k8s.io/kubernetes/pkg/probe/tcp"
"k8s.io/utils/exec"

"k8s.io/klog"
)

const maxProbeRetries = 3

// Prober helps to check the liveness/readiness of a container.
// Prober helps to check the liveness/readiness/startup of a container.
type prober struct {
exec execprobe.Prober
// probe types needs different httprobe instances so they don't
// share a connection pool which can cause collsions to the
// same host:port and transient failures. See #49740.
readinessHTTP httpprobe.Prober
livenessHTTP httpprobe.Prober
tcp tcprobe.Prober
startupHTTP httpprobe.Prober
tcp tcpprobe.Prober
runner kubecontainer.ContainerCommandRunner

refManager *kubecontainer.RefManager
@@ -71,7 +72,8 @@ func newProber(
 		exec:          execprobe.New(),
 		readinessHTTP: httpprobe.New(followNonLocalRedirects),
 		livenessHTTP:  httpprobe.New(followNonLocalRedirects),
-		tcp:           tcprobe.New(),
+		startupHTTP:   httpprobe.New(followNonLocalRedirects),
+		tcp:           tcpprobe.New(),
 		runner:        runner,
 		refManager:    refManager,
 		recorder:      recorder,
@@ -86,6 +88,8 @@ func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, c
 		probeSpec = container.ReadinessProbe
 	case liveness:
 		probeSpec = container.LivenessProbe
+	case startup:
+		probeSpec = container.StartupProbe
 	default:
 		return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
 	}
@@ -174,11 +178,14 @@ func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status
 		url := formatURL(scheme, host, port, path)
 		headers := buildHeader(p.HTTPGet.HTTPHeaders)
 		klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
-		if probeType == liveness {
+		switch probeType {
+		case liveness:
 			return pb.livenessHTTP.Probe(url, headers, timeout)
+		case startup:
+			return pb.startupHTTP.Probe(url, headers, timeout)
+		default:
+			return pb.readinessHTTP.Probe(url, headers, timeout)
 		}
-		// readiness
-		return pb.readinessHTTP.Probe(url, headers, timeout)
 	}
 	if p.TCPSocket != nil {
 		port, err := extractPort(p.TCPSocket.Port, container)
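The struct comment above explains why each probe type gets its own httpprobe.Prober. Here is a standalone sketch of that design choice (plain net/http, not the kubelet's actual probe package): separate http.Transport values mean separate connection pools, so probes of different types never share or poison a keep-alive connection to the same host:port, the failure mode in #49740.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// httpProber wraps a client with its own dedicated transport.
type httpProber struct{ client *http.Client }

func newHTTPProber() httpProber {
	return httpProber{client: &http.Client{
		Transport: &http.Transport{}, // dedicated connection pool per prober
		Timeout:   time.Second,
	}}
}

func main() {
	probers := map[string]httpProber{
		"liveness":  newHTTPProber(),
		"readiness": newHTTPProber(),
		"startup":   newHTTPProber(),
	}
	for name, p := range probers {
		// Hypothetical local endpoint; errors simply print if nothing listens.
		resp, err := p.client.Get("http://127.0.0.1:8080/healthz")
		if err != nil {
			fmt.Println(name, "probe failed:", err)
			continue
		}
		resp.Body.Close()
		fmt.Println(name, "probe status:", resp.StatusCode)
	}
}
```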
pkg/kubelet/prober/prober_manager.go (49 additions, 3 deletions)

@@ -23,9 +23,11 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/status"
@@ -37,7 +39,7 @@ var ProberResults = metrics.NewCounterVec(
 	&metrics.CounterOpts{
 		Subsystem: "prober",
 		Name:      "probe_total",
-		Help:      "Cumulative number of a liveness or readiness probe for a container by result.",
+		Help:      "Cumulative number of a liveness, readiness or startup probe for a container by result.",
 		StabilityLevel: metrics.ALPHA,
 	},
 	[]string{"probe_type",
@@ -89,6 +91,9 @@ type manager struct {
 	// livenessManager manages the results of liveness probes
 	livenessManager results.Manager

+	// startupManager manages the results of startup probes
+	startupManager results.Manager
+
 	// prober executes the probe actions.
 	prober *prober
 }
@@ -103,11 +108,13 @@ func NewManager(

 	prober := newProber(runner, refManager, recorder)
 	readinessManager := results.NewManager()
+	startupManager := results.NewManager()
 	return &manager{
 		statusManager:    statusManager,
 		prober:           prober,
 		readinessManager: readinessManager,
 		livenessManager:  livenessManager,
+		startupManager:   startupManager,
 		workers: make(map[probeKey]*worker),
 	}
 }
@@ -116,6 +123,8 @@ func NewManager(
 func (m *manager) Start() {
 	// Start syncing readiness.
 	go wait.Forever(m.updateReadiness, 0)
+	// Start syncing startup.
+	go wait.Forever(m.updateStartup, 0)
 }

 // Key uniquely identifying container probes
@@ -125,12 +134,13 @@ type probeKey struct {
 	probeType probeType
 }

-// Type of probe (readiness or liveness)
+// Type of probe (liveness, readiness or startup)
 type probeType int

 const (
 	liveness probeType = iota
 	readiness
+	startup

 	probeResultSuccessful string = "successful"
 	probeResultFailed     string = "failed"
@@ -144,6 +154,8 @@ func (t probeType) String() string {
 		return "Readiness"
 	case liveness:
 		return "Liveness"
+	case startup:
+		return "Startup"
 	default:
 		return "UNKNOWN"
 	}
@@ -157,6 +169,18 @@ func (m *manager) AddPod(pod *v1.Pod) {
 	for _, c := range pod.Spec.Containers {
 		key.containerName = c.Name

+		if c.StartupProbe != nil && utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
+			key.probeType = startup
+			if _, ok := m.workers[key]; ok {
+				klog.Errorf("Startup probe already exists! %v - %v",
+					format.Pod(pod), c.Name)
+				return
+			}
+			w := newWorker(m, startup, pod, c)
+			m.workers[key] = w
+			go w.run()
+		}
+
 		if c.ReadinessProbe != nil {
 			key.probeType = readiness
 			if _, ok := m.workers[key]; ok {
@@ -190,7 +214,7 @@ func (m *manager) RemovePod(pod *v1.Pod) {
 	key := probeKey{podUID: pod.UID}
 	for _, c := range pod.Spec.Containers {
 		key.containerName = c.Name
-		for _, probeType := range [...]probeType{readiness, liveness} {
+		for _, probeType := range [...]probeType{readiness, liveness, startup} {
 			key.probeType = probeType
 			if worker, ok := m.workers[key]; ok {
 				worker.stop()
@@ -223,6 +247,21 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) {
 			ready = !exists
 		}
 		podStatus.ContainerStatuses[i].Ready = ready
+
+		var started bool
+		if c.State.Running == nil {
+			started = false
+		} else if !utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
+			// The container is running; assume it is started if the StartupProbe feature is disabled.
+			started = true
+		} else if result, ok := m.startupManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
+			started = result == results.Success
+		} else {
+			// Check whether there is a startup probe which hasn't run yet.
+			_, exists := m.getWorker(podUID, c.Name, startup)
+			started = !exists
+		}
+		podStatus.ContainerStatuses[i].Started = &started
 	}
 	// init containers are ready if they have exited with success or if a readiness probe has
 	// succeeded.
@@ -262,3 +301,10 @@ func (m *manager) updateReadiness() {
 	ready := update.Result == results.Success
 	m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
 }
+
+func (m *manager) updateStartup() {
+	update := <-m.startupManager.Updates()
+
+	started := update.Result == results.Success
+	m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
+}
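The Started derivation in UpdatePodStatus above collapses to a small decision table. A simplified restatement with plain types standing in for the feature gate, the results cache, and the worker map (the function and parameter names here are hypothetical, not kubelet API):

```go
// computeStarted mirrors the four branches in UpdatePodStatus.
// result == nil models "no cached startup probe result yet".
func computeStarted(running, gateEnabled bool, result *bool, hasStartupWorker bool) bool {
	switch {
	case !running:
		return false // a container that is not running cannot be started
	case !gateEnabled:
		return true // StartupProbe gate off: running implies started
	case result != nil:
		return *result // trust the cached startup probe result
	default:
		// No result yet: started only if no startup worker exists,
		// i.e. the container declared no startup probe.
		return !hasStartupWorker
	}
}
```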
pkg/kubelet/prober/worker.go (19 additions, 2 deletions)

@@ -98,6 +98,10 @@ func newWorker(
 		w.spec = container.LivenessProbe
 		w.resultsManager = m.livenessManager
 		w.initialValue = results.Success
+	case startup:
+		w.spec = container.StartupProbe
+		w.resultsManager = m.startupManager
+		w.initialValue = results.Failure
 	}

 	basicMetricLabels := prometheus.Labels{
@@ -218,10 +222,23 @@ func (w *worker) doProbe() (keepGoing bool) {
 			w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
 	}

 	// Probe disabled for InitialDelaySeconds.
 	if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
 		return true
 	}

+	if c.Started != nil && *c.Started {
+		// Stop probing for startup once the container has started.
+		if w.probeType == startup {
+			return true
+		}
+	} else {
+		// Disable other probes until the container has started.
+		if w.probeType != startup {
+			return true
+		}
+	}
+
 	// TODO: in order for exec probes to correctly handle downward API env, we must be able to reconstruct
 	// the full container environment here, OR we must make a call to the CRI in order to get those environment
 	// values from the running container.
@@ -255,8 +272,8 @@ func (w *worker) doProbe() (keepGoing bool) {

 	w.resultsManager.Set(w.containerID, result, w.pod)

-	if w.probeType == liveness && result == results.Failure {
-		// The container fails a liveness check, it will need to be restarted.
+	if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
+		// The container failed a liveness/startup check; it will need to be restarted.
 		// Stop probing until we see a new container ID. This is to reduce the
 		// chance of hitting #21751, where running `docker exec` when a
 		// container is being stopped may lead to corrupted container state.
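The gating added to doProbe is symmetric: before a container has started, only the startup probe may run; afterwards, only the other probes may. A simplified sketch of that rule (types reduced to basics, not the worker's real signature):

```go
type probeKind int

const (
	livenessKind probeKind = iota
	readinessKind
	startupKind
)

// shouldProbe mirrors the doProbe gating above. started == nil models a
// container whose Started flag has not been reported yet, which is
// treated the same as "not started".
func shouldProbe(kind probeKind, started *bool) bool {
	if started != nil && *started {
		// Once started, the startup probe never runs again.
		return kind != startupKind
	}
	// Until started, only the startup probe runs.
	return kind == startupKind
}
```

Note the interaction with the feature gate: when StartupProbe is disabled, UpdatePodStatus reports every running container as started, so liveness and readiness probes behave exactly as before.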
pkg/kubelet/status/status_manager.go (43 additions, 0 deletions)

@@ -100,6 +100,10 @@ type Manager interface {
 	// triggers a status update.
 	SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)

+	// SetContainerStartup updates the cached container status with the given startup, and
+	// triggers a status update.
+	SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
+
 	// TerminatePod resets the container status for the provided pod to terminated and triggers
 	// a status update.
 	TerminatePod(pod *v1.Pod)
@@ -248,6 +252,45 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
 	m.updateStatusInternal(pod, status, false)
 }

+func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
+	m.podStatusesLock.Lock()
+	defer m.podStatusesLock.Unlock()
+
+	pod, ok := m.podManager.GetPodByUID(podUID)
+	if !ok {
+		klog.V(4).Infof("Pod %q has been deleted, no need to update startup", string(podUID))
+		return
+	}
+
+	oldStatus, found := m.podStatuses[pod.UID]
+	if !found {
+		klog.Warningf("Container startup changed before pod has synced: %q - %q",
+			format.Pod(pod), containerID.String())
+		return
+	}
+
+	// Find the container to update.
+	containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
+	if !ok {
+		klog.Warningf("Container startup changed for unknown container: %q - %q",
+			format.Pod(pod), containerID.String())
+		return
+	}
+
+	if containerStatus.Started != nil && *containerStatus.Started == started {
+		klog.V(4).Infof("Container startup unchanged (%v): %q - %q", started,
+			format.Pod(pod), containerID.String())
+		return
+	}
+
+	// Make sure we're not updating the cached version.
+	status := *oldStatus.status.DeepCopy()
+	containerStatus, _, _ = findContainerStatus(&status, containerID.String())
+	containerStatus.Started = &started
+
+	m.updateStatusInternal(pod, status, false)
+}
+
 func findContainerStatus(status *v1.PodStatus, containerID string) (containerStatus *v1.ContainerStatus, init bool, ok bool) {
 	// Find the container to update.
 	for i, c := range status.ContainerStatuses {
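SetContainerStartup repeats the copy-on-write discipline of SetContainerReadiness: the cached status may be read concurrently, so it is deep-copied, the copy is mutated, and only the copy is handed to updateStatusInternal. A minimal illustration of that pattern with hypothetical types:

```go
// Simplified stand-ins for the manager's cached pod status.
type containerStatus struct{ Started *bool }
type podStatus struct{ Containers []containerStatus }

// deepCopy clones the status, including the pointed-to Started values,
// so the copy shares no memory with the cached original.
func (s *podStatus) deepCopy() *podStatus {
	out := &podStatus{Containers: make([]containerStatus, len(s.Containers))}
	copy(out.Containers, s.Containers)
	for i, c := range s.Containers {
		if c.Started != nil {
			v := *c.Started
			out.Containers[i].Started = &v
		}
	}
	return out
}

// setStartup never touches the cached version: it mutates a fresh copy
// and returns it for publication (the role of updateStatusInternal above).
func setStartup(cached *podStatus, idx int, started bool) *podStatus {
	next := cached.deepCopy()
	next.Containers[idx].Started = &started
	return next
}
```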
