From 160cb5f6705781d6f467c5f2bf2e198ed6f98e51 Mon Sep 17 00:00:00 2001 From: Chun Chen Date: Tue, 4 Aug 2020 14:06:15 +0800 Subject: [PATCH 1/3] clean up --- pkg/ipam/schedulerplugin/floatingip_plugin.go | 24 +++++------------ .../schedulerplugin/floatingip_plugin_test.go | 10 +++---- pkg/ipam/schedulerplugin/resync.go | 26 +------------------ pkg/ipam/schedulerplugin/resync_test.go | 4 +-- pkg/ipam/schedulerplugin/types.go | 5 ---- pkg/ipam/server/server.go | 14 +++++----- 6 files changed, 20 insertions(+), 63 deletions(-) diff --git a/pkg/ipam/schedulerplugin/floatingip_plugin.go b/pkg/ipam/schedulerplugin/floatingip_plugin.go index 969d494d..dc9ef2a5 100644 --- a/pkg/ipam/schedulerplugin/floatingip_plugin.go +++ b/pkg/ipam/schedulerplugin/floatingip_plugin.go @@ -46,10 +46,10 @@ type FloatingIPPlugin struct { nodeSubnet map[string]*net.IPNet nodeSubnetLock sync.Mutex *PluginFactoryArgs - lastIPConf, lastSecondIPConf string - conf *Conf - unreleased chan *releaseEvent - cloudProvider cloudprovider.CloudProvider + lastIPConf string + conf *Conf + unreleased chan *releaseEvent + cloudProvider cloudprovider.CloudProvider // protect unbind immutable deployment pod dpLockPool keymutex.KeyMutex // protect bind/unbind for each pod @@ -93,11 +93,7 @@ func (p *FloatingIPPlugin) Init() error { return fmt.Errorf("failed to get floatingip config from configmap: %v", err) } } - wait.PollInfinite(time.Second, func() (done bool, err error) { - glog.Infof("waiting store ready") - return p.storeReady(), nil - }) - glog.Infof("store is ready, plugin init done") + glog.Infof("plugin init done") return nil } @@ -110,15 +106,9 @@ func (p *FloatingIPPlugin) Run(stop chan struct{}) { } }, time.Minute, stop) } - firstTime := true go wait.Until(func() { - if firstTime { - glog.Infof("start resyncing for the first time") - defer glog.Infof("resyncing complete for the first time") - firstTime = false - } - if err := p.resyncPod(p.ipam); err != nil { - glog.Warningf("[%s] %v", p.ipam.Name(), err) + if err := p.resyncPod(); err != nil { + glog.Warningf("resync pod: %v", err) } p.syncPodIPsIntoDB() }, time.Duration(p.conf.ResyncInterval)*time.Minute, stop) diff --git a/pkg/ipam/schedulerplugin/floatingip_plugin_test.go b/pkg/ipam/schedulerplugin/floatingip_plugin_test.go index 2cd2bd1d..7681e09a 100644 --- a/pkg/ipam/schedulerplugin/floatingip_plugin_test.go +++ b/pkg/ipam/schedulerplugin/floatingip_plugin_test.go @@ -500,11 +500,7 @@ func createPluginFactoryArgs(t *testing.T, objs ...runtime.Object) (*PluginFacto StatefulSetLister: statefulsetInformer.Lister(), DeploymentLister: deploymentInformer.Lister(), Client: client, - PodHasSynced: podInformer.Informer().HasSynced, - StatefulSetSynced: statefulsetInformer.Informer().HasSynced, - DeploymentSynced: deploymentInformer.Informer().HasSynced, PoolLister: poolInformer.Lister(), - PoolSynced: poolInformer.Informer().HasSynced, //TAppClient: tappCli, //TAppHasSynced: tappInformer.Informer().HasSynced, //TAppLister: tappInformer.Lister(), @@ -513,8 +509,10 @@ func createPluginFactoryArgs(t *testing.T, objs ...runtime.Object) (*PluginFacto FIPInformer: FIPInformer, } //tapp.EnsureCRDCreated(pluginArgs.ExtClient) - go informerFactory.Start(stopChan) - go crdInformerFactory.Start(stopChan) + informerFactory.Start(stopChan) + crdInformerFactory.Start(stopChan) + informerFactory.WaitForCacheSync(stopChan) + crdInformerFactory.WaitForCacheSync(stopChan) //go tappInformerFactory.Start(stopChan) return pluginArgs, podInformer, stopChan } diff --git a/pkg/ipam/schedulerplugin/resync.go b/pkg/ipam/schedulerplugin/resync.go index d5bd3cf0..0659886d 100644 --- a/pkg/ipam/schedulerplugin/resync.go +++ b/pkg/ipam/schedulerplugin/resync.go @@ -36,30 +36,6 @@ import ( tappv1 "tkestack.io/tapp/pkg/apis/tappcontroller/v1" ) -func (p *FloatingIPPlugin) storeReady() bool { - if !p.PodHasSynced() { - glog.V(3).Infof("the pod store has not been synced yet") - return false - } - if !p.StatefulSetSynced() { - glog.V(3).Infof("the statefulset store has not been synced yet") - return false - } - if !p.DeploymentSynced() { - glog.V(3).Infof("the deployment store has not been synced yet") - return false - } - if p.TAppHasSynced != nil && !p.TAppHasSynced() { - glog.V(3).Infof("the tapp store has not been synced yet") - return false - } - if !p.PoolSynced() { - glog.V(3).Infof("the pool store has not been synced yet") - return false - } - return true -} - type resyncObj struct { keyObj *util.KeyObj fip floatingip.FloatingIP @@ -71,7 +47,7 @@ type resyncObj struct { // 3. deleted pods whose parent deployment no need so many ips // 4. deleted pods whose parent statefulset/tapp exist but pod index > .spec.replica // 5. existing pods but its status is evicted -func (p *FloatingIPPlugin) resyncPod(ipam floatingip.IPAM) error { +func (p *FloatingIPPlugin) resyncPod() error { glog.V(4).Infof("resync pods+") defer glog.V(4).Infof("resync pods-") resyncMeta := &resyncMeta{ diff --git a/pkg/ipam/schedulerplugin/resync_test.go b/pkg/ipam/schedulerplugin/resync_test.go index 838b5033..62bda332 100644 --- a/pkg/ipam/schedulerplugin/resync_test.go +++ b/pkg/ipam/schedulerplugin/resync_test.go @@ -39,7 +39,7 @@ func TestResyncAppNotExist(t *testing.T) { if err := fipPlugin.ipam.AllocateSpecificIP(pod2Key.KeyInDB, net.ParseIP("10.49.27.216"), parseReleasePolicy(&pod2.ObjectMeta), ""); err != nil { t.Fatal(err) } - if err := fipPlugin.resyncPod(fipPlugin.ipam); err != nil { + if err := fipPlugin.resyncPod(); err != nil { t.Fatal(err) } if err := checkIPKey(fipPlugin.ipam, "10.49.27.205", pod1Key.PoolPrefix()); err != nil { @@ -84,7 +84,7 @@ func TestResyncStsPod(t *testing.T) { if err := fipPlugin.ipam.AllocateSpecificIP(keyObj.KeyInDB, net.ParseIP("10.49.27.205"), parseReleasePolicy(&pod.ObjectMeta), ""); err != nil { t.Fatalf("case %d, err %v", i, err) } - if err := fipPlugin.resyncPod(fipPlugin.ipam); err != nil { + if err := fipPlugin.resyncPod(); err != nil { t.Fatalf("case %d, err %v", i, err) } if err := checkIPKey(fipPlugin.ipam, "10.49.27.205", testCase.expectKeyFunc(keyObj)); err != nil { diff --git a/pkg/ipam/schedulerplugin/types.go b/pkg/ipam/schedulerplugin/types.go index 930d68d6..7b59a964 100644 --- a/pkg/ipam/schedulerplugin/types.go +++ b/pkg/ipam/schedulerplugin/types.go @@ -37,11 +37,6 @@ type PluginFactoryArgs struct { DeploymentLister appv1.DeploymentLister TAppLister v1.TAppLister PoolLister list.PoolLister - PodHasSynced func() bool - StatefulSetSynced func() bool - DeploymentSynced func() bool - TAppHasSynced func() bool - PoolSynced func() bool CrdClient crd_clientset.Interface ExtClient extensionClient.Interface FIPInformer crdInformer.FloatingIPInformer diff --git a/pkg/ipam/server/server.go b/pkg/ipam/server/server.go index 46b138b1..0fdababd 100644 --- a/pkg/ipam/server/server.go +++ b/pkg/ipam/server/server.go @@ -110,11 +110,7 @@ func (s *Server) init() error { DeploymentLister: deploymentInformer.Lister(), Client: s.client, TAppClient: s.tappClient, - PodHasSynced: podInformer.Informer().HasSynced, - StatefulSetSynced: statefulsetInformer.Informer().HasSynced, - DeploymentSynced: deploymentInformer.Informer().HasSynced, PoolLister: poolInformer.Lister(), - PoolSynced: poolInformer.Informer().HasSynced, CrdClient: s.crdClient, ExtClient: s.extensionClient, FIPInformer: fipInformer, @@ -123,7 +119,6 @@ func (s *Server) init() error { s.tappInformerFactory = tappInformers.NewSharedInformerFactory(s.tappClient, time.Minute) tappInformer := s.tappInformerFactory.Tappcontroller().V1().TApps() pluginArgs.TAppLister = tappInformer.Lister() - pluginArgs.TAppHasSynced = tappInformer.Informer().HasSynced } s.plugin, err = schedulerplugin.NewFloatingIPPlugin(s.SchedulePluginConf, pluginArgs) if err != nil { @@ -145,10 +140,13 @@ func (s *Server) Start() error { } func (s *Server) Run() error { - go s.informerFactory.Start(s.stopChan) - go s.crdInformerFactory.Start(s.stopChan) + s.informerFactory.Start(s.stopChan) + s.informerFactory.WaitForCacheSync(s.stopChan) + s.crdInformerFactory.Start(s.stopChan) + s.crdInformerFactory.WaitForCacheSync(s.stopChan) if s.tappInformerFactory != nil { - go s.tappInformerFactory.Start(s.stopChan) + s.tappInformerFactory.Start(s.stopChan) + s.tappInformerFactory.WaitForCacheSync(s.stopChan) } if err := crd.EnsureCRDCreated(s.extensionClient); err != nil { return err From 9902f0cc6979d3f52daf7678945407b30daacc37 Mon Sep 17 00:00:00 2001 From: Chun Chen Date: Tue, 4 Aug 2020 15:10:28 +0800 Subject: [PATCH 2/3] release ip of completed pod --- pkg/ipam/schedulerplugin/event.go | 3 +- pkg/ipam/schedulerplugin/floatingip_plugin.go | 5 ++- .../schedulerplugin/floatingip_plugin_test.go | 39 +++++++++++++++++++ pkg/ipam/schedulerplugin/resync.go | 11 +++--- pkg/ipam/schedulerplugin/resync_test.go | 7 ++++ pkg/ipam/server/server.go | 2 +- 6 files changed, 58 insertions(+), 9 deletions(-) diff --git a/pkg/ipam/schedulerplugin/event.go b/pkg/ipam/schedulerplugin/event.go index e2869db3..c4261ae4 100644 --- a/pkg/ipam/schedulerplugin/event.go +++ b/pkg/ipam/schedulerplugin/event.go @@ -40,9 +40,10 @@ func (p *FloatingIPPlugin) UpdatePod(oldPod, newPod *corev1.Pod) error { if !p.hasResourceName(&newPod.Spec) { return nil } - if !evicted(oldPod) && evicted(newPod) { + if !shouldReleaseIP(oldPod) && shouldReleaseIP(newPod) { // Deployments will leave evicted pods // If it's a evicted one, release its ip + glog.Infof("release ip from %s_%s, phase %s", newPod.Name, newPod.Namespace, string(newPod.Status.Phase)) p.unreleased <- &releaseEvent{pod: newPod} } if err := p.syncPodIP(newPod); err != nil { diff --git a/pkg/ipam/schedulerplugin/floatingip_plugin.go b/pkg/ipam/schedulerplugin/floatingip_plugin.go index dc9ef2a5..c0222fcb 100644 --- a/pkg/ipam/schedulerplugin/floatingip_plugin.go +++ b/pkg/ipam/schedulerplugin/floatingip_plugin.go @@ -437,8 +437,9 @@ func getNodeIP(node *corev1.Node) net.IP { return nil } -func evicted(pod *corev1.Pod) bool { - return pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == "Evicted" +func shouldReleaseIP(pod *corev1.Pod) bool { + return (pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == "Evicted") || + pod.Status.Phase == corev1.PodSucceeded } // getNodeSubnet gets node subnet from ipam diff --git a/pkg/ipam/schedulerplugin/floatingip_plugin_test.go b/pkg/ipam/schedulerplugin/floatingip_plugin_test.go index 7681e09a..14454cfa 100644 --- a/pkg/ipam/schedulerplugin/floatingip_plugin_test.go +++ b/pkg/ipam/schedulerplugin/floatingip_plugin_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" coreInformer "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes/fake" @@ -61,6 +62,14 @@ var ( pod = CreateStatefulSetPod("pod1-0", "ns1", immutableAnnotation) podKey, _ = schedulerplugin_util.FormatKey(pod) node3Subnet = &net.IPNet{IP: net.ParseIP("10.49.27.0"), Mask: net.IPv4Mask(255, 255, 255, 0)} + + toEvictPod = func(pod *corev1.Pod) { + pod.Status.Phase = corev1.PodFailed + pod.Status.Reason = "Evicted" + } + toSuccessPod = func(pod *corev1.Pod) { + pod.Status.Phase = corev1.PodSucceeded + } ) func createPluginTestNodes(t *testing.T, objs ...runtime.Object) (*FloatingIPPlugin, chan struct{}, []corev1.Node) { @@ -795,3 +804,33 @@ func checkBind(fipPlugin *FloatingIPPlugin, pod *corev1.Pod, nodeName, checkKey } return fipInfo, nil } + +func TestReleaseIPOfEvictOrCompletePod(t *testing.T) { + for i, testCase := range []struct { + updatePodStatus func(pod *corev1.Pod) + }{ + {updatePodStatus: toEvictPod}, + {updatePodStatus: toSuccessPod}, + } { + pod := CreateStatefulSetPod("pod1-0", "ns1", nil) + podKey, _ := schedulerplugin_util.FormatKey(pod) + func() { + fipPlugin, stopChan, _ := createPluginTestNodes(t, pod) + fipPlugin.Run(stopChan) + defer func() { stopChan <- struct{}{} }() + fipInfo, err := checkBind(fipPlugin, pod, node3, podKey.KeyInDB, node3Subnet) + if err != nil { + t.Fatalf("case %d: %v", i, err) + } + testCase.updatePodStatus(pod) + if _, err := fipPlugin.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(pod); err != nil { + t.Fatalf("case %d: %v", i, err) + } + if err := wait.Poll(time.Microsecond*10, time.Second*30, func() (done bool, err error) { + return checkIPKey(fipPlugin.ipam, fipInfo.FIP.IP.String(), "") == nil, nil + }); err != nil { + t.Fatalf("case %d: %v", i, err) + } + }() + } +} diff --git a/pkg/ipam/schedulerplugin/resync.go b/pkg/ipam/schedulerplugin/resync.go index 0659886d..8daee804 100644 --- a/pkg/ipam/schedulerplugin/resync.go +++ b/pkg/ipam/schedulerplugin/resync.go @@ -195,14 +195,15 @@ func (p *FloatingIPPlugin) resyncTappOrSts(meta *resyncMeta, keyObj *util.KeyObj } func (p *FloatingIPPlugin) podExist(podName, namespace string) bool { - _, err := p.Client.CoreV1().Pods(namespace).Get(podName, v1.GetOptions{}) + pod, err := p.Client.CoreV1().Pods(namespace).Get(podName, v1.GetOptions{}) if err != nil { if metaErrs.IsNotFound(err) { return false } - // we cannot figure out whether pod exist or not + // we cannot figure out whether pod exist or not, we'd better keep the ip + return true } - return true + return !shouldReleaseIP(pod) } func parsePodIndex(name string) (int, error) { @@ -231,7 +232,7 @@ func (p *FloatingIPPlugin) listWantedPodsToMap() (map[string]*corev1.Pod, error) } existPods := map[string]*corev1.Pod{} for i := range pods { - if evicted(pods[i]) { + if shouldReleaseIP(pods[i]) { // for evicted pod, treat as not exist continue } @@ -260,7 +261,7 @@ func (p *FloatingIPPlugin) syncPodIPsIntoDB() { } // #lizard forgives -// syncPodIP sync pod ip with db, if the pod has ipinfos annotation and the ip is unallocated in db, allocate the ip +// syncPodIP sync pod ip with ipam, if the pod has ipinfos annotation and the ip is unallocated in ipam, allocate the ip // to the pod func (p *FloatingIPPlugin) syncPodIP(pod *corev1.Pod) error { if pod.Status.Phase != corev1.PodRunning { diff --git a/pkg/ipam/schedulerplugin/resync_test.go b/pkg/ipam/schedulerplugin/resync_test.go index 62bda332..7ef688c0 100644 --- a/pkg/ipam/schedulerplugin/resync_test.go +++ b/pkg/ipam/schedulerplugin/resync_test.go @@ -20,6 +20,7 @@ import ( "net" "testing" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" . "tkestack.io/galaxy/pkg/ipam/schedulerplugin/testing" "tkestack.io/galaxy/pkg/ipam/schedulerplugin/util" @@ -55,6 +56,7 @@ func TestResyncStsPod(t *testing.T) { annotations map[string]string replicas int32 createPod bool + updateStatus func(*corev1.Pod) createApp bool expectKeyFunc func(obj *util.KeyObj) string }{ @@ -66,9 +68,14 @@ func TestResyncStsPod(t *testing.T) { {annotations: neverAnnotation, replicas: 0, expectKeyFunc: podNameFunc, createApp: true}, {annotations: neverAnnotation, replicas: 1, expectKeyFunc: podNameFunc, createApp: true}, {annotations: neverAnnotation, replicas: 1, expectKeyFunc: podNameFunc, createApp: false}, + {annotations: nil, replicas: 1, expectKeyFunc: emptyNameFunc, createPod: true, updateStatus: toEvictPod}, // pod evicted, ip will be released + {annotations: nil, replicas: 1, expectKeyFunc: emptyNameFunc, createPod: true, updateStatus: toSuccessPod}, // pod completed, ip will be released } { var objs []runtime.Object pod := CreateStatefulSetPod("sts-xxx-0", "ns1", testCase.annotations) + if testCase.updateStatus != nil { + testCase.updateStatus(pod) + } keyObj, _ := util.FormatKey(pod) if testCase.createPod { objs = append(objs, pod) diff --git a/pkg/ipam/server/server.go b/pkg/ipam/server/server.go index 0fdababd..01e69455 100644 --- a/pkg/ipam/server/server.go +++ b/pkg/ipam/server/server.go @@ -141,8 +141,8 @@ func (s *Server) Start() error { func (s *Server) Run() error { s.informerFactory.Start(s.stopChan) - s.informerFactory.WaitForCacheSync(s.stopChan) s.crdInformerFactory.Start(s.stopChan) + s.informerFactory.WaitForCacheSync(s.stopChan) s.crdInformerFactory.WaitForCacheSync(s.stopChan) if s.tappInformerFactory != nil { s.tappInformerFactory.Start(s.stopChan) From e6e1ca959e79918b9c78b77ca85e4d41a69c080f Mon Sep 17 00:00:00 2001 From: Chun Chen Date: Wed, 5 Aug 2020 14:37:09 +0800 Subject: [PATCH 3/3] release ip from all failed pods and rename some func --- pkg/ipam/schedulerplugin/event.go | 2 +- pkg/ipam/schedulerplugin/floatingip_plugin.go | 6 ++--- .../schedulerplugin/floatingip_plugin_test.go | 7 +++-- pkg/ipam/schedulerplugin/resync.go | 27 +++++++++---------- pkg/ipam/schedulerplugin/resync_test.go | 2 +- 5 files changed, 21 insertions(+), 23 deletions(-) diff --git a/pkg/ipam/schedulerplugin/event.go b/pkg/ipam/schedulerplugin/event.go index c4261ae4..338860c7 100644 --- a/pkg/ipam/schedulerplugin/event.go +++ b/pkg/ipam/schedulerplugin/event.go @@ -40,7 +40,7 @@ func (p *FloatingIPPlugin) UpdatePod(oldPod, newPod *corev1.Pod) error { if !p.hasResourceName(&newPod.Spec) { return nil } - if !shouldReleaseIP(oldPod) && shouldReleaseIP(newPod) { + if !finished(oldPod) && finished(newPod) { // Deployments will leave evicted pods // If it's a evicted one, release its ip glog.Infof("release ip from %s_%s, phase %s", newPod.Name, newPod.Namespace, string(newPod.Status.Phase)) diff --git a/pkg/ipam/schedulerplugin/floatingip_plugin.go b/pkg/ipam/schedulerplugin/floatingip_plugin.go index c0222fcb..387866aa 100644 --- a/pkg/ipam/schedulerplugin/floatingip_plugin.go +++ b/pkg/ipam/schedulerplugin/floatingip_plugin.go @@ -437,9 +437,9 @@ func getNodeIP(node *corev1.Node) net.IP { return nil } -func shouldReleaseIP(pod *corev1.Pod) bool { - return (pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == "Evicted") || - pod.Status.Phase == corev1.PodSucceeded +// finished returns true if pod completes and won't be restarted again +func finished(pod *corev1.Pod) bool { + return pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded } // getNodeSubnet gets node subnet from ipam diff --git a/pkg/ipam/schedulerplugin/floatingip_plugin_test.go b/pkg/ipam/schedulerplugin/floatingip_plugin_test.go index 14454cfa..786b29bd 100644 --- a/pkg/ipam/schedulerplugin/floatingip_plugin_test.go +++ b/pkg/ipam/schedulerplugin/floatingip_plugin_test.go @@ -63,9 +63,8 @@ var ( podKey, _ = schedulerplugin_util.FormatKey(pod) node3Subnet = &net.IPNet{IP: net.ParseIP("10.49.27.0"), Mask: net.IPv4Mask(255, 255, 255, 0)} - toEvictPod = func(pod *corev1.Pod) { + toFailedPod = func(pod *corev1.Pod) { pod.Status.Phase = corev1.PodFailed - pod.Status.Reason = "Evicted" } toSuccessPod = func(pod *corev1.Pod) { pod.Status.Phase = corev1.PodSucceeded @@ -805,11 +804,11 @@ func checkBind(fipPlugin *FloatingIPPlugin, pod *corev1.Pod, nodeName, checkKey return fipInfo, nil } -func TestReleaseIPOfEvictOrCompletePod(t *testing.T) { +func TestReleaseIPOfFinishedPod(t *testing.T) { for i, testCase := range []struct { updatePodStatus func(pod *corev1.Pod) }{ - {updatePodStatus: toEvictPod}, + {updatePodStatus: toFailedPod}, {updatePodStatus: toSuccessPod}, } { pod := CreateStatefulSetPod("pod1-0", "ns1", nil) diff --git a/pkg/ipam/schedulerplugin/resync.go b/pkg/ipam/schedulerplugin/resync.go index 8daee804..e7f027c4 100644 --- a/pkg/ipam/schedulerplugin/resync.go +++ b/pkg/ipam/schedulerplugin/resync.go @@ -65,7 +65,7 @@ func (p *FloatingIPPlugin) resyncPod() error { type resyncMeta struct { allocatedIPs map[string]resyncObj // allocated ips from galaxy pool - existPods map[string]*corev1.Pod + unfinish map[string]*corev1.Pod tappMap map[string]*tappv1.TApp ssMap map[string]*appv1.StatefulSet } @@ -95,7 +95,7 @@ func (p *FloatingIPPlugin) fetchChecklist(meta *resyncMeta) error { func (p *FloatingIPPlugin) fetchAppAndPodMeta(meta *resyncMeta) error { var err error - meta.existPods, err = p.listWantedPodsToMap() + meta.unfinish, err = p.listUnfinishPodsToMap() if err != nil { return err } @@ -115,11 +115,11 @@ func (p *FloatingIPPlugin) resyncAllocatedIPs(meta *resyncMeta) { for key, obj := range meta.allocatedIPs { func() { defer p.lockPod(obj.keyObj.PodName, obj.keyObj.Namespace)() - if _, ok := meta.existPods[key]; ok { + if _, ok := meta.unfinish[key]; ok { return } // check with apiserver to confirm it really not exist - if p.podExist(obj.keyObj.PodName, obj.keyObj.Namespace) { + if p.podRunning(obj.keyObj.PodName, obj.keyObj.Namespace) { return } if p.cloudProvider != nil { @@ -149,7 +149,7 @@ func (p *FloatingIPPlugin) resyncAllocatedIPs(meta *resyncMeta) { releasePolicy := constant.ReleasePolicy(obj.fip.Policy) // we can't get labels of not exist pod, so get them from it's ss or deployment if !obj.keyObj.Deployment() { - p.resyncTappOrSts(meta, obj.keyObj, releasePolicy) + p.resyncNoneDpPod(meta, obj.keyObj, releasePolicy) return } if err := p.unbindDpPod(obj.keyObj, releasePolicy, "during resyncing"); err != nil { @@ -159,7 +159,7 @@ func (p *FloatingIPPlugin) resyncAllocatedIPs(meta *resyncMeta) { } } -func (p *FloatingIPPlugin) resyncTappOrSts(meta *resyncMeta, keyObj *util.KeyObj, releasePolicy constant.ReleasePolicy) { +func (p *FloatingIPPlugin) resyncNoneDpPod(meta *resyncMeta, keyObj *util.KeyObj, releasePolicy constant.ReleasePolicy) { if releasePolicy == constant.ReleasePolicyNever { return } @@ -194,7 +194,7 @@ func (p *FloatingIPPlugin) resyncTappOrSts(meta *resyncMeta, keyObj *util.KeyObj } } -func (p *FloatingIPPlugin) podExist(podName, namespace string) bool { +func (p *FloatingIPPlugin) podRunning(podName, namespace string) bool { pod, err := p.Client.CoreV1().Pods(namespace).Get(podName, v1.GetOptions{}) if err != nil { if metaErrs.IsNotFound(err) { @@ -203,7 +203,7 @@ func (p *FloatingIPPlugin) podExist(podName, namespace string) bool { // we cannot figure out whether pod exist or not, we'd better keep the ip return true } - return !shouldReleaseIP(pod) + return !finished(pod) } func parsePodIndex(name string) (int, error) { @@ -225,24 +225,23 @@ func (p *FloatingIPPlugin) listWantedPods() ([]*corev1.Pod, error) { return filtered, nil } -func (p *FloatingIPPlugin) listWantedPodsToMap() (map[string]*corev1.Pod, error) { +func (p *FloatingIPPlugin) listUnfinishPodsToMap() (map[string]*corev1.Pod, error) { pods, err := p.listWantedPods() if err != nil { return nil, err } - existPods := map[string]*corev1.Pod{} + unfinish := map[string]*corev1.Pod{} for i := range pods { - if shouldReleaseIP(pods[i]) { - // for evicted pod, treat as not exist + if finished(pods[i]) { continue } keyObj, err := util.FormatKey(pods[i]) if err != nil { continue } - existPods[keyObj.KeyInDB] = pods[i] + unfinish[keyObj.KeyInDB] = pods[i] } - return existPods, nil + return unfinish, nil } // syncPodIPs sync all pods' ips with db, if a pod has PodIP and its ip is unallocated, allocate the ip to it diff --git a/pkg/ipam/schedulerplugin/resync_test.go b/pkg/ipam/schedulerplugin/resync_test.go index 7ef688c0..9cb7d29b 100644 --- a/pkg/ipam/schedulerplugin/resync_test.go +++ b/pkg/ipam/schedulerplugin/resync_test.go @@ -68,7 +68,7 @@ func TestResyncStsPod(t *testing.T) { {annotations: neverAnnotation, replicas: 0, expectKeyFunc: podNameFunc, createApp: true}, {annotations: neverAnnotation, replicas: 1, expectKeyFunc: podNameFunc, createApp: true}, {annotations: neverAnnotation, replicas: 1, expectKeyFunc: podNameFunc, createApp: false}, - {annotations: nil, replicas: 1, expectKeyFunc: emptyNameFunc, createPod: true, updateStatus: toEvictPod}, // pod evicted, ip will be released + {annotations: nil, replicas: 1, expectKeyFunc: emptyNameFunc, createPod: true, updateStatus: toFailedPod}, // pod failed, ip will be released {annotations: nil, replicas: 1, expectKeyFunc: emptyNameFunc, createPod: true, updateStatus: toSuccessPod}, // pod completed, ip will be released } { var objs []runtime.Object