Skip to content

Commit

Permalink
GBPTree test for recovery replaying transactions from before last che…
Browse files Browse the repository at this point in the history
…ckpoint

To make sure nothing breaks if more transactions then necessary is replayed.
  • Loading branch information
burqen committed Jan 5, 2017
1 parent 6527c8e commit 08e066e
Showing 1 changed file with 40 additions and 13 deletions.
Expand Up @@ -57,7 +57,13 @@
public class GBPTreeRecoveryTest
{
private static final int PAGE_SIZE = 256;
private static final Action CHECKPOINT = index -> index.checkpoint( unlimited() );
private static final Action CHECKPOINT = ( index, forRecovery ) ->
{
if ( !forRecovery )
{
index.checkpoint( unlimited() );
}
};

private final RandomRule random = new RandomRule();
private final EphemeralFileSystemRule fs = new EphemeralFileSystemRule();
Expand Down Expand Up @@ -121,7 +127,20 @@ public void shouldRecoverFromCrashBeforeFirstCheckpoint() throws Exception
}

@Test
public void shouldRecoverFromAnything() throws Exception
public void shouldRecoverFromAnythingReplayExactFromCheckpoint() throws Exception
{
doShouldRecoverFromAnything( true );

}

@Test
public void shouldRecoverFromAnythingReplayFromBeforeLastCheckpoint() throws Exception
{
doShouldRecoverFromAnything( false );

}

private void doShouldRecoverFromAnything( boolean replayRecoveryExactlyFromCheckpoint ) throws Exception
{
// GIVEN
// a tree which has had random updates and checkpoints in it, load generated with specific seed
Expand All @@ -139,15 +158,15 @@ public void shouldRecoverFromAnything() throws Exception
PageCache pageCache = createPageCache();
GBPTree<MutableLong,MutableLong> index = createIndex( pageCache, file );
// Execute all actions up to and including last checkpoint ...
execute( load.subList( 0, lastCheckPointIndex + 1 ), index );
execute( load.subList( 0, lastCheckPointIndex + 1 ), index, false );
// ... a random amount of the remaining "unsafe" actions ...
int numberOfRemainingActions = load.size() - lastCheckPointIndex - 1;
int crashFlushIndex = lastCheckPointIndex + random.nextInt( numberOfRemainingActions ) + 1;
execute( load.subList( lastCheckPointIndex + 1, crashFlushIndex ), index );
execute( load.subList( lastCheckPointIndex + 1, crashFlushIndex ), index, false );
// ... flush ...
pageCache.flushAndForce();
// ... execute the remaining actions
execute( load.subList( crashFlushIndex, load.size() ), index );
execute( load.subList( crashFlushIndex, load.size() ), index, false );
// ... and finally crash
fs.snapshot( throwing( () ->
{
Expand All @@ -157,7 +176,15 @@ public void shouldRecoverFromAnything() throws Exception
}

// WHEN doing recovery
List<Action> recoveryActions = load.subList( lastCheckPointIndex + 1, load.size() );
List<Action> recoveryActions;
if ( replayRecoveryExactlyFromCheckpoint )
{
recoveryActions = load.subList( lastCheckPointIndex + 1, load.size() );
}
else
{
recoveryActions = load.subList( random.nextInt( lastCheckPointIndex ), load.size() );
}

// first crashing during recovery
int numberOfCrashesDuringRecovery = random.intBetween( 0, 3 );
Expand Down Expand Up @@ -204,22 +231,22 @@ public void shouldRecoverFromAnything() throws Exception
private void recover( List<Action> load, GBPTree<MutableLong,MutableLong> index ) throws IOException
{
index.prepareForRecovery();
execute( load, index );
execute( load, index, true );
}

private static void execute( List<Action> load, Index<MutableLong,MutableLong> index )
private static void execute( List<Action> load, Index<MutableLong,MutableLong> index, boolean forRecovery )
throws IOException
{
for ( Action action : load )
{
action.execute( index );
action.execute( index, forRecovery );
}
}

private static long[] expectedSortedAggregatedDataFromGeneratedLoad( List<Action> load ) throws IOException
{
CapturingIndex index = new CapturingIndex();
execute( load, index );
execute( load, index, false );
@SuppressWarnings( "unchecked" )
Map.Entry<Long,Long>[] entries = index.map.entrySet().toArray( new Map.Entry[index.map.size()] );
long[] result = new long[entries.length * 2];
Expand Down Expand Up @@ -286,7 +313,7 @@ private Action randomAction( boolean allowCheckPoint )
{
// put
long[] data = modificationData( 30, 200 );
return index ->
return ( index, forRecovery ) ->
{
try ( IndexWriter<MutableLong,MutableLong> writer = index.writer() )
{
Expand All @@ -303,7 +330,7 @@ else if ( randomized <= 0.95 || !allowCheckPoint )
{
// remove
long[] data = modificationData( 5, 20 );
return index ->
return ( index, forRecovery ) ->
{
try ( IndexWriter<MutableLong,MutableLong> writer = index.writer() )
{
Expand Down Expand Up @@ -348,7 +375,7 @@ private PageCache createPageCache()

interface Action
{
void execute( Index<MutableLong,MutableLong> index ) throws IOException;
void execute( Index<MutableLong,MutableLong> index, boolean forRecovery ) throws IOException;
}

private static class CapturingIndex implements Index<MutableLong,MutableLong>, IndexWriter<MutableLong,MutableLong>
Expand Down

0 comments on commit 08e066e

Please sign in to comment.