Skip to content

Commit

Permalink
Better lifecycle and more readable DataImporter
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Jan 10, 2018
1 parent 36d2b82 commit 353f123
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 35 deletions.
Expand Up @@ -69,7 +69,6 @@
*/
public class DataImporter
{
public static final String ID_PROPERTY = "__id";
public static final String NODE_IMPORT_NAME = "Nodes";
public static final String RELATIONSHIP_IMPORT_NAME = "Relationships";

Expand Down Expand Up @@ -138,30 +137,33 @@ private static long importData( String title, int numRunners, InputIterable data
ControllableStep step = new ControllableStep( title, roughEntityCountProgress, Configuration.DEFAULT,
writeMonitor, memoryStatsProvider );
StageExecution execution = new StageExecution( title, null, Configuration.DEFAULT, Collections.singletonList( step ), 0 );
InputIterator dataIterator = data.iterator();
for ( int i = 0; i < numRunners; i++ )
{
pool.submit( new ExhaustingEntityImporterRunnable(
execution, dataIterator, visitors.get(), roughEntityCountProgress ) );
}
pool.shutdown();

executionMonitor.start( execution );
long startTime = currentTimeMillis();
long nextWait = 0;
try
try ( InputIterator dataIterator = data.iterator() )
{
while ( !pool.awaitTermination( nextWait, TimeUnit.MILLISECONDS ) )
for ( int i = 0; i < numRunners; i++ )
{
executionMonitor.check( execution );
nextWait = executionMonitor.nextCheckTime() - currentTimeMillis();
pool.submit( new ExhaustingEntityImporterRunnable(
execution, dataIterator, visitors.get(), roughEntityCountProgress ) );
}
pool.shutdown();

executionMonitor.start( execution );
long nextWait = 0;
try
{
while ( !pool.awaitTermination( nextWait, TimeUnit.MILLISECONDS ) )
{
executionMonitor.check( execution );
nextWait = executionMonitor.nextCheckTime() - currentTimeMillis();
}
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
throw new IOException( e );
}
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
throw new IOException( e );
}

execution.assertHealthy();
step.markAsCompleted();
writeMonitor.stop();
Expand All @@ -175,8 +177,9 @@ public static void importNodes( int numRunners, Input input, BatchingNeoStores s
ExecutionMonitor executionMonitor, Monitor monitor )
throws IOException
{
importData( NODE_IMPORT_NAME, numRunners, input.nodes(), stores, () ->
new NodeImporter( stores, idMapper, monitor ), executionMonitor, new MemoryUsageStatsProvider( stores, idMapper ) );
Supplier<EntityImporter> importers = () -> new NodeImporter( stores, idMapper, monitor );
importData( NODE_IMPORT_NAME, numRunners, input.nodes(), stores, importers, executionMonitor,
new MemoryUsageStatsProvider( stores, idMapper ) );
}

public static DataStatistics importRelationships( int numRunners, Input input,
Expand All @@ -185,9 +188,10 @@ public static DataStatistics importRelationships( int numRunners, Input input,
throws IOException
{
DataStatistics typeDistribution = new DataStatistics( monitor.nodes.sum(), monitor.properties.sum(), new RelationshipTypeCount[0] );
importData( RELATIONSHIP_IMPORT_NAME, numRunners, input.relationships(), stores, () ->
new RelationshipImporter( stores, idMapper, typeDistribution, monitor, badCollector, validateRelationshipData,
stores.usesDoubleRelationshipRecordUnits() ), executionMonitor, new MemoryUsageStatsProvider( stores, idMapper ) );
Supplier<EntityImporter> importers = () -> new RelationshipImporter( stores, idMapper, typeDistribution, monitor,
badCollector, validateRelationshipData, stores.usesDoubleRelationshipRecordUnits() );
importData( RELATIONSHIP_IMPORT_NAME, numRunners, input.relationships(), stores, importers, executionMonitor,
new MemoryUsageStatsProvider( stores, idMapper ) );
return typeDistribution;
}

Expand Down
Expand Up @@ -19,8 +19,6 @@
*/
package org.neo4j.unsafe.impl.batchimport;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.LongAdder;

import org.neo4j.unsafe.impl.batchimport.input.InputChunk;
Expand Down Expand Up @@ -72,14 +70,6 @@ public void run()
finally
{
visitor.close();
try
{
data.close();
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}
}
}

0 comments on commit 353f123

Please sign in to comment.