Skip to content

Commit

Permalink
Workers utility for managing worker threads in IdMapper
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Apr 27, 2015
1 parent 872c6b3 commit b10d9ca
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 83 deletions.
Expand Up @@ -241,33 +241,28 @@ public boolean needsPreparation()
public void prepare( InputIterable<Object> ids, Collector collector, ProgressListener progress ) public void prepare( InputIterable<Object> ids, Collector collector, ProgressListener progress )
{ {
endPreviousGroup(); endPreviousGroup();
synchronized ( this ) dataCache = dataCache.fixate();
trackerCache = cacheFactory.newIntArray( dataCacheStats.highestIndex()+1, -1 );

try
{ {
dataCache = dataCache.fixate(); sortBuckets = new ParallelSort( radix, dataCache, dataCacheStats, trackerCache, trackerStats,
trackerCache = cacheFactory.newIntArray( dataCacheStats.highestIndex()+1, -1 ); processorsForSorting, progress, DEFAULT ).run();


// Synchronized since there's this concern that a couple of other threads are changing trackerCache if ( detectAndMarkCollisions( progress ) > 0 )
// and it's nice to go through a memory barrier afterwards to ensure this CPU see correct data.
try
{ {
sortBuckets = new ParallelSort( radix, dataCache, dataCacheStats, trackerCache, trackerStats, try ( InputIterator<Object> idIterator = ids.iterator() )
processorsForSorting, progress, DEFAULT ).run();

if ( detectAndMarkCollisions( progress ) > 0 )
{ {
try ( InputIterator<Object> idIterator = ids.iterator() ) buildCollisionInfo( idIterator, collector, progress );
{
buildCollisionInfo( idIterator, collector, progress );
}
} }
} }
catch ( InterruptedException e ) }
{ catch ( InterruptedException e )
Thread.interrupted(); {
throw new RuntimeException( "Got interrupted while preparing the index. Throwing this exception " Thread.interrupted();
+ "onwards will cause a chain reaction which will cause a panic in the whole import, " throw new RuntimeException( "Got interrupted while preparing the index. Throwing this exception "
+ "so mission accomplished" ); + "onwards will cause a chain reaction which will cause a panic in the whole import, "
} + "so mission accomplished" );
} }
readyForUse = true; readyForUse = true;
} }
Expand Down
Expand Up @@ -67,7 +67,7 @@ public ParallelSort( Radix radix, LongArray dataCache, NumberArrayStats dataStat
this.threads = threads; this.threads = threads;
} }


public long[][] run() throws InterruptedException public synchronized long[][] run() throws InterruptedException
{ {
int[][] sortParams = sortRadix(); int[][] sortParams = sortRadix();
int threadsNeeded = 0; int threadsNeeded = 0;
Expand All @@ -80,22 +80,20 @@ public long[][] run() throws InterruptedException
threadsNeeded++; threadsNeeded++;
} }
CountDownLatch waitSignal = new CountDownLatch( 1 ); CountDownLatch waitSignal = new CountDownLatch( 1 );
CountDownLatch doneSignal = new CountDownLatch( threadsNeeded ); Workers<SortWorker> sortWorkers = new Workers<>( "SortWorker" );
SortWorker[] sortWorker = new SortWorker[threadsNeeded];
progress.started( "SORT" ); progress.started( "SORT" );
for ( int i = 0; i < threadsNeeded; i++ ) for ( int i = 0; i < threadsNeeded; i++ )
{ {
if ( sortParams[i][1] == 0 ) if ( sortParams[i][1] == 0 )
{ {
break; break;
} }
sortWorker[i] = new SortWorker( i, sortParams[i][0], sortParams[i][1], waitSignal, doneSignal ); sortWorkers.start( new SortWorker( sortParams[i][0], sortParams[i][1], waitSignal ) );
sortWorker[i].start();
} }
waitSignal.countDown(); waitSignal.countDown();
try try
{ {
doneSignal.await(); sortWorkers.awaitAndThrowOnError();
} }
finally finally
{ {
Expand All @@ -108,7 +106,7 @@ private int[][] sortRadix() throws InterruptedException
{ {
int[][] rangeParams = new int[threads][2]; int[][] rangeParams = new int[threads][2];
int[] bucketRange = new int[threads]; int[] bucketRange = new int[threads];
TrackerInitializer[] initializers = new TrackerInitializer[threads]; Workers<TrackerInitializer> initializers = new Workers<>( "TrackerInitializer" );
sortBuckets = new long[threads][2]; sortBuckets = new long[threads][2];
int bucketSize = safeCastLongToInt( dataStats.size() / threads ); int bucketSize = safeCastLongToInt( dataStats.size() / threads );
int count = 0, fullCount = 0 + 0; int count = 0, fullCount = 0 + 0;
Expand All @@ -134,9 +132,9 @@ private int[][] sortRadix() throws InterruptedException
fullCount += radixIndexCount[i]; fullCount += radixIndexCount[i];
progress.add( radixIndexCount[i] ); progress.add( radixIndexCount[i] );
} }
initializers[threadIndex] = new TrackerInitializer( threadIndex, rangeParams[threadIndex], initializers.start( new TrackerInitializer( threadIndex, rangeParams[threadIndex],
threadIndex > 0 ? bucketRange[threadIndex-1] : -1, bucketRange[threadIndex], threadIndex > 0 ? bucketRange[threadIndex-1] : -1, bucketRange[threadIndex],
sortBuckets[threadIndex] ); sortBuckets[threadIndex] ) );
threadIndex++; threadIndex++;
} }
else else
Expand All @@ -148,9 +146,9 @@ private int[][] sortRadix() throws InterruptedException
bucketRange[threadIndex] = radixIndexCount.length; bucketRange[threadIndex] = radixIndexCount.length;
rangeParams[threadIndex][0] = fullCount; rangeParams[threadIndex][0] = fullCount;
rangeParams[threadIndex][1] = safeCastLongToInt( dataStats.size() - fullCount ); rangeParams[threadIndex][1] = safeCastLongToInt( dataStats.size() - fullCount );
initializers[threadIndex] = new TrackerInitializer( threadIndex, rangeParams[threadIndex], initializers.start( new TrackerInitializer( threadIndex, rangeParams[threadIndex],
threadIndex > 0 ? bucketRange[threadIndex-1] : -1, bucketRange[threadIndex], threadIndex > 0 ? bucketRange[threadIndex-1] : -1, bucketRange[threadIndex],
sortBuckets[threadIndex] ); sortBuckets[threadIndex] ) );
break; break;
} }
} }
Expand All @@ -159,23 +157,15 @@ private int[][] sortRadix() throws InterruptedException
// In the loop above where we split up radixes into buckets, we start one thread per bucket whose // In the loop above where we split up radixes into buckets, we start one thread per bucket whose
// job is to populate trackerCache and sortBuckets where each thread will not touch the same // job is to populate trackerCache and sortBuckets where each thread will not touch the same
// data indexes as any other thread. Here we wait for them all to finish. // data indexes as any other thread. Here we wait for them all to finish.
Throwable error = initializers.await();
int[] bucketIndex = new int[threads]; int[] bucketIndex = new int[threads];
Throwable error = null;
long highestIndex = -1, size = 0; long highestIndex = -1, size = 0;
for ( int i = 0; i < initializers.length; i++ ) int i = 0;
for ( TrackerInitializer initializer : initializers )
{ {
TrackerInitializer initializer = initializers[i]; bucketIndex[i++] = initializer.bucketIndex;
if ( initializer != null ) highestIndex = max( highestIndex, initializer.highestIndex );
{ size += initializer.size;
Throwable initializerError = initializer.await();
if ( initializerError != null )
{
error = initializerError;
}
bucketIndex[i] = initializer.bucketIndex;
highestIndex = max( highestIndex, initializer.highestIndex );
size += initializer.size;
}
} }
trackerStats.set( size, highestIndex ); trackerStats.set( size, highestIndex );
if ( error != null ) if ( error != null )
Expand Down Expand Up @@ -301,20 +291,17 @@ public boolean ge( long right, long pivot )
* instead trackerCache is updated to point to the right indexes. Only touches a designated part of trackerCache * instead trackerCache is updated to point to the right indexes. Only touches a designated part of trackerCache
* so that many can run in parallel on their own part without synchronization. * so that many can run in parallel on their own part without synchronization.
*/ */
private class SortWorker extends Thread private class SortWorker implements Runnable
{ {
private final int start, size; private final int start, size;
private final CountDownLatch doneSignal, waitSignal; private final CountDownLatch waitSignal;
private int workerId = -1;
private int threadLocalProgress; private int threadLocalProgress;


SortWorker( int workerId, int startRange, int size, CountDownLatch wait, CountDownLatch done ) SortWorker( int startRange, int size, CountDownLatch wait )
{ {
this.start = startRange; this.start = startRange;
this.size = size; this.size = size;
this.doneSignal = done;
this.waitSignal = wait; this.waitSignal = wait;
this.workerId = workerId;
} }


void incrementProgress( int diff ) void incrementProgress( int diff )
Expand All @@ -336,7 +323,6 @@ private void reportProgress()
public void run() public void run()
{ {
Random random = ThreadLocalRandom.current(); Random random = ThreadLocalRandom.current();
this.setName( "SortWorker-" + workerId );
try try
{ {
waitSignal.await(); waitSignal.await();
Expand All @@ -347,23 +333,21 @@ public void run()
} }
recursiveQsort( start, start + size, random, this ); recursiveQsort( start, start + size, random, this );
reportProgress(); reportProgress();
doneSignal.countDown();
} }
} }


/** /**
* Sets the initial tracker indexes pointing to data indexes. Only touches a designated part of trackerCache * Sets the initial tracker indexes pointing to data indexes. Only touches a designated part of trackerCache
* so that many can run in parallel on their own part without synchronization. * so that many can run in parallel on their own part without synchronization.
*/ */
private class TrackerInitializer extends Thread private class TrackerInitializer implements Runnable
{ {
private final int[] rangeParams; private final int[] rangeParams;
private final int lowBucketRange; private final int lowBucketRange;
private final int highBucketRange; private final int highBucketRange;
private final int threadIndex; private final int threadIndex;
private int bucketIndex; private int bucketIndex;
private final long[] result; private final long[] result;
private volatile Throwable error;
private long highestIndex = -1; private long highestIndex = -1;
private long size; private long size;


Expand All @@ -374,46 +358,32 @@ private class TrackerInitializer extends Thread
this.lowBucketRange = lowBucketRange; this.lowBucketRange = lowBucketRange;
this.highBucketRange = highBucketRange; this.highBucketRange = highBucketRange;
this.result = result; this.result = result;
start();
} }


@Override @Override
public void run() public void run()
{ {
try long max = dataStats.highestIndex();
for ( long i = 0; i <= max; i++ )
{ {
long dataSize = dataStats.size(); int rIndex = radixCalculator.radixOf( dataCache.get( i ) );
for ( long i = 0; i < dataSize; i++ ) if ( rIndex > lowBucketRange && rIndex <= highBucketRange )
{ {
int rIndex = radixCalculator.radixOf( dataCache.get( i ) ); long temp = (rangeParams[0] + bucketIndex++);
if ( rIndex > lowBucketRange && rIndex <= highBucketRange ) assert tracker.get( temp ) == -1 : "Overlapping buckets i:" + i + ", k:" + threadIndex;
tracker.set( temp, (int) i );
if ( bucketIndex == rangeParams[1] )
{ {
long temp = (rangeParams[0] + bucketIndex++); result[0] = highBucketRange;
assert tracker.get( temp ) == -1 : "Overlapping buckets i:" + i + ", k:" + threadIndex; result[1] = rangeParams[0];
tracker.set( temp, (int) i );
if ( bucketIndex == rangeParams[1] )
{
result[0] = highBucketRange;
result[1] = rangeParams[0];
}
} }
} }
if ( bucketIndex > 0 )
{
highestIndex = rangeParams[0] + bucketIndex - 1;
}
size = bucketIndex;
} }
catch ( Throwable t ) if ( bucketIndex > 0 )
{ {
error = t; highestIndex = rangeParams[0] + bucketIndex - 1;
} }
} size = bucketIndex;

private synchronized Throwable await() throws InterruptedException
{
join();
return error;
} }
} }
} }

0 comments on commit b10d9ca

Please sign in to comment.