release ip of completed pod #81

Merged 3 commits on Aug 5, 2020
3 changes: 2 additions & 1 deletion pkg/ipam/schedulerplugin/event.go
@@ -40,9 +40,10 @@ func (p *FloatingIPPlugin) UpdatePod(oldPod, newPod *corev1.Pod) error {
if !p.hasResourceName(&newPod.Spec) {
return nil
}
if !evicted(oldPod) && evicted(newPod) {
if !finished(oldPod) && finished(newPod) {
// Deployments will leave evicted pods
// If it's a evicted one, release its ip
glog.Infof("release ip from %s_%s, phase %s", newPod.Name, newPod.Namespace, string(newPod.Status.Phase))
p.unreleased <- &releaseEvent{pod: newPod}
}
if err := p.syncPodIP(newPod); err != nil {
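The hunk above widens the release trigger in UpdatePod from evicted pods only to any pod that has reached a terminal phase. Below is a minimal, self-contained sketch of the new predicate and the transition check it enables; onPodUpdate and the reduced releaseEvent type are sketch-only stand-ins, not the plugin's actual handler.

package sketch

import corev1 "k8s.io/api/core/v1"

// releaseEvent mirrors the plugin's internal event type, reduced for this sketch.
type releaseEvent struct{ pod *corev1.Pod }

// finished reports whether a pod has reached a terminal phase and will not be
// restarted, matching the predicate this PR introduces in place of evicted().
func finished(pod *corev1.Pod) bool {
	return pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded
}

// onPodUpdate enqueues a release exactly once: only on the transition from
// "not finished" to "finished", so later status updates don't re-enqueue it.
func onPodUpdate(oldPod, newPod *corev1.Pod, unreleased chan<- *releaseEvent) {
	if !finished(oldPod) && finished(newPod) {
		unreleased <- &releaseEvent{pod: newPod}
	}
}
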
29 changes: 10 additions & 19 deletions pkg/ipam/schedulerplugin/floatingip_plugin.go
@@ -46,10 +46,10 @@ type FloatingIPPlugin struct {
nodeSubnet map[string]*net.IPNet
nodeSubnetLock sync.Mutex
*PluginFactoryArgs
lastIPConf, lastSecondIPConf string
conf *Conf
unreleased chan *releaseEvent
cloudProvider cloudprovider.CloudProvider
lastIPConf string
conf *Conf
unreleased chan *releaseEvent
cloudProvider cloudprovider.CloudProvider
// protect unbind immutable deployment pod
dpLockPool keymutex.KeyMutex
// protect bind/unbind for each pod
@@ -93,11 +93,7 @@ func (p *FloatingIPPlugin) Init() error {
return fmt.Errorf("failed to get floatingip config from configmap: %v", err)
}
}
wait.PollInfinite(time.Second, func() (done bool, err error) {
glog.Infof("waiting store ready")
return p.storeReady(), nil
})
glog.Infof("store is ready, plugin init done")
glog.Infof("plugin init done")
return nil
}

@@ -110,15 +106,9 @@ func (p *FloatingIPPlugin) Run(stop chan struct{}) {
}
}, time.Minute, stop)
}
firstTime := true
go wait.Until(func() {
if firstTime {
glog.Infof("start resyncing for the first time")
defer glog.Infof("resyncing complete for the first time")
firstTime = false
}
if err := p.resyncPod(p.ipam); err != nil {
glog.Warningf("[%s] %v", p.ipam.Name(), err)
if err := p.resyncPod(); err != nil {
glog.Warningf("resync pod: %v", err)
}
p.syncPodIPsIntoDB()
}, time.Duration(p.conf.ResyncInterval)*time.Minute, stop)
@@ -447,8 +437,9 @@ func getNodeIP(node *corev1.Node) net.IP {
return nil
}

func evicted(pod *corev1.Pod) bool {
return pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == "Evicted"
// finished returns true if the pod has completed and won't be restarted again
func finished(pod *corev1.Pod) bool {
return pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded
}

// getNodeSubnet gets node subnet from ipam
48 changes: 42 additions & 6 deletions pkg/ipam/schedulerplugin/floatingip_plugin_test.go
@@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreInformer "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
@@ -61,6 +62,13 @@ var (
pod = CreateStatefulSetPod("pod1-0", "ns1", immutableAnnotation)
podKey, _ = schedulerplugin_util.FormatKey(pod)
node3Subnet = &net.IPNet{IP: net.ParseIP("10.49.27.0"), Mask: net.IPv4Mask(255, 255, 255, 0)}

toFailedPod = func(pod *corev1.Pod) {
pod.Status.Phase = corev1.PodFailed
}
toSuccessPod = func(pod *corev1.Pod) {
pod.Status.Phase = corev1.PodSucceeded
}
)

func createPluginTestNodes(t *testing.T, objs ...runtime.Object) (*FloatingIPPlugin, chan struct{}, []corev1.Node) {
@@ -500,11 +508,7 @@ func createPluginFactoryArgs(t *testing.T, objs ...runtime.Object) (*PluginFacto
StatefulSetLister: statefulsetInformer.Lister(),
DeploymentLister: deploymentInformer.Lister(),
Client: client,
PodHasSynced: podInformer.Informer().HasSynced,
StatefulSetSynced: statefulsetInformer.Informer().HasSynced,
DeploymentSynced: deploymentInformer.Informer().HasSynced,
PoolLister: poolInformer.Lister(),
PoolSynced: poolInformer.Informer().HasSynced,
//TAppClient: tappCli,
//TAppHasSynced: tappInformer.Informer().HasSynced,
//TAppLister: tappInformer.Lister(),
@@ -513,8 +517,10 @@
FIPInformer: FIPInformer,
}
//tapp.EnsureCRDCreated(pluginArgs.ExtClient)
go informerFactory.Start(stopChan)
go crdInformerFactory.Start(stopChan)
informerFactory.Start(stopChan)
crdInformerFactory.Start(stopChan)
informerFactory.WaitForCacheSync(stopChan)
crdInformerFactory.WaitForCacheSync(stopChan)
//go tappInformerFactory.Start(stopChan)
return pluginArgs, podInformer, stopChan
}
@@ -797,3 +803,33 @@ func checkBind(fipPlugin *FloatingIPPlugin, pod *corev1.Pod, nodeName, checkKey
}
return fipInfo, nil
}

func TestReleaseIPOfFinishedPod(t *testing.T) {
for i, testCase := range []struct {
updatePodStatus func(pod *corev1.Pod)
}{
{updatePodStatus: toFailedPod},
{updatePodStatus: toSuccessPod},
} {
pod := CreateStatefulSetPod("pod1-0", "ns1", nil)
podKey, _ := schedulerplugin_util.FormatKey(pod)
func() {
fipPlugin, stopChan, _ := createPluginTestNodes(t, pod)
fipPlugin.Run(stopChan)
defer func() { stopChan <- struct{}{} }()
fipInfo, err := checkBind(fipPlugin, pod, node3, podKey.KeyInDB, node3Subnet)
if err != nil {
t.Fatalf("case %d: %v", i, err)
}
testCase.updatePodStatus(pod)
if _, err := fipPlugin.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(pod); err != nil {
t.Fatalf("case %d: %v", i, err)
}
if err := wait.Poll(time.Microsecond*10, time.Second*30, func() (done bool, err error) {
return checkIPKey(fipPlugin.ipam, fipInfo.FIP.IP.String(), "") == nil, nil
}); err != nil {
t.Fatalf("case %d: %v", i, err)
}
}()
}
}
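The release asserted by TestReleaseIPOfFinishedPod is asynchronous: the update handler only enqueues a releaseEvent, so the test polls the IPAM instead of checking right after UpdateStatus. A stand-alone sketch of that polling idiom with the same wait package follows; the simulated release goroutine is illustrative only.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	released := make(chan struct{})
	go func() { // stand-in for the plugin's asynchronous release worker
		time.Sleep(50 * time.Millisecond)
		close(released)
	}()

	// Poll with a short interval and an overall timeout instead of sleeping a
	// fixed amount; in the real test the condition is checkIPKey(...) == nil.
	err := wait.Poll(10*time.Millisecond, 5*time.Second, func() (bool, error) {
		select {
		case <-released:
			return true, nil
		default:
			return false, nil
		}
	})
	fmt.Println("released in time:", err == nil)
}
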
60 changes: 18 additions & 42 deletions pkg/ipam/schedulerplugin/resync.go
@@ -36,30 +36,6 @@ import (
tappv1 "tkestack.io/tapp/pkg/apis/tappcontroller/v1"
)

func (p *FloatingIPPlugin) storeReady() bool {
if !p.PodHasSynced() {
glog.V(3).Infof("the pod store has not been synced yet")
return false
}
if !p.StatefulSetSynced() {
glog.V(3).Infof("the statefulset store has not been synced yet")
return false
}
if !p.DeploymentSynced() {
glog.V(3).Infof("the deployment store has not been synced yet")
return false
}
if p.TAppHasSynced != nil && !p.TAppHasSynced() {
glog.V(3).Infof("the tapp store has not been synced yet")
return false
}
if !p.PoolSynced() {
glog.V(3).Infof("the pool store has not been synced yet")
return false
}
return true
}

type resyncObj struct {
keyObj *util.KeyObj
fip floatingip.FloatingIP
@@ -71,7 +47,7 @@ type resyncObj struct {
// 3. deleted pods whose parent deployment no need so many ips
// 4. deleted pods whose parent statefulset/tapp exist but pod index > .spec.replica
// 5. existing pods but its status is evicted
func (p *FloatingIPPlugin) resyncPod(ipam floatingip.IPAM) error {
func (p *FloatingIPPlugin) resyncPod() error {
glog.V(4).Infof("resync pods+")
defer glog.V(4).Infof("resync pods-")
resyncMeta := &resyncMeta{
@@ -89,7 +65,7 @@ func (p *FloatingIPPlugin) resyncPod(ipam floatingip.IPAM) error {

type resyncMeta struct {
allocatedIPs map[string]resyncObj // allocated ips from galaxy pool
existPods map[string]*corev1.Pod
unfinish map[string]*corev1.Pod
tappMap map[string]*tappv1.TApp
ssMap map[string]*appv1.StatefulSet
}
Expand Down Expand Up @@ -119,7 +95,7 @@ func (p *FloatingIPPlugin) fetchChecklist(meta *resyncMeta) error {

func (p *FloatingIPPlugin) fetchAppAndPodMeta(meta *resyncMeta) error {
var err error
meta.existPods, err = p.listWantedPodsToMap()
meta.unfinish, err = p.listUnfinishPodsToMap()
if err != nil {
return err
}
@@ -139,11 +115,11 @@ func (p *FloatingIPPlugin) resyncAllocatedIPs(meta *resyncMeta) {
for key, obj := range meta.allocatedIPs {
func() {
defer p.lockPod(obj.keyObj.PodName, obj.keyObj.Namespace)()
if _, ok := meta.existPods[key]; ok {
if _, ok := meta.unfinish[key]; ok {
return
}
// check with apiserver to confirm it really not exist
if p.podExist(obj.keyObj.PodName, obj.keyObj.Namespace) {
if p.podRunning(obj.keyObj.PodName, obj.keyObj.Namespace) {
return
}
if p.cloudProvider != nil {
@@ -173,7 +149,7 @@ func (p *FloatingIPPlugin) resyncAllocatedIPs(meta *resyncMeta) {
releasePolicy := constant.ReleasePolicy(obj.fip.Policy)
// we can't get labels of not exist pod, so get them from it's ss or deployment
if !obj.keyObj.Deployment() {
p.resyncTappOrSts(meta, obj.keyObj, releasePolicy)
p.resyncNoneDpPod(meta, obj.keyObj, releasePolicy)
return
}
if err := p.unbindDpPod(obj.keyObj, releasePolicy, "during resyncing"); err != nil {
@@ -183,7 +159,7 @@ func (p *FloatingIPPlugin) resyncAllocatedIPs(meta *resyncMeta) {
}
}

func (p *FloatingIPPlugin) resyncTappOrSts(meta *resyncMeta, keyObj *util.KeyObj, releasePolicy constant.ReleasePolicy) {
func (p *FloatingIPPlugin) resyncNoneDpPod(meta *resyncMeta, keyObj *util.KeyObj, releasePolicy constant.ReleasePolicy) {
if releasePolicy == constant.ReleasePolicyNever {
return
}
@@ -218,15 +194,16 @@ func (p *FloatingIPPlugin) resyncTappOrSts(meta *resyncMeta, keyObj *util.KeyObj
}
}

func (p *FloatingIPPlugin) podExist(podName, namespace string) bool {
_, err := p.Client.CoreV1().Pods(namespace).Get(podName, v1.GetOptions{})
func (p *FloatingIPPlugin) podRunning(podName, namespace string) bool {
pod, err := p.Client.CoreV1().Pods(namespace).Get(podName, v1.GetOptions{})
if err != nil {
if metaErrs.IsNotFound(err) {
return false
}
// we cannot figure out whether pod exist or not
// we cannot figure out whether the pod exists or not, so we'd better keep the ip
return true
}
return true
return !finished(pod)
}

func parsePodIndex(name string) (int, error) {
@@ -248,24 +225,23 @@ func (p *FloatingIPPlugin) listWantedPods() ([]*corev1.Pod, error) {
return filtered, nil
}

func (p *FloatingIPPlugin) listWantedPodsToMap() (map[string]*corev1.Pod, error) {
func (p *FloatingIPPlugin) listUnfinishPodsToMap() (map[string]*corev1.Pod, error) {
pods, err := p.listWantedPods()
if err != nil {
return nil, err
}
existPods := map[string]*corev1.Pod{}
unfinish := map[string]*corev1.Pod{}
for i := range pods {
if evicted(pods[i]) {
// for evicted pod, treat as not exist
if finished(pods[i]) {
continue
}
keyObj, err := util.FormatKey(pods[i])
if err != nil {
continue
}
existPods[keyObj.KeyInDB] = pods[i]
unfinish[keyObj.KeyInDB] = pods[i]
}
return existPods, nil
return unfinish, nil
}

// syncPodIPs sync all pods' ips with db, if a pod has PodIP and its ip is unallocated, allocate the ip to it
@@ -284,7 +260,7 @@ func (p *FloatingIPPlugin) syncPodIPsIntoDB() {
}

// #lizard forgives
// syncPodIP sync pod ip with db, if the pod has ipinfos annotation and the ip is unallocated in db, allocate the ip
// syncPodIP sync pod ip with ipam, if the pod has ipinfos annotation and the ip is unallocated in ipam, allocate the ip
// to the pod
func (p *FloatingIPPlugin) syncPodIP(pod *corev1.Pod) error {
if pod.Status.Phase != corev1.PodRunning {
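Taken together, the resync changes reduce to one rule: keep an allocated IP only while its pod still exists and has not finished; everything else becomes a release candidate, still subject to the release policy and parent-workload checks above. A small self-contained sketch of that rule follows; keepIP is a hypothetical condensation of podRunning and listUnfinishPodsToMap, not a function in this codebase.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// finished matches the terminal-phase predicate introduced by this PR.
func finished(pod *corev1.Pod) bool {
	return pod.Status.Phase == corev1.PodFailed || pod.Status.Phase == corev1.PodSucceeded
}

// keepIP condenses the resync rule: an IP stays bound only to a pod that
// still exists and is not in a terminal phase.
func keepIP(pod *corev1.Pod, exists bool) bool {
	return exists && !finished(pod)
}

func main() {
	running := &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodRunning}}
	succeeded := &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodSucceeded}}
	fmt.Println(keepIP(running, true))   // true: pod still running, keep its ip
	fmt.Println(keepIP(succeeded, true)) // false: completed pod, release its ip
	fmt.Println(keepIP(nil, false))      // false: pod is gone, release its ip
}
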
11 changes: 9 additions & 2 deletions pkg/ipam/schedulerplugin/resync_test.go
@@ -20,6 +20,7 @@ import (
"net"
"testing"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
. "tkestack.io/galaxy/pkg/ipam/schedulerplugin/testing"
"tkestack.io/galaxy/pkg/ipam/schedulerplugin/util"
@@ -39,7 +40,7 @@ func TestResyncAppNotExist(t *testing.T) {
if err := fipPlugin.ipam.AllocateSpecificIP(pod2Key.KeyInDB, net.ParseIP("10.49.27.216"), parseReleasePolicy(&pod2.ObjectMeta), ""); err != nil {
t.Fatal(err)
}
if err := fipPlugin.resyncPod(fipPlugin.ipam); err != nil {
if err := fipPlugin.resyncPod(); err != nil {
t.Fatal(err)
}
if err := checkIPKey(fipPlugin.ipam, "10.49.27.205", pod1Key.PoolPrefix()); err != nil {
@@ -55,6 +56,7 @@ func TestResyncStsPod(t *testing.T) {
annotations map[string]string
replicas int32
createPod bool
updateStatus func(*corev1.Pod)
createApp bool
expectKeyFunc func(obj *util.KeyObj) string
}{
@@ -66,9 +68,14 @@
{annotations: neverAnnotation, replicas: 0, expectKeyFunc: podNameFunc, createApp: true},
{annotations: neverAnnotation, replicas: 1, expectKeyFunc: podNameFunc, createApp: true},
{annotations: neverAnnotation, replicas: 1, expectKeyFunc: podNameFunc, createApp: false},
{annotations: nil, replicas: 1, expectKeyFunc: emptyNameFunc, createPod: true, updateStatus: toFailedPod}, // pod failed, ip will be released
{annotations: nil, replicas: 1, expectKeyFunc: emptyNameFunc, createPod: true, updateStatus: toSuccessPod}, // pod completed, ip will be released
} {
var objs []runtime.Object
pod := CreateStatefulSetPod("sts-xxx-0", "ns1", testCase.annotations)
if testCase.updateStatus != nil {
testCase.updateStatus(pod)
}
keyObj, _ := util.FormatKey(pod)
if testCase.createPod {
objs = append(objs, pod)
@@ -84,7 +91,7 @@ func TestResyncStsPod(t *testing.T) {
if err := fipPlugin.ipam.AllocateSpecificIP(keyObj.KeyInDB, net.ParseIP("10.49.27.205"), parseReleasePolicy(&pod.ObjectMeta), ""); err != nil {
t.Fatalf("case %d, err %v", i, err)
}
if err := fipPlugin.resyncPod(fipPlugin.ipam); err != nil {
if err := fipPlugin.resyncPod(); err != nil {
t.Fatalf("case %d, err %v", i, err)
}
if err := checkIPKey(fipPlugin.ipam, "10.49.27.205", testCase.expectKeyFunc(keyObj)); err != nil {
5 changes: 0 additions & 5 deletions pkg/ipam/schedulerplugin/types.go
@@ -37,11 +37,6 @@ type PluginFactoryArgs struct {
DeploymentLister appv1.DeploymentLister
TAppLister v1.TAppLister
PoolLister list.PoolLister
PodHasSynced func() bool
StatefulSetSynced func() bool
DeploymentSynced func() bool
TAppHasSynced func() bool
PoolSynced func() bool
CrdClient crd_clientset.Interface
ExtClient extensionClient.Interface
FIPInformer crdInformer.FloatingIPInformer
14 changes: 6 additions & 8 deletions pkg/ipam/server/server.go
@@ -110,11 +110,7 @@ func (s *Server) init() error {
DeploymentLister: deploymentInformer.Lister(),
Client: s.client,
TAppClient: s.tappClient,
PodHasSynced: podInformer.Informer().HasSynced,
StatefulSetSynced: statefulsetInformer.Informer().HasSynced,
DeploymentSynced: deploymentInformer.Informer().HasSynced,
PoolLister: poolInformer.Lister(),
PoolSynced: poolInformer.Informer().HasSynced,
CrdClient: s.crdClient,
ExtClient: s.extensionClient,
FIPInformer: fipInformer,
@@ -123,7 +119,6 @@
s.tappInformerFactory = tappInformers.NewSharedInformerFactory(s.tappClient, time.Minute)
tappInformer := s.tappInformerFactory.Tappcontroller().V1().TApps()
pluginArgs.TAppLister = tappInformer.Lister()
pluginArgs.TAppHasSynced = tappInformer.Informer().HasSynced
}
s.plugin, err = schedulerplugin.NewFloatingIPPlugin(s.SchedulePluginConf, pluginArgs)
if err != nil {
@@ -145,10 +140,13 @@ func (s *Server) Start() error {
}

func (s *Server) Run() error {
go s.informerFactory.Start(s.stopChan)
go s.crdInformerFactory.Start(s.stopChan)
s.informerFactory.Start(s.stopChan)
s.crdInformerFactory.Start(s.stopChan)
s.informerFactory.WaitForCacheSync(s.stopChan)
s.crdInformerFactory.WaitForCacheSync(s.stopChan)
if s.tappInformerFactory != nil {
go s.tappInformerFactory.Start(s.stopChan)
s.tappInformerFactory.Start(s.stopChan)
s.tappInformerFactory.WaitForCacheSync(s.stopChan)
}
if err := crd.EnsureCRDCreated(s.extensionClient); err != nil {
return err
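As far as this diff shows, the removed PodHasSynced/StatefulSetSynced/DeploymentSynced/TAppHasSynced/PoolSynced callbacks were consumed only by the plugin's storeReady polling; the server now blocks on WaitForCacheSync before the plugin runs, so its listers serve warm caches from the start. A minimal sketch of that startup pattern with client-go follows; the fake clientset is used purely to keep the example self-contained.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	stop := make(chan struct{})
	defer close(stop)

	factory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), time.Minute)
	podLister := factory.Core().V1().Pods().Lister() // register the pod informer before Start

	// Start is non-blocking, so wrapping it in a goroutine (as the old code did)
	// is unnecessary; WaitForCacheSync then blocks until every started informer
	// has synced, which is what lets the plugin drop its storeReady loop.
	factory.Start(stop)
	factory.WaitForCacheSync(stop)

	pods, _ := podLister.List(labels.Everything()) // listers are safe to use from here on
	fmt.Println("cached pods:", len(pods))
}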