Skip to content

Commit

Permalink
Leader steps down at election timeout if no heartbeat responses.
Browse files Browse the repository at this point in the history
  • Loading branch information
Max Sumrall committed Oct 31, 2016
1 parent 374233f commit 997f292
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 209 deletions.
Expand Up @@ -148,7 +148,7 @@ public void triggerElection() throws IOException
handle( new RaftMessages.Timeout.Election( myself ) ); handle( new RaftMessages.Timeout.Election( myself ) );
} }


public void stopTimers() public void panic()
{ {
heartbeatTimer.cancel(); heartbeatTimer.cancel();
electionTimer.cancel(); electionTimer.cancel();
Expand Down
Expand Up @@ -20,7 +20,10 @@
package org.neo4j.causalclustering.core.consensus.roles; package org.neo4j.causalclustering.core.consensus.roles;


import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;


import org.neo4j.causalclustering.core.consensus.Followers; import org.neo4j.causalclustering.core.consensus.Followers;
import org.neo4j.causalclustering.core.consensus.RaftMessageHandler; import org.neo4j.causalclustering.core.consensus.RaftMessageHandler;
Expand All @@ -29,20 +32,24 @@
import org.neo4j.causalclustering.core.consensus.RaftMessages.LogCompactionInfo; import org.neo4j.causalclustering.core.consensus.RaftMessages.LogCompactionInfo;
import org.neo4j.causalclustering.core.consensus.outcome.Outcome; import org.neo4j.causalclustering.core.consensus.outcome.Outcome;
import org.neo4j.causalclustering.core.consensus.outcome.ShipCommand; import org.neo4j.causalclustering.core.consensus.outcome.ShipCommand;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.core.consensus.state.ReadableRaftState;
import org.neo4j.causalclustering.core.consensus.roles.follower.FollowerState; import org.neo4j.causalclustering.core.consensus.roles.follower.FollowerState;
import org.neo4j.causalclustering.core.consensus.roles.follower.FollowerStates; import org.neo4j.causalclustering.core.consensus.roles.follower.FollowerStates;
import org.neo4j.causalclustering.core.consensus.state.ReadableRaftState;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.helpers.collection.FilteringIterable; import org.neo4j.helpers.collection.FilteringIterable;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;


import static java.lang.Math.max; import static java.lang.Math.max;
import static org.neo4j.causalclustering.core.consensus.MajorityIncludingSelfQuorum.isQuorum;
import static org.neo4j.causalclustering.core.consensus.roles.Role.FOLLOWER; import static org.neo4j.causalclustering.core.consensus.roles.Role.FOLLOWER;
import static org.neo4j.causalclustering.core.consensus.roles.Role.LEADER; import static org.neo4j.causalclustering.core.consensus.roles.Role.LEADER;


public class Leader implements RaftMessageHandler public class Leader implements RaftMessageHandler
{ {
private final Set<MemberId> heartbeatResponses = Collections.synchronizedSet( new HashSet<>() );
private boolean receivedHeartbeats = false;

private static Iterable<MemberId> replicationTargets( final ReadableRaftState ctx ) private static Iterable<MemberId> replicationTargets( final ReadableRaftState ctx )
{ {
return new FilteringIterable<>( ctx.replicationMembers(), member -> !member.equals( ctx.myself() ) ); return new FilteringIterable<>( ctx.replicationMembers(), member -> !member.equals( ctx.myself() ) );
Expand All @@ -60,97 +67,116 @@ private static void sendHeartbeats( ReadableRaftState ctx, Outcome outcome ) thr
} }


@Override @Override
public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, Log log ) throws IOException
Log log ) throws IOException
{ {
Outcome outcome = new Outcome( LEADER, ctx ); Outcome outcome = new Outcome( LEADER, ctx );


switch ( message.type() ) switch ( message.type() )
{ {
case HEARTBEAT: case HEARTBEAT:
{ {
Heartbeat req = (Heartbeat) message; Heartbeat req = (Heartbeat) message;

if ( req.leaderTerm() < ctx.term() )
{
break;
}


outcome.steppingDown(); if ( req.leaderTerm() < ctx.term() )
outcome.setNextRole( FOLLOWER ); {
log.info( "Moving to FOLLOWER state after receiving heartbeat at term %d (my term is " +
"%d) from %s", req.leaderTerm(), ctx.term(), req.from() );
Heart.beat( ctx, outcome, (Heartbeat) message, log );
break; break;
} }


case HEARTBEAT_TIMEOUT: stepDownToFollower( outcome );
log.info( "Moving to FOLLOWER state after receiving heartbeat at term %d (my term is " + "%d) from %s",
req.leaderTerm(), ctx.term(), req.from() );
Heart.beat( ctx, outcome, (Heartbeat) message, log );
break;
}

case HEARTBEAT_TIMEOUT:
{
sendHeartbeats( ctx, outcome );
break;
}

case HEARTBEAT_RESPONSE:
{
receivedHeartbeats = true;
heartbeatResponses.add( message.from() );
break;
}

case ELECTION_TIMEOUT:
{
if ( !isQuorum( ctx.votingMembers().size(), heartbeatResponses.size() ) && receivedHeartbeats )
{ {
sendHeartbeats( ctx, outcome ); stepDownToFollower( outcome );
break; log.info( "Moving to FOLLOWER state after not receiving heartbeat responses in this election timeout " +
} "period. Heartbeats received: %s", heartbeatResponses );


case APPEND_ENTRIES_REQUEST: }
else
{ {
RaftMessages.AppendEntries.Request req = (RaftMessages.AppendEntries.Request) message; heartbeatResponses.clear();
}
break;
}


if ( req.leaderTerm() < ctx.term() ) case APPEND_ENTRIES_REQUEST:
{ {
RaftMessages.AppendEntries.Response appendResponse = RaftMessages.AppendEntries.Request req = (RaftMessages.AppendEntries.Request) message;
new RaftMessages.AppendEntries.Response( ctx.myself(), ctx.term(), false, -1,
ctx.entryLog().appendIndex() );


outcome.addOutgoingMessage( new RaftMessages.Directed( req.from(), appendResponse ) ); if ( req.leaderTerm() < ctx.term() )
break; {
} RaftMessages.AppendEntries.Response appendResponse =
else if ( req.leaderTerm() == ctx.term() ) new RaftMessages.AppendEntries.Response( ctx.myself(), ctx.term(), false, -1,
{ ctx.entryLog().appendIndex() );
throw new IllegalStateException( "Two leaders in the same term." );
}
else
{
// There is a new leader in a later term, we should revert to follower. (§5.1)
outcome.steppingDown();
outcome.setNextRole( FOLLOWER );
log.info( "Moving to FOLLOWER state after receiving append request at term %d (my term is " +
"%d) from %s", req.leaderTerm(), ctx.term(), req.from() );
Appending.handleAppendEntriesRequest( ctx, outcome, req, log );
break;
}
}


case APPEND_ENTRIES_RESPONSE: outcome.addOutgoingMessage( new RaftMessages.Directed( req.from(), appendResponse ) );
break;
}
else if ( req.leaderTerm() == ctx.term() )
{ {
RaftMessages.AppendEntries.Response response = (RaftMessages.AppendEntries.Response) message; throw new IllegalStateException( "Two leaders in the same term." );
}
else
{
// There is a new leader in a later term, we should revert to follower. (§5.1)
stepDownToFollower( outcome );
log.info( "Moving to FOLLOWER state after receiving append request at term %d (my term is " +
"%d) from %s", req.leaderTerm(), ctx.term(), req.from() );
Appending.handleAppendEntriesRequest( ctx, outcome, req, log );
break;
}
}


if ( response.term() < ctx.term() ) case APPEND_ENTRIES_RESPONSE:
{ {
RaftMessages.AppendEntries.Response response = (RaftMessages.AppendEntries.Response) message;

if ( response.term() < ctx.term() )
{
/* Ignore responses from old terms! */ /* Ignore responses from old terms! */
break; break;
} }
else if ( response.term() > ctx.term() ) else if ( response.term() > ctx.term() )
{ {
outcome.setNextTerm( response.term() ); outcome.setNextTerm( response.term() );
outcome.steppingDown(); stepDownToFollower( outcome );
outcome.setNextRole( FOLLOWER ); log.info( "Moving to FOLLOWER state after receiving append response at term %d (my term is " +
log.info( "Moving to FOLLOWER state after receiving append response at term %d (my term is " + "%d) from %s", response.term(), ctx.term(), response.from() );
"%d) from %s", response.term(), ctx.term(), response.from() ); outcome.replaceFollowerStates( new FollowerStates<>() );
outcome.replaceFollowerStates( new FollowerStates<>() ); break;
break; }
}


FollowerState follower = ctx.followerStates().get( response.from() ); FollowerState follower = ctx.followerStates().get( response.from() );


if ( response.success() ) if ( response.success() )
{ {
assert response.matchIndex() <= ctx.entryLog().appendIndex(); assert response.matchIndex() <= ctx.entryLog().appendIndex();


boolean followerProgressed = response.matchIndex() > follower.getMatchIndex(); boolean followerProgressed = response.matchIndex() > follower.getMatchIndex();


outcome.replaceFollowerStates( outcome.getFollowerStates() outcome.replaceFollowerStates( outcome.getFollowerStates()
.onSuccessResponse( response.from(), max( response.matchIndex(), follower.getMatchIndex() ) ) ); .onSuccessResponse( response.from(), max( response.matchIndex(), follower.getMatchIndex() ) ) );


outcome.addShipCommand( new ShipCommand.Match( response.matchIndex(), response.from() ) ); outcome.addShipCommand( new ShipCommand.Match( response.matchIndex(), response.from() ) );


/* /*
* Matches from older terms can in complicated leadership change / log truncation scenarios * Matches from older terms can in complicated leadership change / log truncation scenarios
Expand All @@ -159,85 +185,95 @@ else if ( response.term() > ctx.term() )
* and are ready for commit. * and are ready for commit.
* This is explained nicely in Figure 3.7 of the thesis * This is explained nicely in Figure 3.7 of the thesis
*/ */
boolean matchInCurrentTerm = ctx.entryLog().readEntryTerm( response.matchIndex() ) == ctx.term(); boolean matchInCurrentTerm = ctx.entryLog().readEntryTerm( response.matchIndex() ) == ctx.term();


/* /*
* The quorum situation may have changed only if the follower actually progressed. * The quorum situation may have changed only if the follower actually progressed.
*/ */
if ( followerProgressed && matchInCurrentTerm ) if ( followerProgressed && matchInCurrentTerm )
{
// TODO: Test that mismatch between voting and participating members affects commit outcome

long quorumAppendIndex =
Followers.quorumAppendIndex( ctx.votingMembers(), outcome.getFollowerStates() );
if ( quorumAppendIndex > ctx.commitIndex() )
{
outcome.setLeaderCommit( quorumAppendIndex );
outcome.setCommitIndex( quorumAppendIndex );
outcome.addShipCommand( new ShipCommand.CommitUpdate() );
}
}
}
else // Response indicated failure.
{ {
if ( response.appendIndex() > -1 && response.appendIndex() >= ctx.entryLog().prevIndex() ) // TODO: Test that mismatch between voting and participating members affects commit outcome
{
// Signal a mismatch to the log shipper, which will serve an earlier entry.
outcome.addShipCommand( new ShipCommand.Mismatch( response.appendIndex(), response.from() ) );
}
else
{
// There are no earlier entries, message the follower that we have compacted so that
// it can take appropriate action.
LogCompactionInfo compactionInfo = new LogCompactionInfo( ctx.myself(), ctx.term(), ctx.entryLog().prevIndex() );
RaftMessages.Directed directedCompactionInfo = new RaftMessages.Directed( response.from(), compactionInfo );


outcome.addOutgoingMessage( directedCompactionInfo ); long quorumAppendIndex =
Followers.quorumAppendIndex( ctx.votingMembers(), outcome.getFollowerStates() );
if ( quorumAppendIndex > ctx.commitIndex() )
{
outcome.setLeaderCommit( quorumAppendIndex );
outcome.setCommitIndex( quorumAppendIndex );
outcome.addShipCommand( new ShipCommand.CommitUpdate() );
} }
} }
break;
} }

else // Response indicated failure.
case VOTE_REQUEST:
{ {
RaftMessages.Vote.Request req = (RaftMessages.Vote.Request) message; if ( response.appendIndex() > -1 && response.appendIndex() >= ctx.entryLog().prevIndex() )

if ( req.term() > ctx.term() )
{ {
outcome.steppingDown(); // Signal a mismatch to the log shipper, which will serve an earlier entry.
log.info( "Moving to FOLLOWER state after receiving vote request at term %d (my term is " + outcome.addShipCommand( new ShipCommand.Mismatch( response.appendIndex(), response.from() ) );
"%d) from %s", req.term(), ctx.term(), req.from() ); }

else
outcome.setNextRole( FOLLOWER ); {
Voting.handleVoteRequest( ctx, outcome, req ); // There are no earlier entries, message the follower that we have compacted so that
break; // it can take appropriate action.
LogCompactionInfo compactionInfo =
new LogCompactionInfo( ctx.myself(), ctx.term(), ctx.entryLog().prevIndex() );
RaftMessages.Directed directedCompactionInfo =
new RaftMessages.Directed( response.from(), compactionInfo );

outcome.addOutgoingMessage( directedCompactionInfo );
} }

outcome.addOutgoingMessage( new RaftMessages.Directed( req.from(),
new RaftMessages.Vote.Response( ctx.myself(), ctx.term(), false ) ) );
break;
} }
break;
}


case NEW_ENTRY_REQUEST: case VOTE_REQUEST:
{ {
RaftMessages.NewEntry.Request req = (RaftMessages.NewEntry.Request) message; RaftMessages.Vote.Request req = (RaftMessages.Vote.Request) message;
ReplicatedContent content = req.content();
Appending.appendNewEntry( ctx, outcome, content );
break;
}


case NEW_BATCH_REQUEST: if ( req.term() > ctx.term() )
{ {
RaftMessages.NewEntry.BatchRequest req = (RaftMessages.NewEntry.BatchRequest) message; stepDownToFollower( outcome );
List<ReplicatedContent> contents = req.contents(); log.info(
Appending.appendNewEntries( ctx, outcome, contents ); "Moving to FOLLOWER state after receiving vote request at term %d (my term is " + "%d) from %s",
req.term(), ctx.term(), req.from() );

Voting.handleVoteRequest( ctx, outcome, req );
break; break;
} }


default: outcome.addOutgoingMessage( new RaftMessages.Directed( req.from(),
break; new RaftMessages.Vote.Response( ctx.myself(), ctx.term(), false ) ) );
break;
}

case NEW_ENTRY_REQUEST:
{
RaftMessages.NewEntry.Request req = (RaftMessages.NewEntry.Request) message;
ReplicatedContent content = req.content();
Appending.appendNewEntry( ctx, outcome, content );
break;
}

case NEW_BATCH_REQUEST:
{
RaftMessages.NewEntry.BatchRequest req = (RaftMessages.NewEntry.BatchRequest) message;
List<ReplicatedContent> contents = req.contents();
Appending.appendNewEntries( ctx, outcome, contents );
break;
}

default:
break;
} }


return outcome; return outcome;
} }

private void stepDownToFollower( Outcome outcome )
{
heartbeatResponses.clear();
receivedHeartbeats = false;
outcome.steppingDown();
outcome.setNextRole( FOLLOWER );
}
} }
Expand Up @@ -87,7 +87,7 @@ public void handle( RaftMessages.ClusterIdAwareMessage clusterIdAwareMessage )
catch ( Throwable e ) catch ( Throwable e )
{ {
log.error( "Error handling message", e ); log.error( "Error handling message", e );
raftMachine.stopTimers(); raftMachine.panic();
localDatabase.panic( e ); localDatabase.panic( e );
} }
} }
Expand Down

0 comments on commit 997f292

Please sign in to comment.