release ip of completed pod #81

Merged · 3 commits · Aug 5, 2020

This change releases a pod's floating IP when the pod completes (PodSucceeded) as well as when it is evicted, treats such pods as non-existent during resync, and replaces the storeReady() polling with waiting for informer cache sync.
3 changes: 2 additions & 1 deletion pkg/ipam/schedulerplugin/event.go
```diff
@@ -40,9 +40,10 @@ func (p *FloatingIPPlugin) UpdatePod(oldPod, newPod *corev1.Pod) error {
 	if !p.hasResourceName(&newPod.Spec) {
 		return nil
 	}
-	if !evicted(oldPod) && evicted(newPod) {
+	if !shouldReleaseIP(oldPod) && shouldReleaseIP(newPod) {
 		// Deployments will leave evicted pods
+		// If it's an evicted one, release its ip
 		glog.Infof("release ip from %s_%s, phase %s", newPod.Name, newPod.Namespace, string(newPod.Status.Phase))
 		p.unreleased <- &releaseEvent{pod: newPod}
 	}
 	if err := p.syncPodIP(newPod); err != nil {
```
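A minimal, self-contained sketch of the guard above (`shouldReleaseIP` is copied from this PR's floatingip_plugin.go; the `main` harness is only for illustration). The point of `!shouldReleaseIP(oldPod) && shouldReleaseIP(newPod)` is that a release event is enqueued exactly once, on the transition into a releasable phase, and not again on later updates of an already evicted or completed pod:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// Copied from this PR: a pod no longer needs its floating IP once it is
// evicted (Failed with reason "Evicted") or completed (Succeeded).
func shouldReleaseIP(pod *corev1.Pod) bool {
	return (pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == "Evicted") ||
		pod.Status.Phase == corev1.PodSucceeded
}

func main() {
	running := &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodRunning}}
	succeeded := &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodSucceeded}}
	evicted := &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodFailed, Reason: "Evicted"}}

	// Fires on the transition into a releasable phase...
	fmt.Println(!shouldReleaseIP(running) && shouldReleaseIP(succeeded)) // true
	fmt.Println(!shouldReleaseIP(running) && shouldReleaseIP(evicted))   // true
	// ...but not again for a pod that was already releasable.
	fmt.Println(!shouldReleaseIP(succeeded) && shouldReleaseIP(succeeded)) // false
}
```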
29 changes: 10 additions & 19 deletions pkg/ipam/schedulerplugin/floatingip_plugin.go
```diff
@@ -46,10 +46,10 @@ type FloatingIPPlugin struct {
 	nodeSubnet     map[string]*net.IPNet
 	nodeSubnetLock sync.Mutex
 	*PluginFactoryArgs
-	lastIPConf, lastSecondIPConf string
-	conf                         *Conf
-	unreleased                   chan *releaseEvent
-	cloudProvider                cloudprovider.CloudProvider
+	lastIPConf    string
+	conf          *Conf
+	unreleased    chan *releaseEvent
+	cloudProvider cloudprovider.CloudProvider
 	// protect unbind immutable deployment pod
 	dpLockPool keymutex.KeyMutex
 	// protect bind/unbind for each pod
@@ -93,11 +93,7 @@ func (p *FloatingIPPlugin) Init() error {
 			return fmt.Errorf("failed to get floatingip config from configmap: %v", err)
 		}
 	}
-	wait.PollInfinite(time.Second, func() (done bool, err error) {
-		glog.Infof("waiting store ready")
-		return p.storeReady(), nil
-	})
-	glog.Infof("store is ready, plugin init done")
+	glog.Infof("plugin init done")
 	return nil
 }
 
@@ -110,15 +106,9 @@ func (p *FloatingIPPlugin) Run(stop chan struct{}) {
 			}
 		}, time.Minute, stop)
 	}
-	firstTime := true
 	go wait.Until(func() {
-		if firstTime {
-			glog.Infof("start resyncing for the first time")
-			defer glog.Infof("resyncing complete for the first time")
-			firstTime = false
-		}
-		if err := p.resyncPod(p.ipam); err != nil {
-			glog.Warningf("[%s] %v", p.ipam.Name(), err)
+		if err := p.resyncPod(); err != nil {
+			glog.Warningf("resync pod: %v", err)
 		}
 		p.syncPodIPsIntoDB()
 	}, time.Duration(p.conf.ResyncInterval)*time.Minute, stop)
@@ -447,8 +437,9 @@ func getNodeIP(node *corev1.Node) net.IP {
 	return nil
 }
 
-func evicted(pod *corev1.Pod) bool {
-	return pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == "Evicted"
+func shouldReleaseIP(pod *corev1.Pod) bool {
+	return (pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == "Evicted") ||
+		pod.Status.Phase == corev1.PodSucceeded
 }
 
 // getNodeSubnet gets node subnet from ipam
```
49 changes: 43 additions & 6 deletions pkg/ipam/schedulerplugin/floatingip_plugin_test.go
```diff
@@ -31,6 +31,7 @@ import (
 	"k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
 	coreInformer "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/kubernetes/fake"
@@ -61,6 +62,14 @@ var (
 	pod         = CreateStatefulSetPod("pod1-0", "ns1", immutableAnnotation)
 	podKey, _   = schedulerplugin_util.FormatKey(pod)
 	node3Subnet = &net.IPNet{IP: net.ParseIP("10.49.27.0"), Mask: net.IPv4Mask(255, 255, 255, 0)}
+
+	toEvictPod = func(pod *corev1.Pod) {
+		pod.Status.Phase = corev1.PodFailed
+		pod.Status.Reason = "Evicted"
+	}
+	toSuccessPod = func(pod *corev1.Pod) {
+		pod.Status.Phase = corev1.PodSucceeded
+	}
 )
 
 func createPluginTestNodes(t *testing.T, objs ...runtime.Object) (*FloatingIPPlugin, chan struct{}, []corev1.Node) {
@@ -500,11 +509,7 @@ func createPluginFactoryArgs(t *testing.T, objs ...runtime.Object)
 		StatefulSetLister: statefulsetInformer.Lister(),
 		DeploymentLister:  deploymentInformer.Lister(),
 		Client:            client,
-		PodHasSynced:      podInformer.Informer().HasSynced,
-		StatefulSetSynced: statefulsetInformer.Informer().HasSynced,
-		DeploymentSynced:  deploymentInformer.Informer().HasSynced,
 		PoolLister:        poolInformer.Lister(),
-		PoolSynced:        poolInformer.Informer().HasSynced,
 		//TAppClient:      tappCli,
 		//TAppHasSynced:   tappInformer.Informer().HasSynced,
 		//TAppLister:      tappInformer.Lister(),
@@ -513,8 +518,10 @@ func createPluginFactoryArgs(t *testing.T, objs ...runtime.Object)
 		FIPInformer: FIPInformer,
 	}
 	//tapp.EnsureCRDCreated(pluginArgs.ExtClient)
-	go informerFactory.Start(stopChan)
-	go crdInformerFactory.Start(stopChan)
+	informerFactory.Start(stopChan)
+	crdInformerFactory.Start(stopChan)
+	informerFactory.WaitForCacheSync(stopChan)
+	crdInformerFactory.WaitForCacheSync(stopChan)
 	//go tappInformerFactory.Start(stopChan)
 	return pluginArgs, podInformer, stopChan
 }
@@ -797,3 +804,33 @@ func checkBind
 	}
 	return fipInfo, nil
 }
+
+func TestReleaseIPOfEvictOrCompletePod(t *testing.T) {
+	for i, testCase := range []struct {
+		updatePodStatus func(pod *corev1.Pod)
+	}{
+		{updatePodStatus: toEvictPod},
+		{updatePodStatus: toSuccessPod},
+	} {
+		pod := CreateStatefulSetPod("pod1-0", "ns1", nil)
+		podKey, _ := schedulerplugin_util.FormatKey(pod)
+		func() {
+			fipPlugin, stopChan, _ := createPluginTestNodes(t, pod)
+			fipPlugin.Run(stopChan)
+			defer func() { stopChan <- struct{}{} }()
+			fipInfo, err := checkBind(fipPlugin, pod, node3, podKey.KeyInDB, node3Subnet)
+			if err != nil {
+				t.Fatalf("case %d: %v", i, err)
+			}
+			testCase.updatePodStatus(pod)
+			if _, err := fipPlugin.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(pod); err != nil {
+				t.Fatalf("case %d: %v", i, err)
+			}
+			if err := wait.Poll(time.Microsecond*10, time.Second*30, func() (done bool, err error) {
+				return checkIPKey(fipPlugin.ipam, fipInfo.FIP.IP.String(), "") == nil, nil
+			}); err != nil {
+				t.Fatalf("case %d: %v", i, err)
+			}
+		}()
+	}
+}
```
37 changes: 7 additions & 30 deletions pkg/ipam/schedulerplugin/resync.go
```diff
@@ -36,30 +36,6 @@ import (
 	tappv1 "tkestack.io/tapp/pkg/apis/tappcontroller/v1"
 )
 
-func (p *FloatingIPPlugin) storeReady() bool {
-	if !p.PodHasSynced() {
-		glog.V(3).Infof("the pod store has not been synced yet")
-		return false
-	}
-	if !p.StatefulSetSynced() {
-		glog.V(3).Infof("the statefulset store has not been synced yet")
-		return false
-	}
-	if !p.DeploymentSynced() {
-		glog.V(3).Infof("the deployment store has not been synced yet")
-		return false
-	}
-	if p.TAppHasSynced != nil && !p.TAppHasSynced() {
-		glog.V(3).Infof("the tapp store has not been synced yet")
-		return false
-	}
-	if !p.PoolSynced() {
-		glog.V(3).Infof("the pool store has not been synced yet")
-		return false
-	}
-	return true
-}
-
 type resyncObj struct {
 	keyObj *util.KeyObj
 	fip    floatingip.FloatingIP
@@ -71,7 +47,7 @@ type resyncObj struct {
 // 3. deleted pods whose parent deployment no need so many ips
 // 4. deleted pods whose parent statefulset/tapp exist but pod index > .spec.replica
 // 5. existing pods but its status is evicted
-func (p *FloatingIPPlugin) resyncPod(ipam floatingip.IPAM) error {
+func (p *FloatingIPPlugin) resyncPod() error {
 	glog.V(4).Infof("resync pods+")
 	defer glog.V(4).Infof("resync pods-")
 	resyncMeta := &resyncMeta{
@@ -219,14 +195,15 @@ func (p *FloatingIPPlugin) resyncTappOrSts
 }
 
 func (p *FloatingIPPlugin) podExist(podName, namespace string) bool {
-	_, err := p.Client.CoreV1().Pods(namespace).Get(podName, v1.GetOptions{})
+	pod, err := p.Client.CoreV1().Pods(namespace).Get(podName, v1.GetOptions{})
 	if err != nil {
 		if metaErrs.IsNotFound(err) {
 			return false
 		}
-		// we cannot figure out whether pod exist or not
+		// we cannot figure out whether pod exist or not, we'd better keep the ip
 		return true
 	}
-	return true
+	return !shouldReleaseIP(pod)
 }
 
 func parsePodIndex(name string) (int, error) {
@@ -255,7 +232,7 @@ func (p *FloatingIPPlugin) listWantedPodsToMap() (map[string]*corev1.Pod, error) {
 	}
 	existPods := map[string]*corev1.Pod{}
 	for i := range pods {
-		if evicted(pods[i]) {
+		if shouldReleaseIP(pods[i]) {
 			// for evicted pod, treat as not exist
 			continue
 		}
@@ -284,7 +261,7 @@ func (p *FloatingIPPlugin) syncPodIPsIntoDB() {
 }
 
 // #lizard forgives
-// syncPodIP sync pod ip with db, if the pod has ipinfos annotation and the ip is unallocated in db, allocate the ip
+// syncPodIP sync pod ip with ipam, if the pod has ipinfos annotation and the ip is unallocated in ipam, allocate the ip
 // to the pod
 func (p *FloatingIPPlugin) syncPodIP(pod *corev1.Pod) error {
 	if pod.Status.Phase != corev1.PodRunning {
```
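The podExist change is the resync half of the feature: a pod that is still present in the apiserver but already evicted or completed is now reported as non-existent, so resync releases its IP the same way it would for a deleted pod, while lookup errors other than NotFound still keep the IP as the safer default. A standalone sketch against a fake clientset (the non-context `Get(name, options)` signature matches the client-go vintage this repo uses; rewriting `podExist` as a free function is only for the example):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metaErrs "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

func shouldReleaseIP(pod *corev1.Pod) bool {
	return (pod.Status.Phase == corev1.PodFailed && pod.Status.Reason == "Evicted") ||
		pod.Status.Phase == corev1.PodSucceeded
}

// podExist mirrors the updated method: on lookup errors other than NotFound
// it errs on the side of keeping the IP.
func podExist(client kubernetes.Interface, podName, namespace string) bool {
	pod, err := client.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{})
	if err != nil {
		if metaErrs.IsNotFound(err) {
			return false
		}
		// we cannot figure out whether the pod exists or not, keep the ip
		return true
	}
	return !shouldReleaseIP(pod)
}

func main() {
	completed := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "sts-xxx-0", Namespace: "ns1"},
		Status:     corev1.PodStatus{Phase: corev1.PodSucceeded},
	}
	client := fake.NewSimpleClientset(completed)
	fmt.Println(podExist(client, "sts-xxx-0", "ns1")) // false: completed, so its IP may be released
	fmt.Println(podExist(client, "missing", "ns1"))   // false: NotFound
}
```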
11 changes: 9 additions & 2 deletions pkg/ipam/schedulerplugin/resync_test.go
```diff
@@ -20,6 +20,7 @@ import (
 	"net"
 	"testing"
 
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	. "tkestack.io/galaxy/pkg/ipam/schedulerplugin/testing"
 	"tkestack.io/galaxy/pkg/ipam/schedulerplugin/util"
@@ -39,7 +40,7 @@ func TestResyncAppNotExist(t *testing.T) {
 	if err := fipPlugin.ipam.AllocateSpecificIP(pod2Key.KeyInDB, net.ParseIP("10.49.27.216"), parseReleasePolicy(&pod2.ObjectMeta), ""); err != nil {
 		t.Fatal(err)
 	}
-	if err := fipPlugin.resyncPod(fipPlugin.ipam); err != nil {
+	if err := fipPlugin.resyncPod(); err != nil {
 		t.Fatal(err)
 	}
 	if err := checkIPKey(fipPlugin.ipam, "10.49.27.205", pod1Key.PoolPrefix()); err != nil {
@@ -55,6 +56,7 @@ func TestResyncStsPod(t *testing.T) {
 		annotations   map[string]string
 		replicas      int32
 		createPod     bool
+		updateStatus  func(*corev1.Pod)
 		createApp     bool
 		expectKeyFunc func(obj *util.KeyObj) string
 	}{
@@ -66,9 +68,14 @@ func TestResyncStsPod(t *testing.T) {
 		{annotations: neverAnnotation, replicas: 0, expectKeyFunc: podNameFunc, createApp: true},
 		{annotations: neverAnnotation, replicas: 1, expectKeyFunc: podNameFunc, createApp: true},
 		{annotations: neverAnnotation, replicas: 1, expectKeyFunc: podNameFunc, createApp: false},
+		{annotations: nil, replicas: 1, expectKeyFunc: emptyNameFunc, createPod: true, updateStatus: toEvictPod},   // pod evicted, ip will be released
+		{annotations: nil, replicas: 1, expectKeyFunc: emptyNameFunc, createPod: true, updateStatus: toSuccessPod}, // pod completed, ip will be released
 	} {
 		var objs []runtime.Object
 		pod := CreateStatefulSetPod("sts-xxx-0", "ns1", testCase.annotations)
+		if testCase.updateStatus != nil {
+			testCase.updateStatus(pod)
+		}
 		keyObj, _ := util.FormatKey(pod)
 		if testCase.createPod {
 			objs = append(objs, pod)
@@ -84,7 +91,7 @@ func TestResyncStsPod(t *testing.T) {
 		if err := fipPlugin.ipam.AllocateSpecificIP(keyObj.KeyInDB, net.ParseIP("10.49.27.205"), parseReleasePolicy(&pod.ObjectMeta), ""); err != nil {
 			t.Fatalf("case %d, err %v", i, err)
 		}
-		if err := fipPlugin.resyncPod(fipPlugin.ipam); err != nil {
+		if err := fipPlugin.resyncPod(); err != nil {
 			t.Fatalf("case %d, err %v", i, err)
 		}
 		if err := checkIPKey(fipPlugin.ipam, "10.49.27.205", testCase.expectKeyFunc(keyObj)); err != nil {
```
5 changes: 0 additions & 5 deletions pkg/ipam/schedulerplugin/types.go
```diff
@@ -37,11 +37,6 @@ type PluginFactoryArgs struct {
 	DeploymentLister appv1.DeploymentLister
 	TAppLister       v1.TAppLister
 	PoolLister       list.PoolLister
-	PodHasSynced      func() bool
-	StatefulSetSynced func() bool
-	DeploymentSynced  func() bool
-	TAppHasSynced     func() bool
-	PoolSynced        func() bool
 	CrdClient        crd_clientset.Interface
 	ExtClient        extensionClient.Interface
 	FIPInformer      crdInformer.FloatingIPInformer
```
14 changes: 6 additions & 8 deletions pkg/ipam/server/server.go
```diff
@@ -110,11 +110,7 @@ func (s *Server) init() error {
 		DeploymentLister:  deploymentInformer.Lister(),
 		Client:            s.client,
 		TAppClient:        s.tappClient,
-		PodHasSynced:      podInformer.Informer().HasSynced,
-		StatefulSetSynced: statefulsetInformer.Informer().HasSynced,
-		DeploymentSynced:  deploymentInformer.Informer().HasSynced,
 		PoolLister:        poolInformer.Lister(),
-		PoolSynced:        poolInformer.Informer().HasSynced,
 		CrdClient:         s.crdClient,
 		ExtClient:         s.extensionClient,
 		FIPInformer:       fipInformer,
@@ -123,7 +119,6 @@ func (s *Server) init() error {
 		s.tappInformerFactory = tappInformers.NewSharedInformerFactory(s.tappClient, time.Minute)
 		tappInformer := s.tappInformerFactory.Tappcontroller().V1().TApps()
 		pluginArgs.TAppLister = tappInformer.Lister()
-		pluginArgs.TAppHasSynced = tappInformer.Informer().HasSynced
 	}
 	s.plugin, err = schedulerplugin.NewFloatingIPPlugin(s.SchedulePluginConf, pluginArgs)
 	if err != nil {
@@ -145,10 +140,13 @@ func (s *Server) Start() error {
 }
 
 func (s *Server) Run() error {
-	go s.informerFactory.Start(s.stopChan)
-	go s.crdInformerFactory.Start(s.stopChan)
+	s.informerFactory.Start(s.stopChan)
+	s.crdInformerFactory.Start(s.stopChan)
+	s.informerFactory.WaitForCacheSync(s.stopChan)
+	s.crdInformerFactory.WaitForCacheSync(s.stopChan)
 	if s.tappInformerFactory != nil {
-		go s.tappInformerFactory.Start(s.stopChan)
+		s.tappInformerFactory.Start(s.stopChan)
+		s.tappInformerFactory.WaitForCacheSync(s.stopChan)
 	}
 	if err := crd.EnsureCRDCreated(s.extensionClient); err != nil {
 		return err
```
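Two client-go behaviors make this simplification safe, assuming the standard SharedInformerFactory semantics: Start is already non-blocking (it spawns one goroutine per registered informer), so the old `go factory.Start(...)` wrappers added nothing; and WaitForCacheSync blocks until every started informer has completed its initial List, which is exactly the guarantee the deleted `storeReady()`/`wait.PollInfinite` loop and the per-store `HasSynced` plumbing were approximating. A self-contained sketch with a fake clientset:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	stopCh := make(chan struct{})
	defer close(stopCh)

	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, time.Minute)
	// Requesting a lister registers the pod informer with the factory.
	podLister := factory.Core().V1().Pods().Lister()

	// Non-blocking: each registered informer runs in its own goroutine.
	factory.Start(stopCh)
	// Blocks until every started informer has finished its initial List.
	for typ, synced := range factory.WaitForCacheSync(stopCh) {
		if !synced {
			panic(fmt.Sprintf("cache for %v failed to sync", typ))
		}
	}
	// Listers are now safe to use without racing the initial sync.
	pods, _ := podLister.List(labels.Everything())
	fmt.Println("caches synced,", len(pods), "pods found")
}
```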