Merge remote-tracking branch 'upstream/2.3' into 3.0
tinwelint committed Feb 9, 2016
2 parents 9cbddf0 + 5b9670c commit 9d58e54
Showing 12 changed files with 129 additions and 26 deletions.
@@ -150,7 +150,7 @@ public void shouldImportCsvData() throws Exception
ExecutionMonitor processorAssigner = eagerRandomSaturation( config.maxNumberOfProcessors() );
final BatchImporter inserter = new ParallelBatchImporter( directory.graphDbDir(),
new DefaultFileSystemAbstraction(), config, NullLogService.getInstance(),
processorAssigner, EMPTY );
processorAssigner, EMPTY, new Config() );

boolean successful = false;
IdGroupDistribution groups = new IdGroupDistribution( NODE_COUNT, 5, random.random() );
@@ -152,7 +152,7 @@ public void shouldImportCsvData() throws Exception
ExecutionMonitor processorAssigner = eagerRandomSaturation( config.maxNumberOfProcessors() );
final BatchImporter inserter = new ParallelBatchImporter( directory.graphDbDir(),
new DefaultFileSystemAbstraction(), config, NullLogService.getInstance(),
processorAssigner, EMPTY );
processorAssigner, EMPTY, new Config() );

boolean successful = false;
IdGroupDistribution groups = new IdGroupDistribution( NODE_COUNT, 5, random.random() );
@@ -32,16 +32,19 @@
import java.util.function.Function;

import org.neo4j.csv.reader.IllegalMultilineFieldException;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.Args;
import org.neo4j.helpers.Args.Option;
import org.neo4j.helpers.ArrayUtil;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.Strings;
import org.neo4j.helpers.collection.IterableWrapper;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.Version;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.logging.StoreLogService;
import org.neo4j.kernel.impl.storemigration.FileOperation;
@@ -67,6 +70,7 @@
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitors;

import static java.nio.charset.Charset.defaultCharset;

import static org.neo4j.helpers.Exceptions.launderedException;
import static org.neo4j.helpers.Format.bytes;
import static org.neo4j.helpers.Strings.TAB;
@@ -179,7 +183,16 @@ enum Options
"<true/false>",
"Whether or not to ignore extra columns in the data not specified by the header. "
+ "Skipped columns will be logged, containing at most number of entities specified by "
+ BAD_TOLERANCE.key() + "." );
+ BAD_TOLERANCE.key() + "." ),
DATABASE_CONFIG( "db-config", null,
"<path/to/neo4j.properties>",
"(advanced) File specifying database-specific configuration. For more information consult "
+ "manual about available configuration options for a neo4j configuration file. "
+ "Only configuration affecting store at time of creation will be read. "
+ "Examples of supported config are:\n"
+ GraphDatabaseSettings.dense_node_threshold.name() + "\n"
+ GraphDatabaseSettings.string_block_size.name() + "\n"
+ GraphDatabaseSettings.array_block_size.name() );

private final String key;
private final Object defaultValue;
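The option text above names the settings the importer currently honours from a --db-config file. As a rough, hypothetical illustration (not part of this commit), such a file could be produced with the same MapUtil helpers the new test later in this diff uses; the numeric values below are arbitrary examples:

import java.io.File;
import java.io.IOException;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.collection.MapUtil;

// Sketch: write a neo4j.properties file suitable for --db-config.
// Keys come from GraphDatabaseSettings; the values are made-up examples.
public class WriteDbConfigSketch
{
    public static void main( String[] args ) throws IOException
    {
        MapUtil.store( MapUtil.stringMap(
                GraphDatabaseSettings.dense_node_threshold.name(), "5",
                GraphDatabaseSettings.string_block_size.name(), "60",
                GraphDatabaseSettings.array_block_size.name(), "60" ),
                new File( "neo4j.properties" ) );
    }
}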
@@ -302,14 +315,17 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit
int badTolerance;
Charset inputEncoding;
boolean skipBadRelationships, skipDuplicateNodes, ignoreExtraColumns;
Config dbConfig;
OutputStream badOutput = null;

boolean success = false;
try
{
storeDir = args.interpretOption( Options.STORE_DIR.key(), Converters.<File>mandatory(),
Converters.toFile(), Validators.DIRECTORY_IS_WRITABLE, Validators.CONTAINS_NO_EXISTING_DATABASE );

File badFile = new File( storeDir, BAD_FILE_NAME );
OutputStream badOutput = new BufferedOutputStream( fs.openAsOutputStream( badFile, false ) );
badOutput = new BufferedOutputStream( fs.openAsOutputStream( badFile, false ) );
nodesFiles = INPUT_FILES_EXTRACTOR.apply( args, Options.NODE_DATA.key() );
relationshipsFiles = INPUT_FILES_EXTRACTOR.apply( args, Options.RELATIONSHIP_DATA.key() );
validateInputFiles( nodesFiles, relationshipsFiles );
@@ -333,6 +349,9 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit
input = new CsvInput( nodeData( inputEncoding, nodesFiles ), defaultFormatNodeFileHeader(),
relationshipData( inputEncoding, relationshipsFiles ), defaultFormatRelationshipFileHeader(),
idType, csvConfiguration( args, defaultSettingsSuitableForTests ), badCollector );
dbConfig = loadDbConfig( args.interpretOption( Options.DATABASE_CONFIG.key(), Converters.<File>optional(),
Converters.toFile(), Validators.REGEX_FILE_EXISTS ) );
success = true;
}
catch ( IllegalArgumentException e )
{
@@ -342,20 +361,28 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit
{
throw andPrintError( "File error", e, false );
}
finally
{
if ( !success && badOutput != null )
{
badOutput.close();
}
}

LifeSupport life = new LifeSupport();

LogService logService = life.add( StoreLogService.inStoreDirectory( fs, storeDir ) );

life.start();
org.neo4j.unsafe.impl.batchimport.Configuration configuration =
importConfiguration( processors, defaultSettingsSuitableForTests );
importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig );
BatchImporter importer = new ParallelBatchImporter( storeDir,
configuration,
logService,
ExecutionMonitors.defaultVisible() );
ExecutionMonitors.defaultVisible(),
dbConfig );
printOverview( storeDir, nodesFiles, relationshipsFiles );
boolean success = false;
success = false;
try
{
importer.doImport( input );
@@ -368,6 +395,7 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit
finally
{
input.badCollector().close();
badOutput.close();

if ( input.badCollector().badEntries() > 0 )
{
@@ -400,6 +428,11 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit
}
}

private static Config loadDbConfig( File file ) throws IOException
{
return file != null && file.exists() ? new Config( MapUtil.load( file ) ) : new Config();
}

private static void printOverview( File storeDir, Collection<Option<File[]>> nodesFiles,
Collection<Option<File[]>> relationshipsFiles )
{
@@ -458,7 +491,7 @@ private static void validateInputFiles( Collection<Option<File[]>> nodesFiles,
}

private static org.neo4j.unsafe.impl.batchimport.Configuration importConfiguration( final Number processors,
final boolean defaultSettingsSuitableForTests )
final boolean defaultSettingsSuitableForTests, final Config dbConfig )
{
return new org.neo4j.unsafe.impl.batchimport.Configuration.Default()
{
@@ -475,6 +508,12 @@ public int maxNumberOfProcessors()
{
return processors != null ? processors.intValue() : super.maxNumberOfProcessors();
}

@Override
public int denseNodeThreshold()
{
return dbConfig.get( GraphDatabaseSettings.dense_node_threshold );
}
};
}

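Assumed usage, not taken from the diff itself: with a properties file like the one sketched earlier, the new option is passed next to the existing --into and --nodes arguments. The option names match the keys declared in the Options enum; the paths below are placeholders, and the standard single-argument main entry point is assumed.

import org.neo4j.tooling.ImportTool;

// Sketch: driving the import tool programmatically with the new --db-config option.
public class DbConfigImportSketch
{
    public static void main( String[] args ) throws Exception
    {
        ImportTool.main( new String[]{
                "--into", "/tmp/graph.db",
                "--nodes", "/tmp/nodes.csv",
                "--db-config", "/tmp/neo4j.properties"
        } );
    }
}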
@@ -47,11 +47,15 @@
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.collection.FilteringIterator;
import org.neo4j.helpers.collection.IteratorUtil;
import org.neo4j.helpers.collection.PrefetchingIterator;
import org.neo4j.io.fs.FileUtils;
import org.neo4j.kernel.Version;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine;
import org.neo4j.kernel.impl.store.NeoStores;
import org.neo4j.kernel.impl.store.format.lowlimit.LowLimit;
import org.neo4j.kernel.impl.util.Validator;
import org.neo4j.kernel.impl.util.Validators;
import org.neo4j.test.EmbeddedDatabaseRule;
@@ -82,6 +86,8 @@
import static org.neo4j.helpers.collection.IteratorUtil.count;
import static org.neo4j.helpers.collection.IteratorUtil.single;
import static org.neo4j.helpers.collection.IteratorUtil.singleOrNull;
import static org.neo4j.helpers.collection.MapUtil.store;
import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.tooling.GlobalGraphOperations.at;
import static org.neo4j.tooling.ImportTool.MULTI_FILE_DELIMITER;

@@ -1520,8 +1526,33 @@ public void shouldBeEquivalentToUseRawAsciiOrCharacterAsQuoteConfiguration2() th
}
}

private File writeArrayCsv( String[] headers, String[] values )
throws FileNotFoundException
@Test
public void shouldRespectDbConfig() throws Exception
{
// GIVEN
int arrayBlockSize = 10;
int stringBlockSize = 12;
File dbConfig = file( "neo4j.properties" );
store( stringMap(
GraphDatabaseSettings.array_block_size.name(), String.valueOf( arrayBlockSize ),
GraphDatabaseSettings.string_block_size.name(), String.valueOf( stringBlockSize ) ), dbConfig );
List<String> nodeIds = nodeIds();

// WHEN
importTool(
"--into", dbRule.getStoreDirAbsolutePath(),
"--db-config", dbConfig.getAbsolutePath(),
"--nodes", nodeData( true, Configuration.COMMAS, nodeIds, (value) -> true ).getAbsolutePath() );

// THEN
NeoStores stores = dbRule.getGraphDatabaseAPI().getDependencyResolver()
.resolveDependency( RecordStorageEngine.class ).testAccessNeoStores();
int headerSize = LowLimit.RECORD_FORMATS.dynamic().getRecordHeaderSize();
assertEquals( arrayBlockSize + headerSize, stores.getPropertyStore().getArrayStore().getRecordSize() );
assertEquals( stringBlockSize + headerSize, stores.getPropertyStore().getStringStore().getRecordSize() );
}

private File writeArrayCsv( String[] headers, String[] values ) throws FileNotFoundException
{
File data = file( fileName( "whitespace.csv" ) );
try ( PrintStream writer = new PrintStream( data ) )
@@ -28,6 +28,7 @@
import org.neo4j.csv.reader.Extractors;
import org.neo4j.csv.reader.Readables;
import org.neo4j.helpers.Args;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.SimpleLogService;
import org.neo4j.logging.FormattedLogProvider;
import org.neo4j.unsafe.impl.batchimport.BatchImporter;
@@ -85,7 +86,7 @@ public static void main( String[] arguments ) throws IOException
COMMAS, nodeCount, relationshipCount, new Groups(), idType, labelCount, relationshipTypeCount,
silentBadCollector( 0 ));
BatchImporter importer = new ParallelBatchImporter( dir, DEFAULT,
new SimpleLogService( sysoutLogProvider, sysoutLogProvider ), defaultVisible() );
new SimpleLogService( sysoutLogProvider, sysoutLogProvider ), defaultVisible(), new Config() );
importer.doImport( input );
}

@@ -377,7 +377,7 @@ private void migrateWithBatchImporter( File storeDir, File migrationDir, long la
BatchImporter importer = new ParallelBatchImporter( migrationDir.getAbsoluteFile(), fileSystem,
importConfig, logService,
withDynamicProcessorAssignment( migrationBatchImporterMonitor( legacyStore, progressMonitor,
importConfig ), importConfig ), additionalInitialIds );
importConfig ), importConfig ), additionalInitialIds, config );
InputIterable<InputNode> nodes = legacyNodesAsInput( legacyStore );
InputIterable<InputRelationship> relationships = legacyRelationshipsAsInput( legacyStore );
File badFile = new File( storeDir, Configuration.BAD_FILE_NAME );
@@ -418,7 +418,7 @@ private void prepareBatchImportMigration( File storeDir, File migrationDir ) thr
// that dynamic record store over before doing the "batch import".
// Copying this file just as-is assumes that the format hasn't change. If that happens we're in
// a different situation, where we first need to migrate this file.
BatchingNeoStores.createStore( fileSystem, migrationDir.getPath() );
BatchingNeoStores.createStore( fileSystem, migrationDir.getPath(), config );
Iterable<StoreFile> storeFiles = iterable( StoreFile.NODE_LABEL_STORE );
StoreFile.fileOperation( COPY, fileSystem, storeDir, migrationDir, storeFiles,
true, // OK if it's not there (1.9)
@@ -26,6 +26,7 @@
import org.neo4j.helpers.Format;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;
@@ -71,19 +72,21 @@ public class ParallelBatchImporter implements BatchImporter
private final Log log;
private final ExecutionMonitor executionMonitor;
private final AdditionalInitialIds additionalInitialIds;
private final Config dbConfig;

/**
* Advanced usage of the parallel batch importer, for special and very specific cases. Please use
* a constructor with fewer arguments instead.
*/
public ParallelBatchImporter( File storeDir, FileSystemAbstraction fileSystem, Configuration config,
LogService logService, ExecutionMonitor executionMonitor,
AdditionalInitialIds additionalInitialIds )
AdditionalInitialIds additionalInitialIds, Config dbConfig )
{
this.storeDir = storeDir;
this.fileSystem = fileSystem;
this.config = config;
this.logService = logService;
this.dbConfig = dbConfig;
this.log = logService.getInternalLogProvider().getLog( getClass() );
this.executionMonitor = executionMonitor;
this.additionalInitialIds = additionalInitialIds;
@@ -95,10 +98,10 @@ public ParallelBatchImporter( File storeDir, FileSystemAbstraction fileSystem, C
* optimal assignment of processors to bottleneck steps over time.
*/
public ParallelBatchImporter( File storeDir, Configuration config, LogService logService,
ExecutionMonitor executionMonitor )
ExecutionMonitor executionMonitor, Config dbConfig )
{
this( storeDir, new DefaultFileSystemAbstraction(), config, logService,
withDynamicProcessorAssignment( executionMonitor, config ), EMPTY );
withDynamicProcessorAssignment( executionMonitor, config ), EMPTY, dbConfig );
}

@Override
Expand All @@ -115,7 +118,7 @@ public void doImport( Input input ) throws IOException
File badFile = new File( storeDir, Configuration.BAD_FILE_NAME );
CountingStoreUpdateMonitor storeUpdateMonitor = new CountingStoreUpdateMonitor();
try ( BatchingNeoStores neoStore =
new BatchingNeoStores( fileSystem, storeDir, config, logService, additionalInitialIds );
new BatchingNeoStores( fileSystem, storeDir, config, logService, additionalInitialIds, dbConfig );
CountsAccessor.Updater countsUpdater = neoStore.getCountsStore().reset(
neoStore.getLastCommittedTransactionId() );
InputCache inputCache = new InputCache( fileSystem, storeDir ) )
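For callers, the visible API change is the extra Config parameter on both ParallelBatchImporter constructors; the call sites updated in this commit simply pass new Config() to keep the previous behaviour. A minimal sketch of the convenience constructor, assuming the statically imported DEFAULT seen in the generator tool refers to the batch-import Configuration defaults, with a placeholder store directory:

import java.io.File;

import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.unsafe.impl.batchimport.BatchImporter;
import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitors;

import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT;

public class BatchImportSketch
{
    public static void main( String[] args )
    {
        // An empty Config keeps the old defaults; a Config populated from --db-config
        // lets settings such as dense_node_threshold reach store creation.
        BatchImporter importer = new ParallelBatchImporter(
                new File( "/tmp/graph.db" ),   // placeholder store directory
                DEFAULT,                       // batch-import configuration
                NullLogService.getInstance(),
                ExecutionMonitors.defaultVisible(),
                new Config() );
        // importer.doImport( input ) would follow, given an Input (e.g. a CsvInput).
    }
}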
@@ -88,12 +88,12 @@ public class BatchingNeoStores implements AutoCloseable
private final IoTracer ioTracer;

public BatchingNeoStores( FileSystemAbstraction fileSystem, File storeDir, Configuration config,
LogService logService, AdditionalInitialIds initialIds )
LogService logService, AdditionalInitialIds initialIds, Config dbConfig )
{
this.fileSystem = fileSystem;
this.logProvider = logService.getInternalLogProvider();
this.storeDir = storeDir;
this.neo4jConfig = new Config( stringMap(
this.neo4jConfig = new Config( stringMap( dbConfig.getParams(),
dense_node_threshold.name(), valueOf( config.denseNodeThreshold() ),
pagecache_memory.name(), valueOf( config.writeBufferSize() ) ),
GraphDatabaseSettings.class );
@@ -166,9 +166,10 @@ private boolean alreadyContainsData( NeoStores neoStores )
* Useful for store migration where the {@link ParallelBatchImporter} is used as migrator and some of
* its data need to be communicated by copying a store file.
*/
public static void createStore( FileSystemAbstraction fileSystem, String storeDir ) throws IOException
public static void createStore( FileSystemAbstraction fileSystem, String storeDir, Config dbConfig )
throws IOException
{
try ( PageCache pageCache = createPageCache( fileSystem, new Config(), NullLogProvider.getInstance(),
try ( PageCache pageCache = createPageCache( fileSystem, dbConfig, NullLogProvider.getInstance(),
PageCacheTracer.NULL ) )
{
StoreFactory storeFactory =
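One detail worth noting in the constructor change above: MapUtil.stringMap( map, ... ) writes the trailing key/value pairs into the map it is given, so the dense_node_threshold and pagecache_memory values derived from the import Configuration overwrite any same-named entries coming from --db-config. A small illustration of that overload, not from this commit, with arbitrary keys and values:

import java.util.Map;

import static org.neo4j.helpers.collection.MapUtil.stringMap;

public class StringMapMergeSketch
{
    public static void main( String[] args )
    {
        Map<String,String> fromDbConfig = stringMap( "some.key", "from-db-config" );
        // The varargs pairs are put into (and returned as) the supplied map,
        // so a later pair wins over an existing key.
        Map<String,String> merged = stringMap( fromDbConfig, "some.key", "from-importer" );
        System.out.println( merged ); // prints {some.key=from-importer}
    }
}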
@@ -49,6 +49,7 @@
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.GraphDatabaseAPI;
import org.neo4j.kernel.api.ReadOperations;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine;
import org.neo4j.kernel.impl.store.NeoStores;
@@ -87,7 +88,7 @@ public void shouldImportDataComingFromCsvFiles() throws Exception
{
// GIVEN
BatchImporter importer = new ParallelBatchImporter( directory.graphDbDir(),
smallBatchSizeConfig(), NullLogService.getInstance(), invisible() );
smallBatchSizeConfig(), NullLogService.getInstance(), invisible(), new Config() );
List<InputNode> nodeData = randomNodeData();
List<InputRelationship> relationshipData = randomRelationshipData( nodeData );
