Fixes a count/progress issue after restarting from linking state
by storing the node count in the distribution file too
tinwelint committed Dec 4, 2017
1 parent 2858e69 commit da96688
Showing 10 changed files with 56 additions and 40 deletions.
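In short: the number of imported nodes is now recorded on the RelationshipTypeDistribution (the object persisted to the distribution file between import states), so an import resumed from the linking state can size its NodeRelationshipCache and compute progress without the node stage having run in the same session. A minimal sketch of the resulting flow, with the surrounding wiring simplified (nodeCount and sortedTypeCounts stand in for the values the real code takes from the store-update monitor and the per-type tallies):

    // While checking relationship types (the node stage has already completed):
    RelationshipTypeDistribution distribution =
            new RelationshipTypeDistribution( nodeCount, sortedTypeCounts );

    // Later, possibly in a fresh session resumed from the linking state:
    nodeRelationshipCache.setNodeCount( distribution.getNodeCount() );
    long relationshipCount = distribution.getRelationshipCount(); // drives progress reporting
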
@@ -190,6 +190,7 @@ public <T> T getState( Class<T> type )
public <T> void putState( T state )
{
accessibleState.put( state.getClass(), state );
dependencies.satisfyDependency( state );
}

/**
@@ -262,7 +263,7 @@ public void calculateNodeDegrees()
{
Configuration relationshipConfig =
configWithRecordsPerPageBasedBatchSize( config, neoStore.getRelationshipStore() );
nodeRelationshipCache.setHighNodeId( neoStore.getNodeStore().getHighId() );
nodeRelationshipCache.setNodeCount( getState( RelationshipTypeDistribution.class ).getNodeCount() );
NodeDegreeCountStage nodeDegreeStage = new NodeDegreeCountStage( relationshipConfig,
neoStore.getRelationshipStore(), nodeRelationshipCache, memoryUsageStats );
executeStage( nodeDegreeStage );
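
Two things happen in the hunks above: putState() now also registers the state object with the dependency resolver, and calculateNodeDegrees() sizes the relationship cache from the node count carried by the stored distribution instead of the node store's high id. The former is what allows the execution monitor further down in this commit to look the distribution up by type, roughly:

    // e.g. in HumanUnderstandableExecutionMonitor, once the state has been put:
    RelationshipTypeDistribution distribution =
            dependencyResolver.resolveDependency( RelationshipTypeDistribution.class );
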
@@ -23,7 +23,6 @@

import org.neo4j.kernel.impl.store.InlineNodeLabels;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.record.DynamicRecord;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
@@ -64,7 +64,7 @@ public class RelationshipStage extends Stage
public RelationshipStage( Configuration config, IoMonitor writeMonitor,
InputIterable<InputRelationship> relationships, IdMapper idMapper,
Collector badCollector, InputCache inputCache,
BatchingNeoStores neoStore, EntityStoreUpdaterStep.Monitor storeUpdateMonitor ) throws IOException
BatchingNeoStores neoStore, CountingStoreUpdateMonitor storeUpdateMonitor ) throws IOException
{
super( NAME, null, config, ORDER_SEND_DOWNSTREAM );
add( new InputIteratorBatcherStep<>( control(), config, relationships.iterator(),
@@ -76,7 +76,8 @@ public RelationshipStage( Configuration config, IoMonitor writeMonitor,

RelationshipStore relationshipStore = neoStore.getRelationshipStore();
PropertyStore propertyStore = neoStore.getPropertyStore();
add( typer = new RelationshipTypeCheckerStep( control(), config, neoStore.getRelationshipTypeRepository() ) );
add( typer = new RelationshipTypeCheckerStep( control(), config, neoStore.getRelationshipTypeRepository(),
storeUpdateMonitor.nodesWritten() ) );
add( new AssignRelationshipIdBatchStep( control(), config, 0 ) );
add( new RelationshipPreparationStep( control(), config, idMapper ) );
add( new RelationshipRecordPreparationStep( control(), config,
@@ -53,13 +53,15 @@ public class RelationshipTypeCheckerStep extends ProcessorStep<Batch<InputRelati
( e1, e2 ) -> Integer.compare( (Integer) e2.getKey(), (Integer) e1.getKey() );
private final Map<Thread,Map<Object,MutableLong>> typeCheckers = new ConcurrentHashMap<>();
private final BatchingRelationshipTypeTokenRepository typeTokenRepository;
private final long nodeCount;
private RelationshipTypeDistribution distribution;

public RelationshipTypeCheckerStep( StageControl control, Configuration config,
BatchingRelationshipTypeTokenRepository typeTokenRepository )
BatchingRelationshipTypeTokenRepository typeTokenRepository, long nodeCount )
{
super( control, "TYPE", config, 0 );
this.typeTokenRepository = typeTokenRepository;
this.nodeCount = nodeCount;
}

@Override
@@ -102,7 +104,7 @@ protected void done()
{
typeTokenRepository.getOrCreateId( sortedTypes[i].getKey() );
}
distribution = new RelationshipTypeDistribution( convert( sortedTypes ) );
distribution = new RelationshipTypeDistribution( nodeCount, convert( sortedTypes ) );
super.done();
}

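
Read together with the RelationshipStage hunk further up: the stage now takes the broader CountingStoreUpdateMonitor so it can pass the number of nodes already written into the type checker, and the checker stamps that count onto the distribution when it finishes. Condensed from the hunks above:

    // RelationshipStage constructor:
    add( typer = new RelationshipTypeCheckerStep( control(), config,
            neoStore.getRelationshipTypeRepository(), storeUpdateMonitor.nodesWritten() ) );

    // RelationshipTypeCheckerStep.done():
    distribution = new RelationshipTypeDistribution( nodeCount, convert( sortedTypes ) );
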
@@ -33,9 +33,11 @@ public class RelationshipTypeDistribution implements Iterable<Pair<Object,Long>>
{
// keys can be either String or Integer
private final Pair<Object,Long>[] sortedTypes;
private final long nodeCount;

public RelationshipTypeDistribution( Pair<Object,Long>[] sortedTypes )
public RelationshipTypeDistribution( long nodeCount, Pair<Object,Long>[] sortedTypes )
{
this.nodeCount = nodeCount;
this.sortedTypes = sortedTypes;
}

@@ -64,4 +66,19 @@ public Set<Object> types( int startingFromType, int upToType )
}
return types;
}

public long getNodeCount()
{
return nodeCount;
}

public long getRelationshipCount()
{
long sum = 0;
for ( Pair<Object,Long> type : sortedTypes )
{
sum += type.other();
}
return sum;
}
}
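
The distribution-file persistence itself is not part of this excerpt, but per the commit message the node count now goes into that file alongside the per-type counts. Purely as an illustration of the idea (plain java.io streams; the class below is hypothetical, not the repository's actual storage code):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.neo4j.helpers.collection.Pair;

    class DistributionWriterSketch
    {
        // Hypothetical layout: node count first, then the per-type tallies.
        void write( DataOutputStream out, RelationshipTypeDistribution distribution ) throws IOException
        {
            out.writeLong( distribution.getNodeCount() ); // new in this commit
            for ( Pair<Object,Long> type : distribution )
            {
                // ... write the type key and its count, as before
            }
        }
    }

A matching reader would restore that long before the type entries, which is what getNodeCount() then exposes to callers such as calculateNodeDegrees().
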
@@ -225,7 +225,7 @@ private static boolean isBigCount( int storedCount )
*
* @param nodeId high node id in the store, e.g. the highest node id + 1
*/
public void setHighNodeId( long nodeId )
public void setNodeCount( long nodeId )
{
this.highNodeId = nodeId;
this.array = arrayFactory.newByteArray( highNodeId, minusOneBytes( ID_AND_COUNT_SIZE ) );
@@ -29,6 +29,7 @@
import org.neo4j.unsafe.impl.batchimport.NodeStage;
import org.neo4j.unsafe.impl.batchimport.RelationshipGroupStage;
import org.neo4j.unsafe.impl.batchimport.RelationshipStage;
import org.neo4j.unsafe.impl.batchimport.RelationshipTypeDistribution;
import org.neo4j.unsafe.impl.batchimport.ScanAndCacheGroupsStage;
import org.neo4j.unsafe.impl.batchimport.SparseNodeFirstRelationshipStage;
import org.neo4j.unsafe.impl.batchimport.cache.GatheringMemoryStatsVisitor;
@@ -70,8 +71,6 @@ public class HumanUnderstandableExecutionMonitor implements ExecutionMonitor
// assigned later on
private final PrintStream out;
private DependencyResolver dependencyResolver;
private long actualNodeCount;
private long actualRelationshipCount;

// progress of current stage
private long goal;
@@ -164,7 +163,7 @@ else if ( execution.getStageName().equals( NodeDegreeCountStage.NAME ) )
// - backward linking
// - node relationship linking
// - forward linking
initializeLinking();
initializeLinking( dependencyResolver.resolveDependency( RelationshipTypeDistribution.class ) );
}
else if ( execution.getStageName().equals( CountGroupsStage.NAME ) )
{
@@ -173,7 +172,9 @@ else if ( execution.getStageName().equals( CountGroupsStage.NAME ) )
// Misc:
// - relationship group defragmentation
// - counts store
initializeMisc( dependencyResolver.resolveDependency( BatchingNeoStores.class ) );
initializeMisc(
dependencyResolver.resolveDependency( BatchingNeoStores.class ),
dependencyResolver.resolveDependency( RelationshipTypeDistribution.class ) );
}
else if ( includeStage( execution ) )
{
@@ -227,20 +228,23 @@ ESTIMATED_REQUIRED_MEMORY_USAGE, bytes(
initializeProgress( numberOfRelationships );
}

private void initializeLinking()
private void initializeLinking( RelationshipTypeDistribution distribution )
{
printStageHeader( "(3/4) Relationship linking" );
long actualRelationshipCount = distribution.getRelationshipCount();
initializeProgress(
actualRelationshipCount + // node degrees
actualRelationshipCount * 2 + // start/end forwards, see RelationshipLinkingProgress
actualRelationshipCount * 2 // start/end backwards, see RelationshipLinkingProgress
);
}

private void initializeMisc( BatchingNeoStores stores )
private void initializeMisc( BatchingNeoStores stores, RelationshipTypeDistribution distribution )
{
printStageHeader( "(4/4) Post processing" );
// written groups + node counts + relationship counts
long actualNodeCount = distribution.getNodeCount();
long actualRelationshipCount = distribution.getRelationshipCount();
long groupCount = stores.getTemporaryRelationshipGroupStore().getHighId();
initializeProgress(
groupCount + // Count groups
@@ -352,14 +356,6 @@ private void printStageHeader( String name, Object... data )
@Override
public void end( StageExecution execution, long totalTimeMillis )
{
if ( execution.getStageName().equals( NodeStage.NAME ) )
{
actualNodeCount = progressOf( execution );
}
else if ( execution.getStageName().equals( RelationshipStage.NAME ) )
{
actualRelationshipCount = progressOf( execution );
}
}

@Override
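
With both totals recoverable from the distribution, the monitor no longer caches actualNodeCount/actualRelationshipCount across stage executions, and end() has nothing left to track for those stages. The linking-phase progress goal is computed as before, just from the stored relationship count; spelled out:

    long relationshipCount = distribution.getRelationshipCount();
    long goal = relationshipCount        // node degree counting
              + relationshipCount * 2    // start/end forward linking
              + relationshipCount * 2;   // start/end backward linking, i.e. 5x in total
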
@@ -48,7 +48,7 @@ public void shouldSplitUpRelationshipTypesInBatches() throws Exception
int numberOfNodes = 100;
int numberOfTypes = 10;
NodeRelationshipCache cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, denseNodeThreshold );
cache.setHighNodeId( numberOfNodes + 1 );
cache.setNodeCount( numberOfNodes + 1 );
Direction[] directions = Direction.values();
for ( int i = 0; i < numberOfNodes; i++ )
{
@@ -65,7 +65,7 @@
numberOfRelationships += count;
}
types.sort( ( t1, t2 ) -> Long.compare( t2.other(), t1.other() ) );
RelationshipTypeDistribution typeDistribution = new RelationshipTypeDistribution( types.stream().toArray( Pair[]::new ) );
RelationshipTypeDistribution typeDistribution = new RelationshipTypeDistribution( 0, types.stream().toArray( Pair[]::new ) );

// WHEN enough memory for all types
{
@@ -66,7 +66,7 @@ private void shouldReturnRelationshipTypeIdsInReverseOrderOfTokenCreation( boole
// GIVEN
BatchingRelationshipTypeTokenRepository repository = mock( BatchingRelationshipTypeTokenRepository.class );
RelationshipTypeCheckerStep step =
new RelationshipTypeCheckerStep( mock( StageControl.class ), DEFAULT, repository );
new RelationshipTypeCheckerStep( mock( StageControl.class ), DEFAULT, repository, 0 );

// WHEN
Batch<InputRelationship,RelationshipRecord> relationships =
@@ -89,7 +89,7 @@ public void shouldReturnRelationshipTypesInDescendingOrder() throws Throwable
// GIVEN
BatchingRelationshipTypeTokenRepository repository = mock( BatchingRelationshipTypeTokenRepository.class );
RelationshipTypeCheckerStep step = new RelationshipTypeCheckerStep( mock( StageControl.class ), DEFAULT,
repository );
repository, 0 );
Batch<InputRelationship,RelationshipRecord> relationships =
batchOfRelationshipsWithRandomTypes( 10, true/*use the raw ids*/ );
step.process( relationships, mock( BatchSender.class ) );
@@ -84,7 +84,7 @@ public void shouldReportCorrectNumberOfDenseNodes() throws Exception
{
// GIVEN
cache = new NodeRelationshipCache( NumberArrayFactory.AUTO_WITHOUT_PAGECACHE, 5, 100, base );
cache.setHighNodeId( 26 );
cache.setNodeCount( 26 );
increment( cache, 2, 10 );
increment( cache, 5, 2 );
increment( cache, 7, 12 );
@@ -108,7 +108,7 @@ public void shouldGoThroughThePhases() throws Exception
// GIVEN
int nodeCount = 10;
cache = new NodeRelationshipCache( NumberArrayFactory.OFF_HEAP, 20, 100, base );
cache.setHighNodeId( nodeCount );
cache.setNodeCount( nodeCount );
incrementRandomCounts( cache, nodeCount, nodeCount * 20 );

// Test sparse node semantics
@@ -135,7 +135,7 @@ public void shouldObserveFirstRelationshipAsEmptyInEachDirection() throws Except
Direction[] directions = Direction.values();
GroupVisitor groupVisitor = mock( GroupVisitor.class );
cache.setForwardScan( true, true );
cache.setHighNodeId( nodes + 1 );
cache.setNodeCount( nodes + 1 );
for ( int i = 0; i < nodes; i++ )
{
assertEquals( -1L, cache.getFirstRel( nodes, groupVisitor ) );
@@ -169,7 +169,7 @@ public void shouldResetCountAfterGetOnDenseNodes() throws Exception
cache = new NodeRelationshipCache( NumberArrayFactory.AUTO_WITHOUT_PAGECACHE, 1, 100, base );
long nodeId = 0;
int typeId = 3;
cache.setHighNodeId( 1 );
cache.setNodeCount( 1 );
cache.incrementCount( nodeId );
cache.incrementCount( nodeId );
cache.getAndPutRelationship( nodeId, typeId, OUTGOING, 10, true );
@@ -193,7 +193,7 @@ public void shouldGetAndPutRelationshipAroundChunkEdge() throws Exception
// WHEN
long nodeId = 1_000_000 - 1;
int typeId = 10;
cache.setHighNodeId( nodeId + 1 );
cache.setNodeCount( nodeId + 1 );
Direction direction = Direction.OUTGOING;
long relId = 10;
cache.getAndPutRelationship( nodeId, typeId, direction, relId, false );
@@ -212,7 +212,7 @@ public void shouldPutRandomStuff() throws Exception
cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 1000, base );

// mark random nodes as dense (dense node threshold is 1 so enough with one increment
cache.setHighNodeId( nodes );
cache.setNodeCount( nodes );
for ( long nodeId = 0; nodeId < nodes; nodeId++ )
{
if ( random.nextBoolean() )
@@ -249,7 +249,7 @@ public void shouldPut6ByteRelationshipIds() throws Exception
long denseNode = 1;
long relationshipId = (1L << 48) - 2;
int typeId = 10;
cache.setHighNodeId( 2 );
cache.setNodeCount( 2 );
cache.incrementCount( denseNode );

// WHEN
@@ -267,7 +267,7 @@ public void shouldFailFastIfTooBigRelationshipId() throws Exception
// GIVEN
int typeId = 10;
cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base );
cache.setHighNodeId( 1 );
cache.setNodeCount( 1 );

// WHEN
cache.getAndPutRelationship( 0, typeId, OUTGOING, (1L << 48) - 2, false );
@@ -290,7 +290,7 @@ public void shouldVisitChangedNodes() throws Exception
int nodes = 10;
int typeId = 10;
cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 2, 100, base );
cache.setHighNodeId( nodes );
cache.setNodeCount( nodes );
for ( long nodeId = 0; nodeId < nodes; nodeId++ )
{
cache.incrementCount( nodeId );
@@ -342,7 +342,7 @@ public void shouldFailFastOnTooHighCountOnNode() throws Exception
long nodeId = 5;
long count = NodeRelationshipCache.MAX_COUNT - 1;
int typeId = 10;
cache.setHighNodeId( 10 );
cache.setNodeCount( 10 );
cache.setCount( nodeId, count, typeId, OUTGOING );

// WHEN
@@ -365,7 +365,7 @@ public void shouldKeepNextGroupIdForNextRound() throws Exception
cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base );
long nodeId = 0;
int typeId = 10;
cache.setHighNodeId( nodeId + 1 );
cache.setNodeCount( nodeId + 1 );
cache.incrementCount( nodeId );
GroupVisitor groupVisitor = mock( GroupVisitor.class );
when( groupVisitor.visit( anyLong(), anyInt(), anyLong(), anyLong(), anyLong() ) ).thenReturn( 1L, 2L, 3L );
@@ -425,7 +425,7 @@ public void shouldHaveDenseNodesWithBigCounts() throws Exception
cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base );
long nodeId = 1;
int typeId = 10;
cache.setHighNodeId( nodeId + 1 );
cache.setNodeCount( nodeId + 1 );
cache.setCount( nodeId, 2, typeId, OUTGOING ); // surely dense now
cache.getAndPutRelationship( nodeId, typeId, OUTGOING, 1, true );
cache.getAndPutRelationship( nodeId, typeId, INCOMING, 2, true );
@@ -448,7 +448,7 @@ public void shouldCacheMultipleDenseNodeRelationshipHeads() throws Exception
{
// GIVEN
cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1 );
cache.setHighNodeId( 10 );
cache.setNodeCount( 10 );
long nodeId = 3;
cache.setCount( nodeId, 10, /*these do not matter ==>*/ 0, OUTGOING );

@@ -486,7 +486,7 @@ public void shouldHaveSparseNodesWithBigCounts() throws Exception
cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base );
long nodeId = 1;
int typeId = 10;
cache.setHighNodeId( nodeId + 1 );
cache.setNodeCount( nodeId + 1 );

// WHEN
long highCount = NodeRelationshipCache.MAX_COUNT - 100;
