Skip to content

Commit

Permalink
add pod lock
Browse files Browse the repository at this point in the history
- replace keylock with keymutex which is more efficient
- fix a bug if TAppLister is nil and stateful pod is immutable release policy
  • Loading branch information
chenchun committed May 29, 2020
1 parent c42b250 commit 320d9fe
Show file tree
Hide file tree
Showing 18 changed files with 329 additions and 379 deletions.
2 changes: 1 addition & 1 deletion hack/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go get github.com/onsi/ginkgo/ginkgo
export PATH=${PATH}:$(go env GOPATH)/bin
echo PATH=$PATH
export GOOS=linux
go test -coverpkg $PKG/pkg/... -coverprofile=coverage.txt -covermode=atomic -v ./tools/... ./cmd/... ./cni/... ./pkg/...
go test -race -coverpkg $PKG/pkg/... -coverprofile=coverage.txt -covermode=atomic -v ./tools/... ./cmd/... ./cni/... ./pkg/...
# go tool cover -func=coverage.txt
for i in e2e/k8s-vlan e2e/veth e2e/cni-request e2e/underlay/veth; do
ginkgo -v $i
Expand Down
7 changes: 2 additions & 5 deletions pkg/ipam/api/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@ import (
"tkestack.io/galaxy/pkg/ipam/floatingip"
"tkestack.io/galaxy/pkg/ipam/schedulerplugin/util"
"tkestack.io/galaxy/pkg/utils/httputil"
"tkestack.io/galaxy/pkg/utils/keylock"
)

type PoolController struct {
Client versioned.Interface
PoolLister list.PoolLister
LockPool *keylock.Keylock
LockPoolFunc func(poolName string) func() // returns unlock func
IPAM, SecondIPAM floatingip.IPAM
}

Expand Down Expand Up @@ -139,9 +138,7 @@ func (c *PoolController) CreateOrUpdate(req *restful.Request, resp *restful.Resp

func (c *PoolController) preAllocateIP(req *restful.Request, resp *restful.Response, pool *Pool) {
poolPrefix := util.NewKeyObj(util.DeploymentPrefixKey, "", "", "", pool.Name).PoolPrefix()
lockIndex := c.LockPool.GetLockIndex([]byte(poolPrefix))
c.LockPool.RawLock(lockIndex)
defer c.LockPool.RawUnlock(lockIndex)
defer c.LockPoolFunc(poolPrefix)()
fips, err := c.IPAM.ByPrefix(poolPrefix)
if err != nil {
httputil.InternalError(resp, err)
Expand Down
39 changes: 0 additions & 39 deletions pkg/ipam/schedulerplugin/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
package schedulerplugin

import (
"encoding/json"
"fmt"

glog "k8s.io/klog"
"tkestack.io/galaxy/pkg/ipam/cloudprovider/rpc"
"tkestack.io/galaxy/pkg/ipam/floatingip"
)

// cloudProviderAssignIP send assign ip req to cloud provider
Expand Down Expand Up @@ -62,40 +60,3 @@ func (p *FloatingIPPlugin) cloudProviderUnAssignIP(req *rpc.UnAssignIPRequest) e
glog.Infof("UnAssignIP %v success", req)
return nil
}

// resyncCloudProviderIPs resyncs assigned ips with cloud provider
func (p *FloatingIPPlugin) resyncCloudProviderIPs(ipam floatingip.IPAM, meta *resyncMeta) {
for key, obj := range meta.assignedPods {
if _, ok := meta.existPods[key]; ok {
continue
}
// check with apiserver to confirm it really not exist
if p.podExist(obj.keyObj.PodName, obj.keyObj.Namespace) {
continue
}
var attr Attr
if err := json.Unmarshal([]byte(obj.fip.Attr), &attr); err != nil {
glog.Errorf("failed to unmarshal attr %s for pod %s: %v", obj.fip.Attr, key, err)
continue
}
if attr.NodeName == "" {
// this is expected. For tapp and sts pod, nodeName will be updated to empty after unassigning
continue
}
glog.Infof("UnAssignIP nodeName %s, ip %s, key %s during resync", attr.NodeName,
obj.fip.IP.String(), key)
if err := p.cloudProviderUnAssignIP(&rpc.UnAssignIPRequest{
NodeName: attr.NodeName,
IPAddress: obj.fip.IP.String(),
}); err != nil {
// delete this record from allocatedIPs map to have a retry
delete(meta.allocatedIPs, key)
glog.Warningf("failed to unassign ip %s to %s: %v", obj.fip.IP.String(), key, err)
continue
}
// for tapp and sts pod, we need to clean its node attr and uid
if err := ipam.ReserveIP(key, key, getAttr("", "")); err != nil {
glog.Errorf("failed to reserve %s ip: %v", key, err)
}
}
}
58 changes: 20 additions & 38 deletions pkg/ipam/schedulerplugin/cloudprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,59 +18,41 @@
package schedulerplugin

import (
"errors"
"net"
"sync"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"tkestack.io/galaxy/pkg/api/galaxy/constant"
"tkestack.io/galaxy/pkg/api/k8s/schedulerapi"
. "tkestack.io/galaxy/pkg/ipam/schedulerplugin/testing"
schedulerplugin_util "tkestack.io/galaxy/pkg/ipam/schedulerplugin/util"
)

func TestBindingAfterReceivingDeleteEvent(t *testing.T) {
node := createNode("node1", nil, "10.49.27.2")
func TestConcurrentBindUnbind(t *testing.T) {
pod := CreateDeploymentPod("dp-xxx-yyy", "ns1", poolAnnotation("pool1"))
podKey, _ := schedulerplugin_util.FormatKey(pod)
dp1 := createDeployment("dp", "ns1", pod.ObjectMeta, 1)
expectIP := "10.49.27.205"
plugin, stopChan := createPlugin(t, pod, dp1, &node)
dp1 := createDeployment(pod.ObjectMeta, 1)
plugin, stopChan, _ := createPluginTestNodes(t, pod, dp1)
defer func() { stopChan <- struct{}{} }()
cloudProvider := &fakeCloudProvider1{proceedBind: make(chan struct{})}
cloudProvider := &fakeCloudProvider1{m: make(map[string]string)}
plugin.cloudProvider = cloudProvider
if err := plugin.ipam.AllocateSpecificIP(podKey.KeyInDB, net.ParseIP("10.49.27.216"), constant.ReleasePolicyPodDelete, "{}"); err != nil {
t.Fatal(err)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
// drain ips other than expectIP of this subnet
if err := drainNode(plugin, node3Subnet, net.ParseIP(expectIP)); err != nil {
t.Fatal(err)
}
// bind will hang on waiting event
_, err := checkBind(plugin, pod, node.Name, podKey.KeyInDB, node3Subnet)
if err == nil || !isPodNotFoundError(errors.Unwrap(err)) {
defer wg.Done()
if err := plugin.unbind(pod); err != nil {
t.Fatal(err)
}
}()
<-cloudProvider.proceedBind
// Before cloudProvider.AssignIP invoked allocating ip has already done, check ip allocated to pod
if err := checkIPKey(plugin.ipam, expectIP, podKey.KeyInDB); err != nil {
t.Fatal(err)
}
// before bind is done, we delete this pod
if err := plugin.PluginFactoryArgs.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{}); err != nil {
t.Fatal(err)
}
if err := waitForUnbind(plugin); err != nil {
t.Fatal(err)
}
cloudProvider.proceedBind <- struct{}{}
if err := waitForUnbind(plugin); err != nil {
if err := plugin.Bind(&schedulerapi.ExtenderBindingArgs{
PodName: pod.Name,
PodNamespace: pod.Namespace,
Node: node3,
}); err != nil {
t.Fatal(err)
}
// key should be updated to pool prefix
if err := checkIPKey(plugin.ipam, expectIP, podKey.PoolPrefix()); err != nil {
t.Fatal(err)
}
}

func waitForUnbind(plugin *FloatingIPPlugin) error {
deleteEvent := <-plugin.unreleased
return plugin.unbind(deleteEvent.pod)
wg.Wait()
}
11 changes: 8 additions & 3 deletions pkg/ipam/schedulerplugin/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ func (p *FloatingIPPlugin) unbindDpPodForIPAM(keyObj *util.KeyObj, ipam floating
}
// locks the pool name if it is a pool
// locks the deployment app name if it isn't a pool
lockIndex := p.dpLockPool.GetLockIndex([]byte(prefixKey))
p.dpLockPool.RawLock(lockIndex)
defer p.dpLockPool.RawUnlock(lockIndex)
defer p.LockDpPool(prefixKey)()
fips, err := ipam.ByPrefix(prefixKey)
if err != nil {
return err
Expand Down Expand Up @@ -115,3 +113,10 @@ func (p *FloatingIPPlugin) getDpReplicas(keyObj *util.KeyObj) (int, bool, error)
}
return replicas, false, nil
}

func (p *FloatingIPPlugin) LockDpPool(poolName string) func() {
p.dpLockPool.LockKey(poolName)
return func() {
_ = p.dpLockPool.UnlockKey(poolName)
}
}
58 changes: 58 additions & 0 deletions pkg/ipam/schedulerplugin/deployment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Tencent is pleased to support the open source community by making TKEStack available.
*
* Copyright (C) 2012-2019 Tencent. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://opensource.org/licenses/Apache-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package schedulerplugin

import (
"testing"

. "tkestack.io/galaxy/pkg/ipam/schedulerplugin/testing"
"tkestack.io/galaxy/pkg/ipam/schedulerplugin/util"
)

func TestDpReleasePolicy(t *testing.T) {
for i, testCase := range []struct {
annotations map[string]string
replicas int32
expectKeyFunc func(obj *util.KeyObj) string
}{
{annotations: nil, replicas: 1, expectKeyFunc: emptyNameFunc},
{annotations: immutableAnnotation, replicas: 1, expectKeyFunc: poolPrefixFunc},
{annotations: immutableAnnotation, replicas: 0, expectKeyFunc: emptyNameFunc},
{annotations: neverAnnotation, replicas: 0, expectKeyFunc: poolPrefixFunc},
{annotations: neverAnnotation, replicas: 1, expectKeyFunc: poolPrefixFunc},
{annotations: poolAnnotation("test"), replicas: 0, expectKeyFunc: poolPrefixFunc},
{annotations: poolAnnotation("test"), replicas: 1, expectKeyFunc: poolPrefixFunc},
} {
pod := CreateDeploymentPod("dp-xxx-yy", "ns1", testCase.annotations)
keyObj, _ := util.FormatKey(pod)
dp := createDeployment(pod.ObjectMeta, testCase.replicas)
func() {
fipPlugin, stopChan, _ := createPluginTestNodes(t, pod, dp)
defer func() { stopChan <- struct{}{} }()
fip, err := checkBind(fipPlugin, pod, node3, keyObj.KeyInDB, node3Subnet)
if err != nil {
t.Fatalf("case %d, err %v", i, err)
}
if err := fipPlugin.unbind(pod); err != nil {
t.Fatalf("case %d, err %v", i, err)
}
if err := checkIPKey(fipPlugin.ipam, fip.FIP.IP.String(), testCase.expectKeyFunc(keyObj)); err != nil {
t.Fatalf("case %d, err %v", i, err)
}
}()
}
}
9 changes: 6 additions & 3 deletions pkg/ipam/schedulerplugin/fake_cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@
package schedulerplugin

import (
glog "k8s.io/klog"
"tkestack.io/galaxy/pkg/ipam/cloudprovider/rpc"
)

type fakeCloudProvider1 struct {
proceedBind chan struct{}
m map[string]string // race test
}

func (f *fakeCloudProvider1) AssignIP(in *rpc.AssignIPRequest) (*rpc.AssignIPReply, error) {
f.proceedBind <- struct{}{} // notify we are waiting in AssignIP now
<-f.proceedBind // sleep before we receive signal to continue again
f.m["1"] = "a"
glog.Infof(`f.m["1"] = "a"`)
return &rpc.AssignIPReply{Success: true}, nil
}

func (f *fakeCloudProvider1) UnAssignIP(in *rpc.UnAssignIPRequest) (*rpc.UnAssignIPReply, error) {
f.m["2"] = "b"
glog.Infof(`f.m["2"] = "b"`)
return &rpc.UnAssignIPReply{Success: true}, nil
}

0 comments on commit 320d9fe

Please sign in to comment.