Skip to content

Commit

Permalink
Merge pull request #4353 from tinyspeck/fix-for-consul-master-election
Browse files Browse the repository at this point in the history
Do not reuse a consul lock
  • Loading branch information
demmer committed Dec 8, 2018
2 parents eccf912 + dc2cbe0 commit 8e64c77
Showing 1 changed file with 17 additions and 18 deletions.
35 changes: 17 additions & 18 deletions go/vt/topo/consultopo/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,8 @@ import (

// NewMasterParticipation is part of the topo.Server interface
func (s *Server) NewMasterParticipation(name, id string) (topo.MasterParticipation, error) {
// Create the lock here.
electionPath := path.Join(s.root, electionsPath, name)
l, err := s.client.LockOpts(&api.LockOptions{
Key: electionPath,
Value: []byte(id),
})
if err != nil {
return nil, err
}

return &consulMasterParticipation{
s: s,
lock: l,
name: name,
id: id,
stop: make(chan struct{}),
Expand All @@ -56,9 +45,6 @@ type consulMasterParticipation struct {
// s is our parent consul topo Server
s *Server

// lock is the *api.Lock structure we're going to use.
lock *api.Lock

// name is the name of this MasterParticipation
name string

Expand All @@ -74,6 +60,16 @@ type consulMasterParticipation struct {

// WaitForMastership is part of the topo.MasterParticipation interface.
func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error) {

electionPath := path.Join(mp.s.root, electionsPath, mp.name)
l, err := mp.s.client.LockOpts(&api.LockOptions{
Key: electionPath,
Value: []byte(mp.id),
})
if err != nil {
return nil, err
}

// If Stop was already called, mp.done is closed, so we are interrupted.
select {
case <-mp.done:
Expand All @@ -82,7 +78,7 @@ func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error
}

// Try to lock until mp.stop is closed.
lost, err := mp.lock.Lock(mp.stop)
lost, err := l.Lock(mp.stop)
if err != nil {
// We can't lock. See if it was because we got canceled.
select {
Expand All @@ -93,19 +89,22 @@ func (mp *consulMasterParticipation) WaitForMastership() (context.Context, error
return nil, err
}

// We have the lock, keep mastership until we loose it.
// We have the lock, keep mastership until we lose it.
lockCtx, lockCancel := context.WithCancel(context.Background())
go func() {
select {
case <-lost:
// We lost the lock, nothing to do but lockCancel().
lockCancel()
// We could have lost the lock. Per consul API, explicitly call Unlock to make sure that session will not be renewed.
if err := l.Unlock(); err != nil {
log.Errorf("master election(%v) Unlock failed: %v", mp.name, err)
}
case <-mp.stop:
// Stop was called. We stop the context first,
// so the running process is not thinking it
// is the master any more, then we unlock.
lockCancel()
if err := mp.lock.Unlock(); err != nil {
if err := l.Unlock(); err != nil {
log.Errorf("master election(%v) Unlock failed: %v", mp.name, err)
}
close(mp.done)
Expand Down

0 comments on commit 8e64c77

Please sign in to comment.