Skip to content

Commit

Permalink
Merge pull request #39 from xiangli-cmu/server
Browse files Browse the repository at this point in the history
Blocking the transportation when the server will stepdown
  • Loading branch information
benbjohnson committed Jul 3, 2013
2 parents 73aa2c5 + 8844583 commit 8b8791e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 31 deletions.
45 changes: 20 additions & 25 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,16 @@ func (s *Server) collectVotes(c chan *RequestVoteResponse) (bool, bool) {
// Step down if we discover a higher term.
s.mutex.Lock()

s.setCurrentTerm(resp.Term)
s.state = Follower
s.currentTerm = resp.Term

s.mutex.Unlock()
return false, false
}
}
case term := <- s.stepDown:
s.state = Follower
s.currentTerm = term

// TODO: do we calculate the overall timeout? or timeout for each vote?
// Some issue here
Expand Down Expand Up @@ -406,11 +410,17 @@ func (s *Server) commitCenter() {
}

case term := <-s.stepDown:
s.mutex.Lock()
// stepdown to follower

s.setCurrentTerm(term)
// stop heartbeats
for _, peer := range s.peers {
peer.stop()
}

s.mutex.Unlock()
s.state = Follower
s.currentTerm = term

s.StartElectionTimeout()
return
}

Expand Down Expand Up @@ -760,6 +770,7 @@ func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, err
if req.Term < s.currentTerm {
return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Stale term: %v < %v", req.Term, s.currentTerm)
}

s.setCurrentTerm(req.Term)

// If we've already voted for a different candidate then don't vote for this candidate.
Expand Down Expand Up @@ -794,30 +805,14 @@ func (s *Server) setCurrentTerm(term uint64) {
if term > s.currentTerm {
s.votedFor = ""

if s.state == Leader {
debugln(s.Name(), " step down to a follower")
if s.state == Leader || s.state == Candidate{
debugln(s.Name(), " should step down to a follower from ", s.state)

// stop heartbeats
for _, peer := range s.peers {
peer.stop()
}

select {
case s.stepDown <- term:

default:
s.stepDown <- term

}

s.StartElectionTimeout()

// candidate should also start timeout
} else if s.state == Candidate {
s.StartElectionTimeout()
debugln(s.Name(), " step down to a follower from ", s.state)
return
}

s.state = Follower

// update term after stop all the peer
s.currentTerm = term
}
Expand Down
28 changes: 22 additions & 6 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func TestServerMultiNode(t *testing.T) {
}
mutex.Unlock()

for i := 0; i < 20; i++ {
for i := 0; i < 200000; i++ {
i++
debugln("Round ", i)

Expand All @@ -453,20 +453,36 @@ func TestServerMultiNode(t *testing.T) {
for i := 0; i < 10; i++ {
debugln("[Test] do ", value.Name())
if _, err := value.Do(&TestCommand2{X: 1}); err != nil {
t.Fatalf("Unable to do command")
break
}
debugln("[Test] Done")
}

leader++
debugln("Leader is ", value.Name(), " Index ", value.log.commitIndex)
}
debugln("Not Found leader")
}
}
for {
for key, value := range servers {
if key != num {
if value.State() == Leader {
leader++
}
}
}

if leader != 1 {
t.Fatalf("wrong leader number %v", leader)
if leader > 1 {

t.Fatalf("wrong leader number %v", leader)
}
if leader == 0 {
leader = 0
time.Sleep(100 * time.Millisecond)
continue
}
if leader == 1 {
break
}
}

//mutex.Unlock()
Expand Down

0 comments on commit 8b8791e

Please sign in to comment.