Skip to content

Commit

Permalink
Pluggable batch importer
Browse files Browse the repository at this point in the history
Batch importers are now loaded using service loading. Since the importer logic has moved into
ImportLogic and ParallelBatchImporter is only a thin layer on top of it, service loading enables
other implementations to be built on that same base.
  • Loading branch information
tinwelint committed Jan 12, 2018
1 parent ed2de02 commit 1d22edc
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 6 deletions.
Expand Up @@ -59,7 +59,7 @@
import org.neo4j.kernel.internal.Version;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.unsafe.impl.batchimport.BatchImporter;
import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter;
import org.neo4j.unsafe.impl.batchimport.BatchImporterFactory;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.string.DuplicateInputIdException;
import org.neo4j.unsafe.impl.batchimport.input.BadCollector;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
Expand Down Expand Up @@ -556,7 +556,7 @@ public static void doImport( PrintStream out, PrintStream err, InputStream in, F
LogService logService = life.add( StoreLogService.withInternalLog( internalLogFile ).build( fs ) );

life.start();
BatchImporter importer = new ParallelBatchImporter( storeDir,
BatchImporter importer = BatchImporterFactory.withHighestPriority().instantiate( storeDir,
fs,
null, // no external page cache
configuration,
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.unsafe.impl.batchimport.BatchImporter;
import org.neo4j.unsafe.impl.batchimport.BatchImporterFactory;
import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.DataGeneratorInput;
Expand Down Expand Up @@ -160,7 +161,7 @@ public long maxMemoryUsage()
}
else
{
consumer = new ParallelBatchImporter( dir, fileSystem, null, importConfig,
consumer = BatchImporterFactory.withHighestPriority().instantiate( dir, fileSystem, null, importConfig,
new SimpleLogService( logging, logging ), defaultVisible(), EMPTY, dbConfig,
RecordFormatSelector.selectForConfig( dbConfig, logging ), NO_MONITOR );
ImportTool.printOverview( dir, Collections.emptyList(), Collections.emptyList(), importConfig, System.out );
Expand Down
Expand Up @@ -371,6 +371,8 @@ public boolean parallelRecordReadsWhenWriting()
readAdditionalIds( lastTxId, lastTxChecksum, lastTxLogVersion, lastTxLogByteOffset );

// We have to make sure to keep the token ids if we're migrating properties/labels
// Use the standard importer, instead of loading from BatchImporterFactory because an aborted store migration
// will be restarted from scratch anyway
BatchImporter importer = new ParallelBatchImporter( migrationDir.getAbsoluteFile(), fileSystem, pageCache,
importConfig, logService,
withDynamicProcessorAssignment( migrationBatchImporterMonitor( legacyStore, progressReporter,
Expand Down
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport;

import java.io.File;
import java.util.NoSuchElementException;

import org.neo4j.helpers.Service;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor;

/**
 * Factory of {@link BatchImporter} instances, discovered through {@code Service.load}.
 * Every factory carries a priority which {@link #withHighestPriority()} uses to select one factory
 * among all those that were found.
 */
public abstract class BatchImporterFactory extends Service
{
    private final int priority;

    /**
     * @param key handed on to the {@link Service} constructor.
     * @param priority priority of this factory, a higher value wins in {@link #withHighestPriority()}.
     */
    protected BatchImporterFactory( String key, int priority )
    {
        super( key );
        this.priority = priority;
    }

    /**
     * Creates a {@link BatchImporter} from the supplied arguments. How each argument is used is
     * decided by the implementing factory.
     *
     * @return the created {@link BatchImporter}.
     */
    public abstract BatchImporter instantiate( File storeDir, FileSystemAbstraction fileSystem, PageCache externalPageCache,
            Configuration config, LogService logService, ExecutionMonitor executionMonitor,
            AdditionalInitialIds additionalInitialIds, Config dbConfig, RecordFormats recordFormats, ImportLogic.Monitor monitor );

    /**
     * Loads all {@link BatchImporterFactory} services and picks the one with the highest priority.
     * If several factories share the highest priority then the one encountered first is returned.
     *
     * @return the loaded factory having the highest priority.
     * @throws NoSuchElementException if no factory was loaded.
     */
    public static BatchImporterFactory withHighestPriority()
    {
        BatchImporterFactory best = null;
        for ( BatchImporterFactory factory : Service.load( BatchImporterFactory.class ) )
        {
            if ( best != null && factory.priority <= best.priority )
            {
                // Not strictly better than what we already have, keep the earlier one
                continue;
            }
            best = factory;
        }

        if ( best != null )
        {
            return best;
        }
        throw new NoSuchElementException( "No batch importers found" );
    }
}
Expand Up @@ -27,7 +27,7 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.unsafe.impl.batchimport.ImportLogic.Monitor;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.unsafe.impl.batchimport.input.Input;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor;
import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores;
Expand All @@ -44,7 +44,7 @@
* Goes through multiple stages where each stage has one or more steps executing in parallel, passing
* batches between these steps through each stage, i.e. passing batches downstream.
*/
public class ParallelBatchImporter implements BatchImporter
public class ParallelBatchImporter extends LifecycleAdapter implements BatchImporter
{
private final PageCache externalPageCache;
private final File storeDir;
Expand All @@ -55,7 +55,7 @@ public class ParallelBatchImporter implements BatchImporter
private final RecordFormats recordFormats;
private final ExecutionMonitor executionMonitor;
private final AdditionalInitialIds additionalInitialIds;
private final Monitor monitor;
private final ImportLogic.Monitor monitor;

public ParallelBatchImporter( File storeDir, FileSystemAbstraction fileSystem, PageCache externalPageCache,
Configuration config, LogService logService, ExecutionMonitor executionMonitor,
Expand Down
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport;

import java.io.File;

import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor;

/**
 * {@link BatchImporterFactory} using the key {@link #NAME} and priority 1, which instantiates
 * {@link ParallelBatchImporter}.
 */
public class StandardBatchImporterFactory extends BatchImporterFactory
{
    public static final String NAME = "standard";

    // Priority handed to the super class constructor
    private static final int PRIORITY = 1;

    public StandardBatchImporterFactory()
    {
        super( NAME, PRIORITY );
    }

    /**
     * Passes all arguments, unchanged and in the same order, to the {@link ParallelBatchImporter} constructor.
     *
     * @return a new {@link ParallelBatchImporter}.
     */
    @Override
    public BatchImporter instantiate( File storeDir, FileSystemAbstraction fileSystem, PageCache externalPageCache,
            Configuration config, LogService logService, ExecutionMonitor executionMonitor,
            AdditionalInitialIds additionalInitialIds, Config dbConfig, RecordFormats recordFormats,
            ImportLogic.Monitor monitor )
    {
        BatchImporter importer = new ParallelBatchImporter(
                storeDir,
                fileSystem,
                externalPageCache,
                config,
                logService,
                executionMonitor,
                additionalInitialIds,
                dbConfig,
                recordFormats,
                monitor );
        return importer;
    }
}
@@ -0,0 +1 @@
org.neo4j.unsafe.impl.batchimport.StandardBatchImporterFactory

0 comments on commit 1d22edc

Please sign in to comment.