Skip to content

Commit

Permalink
Merge pull request #7449 from vmarmol/runtime-network-plugins
Browse files Browse the repository at this point in the history
Move network plugin TearDown to DockerManager
  • Loading branch information
yujuhong committed Apr 29, 2015
2 parents a529c0e + 787d42d commit 33b8f48
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 71 deletions.
8 changes: 3 additions & 5 deletions pkg/kubelet/dockertools/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/leaky"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/docker/docker/pkg/parsers"
Expand Down Expand Up @@ -68,9 +69,6 @@ type DockerInterface interface {
StartExec(string, docker.StartExecOptions) error
}

// DockerID is an ID of docker container. It is a type to make it clear when we're working with docker container Ids
type DockerID string

// KubeletContainerName encapsulates a pod name and a Kubernetes container name.
type KubeletContainerName struct {
PodFullName string
Expand Down Expand Up @@ -174,7 +172,7 @@ func (p throttledDockerPuller) IsImagePresent(name string) (bool, error) {
}

// DockerContainers is a map of containers
type DockerContainers map[DockerID]*docker.APIContainers
type DockerContainers map[kubeletTypes.DockerID]*docker.APIContainers

func (c DockerContainers) FindPodContainer(podFullName string, uid types.UID, containerName string) (*docker.APIContainers, bool, uint64) {
for _, dockerContainer := range c {
Expand Down Expand Up @@ -319,7 +317,7 @@ func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (Doc
glog.V(3).Infof("Docker Container: %s is not managed by kubelet.", container.Names[0])
continue
}
result[DockerID(container.ID)] = container
result[kubeletTypes.DockerID(container.ID)] = container
}
return result, nil
}
7 changes: 5 additions & 2 deletions pkg/kubelet/dockertools/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
docker "github.com/fsouza/go-dockerclient"
Expand Down Expand Up @@ -392,7 +393,8 @@ func TestIsImagePresent(t *testing.T) {
func TestGetRunningContainers(t *testing.T) {
fakeDocker := &FakeDockerClient{Errors: make(map[string]error)}
fakeRecorder := &record.FakeRecorder{}
containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{})
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np)
tests := []struct {
containers map[string]*docker.Container
inputIDs []string
Expand Down Expand Up @@ -657,7 +659,8 @@ func TestFindContainersByPod(t *testing.T) {
},
}
fakeClient := &FakeDockerClient{}
containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{})
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
containerManager := NewDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np)
for i, test := range tests {
fakeClient.ContainerList = test.containerList
fakeClient.ExitedContainerList = test.exitedContainerList
Expand Down
32 changes: 25 additions & 7 deletions pkg/kubelet/dockertools/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/fsouza/go-dockerclient"
Expand Down Expand Up @@ -81,6 +83,9 @@ type DockerManager struct {

// Directory of container logs.
containerLogsDir string

// Network plugin.
networkPlugin network.NetworkPlugin
}

func NewDockerManager(
Expand All @@ -92,7 +97,8 @@ func NewDockerManager(
qps float32,
burst int,
containerLogsDir string,
osInterface kubecontainer.OSInterface) *DockerManager {
osInterface kubecontainer.OSInterface,
networkPlugin network.NetworkPlugin) *DockerManager {
// Work out the location of the Docker runtime, defaulting to /var/lib/docker
// if there are any problems.
dockerRoot := "/var/lib/docker"
Expand Down Expand Up @@ -135,6 +141,7 @@ func NewDockerManager(
Puller: newDockerPuller(client, qps, burst),
dockerRoot: dockerRoot,
containerLogsDir: containerLogsDir,
networkPlugin: networkPlugin,
}
}

Expand Down Expand Up @@ -941,13 +948,24 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream

// Kills all containers in the specified pod
func (dm *DockerManager) KillPod(pod kubecontainer.Pod) error {
// Send the kills in parallel since they may take a long time.
errs := make(chan error, len(pod.Containers))
// Send the kills in parallel since they may take a long time. Len + 1 since there
// can be Len errors + the networkPlugin teardown error.
errs := make(chan error, len(pod.Containers)+1)
wg := sync.WaitGroup{}
for _, container := range pod.Containers {
wg.Add(1)
go func(container *kubecontainer.Container) {
defer util.HandleCrash()

// TODO: Handle this without signaling the pod infra container to
// adapt to the generic container runtime.
if container.Name == PodInfraContainerName {
err := dm.networkPlugin.TearDownPod(pod.Namespace, pod.Name, kubeletTypes.DockerID(container.ID))
if err != nil {
glog.Errorf("Failed tearing down the infra container: %v", err)
errs <- err
}
}
err := dm.KillContainer(container.ID)
if err != nil {
glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID)
Expand Down Expand Up @@ -988,7 +1006,7 @@ func (dm *DockerManager) KillContainer(containerID types.UID) error {
}

// Run a single container from a pod. Returns the docker container ID
func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, generator kubecontainer.RunContainerOptionsGenerator, runner kubecontainer.HandlerRunner, netMode, ipcMode string) (DockerID, error) {
func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, generator kubecontainer.RunContainerOptionsGenerator, runner kubecontainer.HandlerRunner, netMode, ipcMode string) (kubeletTypes.DockerID, error) {
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
Expand All @@ -1013,7 +1031,7 @@ func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, ge
handlerErr := runner.Run(id, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
dm.KillContainer(types.UID(id))
return DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
return kubeletTypes.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
}
}

Expand All @@ -1027,11 +1045,11 @@ func (dm *DockerManager) RunContainer(pod *api.Pod, container *api.Container, ge
if err = dm.os.Symlink(containerLogFile, symlinkFile); err != nil {
glog.Errorf("Failed to create symbolic link to the log file of pod %q container %q: %v", podFullName, container.Name, err)
}
return DockerID(id), err
return kubeletTypes.DockerID(id), err
}

// CreatePodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container.
func (dm *DockerManager) CreatePodInfraContainer(pod *api.Pod, generator kubecontainer.RunContainerOptionsGenerator, runner kubecontainer.HandlerRunner) (DockerID, error) {
func (dm *DockerManager) CreatePodInfraContainer(pod *api.Pod, generator kubecontainer.RunContainerOptionsGenerator, runner kubecontainer.HandlerRunner) (kubeletTypes.DockerID, error) {
// Use host networking if specified.
netNamespace := ""
var ports []api.ContainerPort
Expand Down
75 changes: 29 additions & 46 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/envvars"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
Expand Down Expand Up @@ -202,16 +203,6 @@ func NewMainKubelet(
statusManager := newStatusManager(kubeClient)
readinessManager := kubecontainer.NewReadinessManager()
containerRefManager := kubecontainer.NewRefManager()
containerManager := dockertools.NewDockerManager(
dockerClient,
recorder,
readinessManager,
containerRefManager,
podInfraContainerImage,
pullQPS,
pullBurst,
containerLogsDir,
osInterface)

volumeManager := newVolumeManager()

Expand All @@ -223,7 +214,6 @@ func NewMainKubelet(
resyncInterval: resyncInterval,
containerRefManager: containerRefManager,
readinessManager: readinessManager,
runner: containerManager,
httpClient: &http.Client{},
sourcesReady: sourcesReady,
clusterDomain: clusterDomain,
Expand All @@ -240,12 +230,30 @@ func NewMainKubelet(
volumeManager: volumeManager,
cloud: cloud,
nodeRef: nodeRef,
containerManager: containerManager,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
resourceContainer: resourceContainer,
os: osInterface,
}

if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
return nil, err
} else {
klet.networkPlugin = plug
}
containerManager := dockertools.NewDockerManager(
dockerClient,
recorder,
readinessManager,
containerRefManager,
podInfraContainerImage,
pullQPS,
pullBurst,
containerLogsDir,
osInterface,
klet.networkPlugin)
klet.runner = containerManager
klet.containerManager = containerManager

klet.podManager = newBasicPodManager(klet.kubeClient)
klet.prober = newProber(klet.runner, klet.readinessManager, klet.containerRefManager, klet.recorder)
klet.handlerRunner = newHandlerRunner(klet.httpClient, klet.runner, klet.containerManager)
Expand All @@ -266,11 +274,6 @@ func NewMainKubelet(
return nil, err
}

if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
return nil, err
} else {
klet.networkPlugin = plug
}
// If the container logs directory does not exist, create it.
if _, err := os.Stat(containerLogsDir); err != nil {
if err := osInterface.Mkdir(containerLogsDir, 0755); err != nil {
Expand Down Expand Up @@ -887,27 +890,7 @@ func (kl *Kubelet) pullImage(pod *api.Pod, container *api.Container) error {

// Kill all running containers in a pod (includes the pod infra container).
func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
// TODO(vmarmol): Consider handling non-Docker runtimes, the plugins are not friendly to it today.
container, err := kl.containerManager.GetPodInfraContainer(pod)
errList := []error{}
if err == nil {
// Call the networking plugin for teardown.
err = kl.networkPlugin.TearDownPod(pod.Namespace, pod.Name, dockertools.DockerID(container.ID))
if err != nil {
glog.Errorf("Failed tearing down the network plugin for pod %q: %v", pod.ID, err)
errList = append(errList, err)
}
}

err = kl.containerManager.KillPod(pod)
if err != nil {
errList = append(errList, err)
}

if len(errList) > 0 {
return utilErrors.NewAggregate(errList)
}
return nil
return kl.containerManager.KillPod(pod)
}

type empty struct{}
Expand Down Expand Up @@ -973,9 +956,9 @@ func shouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatu
// - all running containers which are NOT contained in containersToKeep should be killed.
type podContainerChangesSpec struct {
startInfraContainer bool
infraContainerId dockertools.DockerID
infraContainerId kubeletTypes.DockerID
containersToStart map[int]empty
containersToKeep map[dockertools.DockerID]int
containersToKeep map[kubeletTypes.DockerID]int
}

func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus) (podContainerChangesSpec, error) {
Expand All @@ -984,11 +967,11 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)

containersToStart := make(map[int]empty)
containersToKeep := make(map[dockertools.DockerID]int)
containersToKeep := make(map[kubeletTypes.DockerID]int)
createPodInfraContainer := false

var err error
var podInfraContainerID dockertools.DockerID
var podInfraContainerID kubeletTypes.DockerID
var changed bool
podInfraContainer := runningPod.FindContainerByName(dockertools.PodInfraContainerName)
if podInfraContainer != nil {
Expand All @@ -1007,7 +990,7 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta
} else {
glog.V(4).Infof("Pod infra container looks good, keep it %q", podFullName)
createPodInfraContainer = false
podInfraContainerID = dockertools.DockerID(podInfraContainer.ID)
podInfraContainerID = kubeletTypes.DockerID(podInfraContainer.ID)
containersToKeep[podInfraContainerID] = -1
}

Expand All @@ -1026,7 +1009,7 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta
continue
}

containerID := dockertools.DockerID(c.ID)
containerID := kubeletTypes.DockerID(c.ID)
hash := c.Hash
glog.V(3).Infof("pod %q container %q exists as %v", podFullName, container.Name, containerID)

Expand Down Expand Up @@ -1074,7 +1057,7 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, runningPod kubeconta

// If Infra container is the last running one, we don't want to keep it.
if !createPodInfraContainer && len(containersToStart) == 0 && len(containersToKeep) == 1 {
containersToKeep = make(map[dockertools.DockerID]int)
containersToKeep = make(map[kubeletTypes.DockerID]int)
}

return podContainerChangesSpec{
Expand Down Expand Up @@ -1147,7 +1130,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
} else {
// Otherwise kill any containers in this pod which are not specified as ones to keep.
for _, container := range runningPod.Containers {
_, keep := containerChanges.containersToKeep[dockertools.DockerID(container.ID)]
_, keep := containerChanges.containersToKeep[kubeletTypes.DockerID(container.ID)]
if !keep {
glog.V(3).Infof("Killing unwanted container %+v", container)
err = kl.containerManager.KillContainer(container.ID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
podManager, fakeMirrorClient := newFakePodManager()
kubelet.podManager = podManager
kubelet.containerRefManager = kubecontainer.NewRefManager()
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os)
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, kubelet.readinessManager, kubelet.containerRefManager, dockertools.PodInfraContainerImage, 0, 0, "", kubelet.os, kubelet.networkPlugin)
kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager)
kubelet.podWorkers = newPodWorkers(
kubelet.runtimeCache,
Expand Down
6 changes: 3 additions & 3 deletions pkg/kubelet/network/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ import (
"path"
"strings"

"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
utilexec "github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
"github.com/golang/glog"
)
Expand Down Expand Up @@ -125,13 +125,13 @@ func (plugin *execNetworkPlugin) validate() error {
return nil
}

func (plugin *execNetworkPlugin) SetUpPod(namespace string, name string, id dockertools.DockerID) error {
func (plugin *execNetworkPlugin) SetUpPod(namespace string, name string, id kubeletTypes.DockerID) error {
out, err := utilexec.New().Command(plugin.getExecutable(), setUpCmd, namespace, name, string(id)).CombinedOutput()
glog.V(5).Infof("SetUpPod 'exec' network plugin output: %s, %v", string(out), err)
return err
}

func (plugin *execNetworkPlugin) TearDownPod(namespace string, name string, id dockertools.DockerID) error {
func (plugin *execNetworkPlugin) TearDownPod(namespace string, name string, id kubeletTypes.DockerID) error {
out, err := utilexec.New().Command(plugin.getExecutable(), tearDownCmd, namespace, name, string(id)).CombinedOutput()
glog.V(5).Infof("TearDownPod 'exec' network plugin output: %s, %v", string(out), err)
return err
Expand Down
10 changes: 5 additions & 5 deletions pkg/kubelet/network/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
kubeletTypes "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
"github.com/golang/glog"
Expand All @@ -43,10 +43,10 @@ type NetworkPlugin interface {
// SetUpPod is the method called after the infra container of
// the pod has been created but before the other containers of the
// pod are launched.
SetUpPod(namespace string, name string, podInfraContainerID dockertools.DockerID) error
SetUpPod(namespace string, name string, podInfraContainerID kubeletTypes.DockerID) error

// TearDownPod is the method called before a pod's infra container will be deleted
TearDownPod(namespace string, name string, podInfraContainerID dockertools.DockerID) error
TearDownPod(namespace string, name string, podInfraContainerID kubeletTypes.DockerID) error
}

// Host is an interface that plugins can use to access the kubelet.
Expand Down Expand Up @@ -113,10 +113,10 @@ func (plugin *noopNetworkPlugin) Name() string {
return DefaultPluginName
}

func (plugin *noopNetworkPlugin) SetUpPod(namespace string, name string, id dockertools.DockerID) error {
func (plugin *noopNetworkPlugin) SetUpPod(namespace string, name string, id kubeletTypes.DockerID) error {
return nil
}

func (plugin *noopNetworkPlugin) TearDownPod(namespace string, name string, id dockertools.DockerID) error {
func (plugin *noopNetworkPlugin) TearDownPod(namespace string, name string, id kubeletTypes.DockerID) error {
return nil
}
Loading

0 comments on commit 33b8f48

Please sign in to comment.