Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get recent #6560

Merged
merged 3 commits into from
Apr 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 54 additions & 18 deletions pkg/kubelet/dockertools/fake_docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,20 @@ import (
// FakeDockerClient is a simple fake docker client, so that kubelet can be run for testing without requiring a real docker setup.
type FakeDockerClient struct {
sync.Mutex
ContainerList []docker.APIContainers
Container *docker.Container
ContainerMap map[string]*docker.Container
Image *docker.Image
Images []docker.APIImages
Err error
called []string
Stopped []string
pulled []string
Created []string
Removed []string
RemovedImages util.StringSet
VersionInfo docker.Env
ContainerList []docker.APIContainers
ExitedContainerList []docker.APIContainers
Container *docker.Container
ContainerMap map[string]*docker.Container
Image *docker.Image
Images []docker.APIImages
Err error
called []string
Stopped []string
pulled []string
Created []string
Removed []string
RemovedImages util.StringSet
VersionInfo docker.Env
}

func (f *FakeDockerClient) ClearCalls() {
Expand All @@ -67,17 +68,48 @@ func (f *FakeDockerClient) AssertCalls(calls []string) (err error) {
return
}

func (f *FakeDockerClient) AssertCreated(created []string) error {
f.Lock()
defer f.Unlock()

actualCreated := []string{}
for _, c := range f.Created {
dockerName, _, err := ParseDockerName(c)
if err != nil {
return fmt.Errorf("unexpected error: %v", err)
}
actualCreated = append(actualCreated, dockerName.ContainerName)
}
sort.StringSlice(created).Sort()
sort.StringSlice(actualCreated).Sort()
if !reflect.DeepEqual(created, actualCreated) {
return fmt.Errorf("expected %#v, got %#v", created, actualCreated)
}
return nil
}

func (f *FakeDockerClient) AssertStopped(stopped []string) error {
f.Lock()
defer f.Unlock()
sort.StringSlice(stopped).Sort()
sort.StringSlice(f.Stopped).Sort()
if !reflect.DeepEqual(stopped, f.Stopped) {
return fmt.Errorf("expected %#v, got %#v", stopped, f.Stopped)
}
return nil
}

func (f *FakeDockerClient) AssertUnorderedCalls(calls []string) (err error) {
f.Lock()
defer f.Unlock()

actual := make([]string, len(calls))
expected := make([]string, len(f.called))
copy(actual, calls)
copy(expected, f.called)
expected := make([]string, len(calls))
actual := make([]string, len(f.called))
copy(expected, calls)
copy(actual, f.called)

sort.StringSlice(actual).Sort()
sort.StringSlice(expected).Sort()
sort.StringSlice(actual).Sort()

if !reflect.DeepEqual(actual, expected) {
err = fmt.Errorf("expected(sorted) %#v, got(sorted) %#v", expected, actual)
Expand All @@ -91,6 +123,10 @@ func (f *FakeDockerClient) ListContainers(options docker.ListContainersOptions)
f.Lock()
defer f.Unlock()
f.called = append(f.called, "list")

if options.All {
return append(f.ContainerList, f.ExitedContainerList...), f.Err
}
return f.ContainerList, f.Err
}

Expand Down
35 changes: 0 additions & 35 deletions pkg/kubelet/dockertools/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
Expand All @@ -51,40 +50,6 @@ func NewDockerManager(client DockerInterface, recorder record.EventRecorder) *Do
return &DockerManager{client: client, recorder: recorder}
}

// GetRecentDockerContainersWithNameAndUUID returns a list of dead docker containers which matches the name
// and uid given.
func (self *DockerManager) GetRecentDockerContainersWithNameAndUUID(podFullName string, uid types.UID,
containerName string) ([]*docker.Container, error) {
var result []*docker.Container
containers, err := self.client.ListContainers(docker.ListContainersOptions{All: true})
if err != nil {
return nil, err
}
for _, dockerContainer := range containers {
if len(dockerContainer.Names) == 0 {
continue
}
dockerName, _, err := ParseDockerName(dockerContainer.Names[0])
if err != nil {
continue
}
if dockerName.PodFullName != podFullName {
continue
}
if uid != "" && dockerName.PodUID != uid {
continue
}
if dockerName.ContainerName != containerName {
continue
}
inspectResult, _ := self.client.InspectContainer(dockerContainer.ID)
if inspectResult != nil && !inspectResult.State.Running && !inspectResult.State.Paused {
result = append(result, inspectResult)
}
}
return result, nil
}

// GetKubeletDockerContainerLogs returns logs of a specific container. By
// default, it returns a snapshot of the container log. Set |follow| to true to
// stream the log. Set |follow| to false and specify the number of lines (e.g.
Expand Down
47 changes: 28 additions & 19 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,28 +999,33 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
return nil
}

func (kl *Kubelet) shouldContainerBeRestarted(container *api.Container, pod *api.Pod) bool {
func (kl *Kubelet) shouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatus *api.PodStatus) bool {
podFullName := kubecontainer.GetPodFullName(pod)
// Check RestartPolicy for dead container
recentContainers, err := kl.containerManager.GetRecentDockerContainersWithNameAndUUID(podFullName, pod.UID, container.Name)
if err != nil {
glog.Errorf("Error listing recent containers for pod %q: %v", podFullName, err)
// TODO(dawnchen): error handling here?

// Get all dead container status.
var resultStatus []*api.ContainerStatus
for i, containerStatus := range podStatus.ContainerStatuses {
if containerStatus.Name == container.Name && containerStatus.State.Termination != nil {
resultStatus = append(resultStatus, &podStatus.ContainerStatuses[i])
}
}
// set dead containers to unready state
for _, c := range recentContainers {
kl.readinessManager.RemoveReadiness(c.ID)

// Set dead containers to unready state.
for _, c := range resultStatus {
// TODO(yifan): Unify the format of container ID. (i.e. including docker:// as prefix).
kl.readinessManager.RemoveReadiness(strings.TrimPrefix(c.ContainerID, dockertools.DockerPrefix))
}

if len(recentContainers) > 0 {
// Check RestartPolicy for dead container.
if len(resultStatus) > 0 {
if pod.Spec.RestartPolicy == api.RestartPolicyNever {
glog.Infof("Already ran container %q of pod %q, do nothing", container.Name, podFullName)
return false

}
if pod.Spec.RestartPolicy == api.RestartPolicyOnFailure {
// Check the exit code of last run
if recentContainers[0].State.ExitCode == 0 {
// Check the exit code of last run. Note: This assumes the result is sorted
// by the created time in reverse order.
if resultStatus[0].State.Termination.ExitCode == 0 {
glog.Infof("Already successfully ran container %q of pod %q, do nothing", container.Name, podFullName)
return false
}
Expand Down Expand Up @@ -1106,23 +1111,27 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta
containersToStart := make(map[int]empty)
containersToKeep := make(map[dockertools.DockerID]int)
createPodInfraContainer := false
var podStatus api.PodStatus

var podInfraContainerID dockertools.DockerID
podInfraContainer := runningPod.FindContainerByName(dockertools.PodInfraContainerName)
if podInfraContainer != nil {
glog.V(4).Infof("Found infra pod for %q", podFullName)
podInfraContainerID = dockertools.DockerID(podInfraContainer.ID)
containersToKeep[podInfraContainerID] = -1
podStatus, err = kl.GetPodStatus(podFullName)
if err != nil {
glog.Errorf("Unable to get pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
}

} else {
glog.V(2).Infof("No Infra Container for %q found. All containers will be restarted.", podFullName)
createPodInfraContainer = true
}

// Do not use the cache here since we need the newest status to check
// if we need to restart the container below.
podStatus, err := kl.generatePodStatus(podFullName)
if err != nil {
glog.Errorf("Unable to get pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
return podContainerChangesSpec{}, err
}

for index, container := range pod.Spec.Containers {
expectedHash := dockertools.HashContainer(&container)

Expand Down Expand Up @@ -1176,7 +1185,7 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta
continue
}
} else {
if kl.shouldContainerBeRestarted(&container, pod) {
if kl.shouldContainerBeRestarted(&container, pod, &podStatus) {
// If we are here it means that the container is dead and sould be restarted, or never existed and should
// be created. We may be inserting this ID again if the container has changed and it has
// RestartPolicy::Always, but it's not a big deal.
Expand Down