diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java index 5aa20327e552e..9cecab369fdf5 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java @@ -47,7 +47,6 @@ import org.neo4j.coreedge.raft.state.StateStorage; import org.neo4j.coreedge.raft.state.term.TermState; import org.neo4j.coreedge.raft.state.vote.VoteState; -import org.neo4j.cursor.IOCursor; import org.neo4j.kernel.impl.util.Listener; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.monitoring.Monitors; @@ -88,7 +87,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName ELECTION, HEARTBEAT } - private final RaftState raftState; + private final RaftState state; private final MEMBER myself; private final RaftLog entryLog; @@ -106,7 +105,6 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName private final Outbound outbound; private final Log log; - private volatile boolean handlingMessage = false; private Role currentRole = Role.FOLLOWER; private RaftLogShippingManager logShipping; @@ -137,7 +135,7 @@ public RaftInstance( MEMBER myself, StateStorage termStorage, this.membershipManager = membershipManager; - this.raftState = new RaftState<>( myself, termStorage, membershipManager, entryLog, voteStorage ); + this.state = new RaftState<>( myself, termStorage, membershipManager, entryLog, voteStorage ); leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class ); @@ -200,7 +198,7 @@ public void setTargetMembershipSet( Set targetMembers ) if ( currentRole == LEADER ) { - membershipManager.onFollowerStateChange( raftState.followerStates() ); + membershipManager.onFollowerStateChange( state.followerStates() ); } } @@ -236,7 +234,7 @@ public MEMBER getLeader( long timeoutMillis, Predicate predicate ) throw public synchronized void registerListener( Listener listener ) { leaderListeners.add( listener ); - listener.receive( raftState.leader() ); + listener.receive( state.leader() ); } @Override @@ -247,23 +245,16 @@ public synchronized void unregisterListener( Listener listener ) public ReadableRaftState state() { - return raftState; + return state; } private void handleOutcome( Outcome outcome ) throws IOException { - adjustLogShipping( outcome ); - notifyLeaderChanges( outcome ); - - raftState.update( outcome ); - membershipManager.processLog( outcome.getLogCommands() ); - consensusListener.notifyCommitted(); - volatileLeader.set( outcome.getLeader() ); } private void notifyLeaderChanges( Outcome outcome ) { - if ( leaderChanged( outcome, raftState.leader() ) ) + if ( leaderChanged( outcome, state.leader() ) ) { for ( Listener listener : leaderListeners ) { @@ -272,26 +263,22 @@ private void notifyLeaderChanges( Outcome outcome ) } } - private void adjustLogShipping( Outcome outcome ) throws IOException + private void handleLogShipping( Outcome outcome ) throws IOException { - MEMBER oldLeader = raftState.leader(); - - if ( myself.equals( outcome.getLeader() ) ) + LeaderContext leaderContext = new LeaderContext( outcome.getTerm(), outcome.getLeaderCommit() ); + if ( outcome.isElectedLeader() ) { - LeaderContext leaderContext = new LeaderContext( outcome.getTerm(), outcome.getLeaderCommit() ); - - if ( !myself.equals( oldLeader ) ) - { - // We became leader, start the log shipping. - logShipping.start( leaderContext ); - } - - logShipping.handleCommands( outcome.getShipCommands(), leaderContext ); + logShipping.start( leaderContext ); } - else if ( myself.equals( oldLeader ) && !myself.equals( outcome.getLeader() ) ) + else if ( outcome.isSteppingDown() ) { logShipping.stop(); } + + if( outcome.getRole() == LEADER ) + { + logShipping.handleCommands( outcome.getShipCommands(), leaderContext ); + } } private boolean leaderChanged( Outcome outcome, MEMBER oldLeader ) @@ -310,45 +297,56 @@ else if ( oldLeader != null && !oldLeader.equals( outcome.getLeader() ) ) public synchronized void handle( Message incomingMessage ) { - if ( handlingMessage ) - { - throw new IllegalStateException( "recursive use" ); - } - try { - handlingMessage = true; + Outcome outcome = currentRole.handler.handle( + (RaftMessages.RaftMessage) incomingMessage, state, log ); - Outcome outcome = currentRole.handler.handle( (RaftMessages.RaftMessage) incomingMessage, - raftState, log ); + state.update( outcome ); + sendMessages( outcome ); - handleOutcome( outcome ); - currentRole = outcome.getNewRole(); + handleTimers( outcome ); + handleLogShipping( outcome ); - for ( RaftMessages.Directed outgoingMessage : outcome.getOutgoingMessages() ) - { - outbound.send( outgoingMessage.to(), outgoingMessage.message() ); - } - if ( outcome.electionTimeoutRenewed() ) - { - electionTimer.renew(); - } + membershipManager.processLog( outcome.getLogCommands() ); + driveMembership( outcome ); - membershipManager.onRole( currentRole ); + volatileLeader.set( outcome.getLeader() ); - if ( currentRole == LEADER ) - { - membershipManager.onFollowerStateChange( raftState.followerStates() ); - } + raftStateMachine.notifyUpdate(); + notifyLeaderChanges( outcome ); } - catch ( IOException e ) + catch ( Throwable e ) { log.error( "Failed to process RAFT message " + incomingMessage, e ); databaseHealthSupplier.get().panic( e ); } - finally + } + + private void driveMembership( Outcome outcome ) + { + currentRole = outcome.getRole(); + membershipManager.onRole( currentRole ); + + if ( currentRole == LEADER ) + { + membershipManager.onFollowerStateChange( state.followerStates() ); + } + } + + private void handleTimers( Outcome outcome ) + { + if ( outcome.electionTimeoutRenewed() ) + { + electionTimer.renew(); + } + } + + private void sendMessages( Outcome outcome ) + { + for ( RaftMessages.Directed outgoingMessage : outcome.getOutgoingMessages() ) { - handlingMessage = false; + outbound.send( outgoingMessage.to(), outgoingMessage.message() ); } } @@ -389,7 +387,7 @@ public BootstrapException( Throwable cause ) public long term() { - return raftState.term(); + return state.term(); } private long randomTimeoutRange() diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java index 9af153e1f60cd..37d0224804010 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java @@ -41,7 +41,7 @@ public class Outcome implements Message { /* Common */ - private Role newRole; + private Role nextRole; private long term; private MEMBER leader; @@ -62,19 +62,21 @@ public class Outcome implements Message /* Leader */ private FollowerStates followerStates; private Collection shipCommands = new ArrayList<>(); + private boolean electedLeader; + private boolean steppingDown; public Outcome( Role currentRole, ReadableRaftState ctx ) { defaults( currentRole, ctx ); } - public Outcome( Role newRole, long term, MEMBER leader, long leaderCommit, MEMBER votedFor, + public Outcome( Role nextRole, long term, MEMBER leader, long leaderCommit, MEMBER votedFor, Set votesForMe, long lastLogIndexBeforeWeBecameLeader, FollowerStates followerStates, boolean renewElectionTimeout, Collection logCommands, Collection> outgoingMessages, Collection shipCommands ) { - this.newRole = newRole; + this.nextRole = nextRole; this.term = term; this.leader = leader; this.leaderCommit = leaderCommit; @@ -91,7 +93,7 @@ public Outcome( Role newRole, long term, MEMBER leader, long leaderCommit, MEMBE private void defaults( Role currentRole, ReadableRaftState ctx ) { - newRole = currentRole; + nextRole = currentRole; term = ctx.term(); leader = ctx.leader(); @@ -109,7 +111,7 @@ private void defaults( Role currentRole, ReadableRaftState ctx ) public void setNextRole( Role nextRole ) { - this.newRole = nextRole; + this.nextRole = nextRole; } public void setNextTerm( long nextTerm ) @@ -167,11 +169,23 @@ public void addShipCommand( ShipCommand shipCommand ) shipCommands.add( shipCommand ); } + public void electedLeader() + { + assert !steppingDown; + this.electedLeader = true; + } + + public void steppingDown() + { + assert !electedLeader; + this.steppingDown = true; + } + @Override public String toString() { return "Outcome{" + - "nextRole=" + newRole + + "nextRole=" + nextRole + ", newTerm=" + term + ", leader=" + leader + ", leaderCommit=" + leaderCommit + @@ -183,12 +197,14 @@ public String toString() ", updatedFollowerStates=" + followerStates + ", renewElectionTimeout=" + renewElectionTimeout + ", outgoingMessages=" + outgoingMessages + + ", electedLeader=" + electedLeader + + ", steppingDown=" + steppingDown + '}'; } - public Role getNewRole() + public Role getRole() { - return newRole; + return nextRole; } public long getTerm() @@ -245,4 +261,14 @@ public Collection getShipCommands() { return shipCommands; } + + public boolean isElectedLeader() + { + return electedLeader; + } + + public boolean isSteppingDown() + { + return steppingDown; + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java index b9953e550cffe..cc960ef34de20 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java @@ -64,7 +64,7 @@ public static void handleAppendEntriesRequest( long baseIndex = request.prevLogIndex() + 1; int offset; - /* Find possible truncation point. */ + /* Find possible truncation point. */ for ( offset = 0; offset < request.entries().length; offset++ ) { long logTerm = state.entryLog().readEntryTerm( baseIndex + offset ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java index 73e9759ae0dfa..4ec559af6c2ff 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java @@ -105,6 +105,7 @@ else if ( res.term() < ctx.term() || !res.voteGranted() ) Appending.appendNewEntry( ctx, outcome, new NewLeaderBarrier() ); outcome.setLastLogIndexBeforeWeBecameLeader( ctx.entryLog().appendIndex() ); + outcome.electedLeader(); outcome.setNextRole( LEADER ); } break; diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java index 74be731b1d55d..3d93191db8781 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java @@ -74,6 +74,7 @@ public Outcome handle( RaftMessages.RaftMessage message break; } + outcome.steppingDown(); outcome.setNextRole( FOLLOWER ); Heart.beat( ctx, outcome, (Heartbeat) message ); break; @@ -105,6 +106,7 @@ else if ( req.leaderTerm() == ctx.term() ) else { // There is a new leader in a later term, we should revert to follower. (ยง5.1) + outcome.steppingDown(); outcome.setNextRole( FOLLOWER ); Appending.handleAppendEntriesRequest( ctx, outcome, req ); break; @@ -123,6 +125,7 @@ else if ( req.leaderTerm() == ctx.term() ) else if ( res.term() > ctx.term() ) { outcome.setNextTerm( res.term() ); + outcome.steppingDown(); outcome.setNextRole( FOLLOWER ); outcome.replaceFollowerStates( new FollowerStates<>() ); break; @@ -181,6 +184,7 @@ else if ( res.term() > ctx.term() ) if ( req.term() > ctx.term() ) { + outcome.steppingDown(); outcome.setNextRole( FOLLOWER ); Voting.handleVoteRequest( ctx, outcome, req ); break; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java index 4b3418f44311a..be7544ece5e36 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java @@ -73,7 +73,7 @@ public void shouldBeElectedLeaderOnReceivingGrantedVoteResponseWithCurrentTerm() .build(), state, log() ); // then - assertEquals( LEADER, outcome.getNewRole() ); + assertEquals( LEADER, outcome.getRole() ); assertThat( outcome.getLogCommands(), hasItem( new AppendLogEntry( 0, new RaftLogEntry( state.term(), new NewLeaderBarrier() ) ) ) ); } @@ -92,7 +92,7 @@ public void shouldStayAsCandidateOnReceivingDeniedVoteResponseWithCurrentTerm() .build(), state, log() ); // then - assertEquals( CANDIDATE, outcome.getNewRole() ); + assertEquals( CANDIDATE, outcome.getRole() ); } @Test @@ -111,7 +111,7 @@ public void shouldUpdateTermOnReceivingVoteResponseWithLaterTerm() throws Except .build(), state, log() ); // then - assertEquals( FOLLOWER, outcome.getNewRole() ); + assertEquals( FOLLOWER, outcome.getRole() ); assertEquals( voterTerm, outcome.getTerm() ); } @@ -131,7 +131,7 @@ public void shouldRejectVoteResponseWithOldTerm() throws Exception .build(), state, log() ); // then - assertEquals( CANDIDATE, outcome.getNewRole() ); + assertEquals( CANDIDATE, outcome.getRole() ); } public RaftState newState() throws IOException diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java index 32ebc01e482cd..9c5242b7923a2 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java @@ -79,7 +79,7 @@ public void followerShouldTransitToCandidateAndInstigateAnElectionAfterTimeout() state.update( outcome ); // then - assertEquals( CANDIDATE, outcome.getNewRole() ); + assertEquals( CANDIDATE, outcome.getRole() ); assertNotNull( messageFor( outcome, member1 ) ); assertNotNull( messageFor( outcome, member2 ) ); @@ -100,7 +100,7 @@ public void shouldBecomeCandidateOnReceivingElectionTimeoutMessage() throws Exce Outcome outcome = follower.handle( new Election<>( myself ), state, log() ); // then - assertEquals( CANDIDATE, outcome.getNewRole() ); + assertEquals( CANDIDATE, outcome.getRole() ); } @Test diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java index 148aed71a505c..9b84dafe27be9 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java @@ -42,7 +42,6 @@ import org.neo4j.coreedge.raft.state.follower.FollowerState; import org.neo4j.coreedge.raft.state.follower.FollowerStates; import org.neo4j.coreedge.server.RaftTestMember; -import org.neo4j.helpers.collection.Iterables; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import org.neo4j.logging.NullLogProvider; @@ -54,12 +53,16 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; + import static org.neo4j.coreedge.raft.MessageUtils.messageFor; import static org.neo4j.coreedge.raft.TestMessageBuilders.appendEntriesResponse; import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER; import static org.neo4j.coreedge.raft.state.RaftStateBuilder.raftState; import static org.neo4j.coreedge.server.RaftTestMember.member; +import static org.neo4j.helpers.collection.Iterables.count; +import static org.neo4j.helpers.collection.Iterables.firstOrNull; import static org.neo4j.helpers.collection.Iterators.asSet; +import static org.neo4j.helpers.collection.Iterables.single; @RunWith(MockitoJUnitRunner.class) public class LeaderTest @@ -339,9 +342,9 @@ public void leaderShouldRejectAppendEntriesResponseWithNewerTermAndBecomeAFollow Outcome outcome = leader.handle( message, state, log() ); // then - assertEquals( 0, Iterables.count( outcome.getOutgoingMessages() ) ); - assertEquals( FOLLOWER, outcome.getNewRole() ); - assertEquals( 0, Iterables.count( outcome.getLogCommands() ) ); + assertEquals( 0, count( outcome.getOutgoingMessages() ) ); + assertEquals( FOLLOWER, outcome.getRole() ); + assertEquals( 0, count( outcome.getLogCommands() ) ); assertEquals( state.term() + 1, outcome.getTerm() ); } @@ -376,18 +379,19 @@ public void leaderShouldDecideToAppendToItsLogAndSendAppendEntriesMessageOnRecei Leader leader = new Leader(); - RaftMessages.NewEntry.Request newEntryRequest = new RaftMessages.NewEntry.Request<>( member( 9 ), CONTENT ); + RaftMessages.NewEntry.Request newEntryRequest = new RaftMessages.NewEntry.Request<>( member( + 9 ), CONTENT ); // when Outcome outcome = leader.handle( newEntryRequest, state, log() ); //state.update( outcome ); // then - AppendLogEntry logCommand = (AppendLogEntry) Iterables.single( outcome.getLogCommands() ); + AppendLogEntry logCommand = (AppendLogEntry) single( outcome.getLogCommands() ); assertEquals( 0, logCommand.index ); assertEquals( 0, logCommand.entry.term() ); - ShipCommand.NewEntry shipCommand = (ShipCommand.NewEntry) Iterables.single( outcome.getShipCommands() ); + ShipCommand.NewEntry shipCommand = (ShipCommand.NewEntry) single( outcome.getShipCommands() ); assertEquals( shipCommand, new ShipCommand.NewEntry( -1, -1, new RaftLogEntry( 0, CONTENT ) ) ); } @@ -416,7 +420,7 @@ public void leaderShouldCommitOnMajorityResponse() throws Exception new RaftMessages.AppendEntries.Response<>( member1, 0, true, 0, 0 ), state, log() ); // then - assertThat( Iterables.firstOrNull( outcome.getLogCommands() ), instanceOf( CommitCommand.class ) ); + assertThat( firstOrNull( outcome.getLogCommands() ), instanceOf( CommitCommand.class ) ); assertEquals( 0, outcome.getLeaderCommit() ); } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/NonFollowerVoteRequestTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/NonFollowerVoteRequestTest.java index bc9819b9e670b..f05b89510eb60 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/NonFollowerVoteRequestTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/NonFollowerVoteRequestTest.java @@ -71,7 +71,7 @@ public void shouldRejectVoteRequestFromCurrentTerm() throws Exception // then assertFalse( ((RaftMessages.Vote.Response) messageFor( outcome, member1 )).voteGranted() ); - assertEquals( role, outcome.getNewRole() ); + assertEquals( role, outcome.getRole() ); } @Test @@ -88,7 +88,7 @@ public void shouldRejectVoteRequestFromPreviousTerm() throws Exception // then assertFalse( ((RaftMessages.Vote.Response) messageFor( outcome, member1 )).voteGranted() ); - assertEquals( role, outcome.getNewRole() ); + assertEquals( role, outcome.getRole() ); } public RaftState newState() throws IOException diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/VoteRequestTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/VoteRequestTest.java index 7fe852427472f..e00274e23a130 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/VoteRequestTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/VoteRequestTest.java @@ -94,7 +94,7 @@ public void shouldDenyForCandidateInPreviousTerm() throws Exception // then assertFalse( ((RaftMessages.Vote.Response) messageFor( outcome, member1 )).voteGranted() ); - assertEquals( role, outcome.getNewRole() ); + assertEquals( role, outcome.getRole() ); } @Test @@ -134,7 +134,7 @@ public void shouldStayInCurrentRoleOnRequestFromCurrentTerm() throws Exception .lastLogTerm( -1 ).build(), state, log() ); // then - assertEquals( role, outcome.getNewRole() ); + assertEquals( role, outcome.getRole() ); } @Test @@ -151,7 +151,7 @@ public void shouldMoveToFollowerIfRequestIsFromLaterTerm() throws Exception .lastLogTerm( -1 ).build(), state, log() ); // then - assertEquals( Role.FOLLOWER, outcome.getNewRole() ); + assertEquals( Role.FOLLOWER, outcome.getRole() ); } @Test diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/ProcessMessage.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/ProcessMessage.java index 82bcfbb532399..cd9c444285f28 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/ProcessMessage.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/ProcessMessage.java @@ -64,7 +64,7 @@ public ClusterState advance( ClusterState previous ) throws IOException newClusterState.queues.put( outgoingMessage.to(), outboundQueue ); } - newClusterState.roles.put( member, outcome.getNewRole() ); + newClusterState.roles.put( member, outcome.getRole() ); newClusterState.states.put( member, newMemberState ); newClusterState.queues.put( member, inboundQueue ); return newClusterState;