
Static cache arrays during import for better performance
Caches are now instantiated later, at a point where their maximum size is
known, so statically sized arrays can be used. These are faster than
dynamically growing arrays, making the importer faster as a whole.

The creation of these arrays still falls back on splitting between on/off
heap if the data won't fit completely in either, so that case will still
perform the same. The best case will be better.
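
In other words: a dynamically growing array pays for a capacity check, and occasionally a reallocation and copy, on every write, while a statically sized array allocated once the element count is known can index directly. A minimal sketch of the difference (hypothetical classes for illustration only; the importer's real NumberArray implementations are more involved):

// Hypothetical illustration, not the importer's actual code.
class DynamicLongArray
{
    private long[] data = new long[16];

    void set( int index, long value )
    {
        if ( index >= data.length )
        {
            // capacity check plus occasional reallocation and copy on writes
            data = java.util.Arrays.copyOf( data, Math.max( data.length * 2, index + 1 ) );
        }
        data[index] = value;
    }
}

class StaticLongArray
{
    private final long[] data;

    StaticLongArray( int size )
    {
        // size is known up front, e.g. the final node count
        this.data = new long[size];
    }

    void set( int index, long value )
    {
        // direct indexing, no checks or copying
        data[index] = value;
    }
}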
tinwelint committed Aug 28, 2016
1 parent 8d0d00f commit 7338276
Showing 6 changed files with 57 additions and 41 deletions.
@@ -39,15 +39,13 @@
public class CalculateDenseNodesStage extends Stage
{
private RelationshipTypeCheckerStep typer;
- private final NodeRelationshipCache cache;

public CalculateDenseNodesStage( Configuration config, InputIterable<InputRelationship> relationships,
NodeRelationshipCache cache, IdMapper idMapper,
Collector badCollector, InputCache inputCache,
BatchingNeoStores neoStores ) throws IOException
{
super( "Calculate dense nodes", config );
- this.cache = cache;
add( new InputIteratorBatcherStep<>( control(), config,
relationships.iterator(), InputRelationship.class ) );
if ( !relationships.supportsMultiplePasses() )
@@ -25,6 +25,7 @@
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.PropertyStore;
import org.neo4j.kernel.impl.store.record.NodeRecord;
+ import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.InputCache;
@@ -43,20 +44,25 @@
*/
public class NodeStage extends Stage
{
+ private final NodeRelationshipCache cache;
+ private final NodeStore nodeStore;
+
public NodeStage( Configuration config, IoMonitor writeMonitor,
InputIterable<InputNode> nodes, IdMapper idMapper, IdGenerator idGenerator,
BatchingNeoStores neoStore, InputCache inputCache, LabelScanStore labelScanStore,
EntityStoreUpdaterStep.Monitor storeUpdateMonitor,
+ NodeRelationshipCache cache,
StatsProvider memoryUsage ) throws IOException
{
super( "Nodes", config, ORDER_SEND_DOWNSTREAM );
+ this.cache = cache;
add( new InputIteratorBatcherStep<>( control(), config, nodes.iterator(), InputNode.class ) );
if ( !nodes.supportsMultiplePasses() )
{
add( new InputEntityCacherStep<>( control(), config, inputCache.cacheNodes( MAIN ) ) );
}

- NodeStore nodeStore = neoStore.getNodeStore();
+ nodeStore = neoStore.getNodeStore();
PropertyStore propertyStore = neoStore.getPropertyStore();
add( new PropertyEncoderStep<>( control(), config, neoStore.getPropertyKeyRepository(), propertyStore ) );
add( new NodeEncoderStep( control(), config, idMapper, idGenerator,
@@ -65,4 +71,13 @@ public NodeStage( Configuration config, IoMonitor writeMonitor,
add( new EntityStoreUpdaterStep<>( control(), config, nodeStore, propertyStore,
writeMonitor, storeUpdateMonitor ) );
}

+ @Override
+ public void close()
+ {
+ // At this point we know how many nodes we have, so we tell the cache,
+ // instead of having the cache keep track of that in the face of concurrent updates.
+ cache.setHighNodeId( nodeStore.getHighId() );
+ super.close();
+ }
}
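
This close() override is the new sizing handshake: once the node stage has completed, nodeStore.getHighId() is the exact node count, and handing it to the cache lets the cache allocate its array statically. Roughly, in call order (a sketch; the surrounding driver code and the denseNodeThreshold variable are hypothetical):

// Hypothetical driver code illustrating the order of operations.
NodeRelationshipCache cache = new NodeRelationshipCache( NumberArrayFactory.AUTO, denseNodeThreshold );
// ... node stage runs; the cache's backing array is not allocated yet ...
cache.setHighNodeId( nodeStore.getHighId() ); // allocates a fixed-size array
cache.incrementCount( someNodeId );           // writes are legal only from here on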
@@ -156,35 +156,23 @@ public void doImport( Input input ) throws IOException
// Stage 1 -- nodes, properties, labels
NodeStage nodeStage = new NodeStage( config, writeMonitor,
nodes, idMapper, idGenerator, neoStore, inputCache, neoStore.getLabelScanStore(),
- storeUpdateMonitor, memoryUsageStats );
-
- // Stage 2 -- calculate dense node threshold
- CalculateDenseNodesStage calculateDenseNodesStage = new CalculateDenseNodesStage( config,
- relationships, nodeRelationshipCache, idMapper, badCollector, inputCache, neoStore );
-
- // Execute stages 1 and 2 in parallel or sequentially?
+ storeUpdateMonitor, nodeRelationshipCache, memoryUsageStats );
+ executeStages( nodeStage );
if ( idMapper.needsPreparation() )
- { // The id mapper of choice needs preparation in order to get ids from it,
- // So we need to execute the node stage first as it fills the id mapper and prepares it in the end,
- // before executing any stage that needs ids from the id mapper, for example calc dense node stage.
- executeStages( nodeStage );
+ {
executeStages( new IdMapperPreparationStage( config, idMapper, cachedNodes,
badCollector, memoryUsageStats ) );
PrimitiveLongIterator duplicateNodeIds = badCollector.leftOverDuplicateNodesIds();
if ( duplicateNodeIds.hasNext() )
{
executeStages( new DeleteDuplicateNodesStage( config, duplicateNodeIds, neoStore ) );
}
- executeStages( calculateDenseNodesStage );
}
- else
- { // The id mapper of choice doesn't need any preparation, so we can go ahead and execute
- // the node and calc dense node stages in parallel.
- executeStages( nodeStage, calculateDenseNodesStage );
- }
- // At this point we know how many nodes we have, so we tell the cache that instead of having the
- // cache keeping track of that in a the face of concurrent updates.
- nodeRelationshipCache.setHighNodeId( neoStore.getNodeStore().getHighId() );

+ // Stage 2 -- calculate dense node threshold
+ CalculateDenseNodesStage calculateDenseNodesStage = new CalculateDenseNodesStage( config,
+ relationships, nodeRelationshipCache, idMapper, badCollector, inputCache, neoStore );
+ executeStages( calculateDenseNodesStage );
+
importRelationships( nodeRelationshipCache, storeUpdateMonitor, neoStore, writeMonitor,
idMapper, cachedRelationships, inputCache,
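
The net effect of the rearrangement above is that the node stage always completes, and thereby sizes the cache via its close() method, before any stage that writes to the cache is executed. Schematically (stage names as in the diff; simplified):

// Simplified ordering after this commit:
executeStages( nodeStage );                    // 1. import nodes; close() sizes the cache
if ( idMapper.needsPreparation() )
{
    executeStages( idMapperPreparationStage ); // 2. optional id mapper preparation
    // ... plus deletion of duplicate nodes, if any were collected ...
}
executeStages( calculateDenseNodesStage );     // 3. may now write into the cache

The previous parallel execution of the node and dense-node stages, used when the id mapper needed no preparation, is given up in exchange for the exactly sized cache.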
@@ -68,16 +68,17 @@ public class NodeRelationshipCache implements MemoryStatsVisitor.Visitable
private static final int NODE_CHANGED_MASKS = DENSE_NODE_CHANGED_MASK | SPARSE_NODE_CHANGED_MASK;
private static final int COUNT_MASK = ~NODE_CHANGED_MASKS;

- private final ByteArray array;
+ private ByteArray array;
private byte[] chunkChangedArray;
private final int denseNodeThreshold;
private final RelGroupCache relGroupCache;
- private long highId;
+ private long highNodeId;
// This cache participates in scans backwards and forwards, marking entities as changed in the process.
// When going forward (forward==true) changes are marked with a set bit, and with a cleared bit when
// going backwards. This way the change bits never have to be cleared between scans (see the sketch below).
private volatile boolean forward = true;
private final int chunkSize;
+ private final NumberArrayFactory arrayFactory;

public NodeRelationshipCache( NumberArrayFactory arrayFactory, int denseNodeThreshold )
{
@@ -86,8 +87,8 @@ public NodeRelationshipCache( NumberArrayFactory arrayFactory, int denseNodeThreshold )

NodeRelationshipCache( NumberArrayFactory arrayFactory, int denseNodeThreshold, int chunkSize, long base )
{
+ this.arrayFactory = arrayFactory;
this.chunkSize = chunkSize;
- this.array = arrayFactory.newDynamicByteArray( chunkSize, minusOneBytes( ID_AND_COUNT_SIZE ) );
this.denseNodeThreshold = denseNodeThreshold;
this.relGroupCache = new RelGroupCache( arrayFactory, chunkSize, base );
}
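
The direction trick described in the field comment above, sketched in isolation (a simplified single bit per node; the real code combines this with separate dense and sparse masks):

// Simplified illustration of direction-dependent change bits.
static long markChanged( long bits, long mask, boolean forward )
{
    return forward ? bits | mask : bits & ~mask; // set going forward, clear going backward
}

static boolean isChanged( long bits, long mask, boolean forward )
{
    // A forward scan leaves all changed bits set, so the following backward
    // scan reuses them: a cleared bit now means "changed". No clearing pass
    // is needed in between.
    return ( ( bits & mask ) != 0 ) == forward;
}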
@@ -134,13 +135,14 @@ private static void assertValidCount( long nodeId, int count )
*/
public void setHighNodeId( long nodeId )
{
- this.highId = nodeId;
+ this.highNodeId = nodeId;
+ this.array = arrayFactory.newByteArray( highNodeId, minusOneBytes( ID_AND_COUNT_SIZE ) );
this.chunkChangedArray = new byte[chunkOf( nodeId ) + 1];
}

public long getHighNodeId()
{
- return this.highId;
+ return this.highNodeId;
}

private static int getCount( ByteArray array, long index, int offset )
@@ -410,7 +412,7 @@ private static class RelGroupCache implements AutoCloseable, MemoryStatsVisitor.Visitable
{
this.base = base;
assert chunkSize > 0;
- this.array = arrayFactory.newDynamicByteArray( chunkSize,
+ this.array = arrayFactory.newDynamicByteArray( 1_000_000,
minusOneBytes( ID_SIZE/*next*/ + (ID_AND_COUNT_SIZE) * Direction.values().length ) );
this.nextFreeId = new AtomicLong( base );
}
@@ -524,7 +526,7 @@ public void close()
@Override
public void acceptMemoryStatsVisitor( MemoryStatsVisitor visitor )
{
- array.acceptMemoryStatsVisitor( visitor );
+ nullSafeMemoryStatsVisitor( array, visitor );
}
}

@@ -543,10 +545,18 @@ public void close()
@Override
public void acceptMemoryStatsVisitor( MemoryStatsVisitor visitor )
{
- array.acceptMemoryStatsVisitor( visitor );
+ nullSafeMemoryStatsVisitor( array, visitor );
relGroupCache.acceptMemoryStatsVisitor( visitor );
}

+ static void nullSafeMemoryStatsVisitor( MemoryStatsVisitor.Visitable visitable, MemoryStatsVisitor visitor )
+ {
+ if ( visitable != null )
+ {
+ visitable.acceptMemoryStatsVisitor( visitor );
+ }
+ }

private static int changeMask( boolean dense )
{
return dense ? DENSE_NODE_CHANGED_MASK : SPARSE_NODE_CHANGED_MASK;
@@ -570,7 +580,7 @@ public void visitChangedNodes( NodeChangeVisitor visitor, boolean denseNodes )
{
long mask = changeMask( denseNodes );
byte chunkMask = chunkChangeMask( denseNodes );
- for ( long nodeId = 0; nodeId < highId; )
+ for ( long nodeId = 0; nodeId < highNodeId; )
{
if ( !chunkHasChange( nodeId, chunkMask ) )
{
@@ -579,7 +589,7 @@
}

ByteArray chunk = array.at( nodeId );
- for ( int i = 0; i < chunkSize && nodeId < highId; i++, nodeId++ )
+ for ( int i = 0; i < chunkSize && nodeId < highNodeId; i++, nodeId++ )
{
if ( isDense( chunk, nodeId ) == denseNodes && nodeIsChanged( chunk, nodeId, mask ) )
{
@@ -52,7 +52,11 @@ public void close()
{
if ( !closed )
{
- UnsafeUtil.free( address );
+ if ( length > 0 )
+ {
+ // Allocating 0 bytes actually returns address 0
+ UnsafeUtil.free( address );
+ }
closed = true;
}
}
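
The guard is needed because a zero-length off-heap array never allocates native memory; its address is simply 0, and that pseudo-address must not be passed to free(). Sketch of the allocation side (a hypothetical helper; the exact name of UnsafeUtil's allocation call is an assumption here):

// Hypothetical sketch of why the guard above is necessary.
static long allocate( long bytes )
{
    // A zero-length array needs no native memory; 0 doubles as its address.
    return bytes > 0 ? UnsafeUtil.allocateMemory( bytes ) : 0;
}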
@@ -79,13 +79,13 @@ public void shouldReportCorrectNumberOfDenseNodes() throws Exception
{
// GIVEN
cache = new NodeRelationshipCache( NumberArrayFactory.AUTO, 5, 100, base );
+ cache.setHighNodeId( 26 );
increment( cache, 2, 10 );
increment( cache, 5, 2 );
increment( cache, 7, 12 );
increment( cache, 23, 4 );
increment( cache, 24, 5 );
increment( cache, 25, 6 );
- cache.setHighNodeId( 25 );

// THEN
assertFalse( cache.isDense( 0 ) );
@@ -129,7 +129,7 @@ public void shouldObserveFirstRelationshipAsEmptyInEachDirection() throws Exception
Direction[] directions = Direction.values();
GroupVisitor groupVisitor = mock( GroupVisitor.class );
cache.setForwardScan( true );
- cache.setHighNodeId( nodes );
+ cache.setHighNodeId( nodes+1 );
for ( int i = 0; i < nodes; i++ )
{
assertEquals( -1L, cache.getFirstRel( nodes, groupVisitor ) );
@@ -185,7 +185,7 @@ public void shouldGetAndPutRelationshipAroundChunkEdge() throws Exception

// WHEN
long nodeId = 1_000_000 - 1;
- cache.setHighNodeId( nodeId );
+ cache.setHighNodeId( nodeId+1 );
Direction direction = Direction.OUTGOING;
long relId = 10;
cache.getAndPutRelationship( nodeId, direction, relId, false );
@@ -203,14 +203,14 @@ public void shouldPutRandomStuff() throws Exception
cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 1000, base );

// mark random nodes as dense (dense node threshold is 1, so one increment is enough)
+ cache.setHighNodeId( nodes );
for ( long nodeId = 0; nodeId < nodes; nodeId++ )
{
if ( random.nextBoolean() )
{
cache.incrementCount( nodeId );
}
}
- cache.setHighNodeId( nodes );

// WHEN
for ( int i = 0; i < 100_000; i++ )
@@ -239,8 +239,8 @@ public void shouldPut6ByteRelationshipIds() throws Exception
long sparseNode = 0;
long denseNode = 1;
long relationshipId = (1L << 48) - 2;
+ cache.setHighNodeId( 2 );
cache.incrementCount( denseNode );
- cache.setHighNodeId( 1 );

// WHEN
assertEquals( -1L, cache.getAndPutRelationship( sparseNode, OUTGOING, relationshipId, false ) );
@@ -278,6 +278,7 @@ public void shouldVisitChangedNodes() throws Exception
// GIVEN
int nodes = 10;
cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 2, 100, base );
+ cache.setHighNodeId( nodes );
for ( long nodeId = 0; nodeId < nodes; nodeId++ )
{
cache.incrementCount( nodeId );
Expand All @@ -286,7 +287,6 @@ public void shouldVisitChangedNodes() throws Exception
cache.incrementCount( nodeId );
}
}
- cache.setHighNodeId( nodes );
PrimitiveLongSet keySparseChanged = Primitive.longSet( nodes );
PrimitiveLongSet keyDenseChanged = Primitive.longSet( nodes );
for ( int i = 0; i < nodes / 2; i++ )
Expand Down Expand Up @@ -329,6 +329,7 @@ public void shouldFailFastOnTooHighCountOnNode() throws Exception
cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 10, 100, base );
long nodeId = 5;
int count = NodeRelationshipCache.MAX_COUNT - 5;
+ cache.setHighNodeId( 10 );
cache.setCount( nodeId, count );

// WHEN
@@ -352,8 +353,8 @@ public void shouldKeepNextGroupIdForNextRound() throws Exception
// GIVEN
cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base );
long nodeId = 0;
- cache.incrementCount( nodeId );
cache.setHighNodeId( nodeId+1 );
+ cache.incrementCount( nodeId );
GroupVisitor groupVisitor = mock( GroupVisitor.class );
when( groupVisitor.visit( anyLong(), anyLong(), anyLong(), anyLong(), anyLong() ) ).thenReturn( 1L, 2L, 3L );

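
All of the test adjustments above follow one new contract: setHighNodeId() now allocates the backing array, so it must be called before any mutating call, and its argument is an exclusive upper bound on node ids. A minimal example of the new usage order (constructor arguments as used in the tests, with base = 0):

// setHighNodeId must precede any writes now that it allocates the array.
NodeRelationshipCache cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, 0 );
cache.setHighNodeId( 10 );   // allocate for node ids [0, 10) first ...
cache.incrementCount( 5 );   // ... then mutate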
