Add a randomised test that opens multiple page cursors
chrisvest committed Jan 20, 2016
1 parent 7f536b0 commit d309791
Showing 1 changed file with 169 additions and 55 deletions.
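For readers skimming the diff below: the new test's core idea is to hold several page cursors open at once and still read each per-thread counter consistently via the shouldRetry() loop. The following condensed sketch is not part of the commit; it restates that pattern using only the PagedFile/PageCursor calls visible in the diff, and it assumes the PF_* lock flags are constants on PagedFile (the test's static imports are not shown in this view).

// Condensed sketch of the multi-cursor read pattern exercised by the new test.
// Assumes PF_SHARED_READ_LOCK is a constant on PagedFile; error handling and the
// write-lock/update branch from the test are omitted for brevity.
import java.io.IOException;

import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;

class MultiCursorReadSketch
{
    // Open one read cursor per requested page, then read a 4-byte counter from each,
    // retrying each read until shouldRetry() reports a consistent snapshot.
    static int[] readCounters( PagedFile pagedFile, int[] pageIds, int offset ) throws IOException
    {
        PageCursor[] cursors = new PageCursor[pageIds.length];
        int[] counters = new int[pageIds.length];
        try
        {
            for ( int i = 0; i < pageIds.length; i++ )
            {
                cursors[i] = pagedFile.io( pageIds[i], PagedFile.PF_SHARED_READ_LOCK );
                if ( !cursors[i].next() )
                {
                    throw new IOException( "page " + pageIds[i] + " does not exist" );
                }
            }
            for ( int i = 0; i < pageIds.length; i++ )
            {
                do
                {
                    cursors[i].setOffset( offset );
                    counters[i] = cursors[i].getInt();
                }
                while ( cursors[i].shouldRetry() );
            }
        }
        finally
        {
            for ( PageCursor cursor : cursors )
            {
                if ( cursor != null )
                {
                    cursor.close();
                }
            }
        }
        return counters;
    }
}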
community/io/src/test/java/org/neo4j/io/pagecache/PageCacheSlowTest.java (224 changes: 169 additions & 55 deletions)
@@ -55,7 +55,57 @@

public abstract class PageCacheSlowTest<T extends PageCache> extends PageCacheTestSupport<T>
{
@RepeatRule.Repeat( times = 1000 )
private static class UpdateResult
{
final int threadId;
final int[] pageCounts;

UpdateResult( int threadId, int[] pageCounts )
{
this.threadId = threadId;
this.pageCounts = pageCounts;
}
}

private static abstract class UpdateWorker implements Callable<UpdateResult>
{
final int threadId;
final int filePages;
final AtomicBoolean shouldStop;
final PagedFile pagedFile;
final int[] pageCounts;
final int offset;

UpdateWorker( int threadId, int filePages, AtomicBoolean shouldStop, PagedFile pagedFile )
{
this.threadId = threadId;
this.filePages = filePages;
this.shouldStop = shouldStop;
this.pagedFile = pagedFile;
pageCounts = new int[filePages];
offset = threadId * 4;
}

@Override
public UpdateResult call() throws Exception
{
ThreadLocalRandom rng = ThreadLocalRandom.current();

while ( !shouldStop.get() )
{
boolean updateCounter = rng.nextBoolean();
int pf_flags = updateCounter ? PF_SHARED_WRITE_LOCK : PF_SHARED_READ_LOCK;
performReadOrUpdate( rng, updateCounter, pf_flags );
}

return new UpdateResult( threadId, pageCounts );
}

protected abstract void performReadOrUpdate( ThreadLocalRandom rng, boolean updateCounter,
int pf_flags ) throws IOException;
}

@RepeatRule.Repeat( times = 250 )
@Test( timeout = SEMI_LONG_TIMEOUT_MILLIS )
public void mustNotLoseUpdates() throws Exception
{
@@ -89,53 +139,17 @@ public void mustNotLoseUpdates() throws Exception
getPageCache( fs, cachePages, pageSize, PageCacheTracer.NULL );
final PagedFile pagedFile = pageCache.map( file( "a" ), pageSize );

// Ensure all the pages exist
try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) )
{
for ( int i = 0; i < filePages; i++ )
{
assertTrue( "failed to initialise file page " + i, cursor.next() );
for ( int j = 0; j < pageSize; j++ )
{
cursor.putByte( (byte) 0 );
}
}
}
pageCache.flushAndForce();

class Result
{
final int threadId;
final int[] pageCounts;

Result( int threadId, int[] pageCounts )
{
this.threadId = threadId;
this.pageCounts = pageCounts;
}
}
ensureAllPagesExists( filePages, pagedFile );

class Worker implements Callable<Result>
List<Future<UpdateResult>> futures = new ArrayList<>();
for ( int i = 0; i < threadCount; i++ )
{
final int threadId;

Worker( int threadId )
UpdateWorker worker = new UpdateWorker( i, filePages, shouldStop, pagedFile )
{
this.threadId = threadId;
}

@Override
public Result call() throws Exception
{
int[] pageCounts = new int[filePages];
ThreadLocalRandom rng = ThreadLocalRandom.current();

while ( !shouldStop.get() )
protected void performReadOrUpdate(
ThreadLocalRandom rng, boolean updateCounter, int pf_flags ) throws IOException
{
int pageId = rng.nextInt( 0, filePages );
int offset = threadId * 4;
boolean updateCounter = rng.nextBoolean();
int pf_flags = updateCounter ? PF_SHARED_WRITE_LOCK : PF_SHARED_READ_LOCK;
try ( PageCursor cursor = pagedFile.io( pageId, pf_flags ) )
{
int counter;
@@ -150,7 +164,9 @@ public Result call() throws Exception
while ( cursor.shouldRetry() );
String lockName = updateCounter ? "PF_SHARED_WRITE_LOCK" : "PF_SHARED_READ_LOCK";
assertThat( "inconsistent page read from filePageId = " + pageId + ", with " + lockName +
", workerId = " + threadId + " [t:" + Thread.currentThread().getId() + "]",
", workerId = " +
threadId + " [t:" + Thread
.currentThread().getId() + "]",
counter, is( pageCounts[pageId] ) );
}
catch ( Throwable throwable )
@@ -167,23 +183,36 @@ public Result call() throws Exception
}
}
}

return new Result( threadId, pageCounts );
}
};
futures.add( executor.submit( worker ) );
}

List<Future<Result>> futures = new ArrayList<>();
for ( int i = 0; i < threadCount; i++ )
Thread.sleep( 40 );
shouldStop.set( true );

verifyUpdateResults( filePages, pagedFile, futures );
pagedFile.close();
}

private void ensureAllPagesExists( int filePages, PagedFile pagedFile ) throws IOException
{
try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) )
{
futures.add( executor.submit( new Worker( i ) ) );
for ( int i = 0; i < filePages; i++ )
{
assertTrue( "failed to initialise file page " + i, cursor.next() );
}
}
pageCache.flushAndForce();
}

Thread.sleep( 10 );
shouldStop.set( true );

for ( Future<Result> future : futures )
private void verifyUpdateResults( int filePages, PagedFile pagedFile,
List<Future<UpdateResult>> futures )
throws InterruptedException, ExecutionException, IOException
{
for ( Future<UpdateResult> future : futures )
{
Result result = future.get();
UpdateResult result = future.get();
try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_READ_LOCK ) )
{
for ( int i = 0; i < filePages; i++ )
@@ -205,6 +234,91 @@ public Result call() throws Exception
}
}
}
}

@RepeatRule.Repeat( times = 250 )
@Test( timeout = SEMI_LONG_TIMEOUT_MILLIS )
public void mustNotLoseUpdatesWhenOpeningMultiplePageCursorsPerThread() throws Exception
{
// Similar to the test above, except the threads will have multiple page cursors opened at a time.

final AtomicBoolean shouldStop = new AtomicBoolean();
final int cachePages = 20;
final int filePages = cachePages * 2;
final int threadCount = 8;
final int pageSize = threadCount * 4;

getPageCache( fs, cachePages, pageSize, PageCacheTracer.NULL );
final PagedFile pagedFile = pageCache.map( file( "a" ), pageSize );

ensureAllPagesExists( filePages, pagedFile );

List<Future<UpdateResult>> futures = new ArrayList<>();
for ( int i = 0; i < threadCount; i++ )
{
UpdateWorker worker = new UpdateWorker( i, filePages, shouldStop, pagedFile )
{
protected void performReadOrUpdate(
ThreadLocalRandom rng, boolean updateCounter, int pf_flags ) throws IOException
{
try
{
int pageCount = rng.nextInt( 1, filePages / 10 );
int[] pageIds = new int[pageCount];
for ( int j = 0; j < pageCount; j++ )
{
pageIds[j] = rng.nextInt( 0, filePages );
}
PageCursor[] cursors = new PageCursor[pageCount];
for ( int j = 0; j < pageCount; j++ )
{
cursors[j] = pagedFile.io( pageIds[j], pf_flags );
assertTrue( cursors[j].next() );
}
for ( int j = 0; j < pageCount; j++ )
{
int pageId = pageIds[j];
PageCursor cursor = cursors[j];
int counter;
do
{
cursor.setOffset( offset );
counter = cursor.getInt();
}
while ( cursor.shouldRetry() );
String lockName = updateCounter ? "PF_SHARED_WRITE_LOCK" : "PF_SHARED_READ_LOCK";
assertThat( "inconsistent page read from filePageId = " + pageId + ", with " + lockName +
", workerId = " +
threadId + " [t:" + Thread
.currentThread().getId() + "]",
counter, is( pageCounts[pageId] ) );
if ( updateCounter )
{
counter++;
pageCounts[pageId]++;
cursor.setOffset( offset );
cursor.putInt( counter );
}
}
for ( PageCursor cursor : cursors )
{
cursor.close();
}
}
catch ( Throwable throwable )
{
shouldStop.set( true );
throw throwable;
}
}
};
futures.add( executor.submit( worker ) );
}

Thread.sleep( 40 );
shouldStop.set( true );

verifyUpdateResults( filePages, pagedFile, futures );
pagedFile.close();
}
