Skip to content

Commit

Permalink
fix: 多个 destination 时的 zk 竞争问题 (#68) (#69)
Browse files Browse the repository at this point in the history
Co-authored-by: Lucas biu <517197934@qq.com>
  • Loading branch information
withlin and LucaslEliane committed Apr 2, 2021
1 parent b7c5331 commit 533d438
Showing 1 changed file with 29 additions and 13 deletions.
42 changes: 29 additions & 13 deletions client/cluster_canal_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type ClusterCanalConnector struct {
RetryTimes int32
currentSequence string
zkVersion int32

Path string
}

const (
Expand All @@ -32,12 +34,14 @@ const (
func NewClusterCanalConnector(canalNode *CanalClusterNode, username string, password string, destination string,
soTimeOut int32, idleTimeOut int32) (*ClusterCanalConnector,error) {

err := checkRootPath(canalNode.zkClient)
destinationPath := fmt.Sprintf("%s/%s", path, destination)

err := checkRootPath(canalNode.zkClient, destinationPath)
if err != nil {
return nil, err
}

currentSequence, err := createEphemeralSequence(canalNode.zkClient)
currentSequence, err := createEphemeralSequence(canalNode.zkClient, destinationPath)
if err != nil {
return nil,err
}
Expand All @@ -52,6 +56,7 @@ func NewClusterCanalConnector(canalNode *CanalClusterNode, username string, pass
RetryTimes: 0,
currentSequence:currentSequence,
zkVersion:0,
Path: destinationPath,
}

return cluster, nil
Expand Down Expand Up @@ -89,7 +94,7 @@ func (cc *ClusterCanalConnector) doConnect() error {
return fmt.Errorf("error wait become first zk node %s", err.Error())
}

_, err = cc.canalNode.zkClient.Set(path+"/"+cc.currentSequence, []byte{runningFlag}, cc.zkVersion)
_, err = cc.canalNode.zkClient.Set(cc.Path+"/"+cc.currentSequence, []byte{runningFlag}, cc.zkVersion)
if err != nil {
return fmt.Errorf("error set running flag %s", err.Error())
}
Expand Down Expand Up @@ -117,8 +122,8 @@ func (cc *ClusterCanalConnector) doConnect() error {
func (cc *ClusterCanalConnector) DisConnection() error {
if cc.conn != nil {
cc.conn.DisConnection()
_, stat, _ := cc.canalNode.zkClient.Get(path + "/" + cc.currentSequence)
err := cc.canalNode.zkClient.Delete(path+"/"+cc.currentSequence, stat.Version)
_, stat, _ := cc.canalNode.zkClient.Get(cc.Path + "/" + cc.currentSequence)
err := cc.canalNode.zkClient.Delete(cc.Path+"/"+cc.currentSequence, stat.Version)
if err != nil {
return fmt.Errorf("error delete temp consumer path %s", err.Error())
}
Expand Down Expand Up @@ -206,8 +211,9 @@ func (cc *ClusterCanalConnector) RollBack(batchId int64) error {
return nil
}

func createEphemeralSequence(zkClient *zk.Conn) (string, error) {
node, err := zkClient.Create(path+"/", []byte{notRunningFlag}, zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
func createEphemeralSequence(zkClient *zk.Conn, destinationPath string) (string, error) {
node, err := zkClient.Create(destinationPath+"/", []byte{notRunningFlag}, zk.FlagEphemeral|zk.FlagSequence,
zk.WorldACL(zk.PermAll))
if err != nil {
return "", err
}
Expand All @@ -216,23 +222,33 @@ func createEphemeralSequence(zkClient *zk.Conn) (string, error) {
return currentSequence, nil
}

func checkRootPath(zkClient *zk.Conn) error {
exists, _, err := zkClient.Exists(path)
func checkRootPath(zkClient *zk.Conn, destinationPath string) error {
rootExists, _, err := zkClient.Exists(path)
if err != nil {
return err
}
if !exists {
if !rootExists {
_, err := zkClient.Create(path, []byte{}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
return err
}
}
exists, _, err := zkClient.Exists(destinationPath)
if err != nil {
return err
}
if !exists {
_, err := zkClient.Create(destinationPath, []byte{}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
return err
}
}
return nil
}

func (cc *ClusterCanalConnector) waitBecomeFirst() error {
zkClient := cc.canalNode.zkClient
children, _, err := zkClient.Children(path)
children, _, err := zkClient.Children(cc.Path)
if err != nil {
return err
}
Expand All @@ -249,7 +265,7 @@ func (cc *ClusterCanalConnector) waitBecomeFirst() error {
for i, child := range children {
if cc.currentSequence == child {
noSelf = false
previousPath := path + "/" + children[i-1]
previousPath := cc.Path + "/" + children[i-1]
//阻塞等待上一个比他小的节点删除
log.Println("waiting")
err := waitDelete(zkClient, previousPath)
Expand All @@ -264,7 +280,7 @@ func (cc *ClusterCanalConnector) waitBecomeFirst() error {

if noSelf {
//以防万一
cc.currentSequence, err = createEphemeralSequence(zkClient)
cc.currentSequence, err = createEphemeralSequence(zkClient, cc.Path)
if err != nil {
return err
}
Expand Down

0 comments on commit 533d438

Please sign in to comment.