Skip to content

Commit

Permalink
Supports degrees up to 2^35 in importer
Browse files Browse the repository at this point in the history
without additional memory overhead. This is done via indirection in the
integer count field (which also contains other flags): if the count
surpasses 2^29, the field instead holds an index into an array which has
bigger slots. This indirection should be extremely rare and only kicks in
for nodes with more than 2^29 relationships in a single import.
  • Loading branch information
tinwelint committed Apr 18, 2017
1 parent 3e29f42 commit 6ac8938
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 40 deletions.
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.unsafe.impl.batchimport.cache;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.graphdb.Direction;
Expand Down Expand Up @@ -54,7 +55,12 @@ public class NodeRelationshipCache implements MemoryStatsVisitor.Visitable
private static final int CHUNK_SIZE = 1_000_000;
private static final long EMPTY = -1;
private static final long MAX_RELATIONSHIP_ID = (1L << 48/*6B*/) - 2/*reserving -1 as legal default value*/;
static final int MAX_COUNT = (1 << 30/*2 change bits*/) - 2/*reserving -1 as legal default value*/;
// If a count goes beyond this max small count then the count is redirected to bigCounts,
// and the index into that array is stored as the value at the count offset instead.
static final int MAX_SMALL_COUNT = (1 << 29/*3 change bits*/) - 2/*reserving -1 as legal default value*/;
// This max count is pessimistic in that it's what the community format can hold, which is still pretty big.
// We can make this as big as our storage needs it to be later on.
static final long MAX_COUNT = (1L << 35) - 1;

// Sizes and offsets of values in each sparse node ByteArray item
private static final int ID_SIZE = 6;
Expand All @@ -66,8 +72,9 @@ public class NodeRelationshipCache implements MemoryStatsVisitor.Visitable
// Masking for tracking changes per node
private static final int DENSE_NODE_CHANGED_MASK = 0x80000000;
private static final int SPARSE_NODE_CHANGED_MASK = 0x40000000;
private static final int NODE_CHANGED_MASKS = DENSE_NODE_CHANGED_MASK | SPARSE_NODE_CHANGED_MASK;
private static final int COUNT_MASK = ~NODE_CHANGED_MASKS;
private static final int BIG_COUNT_MASK = 0x20000000;
private static final int COUNT_FLAGS_MASKS = DENSE_NODE_CHANGED_MASK | SPARSE_NODE_CHANGED_MASK | BIG_COUNT_MASK;
private static final int COUNT_MASK = ~COUNT_FLAGS_MASKS;

private ByteArray array;
private byte[] chunkChangedArray;
Expand All @@ -80,6 +87,8 @@ public class NodeRelationshipCache implements MemoryStatsVisitor.Visitable
private volatile boolean forward = true;
private final int chunkSize;
private final NumberArrayFactory arrayFactory;
private final LongArray bigCounts;
private final AtomicInteger bigCountsCursor = new AtomicInteger();

public NodeRelationshipCache( NumberArrayFactory arrayFactory, int denseNodeThreshold )
{
Expand All @@ -91,6 +100,7 @@ public NodeRelationshipCache( NumberArrayFactory arrayFactory, int denseNodeThre
this.arrayFactory = arrayFactory;
this.chunkSize = chunkSize;
this.denseNodeThreshold = denseNodeThreshold;
this.bigCounts = arrayFactory.newDynamicLongArray( 1_000, 0 );
this.relGroupCache = new RelGroupCache( arrayFactory, chunkSize, base );
}

Expand All @@ -106,27 +116,101 @@ private static byte[] minusOneBytes( int length )
* @param nodeId node to increment relationship count for.
* @return count after the increment.
*/
public int incrementCount( long nodeId )
public long incrementCount( long nodeId )
{
return incrementCount( array, nodeId, SPARSE_COUNT_OFFSET );
}

void setCount( long nodeId, int count )
/**
 * Forces the count of a node to a specific value. Should only be used by tests.
 * <p>
 * For a dense node the count of the chain in the given direction is set in the relationship group
 * cache. Otherwise the count is written to the node's sparse count field, and {@code direction}
 * is not used.
 *
 * @param nodeId node to set count for.
 * @param count count to set.
 * @param direction {@link Direction} of the chain to set count for, only used for dense nodes.
 */
void setCount( long nodeId, long count, Direction direction )
{
    if ( !isDense( nodeId ) )
    {
        setCount( array, nodeId, SPARSE_COUNT_OFFSET, count );
        return;
    }

    long groupId = all48Bits( array, nodeId, SPARSE_ID_OFFSET );
    relGroupCache.getAndSetCount( groupId, direction, count );
}

/**
 * Sets a count (node degree, really). This method is somewhat generic in that it accepts the
 * array and offset to set the count into, since counts are stored in multiple places. The simplest
 * one is for sparse nodes, which live in the main NodeRelationshipCache.array at the dedicated
 * offset. Other counts live in RelGroupCache.array, which contains three counts, one for each
 * direction. The count field works the same way in all those places: it's an integer which has
 * some flags in its most significant bits, so it's the 29 least significant bits which represent
 * the count. 2^29 is merely about half a billion, so the 30th bit of the count field marks
 * whether or not it's a "big count". If it is, the 29 count bits instead hold an array index (slot)
 * into the bigCounts array, which has much bigger space per count. This is of course quite rare,
 * but nice to support.
 *
 * <pre>
 * "small" count, i.e. < 2^29
 * [  0c,cccc][cccc,cccc][cccc,cccc][cccc,cccc]
 *    │└──────────────────┬──────────────────┘
 *    │       bits containing actual count
 *    0 marking that this is a small count
 *
 * "big" count, i.e. >= 2^29
 * [  1i,iiii][iiii,iiii][iiii,iiii][iiii,iiii]
 *    │└──────────────────┬──────────────────┘
 *    │       bits containing array index into bigCounts array which contains the actual count
 *    1 marking that this is a big count
 * </pre>
 *
 * The bigCounts array is shared between all the different types of counts, because big counts
 * are so rare.
 *
 * @param array {@link ByteArray} to set count in
 * @param nodeId node id, i.e. array index
 * @param offset offset on that array index (a ByteArray feature)
 * @param count count to set at this position
 * @throws IllegalStateException if {@code count} is greater than {@link #MAX_COUNT}
 */
private void setCount( ByteArray array, long nodeId, int offset, long count )
{
    assertValidCount( nodeId, count );

    if ( count <= MAX_SMALL_COUNT )
    {   // Fits in the count bits of the field itself, so we can simply set it
        array.setInt( nodeId, offset, Math.toIntExact( count ) );
        return;
    }

    int rawCount = array.getInt( nodeId, offset );
    int slot;
    // A default-initialized field (-1) has every bit set, the big count bit included,
    // so that value must be ruled out before the big count bit can be trusted.
    if ( rawCount == -1 || !isBigCount( rawCount ) )
    {
        // First time this count gets big: allocate a slot in the bigCounts array
        // and let the field point to it
        slot = bigCountsCursor.getAndIncrement();
        array.setInt( nodeId, offset, BIG_COUNT_MASK | slot );
    }
    else
    {
        // Already a big count: the count bits hold the slot to reuse
        slot = countValue( rawCount );
    }
    bigCounts.set( slot, count );
}

private static void assertValidCount( long nodeId, int count )
private static void assertValidCount( long nodeId, long count )
{
if ( count > MAX_COUNT )
{
// Meaning there are bits outside of this mask, meaning this value is too big
throw new IllegalStateException( "Tried to increment count of " + nodeId + " to " + count +
throw new IllegalStateException( "Tried to increment count of node id " + nodeId + " to " + count +
", which is too big in one single import" );
}
}

/**
 * @param storedCount raw integer as read from a count field, i.e. with flag bits included.
 * @return {@code true} if the {@link #BIG_COUNT_MASK} bit is set in {@code storedCount}.
 */
private static boolean isBigCount( int storedCount )
{
    int bigCountBit = storedCount & BIG_COUNT_MASK;
    return bigCountBit != 0;
}

/**
* Called by the one calling {@link #incrementCount(long)} after all nodes have been added.
* Done like this since currently it's just overhead trying to maintain a high id in the face
Expand All @@ -146,22 +230,38 @@ public long getHighNodeId()
return this.highNodeId;
}

private static int getCount( ByteArray array, long index, int offset )
/**
* @see #setCount(ByteArray, long, int, long) setCount for description on how bigCounts work
*/
private long getCount( ByteArray array, long index, int offset )
{
long rawCount = array.getInt( index, offset ) & COUNT_MASK;
if ( rawCount == COUNT_MASK )
int rawCount = array.getInt( index, offset );
int count = countValue( rawCount );
if ( count == COUNT_MASK )
{
// All bits 1, i.e. default initialized field
return 0;
}
return (int) rawCount;

if ( isBigCount( rawCount ) )
{
// 'count' means index into bigCounts in this context
return bigCounts.get( count );
}

return count;
}

/**
 * @param rawCount raw integer as read from a count field, i.e. with flag bits included.
 * @return the bits of {@code rawCount} selected by {@link #COUNT_MASK}, i.e. without the flag bits.
 */
private static int countValue( int rawCount )
{
    int withoutFlags = rawCount & COUNT_MASK;
    return withoutFlags;
}

private static int incrementCount( ByteArray array, long index, int offset )
private long incrementCount( ByteArray array, long nodeId, int offset )
{
array = array.at( index );
int count = getCount( array, index, offset ) + 1;
assertValidCount( index, count );
array.setInt( index, offset, count );
array = array.at( nodeId );
long count = getCount( array, nodeId, offset ) + 1;
setCount( array, nodeId, offset, count );
return count;
}

Expand Down Expand Up @@ -369,13 +469,13 @@ public void setForwardScan( boolean forward )
* @param direction {@link Direction} to get count for.
* @return count (degree) of the requested relationship chain.
*/
public int getCount( long nodeId, Direction direction )
public long getCount( long nodeId, Direction direction )
{
ByteArray array = this.array.at( nodeId );
if ( isDense( array, nodeId ) )
{ // Indirection into rel group cache
long id = getRelationshipId( array, nodeId );
return id == EMPTY ? 0 : relGroupCache.getAndResetCount( id, direction );
return id == EMPTY ? 0 : relGroupCache.getAndSetCount( id, direction, 0 );
}

return getCount( array, nodeId, SPARSE_COUNT_OFFSET );
Expand All @@ -399,7 +499,7 @@ public interface GroupVisitor

public static final GroupVisitor NO_GROUP_VISITOR = (nodeId, next, out, in, loop) -> -1;

private static class RelGroupCache implements AutoCloseable, MemoryStatsVisitor.Visitable
private class RelGroupCache implements AutoCloseable, MemoryStatsVisitor.Visitable
{
private static final int NEXT_OFFSET = 0;
private static final int BASE_IDS_OFFSET = ID_SIZE;
Expand Down Expand Up @@ -431,7 +531,7 @@ private void clearRelationships( ByteArray array, long relGroupId )
* relationship chain for this node after this point in time, where the count should
* restart from 0.
*/
int getAndResetCount( long id, Direction direction )
long getAndSetCount( long id, Direction direction, long newCount )
{
id = rebase( id );
ByteArray array = this.array.at( id );
Expand All @@ -441,8 +541,8 @@ int getAndResetCount( long id, Direction direction )
}

int offset = countOffset( direction );
int count = NodeRelationshipCache.getCount( array, id, offset );
array.setInt( id, offset, 0 );
long count = getCount( array, id, offset );
setCount( array, id, offset, newCount );
return count;
}

Expand Down Expand Up @@ -476,12 +576,12 @@ private long visitGroup( long nodeId, long relGroupIndex, GroupVisitor visitor )
return nextId;
}

private static int directionOffset( Direction direction )
private int directionOffset( Direction direction )
{
return BASE_IDS_OFFSET + (direction.ordinal() * ID_AND_COUNT_SIZE);
}

private static int countOffset( Direction direction )
private int countOffset( Direction direction )
{
return directionOffset( direction ) + ID_SIZE;
}
Expand Down
Expand Up @@ -328,22 +328,20 @@ public void shouldFailFastOnTooHighCountOnNode() throws Exception
// GIVEN
cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 10, 100, base );
long nodeId = 5;
int count = NodeRelationshipCache.MAX_COUNT - 5;
long count = NodeRelationshipCache.MAX_COUNT - 1;
cache.setHighNodeId( 10 );
cache.setCount( nodeId, count );
cache.setCount( nodeId, count, OUTGOING );

// WHEN
for ( int i = 0; i < 10; i++ )
cache.incrementCount( nodeId );
try
{
try
{
cache.incrementCount( i );
}
catch ( IllegalStateException e )
{
assertEquals( NodeRelationshipCache.MAX_COUNT + 1, i );
break;
}
cache.incrementCount( nodeId );
fail( "Should have failed" );
}
catch ( IllegalStateException e )
{
// THEN Good
}
}

Expand Down Expand Up @@ -402,9 +400,54 @@ public void shouldKeepNextGroupIdForNextRound() throws Exception
}
}

@Test
public void shouldHaveSparseNodesWithBigCounts() throws Exception
{
    // GIVEN
    final long node = 1;
    cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base );
    cache.setHighNodeId( node + 1 );

    // WHEN setting a count close to the max and then incrementing it once
    final long initialCount = NodeRelationshipCache.MAX_COUNT - 100;
    cache.setCount( node, initialCount, OUTGOING );
    final long countAfterIncrement = cache.incrementCount( node );

    // THEN the increment should have been applied on top of the big count
    assertEquals( initialCount + 1, countAfterIncrement );
}

@Test
public void shouldHaveDenseNodesWithBigCounts() throws Exception
{
// The count of a dense node follows a different path during import: first there's counting per node,
// then import goes into the actual import of relationships, where individual chain degrees are
// kept. So this test will first set a total count, then set the count for each specific chain.

// GIVEN
cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base );
long nodeId = 1;
cache.setHighNodeId( nodeId + 1 );
cache.setCount( nodeId, 2, OUTGOING ); // surely dense now
cache.getAndPutRelationship( nodeId, OUTGOING, 1, true );
cache.getAndPutRelationship( nodeId, INCOMING, 2, true );

// WHEN setting big counts for both chains and then adding one more relationship to each
long highCountOut = NodeRelationshipCache.MAX_COUNT - 100;
long highCountIn = NodeRelationshipCache.MAX_COUNT - 50;
cache.setCount( nodeId, highCountOut, OUTGOING );
cache.setCount( nodeId, highCountIn, INCOMING );
cache.getAndPutRelationship( nodeId, OUTGOING, 1, true /*increment count*/ );
cache.getAndPutRelationship( nodeId, INCOMING, 2, true /*increment count*/ );

// THEN each chain should report its own big count, plus the one increment
assertEquals( highCountOut + 1, cache.getCount( nodeId, OUTGOING ) );
assertEquals( highCountIn + 1, cache.getCount( nodeId, INCOMING ) );
}

private void testNode( NodeRelationshipCache link, long node, Direction direction )
{
int count = link.getCount( node, direction );
long count = link.getCount( node, direction );
assertEquals( -1, link.getAndPutRelationship( node, direction, 5, false ) );
assertEquals( 5, link.getAndPutRelationship( node, direction, 10, false ) );
assertEquals( count, link.getCount( node, direction ) );
Expand All @@ -422,9 +465,9 @@ private long findNode( NodeRelationshipCache link, long nodeCount, boolean isDen
throw new IllegalArgumentException( "No dense node found" );
}

private int incrementRandomCounts( NodeRelationshipCache link, int nodeCount, int i )
private long incrementRandomCounts( NodeRelationshipCache link, int nodeCount, int i )
{
int highestSeenCount = 0;
long highestSeenCount = 0;
while ( i --> 0 )
{
long node = random.nextInt( nodeCount );
Expand Down

0 comments on commit 6ac8938

Please sign in to comment.