From 353f12354ee85d37d3096cbb6ce89d74490a8b20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Finn=C3=A9?= Date: Wed, 10 Jan 2018 16:42:51 +0100 Subject: [PATCH] Better lifecycle and more readable DataImporter --- .../unsafe/impl/batchimport/DataImporter.java | 54 ++++++++++--------- .../ExhaustingEntityImporterRunnable.java | 10 ---- 2 files changed, 29 insertions(+), 35 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/DataImporter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/DataImporter.java index a7f3779f304c2..96288ef2ea099 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/DataImporter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/DataImporter.java @@ -69,7 +69,6 @@ */ public class DataImporter { - public static final String ID_PROPERTY = "__id"; public static final String NODE_IMPORT_NAME = "Nodes"; public static final String RELATIONSHIP_IMPORT_NAME = "Relationships"; @@ -138,30 +137,33 @@ private static long importData( String title, int numRunners, InputIterable data ControllableStep step = new ControllableStep( title, roughEntityCountProgress, Configuration.DEFAULT, writeMonitor, memoryStatsProvider ); StageExecution execution = new StageExecution( title, null, Configuration.DEFAULT, Collections.singletonList( step ), 0 ); - InputIterator dataIterator = data.iterator(); - for ( int i = 0; i < numRunners; i++ ) - { - pool.submit( new ExhaustingEntityImporterRunnable( - execution, dataIterator, visitors.get(), roughEntityCountProgress ) ); - } - pool.shutdown(); - - executionMonitor.start( execution ); long startTime = currentTimeMillis(); - long nextWait = 0; - try + try ( InputIterator dataIterator = data.iterator() ) { - while ( !pool.awaitTermination( nextWait, TimeUnit.MILLISECONDS ) ) + for ( int i = 0; i < numRunners; i++ ) { - executionMonitor.check( execution ); - nextWait = executionMonitor.nextCheckTime() - currentTimeMillis(); + pool.submit( new ExhaustingEntityImporterRunnable( + execution, dataIterator, visitors.get(), roughEntityCountProgress ) ); + } + pool.shutdown(); + + executionMonitor.start( execution ); + long nextWait = 0; + try + { + while ( !pool.awaitTermination( nextWait, TimeUnit.MILLISECONDS ) ) + { + executionMonitor.check( execution ); + nextWait = executionMonitor.nextCheckTime() - currentTimeMillis(); + } + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + throw new IOException( e ); } } - catch ( InterruptedException e ) - { - Thread.currentThread().interrupt(); - throw new IOException( e ); - } + execution.assertHealthy(); step.markAsCompleted(); writeMonitor.stop(); @@ -175,8 +177,9 @@ public static void importNodes( int numRunners, Input input, BatchingNeoStores s ExecutionMonitor executionMonitor, Monitor monitor ) throws IOException { - importData( NODE_IMPORT_NAME, numRunners, input.nodes(), stores, () -> - new NodeImporter( stores, idMapper, monitor ), executionMonitor, new MemoryUsageStatsProvider( stores, idMapper ) ); + Supplier importers = () -> new NodeImporter( stores, idMapper, monitor ); + importData( NODE_IMPORT_NAME, numRunners, input.nodes(), stores, importers, executionMonitor, + new MemoryUsageStatsProvider( stores, idMapper ) ); } public static DataStatistics importRelationships( int numRunners, Input input, @@ -185,9 +188,10 @@ public static DataStatistics importRelationships( int numRunners, Input input, throws IOException { DataStatistics typeDistribution = new DataStatistics( monitor.nodes.sum(), monitor.properties.sum(), new RelationshipTypeCount[0] ); - importData( RELATIONSHIP_IMPORT_NAME, numRunners, input.relationships(), stores, () -> - new RelationshipImporter( stores, idMapper, typeDistribution, monitor, badCollector, validateRelationshipData, - stores.usesDoubleRelationshipRecordUnits() ), executionMonitor, new MemoryUsageStatsProvider( stores, idMapper ) ); + Supplier importers = () -> new RelationshipImporter( stores, idMapper, typeDistribution, monitor, + badCollector, validateRelationshipData, stores.usesDoubleRelationshipRecordUnits() ); + importData( RELATIONSHIP_IMPORT_NAME, numRunners, input.relationships(), stores, importers, executionMonitor, + new MemoryUsageStatsProvider( stores, idMapper ) ); return typeDistribution; } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ExhaustingEntityImporterRunnable.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ExhaustingEntityImporterRunnable.java index 8ad1f06689cbf..c73b30f4324d4 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ExhaustingEntityImporterRunnable.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ExhaustingEntityImporterRunnable.java @@ -19,8 +19,6 @@ */ package org.neo4j.unsafe.impl.batchimport; -import java.io.IOException; -import java.io.UncheckedIOException; import java.util.concurrent.atomic.LongAdder; import org.neo4j.unsafe.impl.batchimport.input.InputChunk; @@ -72,14 +70,6 @@ public void run() finally { visitor.close(); - try - { - data.close(); - } - catch ( IOException e ) - { - throw new UncheckedIOException( e ); - } } } }