Skip to content

Commit

Permalink
Bunch of PR fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Nov 30, 2017
1 parent a4ea4b7 commit 049d9a6
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 38 deletions.
Expand Up @@ -22,6 +22,7 @@
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.lang.Thread.State; import java.lang.Thread.State;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.util.function.Predicate; import java.util.function.Predicate;
Expand All @@ -30,6 +31,10 @@


public class Exceptions public class Exceptions
{ {
public static final UncaughtExceptionHandler SILENT_UNCAUGHT_EXCEPTION_HANDLER = ( t, e ) ->
{ // Don't print about it
};

private static final String UNEXPECTED_MESSAGE = "Unexpected Exception"; private static final String UNEXPECTED_MESSAGE = "Unexpected Exception";


public static <T extends Throwable> T withCause( T exception, Throwable cause ) public static <T extends Throwable> T withCause( T exception, Throwable cause )
Expand Down
Expand Up @@ -22,7 +22,6 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.StringReader; import java.io.StringReader;

import org.neo4j.csv.reader.CharSeeker; import org.neo4j.csv.reader.CharSeeker;
import org.neo4j.csv.reader.CharSeekers; import org.neo4j.csv.reader.CharSeekers;
import org.neo4j.csv.reader.Extractors; import org.neo4j.csv.reader.Extractors;
Expand All @@ -45,6 +44,7 @@
import org.neo4j.unsafe.impl.batchimport.input.csv.IdType; import org.neo4j.unsafe.impl.batchimport.input.csv.IdType;


import static java.lang.System.currentTimeMillis; import static java.lang.System.currentTimeMillis;
import static java.util.Collections.emptyList;


import static org.neo4j.graphdb.factory.GraphDatabaseSettings.dense_node_threshold; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.dense_node_threshold;
import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit; import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit;
Expand Down Expand Up @@ -139,7 +139,7 @@ public long pageCacheMemory()
@Override @Override
public long maxMemoryUsage() public long maxMemoryUsage()
{ {
String custom = args.get( ImportTool.Options.MAX_MEMORY.key(), null ); String custom = args.get( ImportTool.Options.MAX_MEMORY.key(), (String) ImportTool.Options.MAX_MEMORY.defaultValue() );
return custom != null ? ImportTool.parseMaxMemory( custom ) : DEFAULT.maxMemoryUsage(); return custom != null ? ImportTool.parseMaxMemory( custom ) : DEFAULT.maxMemoryUsage();
} }
}; };
Expand All @@ -166,6 +166,7 @@ public long maxMemoryUsage()
consumer = new ParallelBatchImporter( dir, fileSystem, null, importConfig, consumer = new ParallelBatchImporter( dir, fileSystem, null, importConfig,
new SimpleLogService( sysoutLogProvider, sysoutLogProvider ), defaultVisible(), EMPTY, dbConfig, new SimpleLogService( sysoutLogProvider, sysoutLogProvider ), defaultVisible(), EMPTY, dbConfig,
RecordFormatSelector.selectForConfig( dbConfig, sysoutLogProvider ) ); RecordFormatSelector.selectForConfig( dbConfig, sysoutLogProvider ) );
ImportTool.printOverview( dir, emptyList(), emptyList(), importConfig, System.out );
} }
consumer.doImport( input ); consumer.doImport( input );
} }
Expand Down
Expand Up @@ -120,9 +120,6 @@ public class ImportLogic implements Closeable
private long availableMemoryForLinking; private long availableMemoryForLinking;


/** /**
* Advanced usage of the parallel batch importer, for special and very specific cases. Please use
* a constructor with fewer arguments instead.
*
* @param storeDir directory which the db will be created in. * @param storeDir directory which the db will be created in.
* @param fileSystem {@link FileSystemAbstraction} that the {@code storeDir} lives in. * @param fileSystem {@link FileSystemAbstraction} that the {@code storeDir} lives in.
* @param neoStore {@link BatchingNeoStores} to import into. * @param neoStore {@link BatchingNeoStores} to import into.
Expand Down Expand Up @@ -368,6 +365,20 @@ public int linkRelationships( int startingFromType )
return upToType; return upToType;
} }


/**
* Links relationships of all types, potentially doing multiple passes, each pass calling {@link #linkRelationships(int)}
* with a type range.
*/
public void linkRelationshipsOfAllTypes()
{
int type = 0;
do
{
type = linkRelationships( type );
}
while ( type != -1 );
}

/** /**
* Convenience method (for code reading) to have a zero-based value become one based (for printing/logging). * Convenience method (for code reading) to have a zero-based value become one based (for printing/logging).
*/ */
Expand Down Expand Up @@ -461,7 +472,6 @@ public void close() throws IOException
idMapper.close(); idMapper.close();
} }
inputCache.close(); inputCache.close();
// TODO close badCollector here instead of in import tool?
} }


private void updatePeakMemoryUsage() private void updatePeakMemoryUsage()
Expand Down
Expand Up @@ -83,14 +83,7 @@ public void doImport( Input input ) throws IOException
logic.prepareIdMapper(); logic.prepareIdMapper();
logic.importRelationships(); logic.importRelationships();
logic.calculateNodeDegrees(); logic.calculateNodeDegrees();

logic.linkRelationshipsOfAllTypes();
int type = 0;
do
{
type = logic.linkRelationships( type );
}
while ( type != -1 );

logic.defragmentRelationshipGroups(); logic.defragmentRelationshipGroups();
logic.buildCountsStore(); logic.buildCountsStore();


Expand Down
Expand Up @@ -29,7 +29,6 @@
import org.neo4j.unsafe.impl.batchimport.InputIterable; import org.neo4j.unsafe.impl.batchimport.InputIterable;
import org.neo4j.unsafe.impl.batchimport.InputIterator; import org.neo4j.unsafe.impl.batchimport.InputIterator;
import org.neo4j.unsafe.impl.batchimport.Utils.CompareType; import org.neo4j.unsafe.impl.batchimport.Utils.CompareType;
import org.neo4j.unsafe.impl.batchimport.cache.IntArray;
import org.neo4j.unsafe.impl.batchimport.cache.LongArray; import org.neo4j.unsafe.impl.batchimport.cache.LongArray;
import org.neo4j.unsafe.impl.batchimport.cache.LongBitsManipulator; import org.neo4j.unsafe.impl.batchimport.cache.LongBitsManipulator;
import org.neo4j.unsafe.impl.batchimport.cache.MemoryStatsVisitor; import org.neo4j.unsafe.impl.batchimport.cache.MemoryStatsVisitor;
Expand Down Expand Up @@ -829,21 +828,6 @@ private long findFromEIdRange( long fromIndex, long toIndex, int groupId, Object
return lowestFound; return lowestFound;
} }


static boolean compareDataCache( LongArray dataCache, IntArray tracker, int a, int b, CompareType compareType )
{
int indexA = tracker.get( a );
int indexB = tracker.get( b );
if ( indexA == ID_NOT_FOUND || indexB == ID_NOT_FOUND )
{
return false;
}

return unsignedCompare(
clearCollision( dataCache.get( indexA ) ),
clearCollision( dataCache.get( indexB ) ),
compareType );
}

@Override @Override
public void acceptMemoryStatsVisitor( MemoryStatsVisitor visitor ) public void acceptMemoryStatsVisitor( MemoryStatsVisitor visitor )
{ {
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/ */
package org.neo4j.unsafe.impl.batchimport.executor; package org.neo4j.unsafe.impl.batchimport.executor;


import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.Array; import java.lang.reflect.Array;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -32,6 +31,8 @@
import static java.lang.Integer.max; import static java.lang.Integer.max;
import static java.lang.Integer.min; import static java.lang.Integer.min;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.neo4j.helpers.Exceptions.SILENT_UNCAUGHT_EXCEPTION_HANDLER;
import static org.neo4j.helpers.Exceptions.launderedException; import static org.neo4j.helpers.Exceptions.launderedException;


/** /**
Expand Down Expand Up @@ -193,10 +194,6 @@ private void parkAWhile()
parkStrategy.park( Thread.currentThread() ); parkStrategy.park( Thread.currentThread() );
} }


public static final UncaughtExceptionHandler SILENT_UNCAUGHT_EXCEPTION_HANDLER = ( t, e ) ->
{ // Don't print about it
};

private class Processor extends Thread private class Processor extends Thread
{ {
// In addition to the global shutDown flag in the executor each processor has a local flag // In addition to the global shutDown flag in the executor each processor has a local flag
Expand Down
Expand Up @@ -28,7 +28,7 @@
import org.neo4j.unsafe.impl.batchimport.stats.Stat; import org.neo4j.unsafe.impl.batchimport.stats.Stat;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;


import static org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor.SILENT_UNCAUGHT_EXCEPTION_HANDLER; import static org.neo4j.helpers.Exceptions.SILENT_UNCAUGHT_EXCEPTION_HANDLER;


/** /**
* Step that generally sits first in a {@link Stage} and produces batches that will flow downstream * Step that generally sits first in a {@link Stage} and produces batches that will flow downstream
Expand Down
Expand Up @@ -65,7 +65,6 @@ public void shouldSplitUpRelationshipTypesInBatches() throws Exception
numberOfRelationships += count; numberOfRelationships += count;
} }
types.sort( ( t1, t2 ) -> Long.compare( t2.other(), t1.other() ) ); types.sort( ( t1, t2 ) -> Long.compare( t2.other(), t1.other() ) );
@SuppressWarnings( "unchecked" )
RelationshipTypeDistribution typeDistribution = new RelationshipTypeDistribution( types.stream().toArray( Pair[]::new ) ); RelationshipTypeDistribution typeDistribution = new RelationshipTypeDistribution( types.stream().toArray( Pair[]::new ) );


// WHEN enough memory for all types // WHEN enough memory for all types
Expand Down
Expand Up @@ -56,7 +56,6 @@ public void startBatchingNeoStoreWithMetricsPluginEnabled() throws Exception
.batchingNeoStores( fileSystem, storeDir, RecordFormatSelector.defaultFormat(), Configuration.DEFAULT, .batchingNeoStores( fileSystem, storeDir, RecordFormatSelector.defaultFormat(), Configuration.DEFAULT,
logService, AdditionalInitialIds.EMPTY, config ) ) logService, AdditionalInitialIds.EMPTY, config ) )
{ {
// empty block
batchingNeoStores.createNew(); batchingNeoStores.createNew();
} }
provider.assertNone( AssertableLogProvider.inLog( MetricsExtension.class ).any() ); provider.assertNone( AssertableLogProvider.inLog( MetricsExtension.class ).any() );
Expand Down

0 comments on commit 049d9a6

Please sign in to comment.