Skip to content

Commit

Permalink
Remove last applying storage
Browse files Browse the repository at this point in the history
On startup read the last applying index from the state machines rather
than writing it in a storage file on disk.

This should improve performance when applying commands since there is
no need to flush on disk applying index updates.
  • Loading branch information
davidegrohmann committed Jun 16, 2016
1 parent d6bb6dc commit 9872b5a
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 39 deletions.
Expand Up @@ -76,6 +76,12 @@ public void flush() throws IOException
storage.persistStoreData( state ); storage.persistStoreData( state );
} }


@Override
public long lastAppliedIndex()
{
return storage.getInitialState().logIndex();
}

public synchronized IdAllocationState snapshot() public synchronized IdAllocationState snapshot()
{ {
return state.newInstance(); return state.newInstance();
Expand Down
Expand Up @@ -55,7 +55,7 @@ public class ReplicatedTokenStateMachine<TOKEN extends Token> implements StateMa
private final TokenFactory<TOKEN> tokenFactory; private final TokenFactory<TOKEN> tokenFactory;


private final Log log; private final Log log;
private long lastCommittedIndex = Long.MAX_VALUE; private long lastCommittedIndex = -1;


public ReplicatedTokenStateMachine( TokenRegistry<TOKEN> tokenRegistry, TokenFactory<TOKEN> tokenFactory, public ReplicatedTokenStateMachine( TokenRegistry<TOKEN> tokenRegistry, TokenFactory<TOKEN> tokenFactory,
LogProvider logProvider ) LogProvider logProvider )
Expand Down Expand Up @@ -139,4 +139,10 @@ public synchronized void flush() throws IOException
{ {
// already implicitly flushed to the store // already implicitly flushed to the store
} }

@Override
public long lastAppliedIndex()
{
return lastCommittedIndex;
}
} }
Expand Up @@ -66,12 +66,6 @@ public synchronized void installCommitProcess( TransactionCommitProcess commitPr
commitProcess.commit( txs, CommitEvent.NULL, TransactionApplicationMode.EXTERNAL ) ); commitProcess.commit( txs, CommitEvent.NULL, TransactionApplicationMode.EXTERNAL ) );
} }


@Override
public void flush() throws IOException
{
// implicitly flushed
}

@Override @Override
public synchronized void applyCommand( ReplicatedTransaction replicatedTx, long commandIndex, Consumer<Result> callback ) public synchronized void applyCommand( ReplicatedTransaction replicatedTx, long commandIndex, Consumer<Result> callback )
{ {
Expand Down Expand Up @@ -110,6 +104,18 @@ public synchronized void applyCommand( ReplicatedTransaction replicatedTx, long
} }
} }


@Override
public void flush() throws IOException
{
// implicitly flushed
}

@Override
public long lastAppliedIndex()
{
return lastCommittedIndex;
}

public synchronized void ensuredApplied() public synchronized void ensuredApplied()
{ {
try try
Expand Down
Expand Up @@ -55,7 +55,6 @@ public class CoreState extends LifecycleAdapter implements RaftStateMachine<Core
private final StateStorage<Long> lastFlushedStorage; private final StateStorage<Long> lastFlushedStorage;
private final int flushEvery; private final int flushEvery;
private final ProgressTracker progressTracker; private final ProgressTracker progressTracker;
private final StateStorage<Long> lastApplyingStorage;
private final StateStorage<GlobalSessionTrackerState<CoreMember>> sessionStorage; private final StateStorage<GlobalSessionTrackerState<CoreMember>> sessionStorage;
private final Supplier<DatabaseHealth> dbHealth; private final Supplier<DatabaseHealth> dbHealth;
private final InFlightMap<Long,RaftLogEntry> inFlightMap; private final InFlightMap<Long,RaftLogEntry> inFlightMap;
Expand All @@ -80,7 +79,6 @@ public CoreState(
LogProvider logProvider, LogProvider logProvider,
ProgressTracker progressTracker, ProgressTracker progressTracker,
StateStorage<Long> lastFlushedStorage, StateStorage<Long> lastFlushedStorage,
StateStorage<Long> lastApplyingStorage,
StateStorage<GlobalSessionTrackerState<CoreMember>> sessionStorage, StateStorage<GlobalSessionTrackerState<CoreMember>> sessionStorage,
CoreStateApplier applier, CoreStateApplier applier,
CoreStateDownloader downloader, CoreStateDownloader downloader,
Expand All @@ -91,7 +89,6 @@ public CoreState(
this.lastFlushedStorage = lastFlushedStorage; this.lastFlushedStorage = lastFlushedStorage;
this.flushEvery = flushEvery; this.flushEvery = flushEvery;
this.progressTracker = progressTracker; this.progressTracker = progressTracker;
this.lastApplyingStorage = lastApplyingStorage;
this.sessionStorage = sessionStorage; this.sessionStorage = sessionStorage;
this.applier = applier; this.applier = applier;
this.downloader = downloader; this.downloader = downloader;
Expand Down Expand Up @@ -130,7 +127,6 @@ private void submitApplyJob( long lastToApply )
applier.submit( ( status ) -> () -> { applier.submit( ( status ) -> () -> {
try ( InFlightLogEntrySupplier logEntrySupplier = new InFlightLogEntrySupplier( raftLog, inFlightMap ) ) try ( InFlightLogEntrySupplier logEntrySupplier = new InFlightLogEntrySupplier( raftLog, inFlightMap ) )
{ {
lastApplyingStorage.persistStoreData( lastToApply );
for ( long logIndex = lastApplied + 1; !status.isCancelled() && logIndex <= lastToApply; logIndex++ ) for ( long logIndex = lastApplied + 1; !status.isCancelled() && logIndex <= lastToApply; logIndex++ )
{ {
RaftLogEntry entry = logEntrySupplier.get( logIndex ); RaftLogEntry entry = logEntrySupplier.get( logIndex );
Expand Down Expand Up @@ -284,7 +280,7 @@ public synchronized void start() throws IOException, InterruptedException
log.info( format( "Restoring last applied index to %d", lastApplied ) ); log.info( format( "Restoring last applied index to %d", lastApplied ) );
sessionState = sessionStorage.getInitialState(); sessionState = sessionStorage.getInitialState();


submitApplyJob( lastApplyingStorage.getInitialState() ); submitApplyJob( coreStateMachines.getApplyingIndex() );
applier.sync( false ); applier.sync( false );
} }


Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.catchup.storecopy.LocalDatabase;
import org.neo4j.coreedge.catchup.storecopy.core.CoreStateType; import org.neo4j.coreedge.catchup.storecopy.core.CoreStateType;
import org.neo4j.coreedge.raft.log.MonitoredRaftLog; import org.neo4j.coreedge.raft.log.MonitoredRaftLog;
import org.neo4j.coreedge.raft.outcome.ShipCommand;
import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationRequest; import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationRequest;
import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationStateMachine; import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationStateMachine;
import org.neo4j.coreedge.raft.replication.token.ReplicatedTokenRequest; import org.neo4j.coreedge.raft.replication.token.ReplicatedTokenRequest;
Expand All @@ -39,6 +40,8 @@
import org.neo4j.kernel.impl.core.RelationshipTypeToken; import org.neo4j.kernel.impl.core.RelationshipTypeToken;
import org.neo4j.storageengine.api.Token; import org.neo4j.storageengine.api.Token;


import static java.lang.Math.max;

public class CoreStateMachines public class CoreStateMachines
{ {
private final ReplicatedTransactionStateMachine<CoreMember> replicatedTxStateMachine; private final ReplicatedTransactionStateMachine<CoreMember> replicatedTxStateMachine;
Expand Down Expand Up @@ -197,4 +200,17 @@ public void close()
replicatedTxStateMachine.ensuredApplied(); replicatedTxStateMachine.ensuredApplied();
} }
} }

public long getApplyingIndex()
{
long lastAppliedTxIndex = replicatedTxStateMachine.lastAppliedIndex();
assert lastAppliedTxIndex == labelTokenStateMachine.lastAppliedIndex();
assert lastAppliedTxIndex == relationshipTypeTokenStateMachine.lastAppliedIndex();
assert lastAppliedTxIndex == propertyKeyTokenStateMachine.lastAppliedIndex();

long lastAppliedLockTokenIndex = replicatedLockTokenStateMachine.lastAppliedIndex();
long lastAppliedIdAllocationIndex = idAllocationStateMachine.lastAppliedIndex();

return max( max( lastAppliedLockTokenIndex, lastAppliedIdAllocationIndex ), lastAppliedIdAllocationIndex );
}
} }
Expand Up @@ -39,4 +39,10 @@ public interface StateMachine<Command>
* @throws IOException * @throws IOException
*/ */
void flush() throws IOException; void flush() throws IOException;

/**
* Return the index of the last applied command by this state machine.
* @return the last applied index for this state machine
*/
long lastAppliedIndex();
} }
Expand Up @@ -334,11 +334,6 @@ fileSystem, new File( clusterStateDirectory, "last-flushed-state" ), "last-flush
new LongIndexMarshal(), config.get( CoreEdgeClusterSettings.last_flushed_state_size ), new LongIndexMarshal(), config.get( CoreEdgeClusterSettings.last_flushed_state_size ),
databaseHealthSupplier, logProvider ) ); databaseHealthSupplier, logProvider ) );


DurableStateStorage<Long> lastApplyingStorage = life.add( new DurableStateStorage<>(
fileSystem, new File( clusterStateDirectory, "last-applying-state" ), "last-applying",
new LongIndexMarshal(), config.get( CoreEdgeClusterSettings.last_flushed_state_size ),
databaseHealthSupplier, logProvider ) );

StateStorage<GlobalSessionTrackerState<CoreMember>> sessionTrackerStorage; StateStorage<GlobalSessionTrackerState<CoreMember>> sessionTrackerStorage;
try try
{ {
Expand All @@ -363,7 +358,7 @@ fileSystem, new File( clusterStateDirectory, "session-tracker-state" ), "session
coreState = new CoreState( coreState = new CoreState(
raftLog, config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), raftLog, config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ),
config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ),
databaseHealthSupplier, logProvider, progressTracker, lastFlushedStorage, lastApplyingStorage, databaseHealthSupplier, logProvider, progressTracker, lastFlushedStorage,
sessionTrackerStorage, applier, downloader, inFlightMap, platformModule.monitors ); sessionTrackerStorage, applier, downloader, inFlightMap, platformModule.monitors );


raft = createRaft( life, loggingOutbound, discoveryService, config, messageLogger, raftLog, raft = createRaft( life, loggingOutbound, discoveryService, config, messageLogger, raftLog,
Expand Down Expand Up @@ -565,7 +560,6 @@ private RaftLog createRaftLog(
case SEGMENTED: case SEGMENTED:
{ {
long rotateAtSize = config.get( CoreEdgeClusterSettings.raft_log_rotation_size ); long rotateAtSize = config.get( CoreEdgeClusterSettings.raft_log_rotation_size );
int metaDataCacheSize = config.get( CoreEdgeClusterSettings.raft_log_meta_data_cache_size );
String pruningStrategyConfig = config.get( CoreEdgeClusterSettings.raft_log_pruning_strategy ); String pruningStrategyConfig = config.get( CoreEdgeClusterSettings.raft_log_pruning_strategy );
int entryCacheSize = config.get( CoreEdgeClusterSettings.raft_log_entry_cache_size ); int entryCacheSize = config.get( CoreEdgeClusterSettings.raft_log_entry_cache_size );


Expand Down
Expand Up @@ -66,6 +66,12 @@ public synchronized void flush() throws IOException
storage.persistStoreData( state ); storage.persistStoreData( state );
} }


@Override
public long lastAppliedIndex()
{
return storage.getInitialState().ordinal();
}

public synchronized ReplicatedLockTokenState<MEMBER> snapshot() public synchronized ReplicatedLockTokenState<MEMBER> snapshot()
{ {
return state.newInstance(); return state.newInstance();
Expand Down
Expand Up @@ -39,7 +39,6 @@
import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent; import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent;
import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransaction; import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransaction;
import org.neo4j.coreedge.server.CoreMember; import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy;
import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator;
import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
Expand All @@ -66,7 +65,6 @@ public class CoreStateTest
private final InMemoryRaftLog raftLog = spy( new InMemoryRaftLog() ); private final InMemoryRaftLog raftLog = spy( new InMemoryRaftLog() );


private final InMemoryStateStorage<Long> lastFlushedStorage = new InMemoryStateStorage<>( -1L ); private final InMemoryStateStorage<Long> lastFlushedStorage = new InMemoryStateStorage<>( -1L );
private final InMemoryStateStorage<Long> lastApplyingStorage = new InMemoryStateStorage<>( -1L );
private final InMemoryStateStorage<GlobalSessionTrackerState<CoreMember>> sessionStorage = private final InMemoryStateStorage<GlobalSessionTrackerState<CoreMember>> sessionStorage =
new InMemoryStateStorage<>( new GlobalSessionTrackerState<>() ); new InMemoryStateStorage<>( new GlobalSessionTrackerState<>() );


Expand All @@ -82,15 +80,16 @@ public class CoreStateTest
private final Monitors monitors = new Monitors(); private final Monitors monitors = new Monitors();
private final CoreState coreState = new CoreState( raftLog, batchSize, flushEvery, () -> dbHealth, private final CoreState coreState = new CoreState( raftLog, batchSize, flushEvery, () -> dbHealth,
NullLogProvider.getInstance(), new ProgressTrackerImpl( globalSession ), lastFlushedStorage, NullLogProvider.getInstance(), new ProgressTrackerImpl( globalSession ), lastFlushedStorage,
lastApplyingStorage, sessionStorage, applier, mock( CoreStateDownloader.class ), inFlightMap, monitors ); sessionStorage, applier, mock( CoreStateDownloader.class ), inFlightMap, monitors );


private ReplicatedTransaction nullTx = new ReplicatedTransaction( null ); private ReplicatedTransaction nullTx = new ReplicatedTransaction( null );


private final CommandDispatcher commandDispatcher = mock( CommandDispatcher.class ); private final CommandDispatcher commandDispatcher = mock( CommandDispatcher.class );
private final CoreStateMachines txStateMachine = mock( CoreStateMachines.class ); private final CoreStateMachines coreStateMachines = mock( CoreStateMachines.class );


{ {
when( txStateMachine.commandDispatcher() ).thenReturn( commandDispatcher ); when( coreStateMachines.commandDispatcher() ).thenReturn( commandDispatcher );
when( coreStateMachines.getApplyingIndex() ).thenReturn( -1L );
} }


private int sequenceNumber = 0; private int sequenceNumber = 0;
Expand All @@ -105,10 +104,10 @@ public void shouldApplyCommittedCommand() throws Throwable
// given // given
RaftLogCommitIndexMonitor listener = mock( RaftLogCommitIndexMonitor.class ); RaftLogCommitIndexMonitor listener = mock( RaftLogCommitIndexMonitor.class );
monitors.addMonitorListener( listener ); monitors.addMonitorListener( listener );
coreState.setStateMachine( txStateMachine ); coreState.setStateMachine( coreStateMachines );
coreState.start(); coreState.start();


InOrder inOrder = inOrder( txStateMachine, commandDispatcher ); InOrder inOrder = inOrder( coreStateMachines, commandDispatcher );


// when // when
raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) );
Expand All @@ -118,7 +117,7 @@ public void shouldApplyCommittedCommand() throws Throwable
applier.sync( false ); applier.sync( false );


// then // then
inOrder.verify( txStateMachine ).commandDispatcher(); inOrder.verify( coreStateMachines ).commandDispatcher();
inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 0L ), anyCallback() ); inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 0L ), anyCallback() );
inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() ); inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() );
inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 2L ), anyCallback() ); inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 2L ), anyCallback() );
Expand All @@ -131,7 +130,7 @@ public void shouldApplyCommittedCommand() throws Throwable
public void shouldNotApplyUncommittedCommands() throws Throwable public void shouldNotApplyUncommittedCommands() throws Throwable
{ {
// given // given
coreState.setStateMachine( txStateMachine ); coreState.setStateMachine( coreStateMachines );
coreState.start(); coreState.start();


// when // when
Expand All @@ -148,7 +147,7 @@ public void shouldNotApplyUncommittedCommands() throws Throwable
public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex() throws Throwable public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex() throws Throwable
{ {
// given // given
coreState.setStateMachine( txStateMachine ); coreState.setStateMachine( coreStateMachines );
coreState.start(); coreState.start();


// when // when
Expand All @@ -157,10 +156,10 @@ public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex
coreState.notifyCommitted( 1 ); coreState.notifyCommitted( 1 );
applier.sync( false ); applier.sync( false );


InOrder inOrder = inOrder( txStateMachine, commandDispatcher ); InOrder inOrder = inOrder( coreStateMachines, commandDispatcher );


// then // then
inOrder.verify( txStateMachine ).commandDispatcher(); inOrder.verify( coreStateMachines ).commandDispatcher();
inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() ); inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() );
inOrder.verify( commandDispatcher ).close(); inOrder.verify( commandDispatcher ).close();
} }
Expand All @@ -171,7 +170,7 @@ public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex
public void shouldPeriodicallyFlushState() throws Throwable public void shouldPeriodicallyFlushState() throws Throwable
{ {
// given // given
coreState.setStateMachine( txStateMachine ); coreState.setStateMachine( coreStateMachines );
coreState.start(); coreState.start();


int interactions = flushEvery * 5; int interactions = flushEvery * 5;
Expand All @@ -185,7 +184,7 @@ public void shouldPeriodicallyFlushState() throws Throwable
applier.sync( false ); applier.sync( false );


// then // then
verify( txStateMachine, times( interactions / batchSize ) ).flush(); verify( coreStateMachines, times( interactions / batchSize ) ).flush();
assertEquals( interactions - ( interactions % batchSize) - 1, (long) lastFlushedStorage.getInitialState() ); assertEquals( interactions - ( interactions % batchSize) - 1, (long) lastFlushedStorage.getInitialState() );
} }


Expand All @@ -195,6 +194,7 @@ public void shouldPanicIfUnableToApply() throws Throwable
// given // given
doThrow( IllegalStateException.class ).when( commandDispatcher ) doThrow( IllegalStateException.class ).when( commandDispatcher )
.dispatch( any( ReplicatedTransaction.class ), anyLong(), anyCallback() ); .dispatch( any( ReplicatedTransaction.class ), anyLong(), anyCallback() );
coreState.setStateMachine( coreStateMachines );
coreState.start(); coreState.start();


raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) );
Expand All @@ -214,7 +214,7 @@ public void shouldApplyToLogFromCache() throws Throwable
//given n things to apply in the cache, check that they are actually applied. //given n things to apply in the cache, check that they are actually applied.


// given // given
coreState.setStateMachine( txStateMachine ); coreState.setStateMachine( coreStateMachines );
coreState.start(); coreState.start();


inFlightMap.register( 0L, new RaftLogEntry( 1, operation( nullTx ) ) ); inFlightMap.register( 0L, new RaftLogEntry( 1, operation( nullTx ) ) );
Expand All @@ -232,9 +232,7 @@ public void shouldApplyToLogFromCache() throws Throwable
public void cacheEntryShouldBePurgedWhenApplied() throws Throwable public void cacheEntryShouldBePurgedWhenApplied() throws Throwable
{ {
//given a cache in submitApplyJob, the contents of the cache should only contain unapplied "things" //given a cache in submitApplyJob, the contents of the cache should only contain unapplied "things"

coreState.setStateMachine( coreStateMachines );
// given
coreState.setStateMachine( txStateMachine );
coreState.start(); coreState.start();


inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) );
Expand All @@ -256,7 +254,7 @@ public void shouldFallbackToLogCursorOnCacheMiss() throws Throwable
{ {
// if the cache does not contain all things to be applied, make sure we fall back to the log // if the cache does not contain all things to be applied, make sure we fall back to the log
// should only happen in recovery, otherwise this is probably a bug. // should only happen in recovery, otherwise this is probably a bug.
coreState.setStateMachine( txStateMachine ); coreState.setStateMachine( coreStateMachines );
coreState.start(); coreState.start();


//given cache with missing entry //given cache with missing entry
Expand Down Expand Up @@ -292,7 +290,7 @@ public void shouldFailWhenCacheAndLogMiss() throws Throwable
{ {
//When an entry is not in the log, we must fail. //When an entry is not in the log, we must fail.


coreState.setStateMachine( txStateMachine ); coreState.setStateMachine( coreStateMachines );
coreState.start(); coreState.start();


inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) );
Expand Down

0 comments on commit 9872b5a

Please sign in to comment.