Revamped parallelism of parts of batch importer
The main story here consists of two things:

- a new ForkedProcessorStep which parallelizes work inside each
  batch, with batches executed one by one. This avoids the
  difficulty of parallelizing steps that have a costly section
  which isn't itself parallelizable. With this new step, items in
  a batch can be striped such that each forked processor knows
  which parts to process (see the sketch after this list).
- better mechanical sympathy, where most stages are optimized to
  work with batch sizes matching the pages in the page cache of
  the store they (mainly) work with.
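
A minimal sketch of the striping idea, using hypothetical names
(StripedBatchProcessor, forkedProcess) rather than the actual
ForkedProcessorStep API: every fork receives the whole batch, and a
fork's id together with the total fork count decides which items it
owns, so no single-threaded divider-of-work step is needed.

// Hypothetical sketch of batch striping; not the real ForkedProcessorStep API.
abstract class StripedBatchProcessor<T>
{
    // Each fork sees the full batch, but only processes its own stripe.
    protected abstract void forkedProcess( int id, int forks, T[] batch );

    void process( T[] batch, int forks ) throws InterruptedException
    {
        Thread[] threads = new Thread[forks];
        for ( int id = 0; id < forks; id++ )
        {
            int forkId = id; // effectively final copy for the lambda
            threads[id] = new Thread( () -> forkedProcess( forkId, forks, batch ) );
            threads[id].start();
        }
        for ( Thread thread : threads )
        {
            thread.join(); // the batch is done only when every fork is done
        }
    }
}

// Example stripe: fork 'id' owns items at indexes i where i % forks == id.
class UppercasingStep extends StripedBatchProcessor<String>
{
    @Override
    protected void forkedProcess( int id, int forks, String[] batch )
    {
        for ( int i = id; i < batch.length; i += forks )
        {
            batch[i] = batch[i].toUpperCase();
        }
    }
}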

The forked processor simplifies a couple of stages: there are now
no artificial additional steps for splitting or otherwise modifying
batches to make them more parallelizable. The whole stage also
scales better with added processors, because the old way of
parallelizing those stages often involved a single-threaded step
acting as a divider-of-work. Such a step would often end up
becoming the bottleneck anyway.

As for mechanical sympathy, the main problem previously was that
the reader and the writer of a stage which read from and wrote to
the same store contended with each other. Given the smaller batch
size, there were multiple batches of read records for any given
page, and a later step in the stage, where the store was updated,
would often update that same page. The reader (still reading that
page) would then need to do much more retry-reading, slowing the
whole stage down. Now, with the aligned batch sizes, the reader
doesn't contend with the writers in the page cache.
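
To make the page alignment concrete, a sketch with assumed numbers
(the real stores have their own page and record sizes): choosing the
batch size so that one batch covers exactly the records of one page
means a reader and a writer working at least one batch apart always
pin different pages.

// Illustrative only: deriving a batch size that lines up with page-cache pages.
class AlignedBatchSize
{
    // With one batch per page, the reader of batch N and the writer of
    // batch N-1 touch different pages, so retry-reading disappears.
    static int alignedBatchSize( int pageSizeBytes, int recordSizeBytes )
    {
        return pageSizeBytes / recordSizeBytes;
    }
}
// e.g. alignedBatchSize( 8192, 34 ) == 240 records per batch (both sizes assumed)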

Additionally, the main store-updating step has been split into
steps updating entities and properties separately, so that the
entity updating can go even faster.

The net result of this change as a whole should be that, more
often, the disk is the main bottleneck. On test machines and
development laptops, a 2x-3x performance improvement of the
importer has been observed.
tinwelint committed Aug 29, 2016
1 parent 9a2b369 commit bf0c0f8
Showing 46 changed files with 1,592 additions and 865 deletions.
@@ -107,7 +107,7 @@ public class ParallelBatchImporterTest
         @Override
         public int batchSize()
         {
-            // Set to extra low to exercise the internals and IoQueue a bit more.
+            // Set to extra low to exercise the internals a bit more.
             return 100;
         }
 
@@ -175,7 +175,7 @@ public void shouldImportCsvData() throws Exception
                 nodes( nodeRandomSeed, NODE_COUNT, inputIdGenerator, groups ),
                 relationships( relationshipRandomSeed, RELATIONSHIP_COUNT, inputIdGenerator, groups ),
                 idMapper, idGenerator,
-                /*insanely high bad tolerance, but it will actually never be that many*/
+                /*insanely high bad tolerance, but it will actually never be that many*/
                 silentBadCollector( RELATIONSHIP_COUNT ) ) );
 
         // THEN
@@ -0,0 +1,66 @@ (new file: AssignRelationshipIdBatchStep.java)
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.kernel.impl.store.id.IdGeneratorImpl;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.Configuration;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

/**
* Assigns record ids to {@link Batch} for later record allocation. Since this step is single-threaded
* we can safely assign these ids here.
*/
public class AssignRelationshipIdBatchStep extends ProcessorStep<Batch<InputRelationship,RelationshipRecord>>
{
    private long nextId;

    public AssignRelationshipIdBatchStep( StageControl control, Configuration config, long firstRelationshipId )
    {
        super( control, "ASSIGN", config, 1 );
        this.nextId = firstRelationshipId;
    }

    @Override
    protected void process( Batch<InputRelationship,RelationshipRecord> batch, BatchSender sender ) throws Throwable
    {
        // Assign first record id and send
        batch.firstRecordId = nextId;
        sender.send( batch );

        // Set state for the next batch
        nextId += batch.input.length;
        if ( nextId <= IdGeneratorImpl.INTEGER_MINUS_ONE &&
                nextId + batch.input.length >= IdGeneratorImpl.INTEGER_MINUS_ONE )
        {
            // There's this pesky INTEGER_MINUS_ONE ID again. Easiest is to simply skip this batch of ids,
            // or at least the part up to that id, and just continue after it.
            nextId = IdGeneratorImpl.INTEGER_MINUS_ONE + 1;
        }
    }

    public long getNextRelationshipId()
    {
        return nextId;
    }
}
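
For context, a hedged illustration of the guard above: assuming
IdGeneratorImpl.INTEGER_MINUS_ONE is the reserved record id
0xFFFFFFFFL (-1 read back as an unsigned 32-bit integer), the step
simply restarts id assignment just past it whenever a batch's id
range would cover it; wasting a few ids is harmless.

// Distilled from AssignRelationshipIdBatchStep; the constant's value is assumed.
class ReservedIdSkip
{
    static final long INTEGER_MINUS_ONE = 0xFFFFFFFFL; // 4294967295

    // If the id range [nextId, nextId + batchLength] covers the reserved id,
    // continue handing out ids from just after it.
    static long next( long nextId, int batchLength )
    {
        if ( nextId <= INTEGER_MINUS_ONE && nextId + batchLength >= INTEGER_MINUS_ONE )
        {
            return INTEGER_MINUS_ONE + 1;
        }
        return nextId;
    }
}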
@@ -45,8 +45,6 @@ public class Batch<INPUT,RECORD extends PrimitiveRecord>
     // using the same index as the record. So it's a collective size suitable for complete looping
     // over the batch.
     public PropertyBlock[] propertyBlocks;
-    // Used by ParallelizeByNodeIdStep to help determine any two batches have any id in common
-    public long[] sortedIds;
     // Used by relationship staged to query idMapper and store ids here
     public long[] ids;
     public boolean parallelizableWithPrevious;
Expand Up @@ -41,9 +41,11 @@ public CacheGroupsStep( StageControl control, Configuration config, Relationship
@Override
protected void process( RelationshipGroupRecord[] batch, BatchSender sender ) throws Throwable
{
for ( RelationshipGroupRecord groupRecord : batch )
// These records are read page-wise forwards, but should be cached in reverse
// since the records exists in the store in reverse order.
for ( int i = batch.length-1; i >= 0; i-- )
{
cache.put( groupRecord );
cache.put( batch[i] );
}
}
}

This file was deleted.

@@ -55,8 +55,7 @@ public CalculateDenseNodesStage( Configuration config, InputIterable<InputRelati
         add( typer = new RelationshipTypeCheckerStep( control(), config, neoStores.getRelationshipTypeRepository() ) );
         add( new RelationshipPreparationStep( control(), config, idMapper ) );
         add( new CalculateRelationshipsStep( control(), config, neoStores.getRelationshipStore() ) );
-        add( new CalculateDenseNodePrepareStep( control(), config, badCollector ) );
-        add( new CalculateDenseNodesStep( control(), config, cache ) );
+        add( new CalculateDenseNodesStep( control(), config, cache, badCollector ) );
     }
 
     /*
Expand Up @@ -19,43 +19,63 @@
*/
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.graphdb.Resource;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.Configuration;
import org.neo4j.unsafe.impl.batchimport.staging.ForkedProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

import static org.neo4j.unsafe.impl.batchimport.CalculateDenseNodePrepareStep.RADIXES;
import static org.neo4j.unsafe.impl.batchimport.CalculateDenseNodePrepareStep.radixOf;

/**
* Runs through relationship input and counts relationships per node so that dense nodes can be designated.
* Increments counts for each visited relationship, once for start node and once for end node
* (unless for loops). This to be able to determine which nodes are dense before starting to import relationships.
*/
public class CalculateDenseNodesStep extends ProcessorStep<long[]>
public class CalculateDenseNodesStep extends ForkedProcessorStep<Batch<InputRelationship,RelationshipRecord>>
{
private final NodeRelationshipCache cache;
private final StripedLock lock = new StripedLock( RADIXES );
private final Collector badCollector;

public CalculateDenseNodesStep( StageControl control, Configuration config, NodeRelationshipCache cache )
public CalculateDenseNodesStep( StageControl control, Configuration config, NodeRelationshipCache cache,
Collector badCollector )
{
// Max 10 processors since we receive batches split by radix %10 so it doesn't make sense to have more
super( control, "CALCULATOR", config, RADIXES );
super( control, "CALCULATE", config, 0 );
this.cache = cache;
this.badCollector = badCollector;
}

@Override
protected void process( long[] ids, BatchSender sender )
protected void forkedProcess( int id, int processors, Batch<InputRelationship,RelationshipRecord> batch )
{
for ( int i = 0, idIndex = 0; i < batch.input.length; i++ )
{
InputRelationship relationship = batch.input[i];
long startNodeId = batch.ids[idIndex++];
long endNodeId = batch.ids[idIndex++];
processNodeId( id, processors, startNodeId, relationship, relationship.startNode() );
if ( startNodeId != endNodeId || // avoid counting loops twice
startNodeId == -1 || endNodeId == -1 ) // although always collect bad relationships
{
// Loops only counts as one
processNodeId( id, processors, endNodeId, relationship, relationship.endNode() );
}
}
}

private void processNodeId( int id, int processors, long nodeId,
InputRelationship relationship, Object inputId )
{
// We lock because we only want at most one processor processing ids of a certain radix.
try ( Resource automaticallyUnlocked = lock.lock( radixOf( ids[0] ) ) )
if ( nodeId == -1 )
{
for ( long id : ids )
if ( id == 0 )
{
if ( id != -1 )
{
cache.incrementCount( id );
}
// Only let the processor with id=0 (which always exists) report the bad relationships
badCollector.collectBadRelationship( relationship, inputId );
}
}
else if ( nodeId % processors == id )
{
cache.incrementCount( nodeId );
}
}
}
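
The modulo test above is what replaces the old StripedLock: for any
nodeId, exactly one fork satisfies nodeId % processors == id, so every
counter has a single writer and no locking is needed. A standalone
sketch of that invariant (illustrative, not Neo4j code):

// Demonstrates the lock-free striping invariant used above.
class ModuloStripeDemo
{
    public static void main( String[] args ) throws InterruptedException
    {
        int processors = 4;
        long[] nodeIds = { 0, 1, 2, 3, 4, 5, 6, 7, 5, 3 };
        long[] counts = new long[8];

        Thread[] forks = new Thread[processors];
        for ( int id = 0; id < processors; id++ )
        {
            int forkId = id; // effectively final copy for the lambda
            forks[id] = new Thread( () ->
            {
                for ( long nodeId : nodeIds )
                {
                    // Exactly one fork claims each nodeId, so this write is unshared.
                    if ( nodeId % processors == forkId )
                    {
                        counts[(int) nodeId]++;
                    }
                }
            } );
            forks[id].start();
        }
        for ( Thread fork : forks )
        {
            fork.join(); // join() also publishes the forks' writes
        }
        // counts is now { 1, 1, 1, 2, 1, 2, 1, 1 }
    }
}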
@@ -26,6 +26,10 @@
 import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
 import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
 
+/**
+ * Keeps track of the number of relationships to import, in order to set the high id of the relationship store before import.
+ * This is needed because of the way double-unit records work: the secondary units will end up beyond this limit.
+ */
 public class CalculateRelationshipsStep extends ProcessorStep<Batch<InputRelationship,RelationshipRecord>>
 {
     private final RelationshipStore relationshipStore;
@@ -80,6 +80,11 @@ class Overridden
         private final Configuration defaults;
         private final Config config;
 
+        public Overridden( Configuration defaults )
+        {
+            this( defaults, Config.empty() );
+        }
+
         public Overridden( Configuration defaults, Config config )
         {
             super( defaults );

@@ -110,4 +115,16 @@ public int movingAverageSize()
             return defaults.movingAverageSize();
         }
     }
+
+    public static Configuration withBatchSize( Configuration config, int batchSize )
+    {
+        return new Overridden( config )
+        {
+            @Override
+            public int batchSize()
+            {
+                return batchSize;
+            }
+        };
+    }
 }
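
A hedged usage sketch for the new withBatchSize helper, tying it back
to the page-alignment story; getRecordsPerPage() is an assumed
accessor, while withBatchSize and allIn are from this commit:

// Inside a stage constructor (sketch): align the batch size with the
// store's page layout, then read with the aligned configuration.
Configuration aligned = Configuration.withBatchSize( config, store.getRecordsPerPage() );
add( new ReadRecordsStep<>( control(), aligned, store, allIn( store, aligned ) ) );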
@@ -25,6 +25,8 @@
 import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
 import org.neo4j.unsafe.impl.batchimport.staging.Stage;
 
+import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn;
+
 /**
  * Stage for counting groups per node, populates {@link RelationshipGroupCache}.
  */

@@ -35,7 +37,7 @@ public CountGroupsStage( Configuration config, RecordStore<RelationshipGroupReco
     {
         super( "Count groups", config );
 
-        add( new ReadRecordsStep<>( control(), config, store, RecordIdIteration.allIn( store ) ) );
+        add( new ReadRecordsStep<>( control(), config, store, allIn( store, config ) ) );
         add( new CountGroupsStep( control(), config, groupCache ) );
     }
 }
