Skip to content

Commit

Permalink
Use batched mode of update population in all multiple index populators.
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed Sep 27, 2016
1 parent 8829602 commit 0dd7d6b
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 13 deletions.
Expand Up @@ -67,15 +67,13 @@
*/
public class BatchingMultipleIndexPopulator extends MultipleIndexPopulator
{
public static final String QUEUE_THRESHOLD_NAME = "queue_threshold";
static final String TASK_QUEUE_SIZE_NAME = "task_queue_size";
static final String AWAIT_TIMEOUT_MINUTES_NAME = "await_timeout_minutes";
static final String BATCH_SIZE_NAME = "batch_size";

private static final String EOL = System.lineSeparator();
private static final String FLUSH_THREAD_NAME_PREFIX = "Index Population Flush Thread";

private final int QUEUE_THRESHOLD = FeatureToggles.getInteger( getClass(), QUEUE_THRESHOLD_NAME, 20_000 );
private final int TASK_QUEUE_SIZE = FeatureToggles.getInteger( getClass(), TASK_QUEUE_SIZE_NAME,
getNumberOfPopulationWorkers() * 2 );
private final int AWAIT_TIMEOUT_MINUTES = FeatureToggles.getInteger( getClass(), AWAIT_TIMEOUT_MINUTES_NAME, 30 );
Expand Down Expand Up @@ -131,14 +129,11 @@ protected IndexPopulation createPopulation( IndexPopulator populator,
@Override
protected void populateFromQueue( long currentlyIndexedNodeId )
{
if ( queue.size() >= QUEUE_THRESHOLD )
{
log.debug( "Populating from queue." + EOL + this );
flushAll();
awaitCompletion();
super.populateFromQueue( currentlyIndexedNodeId );
log.debug( "Drained queue and all batched updates." + EOL + this );
}
log.debug( "Populating from queue." + EOL + this );
flushAll();
awaitCompletion();
super.populateFromQueue( currentlyIndexedNodeId );
log.debug( "Drained queue and all batched updates." + EOL + this );
}

@Override
Expand Down
Expand Up @@ -48,6 +48,7 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;

import static java.lang.String.format;
import static org.neo4j.collection.primitive.PrimitiveIntCollections.contains;
Expand Down Expand Up @@ -81,6 +82,10 @@
*/
public class MultipleIndexPopulator implements IndexPopulator
{

public static final String QUEUE_THRESHOLD_NAME = "queue_threshold";
private final int QUEUE_THRESHOLD = FeatureToggles.getInteger( getClass(), QUEUE_THRESHOLD_NAME, 20_000 );

// Concurrency queue since multiple concurrent threads may enqueue updates into it. It is important for this queue
// to have fast #size() method since it might be drained in batches
protected final Queue<NodePropertyUpdate> queue = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -319,6 +324,19 @@ public void cancel()
close( false );
}

void populateFromQueueBatched( long currentlyIndexedNodeId )
{
if ( isQueueThresholdReached() )
{
populateFromQueue( currentlyIndexedNodeId );
}
}

private boolean isQueueThresholdReached()
{
return queue.size() >= QUEUE_THRESHOLD;
}

protected void populateFromQueue( long currentlyIndexedNodeId )
{
populateFromQueueIfAvailable( currentlyIndexedNodeId );
Expand Down Expand Up @@ -520,7 +538,7 @@ private class NodePopulationVisitor implements Visitor<NodePropertyUpdates,
public boolean visit( NodePropertyUpdates updates ) throws IndexPopulationFailedKernelException
{
add( updates );
populateFromQueue( updates.getNodeId() );
populateFromQueueBatched( updates.getNodeId() );
return false;
}

Expand Down
Expand Up @@ -89,7 +89,7 @@ public void populateFromQueueDoesNothingIfThresholdNotReached() throws Exception
batchingPopulator.queue( update1 );
batchingPopulator.queue( update2 );

batchingPopulator.populateFromQueue( 42 );
batchingPopulator.populateFromQueueBatched( 42 );

verify( updater, never() ).process( any() );
verify( populator, never() ).newPopulatingUpdater( any() );
Expand Down
Expand Up @@ -253,9 +253,9 @@ public void shouldDeliverUpdatesThatOccurDuringPopulationToPopulator() throws Ex
//
// (We don't get an update for value2 here because we mock a fake store that doesn't contain it
// just for the purpose of testing this behavior)
order.verify( populator ).verifyDeferredConstraints( storeView );
order.verify( populator ).newPopulatingUpdater( storeView );
order.verify( updater ).close();
order.verify( populator ).verifyDeferredConstraints( storeView );
order.verify( populator ).sampleResult();
order.verify( populator ).close( true );
verifyNoMoreInteractions( updater );
Expand Down

0 comments on commit 0dd7d6b

Please sign in to comment.