Skip to content

Commit

Permalink
Ability to import > 2.4 billion nodes in import tool
Browse files Browse the repository at this point in the history
Previously there was a limit of how many nodes could be imported in one
import using the import tool. That limit came from the "tracker", the data
stucture used for sorting ids after all had been added, only being capable
of handling int-range values. This to save 4 bytes compared to using long
for that always.

As imports grow bigger this needs to change, so this commit introduces support
for this tracker being long-range, but doesn't penalize < 2.4 billion imports
because the type of data structure can be decided after all nodes have been
imported and at that time it knows how many there are ans so the long-range
data structure is only used for the imports exceeding that threshold.
  • Loading branch information
tinwelint committed Jan 8, 2016
1 parent 797388e commit 545547f
Show file tree
Hide file tree
Showing 15 changed files with 580 additions and 81 deletions.
Expand Up @@ -33,7 +33,7 @@ public static int safeCastLongToInt( long value )
{
if ( value > Integer.MAX_VALUE )
{
throw new ArithmeticException( getOverflowMessage( value, Integer.class ) );
throw new ArithmeticException( getOverflowMessage( value, Integer.TYPE ) );
}
return (int) value;
}
Expand All @@ -42,7 +42,7 @@ public static short safeCastLongToShort( long value )
{
if ( value > Short.MAX_VALUE )
{
throw new ArithmeticException( getOverflowMessage( value, Short.class ) );
throw new ArithmeticException( getOverflowMessage( value, Short.TYPE ) );
}
return (short) value;
}
Expand All @@ -51,7 +51,7 @@ public static byte safeCastLongToByte( long value )
{
if ( value > Byte.MAX_VALUE )
{
throw new ArithmeticException( getOverflowMessage( value, Byte.class ) );
throw new ArithmeticException( getOverflowMessage( value, Byte.TYPE ) );
}
return (byte) value;
}
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;

import static org.neo4j.unsafe.impl.batchimport.cache.idmapping.string.EncodingIdMapper.NO_MONITOR;
import static org.neo4j.unsafe.impl.batchimport.cache.idmapping.string.TrackerFactories.dynamic;

/**
* Place to instantiate common {@link IdMapper} implementations.
Expand Down Expand Up @@ -94,7 +95,7 @@ public static IdMapper actual()
*/
public static IdMapper strings( NumberArrayFactory cacheFactory )
{
return new EncodingIdMapper( cacheFactory, new StringEncoder(), Radix.STRING, NO_MONITOR );
return new EncodingIdMapper( cacheFactory, new StringEncoder(), Radix.STRING, NO_MONITOR, dynamic() );
}

/**
Expand All @@ -106,6 +107,6 @@ public static IdMapper strings( NumberArrayFactory cacheFactory )
*/
public static IdMapper longs( NumberArrayFactory cacheFactory )
{
return new EncodingIdMapper( cacheFactory, new LongEncoder(), Radix.LONG, NO_MONITOR );
return new EncodingIdMapper( cacheFactory, new LongEncoder(), Radix.LONG, NO_MONITOR, dynamic() );
}
}
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.cache.idmapping.string;

import org.neo4j.unsafe.impl.batchimport.cache.MemoryStatsVisitor;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArray;

/**
* Base implementation of {@link Tracker} over a {@link NumberArray}.
*
* @param <ARRAY> type of {@link NumberArray} in this implementation.
*/
abstract class AbstractTracker<ARRAY extends NumberArray> implements Tracker
{
static final int DEFAULT_VALUE = -1;

protected ARRAY array;

protected AbstractTracker( ARRAY array )
{
this.array = array;
}

@Override
public void acceptMemoryStatsVisitor( MemoryStatsVisitor visitor )
{
array.acceptMemoryStatsVisitor( visitor );
}

@Override
public void swap( long fromIndex, long toIndex, int count )
{
array.swap( fromIndex, toIndex, count );
}
}
Expand Up @@ -28,6 +28,7 @@
import org.neo4j.helpers.progress.ProgressListener;
import org.neo4j.unsafe.impl.batchimport.InputIterable;
import org.neo4j.unsafe.impl.batchimport.InputIterator;
import org.neo4j.unsafe.impl.batchimport.Utils;
import org.neo4j.unsafe.impl.batchimport.Utils.CompareType;
import org.neo4j.unsafe.impl.batchimport.cache.IntArray;
import org.neo4j.unsafe.impl.batchimport.cache.LongArray;
Expand All @@ -51,12 +52,13 @@
import static org.neo4j.unsafe.impl.batchimport.cache.idmapping.string.SourceInformation.encodeSourceInformation;

/**
* Maps arbitrary values to long ids. The values can be {@link #put(Object, long) added} in any order,
* but {@link #needsPreparation() needs} {@link #prepare() preparation} in order to {@link #get(Object) get}
* ids back later.
* Maps arbitrary values to long ids. The values can be {@link #put(Object, long, Group) added} in any order,
* but {@link #needsPreparation() needs} {@link #prepare(InputIterable, Collector, ProgressListener) preparation}
* in order to {@link #get(Object, Group) get} ids back later.
*
* In the {@link #prepare() preparation phase} the added entries are sorted according to a number representation
* of each input value and {@link #get(Object)} does simple binary search to find the correct one.
* In the {@link #prepare(InputIterable, Collector, ProgressListener) preparation phase} the added entries are
* sorted according to a number representation of each input value and {@link #get(Object, Group)} does simple
* binary search to find the correct one.
*
* The implementation is space-efficient, much more so than using, say, a {@link HashMap}.
*
Expand Down Expand Up @@ -88,8 +90,7 @@ public class EncodingIdMapper implements IdMapper
public interface Monitor
{
/**
* Number of eIds that have been marked as collisions.
* @param count
* @param count Number of eIds that have been marked as collisions.
*/
void numberOfCollisions( int count );
}
Expand All @@ -115,6 +116,7 @@ public void numberOfCollisions( int count )
private static final long GAP_VALUE = 0;

private final NumberArrayFactory cacheFactory;
private final TrackerFactory trackerFactory;
// Encoded values added in #put, in the order in which they are put. Indexes in the array are the actual node ids,
// values are the encoded versions of the input ids.
private LongArray dataCache;
Expand All @@ -125,7 +127,7 @@ public void numberOfCollisions( int count )
// they end up sorted. Again, dataCache remains unchanged, only the ordering information is kept here.
// Each index in trackerCache points to a dataCache index, where the value in dataCache contains the
// encoded input id, used to match against the input id that is looked up during binary search.
private IntArray trackerCache;
private Tracker trackerCache;
private final Encoder encoder;
private final Radix radix;
private final int processorsForSorting;
Expand All @@ -136,7 +138,7 @@ public void numberOfCollisions( int count )
// These 3 caches below are needed only during duplicate input id detection, but referenced here so
// that the memory visitor can see them when they are active.
private LongArray collisionSourceDataCache;
private IntArray collisionTrackerCache;
private Tracker collisionTrackerCache;

private boolean readyForUse;
private long[][] sortBuckets;
Expand All @@ -147,17 +149,19 @@ public void numberOfCollisions( int count )
private final Factory<Radix> radixFactory;

public EncodingIdMapper( NumberArrayFactory cacheFactory, Encoder encoder, Factory<Radix> radixFactory,
Monitor monitor )
Monitor monitor, TrackerFactory trackerFactory )
{
this( cacheFactory, encoder, radixFactory, monitor, DEFAULT_CACHE_CHUNK_SIZE,
this( cacheFactory, encoder, radixFactory, monitor, trackerFactory, DEFAULT_CACHE_CHUNK_SIZE,
Runtime.getRuntime().availableProcessors() - 1, DEFAULT );
}

public EncodingIdMapper( NumberArrayFactory cacheFactory, Encoder encoder, Factory<Radix> radixFactory,
Monitor monitor, int chunkSize, int processorsForSorting, Comparator comparator )
Monitor monitor, TrackerFactory trackerFactory, int chunkSize, int processorsForSorting,
Comparator comparator )
{
this.monitor = monitor;
this.cacheFactory = cacheFactory;
this.trackerFactory = trackerFactory;
this.comparator = comparator;
this.processorsForSorting = max( processorsForSorting, 1 );
this.dataCache = cacheFactory.newDynamicLongArray( chunkSize, GAP_VALUE );
Expand Down Expand Up @@ -263,7 +267,7 @@ public void prepare( InputIterable<Object> ids, Collector collector, ProgressLis
{
endPreviousGroup();
dataCache = dataCache.fixate();
trackerCache = cacheFactory.newIntArray( highestSetIndex+1, -1 );
trackerCache = trackerFactory.create( cacheFactory, highestSetIndex+1 );

try
{
Expand Down Expand Up @@ -363,8 +367,8 @@ private int detectAndMarkCollisions( ProgressListener progress )
int batch = (int) min( max-i, 10_000 );
for ( int j = 0; j < batch; j++, i++ )
{
int dataIndexA = trackerCache.get( i );
int dataIndexB = trackerCache.get( i+1 );
long dataIndexA = trackerCache.get( i );
long dataIndexB = trackerCache.get( i+1 );
if ( dataIndexA == -1 || dataIndexB == -1 )
{
sameGroupDetector.reset();
Expand All @@ -386,7 +390,7 @@ private int detectAndMarkCollisions( ProgressListener progress )
radixOf( eIdA ) + ":" + radixOf( eIdB ) );
case EQ:
// Here we have two equal encoded values. First let's check if they are in the same id space.
int collision = sameGroupDetector.collisionWithinSameGroup(
long collision = sameGroupDetector.collisionWithinSameGroup(
dataIndexA, groupOf( dataIndexA ).id(),
dataIndexB, groupOf( dataIndexB ).id() );

Expand Down Expand Up @@ -428,7 +432,7 @@ dataIndexA, groupOf( dataIndexA ).id(),
/**
* @return {@code true} if marked as collision in this call, {@code false} if it was already marked as collision.
*/
private boolean markAsCollision( int dataIndex )
private boolean markAsCollision( long dataIndex )
{
long eId = dataCache.get( dataIndex );
boolean isAlreadyMarked = isCollision( eId );
Expand All @@ -450,7 +454,7 @@ private void buildCollisionInfo( InputIterator<Object> ids, int numberOfCollisio
List<String> sourceDescriptions = new ArrayList<>();
String lastSourceDescription = null;
collisionSourceDataCache = cacheFactory.newLongArray( numberOfCollisions, -1 );
collisionTrackerCache = cacheFactory.newIntArray( numberOfCollisions, -1 );
collisionTrackerCache = trackerFactory.create( cacheFactory, numberOfCollisions );
for ( long i = 0; ids.hasNext(); )
{
long j = 0;
Expand Down Expand Up @@ -559,7 +563,7 @@ public long dataValue( long nodeId )
long j = 0;
for ( ; j < 10_000 && i < numberOfCollisions; j++, i++ )
{
int collisionIndex = collisionTrackerCache.get( i );
long collisionIndex = collisionTrackerCache.get( i );
long dataIndex = collisionNodeIdCache.get( collisionIndex );
long eid = dataCache.get( dataIndex );
long sourceInformation = collisionSourceDataCache.get( collisionIndex );
Expand All @@ -574,7 +578,10 @@ public long dataValue( long nodeId )
}

// Potential duplicate
Object inputId = collisionValues.get( collisionIndex );
// We cast the collision index to an int here. This means that we can't support > int-range
// number of collisions. But that's probably alright since the data structures and
// actual collisions values for all these collisions wouldn't fit in a heap anyway.
Object inputId = collisionValues.get( Utils.safeCastLongToInt( collisionIndex ) );
int detectorIndex = detector.add( inputId, sourceInformation );
if ( detectorIndex != -1 )
{ // Duplicate
Expand Down Expand Up @@ -647,7 +654,7 @@ private long binarySearch( long x, Object inputId, long low, long high, int grou
while ( low <= high )
{
long mid = low + (high - low)/2;//(low + high) / 2;
int dataIndex = trackerCache.get( mid );
long dataIndex = trackerCache.get( mid );
if ( dataIndex == -1 )
{
return -1;
Expand Down Expand Up @@ -731,7 +738,7 @@ private long findFromEIdRange( long fromIndex, long toIndex, int groupId, Object
long lowestFound = -1; // lowest data index means "first put"
for ( long index = fromIndex; index <= toIndex; index++ )
{
int dataIndex = trackerCache.get( index );
long dataIndex = trackerCache.get( index );
IdGroup group = groupOf( dataIndex );
if ( groupId == group.id() )
{
Expand Down
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.cache.idmapping.string;

import org.neo4j.unsafe.impl.batchimport.Utils;
import org.neo4j.unsafe.impl.batchimport.cache.IntArray;

/**
* {@link Tracker} capable of keeping {@code int} range values, using {@link IntArray}.
* Will fail in {@link #set(long, long)} with {@link ArithmeticException} if trying to put a too big value.
*/
public class IntTracker extends AbstractTracker<IntArray>
{
public IntTracker( IntArray array )
{
super( array );
}

@Override
public long get( long index )
{
return array.get( index );
}

/**
* @throws ArithmeticException if value is bigger than {@link Integer#MAX_VALUE}.
*/
@Override
public void set( long index, long value )
{
array.set( index, Utils.safeCastLongToInt( value ) );
}
}
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.cache.idmapping.string;

import org.neo4j.unsafe.impl.batchimport.cache.LongArray;

/**
* {@link Tracker} capable of keeping {@code long} range values, using {@link LongArray}.
*/
public class LongTracker extends AbstractTracker<LongArray>
{
public LongTracker( LongArray array )
{
super( array );
}

@Override
public long get( long index )
{
return array.get( index );
}

@Override
public void set( long index, long value )
{
array.set( index, value );
}
}

0 comments on commit 545547f

Please sign in to comment.