Skip to content

Commit

Permalink
Refactor contents of PBI --> ImportLogic
Browse files Browse the repository at this point in the history
so that the ParallelBatchImporter code is very clear about what happens
and so that the different stages become eligible for use in a different implementation.
  • Loading branch information
tinwelint committed Nov 27, 2017
1 parent 600b3ab commit 80c37e4
Show file tree
Hide file tree
Showing 51 changed files with 1,004 additions and 582 deletions.
Expand Up @@ -47,6 +47,7 @@
import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.configuration.Settings;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.logging.StoreLogService; import org.neo4j.kernel.impl.logging.StoreLogService;
import org.neo4j.kernel.impl.store.format.RecordFormatSelector;
import org.neo4j.kernel.impl.util.Converters; import org.neo4j.kernel.impl.util.Converters;
import org.neo4j.kernel.impl.util.OsBeanUtil; import org.neo4j.kernel.impl.util.OsBeanUtil;
import org.neo4j.kernel.impl.util.Validator; import org.neo4j.kernel.impl.util.Validator;
Expand All @@ -71,6 +72,7 @@
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitors; import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitors;


import static java.nio.charset.Charset.defaultCharset; import static java.nio.charset.Charset.defaultCharset;

import static org.neo4j.graphdb.factory.GraphDatabaseSettings.logs_directory; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.logs_directory;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.store_internal_log_path; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.store_internal_log_path;
import static org.neo4j.helpers.Exceptions.launderedException; import static org.neo4j.helpers.Exceptions.launderedException;
Expand All @@ -79,6 +81,7 @@
import static org.neo4j.io.ByteUnit.mebiBytes; import static org.neo4j.io.ByteUnit.mebiBytes;
import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit; import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit;
import static org.neo4j.kernel.impl.util.Converters.withDefault; import static org.neo4j.kernel.impl.util.Converters.withDefault;
import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY;
import static org.neo4j.unsafe.impl.batchimport.Configuration.BAD_FILE_NAME; import static org.neo4j.unsafe.impl.batchimport.Configuration.BAD_FILE_NAME;
import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT; import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT;
import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT_MAX_MEMORY_PERCENT; import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT_MAX_MEMORY_PERCENT;
Expand Down Expand Up @@ -479,7 +482,7 @@ idType, csvConfiguration( args, defaultSettingsSuitableForTests ), badCollector,
} }
} }


private static Long parseMaxMemory( String maxMemoryString ) static Long parseMaxMemory( String maxMemoryString )
{ {
if ( maxMemoryString != null ) if ( maxMemoryString != null )
{ {
Expand Down Expand Up @@ -517,10 +520,13 @@ public static void doImport( PrintStream out, PrintStream err, File storeDir, Fi
life.start(); life.start();
BatchImporter importer = new ParallelBatchImporter( storeDir, BatchImporter importer = new ParallelBatchImporter( storeDir,
fs, fs,
null, // no external page cache
configuration, configuration,
logService, logService,
ExecutionMonitors.defaultVisible(), ExecutionMonitors.defaultVisible(),
dbConfig ); EMPTY,
dbConfig,
RecordFormatSelector.selectForConfig( dbConfig, logService.getInternalLogProvider() ) );
printOverview( storeDir, nodesFiles, relationshipsFiles, configuration, out ); printOverview( storeDir, nodesFiles, relationshipsFiles, configuration, out );
success = false; success = false;
try try
Expand Down Expand Up @@ -605,7 +611,7 @@ private static Config loadDbConfig( File file ) throws IOException
return file != null && file.exists() ? Config.defaults( MapUtil.load( file ) ) : Config.defaults(); return file != null && file.exists() ? Config.defaults( MapUtil.load( file ) ) : Config.defaults();
} }


private static void printOverview( File storeDir, Collection<Option<File[]>> nodesFiles, static void printOverview( File storeDir, Collection<Option<File[]>> nodesFiles,
Collection<Option<File[]>> relationshipsFiles, Collection<Option<File[]>> relationshipsFiles,
org.neo4j.unsafe.impl.batchimport.Configuration configuration, PrintStream out ) org.neo4j.unsafe.impl.batchimport.Configuration configuration, PrintStream out )
{ {
Expand Down
Expand Up @@ -32,20 +32,25 @@
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.SimpleLogService; import org.neo4j.kernel.impl.logging.SimpleLogService;
import org.neo4j.kernel.impl.store.format.RecordFormatSelector;
import org.neo4j.logging.FormattedLogProvider; import org.neo4j.logging.FormattedLogProvider;
import org.neo4j.unsafe.impl.batchimport.BatchImporter; import org.neo4j.unsafe.impl.batchimport.BatchImporter;
import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter; import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter;
import org.neo4j.unsafe.impl.batchimport.input.Collector; import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.DataGeneratorInput;
import org.neo4j.unsafe.impl.batchimport.input.Input; import org.neo4j.unsafe.impl.batchimport.input.Input;
import org.neo4j.unsafe.impl.batchimport.input.SimpleDataGenerator;
import org.neo4j.unsafe.impl.batchimport.input.csv.Configuration; import org.neo4j.unsafe.impl.batchimport.input.csv.Configuration;
import org.neo4j.unsafe.impl.batchimport.input.csv.Header; import org.neo4j.unsafe.impl.batchimport.input.csv.Header;
import org.neo4j.unsafe.impl.batchimport.input.csv.IdType; import org.neo4j.unsafe.impl.batchimport.input.csv.IdType;


import static java.lang.System.currentTimeMillis; import static java.lang.System.currentTimeMillis;

import static org.neo4j.graphdb.factory.GraphDatabaseSettings.dense_node_threshold; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.dense_node_threshold;
import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit; import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit;
import static org.neo4j.tooling.DataGeneratorInput.bareboneNodeHeader; import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY;
import static org.neo4j.tooling.DataGeneratorInput.bareboneRelationshipHeader; import static org.neo4j.unsafe.impl.batchimport.input.DataGeneratorInput.bareboneNodeHeader;
import static org.neo4j.unsafe.impl.batchimport.input.DataGeneratorInput.bareboneRelationshipHeader;
import static org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.COMMAS; import static org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.COMMAS;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatNodeFileHeader; import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatNodeFileHeader;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatRelationshipFileHeader; import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatRelationshipFileHeader;
Expand Down Expand Up @@ -130,6 +135,13 @@ public long pageCacheMemory()
{ {
return pageCacheMemory; return pageCacheMemory;
} }

/**
 * Returns the max memory usage for this import configuration.
 *
 * If a value was supplied under the {@code ImportTool.Options.MAX_MEMORY} key it is parsed with
 * {@code ImportTool.parseMaxMemory}; otherwise the value from {@code DEFAULT} is used.
 */
@Override
public long maxMemoryUsage()
{
    String maxMemoryArgument = args.get( ImportTool.Options.MAX_MEMORY.key(), null );
    if ( maxMemoryArgument == null )
    {
        // Nothing specified by the user, fall back to the default configuration.
        return DEFAULT.maxMemoryUsage();
    }
    return ImportTool.parseMaxMemory( maxMemoryArgument );
}
}; };


float factorBadNodeData = args.getNumber( "factor-bad-node-data", 0 ).floatValue(); float factorBadNodeData = args.getNumber( "factor-bad-node-data", 0 ).floatValue();
Expand All @@ -151,8 +163,9 @@ public long pageCacheMemory()
} }
else else
{ {
consumer = new ParallelBatchImporter( dir, fileSystem, importConfig, consumer = new ParallelBatchImporter( dir, fileSystem, null, importConfig,
new SimpleLogService( sysoutLogProvider, sysoutLogProvider ), defaultVisible(), dbConfig ); new SimpleLogService( sysoutLogProvider, sysoutLogProvider ), defaultVisible(), EMPTY, dbConfig,
RecordFormatSelector.selectForConfig( dbConfig, sysoutLogProvider ) );
} }
consumer.doImport( input ); consumer.doImport( input );
} }
Expand Down
Expand Up @@ -39,10 +39,12 @@
*/ */
public class CountGroupsStage extends Stage public class CountGroupsStage extends Stage
{ {
public static final String NAME = "Count groups";

public CountGroupsStage( Configuration config, RecordStore<RelationshipGroupRecord> store, public CountGroupsStage( Configuration config, RecordStore<RelationshipGroupRecord> store,
RelationshipGroupCache groupCache ) RelationshipGroupCache groupCache )
{ {
super( "Count groups", config ); super( NAME, null, config, 0 );
add( new BatchFeedStep( control(), config, allIn( store, config ), store.getRecordSize() ) ); add( new BatchFeedStep( control(), config, allIn( store, config ), store.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, false, store, null ) ); add( new ReadRecordsStep<>( control(), config, false, store, null ) );
add( new CountGroupsStep( control(), config, groupCache ) ); add( new CountGroupsStep( control(), config, groupCache ) );
Expand Down
Expand Up @@ -36,7 +36,7 @@ public class DeleteDuplicateNodesStage extends Stage
public DeleteDuplicateNodesStage( Configuration config, PrimitiveLongIterator duplicateNodeIds, public DeleteDuplicateNodesStage( Configuration config, PrimitiveLongIterator duplicateNodeIds,
BatchingNeoStores neoStore ) BatchingNeoStores neoStore )
{ {
super( "DEDUP", config ); super( "DEDUP", null, config, 0 );
add( new DeleteDuplicateNodesStep( control(), config, duplicateNodeIds, add( new DeleteDuplicateNodesStep( control(), config, duplicateNodeIds,
neoStore.getNodeStore(), neoStore.getLabelScanStore() ) ); neoStore.getNodeStore(), neoStore.getLabelScanStore() ) );
} }
Expand Down
Expand Up @@ -34,10 +34,12 @@
*/ */
public class IdMapperPreparationStage extends Stage public class IdMapperPreparationStage extends Stage
{ {
public static final String NAME = "Prepare node index";

public IdMapperPreparationStage( Configuration config, IdMapper idMapper, InputIterable<InputNode> nodes, public IdMapperPreparationStage( Configuration config, IdMapper idMapper, InputIterable<InputNode> nodes,
Collector collector, StatsProvider memoryUsageStats ) Collector collector, StatsProvider memoryUsageStats )
{ {
super( "Prepare node index", config ); super( NAME, null, config, 0 );
add( new IdMapperPreparationStep( control(), config, add( new IdMapperPreparationStep( control(), config,
idMapper, idsOf( nodes ), collector, memoryUsageStats ) ); idMapper, idsOf( nodes ), collector, memoryUsageStats ) );
} }
Expand Down

0 comments on commit 80c37e4

Please sign in to comment.