Skip to content

Commit

Permalink
Enables sending batches at a different pace than processing
Browse files Browse the repository at this point in the history
ProcessorStep used to have the invariant that one received/processed
batch resulted in one batch sent downstream. This made certain processing
impossible where the items in each batch were split up by a certain
criterion and each part was sent downstream once it reached batch size.

This commit changes that so that instead of returning a batch object to
send downstream as part of processing it, processing has access to a
batch sender allowing for more flexible sending downstream.

As part of doing this, TaskExecutor got a generic parameter for a
thread-local state and now uses its own Task instead of Callable.

This commit also cleans up the configuration of steps into a Configuration
instead of passing the individual configuration values into all step constructors.

(cherry picked from commit 06da331)
  • Loading branch information
tinwelint committed Apr 9, 2015
1 parent aafdd8b commit f9ff718
Show file tree
Hide file tree
Showing 48 changed files with 575 additions and 314 deletions.
Expand Up @@ -24,8 +24,6 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -37,9 +35,7 @@
import org.neo4j.consistency.checking.CheckerEngine;
import org.neo4j.consistency.checking.index.IndexAccessors;
import org.neo4j.consistency.report.ConsistencyReport;
import org.neo4j.function.Function;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.kernel.api.direct.BoundedIterable;
import org.neo4j.kernel.api.index.IndexAccessor;
Expand All @@ -53,12 +49,14 @@
import org.neo4j.kernel.impl.store.record.PropertyBlock;
import org.neo4j.register.Register;

import static java.util.Arrays.asList;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import static java.util.Arrays.asList;

import static org.neo4j.collection.primitive.PrimitiveLongCollections.emptyIterator;
import static org.neo4j.kernel.api.properties.Property.stringProperty;
import static org.neo4j.kernel.impl.store.record.IndexRule.constraintIndexRule;
Expand Down
Expand Up @@ -337,7 +337,7 @@ private void migrateWithBatchImporter( File storeDir, File migrationDir, long la
throw new IllegalStateException( "Unknown version to upgrade from: " + versionToUpgradeFrom( storeDir ) );
}

Configuration importConfig = new Configuration.OverrideFromConfig( config );
Configuration importConfig = new Configuration.Overridden( config );
BatchImporter importer = new ParallelBatchImporter( migrationDir.getAbsolutePath(), fileSystem,
importConfig, logging, withDynamicProcessorAssignment( migrationBatchImporterMonitor(
legacyStore, progressMonitor ), importConfig ),
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.InputCache;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.InputIteratorBatcherStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;

/**
Expand All @@ -42,12 +41,10 @@ public CalculateDenseNodesStage( Configuration config, InputIterable<InputRelati
InputCache inputCache ) throws IOException
{
super( "Calculate dense nodes", config, false );
add( new InputIteratorBatcherStep<>( control(), config.batchSize(), config.movingAverageSize(),
relationships.iterator(), InputRelationship.class ) );
add( new InputIteratorBatcherStep<>( control(), config, relationships.iterator(), InputRelationship.class ) );
if ( !relationships.supportsMultiplePasses() )
{
add( new InputEntityCacherStep<>( control(), config.workAheadSize(), config.movingAverageSize(),
inputCache.cacheRelationships() ) );
add( new InputEntityCacherStep<>( control(), config, inputCache.cacheRelationships() ) );
}
add( new RelationshipPreparationStep( control(), config, idMapper ) );
add( new CalculateDenseNodesStep( control(), config, nodeRelationshipLink, badRelationshipsCollector ) );
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipLink;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

Expand All @@ -37,13 +38,13 @@ public class CalculateDenseNodesStep extends ProcessorStep<Batch<InputRelationsh
public CalculateDenseNodesStep( StageControl control, Configuration config,
NodeRelationshipLink nodeRelationshipLink, Collector<InputRelationship> badRelationshipsCollector )
{
super( control, "CALCULATOR", config.workAheadSize(), config.movingAverageSize(), 1 );
super( control, "CALCULATOR", config, false );
this.nodeRelationshipLink = nodeRelationshipLink;
this.badRelationshipsCollector = badRelationshipsCollector;
}

@Override
protected Object process( long ticket, Batch<InputRelationship,RelationshipRecord> batch )
protected void process( Batch<InputRelationship,RelationshipRecord> batch, BatchSender sender )
{
InputRelationship[] input = batch.input;
long[] ids = batch.ids;
Expand All @@ -59,7 +60,6 @@ protected Object process( long ticket, Batch<InputRelationship,RelationshipRecor
incrementCount( rel, endNode, rel.endNode() );
}
}
return null; // end of the line
}

private void incrementCount( InputRelationship relationship, long nodeId, Object inputNodeId )
Expand Down
Expand Up @@ -30,14 +30,8 @@
/**
* User controlled configuration for a {@link BatchImporter}.
*/
public interface Configuration
public interface Configuration extends org.neo4j.unsafe.impl.batchimport.staging.Configuration
{
/**
* Batch importer works with batches going through one or more stages where one or more threads
* process each stage. This setting dictates how big the batches that are passed around are.
*/
int batchSize();

/**
* Memory dedicated to buffering data to be written to each store file.
*/
Expand All @@ -49,11 +43,6 @@ public interface Configuration
*/
int bigFileChannelBufferSizeMultiplier();

/**
* Number of batches that a step can queue up before awaiting the downstream step to catch up.
*/
int workAheadSize();

/**
* The number of relationships threshold for considering a node dense.
*/
Expand Down Expand Up @@ -84,13 +73,6 @@ public interface Configuration
*/
int maxNumberOfProcessors();

/**
* For statistics the average processing time is based on total processing time divided by
* number of batches processed. A total average is probably not that interesting so this configuration
* option specifies how many of the latest processed batches counts in the equation above.
*/
int movingAverageSize();

/**
* File name of log accepting bad entries encountered during import. Can be relative (to where the
* store directory of the database that gets created) or absolute.
Expand All @@ -99,7 +81,9 @@ public interface Configuration
*/
File badFile( File storeDirectory );

public static class Default implements Configuration
class Default
extends org.neo4j.unsafe.impl.batchimport.staging.Configuration.Default
implements Configuration
{
private static final int OPTIMAL_FILE_CHANNEL_CHUNK_SIZE = 1024 * 4;

Expand Down Expand Up @@ -169,28 +153,25 @@ public File badFile( File storeDirectory )
}
}

public static final Configuration DEFAULT = new Default();
Configuration DEFAULT = new Default();

public static class OverrideFromConfig implements Configuration
class Overridden
extends org.neo4j.unsafe.impl.batchimport.staging.Configuration.Overridden
implements Configuration
{
private final Configuration defaults;
private final Config config;

public OverrideFromConfig( Configuration defaults, Config config )
public Overridden( Configuration defaults, Config config )
{
super( defaults );
this.defaults = defaults;
this.config = config;
}

public OverrideFromConfig( Config config )
{
this( DEFAULT, config );
}

@Override
public int batchSize()
public Overridden( Config config )
{
return defaults.batchSize();
this( Configuration.DEFAULT, config );
}

@Override
Expand All @@ -205,12 +186,6 @@ public int bigFileChannelBufferSizeMultiplier()
return defaults.bigFileChannelBufferSizeMultiplier();
}

@Override
public int workAheadSize()
{
return defaults.workAheadSize();
}

@Override
public int denseNodeThreshold()
{
Expand Down Expand Up @@ -241,6 +216,4 @@ public File badFile( File storeDirectory )
return defaults.badFile( storeDirectory );
}
}

// TODO Add Configuration option "calibrate()" which probes the hardware and returns optimal values.
}
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.kernel.impl.transaction.state.PropertyCreator;
import org.neo4j.kernel.impl.util.ReusableIteratorCostume;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.store.BatchingPageCache.WriterFactory;
Expand Down Expand Up @@ -65,7 +66,7 @@ public class EntityStoreUpdaterStep<RECORD extends PrimitiveRecord,INPUT extends
AbstractRecordStore<RECORD> entityStore,
PropertyStore propertyStore, IoMonitor monitor, WriterFactory writerFactory )
{
super( control, "v", 1, config.movingAverageSize(), 1, monitor ); // work-ahead doesn't matter, we're the last one
super( control, "v", config, false );
this.entityStore = entityStore;
this.propertyStore = propertyStore;
this.writerFactory = writerFactory;
Expand All @@ -75,7 +76,7 @@ public class EntityStoreUpdaterStep<RECORD extends PrimitiveRecord,INPUT extends
}

@Override
protected Object process( long ticket, Batch<INPUT,RECORD> batch )
protected void process( Batch<INPUT,RECORD> batch, BatchSender sender )
{
// Clear reused data structures
propertyRecords.close();
Expand Down Expand Up @@ -122,7 +123,6 @@ protected Object process( long ticket, Batch<INPUT,RECORD> batch )
{
propertyStore.updateRecord( propertyRecord );
}
return null; // end of the line
}

private void reassignDynamicRecordIds( PropertyBlock[] blocks, int offset, int length )
Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.InputCache;
import org.neo4j.unsafe.impl.batchimport.input.InputNode;
import org.neo4j.unsafe.impl.batchimport.staging.IdMapperPreparationStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

Expand All @@ -38,7 +37,7 @@ public IdMapperPreparationStage( Configuration config, IdMapper idMapper, InputI
InputCache inputCache, StatsProvider memoryUsageStats )
{
super( "Prepare node index", config, false );
add( new IdMapperPreparationStep( control(), config.batchSize(), config.movingAverageSize(),
add( new IdMapperPreparationStep( control(), config,
idMapper, idsOf( nodes.supportsMultiplePasses() ? nodes : inputCache.nodes() ), memoryUsageStats ) );
}
}
Expand Up @@ -36,10 +36,10 @@ public class IdMapperPreparationStep extends LonelyProcessingStep
private final IdMapper idMapper;
private final InputIterable<Object> allIds;

public IdMapperPreparationStep( StageControl control, int batchSize, int movingAverageSize,
public IdMapperPreparationStep( StageControl control, Configuration config,
IdMapper idMapper, InputIterable<Object> allIds, StatsProvider memoryUsageStats )
{
super( control, "" /*named later in the progress listener*/, batchSize, movingAverageSize, memoryUsageStats );
super( control, "" /*named later in the progress listener*/, config, memoryUsageStats );
this.idMapper = idMapper;
this.allIds = allIds;
}
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.neo4j.kernel.impl.store.record.PrimitiveRecord;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.input.Receiver;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

Expand All @@ -35,18 +36,17 @@ public class InputEntityCacherStep<INPUT extends InputEntity>
{
private final Receiver<INPUT[],IOException> cacher;

public InputEntityCacherStep( StageControl control, int workAheadSize, int movingAverageSize,
Receiver<INPUT[],IOException> cacher )
public InputEntityCacherStep( StageControl control, Configuration config, Receiver<INPUT[],IOException> cacher )
{
super( control, "CACHE", workAheadSize, movingAverageSize, 1 );
super( control, "CACHE", config, false );
this.cacher = cacher;
}

@Override
protected Object process( long ticket, Batch<INPUT,? extends PrimitiveRecord> batch ) throws IOException
protected void process( Batch<INPUT,? extends PrimitiveRecord> batch, BatchSender sender ) throws IOException
{
cacher.receive( batch.input );
return batch;
sender.send( batch );
}

@Override
Expand Down
Expand Up @@ -28,17 +28,17 @@
*/
public class InputIteratorBatcherStep<T> extends IteratorBatcherStep<T>
{
public InputIteratorBatcherStep( StageControl control, int batchSize, int movingAverageSize,
public InputIteratorBatcherStep( StageControl control, Configuration config,
InputIterator<T> data, Class<T> itemClass )
{
super( control, batchSize, movingAverageSize, data, itemClass );
super( control, config, data, itemClass );
}

@SuppressWarnings( { "unchecked", "rawtypes" } )
@Override
protected Object nextBatchOrNull( int batchSize )
protected Object nextBatchOrNull( long ticket, int batchSize )
{
Object batch = super.nextBatchOrNull( batchSize );
Object batch = super.nextBatchOrNull( ticket, batchSize );
return batch != null ? new Batch( (Object[]) batch ) : null;
}
}
Expand Up @@ -36,8 +36,8 @@ public NodeCountsStage( Configuration config, NodeLabelsCache cache, NodeStore n
int highLabelId, CountsAccessor.Updater countsUpdater, StatsProvider... additionalStatsProviders )
{
super( "Node counts", config, false );
add( new ReadNodeRecordsStep( control(), config.batchSize(), config.movingAverageSize(), nodeStore ) );
add( new RecordProcessorStep<>( control(), "COUNT", config.workAheadSize(), config.movingAverageSize(),
new NodeCountsProcessor( nodeStore, cache, highLabelId, countsUpdater ), true, additionalStatsProviders ) );
add( new ReadNodeRecordsStep( control(), config, nodeStore ) );
add( new RecordProcessorStep<>( control(), "COUNT", config, new NodeCountsProcessor(
nodeStore, cache, highLabelId, countsUpdater ), true, additionalStatsProviders ) );
}
}
Expand Up @@ -28,6 +28,7 @@
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.InputNode;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;
Expand All @@ -52,15 +53,15 @@ public NodeEncoderStep( StageControl control, Configuration config,
NodeStore nodeStore,
StatsProvider memoryUsageStats )
{
super( control, "NODE", config.workAheadSize(), config.movingAverageSize(), 1, memoryUsageStats );
super( control, "NODE", config, false, memoryUsageStats );
this.idMapper = idMapper;
this.idGenerator = idGenerator;
this.nodeStore = nodeStore;
this.labelHolder = labelHolder;
}

@Override
protected Object process( long ticket, Batch<InputNode,NodeRecord> batch )
protected void process( Batch<InputNode,NodeRecord> batch, BatchSender sender )
{
InputNode[] input = batch.input;
batch.records = new NodeRecord[input.length];
Expand Down Expand Up @@ -89,6 +90,6 @@ protected Object process( long ticket, Batch<InputNode,NodeRecord> batch )
InlineNodeLabels.putSorted( nodeRecord, labels, null, nodeStore.getDynamicLabelStore() );
}
}
return batch;
sender.send( batch );
}
}
Expand Up @@ -34,9 +34,9 @@ public NodeFirstRelationshipStage( Configuration config, NodeStore nodeStore,
RelationshipGroupStore relationshipGroupStore, NodeRelationshipLink cache )
{
super( "Node --> Relationship", config, false );
add( new ReadNodeRecordsStep( control(), config.batchSize(), config.movingAverageSize(), nodeStore ) );
add( new RecordProcessorStep<>( control(), "LINK", config.workAheadSize(), config.movingAverageSize(),
add( new ReadNodeRecordsStep( control(), config, nodeStore ) );
add( new RecordProcessorStep<>( control(), "LINK", config,
new NodeFirstRelationshipProcessor( relationshipGroupStore, cache ), false ) );
add( new UpdateRecordsStep<>( control(), config.workAheadSize(), config.movingAverageSize(), nodeStore ) );
add( new UpdateRecordsStep<>( control(), config, nodeStore ) );
}
}

0 comments on commit f9ff718

Please sign in to comment.