Refactor kubelet syncPod method and fix behavior of Infra Containers for RestartPolicy::Never. #5022

Merged 1 commit on Mar 13, 2015
237 changes: 157 additions & 80 deletions pkg/kubelet/kubelet.go
@@ -1183,122 +1183,199 @@ func (kl *Kubelet) pullImageAndRunContainer(pod *api.BoundPod, container *api.Co
return containerID, nil
}

func (kl *Kubelet) syncPod(pod *api.BoundPod, containersInPod dockertools.DockerContainers) error {
// Structure keeping information on changes that need to happen for a pod. The semantics is as follows:
// - startInfraContainer is true if a new Infra Container has to be started and the old one (if running) killed.
// Additionally, if it is true then containersToKeep has to be empty.
// - infraContainerId has to be set iff startInfraContainer is false. It stores the dockerID of the running Infra Container.
// - containersToStart keeps indices of Specs of containers that have to be started.
// - containersToKeep stores a mapping from dockerIDs of running containers to indices of their Specs for containers that
// should be kept running. If startInfraContainer is false then it contains an entry for infraContainerId (mapped to -1).
// It should never be the case that containersToStart is empty while containersToKeep contains only infraContainerId; in that
// case the Infra Container should be killed, hence it is removed from this map.
// - all running containers which are NOT contained in containersToKeep should be killed.
// An illustrative example follows the struct definition below.
type podContainerChangesSpec struct {
startInfraContainer bool
infraContainerId dockertools.DockerID
containersToStart map[int]empty
containersToKeep map[dockertools.DockerID]int
}
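// exampleContainerChanges is an illustrative sketch added for this write-up (not part of the diff above).
// It shows values that satisfy the invariants documented on podContainerChangesSpec, for a hypothetical
// pod with two containers at Spec indices 0 and 1 and a running Infra Container with docker ID "abc".
func exampleContainerChanges(infraHealthy bool) podContainerChangesSpec {
	if infraHealthy {
		// Infra Container stays: it appears in containersToKeep mapped to -1, the container at
		// index 0 keeps running, and the container at index 1 has to be (re)started.
		return podContainerChangesSpec{
			startInfraContainer: false,
			infraContainerId:    dockertools.DockerID("abc"),
			containersToStart:   map[int]empty{1: {}},
			containersToKeep:    map[dockertools.DockerID]int{"abc": -1, "def": 0},
		}
	}
	// Infra Container has to be recreated: containersToKeep is empty and every container that
	// should run is listed in containersToStart.
	return podContainerChangesSpec{
		startInfraContainer: true,
		containersToStart:   map[int]empty{0: {}, 1: {}},
		containersToKeep:    map[dockertools.DockerID]int{},
	}
}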

func (kl *Kubelet) computePodContainerChanges(pod *api.BoundPod, containersInPod dockertools.DockerContainers) (podContainerChangesSpec, error) {
podFullName := GetPodFullName(pod)
uid := pod.UID
glog.V(4).Infof("Syncing Pod, podFullName: %q, uid: %q", podFullName, uid)
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)

err := kl.makePodDataDirs(pod)
if err != nil {
return err
return podContainerChangesSpec{}, err
}

containersToStart := make(map[int]empty)
containersToKeep := make(map[dockertools.DockerID]int)
createPodInfraContainer := false
var podStatus api.PodStatus
podInfraContainerID, found := kl.getPodInfraContainer(podFullName, uid, containersInPod)
if !found {
glog.V(2).Infof("Pod infra container doesn't exist for pod %q, killing and re-creating the pod", podFullName)
var count int
count, err = kl.killContainersInPod(pod, containersInPod)
if err != nil {
return err
}
podInfraContainerID, err = kl.createPodInfraContainer(pod)
if err != nil {
glog.Errorf("Failed to introspect pod infra container: %v; Skipping pod %q", err, podFullName)
return err
}
if count > 0 {
// Re-list everything, otherwise we'll think we're ok.
containersInPod, err = dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
glog.Errorf("Error listing containers %#v", containersInPod)
return err
}
}
if found {
glog.V(4).Infof("Found infra pod for %q", podFullName)
containersToKeep[podInfraContainerID] = -1
podStatus, err = kl.GetPodStatus(podFullName, uid)
if err != nil {
glog.Errorf("Unable to get pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
}
} else {
podStatus, err = kl.GetPodStatus(podFullName, uid)
if err != nil {
glog.Errorf("Unable to get pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
}
glog.V(2).Infof("No Infra Container for %q found. All containers will be restarted.", podFullName)
createPodInfraContainer = true
}
containersInPod.RemoveContainerWithID(podInfraContainerID)

ref, err := api.GetReference(pod)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
}

podVolumes, err := kl.mountExternalVolumes(pod)
if err != nil {
if ref != nil {
kl.recorder.Eventf(ref, "failedMount",
"Unable to mount volumes for pod %q: %v", podFullName, err)
}
glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)
return err
}

for _, container := range pod.Spec.Containers {
for index, container := range pod.Spec.Containers {
expectedHash := dockertools.HashContainer(&container)
dockerContainerName := dockertools.BuildDockerName(uid, podFullName, &container)
if dockerContainer, found, hash := containersInPod.FindPodContainer(podFullName, uid, container.Name); found {
containerID := dockertools.DockerID(dockerContainer.ID)
glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)

// look for changes in the container.
containerChanged := hash != 0 && hash != expectedHash
if !containerChanged {
result, err := kl.probeContainer(pod, podStatus, container, dockerContainer)
if err != nil {
containersInPod.RemoveContainerWithID(containerID)
continue
}
if result == probe.Success {
containersInPod.RemoveContainerWithID(containerID)
continue
if !createPodInfraContainer {
// look for changes in the container.
containerChanged := hash != 0 && hash != expectedHash
if !containerChanged {
result, err := kl.probeContainer(pod, podStatus, container, dockerContainer)
if err != nil {
// TODO(vmarmol): examine this logic.
glog.Infof("probe no-error: %s", container.Name)
containersToKeep[containerID] = index
continue
}
if result == probe.Success {
glog.Infof("probe success: %s", container.Name)
containersToKeep[containerID] = index
continue
}
glog.Infof("pod %q container %q is unhealthy (probe result: %v). Container will be killed and re-created.", podFullName, container.Name, result)
containersToStart[index] = empty{}
} else {
glog.Infof("pod %q container %q hash changed (%d vs %d). Pod will be killed and re-created.", podFullName, container.Name, hash, expectedHash)
createPodInfraContainer = true
delete(containersToKeep, podInfraContainerID)
// If we are to restart the Infra Container then we move containersToKeep into containersToStart,
// provided that RestartPolicy allows restarting failed containers.
if pod.Spec.RestartPolicy.Never == nil {
for _, v := range containersToKeep {
containersToStart[v] = empty{}
}
}
containersToStart[index] = empty{}
containersToKeep = make(map[dockertools.DockerID]int)
}
glog.Infof("pod %q container %q is unhealthy (probe result: %v). Container will be killed and re-created.", podFullName, container.Name, result)
} else {
glog.Infof("pod %q container %q hash changed (%d vs %d). Container will be killed and re-created.", podFullName, container.Name, hash, expectedHash)
// Also kill associated pod infra container if the container changed.
if err := kl.killContainerByID(string(podInfraContainerID)); err != nil {
glog.V(1).Infof("Failed to kill pod infra container %q: %v", podInfraContainerID, err)
continue
} else { // createPodInfraContainer == true and Container exists
// If we're creating the infra container, everything will be killed anyway.
// If RestartPolicy is Always or OnFailure, we restart containers that were running before we
// killed them to restart the Infra Container.
if pod.Spec.RestartPolicy.Never == nil {
glog.V(1).Infof("Infra Container is being recreated. %q will be restarted.", container.Name)
containersToStart[index] = empty{}
}
containersInPod.RemoveContainerWithID(containerID)
}
containersInPod.RemoveContainerWithID(containerID)
if err := kl.killContainer(dockerContainer); err != nil {
glog.V(1).Infof("Failed to kill container %q: %v", dockerContainer.ID, err)
continue
}
} else {
if kl.shouldContainerBeRestarted(&container, pod) {
// If we are here it means that the container is dead and should be restarted, or never existed and should
// be created. We may be inserting this ID again if the container has changed and it has
// RestartPolicy::Always, but it's not a big deal.
glog.V(3).Infof("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
containersToStart[index] = empty{}
}
}
}

if !kl.shouldContainerBeRestarted(&container, pod) {
continue
}
// After the loop one of the following should be true:
// - createPodInfraContainer is true and containersToKeep is empty
// - createPodInfraContainer is false and containersToKeep contains at least ID of Infra Container

glog.V(3).Infof("Container with name %s doesn't exist, creating", dockerContainerName)
// If the Infra Container is the last running one, we don't want to keep it.
if !createPodInfraContainer && len(containersToStart) == 0 && len(containersToKeep) == 1 {
containersToKeep = make(map[dockertools.DockerID]int)
}

containerID, err := kl.pullImageAndRunContainer(pod, &container, &podVolumes, podInfraContainerID)
if err == nil {
containersInPod.RemoveContainerWithID(containerID)
return podContainerChangesSpec{
startInfraContainer: createPodInfraContainer,
infraContainerId: podInfraContainerID,
containersToStart: containersToStart,
containersToKeep: containersToKeep,
}, nil
}
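// The note below is an illustrative sketch (not part of the diff above) of the RestartPolicy::Never
// fix this PR is named for. With RestartPolicy Never, a container that has exited is not re-added to
// containersToStart, so eventually the computed spec can degenerate to "keep only the Infra Container".
// The check just before the return clears containersToKeep in that case, and syncPod (below) then
// kills the Infra Container instead of leaving it running after all app containers are done:
//
//	// assuming createPodInfraContainer == false (no new Infra Container is being created)
//	containersToStart := map[int]empty{}                          // nothing left to start
//	containersToKeep := map[dockertools.DockerID]int{"infra": -1} // only the Infra Container remains
//	if len(containersToStart) == 0 && len(containersToKeep) == 1 {
//		containersToKeep = make(map[dockertools.DockerID]int) // drop it so syncPod kills it
//	}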

func (kl *Kubelet) syncPod(pod *api.BoundPod, containersInPod dockertools.DockerContainers) error {
podFullName := GetPodFullName(pod)
uid := pod.UID
containerChanges, err := kl.computePodContainerChanges(pod, containersInPod)
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
if err != nil {
return err
}

if containerChanges.startInfraContainer || (len(containerChanges.containersToKeep) == 0 && len(containerChanges.containersToStart) == 0) {
if len(containerChanges.containersToKeep) == 0 && len(containerChanges.containersToStart) == 0 {
glog.V(4).Infof("Killing Infra Container for %q becase all other containers are dead.", podFullName)
} else {
glog.V(4).Infof("Killing Infra Container for %q, will start new one", podFullName)
}
// Killing phase: if we want to start a new infra container, or nothing is running, kill everything (including the infra container)
if podInfraContainer, found, _ := containersInPod.FindPodContainer(podFullName, uid, dockertools.PodInfraContainerName); found {
if err := kl.killContainer(podInfraContainer); err != nil {
glog.Warningf("Failed to kill pod infra container %q: %v", podInfraContainer.ID, err)
}
}
_, err = kl.killContainersInPod(pod, containersInPod)
if err != nil {
return err
}
} else {
// Otherwise kill any containers in this pod which are not specified as ones to keep.
for id, container := range containersInPod {
_, keep := containerChanges.containersToKeep[id]
if !keep {
glog.V(3).Infof("Killing unwanted container %+v", container)
err = kl.killContainer(container)
if err != nil {
glog.Errorf("Error killing container: %v", err)
}
}
}
}

// Kill any remaining containers in this pod which were not identified above (guards against duplicates).
for _, container := range containersInPod {
glog.V(1).Infof("Killing unwanted container in pod %q: %+v", pod.UID, container)
err = kl.killContainer(container)
// Starting phase: if we should create the infra container, we do it first
var ref *api.ObjectReference
var podVolumes volumeMap
podInfraContainerID := containerChanges.infraContainerId
if containerChanges.startInfraContainer && (len(containerChanges.containersToStart) > 0) {
ref, err = api.GetReference(pod)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
}
glog.Infof("Creating pod infra container for %q", podFullName)
podInfraContainerID, err = kl.createPodInfraContainer(pod)
if err != nil {
glog.Errorf("Error killing container: %v", err)
glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, podFullName)
return err
}
}

// Mount volumes
podVolumes, err = kl.mountExternalVolumes(pod)
if err != nil {
if ref != nil {
kl.recorder.Eventf(ref, "failedMount",
"Unable to mount volumes for pod %q: %v", podFullName, err)
}
glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)
return err
}

// Start everything
for container := range containerChanges.containersToStart {
glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container])
kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], &podVolumes, podInfraContainerID)
}

return nil
}

23 changes: 16 additions & 7 deletions pkg/kubelet/kubelet_test.go
@@ -459,7 +459,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
}
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{
"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})

fakeDocker.Lock()
parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":")
@@ -506,7 +506,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
waitGroup.Wait()

verifyCalls(t, fakeDocker, []string{
"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})

fakeDocker.Lock()

@@ -556,7 +556,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
waitGroup.Wait()

verifyCalls(t, fakeDocker, []string{
"list", "list", "create", "start", "inspect_container", "list", "inspect_container", "inspect_image", "list", "create", "start"})
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})

fakeDocker.Lock()

@@ -701,7 +701,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
waitGroup.Wait()

verifyCalls(t, fakeDocker, []string{
"list", "list", "stop", "create", "start", "inspect_container", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
"list", "list", "stop", "create", "start", "inspect_container", "create", "start"})

// A map iteration is used to delete containers, so must not depend on
// order here.
@@ -873,7 +873,7 @@ func TestSyncPodBadHash(t *testing.T) {
}

//verifyCalls(t, fakeDocker, []string{"list", "stop", "list", "create", "start", "stop", "create", "start", "inspect_container"})
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "list", "create", "start"})
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start"})

// A map iteration is used to delete containers, so must not depend on
// order here.
@@ -924,7 +924,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}

verifyCalls(t, fakeDocker, []string{"list", "stop", "list", "create", "start"})
verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start"})

// A map iteration is used to delete containers, so must not depend on
// order here.
@@ -1987,7 +1987,16 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {

fakeDocker.Lock()

if !reflect.DeepEqual(puller.ImagesPulled, []string{"custom_image_name", "pull_always_image", "pull_if_not_present_image"}) {
pulledImageSet := make(map[string]empty)
for v := range puller.ImagesPulled {
pulledImageSet[puller.ImagesPulled[v]] = empty{}
}

if !reflect.DeepEqual(pulledImageSet, map[string]empty{
"custom_image_name": {},
"pull_always_image": {},
"pull_if_not_present_image": {},
}) {
t.Errorf("Unexpected pulled containers: %v", puller.ImagesPulled)
}

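Side note on the last test hunk: container starts are now driven by iterating a map (containersToStart), so the order in which images are pulled is no longer deterministic, and the test compares the pulled images as a set rather than an ordered slice. Below is a minimal, self-contained sketch of that order-independent comparison; the toSet helper and the example image names are illustrative, not part of the PR.

package main

import (
	"fmt"
	"reflect"
)

type empty struct{}

// toSet converts a slice of image names into a set so the comparison
// ignores the order in which the images were pulled.
func toSet(images []string) map[string]empty {
	s := make(map[string]empty, len(images))
	for _, img := range images {
		s[img] = empty{}
	}
	return s
}

func main() {
	pulled := []string{"pull_always_image", "custom_image_name", "pull_if_not_present_image"}
	want := map[string]empty{
		"custom_image_name":         {},
		"pull_always_image":         {},
		"pull_if_not_present_image": {},
	}
	fmt.Println(reflect.DeepEqual(toSet(pulled), want)) // prints true, regardless of pull order
}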