Skip to content
Permalink
Browse files

Caches all pod delegates json for pods deletion without k8s info

This fixes #243 with following changes:
 + Optimize to fetch Pod from k8s client
 + Change to use cache always in DEL.
 + If failed to fetch the pod info from k8s clinet in deletion,
  use cached delegates as emergency bailout
 + Add test cases for cache
  • Loading branch information...
s1061123 committed Jan 25, 2019
1 parent f0a43ca commit 5dc774a5471acfa8a6c43deb4750ff4730ce1fc0
Showing with 375 additions and 118 deletions.
  1. +50 −63 k8sclient/k8sclient.go
  2. +31 −9 k8sclient/k8sclient_test.go
  3. +56 −40 multus/multus.go
  4. +233 −6 multus/multus_test.go
  5. +5 −0 testing/testing.go
@@ -37,8 +37,9 @@ import (
)

const (
resourceNameAnnot = "k8s.v1.cni.cncf.io/resourceName"
defaultNetAnnot = "v1.multus-cni.io/default-network"
resourceNameAnnot = "k8s.v1.cni.cncf.io/resourceName"
defaultNetAnnot = "v1.multus-cni.io/default-network"
networkAttachmentAnnot = "k8s.v1.cni.cncf.io/networks"
)

// NoK8sNetworkError indicates error, no network in kubernetes
@@ -80,12 +81,14 @@ func setKubeClientInfo(c *clientInfo, client KubeClient, k8sArgs *types.K8sArgs)
c.Podname = string(k8sArgs.K8S_POD_NAME)
}

func SetNetworkStatus(k *clientInfo, netStatus []*types.NetworkStatus) error {
func SetNetworkStatus(client KubeClient, k8sArgs *types.K8sArgs, netStatus []*types.NetworkStatus) error {

logging.Debugf("SetNetworkStatus: %v, %v", k, netStatus)
pod, err := k.Client.GetPod(k.Podnamespace, k.Podname)
logging.Debugf("SetNetworkStatus: %v, %v, %v", client, k8sArgs, netStatus)
podName := string(k8sArgs.K8S_POD_NAME)
podNamespace := string(k8sArgs.K8S_POD_NAMESPACE)
pod, err := client.GetPod(podNamespace, podName)
if err != nil {
return logging.Errorf("SetNetworkStatus: failed to query the pod %v in out of cluster comm: %v", k.Podname, err)
return logging.Errorf("SetNetworkStatus: failed to query the pod %v in out of cluster comm: %v", podName, err)
}

var networkStatuses string
@@ -101,9 +104,9 @@ func SetNetworkStatus(k *clientInfo, netStatus []*types.NetworkStatus) error {

networkStatuses = fmt.Sprintf("[%s]", strings.Join(networkStatus, ","))
}
_, err = setPodNetworkAnnotation(k.Client, k.Podnamespace, pod, networkStatuses)
_, err = setPodNetworkAnnotation(client, podNamespace, pod, networkStatuses)
if err != nil {
return logging.Errorf("SetNetworkStatus: failed to update the pod %v in out of cluster comm: %v", k.Podname, err)
return logging.Errorf("SetNetworkStatus: failed to update the pod %v in out of cluster comm: %v", podName, err)
}

return nil
@@ -137,18 +140,6 @@ func setPodNetworkAnnotation(client KubeClient, namespace string, pod *v1.Pod, n
return pod, nil
}

func getPodNetworkAnnotation(client KubeClient, k8sArgs *types.K8sArgs) (string, string, string, error) {
var err error

logging.Debugf("getPodNetworkAnnotation: %v, %v", client, k8sArgs)
pod, err := client.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
if err != nil {
return "", "", "", logging.Errorf("getPodNetworkAnnotation: failed to query the pod %v in out of cluster comm: %v", string(k8sArgs.K8S_POD_NAME), err)
}

return pod.Annotations["k8s.v1.cni.cncf.io/networks"], pod.ObjectMeta.Namespace, string(pod.UID), nil
}

func parsePodNetworkObjectName(podnetwork string) (string, string, string, error) {
var netNsName string
var netIfName string
@@ -427,8 +418,14 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient
}

setKubeClientInfo(clientInfo, kubeClient, k8sArgs)
// Get the pod info. If cannot get it, we use cached delegates
pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
if err != nil {
logging.Debugf("tryLoadK8sDelegates: Err in loading K8s cluster default network from pod annotation: %v, use cached delegates", err)
return 0, nil, nil
}

delegate, err := tryLoadK8sPodDefaultNetwork(k8sArgs, conf, kubeClient)
delegate, err := tryLoadK8sPodDefaultNetwork(kubeClient, pod, conf)
if err != nil {
return 0, nil, logging.Errorf("tryLoadK8sDelegates: Err in loading K8s cluster default network from pod annotation: %v", err)
}
@@ -438,19 +435,23 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient
conf.Delegates[0] = delegate
}

delegates, err := GetPodNetwork(kubeClient, k8sArgs, conf.ConfDir, conf.NamespaceIsolation)
if err != nil {
if _, ok := err.(*NoK8sNetworkError); ok {
return 0, clientInfo, nil
networks, err := GetPodNetwork(pod)
if networks != nil {
delegates, err := GetNetworkDelegates(kubeClient, pod, networks, conf.ConfDir, conf.NamespaceIsolation)

if err != nil {
if _, ok := err.(*NoK8sNetworkError); ok {
return 0, clientInfo, nil
}
return 0, nil, logging.Errorf("Multus: Err in getting k8s network from pod: %v", err)
}
return 0, nil, logging.Errorf("Multus: Err in getting k8s network from pod: %v", err)
}

if err = conf.AddDelegates(delegates); err != nil {
return 0, nil, err
if err = conf.AddDelegates(delegates); err != nil {
return 0, nil, err
}
return len(delegates), clientInfo, nil
}

return len(delegates), clientInfo, nil
return 0, clientInfo, nil
}

func GetK8sClient(kubeconfig string, kubeClient KubeClient) (KubeClient, error) {
@@ -495,17 +496,11 @@ func GetK8sClient(kubeconfig string, kubeClient KubeClient) (KubeClient, error)
return &defaultKubeClient{client: client}, nil
}

func GetPodNetwork(k8sclient KubeClient, k8sArgs *types.K8sArgs, confdir string, confnamespaceisolation bool) ([]*types.DelegateNetConf, error) {
logging.Debugf("GetPodNetwork: %v, %v, %v", k8sclient, k8sArgs, confdir)
func GetPodNetwork(pod *v1.Pod) ([]*types.NetworkSelectionElement, error) {
logging.Debugf("GetPodNetwork: %v", pod)

netAnnot, defaultNamespace, podID, err := getPodNetworkAnnotation(k8sclient, k8sArgs)
if err != nil {
return nil, err
}

if err != nil {
return nil, logging.Errorf("GetK8sNetwork: failed to get resourceMap for PodUID: %v %v", podID, err)
}
netAnnot := pod.Annotations[networkAttachmentAnnot]
defaultNamespace := pod.ObjectMeta.Namespace

if len(netAnnot) == 0 {
return nil, &NoK8sNetworkError{"no kubernetes network found"}
@@ -515,24 +510,31 @@ func GetPodNetwork(k8sclient KubeClient, k8sArgs *types.K8sArgs, confdir string,
if err != nil {
return nil, err
}
return networks, nil
}

func GetNetworkDelegates(k8sclient KubeClient, pod *v1.Pod, networks []*types.NetworkSelectionElement, confdir string, confnamespaceIsolation bool) ([]*types.DelegateNetConf, error) {
logging.Debugf("GetNetworkDelegates: %v, %v, %v, %v, %v", k8sclient, pod, networks, confdir, confnamespaceIsolation)
// resourceMap holds Pod device allocation information; only initizized if CRD contains 'resourceName' annotation.
// This will only be initialized once and all delegate objects can reference this to look up device info.
var resourceMap map[string]*types.ResourceInfo

// Read all network objects referenced by 'networks'
var delegates []*types.DelegateNetConf
defaultNamespace := pod.ObjectMeta.Namespace

podID := pod.UID
for _, net := range networks {

// The pods namespace (stored as defaultNamespace, does not equal the annotation's target namespace in net.Namespace)
// In the case that this is a mismatch when namespaceisolation is enabled, this should be an error.
if confnamespaceisolation {
if confnamespaceIsolation {
if defaultNamespace != net.Namespace {
return nil, logging.Errorf("GetPodNetwork: namespace isolation violation: podnamespace: %v / target namespace: %v", defaultNamespace, net.Namespace)
}
}

delegate, updatedResourceMap, err := getKubernetesDelegate(k8sclient, net, confdir, podID, resourceMap)
delegate, updatedResourceMap, err := getKubernetesDelegate(k8sclient, net, confdir, string(podID), resourceMap)
if err != nil {
return nil, logging.Errorf("GetPodNetwork: failed getting the delegate: %v", err)
}
@@ -655,28 +657,13 @@ func GetDefaultNetworks(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient
return nil
}

func getPodDefaultNetworkAnnotation(client KubeClient, k8sArgs *types.K8sArgs) (string, error) {
logging.Debugf("getPodDefaultNetworkAnnotation: %v, %v", client, k8sArgs)
pod, err := client.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
if err != nil {
return "", logging.Errorf("getPodDefaultNetworkAnnotation: failed to query the pod %v in out of cluster comm: %v", string(k8sArgs.K8S_POD_NAME), err)
}

if v, ok := pod.Annotations[defaultNetAnnot]; ok {
return v, nil
}
return "", nil
}

// tryLoadK8sPodDefaultNetwork get pod default network from annotations
func tryLoadK8sPodDefaultNetwork(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient KubeClient) (*types.DelegateNetConf, error) {
logging.Debugf("tryLoadK8sPodDefaultNetwork: %v, %v", kubeClient, k8sArgs)
func tryLoadK8sPodDefaultNetwork(kubeClient KubeClient, pod *v1.Pod, conf *types.NetConf) (*types.DelegateNetConf, error) {
var netAnnot string
logging.Debugf("tryLoadK8sPodDefaultNetwork: %v, %v, %v", kubeClient, pod, conf)

netAnnot, err := getPodDefaultNetworkAnnotation(kubeClient, k8sArgs)
if err != nil {
return nil, logging.Errorf("tryLoadK8sPodDefaultNetwork: failed to get pod annotation: %v", err)
}
if netAnnot == "" {
netAnnot, ok := pod.Annotations[defaultNetAnnot]
if !ok {
logging.Debugf("tryLoadK8sPodDefaultNetwork: Pod default network annotation is not defined")
return nil, nil
}
@@ -82,7 +82,10 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).NotTo(HaveOccurred())
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false)
pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false)
Expect(err).NotTo(HaveOccurred())
Expect(fKubeClient.PodCount).To(Equal(1))
Expect(fKubeClient.NetCount).To(Equal(2))
@@ -115,7 +118,10 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).NotTo(HaveOccurred())
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false)
pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false)
Expect(len(delegates)).To(Equal(0))
Expect(err).To(MatchError("GetPodNetwork: failed getting the delegate: getKubernetesDelegate: failed to get network resource, refer Multus README.md for the usage guide: resource not found"))
})
@@ -159,7 +165,10 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).NotTo(HaveOccurred())
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false)
pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false)
Expect(err).NotTo(HaveOccurred())
Expect(fKubeClient.PodCount).To(Equal(1))
Expect(fKubeClient.NetCount).To(Equal(3))
@@ -186,8 +195,9 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).NotTo(HaveOccurred())
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false)
Expect(len(delegates)).To(Equal(0))
pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(len(networks)).To(Equal(0))
Expect(err).To(MatchError("parsePodNetworkAnnotation: failed to parse pod Network Attachment Selection Annotation JSON format: invalid character 'a' looking for beginning of value"))
})

@@ -216,7 +226,10 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).NotTo(HaveOccurred())
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false)
pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false)
Expect(err).NotTo(HaveOccurred())
Expect(fKubeClient.PodCount).To(Equal(1))
Expect(fKubeClient.NetCount).To(Equal(2))
@@ -242,7 +255,10 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).NotTo(HaveOccurred())
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false)
pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false)
Expect(err).NotTo(HaveOccurred())
Expect(fKubeClient.PodCount).To(Equal(1))
Expect(fKubeClient.NetCount).To(Equal(1))
@@ -273,7 +289,10 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).NotTo(HaveOccurred())
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false)
pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false)
Expect(len(delegates)).To(Equal(0))
Expect(err).To(MatchError(fmt.Sprintf("GetPodNetwork: failed getting the delegate: cniConfigFromNetworkResource: err in getCNIConfigFromFile: Error loading CNI config file %s: error parsing configuration: invalid character 'a' looking for beginning of value", net2Name)))
})
@@ -584,7 +603,10 @@ var _ = Describe("k8sclient operations", func() {
k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred())

_, err = GetPodNetwork(kubeClient, k8sArgs, tmpDir, netConf.NamespaceIsolation)
pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(err).NotTo(HaveOccurred())
_, err = GetNetworkDelegates(kubeClient, pod, networks, tmpDir, netConf.NamespaceIsolation)
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError("GetPodNetwork: namespace isolation violation: podnamespace: test / target namespace: kube-system"))

Oops, something went wrong.

0 comments on commit 5dc774a

Please sign in to comment.
You can’t perform that action at this time.