Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #37865 #41569 #41610

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 1 addition & 5 deletions pkg/kubelet/kubelet.go
Expand Up @@ -688,6 +688,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub

klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
klet.updatePodCIDR(kubeCfg.PodCIDR)

// setup containerGC
Expand Down Expand Up @@ -2034,11 +2035,6 @@ func (kl *Kubelet) LatestLoopEntryTime() time.Time {
return val.(time.Time)
}

// PLEGHealthCheck returns whether the PLEG is healthy, together with any
// error reported by the PLEG's own Healthy check.
func (kl *Kubelet) PLEGHealthCheck() (bool, error) {
return kl.pleg.Healthy()
}

// updateRuntimeUp calls the container runtime status callback, initializing
// the runtime dependent modules when the container runtime first comes up,
// and returns an error if the status check fails. If the status check is OK,
Expand Down
15 changes: 8 additions & 7 deletions pkg/kubelet/pleg/generic.go
Expand Up @@ -75,6 +75,11 @@ const (
plegContainerExited plegContainerState = "exited"
plegContainerUnknown plegContainerState = "unknown"
plegContainerNonExistent plegContainerState = "non-existent"

// The threshold needs to be greater than the relisting period + the
// relisting time, which can vary significantly. Set a conservative
// threshold to avoid flipping between healthy and unhealthy.
relistThreshold = 3 * time.Minute
)

func convertState(state kubecontainer.ContainerState) plegContainerState {
Expand Down Expand Up @@ -126,13 +131,9 @@ func (g *GenericPLEG) Start() {

// Healthy reports whether the time elapsed since the last relist time
// (as returned by getRelistTime) is within the allowed threshold. When the
// threshold is exceeded it returns false and an error describing the delay;
// otherwise it returns true and a nil error.
// NOTE(review): this span comes from a diff view and appears to interleave
// the old body (local 2-minute threshold) with the new one (relistThreshold);
// confirm against the actual file before relying on either version.
func (g *GenericPLEG) Healthy() (bool, error) {
relistTime := g.getRelistTime()
// TODO: Evaluate if we can reduce this threshold.
// The threshold needs to be greater than the relisting period + the
// relisting time, which can vary significantly. Set a conservative
// threshold so that we don't cause kubelet to be restarted unnecessarily.
threshold := 2 * time.Minute
if g.clock.Since(relistTime) > threshold {
return false, fmt.Errorf("pleg was last seen active at %v", relistTime)
elapsed := g.clock.Since(relistTime)
if elapsed > relistThreshold {
return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, relistThreshold)
}
return true, nil
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/kubelet/runtime.go
Expand Up @@ -30,6 +30,22 @@ type runtimeState struct {
internalError error
cidr string
initError error
healthChecks []*healthCheck
}

// healthCheckFnType is the signature of a health check function. A health
// check function should be efficient and not rely on external components
// (e.g., container runtime).
type healthCheckFnType func() (bool, error)

// healthCheck pairs a health check function with a name that identifies it.
type healthCheck struct {
name string
fn healthCheckFnType
}

// addHealthCheck registers f under the given name by appending it to the
// state's list of health checks. The state's lock is held while appending.
func (s *runtimeState) addHealthCheck(name string, f healthCheckFnType) {
s.Lock()
defer s.Unlock()
s.healthChecks = append(s.healthChecks, &healthCheck{name: name, fn: f})
}

func (s *runtimeState) setRuntimeSync(t time.Time) {
Expand Down Expand Up @@ -81,6 +97,12 @@ func (s *runtimeState) runtimeErrors() []string {
if s.internalError != nil {
ret = append(ret, s.internalError.Error())
}
for _, hc := range s.healthChecks {
if ok, err := hc.fn(); !ok {
ret = append(ret, fmt.Sprintf("%s is not healthy: %v", hc.name, err))
}
}

return ret
}

Expand Down
10 changes: 0 additions & 10 deletions pkg/kubelet/server/server.go
Expand Up @@ -180,7 +180,6 @@ type HostInterface interface {
ImagesFsInfo() (cadvisorapiv2.FsInfo, error)
RootFsInfo() (cadvisorapiv2.FsInfo, error)
ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool)
PLEGHealthCheck() (bool, error)
GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommand.Options) (*url.URL, error)
GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommand.Options) (*url.URL, error)
GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error)
Expand Down Expand Up @@ -255,7 +254,6 @@ func (s *Server) InstallDefaultHandlers() {
healthz.InstallHandler(s.restfulCont,
healthz.PingHealthz,
healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
healthz.NamedCheck("pleg", s.plegHealthCheck),
)
var ws *restful.WebService
ws = new(restful.WebService)
Expand Down Expand Up @@ -415,14 +413,6 @@ func (s *Server) syncLoopHealthCheck(req *http.Request) error {
return nil
}

// plegHealthCheck checks if pleg, which lists pods periodically, is healthy.
// It returns nil when the host reports the PLEG as healthy; otherwise it
// returns an error whose message includes the error reported by the host.
func (s *Server) plegHealthCheck(req *http.Request) error {
if ok, err := s.host.PLEGHealthCheck(); !ok {
return fmt.Errorf("PLEG took longer than expected: %v", err)
}
return nil
}

// getContainerLogs handles containerLogs request against the Kubelet
func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
podNamespace := request.PathParameter("podNamespace")
Expand Down
14 changes: 0 additions & 14 deletions pkg/kubelet/server/server_test.go
Expand Up @@ -158,8 +158,6 @@ func (fk *fakeKubelet) StreamingConnectionIdleTimeout() time.Duration {
return fk.streamingConnectionIdleTimeoutFunc()
}

// PLEGHealthCheck returns the fake's configured plegHealth value and a nil error.
func (fk *fakeKubelet) PLEGHealthCheck() (bool, error) { return fk.plegHealth, nil }

// Unused functions
func (_ *fakeKubelet) GetContainerInfoV2(_ string, _ cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error) {
return nil, nil
Expand Down Expand Up @@ -868,18 +866,6 @@ func TestSyncLoopCheck(t *testing.T) {
assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
}

// TestPLEGHealthCheck expects the /healthz endpoint to fail with HTTP 500
// (http.StatusInternalServerError) when the fake kubelet reports an
// unhealthy PLEG.
func TestPLEGHealthCheck(t *testing.T) {
fw := newServerTest()
defer fw.testHTTPServer.Close()
fw.fakeKubelet.hostnameFunc = func() string {
return "127.0.0.1"
}

// Test with failed pleg health check.
fw.fakeKubelet.plegHealth = false
assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
}

// returns http response status code from the HTTP GET
func assertHealthIsOk(t *testing.T, httpURL string) {
resp, err := http.Get(httpURL)
Expand Down