Skip to content

Commit

Permalink
Index population applies all concurrent updates
Browse files Browse the repository at this point in the history
=== Main issue ===
Regardless of where the population scan is currently at.

Previously the index population scan would now and then digress
from its normal scanning and apply updates that had trickled in
from concurrent transactions while the population was running.
While going through those updates it would ignore those that were
for an entity id that the scan hadn't already visited, i.e. ones that
it would visit later and apply then instead. This was merely a
performance optimization, but a flawed one when considering how
a label scan works, which is what drives an index population scan.
A label scan reader reads bit-sets of size 64, loops through those
bits, one per entity id and then moves on to read the next.
Problem arises when a concurrent creation happens right in front of
the scan, but inside the same bit-set range as the label scan is
currently at. If the timing was such that the population decided to
apply external updates at exactly this time, this would result in this
particular update being skipped AND the label scan missing it.

The result would be an index that was inconsistent with the store,
even after a successful population and an ONLINE index.
A consistency-checker run would also have pointed that out to you.

Fix is to err on the safe side and always apply external updates,
even if they are in front of the scan. Simpler is better.
The down side is that the index population will have to do more work
if there are concurrent transactions going on which affect the index.
Compared to before, statistically twice as many
external updates will now be applied during an index population.

=== Another related issue ===
There was another problem with applying updates right at the end
of the population where the executor would be shut down before being
able to apply those final updates and so they would also be missed.
This has also been fixed.
  • Loading branch information
tinwelint committed Aug 29, 2018
1 parent e5b6c63 commit 895fee8
Show file tree
Hide file tree
Showing 12 changed files with 347 additions and 149 deletions.
Expand Up @@ -33,7 +33,6 @@


import org.neo4j.function.Predicates; import org.neo4j.function.Predicates;
import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException;
import org.neo4j.kernel.api.index.IndexEntryUpdate; import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles; import org.neo4j.unsafe.impl.internal.dragons.FeatureToggles;
Expand Down Expand Up @@ -98,21 +97,6 @@ public class BatchingMultipleIndexPopulator extends MultipleIndexPopulator
this.executor = executor; this.executor = executor;
} }


@Override
public StoreScan<IndexPopulationFailedKernelException> indexAllNodes()
{
StoreScan<IndexPopulationFailedKernelException> storeScan = super.indexAllNodes();
return new BatchingStoreScan<>( storeScan );
}

@Override
protected void populateFromQueue( long currentlyIndexedNodeId )
{
log.debug( "Populating from queue." + EOL + this );
super.populateFromQueue( currentlyIndexedNodeId );
log.debug( "Drained queue and all batched updates." + EOL + this );
}

@Override @Override
protected void flushAll() protected void flushAll()
{ {
Expand Down Expand Up @@ -182,6 +166,13 @@ protected void flush( IndexPopulation population )
} ); } );
} }


@Override
public void close( boolean populationCompletedSuccessfully )
{
super.close( populationCompletedSuccessfully );
shutdownExecutor( !populationCompletedSuccessfully );
}

/** /**
* Shuts down the executor waiting {@link #AWAIT_TIMEOUT_MINUTES} minutes for it's termination. * Shuts down the executor waiting {@link #AWAIT_TIMEOUT_MINUTES} minutes for it's termination.
* Restores the interrupted status and exits normally when interrupted during waiting. * Restores the interrupted status and exits normally when interrupted during waiting.
Expand Down Expand Up @@ -262,43 +253,4 @@ private int getNumberOfPopulationWorkers()
{ {
return Math.max( 2, MAXIMUM_NUMBER_OF_WORKERS ); return Math.max( 2, MAXIMUM_NUMBER_OF_WORKERS );
} }

/**
* A delegating {@link StoreScan} implementation that flushes all pending updates and terminates the executor after
* the delegate store scan completes.
*
* @param <E> type of the exception this store scan might get.
*/
private class BatchingStoreScan<E extends Exception> extends DelegatingStoreScan<E>
{
BatchingStoreScan( StoreScan<E> delegate )
{
super( delegate );
}

@Override
public void run() throws E
{
try
{
super.run();
log.info( "Completed node store scan. " +
"Flushing all pending updates." + EOL + BatchingMultipleIndexPopulator.this );
flushAll();
}
catch ( Throwable scanError )
{
try
{
shutdownExecutor( true );
}
catch ( Throwable error )
{
scanError.addSuppressed( error );
}
throw scanError;
}
shutdownExecutor( false );
}
}
} }
Expand Up @@ -24,7 +24,6 @@
import org.neo4j.collection.primitive.PrimitiveIntSet; import org.neo4j.collection.primitive.PrimitiveIntSet;
import org.neo4j.helpers.collection.Visitor; import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.api.exceptions.EntityNotFoundException; import org.neo4j.kernel.api.exceptions.EntityNotFoundException;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.PropertyAccessor; import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate; import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
import org.neo4j.register.Register.DoubleLongRegister; import org.neo4j.register.Register.DoubleLongRegister;
Expand Down Expand Up @@ -84,12 +83,6 @@ public void stop()
{ {
} }


@Override
public void acceptUpdate( MultipleIndexPopulator.MultipleIndexUpdater updater, IndexEntryUpdate update,
long currentlyIndexedNodeId )
{
}

@Override @Override
public PopulationProgress getProgress() public PopulationProgress getProgress()
{ {
Expand Down
Expand Up @@ -334,22 +334,9 @@ public void cancel()
close( false ); close( false );
} }


void populateFromQueueBatched( long currentlyIndexedNodeId ) void populateFromQueueBatched()
{ {
if ( isQueueThresholdReached() ) populateFromQueue( QUEUE_THRESHOLD );
{
populateFromQueue( currentlyIndexedNodeId );
}
}

private boolean isQueueThresholdReached()
{
return queue.size() >= QUEUE_THRESHOLD;
}

protected void populateFromQueue( long currentlyIndexedNodeId )
{
populateFromQueueIfAvailable( currentlyIndexedNodeId );
} }


protected void flushAll() protected void flushAll()
Expand All @@ -369,9 +356,13 @@ protected void flush( IndexPopulation population )
} }
} }


private void populateFromQueueIfAvailable( long currentlyIndexedNodeId ) /**
* Populates external updates from the update queue if there are {@code queueThreshold} or more queued updates.
*/
protected void populateFromQueue( int queueThreshold )
{ {
if ( !queue.isEmpty() ) int queueSize = queue.size();
if ( queueSize > 0 && queueSize >= queueThreshold )
{ {
// Before applying updates from the updates queue any pending scan updates needs to be applied, i.e. flushed. // Before applying updates from the updates queue any pending scan updates needs to be applied, i.e. flushed.
// This is because 'currentlyIndexedNodeId' is based on how far the scan has come. // This is because 'currentlyIndexedNodeId' is based on how far the scan has come.
Expand All @@ -381,9 +372,7 @@ private void populateFromQueueIfAvailable( long currentlyIndexedNodeId )
{ {
do do
{ {
// no need to check for null as nobody else is emptying this queue updater.process( queue.poll() );
IndexEntryUpdate<?> update = queue.poll();
storeScan.acceptUpdate( updater, update, currentlyIndexedNodeId );
} }
while ( !queue.isEmpty() ); while ( !queue.isEmpty() );
} }
Expand Down Expand Up @@ -523,7 +512,7 @@ private void flip() throws FlipFailedKernelException
flipper.flip( () -> flipper.flip( () ->
{ {
populator.add( takeCurrentBatch() ); populator.add( takeCurrentBatch() );
populateFromQueueIfAvailable( Long.MAX_VALUE ); populateFromQueue( 0 );
IndexSample sample = populator.sampleResult(); IndexSample sample = populator.sampleResult();
storeView.replaceIndexCounts( indexId, sample.uniqueValues(), sample.sampleSize(), storeView.replaceIndexCounts( indexId, sample.uniqueValues(), sample.sampleSize(),
sample.indexSize() ); sample.indexSize() );
Expand Down Expand Up @@ -566,10 +555,10 @@ private class NodePopulationVisitor implements Visitor<NodeUpdates,
IndexPopulationFailedKernelException> IndexPopulationFailedKernelException>
{ {
@Override @Override
public boolean visit( NodeUpdates updates ) throws IndexPopulationFailedKernelException public boolean visit( NodeUpdates updates )
{ {
add( updates ); add( updates );
populateFromQueueBatched( updates.getNodeId() ); populateFromQueueBatched();
return false; return false;
} }


Expand Down Expand Up @@ -605,12 +594,6 @@ public void stop()
delegate.stop(); delegate.stop();
} }


@Override
public void acceptUpdate( MultipleIndexUpdater updater, IndexEntryUpdate<?> update, long currentlyIndexedNodeId )
{
delegate.acceptUpdate( updater, update, currentlyIndexedNodeId );
}

@Override @Override
public PopulationProgress getProgress() public PopulationProgress getProgress()
{ {
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/ */
package org.neo4j.kernel.impl.api.index; package org.neo4j.kernel.impl.api.index;


import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.storageengine.api.schema.PopulationProgress; import org.neo4j.storageengine.api.schema.PopulationProgress;


public interface StoreScan<FAILURE extends Exception> public interface StoreScan<FAILURE extends Exception>
Expand All @@ -28,8 +27,5 @@ public interface StoreScan<FAILURE extends Exception>


void stop(); void stop();


void acceptUpdate( MultipleIndexPopulator.MultipleIndexUpdater updater, IndexEntryUpdate<?> update,
long currentlyIndexedNodeId );

PopulationProgress getProgress(); PopulationProgress getProgress();
} }
Expand Up @@ -27,9 +27,7 @@
import org.neo4j.collection.primitive.PrimitiveLongResourceIterator; import org.neo4j.collection.primitive.PrimitiveLongResourceIterator;
import org.neo4j.helpers.collection.PrefetchingIterator; import org.neo4j.helpers.collection.PrefetchingIterator;
import org.neo4j.helpers.collection.Visitor; import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate; import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
import org.neo4j.kernel.impl.api.index.MultipleIndexPopulator;
import org.neo4j.kernel.impl.api.index.NodeUpdates; import org.neo4j.kernel.impl.api.index.NodeUpdates;
import org.neo4j.kernel.impl.locking.LockService; import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.store.NodeStore; import org.neo4j.kernel.impl.store.NodeStore;
Expand Down Expand Up @@ -142,16 +140,6 @@ private static boolean containsAnyLabel( int[] labelIdFilter, long[] labels )
return false; return false;
} }


@Override
public void acceptUpdate( MultipleIndexPopulator.MultipleIndexUpdater updater, IndexEntryUpdate<?> update,
long currentlyIndexedNodeId )
{
if ( update.getEntityId() <= currentlyIndexedNodeId )
{
updater.process( update );
}
}

private class PropertyBlockIterator extends PrefetchingIterator<PropertyBlock> private class PropertyBlockIterator extends PrefetchingIterator<PropertyBlock>
{ {
private final Iterator<PropertyRecord> records; private final Iterator<PropertyRecord> records;
Expand Down
Expand Up @@ -73,7 +73,7 @@ public class BatchingMultipleIndexPopulatorTest
private final IndexDescriptor index42 = IndexDescriptorFactory.forLabel(42, 42); private final IndexDescriptor index42 = IndexDescriptorFactory.forLabel(42, 42);


@After @After
public void tearDown() throws Exception public void tearDown()
{ {
clearProperty( QUEUE_THRESHOLD_NAME ); clearProperty( QUEUE_THRESHOLD_NAME );
clearProperty( TASK_QUEUE_SIZE_NAME ); clearProperty( TASK_QUEUE_SIZE_NAME );
Expand All @@ -98,7 +98,7 @@ public void populateFromQueueDoesNothingIfThresholdNotReached() throws Exception
batchingPopulator.queue( update1 ); batchingPopulator.queue( update1 );
batchingPopulator.queue( update2 ); batchingPopulator.queue( update2 );


batchingPopulator.populateFromQueueBatched( 42 ); batchingPopulator.populateFromQueueBatched();


verify( updater, never() ).process( any() ); verify( updater, never() ).process( any() );
verify( populator, never() ).newPopulatingUpdater( any() ); verify( populator, never() ).newPopulatingUpdater( any() );
Expand Down Expand Up @@ -134,7 +134,7 @@ public void populateFromQueuePopulatesWhenThresholdReached() throws Exception
batchingPopulator.queue( update2 ); batchingPopulator.queue( update2 );
batchingPopulator.queue( update3 ); batchingPopulator.queue( update3 );


batchingPopulator.populateFromQueue( 42 ); batchingPopulator.populateFromQueueBatched();


verify( updater1 ).process( update1 ); verify( updater1 ).process( update1 );
verify( updater1 ).process( update3 ); verify( updater1 ).process( update3 );
Expand All @@ -157,6 +157,9 @@ public void executorShutdownAfterStoreScanCompletes() throws Exception
verify( executor, never() ).shutdown(); verify( executor, never() ).shutdown();


storeScan.run(); storeScan.run();
verify( executor, never() ).shutdown();
verify( executor, never() ).awaitTermination( anyLong(), any() );
batchingPopulator.close( true );
verify( executor ).shutdown(); verify( executor ).shutdown();
verify( executor ).awaitTermination( anyLong(), any() ); verify( executor ).awaitTermination( anyLong(), any() );
} }
Expand Down Expand Up @@ -190,6 +193,9 @@ public void executorForcefullyShutdownIfStoreScanFails() throws Exception
assertSame( scanError, t ); assertSame( scanError, t );
} }


verify( executor, never() ).shutdown();
verify( executor, never() ).awaitTermination( anyLong(), any() );
batchingPopulator.close( false );
verify( executor ).shutdownNow(); verify( executor ).shutdownNow();
verify( executor ).awaitTermination( anyLong(), any() ); verify( executor ).awaitTermination( anyLong(), any() );
} }
Expand Down Expand Up @@ -435,12 +441,6 @@ public void stop()
stop = true; stop = true;
} }


@Override
public void acceptUpdate( MultipleIndexPopulator.MultipleIndexUpdater updater, IndexEntryUpdate<?> update,
long currentlyIndexedNodeId )
{
}

@Override @Override
public PopulationProgress getProgress() public PopulationProgress getProgress()
{ {
Expand Down
Expand Up @@ -428,13 +428,6 @@ public void stop()
latch.finish(); latch.finish();
} }


@Override
public void acceptUpdate( MultipleIndexPopulator.MultipleIndexUpdater updater, IndexEntryUpdate<?> update,
long currentlyIndexedNodeId )
{
// no-op
}

@Override @Override
public PopulationProgress getProgress() public PopulationProgress getProgress()
{ {
Expand Down

0 comments on commit 895fee8

Please sign in to comment.