Skip to content
This repository has been archived by the owner on Jul 21, 2021. It is now read-only.

Commit

Permalink
Merge b9e3d7d into c4fab1a
Browse files Browse the repository at this point in the history
  • Loading branch information
shz117 committed Feb 21, 2019
2 parents c4fab1a + b9e3d7d commit 6e60c6b
Show file tree
Hide file tree
Showing 8 changed files with 772 additions and 9 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
.DS_Store
.idea
zookeeper-*
156 changes: 156 additions & 0 deletions zk/leader_election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package zk

import (
"fmt"
"strings"
)

// LeaderElector defines methods for leader election
type LeaderElector interface {
// Start leader election for current process, caller should listen to returned
// chan for success event
Start() (chan struct{}, error)
// Stop election, resign leader if already chosen as leader
Stop() error
}

// NewLeaderElector returns a LeaderElector implementation
func NewLeaderElector(conn *Conn, electionPath string, acl []ACL) LeaderElector {
return &leaderElectorImpl{conn: conn, electionPath: electionPath, acl: acl}
}

type leaderElectorImpl struct {
conn *Conn
acl []ACL
// path of election
electionPath string
// path and version of current process' znode
nodePath string
lock *Lock
}

func (l *leaderElectorImpl) Start() (election chan struct{}, err error) {
l.lock = NewLock(l.conn, l.electionPath + "_lock", l.acl)
err = l.lock.Lock()
if err != nil {
return
}
defer func() {
err = l.lock.Unlock()
}()

election = make(chan struct{}, 1)
prefix := fmt.Sprintf("%s/condidate-", l.electionPath)

// create znode for current process
nodePath := ""
for i := 0; i< 3; i++ {
nodePath, err = l.conn.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl)
if err == ErrNoNode {
// Create parent node.
parts := strings.Split(l.electionPath, "/")
pth := ""
for _, p := range parts[1:] {
var exists bool
pth += "/" + p
exists, _, err = l.conn.Exists(pth)
if err != nil {
return
}
if exists == true {
continue
}
_, err = l.conn.Create(pth, []byte{}, 0, l.acl)
if err != nil && err != ErrNodeExists {
return
}
}
} else if err == nil {
break
} else {
return
}
}
l.nodePath = nodePath
go l.watchAndPipe(election)
return
}

// setup watches, push to election chan when current process got elected
// this func is meant to be kept running in a separate goroutine
func (l *leaderElectorImpl) watchAndPipe(election chan struct{}) {
// check up to 3 times for potential wins
for i := 1; i < 3; i++ {
seq, err := parseSeq(l.nodePath, "")
if err != nil {
return
}

children, _, err := l.conn.Children(l.electionPath)
if err != nil {
return
}

lowestSeq := seq
prevSeq := -1
prevSeqPath := ""
for _, p := range children {
s, err := parseSeq(p, "")
if err != nil {
return
}
if s < lowestSeq {
lowestSeq = s
}
if s < seq && s > prevSeq {
prevSeq = s
prevSeqPath = p
}
}

if seq == lowestSeq {
// won election!
election <- struct{}{}
return
}

// Wait on the node next in line to be deleted
_, _, ch, err := l.conn.GetW(l.electionPath + "/" + prevSeqPath)
if err != nil && err != ErrNoNode {
return
}

ev := <-ch
if ev.Err != nil {
err = ev.Err
return
}
if ev.Type == EventNodeDeleted {
// previous node deleted, could be a win, check again to make sure
continue
}
break
}
}

func (l *leaderElectorImpl) Stop() (err error) {
err = l.lock.Lock()
if err != nil {
return
}
defer func() {
err = l.lock.Unlock()
}()

if l.nodePath != "" {
var stat *Stat
_, stat, err = l.conn.Get(l.nodePath)
if err != nil {
return
}

err = l.conn.Delete(l.nodePath, stat.Version)
}
return
}

94 changes: 94 additions & 0 deletions zk/leader_election_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package zk

import (
"testing"
"time"
)

func TestLeaderElector(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()

acls := WorldACL(PermAll)

l1 := NewLeaderElector(zk, "/test", acls)
c1, err := l1.Start()
if err != nil {
t.Error(err)
}


// case1: l1 should win as only candidate, test should hang here otherwise
_ = <- c1

kill1 := make(chan struct{}, 1)

go func() {
// case 2: l2 should never win since l1 holds election
l2 := NewLeaderElector(zk, "/test", acls)
c2, _ := l2.Start()
defer l2.Stop()
select {
case <- c2:
t.Error()
case <- kill1:
return
}
}()

time.Sleep(time.Second)
// l2 exited election here
kill1 <- struct{}{}


kill2 := make(chan struct{}, 1)
go func() {
// case 3: l3 will win after l1 quit
l3 := NewLeaderElector(zk, "/test", acls)
c3, _ := l3.Start()

defer l3.Stop()
select {
case <- c3:
return
case <- kill2:
t.Error()
}
}()

l1.Stop()
time.Sleep(time.Second)
kill2 <- struct{}{}

kill3 := make(chan struct{}, 1)
// case 4: l1 holds election, l3 waits on l2, and should not win in case l2
// disconnect (l1 still holds election)
c1, _ = l1.Start()
_ = <- c1
l2 := NewLeaderElector(zk, "/test", acls)
_, _ = l2.Start()
go func() {
l3 := NewLeaderElector(zk, "/test", acls)
c3, _ := l3.Start()

defer l3.Stop()
select {
case <- c3:
t.Error()
case <- kill3:
return
}
}()
time.Sleep(time.Second)
l2.Stop()
time.Sleep(time.Second)
kill3 <- struct{}{}
}
10 changes: 2 additions & 8 deletions zk/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package zk
import (
"errors"
"fmt"
"strconv"
"strings"
)

Expand Down Expand Up @@ -34,11 +33,6 @@ func NewLock(c *Conn, path string, acl []ACL) *Lock {
}
}

func parseSeq(path string) (int, error) {
parts := strings.Split(path, "-")
return strconv.Atoi(parts[len(parts)-1])
}

// Lock attempts to acquire the lock. It will wait to return until the lock
// is acquired or an error occurs. If this instance already has the lock
// then ErrDeadlock is returned.
Expand Down Expand Up @@ -82,7 +76,7 @@ func (l *Lock) Lock() error {
return err
}

seq, err := parseSeq(path)
seq, err := parseSeq(path, "")
if err != nil {
return err
}
Expand All @@ -97,7 +91,7 @@ func (l *Lock) Lock() error {
prevSeq := -1
prevSeqPath := ""
for _, p := range children {
s, err := parseSeq(p)
s, err := parseSeq(p, "")
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 6e60c6b

Please sign in to comment.