From 1d22edc530128533daaa4383ad9ea3a0a0e05714 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Finn=C3=A9?= Date: Fri, 5 Jan 2018 14:14:10 +0100 Subject: [PATCH] Pluggable batch importer Loaded using service loading. Since importer logic moved into ImportLogic and ParallelBatchImporter is a thin layer using it, this service loading enables other implementations using that same base --- .../java/org/neo4j/tooling/ImportTool.java | 4 +- .../java/org/neo4j/tooling/QuickImport.java | 3 +- .../participant/StoreMigrator.java | 2 + .../batchimport/BatchImporterFactory.java | 64 +++++++++++++++++++ .../batchimport/ParallelBatchImporter.java | 6 +- .../StandardBatchImporterFactory.java | 48 ++++++++++++++ ...safe.impl.batchimport.BatchImporterFactory | 1 + 7 files changed, 122 insertions(+), 6 deletions(-) create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchImporterFactory.java create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/StandardBatchImporterFactory.java create mode 100644 community/kernel/src/main/resources/META-INF/services/org.neo4j.unsafe.impl.batchimport.BatchImporterFactory diff --git a/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java b/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java index 4e7970e509122..011ec6cc430d4 100644 --- a/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java +++ b/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java @@ -59,7 +59,7 @@ import org.neo4j.kernel.internal.Version; import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.unsafe.impl.batchimport.BatchImporter; -import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter; +import org.neo4j.unsafe.impl.batchimport.BatchImporterFactory; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.string.DuplicateInputIdException; import org.neo4j.unsafe.impl.batchimport.input.BadCollector; import org.neo4j.unsafe.impl.batchimport.input.Collector; @@ -556,7 +556,7 @@ public static void doImport( PrintStream out, PrintStream err, InputStream in, F LogService logService = life.add( StoreLogService.withInternalLog( internalLogFile ).build( fs ) ); life.start(); - BatchImporter importer = new ParallelBatchImporter( storeDir, + BatchImporter importer = BatchImporterFactory.withHighestPriority().instantiate( storeDir, fs, null, // no external page cache configuration, diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/QuickImport.java b/community/import-tool/src/test/java/org/neo4j/tooling/QuickImport.java index 010e8286fda50..f9c6a97f2cb73 100644 --- a/community/import-tool/src/test/java/org/neo4j/tooling/QuickImport.java +++ b/community/import-tool/src/test/java/org/neo4j/tooling/QuickImport.java @@ -37,6 +37,7 @@ import org.neo4j.logging.LogProvider; import org.neo4j.logging.NullLogProvider; import org.neo4j.unsafe.impl.batchimport.BatchImporter; +import org.neo4j.unsafe.impl.batchimport.BatchImporterFactory; import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter; import org.neo4j.unsafe.impl.batchimport.input.Collector; import org.neo4j.unsafe.impl.batchimport.input.DataGeneratorInput; @@ -160,7 +161,7 @@ public long maxMemoryUsage() } else { - consumer = new ParallelBatchImporter( dir, fileSystem, null, importConfig, + consumer = BatchImporterFactory.withHighestPriority().instantiate( dir, fileSystem, null, importConfig, new SimpleLogService( logging, logging ), defaultVisible(), EMPTY, dbConfig, RecordFormatSelector.selectForConfig( dbConfig, logging ), NO_MONITOR ); ImportTool.printOverview( dir, Collections.emptyList(), Collections.emptyList(), importConfig, System.out ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigrator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigrator.java index 766a4528058f3..cb41727e05e86 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigrator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storemigration/participant/StoreMigrator.java @@ -371,6 +371,8 @@ public boolean parallelRecordReadsWhenWriting() readAdditionalIds( lastTxId, lastTxChecksum, lastTxLogVersion, lastTxLogByteOffset ); // We have to make sure to keep the token ids if we're migrating properties/labels + // Use the standard importer, instead of loading from BatchImporterFactory because an aborted store migration + // will be restarted from scratch anyway BatchImporter importer = new ParallelBatchImporter( migrationDir.getAbsoluteFile(), fileSystem, pageCache, importConfig, logService, withDynamicProcessorAssignment( migrationBatchImporterMonitor( legacyStore, progressReporter, diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchImporterFactory.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchImporterFactory.java new file mode 100644 index 0000000000000..52802fbc7b57c --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchImporterFactory.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import java.io.File; +import java.util.NoSuchElementException; + +import org.neo4j.helpers.Service; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.impl.logging.LogService; +import org.neo4j.kernel.impl.store.format.RecordFormats; +import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor; + +public abstract class BatchImporterFactory extends Service +{ + private final int priority; + + protected BatchImporterFactory( String key, int priority ) + { + super( key ); + this.priority = priority; + } + + public abstract BatchImporter instantiate( File storeDir, FileSystemAbstraction fileSystem, PageCache externalPageCache, + Configuration config, LogService logService, ExecutionMonitor executionMonitor, + AdditionalInitialIds additionalInitialIds, Config dbConfig, RecordFormats recordFormats, ImportLogic.Monitor monitor ); + + public static BatchImporterFactory withHighestPriority() + { + Iterable candidates = Service.load( BatchImporterFactory.class ); + BatchImporterFactory highestPrioritized = null; + for ( BatchImporterFactory candidate : candidates ) + { + if ( highestPrioritized == null || candidate.priority > highestPrioritized.priority ) + { + highestPrioritized = candidate; + } + } + if ( highestPrioritized == null ) + { + throw new NoSuchElementException( "No batch importers found" ); + } + return highestPrioritized; + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java index 83b8c5938615f..174cf78634bb1 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java @@ -27,7 +27,7 @@ import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.store.format.RecordFormats; -import org.neo4j.unsafe.impl.batchimport.ImportLogic.Monitor; +import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.unsafe.impl.batchimport.input.Input; import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor; import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; @@ -44,7 +44,7 @@ * Goes through multiple stages where each stage has one or more steps executing in parallel, passing * batches between these steps through each stage, i.e. passing batches downstream. */ -public class ParallelBatchImporter implements BatchImporter +public class ParallelBatchImporter extends LifecycleAdapter implements BatchImporter { private final PageCache externalPageCache; private final File storeDir; @@ -55,7 +55,7 @@ public class ParallelBatchImporter implements BatchImporter private final RecordFormats recordFormats; private final ExecutionMonitor executionMonitor; private final AdditionalInitialIds additionalInitialIds; - private final Monitor monitor; + private final ImportLogic.Monitor monitor; public ParallelBatchImporter( File storeDir, FileSystemAbstraction fileSystem, PageCache externalPageCache, Configuration config, LogService logService, ExecutionMonitor executionMonitor, diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/StandardBatchImporterFactory.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/StandardBatchImporterFactory.java new file mode 100644 index 0000000000000..26929ab790ef9 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/StandardBatchImporterFactory.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import java.io.File; + +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.PageCache; +import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.impl.logging.LogService; +import org.neo4j.kernel.impl.store.format.RecordFormats; +import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor; + +public class StandardBatchImporterFactory extends BatchImporterFactory +{ + public static final String NAME = "standard"; + + public StandardBatchImporterFactory() + { + super( NAME, 1 ); + } + + @Override + public BatchImporter instantiate( File storeDir, FileSystemAbstraction fileSystem, PageCache externalPageCache, Configuration config, + LogService logService, ExecutionMonitor executionMonitor, AdditionalInitialIds additionalInitialIds, Config dbConfig, + RecordFormats recordFormats, ImportLogic.Monitor monitor ) + { + return new ParallelBatchImporter( storeDir, fileSystem, externalPageCache, config, logService, executionMonitor, + additionalInitialIds, dbConfig, recordFormats, monitor ); + } +} diff --git a/community/kernel/src/main/resources/META-INF/services/org.neo4j.unsafe.impl.batchimport.BatchImporterFactory b/community/kernel/src/main/resources/META-INF/services/org.neo4j.unsafe.impl.batchimport.BatchImporterFactory new file mode 100644 index 0000000000000..58102ea36c281 --- /dev/null +++ b/community/kernel/src/main/resources/META-INF/services/org.neo4j.unsafe.impl.batchimport.BatchImporterFactory @@ -0,0 +1 @@ +org.neo4j.unsafe.impl.batchimport.StandardBatchImporterFactory