Skip to content

Commit

Permalink
Shows total number of imported entities and properties after import
Browse files Browse the repository at this point in the history
also prints available machine/heap memory before import starts.
  • Loading branch information
tinwelint committed May 26, 2015
1 parent ecc1aa1 commit 4bb01c9
Show file tree
Hide file tree
Showing 18 changed files with 154 additions and 44 deletions.
Expand Up @@ -19,8 +19,6 @@
*/ */
package org.neo4j.graphalgo.path; package org.neo4j.graphalgo.path;


import org.junit.Ignore;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;


import org.neo4j.graphalgo.GraphAlgoFactory; import org.neo4j.graphalgo.GraphAlgoFactory;
Expand All @@ -33,9 +31,7 @@
import org.neo4j.graphdb.RelationshipExpander; import org.neo4j.graphdb.RelationshipExpander;
import org.neo4j.kernel.Traversal; import org.neo4j.kernel.Traversal;


import static org.junit.Assert.fail;
import common.Neo4jAlgoTestCase; import common.Neo4jAlgoTestCase;
import common.Neo4jAlgoTestCase.MyRelTypes;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;


public class TestExactDepthPathFinder extends Neo4jAlgoTestCase public class TestExactDepthPathFinder extends Neo4jAlgoTestCase
Expand Down
Expand Up @@ -65,9 +65,11 @@


import static org.neo4j.graphdb.factory.GraphDatabaseSettings.store_dir; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.store_dir;
import static org.neo4j.helpers.Exceptions.launderedException; import static org.neo4j.helpers.Exceptions.launderedException;
import static org.neo4j.helpers.Format.bytes;
import static org.neo4j.helpers.collection.MapUtil.stringMap; import static org.neo4j.helpers.collection.MapUtil.stringMap;
import static org.neo4j.kernel.impl.util.Converters.withDefault; import static org.neo4j.kernel.impl.util.Converters.withDefault;
import static org.neo4j.unsafe.impl.batchimport.Configuration.BAD_FILE_NAME; import static org.neo4j.unsafe.impl.batchimport.Configuration.BAD_FILE_NAME;
import static org.neo4j.unsafe.impl.batchimport.cache.AvailableMemoryCalculator.RUNTIME;
import static org.neo4j.unsafe.impl.batchimport.input.Collectors.badCollector; import static org.neo4j.unsafe.impl.batchimport.input.Collectors.badCollector;
import static org.neo4j.unsafe.impl.batchimport.input.Collectors.collect; import static org.neo4j.unsafe.impl.batchimport.input.Collectors.collect;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.NO_NODE_DECORATOR; import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.NO_NODE_DECORATOR;
Expand Down Expand Up @@ -312,7 +314,7 @@ idType, csvConfiguration( args, defaultSettingsSuitableForTests ),
config, config,
logging, logging,
ExecutionMonitors.defaultVisible() ); ExecutionMonitors.defaultVisible() );
printInputSummary( storeDir, nodesFiles, relationshipsFiles ); printOverview( storeDir, nodesFiles, relationshipsFiles );
boolean success = false; boolean success = false;
try try
{ {
Expand Down Expand Up @@ -353,12 +355,17 @@ idType, csvConfiguration( args, defaultSettingsSuitableForTests ),
} }
} }


private static void printInputSummary( File storeDir, Collection<Option<File[]>> nodesFiles, private static void printOverview( File storeDir, Collection<Option<File[]>> nodesFiles,
Collection<Option<File[]>> relationshipsFiles ) Collection<Option<File[]>> relationshipsFiles )
{ {
System.out.println( "Importing the contents of these files into " + storeDir + ":" ); System.out.println( "Importing the contents of these files into " + storeDir + ":" );
printInputFiles( "Nodes", nodesFiles ); printInputFiles( "Nodes", nodesFiles );
printInputFiles( "Relationships", relationshipsFiles ); printInputFiles( "Relationships", relationshipsFiles );
System.out.println();
System.out.println( "Available memory:" );
printIndented( "Free machine memory: " + bytes( RUNTIME.availableOffHeapMemory() ) );
printIndented( "Max heap memory : " + bytes( Runtime.getRuntime().maxMemory() ) );
System.out.println();
} }


private static void printInputFiles( String name, Collection<Option<File[]>> files ) private static void printInputFiles( String name, Collection<Option<File[]>> files )
Expand All @@ -370,7 +377,6 @@ private static void printInputFiles( String name, Collection<Option<File[]>> fil


System.out.println( name + ":" ); System.out.println( name + ":" );
int i = 0; int i = 0;
String indent = " ";
for ( Option<File[]> group : files ) for ( Option<File[]> group : files )
{ {
if ( i++ > 0 ) if ( i++ > 0 )
Expand All @@ -379,14 +385,18 @@ private static void printInputFiles( String name, Collection<Option<File[]>> fil
} }
if ( group.metadata() != null ) if ( group.metadata() != null )
{ {
System.out.println( indent + ":" + group.metadata() ); printIndented( ":" + group.metadata() );
} }
for ( File file : group.value() ) for ( File file : group.value() )
{ {
System.out.println( indent + file ); printIndented( file );
} }
} }
System.out.println(); }

private static void printIndented( Object value )
{
System.out.println( " " + value );
} }


private static void validateInputFiles( Collection<Option<File[]>> nodesFiles, private static void validateInputFiles( Collection<Option<File[]>> nodesFiles,
Expand Down
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.PrimitiveRecord;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;

import static java.lang.String.format;

/**
 * An {@link EntityStoreUpdaterStep.Monitor} that keeps running totals of the nodes, relationships and
 * properties reported to it, and can render those totals as a textual summary via {@link #toString()}.
 */
public class CountingStoreUpdateMonitor implements EntityStoreUpdaterStep.Monitor
{
    private long nodeCount;
    private long relationshipCount;
    private long propertyCount;

    /**
     * Adds {@code count} to the total that matches the given record class.
     *
     * @param type record class of the written entities, either {@link NodeRecord} or
     * {@link RelationshipRecord}.
     * @param count number of entities to add to the matching total.
     * @throws IllegalArgumentException if {@code type} is neither of the two supported record classes.
     */
    @Override
    public void entitiesWritten( Class<? extends PrimitiveRecord> type, long count )
    {
        if ( type.equals( NodeRecord.class ) )
        {
            nodeCount += count;
            return;
        }
        if ( type.equals( RelationshipRecord.class ) )
        {
            relationshipCount += count;
            return;
        }
        // Only nodes and relationships are counted, anything else is reported back to the caller.
        throw new IllegalArgumentException( type.getName() );
    }

    /**
     * Adds {@code count} to the property total.
     *
     * @param count number of properties to add.
     */
    @Override
    public void propertiesWritten( long count )
    {
        propertyCount += count;
    }

    /**
     * @return total number of nodes reported so far.
     */
    public long nodesWritten()
    {
        return nodeCount;
    }

    /**
     * @return total number of relationships reported so far.
     */
    public long relationshipsWritten()
    {
        return relationshipCount;
    }

    /**
     * @return total number of properties reported so far.
     */
    public long propertiesWritten()
    {
        return propertyCount;
    }

    /**
     * @return a multi-line summary listing the node, relationship and property totals.
     */
    @Override
    public String toString()
    {
        String template = "Imported:%n %d nodes%n %d relationships%n %d properties";
        return format( template, nodeCount, relationshipCount, propertyCount );
    }
}
Expand Up @@ -52,27 +52,37 @@
public class EntityStoreUpdaterStep<RECORD extends PrimitiveRecord,INPUT extends InputEntity> public class EntityStoreUpdaterStep<RECORD extends PrimitiveRecord,INPUT extends InputEntity>
extends ProcessorStep<Batch<INPUT,RECORD>> extends ProcessorStep<Batch<INPUT,RECORD>>
{ {
/**
 * Callback notified by this step about what it has written for a processed batch.
 */
public interface Monitor
{
/**
 * Called after a batch has been processed, with the number of entity records from that batch
 * that were not skipped.
 *
 * @param type record class of the entities in the batch.
 * @param count number of entities in the batch, excluding skipped ones.
 */
void entitiesWritten( Class<? extends PrimitiveRecord> type, long count );

/**
 * Called after a batch has been processed, with the number of property blocks counted for that batch.
 *
 * @param count number of property blocks counted while processing the batch.
 */
void propertiesWritten( long count );
}

private final AbstractRecordStore<RECORD> entityStore; private final AbstractRecordStore<RECORD> entityStore;
private final PropertyStore propertyStore; private final PropertyStore propertyStore;
private final IoMonitor monitor; private final IoMonitor ioMonitor;
private final WriterFactory writerFactory; private final WriterFactory writerFactory;
private final PropertyCreator propertyCreator; private final PropertyCreator propertyCreator;
private final Monitor monitor;


// Reusable instances for less GC // Reusable instances for less GC
private final BatchingPropertyRecordAccess propertyRecords = new BatchingPropertyRecordAccess(); private final BatchingPropertyRecordAccess propertyRecords = new BatchingPropertyRecordAccess();
private final ReusableIteratorCostume<PropertyBlock> blockIterator = new ReusableIteratorCostume<>(); private final ReusableIteratorCostume<PropertyBlock> blockIterator = new ReusableIteratorCostume<>();


EntityStoreUpdaterStep( StageControl control, Configuration config, EntityStoreUpdaterStep( StageControl control, Configuration config,
AbstractRecordStore<RECORD> entityStore, AbstractRecordStore<RECORD> entityStore,
PropertyStore propertyStore, IoMonitor monitor, WriterFactory writerFactory ) PropertyStore propertyStore, IoMonitor ioMonitor, WriterFactory writerFactory,
Monitor monitor )
{ {
super( control, "v", config, 1, monitor ); super( control, "v", config, 1, ioMonitor );
this.entityStore = entityStore; this.entityStore = entityStore;
this.propertyStore = propertyStore; this.propertyStore = propertyStore;
this.writerFactory = writerFactory; this.writerFactory = writerFactory;
this.propertyCreator = new PropertyCreator( propertyStore, null );
this.monitor = monitor; this.monitor = monitor;
this.monitor.reset(); this.propertyCreator = new PropertyCreator( propertyStore, null );
this.ioMonitor = ioMonitor;
this.ioMonitor.reset();
} }


@Override @Override
Expand All @@ -84,7 +94,12 @@ protected void process( Batch<INPUT,RECORD> batch, BatchSender sender )
// Write the entity records, and at the same time allocate property records for its property blocks. // Write the entity records, and at the same time allocate property records for its property blocks.
long highestId = 0; long highestId = 0;
RECORD[] records = batch.records; RECORD[] records = batch.records;
int propertyBlockCursor = 0; if ( records.length == 0 )
{
return;
}

int propertyBlockCursor = 0, skipped = 0;
for ( int i = 0; i < records.length; i++ ) for ( int i = 0; i < records.length; i++ )
{ {
RECORD record = records[i]; RECORD record = records[i];
Expand Down Expand Up @@ -114,6 +129,7 @@ protected void process( Batch<INPUT,RECORD> batch, BatchSender sender )
else else
{ // Here we have a relationship that refers to missing nodes. It's within the tolerance levels { // Here we have a relationship that refers to missing nodes. It's within the tolerance levels
// of number of bad relationships. Just don't import this relationship. // of number of bad relationships. Just don't import this relationship.
skipped++;
} }
propertyBlockCursor += propertyBlockCount; propertyBlockCursor += propertyBlockCount;
} }
Expand All @@ -124,6 +140,9 @@ protected void process( Batch<INPUT,RECORD> batch, BatchSender sender )
{ {
propertyStore.updateRecord( propertyRecord ); propertyStore.updateRecord( propertyRecord );
} }

monitor.entitiesWritten( records[0].getClass(), records.length-skipped );
monitor.propertiesWritten( propertyBlockCursor );
} }


private void reassignDynamicRecordIds( PropertyBlock[] blocks, int offset, int length ) private void reassignDynamicRecordIds( PropertyBlock[] blocks, int offset, int length )
Expand Down Expand Up @@ -172,7 +191,7 @@ protected void done()
// Stop the I/O monitor, since the stats in there is based on time passed since the start // Stop the I/O monitor, since the stats in there is based on time passed since the start
// and bytes written. NodeStage and CalculateDenseNodesStage can be run in parallel so if // and bytes written. NodeStage and CalculateDenseNodesStage can be run in parallel so if
// NodeStage completes before CalculateDenseNodesStage then we want to stop the time in the I/O monitor. // NodeStage completes before CalculateDenseNodesStage then we want to stop the time in the I/O monitor.
monitor.stop(); ioMonitor.stop();
} }


// Below we override the "parallizable" methods to go directly towards the I/O writer, since // Below we override the "parallizable" methods to go directly towards the I/O writer, since
Expand Down
Expand Up @@ -46,6 +46,7 @@ public class NodeStage extends Stage
public NodeStage( Configuration config, IoMonitor writeMonitor, WriterFactory writerFactory, public NodeStage( Configuration config, IoMonitor writeMonitor, WriterFactory writerFactory,
InputIterable<InputNode> nodes, IdMapper idMapper, IdGenerator idGenerator, InputIterable<InputNode> nodes, IdMapper idMapper, IdGenerator idGenerator,
BatchingNeoStore neoStore, InputCache inputCache, LabelScanStore labelScanStore, BatchingNeoStore neoStore, InputCache inputCache, LabelScanStore labelScanStore,
EntityStoreUpdaterStep.Monitor storeUpdateMonitor,
StatsProvider memoryUsage ) throws IOException StatsProvider memoryUsage ) throws IOException
{ {
super( "Nodes", config, ORDER_SEND_DOWNSTREAM ); super( "Nodes", config, ORDER_SEND_DOWNSTREAM );
Expand All @@ -63,6 +64,6 @@ public NodeStage( Configuration config, IoMonitor writeMonitor, WriterFactory wr
neoStore.getLabelRepository(), nodeStore, memoryUsage ) ); neoStore.getLabelRepository(), nodeStore, memoryUsage ) );
add( new LabelScanStorePopulationStep( control(), config, labelScanStore ) ); add( new LabelScanStorePopulationStep( control(), config, labelScanStore ) );
add( new EntityStoreUpdaterStep<>( control(), config, nodeStore, propertyStore, add( new EntityStoreUpdaterStep<>( control(), config, nodeStore, propertyStore,
writeMonitor, writerFactory ) ); writeMonitor, writerFactory, storeUpdateMonitor ) );
} }
} }
Expand Up @@ -125,6 +125,7 @@ public void doImport( Input input ) throws IOException
long startTime = currentTimeMillis(); long startTime = currentTimeMillis();
boolean hasBadEntries = false; boolean hasBadEntries = false;
File badFile = new File( storeDir, Configuration.BAD_FILE_NAME ); File badFile = new File( storeDir, Configuration.BAD_FILE_NAME );
CountingStoreUpdateMonitor storeUpdateMonitor = new CountingStoreUpdateMonitor();
try ( BatchingNeoStore neoStore = new BatchingNeoStore( fileSystem, storeDir, config, try ( BatchingNeoStore neoStore = new BatchingNeoStore( fileSystem, storeDir, config,
writeMonitor, logging, monitors, writerFactory, additionalInitialIds ); writeMonitor, logging, monitors, writerFactory, additionalInitialIds );
OutputStream badOutput = new BufferedOutputStream( fileSystem.openAsOutputStream( badFile, false ) ); OutputStream badOutput = new BufferedOutputStream( fileSystem.openAsOutputStream( badFile, false ) );
Expand All @@ -143,7 +144,8 @@ public void doImport( Input input ) throws IOException


// Stage 1 -- nodes, properties, labels // Stage 1 -- nodes, properties, labels
NodeStage nodeStage = new NodeStage( config, writeMonitor, writerFactory, NodeStage nodeStage = new NodeStage( config, writeMonitor, writerFactory,
nodes, idMapper, idGenerator, neoStore, inputCache, neoStore.getLabelScanStore(), memoryUsageStats ); nodes, idMapper, idGenerator, neoStore, inputCache, neoStore.getLabelScanStore(),
storeUpdateMonitor, memoryUsageStats );


// Stage 2 -- calculate dense node threshold // Stage 2 -- calculate dense node threshold
CalculateDenseNodesStage calculateDenseNodesStage = new CalculateDenseNodesStage( config, relationships, CalculateDenseNodesStage calculateDenseNodesStage = new CalculateDenseNodesStage( config, relationships,
Expand All @@ -169,7 +171,7 @@ public void doImport( Input input ) throws IOException
// Stage 3 -- relationships, properties // Stage 3 -- relationships, properties
final RelationshipStage relationshipStage = new RelationshipStage( config, writeMonitor, writerFactory, final RelationshipStage relationshipStage = new RelationshipStage( config, writeMonitor, writerFactory,
relationships.supportsMultiplePasses() ? relationships : inputCache.relationships(), relationships.supportsMultiplePasses() ? relationships : inputCache.relationships(),
idMapper, neoStore, nodeRelationshipCache, input.specificRelationshipIds() ); idMapper, neoStore, nodeRelationshipCache, input.specificRelationshipIds(), storeUpdateMonitor );
executeStages( relationshipStage ); executeStages( relationshipStage );
nodeRelationshipCache.fixateGroups(); nodeRelationshipCache.fixateGroups();


Expand Down Expand Up @@ -201,9 +203,10 @@ public void doImport( Input input ) throws IOException
neoStore.getRelationshipTypeRepository().getHighId(), countsUpdater, AUTO ) ); neoStore.getRelationshipTypeRepository().getHighId(), countsUpdater, AUTO ) );


// We're done, do some final logging about it // We're done, do some final logging about it
writerFactory.awaitEverythingWritten();
long totalTimeMillis = currentTimeMillis() - startTime; long totalTimeMillis = currentTimeMillis() - startTime;
executionMonitor.done( totalTimeMillis ); executionMonitor.done( totalTimeMillis, storeUpdateMonitor.toString() );
logger.info( "Import completed, took " + Format.duration( totalTimeMillis ) ); logger.info( "IMPORT DONE in " + Format.duration( totalTimeMillis ) + ". " + storeUpdateMonitor );
hasBadEntries = badCollector.badEntries() > 0; hasBadEntries = badCollector.badEntries() > 0;
if ( hasBadEntries ) if ( hasBadEntries )
{ {
Expand Down
Expand Up @@ -40,7 +40,8 @@ public class RelationshipStage extends Stage
{ {
public RelationshipStage( Configuration config, IoMonitor writeMonitor, WriterFactory writerFactory, public RelationshipStage( Configuration config, IoMonitor writeMonitor, WriterFactory writerFactory,
InputIterable<InputRelationship> relationships, IdMapper idMapper, InputIterable<InputRelationship> relationships, IdMapper idMapper,
BatchingNeoStore neoStore, NodeRelationshipCache cache, boolean specificIds ) BatchingNeoStore neoStore, NodeRelationshipCache cache, boolean specificIds,
EntityStoreUpdaterStep.Monitor storeUpdateMonitor )
{ {
super( "Relationships", config, ORDER_SEND_DOWNSTREAM | ORDER_PROCESS ); super( "Relationships", config, ORDER_SEND_DOWNSTREAM | ORDER_PROCESS );
add( new InputIteratorBatcherStep<>( control(), config, relationships.iterator(), InputRelationship.class ) ); add( new InputIteratorBatcherStep<>( control(), config, relationships.iterator(), InputRelationship.class ) );
Expand All @@ -53,6 +54,6 @@ public RelationshipStage( Configuration config, IoMonitor writeMonitor, WriterFa
add( new RelationshipEncoderStep( control(), config, add( new RelationshipEncoderStep( control(), config,
neoStore.getRelationshipTypeRepository(), cache, specificIds ) ); neoStore.getRelationshipTypeRepository(), cache, specificIds ) );
add( new EntityStoreUpdaterStep<>( control(), config, add( new EntityStoreUpdaterStep<>( control(), config,
relationshipStore, propertyStore, writeMonitor, writerFactory ) ); relationshipStore, propertyStore, writeMonitor, writerFactory, storeUpdateMonitor ) );
} }
} }
Expand Up @@ -31,7 +31,7 @@
* An {@link ExecutionMonitor} that prints progress in percent, knowing the max number of nodes and relationships * An {@link ExecutionMonitor} that prints progress in percent, knowing the max number of nodes and relationships
* in advance. * in advance.
*/ */
public class CoarseBoundedProgressExecutionMonitor extends ExecutionMonitor.Adpter public class CoarseBoundedProgressExecutionMonitor extends ExecutionMonitor.Adapter
{ {
private long totalDoneBatches; private long totalDoneBatches;
private final long highNodeId; private final long highNodeId;
Expand Down Expand Up @@ -104,7 +104,7 @@ public void end( StageExecution[] executions, long totalTimeMillis )
} }


@Override @Override
public void done( long totalTimeMillis ) public void done( long totalTimeMillis, String additionalInformation )
{ {
applyPercentage( 100 ); applyPercentage( 100 );
} }
Expand Down
Expand Up @@ -31,7 +31,7 @@
/** /**
* {@link ExecutionMonitor} that prints progress, e.g. a dot every N batches completed. * {@link ExecutionMonitor} that prints progress, e.g. a dot every N batches completed.
*/ */
public class CoarseUnboundedProgressExecutionMonitor extends ExecutionMonitor.Adpter public class CoarseUnboundedProgressExecutionMonitor extends ExecutionMonitor.Adapter
{ {
private int prevN = 0; private int prevN = 0;
private final int dotEveryN; private final int dotEveryN;
Expand Down Expand Up @@ -81,7 +81,7 @@ protected void progress()
} }


@Override @Override
public void done( long totalTimeMillis ) public void done( long totalTimeMillis, String additionalInformation )
{ {
out.println(); out.println();
} }
Expand Down
Expand Up @@ -34,7 +34,7 @@
* An {@link ExecutionMonitor} that prints very detailed information about each {@link Stage} and the * An {@link ExecutionMonitor} that prints very detailed information about each {@link Stage} and the
* {@link Step steps} therein. * {@link Step steps} therein.
*/ */
public class DetailedExecutionMonitor extends ExecutionMonitor.Adpter public class DetailedExecutionMonitor extends ExecutionMonitor.Adapter
{ {
private final PrintStream out; private final PrintStream out;


Expand Down
Expand Up @@ -46,7 +46,7 @@
* {@link Configuration#maxNumberOfProcessors()}.</li> * {@link Configuration#maxNumberOfProcessors()}.</li>
* </ul> * </ul>
*/ */
public class DynamicProcessorAssigner extends ExecutionMonitor.Adpter public class DynamicProcessorAssigner extends ExecutionMonitor.Adapter
{ {
private final Configuration config; private final Configuration config;
private final Map<Step<?>,Long/*done batches*/> lastChangedProcessors = new HashMap<>(); private final Map<Step<?>,Long/*done batches*/> lastChangedProcessors = new HashMap<>();
Expand Down

0 comments on commit 4bb01c9

Please sign in to comment.