Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various kubenet fixes (panics and bugs and cidrs, oh my) #26398

Merged
merged 6 commits into from
May 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 39 additions & 0 deletions pkg/kubelet/network/cni/testing/mock_cni.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// mock_cni is a mock of the `libcni.CNI` interface. It's a handwritten mock
// because there are only two functions to deal with.
package mock_cni

import (
"github.com/appc/cni/libcni"
"github.com/appc/cni/pkg/types"
"github.com/stretchr/testify/mock"
)

type MockCNI struct {
mock.Mock
}

func (m *MockCNI) AddNetwork(net *libcni.NetworkConfig, rt *libcni.RuntimeConf) (*types.Result, error) {
args := m.Called(net, rt)
return args.Get(0).(*types.Result), args.Error(1)
}

func (m *MockCNI) DelNetwork(net *libcni.NetworkConfig, rt *libcni.RuntimeConf) error {
args := m.Called(net, rt)
return args.Error(0)
}
103 changes: 52 additions & 51 deletions pkg/kubelet/network/kubenet/kubenet_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,19 @@ const (
type kubenetNetworkPlugin struct {
network.NoopNetworkPlugin

host network.Host
netConfig *libcni.NetworkConfig
loConfig *libcni.NetworkConfig
cniConfig *libcni.CNIConfig
shaper bandwidth.BandwidthShaper
podCIDRs map[kubecontainer.ContainerID]string
MTU int
mu sync.Mutex //Mutex for protecting podCIDRs map and netConfig
execer utilexec.Interface
nsenterPath string
hairpinMode componentconfig.HairpinMode
hostPortMap map[hostport]closeable
iptables utiliptables.Interface
host network.Host
netConfig *libcni.NetworkConfig
loConfig *libcni.NetworkConfig
cniConfig libcni.CNI
bandwidthShaper bandwidth.BandwidthShaper
mu sync.Mutex //Mutex for protecting podIPs map, netConfig, and shaper initialization
podIPs map[kubecontainer.ContainerID]string
MTU int
execer utilexec.Interface
nsenterPath string
hairpinMode componentconfig.HairpinMode
hostPortMap map[hostport]closeable
iptables utiliptables.Interface
}

func NewPlugin() network.NetworkPlugin {
Expand All @@ -86,7 +86,7 @@ func NewPlugin() network.NetworkPlugin {
iptInterface := utiliptables.New(execer, dbus, protocol)

return &kubenetNetworkPlugin{
podCIDRs: make(map[kubecontainer.ContainerID]string),
podIPs: make(map[kubecontainer.ContainerID]string),
hostPortMap: make(map[hostport]closeable),
MTU: 1460, //TODO: don't hardcode this
execer: utilexec.New(),
Expand Down Expand Up @@ -317,10 +317,14 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
if err != nil {
return err
}
if res.IP4 == nil || res.IP4.IP.String() == "" {
if res.IP4 == nil {
return fmt.Errorf("CNI plugin reported no IPv4 address for container %v.", id)
}
plugin.podCIDRs[id] = res.IP4.IP.String()
ip4 := res.IP4.IP.IP.To4()
if ip4 == nil {
return fmt.Errorf("CNI plugin reported an invalid IPv4 address for container %v: %+v.", id, res.IP4)
}
plugin.podIPs[id] = ip4.String()

// Put the container bridge into promiscuous mode to force it to accept hairpin packets.
// TODO: Remove this once the kernel bug (#20096) is fixed.
Expand All @@ -335,19 +339,13 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
}
}

// The first SetUpPod call creates the bridge; ensure shaping is enabled
if plugin.shaper == nil {
plugin.shaper = bandwidth.NewTCShaper(BridgeName)
if plugin.shaper == nil {
return fmt.Errorf("Failed to create bandwidth shaper!")
}
plugin.ensureBridgeTxQueueLen()
plugin.shaper.ReconcileInterface()
}
// The first SetUpPod call creates the bridge; get a shaper for the sake of
// initialization
shaper := plugin.shaper()

if egress != nil || ingress != nil {
ipAddr, _, _ := net.ParseCIDR(plugin.podCIDRs[id])
if err = plugin.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ipAddr.String()), egress, ingress); err != nil {
ipAddr := plugin.podIPs[id]
if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ipAddr), egress, ingress); err != nil {
return fmt.Errorf("Failed to add pod to shaper: %v", err)
}
}
Expand All @@ -369,26 +367,25 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i
return fmt.Errorf("Kubenet needs a PodCIDR to tear down pods")
}

// no cached CIDR is Ok during teardown
cidr, hasCIDR := plugin.podCIDRs[id]
if hasCIDR {
glog.V(5).Infof("Removing pod CIDR %s from shaper", cidr)
// no cached IP is Ok during teardown
podIP, hasIP := plugin.podIPs[id]
if hasIP {
glog.V(5).Infof("Removing pod IP %s from shaper", podIP)
// shaper wants /32
if addr, _, err := net.ParseCIDR(cidr); err != nil {
if err = plugin.shaper.Reset(fmt.Sprintf("%s/32", addr.String())); err != nil {
glog.Warningf("Failed to remove pod CIDR %s from shaper: %v", cidr, err)
}
if err := plugin.shaper().Reset(fmt.Sprintf("%s/32", podIP)); err != nil {
// Possible bandwidth shaping wasn't enabled for this pod anyways
glog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", podIP, err)
}
}
if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil {
// This is to prevent returning error when TearDownPod is called twice on the same pod. This helps to reduce event pollution.
if !hasCIDR {
if !hasIP {
glog.Warningf("Failed to delete container from kubenet: %v", err)
return nil
}
return err
}
delete(plugin.podCIDRs, id)
delete(plugin.podIPs, id)

plugin.syncHostportsRules()
return nil
Expand All @@ -400,11 +397,8 @@ func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name s
plugin.mu.Lock()
defer plugin.mu.Unlock()
// Assuming the ip of pod does not change. Try to retrieve ip from kubenet map first.
if cidr, ok := plugin.podCIDRs[id]; ok {
ip, _, err := net.ParseCIDR(strings.Trim(cidr, "\n"))
if err == nil {
return &network.PodNetworkStatus{IP: ip}, nil
}
if podIP, ok := plugin.podIPs[id]; ok {
return &network.PodNetworkStatus{IP: net.ParseIP(podIP)}, nil
}

netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
Expand All @@ -429,7 +423,7 @@ func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name s
if err != nil {
return nil, fmt.Errorf("Kubenet failed to parse ip from output %s due to %v", output, err)
}
plugin.podCIDRs[id] = ip.String()
plugin.podIPs[id] = ip.String()
return &network.PodNetworkStatus{IP: ip}, nil
}

Expand Down Expand Up @@ -549,7 +543,7 @@ func (plugin *kubenetNetworkPlugin) openPodHostports(pod *api.Pod) (map[hostport
//syncHostportMap syncs newly opened hostports to kubenet on successful pod setup. If pod setup failed, then clean up.
func (plugin *kubenetNetworkPlugin) syncHostportMap(id kubecontainer.ContainerID, hostportMap map[hostport]closeable) {
// if pod ip cannot be retrieved from podCIDR, then assume pod setup failed.
if _, ok := plugin.podCIDRs[id]; !ok {
if _, ok := plugin.podIPs[id]; !ok {
for hp, socket := range hostportMap {
err := socket.Close()
if err != nil {
Expand Down Expand Up @@ -580,23 +574,18 @@ func (plugin *kubenetNetworkPlugin) gatherAllHostports() (map[api.ContainerPort]
}
}
// Assuming if kubenet has the pod's ip, the pod is alive and its host port should be presented.
cidr, ok := plugin.podCIDRs[podInfraContainerId]
podIP, ok := plugin.podIPs[podInfraContainerId]
if !ok {
// The POD has been delete. Ignore
continue
}
podIP, _, err := net.ParseCIDR(strings.Trim(cidr, "\n"))
if err != nil {
glog.V(3).Info("Failed to retrieve pod ip for %s-%s: %v", p.Namespace, p.Name, err)
continue
}
// Need the complete api.Pod object
pod, ok := plugin.host.GetPodByName(p.Namespace, p.Name)
if ok {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.HostPort != 0 {
podHostportMap[port] = targetPod{podFullName: kubecontainer.GetPodFullName(pod), podIP: podIP.String()}
podHostportMap[port] = targetPod{podFullName: kubecontainer.GetPodFullName(pod), podIP: podIP}
}
}
}
Expand Down Expand Up @@ -821,3 +810,15 @@ func (plugin *kubenetNetworkPlugin) cleanupHostportMap(containerPortMap map[api.
}
}
}

// shaper retrieves the bandwidth shaper and, if it hasn't been fetched before,
// initializes it and ensures the bridge is appropriately configured
// This function should only be called while holding the `plugin.mu` lock
func (plugin *kubenetNetworkPlugin) shaper() bandwidth.BandwidthShaper {
if plugin.bandwidthShaper == nil {
plugin.bandwidthShaper = bandwidth.NewTCShaper(BridgeName)
plugin.ensureBridgeTxQueueLen()
plugin.bandwidthShaper.ReconcileInterface()
}
return plugin.bandwidthShaper
}
60 changes: 52 additions & 8 deletions pkg/kubelet/network/kubenet/kubenet_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,36 @@ package kubenet
import (
"fmt"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"testing"

kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/network/cni/testing"
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
"k8s.io/kubernetes/pkg/util/bandwidth"
"k8s.io/kubernetes/pkg/util/exec"
"testing"
ipttest "k8s.io/kubernetes/pkg/util/iptables/testing"
)

func newFakeKubenetPlugin(initMap map[kubecontainer.ContainerID]string, execer exec.Interface, host network.Host) network.NetworkPlugin {
// test it fulfills the NetworkPlugin interface
var _ network.NetworkPlugin = &kubenetNetworkPlugin{}

func newFakeKubenetPlugin(initMap map[kubecontainer.ContainerID]string, execer exec.Interface, host network.Host) *kubenetNetworkPlugin {
return &kubenetNetworkPlugin{
podCIDRs: initMap,
execer: execer,
MTU: 1460,
host: host,
podIPs: initMap,
execer: execer,
MTU: 1460,
host: host,
}
}

func TestGetPodNetworkStatus(t *testing.T) {
podIPMap := make(map[kubecontainer.ContainerID]string)
podIPMap[kubecontainer.ContainerID{ID: "1"}] = "10.245.0.2/32"
podIPMap[kubecontainer.ContainerID{ID: "2"}] = "10.245.0.3/32"
podIPMap[kubecontainer.ContainerID{ID: "1"}] = "10.245.0.2"
podIPMap[kubecontainer.ContainerID{ID: "2"}] = "10.245.0.3"

testCases := []struct {
id string
Expand Down Expand Up @@ -111,4 +121,38 @@ func TestGetPodNetworkStatus(t *testing.T) {
}
}

// TestTeardownBeforeSetUp tests that a `TearDown` call does call
// `shaper.Reset`
func TestTeardownCallsShaper(t *testing.T) {
fexec := &exec.FakeExec{
CommandScript: []exec.FakeCommandAction{},
LookPathFunc: func(file string) (string, error) {
return fmt.Sprintf("/fake-bin/%s", file), nil
},
}
fhost := nettest.NewFakeHost(nil)
fshaper := &bandwidth.FakeShaper{}
mockcni := &mock_cni.MockCNI{}
kubenet := newFakeKubenetPlugin(map[kubecontainer.ContainerID]string{}, fexec, fhost)
kubenet.cniConfig = mockcni
kubenet.iptables = ipttest.NewFake()
kubenet.bandwidthShaper = fshaper

mockcni.On("DelNetwork", mock.AnythingOfType("*libcni.NetworkConfig"), mock.AnythingOfType("*libcni.RuntimeConf")).Return(nil)

details := make(map[string]interface{})
details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = "10.0.0.1/24"
kubenet.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details)

existingContainerID := kubecontainer.BuildContainerID("docker", "123")
kubenet.podIPs[existingContainerID] = "10.0.0.1"

if err := kubenet.TearDownPod("namespace", "name", existingContainerID); err != nil {
t.Fatalf("Unexpected error in TearDownPod: %v", err)
}
assert.Equal(t, []string{"10.0.0.1/32"}, fshaper.ResetCIDRs, "shaper.Reset should have been called")

mockcni.AssertExpectations(t)
}

//TODO: add unit test for each implementation of network plugin interface