Fixes CC issue where some relationship inconsistencies may be overlooked
The initial design had a flaw: a mismatch between two parts of the code. Relationship
record processing expected the records to be divided up by their node ids, not by their
record ids. The RecordDistributor didn't do this, but the RelationshipRecordCheck code
had that expectation.

The effect was that relationships would randomly go unchecked, depending on how many
threads the machine running the check had.
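For illustration, a minimal, self-contained sketch of the node-id based routing that the relationship passes rely on (the class and method names here are made up for the example and are not part of this commit): each checking thread owns a contiguous range of node ids in the cache, so a relationship has to be offered to the queue(s) owning its start and end node ids rather than the queue that happens to own its own record id.

    // Illustrative only: mirrors the partitioning assumed by the relationship checks.
    class NodeIdRouting
    {
        // Thread/queue i owns node ids in [i * recordsPerCpu, (i + 1) * recordsPerCpu).
        static int queueIndexFor( long nodeId, long recordsPerCpu )
        {
            return (int) (nodeId / recordsPerCpu);
        }

        // A relationship must reach every queue whose node range it touches; routing it
        // by its own record id can send it to a thread that cached neither of its nodes,
        // so the corresponding checks are silently skipped.
        static void distribute( long firstNode, long secondNode, long recordsPerCpu,
                java.util.function.IntConsumer acceptOnQueue )
        {
            int q1 = queueIndexFor( firstNode, recordsPerCpu );
            int q2 = queueIndexFor( secondNode, recordsPerCpu );
            acceptOnQueue.accept( q1 );
            if ( q2 != q1 )
            {
                acceptOnQueue.accept( q2 );
            }
        }
    }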
tinwelint committed Feb 29, 2016
1 parent 266f09d commit 41d346d
Showing 17 changed files with 582 additions and 60 deletions.
@@ -47,12 +47,10 @@ protected Client initialValue( int id )
private final PackedMultiFieldCache cache;
private long recordsPerCPU;
private final Counts counts;
private final int threads;

public DefaultCacheAccess( Counts counts, int threads )
{
this.counts = counts;
this.threads = threads;
this.propertiesProcessed = new Collection[threads];
this.cache = new PackedMultiFieldCache( 1, 63 );
}
@@ -88,10 +86,10 @@ public boolean isForward()
}

@Override
public void prepareForProcessingOfSingleStore( long storeHighId )
public void prepareForProcessingOfSingleStore( long recordsPerCpu )
{
clients.resetId();
recordsPerCPU = (storeHighId / threads) + 1;
this.recordsPerCPU = recordsPerCpu;
}

private class DefaultClient implements Client
@@ -191,5 +189,11 @@ public void incAndGetCount( Type type )
{
counts.incAndGet( type, threadIndex );
}

@Override
public String toString()
{
return "Client[" + threadIndex + ", records/CPU:" + recordsPerCPU + "]";
}
}
}
@@ -54,6 +54,7 @@
import static org.neo4j.consistency.checking.full.MultiPassStore.RELATIONSHIPS;
import static org.neo4j.consistency.checking.full.MultiPassStore.RELATIONSHIP_GROUPS;
import static org.neo4j.consistency.checking.full.MultiPassStore.STRINGS;
import static org.neo4j.consistency.checking.full.QueueDistribution.ROUND_ROBIN;

public class ConsistencyCheckTasks
{
@@ -95,20 +96,20 @@ public List<ConsistencyCheckerTask> createTasksForFullCheck( boolean checkLabelS
StoreProcessor processor =
multiPass.processor( CheckStage.Stage1_NS_PropsLabels, PROPERTIES );
tasks.add( create( CheckStage.Stage1_NS_PropsLabels.name(), nativeStores.getNodeStore(),
processor ) );
processor, ROUND_ROBIN ) );
//RelationshipStore pass - check label counts using cached labels, check properties, skip nodes and relationships
processor = multiPass.processor( CheckStage.Stage2_RS_Labels, LABELS );
multiPass.reDecorateRelationship( processor, RelationshipRecordCheck.relationshipRecordCheckForwardPass() );
tasks.add( create( CheckStage.Stage2_RS_Labels.name(), nativeStores.getRelationshipStore(),
processor ) );
processor, ROUND_ROBIN ) );
//NodeStore pass - just cache nextRel and inUse
tasks.add( new CacheTask.CacheNextRel( CheckStage.Stage3_NS_NextRel, cacheAccess, Scanner.scan( nativeStores.getNodeStore() ) ) );
//RelationshipStore pass - check nodes inUse, FirstInFirst, FirstInSecond using cached info
processor = multiPass.processor( CheckStage.Stage4_RS_NextRel, NODES );
multiPass.reDecorateRelationship( processor, RelationshipRecordCheck.relationshipRecordCheckBackwardPass(
new PropertyChain<>( mandatoryProperties.forRelationships( reporter ) ) ) );
tasks.add( create( CheckStage.Stage4_RS_NextRel.name(), nativeStores.getRelationshipStore(),
processor ) );
processor, ROUND_ROBIN ) );
//NodeStore pass - just cache nextRel and inUse
multiPass.reDecorateNode( processor, NodeRecordCheck.toCheckNextRel(), true );
multiPass.reDecorateNode( processor, NodeRecordCheck.toCheckNextRelationshipGroup(), false );
@@ -119,68 +120,71 @@ public List<ConsistencyCheckerTask> createTasksForFullCheck( boolean checkLabelS
multiPass.reDecorateRelationship( processor,
RelationshipRecordCheck.relationshipRecordCheckSourceChain() );
tasks.add( create( CheckStage.Stage6_RS_Forward.name(), nativeStores.getRelationshipStore(),
processor ) );
processor, QueueDistribution.RELATIONSHIPS ) );
//RelationshipStore pass - reverse scan of source chain using the cache.
processor = multiPass.processor( CheckStage.Stage7_RS_Backward, RELATIONSHIPS );
multiPass.reDecorateRelationship( processor,
RelationshipRecordCheck.relationshipRecordCheckSourceChain() );
tasks.add( create( CheckStage.Stage7_RS_Backward.name(), nativeStores.getRelationshipStore(),
processor ) );
processor, QueueDistribution.RELATIONSHIPS ) );

//relationshipGroup
StoreProcessor relGrpProcessor = multiPass.processor( Stage.PARALLEL_FORWARD, RELATIONSHIP_GROUPS );
tasks.add( create( "RelationshipGroupStore-RelGrp", nativeStores.getRelationshipGroupStore(),
relGrpProcessor ) );
relGrpProcessor, ROUND_ROBIN ) );

PropertyReader propertyReader = new PropertyReader( nativeStores );
tasks.add( recordScanner( CheckStage.Stage8_PS_Props.name(),
new IterableStore<>( nativeStores.getNodeStore(), true ),
new PropertyAndNode2LabelIndexProcessor( reporter, (checkIndexes ? indexes : null ),
propertyReader, cacheAccess, mandatoryProperties.forNodes( reporter ) ),
CheckStage.Stage8_PS_Props, new IterableStore<>( nativeStores.getPropertyStore(), true ) ) );
CheckStage.Stage8_PS_Props, ROUND_ROBIN,
new IterableStore<>( nativeStores.getPropertyStore(), true ) ) );

tasks.add( create( "StringStore-Str", nativeStores.getStringStore(),
multiPass.processor( Stage.SEQUENTIAL_FORWARD, STRINGS ) ) );
multiPass.processor( Stage.SEQUENTIAL_FORWARD, STRINGS ), ROUND_ROBIN ) );
tasks.add( create( "ArrayStore-Arrays", nativeStores.getArrayStore(),
multiPass.processor( Stage.SEQUENTIAL_FORWARD, ARRAYS ) ) );
multiPass.processor( Stage.SEQUENTIAL_FORWARD, ARRAYS ), ROUND_ROBIN ) );
}
// The schema store is verified in multiple passes that share state since it fits into memory
// and we care about the consistency of back references (cf. SemanticCheck)
// PASS 1: Dynamic record chains
tasks.add( create( "SchemaStore", nativeStores.getSchemaStore() ) );
tasks.add( create( "SchemaStore", nativeStores.getSchemaStore(), ROUND_ROBIN ) );
// PASS 2: Rule integrity and obligation build up
final SchemaRecordCheck schemaCheck =
new SchemaRecordCheck( new SchemaStorage( nativeStores.getSchemaStore() ) );
tasks.add( new SchemaStoreProcessorTask<>( "SchemaStoreProcessor-check_rules", statistics, numberOfThreads,
nativeStores.getSchemaStore(), nativeStores, "check_rules",
schemaCheck, progress, cacheAccess, defaultProcessor ) );
schemaCheck, progress, cacheAccess, defaultProcessor, ROUND_ROBIN ) );
// PASS 3: Obligation verification and semantic rule uniqueness
tasks.add( new SchemaStoreProcessorTask<>( "SchemaStoreProcessor-check_obligations", statistics,
numberOfThreads, nativeStores.getSchemaStore(), nativeStores,
"check_obligations", schemaCheck.forObligationChecking(), progress, cacheAccess, defaultProcessor ) );
"check_obligations", schemaCheck.forObligationChecking(), progress, cacheAccess, defaultProcessor,
ROUND_ROBIN ) );
if ( checkGraph )
{
tasks.add( create( "RelationshipTypeTokenStore", nativeStores.getRelationshipTypeTokenStore() ) );
tasks.add( create( "PropertyKeyTokenStore", nativeStores.getPropertyKeyTokenStore() ) );
tasks.add( create( "LabelTokenStore", nativeStores.getLabelTokenStore() ) );
tasks.add( create( "RelationshipTypeNameStore", nativeStores.getRelationshipTypeNameStore() ) );
tasks.add( create( "PropertyKeyNameStore", nativeStores.getPropertyKeyNameStore() ) );
tasks.add( create( "LabelNameStore", nativeStores.getLabelNameStore() ) );
tasks.add( create( "NodeDynamicLabelStore", nativeStores.getNodeDynamicLabelStore() ) );
tasks.add( create( "RelationshipTypeTokenStore", nativeStores.getRelationshipTypeTokenStore(), ROUND_ROBIN ) );
tasks.add( create( "PropertyKeyTokenStore", nativeStores.getPropertyKeyTokenStore(), ROUND_ROBIN ) );
tasks.add( create( "LabelTokenStore", nativeStores.getLabelTokenStore(), ROUND_ROBIN ) );
tasks.add( create( "RelationshipTypeNameStore", nativeStores.getRelationshipTypeNameStore(), ROUND_ROBIN ) );
tasks.add( create( "PropertyKeyNameStore", nativeStores.getPropertyKeyNameStore(), ROUND_ROBIN ) );
tasks.add( create( "LabelNameStore", nativeStores.getLabelNameStore(), ROUND_ROBIN ) );
tasks.add( create( "NodeDynamicLabelStore", nativeStores.getNodeDynamicLabelStore(), ROUND_ROBIN ) );
}
if ( checkLabelScanStore )
{
tasks.add( recordScanner( "NodeStoreToLabelScanStore",
new IterableStore<>( nativeStores.getNodeStore(), true ),
new NodeToLabelScanRecordProcessor( reporter, labelScanStore ),
CheckStage.Stage9_NS_LabelCounts ) );
CheckStage.Stage9_NS_LabelCounts, ROUND_ROBIN ) );
}
ConsistencyReporter filteredReporter = multiPass.reporter( NODES );
if ( checkLabelScanStore )
{
tasks.add( recordScanner( "LabelScanStore",
labelScanStore.newAllEntriesReader(), new LabelScanDocumentProcessor(
filteredReporter, new LabelScanCheck() ), Stage.SEQUENTIAL_FORWARD ) );
filteredReporter, new LabelScanCheck() ), Stage.SEQUENTIAL_FORWARD,
ROUND_ROBIN ) );
}
if ( checkIndexes )
{
@@ -189,34 +193,35 @@ public List<ConsistencyCheckerTask> createTasksForFullCheck( boolean checkLabelS
tasks.add( recordScanner( format( "Index_%d", indexRule.getId() ),
new IndexIterator( indexes.accessorFor( indexRule ) ),
new IndexEntryProcessor( filteredReporter, new IndexCheck( indexRule ) ),
Stage.SEQUENTIAL_FORWARD ) );
Stage.SEQUENTIAL_FORWARD, ROUND_ROBIN ) );
}
}
return tasks;
}

private <RECORD> RecordScanner<RECORD> recordScanner( String name,
BoundedIterable<RECORD> store, RecordProcessor<RECORD> processor, Stage stage,
QueueDistribution distribution,
@SuppressWarnings( "rawtypes" ) IterableStore... warmupStores )
{
return stage.isParallel()
? new ParallelRecordScanner<>( name, statistics, numberOfThreads, store, progress, processor,
cacheAccess, warmupStores )
cacheAccess, distribution, warmupStores )
: new SequentialRecordScanner<>( name, statistics, numberOfThreads, store, progress, processor,
warmupStores );
}

private <RECORD extends AbstractBaseRecord> StoreProcessorTask<RECORD> create( String name,
RecordStore<RECORD> input )
RecordStore<RECORD> input, QueueDistribution distribution )
{
return new StoreProcessorTask<>( name, statistics, numberOfThreads, input, nativeStores, name, progress,
cacheAccess, defaultProcessor );
cacheAccess, defaultProcessor, distribution );
}

private <RECORD extends AbstractBaseRecord> StoreProcessorTask<RECORD> create( String name,
RecordStore<RECORD> input, StoreProcessor processor )
RecordStore<RECORD> input, StoreProcessor processor, QueueDistribution distribution )
{
return new StoreProcessorTask<>( name, statistics, numberOfThreads, input, nativeStores, name, progress,
cacheAccess, processor );
cacheAccess, processor, distribution );
}
}
@@ -20,6 +20,7 @@
package org.neo4j.consistency.checking.full;

import org.neo4j.consistency.checking.cache.CacheAccess;
import org.neo4j.consistency.checking.full.QueueDistribution.QueueDistributor;
import org.neo4j.consistency.statistics.Statistics;
import org.neo4j.helpers.progress.ProgressMonitorFactory.MultiPartBuilder;
import org.neo4j.kernel.api.direct.BoundedIterable;
@@ -30,22 +31,26 @@
public class ParallelRecordScanner<RECORD> extends RecordScanner<RECORD>
{
private final CacheAccess cacheAccess;
private final QueueDistribution distribution;

public ParallelRecordScanner( String name, Statistics statistics, int threads, BoundedIterable<RECORD> store,
MultiPartBuilder builder, RecordProcessor<RECORD> processor, CacheAccess cacheAccess,
QueueDistribution distribution,
IterableStore... warmUpStores )
{
super( name, statistics, threads, store, builder, processor, warmUpStores );
this.cacheAccess = cacheAccess;
this.distribution = distribution;
}

@Override
protected void scan()
{
long recordsPerCPU = (store.maxCount() / numberOfThreads) + 1;
long recordsPerCPU = RecordDistributor.calculateRecodsPerCpu( store.maxCount(), numberOfThreads );
cacheAccess.prepareForProcessingOfSingleStore( recordsPerCPU );

QueueDistributor<RECORD> distributor = distribution.distributor( recordsPerCPU, numberOfThreads );
distributeRecords( numberOfThreads, getClass().getSimpleName() + "-" + name,
DEFAULT_QUEUE_SIZE, store, progress, processor );
DEFAULT_QUEUE_SIZE, store, progress, processor, distributor );
}
}
@@ -0,0 +1,113 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.consistency.checking.full;

import org.neo4j.consistency.checking.full.RecordDistributor.RecordConsumer;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;

/**
* Factory for creating {@link QueueDistribution}. Typically the distribution type is decided higher up
* in the call stack and the actual {@link QueueDistributor} is instantiated when more data is available
* deeper down in the call stack.
*/
public interface QueueDistribution
{
<RECORD> QueueDistributor<RECORD> distributor( long recordsPerCpu, int numberOfThreads );

/**
* Distributes records into {@link RecordConsumer}.
*/
public interface QueueDistributor<RECORD>
{
void distribute( RECORD record, RecordConsumer<RECORD> consumer ) throws InterruptedException;
}

/**
* Distributes records round-robin style to all queues.
*/
public static final QueueDistribution ROUND_ROBIN = new QueueDistribution()
{
@Override
public <RECORD> QueueDistributor<RECORD> distributor( long recordsPerCpu, int numberOfThreads )
{
return new RoundRobinQueueDistributor<>( numberOfThreads );
}
};

/**
* Distributes {@link RelationshipRecord} depending on the start/end node ids.
*/
public static final QueueDistribution RELATIONSHIPS = new QueueDistribution()
{
@Override
public QueueDistributor<RelationshipRecord> distributor( long recordsPerCpu, int numberOfThreads )
{
return new RelationshipNodesQueueDistributor( recordsPerCpu );
}
};

static class RoundRobinQueueDistributor<RECORD> implements QueueDistributor<RECORD>
{
private final int numberOfThreads;
private int nextQIndex;

public RoundRobinQueueDistributor( int numberOfThreads )
{
this.numberOfThreads = numberOfThreads;
}

@Override
public void distribute( RECORD record, RecordConsumer<RECORD> consumer ) throws InterruptedException
{
nextQIndex = (nextQIndex + 1) % numberOfThreads;
consumer.accept( record, nextQIndex );
}
}

static class RelationshipNodesQueueDistributor implements QueueDistributor<RelationshipRecord>
{
private final long recordsPerCpu;

public RelationshipNodesQueueDistributor( long recordsPerCpu )
{
this.recordsPerCpu = recordsPerCpu;
}

@Override
public void distribute( RelationshipRecord relationship, RecordConsumer<RelationshipRecord> consumer )
throws InterruptedException
{
int qIndex1 = (int) (relationship.getFirstNode() / recordsPerCpu);
int qIndex2 = (int) (relationship.getSecondNode() / recordsPerCpu);
try
{
consumer.accept( relationship, qIndex1 );
if ( qIndex1 != qIndex2 )
{
consumer.accept( relationship, qIndex2 );
}
}
catch ( ArrayIndexOutOfBoundsException e )
{
throw e;
}
}
}
}
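
A usage sketch of the new API (illustrative only; it assumes this snippet sits alongside the classes above, that RecordConsumer is a single-method callback with the accept( record, qIndex ) shape the distributors call, and that RelationshipRecord exposes setFirstNode/setSecondNode setters). The relationship passes now pick QueueDistribution.RELATIONSHIPS, whose distributor routes each RelationshipRecord to the queue(s) covering its node ids, while every other pass keeps ROUND_ROBIN.

    // Illustrative usage of the RELATIONSHIPS distribution; in the checker this wiring
    // lives in ParallelRecordScanner and RecordDistributor.
    class QueueDistributionUsageSketch
    {
        public static void main( String[] args ) throws InterruptedException
        {
            int threads = 4;
            long recordsPerCpu = 1_000;
            QueueDistribution.QueueDistributor<RelationshipRecord> distributor =
                    QueueDistribution.RELATIONSHIPS.distributor( recordsPerCpu, threads );

            RelationshipRecord relationship = new RelationshipRecord( 10 );
            relationship.setFirstNode( 500 );    // node id range owned by queue 0
            relationship.setSecondNode( 2_500 ); // node id range owned by queue 2

            // Offered to both owning queues, so each thread's node cache gets to verify it.
            distributor.distribute( relationship,
                    ( record, qIndex ) -> System.out.println( record + " -> queue " + qIndex ) );
        }
    }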
