Skip to content

Commit

Permalink
Reduce contention on work distribution
Browse files Browse the repository at this point in the history
by dividing work into batches.
  • Loading branch information
burqen committed Mar 14, 2017
1 parent 5fff2a2 commit 817ef61
Showing 1 changed file with 14 additions and 13 deletions.
Expand Up @@ -30,9 +30,10 @@
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.io.pagecache.PagedFile;

import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.TimeUnit.SECONDS;

import static org.neo4j.helpers.Exceptions.launderedException;

/**
Expand All @@ -44,6 +45,8 @@ class CrashGenCleaner
private final TreeNode<?,?> treeNode;
private final long lowTreeNodeId;
private final long highTreeNodeId;
private final int availableProcessors;
private final long batchSize;
private final long stableGeneration;
private final long unstableGeneration;
private final Monitor monitor;
Expand All @@ -56,6 +59,11 @@ class CrashGenCleaner
this.treeNode = treeNode;
this.lowTreeNodeId = lowTreeNodeId;
this.highTreeNodeId = highTreeNodeId;
this.availableProcessors = Runtime.getRuntime().availableProcessors();
this.batchSize = // Each processor will get roughly 100 batches each
min( 1000, max( 10, (highTreeNodeId - lowTreeNodeId) / (100 * availableProcessors) ) );
System.out.println( "Number of processors: " + availableProcessors + ", number of pages: " +
(highTreeNodeId - lowTreeNodeId) + ", batch size: " + batchSize );
this.stableGeneration = stableGeneration;
this.unstableGeneration = unstableGeneration;
this.monitor = monitor;
Expand All @@ -70,7 +78,7 @@ public void clean() throws IOException
assert unstableGeneration - stableGeneration > 1;

long startTime = currentTimeMillis();
int threads = Runtime.getRuntime().availableProcessors();
int threads = availableProcessors;
ExecutorService executor = Executors.newFixedThreadPool( threads );
AtomicLong nextId = new AtomicLong( lowTreeNodeId);
AtomicReference<Throwable> error = new AtomicReference<>();
Expand Down Expand Up @@ -118,19 +126,12 @@ private Runnable cleaner( AtomicLong nextId, AtomicReference<Throwable> error, A
try ( PageCursor cursor = pagedFile.io( 0, PagedFile.PF_SHARED_READ_LOCK );
PageCursor writeCursor = pagedFile.io( 0, PagedFile.PF_SHARED_WRITE_LOCK ) )
{
while ( nextId.get() < highTreeNodeId )
long localNextId;
while ( ( localNextId = nextId.getAndAdd( batchSize )) < highTreeNodeId )
{
// Do a batch of max 1000
int batchCount = 0;
for ( ; batchCount < 1_000; batchCount++ )
for ( int i = 0; i < batchSize && localNextId < highTreeNodeId; i++, localNextId++ )
{
long id = nextId.getAndIncrement();
if ( id >= highTreeNodeId )
{
break;
}

PageCursorUtil.goTo( cursor, "clean", id );
PageCursorUtil.goTo( cursor, "clean", localNextId );

if ( hasCrashedGSPP( treeNode, cursor ) )
{
Expand Down

0 comments on commit 817ef61

Please sign in to comment.