diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java index 6a77cd18323d3..a6ba8892f5ab9 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java @@ -53,7 +53,6 @@ public class CoreState extends LifecycleAdapter implements RaftStateMachine, Log { private static final long NOTHING = -1; private final RaftLog raftLog; - private final int maxBatchSize; private final StateStorage lastFlushedStorage; private final int flushEvery; private final ProgressTracker progressTracker; @@ -66,7 +65,7 @@ public class CoreState extends LifecycleAdapter implements RaftStateMachine, Log private final CoreServerSelectionStrategy selectionStrategy; private final CoreStateDownloader downloader; private final RaftLogCommitIndexMonitor commitIndexMonitor; - private final List distributedOperations; + private final OperationBatcher batcher; private GlobalSessionTrackerState sessionState = new GlobalSessionTrackerState<>(); private CoreStateMachines coreStateMachines; @@ -92,7 +91,6 @@ public CoreState( Monitors monitors ) { this.raftLog = raftLog; - this.maxBatchSize = maxBatchSize; this.lastFlushedStorage = lastFlushedStorage; this.flushEvery = flushEvery; this.progressTracker = progressTracker; @@ -105,7 +103,7 @@ public CoreState( this.dbHealth = dbHealth; this.inFlightMap = inFlightMap; this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass() ); - this.distributedOperations = new ArrayList<>( maxBatchSize ); + this.batcher = new OperationBatcher( maxBatchSize ); } synchronized void setStateMachine( CoreStateMachines coreStateMachines ) @@ -116,7 +114,7 @@ synchronized void setStateMachine( CoreStateMachines coreStateMachines ) public void skip( long lastApplied ) { this.lastApplied = this.lastFlushed = lastApplied; - log.info( format("Skipping lastApplied index forward to %d", lastApplied) ); + log.info( format( "Skipping lastApplied index forward to %d", lastApplied ) ); } @Override @@ -134,59 +132,29 @@ public synchronized void notifyCommitted( long commitIndex ) private void submitApplyJob( long lastToApply ) { applier.submit( ( status ) -> () -> { - try ( LogEntrySupplier cacheLogEntrySupplier = new CacheApplier(); - LogEntrySupplier cursorLogEntrySupplier = new CursorApplier() ) + try ( LogEntrySupplier logEntrySupplier = new LogEntrySupplier() ) { - LogEntrySupplier currentLogEntrySupplier = cacheLogEntrySupplier; lastApplyingStorage.persistStoreData( lastToApply ); - for ( long i = lastApplied + 1; !status.isCancelled() && i <= lastToApply; i++ ) + for ( long logIndex = lastApplied + 1; !status.isCancelled() && logIndex <= lastToApply; logIndex++ ) { - RaftLogEntry raftLogEntry = currentLogEntrySupplier.get( i ); - if ( raftLogEntry == null ) + RaftLogEntry entry = logEntrySupplier.get( logIndex ); + if ( entry == null ) { - if ( currentLogEntrySupplier == cursorLogEntrySupplier ) - { - throw new IllegalStateException( - format( "Index %d not found in the raft log. lastToApply was: %d, append index " + - "is %d", i, lastToApply, raftLog.appendIndex() ) ); - } - currentLogEntrySupplier = cursorLogEntrySupplier; - raftLogEntry = currentLogEntrySupplier.get( i ); + throw new IllegalStateException( "Committed log entry must exist." ); } - if ( raftLogEntry == null ) + if ( entry.content() instanceof DistributedOperation ) { - String failureMessage = format( - "Log entry at index %d is not present in the log. " + - "Please perform recovery by restarting this database instance. " + - "Indexes to be applied are from %d up to %d. " + - "The append index is %d, and the prev index is %d.", - i, lastApplied + 1, lastToApply, raftLog.appendIndex(), raftLog.prevIndex() ); - log.error( failureMessage ); - throw new IllegalStateException( failureMessage ); - } - - if ( raftLogEntry.content() instanceof DistributedOperation ) - { - DistributedOperation distributedOperation = (DistributedOperation) raftLogEntry.content(); + DistributedOperation distributedOperation = (DistributedOperation) entry.content(); progressTracker.trackReplication( distributedOperation ); - distributedOperations.add( distributedOperation ); - - if ( distributedOperations.size() >= maxBatchSize ) - { - handleBatch( i, distributedOperations ); - } + batcher.add( logIndex, distributedOperation ); } else { - handleBatch( i - 1 /* ignore the current entry */, distributedOperations ); - assertIndexIsValidAndIncrementLastApplied( i, 1 ); + batcher.flush(); } - - // no matter what, we need to purge the inflight cache from applied entries - inFlightMap.unregister( i ); } - handleBatch( lastToApply, distributedOperations ); + batcher.flush(); } catch ( Throwable e ) { @@ -196,40 +164,85 @@ private void submitApplyJob( long lastToApply ) } ); } - private interface LogEntrySupplier extends AutoCloseable + private class OperationBatcher { - RaftLogEntry get( long indexToApply ) throws IOException; - } + private List batch; + private int maxBatchSize; + private long lastIndex; - private class CacheApplier implements LogEntrySupplier - { - @Override - public RaftLogEntry get( long indexToApply ) throws IOException + OperationBatcher( int maxBatchSize ) { - return inFlightMap.retrieve( indexToApply ); + this.batch = new ArrayList<>( maxBatchSize ); + this.maxBatchSize = maxBatchSize; } - @Override - public void close() throws Exception + private void add( long index, DistributedOperation operation ) throws Exception + { + if ( batch.size() > 0 ) + { + assert index == (lastIndex + 1); + } + + batch.add( operation ); + lastIndex = index; + + if ( batch.size() == maxBatchSize ) + { + flush(); + } + } + + private void flush() throws Exception { + if ( batch.size() == 0 ) + { + return; + } + + long startIndex = lastIndex - batch.size() + 1; + handleOperations( startIndex, batch ); + lastApplied = lastIndex; + batch.clear(); + maybeFlush(); } } - private class CursorApplier implements LogEntrySupplier + private class LogEntrySupplier implements AutoCloseable { - RaftLogCursor cursor = null; + private RaftLogCursor cursor; + private boolean useInFlightMap = true; - @Override - public RaftLogEntry get( long indexToApply ) throws IOException + public RaftLogEntry get( long logIndex ) throws IOException + { + RaftLogEntry entry = null; + + if ( useInFlightMap ) + { + entry = inFlightMap.retrieve( logIndex ); + } + + if ( entry == null ) + { + useInFlightMap = false; + entry = getUsingCursor( logIndex ); + } + + inFlightMap.unregister( logIndex ); + + return entry; + } + + private RaftLogEntry getUsingCursor( long logIndex ) throws IOException { if ( cursor == null ) { - cursor = raftLog.getEntryCursor( indexToApply ); + cursor = raftLog.getEntryCursor( logIndex ); } if ( cursor.next() ) { + assert cursor.index() == logIndex; return cursor.get(); } else @@ -248,24 +261,6 @@ public void close() throws Exception } } - private void handleBatch( long lastIndex, List distributedOperations ) throws Exception - { - if ( !distributedOperations.isEmpty() ) - { - long startIndex = lastIndex - distributedOperations.size() + 1; - handleOperations( startIndex, distributedOperations ); - assertIndexIsValidAndIncrementLastApplied( lastIndex, distributedOperations.size() ); - distributedOperations.clear(); - maybeFlush(); - } - } - - private void assertIndexIsValidAndIncrementLastApplied( long index, int increment ) - { - assert index == (lastApplied + increment); - lastApplied = index; - } - @Override public synchronized void notifyNeedFreshSnapshot() {