Skip to content

Commit

Permalink
core-edge: better membership storage
Browse files Browse the repository at this point in the history
The RAFT membership storage is now in line with the intended design
and only stores a single committed state and potentially an additional
appended state. This simplifies the implementation considerably and
improves performance since the dependence on the RAFT log as the
underlying storage is removed. This mismatch was forcing extremely
bad usage patterns of the RAFT log from the membership manager,
causing unnecessary linear scans.
  • Loading branch information
martinfurmanski authored and jimwebber committed Jul 14, 2016
1 parent cd4c108 commit 3fda82c
Show file tree
Hide file tree
Showing 34 changed files with 682 additions and 475 deletions.
Expand Up @@ -312,7 +312,6 @@ public synchronized void handle( RaftMessages.RaftMessage incomingMessage )
handleTimers( outcome );
handleLogShipping( outcome );

membershipManager.processLog( outcome.getCommitIndex(), outcome.getLogCommands() );
driveMembership( outcome );

volatileLeader.set( outcome.getLeader() );
Expand All @@ -334,8 +333,10 @@ public synchronized void handle( RaftMessages.RaftMessage incomingMessage )
}
}

private void driveMembership( Outcome outcome )
private void driveMembership( Outcome outcome ) throws IOException
{
membershipManager.processLog( outcome.getCommitIndex(), outcome.getLogCommands() );

currentRole = outcome.getRole();
membershipManager.onRole( currentRole );

Expand Down

This file was deleted.

Expand Up @@ -40,17 +40,13 @@ public interface RaftMembership
*/
Set<CoreMember> replicationMembers();

long logIndex();

/**
* Register a membership listener.
*
* @param listener The listener.
*/
void registerListener( RaftMembership.Listener listener );

void deregisterListener( RaftMembership.Listener listener );

/**
* This interface must be implemented from whoever wants to be notified of membership changes. Membership changes
* are additions to and removals from the voting and replication members set.
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.neo4j.coreedge.raft.log.ReadableRaftLog;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.raft.state.follower.FollowerStates;
import org.neo4j.coreedge.raft.state.membership.RaftMembershipState;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
Expand Down Expand Up @@ -59,7 +58,7 @@
*
* Only a single member change is handled at a time.
*/
class RaftMembershipStateMachine
class RaftMembershipChanger
{
private final Log log;
public RaftMembershipStateMachineEventHandler state = new Inactive();
Expand All @@ -68,22 +67,19 @@ class RaftMembershipStateMachine
private final Clock clock;
private final long electionTimeout;

private final MembershipDriver membershipDriver;
private final RaftMembershipManager membershipManager;
private long catchupTimeout;
private final RaftMembershipState membershipState;

private CoreMember catchingUpMember;

RaftMembershipStateMachine( ReadableRaftLog raftLog, Clock clock, long electionTimeout,
MembershipDriver membershipDriver, LogProvider logProvider,
long catchupTimeout, RaftMembershipState membershipState )
RaftMembershipChanger( ReadableRaftLog raftLog, Clock clock, long electionTimeout,
LogProvider logProvider, long catchupTimeout, RaftMembershipManager membershipManager )
{
this.raftLog = raftLog;
this.clock = clock;
this.electionTimeout = electionTimeout;
this.membershipDriver = membershipDriver;
this.catchupTimeout = catchupTimeout;
this.membershipState = membershipState;
this.membershipManager = membershipManager;
this.log = logProvider.getLog( getClass() );
}

Expand All @@ -98,7 +94,7 @@ private synchronized void handleState( RaftMembershipStateMachineEventHandler ne
newState.onEntry();

log.info( newState.toString() );
membershipDriver.stateChanged();
membershipManager.stateChanged();
}
}

Expand Down Expand Up @@ -139,7 +135,7 @@ public RaftMembershipStateMachineEventHandler onRole( Role role )
{
if ( role == Role.LEADER )
{
if ( membershipDriver.uncommittedMemberChangeInLog() )
if ( membershipManager.uncommittedMemberChangeInLog() )
{
return new ConsensusInProgress();
}
Expand Down Expand Up @@ -185,9 +181,9 @@ public RaftMembershipStateMachineEventHandler onMissingMember( CoreMember member
@Override
public RaftMembershipStateMachineEventHandler onSuperfluousMember( CoreMember member )
{
Set<CoreMember> updatedVotingMembers = new HashSet<>( membershipState.votingMembers() );
Set<CoreMember> updatedVotingMembers = new HashSet<>( membershipManager.votingMembers() );
updatedVotingMembers.remove( member );
membershipDriver.doConsensus( updatedVotingMembers );
membershipManager.doConsensus( updatedVotingMembers );

return new ConsensusInProgress();
}
Expand All @@ -213,16 +209,16 @@ private class CatchingUp extends ActiveBaseState
@Override
public void onEntry()
{
membershipState.addAdditionalReplicationMember( catchingUpMember );
membershipManager.addAdditionalReplicationMember( catchingUpMember );
log.info( "Adding replication member: " + catchingUpMember );
}

@Override
public void onExit()
{
if( !movingToConsensus )
if ( !movingToConsensus )
{
membershipState.removeAdditionalReplicationMember( catchingUpMember );
membershipManager.removeAdditionalReplicationMember( catchingUpMember );
log.info( "Removing replication member: " + catchingUpMember );
}
}
Expand All @@ -249,9 +245,9 @@ public RaftMembershipStateMachineEventHandler onFollowerStateChange( FollowerSta
{
if ( catchupGoalTracker.isGoalAchieved() )
{
Set<CoreMember> updatedVotingMembers = new HashSet<>( membershipState.votingMembers() );
Set<CoreMember> updatedVotingMembers = new HashSet<>( membershipManager.votingMembers() );
updatedVotingMembers.add( catchingUpMember );
membershipDriver.doConsensus( updatedVotingMembers );
membershipManager.doConsensus( updatedVotingMembers );

movingToConsensus = true;
return new ConsensusInProgress();
Expand Down Expand Up @@ -301,7 +297,7 @@ public void onEntry()
@Override
public void onExit()
{
membershipState.removeAdditionalReplicationMember( catchingUpMember );
membershipManager.removeAdditionalReplicationMember( catchingUpMember );
log.info( "Removing replication member: " + catchingUpMember );
}

Expand Down

0 comments on commit 3fda82c

Please sign in to comment.