Skip to content

Commit

Permalink
fix pod terminating not recycle ip when controller not ready
Browse files Browse the repository at this point in the history
  • Loading branch information
halfcrazy committed Jan 12, 2021
1 parent 6b21078 commit d325e7e
Showing 1 changed file with 71 additions and 35 deletions.
106 changes: 71 additions & 35 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"k8s.io/client-go/kubernetes"
"net"
"reflect"
"strconv"
Expand All @@ -27,6 +28,14 @@ import (
)

func isPodAlive(p *v1.Pod) bool {
if p.DeletionTimestamp != nil && p.DeletionGracePeriodSeconds != nil {
now := time.Now()
deletionTime := p.DeletionTimestamp.Time
gracePeriod := time.Duration(*p.DeletionGracePeriodSeconds) * time.Second
if now.After(deletionTime.Add(gracePeriod)) {
return false
}
}
if p.Status.Phase == v1.PodSucceeded && p.Spec.RestartPolicy != v1.RestartPolicyAlways {
return false
}
Expand Down Expand Up @@ -65,7 +74,16 @@ func (c *Controller) enqueueAddPod(obj interface{}) {
}

if !isPodAlive(p) {
c.deletePodQueue.Add(key)
isStateful, statefulSetName := isStatefulSetPod(p)
if isStateful {
if isStatefulSetPodToDel(c.config.KubeClient, p, statefulSetName) {
klog.V(3).Infof("enqueue delete pod %s", key)
c.deletePodQueue.Add(key)
}
} else {
klog.V(3).Infof("enqueue delete pod %s", key)
c.deletePodQueue.Add(key)
}
return
}

Expand Down Expand Up @@ -101,45 +119,20 @@ func (c *Controller) enqueueDeletePod(obj interface{}) {
for _, np := range c.podMatchNetworkPolicies(p) {
c.updateNpQueue.Add(np)
}
isStateful, statefulSetName := isStatefulSetPod(p)

if p.Spec.HostNetwork {
return
}
if !isStateful {
klog.V(3).Infof("enqueue delete pod %s", key)
c.deletePodQueue.Add(key)
} else {
// only delete statefulset pod lsp when statefulset deleted or down scaled
ss, err := c.config.KubeClient.AppsV1().StatefulSets(p.Namespace).Get(context.Background(), statefulSetName, metav1.GetOptions{})
if err != nil {
// statefulset is deleted
if k8serrors.IsNotFound(err) {
c.deletePodQueue.Add(key)
} else {
klog.Errorf("failed to get statefulset %v", err)
}
return
}

// statefulset is deleting
if ss.DeletionTimestamp != nil {
c.deletePodQueue.Add(key)
return
}

// down scale statefulset
numIndex := len(strings.Split(p.Name, "-")) - 1
numStr := strings.Split(p.Name, "-")[numIndex]
index, err := strconv.ParseInt(numStr, 10, 0)
if err != nil {
klog.Errorf("failed to parse %s to int", numStr)
return
}

if index >= int64(*ss.Spec.Replicas) {
isStateful, statefulSetName := isStatefulSetPod(p)
if isStateful {
if isStatefulSetPodToDel(c.config.KubeClient, p, statefulSetName) {
klog.V(3).Infof("enqueue delete pod %s", key)
c.deletePodQueue.Add(key)
return
}
} else {
klog.V(3).Infof("enqueue delete pod %s", key)
c.deletePodQueue.Add(key)
}
}

Expand Down Expand Up @@ -179,7 +172,16 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) {
}

if !isPodAlive(newPod) {
c.deletePodQueue.Add(key)
isStateful, statefulSetName := isStatefulSetPod(newPod)
if isStateful {
if isStatefulSetPodToDel(c.config.KubeClient, newPod, statefulSetName) {
klog.V(3).Infof("enqueue delete pod %s", key)
c.deletePodQueue.Add(key)
}
} else {
klog.V(3).Infof("enqueue delete pod %s", key)
c.deletePodQueue.Add(key)
}
return
}

Expand Down Expand Up @@ -602,6 +604,40 @@ func isStatefulSetPod(pod *v1.Pod) (bool, string) {
return false, ""
}

func isStatefulSetPodToDel(c kubernetes.Interface, pod *v1.Pod, statefulSetName string) bool {
// only delete statefulset pod lsp when statefulset deleted or down scaled
ss, err := c.AppsV1().StatefulSets(pod.Namespace).Get(context.Background(), statefulSetName, metav1.GetOptions{})
if err != nil {
// statefulset is deleted
if k8serrors.IsNotFound(err) {
return true
} else {
klog.Errorf("failed to get statefulset %v", err)
}
return false
}

// statefulset is deleting
if ss.DeletionTimestamp != nil {
return true
}

// down scale statefulset
numIndex := len(strings.Split(pod.Name, "-")) - 1
numStr := strings.Split(pod.Name, "-")[numIndex]
index, err := strconv.ParseInt(numStr, 10, 0)
if err != nil {
klog.Errorf("failed to parse %s to int", numStr)
return false
}

if index >= int64(*ss.Spec.Replicas) {
return true
}

return false
}

func getNodeTunlIP(node *v1.Node) ([]net.IP, error) {
var nodeTunlIPAddr []net.IP
nodeTunlIP := node.Annotations[util.IpAddressAnnotation]
Expand Down

0 comments on commit d325e7e

Please sign in to comment.