Skip to content

Commit

Permalink
Merge pull request #14892 from pravisankar/fix-update-pod
Browse files Browse the repository at this point in the history
Merged by openshift-bot
  • Loading branch information
OpenShift Bot committed Jul 2, 2017
2 parents 8f51071 + 3ef342d commit 79b4411
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 52 deletions.
3 changes: 2 additions & 1 deletion pkg/cmd/server/kubernetes/node/node_config.go
Expand Up @@ -237,7 +237,8 @@ func BuildKubernetesNodeConfig(options configapi.NodeConfig, enableProxy, enable
internalKubeInformers := kinternalinformers.NewSharedInformerFactory(kubeClient, proxyconfig.ConfigSyncPeriod)

// Initialize SDN before building kubelet config so it can modify option
sdnPlugin, err := sdnplugin.NewNodePlugin(options.NetworkConfig.NetworkPluginName, originClient, kubeClient, internalKubeInformers, options.NodeName, options.NodeIP, options.NetworkConfig.MTU, proxyconfig.KubeProxyConfiguration)
sdnPlugin, err := sdnplugin.NewNodePlugin(options.NetworkConfig.NetworkPluginName, originClient, kubeClient, internalKubeInformers, options.NodeName, options.NodeIP,
options.NetworkConfig.MTU, proxyconfig.KubeProxyConfiguration, options.DockerConfig.DockerShimSocket)
if err != nil {
return nil, fmt.Errorf("SDN initialization failed: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sdn/plugin/cniserver/cniserver.go
Expand Up @@ -71,7 +71,7 @@ type PodRequest struct {
// kubernetes pod name
PodName string
// kubernetes container ID
ContainerId string
SandboxID string
// kernel network namespace path
Netns string
// Channel for returning the operation result to the CNIServer
Expand Down Expand Up @@ -190,7 +190,7 @@ func cniRequestToPodRequest(r *http.Request) (*PodRequest, error) {
Result: make(chan *PodResult),
}

req.ContainerId, ok = cr.Env["CNI_CONTAINERID"]
req.SandboxID, ok = cr.Env["CNI_CONTAINERID"]
if !ok {
return nil, fmt.Errorf("missing CNI_CONTAINERID")
}
Expand Down
8 changes: 0 additions & 8 deletions pkg/sdn/plugin/common.go
Expand Up @@ -21,16 +21,8 @@ import (
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
kcontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

func getPodContainerID(pod *kapi.Pod) string {
if len(pod.Status.ContainerStatuses) > 0 {
return kcontainer.ParseContainerID(pod.Status.ContainerStatuses[0].ContainerID).ID
}
return ""
}

func hostSubnetToString(subnet *osapi.HostSubnet) string {
return fmt.Sprintf("%s (host: %q, ip: %q, subnet: %q)", subnet.Name, subnet.Host, subnet.HostIP, subnet.Subnet)
}
Expand Down
32 changes: 27 additions & 5 deletions pkg/sdn/plugin/node.go
Expand Up @@ -30,8 +30,11 @@ import (
"k8s.io/kubernetes/pkg/apis/componentconfig"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
kubeletapi "k8s.io/kubernetes/pkg/kubelet/api"
kruntimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
knetwork "k8s.io/kubernetes/pkg/kubelet/network"
ktypes "k8s.io/kubernetes/pkg/kubelet/types"
kexec "k8s.io/kubernetes/pkg/util/exec"
)

Expand Down Expand Up @@ -76,10 +79,16 @@ type OsdnNode struct {
clearLbr0IptablesRule bool

kubeInformers kinternalinformers.SharedInformerFactory

// Holds runtime endpoint shim to make SDN <-> runtime communication
runtimeEndpoint string
runtimeRequestTimeout time.Duration
runtimeService kubeletapi.RuntimeService
}

// Called by higher layers to create the plugin SDN node instance
func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient kclientset.Interface, kubeInformers kinternalinformers.SharedInformerFactory, hostname string, selfIP string, mtu uint32, proxyConfig componentconfig.KubeProxyConfiguration) (*OsdnNode, error) {
func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient kclientset.Interface, kubeInformers kinternalinformers.SharedInformerFactory,
hostname string, selfIP string, mtu uint32, proxyConfig componentconfig.KubeProxyConfiguration, runtimeEndpoint string) (*OsdnNode, error) {
var policy osdnPolicy
var pluginId int
var minOvsVersion string
Expand Down Expand Up @@ -153,6 +162,12 @@ func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient kclient
egressPolicies: make(map[uint32][]osapi.EgressNetworkPolicy),
egressDNS: NewEgressDNS(),
kubeInformers: kubeInformers,

runtimeEndpoint: runtimeEndpoint,
// 2 minutes is the current default value used in kubelet
runtimeRequestTimeout: 2 * time.Minute,
// populated on demand
runtimeService: nil,
}

if err := plugin.dockerPreCNICleanup(); err != nil {
Expand Down Expand Up @@ -329,17 +344,24 @@ func (node *OsdnNode) Start() error {
// FIXME: this should eventually go into kubelet via a CNI UPDATE/CHANGE action
// See https://github.com/containernetworking/cni/issues/89
func (node *OsdnNode) UpdatePod(pod kapi.Pod) error {
filter := &kruntimeapi.PodSandboxFilter{
LabelSelector: map[string]string{ktypes.KubernetesPodUIDLabel: string(pod.UID)},
}
sandboxID, err := node.getPodSandboxID(filter)
if err != nil {
return err
}

req := &cniserver.PodRequest{
Command: cniserver.CNI_UPDATE,
PodNamespace: pod.Namespace,
PodName: pod.Name,
ContainerId: getPodContainerID(&pod),
// netns is read from docker if needed, since we don't get it from kubelet
Result: make(chan *cniserver.PodResult),
SandboxID: sandboxID,
Result: make(chan *cniserver.PodResult),
}

// Send request and wait for the result
_, err := node.podManager.handleCNIRequest(req)
_, err = node.podManager.handleCNIRequest(req)
return err
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/sdn/plugin/ovscontroller.go
Expand Up @@ -239,13 +239,13 @@ func (oc *ovsController) cleanupPodFlows(podIP string) error {
return otx.EndTransaction()
}

func getPodNote(containerID string) (string, error) {
bytes, err := hex.DecodeString(containerID)
func getPodNote(sandboxID string) (string, error) {
bytes, err := hex.DecodeString(sandboxID)
if err != nil {
return "", fmt.Errorf("failed to decode container ID %q: %v", containerID, err)
return "", fmt.Errorf("failed to decode sandbox ID %q: %v", sandboxID, err)
}
if len(bytes) != 32 {
return "", fmt.Errorf("invalid container ID %q length; expected 32 bytes", containerID)
return "", fmt.Errorf("invalid sandbox ID %q length; expected 32 bytes", sandboxID)
}
var note string
for _, b := range bytes {
Expand All @@ -257,8 +257,8 @@ func getPodNote(containerID string) (string, error) {
return note, nil
}

func (oc *ovsController) SetUpPod(hostVeth, podIP, podMAC, containerID string, vnid uint32) (int, error) {
note, err := getPodNote(containerID)
func (oc *ovsController) SetUpPod(hostVeth, podIP, podMAC, sandboxID string, vnid uint32) (int, error) {
note, err := getPodNote(sandboxID)
if err != nil {
return -1, err
}
Expand Down Expand Up @@ -308,8 +308,8 @@ func (oc *ovsController) SetPodBandwidth(hostVeth string, ingressBPS, egressBPS
return nil
}

func getPodDetailsByContainerID(flows []string, containerID string) (int, string, string, string, error) {
note, err := getPodNote(containerID)
func getPodDetailsBySandboxID(flows []string, sandboxID string) (int, string, string, string, error) {
note, err := getPodNote(sandboxID)
if err != nil {
return 0, "", "", "", err
}
Expand Down Expand Up @@ -350,12 +350,12 @@ func getPodDetailsByContainerID(flows []string, containerID string) (int, string
return 0, "", "", "", fmt.Errorf("failed to find pod details from OVS flows")
}

func (oc *ovsController) UpdatePod(containerID string, vnid uint32) error {
func (oc *ovsController) UpdatePod(sandboxID string, vnid uint32) error {
flows, err := oc.ovs.DumpFlows()
if err != nil {
return err
}
ofport, podIP, podMAC, note, err := getPodDetailsByContainerID(flows, containerID)
ofport, podIP, podMAC, note, err := getPodDetailsBySandboxID(flows, sandboxID)
if err != nil {
return err
}
Expand Down
34 changes: 17 additions & 17 deletions pkg/sdn/plugin/ovscontroller_test.go
Expand Up @@ -215,16 +215,16 @@ func TestOVSService(t *testing.T) {
}

const (
containerID string = "bcb5d8d287fcf97458c48ad643b101079e3bc265a94e097e7407440716112f69"
containerNote string = "bc.b5.d8.d2.87.fc.f9.74.58.c4.8a.d6.43.b1.01.07.9e.3b.c2.65.a9.4e.09.7e.74.07.44.07.16.11.2f.69"
containerNoteAction string = "note:" + containerNote
sandboxID string = "bcb5d8d287fcf97458c48ad643b101079e3bc265a94e097e7407440716112f69"
sandboxNote string = "bc.b5.d8.d2.87.fc.f9.74.58.c4.8a.d6.43.b1.01.07.9e.3b.c2.65.a9.4e.09.7e.74.07.44.07.16.11.2f.69"
sandboxNoteAction string = "note:" + sandboxNote
)

func TestOVSPod(t *testing.T) {
ovsif, oc, origFlows := setup(t)

// Add
ofport, err := oc.SetUpPod("veth1", "10.128.0.2", "11:22:33:44:55:66", containerID, 42)
ofport, err := oc.SetUpPod("veth1", "10.128.0.2", "11:22:33:44:55:66", sandboxID, 42)
if err != nil {
t.Fatalf("Unexpected error adding pod rules: %v", err)
}
Expand All @@ -236,7 +236,7 @@ func TestOVSPod(t *testing.T) {
err = assertFlowChanges(origFlows, flows,
flowChange{
kind: flowAdded,
match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", containerNoteAction},
match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", sandboxNoteAction},
},
flowChange{
kind: flowAdded,
Expand All @@ -262,7 +262,7 @@ func TestOVSPod(t *testing.T) {
}

// Update
err = oc.UpdatePod(containerID, 43)
err = oc.UpdatePod(sandboxID, 43)
if err != nil {
t.Fatalf("Unexpected error adding pod rules: %v", err)
}
Expand All @@ -274,7 +274,7 @@ func TestOVSPod(t *testing.T) {
err = assertFlowChanges(origFlows, flows,
flowChange{
kind: flowAdded,
match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", containerNoteAction},
match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", sandboxNoteAction},
},
flowChange{
kind: flowAdded,
Expand Down Expand Up @@ -317,18 +317,18 @@ func TestOVSPod(t *testing.T) {

func TestGetPodDetails(t *testing.T) {
type testcase struct {
containerID string
flows []string
ofport int
ip string
mac string
note string
errStr string
sandboxID string
flows []string
ofport int
ip string
mac string
note string
errStr string
}

testcases := []testcase{
{
containerID: containerID,
sandboxID: sandboxID,
flows: []string{
"cookie=0x0, duration=12.243s, table=0, n_packets=0, n_bytes=0, priority=250,ip,in_port=2,nw_dst=224.0.0.0/4 actions=drop",
"cookie=0x0, duration=12.258s, table=0, n_packets=0, n_bytes=0, priority=200,arp,in_port=1,arp_spa=10.128.0.0/14,arp_tpa=10.130.0.0/23 actions=move:NXM_NX_TUN_ID[0..31]->NXM_NX_REG0[],goto_table:10",
Expand Down Expand Up @@ -388,12 +388,12 @@ func TestGetPodDetails(t *testing.T) {
ofport: 3,
ip: "10.130.0.2",
mac: "4a:77:32:e4:ab:9d",
note: containerNote,
note: sandboxNote,
},
}

for _, tc := range testcases {
ofport, ip, mac, note, err := getPodDetailsByContainerID(tc.flows, tc.containerID)
ofport, ip, mac, note, err := getPodDetailsBySandboxID(tc.flows, tc.sandboxID)
if err != nil {
if tc.errStr != "" {
if !strings.Contains(err.Error(), tc.errStr) {
Expand Down
13 changes: 6 additions & 7 deletions pkg/sdn/plugin/pod_linux.go
Expand Up @@ -235,17 +235,17 @@ func (m *podManager) setup(req *cniserver.PodRequest) (*cnitypes.Result, *runnin
return nil, nil, err
}

ipamResult, err := m.ipamAdd(req.Netns, req.ContainerId)
ipamResult, err := m.ipamAdd(req.Netns, req.SandboxID)
if err != nil {
return nil, nil, fmt.Errorf("failed to run IPAM for %v: %v", req.ContainerId, err)
return nil, nil, fmt.Errorf("failed to run IPAM for %v: %v", req.SandboxID, err)
}
podIP := ipamResult.IP4.IP.IP

// Release any IPAM allocations and hostports if the setup failed
var success bool
defer func() {
if !success {
m.ipamDel(req.ContainerId)
m.ipamDel(req.SandboxID)
if err := m.hostportSyncer.SyncHostports(TUN, m.getRunningPods()); err != nil {
glog.Warningf("failed syncing hostports: %v", err)
}
Expand Down Expand Up @@ -309,7 +309,7 @@ func (m *podManager) setup(req *cniserver.PodRequest) (*cnitypes.Result, *runnin
return nil, nil, err
}

ofport, err := m.ovs.SetUpPod(hostVethName, podIP.String(), contVethMac, req.ContainerId, vnid)
ofport, err := m.ovs.SetUpPod(hostVethName, podIP.String(), contVethMac, req.SandboxID, vnid)
if err != nil {
return nil, nil, err
}
Expand All @@ -329,10 +329,9 @@ func (m *podManager) update(req *cniserver.PodRequest) (uint32, error) {
return 0, err
}

if err := m.ovs.UpdatePod(req.ContainerId, vnid); err != nil {
if err := m.ovs.UpdatePod(req.SandboxID, vnid); err != nil {
return 0, err
}

return vnid, nil
}

Expand All @@ -359,7 +358,7 @@ func (m *podManager) teardown(req *cniserver.PodRequest) error {
}
}

if err := m.ipamDel(req.ContainerId); err != nil {
if err := m.ipamDel(req.SandboxID); err != nil {
errList = append(errList, err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sdn/plugin/pod_test.go
Expand Up @@ -329,7 +329,7 @@ func TestPodManager(t *testing.T) {
Command: op.command,
PodNamespace: op.namespace,
PodName: op.name,
ContainerId: "asd;lfkajsdflkajfs",
SandboxID: "asd;lfkajsdflkajfs",
Netns: "/some/network/namespace",
Result: make(chan *cniserver.PodResult),
}
Expand Down Expand Up @@ -424,7 +424,7 @@ func TestDirectPodUpdate(t *testing.T) {
Command: op.command,
PodNamespace: op.namespace,
PodName: op.name,
ContainerId: "asdfasdfasdfaf",
SandboxID: "asdfasdfasdfaf",
Result: make(chan *cniserver.PodResult),
}

Expand Down
55 changes: 55 additions & 0 deletions pkg/sdn/plugin/runtime.go
@@ -0,0 +1,55 @@
package plugin

import (
"fmt"
"time"

kwait "k8s.io/apimachinery/pkg/util/wait"
kubeletapi "k8s.io/kubernetes/pkg/kubelet/api"
kruntimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
kubeletremote "k8s.io/kubernetes/pkg/kubelet/remote"
)

func (node *OsdnNode) getRuntimeService() (kubeletapi.RuntimeService, error) {
if node.runtimeService != nil {
return node.runtimeService, nil
}

// Kubelet starts asynchronously and when we get an Update op, kubelet may not have created runtime endpoint.
// So try couple of times before bailing out (~30 seconds timeout).
err := kwait.ExponentialBackoff(
kwait.Backoff{
Duration: 100 * time.Millisecond,
Factor: 1.2,
Steps: 23,
},
func() (bool, error) {
runtimeService, err := kubeletremote.NewRemoteRuntimeService(node.runtimeEndpoint, node.runtimeRequestTimeout)
if err != nil {
// Wait longer
return false, nil
}
node.runtimeService = runtimeService
return true, nil
})
if err != nil {
return nil, fmt.Errorf("Failed to fetch runtime service: %v", err)
}
return node.runtimeService, nil
}

func (node *OsdnNode) getPodSandboxID(filter *kruntimeapi.PodSandboxFilter) (string, error) {
runtimeService, err := node.getRuntimeService()
if err != nil {
return "", err
}

podSandboxList, err := runtimeService.ListPodSandbox(filter)
if err != nil {
return "", fmt.Errorf("Failed to list pod sandboxes: %v", err)
}
if len(podSandboxList) == 0 {
return "", fmt.Errorf("Pod sandbox not found for filter: %v", filter)
}
return podSandboxList[0].Id, nil
}

0 comments on commit 79b4411

Please sign in to comment.