diff --git a/pkg/ipam/api/api.go b/pkg/ipam/api/api.go index c1872bad..aec54c6c 100644 --- a/pkg/ipam/api/api.go +++ b/pkg/ipam/api/api.go @@ -60,7 +60,6 @@ type FloatingIP struct { UpdateTime time.Time `json:"updateTime,omitempty"` Status string `json:"status,omitempty"` Releasable bool `json:"releasable,omitempty"` - attr string `json:"-"` labels map[string]string `json:"-"` } @@ -349,8 +348,7 @@ func transform(fips []floatingip.FloatingIP) []FloatingIP { AppType: toAppType(keyObj.AppTypePrefix), Policy: fips[i].Policy, UpdateTime: fips[i].UpdatedAt, - labels: fips[i].Labels, - attr: fips[i].Attr}) + labels: fips[i].Labels}) } return res } diff --git a/pkg/ipam/api/pool.go b/pkg/ipam/api/pool.go index 26c25ff2..c16fe925 100644 --- a/pkg/ipam/api/pool.go +++ b/pkg/ipam/api/pool.go @@ -164,7 +164,7 @@ func (c *PoolController) preAllocateIP(req *restful.Request, resp *restful.Respo } needAllocateIPs := pool.Size - len(fips) for i := 0; i < needAllocateIPs; i++ { - ip, err := c.IPAM.AllocateInSubnet(poolPrefix, subnetIPNet, constant.ReleasePolicyNever, "") + ip, err := c.IPAM.AllocateInSubnet(poolPrefix, subnetIPNet, floatingip.Attr{Policy: constant.ReleasePolicyNever}) if err == nil { glog.Infof("allocated ip %s to %s during creating or updating pool", ip.String(), poolPrefix) continue diff --git a/pkg/ipam/floatingip/floatingip.go b/pkg/ipam/floatingip/floatingip.go index 82b9259d..3c97cf2b 100644 --- a/pkg/ipam/floatingip/floatingip.go +++ b/pkg/ipam/floatingip/floatingip.go @@ -24,39 +24,51 @@ import ( "time" "k8s.io/apimachinery/pkg/util/sets" + "tkestack.io/galaxy/pkg/api/galaxy/constant" "tkestack.io/galaxy/pkg/utils/nets" ) // FloatingIP defines a floating ip type FloatingIP struct { - Key string - Subnets sets.String // node subnet, not container ip's subnet - // TODO Replace attr with labels - Attr string + Key string + Subnets sets.String // node subnet, not container ip's subnet IP net.IP - Policy uint16 UpdatedAt time.Time Labels map[string]string + 
Policy uint16 + NodeName string + PodUid string } func (f FloatingIP) String() string { - return fmt.Sprintf("FloatingIP{ip:%s key:%s attr:%s policy:%d subnets:%v}", f.IP.String(), f.Key, f.Attr, f.Policy, f.Subnets) + return fmt.Sprintf("FloatingIP{ip:%s key:%s policy:%d nodeName:%s podUid:%s subnets:%v}", + f.IP.String(), f.Key, f.Policy, f.NodeName, f.PodUid, f.Subnets) +} + +// New creates a new FloatingIP +func New(ip net.IP, subnets sets.String, key string, attr *Attr, updateAt time.Time) *FloatingIP { + fip := &FloatingIP{IP: ip, Subnets: subnets} + fip.Assign(key, attr, updateAt) + return fip } -func (f *FloatingIP) Assign(key, attr string, policy uint16, updateAt time.Time) *FloatingIP { +// Assign updates key, attr, updatedAt of FloatingIP +func (f *FloatingIP) Assign(key string, attr *Attr, updateAt time.Time) *FloatingIP { f.Key = key - f.Attr = attr - f.Policy = policy + f.Policy = uint16(attr.Policy) f.UpdatedAt = updateAt + f.NodeName = attr.NodeName + f.PodUid = attr.Uid return f } -func (f *FloatingIP) CloneWith(key, attr string, policy uint16, updateAt time.Time) *FloatingIP { +// CloneWith creates a new FloatingIP and updates key, attr, updatedAt +func (f *FloatingIP) CloneWith(key string, attr *Attr, updateAt time.Time) *FloatingIP { fip := &FloatingIP{ IP: f.IP, Subnets: f.Subnets, } - return fip.Assign(key, attr, policy, updateAt) + return fip.Assign(key, attr, updateAt) } // FloatingIPPool is FloatingIPPool structure. 
@@ -284,3 +296,34 @@ func (s FloatingIPSlice) Swap(i, j int) { func (s FloatingIPSlice) Less(i, j int) bool { return nets.IPToInt(s[i].Gateway) < nets.IPToInt(s[j].Gateway) } + +// Attr stores attrs about this pod +type Attr struct { + // NodeName is needed to send unassign request to cloud provider on resync + NodeName string + // uid is used to differentiate a deleting pod and a newly created pod with the same name such as statefulsets + // or tapp pod + Uid string + // Release policy + Policy constant.ReleasePolicy `json:"-"` +} + +func (a Attr) String() string { + return fmt.Sprintf("Attr{policy:%d nodeName:%s uid:%s}", a.Policy, a.NodeName, a.Uid) +} + +// unmarshalAttr unmarshal attributes and assign PodUid and NodeName +// Make sure invoke this func in a copied FloatingIP +func (f *FloatingIP) unmarshalAttr(attrStr string) error { + if attrStr == "" { + return nil + } + var attr Attr + if err := json.Unmarshal([]byte(attrStr), &attr); err != nil { + return fmt.Errorf("unmarshal attr %s for %s %s: %v", attrStr, f.Key, f.IP.String(), err) + } else { + f.NodeName = attr.NodeName + f.PodUid = attr.Uid + } + return nil +} diff --git a/pkg/ipam/floatingip/ipam.go b/pkg/ipam/floatingip/ipam.go index 102aff45..7ae11368 100644 --- a/pkg/ipam/floatingip/ipam.go +++ b/pkg/ipam/floatingip/ipam.go @@ -39,15 +39,16 @@ type IPAM interface { // unreleased map stores ip with its latest key if key changed ReleaseIPs(map[string]string) (map[string]string, map[string]string, error) // AllocateSpecificIP allocate pod a specific IP. - AllocateSpecificIP(string, net.IP, constant.ReleasePolicy, string) error + AllocateSpecificIP(string, net.IP, Attr) error // AllocateInSubnet allocate subnet of IPs. - AllocateInSubnet(string, *net.IPNet, constant.ReleasePolicy, string) (net.IP, error) + AllocateInSubnet(string, *net.IPNet, Attr) (net.IP, error) // AllocateInSubnetWithKey allocate a floatingIP in given subnet and key. 
- AllocateInSubnetWithKey(oldK, newK, subnet string, policy constant.ReleasePolicy, attr string) error - // ReserveIP can reserve a IP entitled by a terminated pod. - ReserveIP(oldK, newK, attr string) error - // UpdatePolicy update floatingIP's release policy and attr according to ip and key - UpdatePolicy(string, net.IP, constant.ReleasePolicy, string) error + AllocateInSubnetWithKey(oldK, newK, subnet string, attr Attr) error + // ReserveIP can reserve an IP entitled by a terminated pod. Attributes **except policy attr** will be updated. + // Returns true if key or attr updated. + ReserveIP(oldK, newK string, attr Attr) (bool, error) + // UpdateAttr updates floatingIP's release policy and attrs according to ip and key + UpdateAttr(string, net.IP, Attr) error // Release release a given IP. Release(string, net.IP) error // First returns the first matched IP by key. diff --git a/pkg/ipam/floatingip/ipam_crd.go b/pkg/ipam/floatingip/ipam_crd.go index 0fc3bea2..3a988c9b 100644 --- a/pkg/ipam/floatingip/ipam_crd.go +++ b/pkg/ipam/floatingip/ipam_crd.go @@ -97,7 +97,7 @@ func NewCrdIPAM(fipClient crd_clientset.Interface, ipType Type, informer crdInfo } // AllocateSpecificIP allocate pod a specific IP.
-func (ci *crdIpam) AllocateSpecificIP(key string, ip net.IP, policy constant.ReleasePolicy, attr string) error { +func (ci *crdIpam) AllocateSpecificIP(key string, ip net.IP, attr Attr) error { ipStr := ip.String() ci.cacheLock.RLock() spec, find := ci.unallocatedFIPs[ipStr] @@ -105,15 +105,7 @@ func (ci *crdIpam) AllocateSpecificIP(key string, ip net.IP, policy constant.Rel if !find { return fmt.Errorf("failed to find floating ip by %s in cache", ipStr) } - date := time.Now() - allocated := &FloatingIP{ - IP: ip, - Key: key, - Subnets: spec.Subnets, - Attr: attr, - Policy: uint16(policy), - UpdatedAt: date, - } + allocated := New(ip, spec.Subnets, key, &attr, time.Now()) if err := ci.createFloatingIP(allocated); err != nil { glog.Errorf("failed to create floatingIP %s: %v", ipStr, err) return err @@ -125,55 +117,38 @@ func (ci *crdIpam) AllocateSpecificIP(key string, ip net.IP, policy constant.Rel } // AllocateInSubnet allocate subnet of IPs. -func (ci *crdIpam) AllocateInSubnet(key string, nodeSubnet *net.IPNet, policy constant.ReleasePolicy, - attr string) (allocated net.IP, err error) { +func (ci *crdIpam) AllocateInSubnet(key string, nodeSubnet *net.IPNet, attr Attr) (net.IP, error) { if nodeSubnet == nil { // this should never happen return nil, fmt.Errorf("nil nodeSubnet") } var ipStr string ci.cacheLock.Lock() + defer ci.cacheLock.Unlock() nodeSubnetStr := nodeSubnet.String() for k, v := range ci.unallocatedFIPs { //find an unallocated fip, then use it if v.Subnets.Has(nodeSubnetStr) { ipStr = k - date := time.Now() // we never updates ip or subnet object, it's ok to share these objs. 
- allocatedFIP := &FloatingIP{ - IP: v.IP, - Key: key, - Subnets: v.Subnets, - Attr: attr, - Policy: uint16(policy), - UpdatedAt: date, - } - if err = ci.createFloatingIP(allocatedFIP); err != nil { + allocated := New(v.IP, v.Subnets, key, &attr, time.Now()) + if err := ci.createFloatingIP(allocated); err != nil { glog.Errorf("failed to create floatingIP %s: %v", ipStr, err) - ci.cacheLock.Unlock() - return + return nil, err } //sync cache when crd create success - ci.syncCacheAfterCreate(allocatedFIP) + ci.syncCacheAfterCreate(allocated) break } } - ci.cacheLock.Unlock() if ipStr == "" { return nil, ErrNoEnoughIP } - ci.cacheLock.RLock() - defer ci.cacheLock.RUnlock() - if err = ci.getFloatingIP(ipStr); err != nil { - return - } - allocated = net.ParseIP(ipStr) - return + return net.ParseIP(ipStr), nil } // AllocateInSubnetWithKey allocate a floatingIP in given subnet and key. -func (ci *crdIpam) AllocateInSubnetWithKey(oldK, newK, subnet string, policy constant.ReleasePolicy, - attr string) error { +func (ci *crdIpam) AllocateInSubnetWithKey(oldK, newK, subnet string, attr Attr) error { ci.cacheLock.Lock() defer ci.cacheLock.Unlock() var ( @@ -193,35 +168,40 @@ func (ci *crdIpam) AllocateInSubnetWithKey(oldK, newK, subnet string, policy con return fmt.Errorf("failed to find floatIP by key %s", oldK) } date := time.Now() - cloned := latest.CloneWith(newK, attr, uint16(policy), date) + cloned := latest.CloneWith(newK, &attr, date) if err := ci.updateFloatingIP(cloned); err != nil { glog.Errorf("failed to update floatingIP %s: %v", cloned.IP.String(), err) return err } - latest.Assign(newK, attr, uint16(policy), date) + latest.Assign(newK, &attr, date) return nil } // ReserveIP can reserve a IP entitled by a terminated pod. 
-func (ci *crdIpam) ReserveIP(oldK, newK, attr string) error { +func (ci *crdIpam) ReserveIP(oldK, newK string, attr Attr) (bool, error) { ci.cacheLock.Lock() defer ci.cacheLock.Unlock() for k, v := range ci.allocatedFIPs { if v.Key == oldK { + if oldK == newK && v.PodUid == attr.Uid && v.NodeName == attr.NodeName { + // nothing changed + return false, nil + } + attr.Policy = constant.ReleasePolicy(v.Policy) date := time.Now() - if err := ci.updateFloatingIP(v.CloneWith(newK, attr, v.Policy, date)); err != nil { + if err := ci.updateFloatingIP(v.CloneWith(newK, &attr, date)); err != nil { glog.Errorf("failed to update floatingIP %s: %v", k, err) - return err + return false, err } - v.Assign(newK, attr, v.Policy, date) - return nil + v.Assign(newK, &attr, date) + return true, nil } } - return fmt.Errorf("failed to find floatIP by key %s", oldK) + return false, fmt.Errorf("failed to find floatIP by key %s", oldK) } -// UpdatePolicy update floatingIP's release policy and attr according to ip and key -func (ci *crdIpam) UpdatePolicy(key string, ip net.IP, policy constant.ReleasePolicy, attr string) error { +// UpdateAttr update floatingIP's release policy and attr according to ip and key +func (ci *crdIpam) UpdateAttr(key string, ip net.IP, attr Attr) error { ipStr := ip.String() ci.cacheLock.Lock() defer ci.cacheLock.Unlock() @@ -233,11 +213,11 @@ func (ci *crdIpam) UpdatePolicy(key string, ip net.IP, policy constant.ReleasePo return fmt.Errorf("key for %s is %s, not %s", ipStr, v.Key, key) } date := time.Now() - if err := ci.updateFloatingIP(v.CloneWith(v.Key, attr, uint16(policy), date)); err != nil { + if err := ci.updateFloatingIP(v.CloneWith(v.Key, &attr, date)); err != nil { glog.Errorf("failed to update floatingIP %s: %v", ipStr, err) return err } - v.Assign(v.Key, attr, uint16(policy), date) + v.Assign(v.Key, &attr, date) return nil } @@ -395,7 +375,6 @@ func (ci *crdIpam) ConfigurePool(floatIPs []*FloatingIPPool) error { tmpFip := &FloatingIP{ IP: netIP, Key: 
ip.Spec.Key, - Attr: ip.Spec.Attribute, Policy: uint16(ip.Spec.Policy), // Since subnets may change and for reserved fips crds created by user manually, subnets may not be // correct, assign it to the latest config instead of crd value @@ -403,6 +382,9 @@ func (ci *crdIpam) ConfigurePool(floatIPs []*FloatingIPPool) error { Subnets: nodeSubnets[i], UpdatedAt: ip.Spec.UpdateTime.Time, } + if err := tmpFip.unmarshalAttr(ip.Spec.Attribute); err != nil { + glog.Error(err) + } tmpCacheAllocated[ip.Name] = tmpFip break } @@ -440,7 +422,6 @@ func (ci *crdIpam) ConfigurePool(floatIPs []*FloatingIPPool) error { tmpFip := &FloatingIP{ IP: ip, Key: "", - Attr: "", Policy: uint16(constant.ReleasePolicyPodDelete), Subnets: subnetSet, UpdatedAt: now, @@ -467,7 +448,7 @@ func (ci *crdIpam) syncCacheAfterCreate(fip *FloatingIP) { // don't use lock inner function, otherwise deadlock will be caused func (ci *crdIpam) syncCacheAfterDel(released *FloatingIP) { ipStr := released.IP.String() - released.Assign("", "", uint16(constant.ReleasePolicyPodDelete), time.Now()) + released.Assign("", &Attr{Policy: constant.ReleasePolicyPodDelete}, time.Now()) released.Labels = nil delete(ci.allocatedFIPs, ipStr) ci.unallocatedFIPs[ipStr] = released diff --git a/pkg/ipam/floatingip/ipam_crd_test.go b/pkg/ipam/floatingip/ipam_crd_test.go index e2af3c34..4c5ac73a 100644 --- a/pkg/ipam/floatingip/ipam_crd_test.go +++ b/pkg/ipam/floatingip/ipam_crd_test.go @@ -35,8 +35,8 @@ import ( ) const ( - pod1CRD = `{"kind":"FloatingIP","apiVersion":"galaxy.k8s.io/v1alpha1","metadata":{"name":"10.49.27.205","creationTimestamp":null,"labels":{"ipType":"internalIP"}},"spec":{"key":"pod1","attribute":"212","policy":2,"subnet":"10.49.27.0/24","updateTime":null}}` - pod2CRD = 
`{"kind":"FloatingIP","apiVersion":"galaxy.k8s.io/v1alpha1","metadata":{"name":"10.49.27.216","creationTimestamp":null,"labels":{"ipType":"internalIP"}},"spec":{"key":"pod2","attribute":"333","policy":1,"subnet":"10.49.27.0/24","updateTime":null}}` + pod1CRD = `{"kind":"FloatingIP","apiVersion":"galaxy.k8s.io/v1alpha1","metadata":{"name":"10.49.27.205","creationTimestamp":null,"labels":{"ipType":"internalIP"}},"spec":{"key":"pod1","attribute":"{\"NodeName\":\"212\",\"Uid\":\"xx1\"}","policy":2,"subnet":"10.49.27.0/24","updateTime":null}}` + pod2CRD = `{"kind":"FloatingIP","apiVersion":"galaxy.k8s.io/v1alpha1","metadata":{"name":"10.49.27.216","creationTimestamp":null,"labels":{"ipType":"internalIP"}},"spec":{"key":"pod2","attribute":"{\"NodeName\":\"333\",\"Uid\":\"xx2\"}","policy":1,"subnet":"10.49.27.0/24","updateTime":null}}` policy = constant.ReleasePolicyPodDelete ) @@ -111,7 +111,6 @@ func TestConfigurePoolWithAllocatedIP(t *testing.T) { IP: net.ParseIP("10.49.27.205"), Key: "pod2", Subnets: sets.NewString("subnet1"), // assign a bad subnet to test if it can be correct - Attr: "pod2 attr", Policy: 0, UpdatedAt: time.Now(), } @@ -120,7 +119,9 @@ func TestConfigurePoolWithAllocatedIP(t *testing.T) { internalIP := InternalIp ipType, _ := internalIP.String() fipCrd.Labels[constant.IpType] = ipType - assign(fipCrd, expectFip) + if err := assign(fipCrd, expectFip); err != nil { + t.Fatal(err) + } ipam := createTestCrdIPAM(t, fipCrd) if len(ipam.allocatedFIPs) != 1 { t.Fatal(len(ipam.allocatedFIPs)) @@ -142,7 +143,8 @@ func TestConfigurePoolWithAllocatedIP(t *testing.T) { func TestCRDAllocateSpecificIP(t *testing.T) { now := time.Now() ipam := createTestCrdIPAM(t) - if err := ipam.AllocateSpecificIP("pod1", net.ParseIP("10.49.27.205"), constant.ReleasePolicyNever, "212"); err != nil { + if err := ipam.AllocateSpecificIP("pod1", net.ParseIP("10.49.27.205"), + Attr{Policy: constant.ReleasePolicyNever, NodeName: "212", Uid: "xx1"}); err != nil { t.Fatal(err) } if 
len(ipam.allocatedFIPs) != 1 { @@ -155,7 +157,7 @@ func TestCRDAllocateSpecificIP(t *testing.T) { if !allocated.UpdatedAt.After(now) { t.Fatal(allocated.UpdatedAt) } - if `FloatingIP{ip:10.49.27.205 key:pod1 attr:212 policy:2 subnets:map[10.49.27.0/24:{}]}` != + if `FloatingIP{ip:10.49.27.205 key:pod1 policy:2 nodeName:212 podUid:xx1 subnets:map[10.49.27.0/24:{}]}` != fmt.Sprintf("%+v", allocated) { t.Fatal(fmt.Sprintf("%+v", allocated)) } @@ -186,10 +188,29 @@ func checkFIP(ipam *crdIpam, expect string) error { func TestCRDReserveIP(t *testing.T) { ipam := createTestCrdIPAM(t) - testReserveIP(t, ipam) - if err := checkFIP(ipam, `{"kind":"FloatingIP","apiVersion":"galaxy.k8s.io/v1alpha1","metadata":{"name":"10.49.27.205","creationTimestamp":null,"labels":{"ipType":"internalIP"}},"spec":{"key":"p1","attribute":"this is p1","policy":2,"subnet":"10.49.27.0/24","updateTime":null}}`); err != nil { + if err := ipam.AllocateSpecificIP("pod1", net.ParseIP("10.49.27.205"), + Attr{Policy: constant.ReleasePolicyNever, NodeName: "node1", Uid: "xx1"}); err != nil { + t.Fatal(err) + } + newAttr := Attr{NodeName: "node2", Uid: "xx2", Policy: constant.ReleasePolicyNever} + if reserved, err := ipam.ReserveIP("pod1", "p1", newAttr); err != nil { + t.Fatal(err) + } else if !reserved { + t.Fatal() + } + if err := checkIPKeyAttr(ipam, "10.49.27.205", "p1", &newAttr); err != nil { t.Fatal(err) } + if err := checkFIP(ipam, `{"kind":"FloatingIP","apiVersion":"galaxy.k8s.io/v1alpha1","metadata":{"name":"10.49.27.205","creationTimestamp":null,"labels":{"ipType":"internalIP"}},"spec":{"key":"p1","attribute":"{\"NodeName\":\"node2\",\"Uid\":\"xx2\"}","policy":2,"subnet":"10.49.27.0/24","updateTime":null}}`); err != nil { + t.Fatal(err) + } + // reserve again, should not succeed + newAttr.Policy = constant.ReleasePolicyPodDelete // policy should not be updated by ReserveIP + if reserved, err := ipam.ReserveIP("p1", "p1", newAttr); err != nil { + t.Fatal(err) + } else if reserved { + t.Fatal() 
+ } } func TestCRDRelease(t *testing.T) { @@ -239,18 +260,6 @@ func testRelease(t *testing.T, ipam IPAM) { } } -func testReserveIP(t *testing.T, ipam IPAM) { - if err := ipam.AllocateSpecificIP("pod1", net.ParseIP("10.49.27.205"), constant.ReleasePolicyNever, "212"); err != nil { - t.Fatal(err) - } - if err := ipam.ReserveIP("pod1", "p1", "this is p1"); err != nil { - t.Fatal(err) - } - if err := checkIPKeyAttr(ipam, "10.49.27.205", "p1", "this is p1"); err != nil { - t.Fatal(err) - } -} - func testReleaseIPs(t *testing.T, ipam IPAM) { allocateSomeIPs(t, ipam) relesed, unreleased, err := ipam.ReleaseIPs(map[string]string{ @@ -289,16 +298,24 @@ func testByKeyword(t *testing.T, ipam IPAM) { if fips[0].Key != "pod2" { t.Fatal(fips) } + if fips[0].NodeName != "333" { + t.Fatal(fips) + } + if fips[0].PodUid != "xx2" { + t.Fatal(fips) + } if !fips[0].UpdatedAt.After(now) { t.Fatalf("now %v, update time %v", now, fips[0].UpdatedAt) } } func allocateSomeIPs(t *testing.T, ipam IPAM) { - if err := ipam.AllocateSpecificIP("pod1", net.ParseIP("10.49.27.205"), constant.ReleasePolicyNever, "212"); err != nil { + if err := ipam.AllocateSpecificIP("pod1", net.ParseIP("10.49.27.205"), + Attr{Policy: constant.ReleasePolicyNever, NodeName: "212", Uid: "xx1"}); err != nil { t.Fatal(err) } - if err := ipam.AllocateSpecificIP("pod2", net.ParseIP("10.49.27.216"), constant.ReleasePolicyImmutable, "333"); err != nil { + if err := ipam.AllocateSpecificIP("pod2", net.ParseIP("10.49.27.216"), + Attr{Policy: constant.ReleasePolicyImmutable, NodeName: "333", Uid: "xx2"}); err != nil { t.Fatal(err) } } @@ -332,6 +349,9 @@ func checkByPrefix(ipam IPAM, prefix string, expectKeys ...string) error { if _, ok := expectMap[fip.Key]; !ok { return fmt.Errorf("expect %v, got %v", expectKeys, fips) } + if fip.NodeName == "" || fip.PodUid == "" { + return fmt.Errorf("expect nodeName and podUid are not empty") + } } return nil } @@ -340,11 +360,11 @@ func checkIPKey(ipam IPAM, checkIP, expectKey string) 
error { return checkByIP(ipam, checkIP, expectKey, nil) } -func checkIPKeyAttr(ipam IPAM, checkIP, expectKey, expectAttr string) error { - return checkByIP(ipam, checkIP, expectKey, &expectAttr) +func checkIPKeyAttr(ipam IPAM, checkIP, expectKey string, expectAttr *Attr) error { + return checkByIP(ipam, checkIP, expectKey, expectAttr) } -func checkByIP(ipam IPAM, checkIP, expectKey string, expectAttr *string) error { +func checkByIP(ipam IPAM, checkIP, expectKey string, expectAttr *Attr) error { ip := net.ParseIP(checkIP) if ip == nil { return fmt.Errorf("bad check ip: %s", checkIP) @@ -357,8 +377,14 @@ func checkByIP(ipam IPAM, checkIP, expectKey string, expectAttr *string) error { return fmt.Errorf("expect key: %s, got %s, ip %s", expectKey, fip.Key, checkIP) } if expectAttr != nil { - if fip.Attr != *expectAttr { - return fmt.Errorf("expect attr: %s, got %s, ip %s", *expectAttr, fip.Attr, checkIP) + if fip.PodUid != expectAttr.Uid { + return fmt.Errorf("expect podUid: %s, got %s, ip %s", expectAttr.Uid, fip.PodUid, checkIP) + } + if fip.NodeName != expectAttr.NodeName { + return fmt.Errorf("expect nodeName: %s, got %s, ip %s", expectAttr.NodeName, fip.NodeName, checkIP) + } + if fip.Policy != uint16(expectAttr.Policy) { + return fmt.Errorf("expect policy: %v, got %d, ip %s", expectAttr.Policy, fip.Policy, checkIP) } } return nil @@ -379,7 +405,7 @@ func TestAllocateInSubnet(t *testing.T) { } for i := range testCases { testCase := testCases[i] - allocatedIP, err := ipam.AllocateInSubnet("pod1", testCase.nodeIPNet, policy, "") + allocatedIP, err := ipam.AllocateInSubnet("pod1", testCase.nodeIPNet, Attr{Policy: policy}) if err != nil { t.Fatalf("test case %d: %v", i, err) } @@ -389,18 +415,18 @@ func TestAllocateInSubnet(t *testing.T) { } // test can't find available ip _, noConfigNode, _ := net.ParseCIDR("10.173.14.0/24") - if _, err := ipam.AllocateInSubnet("pod1-1", noConfigNode, policy, ""); err == nil || err != ErrNoEnoughIP { + if _, err := 
ipam.AllocateInSubnet("pod1-1", noConfigNode, Attr{Policy: policy}); err == nil || err != ErrNoEnoughIP { t.Fatalf("should fail because of ErrNoEnoughIP: %v", err) } } func TestAllocateInSubnetWithKey(t *testing.T) { ipam := createTestCrdIPAM(t) - allocatedIP, err := ipam.AllocateInSubnet("pod2", node2IPNet, policy, "") + allocatedIP, err := ipam.AllocateInSubnet("pod2", node2IPNet, Attr{Policy: policy}) if err != nil { t.Fatal(err) } - if err := ipam.AllocateInSubnetWithKey("pod2", "pod3", node2IPNet.String(), policy, ""); err != nil { + if err := ipam.AllocateInSubnetWithKey("pod2", "pod3", node2IPNet.String(), Attr{Policy: policy}); err != nil { t.Fatal(err) } ipInfo, err := ipam.First("pod2") @@ -455,7 +481,7 @@ func TestAllocateInMultipleSubnet(t *testing.T) { ipam := createTestCrdIPAM(t) nodeSubnets := sets.NewString() for { - allocatedIP, err := ipam.AllocateInSubnet("pod1", node7IPNet, policy, "") + allocatedIP, err := ipam.AllocateInSubnet("pod1", node7IPNet, Attr{Policy: policy}) if err != nil { if err == ErrNoEnoughIP { break diff --git a/pkg/ipam/floatingip/store_crd.go b/pkg/ipam/floatingip/store_crd.go index ac766277..939040b9 100644 --- a/pkg/ipam/floatingip/store_crd.go +++ b/pkg/ipam/floatingip/store_crd.go @@ -17,6 +17,7 @@ package floatingip import ( + "encoding/json" "fmt" "strings" "time" @@ -45,7 +46,9 @@ func (ci *crdIpam) listFloatingIPs() (*v1alpha1.FloatingIPList, error) { func (ci *crdIpam) createFloatingIP(allocated *FloatingIP) error { glog.V(4).Infof("create floatingIP %v", *allocated) fip := ci.newFIPCrd(allocated.IP.String()) - assign(fip, allocated) + if err := assign(fip, allocated); err != nil { + return err + } if _, err := ci.client.GalaxyV1alpha1().FloatingIPs().Create(fip); err != nil { return err } @@ -57,28 +60,33 @@ func (ci *crdIpam) deleteFloatingIP(name string) error { return ci.client.GalaxyV1alpha1().FloatingIPs().Delete(name, &metav1.DeleteOptions{}) } -func (ci *crdIpam) getFloatingIP(name string) error { - _, err := 
ci.client.GalaxyV1alpha1().FloatingIPs().Get(name, metav1.GetOptions{}) - return err -} - func (ci *crdIpam) updateFloatingIP(toUpdate *FloatingIP) error { glog.V(4).Infof("update floatingIP %v", *toUpdate) fip, err := ci.client.GalaxyV1alpha1().FloatingIPs().Get(toUpdate.IP.String(), metav1.GetOptions{}) if err != nil { return err } - assign(fip, toUpdate) + if err := assign(fip, toUpdate); err != nil { + return err + } _, err = ci.client.GalaxyV1alpha1().FloatingIPs().Update(fip) return err } -func assign(spec *v1alpha1.FloatingIP, f *FloatingIP) { +func assign(spec *v1alpha1.FloatingIP, f *FloatingIP) error { spec.Spec.Key = f.Key spec.Spec.Policy = constant.ReleasePolicy(f.Policy) - spec.Spec.Attribute = f.Attr + data, err := json.Marshal(Attr{ + NodeName: f.NodeName, + Uid: f.PodUid, + }) + if err != nil { + return err + } + spec.Spec.Attribute = string(data) spec.Spec.Subnet = strings.Join(f.Subnets.List(), ",") spec.Spec.UpdateTime = metav1.NewTime(f.UpdatedAt) + return nil } // handleFIPAssign handles add event for manually created reserved ips @@ -100,7 +108,7 @@ func (ci *crdIpam) handleFIPAssign(obj interface{}) error { if !ok { return fmt.Errorf("there is no ip %s in unallocated map", ipStr) } - unallocated.Assign(fip.Spec.Key, fip.Spec.Attribute, uint16(fip.Spec.Policy), time.Now()) + unallocated.Assign(fip.Spec.Key, &Attr{Policy: fip.Spec.Policy}, time.Now()) unallocated.Labels = map[string]string{constant.ReserveFIPLabel: ""} ci.syncCacheAfterCreate(unallocated) glog.Infof("reserved ip %s", ipStr) diff --git a/pkg/ipam/floatingip/store_crd_test.go b/pkg/ipam/floatingip/store_crd_test.go index c1445dca..6c47c053 100644 --- a/pkg/ipam/floatingip/store_crd_test.go +++ b/pkg/ipam/floatingip/store_crd_test.go @@ -38,13 +38,14 @@ func TestAddFloatingIPEventByUser(t *testing.T) { IP: net.ParseIP("10.49.27.205"), Key: "pod2", Subnets: sets.NewString("subnet1"), - Attr: "pod2 attr", Policy: 0, UpdatedAt: time.Now(), } fipCrd := ipam.newFIPCrd(fip.IP.String()) 
fipCrd.Labels[constant.ReserveFIPLabel] = "" - assign(fipCrd, fip) + if err := assign(fipCrd, fip); err != nil { + t.Fatal(err) + } if _, err := ipam.client.GalaxyV1alpha1().FloatingIPs().Create(fipCrd); err != nil { t.Fatal(err) } diff --git a/pkg/ipam/schedulerplugin/cloudprovider_test.go b/pkg/ipam/schedulerplugin/cloudprovider_test.go index b1940528..76d58d4b 100644 --- a/pkg/ipam/schedulerplugin/cloudprovider_test.go +++ b/pkg/ipam/schedulerplugin/cloudprovider_test.go @@ -24,6 +24,7 @@ import ( "tkestack.io/galaxy/pkg/api/galaxy/constant" "tkestack.io/galaxy/pkg/api/k8s/schedulerapi" + "tkestack.io/galaxy/pkg/ipam/floatingip" . "tkestack.io/galaxy/pkg/ipam/schedulerplugin/testing" schedulerplugin_util "tkestack.io/galaxy/pkg/ipam/schedulerplugin/util" ) @@ -36,7 +37,8 @@ func TestConcurrentBindUnbind(t *testing.T) { defer func() { stopChan <- struct{}{} }() cloudProvider := &fakeCloudProvider1{m: make(map[string]string)} plugin.cloudProvider = cloudProvider - if err := plugin.ipam.AllocateSpecificIP(podKey.KeyInDB, net.ParseIP("10.49.27.216"), constant.ReleasePolicyPodDelete, "{}"); err != nil { + if err := plugin.ipam.AllocateSpecificIP(podKey.KeyInDB, net.ParseIP("10.49.27.216"), + floatingip.Attr{Policy: constant.ReleasePolicyPodDelete}); err != nil { t.Fatal(err) } var wg sync.WaitGroup diff --git a/pkg/ipam/schedulerplugin/floatingip_plugin.go b/pkg/ipam/schedulerplugin/floatingip_plugin.go index 55973959..2e3e525d 100644 --- a/pkg/ipam/schedulerplugin/floatingip_plugin.go +++ b/pkg/ipam/schedulerplugin/floatingip_plugin.go @@ -17,7 +17,6 @@ package schedulerplugin import ( - "encoding/json" "fmt" "net" "sync" @@ -234,9 +233,9 @@ func (p *FloatingIPPlugin) getSubnet(pod *corev1.Pod) (sets.String, error) { func (p *FloatingIPPlugin) allocateDuringFilter(keyObj *util.KeyObj, reserve, isPoolSizeDefined bool, reserveSubnet string, policy constant.ReleasePolicy, uid string) error { // we can't get nodename during filter, update attr on bind - attr := 
getAttr("", uid) + attr := floatingip.Attr{Policy: policy, NodeName: "", Uid: uid} if reserve { - if err := p.allocateInSubnetWithKey(keyObj.PoolPrefix(), keyObj.KeyInDB, reserveSubnet, policy, attr, + if err := p.allocateInSubnetWithKey(keyObj.PoolPrefix(), keyObj.KeyInDB, reserveSubnet, attr, "filter"); err != nil { return err } @@ -246,7 +245,7 @@ func (p *FloatingIPPlugin) allocateDuringFilter(keyObj *util.KeyObj, reserve, is if err != nil { return err } - if err := p.allocateInSubnet(keyObj.KeyInDB, ipNet, policy, attr, "filter"); err != nil { + if err := p.allocateInSubnet(keyObj.KeyInDB, ipNet, attr, "filter"); err != nil { return err } } @@ -271,26 +270,20 @@ func (p *FloatingIPPlugin) allocateIP(key string, nodeName string, pod *corev1.P } started := time.Now() policy := parseReleasePolicy(&pod.ObjectMeta) - attr := getAttr(nodeName, string(pod.UID)) + attr := floatingip.Attr{Policy: policy, NodeName: nodeName, Uid: string(pod.UID)} if ipInfo != nil { how = "reused" // check if uid missmatch, if we delete a statfulset/tapp and creates a same name statfulset/tapp immediately, // galaxy-ipam may receive bind event for new pod early than deleting event for old pod - var oldAttr Attr - if ipInfo.FIP.Attr != "" { - if err := json.Unmarshal([]byte(ipInfo.FIP.Attr), &oldAttr); err != nil { - return nil, fmt.Errorf("failed to unmarshal attr %s", ipInfo.FIP.Attr) - } - if oldAttr.Uid != "" && oldAttr.Uid != string(pod.UID) { - return nil, fmt.Errorf("waiting for delete event of %s before reuse this ip", key) - } + if ipInfo.FIP.PodUid != "" && ipInfo.FIP.PodUid != string(pod.GetUID()) { + return nil, fmt.Errorf("waiting for delete event of %s before reuse this ip", key) } } else { subnet, err := p.queryNodeSubnet(nodeName) if err != nil { return nil, err } - if err := p.allocateInSubnet(key, subnet, policy, attr, "bind"); err != nil { + if err := p.allocateInSubnet(key, subnet, attr, "bind"); err != nil { return nil, err } how = "allocated" @@ -311,13 +304,13 @@ 
func (p *FloatingIPPlugin) allocateIP(key string, nodeName string, pod *corev1.P return nil, fmt.Errorf("failed to assign ip %s to %s: %v", ipInfo.IPInfo.IP.IP.String(), key, err) } if how == "reused" { - glog.Infof("pod %s reused %s, updating policy to %v attr %s", key, ipInfo.IPInfo.IP.String(), policy, attr) - if err := p.ipam.UpdatePolicy(key, ipInfo.IPInfo.IP.IP, policy, attr); err != nil { + glog.Infof("pod %s reused %s, updating attr to %v", key, ipInfo.IPInfo.IP.String(), attr) + if err := p.ipam.UpdateAttr(key, ipInfo.IPInfo.IP.IP, attr); err != nil { return nil, fmt.Errorf("failed to update floating ip release policy: %v", err) } } - glog.Infof("started at %d %s ip %s, policy %v, attr %s for %s", started.UnixNano(), how, - ipInfo.IPInfo.IP.String(), policy, attr, key) + glog.Infof("started at %d %s ip %s, attr %v for %s", started.UnixNano(), how, + ipInfo.IPInfo.IP.String(), attr, key) return &ipInfo.IPInfo, nil } @@ -406,14 +399,9 @@ func (p *FloatingIPPlugin) unbind(pod *corev1.Pod) error { return nil } ipStr := ipInfo.IPInfo.IP.IP.String() - var attr Attr - if err := json.Unmarshal([]byte(ipInfo.FIP.Attr), &attr); err != nil { - return fmt.Errorf("failed to unmarshal attr %s for pod %s: %v", ipInfo.FIP.Attr, key, err) - } - - glog.Infof("UnAssignIP nodeName %s, ip %s, key %s", attr.NodeName, ipStr, key) + glog.Infof("UnAssignIP nodeName %s, ip %s, key %s", ipInfo.FIP.NodeName, ipStr, key) if err = p.cloudProviderUnAssignIP(&rpc.UnAssignIPRequest{ - NodeName: attr.NodeName, + NodeName: ipInfo.FIP.NodeName, IPAddress: ipStr, }); err != nil { return fmt.Errorf("failed to unassign ip %s from %s: %v", ipStr, key, err) @@ -423,7 +411,7 @@ func (p *FloatingIPPlugin) unbind(pod *corev1.Pod) error { if keyObj.Deployment() { return p.unbindDpPod(keyObj, policy, "during unbinding pod") } - return p.unbindStsOrTappPod(pod, keyObj, policy) + return p.unbindNoneDpPod(keyObj, policy, "during unbinding pod") } // hasResourceName checks if the podspec has floatingip 
resource name @@ -491,24 +479,6 @@ func parseReleasePolicy(meta *v1.ObjectMeta) constant.ReleasePolicy { return constant.ConvertReleasePolicy(meta.Annotations[constant.ReleasePolicyAnnotation]) } -// Attr stores attrs about this pod -type Attr struct { - // NodeName is needed to send unassign request to cloud provider on resync - NodeName string - // uid is used to differentiate a deleting pod and a newly created pod with the same name such as statefulsets - // or tapp pod - Uid string -} - -func getAttr(nodeName, uid string) string { - obj := Attr{NodeName: nodeName, Uid: uid} - attr, err := json.Marshal(obj) - if err != nil { - glog.Warningf("failed to marshal attr %+v: %v", obj, err) - } - return string(attr) -} - func (p *FloatingIPPlugin) GetIpam() floatingip.IPAM { return p.ipam } diff --git a/pkg/ipam/schedulerplugin/floatingip_plugin_test.go b/pkg/ipam/schedulerplugin/floatingip_plugin_test.go index 786b29bd..c15cf370 100644 --- a/pkg/ipam/schedulerplugin/floatingip_plugin_test.go +++ b/pkg/ipam/schedulerplugin/floatingip_plugin_test.go @@ -117,7 +117,8 @@ func TestFilter(t *testing.T) { t.Fatal(err) } // test filter for reserve situation - if err := fipPlugin.ipam.AllocateSpecificIP(podKey.KeyInDB, net.ParseIP("10.173.13.2"), constant.ReleasePolicyPodDelete, ""); err != nil { + if err := fipPlugin.ipam.AllocateSpecificIP(podKey.KeyInDB, net.ParseIP("10.173.13.2"), + floatingip.Attr{Policy: constant.ReleasePolicyPodDelete}); err != nil { t.Fatal(err) } filtered, failed, err = fipPlugin.Filter(pod, nodes) @@ -155,11 +156,13 @@ func TestAllocateIP(t *testing.T) { fipPlugin, stopChan, _ := createPluginTestNodes(t) defer func() { stopChan <- struct{}{} }() - if err := fipPlugin.ipam.AllocateSpecificIP(podKey.KeyInDB, net.ParseIP("10.173.13.2"), constant.ReleasePolicyPodDelete, ""); err != nil { + if err := fipPlugin.ipam.AllocateSpecificIP(podKey.KeyInDB, net.ParseIP("10.173.13.2"), + floatingip.Attr{Policy: constant.ReleasePolicyPodDelete}); err != nil { 
t.Fatal(err) } // check update from ReleasePolicyPodDelete to ReleasePolicyImmutable pod.Spec.NodeName = node4 + pod.SetUID("pod-xx-1") ipInfo, err := fipPlugin.allocateIP(podKey.KeyInDB, pod.Spec.NodeName, pod) if err != nil { t.Fatal(err) @@ -167,9 +170,19 @@ func TestAllocateIP(t *testing.T) { if ipInfo == nil || ipInfo.IP.String() != "10.173.13.2/24" { t.Fatal(ipInfo) } - if err := checkPolicyAndAttr(fipPlugin.ipam, podKey.KeyInDB, constant.ReleasePolicyImmutable, expectAttrNotEmpty()); err != nil { + fip, err := fipPlugin.ipam.First(podKey.KeyInDB) + if err != nil { t.Fatal(err) } + if fip.FIP.Policy != uint16(constant.ReleasePolicyImmutable) { + t.Fatal(fip.FIP.Policy) + } + if fip.FIP.NodeName != node4 { + t.Fatal(fip.FIP.NodeName) + } + if fip.FIP.PodUid != string(pod.UID) { + t.Fatal(fip.FIP.PodUid) + } } func TestUpdatePod(t *testing.T) { @@ -200,7 +213,8 @@ func TestUpdatePod(t *testing.T) { func TestReleaseIP(t *testing.T) { fipPlugin, stopChan, _ := createPluginTestNodes(t) defer func() { stopChan <- struct{}{} }() - if err := fipPlugin.ipam.AllocateSpecificIP(podKey.KeyInDB, net.ParseIP("10.173.13.2"), constant.ReleasePolicyPodDelete, ""); err != nil { + if err := fipPlugin.ipam.AllocateSpecificIP(podKey.KeyInDB, net.ParseIP("10.173.13.2"), + floatingip.Attr{Policy: constant.ReleasePolicyPodDelete}); err != nil { t.Fatal(err) } if err := checkIPKey(fipPlugin.ipam, "10.173.13.2", podKey.KeyInDB); err != nil { @@ -369,7 +383,8 @@ func TestFilterForDeploymentIPPool(t *testing.T) { // test bind gets the right key, i.e. 
dp_ns1_dp_dp-xxx-yyy, and filter gets reserved node testPod: pod, expectFiltererd: []string{node4}, expectFailed: []string{drainedNode, nodeHasNoIP, node3}, preHook: func() error { - return fipPlugin.ipam.AllocateSpecificIP(podKey.KeyInDB, net.ParseIP("10.173.13.2"), constant.ReleasePolicyNever, "") + return fipPlugin.ipam.AllocateSpecificIP(podKey.KeyInDB, net.ParseIP("10.173.13.2"), + floatingip.Attr{Policy: constant.ReleasePolicyNever}) }, }, { @@ -405,43 +420,6 @@ func TestFilterForDeploymentIPPool(t *testing.T) { } } -// Attr has a time field which makes it hard to check, so creating this struct to do part check -type expectAttr struct { - empty bool - contains []string -} - -func expectAttrEmpty() expectAttr { - return expectAttr{empty: true} -} - -func expectAttrNotEmpty() expectAttr { - return expectAttr{empty: false} -} - -func checkPolicyAndAttr(ipam floatingip.IPAM, key string, expectPolicy constant.ReleasePolicy, expectAttr expectAttr) error { - fip, err := ipam.First(key) - if err != nil { - return err - } - // policy should be - if fip.FIP.Policy != uint16(expectPolicy) { - return fmt.Errorf("expect policy %d, real %d", expectPolicy, fip.FIP.Policy) - } - if expectAttr.empty && fip.FIP.Attr != "" { - return fmt.Errorf("expect attr empty, real attr %q", fip.FIP.Attr) - } - if !expectAttr.empty && fip.FIP.Attr == "" { - return fmt.Errorf("expect attr not empty, real attr empty") - } - for i := range expectAttr.contains { - if !strings.Contains(fip.FIP.Attr, expectAttr.contains[i]) { - return fmt.Errorf("expect attr contains %q, real attr %q", expectAttr.contains[i], fip.FIP.Attr) - } - } - return nil -} - func checkFilterResult(realFilterd []corev1.Node, realFailed schedulerapi.FailedNodesMap, expectFiltererd, expectFailed []string) error { if err := checkFiltered(realFilterd, expectFiltererd...); err != nil { return err @@ -603,11 +581,11 @@ func TestParseReleasePolicy(t *testing.T) { expect: constant.ReleasePolicyPodDelete, }, { - meta: 
&v1.ObjectMeta{Labels: map[string]string{}, Annotations: map[string]string{constant.ReleasePolicyAnnotation: constant.Immutable}}, + meta: &v1.ObjectMeta{Labels: map[string]string{}, Annotations: immutableAnnotation}, expect: constant.ReleasePolicyImmutable, }, { - meta: &v1.ObjectMeta{Labels: map[string]string{}, Annotations: map[string]string{constant.ReleasePolicyAnnotation: constant.Never}}, + meta: &v1.ObjectMeta{Labels: map[string]string{}, Annotations: neverAnnotation}, expect: constant.ReleasePolicyNever, }, { @@ -707,8 +685,8 @@ func TestUnBind(t *testing.T) { func drainNode(fipPlugin *FloatingIPPlugin, subnet *net.IPNet, except net.IP) error { for { - if _, err := fipPlugin.ipam.AllocateInSubnet("ns_notexistpod", subnet, constant.ReleasePolicyPodDelete, - ""); err != nil { + if _, err := fipPlugin.ipam.AllocateInSubnet("ns_notexistpod", subnet, + floatingip.Attr{Policy: constant.ReleasePolicyPodDelete}); err != nil { if err == floatingip.ErrNoEnoughIP { break } @@ -726,7 +704,8 @@ func TestUnBindImmutablePod(t *testing.T) { podKey, _ = schedulerplugin_util.FormatKey(pod) fipPlugin, stopChan, _ := createPluginTestNodes(t, pod, CreateStatefulSet(pod.ObjectMeta, 1)) defer func() { stopChan <- struct{}{} }() - if err := fipPlugin.ipam.AllocateSpecificIP(podKey.KeyInDB, net.ParseIP("10.173.13.2"), constant.ReleasePolicyImmutable, ""); err != nil { + if err := fipPlugin.ipam.AllocateSpecificIP(podKey.KeyInDB, net.ParseIP("10.173.13.2"), + floatingip.Attr{Policy: constant.ReleasePolicyImmutable}); err != nil { t.Fatal(err) } // unbind the pod, check ip should be reserved, because pod has is immutable @@ -745,11 +724,13 @@ func TestAllocateRecentIPs(t *testing.T) { fipPlugin, stopChan, nodes := createPluginTestNodes(t, pod, dp) defer func() { stopChan <- struct{}{} }() podKey, _ := schedulerplugin_util.FormatKey(pod) - if err := fipPlugin.ipam.AllocateSpecificIP(podKey.PoolPrefix(), net.ParseIP("10.49.27.205"), constant.ReleasePolicyPodDelete, ""); err != nil { + 
if err := fipPlugin.ipam.AllocateSpecificIP(podKey.PoolPrefix(), net.ParseIP("10.49.27.205"), + floatingip.Attr{Policy: constant.ReleasePolicyPodDelete}); err != nil { t.Fatal(err) } // update time of 10.49.27.216 is more recently than 10.49.27.205 - if err := fipPlugin.ipam.AllocateSpecificIP(podKey.PoolPrefix(), net.ParseIP("10.49.27.216"), constant.ReleasePolicyPodDelete, ""); err != nil { + if err := fipPlugin.ipam.AllocateSpecificIP(podKey.PoolPrefix(), net.ParseIP("10.49.27.216"), + floatingip.Attr{Policy: constant.ReleasePolicyPodDelete}); err != nil { t.Fatal(err) } // check filter allocates recent ips for deployment pod from ip pool diff --git a/pkg/ipam/schedulerplugin/ipam.go b/pkg/ipam/schedulerplugin/ipam.go index f1a85eec..a61c1990 100644 --- a/pkg/ipam/schedulerplugin/ipam.go +++ b/pkg/ipam/schedulerplugin/ipam.go @@ -48,9 +48,8 @@ func (p *FloatingIPPlugin) ensureIPAMConf(lastConf *string, newConf string) (boo return true, nil } -func (p *FloatingIPPlugin) allocateInSubnet(key string, subnet *net.IPNet, policy constant.ReleasePolicy, attr, - when string) error { - ip, err := p.ipam.AllocateInSubnet(key, subnet, policy, attr) +func (p *FloatingIPPlugin) allocateInSubnet(key string, subnet *net.IPNet, attr floatingip.Attr, when string) error { + ip, err := p.ipam.AllocateInSubnet(key, subnet, attr) if err != nil { return err } @@ -58,9 +57,8 @@ func (p *FloatingIPPlugin) allocateInSubnet(key string, subnet *net.IPNet, polic return nil } -func (p *FloatingIPPlugin) allocateInSubnetWithKey(oldK, newK, subnet string, policy constant.ReleasePolicy, - attr, when string) error { - if err := p.ipam.AllocateInSubnetWithKey(oldK, newK, subnet, policy, attr); err != nil { +func (p *FloatingIPPlugin) allocateInSubnetWithKey(oldK, newK, subnet string, attr floatingip.Attr, when string) error { + if err := p.ipam.AllocateInSubnetWithKey(oldK, newK, subnet, attr); err != nil { return err } fip, err := p.ipam.First(newK) @@ -138,10 +136,13 @@ func (p 
*FloatingIPPlugin) releaseIP(key string, reason string) error { } func (p *FloatingIPPlugin) reserveIP(key, prefixKey string, reason string) error { - if err := p.ipam.ReserveIP(key, prefixKey, getAttr("", "")); err != nil { + if reserved, err := p.ipam.ReserveIP(key, prefixKey, floatingip.Attr{}); err != nil { return fmt.Errorf("reserve ip from pod %s to %s: %v", key, prefixKey, err) + } else if reserved { + // resync will call reserveIP for sts and tapp pod with immutable/never release policy for endless times, + // so print "success reserved ip" only when succeeded + glog.Infof("reserved ip from pod %s to %s, because %s", key, prefixKey, reason) } - glog.Infof("reserved ip from pod %s to %s, because %s", key, prefixKey, reason) return nil } diff --git a/pkg/ipam/schedulerplugin/resync.go b/pkg/ipam/schedulerplugin/resync.go index e7f027c4..f58c8dc8 100644 --- a/pkg/ipam/schedulerplugin/resync.go +++ b/pkg/ipam/schedulerplugin/resync.go @@ -17,13 +17,11 @@ package schedulerplugin import ( - "encoding/json" "fmt" "net" "strconv" "strings" - appv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metaErrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,7 +31,6 @@ import ( "tkestack.io/galaxy/pkg/ipam/cloudprovider/rpc" "tkestack.io/galaxy/pkg/ipam/floatingip" "tkestack.io/galaxy/pkg/ipam/schedulerplugin/util" - tappv1 "tkestack.io/tapp/pkg/apis/tappcontroller/v1" ) type resyncObj struct { @@ -56,18 +53,12 @@ func (p *FloatingIPPlugin) resyncPod() error { if err := p.fetchChecklist(resyncMeta); err != nil { return err } - if err := p.fetchAppAndPodMeta(resyncMeta); err != nil { - return err - } p.resyncAllocatedIPs(resyncMeta) return nil } type resyncMeta struct { allocatedIPs map[string]resyncObj // allocated ips from galaxy pool - unfinish map[string]*corev1.Pod - tappMap map[string]*tappv1.TApp - ssMap map[string]*appv1.StatefulSet } func (p *FloatingIPPlugin) fetchChecklist(meta *resyncMeta) error { @@ -93,109 +84,56 @@ func (p 
*FloatingIPPlugin) fetchChecklist(meta *resyncMeta) error { return nil } -func (p *FloatingIPPlugin) fetchAppAndPodMeta(meta *resyncMeta) error { - var err error - meta.unfinish, err = p.listUnfinishPodsToMap() - if err != nil { - return err - } - meta.ssMap, err = p.getSSMap() - if err != nil { - return err - } - meta.tappMap, err = p.getTAppMap() - if err != nil { - return err - } - return nil -} - // #lizard forgives func (p *FloatingIPPlugin) resyncAllocatedIPs(meta *resyncMeta) { for key, obj := range meta.allocatedIPs { func() { defer p.lockPod(obj.keyObj.PodName, obj.keyObj.Namespace)() - if _, ok := meta.unfinish[key]; ok { - return - } - // check with apiserver to confirm it really not exist - if p.podRunning(obj.keyObj.PodName, obj.keyObj.Namespace) { + if p.podRunning(obj.keyObj.PodName, obj.keyObj.Namespace, obj.fip.PodUid) { return } - if p.cloudProvider != nil { - var attr Attr - if err := json.Unmarshal([]byte(obj.fip.Attr), &attr); err != nil { - glog.Errorf("failed to unmarshal attr %s for pod %s: %v", obj.fip.Attr, key, err) + if p.cloudProvider != nil && obj.fip.NodeName != "" { + // For tapp and sts pod, nodeName will be updated to empty after unassigning + glog.Infof("UnAssignIP nodeName %s, ip %s, key %s during resync", obj.fip.NodeName, + obj.fip.IP.String(), key) + if err := p.cloudProviderUnAssignIP(&rpc.UnAssignIPRequest{ + NodeName: obj.fip.NodeName, + IPAddress: obj.fip.IP.String(), + }); err != nil { + glog.Warningf("failed to unassign ip %s to %s: %v", obj.fip.IP.String(), key, err) + // return to retry unassign ip in the next resync loop return } - // For tapp and sts pod, nodeName will be updated to empty after unassigning - if attr.NodeName != "" { - glog.Infof("UnAssignIP nodeName %s, ip %s, key %s during resync", attr.NodeName, - obj.fip.IP.String(), key) - if err := p.cloudProviderUnAssignIP(&rpc.UnAssignIPRequest{ - NodeName: attr.NodeName, - IPAddress: obj.fip.IP.String(), - }); err != nil { - glog.Warningf("failed to unassign 
ip %s to %s: %v", obj.fip.IP.String(), key, err) - // return to retry unassign ip in the next resync loop - return - } - // for tapp and sts pod, we need to clean its node attr and uid - if err := p.ipam.ReserveIP(key, key, getAttr("", "")); err != nil { - glog.Errorf("failed to reserve %s ip: %v", key, err) - } + // for tapp and sts pod, we need to clean its node attr and uid + if err := p.reserveIP(key, key, "unassign ip during resync"); err != nil { + glog.Error(err) } } releasePolicy := constant.ReleasePolicy(obj.fip.Policy) - // we can't get labels of not exist pod, so get them from it's ss or deployment if !obj.keyObj.Deployment() { - p.resyncNoneDpPod(meta, obj.keyObj, releasePolicy) + if err := p.unbindNoneDpPod(obj.keyObj, releasePolicy, "during resync"); err != nil { + glog.Error(err) + } return } - if err := p.unbindDpPod(obj.keyObj, releasePolicy, "during resyncing"); err != nil { + if err := p.unbindDpPod(obj.keyObj, releasePolicy, "during resync"); err != nil { glog.Error(err) } }() } } -func (p *FloatingIPPlugin) resyncNoneDpPod(meta *resyncMeta, keyObj *util.KeyObj, releasePolicy constant.ReleasePolicy) { - if releasePolicy == constant.ReleasePolicyNever { - return - } - var appExist bool - var replicas int32 - appFullName := util.Join(keyObj.AppName, keyObj.Namespace) - if keyObj.StatefulSet() { - ss, ok := meta.ssMap[appFullName] - if ok { - appExist = true - replicas = 1 - if ss.Spec.Replicas != nil { - replicas = *ss.Spec.Replicas - } - } - } else if keyObj.TApp() { - tapp, ok := meta.tappMap[appFullName] - if ok { - appExist = true - replicas = tapp.Spec.Replicas - } - } else { - // release for other apps - appExist = false - } - if should, reason, err := p.shouldRelease(keyObj, releasePolicy, appExist, replicas); err != nil { - glog.Warning(err) - } else if should { - if err := p.releaseIP(keyObj.KeyInDB, fmt.Sprintf("%s during resyncing", reason)); err != nil { - glog.Warning(err) - } +func (p *FloatingIPPlugin) podRunning(podName, namespace, 
podUid string) bool { + pod, err := p.PodLister.Pods(namespace).Get(podName) + if runningAndUidMatch(podUid, pod, err) { + return true } + // double check with apiserver to confirm it is not running + pod, err = p.Client.CoreV1().Pods(namespace).Get(podName, v1.GetOptions{}) + return runningAndUidMatch(podUid, pod, err) } -func (p *FloatingIPPlugin) podRunning(podName, namespace string) bool { - pod, err := p.Client.CoreV1().Pods(namespace).Get(podName, v1.GetOptions{}) +func runningAndUidMatch(storedUid string, pod *corev1.Pod, err error) bool { if err != nil { if metaErrs.IsNotFound(err) { return false @@ -203,6 +141,9 @@ func (p *FloatingIPPlugin) podRunning(podName, namespace string) bool { // we cannot figure out whether pod exist or not, we'd better keep the ip return true } + if storedUid != "" && storedUid != string(pod.GetUID()) { + return false + } return !finished(pod) } @@ -225,25 +166,6 @@ func (p *FloatingIPPlugin) listWantedPods() ([]*corev1.Pod, error) { return filtered, nil } -func (p *FloatingIPPlugin) listUnfinishPodsToMap() (map[string]*corev1.Pod, error) { - pods, err := p.listWantedPods() - if err != nil { - return nil, err - } - unfinish := map[string]*corev1.Pod{} - for i := range pods { - if finished(pods[i]) { - continue - } - keyObj, err := util.FormatKey(pods[i]) - if err != nil { - continue - } - unfinish[keyObj.KeyInDB] = pods[i] - } - return unfinish, nil -} - // syncPodIPs sync all pods' ips with db, if a pod has PodIP and its ip is unallocated, allocate the ip to it func (p *FloatingIPPlugin) syncPodIPsIntoDB() { glog.V(4).Infof("sync pod ips into DB") @@ -269,6 +191,7 @@ func (p *FloatingIPPlugin) syncPodIP(pod *corev1.Pod) error { if pod.Annotations == nil { return nil } + defer p.lockPod(pod.Name, pod.Namespace)() keyObj, err := util.FormatKey(pod) if err != nil { glog.V(5).Infof("sync pod %s/%s ip formatKey with error %v", pod.Namespace, pod.Name, err) @@ -296,8 +219,9 @@ func (p *FloatingIPPlugin) syncIP(key string, ip net.IP, 
pod *corev1.Pod) error return fmt.Errorf("conflict ip %s found for both %s and %s", ip.String(), key, storedKey) } } else { - if err := p.ipam.AllocateSpecificIP(key, ip, parseReleasePolicy(&pod.ObjectMeta), - getAttr(pod.Spec.NodeName, string(pod.UID))); err != nil { + attr := floatingip.Attr{ + Policy: parseReleasePolicy(&pod.ObjectMeta), NodeName: pod.Spec.NodeName, Uid: string(pod.UID)} + if err := p.ipam.AllocateSpecificIP(key, ip, attr); err != nil { return err } glog.Infof("updated floatingip %s to key %s", ip.String(), key) diff --git a/pkg/ipam/schedulerplugin/resync_test.go b/pkg/ipam/schedulerplugin/resync_test.go index 9cb7d29b..d8ae2f8f 100644 --- a/pkg/ipam/schedulerplugin/resync_test.go +++ b/pkg/ipam/schedulerplugin/resync_test.go @@ -22,6 +22,8 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "tkestack.io/galaxy/pkg/ipam/floatingip" . "tkestack.io/galaxy/pkg/ipam/schedulerplugin/testing" "tkestack.io/galaxy/pkg/ipam/schedulerplugin/util" ) @@ -34,10 +36,12 @@ func TestResyncAppNotExist(t *testing.T) { pod1Key, _ := util.FormatKey(pod1) pod2Key, _ := util.FormatKey(pod2) - if err := fipPlugin.ipam.AllocateSpecificIP(pod1Key.KeyInDB, net.ParseIP("10.49.27.205"), parseReleasePolicy(&pod1.ObjectMeta), ""); err != nil { + if err := fipPlugin.ipam.AllocateSpecificIP(pod1Key.KeyInDB, net.ParseIP("10.49.27.205"), + floatingip.Attr{Policy: parseReleasePolicy(&pod1.ObjectMeta)}); err != nil { t.Fatal(err) } - if err := fipPlugin.ipam.AllocateSpecificIP(pod2Key.KeyInDB, net.ParseIP("10.49.27.216"), parseReleasePolicy(&pod2.ObjectMeta), ""); err != nil { + if err := fipPlugin.ipam.AllocateSpecificIP(pod2Key.KeyInDB, net.ParseIP("10.49.27.216"), + floatingip.Attr{Policy: parseReleasePolicy(&pod2.ObjectMeta)}); err != nil { t.Fatal(err) } if err := fipPlugin.resyncPod(); err != nil { @@ -88,7 +92,8 @@ func TestResyncStsPod(t *testing.T) { func() { fipPlugin, stopChan, _ := createPluginTestNodes(t, 
objs...) defer func() { stopChan <- struct{}{} }() - if err := fipPlugin.ipam.AllocateSpecificIP(keyObj.KeyInDB, net.ParseIP("10.49.27.205"), parseReleasePolicy(&pod.ObjectMeta), ""); err != nil { + if err := fipPlugin.ipam.AllocateSpecificIP(keyObj.KeyInDB, net.ParseIP("10.49.27.205"), + floatingip.Attr{Policy: parseReleasePolicy(&pod.ObjectMeta)}); err != nil { t.Fatalf("case %d, err %v", i, err) } if err := fipPlugin.resyncPod(); err != nil { @@ -100,3 +105,37 @@ func TestResyncStsPod(t *testing.T) { }() } } + +func TestResyncPodUidChanged(t *testing.T) { + oldUid, newUid := "uid-1", "uid-2" + pod := CreateStatefulSetPod("dp-xxx-0", "ns1", immutableAnnotation) + pod.SetUID(types.UID(newUid)) + sts := CreateStatefulSet(pod.ObjectMeta, 1) + sts.Spec.Template.Spec = pod.Spec + ip := net.ParseIP("10.49.27.205") + fipPlugin, stopChan, _ := createPluginTestNodes(t, pod, sts) + defer func() { stopChan <- struct{}{} }() + podKey, _ := util.FormatKey(pod) + attr := floatingip.Attr{ + Policy: parseReleasePolicy(&pod.ObjectMeta), NodeName: "node-1", Uid: oldUid} + if err := fipPlugin.ipam.AllocateSpecificIP(podKey.KeyInDB, ip, attr); err != nil { + t.Fatal(err) + } + if err := fipPlugin.resyncPod(); err != nil { + t.Fatal(err) + } + fip, err := fipPlugin.ipam.ByIP(ip) + if err != nil { + t.Fatal(err) + } + if fip.Key != podKey.KeyInDB { + t.Fatalf("expect key: %s, got %s", podKey.KeyInDB, fip.Key) + } + // pod uid changed, ip should be reserved, i.e. 
key should keep, but nodeName and podUid should be empty + if fip.PodUid != "" { + t.Fatal(fip.PodUid) + } + if fip.NodeName != "" { + t.Fatal(fip.NodeName) + } +} diff --git a/pkg/ipam/schedulerplugin/statefulset.go b/pkg/ipam/schedulerplugin/statefulset.go index e8e37dee..d0b868ea 100644 --- a/pkg/ipam/schedulerplugin/statefulset.go +++ b/pkg/ipam/schedulerplugin/statefulset.go @@ -19,37 +19,35 @@ package schedulerplugin import ( "fmt" - appv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" metaErrs "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/labels" - glog "k8s.io/klog" "tkestack.io/galaxy/pkg/api/galaxy/constant" "tkestack.io/galaxy/pkg/ipam/schedulerplugin/util" ) -func (p *FloatingIPPlugin) unbindStsOrTappPod(pod *corev1.Pod, keyObj *util.KeyObj, - policy constant.ReleasePolicy) error { +func (p *FloatingIPPlugin) unbindNoneDpPod(keyObj *util.KeyObj, policy constant.ReleasePolicy, when string) error { key := keyObj.KeyInDB - if policy == constant.ReleasePolicyPodDelete { - return p.releaseIP(key, deletedAndIPMutablePod) + if policy == constant.ReleasePolicyPodDelete || (!keyObj.StatefulSet() && !keyObj.TApp()) { + // TODO for other workload pods, if we support more release policy other than ReleasePolicyPodDelete, + // make sure change this + return p.releaseIP(key, fmt.Sprintf("%s %s", deletedAndIPMutablePod, when)) } else if policy == constant.ReleasePolicyNever { - return p.reserveIP(key, key, "never policy") + return p.reserveIP(key, key, fmt.Sprintf("never release policy %s", when)) } else if policy == constant.ReleasePolicyImmutable { if keyObj.TApp() && p.TAppLister == nil { // tapp lister is nil, we can't get replicas and it's better to reserve the ip. 
- return p.reserveIP(key, key, "immutable policy") + return p.reserveIP(key, key, fmt.Sprintf("immutable policy %s", when)) } appExist, replicas, err := p.checkAppAndReplicas(keyObj) if err != nil { return err } - shouldRelease, reason, err := p.shouldRelease(keyObj, policy, appExist, replicas) + shouldRelease, reason, err := p.shouldRelease(keyObj, appExist, replicas) if err != nil { return err } + reason = fmt.Sprintf("%s %s", reason, when) if !shouldRelease { - return p.reserveIP(key, key, "immutable policy") + return p.reserveIP(key, key, reason) } else { return p.releaseIP(key, reason) } @@ -85,17 +83,10 @@ func (p *FloatingIPPlugin) getStsReplicas(keyObj *util.KeyObj) (appExist bool, r return } -func (p *FloatingIPPlugin) shouldRelease(keyObj *util.KeyObj, releasePolicy constant.ReleasePolicy, - parentAppExist bool, replicas int32) (bool, string, error) { +func (p *FloatingIPPlugin) shouldRelease(keyObj *util.KeyObj, parentAppExist bool, + replicas int32) (bool, string, error) { if !parentAppExist { - if releasePolicy != constant.ReleasePolicyNever { - return true, deletedAndParentAppNotExistPod, nil - } - return false, "", nil - } - if releasePolicy != constant.ReleasePolicyImmutable { - // 2. 
deleted pods whose parent statefulset or tapp exist but is not ip immutable - return true, deletedAndIPMutablePod, nil + return true, deletedAndParentAppNotExistPod, nil } index, err := parsePodIndex(keyObj.KeyInDB) if err != nil { @@ -104,21 +95,5 @@ func (p *FloatingIPPlugin) shouldRelease(keyObj *util.KeyObj, releasePolicy cons if replicas < int32(index)+1 { return true, deletedAndScaledDownAppPod, nil } - return false, "", nil -} - -func (p *FloatingIPPlugin) getSSMap() (map[string]*appv1.StatefulSet, error) { - sss, err := p.StatefulSetLister.List(labels.Everything()) - if err != nil { - return nil, err - } - key2App := make(map[string]*appv1.StatefulSet) - for i := range sss { - if !p.hasResourceName(&sss[i].Spec.Template.Spec) { - continue - } - key2App[util.StatefulsetName(sss[i])] = sss[i] - } - glog.V(5).Infof("%v", key2App) - return key2App, nil + return false, "pod index is less than replicas", nil } diff --git a/pkg/ipam/schedulerplugin/tapp.go b/pkg/ipam/schedulerplugin/tapp.go index b426bd51..6d52d031 100644 --- a/pkg/ipam/schedulerplugin/tapp.go +++ b/pkg/ipam/schedulerplugin/tapp.go @@ -17,38 +17,10 @@ package schedulerplugin import ( - "fmt" - metaErrs "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/labels" - glog "k8s.io/klog" "tkestack.io/galaxy/pkg/ipam/schedulerplugin/util" - tappv1 "tkestack.io/tapp/pkg/apis/tappcontroller/v1" ) -func tAppFullName(tapp *tappv1.TApp) string { - return fmt.Sprintf("%s_%s", tapp.Namespace, tapp.Name) -} - -func (p *FloatingIPPlugin) getTAppMap() (map[string]*tappv1.TApp, error) { - if p.TAppLister == nil { - return map[string]*tappv1.TApp{}, nil - } - tApps, err := p.TAppLister.List(labels.Everything()) - if err != nil { - return nil, err - } - key2App := make(map[string]*tappv1.TApp) - for i := range tApps { - if !p.hasResourceName(&tApps[i].Spec.Template.Spec) { - continue - } - key2App[tAppFullName(tApps[i])] = tApps[i] - } - glog.V(5).Infof("%v", key2App) - return key2App, nil -} - func (p 
*FloatingIPPlugin) getTAppReplicas(keyObj *util.KeyObj) (appExist bool, replicas int32, retErr error) { tapp, err := p.TAppLister.TApps(keyObj.Namespace).Get(keyObj.AppName) if err != nil {