From 41d346d73518e61b94933ac226bbffbbae2f970e Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Wed, 24 Feb 2016 10:29:15 +0100 Subject: [PATCH] Fixes CC issue where some relationship inconsistencies may be overlooked The initial design had a flaw, or a mismatch from one part of the code to another where relationship record processing expected that the records were divided up by their node ids, not their own ids. The RecordDistributor didn't do this, but the RelationshipRecordCheck code had that expectation. This would have the effect that relationships would randomly not be checked, depending on how many threads the machine had which ran the check. --- .../checking/cache/DefaultCacheAccess.java | 12 +- .../checking/full/ConsistencyCheckTasks.java | 59 ++++--- .../checking/full/ParallelRecordScanner.java | 9 +- .../checking/full/QueueDistribution.java | 113 ++++++++++++ .../checking/full/RecordCheckWorker.java | 52 +++++- .../checking/full/RecordDistributor.java | 54 ++++-- .../full/SchemaStoreProcessorTask.java | 5 +- .../checking/full/StoreProcessor.java | 4 +- .../checking/full/StoreProcessorTask.java | 13 +- ...etectAllRelationshipInconsistenciesIT.java | 163 ++++++++++++++++++ .../full/FullCheckIntegrationTest.java | 1 - .../checking/full/RecordCheckWorkerTest.java | 84 +++++++++ .../checking/full/RecordDistributorTest.java | 45 ++++- .../checking/full/RecordScannerTest.java | 2 +- .../checking/full/StoreProcessorTaskTest.java | 3 +- .../impl/store/record/RelationshipRecord.java | 20 ++- .../src/test/java/org/neo4j/test/Randoms.java | 3 +- 17 files changed, 582 insertions(+), 60 deletions(-) create mode 100644 community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/QueueDistribution.java create mode 100644 community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/DetectAllRelationshipInconsistenciesIT.java create mode 100644 
community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordCheckWorkerTest.java diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/cache/DefaultCacheAccess.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/cache/DefaultCacheAccess.java index 06cac80c6058a..40dedbe0b6fa6 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/cache/DefaultCacheAccess.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/cache/DefaultCacheAccess.java @@ -47,12 +47,10 @@ protected Client initialValue( int id ) private final PackedMultiFieldCache cache; private long recordsPerCPU; private final Counts counts; - private final int threads; public DefaultCacheAccess( Counts counts, int threads ) { this.counts = counts; - this.threads = threads; this.propertiesProcessed = new Collection[threads]; this.cache = new PackedMultiFieldCache( 1, 63 ); } @@ -88,10 +86,10 @@ public boolean isForward() } @Override - public void prepareForProcessingOfSingleStore( long storeHighId ) + public void prepareForProcessingOfSingleStore( long recordsPerCpu ) { clients.resetId(); - recordsPerCPU = (storeHighId / threads) + 1; + this.recordsPerCPU = recordsPerCpu; } private class DefaultClient implements Client @@ -191,5 +189,11 @@ public void incAndGetCount( Type type ) { counts.incAndGet( type, threadIndex ); } + + @Override + public String toString() + { + return "Client[" + threadIndex + ", records/CPU:" + recordsPerCPU + "]"; + } } } diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/ConsistencyCheckTasks.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/ConsistencyCheckTasks.java index 3ffe0f4131348..05aa6babb101f 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/ConsistencyCheckTasks.java +++ 
b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/ConsistencyCheckTasks.java @@ -54,6 +54,7 @@ import static org.neo4j.consistency.checking.full.MultiPassStore.RELATIONSHIPS; import static org.neo4j.consistency.checking.full.MultiPassStore.RELATIONSHIP_GROUPS; import static org.neo4j.consistency.checking.full.MultiPassStore.STRINGS; +import static org.neo4j.consistency.checking.full.QueueDistribution.ROUND_ROBIN; public class ConsistencyCheckTasks { @@ -95,12 +96,12 @@ public List createTasksForFullCheck( boolean checkLabelS StoreProcessor processor = multiPass.processor( CheckStage.Stage1_NS_PropsLabels, PROPERTIES ); tasks.add( create( CheckStage.Stage1_NS_PropsLabels.name(), nativeStores.getNodeStore(), - processor ) ); + processor, ROUND_ROBIN ) ); //ReltionshipStore pass - check label counts using cached labels, check properties, skip nodes and relationships processor = multiPass.processor( CheckStage.Stage2_RS_Labels, LABELS ); multiPass.reDecorateRelationship( processor, RelationshipRecordCheck.relationshipRecordCheckForwardPass() ); tasks.add( create( CheckStage.Stage2_RS_Labels.name(), nativeStores.getRelationshipStore(), - processor ) ); + processor, ROUND_ROBIN ) ); //NodeStore pass - just cache nextRel and inUse tasks.add( new CacheTask.CacheNextRel( CheckStage.Stage3_NS_NextRel, cacheAccess, Scanner.scan( nativeStores.getNodeStore() ) ) ); //RelationshipStore pass - check nodes inUse, FirstInFirst, FirstInSecond using cached info @@ -108,7 +109,7 @@ public List createTasksForFullCheck( boolean checkLabelS multiPass.reDecorateRelationship( processor, RelationshipRecordCheck.relationshipRecordCheckBackwardPass( new PropertyChain<>( mandatoryProperties.forRelationships( reporter ) ) ) ); tasks.add( create( CheckStage.Stage4_RS_NextRel.name(), nativeStores.getRelationshipStore(), - processor ) ); + processor, ROUND_ROBIN ) ); //NodeStore pass - just cache nextRel and inUse multiPass.reDecorateNode( processor, 
NodeRecordCheck.toCheckNextRel(), true ); multiPass.reDecorateNode( processor, NodeRecordCheck.toCheckNextRelationshipGroup(), false ); @@ -119,68 +120,71 @@ public List createTasksForFullCheck( boolean checkLabelS multiPass.reDecorateRelationship( processor, RelationshipRecordCheck.relationshipRecordCheckSourceChain() ); tasks.add( create( CheckStage.Stage6_RS_Forward.name(), nativeStores.getRelationshipStore(), - processor ) ); + processor, QueueDistribution.RELATIONSHIPS ) ); //RelationshipStore pass - reverse scan of source chain using the cache. processor = multiPass.processor( CheckStage.Stage7_RS_Backward, RELATIONSHIPS ); multiPass.reDecorateRelationship( processor, RelationshipRecordCheck.relationshipRecordCheckSourceChain() ); tasks.add( create( CheckStage.Stage7_RS_Backward.name(), nativeStores.getRelationshipStore(), - processor ) ); + processor, QueueDistribution.RELATIONSHIPS ) ); //relationshipGroup StoreProcessor relGrpProcessor = multiPass.processor( Stage.PARALLEL_FORWARD, RELATIONSHIP_GROUPS ); tasks.add( create( "RelationshipGroupStore-RelGrp", nativeStores.getRelationshipGroupStore(), - relGrpProcessor ) ); + relGrpProcessor, ROUND_ROBIN ) ); PropertyReader propertyReader = new PropertyReader( nativeStores ); tasks.add( recordScanner( CheckStage.Stage8_PS_Props.name(), new IterableStore<>( nativeStores.getNodeStore(), true ), new PropertyAndNode2LabelIndexProcessor( reporter, (checkIndexes ? 
indexes : null ), propertyReader, cacheAccess, mandatoryProperties.forNodes( reporter ) ), - CheckStage.Stage8_PS_Props, new IterableStore<>( nativeStores.getPropertyStore(), true ) ) ); + CheckStage.Stage8_PS_Props, ROUND_ROBIN, + new IterableStore<>( nativeStores.getPropertyStore(), true ) ) ); tasks.add( create( "StringStore-Str", nativeStores.getStringStore(), - multiPass.processor( Stage.SEQUENTIAL_FORWARD, STRINGS ) ) ); + multiPass.processor( Stage.SEQUENTIAL_FORWARD, STRINGS ), ROUND_ROBIN ) ); tasks.add( create( "ArrayStore-Arrays", nativeStores.getArrayStore(), - multiPass.processor( Stage.SEQUENTIAL_FORWARD, ARRAYS ) ) ); + multiPass.processor( Stage.SEQUENTIAL_FORWARD, ARRAYS ), ROUND_ROBIN ) ); } // The schema store is verified in multiple passes that share state since it fits into memory // and we care about the consistency of back references (cf. SemanticCheck) // PASS 1: Dynamic record chains - tasks.add( create( "SchemaStore", nativeStores.getSchemaStore() ) ); + tasks.add( create( "SchemaStore", nativeStores.getSchemaStore(), ROUND_ROBIN ) ); // PASS 2: Rule integrity and obligation build up final SchemaRecordCheck schemaCheck = new SchemaRecordCheck( new SchemaStorage( nativeStores.getSchemaStore() ) ); tasks.add( new SchemaStoreProcessorTask<>( "SchemaStoreProcessor-check_rules", statistics, numberOfThreads, nativeStores.getSchemaStore(), nativeStores, "check_rules", - schemaCheck, progress, cacheAccess, defaultProcessor ) ); + schemaCheck, progress, cacheAccess, defaultProcessor, ROUND_ROBIN ) ); // PASS 3: Obligation verification and semantic rule uniqueness tasks.add( new SchemaStoreProcessorTask<>( "SchemaStoreProcessor-check_obligations", statistics, numberOfThreads, nativeStores.getSchemaStore(), nativeStores, - "check_obligations", schemaCheck.forObligationChecking(), progress, cacheAccess, defaultProcessor ) ); + "check_obligations", schemaCheck.forObligationChecking(), progress, cacheAccess, defaultProcessor, + ROUND_ROBIN ) ); if ( 
checkGraph ) { - tasks.add( create( "RelationshipTypeTokenStore", nativeStores.getRelationshipTypeTokenStore() ) ); - tasks.add( create( "PropertyKeyTokenStore", nativeStores.getPropertyKeyTokenStore() ) ); - tasks.add( create( "LabelTokenStore", nativeStores.getLabelTokenStore() ) ); - tasks.add( create( "RelationshipTypeNameStore", nativeStores.getRelationshipTypeNameStore() ) ); - tasks.add( create( "PropertyKeyNameStore", nativeStores.getPropertyKeyNameStore() ) ); - tasks.add( create( "LabelNameStore", nativeStores.getLabelNameStore() ) ); - tasks.add( create( "NodeDynamicLabelStore", nativeStores.getNodeDynamicLabelStore() ) ); + tasks.add( create( "RelationshipTypeTokenStore", nativeStores.getRelationshipTypeTokenStore(), ROUND_ROBIN ) ); + tasks.add( create( "PropertyKeyTokenStore", nativeStores.getPropertyKeyTokenStore(), ROUND_ROBIN ) ); + tasks.add( create( "LabelTokenStore", nativeStores.getLabelTokenStore(), ROUND_ROBIN ) ); + tasks.add( create( "RelationshipTypeNameStore", nativeStores.getRelationshipTypeNameStore(), ROUND_ROBIN ) ); + tasks.add( create( "PropertyKeyNameStore", nativeStores.getPropertyKeyNameStore(), ROUND_ROBIN ) ); + tasks.add( create( "LabelNameStore", nativeStores.getLabelNameStore(), ROUND_ROBIN ) ); + tasks.add( create( "NodeDynamicLabelStore", nativeStores.getNodeDynamicLabelStore(), ROUND_ROBIN ) ); } if ( checkLabelScanStore ) { tasks.add( recordScanner( "NodeStoreToLabelScanStore", new IterableStore<>( nativeStores.getNodeStore(), true ), new NodeToLabelScanRecordProcessor( reporter, labelScanStore ), - CheckStage.Stage9_NS_LabelCounts ) ); + CheckStage.Stage9_NS_LabelCounts, ROUND_ROBIN ) ); } ConsistencyReporter filteredReporter = multiPass.reporter( NODES ); if ( checkLabelScanStore ) { tasks.add( recordScanner( "LabelScanStore", labelScanStore.newAllEntriesReader(), new LabelScanDocumentProcessor( - filteredReporter, new LabelScanCheck() ), Stage.SEQUENTIAL_FORWARD ) ); + filteredReporter, new LabelScanCheck() ), 
Stage.SEQUENTIAL_FORWARD, + ROUND_ROBIN ) ); } if ( checkIndexes ) { @@ -189,7 +193,7 @@ public List createTasksForFullCheck( boolean checkLabelS tasks.add( recordScanner( format( "Index_%d", indexRule.getId() ), new IndexIterator( indexes.accessorFor( indexRule ) ), new IndexEntryProcessor( filteredReporter, new IndexCheck( indexRule ) ), - Stage.SEQUENTIAL_FORWARD ) ); + Stage.SEQUENTIAL_FORWARD, ROUND_ROBIN ) ); } } return tasks; @@ -197,26 +201,27 @@ public List createTasksForFullCheck( boolean checkLabelS private RecordScanner recordScanner( String name, BoundedIterable store, RecordProcessor processor, Stage stage, + QueueDistribution distribution, @SuppressWarnings( "rawtypes" ) IterableStore... warmupStores ) { return stage.isParallel() ? new ParallelRecordScanner<>( name, statistics, numberOfThreads, store, progress, processor, - cacheAccess, warmupStores ) + cacheAccess, distribution, warmupStores ) : new SequentialRecordScanner<>( name, statistics, numberOfThreads, store, progress, processor, warmupStores ); } private StoreProcessorTask create( String name, - RecordStore input ) + RecordStore input, QueueDistribution distribution ) { return new StoreProcessorTask<>( name, statistics, numberOfThreads, input, nativeStores, name, progress, - cacheAccess, defaultProcessor ); + cacheAccess, defaultProcessor, distribution ); } private StoreProcessorTask create( String name, - RecordStore input, StoreProcessor processor ) + RecordStore input, StoreProcessor processor, QueueDistribution distribution ) { return new StoreProcessorTask<>( name, statistics, numberOfThreads, input, nativeStores, name, progress, - cacheAccess, processor ); + cacheAccess, processor, distribution ); } } diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/ParallelRecordScanner.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/ParallelRecordScanner.java index 5a0d1c6654b86..6cbcb6c169161 100644 --- 
a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/ParallelRecordScanner.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/ParallelRecordScanner.java @@ -20,6 +20,7 @@ package org.neo4j.consistency.checking.full; import org.neo4j.consistency.checking.cache.CacheAccess; +import org.neo4j.consistency.checking.full.QueueDistribution.QueueDistributor; import org.neo4j.consistency.statistics.Statistics; import org.neo4j.helpers.progress.ProgressMonitorFactory.MultiPartBuilder; import org.neo4j.kernel.api.direct.BoundedIterable; @@ -30,22 +31,26 @@ public class ParallelRecordScanner extends RecordScanner { private final CacheAccess cacheAccess; + private final QueueDistribution distribution; public ParallelRecordScanner( String name, Statistics statistics, int threads, BoundedIterable store, MultiPartBuilder builder, RecordProcessor processor, CacheAccess cacheAccess, + QueueDistribution distribution, IterableStore... warmUpStores ) { super( name, statistics, threads, store, builder, processor, warmUpStores ); this.cacheAccess = cacheAccess; + this.distribution = distribution; } @Override protected void scan() { - long recordsPerCPU = (store.maxCount() / numberOfThreads) + 1; + long recordsPerCPU = RecordDistributor.calculateRecodsPerCpu( store.maxCount(), numberOfThreads ); cacheAccess.prepareForProcessingOfSingleStore( recordsPerCPU ); + QueueDistributor distributor = distribution.distributor( recordsPerCPU, numberOfThreads ); distributeRecords( numberOfThreads, getClass().getSimpleName() + "-" + name, - DEFAULT_QUEUE_SIZE, store, progress, processor ); + DEFAULT_QUEUE_SIZE, store, progress, processor, distributor ); } } diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/QueueDistribution.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/QueueDistribution.java new file mode 100644 index 0000000000000..ab3259aa1897f --- /dev/null 
+++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/QueueDistribution.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.consistency.checking.full; + +import org.neo4j.consistency.checking.full.RecordDistributor.RecordConsumer; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; + +/** + * Factory for creating {@link QueueDistribution}. Typically the distribution type is decided higher up + * in the call stack and the actual {@link QueueDistributor} is instantiated when more data is available + * deeper down in the call stack. + */ +public interface QueueDistribution +{ + QueueDistributor distributor( long recordsPerCpu, int numberOfThreads ); + + /** + * Distributes records into {@link RecordConsumer}. + */ + public interface QueueDistributor + { + void distribute( RECORD record, RecordConsumer consumer ) throws InterruptedException; + } + + /** + * Distributes records round-robin style to all queues. 
+ */ + public static final QueueDistribution ROUND_ROBIN = new QueueDistribution() + { + @Override + public QueueDistributor distributor( long recordsPerCpu, int numberOfThreads ) + { + return new RoundRobinQueueDistributor<>( numberOfThreads ); + } + }; + + /** + * Distributes {@link RelationshipRecord} depending on the start/end node ids. + */ + public static final QueueDistribution RELATIONSHIPS = new QueueDistribution() + { + @Override + public QueueDistributor distributor( long recordsPerCpu, int numberOfThreads ) + { + return new RelationshipNodesQueueDistributor( recordsPerCpu ); + } + }; + + static class RoundRobinQueueDistributor implements QueueDistributor + { + private final int numberOfThreads; + private int nextQIndex; + + public RoundRobinQueueDistributor( int numberOfThreads ) + { + this.numberOfThreads = numberOfThreads; + } + + @Override + public void distribute( RECORD record, RecordConsumer consumer ) throws InterruptedException + { + nextQIndex = (nextQIndex + 1) % numberOfThreads; + consumer.accept( record, nextQIndex ); + } + } + + static class RelationshipNodesQueueDistributor implements QueueDistributor + { + private final long recordsPerCpu; + + public RelationshipNodesQueueDistributor( long recordsPerCpu ) + { + this.recordsPerCpu = recordsPerCpu; + } + + @Override + public void distribute( RelationshipRecord relationship, RecordConsumer consumer ) + throws InterruptedException + { + int qIndex1 = (int) (relationship.getFirstNode() / recordsPerCpu); + int qIndex2 = (int) (relationship.getSecondNode() / recordsPerCpu); + try + { + consumer.accept( relationship, qIndex1 ); + if ( qIndex1 != qIndex2 ) + { + consumer.accept( relationship, qIndex2 ); + } + } + catch ( ArrayIndexOutOfBoundsException e ) + { + throw e; + } + } + } +} diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordCheckWorker.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordCheckWorker.java 
index 4eed0ad588294..5858719a0ea08 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordCheckWorker.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordCheckWorker.java @@ -21,6 +21,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * Base class for workers that processes records during consistency check. @@ -29,9 +30,14 @@ public abstract class RecordCheckWorker implements Runnable { private volatile boolean done; protected final BlockingQueue recordsQ; + private final int id; + private final AtomicInteger idQueue; + private boolean initialized; - public RecordCheckWorker( BlockingQueue recordsQ ) + public RecordCheckWorker( int id, AtomicInteger idQueue, BlockingQueue recordsQ ) { + this.id = id; + this.idQueue = idQueue; this.recordsQ = recordsQ; } @@ -51,7 +57,29 @@ public void run() record = recordsQ.poll( 10, TimeUnit.MILLISECONDS ); if ( record != null ) { + if ( !initialized ) + { + // We assign threads to ids, first come first serve and the the thread assignment happens + // inside the record processing which accesses CacheAccess#client() and that happens + // lazily. So... we need to coordinate so that the first process of a record occurs + // in order of thread id. This may change later so that the thread ids are assigned + // explicitly on creating the threads... which should be much better, although hard with + // the current design due to the state living inside ThreadLocal which makes it depend + // on the actual and correct thread making the call... which is what we do here. + awaitMyTurnToInitialize(); + } + process( record ); + + + if ( !initialized ) + { + // This was the first record, the first record processing has now happened and so we + // can notify the others that we have initialized this thread id and the next one + // can go ahead and do so. 
+ tellNextThreadToInitialize(); + initialized = true; + } } } catch ( InterruptedException e ) @@ -62,5 +90,27 @@ record = recordsQ.poll( 10, TimeUnit.MILLISECONDS ); } } + private void awaitMyTurnToInitialize() + { + while ( idQueue.get() < id-1 ) + { + try + { + Thread.sleep( 10 ); + } + catch ( InterruptedException e ) + { + Thread.interrupted(); + break; + } + } + } + + private void tellNextThreadToInitialize() + { + boolean set = idQueue.compareAndSet( id-1, id ); + assert set : "Something wrong with the design here"; + } + protected abstract void process( RECORD record ); } diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordDistributor.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordDistributor.java index 8331c565a5441..dd3ea01d8525c 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordDistributor.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/RecordDistributor.java @@ -22,7 +22,9 @@ import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import org.neo4j.consistency.checking.full.QueueDistribution.QueueDistributor; import org.neo4j.helpers.progress.ProgressListener; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.string.Workers; @@ -37,7 +39,8 @@ public static void distributeRecords( int queueSize, Iterable records, ProgressListener progress, - RecordProcessor processor ) + RecordProcessor processor, + QueueDistributor idDistributor ) { Iterator iterator = records.iterator(); @@ -55,24 +58,32 @@ public static void distributeRecords( return; } - ArrayBlockingQueue[] recordQ = new ArrayBlockingQueue[numberOfThreads]; - Workers> workers = new Workers<>( workerNames ); + final ArrayBlockingQueue[] recordQ = new ArrayBlockingQueue[numberOfThreads]; + final Workers> 
workers = new Workers<>( workerNames ); + final AtomicInteger idGroup = new AtomicInteger( -1 ); for ( int threadId = 0; threadId < numberOfThreads; threadId++ ) { recordQ[threadId] = new ArrayBlockingQueue<>( queueSize ); - workers.start( new Worker<>( recordQ[threadId], processor ) ); + workers.start( new Worker<>( threadId, idGroup, recordQ[threadId], processor ) ); } - int[] recsProcessed = new int[numberOfThreads]; - int qIndex = 0; + final int[] recsProcessed = new int[numberOfThreads]; + RecordConsumer recordConsumer = new RecordConsumer() + { + @Override + public void accept( RECORD record, int qIndex ) throws InterruptedException + { + recordQ[qIndex].put( record ); + recsProcessed[qIndex]++; + } + }; RECORD last = null; while ( iterator.hasNext() ) { try { - // Put records round-robin style into the queue of each thread, where a Worker - // will sit and pull from and process. + // Put records into the queues using the queue distributor. Each Worker will pull and process. RECORD record = iterator.next(); // Detect the last record and defer processing that until after all the others @@ -83,9 +94,7 @@ public static void distributeRecords( last = record; break; } - qIndex = (qIndex + 1)%numberOfThreads; - recordQ[qIndex].put( record ); - recsProcessed[qIndex]++; + idDistributor.distribute( record, recordConsumer ); } catch ( InterruptedException e ) { @@ -120,9 +129,9 @@ private static class Worker extends RecordCheckWorker { private final RecordProcessor processor; - Worker( BlockingQueue recordsQ, RecordProcessor processor ) + Worker( int id, AtomicInteger idGroup, BlockingQueue recordsQ, RecordProcessor processor ) { - super( recordsQ ); + super( id, idGroup, recordsQ ); this.processor = processor; } @@ -132,4 +141,23 @@ protected void process( RECORD record ) processor.process( record ); } } + + /** + * Consumers records from a {@link QueueDistribution}, feeding into correct queue. 
+ */ + interface RecordConsumer + { + void accept( RECORD record, int qIndex ) throws InterruptedException; + } + + public static long calculateRecodsPerCpu( long highId, int numberOfThreads ) + { + boolean hasRest = highId % numberOfThreads > 0; + long result = highId / numberOfThreads; + if ( hasRest ) + { + result++; + } + return result; + } } diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/SchemaStoreProcessorTask.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/SchemaStoreProcessorTask.java index ca1aa4d90450f..5d8d5487e94e8 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/SchemaStoreProcessorTask.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/SchemaStoreProcessorTask.java @@ -37,10 +37,11 @@ public SchemaStoreProcessorTask( String name, Statistics statistics, int threads SchemaRecordCheck schemaRecordCheck, ProgressMonitorFactory.MultiPartBuilder builder, CacheAccess cacheAccess, - StoreProcessor processor ) + StoreProcessor processor, + QueueDistribution distribution ) { super( name, statistics, threads, store, storeAccess, builderPrefix, - builder, cacheAccess, processor ); + builder, cacheAccess, processor, distribution ); this.schemaRecordCheck = schemaRecordCheck; } diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/StoreProcessor.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/StoreProcessor.java index e069f2e3f6aae..e0089db07858d 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/StoreProcessor.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/StoreProcessor.java @@ -25,6 +25,7 @@ import org.neo4j.consistency.checking.RecordCheck; import org.neo4j.consistency.checking.SchemaRecordCheck; import 
org.neo4j.consistency.checking.cache.CacheAccess; +import org.neo4j.consistency.checking.full.QueueDistribution.QueueDistributor; import org.neo4j.consistency.report.ConsistencyReport; import org.neo4j.consistency.report.ConsistencyReport.DynamicLabelConsistencyReport; import org.neo4j.consistency.report.ConsistencyReport.RelationshipGroupConsistencyReport; @@ -205,6 +206,7 @@ public void processSchema( RecordStore store, DynamicRecord schem public void applyFilteredParallel( final RecordStore store, final ProgressListener progressListener, int numberOfThreads, long recordsPerCpu, + final QueueDistributor distributor, Predicate... filters ) throws Exception { @@ -223,6 +225,6 @@ public void close() } }; distributeRecords( numberOfThreads, getClass().getSimpleName(), qSize, - scan( store, stage.isForward(), filters ), progressListener, processor ); + scan( store, stage.isForward(), filters ), progressListener, processor, distributor ); } } diff --git a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/StoreProcessorTask.java b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/StoreProcessorTask.java index 374ae02d2d5ce..b1f5efb81ebfb 100644 --- a/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/StoreProcessorTask.java +++ b/community/consistency-check/src/main/java/org/neo4j/consistency/checking/full/StoreProcessorTask.java @@ -20,6 +20,7 @@ package org.neo4j.consistency.checking.full; import org.neo4j.consistency.checking.cache.CacheAccess; +import org.neo4j.consistency.checking.full.QueueDistribution.QueueDistributor; import org.neo4j.consistency.statistics.Statistics; import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.progress.ProgressListener; @@ -37,16 +38,18 @@ public class StoreProcessorTask extends Consistenc private final ProgressListener progressListener; private final StoreAccess storeAccess; private final CacheAccess cacheAccess; + private final 
QueueDistribution distribution; StoreProcessorTask( String name, Statistics statistics, int threads, RecordStore store, StoreAccess storeAccess, String builderPrefix, ProgressMonitorFactory.MultiPartBuilder builder, CacheAccess cacheAccess, - StoreProcessor processor ) + StoreProcessor processor, QueueDistribution distribution ) { super( name, statistics, threads ); this.store = store; this.storeAccess = storeAccess; this.cacheAccess = cacheAccess; this.processor = processor; + this.distribution = distribution; this.progressListener = builder.progressForPart( name + indexedPartName( store.getStorageFileName().getName(), builderPrefix ), store.getHighId() ); } @@ -83,10 +86,12 @@ else if ( processor.getStage() == CheckStage.Stage8_PS_Props ) } else { - highId = storeAccess.getRelationshipStore().getHighId(); + highId = storeAccess.getNodeStore().getHighId(); } - long recordsPerCPU = (highId / numberOfThreads) + 1; - processor.applyFilteredParallel( store, progressListener, numberOfThreads, recordsPerCPU ); + long recordsPerCPU = RecordDistributor.calculateRecodsPerCpu( highId, numberOfThreads ); + QueueDistributor distributor = distribution.distributor( recordsPerCPU, numberOfThreads ); + processor.applyFilteredParallel( store, progressListener, numberOfThreads, recordsPerCPU, + distributor ); } else { diff --git a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/DetectAllRelationshipInconsistenciesIT.java b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/DetectAllRelationshipInconsistenciesIT.java new file mode 100644 index 0000000000000..69282da8d9edf --- /dev/null +++ b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/DetectAllRelationshipInconsistenciesIT.java @@ -0,0 +1,163 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. 
+ * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.consistency.checking.full; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; + +import org.neo4j.consistency.RecordType; +import org.neo4j.consistency.report.ConsistencySummaryStatistics; +import org.neo4j.consistency.statistics.Statistics; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.Transaction; +import org.neo4j.graphdb.factory.GraphDatabaseFactory; +import org.neo4j.graphdb.factory.GraphDatabaseSettings; +import org.neo4j.helpers.progress.ProgressMonitorFactory; +import org.neo4j.kernel.GraphDatabaseAPI; +import org.neo4j.kernel.api.direct.DirectStoreAccess; +import org.neo4j.kernel.api.index.SchemaIndexProvider; +import org.neo4j.kernel.api.labelscan.LabelScanStore; +import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.impl.MyRelTypes; +import org.neo4j.kernel.impl.store.NeoStores; +import org.neo4j.kernel.impl.store.RelationshipStore; +import org.neo4j.kernel.impl.store.StoreAccess; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; +import org.neo4j.logging.AssertableLogProvider; +import org.neo4j.test.RandomRule; +import org.neo4j.test.TargetDirectory; +import static org.junit.Assert.assertTrue; + +import static org.neo4j.helpers.collection.MapUtil.stringMap; + +public class 
DetectAllRelationshipInconsistenciesIT +{ + public final TargetDirectory.TestDirectory directory = TargetDirectory.testDirForTest( getClass() ); + public final RandomRule random = new RandomRule(); + @Rule + public final RuleChain rules = RuleChain.outerRule( random ).around( directory ); + + @Test + public void shouldDetectSabotagedRelationshipWhereEverItIs() throws Exception + { + // GIVEN a database which lots of relationships + GraphDatabaseAPI db = (GraphDatabaseAPI) + new GraphDatabaseFactory().newEmbeddedDatabase( directory.absolutePath() ); + Sabotage sabotage; + try + { + Node[] nodes = new Node[1_000]; + Relationship[] relationships = new Relationship[10_000]; + try ( Transaction tx = db.beginTx() ) + { + for ( int i = 0; i < nodes.length; i++ ) + { + nodes[i] = db.createNode(); + } + for ( int i = 0; i < 10_000; i++ ) + { + relationships[i] = + random.among( nodes ).createRelationshipTo( random.among( nodes ), MyRelTypes.TEST ); + } + tx.success(); + } + + // WHEN sabotaging a random relationship + NeoStores neoStores = db.getDependencyResolver().resolveDependency( NeoStores.class ); + RelationshipStore relationshipStore = neoStores.getRelationshipStore(); + Relationship sabotagedRelationships = random.among( relationships ); + sabotage = sabotage( relationshipStore, sabotagedRelationships.getId() ); + } + finally + { + db.shutdown(); + } + + // THEN the checker should find it, where ever it is in the store + db = (GraphDatabaseAPI) new GraphDatabaseFactory().newEmbeddedDatabase( directory.absolutePath() ); + try + { + StoreAccess storeAccess = new StoreAccess( db ).initialize(); + DirectStoreAccess directStoreAccess = new DirectStoreAccess( storeAccess, + db.getDependencyResolver().resolveDependency( LabelScanStore.class ), + db.getDependencyResolver().resolveDependency( SchemaIndexProvider.class ) ); + + int threads = random.intBetween( 2, 10 ); + FullCheck checker = + new FullCheck( new Config( stringMap( GraphDatabaseSettings.pagecache_memory.name(), 
"8m" ) ), + ProgressMonitorFactory.NONE, Statistics.NONE, + threads ); + AssertableLogProvider logProvider = new AssertableLogProvider( true ); + ConsistencySummaryStatistics summary = checker.execute( directStoreAccess, + logProvider.getLog( FullCheck.class ) ); + int relationshipInconsistencies = summary.getInconsistencyCountForRecordType( + RecordType.RELATIONSHIP ); + + assertTrue( "Couldn't detect sabotaged relationship " + sabotage, relationshipInconsistencies > 0 ); + logProvider.assertContainsLogCallContaining( sabotage.after.toString() ); + } + finally + { + db.shutdown(); + } + } + + private static class Sabotage + { + private final RelationshipRecord before; + private final RelationshipRecord after; + private final RelationshipRecord other; + + Sabotage( RelationshipRecord before, RelationshipRecord after, RelationshipRecord other ) + { + this.before = before; + this.after = after; + this.other = other; + } + + @Override + public String toString() + { + return "Sabotabed " + before + " --> " + after + ", other relationship " + other; + } + } + + private Sabotage sabotage( RelationshipStore relationshipStore, long id ) + { + RelationshipRecord before = relationshipStore.getRecord( id ); + RelationshipRecord after = before.clone(); + + long otherReference; + if ( !after.isFirstInFirstChain() ) + { + after.setFirstPrevRel( otherReference = after.getFirstPrevRel() + 1 ); + } + else + { + after.setFirstNextRel( otherReference = after.getFirstNextRel() + 1 ); + } + + relationshipStore.updateRecord( after ); + RelationshipRecord other = relationshipStore.forceGetRecord( otherReference ); + return new Sabotage( before, after, other ); + } +} diff --git a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/FullCheckIntegrationTest.java b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/FullCheckIntegrationTest.java index 4e905be1939b2..0f460127f1118 100644 --- 
a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/FullCheckIntegrationTest.java +++ b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/FullCheckIntegrationTest.java @@ -107,7 +107,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; - import static java.lang.String.format; import static java.util.Arrays.asList; import static java.util.Collections.singleton; diff --git a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordCheckWorkerTest.java b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordCheckWorkerTest.java new file mode 100644 index 0000000000000..89d73a4ed81fa --- /dev/null +++ b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordCheckWorkerTest.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */ +package org.neo4j.consistency.checking.full; + +import org.junit.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.neo4j.test.Race; + +import static org.junit.Assert.assertEquals; + +import static java.lang.System.currentTimeMillis; +import static java.util.concurrent.TimeUnit.SECONDS; + +public class RecordCheckWorkerTest +{ + @Test + public void shouldDoInitialProcessingInOrder() throws Throwable + { + // GIVEN + final Race race = new Race(); + final AtomicInteger coordination = new AtomicInteger( -1 ); + final AtomicInteger expected = new AtomicInteger(); + final int threads = 30; + final RecordCheckWorker[] workers = new RecordCheckWorker[threads]; + for ( int i = 0; i < threads; i++ ) + { + final int id = i; + ArrayBlockingQueue queue = new ArrayBlockingQueue<>( 10 ); + queue.offer( i ); + race.addContestant( workers[i] = new RecordCheckWorker( i, coordination, queue ) + { + private boolean initialized; + + @Override + protected void process( Integer record ) + { + if ( !initialized ) + { + // THEN + assertEquals( id, expected.getAndAdd( 1 ) ); + initialized = true; + } + } + } ); + } + race.addContestant( new Runnable() + { + @Override + public void run() + { + long end = currentTimeMillis() + SECONDS.toMillis( 10 ); + while ( currentTimeMillis() < end && expected.get() < threads ); + assertEquals( threads, expected.get() ); + for ( RecordCheckWorker worker : workers ) + { + worker.done(); + } + } + } ); + + // WHEN + race.go(); + } +} diff --git a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordDistributorTest.java b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordDistributorTest.java index 37afcc627b42d..c11bac98bf79b 100644 --- a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordDistributorTest.java +++ 
b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordDistributorTest.java @@ -24,7 +24,16 @@ import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; +import org.neo4j.consistency.checking.full.QueueDistribution.QueueDistributor; +import org.neo4j.consistency.checking.full.QueueDistribution.RelationshipNodesQueueDistributor; +import org.neo4j.consistency.checking.full.QueueDistribution.RoundRobinQueueDistributor; +import org.neo4j.consistency.checking.full.RecordDistributor.RecordConsumer; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; + import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static java.util.Arrays.asList; @@ -63,9 +72,43 @@ public void close() }; // WHEN - RecordDistributor.distributeRecords( count, getClass().getSimpleName(), 100, records, NONE, processor ); + RecordDistributor.distributeRecords( count, getClass().getSimpleName(), 100, records, NONE, processor, + new RoundRobinQueueDistributor( count ) ); // THEN assertEquals( count, calls.get() ); } + + @Test + public void shouldDistributeRelationshipRecordsByNodeId() throws Exception + { + // GIVEN + int numberOfThreads = 5; + QueueDistributor distributor = new RelationshipNodesQueueDistributor( 5 ); + RecordConsumer consumer = mock( RecordConsumer.class ); + + // WHEN/THEN + RelationshipRecord relationship = relationship( 0, 0, 1 ); + distributor.distribute( relationship, consumer ); + verify( consumer, times( 1 ) ).accept( relationship, 0 ); + + relationship = relationship( 1, 0, 7 ); + distributor.distribute( relationship, consumer ); + verify( consumer, times( 1 ) ).accept( relationship, 0 ); + verify( consumer, times( 1 ) ).accept( relationship, 1 ); + + relationship = relationship( 3, 26, 11 ); + distributor.distribute( relationship, consumer ); + verify( consumer, times( 1 ) ).accept( 
relationship, 5 ); + verify( consumer, times( 1 ) ).accept( relationship, 2 ); + } + + private RelationshipRecord relationship( long id, long startNodeId, long endNodeId ) + { + RelationshipRecord record = new RelationshipRecord( id ); + record.setInUse( true ); + record.setFirstNode( startNodeId ); + record.setSecondNode( endNodeId ); + return record; + } } diff --git a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordScannerTest.java b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordScannerTest.java index 88dafdf283be6..fcfaafe3ae64f 100644 --- a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordScannerTest.java +++ b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/RecordScannerTest.java @@ -81,7 +81,7 @@ public void shouldProcessRecordsParallelAndUpdateProgress() throws Exception RecordProcessor recordProcessor = mock( RecordProcessor.class ); RecordScanner scanner = new ParallelRecordScanner<>( "our test task", Statistics.NONE, 1, store, - progressBuilder, recordProcessor, CacheAccess.EMPTY ); + progressBuilder, recordProcessor, CacheAccess.EMPTY, QueueDistribution.ROUND_ROBIN ); // when scanner.run(); diff --git a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/StoreProcessorTaskTest.java b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/StoreProcessorTaskTest.java index 25d8877aa8fce..8ef6f76d4e584 100644 --- a/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/StoreProcessorTaskTest.java +++ b/community/consistency-check/src/test/java/org/neo4j/consistency/checking/full/StoreProcessorTaskTest.java @@ -52,7 +52,8 @@ public void singlePassShouldOnlyProcessTheStoreOnce() throws Exception StoreProcessorTask task = new StoreProcessorTask<>( "nodes", Statistics.NONE, 1, store, null, "nodes", ProgressMonitorFactory.NONE.multipleParts( "check" ), 
CacheAccess.EMPTY, - singlePassProcessor ); + singlePassProcessor, + QueueDistribution.ROUND_ROBIN ); // when task.run(); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/record/RelationshipRecord.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/record/RelationshipRecord.java index a8dd9865dae77..00239aac9113f 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/record/RelationshipRecord.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/record/RelationshipRecord.java @@ -182,4 +182,22 @@ public void setIdTo( PropertyRecord property ) { property.setRelId( getId() ); } -} \ No newline at end of file + + @Override + public RelationshipRecord clone() + { + RelationshipRecord record = new RelationshipRecord( getId() ); + record.setInUse( inUse() ); + record.setType( type ); + record.setFirstInFirstChain( firstInFirstChain ); + record.setFirstInSecondChain( firstInSecondChain ); + record.setFirstNextRel( firstNextRel ); + record.setFirstNode( firstNode ); + record.setFirstPrevRel( firstPrevRel ); + record.setNextProp( getNextProp() ); + record.setSecondNextRel( secondNextRel ); + record.setSecondNode( secondNode ); + record.setSecondPrevRel( secondPrevRel ); + return record; + } +} diff --git a/community/kernel/src/test/java/org/neo4j/test/Randoms.java b/community/kernel/src/test/java/org/neo4j/test/Randoms.java index 84dc136583101..80068bf132433 100644 --- a/community/kernel/src/test/java/org/neo4j/test/Randoms.java +++ b/community/kernel/src/test/java/org/neo4j/test/Randoms.java @@ -156,7 +156,8 @@ public char character( int characterSets ) @SuppressWarnings( "unchecked" ) public T[] selection( T[] among, int min, int max, boolean allowDuplicates ) { - int length = min + random.nextInt( max-min ); + assert min <= max; + int length = min + (max-min == 0 ? 
0 : random.nextInt( max-min ) ); T[] result = (T[]) Array.newInstance( among.getClass().getComponentType(), length ); for ( int i = 0; i < length; i++ ) {