diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/CheckDecorator.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/CheckDecorator.java index 36a75db5d6d00..1af22ede0cfa8 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/CheckDecorator.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/CheckDecorator.java @@ -32,6 +32,11 @@ public interface CheckDecorator { + /** + * Called before each pass over the store(s) to check. + */ + void prepare(); + OwningRecordCheck decorateNeoStoreChecker( OwningRecordCheck checker ); @@ -63,6 +68,11 @@ RecordCheck decorateNeoStoreChecker( OwningRecordCheck checker ) @@ -136,6 +146,15 @@ public ChainCheckDecorator( CheckDecorator...decorators ) this.decorators = decorators; } + @Override + public void prepare() + { + for ( CheckDecorator decorator : decorators ) + { + decorator.prepare(); + } + } + @Override public OwningRecordCheck decorateNeoStoreChecker( OwningRecordCheck checker ) diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/CountsBuilderDecorator.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/CountsBuilderDecorator.java index 975e9ee414e0b..994e17c4e2ced 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/CountsBuilderDecorator.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/CountsBuilderDecorator.java @@ -42,7 +42,6 @@ import org.neo4j.kernel.impl.store.NodeLabelsField; import org.neo4j.kernel.impl.store.NodeStore; import org.neo4j.kernel.impl.store.RecordStore; -import org.neo4j.kernel.impl.store.RelationshipStore; import org.neo4j.kernel.impl.store.StoreAccess; import org.neo4j.kernel.impl.store.counts.keys.CountsKey; import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; @@ -61,10 +60,9 @@ class CountsBuilderDecorator extends CheckDecorator.Adapter private static final int WILDCARD = -1; private final MultiSet nodeCounts = new MultiSet<>(); private final MultiSet relationshipCounts = new MultiSet<>(); - private final Predicate nodeCountBuildCondition; - private final Predicate relationshipCountBuildCondition; + private final MultiPassAvoidanceCondition nodeCountBuildCondition; + private final MultiPassAvoidanceCondition relationshipCountBuildCondition; private final NodeStore nodeStore; - private final RelationshipStore relationshipStore; private final StoreAccess storeAccess; private final CountsEntry.CheckAdapter CHECK_NODE_COUNT = new CountsEntry.CheckAdapter() { @@ -127,9 +125,15 @@ public CountsBuilderDecorator( StoreAccess storeAccess ) { this.storeAccess = storeAccess; this.nodeStore = storeAccess.getRawNeoStores().getNodeStore(); - this.relationshipStore = storeAccess.getRawNeoStores().getRelationshipStore(); - this.nodeCountBuildCondition = new MultiPassAvoidanceCondition<>( nodeStore.getHighestPossibleIdInUse() ); - this.relationshipCountBuildCondition = new MultiPassAvoidanceCondition<>( relationshipStore.getHighestPossibleIdInUse() ); + this.nodeCountBuildCondition = new MultiPassAvoidanceCondition<>(); + this.relationshipCountBuildCondition = new MultiPassAvoidanceCondition<>(); + } + + @Override + public void prepare() + { + this.nodeCountBuildCondition.prepare(); + this.relationshipCountBuildCondition.prepare(); } @Override @@ -326,39 +330,28 @@ public void check( RelationshipRecord record, private static class MultiPassAvoidanceCondition implements Predicate { - private boolean started = false, done = false; - private final long terminationId; + private boolean used; + private boolean done; - public MultiPassAvoidanceCondition( long terminationId ) + public void prepare() { - this.terminationId = terminationId; + if ( used ) + { + done = true; + } } @Override public boolean test( T record ) { - if ( done ) + try { - return false; + return !done; } - if ( record.getLongId() == terminationId ) + finally { - done = true; - return true; - } - if ( record.getLongId() == 0 ) - { - if ( started ) - { - done = true; - return false; - } - else - { - started = true; - } + used = true; } - return true; } } diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/FullCheck.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/FullCheck.java index 1a8b8eae94e46..b8f7cf55d7e39 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/FullCheck.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/FullCheck.java @@ -122,7 +122,7 @@ ConsistencySummaryStatistics execute( DirectStoreAccess stores, Log log, Monitor return summary; } - void execute( final DirectStoreAccess directStoreAccess, CheckDecorator decorator, + void execute( final DirectStoreAccess directStoreAccess, final CheckDecorator decorator, final RecordAccess recordAccess, final InconsistencyReport report, CacheAccess cacheAccess, Monitor reportMonitor ) throws ConsistencyCheckIncompleteException @@ -142,7 +142,14 @@ void execute( final DirectStoreAccess directStoreAccess, CheckDecorator decorato multiPass, reporter, threads ); List tasks = taskCreator.createTasksForFullCheck( checkLabelScanStore, checkIndexes, checkGraph ); - TaskExecutor.execute( tasks, progress.build() ); + TaskExecutor.execute( tasks, new Runnable() + { + @Override + public void run() + { + decorator.prepare(); + } + } ); } catch ( Exception e ) { diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/NodeToLabelScanRecordProcessor.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/NodeToLabelScanRecordProcessor.java index f1f49e3f4474b..9887124f00afd 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/NodeToLabelScanRecordProcessor.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/NodeToLabelScanRecordProcessor.java @@ -26,7 +26,7 @@ import org.neo4j.kernel.api.labelscan.LabelScanStore; import org.neo4j.kernel.impl.store.record.NodeRecord; -public class NodeToLabelScanRecordProcessor implements RecordProcessor +public class NodeToLabelScanRecordProcessor extends RecordProcessor.Adapter { private final ConsistencyReporter reporter; private final RecordCheck nodeLabelCheck; diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/OwnerCheck.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/OwnerCheck.java index ebfb4d2a00bc6..69b1a4930468e 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/OwnerCheck.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/OwnerCheck.java @@ -129,6 +129,11 @@ public void run() } } + @Override + public void prepare() + { + } + @Override public OwningRecordCheck decorateNeoStoreChecker( OwningRecordCheck checker ) diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/PropertyAndNode2LabelIndexProcessor.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/PropertyAndNode2LabelIndexProcessor.java index 242a7d3a16b95..0f709d6dcb92c 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/PropertyAndNode2LabelIndexProcessor.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/PropertyAndNode2LabelIndexProcessor.java @@ -35,7 +35,7 @@ /** * Processor of node records with the context of how they're indexed. */ -public class PropertyAndNode2LabelIndexProcessor implements RecordProcessor +public class PropertyAndNode2LabelIndexProcessor extends RecordProcessor.Adapter { private final ConsistencyReporter reporter; private final RecordCheck nodeIndexCheck; @@ -71,9 +71,4 @@ public void process( NodeRecord nodeRecord ) } } } - - @Override - public void close() - { - } } diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/QueueDistribution.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/QueueDistribution.java index ab3259aa1897f..250256eb30e72 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/QueueDistribution.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/QueueDistribution.java @@ -76,8 +76,14 @@ public RoundRobinQueueDistributor( int numberOfThreads ) @Override public void distribute( RECORD record, RecordConsumer consumer ) throws InterruptedException { - nextQIndex = (nextQIndex + 1) % numberOfThreads; - consumer.accept( record, nextQIndex ); + try + { + consumer.accept( record, nextQIndex ); + } + finally + { + nextQIndex = (nextQIndex + 1) % numberOfThreads; + } } } diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordCheckWorker.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordCheckWorker.java index 5858719a0ea08..ce1568885d1d3 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordCheckWorker.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordCheckWorker.java @@ -26,19 +26,21 @@ /** * Base class for workers that processes records during consistency check. */ -public abstract class RecordCheckWorker implements Runnable +public class RecordCheckWorker implements Runnable { private volatile boolean done; protected final BlockingQueue recordsQ; private final int id; private final AtomicInteger idQueue; - private boolean initialized; + private final RecordProcessor processor; - public RecordCheckWorker( int id, AtomicInteger idQueue, BlockingQueue recordsQ ) + public RecordCheckWorker( int id, AtomicInteger idQueue, BlockingQueue recordsQ, + RecordProcessor processor ) { this.id = id; this.idQueue = idQueue; this.recordsQ = recordsQ; + this.processor = processor; } public void done() @@ -49,6 +51,21 @@ public void done() @Override public void run() { + // We assign threads to ids, first come first serve and the the thread assignment happens + // inside the record processing which accesses CacheAccess#client() and that happens + // lazily. So... we need to coordinate so that the processing threads initializes the processing + // in order of thread id. This may change later so that the thread ids are assigned + // explicitly on creating the threads... which should be much better, although hard with + // the current design due to the state living inside ThreadLocal which makes it depend + // on the actual and correct thread making the call... which is what we do here. + awaitMyTurnToInitialize(); + + // This was the first record, the first record processing has now happened and so we + // can notify the others that we have initialized this thread id and the next one + // can go ahead and do so. + processor.init( id ); + tellNextThreadToInitialize(); + while ( !done || !recordsQ.isEmpty() ) { RECORD record; @@ -57,29 +74,7 @@ public void run() record = recordsQ.poll( 10, TimeUnit.MILLISECONDS ); if ( record != null ) { - if ( !initialized ) - { - // We assign threads to ids, first come first serve and the the thread assignment happens - // inside the record processing which accesses CacheAccess#client() and that happens - // lazily. So... we need to coordinate so that the first process of a record occurs - // in order of thread id. This may change later so that the thread ids are assigned - // explicitly on creating the threads... which should be much better, although hard with - // the current design due to the state living inside ThreadLocal which makes it depend - // on the actual and correct thread making the call... which is what we do here. - awaitMyTurnToInitialize(); - } - - process( record ); - - - if ( !initialized ) - { - // This was the first record, the first record processing has now happened and so we - // can notify the others that we have initialized this thread id and the next one - // can go ahead and do so. - tellNextThreadToInitialize(); - initialized = true; - } + processor.process( record ); } } catch ( InterruptedException e ) @@ -111,6 +106,4 @@ private void tellNextThreadToInitialize() boolean set = idQueue.compareAndSet( id-1, id ); assert set : "Something wrong with the design here"; } - - protected abstract void process( RECORD record ); } diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordDistributor.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordDistributor.java index dd3ea01d8525c..4e905fd420921 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordDistributor.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordDistributor.java @@ -38,33 +38,24 @@ public static void distributeRecords( String workerNames, int queueSize, Iterable records, - ProgressListener progress, + final ProgressListener progress, RecordProcessor processor, QueueDistributor idDistributor ) { Iterator iterator = records.iterator(); - - // Run the first record in the main thread since there are filters in some - // checkers that change state on first and last record, state that may affect other concurrent - // processors. - if ( iterator.hasNext() ) - { - processor.process( iterator.next() ); - progress.add( 1 ); - } - else + if ( !iterator.hasNext() ) { - // No need to set up a bunch of threads if there are no records to process anyway return; } + @SuppressWarnings( "unchecked" ) final ArrayBlockingQueue[] recordQ = new ArrayBlockingQueue[numberOfThreads]; - final Workers> workers = new Workers<>( workerNames ); + final Workers> workers = new Workers<>( workerNames ); final AtomicInteger idGroup = new AtomicInteger( -1 ); for ( int threadId = 0; threadId < numberOfThreads; threadId++ ) { recordQ[threadId] = new ArrayBlockingQueue<>( queueSize ); - workers.start( new Worker<>( threadId, idGroup, recordQ[threadId], processor ) ); + workers.start( new RecordCheckWorker<>( threadId, idGroup, recordQ[threadId], processor ) ); } final int[] recsProcessed = new int[numberOfThreads]; @@ -78,37 +69,30 @@ public void accept( RECORD record, int qIndex ) throws InterruptedException } }; - RECORD last = null; - while ( iterator.hasNext() ) + try { - try + while ( iterator.hasNext() ) { - // Put records into the queues using the queue distributor. Each Worker will pull and process. - RECORD record = iterator.next(); - - // Detect the last record and defer processing that until after all the others - // since there are filters in some checkers that change state on first and last record, - // state that may affect other concurrent processors. - if ( !iterator.hasNext() ) + try + { + // Put records into the queues using the queue distributor. Each Worker will pull and process. + RECORD record = iterator.next(); + idDistributor.distribute( record, recordConsumer ); + progress.add( 1 ); + } + catch ( InterruptedException e ) { - last = record; + Thread.currentThread().interrupt(); break; } - idDistributor.distribute( record, recordConsumer ); } - catch ( InterruptedException e ) + + // No more records to distribute, mark as done so that the workers will exit when no more records in queue. + for ( RecordCheckWorker worker : workers ) { - Thread.currentThread().interrupt(); - break; + worker.done(); } - progress.add( 1 ); - } - for ( Worker worker : workers ) - { - worker.done(); - } - try - { + workers.awaitAndThrowOnError( RuntimeException.class ); } catch ( InterruptedException e ) @@ -116,30 +100,6 @@ public void accept( RECORD record, int qIndex ) throws InterruptedException Thread.currentThread().interrupt(); throw new RuntimeException( "Was interrupted while awaiting completion" ); } - - // Here we process the last record. Why? See comments above - if ( last != null ) - { - processor.process( last ); - progress.add( 1 ); - } - } - - private static class Worker extends RecordCheckWorker - { - private final RecordProcessor processor; - - Worker( int id, AtomicInteger idGroup, BlockingQueue recordsQ, RecordProcessor processor ) - { - super( id, idGroup, recordsQ ); - this.processor = processor; - } - - @Override - protected void process( RECORD record ) - { - processor.process( record ); - } } /** diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordProcessor.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordProcessor.java index 4142fb9c4079b..bfb4695629069 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordProcessor.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordProcessor.java @@ -21,7 +21,25 @@ public interface RecordProcessor { + /** + * Must be called by the thread executing {@link #process(Object)}. + */ + void init( int id ); + void process( RECORD record ); void close(); + + public static abstract class Adapter implements RecordProcessor + { + @Override + public void init( int id ) + { + } + + @Override + public void close() + { + } + } } diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/StoreProcessor.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/StoreProcessor.java index e0089db07858d..ea8b9042c3ff0 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/StoreProcessor.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/StoreProcessor.java @@ -211,17 +211,20 @@ public void applyFilteredParallel( final RecordSt throws Exception { cacheAccess.prepareForProcessingOfSingleStore( recordsPerCpu ); - RecordProcessor processor = new RecordProcessor() + RecordProcessor processor = new RecordProcessor.Adapter() { @Override - public void process( R record ) + public void init( int id ) { - store.accept( StoreProcessor.this, record ); + // Thread id assignment happens here, so do this before processing. Calles to this init + // method is ordered externally. + cacheAccess.client(); } @Override - public void close() + public void process( R record ) { + store.accept( StoreProcessor.this, record ); } }; distributeRecords( numberOfThreads, getClass().getSimpleName(), qSize, diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/TaskExecutor.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/TaskExecutor.java index 3a64967d80d8c..e8ae9be6f5055 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/TaskExecutor.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/TaskExecutor.java @@ -21,17 +21,16 @@ import java.util.List; -import org.neo4j.helpers.progress.Completion; - public class TaskExecutor { - public static void execute( List tasks, Completion completion ) + public static void execute( List tasks, Runnable callBefore ) throws ConsistencyCheckIncompleteException { try { for ( Runnable task : tasks ) { + callBefore.run(); task.run(); } } diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/index/IndexEntryProcessor.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/index/IndexEntryProcessor.java index 02e74c05aa287..fded1eab46606 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/index/IndexEntryProcessor.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/index/IndexEntryProcessor.java @@ -24,7 +24,7 @@ import org.neo4j.consistency.report.ConsistencyReporter; import org.neo4j.consistency.store.synthetic.IndexEntry; -public class IndexEntryProcessor implements RecordProcessor +public class IndexEntryProcessor extends RecordProcessor.Adapter { private final ConsistencyReporter reporter; private final IndexCheck indexCheck; @@ -40,9 +40,4 @@ public void process( Long nodeId ) { reporter.forIndexEntry( new IndexEntry( nodeId ), indexCheck ); } - - @Override - public void close() - { - } } diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/labelscan/LabelScanDocumentProcessor.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/labelscan/LabelScanDocumentProcessor.java index 306878e5e9f80..a780563bc6a7c 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/labelscan/LabelScanDocumentProcessor.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/labelscan/LabelScanDocumentProcessor.java @@ -24,7 +24,7 @@ import org.neo4j.consistency.store.synthetic.LabelScanDocument; import org.neo4j.kernel.api.direct.NodeLabelRange; -public class LabelScanDocumentProcessor implements RecordProcessor +public class LabelScanDocumentProcessor extends RecordProcessor.Adapter { private final ConsistencyReporter reporter; private final LabelScanCheck labelScanCheck; @@ -40,9 +40,4 @@ public void process( NodeLabelRange nodeLabelRange ) { reporter.forNodeLabelScan( new LabelScanDocument( nodeLabelRange ), labelScanCheck ); } - - @Override - public void close() - { - } } diff --git a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/ExecutionOrderIntegrationTest.java b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/ExecutionOrderIntegrationTest.java index e299f22d6347e..8e8e5006ae0a0 100644 --- a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/ExecutionOrderIntegrationTest.java +++ b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/ExecutionOrderIntegrationTest.java @@ -217,6 +217,11 @@ private static class LogDecorator implements CheckDecorator this.log = log; } + @Override + public void prepare() + { + } + OwningRecordCheck logging( RecordCheck checker ) { diff --git a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordCheckWorkerTest.java b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordCheckWorkerTest.java index 89d73a4ed81fa..7175ce7c3d83f 100644 --- a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordCheckWorkerTest.java +++ b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordCheckWorkerTest.java @@ -34,34 +34,33 @@ public class RecordCheckWorkerTest { @Test - public void shouldDoInitialProcessingInOrder() throws Throwable + public void shouldDoProcessingInitializationInOrder() throws Throwable { // GIVEN final Race race = new Race(); final AtomicInteger coordination = new AtomicInteger( -1 ); final AtomicInteger expected = new AtomicInteger(); final int threads = 30; + @SuppressWarnings( "unchecked" ) final RecordCheckWorker[] workers = new RecordCheckWorker[threads]; - for ( int i = 0; i < threads; i++ ) + final RecordProcessor processor = new RecordProcessor.Adapter() { - final int id = i; - ArrayBlockingQueue queue = new ArrayBlockingQueue<>( 10 ); - queue.offer( i ); - race.addContestant( workers[i] = new RecordCheckWorker( i, coordination, queue ) + @Override + public void process( Integer record ) { - private boolean initialized; + // We're testing init() here, not really process() + } - @Override - protected void process( Integer record ) - { - if ( !initialized ) - { - // THEN - assertEquals( id, expected.getAndAdd( 1 ) ); - initialized = true; - } - } - } ); + @Override + public void init( int id ) + { + assertEquals( id, expected.getAndAdd( 1 ) ); + } + }; + for ( int id = 0; id < threads; id++ ) + { + ArrayBlockingQueue queue = new ArrayBlockingQueue<>( 10 ); + race.addContestant( workers[id] = new RecordCheckWorker<>( id, coordination, queue, processor ) ); } race.addContestant( new Runnable() { diff --git a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordDistributorTest.java b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordDistributorTest.java index c11bac98bf79b..d129990050ea7 100644 --- a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordDistributorTest.java +++ b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordDistributorTest.java @@ -21,69 +21,21 @@ import org.junit.Test; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicInteger; - import org.neo4j.consistency.checking.full.QueueDistribution.QueueDistributor; import org.neo4j.consistency.checking.full.QueueDistribution.RelationshipNodesQueueDistributor; -import org.neo4j.consistency.checking.full.QueueDistribution.RoundRobinQueueDistributor; import org.neo4j.consistency.checking.full.RecordDistributor.RecordConsumer; import org.neo4j.kernel.impl.store.record.RelationshipRecord; -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static java.util.Arrays.asList; - -import static org.neo4j.helpers.progress.ProgressListener.NONE; - public class RecordDistributorTest { - /** - * This test will not deterministically trigger the race which the fix inside {@link RecordDistributor} - * fixes, but very often. On the other hand the test is fast and will not report false failures either. - * Over time, as many builds are running this test, correctness will be asserted. - */ - @Test - public void shouldProcessFirstAndLastRecordFirstAndLast() throws Exception - { - // GIVEN - final Collection records = asList( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ); - final int count = records.size(); - final AtomicInteger calls = new AtomicInteger(); - RecordProcessor processor = new RecordProcessor() - { - @Override - public void process( Integer record ) - { - int call = calls.getAndIncrement(); - if ( record == 0 || record == count - 1 ) - { - assertEquals( record.intValue(), call ); - } - } - - @Override - public void close() - { - } - }; - - // WHEN - RecordDistributor.distributeRecords( count, getClass().getSimpleName(), 100, records, NONE, processor, - new RoundRobinQueueDistributor( count ) ); - - // THEN - assertEquals( count, calls.get() ); - } - @Test public void shouldDistributeRelationshipRecordsByNodeId() throws Exception { // GIVEN - int numberOfThreads = 5; QueueDistributor distributor = new RelationshipNodesQueueDistributor( 5 ); RecordConsumer consumer = mock( RecordConsumer.class );