Reduces peak memory required by importer
by separating the import logic so that IdMapper and NodeRelationshipCache aren't
in use at the same time. For this to work, one additional stage is introduced
before relationship linking; it goes through the imported relationship records
and counts node degrees.

Typically this separation reduces the peak memory usage by 40%.

main author: @SRaghuram
tinwelint committed Aug 2, 2017
1 parent b71a5dc commit c06babc
Showing 11 changed files with 257 additions and 152 deletions.
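
For orientation before the hunks below, here is a condensed, comment-annotated paraphrase of the reordered import flow. It is a sketch of the ordering only, not a compilable excerpt; the statements are lifted from the doImport() hunk further down, and executeStage(...) stands for the importer's own stage-running helper.

```java
// 1. Import nodes: the IdMapper is populated; the NodeRelationshipCache is not yet backed by memory.
// 2. Import relationships "unlinked": the IdMapper is still needed to translate input ids.
// 3. The IdMapper is released before any large relationship-linking cache is allocated:
idMapper.close();
idMapper = null;

// 4. Only now is the NodeRelationshipCache sized and given its backing memory:
nodeRelationshipCache.setHighNodeId( neoStore.getNodeStore().getHighId() );

// 5. New stage: scan the already written RelationshipStore and count node degrees,
//    so dense nodes are known before linking starts:
executeStage( new NodeDegreeCountStage( relationshipConfig,
        neoStore.getRelationshipStore(), nodeRelationshipCache ) );

// 6. Relationship linking proceeds as before, but the IdMapper memory is already gone,
//    so the two largest caches never coexist; that is where the peak-memory saving comes from.
linkData( nodeRelationshipCache, neoStore, unlinkedRelationshipStage.getDistribution(), availableMemory );
```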
@@ -21,60 +21,43 @@

import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.ForkedProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

import static org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper.ID_NOT_FOUND;

/**
* Increments counts for each visited relationship, once for the start node and once for the end node
* (except for loops). This makes it possible to determine which nodes are dense before relationships are linked.
*/
public class CalculateDenseNodesStep extends ForkedProcessorStep<Batch<InputRelationship,RelationshipRecord>>
public class CalculateDenseNodesStep extends ForkedProcessorStep<RelationshipRecord[]>
{
private final NodeRelationshipCache cache;
private final Collector badCollector;

public CalculateDenseNodesStep( StageControl control, Configuration config, NodeRelationshipCache cache,
Collector badCollector )
public CalculateDenseNodesStep( StageControl control, Configuration config, NodeRelationshipCache cache )
{
super( control, "CALCULATE", config );
this.cache = cache;
this.badCollector = badCollector;
}

@Override
protected void forkedProcess( int id, int processors, Batch<InputRelationship,RelationshipRecord> batch )
protected void forkedProcess( int id, int processors, RelationshipRecord[] batch )
{
for ( int i = 0, idIndex = 0; i < batch.input.length; i++ )
for ( int i = 0; i < batch.length; i++ )
{
InputRelationship relationship = batch.input[i];
long startNodeId = batch.ids[idIndex++];
long endNodeId = batch.ids[idIndex++];
processNodeId( id, processors, startNodeId, relationship, relationship.startNode() );
if ( startNodeId != endNodeId || // avoid counting loops twice
startNodeId == ID_NOT_FOUND ) // although always collect bad relationships
RelationshipRecord relationship = batch[i];
long startNodeId = relationship.getFirstNode();
long endNodeId = relationship.getSecondNode();
processNodeId( id, processors, startNodeId );
if ( startNodeId != endNodeId ) // avoid counting loops twice
{
// Loops only counts as one
processNodeId( id, processors, endNodeId, relationship, relationship.endNode() );
processNodeId( id, processors, endNodeId );
}
}
}

private void processNodeId( int id, int processors, long nodeId,
InputRelationship relationship, Object inputId )
private void processNodeId( int id, int processors, long nodeId )
{
if ( nodeId == ID_NOT_FOUND )
{
if ( id == MAIN )
{
// Only let the processor with id=0 (which always exists) report the bad relationships
badCollector.collectBadRelationship( relationship, inputId );
}
}
else if ( nodeId % processors == id )
if ( nodeId % processors == id )
{
cache.incrementCount( nodeId );
}
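The step now receives plain RelationshipRecord[] batches and runs as a ForkedProcessorStep: every forked processor scans the whole batch but only touches node ids for which nodeId % processors == id, so NodeRelationshipCache.incrementCount never races between threads. Below is a self-contained toy model of that striping; all names and the plain long[] counters are illustrative, the real step writes into the off-heap cache.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy model of the modulo striping in forkedProcess() above: each worker sees the whole
// batch, but only the worker with id == nodeId % processors increments that node's counter,
// so no locking is needed on the counts.
public class DegreeStripingSketch
{
    public static long[] countDegrees( long[][] relationships, int processors, int highNodeId )
            throws InterruptedException, ExecutionException
    {
        long[] counts = new long[highNodeId];
        ExecutorService pool = Executors.newFixedThreadPool( processors );
        List<Future<?>> futures = new ArrayList<>();
        for ( int id = 0; id < processors; id++ )
        {
            int worker = id;
            futures.add( pool.submit( () ->
            {
                for ( long[] rel : relationships )
                {
                    long start = rel[0], end = rel[1];
                    if ( start % processors == worker )
                    {
                        counts[(int) start]++;
                    }
                    if ( end != start && end % processors == worker ) // a loop counts once
                    {
                        counts[(int) end]++;
                    }
                }
            } ) );
        }
        for ( Future<?> future : futures )
        {
            future.get(); // also establishes happens-before for reading counts
        }
        pool.shutdown();
        return counts;
    }

    public static void main( String[] args ) throws Exception
    {
        long[][] rels = { {0, 1}, {1, 2}, {2, 2}, {0, 2} };
        long[] degrees = countDegrees( rels, 4, 3 );
        // Expected: node 0 -> 2, node 1 -> 2, node 2 -> 3 (the loop 2->2 counted once)
        System.out.println( java.util.Arrays.toString( degrees ) );
    }
}
```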
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport;

import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.forwards;
import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM;

import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;

/**
* Goes through {@link RelationshipStore} and increments counts per start/end node,
* calling {@link NodeRelationshipCache#incrementCount(long)}. This is in preparation for linking relationships.
*/
public class NodeDegreeCountStage extends Stage
{
public NodeDegreeCountStage( Configuration config, RelationshipStore store, NodeRelationshipCache cache )
{
super( "Node Degrees", config, ORDER_SEND_DOWNSTREAM );
add( new BatchFeedStep( control(), config, forwards( 0, store.getHighId(), config ), store.getRecordSize()) );
add( new ReadRecordsStep<>( control(), config, false, store, null ) );
add( new CalculateDenseNodesStep( control(), config, cache ) );
}
}
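
Conceptually, the three steps wired up in this new stage amount to the single-threaded loop below. Treat the record-loading call and RecordLoad.CHECK as assumptions about the 3.x store API; only getHighId(), getFirstNode()/getSecondNode() and incrementCount(long) appear in the diff itself.

```java
// Single-threaded equivalent of what the stage's three steps compute together (sketch).
static void countDegrees( RelationshipStore store, NodeRelationshipCache cache )
{
    RelationshipRecord record = store.newRecord();
    for ( long id = 0, high = store.getHighId(); id < high; id++ )
    {
        store.getRecord( id, record, RecordLoad.CHECK ); // assumption: CHECK marks unused records as not in use
        if ( record.inUse() )
        {
            cache.incrementCount( record.getFirstNode() );
            if ( record.getSecondNode() != record.getFirstNode() ) // a loop counts once
            {
                cache.incrementCount( record.getSecondNode() );
            }
        }
    }
}
```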
@@ -60,18 +60,15 @@
*/
public class NodeStage extends Stage
{
private final NodeRelationshipCache cache;
private final NodeStore nodeStore;

public NodeStage( Configuration config, IoMonitor writeMonitor,
InputIterable<InputNode> nodes, IdMapper idMapper, IdGenerator idGenerator,
BatchingNeoStores neoStore, InputCache inputCache, LabelScanStore labelScanStore,
EntityStoreUpdaterStep.Monitor storeUpdateMonitor,
NodeRelationshipCache cache,
StatsProvider memoryUsage ) throws IOException
{
super( "Nodes", config, ORDER_SEND_DOWNSTREAM );
this.cache = cache;
add( new InputIteratorBatcherStep<>( control(), config, nodes.iterator(), InputNode.class, t -> true ) );
if ( !nodes.supportsMultiplePasses() )
{
@@ -87,13 +84,4 @@ public NodeStage( Configuration config, IoMonitor writeMonitor,
add( new EntityStoreUpdaterStep<>( control(), config, nodeStore, propertyStore, writeMonitor,
storeUpdateMonitor ) );
}

@Override
public void close()
{
// At this point we know how many nodes we have, so we tell the cache that instead of having the
// cache keep track of that in the face of concurrent updates.
cache.setHighNodeId( nodeStore.getHighId() );
super.close();
}
}
@@ -63,10 +63,10 @@
import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores;
import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository.BatchingRelationshipTypeTokenRepository;
import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor;

import static java.lang.Long.max;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;

import static org.neo4j.helpers.Format.bytes;
import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY;
import static org.neo4j.unsafe.impl.batchimport.SourceOrCachedInputIterable.cachedForSure;
@@ -187,7 +187,7 @@ public void doImport( Input input ) throws IOException
Configuration nodeConfig = configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() );
NodeStage nodeStage = new NodeStage( nodeConfig, writeMonitor,
nodes, idMapper, idGenerator, neoStore, inputCache, neoStore.getLabelScanStore(),
storeUpdateMonitor, nodeRelationshipCache, memoryUsageStats );
storeUpdateMonitor, memoryUsageStats );
neoStore.startFlushingPageCache();
executeStage( nodeStage );
neoStore.stopFlushingPageCache();
@@ -201,34 +201,39 @@ public void doImport( Input input ) throws IOException
executeStage( new DeleteDuplicateNodesStage( config, duplicateNodeIds, neoStore ) );
}
}

// Import relationships (unlinked), properties
Configuration relationshipConfig =
configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() );
RelationshipStage unlinkedRelationshipStage =
new RelationshipStage( relationshipConfig, writeMonitor, relationships, idMapper,
badCollector, inputCache, nodeRelationshipCache, neoStore, storeUpdateMonitor );
badCollector, inputCache, neoStore, storeUpdateMonitor );
neoStore.startFlushingPageCache();
executeStage( unlinkedRelationshipStage );
neoStore.stopFlushingPageCache();

idMapper.close();
idMapper = null;
// Link relationships together with each other, their nodes and their relationship groups
long availableMemory = maxMemory - totalMemoryUsageOf( nodeRelationshipCache, idMapper, neoStore );
long availableMemory = maxMemory - totalMemoryUsageOf( nodeRelationshipCache, neoStore );
// This is where the nodeRelationshipCache gets its memory allocated.
// This has to happen after the idMapper has been released.
nodeRelationshipCache.setHighNodeId( neoStore.getNodeStore().getHighId() );
NodeDegreeCountStage nodeDegreeStage = new NodeDegreeCountStage( relationshipConfig,
neoStore.getRelationshipStore(), nodeRelationshipCache );
neoStore.startFlushingPageCache();
executeStage( nodeDegreeStage );
neoStore.stopFlushingPageCache();

linkData( nodeRelationshipCache, neoStore, unlinkedRelationshipStage.getDistribution(),
availableMemory );

// Release this potentially really big piece of cached data
long peakMemoryUsage = totalMemoryUsageOf( nodeRelationshipCache, idMapper, neoStore );
long peakMemoryUsage = totalMemoryUsageOf( nodeRelationshipCache, neoStore );
long highNodeId = nodeRelationshipCache.getHighNodeId();
idMapper.close();
idMapper = null;
nodeRelationshipCache.close();
nodeRelationshipCache = null;

// Defragment relationships groups for better performance
RelationshipGroupDefragmenter groupDefragmenter =
new RelationshipGroupDefragmenter( config, executionMonitor, numberArrayFactory );
groupDefragmenter.run( max( maxMemory, peakMemoryUsage ), neoStore, highNodeId );
new RelationshipGroupDefragmenter( config, executionMonitor, numberArrayFactory )
.run( max( maxMemory, peakMemoryUsage ), neoStore, highNodeId );

// Count nodes per label and labels per node
nodeLabelsCache = new NodeLabelsCache( numberArrayFactory, neoStore.getLabelRepository().getHighId() );
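
The "typically 40%" figure in the commit message is plausible once the two largest data structures no longer overlap in time. A back-of-the-envelope check with made-up sizes; the GiB numbers below are illustrative, not measurements from this commit.

```java
// Illustrative sizes only; nothing here is measured from this commit.
public class PeakMemorySketch
{
    public static void main( String[] args )
    {
        long stores   = 2L << 30; // page cache + store buffers, say 2 GiB
        long idMapper = 4L << 30; // input id -> node record id mapping, say 4 GiB
        long relCache = 4L << 30; // NodeRelationshipCache, say 4 GiB

        // Before: both big caches are live while relationships are imported and linked.
        long peakBefore = stores + idMapper + relCache;
        // After: the IdMapper is closed before the NodeRelationshipCache is allocated,
        // so only the larger of the two contributes to the peak.
        long peakAfter = stores + Math.max( idMapper, relCache );

        System.out.printf( "before: %d GiB, after: %d GiB, reduction: %.0f%%%n",
                peakBefore >> 30, peakAfter >> 30,
                100.0 * ( peakBefore - peakAfter ) / peakBefore );
        // With these example sizes the reduction is 40%, the same order as the figure
        // quoted in the commit message; real numbers depend on the data set.
    }
}
```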
@@ -21,6 +21,7 @@

import org.neo4j.kernel.impl.store.record.Record;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
@@ -36,12 +37,14 @@
public class RelationshipRecordPreparationStep extends ProcessorStep<Batch<InputRelationship,RelationshipRecord>>
{
private final BatchingRelationshipTypeTokenRepository relationshipTypeRepository;
private final Collector badCollector;

public RelationshipRecordPreparationStep( StageControl control, Configuration config,
BatchingRelationshipTypeTokenRepository relationshipTypeRepository )
BatchingRelationshipTypeTokenRepository relationshipTypeRepository, Collector badCollector )
{
super( control, "RECORDS", config, 0 );
this.relationshipTypeRepository = relationshipTypeRepository;
this.badCollector = badCollector;
}

@Override
@@ -57,7 +60,14 @@ protected void process( Batch<InputRelationship,RelationshipRecord> batch, Batch
long endNodeId = batch.ids[idIndex++];
if ( startNodeId == ID_NOT_FOUND || endNodeId == ID_NOT_FOUND )
{
relationship.setInUse( false );
if ( startNodeId == ID_NOT_FOUND )
{
badCollector.collectBadRelationship( batchRelationship, batchRelationship.startNode() );
}
if ( endNodeId == ID_NOT_FOUND )
{
badCollector.collectBadRelationship( batchRelationship, batchRelationship.endNode() );
}
}
else
{
@@ -26,7 +26,6 @@
import org.neo4j.kernel.impl.store.record.PropertyBlock;
import org.neo4j.kernel.impl.store.record.PropertyRecord;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.Input;
@@ -62,7 +61,7 @@ public class RelationshipStage extends Stage

public RelationshipStage( Configuration config, IoMonitor writeMonitor,
InputIterable<InputRelationship> relationships, IdMapper idMapper,
Collector badCollector, InputCache inputCache, NodeRelationshipCache cache,
Collector badCollector, InputCache inputCache,
BatchingNeoStores neoStore, EntityStoreUpdaterStep.Monitor storeUpdateMonitor ) throws IOException
{
super( "Relationships", config, ORDER_SEND_DOWNSTREAM );
@@ -78,8 +77,8 @@ public RelationshipStage( Configuration config, IoMonitor writeMonitor,
add( typer = new RelationshipTypeCheckerStep( control(), config, neoStore.getRelationshipTypeRepository() ) );
add( new AssignRelationshipIdBatchStep( control(), config, 0 ) );
add( new RelationshipPreparationStep( control(), config, idMapper ) );
add( new RelationshipRecordPreparationStep( control(), config, neoStore.getRelationshipTypeRepository() ) );
add( new CalculateDenseNodesStep( control(), config, cache, badCollector ) );
add( new RelationshipRecordPreparationStep( control(), config,
neoStore.getRelationshipTypeRepository(), badCollector ) );
add( new PropertyEncoderStep<>( control(), config, neoStore.getPropertyKeyRepository(), propertyStore ) );
add( new EntityStoreUpdaterStep<>( control(), config, relationshipStore, propertyStore,
writeMonitor, storeUpdateMonitor ) );
@@ -76,7 +76,7 @@ public BadCollector( OutputStream out, long tolerance, int collect, boolean skip
}

@Override
public void collectBadRelationship( final InputRelationship relationship, final Object specificValue )
public synchronized void collectBadRelationship( final InputRelationship relationship, final Object specificValue )
{
checkTolerance( BAD_RELATIONSHIPS, new RelationshipsProblemReporter( relationship, specificValue ) );
}
