From 3ef342d42a30f4b8f4cb55dc67b3e93be9daecd0 Mon Sep 17 00:00:00 2001 From: Ravi Sankar Penta Date: Mon, 26 Jun 2017 13:11:47 -0700 Subject: [PATCH] Bug 1453190 - Fix pod update operation Use pod sandbox ID to update the pod as opposed to pod container ID. OVS flow note identified by sandbox ID is desired as network namespace is held by the pod sandbox and pod could have many containers and single container ID may not represent all the pod ovs flows. Since we can't use kubelet 'Host' (explanation refer commit: f1118459), we use runtime shim endpoint to connect to runtime service using gRPC. This is the same mechanism used by kubelet(GenericKubeletRuntimeManager) to talk to runtime service(docker/rkt). --- pkg/cmd/server/kubernetes/node/node_config.go | 3 +- pkg/sdn/plugin/cniserver/cniserver.go | 4 +- pkg/sdn/plugin/common.go | 8 --- pkg/sdn/plugin/node.go | 32 +++++++++-- pkg/sdn/plugin/ovscontroller.go | 20 +++---- pkg/sdn/plugin/ovscontroller_test.go | 34 ++++++------ pkg/sdn/plugin/pod_linux.go | 13 ++--- pkg/sdn/plugin/pod_test.go | 4 +- pkg/sdn/plugin/runtime.go | 55 +++++++++++++++++++ 9 files changed, 121 insertions(+), 52 deletions(-) create mode 100644 pkg/sdn/plugin/runtime.go diff --git a/pkg/cmd/server/kubernetes/node/node_config.go b/pkg/cmd/server/kubernetes/node/node_config.go index f2ad136023ee..3204cb64b9fa 100644 --- a/pkg/cmd/server/kubernetes/node/node_config.go +++ b/pkg/cmd/server/kubernetes/node/node_config.go @@ -237,7 +237,8 @@ func BuildKubernetesNodeConfig(options configapi.NodeConfig, enableProxy, enable internalKubeInformers := kinternalinformers.NewSharedInformerFactory(kubeClient, proxyconfig.ConfigSyncPeriod) // Initialize SDN before building kubelet config so it can modify option - sdnPlugin, err := sdnplugin.NewNodePlugin(options.NetworkConfig.NetworkPluginName, originClient, kubeClient, internalKubeInformers, options.NodeName, options.NodeIP, options.NetworkConfig.MTU, proxyconfig.KubeProxyConfiguration) + sdnPlugin, err := sdnplugin.NewNodePlugin(options.NetworkConfig.NetworkPluginName, originClient, kubeClient, internalKubeInformers, options.NodeName, options.NodeIP, + options.NetworkConfig.MTU, proxyconfig.KubeProxyConfiguration, options.DockerConfig.DockerShimSocket) if err != nil { return nil, fmt.Errorf("SDN initialization failed: %v", err) } diff --git a/pkg/sdn/plugin/cniserver/cniserver.go b/pkg/sdn/plugin/cniserver/cniserver.go index 482ccefaf09f..62a3ab4a6ac4 100644 --- a/pkg/sdn/plugin/cniserver/cniserver.go +++ b/pkg/sdn/plugin/cniserver/cniserver.go @@ -71,7 +71,7 @@ type PodRequest struct { // kubernetes pod name PodName string // kubernetes container ID - ContainerId string + SandboxID string // kernel network namespace path Netns string // Channel for returning the operation result to the CNIServer @@ -190,7 +190,7 @@ func cniRequestToPodRequest(r *http.Request) (*PodRequest, error) { Result: make(chan *PodResult), } - req.ContainerId, ok = cr.Env["CNI_CONTAINERID"] + req.SandboxID, ok = cr.Env["CNI_CONTAINERID"] if !ok { return nil, fmt.Errorf("missing CNI_CONTAINERID") } diff --git a/pkg/sdn/plugin/common.go b/pkg/sdn/plugin/common.go index 7da88fbc943d..c76f4ba9418b 100644 --- a/pkg/sdn/plugin/common.go +++ b/pkg/sdn/plugin/common.go @@ -21,16 +21,8 @@ import ( kapi "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/extensions" kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" - kcontainer "k8s.io/kubernetes/pkg/kubelet/container" ) -func getPodContainerID(pod *kapi.Pod) string { - if len(pod.Status.ContainerStatuses) > 0 { - return kcontainer.ParseContainerID(pod.Status.ContainerStatuses[0].ContainerID).ID - } - return "" -} - func hostSubnetToString(subnet *osapi.HostSubnet) string { return fmt.Sprintf("%s (host: %q, ip: %q, subnet: %q)", subnet.Name, subnet.Host, subnet.HostIP, subnet.Subnet) } diff --git a/pkg/sdn/plugin/node.go b/pkg/sdn/plugin/node.go index c3a74c9f9457..646065aa049f 100644 --- a/pkg/sdn/plugin/node.go +++ b/pkg/sdn/plugin/node.go @@ -30,8 +30,11 @@ import ( "k8s.io/kubernetes/pkg/apis/componentconfig" kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + kubeletapi "k8s.io/kubernetes/pkg/kubelet/api" + kruntimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/dockertools" knetwork "k8s.io/kubernetes/pkg/kubelet/network" + ktypes "k8s.io/kubernetes/pkg/kubelet/types" kexec "k8s.io/kubernetes/pkg/util/exec" ) @@ -76,10 +79,16 @@ type OsdnNode struct { clearLbr0IptablesRule bool kubeInformers kinternalinformers.SharedInformerFactory + + // Holds runtime endpoint shim to make SDN <-> runtime communication + runtimeEndpoint string + runtimeRequestTimeout time.Duration + runtimeService kubeletapi.RuntimeService } // Called by higher layers to create the plugin SDN node instance -func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient kclientset.Interface, kubeInformers kinternalinformers.SharedInformerFactory, hostname string, selfIP string, mtu uint32, proxyConfig componentconfig.KubeProxyConfiguration) (*OsdnNode, error) { +func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient kclientset.Interface, kubeInformers kinternalinformers.SharedInformerFactory, + hostname string, selfIP string, mtu uint32, proxyConfig componentconfig.KubeProxyConfiguration, runtimeEndpoint string) (*OsdnNode, error) { var policy osdnPolicy var pluginId int var minOvsVersion string @@ -153,6 +162,12 @@ func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient kclient egressPolicies: make(map[uint32][]osapi.EgressNetworkPolicy), egressDNS: NewEgressDNS(), kubeInformers: kubeInformers, + + runtimeEndpoint: runtimeEndpoint, + // 2 minutes is the current default value used in kubelet + runtimeRequestTimeout: 2 * time.Minute, + // populated on demand + runtimeService: nil, } if err := plugin.dockerPreCNICleanup(); err != nil { @@ -329,17 +344,24 @@ func (node *OsdnNode) Start() error { // FIXME: this should eventually go into kubelet via a CNI UPDATE/CHANGE action // See https://github.com/containernetworking/cni/issues/89 func (node *OsdnNode) UpdatePod(pod kapi.Pod) error { + filter := &kruntimeapi.PodSandboxFilter{ + LabelSelector: map[string]string{ktypes.KubernetesPodUIDLabel: string(pod.UID)}, + } + sandboxID, err := node.getPodSandboxID(filter) + if err != nil { + return err + } + req := &cniserver.PodRequest{ Command: cniserver.CNI_UPDATE, PodNamespace: pod.Namespace, PodName: pod.Name, - ContainerId: getPodContainerID(&pod), - // netns is read from docker if needed, since we don't get it from kubelet - Result: make(chan *cniserver.PodResult), + SandboxID: sandboxID, + Result: make(chan *cniserver.PodResult), } // Send request and wait for the result - _, err := node.podManager.handleCNIRequest(req) + _, err = node.podManager.handleCNIRequest(req) return err } diff --git a/pkg/sdn/plugin/ovscontroller.go b/pkg/sdn/plugin/ovscontroller.go index 8ebd4e0adb41..ec8b615712e0 100644 --- a/pkg/sdn/plugin/ovscontroller.go +++ b/pkg/sdn/plugin/ovscontroller.go @@ -239,13 +239,13 @@ func (oc *ovsController) cleanupPodFlows(podIP string) error { return otx.EndTransaction() } -func getPodNote(containerID string) (string, error) { - bytes, err := hex.DecodeString(containerID) +func getPodNote(sandboxID string) (string, error) { + bytes, err := hex.DecodeString(sandboxID) if err != nil { - return "", fmt.Errorf("failed to decode container ID %q: %v", containerID, err) + return "", fmt.Errorf("failed to decode sandbox ID %q: %v", sandboxID, err) } if len(bytes) != 32 { - return "", fmt.Errorf("invalid container ID %q length; expected 32 bytes", containerID) + return "", fmt.Errorf("invalid sandbox ID %q length; expected 32 bytes", sandboxID) } var note string for _, b := range bytes { @@ -257,8 +257,8 @@ func getPodNote(containerID string) (string, error) { return note, nil } -func (oc *ovsController) SetUpPod(hostVeth, podIP, podMAC, containerID string, vnid uint32) (int, error) { - note, err := getPodNote(containerID) +func (oc *ovsController) SetUpPod(hostVeth, podIP, podMAC, sandboxID string, vnid uint32) (int, error) { + note, err := getPodNote(sandboxID) if err != nil { return -1, err } @@ -308,8 +308,8 @@ func (oc *ovsController) SetPodBandwidth(hostVeth string, ingressBPS, egressBPS return nil } -func getPodDetailsByContainerID(flows []string, containerID string) (int, string, string, string, error) { - note, err := getPodNote(containerID) +func getPodDetailsBySandboxID(flows []string, sandboxID string) (int, string, string, string, error) { + note, err := getPodNote(sandboxID) if err != nil { return 0, "", "", "", err } @@ -350,12 +350,12 @@ func getPodDetailsByContainerID(flows []string, containerID string) (int, string return 0, "", "", "", fmt.Errorf("failed to find pod details from OVS flows") } -func (oc *ovsController) UpdatePod(containerID string, vnid uint32) error { +func (oc *ovsController) UpdatePod(sandboxID string, vnid uint32) error { flows, err := oc.ovs.DumpFlows() if err != nil { return err } - ofport, podIP, podMAC, note, err := getPodDetailsByContainerID(flows, containerID) + ofport, podIP, podMAC, note, err := getPodDetailsBySandboxID(flows, sandboxID) if err != nil { return err } diff --git a/pkg/sdn/plugin/ovscontroller_test.go b/pkg/sdn/plugin/ovscontroller_test.go index 2c17264ba78e..e7c042bd3518 100644 --- a/pkg/sdn/plugin/ovscontroller_test.go +++ b/pkg/sdn/plugin/ovscontroller_test.go @@ -215,16 +215,16 @@ func TestOVSService(t *testing.T) { } const ( - containerID string = "bcb5d8d287fcf97458c48ad643b101079e3bc265a94e097e7407440716112f69" - containerNote string = "bc.b5.d8.d2.87.fc.f9.74.58.c4.8a.d6.43.b1.01.07.9e.3b.c2.65.a9.4e.09.7e.74.07.44.07.16.11.2f.69" - containerNoteAction string = "note:" + containerNote + sandboxID string = "bcb5d8d287fcf97458c48ad643b101079e3bc265a94e097e7407440716112f69" + sandboxNote string = "bc.b5.d8.d2.87.fc.f9.74.58.c4.8a.d6.43.b1.01.07.9e.3b.c2.65.a9.4e.09.7e.74.07.44.07.16.11.2f.69" + sandboxNoteAction string = "note:" + sandboxNote ) func TestOVSPod(t *testing.T) { ovsif, oc, origFlows := setup(t) // Add - ofport, err := oc.SetUpPod("veth1", "10.128.0.2", "11:22:33:44:55:66", containerID, 42) + ofport, err := oc.SetUpPod("veth1", "10.128.0.2", "11:22:33:44:55:66", sandboxID, 42) if err != nil { t.Fatalf("Unexpected error adding pod rules: %v", err) } @@ -236,7 +236,7 @@ func TestOVSPod(t *testing.T) { err = assertFlowChanges(origFlows, flows, flowChange{ kind: flowAdded, - match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", containerNoteAction}, + match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", sandboxNoteAction}, }, flowChange{ kind: flowAdded, @@ -262,7 +262,7 @@ func TestOVSPod(t *testing.T) { } // Update - err = oc.UpdatePod(containerID, 43) + err = oc.UpdatePod(sandboxID, 43) if err != nil { t.Fatalf("Unexpected error adding pod rules: %v", err) } @@ -274,7 +274,7 @@ func TestOVSPod(t *testing.T) { err = assertFlowChanges(origFlows, flows, flowChange{ kind: flowAdded, - match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", containerNoteAction}, + match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", sandboxNoteAction}, }, flowChange{ kind: flowAdded, @@ -317,18 +317,18 @@ func TestOVSPod(t *testing.T) { func TestGetPodDetails(t *testing.T) { type testcase struct { - containerID string - flows []string - ofport int - ip string - mac string - note string - errStr string + sandboxID string + flows []string + ofport int + ip string + mac string + note string + errStr string } testcases := []testcase{ { - containerID: containerID, + sandboxID: sandboxID, flows: []string{ "cookie=0x0, duration=12.243s, table=0, n_packets=0, n_bytes=0, priority=250,ip,in_port=2,nw_dst=224.0.0.0/4 actions=drop", "cookie=0x0, duration=12.258s, table=0, n_packets=0, n_bytes=0, priority=200,arp,in_port=1,arp_spa=10.128.0.0/14,arp_tpa=10.130.0.0/23 actions=move:NXM_NX_TUN_ID[0..31]->NXM_NX_REG0[],goto_table:10", @@ -388,12 +388,12 @@ func TestGetPodDetails(t *testing.T) { ofport: 3, ip: "10.130.0.2", mac: "4a:77:32:e4:ab:9d", - note: containerNote, + note: sandboxNote, }, } for _, tc := range testcases { - ofport, ip, mac, note, err := getPodDetailsByContainerID(tc.flows, tc.containerID) + ofport, ip, mac, note, err := getPodDetailsBySandboxID(tc.flows, tc.sandboxID) if err != nil { if tc.errStr != "" { if !strings.Contains(err.Error(), tc.errStr) { diff --git a/pkg/sdn/plugin/pod_linux.go b/pkg/sdn/plugin/pod_linux.go index 4855c77a156c..f3b8b1ca4291 100644 --- a/pkg/sdn/plugin/pod_linux.go +++ b/pkg/sdn/plugin/pod_linux.go @@ -235,9 +235,9 @@ func (m *podManager) setup(req *cniserver.PodRequest) (*cnitypes.Result, *runnin return nil, nil, err } - ipamResult, err := m.ipamAdd(req.Netns, req.ContainerId) + ipamResult, err := m.ipamAdd(req.Netns, req.SandboxID) if err != nil { - return nil, nil, fmt.Errorf("failed to run IPAM for %v: %v", req.ContainerId, err) + return nil, nil, fmt.Errorf("failed to run IPAM for %v: %v", req.SandboxID, err) } podIP := ipamResult.IP4.IP.IP @@ -245,7 +245,7 @@ func (m *podManager) setup(req *cniserver.PodRequest) (*cnitypes.Result, *runnin var success bool defer func() { if !success { - m.ipamDel(req.ContainerId) + m.ipamDel(req.SandboxID) if err := m.hostportSyncer.SyncHostports(TUN, m.getRunningPods()); err != nil { glog.Warningf("failed syncing hostports: %v", err) } @@ -309,7 +309,7 @@ func (m *podManager) setup(req *cniserver.PodRequest) (*cnitypes.Result, *runnin return nil, nil, err } - ofport, err := m.ovs.SetUpPod(hostVethName, podIP.String(), contVethMac, req.ContainerId, vnid) + ofport, err := m.ovs.SetUpPod(hostVethName, podIP.String(), contVethMac, req.SandboxID, vnid) if err != nil { return nil, nil, err } @@ -329,10 +329,9 @@ func (m *podManager) update(req *cniserver.PodRequest) (uint32, error) { return 0, err } - if err := m.ovs.UpdatePod(req.ContainerId, vnid); err != nil { + if err := m.ovs.UpdatePod(req.SandboxID, vnid); err != nil { return 0, err } - return vnid, nil } @@ -359,7 +358,7 @@ func (m *podManager) teardown(req *cniserver.PodRequest) error { } } - if err := m.ipamDel(req.ContainerId); err != nil { + if err := m.ipamDel(req.SandboxID); err != nil { errList = append(errList, err) } diff --git a/pkg/sdn/plugin/pod_test.go b/pkg/sdn/plugin/pod_test.go index 0a07397632fc..d728bb8e2257 100644 --- a/pkg/sdn/plugin/pod_test.go +++ b/pkg/sdn/plugin/pod_test.go @@ -329,7 +329,7 @@ func TestPodManager(t *testing.T) { Command: op.command, PodNamespace: op.namespace, PodName: op.name, - ContainerId: "asd;lfkajsdflkajfs", + SandboxID: "asd;lfkajsdflkajfs", Netns: "/some/network/namespace", Result: make(chan *cniserver.PodResult), } @@ -424,7 +424,7 @@ func TestDirectPodUpdate(t *testing.T) { Command: op.command, PodNamespace: op.namespace, PodName: op.name, - ContainerId: "asdfasdfasdfaf", + SandboxID: "asdfasdfasdfaf", Result: make(chan *cniserver.PodResult), } diff --git a/pkg/sdn/plugin/runtime.go b/pkg/sdn/plugin/runtime.go new file mode 100644 index 000000000000..a58acad73707 --- /dev/null +++ b/pkg/sdn/plugin/runtime.go @@ -0,0 +1,55 @@ +package plugin + +import ( + "fmt" + "time" + + kwait "k8s.io/apimachinery/pkg/util/wait" + kubeletapi "k8s.io/kubernetes/pkg/kubelet/api" + kruntimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + kubeletremote "k8s.io/kubernetes/pkg/kubelet/remote" +) + +func (node *OsdnNode) getRuntimeService() (kubeletapi.RuntimeService, error) { + if node.runtimeService != nil { + return node.runtimeService, nil + } + + // Kubelet starts asynchronously and when we get an Update op, kubelet may not have created runtime endpoint. + // So try couple of times before bailing out (~30 seconds timeout). + err := kwait.ExponentialBackoff( + kwait.Backoff{ + Duration: 100 * time.Millisecond, + Factor: 1.2, + Steps: 23, + }, + func() (bool, error) { + runtimeService, err := kubeletremote.NewRemoteRuntimeService(node.runtimeEndpoint, node.runtimeRequestTimeout) + if err != nil { + // Wait longer + return false, nil + } + node.runtimeService = runtimeService + return true, nil + }) + if err != nil { + return nil, fmt.Errorf("Failed to fetch runtime service: %v", err) + } + return node.runtimeService, nil +} + +func (node *OsdnNode) getPodSandboxID(filter *kruntimeapi.PodSandboxFilter) (string, error) { + runtimeService, err := node.getRuntimeService() + if err != nil { + return "", err + } + + podSandboxList, err := runtimeService.ListPodSandbox(filter) + if err != nil { + return "", fmt.Errorf("Failed to list pod sandboxes: %v", err) + } + if len(podSandboxList) == 0 { + return "", fmt.Errorf("Pod sandbox not found for filter: %v", filter) + } + return podSandboxList[0].Id, nil +}