Skip to content

Commit

Permalink
Periodic flushing of state to durable storage.
Browse files Browse the repository at this point in the history
Flushes after a fixed number of raft entries, configurable by
core_edge.state_machine_flush_window_size.
  • Loading branch information
apcj committed Feb 11, 2016
1 parent 59c65b3 commit 1722721
Show file tree
Hide file tree
Showing 93 changed files with 1,798 additions and 4,843 deletions.
Expand Up @@ -19,8 +19,10 @@
*/
package org.neo4j.coreedge.raft;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
Expand All @@ -35,21 +37,27 @@
import org.neo4j.coreedge.raft.membership.RaftMembershipManager;
import org.neo4j.coreedge.raft.net.Inbound;
import org.neo4j.coreedge.raft.net.Outbound;
import org.neo4j.coreedge.raft.outcome.AppendLogEntry;
import org.neo4j.coreedge.raft.outcome.CommitCommand;
import org.neo4j.coreedge.raft.outcome.LogCommand;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.replication.shipping.RaftLogShippingManager;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.raft.state.RaftState;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.coreedge.raft.state.StateMachine;
import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.raft.state.term.TermState;
import org.neo4j.coreedge.raft.state.vote.VoteState;
import org.neo4j.helpers.Clock;
import org.neo4j.kernel.impl.util.Listener;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

import static java.lang.String.format;
import static java.util.Arrays.asList;

import static org.neo4j.coreedge.raft.roles.Role.LEADER;

Expand All @@ -76,13 +84,14 @@
public class RaftInstance<MEMBER> implements LeaderLocator<MEMBER>, Inbound.MessageHandler, CoreMetaData
{
private final LeaderNotFoundMonitor leaderNotFoundMonitor;
private int flushAfter;

/**
 * Timeout names declared by this class, implementing
 * {@link RenewableTimeoutService.TimeoutName}.
 *
 * NOTE(review): presumably these identify the election and heartbeat timers
 * registered with the RenewableTimeoutService; the registration code is not
 * visible in this view, so confirm against the constructor.
 */
public enum Timeouts implements RenewableTimeoutService.TimeoutName
{
ELECTION, HEARTBEAT
}

private final RaftState<MEMBER> state;
private final RaftState<MEMBER> raftState;
private final MEMBER myself;
private final RaftLog entryLog;

Expand All @@ -91,11 +100,11 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
private RenewableTimeoutService.RenewableTimeout electionTimer;
private RaftMembershipManager<MEMBER> membershipManager;

private final StateMachine stateMachine;
private final long electionTimeout;
private final long leaderWaitTimeout;

private final Supplier<DatabaseHealth> databaseHealthSupplier;
private Clock clock;
private final VolatileFuture<MEMBER> volatileLeader = new VolatileFuture<>( null );

private final Outbound<MEMBER> outbound;
Expand All @@ -105,17 +114,19 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName

private RaftLogShippingManager<MEMBER> logShipping;

public RaftInstance( MEMBER myself, TermState termState, VoteState<MEMBER> voteState, RaftLog entryLog,
long electionTimeout, long heartbeatInterval, RenewableTimeoutService renewableTimeoutService,
public RaftInstance( MEMBER myself, StateStorage<TermState> termStorage,
StateStorage<VoteState<MEMBER>> voteStorage, RaftLog entryLog,
StateMachine stateMachine, long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService,
final Inbound inbound, final Outbound<MEMBER> outbound, long leaderWaitTimeout,
LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager,
RaftLogShippingManager<MEMBER> logShipping,
Supplier<DatabaseHealth> databaseHealthSupplier,
Clock clock, Monitors monitors )

Monitors monitors, int flushAfter )
{
this.myself = myself;
this.entryLog = entryLog;
this.stateMachine = stateMachine;
this.electionTimeout = electionTimeout;
this.heartbeatInterval = heartbeatInterval;

Expand All @@ -125,12 +136,12 @@ public RaftInstance( MEMBER myself, TermState termState, VoteState<MEMBER> voteS
this.outbound = outbound;
this.logShipping = logShipping;
this.databaseHealthSupplier = databaseHealthSupplier;
this.clock = clock;
this.flushAfter = flushAfter;
this.log = logProvider.getLog( getClass() );

this.membershipManager = membershipManager;

this.state = new RaftState<>( myself, termState, membershipManager, entryLog, voteState );
this.raftState = new RaftState<>( myself, termStorage, membershipManager, entryLog, voteStorage );

leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class );

Expand Down Expand Up @@ -170,8 +181,16 @@ public synchronized void bootstrapWithInitialMembers( RaftGroup<MEMBER> memberSe

try
{
entryLog.append( membershipLogEntry );
entryLog.commit( 0 );
List<LogCommand> logCommands = asList(
new AppendLogEntry( 0, membershipLogEntry ),
new CommitCommand( 0 )
);
for ( LogCommand logCommand : logCommands )
{
logCommand.applyTo( entryLog );
}
membershipManager.processLog( logCommands );
lastApplied = 0;
}
catch ( RaftStorageException e )
{
Expand All @@ -186,7 +205,7 @@ public void setTargetMembershipSet( Set<MEMBER> targetMembers )

if ( currentRole == LEADER )
{
membershipManager.onFollowerStateChange( state.followerStates() );
membershipManager.onFollowerStateChange( raftState.followerStates() );
}
}

Expand Down Expand Up @@ -222,7 +241,7 @@ public MEMBER getLeader( long timeoutMillis, Predicate<MEMBER> predicate ) throw
public synchronized void registerListener( Listener<MEMBER> listener )
{
leaderListeners.add( listener );
listener.receive( state.leader() );
listener.receive( raftState.leader() );
}

@Override
Expand All @@ -233,13 +252,46 @@ public synchronized void unregisterListener( Listener<MEMBER> listener )

public ReadableRaftState<MEMBER> state()
{
return state;
return raftState;
}

private long lastApplied = -1;

/**
 * Acts on the outcome of handling a RAFT message: adjusts log shipping,
 * notifies leader listeners, updates the RAFT state, hands the outcome's log
 * commands to the membership manager, and then applies every newly committed
 * log entry to the state machine, flushing the state machine periodically.
 *
 * @param outcome the outcome produced by the current role's message handler
 * @throws RaftStorageException declared by this method; which of the calls
 *         below raises it is not visible here
 * @throws IOException declared by this method; which of the calls below
 *         raises it is not visible here
 */
private void handleOutcome( Outcome<MEMBER> outcome ) throws RaftStorageException, IOException
{
// Both of these read raftState.leader() as the *previous* leader, so they
// have to run before raftState.update( outcome ) below.
adjustLogShipping( outcome );
notifyLeaderChanges( outcome );

raftState.update( outcome );
membershipManager.processLog( outcome.getLogCommands() );

// Apply every entry after lastApplied, up to and including the commit index.
// The commit index is re-read from the log on each iteration.
for ( long index = lastApplied + 1; index <= raftState.entryLog().commitIndex(); index++ )
{
ReplicatedContent content = raftState.entryLog().readEntryContent( index );
stateMachine.applyCommand( content, index );
// Flush whenever the log index is a multiple of flushAfter.
// NOTE(review): integer remainder by zero throws ArithmeticException, so a
// flushAfter of 0 would fail here - confirm the setting is validated upstream.
if ( index % this.flushAfter == 0 )
{
stateMachine.flush();
}
}
// lastApplied only advances once the whole loop has completed; if an apply
// throws part-way through, the entries already applied are not recorded here.
lastApplied = raftState.entryLog().commitIndex();
volatileLeader.set( outcome.getLeader() );
}

/**
 * Passes the outcome's leader to every registered leader listener, but only
 * when {@code leaderChanged} reports a change relative to the leader currently
 * held in {@code raftState}.
 *
 * @param outcome the outcome whose leader is compared and, on change, published
 */
private void notifyLeaderChanges( Outcome<MEMBER> outcome )
{
    // Nothing to publish when the leader is the one we already know about.
    if ( !leaderChanged( outcome, raftState.leader() ) )
    {
        return;
    }

    for ( Listener<MEMBER> leaderListener : leaderListeners )
    {
        leaderListener.receive( outcome.getLeader() );
    }
}

private synchronized void handleOutcome( Outcome<MEMBER> outcome ) throws RaftStorageException
private void adjustLogShipping( Outcome<MEMBER> outcome ) throws RaftStorageException
{
// Save interesting pre-state
MEMBER oldLeader = state.leader();
MEMBER oldLeader = raftState.leader();

if ( myself.equals( outcome.getLeader() ) )
{
Expand All @@ -257,18 +309,6 @@ else if ( myself.equals( oldLeader ) && !myself.equals( outcome.getLeader() ) )
{
logShipping.stop();
}

if ( leaderChanged( outcome, state.leader() ) )
{
for ( Listener<MEMBER> listener : leaderListeners )
{
listener.receive( outcome.getLeader() );
}
}

// Update state
state.update( outcome );
volatileLeader.set( outcome.getLeader() );
}

private boolean leaderChanged( Outcome<MEMBER> outcome, MEMBER oldLeader )
Expand All @@ -295,8 +335,9 @@ public synchronized void handle( Message incomingMessage )
try
{
handlingMessage = true;

Outcome<MEMBER> outcome = currentRole.handler.handle( (RaftMessages.RaftMessage<MEMBER>) incomingMessage,
state, log );
raftState, log );

handleOutcome( outcome );
currentRole = outcome.getNewRole();
Expand All @@ -314,10 +355,10 @@ public synchronized void handle( Message incomingMessage )

if ( currentRole == LEADER )
{
membershipManager.onFollowerStateChange( state.followerStates() );
membershipManager.onFollowerStateChange( raftState.followerStates() );
}
}
catch ( RaftStorageException e )
catch ( RaftStorageException | IOException e )
{
log.error( "Failed to process RAFT message " + incomingMessage, e );
databaseHealthSupplier.get().panic( e );
Expand Down Expand Up @@ -365,7 +406,7 @@ public BootstrapException( Throwable cause )

public long term()
{
return state.term();
return raftState.term();
}

private long randomTimeoutRange()
Expand All @@ -382,4 +423,4 @@ public Set<MEMBER> replicationMembers()
{
return membershipManager.replicationMembers();
}
}
}
Expand Up @@ -22,49 +22,17 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import org.neo4j.coreedge.raft.replication.ReplicatedContent;

public class InMemoryRaftLog implements RaftLog
{
private final Set<Listener> listeners = new CopyOnWriteArraySet<>();
private final Map<Long, RaftLogEntry> raftLog = new HashMap<>();

private long appendIndex = -1;
private long commitIndex = -1;
private long term = -1;

/**
 * Replays the log to all registered listeners, starting at index 0.
 * Entries up to and including the commit index are reported as appended and
 * then committed; the remaining entries up to and including the append index
 * are reported as appended only.
 *
 * @throws Throwable declared by this method; the calls that may raise it are
 *         not distinguishable from this view
 */
@Override
public void replay() throws Throwable
{
    int position = 0;

    // Committed prefix: each listener sees onAppended followed by onCommitted.
    while ( position <= commitIndex )
    {
        ReplicatedContent committedContent = readEntryContent( position );
        for ( Listener subscriber : listeners )
        {
            subscriber.onAppended( committedContent, position );
            subscriber.onCommitted( committedContent, position );
        }
        position++;
    }

    // Uncommitted tail: carries on from where the committed prefix stopped.
    while ( position <= appendIndex )
    {
        ReplicatedContent pendingContent = readEntryContent( position );
        for ( Listener subscriber : listeners )
        {
            subscriber.onAppended( pendingContent, position );
        }
        position++;
    }
}

/**
 * Adds the listener to this log's listener set. The set is a
 * CopyOnWriteArraySet, so registering the same listener again has no
 * further effect.
 *
 * @param listener the listener to add
 */
@Override
public void registerListener( Listener listener )
{
listeners.add( listener );
}

@Override
public long append( RaftLogEntry logEntry ) throws RaftStorageException
{
Expand All @@ -80,10 +48,6 @@ public long append( RaftLogEntry logEntry ) throws RaftStorageException
}

appendIndex++;
for ( Listener listener : listeners )
{
listener.onAppended( logEntry.content(), appendIndex );
}
raftLog.put( appendIndex, logEntry );
return appendIndex;
}
Expand All @@ -95,17 +59,7 @@ public void commit( long commitIndex )
{
commitIndex = appendIndex;
}
while ( this.commitIndex < commitIndex )
{
long nextCommitIndex = this.commitIndex + 1;

RaftLogEntry logEntry = raftLog.get( nextCommitIndex );
for ( Listener listener : listeners )
{
listener.onCommitted( logEntry.content(), nextCommitIndex );
}
this.commitIndex = nextCommitIndex;
}
this.commitIndex = commitIndex;
}

@Override
Expand Down Expand Up @@ -168,11 +122,6 @@ public synchronized void truncate( long fromIndex )
if ( appendIndex >= fromIndex )
{
appendIndex = fromIndex - 1;

for ( Listener listener : listeners )
{
listener.onTruncated( fromIndex );
}
}
term = readEntryTerm( appendIndex );
}
Expand Down
Expand Up @@ -26,6 +26,9 @@

public class MonitoredRaftLog implements RaftLog
{
public static final String APPEND_INDEX_TAG = "appendIndex";
public static final String COMMIT_INDEX_TAG = "commitIndex";

private final RaftLog delegate;
private final RaftLogAppendIndexMonitor appendIndexMonitor;
private final RaftLogCommitIndexMonitor commitIndexMonitor;
Expand Down Expand Up @@ -59,18 +62,6 @@ public void commit( long commitIndex ) throws RaftStorageException
commitIndexMonitor.commitIndex( delegate.commitIndex() );
}

/**
 * Forwards the replay request to the wrapped log unchanged.
 *
 * @throws Throwable whatever the delegate's replay throws
 */
@Override
public void replay() throws Throwable
{
delegate.replay();
}

/**
 * Forwards the listener registration to the wrapped log unchanged.
 *
 * @param consumer the listener to register with the delegate
 */
@Override
public void registerListener( Listener consumer )
{
delegate.registerListener( consumer );
}

@Override
public long appendIndex()
{
Expand Down

0 comments on commit 1722721

Please sign in to comment.