Skip to content

Commit

Permalink
Adapt pod killing and cleanup for generic container runtime
Browse files Browse the repository at this point in the history
This change removes docker-specifc code in killUnwantedPods. It
also instructs the cleanup code to move away from interacting with
containers directly. They should always deal with the pod-level
abstraction if at all possible.
  • Loading branch information
yujuhong committed Apr 29, 2015
1 parent ee27094 commit 4be6521
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 184 deletions.
108 changes: 0 additions & 108 deletions pkg/kubelet/dockertools/docker_test.go
Expand Up @@ -390,114 +390,6 @@ func TestIsImagePresent(t *testing.T) {
}
}

func TestGetRunningContainers(t *testing.T) {
fakeDocker := &FakeDockerClient{Errors: make(map[string]error)}
fakeRecorder := &record.FakeRecorder{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np)
tests := []struct {
containers map[string]*docker.Container
inputIDs []string
expectedIDs []string
err error
}{
{
containers: map[string]*docker.Container{
"foobar": {
ID: "foobar",
State: docker.State{
Running: false,
},
},
"baz": {
ID: "baz",
State: docker.State{
Running: true,
},
},
},
inputIDs: []string{"foobar", "baz"},
expectedIDs: []string{"baz"},
},
{
containers: map[string]*docker.Container{
"foobar": {
ID: "foobar",
State: docker.State{
Running: true,
},
},
"baz": {
ID: "baz",
State: docker.State{
Running: true,
},
},
},
inputIDs: []string{"foobar", "baz"},
expectedIDs: []string{"foobar", "baz"},
},
{
containers: map[string]*docker.Container{
"foobar": {
ID: "foobar",
State: docker.State{
Running: false,
},
},
"baz": {
ID: "baz",
State: docker.State{
Running: false,
},
},
},
inputIDs: []string{"foobar", "baz"},
expectedIDs: []string{},
},
{
containers: map[string]*docker.Container{
"foobar": {
ID: "foobar",
State: docker.State{
Running: false,
},
},
"baz": {
ID: "baz",
State: docker.State{
Running: false,
},
},
},
inputIDs: []string{"foobar", "baz"},
err: fmt.Errorf("test error"),
},
}
for _, test := range tests {
fakeDocker.ContainerMap = test.containers
if test.err != nil {
fakeDocker.Errors["inspect_container"] = test.err
}
if results, err := containerManager.GetRunningContainers(test.inputIDs); err == nil {
resultIDs := []string{}
for _, result := range results {
resultIDs = append(resultIDs, result.ID)
}
if !reflect.DeepEqual(resultIDs, test.expectedIDs) {
t.Errorf("expected: %#v, saw: %#v", test.expectedIDs, resultIDs)
}
if err != nil {
t.Errorf("unexpected error: %v", err)
}
} else {
if err != test.err {
t.Errorf("unexpected error: %v", err)
}
}
}
}

type podsByID []*kubecontainer.Pod

func (b podsByID) Len() int { return len(b) }
Expand Down
17 changes: 0 additions & 17 deletions pkg/kubelet/dockertools/manager.go
Expand Up @@ -449,23 +449,6 @@ func (dm *DockerManager) GetPodInfraContainer(pod kubecontainer.Pod) (kubecontai
return kubecontainer.Container{}, fmt.Errorf("unable to find pod infra container for pod %v", pod.ID)
}

func (dm *DockerManager) GetRunningContainers(ids []string) ([]*docker.Container, error) {
var result []*docker.Container
if dm.client == nil {
return nil, fmt.Errorf("unexpected nil docker client.")
}
for ix := range ids {
status, err := dm.client.InspectContainer(ids[ix])
if err != nil {
return nil, err
}
if status != nil && status.State.Running {
result = append(result, status)
}
}
return result, nil
}

func (dm *DockerManager) runContainerRecordErrorReason(pod *api.Pod, container *api.Container, opts *kubecontainer.RunContainerOptions, ref *api.ObjectReference) (string, error) {
dockerID, err := dm.runContainer(pod, container, opts, ref)
if err != nil {
Expand Down
73 changes: 29 additions & 44 deletions pkg/kubelet/kubelet.go
Expand Up @@ -55,7 +55,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
docker "github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
cadvisorApi "github.com/google/cadvisor/info/v1"
)
Expand Down Expand Up @@ -1264,20 +1263,15 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*api.Pod) error {

// Compares the map of current volumes to the map of desired volumes.
// If an active volume does not have a respective desired volume, clean it up.
func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, running []*docker.Container) error {
func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
desiredVolumes := getDesiredVolumes(pods)
currentVolumes := kl.getPodVolumesFromDisk()

runningSet := util.StringSet{}
for ix := range running {
if len(running[ix].Name) == 0 {
glog.V(2).Infof("Found running container ix=%d with info: %+v", ix, running[ix])
}
containerName, _, err := dockertools.ParseDockerName(running[ix].Name)
if err != nil {
continue
}
runningSet.Insert(string(containerName.PodUID))
for _, pod := range runningPods {
runningSet.Insert(string(pod.ID))
}

for name, vol := range currentVolumes {
if _, ok := desiredVolumes[name]; !ok {
parts := strings.Split(name, "/")
Expand Down Expand Up @@ -1387,16 +1381,24 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
return nil
}

// Kill containers associated with unwanted pods and get a list of
// unwanted containers that are still running.
running, err := kl.killUnwantedPods(desiredPods, runningPods)
// Kill containers associated with unwanted pods.
err = kl.killUnwantedPods(desiredPods, runningPods)
if err != nil {
glog.Errorf("Failed killing unwanted containers: %v", err)
}

// Note that we just killed the unwanted pods. This may not have reflected
// in the cache. We need to bypass the cach to get the latest set of
// running pods to clean up the volumes.
// TODO: Evaluate the performance impact of bypassing the runtime cache.
runningPods, err = kl.containerManager.GetPods(true)
if err != nil {
glog.Errorf("Error listing containers: %#v", err)
return err
}

// Remove any orphaned volumes.
err = kl.cleanupOrphanedVolumes(pods, running)
err = kl.cleanupOrphanedVolumes(pods, runningPods)
if err != nil {
glog.Errorf("Failed cleaning up orphaned volumes: %v", err)
return err
Expand All @@ -1415,15 +1417,10 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
return err
}

// killUnwantedPods kills the unwanted, running pods in parallel, and returns
// containers in those pods that it failed to terminate.
// killUnwantedPods kills the unwanted, running pods in parallel.
func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
runningPods []*kubecontainer.Pod) ([]*docker.Container, error) {
type result struct {
containers []*docker.Container
err error
}
ch := make(chan result, len(runningPods))
runningPods []*kubecontainer.Pod) error {
ch := make(chan error, len(runningPods))
defer close(ch)
numWorkers := 0
for _, pod := range runningPods {
Expand All @@ -1432,42 +1429,30 @@ func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
continue
}
numWorkers++
go func(pod *kubecontainer.Pod, ch chan result) {
go func(pod *kubecontainer.Pod, ch chan error) {
var err error = nil
defer func() {
// Send the IDs of the containers that we failed to killed.
containers, err := kl.getRunningContainersByPod(pod)
ch <- result{containers: containers, err: err}
ch <- err
}()
glog.V(1).Infof("Killing unwanted pod %q", pod.Name)
// Stop the containers.
err := kl.killPod(*pod)
err = kl.killPod(*pod)
if err != nil {
glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
return
}
}(pod, ch)
}

// Aggregate results from the pod killing workers.
// Aggregate errors from the pod killing workers.
var errs []error
var running []*docker.Container
for i := 0; i < numWorkers; i++ {
m := <-ch
if m.err != nil {
errs = append(errs, m.err)
continue
err := <-ch
if err != nil {
errs = append(errs, err)
}
running = append(running, m.containers...)
}
return running, utilErrors.NewAggregate(errs)
}

func (kl *Kubelet) getRunningContainersByPod(pod *kubecontainer.Pod) ([]*docker.Container, error) {
containerIDs := make([]string, len(pod.Containers))
for i, c := range pod.Containers {
containerIDs[i] = string(c.ID)
}
return kl.containerManager.GetRunningContainers(containerIDs)
return utilErrors.NewAggregate(errs)
}

type podsByCreationTime []*api.Pod
Expand Down

0 comments on commit 4be6521

Please sign in to comment.