Skip to content

Commit

Permalink
Merge pull request #11280 from MishaDemianenko/3.4-index-population-c…
Browse files Browse the repository at this point in the history
…ancel

Guard index create/flip/cancel with a lock to prevent concurrent execution of those
  • Loading branch information
MishaDemianenko committed Mar 16, 2018
2 parents a27c124 + a19bdc0 commit a1071de
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -141,7 +141,7 @@ public void create()
forEachPopulation( population ->
{
log.info( "Index population started: [%s]", population.indexUserDescription );
population.populator.create();
population.create();
} );
}

Expand Down Expand Up @@ -206,9 +206,7 @@ protected void fail( IndexPopulation population, Throwable failure )
return;
}

// If the cause of index population failure is a conflict in a (unique) index, the conflict is the
// failure
// TODO do we need this?
// If the cause of index population failure is a conflict in a (unique) index, the conflict is the failure
if ( failure instanceof IndexPopulationFailedKernelException )
{
Throwable cause = failure.getCause();
Expand All @@ -219,7 +217,7 @@ protected void fail( IndexPopulation population, Throwable failure )
}

// Index conflicts are expected (for unique indexes) so we don't need to log them.
if ( !(failure instanceof IndexEntryConflictException) /*TODO: && this is a unique index...*/ )
if ( !(failure instanceof IndexEntryConflictException) )
{
log.error( format( "Failed to populate index: [%s]", population.indexUserDescription ), failure );
}
Expand Down Expand Up @@ -303,11 +301,7 @@ void flipAfterPopulation()
{
try
{
if ( population.markCompleted() )
{
population.flip();
removeFromOngoingPopulations( population );
}
population.flip();
}
catch ( Throwable t )
{
Expand Down Expand Up @@ -338,18 +332,13 @@ public void cancel()

void cancelIndexPopulation( IndexPopulation indexPopulation )
{
if ( indexPopulation.markCompleted() )
try
{
try
{
resetIndexCountsForPopulation( indexPopulation );
indexPopulation.populator.close( false );
removeFromOngoingPopulations( indexPopulation );
}
catch ( IOException e )
{
fail( indexPopulation, e );
}
indexPopulation.cancel();
}
catch ( IOException e )
{
fail( indexPopulation, e );
}
}

Expand Down Expand Up @@ -498,7 +487,8 @@ public class IndexPopulation implements LabelSchemaSupplier
private final IndexCountsRemover indexCountsRemover;
private final FailedIndexProxyFactory failedIndexProxyFactory;
private final String indexUserDescription;
private final AtomicBoolean completedPopulation = new AtomicBoolean();
private boolean populationOngoing = true;
private final ReentrantLock populatorLock = new ReentrantLock();

List<IndexEntryUpdate<?>> batchedUpdates;

Expand All @@ -525,9 +515,39 @@ private void flipToFailed( IndexPopulationFailure failure )
flipper.flipTo( new FailedIndexProxy( indexMeta, indexUserDescription, populator, failure, indexCountsRemover, logProvider ) );
}

boolean markCompleted()
void create() throws IOException
{
populatorLock.lock();
try
{
if ( populationOngoing )
{
populator.create();
}
}
finally
{
populatorLock.unlock();
}
}

void cancel() throws IOException
{
return completedPopulation.compareAndSet( false, true );
populatorLock.lock();
try
{
if ( populationOngoing )
{
populator.close( false );
resetIndexCountsForPopulation( this );
removeFromOngoingPopulations( this );
populationOngoing = false;
}
}
finally
{
populatorLock.unlock();
}
}

private void onUpdate( IndexEntryUpdate<?> update )
Expand All @@ -539,24 +559,42 @@ private void onUpdate( IndexEntryUpdate<?> update )
}
}

private void flip() throws FlipFailedKernelException
void flip() throws FlipFailedKernelException
{
flipper.flip( () ->
populatorLock.lock();
try
{
populator.add( takeCurrentBatch() );
populateFromQueueIfAvailable( Long.MAX_VALUE );
IndexSample sample = populator.sampleResult();
storeView.replaceIndexCounts( indexId, sample.uniqueValues(), sample.sampleSize(),
sample.indexSize() );
if ( populations.contains( IndexPopulation.this ) )
if ( populationOngoing )
{
populator.close( true );
try
{
flipper.flip( () ->
{
populator.add( takeCurrentBatch() );
populateFromQueueIfAvailable( Long.MAX_VALUE );
IndexSample sample = populator.sampleResult();
storeView.replaceIndexCounts( indexId, sample.uniqueValues(), sample.sampleSize(), sample.indexSize() );
if ( populations.contains( IndexPopulation.this ) )
{
populator.close( true );
}
// else it has failed when applying the last updates from queue. This is done because a multi-populator
// may have multiple populators running and they should not affect each other
return null;
}, failedIndexProxyFactory );
log.info( "Index population completed. Index is now online: [%s]", indexUserDescription );
removeFromOngoingPopulations( this );
}
finally
{
populationOngoing = false;
}
}
// else it has failed when applying the last updates from queue. This is done because a multi-populator
// may have multiple populators running and they should not affect each other
return null;
}, failedIndexProxyFactory );
log.info( "Index population completed. Index is now online: [%s]", indexUserDescription );
}
finally
{
populatorLock.unlock();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.IndexProvider;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.PropertyAccessor;
import org.neo4j.kernel.api.index.IndexProvider;
import org.neo4j.kernel.api.schema.SchemaDescriptorFactory;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptor;
import org.neo4j.kernel.api.schema.index.SchemaIndexDescriptorFactory;
Expand Down Expand Up @@ -79,14 +79,42 @@ public class MultipleIndexPopulatorTest
private MultipleIndexPopulator multipleIndexPopulator;

@Test
public void indexPopulationCompletionOnlyOnce()
public void canceledPopulationNotAbleToCreateNewIndex() throws IOException
{
IndexPopulator indexPopulator1 = createIndexPopulator();
IndexPopulation indexPopulation = addPopulator( indexPopulator1, 1 );
assertTrue( indexPopulation.markCompleted() );
assertFalse( indexPopulation.markCompleted() );
assertFalse( indexPopulation.markCompleted() );
assertFalse( indexPopulation.markCompleted() );
IndexPopulator populator = createIndexPopulator();
IndexPopulation indexPopulation = addPopulator( populator, 1 );

indexPopulation.cancel();

multipleIndexPopulator.create();

verify( populator, never() ).create();
}

@Test
public void canceledPopulationNotAbleToFlip() throws IOException, FlipFailedKernelException
{
IndexPopulator populator = createIndexPopulator();
IndexPopulation indexPopulation = addPopulator( populator, 1 );

indexPopulation.cancel();

indexPopulation.flip();

verify( indexPopulation.flipper, never() ).flip( any( Callable.class ), any( FailedIndexProxyFactory.class ) );
}

@Test
public void flippedPopulationAreNotCanceable() throws IOException, FlipFailedKernelException
{
IndexPopulator populator = createIndexPopulator();
IndexPopulation indexPopulation = addPopulator( populator, 1 );

indexPopulation.flip();

indexPopulation.cancel();

verify( indexPopulation.populator, never() ).close( false );
}

@Test
Expand Down Expand Up @@ -150,7 +178,6 @@ public void cancelingSinglePopulatorDoNotCancelAnyOther() throws IOException, Fl

multipleIndexPopulator.indexAllNodes();

assertFalse( populationToCancel.markCompleted() );
assertTrue( multipleIndexPopulator.hasPopulators() );

multipleIndexPopulator.flipAfterPopulation();
Expand All @@ -173,7 +200,6 @@ public void canceledPopulatorDoNotFlipWhenPopulationCompleted() throws FlipFaile

multipleIndexPopulator.indexAllNodes();

assertFalse( populationToCancel.markCompleted() );
assertTrue( multipleIndexPopulator.hasPopulators() );

multipleIndexPopulator.flipAfterPopulation();
Expand Down

0 comments on commit a1071de

Please sign in to comment.