diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java
index 88d85f45c2ff2..3cffbf4589a6c 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java
@@ -312,7 +312,6 @@ public synchronized void handle( RaftMessages.RaftMessage incomingMessage )
handleTimers( outcome );
handleLogShipping( outcome );
- membershipManager.processLog( outcome.getCommitIndex(), outcome.getLogCommands() );
driveMembership( outcome );
volatileLeader.set( outcome.getLeader() );
@@ -334,8 +333,10 @@ public synchronized void handle( RaftMessages.RaftMessage incomingMessage )
}
}
- private void driveMembership( Outcome outcome )
+ private void driveMembership( Outcome outcome ) throws IOException
{
+ membershipManager.processLog( outcome.getCommitIndex(), outcome.getLogCommands() );
+
currentRole = outcome.getRole();
membershipManager.onRole( currentRole );
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipDriver.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipDriver.java
deleted file mode 100644
index 92cb3e7cd49e3..0000000000000
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/MembershipDriver.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright (c) 2002-2016 "Neo Technology,"
- * Network Engine for Objects in Lund AB [http://neotechnology.com]
- *
- * This file is part of Neo4j.
- *
- * Neo4j is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-package org.neo4j.coreedge.raft.membership;
-
-import java.util.Set;
-
-import org.neo4j.coreedge.server.CoreMember;
-
-public interface MembershipDriver
-{
- void doConsensus( Set newVotingMemberSet );
-
- boolean uncommittedMemberChangeInLog();
-
- void stateChanged();
-}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembership.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembership.java
index 8860394a64a85..86f0557ff446a 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembership.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembership.java
@@ -40,8 +40,6 @@ public interface RaftMembership
*/
Set replicationMembers();
- long logIndex();
-
/**
* Register a membership listener.
*
@@ -49,8 +47,6 @@ public interface RaftMembership
*/
void registerListener( RaftMembership.Listener listener );
- void deregisterListener( RaftMembership.Listener listener );
-
/**
* This interface must be implemented from whoever wants to be notified of membership changes. Membership changes
* are additions to and removals from the voting and replication members set.
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipChanger.java
similarity index 87%
rename from enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipStateMachine.java
rename to enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipChanger.java
index 0c8a1898ee211..3fa49aa39fc22 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipStateMachine.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipChanger.java
@@ -26,7 +26,6 @@
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;
-import org.neo4j.coreedge.raft.state.membership.RaftMembershipState;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
@@ -59,7 +58,7 @@
*
* Only a single member change is handled at a time.
*/
-class RaftMembershipStateMachine
+class RaftMembershipChanger
{
private final Log log;
public RaftMembershipStateMachineEventHandler state = new Inactive();
@@ -68,22 +67,19 @@ class RaftMembershipStateMachine
private final Clock clock;
private final long electionTimeout;
- private final MembershipDriver membershipDriver;
+ private final RaftMembershipManager membershipManager;
private long catchupTimeout;
- private final RaftMembershipState membershipState;
private CoreMember catchingUpMember;
- RaftMembershipStateMachine( ReadableRaftLog raftLog, Clock clock, long electionTimeout,
- MembershipDriver membershipDriver, LogProvider logProvider,
- long catchupTimeout, RaftMembershipState membershipState )
+ RaftMembershipChanger( ReadableRaftLog raftLog, Clock clock, long electionTimeout,
+ LogProvider logProvider, long catchupTimeout, RaftMembershipManager membershipManager )
{
this.raftLog = raftLog;
this.clock = clock;
this.electionTimeout = electionTimeout;
- this.membershipDriver = membershipDriver;
this.catchupTimeout = catchupTimeout;
- this.membershipState = membershipState;
+ this.membershipManager = membershipManager;
this.log = logProvider.getLog( getClass() );
}
@@ -98,7 +94,7 @@ private synchronized void handleState( RaftMembershipStateMachineEventHandler ne
newState.onEntry();
log.info( newState.toString() );
- membershipDriver.stateChanged();
+ membershipManager.stateChanged();
}
}
@@ -139,7 +135,7 @@ public RaftMembershipStateMachineEventHandler onRole( Role role )
{
if ( role == Role.LEADER )
{
- if ( membershipDriver.uncommittedMemberChangeInLog() )
+ if ( membershipManager.uncommittedMemberChangeInLog() )
{
return new ConsensusInProgress();
}
@@ -185,9 +181,9 @@ public RaftMembershipStateMachineEventHandler onMissingMember( CoreMember member
@Override
public RaftMembershipStateMachineEventHandler onSuperfluousMember( CoreMember member )
{
- Set updatedVotingMembers = new HashSet<>( membershipState.votingMembers() );
+ Set updatedVotingMembers = new HashSet<>( membershipManager.votingMembers() );
updatedVotingMembers.remove( member );
- membershipDriver.doConsensus( updatedVotingMembers );
+ membershipManager.doConsensus( updatedVotingMembers );
return new ConsensusInProgress();
}
@@ -213,16 +209,16 @@ private class CatchingUp extends ActiveBaseState
@Override
public void onEntry()
{
- membershipState.addAdditionalReplicationMember( catchingUpMember );
+ membershipManager.addAdditionalReplicationMember( catchingUpMember );
log.info( "Adding replication member: " + catchingUpMember );
}
@Override
public void onExit()
{
- if( !movingToConsensus )
+ if ( !movingToConsensus )
{
- membershipState.removeAdditionalReplicationMember( catchingUpMember );
+ membershipManager.removeAdditionalReplicationMember( catchingUpMember );
log.info( "Removing replication member: " + catchingUpMember );
}
}
@@ -249,9 +245,9 @@ public RaftMembershipStateMachineEventHandler onFollowerStateChange( FollowerSta
{
if ( catchupGoalTracker.isGoalAchieved() )
{
- Set updatedVotingMembers = new HashSet<>( membershipState.votingMembers() );
+ Set updatedVotingMembers = new HashSet<>( membershipManager.votingMembers() );
updatedVotingMembers.add( catchingUpMember );
- membershipDriver.doConsensus( updatedVotingMembers );
+ membershipManager.doConsensus( updatedVotingMembers );
movingToConsensus = true;
return new ConsensusInProgress();
@@ -301,7 +297,7 @@ public void onEntry()
@Override
public void onExit()
{
- membershipState.removeAdditionalReplicationMember( catchingUpMember );
+ membershipManager.removeAdditionalReplicationMember( catchingUpMember );
log.info( "Removing replication member: " + catchingUpMember );
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java
index 4724533eed194..b6bc3f69796dc 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/membership/RaftMembershipManager.java
@@ -22,25 +22,20 @@
import java.io.IOException;
import java.time.Clock;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
-import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.RaftLogCursor;
+import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
-import org.neo4j.coreedge.raft.outcome.AppendLogEntry;
-import org.neo4j.coreedge.raft.outcome.BatchAppendLogEntries;
-import org.neo4j.coreedge.raft.outcome.LogCommand;
-import org.neo4j.coreedge.raft.outcome.TruncateLogCommand;
-import org.neo4j.coreedge.raft.replication.ReplicatedContent;
+import org.neo4j.coreedge.raft.outcome.RaftLogCommand;
import org.neo4j.coreedge.raft.replication.SendToMyself;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;
import org.neo4j.coreedge.raft.state.membership.RaftMembershipState;
import org.neo4j.coreedge.server.CoreMember;
-import org.neo4j.helpers.collection.Pair;
+import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
@@ -53,162 +48,62 @@
* - raft membership state machine
* - raft log events
*/
-public class RaftMembershipManager implements RaftMembership, MembershipDriver
+public class RaftMembershipManager extends LifecycleAdapter implements RaftMembership, RaftLogCommand.Handler
{
- private RaftMembershipStateMachine membershipStateMachine;
+ private RaftMembershipChanger membershipChanger;
private Set targetMembers = null;
- private int uncommittedMemberChanges = 0;
-
- private final SendToMyself replicator;
+ private final SendToMyself sendToMyself;
private final RaftGroup.Builder memberSetBuilder;
- private final ReadableRaftLog entryLog;
+ private final ReadableRaftLog raftLog;
private final Log log;
- private final int expectedClusterSize;
- private final StateStorage stateStorage;
- private final RaftMembershipState raftMembershipState;
- private long lastApplied = -1;
-
- public RaftMembershipManager( SendToMyself replicator, RaftGroup.Builder memberSetBuilder, RaftLog entryLog,
- LogProvider logProvider, int expectedClusterSize, long electionTimeout, Clock clock, long catchupTimeout,
- StateStorage stateStorage )
- {
- this.replicator = replicator;
- this.memberSetBuilder = memberSetBuilder;
- this.entryLog = entryLog;
- this.expectedClusterSize = expectedClusterSize;
- this.stateStorage = stateStorage;
- this.raftMembershipState = stateStorage.getInitialState();
- this.log = logProvider.getLog( getClass() );
+ private final long recoverFromIndex;
- this.membershipStateMachine = new RaftMembershipStateMachine( entryLog, clock, electionTimeout, this,
- logProvider, catchupTimeout, raftMembershipState );
- }
-
- public void processLog( long commitIndex, Collection logCommands ) throws IOException
- {
- for ( LogCommand logCommand : logCommands )
- {
- if ( logCommand instanceof TruncateLogCommand )
- {
- onTruncated( commitIndex );
- }
- if ( logCommand instanceof AppendLogEntry )
- {
- AppendLogEntry command = (AppendLogEntry) logCommand;
- onAppended( command.entry.content(), command.index );
- }
- if ( logCommand instanceof BatchAppendLogEntries )
- {
- BatchAppendLogEntries command = (BatchAppendLogEntries) logCommand;
- for ( int i = command.offset; i < command.entries.length; i++ )
- {
- onAppended( command.entries[i].content(), command.baseIndex + i );
- }
- }
- }
- if ( commitIndex > lastApplied )
- {
- long index = lastApplied + 1;
- try ( RaftLogCursor entryCursor = entryLog.getEntryCursor( index ) )
- {
- while ( entryCursor.next() )
- {
- if ( index == commitIndex + 1 )
- {
- break;
- }
- ReplicatedContent content = entryCursor.get().content();
- onCommitted( content, index );
- index++;
- }
- }
- lastApplied = commitIndex;
- }
- }
+ private final StateStorage storage;
+ private final RaftMembershipState state;
- private void onAppended( ReplicatedContent content, long logIndex )
- {
- if ( content instanceof RaftGroup )
- {
- if ( logIndex > raftMembershipState.logIndex() )
- {
- assert uncommittedMemberChanges >= 0;
+ private final int expectedClusterSize;
- uncommittedMemberChanges++;
+ private volatile Set votingMembers = new HashSet<>();
+ private volatile Set replicationMembers = new HashSet<>(); // votingMembers + additionalReplicationMembers
- RaftGroup raftGroup = (RaftGroup) content;
- raftMembershipState.setVotingMembers( raftGroup.getMembers() );
- }
- else
- {
- log.info( "Ignoring content at index %d, since already appended up to %d",
- logIndex, raftMembershipState.logIndex() );
- }
- }
- }
+ private Set listeners = new HashSet<>();
+ private Set additionalReplicationMembers = new HashSet<>();
- private void onCommitted( ReplicatedContent content, long logIndex ) throws IOException
+ public RaftMembershipManager( SendToMyself sendToMyself, RaftGroup.Builder memberSetBuilder,
+ ReadableRaftLog raftLog, LogProvider logProvider, int expectedClusterSize, long electionTimeout,
+ Clock clock, long catchupTimeout, StateStorage membershipStorage, long recoverFromIndex )
{
- if ( content instanceof RaftGroup )
- {
- if ( logIndex > raftMembershipState.logIndex() )
- {
- assert uncommittedMemberChanges > 0;
-
- uncommittedMemberChanges--;
+ this.sendToMyself = sendToMyself;
+ this.memberSetBuilder = memberSetBuilder;
+ this.raftLog = raftLog;
+ this.expectedClusterSize = expectedClusterSize;
+ this.storage = membershipStorage;
+ this.state = membershipStorage.getInitialState();
- if ( uncommittedMemberChanges == 0 )
- {
- membershipStateMachine.onRaftGroupCommitted();
- }
- raftMembershipState.logIndex( logIndex );
- stateStorage.persistStoreData( raftMembershipState );
- }
- else
- {
- log.info( "Ignoring content at index %d, since already committed up to %d",
- logIndex, raftMembershipState.logIndex() );
- }
- }
+ this.log = logProvider.getLog( getClass() );
+ this.recoverFromIndex = recoverFromIndex;
+ this.membershipChanger = new RaftMembershipChanger( raftLog, clock,
+ electionTimeout, logProvider, catchupTimeout, this );
}
- private void onTruncated( long commitIndex ) throws IOException
+ @Override
+ public void start() throws Throwable
{
- Pair> lastMembershipEntry = findLastMembershipEntry();
+ log.info( "Membership state before recovery: " + state );
+ log.info( "Recovering from: " + recoverFromIndex + " to: " + raftLog.appendIndex() );
- if ( lastMembershipEntry != null )
- {
- raftMembershipState.setVotingMembers( lastMembershipEntry.other().getMembers() );
- raftMembershipState.logIndex( lastMembershipEntry.first() );
- stateStorage.persistStoreData( raftMembershipState );
- uncommittedMemberChanges = lastMembershipEntry.first() <= commitIndex ? 0 : 1;
- }
- else
- {
- raftMembershipState.setVotingMembers( Collections.emptySet() );
- uncommittedMemberChanges = 0;
- }
- }
-
- private Pair> findLastMembershipEntry() throws IOException
- {
- Pair> lastMembershipEntry = null;
- long index = 0;
- try ( RaftLogCursor cursor = entryLog.getEntryCursor( index ) )
+ try ( RaftLogCursor cursor = raftLog.getEntryCursor( recoverFromIndex ) )
{
while ( cursor.next() )
{
- ReplicatedContent content = cursor.get().content();
- if ( content instanceof RaftGroup )
- {
- lastMembershipEntry = Pair.of( index, (RaftGroup) content );
- }
- index++;
+ append( cursor.index(), cursor.get() );
}
}
- return lastMembershipEntry;
+
+ log.info( "Membership state after recovery: " + state );
+ updateMemberSets();
}
public void setTargetMembershipSet( Set targetMembers )
@@ -216,7 +111,7 @@ public void setTargetMembershipSet( Set targetMembers )
this.targetMembers = new HashSet<>( targetMembers );
log.info( "Target membership: " + targetMembers );
- membershipStateMachine.onTargetChanged( targetMembers );
+ membershipChanger.onTargetChanged( targetMembers );
checkForStartCondition();
}
@@ -233,6 +128,46 @@ private Set missingMembers()
return missingMembers;
}
+ /**
+ * All the externally published sets are derived from the committed and appended sets.
+ */
+ private void updateMemberSets()
+ {
+ votingMembers = state.getLatest();
+
+ HashSet newReplicationMembers = new HashSet<>( votingMembers );
+ newReplicationMembers.addAll( additionalReplicationMembers );
+
+ replicationMembers = newReplicationMembers;
+ notifyListeners();
+ }
+
+ /**
+ * Adds an additional member to replicate to. Members that are joining need to
+ * catch up sufficiently before they become part of the voting group.
+ *
+ * @param member The member which will be added to the replication group.
+ */
+ void addAdditionalReplicationMember( CoreMember member )
+ {
+ additionalReplicationMembers.add( member );
+ updateMemberSets();
+ }
+
+ /**
+ * Removes a member previously part of the additional replication member group.
+ *
+ * This either happens because they caught up sufficiently and became part of the
+ * voting group or because they failed to catch up in time.
+ *
+ * @param member The member to remove from the replication group.
+ */
+ void removeAdditionalReplicationMember( CoreMember member )
+ {
+ additionalReplicationMembers.remove( member );
+ updateMemberSets();
+ }
+
private boolean isSafeToRemoveMember()
{
return votingMembers() != null && votingMembers().size() > expectedClusterSize;
@@ -254,69 +189,124 @@ private void checkForStartCondition()
{
if ( missingMembers().size() > 0 )
{
- membershipStateMachine.onMissingMember( first( missingMembers() ) );
+ membershipChanger.onMissingMember( first( missingMembers() ) );
}
else if ( isSafeToRemoveMember() && superfluousMembers().size() > 0 )
{
- membershipStateMachine.onSuperfluousMember( first( superfluousMembers() ) );
+ membershipChanger.onSuperfluousMember( first( superfluousMembers() ) );
}
}
- @Override
- public void doConsensus( Set newVotingMemberSet )
+ /**
+ * Used by the membership changer for getting consensus on a new set of members.
+ *
+ * @param newVotingMemberSet The new set of members.
+ */
+ void doConsensus( Set newVotingMemberSet )
{
- replicator.replicate( memberSetBuilder.build( newVotingMemberSet ) );
+ sendToMyself.replicate( memberSetBuilder.build( newVotingMemberSet ) );
}
- @Override
- public boolean uncommittedMemberChangeInLog()
- {
- return uncommittedMemberChanges > 0;
- }
-
- @Override
- public void stateChanged()
+ /**
+ * Called by the membership changer when it has changed state and in response
+ * the membership manager potentially feeds it back with an event to start
+ * a new membership change operation.
+ */
+ void stateChanged()
{
checkForStartCondition();
}
public void onFollowerStateChange( FollowerStates followerStates )
{
- membershipStateMachine.onFollowerStateChange( followerStates );
+ membershipChanger.onFollowerStateChange( followerStates );
}
public void onRole( Role role )
{
- membershipStateMachine.onRole( role );
+ membershipChanger.onRole( role );
}
@Override
public Set votingMembers()
{
- return raftMembershipState.votingMembers();
+ return votingMembers;
}
@Override
public Set replicationMembers()
{
- return raftMembershipState.replicationMembers();
+ return replicationMembers;
}
@Override
- public long logIndex()
+ public synchronized void registerListener( Listener listener )
+ {
+ listeners.add( listener );
+ }
+
+ private synchronized void notifyListeners()
{
- return raftMembershipState.logIndex();
+ listeners.forEach( Listener::onMembershipChanged );
+ }
+
+ boolean uncommittedMemberChangeInLog()
+ {
+ return state.uncommittedMemberChangeInLog();
+ }
+
+ public void processLog( long commitIndex, Collection logCommands ) throws IOException
+ {
+ for ( RaftLogCommand logCommand : logCommands )
+ {
+ logCommand.dispatch( this );
+ }
+
+ if ( state.commit( commitIndex ) )
+ {
+ membershipChanger.onRaftGroupCommitted();
+ storage.persistStoreData( state );
+ updateMemberSets();
+ }
}
@Override
- public void registerListener( Listener listener )
+ public void append( long baseIndex, RaftLogEntry... entries ) throws IOException
{
- raftMembershipState.registerListener( listener );
+ /* The warnings in this method are rarely expected occurrences which warrant to be logged with significance. */
+
+ for ( RaftLogEntry entry : entries )
+ {
+ if ( entry.content() instanceof RaftGroup )
+ {
+ RaftGroup raftGroup = (RaftGroup) entry.content();
+
+ if ( state.uncommittedMemberChangeInLog() )
+ {
+ log.warn( "Appending with uncommitted membership change in log" );
+ }
+
+ if ( state.append( baseIndex, new HashSet<>( raftGroup.getMembers() ) ) )
+ {
+ storage.persistStoreData( state );
+ updateMemberSets();
+ }
+ else
+ {
+ log.warn( "Appending member set was ignored. Current state: %s, Appended set: %s, Log index: %d%n", state, raftGroup, baseIndex );
+ }
+ }
+ baseIndex++;
+ }
}
@Override
- public void deregisterListener( Listener listener )
+ public void truncate( long fromIndex ) throws IOException
{
- raftMembershipState.deregisterListener( listener );
+ if ( state.truncate( fromIndex ) )
+ {
+ storage.persistStoreData( state );
+ updateMemberSets();
+ }
}
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/AppendLogEntry.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/AppendLogEntry.java
index 409de6cbf0158..b48f9f9bfb1b2 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/AppendLogEntry.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/AppendLogEntry.java
@@ -26,7 +26,7 @@
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.segmented.InFlightMap;
-public class AppendLogEntry implements LogCommand
+public class AppendLogEntry implements RaftLogCommand
{
public final long index;
public final RaftLogEntry entry;
@@ -53,6 +53,12 @@ public void applyTo( InFlightMap inFlightMap ) throws IOExce
inFlightMap.register( index, entry );
}
+ @Override
+ public void dispatch( Handler handler ) throws IOException
+ {
+ handler.append( index, entry );
+ }
+
@Override
public boolean equals( Object o )
{
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/BatchAppendLogEntries.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/BatchAppendLogEntries.java
index 1e998eb78b5ad..a86ac01decaf4 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/BatchAppendLogEntries.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/BatchAppendLogEntries.java
@@ -29,7 +29,7 @@
import static java.lang.String.format;
-public class BatchAppendLogEntries implements LogCommand
+public class BatchAppendLogEntries implements RaftLogCommand
{
public final long baseIndex;
public final int offset;
@@ -42,6 +42,12 @@ public BatchAppendLogEntries( long baseIndex, int offset, RaftLogEntry[] entries
this.entries = entries;
}
+ @Override
+ public void dispatch( Handler handler ) throws IOException
+ {
+ handler.append( baseIndex + offset, Arrays.copyOfRange( entries, offset, entries.length ) );
+ }
+
@Override
public void applyTo( RaftLog raftLog ) throws IOException
{
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java
index bf58733b5ccec..1f33745b65b3b 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java
@@ -49,7 +49,7 @@ public class Outcome implements Message
private long leaderCommit;
- private Collection logCommands = new ArrayList<>();
+ private Collection logCommands = new ArrayList<>();
private Collection outgoingMessages = new ArrayList<>();
private long commitIndex;
@@ -77,7 +77,7 @@ public Outcome( Role currentRole, ReadableRaftState ctx )
public Outcome( Role nextRole, long term, CoreMember leader, long leaderCommit, CoreMember votedFor,
Set votesForMe, long lastLogIndexBeforeWeBecameLeader,
FollowerStates followerStates, boolean renewElectionTimeout,
- Collection logCommands, Collection outgoingMessages,
+ Collection logCommands, Collection outgoingMessages,
Collection shipCommands, long commitIndex )
{
this.nextRole = nextRole;
@@ -137,7 +137,7 @@ public void setLeaderCommit( long leaderCommit )
this.leaderCommit = leaderCommit;
}
- public void addLogCommand( LogCommand logCommand )
+ public void addLogCommand( RaftLogCommand logCommand )
{
this.logCommands.add( logCommand );
}
@@ -236,7 +236,7 @@ public long getLeaderCommit()
return leaderCommit;
}
- public Collection getLogCommands()
+ public Collection getLogCommands()
{
return logCommands;
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/LogCommand.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/RaftLogCommand.java
similarity index 81%
rename from enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/LogCommand.java
rename to enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/RaftLogCommand.java
index 2163fa660de63..abaf6935f3df3 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/LogCommand.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/RaftLogCommand.java
@@ -25,8 +25,16 @@
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.segmented.InFlightMap;
-public interface LogCommand
+public interface RaftLogCommand
{
+ interface Handler
+ {
+ void append( long baseIndex, RaftLogEntry... entries ) throws IOException;
+ void truncate( long fromIndex ) throws IOException;
+ }
+
+ void dispatch( Handler handler ) throws IOException;
+
void applyTo( RaftLog raftLog ) throws IOException;
void applyTo( InFlightMap inFlightMap ) throws IOException;
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/TruncateLogCommand.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/TruncateLogCommand.java
index 81a9085de95b8..1e832221d6e25 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/TruncateLogCommand.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/TruncateLogCommand.java
@@ -26,7 +26,7 @@
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.segmented.InFlightMap;
-public class TruncateLogCommand implements LogCommand
+public class TruncateLogCommand implements RaftLogCommand
{
public final long fromIndex;
@@ -35,6 +35,12 @@ public TruncateLogCommand( long fromIndex )
this.fromIndex = fromIndex;
}
+ @Override
+ public void dispatch( Handler handler ) throws IOException
+ {
+ handler.truncate( fromIndex );
+ }
+
@Override
public void applyTo( RaftLog raftLog ) throws IOException
{
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenStateMachine.java
index 3b4727d5a82b0..95b0fa2cf2f6b 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenStateMachine.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenStateMachine.java
@@ -42,7 +42,6 @@
import org.neo4j.storageengine.api.TransactionApplicationMode;
import static java.lang.String.format;
-
import static org.neo4j.coreedge.raft.replication.tx.LogIndexTxHeaderEncoding.encodeLogIndexAsTxHeader;
public class ReplicatedTokenStateMachine implements StateMachine
@@ -141,6 +140,11 @@ public synchronized void flush() throws IOException
@Override
public long lastAppliedIndex()
{
+ if ( commitProcess == null )
+ {
+ /** See {@link #installCommitProcess}. */
+ throw new IllegalStateException( "Value has not been installed" );
+ }
return lastCommittedIndex;
}
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/LastCommittedIndexFinder.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/LastCommittedIndexFinder.java
index b835d8984d956..8ae5ff6811556 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/LastCommittedIndexFinder.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/LastCommittedIndexFinder.java
@@ -38,8 +38,8 @@ public class LastCommittedIndexFinder
private final LogicalTransactionStore transactionStore;
private final Log log;
- public LastCommittedIndexFinder( TransactionIdStore transactionIdStore, LogicalTransactionStore transactionStore,
- LogProvider logProvider )
+ public LastCommittedIndexFinder( TransactionIdStore transactionIdStore,
+ LogicalTransactionStore transactionStore, LogProvider logProvider )
{
this.transactionIdStore = transactionIdStore;
this.transactionStore = transactionStore;
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java
index c6ff2f94903b4..1228df888ea77 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java
@@ -112,6 +112,11 @@ public void flush() throws IOException
@Override
public long lastAppliedIndex()
{
+ if ( queue == null )
+ {
+ /** See {@link #installCommitProcess}. */
+ throw new IllegalStateException( "Value has not been installed" );
+ }
return lastCommittedIndex;
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java
index 11f4d111cc0af..c9959b6811000 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java
@@ -287,7 +287,12 @@ public synchronized void start() throws IOException, InterruptedException
log.info( format( "Restoring last applied index to %d", lastApplied ) );
sessionState = sessionStorage.getInitialState();
- submitApplyJob( coreStateMachines.getApplyingIndex() );
+ /* Considering the order in which state is flushed, the state machines will
+ * always be furthest ahead and indicate the furthest possible state to
+ * which we must replay to reach a consistent state. */
+ long lastPossiblyApplying = coreStateMachines.getLastAppliedIndex();
+
+ submitApplyJob( lastPossiblyApplying );
applier.sync( false );
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java
index 92a7b638fff45..1fca0d0f8a9a9 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java
@@ -200,7 +200,7 @@ public void close()
}
}
- public long getApplyingIndex()
+ long getLastAppliedIndex()
{
long lastAppliedTxIndex = replicatedTxStateMachine.lastAppliedIndex();
assert lastAppliedTxIndex == labelTokenStateMachine.lastAppliedIndex();
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/LongIndexMarshal.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/LongIndexMarshal.java
index 4c48af0ac837d..0a43c79bbb490 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/LongIndexMarshal.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/LongIndexMarshal.java
@@ -48,7 +48,7 @@ public void marshal( Long index, WritableChannel channel ) throws IOException
}
@Override
- protected Long unmarshal0( ReadableChannel channel ) throws IOException, EndOfStreamException
+ protected Long unmarshal0( ReadableChannel channel ) throws IOException
{
return channel.getLong();
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java
index 3b20b3b3a8d95..2b37c7b5424f9 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java
@@ -28,7 +28,7 @@
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.log.segmented.InFlightMap;
import org.neo4j.coreedge.raft.membership.RaftMembership;
-import org.neo4j.coreedge.raft.outcome.LogCommand;
+import org.neo4j.coreedge.raft.outcome.RaftLogCommand;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;
import org.neo4j.coreedge.raft.state.term.TermState;
@@ -164,7 +164,7 @@ public void update( Outcome outcome ) throws IOException
lastLogIndexBeforeWeBecameLeader = outcome.getLastLogIndexBeforeWeBecameLeader();
followerStates = outcome.getFollowerStates();
- for ( LogCommand logCommand : outcome.getLogCommands() )
+ for ( RaftLogCommand logCommand : outcome.getLogCommands() )
{
logCommand.applyTo( entryLog );
logCommand.applyTo( inFlightMap );
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/membership/MembershipEntry.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/membership/MembershipEntry.java
new file mode 100644
index 0000000000000..2240c04227b98
--- /dev/null
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/membership/MembershipEntry.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.coreedge.raft.state.membership;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+import org.neo4j.coreedge.raft.state.EndOfStreamException;
+import org.neo4j.coreedge.raft.state.SafeStateMarshal;
+import org.neo4j.coreedge.server.CoreMember;
+import org.neo4j.storageengine.api.ReadableChannel;
+import org.neo4j.storageengine.api.WritableChannel;
+
+/**
+ * Represents a membership entry in the RAFT log.
+ */
+class MembershipEntry
+{
+ private long logIndex;
+ private Set members;
+
+ MembershipEntry( long logIndex, Set members )
+ {
+ this.members = members;
+ this.logIndex = logIndex;
+ }
+
+ public long logIndex()
+ {
+ return logIndex;
+ }
+
+ public Set members()
+ {
+ return members;
+ }
+
+ @Override
+ public boolean equals( Object o )
+ {
+ if ( this == o )
+ { return true; }
+ if ( o == null || getClass() != o.getClass() )
+ { return false; }
+ MembershipEntry that = (MembershipEntry) o;
+ return logIndex == that.logIndex &&
+ Objects.equals( members, that.members );
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash( logIndex, members );
+ }
+
+ @Override
+ public String toString()
+ {
+ return "MembershipEntry{" +
+ "logIndex=" + logIndex +
+ ", members=" + members +
+ '}';
+ }
+
+ static class Marshal extends SafeStateMarshal
+ {
+ CoreMember.CoreMemberMarshal memberMarshal = new CoreMember.CoreMemberMarshal();
+
+ @Override
+ public MembershipEntry startState()
+ {
+ return null;
+ }
+
+ @Override
+ public long ordinal( MembershipEntry entry )
+ {
+ return entry.logIndex;
+ }
+
+ @Override
+ public void marshal( MembershipEntry entry, WritableChannel channel ) throws IOException
+ {
+ if ( entry == null )
+ {
+ channel.putInt( 0 );
+ return;
+ }
+ else
+ {
+ channel.putInt( 1 );
+ }
+
+ channel.putLong( entry.logIndex );
+ channel.putInt( entry.members.size() );
+ for ( CoreMember member : entry.members )
+ {
+ memberMarshal.marshal( member, channel );
+ }
+ }
+
+ @Override
+ public MembershipEntry unmarshal0( ReadableChannel channel ) throws IOException, EndOfStreamException
+ {
+ int hasEntry = channel.getInt();
+ if ( hasEntry == 0 )
+ {
+ return null;
+ }
+ long logIndex = channel.getLong();
+ int memberCount = channel.getInt();
+ Set members = new HashSet<>();
+ for ( int i = 0; i < memberCount; i++ )
+ {
+ members.add( memberMarshal.unmarshal( channel ) );
+ }
+ return new MembershipEntry( logIndex, members );
+ }
+ }
+}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/membership/RaftMembershipState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/membership/RaftMembershipState.java
index b680fb1c5ae76..fa7b8b216d57a 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/membership/RaftMembershipState.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/membership/RaftMembershipState.java
@@ -21,169 +21,191 @@
import java.io.IOException;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Set;
-import org.neo4j.coreedge.raft.membership.RaftMembership;
-import org.neo4j.coreedge.raft.state.ChannelMarshal;
import org.neo4j.coreedge.raft.state.EndOfStreamException;
import org.neo4j.coreedge.raft.state.SafeStateMarshal;
import org.neo4j.coreedge.server.CoreMember;
+import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.storageengine.api.ReadableChannel;
import org.neo4j.storageengine.api.WritableChannel;
-public class RaftMembershipState implements RaftMembership
+/**
+ * Represents the current state of membership in RAFT and exposes operations
+ * for modifying the state. The valid states and transitions are represented
+ * by the following table:
+ *
+ * state valid transitions
+ * 1: [ - , - ] 2 (append)
+ * 2: [ - , appended ] 1,4 (commit or truncate)
+ * 3: [ committed, appended ] 4 (commit or truncate)
+ * 4: [ committed, - ] 3 (append)
+ *
+ * The transition from 3->4 is either because the appended entry became
+ * the new committed entry or because the appended entry was truncated.
+ *
+ * A committed entry can never be truncated and there can only be a single
+ * outstanding appended entry which usually is committed shortly
+ * thereafter, but it might also be truncated.
+ *
+ * Recovery must in-order replay all the log entries whose effects are not
+ * guaranteed to have been persisted. The handling of these events is
+ * idempotent so it is safe to replay entries which might have been
+ * applied already.
+ *
+ * Note that commit updates occur separately from append/truncation in RAFT
+ * so it is possible to for example observe several membership entries in a row
+ * being appended on a particular member without an intermediate commit, even
+ * though this is not possible in the system as a whole because the leader which
+ * drives the membership change work will not spawn a new entry until it knows
+ * that the previous one has been appended with a quorum, i.e. committed. This
+ * is the reason why that this class is very lax when it comes to updating the
+ * state and not making hard assertions which on a superficial level might
+ * seem obvious. The consensus system as a whole and the membership change
+ * driving logic is relied upon for achieving the correct system level
+ * behaviour.
+ */
+public class RaftMembershipState extends LifecycleAdapter
{
- private Set additionalReplicationMembers = new HashSet<>();
+ private MembershipEntry committed;
+ private MembershipEntry appended;
+ long ordinal; // persistence ordinal must be increased each time we change committed or appended
- private volatile Set votingMembers = new HashSet<>();
- private volatile Set replicationMembers = new HashSet<>(); // votingMembers + additionalReplicationMembers
-
- private final Set listeners;
-
- private long logIndex = -1; // First log index is 0, so -1 is used here as "unknown" value
-
- private RaftMembershipState( Set members, long logIndex )
+ public static RaftMembershipState startState()
{
- this.votingMembers = members;
- this.logIndex = logIndex;
- this.listeners = new HashSet<>( );
- updateReplicationMembers();
+ return new RaftMembershipState( -1, null, null );
}
- public RaftMembershipState()
+ RaftMembershipState( long ordinal, MembershipEntry committed, MembershipEntry appended )
{
- this.listeners = new HashSet<>();
+ this.ordinal = ordinal;
+ this.committed = committed;
+ this.appended = appended;
}
- public synchronized void setVotingMembers( Set newVotingMembers )
+ public boolean append( long logIndex, Set members )
{
- this.votingMembers = new HashSet<>( newVotingMembers );
+ if ( committed != null && logIndex <= committed.logIndex() )
+ {
+ return false;
+ }
- updateReplicationMembers();
- notifyListeners();
- }
+ if ( appended != null && (committed == null || appended.logIndex() > committed.logIndex()) )
+ {
+ /* This might seem counter-intuitive, but seeing two appended entries
+ in a row must mean that the previous one got committed. So it must
+ be recorded as having been committed or a subsequent truncation might
+ erase the state. We also protect against going backwards in the
+ committed state, as might happen during recovery. */
- /**
- * Adds an additional member to replicate to. Members that are joining need to
- * catch up sufficiently before they become part of the voting group.
- *
- * @param member The member which will be added to the replication group.
- */
- public synchronized void addAdditionalReplicationMember( CoreMember member )
- {
- additionalReplicationMembers.add( member );
+ committed = appended;
+ }
- updateReplicationMembers();
- notifyListeners();
+ ordinal++;
+ appended = new MembershipEntry( logIndex, members );
+ return true;
}
- /**
- * Removes a member previously part of the additional replication member group.
- *
- * This either happens because they caught up sufficiently and became part of the
- * voting group or because they failed to catch up in time.
- *
- * @param member The member to remove from the replication group.
- */
- public synchronized void removeAdditionalReplicationMember( CoreMember member )
+ public boolean truncate( long fromIndex )
{
- additionalReplicationMembers.remove( member );
-
- updateReplicationMembers();
- notifyListeners();
- }
+ if ( committed != null && fromIndex <= committed.logIndex() )
+ {
+ throw new IllegalStateException( "Truncating committed entry" );
+ }
- public void logIndex( long logIndex )
- {
- this.logIndex = logIndex;
+ if ( appended != null && fromIndex <= appended.logIndex() )
+ {
+ ordinal++;
+ appended = null;
+ return true;
+ }
+ return false;
}
- private void updateReplicationMembers()
+ public boolean commit( long commitIndex )
{
- HashSet newReplicationMembers = new HashSet<>( votingMembers );
-
- newReplicationMembers.addAll( additionalReplicationMembers );
- this.replicationMembers = newReplicationMembers;
+ if ( appended != null && commitIndex >= appended.logIndex() )
+ {
+ ordinal++;
+ committed = appended;
+ appended = null;
+ return true;
+ }
+ return false;
}
- @Override
- public Set votingMembers()
+ public boolean uncommittedMemberChangeInLog()
{
- return new HashSet<>( votingMembers );
+ return appended != null;
}
- @Override
- public Set replicationMembers()
+ public Set getLatest()
{
- return new HashSet<>( replicationMembers );
+ return appended != null ? appended.members() :
+ committed != null ? committed.members() : new HashSet<>();
}
@Override
- public long logIndex()
+ public boolean equals( Object o )
{
- return logIndex;
+ if ( this == o )
+ { return true; }
+ if ( o == null || getClass() != o.getClass() )
+ { return false; }
+ RaftMembershipState that = (RaftMembershipState) o;
+ return ordinal == that.ordinal &&
+ Objects.equals( committed, that.committed ) &&
+ Objects.equals( appended, that.appended );
}
@Override
- public synchronized void registerListener( Listener listener )
+ public int hashCode()
{
- listeners.add( listener );
+ return Objects.hash( committed, appended, ordinal );
}
@Override
- public synchronized void deregisterListener( Listener listener )
+ public String toString()
{
- listeners.remove( listener );
- }
-
- private void notifyListeners()
- {
- listeners.forEach( Listener::onMembershipChanged );
+ return "RaftMembershipState{" +
+ "committed=" + committed +
+ ", appended=" + appended +
+ ", ordinal=" + ordinal +
+ '}';
}
public static class Marshal extends SafeStateMarshal
{
- private final ChannelMarshal memberMarshal;
-
- public Marshal( ChannelMarshal marshal )
- {
- this.memberMarshal = marshal;
- }
+ MembershipEntry.Marshal entryMarshal = new MembershipEntry.Marshal();
@Override
- public void marshal( RaftMembershipState state, WritableChannel channel ) throws IOException
+ public RaftMembershipState startState()
{
- channel.putLong( state.logIndex );
- channel.putInt( state.votingMembers.size() );
- for ( CoreMember votingMember : state.votingMembers )
- {
- memberMarshal.marshal( votingMember, channel );
- }
+ return RaftMembershipState.startState();
}
@Override
- public RaftMembershipState unmarshal0( ReadableChannel channel ) throws IOException, EndOfStreamException
+ public long ordinal( RaftMembershipState state )
{
- long logIndex = channel.getLong();
- int memberCount = channel.getInt();
- Set members = new HashSet<>();
- for ( int i = 0; i < memberCount; i++ )
- {
- members.add( memberMarshal.unmarshal( channel ) );
- }
- return new RaftMembershipState( members, logIndex );
+ return state.ordinal;
}
@Override
- public RaftMembershipState startState()
+ public void marshal( RaftMembershipState state, WritableChannel channel ) throws IOException
{
- return new RaftMembershipState();
+ channel.putLong( state.ordinal );
+ entryMarshal.marshal( state.committed, channel );
+ entryMarshal.marshal( state.appended, channel );
}
@Override
- public long ordinal( RaftMembershipState state )
+ public RaftMembershipState unmarshal0( ReadableChannel channel ) throws IOException, EndOfStreamException
{
- return state.logIndex();
+ long ordinal = channel.getLong();
+ MembershipEntry committed = entryMarshal.unmarshal( channel );
+ MembershipEntry appended = entryMarshal.unmarshal( channel );
+ return new RaftMembershipState( ordinal, committed, appended );
}
}
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java
index 0ee65b295b1b9..91d0c17998a34 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java
@@ -55,7 +55,6 @@ public static LifeSupport createLifeSupport( DataSourceManager dataSourceManager
services.add( catchupServer );
services.add( raftTimeoutService );
services.add( new MembershipWaiterLifecycle( membershipWaiter, joinCatchupTimeout, raft, raftServer, logProvider ) );
-
return services;
}
@@ -68,7 +67,7 @@ private static class MembershipWaiterLifecycle extends LifecycleAdapter
private final Log log;
private MembershipWaiterLifecycle( MembershipWaiter membershipWaiter, Long joinCatchupTimeout,
- RaftInstance raft, RaftServer raftServer, LogProvider logProvider )
+ RaftInstance raft, RaftServer raftServer, LogProvider logProvider )
{
this.membershipWaiter = membershipWaiter;
this.joinCatchupTimeout = joinCatchupTimeout;
@@ -86,10 +85,10 @@ public void start() throws Throwable
{
caughtUp.get( joinCatchupTimeout, MILLISECONDS );
}
- catch(ExecutionException e)
+ catch ( ExecutionException e )
{
log.error( "Server failed to join cluster", e.getCause() );
- throw e.getCause() ;
+ throw e.getCause();
}
catch ( InterruptedException | TimeoutException e )
{
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java
index 253cbcc3d8b10..f689d56486051 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java
@@ -190,8 +190,8 @@ public void registerProcedures( Procedures procedures )
}
}
- EnterpriseCoreEditionModule(final PlatformModule platformModule,
- DiscoveryServiceFactory discoveryServiceFactory)
+ EnterpriseCoreEditionModule( final PlatformModule platformModule,
+ DiscoveryServiceFactory discoveryServiceFactory )
{
ioLimiter = new ConfigurableIOLimiter( platformModule.config );
@@ -320,8 +320,8 @@ public void registerProcedures( Procedures procedures )
CoreState coreState;
CoreStateApplier coreStateApplier = new CoreStateApplier( logProvider );
- CoreStateDownloader downloader =
- new CoreStateDownloader( localDatabase, storeFetcher, coreToCoreClient, logProvider );
+ CoreStateDownloader downloader = new CoreStateDownloader( localDatabase, storeFetcher,
+ coreToCoreClient, logProvider );
InFlightMap inFlightMap = new InFlightMap<>();
@@ -357,7 +357,7 @@ public void registerProcedures( Procedures procedures )
raftMembershipStorage = life.add(
new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "membership-state" ),
- "membership-state", new RaftMembershipState.Marshal( new CoreMemberMarshal() ),
+ "membership-state", new RaftMembershipState.Marshal(),
config.get( CoreEdgeClusterSettings.raft_membership_state_size ), databaseHealthSupplier,
logProvider ) );
}
@@ -382,7 +382,10 @@ public void registerProcedures( Procedures procedures )
RaftMembershipManager raftMembershipManager =
new RaftMembershipManager( leaderOnlyReplicator, memberSetBuilder, raftLog, logProvider,
expectedClusterSize, electionTimeout1, systemUTC(),
- config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), raftMembershipStorage );
+ config.get( CoreEdgeClusterSettings.join_catch_up_timeout ), raftMembershipStorage,
+ lastFlushedStorage.getInitialState() );
+
+ life.add( raftMembershipManager );
RaftLogShippingManager logShipping =
new RaftLogShippingManager( loggingOutbound, logProvider, raftLog, systemUTC(),
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java
index b4882390404fd..66e364a985436 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/ClusterIdentityIT.java
@@ -46,7 +46,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-
import static org.neo4j.coreedge.TestStoreId.assertAllStoresHaveTheSameStoreId;
import static org.neo4j.graphdb.Label.label;
import static org.neo4j.kernel.impl.store.MetaDataStore.Position.RANDOM_NUMBER;
@@ -77,7 +76,8 @@ public void setup() throws Exception
public void allServersShouldHaveTheSameStoreId() throws Throwable
{
// WHEN
- cluster.coreTx( ( db, tx ) -> {
+ cluster.coreTx( ( db, tx ) ->
+ {
Node node = db.createNode( label( "boo" ) );
node.setProperty( "foobar", "baz_bat" );
tx.success();
@@ -95,20 +95,21 @@ public void allServersShouldHaveTheSameStoreId() throws Throwable
public void whenWeRestartTheClusterAllServersShouldStillHaveTheSameStoreId() throws Throwable
{
// GIVEN
- cluster.coreTx( ( db, tx ) -> {
+ cluster.coreTx( ( db, tx ) ->
+ {
Node node = db.createNode( label( "boo" ) );
node.setProperty( "foobar", "baz_bat" );
tx.success();
} );
cluster.shutdown();
-
// WHEN
cluster.start();
List coreStoreDirs = storeDirs( cluster.coreServers() );
- cluster.coreTx( ( db, tx ) -> {
+ cluster.coreTx( ( db, tx ) ->
+ {
Node node = db.createNode( label( "boo" ) );
node.setProperty( "foobar", "baz_bat" );
tx.success();
@@ -124,7 +125,8 @@ public void whenWeRestartTheClusterAllServersShouldStillHaveTheSameStoreId() thr
public void shouldNotJoinClusterIfHasDataWithDifferentStoreId() throws Exception
{
// GIVEN
- cluster.coreTx( ( db, tx ) -> {
+ cluster.coreTx( ( db, tx ) ->
+ {
Node node = db.createNode( label( "boo" ) );
node.setProperty( "foobar", "baz_bat" );
tx.success();
@@ -143,7 +145,7 @@ public void shouldNotJoinClusterIfHasDataWithDifferentStoreId() throws Exception
}
catch ( RuntimeException e )
{
- assertThat(e.getCause(), instanceOf(LifecycleException.class));
+ assertThat( e.getCause(), instanceOf( LifecycleException.class ) );
}
}
@@ -151,7 +153,8 @@ public void shouldNotJoinClusterIfHasDataWithDifferentStoreId() throws Exception
public void laggingFollowerShouldDownloadSnapshot() throws Exception
{
// GIVEN
- cluster.coreTx( ( db, tx ) -> {
+ cluster.coreTx( ( db, tx ) ->
+ {
Node node = db.createNode( label( "boo" ) );
node.setProperty( "foobar", "baz_bat" );
tx.success();
@@ -183,7 +186,8 @@ public void laggingFollowerShouldDownloadSnapshot() throws Exception
public void badFollowerShouldNotJoinCluster() throws Exception
{
// GIVEN
- cluster.coreTx( ( db, tx ) -> {
+ cluster.coreTx( ( db, tx ) ->
+ {
Node node = db.createNode( label( "boo" ) );
node.setProperty( "foobar", "baz_bat" );
tx.success();
@@ -208,7 +212,7 @@ public void badFollowerShouldNotJoinCluster() throws Exception
}
catch ( RuntimeException e )
{
- assertThat(e.getCause(), instanceOf(LifecycleException.class));
+ assertThat( e.getCause(), instanceOf( LifecycleException.class ) );
}
}
@@ -216,7 +220,8 @@ public void badFollowerShouldNotJoinCluster() throws Exception
public void aNewServerShouldJoinTheClusterByDownloadingASnapshot() throws Exception
{
// GIVEN
- cluster.coreTx( ( db, tx ) -> {
+ cluster.coreTx( ( db, tx ) ->
+ {
Node node = db.createNode( label( "boo" ) );
node.setProperty( "foobar", "baz_bat" );
tx.success();
@@ -251,7 +256,8 @@ private void createSomeData( int items, Cluster cluster ) throws TimeoutExceptio
{
for ( int i = 0; i < items; i++ )
{
- cluster.coreTx( ( db, tx ) -> {
+ cluster.coreTx( ( db, tx ) ->
+ {
Node node = db.createNode( label( "boo" ) );
node.setProperty( "foobar", "baz_bat" );
tx.success();
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java
index d910d66f5f0d4..35d9464e3d905 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/discovery/Cluster.java
@@ -32,6 +32,7 @@
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiConsumer;
@@ -46,12 +47,15 @@
import org.neo4j.coreedge.server.core.CoreGraphDatabase;
import org.neo4j.coreedge.server.core.locks.LeaderOnlyLockManager;
import org.neo4j.coreedge.server.edge.EdgeGraphDatabase;
+import org.neo4j.function.Predicates;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransactionFailureException;
+import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.store.format.standard.StandardV3_0;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException;
+import org.neo4j.test.DbRepresentation;
import static java.util.Collections.emptyMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -86,7 +90,7 @@ public Cluster( File parentDir, int noOfCoreServers, int noOfEdgeServers,
public void start() throws InterruptedException, ExecutionException
{
- ExecutorService executor = Executors.newCachedThreadPool();
+ ExecutorService executor = Executors.newCachedThreadPool( new NamedThreadFactory( "server-starter" ) );
try
{
startCoreServers( executor );
@@ -376,8 +380,7 @@ private void createCoreServers( final int noOfCoreServers,
for ( int i = 0; i < noOfCoreServers; i++ )
{
CoreServer coreServer = new CoreServer( i, noOfCoreServers, addresses, discoveryServiceFactory,
- recordFormat, parentDir,
- extraParams, instanceExtraParams );
+ recordFormat, parentDir, extraParams, instanceExtraParams );
coreServers.put( i, coreServer );
}
}
@@ -437,4 +440,14 @@ private void shutdownEdgeServers()
edgeServers.values().forEach( EdgeServer::shutdown );
}
+ public static void dataMatchesEventually( CoreServer server, Collection targetDBs ) throws TimeoutException, InterruptedException
+ {
+ CoreGraphDatabase sourceDB = server.database();
+ DbRepresentation sourceRepresentation = DbRepresentation.of( sourceDB );
+ for ( CoreServer targetDB : targetDBs )
+ {
+ Predicates.await( () -> sourceRepresentation.equals( DbRepresentation.of( targetDB.database() ) ),
+ DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS );
+ }
+ }
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java
index fe9acfeb35193..f687b1b8178cb 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java
@@ -83,7 +83,7 @@ public void send( CoreMember to, Collection raftMessag
private int maxAllowedShippingLag = 256;
private Supplier databaseHealthSupplier;
private StateStorage raftMembership =
- new InMemoryStateStorage<>( new RaftMembershipState() );
+ new InMemoryStateStorage<>( RaftMembershipState.startState() );
private Monitors monitors = new Monitors();
private RaftStateMachine raftStateMachine = new EmptyStateMachine();
private final InFlightMap inFlightMap;
@@ -101,7 +101,7 @@ public RaftInstance build()
SendToMyself leaderOnlyReplicator = new SendToMyself( member, outbound );
RaftMembershipManager membershipManager = new RaftMembershipManager( leaderOnlyReplicator,
memberSetBuilder, raftLog, logProvider, expectedClusterSize, electionTimeout, clock, catchupTimeout,
- raftMembership );
+ raftMembership, 0 );
RaftLogShippingManager logShipping =
new RaftLogShippingManager( outbound, logProvider, raftLog, clock, member, membershipManager,
retryTimeMillis, catchupBatchSize, maxAllowedShippingLag, inFlightMap );
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/RaftMembershipManagerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/RaftMembershipManagerTest.java
index 02c7391ced3c0..eb88b5852e655 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/RaftMembershipManagerTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/RaftMembershipManagerTest.java
@@ -21,16 +21,14 @@
import org.junit.Test;
-import java.util.Collections;
import java.util.List;
import org.neo4j.coreedge.raft.log.InMemoryRaftLog;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.outcome.AppendLogEntry;
-import org.neo4j.coreedge.raft.outcome.LogCommand;
+import org.neo4j.coreedge.raft.outcome.RaftLogCommand;
import org.neo4j.coreedge.raft.outcome.TruncateLogCommand;
import org.neo4j.coreedge.raft.state.InMemoryStateStorage;
-import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.raft.state.membership.RaftMembershipState;
import org.neo4j.coreedge.server.RaftTestMemberSetBuilder;
import org.neo4j.logging.NullLogProvider;
@@ -39,12 +37,6 @@
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
public class RaftMembershipManagerTest
{
@@ -58,7 +50,7 @@ public void membershipManagerShouldUseLatestAppendedMembershipSetEntries()
RaftMembershipManager membershipManager = new RaftMembershipManager(
null, RaftTestMemberSetBuilder.INSTANCE, log,
NullLogProvider.getInstance(), 3, 1000, new FakeClock(),
- 1000, new InMemoryStateStorage<>( new RaftMembershipState() ) );
+ 1000, new InMemoryStateStorage<>( new RaftMembershipState.Marshal().startState() ), 0 );
// when
membershipManager.processLog( 0, asList(
@@ -80,16 +72,16 @@ public void membershipManagerShouldRevertToOldMembershipSetAfterTruncationCauses
RaftMembershipManager membershipManager = new RaftMembershipManager(
null,
RaftTestMemberSetBuilder.INSTANCE, log, NullLogProvider.getInstance(), 3, 1000, new FakeClock(),
- 1000, new InMemoryStateStorage<>( new RaftMembershipState() ) );
+ 1000, new InMemoryStateStorage<>( new RaftMembershipState.Marshal().startState() ), 0 );
// when
- List logCommands = asList(
+ List logCommands = asList(
new AppendLogEntry( 0, new RaftLogEntry( 0, new RaftTestGroup( 1, 2, 3, 4 ) ) ),
new AppendLogEntry( 1, new RaftLogEntry( 0, new RaftTestGroup( 1, 2, 3, 5 ) ) ),
new TruncateLogCommand( 1 )
);
- for ( LogCommand logCommand : logCommands )
+ for ( RaftLogCommand logCommand : logCommands )
{
logCommand.applyTo( log );
}
@@ -110,16 +102,16 @@ public void membershipManagerShouldRevertToEarlierAppendedMembershipSetAfterTrun
RaftMembershipManager membershipManager = new RaftMembershipManager(
null,
RaftTestMemberSetBuilder.INSTANCE, log, NullLogProvider.getInstance(), 3, 1000, new FakeClock(),
- 1000, new InMemoryStateStorage<>( new RaftMembershipState() ) );
+ 1000, new InMemoryStateStorage<>( new RaftMembershipState.Marshal().startState() ), 0 );
// when
- List logCommands = asList(
+ List logCommands = asList(
new AppendLogEntry( 0, new RaftLogEntry( 0, new RaftTestGroup( 1, 2, 3, 4 ) ) ),
new AppendLogEntry( 1, new RaftLogEntry( 0, new RaftTestGroup( 1, 2, 3, 5 ) ) ),
new AppendLogEntry( 2, new RaftLogEntry( 0, new RaftTestGroup( 1, 2, 3, 6 ) ) ),
new TruncateLogCommand( 2 )
);
- for ( LogCommand logCommand : logCommands )
+ for ( RaftLogCommand logCommand : logCommands )
{
logCommand.applyTo( log );
}
@@ -127,31 +119,5 @@ public void membershipManagerShouldRevertToEarlierAppendedMembershipSetAfterTrun
// then
assertEquals( new RaftTestGroup( 1, 2, 3, 5 ).getMembers(), membershipManager.votingMembers() );
- assertTrue( membershipManager.uncommittedMemberChangeInLog() );
- }
-
- @Test
- public void shouldNotOverwriteCurrentStateWithPreviousState() throws Exception
- {
- // given
- final InMemoryRaftLog log = new InMemoryRaftLog();
-
- RaftMembershipState state = new RaftMembershipState();
- state.logIndex( 42L );
-
- final StateStorage stateStorage = mock( StateStorage.class );
- when( stateStorage.getInitialState() ).thenReturn( state );
-
- RaftMembershipManager membershipManager = new RaftMembershipManager(
- null,
- RaftTestMemberSetBuilder.INSTANCE, log, NullLogProvider.getInstance(), 3, 1000, new FakeClock(),
- 1000, stateStorage );
-
- // when
- membershipManager.processLog( 0, Collections.singletonList( new AppendLogEntry( 0, new RaftLogEntry( 0, new
- RaftTestGroup( 1, 2, 3, 4 ) ) ) ) );
-
- // then
- verify( stateStorage, times( 0 ) ).persistStoreData( any() );
}
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/StateRecoveryManagerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/StateRecoveryManagerTest.java
index 2b7b9cc7a700a..150cbde28803a 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/StateRecoveryManagerTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/StateRecoveryManagerTest.java
@@ -27,7 +27,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.neo4j.coreedge.raft.state.EndOfStreamException;
import org.neo4j.coreedge.raft.state.SafeStateMarshal;
import org.neo4j.coreedge.raft.state.StateRecoveryManager;
import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
@@ -257,7 +256,7 @@ public void marshal( Long aLong, WritableChannel channel ) throws IOException
}
@Override
- protected Long unmarshal0( ReadableChannel channel ) throws IOException, EndOfStreamException
+ protected Long unmarshal0( ReadableChannel channel ) throws IOException
{
return channel.getLong();
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java
index 107032b91f986..7e464ee1010a6 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendingTest.java
@@ -27,7 +27,7 @@
import org.neo4j.coreedge.raft.ReplicatedInteger;
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
-import org.neo4j.coreedge.raft.outcome.LogCommand;
+import org.neo4j.coreedge.raft.outcome.RaftLogCommand;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.outcome.TruncateLogCommand;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
@@ -181,7 +181,7 @@ public void shouldNotAttemptToTruncateAtIndexBeforeTheLogPrevIndex() throws Exce
verify( outcome, times( 0 ) ).addLogCommand( any() );
}
- private static class LogCommandMatcher extends TypeSafeMatcher
+ private static class LogCommandMatcher extends TypeSafeMatcher
{
private final long truncateIndex;
@@ -191,7 +191,7 @@ private LogCommandMatcher( long truncateIndex )
}
@Override
- protected boolean matchesSafely( LogCommand item )
+ protected boolean matchesSafely( RaftLogCommand item )
{
return item instanceof TruncateLogCommand && ((TruncateLogCommand) item).fromIndex == truncateIndex;
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java
index d83b822104dc9..7714b88ce9ab2 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java
@@ -89,7 +89,7 @@ public class CoreStateTest
{
when( coreStateMachines.commandDispatcher() ).thenReturn( commandDispatcher );
- when( coreStateMachines.getApplyingIndex() ).thenReturn( -1L );
+ when( coreStateMachines.getLastAppliedIndex() ).thenReturn( -1L );
}
private int sequenceNumber = 0;
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateBuilder.java
index 3064bbe93636f..54f63df6a860b 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateBuilder.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateBuilder.java
@@ -30,7 +30,7 @@
import org.neo4j.coreedge.raft.log.RaftLog;
import org.neo4j.coreedge.raft.log.segmented.InFlightMap;
import org.neo4j.coreedge.raft.membership.RaftMembership;
-import org.neo4j.coreedge.raft.outcome.LogCommand;
+import org.neo4j.coreedge.raft.outcome.RaftLogCommand;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;
import org.neo4j.coreedge.raft.state.term.TermState;
@@ -130,7 +130,7 @@ public RaftState build() throws IOException
new RaftState( myself, termStore, membership, entryLog, voteStore, new InFlightMap<>(), NullLogProvider.getInstance() );
Collection noMessages = Collections.emptyList();
- List noLogCommands = Collections.emptyList();
+ List noLogCommands = Collections.emptyList();
state.update( new Outcome( null, term, leader, leaderCommit, votedFor, votesForMe, lastLogIndexBeforeWeBecameLeader,
followerStates, false, noLogCommands, noMessages, Collections.emptySet(), commitIndex ) );
@@ -162,22 +162,10 @@ public Set replicationMembers()
return votingMembers;
}
- @Override
- public long logIndex()
- {
- throw new UnsupportedOperationException();
- }
-
@Override
public void registerListener( Listener listener )
{
throw new UnsupportedOperationException();
}
-
- @Override
- public void deregisterListener( Listener listener )
- {
- throw new UnsupportedOperationException();
- }
}
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateTest.java
index 914619785352d..db2b4b633d210 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateTest.java
@@ -35,7 +35,7 @@
import org.neo4j.coreedge.raft.log.segmented.InFlightMap;
import org.neo4j.coreedge.raft.membership.RaftMembership;
import org.neo4j.coreedge.raft.outcome.AppendLogEntry;
-import org.neo4j.coreedge.raft.outcome.LogCommand;
+import org.neo4j.coreedge.raft.outcome.RaftLogCommand;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.outcome.TruncateLogCommand;
import org.neo4j.coreedge.raft.state.follower.FollowerState;
@@ -66,7 +66,7 @@ public void shouldUpdateCacheState() throws Exception
new InMemoryStateStorage<>( new TermState() ), new FakeMembership(), new InMemoryRaftLog(),
new InMemoryStateStorage<>( new VoteState() ), cache, NullLogProvider.getInstance() );
- List logCommands = new LinkedList()
+ List logCommands = new LinkedList()
{{
add( new AppendLogEntry( 1, new RaftLogEntry( 0L, valueOf( 0 ) ) ) );
add( new AppendLogEntry( 2, new RaftLogEntry( 0L, valueOf( 1 ) ) ) );
@@ -122,7 +122,7 @@ private FollowerStates initialFollowerStates()
return new FollowerStates<>( new FollowerStates<>(), member( 1 ), new FollowerState() );
}
- private Collection emptyLogCommands()
+ private Collection emptyLogCommands()
{
return Collections.emptyList();
}
@@ -141,22 +141,10 @@ public Set replicationMembers()
return emptySet();
}
- @Override
- public long logIndex()
- {
- return -1;
- }
-
@Override
public void registerListener( Listener listener )
{
throw new UnsupportedOperationException();
}
-
- @Override
- public void deregisterListener( Listener listener )
- {
- throw new UnsupportedOperationException();
- }
}
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/ComparableRaftState.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/ComparableRaftState.java
index 5eff0e382b9bf..dcb2b29ec98d9 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/ComparableRaftState.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/ComparableRaftState.java
@@ -28,7 +28,7 @@
import org.neo4j.coreedge.raft.log.RaftLogEntry;
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.log.segmented.InFlightMap;
-import org.neo4j.coreedge.raft.outcome.LogCommand;
+import org.neo4j.coreedge.raft.outcome.RaftLogCommand;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;
@@ -149,7 +149,7 @@ public void update( Outcome outcome ) throws IOException
lastLogIndexBeforeWeBecameLeader = outcome.getLastLogIndexBeforeWeBecameLeader();
followerStates= outcome.getFollowerStates();
- for ( LogCommand logCommand : outcome.getLogCommands() )
+ for ( RaftLogCommand logCommand : outcome.getLogCommands() )
{
logCommand.applyTo( entryLog );
logCommand.applyTo( inFlightMap );
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/membership/RaftMembershipStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/membership/RaftMembershipStateTest.java
index b3efffce7d282..f7c6045c621b3 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/membership/RaftMembershipStateTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/membership/RaftMembershipStateTest.java
@@ -19,49 +19,131 @@
*/
package org.neo4j.coreedge.raft.state.membership;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import org.junit.Test;
-import org.neo4j.coreedge.raft.membership.RaftTestGroup;
+import java.util.Set;
+
+import org.neo4j.coreedge.raft.net.NetworkFlushableChannelNetty4;
+import org.neo4j.coreedge.raft.net.NetworkReadableClosableChannelNetty4;
import org.neo4j.coreedge.server.CoreMember;
-import org.neo4j.kernel.impl.transaction.log.InMemoryClosableChannel;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.neo4j.coreedge.server.RaftTestMember.member;
+import static org.neo4j.helpers.collection.Iterators.asSet;
public class RaftMembershipStateTest
{
+ private RaftMembershipState state = RaftMembershipState.startState();
+
+ private Set membersA = asSet( member( 0 ), member( 1 ), member( 2 ) );
+ private Set membersB = asSet( member( 0 ), member( 1 ), member( 2 ), member( 3 ) );
+
+ @Test
+ public void shouldHaveCorrectInitialState() throws Exception
+ {
+ assertThat( state.getLatest(), hasSize( 0 ) );
+ assertFalse( state.uncommittedMemberChangeInLog() );
+ }
+
+ @Test
+ public void shouldUpdateLatestOnAppend() throws Exception
+ {
+ // when
+ state.append( 0, membersA );
+
+ // then
+ assertEquals( state.getLatest(), membersA );
+
+ // when
+ state.append( 1, membersB );
+
+ // then
+ assertEquals( state.getLatest(), membersB );
+ assertEquals( 1, state.ordinal );
+ }
+
@Test
- public void shouldSerialiseAndDeserialiseEmptyStateCorrectly() throws Exception
+ public void shouldKeepLatestOnCommit() throws Exception
{
// given
- RaftMembershipState state = new RaftMembershipState();
- RaftMembershipState.Marshal marshal = new RaftMembershipState.Marshal( new CoreMember.CoreMemberMarshal() );
+ state.append( 0, membersA );
+ state.append( 1, membersB );
// when
- InMemoryClosableChannel channel = new InMemoryClosableChannel();
- marshal.marshal( state, channel );
- final RaftMembershipState recovered = marshal.unmarshal( channel );
+ state.commit( 0 );
// then
- assertEquals( state.votingMembers(), recovered.votingMembers() );
+ assertEquals( state.getLatest(), membersB );
+ assertTrue( state.uncommittedMemberChangeInLog() );
+ assertEquals( 1, state.ordinal );
}
@Test
- public void shouldSerialiseAndDeserialiseNonEmptyStateCorrectly() throws Exception
+ public void shouldLowerUncommittedFlagOnCommit() throws Exception
{
// given
- RaftMembershipState state = new RaftMembershipState();
- RaftMembershipState.Marshal serializer = new RaftMembershipState.Marshal( new CoreMember.CoreMemberMarshal() );
+ state.append( 0, membersA );
+ assertTrue( state.uncommittedMemberChangeInLog() );
+
+ // when
+ state.commit( 0 );
- RaftTestGroup coreMembers = new RaftTestGroup( 1, 2, 3 ,4 );
+ // then
+ assertFalse( state.uncommittedMemberChangeInLog() );
+ }
- state.setVotingMembers( coreMembers.getMembers() );
+ @Test
+ public void shouldRevertToCommittedStateOnTruncation() throws Exception
+ {
+ // given
+ state.append( 0, membersA );
+ state.commit( 0 );
+ state.append( 1, membersB );
+ assertEquals( state.getLatest(), membersB );
+
+ // when
+ state.truncate( 1 );
+
+ // then
+ assertEquals( state.getLatest(), membersA );
+ assertEquals( 3, state.ordinal );
+ }
+
+ @Test
+ public void shouldNotTruncateEarlierThanIndicated() throws Exception
+ {
+ // given
+ state.append( 0, membersA );
+ state.append( 1, membersB );
+ assertEquals( state.getLatest(), membersB );
+
+ // when
+ state.truncate( 2 );
+
+ // then
+ assertEquals( state.getLatest(), membersB );
+ assertEquals( 1, state.ordinal );
+ }
+
+ @Test
+ public void shouldMarshalCorrectly() throws Exception
+ {
+ // given
+ RaftMembershipState.Marshal marshal = new RaftMembershipState.Marshal();
+ state = new RaftMembershipState( 5, new MembershipEntry( 7, membersA ), new MembershipEntry( 8, membersB ) );
// when
- InMemoryClosableChannel channel = new InMemoryClosableChannel();
- serializer.marshal( state, channel );
- final RaftMembershipState recovered = serializer.unmarshal( channel );
+ ByteBuf buffer = Unpooled.buffer( 1_000 );
+ marshal.marshal( state, new NetworkFlushableChannelNetty4( buffer ) );
+ final RaftMembershipState recovered = marshal.unmarshal( new NetworkReadableClosableChannelNetty4( buffer ) );
// then
- assertEquals( state.votingMembers(), recovered.votingMembers() );
+ assertEquals( state, recovered );
}
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreServerReplicationIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreServerReplicationIT.java
index 311700aca6f99..e5b55275bce4d 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreServerReplicationIT.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/CoreServerReplicationIT.java
@@ -37,6 +37,7 @@
import org.neo4j.test.coreedge.ClusterRule;
import static org.junit.Assert.assertEquals;
+import static org.neo4j.coreedge.discovery.Cluster.dataMatchesEventually;
import static org.neo4j.graphdb.Label.label;
import static org.neo4j.helpers.collection.Iterables.count;
@@ -157,16 +158,4 @@ private long countNodes( CoreServer server )
}
return count;
}
-
- private void dataMatchesEventually( CoreServer server, Collection targetDBs ) throws
- TimeoutException, InterruptedException
- {
- CoreGraphDatabase sourceDB = server.database();
- DbRepresentation sourceRepresentation = DbRepresentation.of( sourceDB );
- for ( CoreServer targetDB : targetDBs )
- {
- Predicates.await( () -> sourceRepresentation.equals( DbRepresentation.of( targetDB.database() ) ),
- DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS );
- }
- }
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/RestartIT.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/RestartIT.java
index 12e209fe8d12a..25ff6f740af83 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/RestartIT.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/scenarios/RestartIT.java
@@ -42,6 +42,7 @@
import static org.junit.Assert.assertTrue;
+import static org.neo4j.coreedge.discovery.Cluster.dataMatchesEventually;
import static org.neo4j.graphdb.Label.label;
public class RestartIT
@@ -117,6 +118,28 @@ public void restartWhileDoingTransactions() throws Exception
executor.shutdown();
}
+ @Test
+ public void shouldHaveWritableClusterAfterCompleteRestart() throws Exception
+ {
+ // given
+ Cluster cluster = clusterRule.startCluster();
+ cluster.shutdown();
+
+ // when
+ cluster.start();
+
+ CoreServer last = cluster.coreTx( ( db, tx ) ->
+ {
+ Node node = db.createNode( label( "boo" ) );
+ node.setProperty( "foobar", "baz_bat" );
+ tx.success();
+ } );
+
+ // then
+ dataMatchesEventually( last, cluster.coreServers() );
+ cluster.shutdown();
+ }
+
@Test
public void edgeTest() throws Exception
{