Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1996201: Fixes cases of timed out while waiting for OVS port binding #686

Merged
merged 6 commits into from Sep 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 35 additions & 12 deletions go-controller/pkg/cni/cni.go
Expand Up @@ -77,7 +77,23 @@ func (pr *PodRequest) String() string {
return fmt.Sprintf("[%s/%s %s]", pr.PodNamespace, pr.PodName, pr.SandboxID)
}

func (pr *PodRequest) cmdAdd(podLister corev1listers.PodLister, useOVSExternalIDs bool, kclient kubernetes.Interface) ([]byte, error) {
// checkOrUpdatePodUID reconciles the UID read from the pod object (podUID)
// with the UID already recorded on the request.
//
//   - If the request carries no UID, the runtime did not pass one; the given
//     UID is adopted for the remainder of the request.
//   - If the request's UID equals the given UID, nothing changes.
//   - Otherwise the sandbox request belongs to a different instance of the
//     pod (it was deleted and re-created), so an error is returned and the
//     request's UID is left untouched.
func (pr *PodRequest) checkOrUpdatePodUID(podUID string) error {
	switch pr.PodUID {
	case "":
		// Runtime gave us nothing to compare against; trust the pod object.
		pr.PodUID = podUID
		return nil
	case podUID:
		// Same pod instance the sandbox was created for.
		return nil
	default:
		// The pod was deleted and recreated already; bail out early.
		return fmt.Errorf("pod deleted before sandbox %v operation began", pr.Command)
	}
}

func (pr *PodRequest) cmdAdd(kubeAuth *KubeAPIAuth, podLister corev1listers.PodLister, useOVSExternalIDs bool, kclient kubernetes.Interface) ([]byte, error) {
namespace := pr.PodNamespace
podName := pr.PodName
if namespace == "" || podName == "" {
Expand All @@ -97,19 +113,22 @@ func (pr *PodRequest) cmdAdd(podLister corev1listers.PodLister, useOVSExternalID
}
// Get the IP address and MAC address of the pod
// for Smart-Nic, ensure connection-details is present
annotations, err := GetPodAnnotations(pr.ctx, podLister, namespace, podName, annotCondFn)
podUID, annotations, err := GetPodAnnotations(pr.ctx, podLister, kclient, namespace, podName, annotCondFn)
if err != nil {
return nil, fmt.Errorf("failed to get pod annotation: %v", err)
}
if err := pr.checkOrUpdatePodUID(podUID); err != nil {
return nil, err
}

podInterfaceInfo, err := PodAnnotation2PodInfo(annotations, useOVSExternalIDs, pr.IsSmartNIC)
if err != nil {
return nil, err
}

response := &Response{}
response := &Response{KubeAuth: kubeAuth}
if !config.UnprivilegedMode {
response.Result, err = pr.getCNIResult(podInterfaceInfo)
response.Result, err = pr.getCNIResult(podLister, kclient, podInterfaceInfo)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -137,7 +156,7 @@ func (pr *PodRequest) cmdDel() ([]byte, error) {
return []byte{}, nil
}

func (pr *PodRequest) cmdCheck(podLister corev1listers.PodLister, useOVSExternalIDs bool) ([]byte, error) {
func (pr *PodRequest) cmdCheck(podLister corev1listers.PodLister, useOVSExternalIDs bool, kclient kubernetes.Interface) ([]byte, error) {
namespace := pr.PodNamespace
podName := pr.PodName
if namespace == "" || podName == "" {
Expand All @@ -149,10 +168,13 @@ func (pr *PodRequest) cmdCheck(podLister corev1listers.PodLister, useOVSExternal
if pr.IsSmartNIC {
annotCondFn = isSmartNICReady
}
annotations, err := GetPodAnnotations(pr.ctx, podLister, pr.PodNamespace, pr.PodName, annotCondFn)
podUID, annotations, err := GetPodAnnotations(pr.ctx, podLister, kclient, pr.PodNamespace, pr.PodName, annotCondFn)
if err != nil {
return nil, err
}
if err := pr.checkOrUpdatePodUID(podUID); err != nil {
return nil, err
}

if pr.CNIConf.PrevResult != nil {
result, err := current.NewResultFromResult(pr.CNIConf.PrevResult)
Expand All @@ -176,7 +198,8 @@ func (pr *PodRequest) cmdCheck(podLister corev1listers.PodLister, useOVSExternal
}
for _, ip := range result.IPs {
if err = waitForPodInterface(pr.ctx, result.Interfaces[*ip.Interface].Mac, []*net.IPNet{&ip.Address},
hostIfaceName, ifaceID, ofPort, useOVSExternalIDs); err != nil {
hostIfaceName, ifaceID, ofPort, useOVSExternalIDs, podLister, kclient, pr.PodNamespace, pr.PodName,
pr.PodUID); err != nil {
return nil, fmt.Errorf("error while waiting on OVN pod interface: %s ip: %v, error: %v", ifaceID, ip, err)
}
}
Expand Down Expand Up @@ -206,18 +229,18 @@ func (pr *PodRequest) cmdCheck(podLister corev1listers.PodLister, useOVSExternal
// Argument '*PodRequest' encapsulates all the necessary information
// kclient is passed in so that clientset can be reused from the server
// Return value is the actual bytes to be sent back without further processing.
func HandleCNIRequest(request *PodRequest, podLister corev1listers.PodLister, useOVSExternalIDs bool, kclient kubernetes.Interface) ([]byte, error) {
func HandleCNIRequest(request *PodRequest, podLister corev1listers.PodLister, useOVSExternalIDs bool, kclient kubernetes.Interface, kubeAuth *KubeAPIAuth) ([]byte, error) {
var result []byte
var err error

klog.Infof("%s %s starting CNI request %+v", request, request.Command, request)
switch request.Command {
case CNIAdd:
result, err = request.cmdAdd(podLister, useOVSExternalIDs, kclient)
result, err = request.cmdAdd(kubeAuth, podLister, useOVSExternalIDs, kclient)
case CNIDel:
result, err = request.cmdDel()
case CNICheck:
result, err = request.cmdCheck(podLister, useOVSExternalIDs)
result, err = request.cmdCheck(podLister, useOVSExternalIDs, kclient)
default:
}
klog.Infof("%s %s finished CNI request %+v, result %q, err %v", request, request.Command, request, string(result), err)
Expand All @@ -230,8 +253,8 @@ func HandleCNIRequest(request *PodRequest, podLister corev1listers.PodLister, us
}

// getCNIResult gets the CNI result from the pod interface info.
func (pr *PodRequest) getCNIResult(podInterfaceInfo *PodInterfaceInfo) (*current.Result, error) {
interfacesArray, err := pr.ConfigureInterface(pr.PodNamespace, pr.PodName, podInterfaceInfo)
func (pr *PodRequest) getCNIResult(podLister corev1listers.PodLister, kclient kubernetes.Interface, podInterfaceInfo *PodInterfaceInfo) (*current.Result, error) {
interfacesArray, err := pr.ConfigureInterface(podLister, kclient, podInterfaceInfo)
if err != nil {
return nil, fmt.Errorf("failed to configure pod interface: %v", err)
}
Expand Down
104 changes: 28 additions & 76 deletions go-controller/pkg/cni/cniserver.go
Expand Up @@ -2,6 +2,7 @@ package cni

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -11,10 +12,8 @@ import (
"time"

"github.com/gorilla/mux"
kapi "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
Expand Down Expand Up @@ -69,13 +68,22 @@ func NewCNIServer(rundir string, useOVSExternalIDs bool, factory factory.NodeWat
Server: http.Server{
Handler: router,
},
rundir: rundir,
useOVSExternalIDs: ovnPortBinding,
podLister: corev1listers.NewPodLister(factory.LocalPodInformer().GetIndexer()),
kclient: kclient,
runningSandboxAdds: make(map[string]*PodRequest),
mode: config.OvnKubeNode.Mode,
rundir: rundir,
useOVSExternalIDs: ovnPortBinding,
podLister: corev1listers.NewPodLister(factory.LocalPodInformer().GetIndexer()),
kclient: kclient,
mode: config.OvnKubeNode.Mode,
kubeAuth: &KubeAPIAuth{
Kubeconfig: config.Kubernetes.Kubeconfig,
KubeAPIServer: config.Kubernetes.APIServer,
KubeAPIToken: config.Kubernetes.Token,
},
}

if len(config.Kubernetes.CAData) > 0 {
s.kubeAuth.KubeCAData = base64.StdEncoding.EncodeToString(config.Kubernetes.CAData)
}

router.NotFoundHandler = http.HandlerFunc(http.NotFound)
router.HandleFunc("/metrics", s.handleCNIMetrics).Methods("POST")
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -92,43 +100,9 @@ func NewCNIServer(rundir string, useOVSExternalIDs bool, factory factory.NodeWat
}
}).Methods("POST")

factory.AddPodHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
pod := obj.(*kapi.Pod)
s.cancelOldestPodAdd(pod)
},
}, nil)

return s, nil
}

// cancelOldestPodAdd cancels the earliest outstanding sandbox ADD request
// whose namespace and name match the given pod, if there is one.
//
// A Pod Namespace+Name may have zero or more sandboxes. Kubelet defers
// sandbox deletion to GC, and if a pod is deleted and re-created kubelet
// will start a second sandbox for the "new" Pod which has a different UID.
// Only the oldest sandbox is canceled because it is either the only sandbox
// or has been superseded by a newer request.
func (s *Server) cancelOldestPodAdd(pod *kapi.Pod) {
	s.runningSandboxAddsLock.Lock()
	defer s.runningSandboxAddsLock.Unlock()

	var (
		candidate *PodRequest
		cutoff    = time.Now()
	)
	for _, pending := range s.runningSandboxAdds {
		// Skip requests that belong to some other pod.
		if pending.PodNamespace != pod.Namespace || pending.PodName != pod.Name {
			continue
		}
		// Keep only requests strictly older than the best seen so far.
		if !pending.timestamp.Before(cutoff) {
			continue
		}
		candidate, cutoff = pending, pending.timestamp
	}

	if candidate == nil {
		return
	}
	candidate.cancel()
	klog.Infof("%s canceled sandbox ADD request", candidate)
}

// Split the "CNI_ARGS" environment variable's value into a map. CNI_ARGS
// contains arbitrary key/value pairs separated by ';' and is for runtime or
// plugin specific uses. Kubernetes passes the pod namespace and name in
Expand All @@ -150,7 +124,7 @@ func gatherCNIArgs(env map[string]string) (map[string]string, error) {
return mapArgs, nil
}

func cniRequestToPodRequest(cr *Request) (*PodRequest, error) {
func cniRequestToPodRequest(cr *Request, podLister corev1listers.PodLister, kclient kubernetes.Interface) (*PodRequest, error) {
cmd, ok := cr.Env["CNI_COMMAND"]
if !ok {
return nil, fmt.Errorf("unexpected or missing CNI_COMMAND")
Expand Down Expand Up @@ -189,6 +163,15 @@ func cniRequestToPodRequest(cr *Request) (*PodRequest, error) {
return nil, fmt.Errorf("missing K8S_POD_NAME")
}

// UID may not be passed by all runtimes yet. Will be passed
// by CRIO 1.20+ and containerd 1.5+ soon.
// CRIO 1.20: https://github.com/cri-o/cri-o/pull/5029
// CRIO 1.21: https://github.com/cri-o/cri-o/pull/5028
// CRIO 1.22: https://github.com/cri-o/cri-o/pull/5026
// containerd 1.6: https://github.com/containerd/containerd/pull/5640
// containerd 1.5: https://github.com/containerd/containerd/pull/5643
req.PodUID = cniArgs["K8S_POD_UID"]

conf, err := config.ReadCNIConfig(cr.Config)
if err != nil {
return nil, fmt.Errorf("broken stdin args")
Expand All @@ -200,32 +183,6 @@ func cniRequestToPodRequest(cr *Request) (*PodRequest, error) {
return req, nil
}

// startSandboxRequest records an in-flight sandbox ADD request, keyed by its
// sandbox ID. It returns an error if an ADD for the same sandbox ID is already
// recorded; requests for any other command are accepted without being tracked.
//
// Only sandbox ADD requests are tracked because only adds need to be canceled
// when the pod is deleted. Delete requests should run to completion to clean
// up anything an earlier add already configured.
func (s *Server) startSandboxRequest(req *PodRequest) error {
	if req.Command != CNIAdd {
		return nil
	}

	s.runningSandboxAddsLock.Lock()
	defer s.runningSandboxAddsLock.Unlock()

	if _, inFlight := s.runningSandboxAdds[req.SandboxID]; inFlight {
		// Should never happen: the runtime is required to serialize
		// operations for the same sandbox.
		return fmt.Errorf("%s ADD already started", req)
	}
	s.runningSandboxAdds[req.SandboxID] = req
	return nil
}

// finishSandboxRequest removes a sandbox ADD request from the set of running
// adds, keyed by its sandbox ID. Requests for any other command are ignored,
// and removing an ID that is not present is a no-op.
func (s *Server) finishSandboxRequest(req *PodRequest) {
	if req.Command != CNIAdd {
		return
	}

	s.runningSandboxAddsLock.Lock()
	defer s.runningSandboxAddsLock.Unlock()

	delete(s.runningSandboxAdds, req.SandboxID)
}

// Dispatch a pod request to the request handler and return the result to the
// CNI server client
func (s *Server) handleCNIRequest(r *http.Request) ([]byte, error) {
Expand All @@ -234,24 +191,19 @@ func (s *Server) handleCNIRequest(r *http.Request) ([]byte, error) {
if err := json.Unmarshal(b, &cr); err != nil {
return nil, err
}
req, err := cniRequestToPodRequest(&cr)
req, err := cniRequestToPodRequest(&cr, s.podLister, s.kclient)
if err != nil {
return nil, err
}
if s.mode == types.NodeModeSmartNICHost {
req.IsSmartNIC = true
}

if err := s.startSandboxRequest(req); err != nil {
return nil, err
}
defer s.finishSandboxRequest(req)

useOVSExternalIDs := false
if atomic.LoadInt32(&s.useOVSExternalIDs) > 0 {
useOVSExternalIDs = true
}
result, err := s.requestFunc(req, s.podLister, useOVSExternalIDs, s.kclient)
result, err := s.requestFunc(req, s.podLister, useOVSExternalIDs, s.kclient, s.kubeAuth)
if err != nil {
// Prefix error with request information for easier debugging
return nil, fmt.Errorf("%s %v", req, err)
Expand Down