From 6ac893833cd83f85c887d9ff1ad9ef9208b65466 Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Tue, 18 Oct 2016 22:58:35 +0200 Subject: [PATCH] Supports degrees up to 2^35 in importer without additional memory overhead. This is done via indirection in integer count field (which also contains other flags though) if count surpasses 2^29 into an array which has bigger slots. This indirection should be extremely rare and only kicks in for nodes with more than 2^29 number of relationships in a single import. --- .../cache/NodeRelationshipCache.java | 150 +++++++++++++++--- .../cache/NodeRelationshipCacheTest.java | 73 +++++++-- 2 files changed, 183 insertions(+), 40 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java index 5e9be272f7f4c..bbb455cb66f1a 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java @@ -20,6 +20,7 @@ package org.neo4j.unsafe.impl.batchimport.cache; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.neo4j.graphdb.Direction; @@ -54,7 +55,12 @@ public class NodeRelationshipCache implements MemoryStatsVisitor.Visitable private static final int CHUNK_SIZE = 1_000_000; private static final long EMPTY = -1; private static final long MAX_RELATIONSHIP_ID = (1L << 48/*6B*/) - 2/*reserving -1 as legal default value*/; - static final int MAX_COUNT = (1 << 30/*2 change bits*/) - 2/*reserving -1 as legal default value*/; + // if count goes beyond this max count then count is redirected to bigCounts and index into that array + // is stored as value in count offset + static final int MAX_SMALL_COUNT = (1 << 29/*3 change bits*/) - 2/*reserving -1 as legal default value*/; + // this max count is pessimistic in that it's what community format can hold, still pretty big. + // we can make this as big as our storage needs them later on + static final long MAX_COUNT = (1L << 35) - 1; // Sizes and offsets of values in each sparse node ByteArray item private static final int ID_SIZE = 6; @@ -66,8 +72,9 @@ public class NodeRelationshipCache implements MemoryStatsVisitor.Visitable // Masking for tracking changes per node private static final int DENSE_NODE_CHANGED_MASK = 0x80000000; private static final int SPARSE_NODE_CHANGED_MASK = 0x40000000; - private static final int NODE_CHANGED_MASKS = DENSE_NODE_CHANGED_MASK | SPARSE_NODE_CHANGED_MASK; - private static final int COUNT_MASK = ~NODE_CHANGED_MASKS; + private static final int BIG_COUNT_MASK = 0x20000000; + private static final int COUNT_FLAGS_MASKS = DENSE_NODE_CHANGED_MASK | SPARSE_NODE_CHANGED_MASK | BIG_COUNT_MASK; + private static final int COUNT_MASK = ~COUNT_FLAGS_MASKS; private ByteArray array; private byte[] chunkChangedArray; @@ -80,6 +87,8 @@ public class NodeRelationshipCache implements MemoryStatsVisitor.Visitable private volatile boolean forward = true; private final int chunkSize; private final NumberArrayFactory arrayFactory; + private final LongArray bigCounts; + private final AtomicInteger bigCountsCursor = new AtomicInteger(); public NodeRelationshipCache( NumberArrayFactory arrayFactory, int denseNodeThreshold ) { @@ -91,6 +100,7 @@ public NodeRelationshipCache( NumberArrayFactory arrayFactory, int denseNodeThre this.arrayFactory = arrayFactory; this.chunkSize = chunkSize; this.denseNodeThreshold = denseNodeThreshold; + this.bigCounts = arrayFactory.newDynamicLongArray( 1_000, 0 ); this.relGroupCache = new RelGroupCache( arrayFactory, chunkSize, base ); } @@ -106,27 +116,101 @@ private static byte[] minusOneBytes( int length ) * @param nodeId node to increment relationship count for. * @return count after the increment. */ - public int incrementCount( long nodeId ) + public long incrementCount( long nodeId ) { return incrementCount( array, nodeId, SPARSE_COUNT_OFFSET ); } - void setCount( long nodeId, int count ) + /** + * Should only be used by tests + */ + void setCount( long nodeId, long count, Direction direction ) + { + if ( isDense( nodeId ) ) + { + long relGroupId = all48Bits( array, nodeId, SPARSE_ID_OFFSET ); + relGroupCache.getAndSetCount( relGroupId, direction, count ); + } + else + { + setCount( array, nodeId, SPARSE_COUNT_OFFSET, count ); + } + } + + /** + * This method sets count (node degree, really). It's somewhat generic in that it accepts + * array and offset to set the count into. This is due to there being multiple places where + * we store counts. Simplest one is for sparse nodes, which live in the main + * NodeRelationshipCache.array at the dedicated offset. Other counts live in RelGroupCache.array + * which contain three counts, one for each direction. That's covered by array and offset, + * the count field works the same in all those scenarios. It's an integer which happens to have + * some other flags at msb, so it's the 29 lsb bits which represents the count. 2^29 is merely + * 1/2bn and so the count field has its 30th bit marking whether or not it's a "big count", + * if it is then the 29 count bits instead point to an array index/slot into bigCounts array + * which has much bigger space per count. This is of course quite rare, but nice to support. + * + *
+     * "small" count, i.e. < 2^29
+     * [  0c,cccc][cccc,cccc][cccc,cccc][cccc,cccc]
+     *    │└──────────────────┬──────────────────┘
+     *    │       bits containing actual count
+     *  0 marking that this is a small count
+     *
+     * "big" count, i.e. >= 2^29
+     * [  1i,iiii][iiii,iiii][iiii,iiii][iiii,iiii]
+     *    │└──────────────────┬──────────────────┘
+     *    │    bits containing array index into bigCounts array which contains the actual count
+     *  1 marking that this is a big count
+     * 
+ * + * so the bigCounts array is shared between all different types of counts, because big counts are so rare + * + * @param array {@link ByteArray} to set count in + * @param nodeId node id, i.e. array index + * @param offset offset on that array index (a ByteArray feature) + * @param count count to set at this position + */ + private void setCount( ByteArray array, long nodeId, int offset, long count ) { assertValidCount( nodeId, count ); - array.setInt( nodeId, SPARSE_COUNT_OFFSET, count ); + + if ( count > MAX_SMALL_COUNT ) + { + int rawCount = array.getInt( nodeId, offset ); + int slot; + if ( rawCount == -1 || !isBigCount( rawCount ) ) + { + // Allocate a slot in the bigCounts array + slot = bigCountsCursor.getAndIncrement(); + array.setInt( nodeId, offset, BIG_COUNT_MASK | slot ); + } + else + { + slot = countValue( rawCount ); + } + bigCounts.set( slot, count ); + } + else + { // We can simply set it + array.setInt( nodeId, offset, toIntExact( count ) ); + } } - private static void assertValidCount( long nodeId, int count ) + private static void assertValidCount( long nodeId, long count ) { if ( count > MAX_COUNT ) { // Meaning there are bits outside of this mask, meaning this value is too big - throw new IllegalStateException( "Tried to increment count of " + nodeId + " to " + count + + throw new IllegalStateException( "Tried to increment count of node id " + nodeId + " to " + count + ", which is too big in one single import" ); } } + private static boolean isBigCount( int storedCount ) + { + return (storedCount & BIG_COUNT_MASK) != 0; + } + /** * Called by the one calling {@link #incrementCount(long)} after all nodes have been added. * Done like this since currently it's just overhead trying to maintain a high id in the face @@ -146,22 +230,38 @@ public long getHighNodeId() return this.highNodeId; } - private static int getCount( ByteArray array, long index, int offset ) + /** + * @see #setCount(ByteArray, long, int, long) setCount for description on how bigCounts work + */ + private long getCount( ByteArray array, long index, int offset ) { - long rawCount = array.getInt( index, offset ) & COUNT_MASK; - if ( rawCount == COUNT_MASK ) + int rawCount = array.getInt( index, offset ); + int count = countValue( rawCount ); + if ( count == COUNT_MASK ) { + // All bits 1, i.e. default initialized field return 0; } - return (int) rawCount; + + if ( isBigCount( rawCount ) ) + { + // 'count' means index into bigCounts in this context + return bigCounts.get( count ); + } + + return count; + } + + private static int countValue( int rawCount ) + { + return rawCount & COUNT_MASK; } - private static int incrementCount( ByteArray array, long index, int offset ) + private long incrementCount( ByteArray array, long nodeId, int offset ) { - array = array.at( index ); - int count = getCount( array, index, offset ) + 1; - assertValidCount( index, count ); - array.setInt( index, offset, count ); + array = array.at( nodeId ); + long count = getCount( array, nodeId, offset ) + 1; + setCount( array, nodeId, offset, count ); return count; } @@ -369,13 +469,13 @@ public void setForwardScan( boolean forward ) * @param direction {@link Direction} to get count for. * @return count (degree) of the requested relationship chain. */ - public int getCount( long nodeId, Direction direction ) + public long getCount( long nodeId, Direction direction ) { ByteArray array = this.array.at( nodeId ); if ( isDense( array, nodeId ) ) { // Indirection into rel group cache long id = getRelationshipId( array, nodeId ); - return id == EMPTY ? 0 : relGroupCache.getAndResetCount( id, direction ); + return id == EMPTY ? 0 : relGroupCache.getAndSetCount( id, direction, 0 ); } return getCount( array, nodeId, SPARSE_COUNT_OFFSET ); @@ -399,7 +499,7 @@ public interface GroupVisitor public static final GroupVisitor NO_GROUP_VISITOR = (nodeId, next, out, in, loop) -> -1; - private static class RelGroupCache implements AutoCloseable, MemoryStatsVisitor.Visitable + private class RelGroupCache implements AutoCloseable, MemoryStatsVisitor.Visitable { private static final int NEXT_OFFSET = 0; private static final int BASE_IDS_OFFSET = ID_SIZE; @@ -431,7 +531,7 @@ private void clearRelationships( ByteArray array, long relGroupId ) * relationship chain for this node after this point in time, where the count should * restart from 0. */ - int getAndResetCount( long id, Direction direction ) + long getAndSetCount( long id, Direction direction, long newCount ) { id = rebase( id ); ByteArray array = this.array.at( id ); @@ -441,8 +541,8 @@ int getAndResetCount( long id, Direction direction ) } int offset = countOffset( direction ); - int count = NodeRelationshipCache.getCount( array, id, offset ); - array.setInt( id, offset, 0 ); + long count = getCount( array, id, offset ); + setCount( array, id, offset, newCount ); return count; } @@ -476,12 +576,12 @@ private long visitGroup( long nodeId, long relGroupIndex, GroupVisitor visitor ) return nextId; } - private static int directionOffset( Direction direction ) + private int directionOffset( Direction direction ) { return BASE_IDS_OFFSET + (direction.ordinal() * ID_AND_COUNT_SIZE); } - private static int countOffset( Direction direction ) + private int countOffset( Direction direction ) { return directionOffset( direction ) + ID_SIZE; } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java index c0b87698dde82..2838dfbd7cc09 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java @@ -328,22 +328,20 @@ public void shouldFailFastOnTooHighCountOnNode() throws Exception // GIVEN cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 10, 100, base ); long nodeId = 5; - int count = NodeRelationshipCache.MAX_COUNT - 5; + long count = NodeRelationshipCache.MAX_COUNT - 1; cache.setHighNodeId( 10 ); - cache.setCount( nodeId, count ); + cache.setCount( nodeId, count, OUTGOING ); // WHEN - for ( int i = 0; i < 10; i++ ) + cache.incrementCount( nodeId ); + try { - try - { - cache.incrementCount( i ); - } - catch ( IllegalStateException e ) - { - assertEquals( NodeRelationshipCache.MAX_COUNT + 1, i ); - break; - } + cache.incrementCount( nodeId ); + fail( "Should have failed" ); + } + catch ( IllegalStateException e ) + { + // THEN Good } } @@ -402,9 +400,54 @@ public void shouldKeepNextGroupIdForNextRound() throws Exception } } + @Test + public void shouldHaveSparseNodesWithBigCounts() throws Exception + { + // GIVEN + cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base ); + long nodeId = 1; + cache.setHighNodeId( nodeId + 1 ); + + // WHEN + long highCount = NodeRelationshipCache.MAX_COUNT - 100; + cache.setCount( nodeId, highCount, OUTGOING ); + long nextHighCount = cache.incrementCount( nodeId ); + + // THEN + assertEquals( highCount + 1, nextHighCount ); + } + + @Test + public void shouldHaveDenseNodesWithBigCounts() throws Exception + { + // A count of a dense node follow a different path during import, first there's counting per node + // then import goes into actual import of relationships where individual chain degrees are + // kept. So this test will first set a total count, then set count for a specific chain + + // GIVEN + cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base ); + long nodeId = 1; + cache.setHighNodeId( nodeId + 1 ); + cache.setCount( nodeId, 2, OUTGOING ); // surely dense now + cache.getAndPutRelationship( nodeId, OUTGOING, 1, true ); + cache.getAndPutRelationship( nodeId, INCOMING, 2, true ); + + // WHEN + long highCountOut = NodeRelationshipCache.MAX_COUNT - 100; + long highCountIn = NodeRelationshipCache.MAX_COUNT - 50; + cache.setCount( nodeId, highCountOut, OUTGOING ); + cache.setCount( nodeId, highCountIn, INCOMING ); + cache.getAndPutRelationship( nodeId, OUTGOING, 1, true /*increment count*/ ); + cache.getAndPutRelationship( nodeId, INCOMING, 2, true /*increment count*/ ); + + // THEN + assertEquals( highCountOut + 1, cache.getCount( nodeId, OUTGOING ) ); + assertEquals( highCountIn + 1, cache.getCount( nodeId, INCOMING ) ); + } + private void testNode( NodeRelationshipCache link, long node, Direction direction ) { - int count = link.getCount( node, direction ); + long count = link.getCount( node, direction ); assertEquals( -1, link.getAndPutRelationship( node, direction, 5, false ) ); assertEquals( 5, link.getAndPutRelationship( node, direction, 10, false ) ); assertEquals( count, link.getCount( node, direction ) ); @@ -422,9 +465,9 @@ private long findNode( NodeRelationshipCache link, long nodeCount, boolean isDen throw new IllegalArgumentException( "No dense node found" ); } - private int incrementRandomCounts( NodeRelationshipCache link, int nodeCount, int i ) + private long incrementRandomCounts( NodeRelationshipCache link, int nodeCount, int i ) { - int highestSeenCount = 0; + long highestSeenCount = 0; while ( i --> 0 ) { long node = random.nextInt( nodeCount );