Skip to content

Commit

Permalink
Clean up handling of raft outcome.
Browse files Browse the repository at this point in the history
The outcome handling was a mess. An important point is to be careful
when comparing state vs. outcome and making it explicit where it is
done, so that pitfalls like comparing already updated state instead
of the old state are avoided.
  • Loading branch information
martinfurmanski committed Mar 14, 2016
1 parent a3fba8f commit 6e9c577
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 85 deletions.
Expand Up @@ -47,7 +47,6 @@
import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.raft.state.term.TermState;
import org.neo4j.coreedge.raft.state.vote.VoteState;
import org.neo4j.cursor.IOCursor;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.monitoring.Monitors;
Expand Down Expand Up @@ -88,7 +87,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
ELECTION, HEARTBEAT
}

private final RaftState<MEMBER> raftState;
private final RaftState<MEMBER> state;
private final MEMBER myself;
private final RaftLog entryLog;

Expand All @@ -106,7 +105,6 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName

private final Outbound<MEMBER> outbound;
private final Log log;
private volatile boolean handlingMessage = false;
private Role currentRole = Role.FOLLOWER;

private RaftLogShippingManager<MEMBER> logShipping;
Expand Down Expand Up @@ -137,7 +135,7 @@ public RaftInstance( MEMBER myself, StateStorage<TermState> termStorage,

this.membershipManager = membershipManager;

this.raftState = new RaftState<>( myself, termStorage, membershipManager, entryLog, voteStorage );
this.state = new RaftState<>( myself, termStorage, membershipManager, entryLog, voteStorage );

leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class );

Expand Down Expand Up @@ -200,7 +198,7 @@ public void setTargetMembershipSet( Set<MEMBER> targetMembers )

if ( currentRole == LEADER )
{
membershipManager.onFollowerStateChange( raftState.followerStates() );
membershipManager.onFollowerStateChange( state.followerStates() );
}
}

Expand Down Expand Up @@ -236,7 +234,7 @@ public MEMBER getLeader( long timeoutMillis, Predicate<MEMBER> predicate ) throw
public synchronized void registerListener( Listener<MEMBER> listener )
{
leaderListeners.add( listener );
listener.receive( raftState.leader() );
listener.receive( state.leader() );
}

@Override
Expand All @@ -247,23 +245,16 @@ public synchronized void unregisterListener( Listener<MEMBER> listener )

public ReadableRaftState<MEMBER> state()
{
return raftState;
return state;
}

private void handleOutcome( Outcome<MEMBER> outcome ) throws IOException
{
adjustLogShipping( outcome );
notifyLeaderChanges( outcome );

raftState.update( outcome );
membershipManager.processLog( outcome.getLogCommands() );
consensusListener.notifyCommitted();
volatileLeader.set( outcome.getLeader() );
}

private void notifyLeaderChanges( Outcome<MEMBER> outcome )
{
if ( leaderChanged( outcome, raftState.leader() ) )
if ( leaderChanged( outcome, state.leader() ) )
{
for ( Listener<MEMBER> listener : leaderListeners )
{
Expand All @@ -272,26 +263,22 @@ private void notifyLeaderChanges( Outcome<MEMBER> outcome )
}
}

private void adjustLogShipping( Outcome<MEMBER> outcome ) throws IOException
private void handleLogShipping( Outcome<MEMBER> outcome ) throws IOException
{
MEMBER oldLeader = raftState.leader();

if ( myself.equals( outcome.getLeader() ) )
LeaderContext leaderContext = new LeaderContext( outcome.getTerm(), outcome.getLeaderCommit() );
if ( outcome.isElectedLeader() )
{
LeaderContext leaderContext = new LeaderContext( outcome.getTerm(), outcome.getLeaderCommit() );

if ( !myself.equals( oldLeader ) )
{
// We became leader, start the log shipping.
logShipping.start( leaderContext );
}

logShipping.handleCommands( outcome.getShipCommands(), leaderContext );
logShipping.start( leaderContext );
}
else if ( myself.equals( oldLeader ) && !myself.equals( outcome.getLeader() ) )
else if ( outcome.isSteppingDown() )
{
logShipping.stop();
}

if( outcome.getRole() == LEADER )
{
logShipping.handleCommands( outcome.getShipCommands(), leaderContext );
}
}

private boolean leaderChanged( Outcome<MEMBER> outcome, MEMBER oldLeader )
Expand All @@ -310,45 +297,56 @@ else if ( oldLeader != null && !oldLeader.equals( outcome.getLeader() ) )

public synchronized void handle( Message incomingMessage )
{
if ( handlingMessage )
{
throw new IllegalStateException( "recursive use" );
}

try
{
handlingMessage = true;
Outcome<MEMBER> outcome = currentRole.handler.handle(
(RaftMessages.RaftMessage<MEMBER>) incomingMessage, state, log );

Outcome<MEMBER> outcome = currentRole.handler.handle( (RaftMessages.RaftMessage<MEMBER>) incomingMessage,
raftState, log );
state.update( outcome );
sendMessages( outcome );

handleOutcome( outcome );
currentRole = outcome.getNewRole();
handleTimers( outcome );
handleLogShipping( outcome );

for ( RaftMessages.Directed<MEMBER> outgoingMessage : outcome.getOutgoingMessages() )
{
outbound.send( outgoingMessage.to(), outgoingMessage.message() );
}
if ( outcome.electionTimeoutRenewed() )
{
electionTimer.renew();
}
membershipManager.processLog( outcome.getLogCommands() );
driveMembership( outcome );

membershipManager.onRole( currentRole );
volatileLeader.set( outcome.getLeader() );

if ( currentRole == LEADER )
{
membershipManager.onFollowerStateChange( raftState.followerStates() );
}
raftStateMachine.notifyUpdate();
notifyLeaderChanges( outcome );
}
catch ( IOException e )
catch ( Throwable e )
{
log.error( "Failed to process RAFT message " + incomingMessage, e );
databaseHealthSupplier.get().panic( e );
}
finally
}

private void driveMembership( Outcome<MEMBER> outcome )
{
currentRole = outcome.getRole();
membershipManager.onRole( currentRole );

if ( currentRole == LEADER )
{
membershipManager.onFollowerStateChange( state.followerStates() );
}
}

private void handleTimers( Outcome<MEMBER> outcome )
{
if ( outcome.electionTimeoutRenewed() )
{
electionTimer.renew();
}
}

private void sendMessages( Outcome<MEMBER> outcome )
{
for ( RaftMessages.Directed<MEMBER> outgoingMessage : outcome.getOutgoingMessages() )
{
handlingMessage = false;
outbound.send( outgoingMessage.to(), outgoingMessage.message() );
}
}

Expand Down Expand Up @@ -389,7 +387,7 @@ public BootstrapException( Throwable cause )

public long term()
{
return raftState.term();
return state.term();
}

private long randomTimeoutRange()
Expand Down
Expand Up @@ -41,7 +41,7 @@
public class Outcome<MEMBER> implements Message
{
/* Common */
private Role newRole;
private Role nextRole;

private long term;
private MEMBER leader;
Expand All @@ -62,19 +62,21 @@ public class Outcome<MEMBER> implements Message
/* Leader */
private FollowerStates<MEMBER> followerStates;
private Collection<ShipCommand> shipCommands = new ArrayList<>();
private boolean electedLeader;
private boolean steppingDown;

public Outcome( Role currentRole, ReadableRaftState<MEMBER> ctx )
{
defaults( currentRole, ctx );
}

public Outcome( Role newRole, long term, MEMBER leader, long leaderCommit, MEMBER votedFor,
public Outcome( Role nextRole, long term, MEMBER leader, long leaderCommit, MEMBER votedFor,
Set<MEMBER> votesForMe, long lastLogIndexBeforeWeBecameLeader,
FollowerStates<MEMBER> followerStates, boolean renewElectionTimeout,
Collection<LogCommand> logCommands, Collection<RaftMessages.Directed<MEMBER>> outgoingMessages,
Collection<ShipCommand> shipCommands )
{
this.newRole = newRole;
this.nextRole = nextRole;
this.term = term;
this.leader = leader;
this.leaderCommit = leaderCommit;
Expand All @@ -91,7 +93,7 @@ public Outcome( Role newRole, long term, MEMBER leader, long leaderCommit, MEMBE

private void defaults( Role currentRole, ReadableRaftState<MEMBER> ctx )
{
newRole = currentRole;
nextRole = currentRole;

term = ctx.term();
leader = ctx.leader();
Expand All @@ -109,7 +111,7 @@ private void defaults( Role currentRole, ReadableRaftState<MEMBER> ctx )

public void setNextRole( Role nextRole )
{
this.newRole = nextRole;
this.nextRole = nextRole;
}

public void setNextTerm( long nextTerm )
Expand Down Expand Up @@ -167,11 +169,23 @@ public void addShipCommand( ShipCommand shipCommand )
shipCommands.add( shipCommand );
}

public void electedLeader()
{
assert !steppingDown;
this.electedLeader = true;
}

public void steppingDown()
{
assert !electedLeader;
this.steppingDown = true;
}

@Override
public String toString()
{
return "Outcome{" +
"nextRole=" + newRole +
"nextRole=" + nextRole +
", newTerm=" + term +
", leader=" + leader +
", leaderCommit=" + leaderCommit +
Expand All @@ -183,12 +197,14 @@ public String toString()
", updatedFollowerStates=" + followerStates +
", renewElectionTimeout=" + renewElectionTimeout +
", outgoingMessages=" + outgoingMessages +
", electedLeader=" + electedLeader +
", steppingDown=" + steppingDown +
'}';
}

public Role getNewRole()
public Role getRole()
{
return newRole;
return nextRole;
}

public long getTerm()
Expand Down Expand Up @@ -245,4 +261,14 @@ public Collection<ShipCommand> getShipCommands()
{
return shipCommands;
}

public boolean isElectedLeader()
{
return electedLeader;
}

public boolean isSteppingDown()
{
return steppingDown;
}
}
Expand Up @@ -64,7 +64,7 @@ public static <MEMBER> void handleAppendEntriesRequest(
long baseIndex = request.prevLogIndex() + 1;
int offset;

/* Find possible truncation point. */
/* Find possible truncation point. */
for ( offset = 0; offset < request.entries().length; offset++ )
{
long logTerm = state.entryLog().readEntryTerm( baseIndex + offset );
Expand Down
Expand Up @@ -105,6 +105,7 @@ else if ( res.term() < ctx.term() || !res.voteGranted() )
Appending.appendNewEntry( ctx, outcome, new NewLeaderBarrier() );

outcome.setLastLogIndexBeforeWeBecameLeader( ctx.entryLog().appendIndex() );
outcome.electedLeader();
outcome.setNextRole( LEADER );
}
break;
Expand Down
Expand Up @@ -74,6 +74,7 @@ public <MEMBER> Outcome<MEMBER> handle( RaftMessages.RaftMessage<MEMBER> message
break;
}

outcome.steppingDown();
outcome.setNextRole( FOLLOWER );
Heart.beat( ctx, outcome, (Heartbeat<MEMBER>) message );
break;
Expand Down Expand Up @@ -105,6 +106,7 @@ else if ( req.leaderTerm() == ctx.term() )
else
{
// There is a new leader in a later term, we should revert to follower. (§5.1)
outcome.steppingDown();
outcome.setNextRole( FOLLOWER );
Appending.handleAppendEntriesRequest( ctx, outcome, req );
break;
Expand All @@ -123,6 +125,7 @@ else if ( req.leaderTerm() == ctx.term() )
else if ( res.term() > ctx.term() )
{
outcome.setNextTerm( res.term() );
outcome.steppingDown();
outcome.setNextRole( FOLLOWER );
outcome.replaceFollowerStates( new FollowerStates<>() );
break;
Expand Down Expand Up @@ -181,6 +184,7 @@ else if ( res.term() > ctx.term() )

if ( req.term() > ctx.term() )
{
outcome.steppingDown();
outcome.setNextRole( FOLLOWER );
Voting.handleVoteRequest( ctx, outcome, req );
break;
Expand Down
Expand Up @@ -73,7 +73,7 @@ public void shouldBeElectedLeaderOnReceivingGrantedVoteResponseWithCurrentTerm()
.build(), state, log() );

// then
assertEquals( LEADER, outcome.getNewRole() );
assertEquals( LEADER, outcome.getRole() );
assertThat( outcome.getLogCommands(), hasItem( new AppendLogEntry( 0,
new RaftLogEntry( state.term(), new NewLeaderBarrier() ) ) ) );
}
Expand All @@ -92,7 +92,7 @@ public void shouldStayAsCandidateOnReceivingDeniedVoteResponseWithCurrentTerm()
.build(), state, log() );

// then
assertEquals( CANDIDATE, outcome.getNewRole() );
assertEquals( CANDIDATE, outcome.getRole() );
}

@Test
Expand All @@ -111,7 +111,7 @@ public void shouldUpdateTermOnReceivingVoteResponseWithLaterTerm() throws Except
.build(), state, log() );

// then
assertEquals( FOLLOWER, outcome.getNewRole() );
assertEquals( FOLLOWER, outcome.getRole() );
assertEquals( voterTerm, outcome.getTerm() );
}

Expand All @@ -131,7 +131,7 @@ public void shouldRejectVoteResponseWithOldTerm() throws Exception
.build(), state, log() );

// then
assertEquals( CANDIDATE, outcome.getNewRole() );
assertEquals( CANDIDATE, outcome.getRole() );
}

public RaftState<RaftTestMember> newState() throws IOException
Expand Down

0 comments on commit 6e9c577

Please sign in to comment.