Skip to content

Commit

Permalink
Better naming around MultipleIndexPopulator (#11714)
Browse files Browse the repository at this point in the history
Better naming around MultipleIndexPopulator
  • Loading branch information
burqen committed May 8, 2018
1 parent cd5f3bf commit 2f8a11c
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 34 deletions.
Expand Up @@ -106,12 +106,12 @@ public StoreScan<IndexPopulationFailedKernelException> indexAllNodes()
}

@Override
protected void populateFromQueue( long currentlyIndexedNodeId )
protected void populateFromUpdateQueue( long currentlyIndexedNodeId )
{
log.debug( "Populating from queue." + EOL + this );
flushAll();
awaitCompletion();
super.populateFromQueue( currentlyIndexedNodeId );
super.populateFromUpdateQueue( currentlyIndexedNodeId );
log.debug( "Drained queue and all batched updates." + EOL + this );
}

Expand All @@ -124,7 +124,7 @@ public String toString()
.collect( joining( ", ", "[", "]" ) );

return "BatchingMultipleIndexPopulator{activeTasks=" + activeTasks + ", executor=" + executor + ", " +
"batchedUpdates = " + updatesString + ", queuedUpdates = " + queue.size() + "}";
"batchedUpdates = " + updatesString + ", queuedUpdates = " + updatesQueue.size() + "}";
}

/**
Expand Down
Expand Up @@ -169,7 +169,7 @@ void cancelPopulation( MultipleIndexPopulator.IndexPopulation population )
*/
public void update( IndexEntryUpdate<?> update )
{
multiPopulator.queue( update );
multiPopulator.queueUpdate( update );
}

@Override
Expand Down
Expand Up @@ -65,7 +65,7 @@
* and generate updates that are fed into the {@link IndexPopulator populators}. Only a single call to this
* method should be made during the life time of a {@link MultipleIndexPopulator} and should be called by the
* same thread instantiating this instance.</li>
* <li>{@link #queue(IndexEntryUpdate)} which queues updates which will be read by the thread currently executing
* <li>{@link #queueUpdate(IndexEntryUpdate)} which queues updates which will be read by the thread currently executing
* {@link #indexAllNodes()} and incorporated into that data stream. Calls to this method may come from any number
* of concurrent threads.</li>
* </ul>
Expand All @@ -77,7 +77,7 @@
* FailedIndexProxyFactory, String)}.</li>
* <li>Call to {@link #create()} to create data structures and files to start accepting updates.</li>
* <li>Call to {@link #indexAllNodes()} (blocking call).</li>
* <li>While all nodes are being indexed, calls to {@link #queue(IndexEntryUpdate)} are accepted.</li>
* <li>While all nodes are being indexed, calls to {@link #queueUpdate(IndexEntryUpdate)} are accepted.</li>
* <li>Call to {@link #flipAfterPopulation()} after successful population, or {@link #fail(Throwable)} if not</li>
* </ol>
*/
Expand All @@ -91,7 +91,7 @@ public class MultipleIndexPopulator implements IndexPopulator

// Concurrency queue since multiple concurrent threads may enqueue updates into it. It is important for this queue
// to have fast #size() method since it might be drained in batches
protected final Queue<IndexEntryUpdate<?>> queue = new LinkedBlockingQueue<>();
final Queue<IndexEntryUpdate<?>> updatesQueue = new LinkedBlockingQueue<>();

// Populators are added into this list. The same thread adding populators will later call #indexAllNodes.
// Multiple concurrent threads might fail individual populations.
Expand Down Expand Up @@ -124,7 +124,7 @@ IndexPopulation addPopulator(
return population;
}

protected IndexPopulation createPopulation( IndexPopulator populator, long indexId, IndexMeta indexMeta,
private IndexPopulation createPopulation( IndexPopulator populator, long indexId, IndexMeta indexMeta,
FlippableIndexProxy flipper, FailedIndexProxyFactory failedIndexProxyFactory, String indexUserDescription )
{
return new IndexPopulation( populator, indexId, indexMeta, flipper, failedIndexProxyFactory, indexUserDescription );
Expand Down Expand Up @@ -181,9 +181,9 @@ public void run() throws IndexPopulationFailedKernelException
*
* @param update {@link IndexEntryUpdate} to queue.
*/
public void queue( IndexEntryUpdate<?> update )
public void queueUpdate( IndexEntryUpdate<?> update )
{
queue.add( update );
updatesQueue.add( update );
}

/**
Expand Down Expand Up @@ -347,22 +347,22 @@ private boolean removeFromOngoingPopulations( IndexPopulation indexPopulation )
return populations.remove( indexPopulation );
}

void populateFromQueueBatched( long currentlyIndexedNodeId )
void populateFromUpdateQueueBatched( long currentlyIndexedNodeId )
{
if ( isQueueThresholdReached() )
if ( isUpdateQueueThresholdReached() )
{
populateFromQueue( currentlyIndexedNodeId );
populateFromUpdateQueue( currentlyIndexedNodeId );
}
}

private boolean isQueueThresholdReached()
private boolean isUpdateQueueThresholdReached()
{
return queue.size() >= QUEUE_THRESHOLD;
return updatesQueue.size() >= QUEUE_THRESHOLD;
}

protected void populateFromQueue( long currentlyIndexedNodeId )
protected void populateFromUpdateQueue( long currentlyIndexedNodeId )
{
populateFromQueueIfAvailable( currentlyIndexedNodeId );
populateFromUpdateQueueIfAvailable( currentlyIndexedNodeId );
}

void flushAll()
Expand All @@ -382,19 +382,19 @@ protected void flush( IndexPopulation population )
}
}

private void populateFromQueueIfAvailable( long currentlyIndexedNodeId )
private void populateFromUpdateQueueIfAvailable( long currentlyIndexedNodeId )
{
if ( !queue.isEmpty() )
if ( !updatesQueue.isEmpty() )
{
try ( MultipleIndexUpdater updater = newPopulatingUpdater( storeView ) )
{
do
{
// no need to check for null as nobody else is emptying this queue
IndexEntryUpdate<?> update = queue.poll();
IndexEntryUpdate<?> update = updatesQueue.poll();
storeScan.acceptUpdate( updater, update, currentlyIndexedNodeId );
}
while ( !queue.isEmpty() );
while ( !updatesQueue.isEmpty() );
}
}
}
Expand Down Expand Up @@ -569,7 +569,7 @@ void flip() throws FlipFailedKernelException
if ( populationOngoing )
{
populator.add( takeCurrentBatch() );
populateFromQueueIfAvailable( Long.MAX_VALUE );
populateFromUpdateQueueIfAvailable( Long.MAX_VALUE );
if ( populations.contains( IndexPopulation.this ) )
{
IndexSample sample = populator.sampleResult();
Expand Down Expand Up @@ -621,7 +621,7 @@ private class NodePopulationVisitor implements Visitor<NodeUpdates,
public boolean visit( NodeUpdates updates )
{
add( updates );
populateFromQueueBatched( updates.getNodeId() );
populateFromUpdateQueueBatched( updates.getNodeId() );
return false;
}

Expand Down
Expand Up @@ -96,10 +96,10 @@ public void populateFromQueueDoesNothingIfThresholdNotReached() throws Exception

IndexEntryUpdate<?> update1 = add( 1, index1.schema(), "foo" );
IndexEntryUpdate<?> update2 = add( 2, index1.schema(), "bar" );
batchingPopulator.queue( update1 );
batchingPopulator.queue( update2 );
batchingPopulator.queueUpdate( update1 );
batchingPopulator.queueUpdate( update2 );

batchingPopulator.populateFromQueueBatched( 42 );
batchingPopulator.populateFromUpdateQueueBatched( 42 );

verify( updater, never() ).process( any() );
verify( populator, never() ).newPopulatingUpdater( any() );
Expand Down Expand Up @@ -131,11 +131,11 @@ public void populateFromQueuePopulatesWhenThresholdReached() throws Exception
IndexEntryUpdate<?> update1 = add( 1, index1.schema(), "foo" );
IndexEntryUpdate<?> update2 = add( 2, index42.schema(), "bar" );
IndexEntryUpdate<?> update3 = add( 3, index1.schema(), "baz" );
batchingPopulator.queue( update1 );
batchingPopulator.queue( update2 );
batchingPopulator.queue( update3 );
batchingPopulator.queueUpdate( update1 );
batchingPopulator.queueUpdate( update2 );
batchingPopulator.queueUpdate( update3 );

batchingPopulator.populateFromQueue( 42 );
batchingPopulator.populateFromUpdateQueue( 42 );

verify( updater1 ).process( update1 );
verify( updater1 ).process( update3 );
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.neo4j.helpers.collection.Visitor;
import org.neo4j.internal.kernel.api.IndexCapability;
import org.neo4j.internal.kernel.api.InternalIndexState;
import org.neo4j.kernel.api.exceptions.index.ExceptionDuringFlipKernelException;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexAccessor;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
Expand All @@ -42,7 +41,6 @@
import org.neo4j.values.storable.Values;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class IndexPopulationTest
{
Expand All @@ -60,7 +58,7 @@ public void mustFlipToFailedIfFailureToApplyLastBatchWhileFlipping() throws Exce
MultipleIndexPopulator multipleIndexPopulator = new MultipleIndexPopulator( storeView, logProvider );
MultipleIndexPopulator.IndexPopulation indexPopulation =
multipleIndexPopulator.addPopulator( populator, 0, dummyMeta(), flipper, t -> failedProxy, "userDescription" );
multipleIndexPopulator.queue( someUpdate() );
multipleIndexPopulator.queueUpdate( someUpdate() );
multipleIndexPopulator.indexAllNodes().run();

// when
Expand Down
Expand Up @@ -146,7 +146,7 @@ public void receive( NodeRecord nodeRecord )
{
if ( nodeRecord.getId() == 7 )
{
indexPopulator.queue( IndexEntryUpdate.change( 8L, index, Values.of( "a" ), Values.of( "b" ) ) );
indexPopulator.queueUpdate( IndexEntryUpdate.change( 8L, index, Values.of( "a" ), Values.of( "b" ) ) );
}
}
}
Expand Down

0 comments on commit 2f8a11c

Please sign in to comment.