Skip to content

Commit

Permalink
fix handedeletePod repeat 4 times (#2789)
Browse files Browse the repository at this point in the history
* fix handedeletePod repeat 4 times

* refactor
  • Loading branch information
changluyi committed May 11, 2023
1 parent c8af3dd commit 8f43028
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 20 deletions.
3 changes: 3 additions & 0 deletions pkg/controller/controller.go
Expand Up @@ -9,6 +9,7 @@ import (

"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
k8sv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -63,6 +64,7 @@ type Controller struct {
podsSynced cache.InformerSynced
addOrUpdatePodQueue workqueue.RateLimitingInterface
deletePodQueue workqueue.RateLimitingInterface
deletingPodObjMap map[string]*k8sv1.Pod
updatePodSecurityQueue workqueue.RateLimitingInterface
podKeyMutex keymutex.KeyMutex

Expand Down Expand Up @@ -394,6 +396,7 @@ func Run(ctx context.Context, config *Configuration) {
workqueue.NewNamedDelayingQueue("DeletePod"),
workqueue.DefaultControllerRateLimiter(),
),
deletingPodObjMap: make(map[string]*k8sv1.Pod),
updatePodSecurityQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePodSecurity"),
podKeyMutex: keymutex.NewHashed(numKeyLocks),

Expand Down
66 changes: 46 additions & 20 deletions pkg/controller/pod.go
Expand Up @@ -150,6 +150,10 @@ func isPodAlive(p *v1.Pod) bool {
return false
}
}
return isPodStatusPhaseAlive(p)
}

func isPodStatusPhaseAlive(p *v1.Pod) bool {
if p.Status.Phase == v1.PodSucceeded && p.Spec.RestartPolicy != v1.RestartPolicyAlways {
return false
}
Expand Down Expand Up @@ -194,15 +198,18 @@ func (c *Controller) enqueueAddPod(obj interface{}) {
if isStateful || (isVmPod && c.config.EnableKeepVmIP) {
if isStateful && isStatefulSetPodToDel(c.config.KubeClient, p, statefulSetName) {
klog.V(3).Infof("enqueue delete pod %s", key)
c.deletePodQueue.Add(obj)
c.deletingPodObjMap[key] = p
c.deletePodQueue.Add(key)
}
if isVmPod && c.isVmPodToDel(p, vmName) {
klog.V(3).Infof("enqueue delete pod %s", key)
c.deletePodQueue.Add(obj)
c.deletingPodObjMap[key] = p
c.deletePodQueue.Add(key)
}
} else {
klog.V(3).Infof("enqueue delete pod %s", key)
c.deletePodQueue.Add(obj)
c.deletingPodObjMap[key] = p
c.deletePodQueue.Add(key)
}
return
}
Expand Down Expand Up @@ -232,7 +239,8 @@ func (c *Controller) enqueueDeletePod(obj interface{}) {
}

klog.V(3).Infof("enqueue delete pod %s", key)
c.deletePodQueue.Add(obj)
c.deletingPodObjMap[key] = p
c.deletePodQueue.Add(key)
}

func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) {
Expand Down Expand Up @@ -283,9 +291,10 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) {

isStateful, statefulSetName := isStatefulSetPod(newPod)
isVmPod, vmName := isVmPod(newPod)
if !isPodAlive(newPod) && !isStateful && !isVmPod {
if !isPodStatusPhaseAlive(newPod) && !isStateful && !isVmPod {
klog.V(3).Infof("enqueue delete pod %s", key)
c.deletePodQueue.Add(newObj)
c.deletingPodObjMap[key] = newPod
c.deletePodQueue.Add(key)
return
}

Expand All @@ -304,7 +313,8 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) {
// In case node get lost and pod can not be deleted,
// the ip address will not be recycled
klog.V(3).Infof("enqueue delete pod %s after %v", key, delay)
c.deletePodQueue.AddAfter(newObj, delay)
c.deletingPodObjMap[key] = newPod
c.deletePodQueue.AddAfter(key, delay)
}()
return
}
Expand All @@ -313,14 +323,16 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) {
if isStateful && isStatefulSetPodToDel(c.config.KubeClient, newPod, statefulSetName) {
go func() {
klog.V(3).Infof("enqueue delete pod %s after %v", key, delay)
c.deletePodQueue.AddAfter(newObj, delay)
c.deletingPodObjMap[key] = newPod
c.deletePodQueue.AddAfter(key, delay)
}()
return
}
if isVmPod && c.isVmPodToDel(newPod, vmName) {
go func() {
klog.V(3).Infof("enqueue delete pod %s after %v", key, delay)
c.deletePodQueue.AddAfter(newObj, delay)
c.deletingPodObjMap[key] = newPod
c.deletePodQueue.AddAfter(key, delay)
}()
return
}
Expand Down Expand Up @@ -400,20 +412,30 @@ func (c *Controller) processNextDeletePodWorkItem() bool {
now := time.Now()
err := func(obj interface{}) error {
defer c.deletePodQueue.Done(obj)
var pod *v1.Pod
var key string
var ok bool
if pod, ok = obj.(*v1.Pod); !ok {
if key, ok = obj.(string); !ok {
c.deletePodQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected pod in workqueue but got %#v", obj))
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
_, exist := c.deletingPodObjMap[key]
if !exist {
return nil
}
if err := c.handleDeletePod(pod); err != nil {
c.deletePodQueue.AddRateLimited(obj)
return fmt.Errorf("error syncing '%s': %s, requeuing", pod.Name, err.Error())

if err := c.handleDeletePod(key); err != nil {
c.deletePodQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.deletePodQueue.Forget(obj)
last := time.Since(now)
klog.Infof("take %d ms to handle delete pod %s/%s", last.Milliseconds(), pod.Namespace, pod.Name)
klog.Infof("take %d ms to handle delete pod %s", last.Milliseconds(), key)
// gc pod obj in c.deletingPodObjMap
go func() {
time.Sleep(5 * time.Minute)
delete(c.deletingPodObjMap, key)
}()
return nil
}(obj)

Expand Down Expand Up @@ -685,7 +707,9 @@ func (c *Controller) reconcileAllocateSubnets(cachedPod, pod *v1.Pod, needAlloca
if k8serrors.IsNotFound(err) {
// Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod.
// Then we need to recycle the resource again.
c.deletePodQueue.AddRateLimited(pod)
key := strings.Join([]string{namespace, name}, "/")
c.deletingPodObjMap[key] = pod
c.deletePodQueue.AddRateLimited(key)
return nil, nil
}
klog.Errorf("patch pod %s/%s failed: %v", name, namespace, err)
Expand Down Expand Up @@ -829,7 +853,9 @@ func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodN
if k8serrors.IsNotFound(err) {
// Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod.
// Then we need to recycle the resource again.
c.deletePodQueue.AddRateLimited(pod)
key := strings.Join([]string{namespace, name}, "/")
c.deletingPodObjMap[key] = pod
c.deletePodQueue.AddRateLimited(key)
return nil
}
klog.Errorf("patch pod %s/%s failed %v", name, namespace, err)
Expand All @@ -838,9 +864,9 @@ func (c *Controller) reconcileRouteSubnets(cachedPod, pod *v1.Pod, needRoutePodN
return nil
}

func (c *Controller) handleDeletePod(pod *v1.Pod) error {
func (c *Controller) handleDeletePod(key string) error {
pod := c.deletingPodObjMap[key]
podName := c.getNameByPod(pod)
key := fmt.Sprintf("%s/%s", pod.Namespace, podName)
c.podKeyMutex.LockKey(key)
defer func() { _ = c.podKeyMutex.UnlockKey(key) }()
klog.Infof("handle delete pod %s", key)
Expand Down

0 comments on commit 8f43028

Please sign in to comment.