Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CRI] enable kubenet traffic shaping #41588

Merged
merged 2 commits into from
Feb 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kubelet/dockershim/docker_sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (str
// on the host as well, to satisfy parts of the pod spec that aren't
// recognized by the CNI standard yet.
cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID)
err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations)
// TODO: Do we need to teardown on failure or can we rely on a StopPodSandbox call with the given ID?
return createResp.ID, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/dockertools/docker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2249,7 +2249,7 @@ func (dm *DockerManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecon
setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, kubecontainer.GetPodFullName(pod))
result.AddSyncResult(setupNetworkResult)
if !kubecontainer.IsHostNetworkPod(pod) {
if err := dm.network.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID()); err != nil {
if err := dm.network.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID(), pod.Annotations); err != nil {
setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, err.Error())
glog.Error(err)

Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/network/cni/cni.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (plugin *cniNetworkPlugin) Name() string {
return CNIPluginName
}

func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
if err := plugin.checkInitialized(); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/network/cni/cni_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func TestCNIPlugin(t *testing.T) {
}

// Set up the pod
err = plug.SetUpPod("podNamespace", "podName", containerID)
err = plug.SetUpPod("podNamespace", "podName", containerID, map[string]string{})
if err != nil {
t.Errorf("Expected nil: %v", err)
}
Expand Down
34 changes: 17 additions & 17 deletions pkg/kubelet/network/kubenet/kubenet_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (plugin *kubenetNetworkPlugin) Capabilities() utilsets.Int {
// setup sets up networking through CNI using the given ns/name and sandbox ID.
// TODO: Don't pass the pod to this method, it only needs it for bandwidth
// shaping and hostport management.
func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, pod *v1.Pod) error {
func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, pod *v1.Pod, annotations map[string]string) error {
// Bring up container loopback interface
if _, err := plugin.addContainerToNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil {
return err
Expand Down Expand Up @@ -359,23 +359,22 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube

plugin.podIPs[id] = ip4.String()

// The first SetUpPod call creates the bridge; get a shaper for the sake of initialization
// TODO: replace with CNI traffic shaper plugin
shaper := plugin.shaper()
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(annotations)
if err != nil {
return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
}
if egress != nil || ingress != nil {
if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil {
return fmt.Errorf("Failed to add pod to shaper: %v", err)
}
}

// The host can choose to not support "legacy" features. The remote
// shim doesn't support it (#35457), but the kubelet does.
if plugin.host.SupportsLegacyFeatures() {
// The first SetUpPod call creates the bridge; get a shaper for the sake of
// initialization
shaper := plugin.shaper()

ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
if err != nil {
return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
}
if egress != nil || ingress != nil {
if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil {
return fmt.Errorf("Failed to add pod to shaper: %v", err)
}
}

// Open any hostport the pod's containers want
activePodPortMappings, err := plugin.getPodPortMappings()
if err != nil {
Expand All @@ -387,6 +386,7 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
return err
}
} else {
// TODO: replace with CNI port-forwarding plugin
portMappings, err := plugin.host.GetPodPortMappings(id.ID)
if err != nil {
return err
Expand All @@ -406,7 +406,7 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
return nil
}

func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
plugin.mu.Lock()
defer plugin.mu.Unlock()

Expand All @@ -425,7 +425,7 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
return fmt.Errorf("Kubenet cannot SetUpPod: %v", err)
}

if err := plugin.setup(namespace, name, id, pod); err != nil {
if err := plugin.setup(namespace, name, id, pod, annotations); err != nil {
// Make sure everything gets cleaned up on errors
podIP, _ := plugin.podIPs[id]
if err := plugin.teardown(namespace, name, id, podIP); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/network/kubenet/kubenet_unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (plugin *kubenetNetworkPlugin) Name() string {
return "kubenet"
}

func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
return fmt.Errorf("Kubenet is not supported in this build")
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/kubelet/network/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type NetworkPlugin interface {
// the pod has been created but before the other containers of the
// pod are launched.
// TODO: rename podInfraContainerID to sandboxID
SetUpPod(namespace string, name string, podInfraContainerID kubecontainer.ContainerID) error
SetUpPod(namespace string, name string, podInfraContainerID kubecontainer.ContainerID, annotations map[string]string) error

// TearDownPod is the method called before a pod's infra container will be deleted
// TODO: rename podInfraContainerID to sandboxID
Expand Down Expand Up @@ -235,7 +235,7 @@ func (plugin *NoopNetworkPlugin) Capabilities() utilsets.Int {
return utilsets.NewInt()
}

func (plugin *NoopNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
func (plugin *NoopNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
return nil
}

Expand Down Expand Up @@ -389,13 +389,13 @@ func (pm *PluginManager) GetPodNetworkStatus(podNamespace, podName string, id ku
return netStatus, nil
}

func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID) error {
func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID, annotations map[string]string) error {
fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace)
pm.podLock(fullPodName).Lock()
defer pm.podUnlock(fullPodName)

glog.V(3).Infof("Calling network plugin %s to set up pod %q", pm.plugin.Name(), fullPodName)
if err := pm.plugin.SetUpPod(podNamespace, podName, id); err != nil {
if err := pm.plugin.SetUpPod(podNamespace, podName, id, annotations); err != nil {
return fmt.Errorf("NetworkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), fullPodName, err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/network/testing/mock_network_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (_mr *_MockNetworkPluginRecorder) Name() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Name")
}

func (_m *MockNetworkPlugin) SetUpPod(_param0 string, _param1 string, _param2 container.ContainerID) error {
func (_m *MockNetworkPlugin) SetUpPod(_param0 string, _param1 string, _param2 container.ContainerID, annotations map[string]string) error {
ret := _m.ctrl.Call(_m, "SetUpPod", _param0, _param1, _param2)
ret0, _ := ret[0].(error)
return ret0
Expand Down
8 changes: 4 additions & 4 deletions pkg/kubelet/network/testing/plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestPluginManager(t *testing.T) {
// concurrently.
allCreatedWg.Wait()

if err := pm.SetUpPod("", name, id); err != nil {
if err := pm.SetUpPod("", name, id, nil); err != nil {
t.Errorf("Failed to set up pod %q: %v", name, err)
return
}
Expand Down Expand Up @@ -128,7 +128,7 @@ func (p *hookableFakeNetworkPlugin) Capabilities() utilsets.Int {
return utilsets.NewInt()
}

func (p *hookableFakeNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
func (p *hookableFakeNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
if p.setupHook != nil {
p.setupHook(namespace, name, id)
}
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestMultiPodParallelNetworkOps(t *testing.T) {
// Setup will block on the runner pod completing. If network
// operations locking isn't correct (eg pod network operations
// block other pods) setUpPod() will never return.
if err := pm.SetUpPod("", podName, containerID); err != nil {
if err := pm.SetUpPod("", podName, containerID, nil); err != nil {
t.Errorf("Failed to set up waiter pod: %v", err)
return
}
Expand All @@ -199,7 +199,7 @@ func TestMultiPodParallelNetworkOps(t *testing.T) {
podName := "runner"
containerID := kubecontainer.ContainerID{ID: podName}

if err := pm.SetUpPod("", podName, containerID); err != nil {
if err := pm.SetUpPod("", podName, containerID, nil); err != nil {
t.Errorf("Failed to set up runner pod: %v", err)
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/rkt/rkt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1297,7 +1297,7 @@ func (r *Runtime) setupPodNetwork(pod *v1.Pod) (string, string, error) {

// Set up networking with the network plugin
containerID := kubecontainer.ContainerID{ID: string(pod.UID)}
err = r.network.SetUpPod(pod.Namespace, pod.Name, containerID)
err = r.network.SetUpPod(pod.Namespace, pod.Name, containerID, pod.Annotations)
if err != nil {
return "", "", err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/bandwidth/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ func validateBandwidthIsReasonable(rsrc *resource.Quantity) error {
}

func ExtractPodBandwidthResources(podAnnotations map[string]string) (ingress, egress *resource.Quantity, err error) {
if podAnnotations == nil {
return nil, nil, nil
}
str, found := podAnnotations["kubernetes.io/ingress-bandwidth"]
if found {
ingressValue, err := resource.ParseQuantity(str)
Expand Down