Merge pull request #211 from unihorn/55
fix: wait for all goroutines to finish before Stop
xiang90 committed Apr 4, 2014
2 parents f94d98c + 1961d0a commit 585c580
Showing 3 changed files with 74 additions and 23 deletions.
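
Before the diff, the shape of the fix in isolation: the commit drops the old one-at-a-time stop handshake (`stopped chan chan bool`, where `Stop` sent a reply channel and waited for a single acknowledgement) in favor of two standard Go idioms — a `sync.WaitGroup` that counts every goroutine the server spawns, and a stop channel that is closed so all of them observe shutdown at once. A minimal standalone sketch of the combined pattern (the `worker` type and its members are illustrative, not from this codebase):

package main

import (
	"sync"
	"time"
)

// worker mirrors the shape this commit gives raft.server: a WaitGroup
// counting every spawned goroutine plus a closed-channel stop broadcast.
type worker struct {
	stopped chan bool      // closed (never sent on) to signal shutdown
	group   sync.WaitGroup // counts every spawned goroutine
}

func (w *worker) start() {
	// re-allocated on every start, because stop() closes it
	w.stopped = make(chan bool)

	w.group.Add(1)
	go func() {
		defer w.group.Done()
		tick := time.NewTicker(10 * time.Millisecond)
		defer tick.Stop()
		for {
			select {
			case <-w.stopped: // a closed channel unblocks every receiver
				return
			case <-tick.C:
				// ... periodic work, e.g. a heartbeat ...
			}
		}
	}()
}

func (w *worker) stop() {
	close(w.stopped) // broadcast: every select on w.stopped fires at once
	w.group.Wait()   // block until all tracked goroutines have returned
	// only now is it safe to tear down shared state (the commit closes the log here)
}

func main() {
	w := &worker{}
	w.start()
	w.stop()
}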
16 changes: 16 additions & 0 deletions http_transporter.go
@@ -244,6 +244,10 @@ func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc {
 		}
 
 		resp := server.AppendEntries(req)
+		if resp == nil {
+			http.Error(w, "Failed creating response.", http.StatusInternalServerError)
+			return
+		}
 		if _, err := resp.Encode(w); err != nil {
 			http.Error(w, "", http.StatusInternalServerError)
 			return
@@ -263,6 +267,10 @@ func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc {
 		}
 
 		resp := server.RequestVote(req)
+		if resp == nil {
+			http.Error(w, "Failed creating response.", http.StatusInternalServerError)
+			return
+		}
 		if _, err := resp.Encode(w); err != nil {
 			http.Error(w, "", http.StatusInternalServerError)
 			return
@@ -282,6 +290,10 @@ func (t *HTTPTransporter) snapshotHandler(server Server) http.HandlerFunc {
 		}
 
 		resp := server.RequestSnapshot(req)
+		if resp == nil {
+			http.Error(w, "Failed creating response.", http.StatusInternalServerError)
+			return
+		}
 		if _, err := resp.Encode(w); err != nil {
 			http.Error(w, "", http.StatusInternalServerError)
 			return
@@ -301,6 +313,10 @@ func (t *HTTPTransporter) snapshotRecoveryHandler(server Server) http.HandlerFunc {
 		}
 
 		resp := server.SnapshotRecoveryRequest(req)
+		if resp == nil {
+			http.Error(w, "Failed creating response.", http.StatusInternalServerError)
+			return
+		}
 		if _, err := resp.Encode(w); err != nil {
 			http.Error(w, "", http.StatusInternalServerError)
 			return
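All four handlers gain the same guard for the same reason: with the stop-channel changes in server.go below, `send` can now fail with `StopError`, so RPC entry points such as `server.AppendEntries` can hand back a nil response while the server is shutting down. Encoding that nil (`resp.Encode(w)`) would panic inside the handler; answering 500 instead turns a shutdown race into an ordinary failed request.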
7 changes: 6 additions & 1 deletion peer.go
@@ -88,7 +88,12 @@ func (p *Peer) setLastActivity(now time.Time) {
 func (p *Peer) startHeartbeat() {
 	p.stopChan = make(chan bool)
 	c := make(chan bool)
-	go p.heartbeat(c)
+
+	p.server.routineGroup.Add(1)
+	go func() {
+		defer p.server.routineGroup.Done()
+		p.heartbeat(c)
+	}()
 	<-c
 }
 
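`startHeartbeat` still blocks on `<-c`, so the heartbeat goroutine is known to be running before the call returns; the only change is that the goroutine is registered with the server's `routineGroup` first. The same Add/Done wrapping appears six times in this commit, so a helper could express it once — a sketch, assuming a hypothetical `spawn` method that the codebase does not actually define:

// spawn runs fn on a goroutine tracked by the server's WaitGroup.
// Hypothetical helper; the commit inlines this pattern at each site.
func (s *server) spawn(fn func()) {
	s.routineGroup.Add(1) // on the caller's goroutine, before `go`, so
	go func() {           // Stop can never Wait() on a transiently zero counter
		defer s.routineGroup.Done() // decremented even if fn panics
		fn()
	}()
}

With it, the peer.go change above would read `p.server.spawn(func() { p.heartbeat(c) })`.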
74 changes: 52 additions & 22 deletions server.go
@@ -55,6 +55,7 @@ const ElectionTimeoutThresholdPercent = 0.8
 var NotLeaderError = errors.New("raft.Server: Not current leader")
 var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
 var CommandTimeoutError = errors.New("raft: Command timeout")
+var StopError = errors.New("raft: Has been stopped")
 
 //------------------------------------------------------------------------------
 //
@@ -123,7 +124,7 @@ type server struct {
 	mutex             sync.RWMutex
 	syncedPeer        map[string]bool
 
-	stopped           chan chan bool
+	stopped           chan bool
 	c                 chan *ev
 	electionTimeout   time.Duration
 	heartbeatInterval time.Duration
@@ -140,6 +141,8 @@ type server struct {
 	maxLogEntriesPerRequest uint64
 
 	connectionString string
+
+	routineGroup sync.WaitGroup
 }
 
 // An internal event to be processed by the server's event loop.
@@ -177,7 +180,6 @@ func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error) {
 		state:             Stopped,
 		peers:             make(map[string]*Peer),
 		log:               newLog(),
-		stopped:           make(chan chan bool),
 		c:                 make(chan *ev, 256),
 		electionTimeout:   DefaultElectionTimeout,
 		heartbeatInterval: DefaultHeartbeatInterval,
@@ -440,6 +442,9 @@ func (s *server) Start() error {
 		return err
 	}
 
+	// stopped needs to be allocated each time server starts
+	// because it is closed at `Stop`.
+	s.stopped = make(chan bool)
 	s.setState(Follower)
 
 	// If no log entries exist then
@@ -457,7 +462,11 @@ func (s *server) Start() error {
 
 	debugln(s.GetState())
 
-	go s.loop()
+	s.routineGroup.Add(1)
+	go func() {
+		defer s.routineGroup.Done()
+		s.loop()
+	}()
 
 	return nil
 }
@@ -507,11 +516,11 @@ func (s *server) Stop() {
 		return
 	}
 
-	stop := make(chan bool)
-	s.stopped <- stop
+	close(s.stopped)
 
-	// make sure the server has stopped before we close the log
-	<-stop
+	// make sure all goroutines have stopped before we close the log
+	s.routineGroup.Wait()
+
 	s.log.close()
 	s.setState(Stopped)
 }
@@ -605,9 +614,17 @@ func (s *server) loop() {
 // until the event is actually processed before returning.
 func (s *server) send(value interface{}) (interface{}, error) {
 	event := &ev{target: value, c: make(chan error, 1)}
-	s.c <- event
-	err := <-event.c
-	return event.returnValue, err
+	select {
+	case s.c <- event:
+	case <-s.stopped:
+		return nil, StopError
+	}
+	select {
+	case <-s.stopped:
+		return nil, StopError
+	case err := <-event.c:
+		return event.returnValue, err
+	}
 }
 
 func (s *server) sendAsync(value interface{}) {
@@ -621,8 +638,13 @@ func (s *server) sendAsync(value interface{}) {
 	default:
 	}
 
+	s.routineGroup.Add(1)
 	go func() {
-		s.c <- event
+		defer s.routineGroup.Done()
+		select {
+		case s.c <- event:
+		case <-s.stopped:
+		}
 	}()
 }
 
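The `send`/`sendAsync` rewrites address the companion hazard: once the event loop has exited, nothing drains `s.c`, so a bare `s.c <- event` would block its sender forever — a leaked goroutine in `sendAsync`, a hung caller in `send`. Because a receive from a closed channel always succeeds immediately, selecting on `s.stopped` gives every such sender an exit path; `send` selects a second time so a caller whose event was queued but never processed is also released. A runnable toy demonstrating the escape hatch (names are illustrative, not from the codebase):

package main

import (
	"errors"
	"fmt"
)

var errStopped = errors.New("server stopped")

// trySend mirrors the new send path: it refuses to block forever on a
// channel nobody drains once `stopped` has been closed.
func trySend(c chan int, stopped chan bool, v int) error {
	select {
	case c <- v:
		return nil
	case <-stopped: // receive on a closed channel succeeds immediately
		return errStopped
	}
}

func main() {
	c := make(chan int) // unbuffered: sends block until received
	stopped := make(chan bool)

	close(stopped) // simulate Stop(): the event loop is gone

	// Without the select this send would deadlock; with it we get an error.
	fmt.Println(trySend(c, stopped, 42)) // prints "server stopped"
}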
@@ -640,9 +662,8 @@ func (s *server) followerLoop() {
 		var err error
 		update := false
 		select {
-		case stop := <-s.stopped:
+		case <-s.stopped:
 			s.setState(Stopped)
-			stop <- true
 			return
 
 		case e := <-s.c:
@@ -717,7 +738,11 @@ func (s *server) candidateLoop() {
 			// Send RequestVote RPCs to all other servers.
 			respChan = make(chan *RequestVoteResponse, len(s.peers))
 			for _, peer := range s.peers {
-				go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
+				s.routineGroup.Add(1)
+				go func(peer *Peer) {
+					defer s.routineGroup.Done()
+					peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
+				}(peer)
 			}
 
 		// Wait for either:
@@ -740,9 +765,8 @@ func (s *server) candidateLoop() {
 
 		// Collect votes from peers.
 		select {
-		case stop := <-s.stopped:
+		case <-s.stopped:
 			s.setState(Stopped)
-			stop <- true
 			return
 
 		case resp := <-respChan:
@@ -786,19 +810,22 @@ func (s *server) leaderLoop() {
 	// "Upon election: send initial empty AppendEntries RPCs (heartbeat) to
 	// each server; repeat during idle periods to prevent election timeouts
 	// (§5.2)". The heartbeats started above do the "idle" period work.
-	go s.Do(NOPCommand{})
+	s.routineGroup.Add(1)
+	go func() {
+		defer s.routineGroup.Done()
+		s.Do(NOPCommand{})
+	}()
 
 	// Begin to collect response from followers
 	for s.State() == Leader {
 		var err error
 		select {
-		case stop := <-s.stopped:
+		case <-s.stopped:
 			// Stop all peers before stop
 			for _, peer := range s.peers {
 				peer.stopHeartbeat(false)
 			}
 			s.setState(Stopped)
-			stop <- true
 			return
 
 		case e := <-s.c:
@@ -826,9 +853,8 @@ func (s *server) snapshotLoop() {
 	for s.State() == Snapshotting {
 		var err error
 		select {
-		case stop := <-s.stopped:
+		case <-s.stopped:
 			s.setState(Stopped)
-			stop <- true
 			return
 
 		case e := <-s.c:
@@ -1109,7 +1135,11 @@ func (s *server) RemovePeer(name string) error {
 		// So we might be holding log lock and waiting for log lock,
 		// which lead to a deadlock.
 		// TODO(xiangli) refactor log lock
-		go peer.stopHeartbeat(true)
+		s.routineGroup.Add(1)
+		go func() {
+			defer s.routineGroup.Done()
+			peer.stopHeartbeat(true)
+		}()
 	}
 
 	delete(s.peers, name)
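Taken together, the server.go changes tighten the public contract: `Stop()` returns only after every goroutine the server started has finished, and a command racing with shutdown fails fast with `StopError` instead of hanging. A usage sketch against the goraft API (import path and wiring assumed; error handling abbreviated):

package main

import (
	"log"

	"github.com/goraft/raft" // import path assumed; adjust to your setup
)

// lifecycle shows the Stop contract this commit establishes. The transporter
// and stateMachine arguments are placeholders for real wiring.
func lifecycle(name, path string, transporter raft.Transporter, stateMachine raft.StateMachine) {
	s, err := raft.NewServer(name, path, transporter, stateMachine, nil, "")
	if err != nil {
		log.Fatal(err)
	}
	if err := s.Start(); err != nil {
		log.Fatal(err)
	}

	// ... serve traffic ...

	// Stop now returns only after every server goroutine has exited,
	// so it is safe to tear down logs, sockets, etc. right after it.
	s.Stop()

	// A command racing with shutdown fails fast instead of hanging.
	if _, err := s.Do(raft.NOPCommand{}); err != nil {
		log.Println(err) // raft: Has been stopped
	}
}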
