Simpler queueing and less state for waiting in importer steps
Relies on the queue limit in DynamicTaskExecutor instead. This does away
with a bunch of state and the additional check/park loops in and around
the various steps and executors.
tinwelint committed Apr 27, 2017
1 parent 269ca1b commit fbef3f5
Showing 18 changed files with 202 additions and 230 deletions.
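
The gist of the change, as a minimal standalone sketch (the class, field names and queue capacity below are chosen for illustration; this is not the actual DynamicTaskExecutor): a bounded BlockingQueue already gives back-pressure on both sides, so neither submitters nor workers need the explicit park/unpark bookkeeping that this commit removes. Submitters spin on a timed offer while the queue is full, workers spin on a timed poll while it is empty, and the queue capacity is the only piece of shared waiting state left.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    import static java.util.concurrent.TimeUnit.MILLISECONDS;

    // Illustrative sketch only, not the real executor.
    class BoundedTaskRunner
    {
        private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>( 100 );
        private volatile boolean shutDown;

        void submit( Runnable task ) throws InterruptedException
        {
            // Queue full? Keep offering; the bounded capacity is the only throttle needed.
            while ( !queue.offer( task, 10, MILLISECONDS ) )
            {
                // could check health/panic state here, as the real submit() does
            }
        }

        void workerLoop() throws InterruptedException
        {
            while ( !shutDown )
            {
                // The timed poll doubles as the "park a while" that used to be coded by hand:
                // null simply means the queue was momentarily empty.
                Runnable task = queue.poll( 10, MILLISECONDS );
                if ( task != null )
                {
                    task.run();
                }
            }
        }

        void stop()
        {
            shutDown = true;   // workers notice within one poll timeout, no unpark needed
        }
    }
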
@@ -27,7 +27,7 @@
 import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.GroupVisitor;
 import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.NodeChangeVisitor;
 import org.neo4j.unsafe.impl.batchimport.cache.NodeType;
-import org.neo4j.unsafe.impl.batchimport.staging.AbstractStep;
+import org.neo4j.unsafe.impl.batchimport.staging.ProducerStep;
 import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
 
 import static java.lang.System.nanoTime;
@@ -36,38 +36,26 @@
  * Using the {@link NodeRelationshipCache} efficiently looks for changed nodes and reads those
  * {@link NodeRecord} and sends downwards.
  */
-public class ReadGroupRecordsByCacheStep extends AbstractStep<Void>
+public class ReadGroupRecordsByCacheStep extends ProducerStep
 {
     private final RecordStore<RelationshipGroupRecord> store;
     private final NodeRelationshipCache cache;
-    private final int batchSize;
 
     public ReadGroupRecordsByCacheStep( StageControl control, Configuration config,
             RecordStore<RelationshipGroupRecord> store, NodeRelationshipCache cache )
     {
-        super( control, ">", config );
+        super( control, config );
         this.store = store;
         this.cache = cache;
-        this.batchSize = config.batchSize();
     }
 
     @Override
-    public long receive( long ticket, Void ignored )
+    protected void process()
     {
-        new Thread()
+        try ( NodeVisitor visitor = new NodeVisitor() )
         {
-            @Override
-            public void run()
-            {
-                assertHealthy();
-                try ( NodeVisitor visitor = new NodeVisitor() )
-                {
-                    cache.visitChangedNodes( visitor, NodeType.NODE_TYPE_DENSE );
-                }
-                endOfUpstream();
-            }
-        }.start();
-        return 0;
+            cache.visitChangedNodes( visitor, NodeType.NODE_TYPE_DENSE );
+        }
     }
 
     private class NodeVisitor implements NodeChangeVisitor, AutoCloseable, GroupVisitor
@@ -98,11 +86,10 @@ public long visit( long nodeId, int typeId, long out, long in, long loop )
             return id;
         }
 
-        @SuppressWarnings( "unchecked" )
         private void send()
         {
             totalProcessingTime.add( nanoTime() - time );
-            downstream.receive( doneBatches.getAndIncrement(), batch );
+            sendDownstream( batch );
             time = nanoTime();
             assertHealthy();
         }
@@ -116,4 +103,10 @@ public void close()
             }
         }
     }
+
+    @Override
+    protected long position()
+    {
+        return store.getHighId() * store.getRecordSize();
+    }
 }
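
For readers who don't have the staging package open: the shape of the producer-step contract, as the subclasses in this commit use it. This outline is inferred from the diff alone; the abstract/concrete split and exact signatures in the real ProducerStep may differ.

    // Inferred outline only; see ProducerStep in org.neo4j.unsafe.impl.batchimport.staging
    // for the real thing.
    abstract class ProducerStepOutline<T>
    {
        // Runs once on a worker thread; implementations walk their source and
        // call sendDownstream() for every assembled batch until the source is exhausted.
        protected abstract void process();

        // Hands one finished batch to the next step in the stage.
        protected abstract void sendDownstream( T batch );

        // Approximate byte position in the source, used for progress/throughput reporting
        // (e.g. highId * recordSize in the steps above).
        protected abstract long position();
    }
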
@@ -24,7 +24,7 @@
 
 import org.neo4j.helpers.collection.PrefetchingIterator;
 import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
-import org.neo4j.unsafe.impl.batchimport.staging.IoProducerStep;
+import org.neo4j.unsafe.impl.batchimport.staging.PullingProducerStep;
 import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
 
 import static org.neo4j.helpers.collection.Iterators.prefetching;
@@ -33,7 +33,7 @@
  * Reads {@link RelationshipGroupRecord group records} from {@link RelationshipGroupCache}, sending
  * them downstream in batches.
  */
-public class ReadGroupsFromCacheStep extends IoProducerStep
+public class ReadGroupsFromCacheStep extends PullingProducerStep
 {
     private final int itemSize;
     private final PrefetchingIterator<RelationshipGroupRecord> data;
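
ReadGroupsFromCacheStep itself only swaps its superclass from IoProducerStep to PullingProducerStep. A sketch of the difference between the two producer flavors used in this commit, with method names inferred rather than copied from the real classes: a pushing producer drives its own loop inside process(), while a pulling producer is repeatedly asked for its next batch until it has none left.

    // Sketch with inferred names; the real PullingProducerStep lives in
    // org.neo4j.unsafe.impl.batchimport.staging and differs in detail.
    abstract class PullingProducerOutline<T>
    {
        // Returns the next batch, or null when the source is exhausted.
        protected abstract T nextBatchOrNull( long ticket, int batchSize );

        // The base class owns the loop: pull a batch, send it downstream, repeat.
        final void run( int batchSize )
        {
            long ticket = 0;
            for ( T batch; (batch = nextBatchOrNull( ticket++, batchSize )) != null; )
            {
                sendDownstream( batch );
            }
        }

        protected void sendDownstream( T batch )
        {
            // hands the batch to the next step; omitted in this sketch
        }
    }
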
@@ -25,7 +25,7 @@
 import org.neo4j.unsafe.impl.batchimport.cache.ByteArray;
 import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
 import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.NodeChangeVisitor;
-import org.neo4j.unsafe.impl.batchimport.staging.AbstractStep;
+import org.neo4j.unsafe.impl.batchimport.staging.ProducerStep;
 import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
 
 import static java.lang.System.nanoTime;
@@ -36,38 +36,27 @@
  * Using the {@link NodeRelationshipCache} efficiently looks for changed nodes and reads those
  * {@link NodeRecord} and sends downwards.
  */
-public class ReadNodeIdsByCacheStep extends AbstractStep<Void>
+public class ReadNodeIdsByCacheStep extends ProducerStep
 {
     private final int nodeTypes;
     private final NodeRelationshipCache cache;
-    private final int batchSize;
+    private volatile long highId;
 
     public ReadNodeIdsByCacheStep( StageControl control, Configuration config,
             NodeRelationshipCache cache, int nodeTypes )
     {
-        super( control, ">", config );
+        super( control, config );
        this.cache = cache;
        this.nodeTypes = nodeTypes;
-        this.batchSize = config.batchSize();
     }
 
     @Override
-    public long receive( long ticket, Void ignored )
+    protected void process()
     {
-        new Thread()
+        try ( NodeVisitor visitor = new NodeVisitor() )
         {
-            @Override
-            public void run()
-            {
-                assertHealthy();
-                try ( NodeVisitor visitor = new NodeVisitor() )
-                {
-                    cache.visitChangedNodes( visitor, nodeTypes );
-                }
-                endOfUpstream();
-            }
-        }.start();
-        return 0;
+            cache.visitChangedNodes( visitor, nodeTypes );
+        }
     }
 
     private class NodeVisitor implements NodeChangeVisitor, AutoCloseable
@@ -88,13 +77,13 @@ public void change( long nodeId, ByteArray array )
             }
         }
 
-        @SuppressWarnings( "unchecked" )
         private void send()
        {
             totalProcessingTime.add( nanoTime() - time );
-            downstream.receive( doneBatches.getAndIncrement(), iterator( batch ) );
+            sendDownstream( iterator( batch ) );
             time = nanoTime();
             assertHealthy();
+            highId = batch[cursor - 1];
         }
 
         @Override
@@ -107,4 +96,10 @@ public void close()
             }
         }
     }
+
+    @Override
+    protected long position()
+    {
+        return highId * Long.BYTES;
+    }
 }
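
Worth noting about the new position() here: the step is not reading a file, so it reports an estimated byte position of one long (Long.BYTES) per node id passed, which keeps the stage's throughput statistics meaningful. A made-up, standalone example of the arithmetic such a number feeds into (the monitor-side calculation is hypothetical, not taken from this codebase):

    public class PositionSample
    {
        public static void main( String[] args )
        {
            // Two successive position() samples, e.g. 1,000,000 and then 2,000,000 node ids visited.
            long previousPosition = 1_000_000L * Long.BYTES;
            long currentPosition = 2_000_000L * Long.BYTES;
            double seconds = 2.0;   // time between the samples

            double mbPerSecond = (currentPosition - previousPosition) / (1024.0 * 1024.0) / seconds;
            System.out.printf( "~%.1f MB/s%n", mbPerSecond );   // ~3.8 MB/s for this made-up sample
        }
    }
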
@@ -130,7 +130,6 @@ public void submit( Task<LOCAL> task )
             { // Then just stay here and try
                 assertHealthy();
             }
-            notifyProcessors();
         }
         catch ( InterruptedException e )
         {
@@ -148,14 +147,6 @@ public void assertHealthy()
         }
     }
 
-    private void notifyProcessors()
-    {
-        for ( Processor processor : processors )
-        {
-            parkStrategy.unpark( processor );
-        }
-    }
-
     @Override
     public void receivePanic( Throwable cause )
     {
@@ -228,7 +219,17 @@ public void run()
             final LOCAL threadLocalState = initialLocalState.get();
             while ( !shutDown && !processorShutDown )
             {
-                Task<LOCAL> task = queue.poll();
+                Task<LOCAL> task;
+                try
+                {
+                    task = queue.poll( 10, MILLISECONDS );
+                }
+                catch ( InterruptedException e )
+                {
+                    Thread.interrupted();
+                    break;
+                }
+
                 if ( task != null )
                 {
                     try
@@ -242,14 +243,6 @@ public void run()
                         throw launderedException( e );
                     }
                 }
-                else
-                {
-                    if ( processorShutDown )
-                    {
-                        break;
-                    }
-                    parkAWhile();
-                }
             }
         }
     }
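
One detail of the new worker loop above: the catch block calls Thread.interrupted() before breaking. That method returns and clears the calling thread's interrupt status, whereas isInterrupted() only reads it, so the processor exits without leaving the flag set. A tiny standalone demo of the difference:

    public class InterruptFlagDemo
    {
        public static void main( String[] args )
        {
            Thread.currentThread().interrupt();

            // isInterrupted() only reads the flag...
            System.out.println( Thread.currentThread().isInterrupted() );   // true

            // ...while interrupted() reads it AND clears it.
            System.out.println( Thread.interrupted() );                     // true
            System.out.println( Thread.currentThread().isInterrupted() );   // false: flag was cleared
        }
    }
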
@@ -245,4 +245,10 @@ public String toString()
         return format( "%s[%s, processors:%d, batches:%d", getClass().getSimpleName(),
                 name, processors( 0 ), doneBatches.get() );
     }
+
+    @Override
+    public long doneBatches()
+    {
+        return doneBatches.get();
+    }
 }
@@ -25,7 +25,7 @@
 /**
  * Releases batches of record ids to be read, potentially in parallel, by downstream batches.
  */
-public class BatchFeedStep extends IoProducerStep
+public class BatchFeedStep extends PullingProducerStep
 {
     private final RecordIdIterator ids;
     private final int recordSize;

This file was deleted.

@@ -29,7 +29,7 @@
 /**
  * Takes an Iterator and chops it up into array batches downstream.
  */
-public abstract class IteratorBatcherStep<T> extends IoProducerStep
+public abstract class IteratorBatcherStep<T> extends PullingProducerStep
 {
     private final Iterator<T> data;
     private final Class<T> itemClass;
