diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 43173fac696..3fcc87d5d7d 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -9,6 +9,7 @@ import ( "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" + k8sv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -63,6 +64,7 @@ type Controller struct { podsSynced cache.InformerSynced addOrUpdatePodQueue workqueue.RateLimitingInterface deletePodQueue workqueue.RateLimitingInterface + deletingPodObjMap map[string]*k8sv1.Pod updatePodSecurityQueue workqueue.RateLimitingInterface podKeyMutex keymutex.KeyMutex @@ -394,6 +396,7 @@ func Run(ctx context.Context, config *Configuration) { workqueue.NewNamedDelayingQueue("DeletePod"), workqueue.DefaultControllerRateLimiter(), ), + deletingPodObjMap: make(map[string]*k8sv1.Pod), updatePodSecurityQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePodSecurity"), podKeyMutex: keymutex.NewHashed(numKeyLocks), diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 991620b0d13..78fd98f7f19 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -150,6 +150,10 @@ func isPodAlive(p *v1.Pod) bool { return false } } + return isPodStatusPhaseAlive(p) +} + +func isPodStatusPhaseAlive(p *v1.Pod) bool { if p.Status.Phase == v1.PodSucceeded && p.Spec.RestartPolicy != v1.RestartPolicyAlways { return false } @@ -194,15 +198,18 @@ func (c *Controller) enqueueAddPod(obj interface{}) { if isStateful || (isVmPod && c.config.EnableKeepVmIP) { if isStateful && isStatefulSetPodToDel(c.config.KubeClient, p, statefulSetName) { klog.V(3).Infof("enqueue delete pod %s", key) - c.deletePodQueue.Add(obj) + c.deletingPodObjMap[key] = p + c.deletePodQueue.Add(key) } if isVmPod && c.isVmPodToDel(p, vmName) { klog.V(3).Infof("enqueue delete pod %s", key) - c.deletePodQueue.Add(obj) + c.deletingPodObjMap[key] = p + c.deletePodQueue.Add(key) } } else { klog.V(3).Infof("enqueue delete pod %s", key) - c.deletePodQueue.Add(obj) + c.deletingPodObjMap[key] = p + c.deletePodQueue.Add(key) } return } @@ -232,7 +239,8 @@ func (c *Controller) enqueueDeletePod(obj interface{}) { } klog.V(3).Infof("enqueue delete pod %s", key) - c.deletePodQueue.Add(obj) + c.deletingPodObjMap[key] = p + c.deletePodQueue.Add(key) } func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) { @@ -283,9 +291,10 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) { isStateful, statefulSetName := isStatefulSetPod(newPod) isVmPod, vmName := isVmPod(newPod) - if !isPodAlive(newPod) && !isStateful && !isVmPod { + if !isPodStatusPhaseAlive(newPod) && !isStateful && !isVmPod { klog.V(3).Infof("enqueue delete pod %s", key) - c.deletePodQueue.Add(newObj) + c.deletingPodObjMap[key] = newPod + c.deletePodQueue.Add(key) return } @@ -304,7 +313,8 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) { // In case node get lost and pod can not be deleted, // the ip address will not be recycled klog.V(3).Infof("enqueue delete pod %s after %v", key, delay) - c.deletePodQueue.AddAfter(newObj, delay) + c.deletingPodObjMap[key] = newPod + c.deletePodQueue.AddAfter(key, delay) }() return } @@ -313,14 +323,16 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) { if isStateful && isStatefulSetPodToDel(c.config.KubeClient, newPod, statefulSetName) { go func() { klog.V(3).Infof("enqueue delete pod %s after %v", key, delay) - c.deletePodQueue.AddAfter(newObj, delay) + c.deletingPodObjMap[key] = newPod + c.deletePodQueue.AddAfter(key, delay) }() return } if isVmPod && c.isVmPodToDel(newPod, vmName) { go func() { klog.V(3).Infof("enqueue delete pod %s after %v", key, delay) - c.deletePodQueue.AddAfter(newObj, delay) + c.deletingPodObjMap[key] = newPod + c.deletePodQueue.AddAfter(key, delay) }() return } @@ -400,20 +412,30 @@ func (c *Controller) processNextDeletePodWorkItem() bool { now := time.Now() err := func(obj interface{}) error { defer c.deletePodQueue.Done(obj) - var pod *v1.Pod + var key string var ok bool - if pod, ok = obj.(*v1.Pod); !ok { + if key, ok = obj.(string); !ok { c.deletePodQueue.Forget(obj) - utilruntime.HandleError(fmt.Errorf("expected pod in workqueue but got %#v", obj)) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + _, exist := c.deletingPodObjMap[key] + if !exist { return nil } - if err := c.handleDeletePod(pod); err != nil { - c.deletePodQueue.AddRateLimited(obj) - return fmt.Errorf("error syncing '%s': %s, requeuing", pod.Name, err.Error()) + + if err := c.handleDeletePod(key); err != nil { + c.deletePodQueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) } c.deletePodQueue.Forget(obj) last := time.Since(now) - klog.Infof("take %d ms to handle delete pod %s/%s", last.Milliseconds(), pod.Namespace, pod.Name) + klog.Infof("take %d ms to handle delete pod %s", last.Milliseconds(), key) + // gc pod obj in c.deletingPodObjMap + go func() { + time.Sleep(5 * time.Minute) + delete(c.deletingPodObjMap, key) + }() return nil }(obj) @@ -685,7 +707,9 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca if k8serrors.IsNotFound(err) { // Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod. // Then we need to recycle the resource again. - c.deletePodQueue.AddRateLimited(pod) + key := strings.Join([]string{namespace, name}, "/") + c.deletingPodObjMap[key] = pod + c.deletePodQueue.AddRateLimited(key) return nil, nil } klog.Errorf("patch pod %s/%s failed: %v", name, namespace, err) @@ -829,7 +853,9 @@ func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodN if k8serrors.IsNotFound(err) { // Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod. // Then we need to recycle the resource again. - c.deletePodQueue.AddRateLimited(pod) + key := strings.Join([]string{namespace, name}, "/") + c.deletingPodObjMap[key] = pod + c.deletePodQueue.AddRateLimited(key) return nil } klog.Errorf("patch pod %s/%s failed %v", name, namespace, err) @@ -838,9 +864,9 @@ func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodN return nil } -func (c *Controller) handleDeletePod(pod *v1.Pod) error { +func (c *Controller) handleDeletePod(key string) error { + pod := c.deletingPodObjMap[key] podName := c.getNameByPod(pod) - key := fmt.Sprintf("%s/%s", pod.Namespace, podName) c.podKeyMutex.LockKey(key) defer func() { _ = c.podKeyMutex.UnlockKey(key) }() klog.Infof("handle delete pod %s", key)