Skip to content

Commit

Permalink
core-edge: refactor log entry supplier and batcher
Browse files Browse the repository at this point in the history
Pushed most of the logic for retreiving log entries from in-memory and
on-disk structures to LogEntrySupplier and the logic for batching to
OperationBatcher. The code reads easier now, I claim.
  • Loading branch information
martinfurmanski committed Jun 9, 2016
1 parent 805888d commit f173fe5
Showing 1 changed file with 73 additions and 78 deletions.
Expand Up @@ -53,7 +53,6 @@ public class CoreState extends LifecycleAdapter implements RaftStateMachine, Log
{
private static final long NOTHING = -1;
private final RaftLog raftLog;
private final int maxBatchSize;
private final StateStorage<Long> lastFlushedStorage;
private final int flushEvery;
private final ProgressTracker progressTracker;
Expand All @@ -66,7 +65,7 @@ public class CoreState extends LifecycleAdapter implements RaftStateMachine, Log
private final CoreServerSelectionStrategy selectionStrategy;
private final CoreStateDownloader downloader;
private final RaftLogCommitIndexMonitor commitIndexMonitor;
private final List<DistributedOperation> distributedOperations;
private final OperationBatcher batcher;

private GlobalSessionTrackerState<CoreMember> sessionState = new GlobalSessionTrackerState<>();
private CoreStateMachines coreStateMachines;
Expand All @@ -92,7 +91,6 @@ public CoreState(
Monitors monitors )
{
this.raftLog = raftLog;
this.maxBatchSize = maxBatchSize;
this.lastFlushedStorage = lastFlushedStorage;
this.flushEvery = flushEvery;
this.progressTracker = progressTracker;
Expand All @@ -105,7 +103,7 @@ public CoreState(
this.dbHealth = dbHealth;
this.inFlightMap = inFlightMap;
this.commitIndexMonitor = monitors.newMonitor( RaftLogCommitIndexMonitor.class, getClass() );
this.distributedOperations = new ArrayList<>( maxBatchSize );
this.batcher = new OperationBatcher( maxBatchSize );
}

synchronized void setStateMachine( CoreStateMachines coreStateMachines )
Expand All @@ -116,7 +114,7 @@ synchronized void setStateMachine( CoreStateMachines coreStateMachines )
public void skip( long lastApplied )
{
this.lastApplied = this.lastFlushed = lastApplied;
log.info( format("Skipping lastApplied index forward to %d", lastApplied) );
log.info( format( "Skipping lastApplied index forward to %d", lastApplied ) );
}

@Override
Expand All @@ -134,59 +132,29 @@ public synchronized void notifyCommitted( long commitIndex )
private void submitApplyJob( long lastToApply )
{
applier.submit( ( status ) -> () -> {
try ( LogEntrySupplier cacheLogEntrySupplier = new CacheApplier();
LogEntrySupplier cursorLogEntrySupplier = new CursorApplier() )
try ( LogEntrySupplier logEntrySupplier = new LogEntrySupplier() )
{
LogEntrySupplier currentLogEntrySupplier = cacheLogEntrySupplier;
lastApplyingStorage.persistStoreData( lastToApply );
for ( long i = lastApplied + 1; !status.isCancelled() && i <= lastToApply; i++ )
for ( long logIndex = lastApplied + 1; !status.isCancelled() && logIndex <= lastToApply; logIndex++ )
{
RaftLogEntry raftLogEntry = currentLogEntrySupplier.get( i );
if ( raftLogEntry == null )
RaftLogEntry entry = logEntrySupplier.get( logIndex );
if ( entry == null )
{
if ( currentLogEntrySupplier == cursorLogEntrySupplier )
{
throw new IllegalStateException(
format( "Index %d not found in the raft log. lastToApply was: %d, append index " +
"is %d", i, lastToApply, raftLog.appendIndex() ) );
}
currentLogEntrySupplier = cursorLogEntrySupplier;
raftLogEntry = currentLogEntrySupplier.get( i );
throw new IllegalStateException( "Committed log entry must exist." );
}

if ( raftLogEntry == null )
if ( entry.content() instanceof DistributedOperation )
{
String failureMessage = format(
"Log entry at index %d is not present in the log. " +
"Please perform recovery by restarting this database instance. " +
"Indexes to be applied are from %d up to %d. " +
"The append index is %d, and the prev index is %d.",
i, lastApplied + 1, lastToApply, raftLog.appendIndex(), raftLog.prevIndex() );
log.error( failureMessage );
throw new IllegalStateException( failureMessage );
}

if ( raftLogEntry.content() instanceof DistributedOperation )
{
DistributedOperation distributedOperation = (DistributedOperation) raftLogEntry.content();
DistributedOperation distributedOperation = (DistributedOperation) entry.content();
progressTracker.trackReplication( distributedOperation );
distributedOperations.add( distributedOperation );

if ( distributedOperations.size() >= maxBatchSize )
{
handleBatch( i, distributedOperations );
}
batcher.add( logIndex, distributedOperation );
}
else
{
handleBatch( i - 1 /* ignore the current entry */, distributedOperations );
assertIndexIsValidAndIncrementLastApplied( i, 1 );
batcher.flush();
}

// no matter what, we need to purge the inflight cache from applied entries
inFlightMap.unregister( i );
}
handleBatch( lastToApply, distributedOperations );
batcher.flush();
}
catch ( Throwable e )
{
Expand All @@ -196,40 +164,85 @@ private void submitApplyJob( long lastToApply )
} );
}

private interface LogEntrySupplier extends AutoCloseable
private class OperationBatcher
{
RaftLogEntry get( long indexToApply ) throws IOException;
}
private List<DistributedOperation> batch;
private int maxBatchSize;
private long lastIndex;

private class CacheApplier implements LogEntrySupplier
{
@Override
public RaftLogEntry get( long indexToApply ) throws IOException
OperationBatcher( int maxBatchSize )
{
return inFlightMap.retrieve( indexToApply );
this.batch = new ArrayList<>( maxBatchSize );
this.maxBatchSize = maxBatchSize;
}

@Override
public void close() throws Exception
private void add( long index, DistributedOperation operation ) throws Exception
{
if ( batch.size() > 0 )
{
assert index == (lastIndex + 1);
}

batch.add( operation );
lastIndex = index;

if ( batch.size() == maxBatchSize )
{
flush();
}
}

private void flush() throws Exception
{
if ( batch.size() == 0 )
{
return;
}

long startIndex = lastIndex - batch.size() + 1;
handleOperations( startIndex, batch );
lastApplied = lastIndex;

batch.clear();
maybeFlush();
}
}

private class CursorApplier implements LogEntrySupplier
private class LogEntrySupplier implements AutoCloseable
{
RaftLogCursor cursor = null;
private RaftLogCursor cursor;
private boolean useInFlightMap = true;

@Override
public RaftLogEntry get( long indexToApply ) throws IOException
public RaftLogEntry get( long logIndex ) throws IOException
{
RaftLogEntry entry = null;

if ( useInFlightMap )
{
entry = inFlightMap.retrieve( logIndex );
}

if ( entry == null )
{
useInFlightMap = false;
entry = getUsingCursor( logIndex );
}

inFlightMap.unregister( logIndex );

return entry;
}

private RaftLogEntry getUsingCursor( long logIndex ) throws IOException
{
if ( cursor == null )
{
cursor = raftLog.getEntryCursor( indexToApply );
cursor = raftLog.getEntryCursor( logIndex );
}

if ( cursor.next() )
{
assert cursor.index() == logIndex;
return cursor.get();
}
else
Expand All @@ -248,24 +261,6 @@ public void close() throws Exception
}
}

private void handleBatch( long lastIndex, List<DistributedOperation> distributedOperations ) throws Exception
{
if ( !distributedOperations.isEmpty() )
{
long startIndex = lastIndex - distributedOperations.size() + 1;
handleOperations( startIndex, distributedOperations );
assertIndexIsValidAndIncrementLastApplied( lastIndex, distributedOperations.size() );
distributedOperations.clear();
maybeFlush();
}
}

private void assertIndexIsValidAndIncrementLastApplied( long index, int increment )
{
assert index == (lastApplied + increment);
lastApplied = index;
}

@Override
public synchronized void notifyNeedFreshSnapshot()
{
Expand Down

0 comments on commit f173fe5

Please sign in to comment.