Skip to content

Commit

Permalink
Merge pull request #2251 from dcbw/cni-wait-for-pods
Browse files Browse the repository at this point in the history
cni: wait up to 1 second for pods to appear in the API
  • Loading branch information
dcbw committed Jun 8, 2021
2 parents fcadd18 + adaed36 commit 7bd21ac
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 27 deletions.
8 changes: 4 additions & 4 deletions go-controller/pkg/cni/cni.go
Expand Up @@ -97,7 +97,7 @@ func (pr *PodRequest) cmdAdd(podLister corev1listers.PodLister, useOVSExternalID
}
// Get the IP address and MAC address of the pod
// for Smart-Nic, ensure connection-details is present
annotations, err := GetPodAnnotations(pr.ctx, podLister, namespace, podName, annotCondFn)
annotations, err := GetPodAnnotations(pr.ctx, podLister, kclient, namespace, podName, annotCondFn)
if err != nil {
return nil, fmt.Errorf("failed to get pod annotation: %v", err)
}
Expand Down Expand Up @@ -137,7 +137,7 @@ func (pr *PodRequest) cmdDel() ([]byte, error) {
return []byte{}, nil
}

func (pr *PodRequest) cmdCheck(podLister corev1listers.PodLister, useOVSExternalIDs bool) ([]byte, error) {
func (pr *PodRequest) cmdCheck(podLister corev1listers.PodLister, useOVSExternalIDs bool, kclient kubernetes.Interface) ([]byte, error) {
namespace := pr.PodNamespace
podName := pr.PodName
if namespace == "" || podName == "" {
Expand All @@ -149,7 +149,7 @@ func (pr *PodRequest) cmdCheck(podLister corev1listers.PodLister, useOVSExternal
if pr.IsSmartNIC {
annotCondFn = isSmartNICReady
}
annotations, err := GetPodAnnotations(pr.ctx, podLister, pr.PodNamespace, pr.PodName, annotCondFn)
annotations, err := GetPodAnnotations(pr.ctx, podLister, kclient, pr.PodNamespace, pr.PodName, annotCondFn)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func HandleCNIRequest(request *PodRequest, podLister corev1listers.PodLister, us
case CNIDel:
result, err = request.cmdDel()
case CNICheck:
result, err = request.cmdCheck(podLister, useOVSExternalIDs)
result, err = request.cmdCheck(podLister, useOVSExternalIDs, kclient)
default:
}
klog.Infof("%s %s finished CNI request %+v, result %q, err %v", request, request.Command, request, string(result), err)
Expand Down
41 changes: 34 additions & 7 deletions go-controller/pkg/cni/utils.go
Expand Up @@ -8,6 +8,11 @@ import (

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"

kapi "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
corev1listers "k8s.io/client-go/listers/core/v1"
)

Expand Down Expand Up @@ -37,8 +42,20 @@ func isSmartNICReady(podAnnotation map[string]string) bool {
return false
}

// getPod returns a pod from the informer cache or (if that fails) the apiserver
func getPod(podLister corev1listers.PodLister, kclient kubernetes.Interface, namespace, name string) (*kapi.Pod, error) {
pod, err := podLister.Pods(namespace).Get(name)
if apierrors.IsNotFound(err) {
// If the pod wasn't in our local cache, ask for it directly
pod, err = kclient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}
return pod, err
}

// GetPodAnnotations obtains the pod annotation from the cache
func GetPodAnnotations(ctx context.Context, podLister corev1listers.PodLister, namespace, name string, annotCond podAnnotWaitCond) (map[string]string, error) {
func GetPodAnnotations(ctx context.Context, podLister corev1listers.PodLister, kclient kubernetes.Interface, namespace, name string, annotCond podAnnotWaitCond) (map[string]string, error) {
var notFoundCount uint

timeout := time.After(30 * time.Second)
for {
select {
Expand All @@ -47,14 +64,24 @@ func GetPodAnnotations(ctx context.Context, podLister corev1listers.PodLister, n
case <-timeout:
return nil, fmt.Errorf("timed out waiting for annotations")
default:
pod, err := podLister.Pods(namespace).Get(name)
pod, err := getPod(podLister, kclient, namespace, name)
if err != nil {
return nil, fmt.Errorf("failed to get annotations: %v", err)
}
annotations := pod.ObjectMeta.Annotations
if annotCond(annotations) {
return annotations, nil
if !apierrors.IsNotFound(err) {
return nil, fmt.Errorf("failed to get pod for annotations: %v", err)
}
// Allow up to 1 second for pod to be found
notFoundCount++
if notFoundCount >= 5 {
return nil, fmt.Errorf("timed out waiting for pod after 1s: %v", err)
}
// drop through to try again
} else if pod != nil {
annotations := pod.ObjectMeta.Annotations
if annotCond(annotations) {
return annotations, nil
}
}

// try again later
time.Sleep(200 * time.Millisecond)
}
Expand Down
91 changes: 75 additions & 16 deletions go-controller/pkg/cni/utils_test.go
Expand Up @@ -9,19 +9,41 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
"github.com/stretchr/testify/mock"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
"time"
)

func newPod(namespace, name string, annotations map[string]string) *v1.Pod {
if annotations == nil {
annotations = make(map[string]string)
}
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: types.UID(name),
Namespace: namespace,
Annotations: annotations,
},
}
}

func newFakeKubeClientWithPod(pod *v1.Pod) *fake.Clientset {
return fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*pod}})
}

var _ = Describe("CNI Utils tests", func() {
Context("isOvnReady", func() {
It("Returns true if OVN pod network annotation exists", func() {
podAnnot := map[string]string{util.OvnPodAnnotationName: `{
"default":{"ip_addresses":["192.168.2.3/24"],
"mac_address":"0a:58:c0:a8:02:03",
"gateway_ips":["192.168.2.1"],
"ip_address":"192.168.2.3/24",
"gateway_ip":"192.168.2.1"}
}`}
"default":{"ip_addresses":["192.168.2.3/24"],
"mac_address":"0a:58:c0:a8:02:03",
"gateway_ips":["192.168.2.1"],
"ip_address":"192.168.2.3/24",
"gateway_ip":"192.168.2.1"}
}`}
Expect(isOvnReady(podAnnot)).To(Equal(true))
})

Expand Down Expand Up @@ -67,14 +89,13 @@ var _ = Describe("CNI Utils tests", func() {
Context("GetPodAnnotations", func() {
var podLister mocks.PodLister
var podNamespaceLister mocks.PodNamespaceLister
var pod v1.Pod
var pod *v1.Pod

BeforeEach(func() {
podNamespaceLister = mocks.PodNamespaceLister{}
pod = v1.Pod{}
pod = newPod("some-ns", "some-pod", nil)
podLister = mocks.PodLister{}
podLister.On("Pods", mock.AnythingOfType("string")).Return(&podNamespaceLister)

})

It("Returns Pod annotation if annotation condition is met", func() {
Expand All @@ -90,8 +111,9 @@ var _ = Describe("CNI Utils tests", func() {
return false
}

podNamespaceLister.On("Get", mock.AnythingOfType("string")).Return(&pod, nil)
annot, err := GetPodAnnotations(ctx, &podLister, "some-ns", "some-pod", cond)
fakeClient := newFakeKubeClientWithPod(pod)
podNamespaceLister.On("Get", mock.AnythingOfType("string")).Return(pod, nil)
annot, err := GetPodAnnotations(ctx, &podLister, fakeClient, "some-ns", "some-pod", cond)
Expect(err).ToNot(HaveOccurred())
Expect(annot).To(Equal(podAnnot))
})
Expand All @@ -108,8 +130,9 @@ var _ = Describe("CNI Utils tests", func() {
cancelFunc()
}()

podNamespaceLister.On("Get", mock.AnythingOfType("string")).Return(&pod, nil)
_, err := GetPodAnnotations(ctx, &podLister, "some-ns", "some-pod", cond)
fakeClient := newFakeKubeClientWithPod(pod)
podNamespaceLister.On("Get", mock.AnythingOfType("string")).Return(pod, nil)
_, err := GetPodAnnotations(ctx, &podLister, fakeClient, "some-ns", "some-pod", cond)
Expect(err).To(HaveOccurred())
})

Expand All @@ -126,8 +149,9 @@ var _ = Describe("CNI Utils tests", func() {
return false
}

podNamespaceLister.On("Get", mock.AnythingOfType("string")).Return(&pod, nil)
_, err := GetPodAnnotations(ctx, &podLister, "some-ns", "some-pod", cond)
fakeClient := newFakeKubeClientWithPod(pod)
podNamespaceLister.On("Get", mock.AnythingOfType("string")).Return(pod, nil)
_, err := GetPodAnnotations(ctx, &podLister, fakeClient, "some-ns", "some-pod", cond)
Expect(err).ToNot(HaveOccurred())
})

Expand All @@ -139,11 +163,46 @@ var _ = Describe("CNI Utils tests", func() {
return false
}

fakeClient := newFakeKubeClientWithPod(pod)
podNamespaceLister.On("Get", mock.AnythingOfType("string")).Return(nil, fmt.Errorf("failed to list pods"))
_, err := GetPodAnnotations(ctx, &podLister, "some-ns", "some-pod", cond)
_, err := GetPodAnnotations(ctx, &podLister, fakeClient, "some-ns", "some-pod", cond)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("failed to list pods"))
})

It("Tries kube client if PodLister can't find the pod", func() {
ctx, cancelFunc := context.WithTimeout(context.Background(), 1*time.Second)
defer cancelFunc()

calledOnce := false
cond := func(podAnnotation map[string]string) bool {
if calledOnce {
return true
}
calledOnce = true
return false
}

fakeClient := newFakeKubeClientWithPod(pod)
podNamespaceLister.On("Get", mock.AnythingOfType("string")).Return(nil, errors.NewNotFound(v1.Resource("pod"), name))
_, err := GetPodAnnotations(ctx, &podLister, fakeClient, "some-ns", "some-pod", cond)
Expect(err).ToNot(HaveOccurred())
})

It("Returns an error if PodLister and kube client can't find the pod", func() {
ctx, cancelFunc := context.WithTimeout(context.Background(), 1*time.Second)
defer cancelFunc()

cond := func(podAnnotation map[string]string) bool {
return false
}

fakeClient := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{}})
podNamespaceLister.On("Get", mock.AnythingOfType("string")).Return(nil, errors.NewNotFound(v1.Resource("pod"), name))
_, err := GetPodAnnotations(ctx, &podLister, fakeClient, "some-ns", "some-pod", cond)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("timed out waiting for pod after 1s"))
})
})

Context("PodAnnotation2PodInfo", func() {
Expand Down

0 comments on commit 7bd21ac

Please sign in to comment.