Refactor readiness probing #14221

Merged — merged 1 commit on Oct 5, 2015
7 changes: 1 addition & 6 deletions pkg/kubelet/container/helpers.go
@@ -51,7 +51,7 @@ func TrimRuntimePrefix(fullString string) string {

// ShouldContainerBeRestarted checks whether a container needs to be restarted.
// TODO(yifan): Think about how to refactor this.
func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus, readinessManager *ReadinessManager) bool {
func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus) bool {
podFullName := GetPodFullName(pod)

// Get all dead container status.
@@ -62,11 +62,6 @@ func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatu
}
}

// Set dead containers to notReady state.
for _, c := range resultStatus {
readinessManager.RemoveReadiness(TrimRuntimePrefix(c.ContainerID))
}

// Check RestartPolicy for dead container.
if len(resultStatus) > 0 {
if pod.Spec.RestartPolicy == api.RestartPolicyNever {
5 changes: 2 additions & 3 deletions pkg/kubelet/dockertools/fake_manager.go
@@ -30,7 +30,7 @@ import (
func NewFakeDockerManager(
client DockerInterface,
recorder record.EventRecorder,
readinessManager *kubecontainer.ReadinessManager,
prober prober.Prober,
containerRefManager *kubecontainer.RefManager,
machineInfo *cadvisorApi.MachineInfo,
podInfraContainerImage string,
@@ -44,10 +44,9 @@

fakeOOMAdjuster := oom.NewFakeOOMAdjuster()
fakeProcFs := procfs.NewFakeProcFs()
dm := NewDockerManager(client, recorder, readinessManager, containerRefManager, machineInfo, podInfraContainerImage, qps,
dm := NewDockerManager(client, recorder, prober, containerRefManager, machineInfo, podInfraContainerImage, qps,
burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{},
fakeOOMAdjuster, fakeProcFs, false)
dm.dockerPuller = &FakeDockerPuller{}
dm.prober = prober.New(nil, readinessManager, containerRefManager, recorder)
return dm
}
13 changes: 4 additions & 9 deletions pkg/kubelet/dockertools/manager.go
@@ -87,7 +87,6 @@ var podInfraContainerImagePullPolicy = api.PullIfNotPresent
type DockerManager struct {
client DockerInterface
recorder record.EventRecorder
readinessManager *kubecontainer.ReadinessManager
containerRefManager *kubecontainer.RefManager
os kubecontainer.OSInterface
machineInfo *cadvisorApi.MachineInfo
@@ -145,7 +144,7 @@ type DockerManager struct {
func NewDockerManager(
client DockerInterface,
recorder record.EventRecorder,
readinessManager *kubecontainer.ReadinessManager,
prober prober.Prober,
containerRefManager *kubecontainer.RefManager,
machineInfo *cadvisorApi.MachineInfo,
podInfraContainerImage string,
@@ -195,7 +194,6 @@ func NewDockerManager(
dm := &DockerManager{
client: client,
recorder: recorder,
readinessManager: readinessManager,
containerRefManager: containerRefManager,
os: osInterface,
machineInfo: machineInfo,
@@ -205,15 +203,14 @@
dockerRoot: dockerRoot,
containerLogsDir: containerLogsDir,
networkPlugin: networkPlugin,
prober: nil,
prober: prober,
generator: generator,
execHandler: execHandler,
oomAdjuster: oomAdjuster,
procFs: procFs,
cpuCFSQuota: cpuCFSQuota,
}
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
dm.prober = prober.New(dm, readinessManager, containerRefManager, recorder)
dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm)

return dm
@@ -1363,8 +1360,6 @@ func (dm *DockerManager) killContainer(containerID types.UID, container *api.Con
gracePeriod -= int64(unversioned.Now().Sub(start.Time).Seconds())
}

dm.readinessManager.RemoveReadiness(ID)

// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
if gracePeriod < minimumGracePeriodInSeconds {
gracePeriod = minimumGracePeriodInSeconds
@@ -1659,7 +1654,7 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub

c := runningPod.FindContainerByName(container.Name)
if c == nil {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus, dm.readinessManager) {
if kubecontainer.ShouldContainerBeRestarted(&container, pod, &podStatus) {
// If we are here it means that the container is dead and should be restarted, or never existed and should
// be created. We may be inserting this ID again if the container has changed and it has
// RestartPolicy::Always, but it's not a big deal.
@@ -1694,7 +1689,7 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub
continue
}

result, err := dm.prober.Probe(pod, podStatus, container, string(c.ID), c.Created)
result, err := dm.prober.ProbeLiveness(pod, podStatus, container, string(c.ID), c.Created)
if err != nil {
// TODO(vmarmol): examine this logic.
glog.V(2).Infof("probe no-error: %q", container.Name)
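
For orientation, here is a minimal sketch of the Prober interface that DockerManager now receives in place of a ReadinessManager. The method set is inferred from the call sites in this PR (dm.prober.ProbeLiveness above, and the Kubelet proxy methods in kubelet.go below); the authoritative definition lives in pkg/kubelet/prober and may differ in detail.

package prober

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/probe"
)

// Prober probes a single container and reports the result.
// Sketch inferred from this diff, not the upstream source.
type Prober interface {
	ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error)
	ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string) (probe.Result, error)
}
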
37 changes: 5 additions & 32 deletions pkg/kubelet/dockertools/manager_test.go
@@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
uexec "k8s.io/kubernetes/pkg/util/exec"
@@ -74,14 +75,13 @@ func (*fakeOptionGenerator) GenerateRunContainerOptions(pod *api.Pod, container
func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManager, *FakeDockerClient) {
fakeDocker := &FakeDockerClient{VersionInfo: docker.Env{"Version=1.1.3", "ApiVersion=1.15"}, Errors: make(map[string]error), RemovedImages: sets.String{}}
fakeRecorder := &record.FakeRecorder{}
readinessManager := kubecontainer.NewReadinessManager()
containerRefManager := kubecontainer.NewRefManager()
networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
optionGenerator := &fakeOptionGenerator{}
dockerManager := NewFakeDockerManager(
fakeDocker,
fakeRecorder,
readinessManager,
prober.FakeProber{},
containerRefManager,
&cadvisorApi.MachineInfo{},
PodInfraContainerImage,
@@ -398,10 +398,6 @@ func TestKillContainerInPod(t *testing.T) {
containerToKill := &containers[0]
containerToSpare := &containers[1]
fakeDocker.ContainerList = containers
// Set all containers to ready.
for _, c := range fakeDocker.ContainerList {
manager.readinessManager.SetReadiness(c.ID, true)
}

if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil {
t.Errorf("unexpected error: %v", err)
@@ -410,13 +406,9 @@
if err := fakeDocker.AssertStopped([]string{containerToKill.ID}); err != nil {
t.Errorf("container was not stopped correctly: %v", err)
}

// Verify that the readiness has been removed for the stopped container.
if ready := manager.readinessManager.GetReadiness(containerToKill.ID); ready {
t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", containerToKill.ID, ready)
}
if ready := manager.readinessManager.GetReadiness(containerToSpare.ID); !ready {
t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", containerToSpare.ID, ready)
// Assert the container has been spared.
if err := fakeDocker.AssertStopped([]string{containerToSpare.ID}); err == nil {
t.Errorf("container unexpectedly stopped: %v", containerToSpare.ID)
}
}

@@ -471,10 +463,6 @@ func TestKillContainerInPodWithPreStop(t *testing.T) {
},
},
}
// Set all containers to ready.
for _, c := range fakeDocker.ContainerList {
manager.readinessManager.SetReadiness(c.ID, true)
}

if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil {
t.Errorf("unexpected error: %v", err)
@@ -510,27 +498,12 @@ func TestKillContainerInPodWithError(t *testing.T) {
Names: []string{"/k8s_bar_qux_new_1234_42"},
},
}
containerToKill := &containers[0]
containerToSpare := &containers[1]
fakeDocker.ContainerList = containers
fakeDocker.Errors["stop"] = fmt.Errorf("sample error")

// Set all containers to ready.
for _, c := range fakeDocker.ContainerList {
manager.readinessManager.SetReadiness(c.ID, true)
}

if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err == nil {
t.Errorf("expected error, found nil")
}

// Verify that the readiness has been removed even though the stop failed.
if ready := manager.readinessManager.GetReadiness(containerToKill.ID); ready {
t.Errorf("exepcted container entry ID '%v' to not be found. states: %+v", containerToKill.ID, ready)
}
if ready := manager.readinessManager.GetReadiness(containerToSpare.ID); !ready {
t.Errorf("exepcted container entry ID '%v' to be found. states: %+v", containerToSpare.ID, ready)
}
}

func TestIsAExitError(t *testing.T) {
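
The tests above now inject prober.FakeProber{} instead of building a real prober from a readiness manager. One plausible shape for that fake, assuming the Prober interface sketched earlier (a sketch, not the upstream implementation), is a no-op prober whose probes always succeed:

package prober

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/probe"
)

// FakeProber is a test stand-in: every probe unconditionally succeeds.
type FakeProber struct{}

func (FakeProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, _ api.Container, _ string, _ int64) (probe.Result, error) {
	return probe.Success, nil
}

func (FakeProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, _ api.Container, _ string) (probe.Result, error) {
	return probe.Success, nil
}
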
44 changes: 29 additions & 15 deletions pkg/kubelet/kubelet.go
@@ -52,11 +52,13 @@ import (
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/status"
kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/probe"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
@@ -243,7 +245,6 @@ func NewMainKubelet(
return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
}
statusManager := status.NewManager(kubeClient)
readinessManager := kubecontainer.NewReadinessManager()
containerRefManager := kubecontainer.NewRefManager()

volumeManager := newVolumeManager()
@@ -258,7 +259,6 @@
rootDirectory: rootDirectory,
resyncInterval: resyncInterval,
containerRefManager: containerRefManager,
readinessManager: readinessManager,
httpClient: &http.Client{},
sourcesReady: sourcesReady,
registerNode: registerNode,
@@ -317,7 +317,7 @@
klet.containerRuntime = dockertools.NewDockerManager(
dockerClient,
recorder,
readinessManager,
klet, // prober
containerRefManager,
machineInfo,
podInfraContainerImage,
@@ -343,7 +343,7 @@
klet,
recorder,
containerRefManager,
readinessManager,
klet, // prober
klet.volumeManager)
if err != nil {
return nil, err
@@ -386,6 +386,12 @@
klet.runner = klet.containerRuntime
klet.podManager = newBasicPodManager(klet.kubeClient)

klet.prober = prober.New(klet.runner, containerRefManager, recorder)
klet.probeManager = prober.NewManager(
klet.resyncInterval,
klet.statusManager,
klet.prober)

runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {
return nil, err
@@ -486,8 +492,10 @@ type Kubelet struct {
// Network plugin.
networkPlugin network.NetworkPlugin

// Container readiness state manager.
readinessManager *kubecontainer.ReadinessManager
// Handles container readiness probing
probeManager prober.Manager
// TODO: Move prober ownership to the probeManager once the runtime no longer depends on it.
prober prober.Prober

// How long to keep idle streaming command execution/port forwarding
// connections open before terminating them
@@ -1665,6 +1673,7 @@ func (kl *Kubelet) HandlePodCleanups() error {
// Stop the workers for no-longer existing pods.
// TODO: is here the best place to forget pod workers?
kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
kl.probeManager.CleanupPods(activePods)

runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
@@ -1993,6 +2002,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, SyncPodCreate, mirrorPod, start)
kl.probeManager.AddPod(pod)
}
}

@@ -2024,6 +2034,7 @@ func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
if err := kl.deletePod(pod.UID); err != nil {
glog.V(2).Infof("Failed to delete pod %q, err: %v", kubeletUtil.FormatPodName(pod), err)
}
kl.probeManager.RemovePod(pod)
Contributor (review comment on the line above): You need to add this in HandlePodCleanup() too for the cases where pods were deleted during kubelet restarts. (kubelet doesn't have a checkpoint and it would not recognize that the pods were deleted at all).

Author: Done.

}
}
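
The reviewer's point above is addressed by the kl.probeManager.CleanupPods(activePods) call added to HandlePodCleanups earlier in this file: the probe manager must also drop workers for pods that disappeared while the kubelet was not running. An illustrative sketch of that cleanup follows; the internal names (manager, probeKey, worker) are assumptions for the example, not identifiers taken from this PR.

package prober

import (
	"sync"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/types"
)

// Hypothetical internals, for illustration only.
type probeKey struct {
	podUID        types.UID
	containerName string
}

type worker struct{ stopCh chan struct{} }

func (w *worker) stop() { close(w.stopCh) }

type manager struct {
	workerLock sync.Mutex
	workers    map[probeKey]*worker
}

// CleanupPods stops and forgets probe workers whose pods are no longer in the
// active set, which covers pods deleted while the kubelet was down.
func (m *manager) CleanupPods(activePods []*api.Pod) {
	desired := make(map[types.UID]bool, len(activePods))
	for _, pod := range activePods {
		desired[pod.UID] = true
	}

	m.workerLock.Lock()
	defer m.workerLock.Unlock()
	for key, w := range m.workers {
		if !desired[key.podUID] {
			w.stop()
			delete(m.workers, key)
		}
	}
}
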

@@ -2613,15 +2624,8 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {

// Assume info is ready to process
podStatus.Phase = GetPhase(spec, podStatus.ContainerStatuses)
for _, c := range spec.Containers {
for i, st := range podStatus.ContainerStatuses {
if st.Name == c.Name {
ready := st.State.Running != nil && kl.readinessManager.GetReadiness(kubecontainer.TrimRuntimePrefix(st.ContainerID))
podStatus.ContainerStatuses[i].Ready = ready
break
}
}
}
kl.probeManager.UpdatePodStatus(pod.UID, podStatus)

podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(spec, podStatus.ContainerStatuses)...)

if !kl.standaloneMode {
@@ -2791,6 +2795,16 @@ func (kl *Kubelet) GetRuntime() kubecontainer.Runtime {
return kl.containerRuntime
}

// Proxy prober calls through the Kubelet to break the circular dependency between the runtime &
// prober.
// TODO: Remove this hack once the runtime no longer depends on the prober.
func (kl *Kubelet) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string, createdAt int64) (probe.Result, error) {
return kl.prober.ProbeLiveness(pod, status, container, containerID, createdAt)
}
func (kl *Kubelet) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID string) (probe.Result, error) {
return kl.prober.ProbeReadiness(pod, status, container, containerID)
}

var minRsrc = resource.MustParse("1k")
var maxRsrc = resource.MustParse("1P")

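
Pulling the kubelet call sites together — AddPod/RemovePod on pod additions and deletions, CleanupPods during pod cleanup, and UpdatePodStatus while generating pod status — the probe manager's surface looks roughly like the interface below. This is inferred from the diff; the exact signatures (for instance whether UpdatePodStatus takes the status by pointer) live in pkg/kubelet/prober.

package prober

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/types"
)

// Manager owns readiness probing: it tracks which pods to probe and folds the
// latest probe results back into pod status. Sketch inferred from this PR.
type Manager interface {
	// AddPod starts probing the containers of a newly admitted pod.
	AddPod(pod *api.Pod)
	// RemovePod stops probing a deleted pod.
	RemovePod(pod *api.Pod)
	// CleanupPods stops probing any pod not present in activePods.
	CleanupPods(activePods []*api.Pod)
	// UpdatePodStatus sets each container's Ready field from probe results.
	UpdatePodStatus(podUID types.UID, podStatus *api.PodStatus)
}

Note how NewMainKubelet still constructs a standalone prober (prober.New(klet.runner, containerRefManager, recorder)) and hands it to prober.NewManager: the runtime continues to call the prober directly for liveness, which is exactly the circular dependency the TODO on the Kubelet struct and the ProbeLiveness/ProbeReadiness proxy methods above are working around.
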
6 changes: 5 additions & 1 deletion pkg/kubelet/kubelet_test.go
@@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/container"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
@@ -105,7 +106,6 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.masterServiceNamespace = api.NamespaceDefault
kubelet.serviceLister = testServiceLister{}
kubelet.nodeLister = testNodeLister{}
kubelet.readinessManager = kubecontainer.NewReadinessManager()
kubelet.recorder = fakeRecorder
kubelet.statusManager = status.NewManager(fakeKubeClient)
if err := kubelet.setupDataDirs(); err != nil {
@@ -130,6 +130,10 @@ func newTestKubelet(t *testing.T) *TestKubelet {
runtimeCache: kubelet.runtimeCache,
t: t,
}

kubelet.prober = prober.FakeProber{}
kubelet.probeManager = prober.FakeManager{}

kubelet.volumeManager = newVolumeManager()
kubelet.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), mockCadvisor, "", "", "")
kubelet.networkConfigured = true
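
The test kubelet above also swaps in prober.FakeManager{}. Plausibly that is just the Manager surface sketched earlier with empty method bodies — again a sketch, not the upstream code:

// Same package and imports as the Manager sketch above.
// FakeManager satisfies the sketched Manager interface and does nothing,
// which is all these unit tests need.
type FakeManager struct{}

func (FakeManager) AddPod(_ *api.Pod)                             {}
func (FakeManager) RemovePod(_ *api.Pod)                          {}
func (FakeManager) CleanupPods(_ []*api.Pod)                      {}
func (FakeManager) UpdatePodStatus(_ types.UID, _ *api.PodStatus) {}
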
4 changes: 2 additions & 2 deletions pkg/kubelet/network/cni/cni_test.go
@@ -37,6 +37,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/util/sets"
)

@@ -143,13 +144,12 @@ func (nh *fakeNetworkHost) GetRuntime() kubecontainer.Runtime {
func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDockerClient) {
fakeDocker := &dockertools.FakeDockerClient{VersionInfo: docker.Env{"Version=1.1.3", "ApiVersion=1.15"}, Errors: make(map[string]error), RemovedImages: sets.String{}}
fakeRecorder := &record.FakeRecorder{}
readinessManager := kubecontainer.NewReadinessManager()
containerRefManager := kubecontainer.NewRefManager()
networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
dockerManager := dockertools.NewFakeDockerManager(
fakeDocker,
fakeRecorder,
readinessManager,
prober.FakeProber{},
containerRefManager,
&cadvisorApi.MachineInfo{},
dockertools.PodInfraContainerImage,