Skip to content

Commit

Permalink
Renames ExecutorServiceStep --> ProcessorsStep and proper step packages
Browse files Browse the repository at this point in the history
(cherry picked from commit 4304c9f)
  • Loading branch information
tinwelint committed Apr 9, 2015
1 parent eb5f1b8 commit aafdd8b
Show file tree
Hide file tree
Showing 14 changed files with 31 additions and 30 deletions.
Expand Up @@ -23,13 +23,13 @@
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipLink;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutorServiceStep;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

/**
* Runs through relationship input and counts relationships per node so that dense nodes can be designated.
*/
public class CalculateDenseNodesStep extends ExecutorServiceStep<Batch<InputRelationship,RelationshipRecord>>
public class CalculateDenseNodesStep extends ProcessorStep<Batch<InputRelationship,RelationshipRecord>>
{
private final NodeRelationshipLink nodeRelationshipLink;
private final Collector<InputRelationship> badRelationshipsCollector;
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.neo4j.kernel.impl.transaction.state.PropertyCreator;
import org.neo4j.kernel.impl.util.ReusableIteratorCostume;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutorServiceStep;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.store.BatchingPageCache.WriterFactory;
import org.neo4j.unsafe.impl.batchimport.store.BatchingPropertyRecordAccess;
Expand All @@ -49,7 +49,7 @@
* @param <INPUT> type of input.
*/
public class EntityStoreUpdaterStep<RECORD extends PrimitiveRecord,INPUT extends InputEntity>
extends ExecutorServiceStep<Batch<INPUT,RECORD>>
extends ProcessorStep<Batch<INPUT,RECORD>>
{
private final AbstractRecordStore<RECORD> entityStore;
private final PropertyStore propertyStore;
Expand Down
Expand Up @@ -17,11 +17,13 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.staging;
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.helpers.progress.ProgressListener;
import org.neo4j.unsafe.impl.batchimport.InputIterable;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.staging.LonelyProcessingStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.staging.Step;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

/**
Expand Down
Expand Up @@ -24,14 +24,14 @@
import org.neo4j.kernel.impl.store.record.PrimitiveRecord;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.input.Receiver;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutorServiceStep;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

/**
* Caches the incoming {@link InputEntity} to disk, for later use.
*/
public class InputEntityCacherStep<INPUT extends InputEntity>
extends ExecutorServiceStep<Batch<INPUT,? extends PrimitiveRecord>>
extends ProcessorStep<Batch<INPUT,? extends PrimitiveRecord>>
{
private final Receiver<INPUT[],IOException> cacher;

Expand Down
Expand Up @@ -17,11 +17,10 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.unsafe.impl.batchimport.staging;
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.unsafe.impl.batchimport.Batch;
import org.neo4j.unsafe.impl.batchimport.BatchImporter;
import org.neo4j.unsafe.impl.batchimport.InputIterator;
import org.neo4j.unsafe.impl.batchimport.staging.IteratorBatcherStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

/**
* {@link IteratorBatcherStep} that is tailored to the {@link BatchImporter} as it produces {@link Batch}
Expand Down
Expand Up @@ -28,7 +28,7 @@
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.InputNode;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutorServiceStep;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;
import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository.BatchingLabelTokenRepository;
Expand All @@ -39,7 +39,7 @@
/**
* Creates {@link NodeRecord nodes} with labels from input.
*/
public final class NodeEncoderStep extends ExecutorServiceStep<Batch<InputNode,NodeRecord>>
public final class NodeEncoderStep extends ProcessorStep<Batch<InputNode,NodeRecord>>
{
private final IdMapper idMapper;
private final IdGenerator idGenerator;
Expand Down
Expand Up @@ -25,14 +25,14 @@
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;
import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutorServiceStep;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

/**
* Processes relationship count data received from {@link ReadRelationshipCountsDataStep} and keeps
* the accumulated counts per thread. Aggregated when {@link #done()}.
*/
public class ProcessRelationshipCountsDataStep extends ExecutorServiceStep<long[]>
public class ProcessRelationshipCountsDataStep extends ProcessorStep<long[]>
{
private final NodeLabelsCache cache;
private final Map<Thread,RelationshipCountsProcessor> processors = new ConcurrentHashMap<>();
Expand Down
Expand Up @@ -25,7 +25,7 @@
import org.neo4j.kernel.impl.transaction.state.PropertyCreator;
import org.neo4j.kernel.impl.util.MovingAverage;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutorServiceStep;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository.BatchingPropertyKeyTokenRepository;
Expand All @@ -39,7 +39,7 @@
* since property encoding is potentially the most costly step in this {@link Stage}.
*/
public class PropertyEncoderStep<RECORD extends PrimitiveRecord,INPUT extends InputEntity>
extends ExecutorServiceStep<Batch<INPUT,RECORD>>
extends ProcessorStep<Batch<INPUT,RECORD>>
{
private final BatchingPropertyKeyTokenRepository propertyKeyHolder;
private final int arrayDataSize;
Expand Down
Expand Up @@ -20,15 +20,15 @@
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutorServiceStep;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.staging.Step;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

/**
* {@link RecordProcessor} in {@link Step Step-form}.
*/
public class RecordProcessorStep<T extends AbstractBaseRecord> extends ExecutorServiceStep<T[]>
public class RecordProcessorStep<T extends AbstractBaseRecord> extends ProcessorStep<T[]>
{
private final RecordProcessor<T> processor;
private final boolean endOfLine;
Expand Down
Expand Up @@ -24,7 +24,7 @@
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipLink;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutorServiceStep;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository;

Expand All @@ -38,7 +38,7 @@
* relationship ids are kept in {@link NodeRelationshipLink node cache}, which is a point of scalability issues,
* although mitigated using multi-pass techniques.
*/
public class RelationshipEncoderStep extends ExecutorServiceStep<Batch<InputRelationship,RelationshipRecord>>
public class RelationshipEncoderStep extends ProcessorStep<Batch<InputRelationship,RelationshipRecord>>
{
private final BatchingTokenRepository<?> relationshipTypeRepository;
private final RelationshipStore relationshipStore;
Expand Down
Expand Up @@ -22,15 +22,15 @@
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutorServiceStep;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

/**
* Prepares {@link InputRelationship}, or at least potential slow parts of it, namely {@link IdMapper} lookup.
* This step is also parallelizable so if it becomes a bottleneck then more processors will automatically
* be assigned to it.
*/
public class RelationshipPreparationStep extends ExecutorServiceStep<Batch<InputRelationship,RelationshipRecord>>
public class RelationshipPreparationStep extends ProcessorStep<Batch<InputRelationship,RelationshipRecord>>
{
private final IdMapper idMapper;

Expand Down
Expand Up @@ -23,7 +23,7 @@

import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutorServiceStep;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.stats.Key;
import org.neo4j.unsafe.impl.batchimport.stats.Keys;
Expand All @@ -34,7 +34,7 @@
* Updates a batch of records to a store.
*/
public class UpdateRecordsStep<RECORD extends AbstractBaseRecord>
extends ExecutorServiceStep<RECORD[]>
extends ProcessorStep<RECORD[]>
implements StatsProvider
{
private final RecordStore<RECORD> store;
Expand Down
Expand Up @@ -35,7 +35,7 @@
* {@link Step} that uses {@link TaskExecutor} as a queue and execution mechanism.
* Supports an arbitrary number of threads to execute batches in parallel.
*/
public abstract class ExecutorServiceStep<T> extends AbstractStep<T>
public abstract class ProcessorStep<T> extends AbstractStep<T>
{
private TaskExecutor executor;
private final int workAheadSize;
Expand All @@ -54,7 +54,7 @@ public boolean accept( long queueSizeThreshold )
// Useful for tracking how much time we spend waiting for batches from upstream.
private final AtomicLong lastBatchEndTime = new AtomicLong();

protected ExecutorServiceStep( StageControl control, String name, int workAheadSize, int movingAverageSize,
protected ProcessorStep( StageControl control, String name, int workAheadSize, int movingAverageSize,
int initialProcessorCount, boolean allowMultipleProcessors, StatsProvider... additionalStatsProviders )
{
super( control, name, movingAverageSize, additionalStatsProviders );
Expand All @@ -63,7 +63,7 @@ protected ExecutorServiceStep( StageControl control, String name, int workAheadS
this.allowMultipleProcessors = allowMultipleProcessors;
}

protected ExecutorServiceStep( StageControl control, String name, int workAheadSize, int movingAverageSize,
protected ProcessorStep( StageControl control, String name, int workAheadSize, int movingAverageSize,
int initialProcessorCount, StatsProvider... additionalStatsProviders )
{
this( control, name, workAheadSize, movingAverageSize, initialProcessorCount, initialProcessorCount > 1,
Expand Down
Expand Up @@ -79,7 +79,7 @@ protected Object nextBatchOrNull( int batchSize )
stage.close();
}

private static class ReceiveOrderAssertingStep extends ExecutorServiceStep<Object>
private static class ReceiveOrderAssertingStep extends ProcessorStep<Object>
{
private final AtomicLong lastTicket = new AtomicLong();
private final long processingTime;
Expand Down

0 comments on commit aafdd8b

Please sign in to comment.