Skip to content

Commit

Permalink
More explicit work thread id assignment in CC store processor
Browse files Browse the repository at this point in the history
which simplifies:
- RecordDistributor which no longer needs to handle first/last records specially
- CountsBuildDecorator which has a predicate filtering out consecutive stages
  as to not duplicate count values for comparison later. This predicate is
  simpler since an explicit prepare() is called before each stage instead of
  trying to infer this fact by looking at ids of prcessed records (knowing they
  might even be out of order). None of that

And fixes race conditions around parallel processing on datasets smaller
than number threads.
  • Loading branch information
tinwelint committed Feb 29, 2016
1 parent 41d346d commit b339d4b
Show file tree
Hide file tree
Showing 17 changed files with 158 additions and 214 deletions.
Expand Up @@ -32,6 +32,11 @@

public interface CheckDecorator
{
/**
* Called before each pass over the store(s) to check.
*/
void prepare();

OwningRecordCheck<NeoStoreRecord, ConsistencyReport.NeoStoreConsistencyReport> decorateNeoStoreChecker(
OwningRecordCheck<NeoStoreRecord, ConsistencyReport.NeoStoreConsistencyReport> checker );

Expand Down Expand Up @@ -63,6 +68,11 @@ RecordCheck<RelationshipGroupRecord, ConsistencyReport.RelationshipGroupConsiste

static class Adapter implements CheckDecorator
{
@Override
public void prepare()
{
}

@Override
public OwningRecordCheck<NeoStoreRecord, ConsistencyReport.NeoStoreConsistencyReport> decorateNeoStoreChecker(
OwningRecordCheck<NeoStoreRecord, ConsistencyReport.NeoStoreConsistencyReport> checker )
Expand Down Expand Up @@ -136,6 +146,15 @@ public ChainCheckDecorator( CheckDecorator...decorators )
this.decorators = decorators;
}

@Override
public void prepare()
{
for ( CheckDecorator decorator : decorators )
{
decorator.prepare();
}
}

@Override
public OwningRecordCheck<NeoStoreRecord,ConsistencyReport.NeoStoreConsistencyReport> decorateNeoStoreChecker(
OwningRecordCheck<NeoStoreRecord,ConsistencyReport.NeoStoreConsistencyReport> checker )
Expand Down
Expand Up @@ -42,7 +42,6 @@
import org.neo4j.kernel.impl.store.NodeLabelsField;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.StoreAccess;
import org.neo4j.kernel.impl.store.counts.keys.CountsKey;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
Expand All @@ -61,10 +60,9 @@ class CountsBuilderDecorator extends CheckDecorator.Adapter
private static final int WILDCARD = -1;
private final MultiSet<CountsKey> nodeCounts = new MultiSet<>();
private final MultiSet<CountsKey> relationshipCounts = new MultiSet<>();
private final Predicate<NodeRecord> nodeCountBuildCondition;
private final Predicate<RelationshipRecord> relationshipCountBuildCondition;
private final MultiPassAvoidanceCondition<NodeRecord> nodeCountBuildCondition;
private final MultiPassAvoidanceCondition<RelationshipRecord> relationshipCountBuildCondition;
private final NodeStore nodeStore;
private final RelationshipStore relationshipStore;
private final StoreAccess storeAccess;
private final CountsEntry.CheckAdapter CHECK_NODE_COUNT = new CountsEntry.CheckAdapter()
{
Expand Down Expand Up @@ -127,9 +125,15 @@ public CountsBuilderDecorator( StoreAccess storeAccess )
{
this.storeAccess = storeAccess;
this.nodeStore = storeAccess.getRawNeoStores().getNodeStore();
this.relationshipStore = storeAccess.getRawNeoStores().getRelationshipStore();
this.nodeCountBuildCondition = new MultiPassAvoidanceCondition<>( nodeStore.getHighestPossibleIdInUse() );
this.relationshipCountBuildCondition = new MultiPassAvoidanceCondition<>( relationshipStore.getHighestPossibleIdInUse() );
this.nodeCountBuildCondition = new MultiPassAvoidanceCondition<>();
this.relationshipCountBuildCondition = new MultiPassAvoidanceCondition<>();
}

@Override
public void prepare()
{
this.nodeCountBuildCondition.prepare();
this.relationshipCountBuildCondition.prepare();
}

@Override
Expand Down Expand Up @@ -326,39 +330,28 @@ public void check( RelationshipRecord record,

private static class MultiPassAvoidanceCondition<T extends AbstractBaseRecord> implements Predicate<T>
{
private boolean started = false, done = false;
private final long terminationId;
private boolean used;
private boolean done;

public MultiPassAvoidanceCondition( long terminationId )
public void prepare()
{
this.terminationId = terminationId;
if ( used )
{
done = true;
}
}

@Override
public boolean test( T record )
{
if ( done )
try
{
return false;
return !done;
}
if ( record.getLongId() == terminationId )
finally
{
done = true;
return true;
}
if ( record.getLongId() == 0 )
{
if ( started )
{
done = true;
return false;
}
else
{
started = true;
}
used = true;
}
return true;
}
}

Expand Down
Expand Up @@ -122,7 +122,7 @@ ConsistencySummaryStatistics execute( DirectStoreAccess stores, Log log, Monitor
return summary;
}

void execute( final DirectStoreAccess directStoreAccess, CheckDecorator decorator,
void execute( final DirectStoreAccess directStoreAccess, final CheckDecorator decorator,
final RecordAccess recordAccess, final InconsistencyReport report,
CacheAccess cacheAccess, Monitor reportMonitor )
throws ConsistencyCheckIncompleteException
Expand All @@ -142,7 +142,14 @@ void execute( final DirectStoreAccess directStoreAccess, CheckDecorator decorato
multiPass, reporter, threads );
List<ConsistencyCheckerTask> tasks =
taskCreator.createTasksForFullCheck( checkLabelScanStore, checkIndexes, checkGraph );
TaskExecutor.execute( tasks, progress.build() );
TaskExecutor.execute( tasks, new Runnable()
{
@Override
public void run()
{
decorator.prepare();
}
} );
}
catch ( Exception e )
{
Expand Down
Expand Up @@ -26,7 +26,7 @@
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.impl.store.record.NodeRecord;

public class NodeToLabelScanRecordProcessor implements RecordProcessor<NodeRecord>
public class NodeToLabelScanRecordProcessor extends RecordProcessor.Adapter<NodeRecord>
{
private final ConsistencyReporter reporter;
private final RecordCheck<NodeRecord, ConsistencyReport.LabelsMatchReport> nodeLabelCheck;
Expand Down
Expand Up @@ -129,6 +129,11 @@ public void run()
}
}

@Override
public void prepare()
{
}

@Override
public OwningRecordCheck<NeoStoreRecord, ConsistencyReport.NeoStoreConsistencyReport> decorateNeoStoreChecker(
OwningRecordCheck<NeoStoreRecord, ConsistencyReport.NeoStoreConsistencyReport> checker )
Expand Down
Expand Up @@ -35,7 +35,7 @@
/**
* Processor of node records with the context of how they're indexed.
*/
public class PropertyAndNode2LabelIndexProcessor implements RecordProcessor<NodeRecord>
public class PropertyAndNode2LabelIndexProcessor extends RecordProcessor.Adapter<NodeRecord>
{
private final ConsistencyReporter reporter;
private final RecordCheck<NodeRecord, ConsistencyReport.NodeConsistencyReport> nodeIndexCheck;
Expand Down Expand Up @@ -71,9 +71,4 @@ public void process( NodeRecord nodeRecord )
}
}
}

@Override
public void close()
{
}
}
Expand Up @@ -76,8 +76,14 @@ public RoundRobinQueueDistributor( int numberOfThreads )
@Override
public void distribute( RECORD record, RecordConsumer<RECORD> consumer ) throws InterruptedException
{
nextQIndex = (nextQIndex + 1) % numberOfThreads;
consumer.accept( record, nextQIndex );
try
{
consumer.accept( record, nextQIndex );
}
finally
{
nextQIndex = (nextQIndex + 1) % numberOfThreads;
}
}
}

Expand Down
Expand Up @@ -26,19 +26,21 @@
/**
* Base class for workers that processes records during consistency check.
*/
public abstract class RecordCheckWorker<RECORD> implements Runnable
public class RecordCheckWorker<RECORD> implements Runnable
{
private volatile boolean done;
protected final BlockingQueue<RECORD> recordsQ;
private final int id;
private final AtomicInteger idQueue;
private boolean initialized;
private final RecordProcessor<RECORD> processor;

public RecordCheckWorker( int id, AtomicInteger idQueue, BlockingQueue<RECORD> recordsQ )
public RecordCheckWorker( int id, AtomicInteger idQueue, BlockingQueue<RECORD> recordsQ,
RecordProcessor<RECORD> processor )
{
this.id = id;
this.idQueue = idQueue;
this.recordsQ = recordsQ;
this.processor = processor;
}

public void done()
Expand All @@ -49,6 +51,21 @@ public void done()
@Override
public void run()
{
// We assign threads to ids, first come first serve and the the thread assignment happens
// inside the record processing which accesses CacheAccess#client() and that happens
// lazily. So... we need to coordinate so that the processing threads initializes the processing
// in order of thread id. This may change later so that the thread ids are assigned
// explicitly on creating the threads... which should be much better, although hard with
// the current design due to the state living inside ThreadLocal which makes it depend
// on the actual and correct thread making the call... which is what we do here.
awaitMyTurnToInitialize();

// This was the first record, the first record processing has now happened and so we
// can notify the others that we have initialized this thread id and the next one
// can go ahead and do so.
processor.init( id );
tellNextThreadToInitialize();

while ( !done || !recordsQ.isEmpty() )
{
RECORD record;
Expand All @@ -57,29 +74,7 @@ public void run()
record = recordsQ.poll( 10, TimeUnit.MILLISECONDS );
if ( record != null )
{
if ( !initialized )
{
// We assign threads to ids, first come first serve and the the thread assignment happens
// inside the record processing which accesses CacheAccess#client() and that happens
// lazily. So... we need to coordinate so that the first process of a record occurs
// in order of thread id. This may change later so that the thread ids are assigned
// explicitly on creating the threads... which should be much better, although hard with
// the current design due to the state living inside ThreadLocal which makes it depend
// on the actual and correct thread making the call... which is what we do here.
awaitMyTurnToInitialize();
}

process( record );


if ( !initialized )
{
// This was the first record, the first record processing has now happened and so we
// can notify the others that we have initialized this thread id and the next one
// can go ahead and do so.
tellNextThreadToInitialize();
initialized = true;
}
processor.process( record );
}
}
catch ( InterruptedException e )
Expand Down Expand Up @@ -111,6 +106,4 @@ private void tellNextThreadToInitialize()
boolean set = idQueue.compareAndSet( id-1, id );
assert set : "Something wrong with the design here";
}

protected abstract void process( RECORD record );
}

0 comments on commit b339d4b

Please sign in to comment.