Constraint index violation won't fail BatchInserter#shutdown()
In BatchInserter the label index and the created indexes are
populated when calling shutdown(). Previously this was done
single-threaded, with home-grown code similar to what
IndexingService would do, and it would also fail shutdown(),
i.e. throw an exception from it, if there was an index entry
conflict in a uniqueness index. This left the database, and
especially its indexes, in an arbitrary, undefined state and
was confusing to the user making the insert: was the insert
successful or not, and should it be performed from scratch
again?

This has been changed to instead instantiate an IndexingService
and let it populate the indexes in a multi-threaded fashion.
A failed index no longer affects other indexes or shutdown()
itself; such failed indexes are simply marked as FAILED and the
user can decide what to do about them.

Fixes #10738
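
For illustration, a minimal sketch of how a user can inspect such a FAILED index after shutdown(), using the embedded API that the updated test below exercises; the store directory, label and property names are hypothetical:

import java.io.File;
import java.util.Collections;
import java.util.Map;

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.schema.IndexDefinition;
import org.neo4j.graphdb.schema.Schema;
import org.neo4j.unsafe.batchinsert.BatchInserter;
import org.neo4j.unsafe.batchinsert.BatchInserters;

public class InspectFailedIndexAfterBatchInsert
{
    public static void main( String[] args ) throws Exception
    {
        File storeDir = new File( args[0] );
        Label label = Label.label( "User" );
        Map<String,Object> properties = Collections.singletonMap( "email", "duplicate@example.com" );

        BatchInserter inserter = BatchInserters.inserter( storeDir );
        inserter.createDeferredConstraint( label ).assertPropertyIsUnique( "email" ).create();
        inserter.createNode( properties, label );
        inserter.createNode( properties, label );
        // With this change shutdown() completes even though the uniqueness constraint is violated.
        inserter.shutdown();

        GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( storeDir );
        try ( Transaction tx = db.beginTx() )
        {
            for ( IndexDefinition index : db.schema().getIndexes( label ) )
            {
                if ( db.schema().getIndexState( index ) == Schema.IndexState.FAILED )
                {
                    // The failure message contains the IndexEntryConflictException details.
                    System.out.println( db.schema().getIndexFailure( index ) );
                }
            }
            tx.success();
        }
        finally
        {
            db.shutdown();
        }
    }
}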
tinwelint committed Jul 31, 2018
1 parent ecd650c commit 4f3742c
Showing 9 changed files with 174 additions and 237 deletions.
@@ -712,7 +712,7 @@ private IndexPopulationJob newIndexPopulationJob( FailedIndexProxyFactory failur
flipper.setFlipTarget( mock( IndexProxyFactory.class ) );

MultipleIndexPopulator multiPopulator = new MultipleIndexPopulator( storeView, logProvider, type, stateHolder );
IndexPopulationJob job = new IndexPopulationJob( multiPopulator, NO_MONITOR );
IndexPopulationJob job = new IndexPopulationJob( multiPopulator, NO_MONITOR, false );
job.addPopulator( populator, descriptor.withId( indexId ).withoutCapabilities(),
format( ":%s(%s)", FIRST.name(), name ), flipper, failureDelegateFactory );
return job;
@@ -41,7 +41,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.neo4j.graphdb.ConstraintViolationException;
@@ -61,7 +60,7 @@
import org.neo4j.helpers.collection.Pair;
import org.neo4j.index.internal.gbptree.RecoveryCleanupWorkCollector;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException;
import org.neo4j.kernel.api.index.IndexAccessor;
import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.IndexProvider;
import org.neo4j.kernel.api.index.NodePropertyAccessor;
@@ -87,6 +86,7 @@
import org.neo4j.kernel.impl.store.record.RecordLoad;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.storageengine.api.schema.IndexSample;
import org.neo4j.storageengine.api.schema.LabelScanReader;
import org.neo4j.storageengine.api.schema.SchemaRule;
import org.neo4j.test.TestGraphDatabaseFactory;
@@ -101,6 +101,7 @@
import static java.lang.Integer.parseInt;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.emptyArray;
@@ -114,7 +115,6 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.hamcrest.MockitoHamcrest.argThat;
import static org.neo4j.graphdb.Label.label;
@@ -876,10 +876,12 @@ public void shouldRunIndexPopulationJobAtShutdown() throws Throwable
// GIVEN
IndexPopulator populator = mock( IndexPopulator.class );
IndexProvider provider = mock( IndexProvider.class );
IndexAccessor accessor = mock( IndexAccessor.class );

when( provider.getProviderDescriptor() ).thenReturn( DESCRIPTOR );
when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) )
.thenReturn( populator );
when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( populator );
when( populator.sampleResult() ).thenReturn( new IndexSample() );
when( provider.getOnlineAccessor( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( accessor );

BatchInserter inserter = newBatchInserterWithIndexProvider(
singleInstanceIndexProviderFactory( KEY, provider ) );
@@ -896,13 +898,11 @@ public void shouldRunIndexPopulationJobAtShutdown() throws Throwable
verify( provider ).start();
verify( provider ).getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) );
verify( populator ).create();
verify( populator ).add( argThat( matchesCollection( add( nodeId, internalIndex.schema(),
Values.of( "Jakewins" ) ) ) ) );
verify( populator ).add( argThat( matchesCollection( add( nodeId, internalIndex.schema(), Values.of( "Jakewins" ) ) ) ) );
verify( populator ).verifyDeferredConstraints( any( NodePropertyAccessor.class ) );
verify( populator ).close( true );
verify( provider ).stop();
verify( provider ).shutdown();
verifyNoMoreInteractions( populator );
}

@Test
@@ -911,10 +911,12 @@ public void shouldRunConstraintPopulationJobAtShutdown() throws Throwable
// GIVEN
IndexPopulator populator = mock( IndexPopulator.class );
IndexProvider provider = mock( IndexProvider.class );
IndexAccessor accessor = mock( IndexAccessor.class );

when( provider.getProviderDescriptor() ).thenReturn( DESCRIPTOR );
when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) )
.thenReturn( populator );
when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( populator );
when( populator.sampleResult() ).thenReturn( new IndexSample() );
when( provider.getOnlineAccessor( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( accessor );

BatchInserter inserter = newBatchInserterWithIndexProvider(
singleInstanceIndexProviderFactory( KEY, provider ) );
@@ -936,7 +938,6 @@ public void shouldRunConstraintPopulationJobAtShutdown() throws Throwable
verify( populator ).close( true );
verify( provider ).stop();
verify( provider ).shutdown();
verifyNoMoreInteractions( populator );
}

@Test
@@ -947,10 +948,12 @@ public void shouldRepopulatePreexistingIndexed() throws Throwable

IndexPopulator populator = mock( IndexPopulator.class );
IndexProvider provider = mock( IndexProvider.class );
IndexAccessor accessor = mock( IndexAccessor.class );

when( provider.getProviderDescriptor() ).thenReturn( DESCRIPTOR );
when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) )
.thenReturn( populator );
when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( populator );
when( populator.sampleResult() ).thenReturn( new IndexSample() );
when( provider.getOnlineAccessor( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( accessor );

BatchInserter inserter = newBatchInserterWithIndexProvider(
singleInstanceIndexProviderFactory( KEY, provider ) );
@@ -972,7 +975,6 @@ public void shouldRepopulatePreexistingIndexed() throws Throwable
verify( populator ).close( true );
verify( provider ).stop();
verify( provider ).shutdown();
verifyNoMoreInteractions( populator );
}

@Test
@@ -1322,15 +1324,18 @@ public void uniquenessConstraintShouldBeCheckedOnBatchInserterShutdownAndFailIfV
inserter.createNode( Collections.singletonMap( property, value ), label );

// Then
try
GraphDatabaseService db = switchToEmbeddedGraphDatabaseService( inserter );
try ( Transaction tx = db.beginTx() )
{
inserter.shutdown();
fail( "Node that violates uniqueness constraint was created by batch inserter" );
IndexDefinition index = db.schema().getIndexes( label ).iterator().next();
String indexFailure = db.schema().getIndexFailure( index );
assertThat( indexFailure, containsString( "IndexEntryConflictException" ) );
assertThat( indexFailure, containsString( value ) );
tx.success();
}
catch ( RuntimeException ex )
finally
{
// good
assertEquals( new IndexEntryConflictException( 0, 1, Values.of( value ) ), ex.getCause() );
db.shutdown();
}
}

@@ -1399,18 +1404,10 @@ private GraphDatabaseService switchToEmbeddedGraphDatabaseService( BatchInserter
inserter.shutdown();
TestGraphDatabaseFactory factory = new TestGraphDatabaseFactory();
factory.setFileSystem( fileSystemRule.get() );
GraphDatabaseService db = factory.newImpermanentDatabaseBuilder( localTestDirectory.storeDir() )
return factory.newImpermanentDatabaseBuilder( localTestDirectory.storeDir() )
// Shouldn't be necessary to set dense node threshold since it's a static config
.setConfig( configuration() )
.newGraphDatabase();

try ( Transaction tx = db.beginTx() )
{
db.schema().awaitIndexesOnline( 10, TimeUnit.SECONDS );
tx.success();
}

return db;
}

private LabelScanStore getLabelScanStore()
@@ -51,7 +51,7 @@
updates are inserted in the queue. When the store scan notices that queue size has reached {@link #QUEUE_THRESHOLD} then
* it drains all batched updates and waits for all submitted to the executor tasks to complete and flushes updates from
the queue using {@link MultipleIndexUpdater}. If queue size never reaches {@link #QUEUE_THRESHOLD} then all queued
* concurrent updates are flushed after the store scan in {@link #flipAfterPopulation()}.
* concurrent updates are flushed after the store scan in {@link MultipleIndexPopulator#flipAfterPopulation(boolean)}.
* <p>
* Inner {@link ExecutorService executor} is shut down after the store scan completes.
*/
@@ -27,7 +27,6 @@
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.schema.index.CapableIndexDescriptor;
import org.neo4j.kernel.impl.api.SchemaState;
import org.neo4j.storageengine.api.schema.PopulationProgress;

import static java.lang.Thread.currentThread;
@@ -42,16 +41,18 @@
public class IndexPopulationJob implements Runnable
{
private final IndexingService.Monitor monitor;
private final boolean verifyBeforeFlipping;
private final MultipleIndexPopulator multiPopulator;
private final CountDownLatch doneSignal = new CountDownLatch( 1 );

private volatile StoreScan<IndexPopulationFailedKernelException> storeScan;
private volatile boolean cancelled;

public IndexPopulationJob( MultipleIndexPopulator multiPopulator, IndexingService.Monitor monitor )
public IndexPopulationJob( MultipleIndexPopulator multiPopulator, IndexingService.Monitor monitor, boolean verifyBeforeFlipping )
{
this.multiPopulator = multiPopulator;
this.monitor = monitor;
this.verifyBeforeFlipping = verifyBeforeFlipping;
}

/**
@@ -106,7 +107,7 @@ public void run()
// We remain in POPULATING state
return;
}
multiPopulator.flipAfterPopulation();
multiPopulator.flipAfterPopulation( verifyBeforeFlipping );
}
catch ( Throwable t )
{
@@ -52,8 +52,10 @@
import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException;
import org.neo4j.kernel.api.exceptions.schema.UniquePropertyValueValidationException;
import org.neo4j.kernel.api.index.IndexEntryUpdate;
import org.neo4j.kernel.api.index.IndexPopulator;
import org.neo4j.kernel.api.index.IndexProvider;
import org.neo4j.kernel.api.index.IndexUpdater;
import org.neo4j.kernel.api.index.NodePropertyAccessor;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.api.schema.index.StoreIndexDescriptor;
import org.neo4j.kernel.impl.api.SchemaState;
@@ -345,7 +347,7 @@ public void populateIndexesOfAllTypes( MutableLongObjectMap<StoreIndexDescriptor

for ( Map.Entry<EntityType,MutableLongObjectMap<StoreIndexDescriptor>> descriptorToPopulate : rebuildingDescriptorsByType.entrySet() )
{
IndexPopulationJob populationJob = newIndexPopulationJob( descriptorToPopulate.getKey() );
IndexPopulationJob populationJob = newIndexPopulationJob( descriptorToPopulate.getKey(), false );
populate( descriptorToPopulate.getValue(), indexMap, populationJob );
}
}
@@ -537,10 +539,28 @@ public Iterable<IndexEntryUpdate<SchemaDescriptor>> convertToIndexUpdates( Entit
* This code is called from the transaction infrastructure during transaction commits, which means that
* it is *vital* that it is stable, and handles errors very well. Failing here means that the entire db
* will shut down.
*
* {@link IndexPopulator#verifyDeferredConstraints(NodePropertyAccessor)} will not be called as part of populating these indexes,
* instead that will be done by code that activates the indexes later.
*/
public void createIndexes( StoreIndexDescriptor... rules )
{
createIndexes( false, rules );
}

/**
* Creates one or more indexes. They will all be populated by one and the same store scan.
*
* This code is called from the transaction infrastructure during transaction commits, which means that
* it is *vital* that it is stable, and handles errors very well. Failing here means that the entire db
* will shut down.
*
* @param verifyBeforeFlipping whether or not to call {@link IndexPopulator#verifyDeferredConstraints(NodePropertyAccessor)}
* as part of population, before flipping to a successful state.
*/
public void createIndexes( StoreIndexDescriptor... rules ) throws IOException
public void createIndexes( boolean verifyBeforeFlipping, StoreIndexDescriptor... rules )
{
IndexPopulationStarter populationStarter = new IndexPopulationStarter( rules );
IndexPopulationStarter populationStarter = new IndexPopulationStarter( verifyBeforeFlipping, rules );
indexMapRef.modify( populationStarter );
populationStarter.startPopulation();
}
@@ -713,10 +733,10 @@ public ResourceIterator<File> snapshotIndexFiles() throws IOException
return Iterators.concatResourceIterators( snapshots.iterator() );
}

private IndexPopulationJob newIndexPopulationJob( EntityType type )
private IndexPopulationJob newIndexPopulationJob( EntityType type, boolean verifyBeforeFlipping )
{
MultipleIndexPopulator multiPopulator = multiPopulatorFactory.create( storeView, logProvider, type, schemaState );
return new IndexPopulationJob( multiPopulator, monitor );
return new IndexPopulationJob( multiPopulator, monitor, verifyBeforeFlipping );
}

private void startIndexPopulation( IndexPopulationJob job )
@@ -761,12 +781,14 @@ private void logIndexStateSummary( String method, Map<InternalIndexState,List<In

private final class IndexPopulationStarter implements Function<IndexMap,IndexMap>
{
private final boolean verifyBeforeFlipping;
private final StoreIndexDescriptor[] descriptors;
private IndexPopulationJob nodePopulationJob;
private IndexPopulationJob relationshipPopulationJob;

IndexPopulationStarter( StoreIndexDescriptor[] descriptors )
IndexPopulationStarter( boolean verifyBeforeFlipping, StoreIndexDescriptor[] descriptors )
{
this.verifyBeforeFlipping = verifyBeforeFlipping;
this.descriptors = descriptors;
}

@@ -805,15 +827,15 @@ public IndexMap apply( IndexMap indexMap )
{
if ( descriptor.schema().entityType() == EntityType.NODE )
{
nodePopulationJob = nodePopulationJob == null ? newIndexPopulationJob( EntityType.NODE ) : nodePopulationJob;
nodePopulationJob = nodePopulationJob == null ? newIndexPopulationJob( EntityType.NODE, verifyBeforeFlipping ) : nodePopulationJob;
index = indexProxyCreator.createPopulatingIndexProxy( descriptor, flipToTentative, monitor,
nodePopulationJob );
index.start();
}
else
{
relationshipPopulationJob =
relationshipPopulationJob == null ? newIndexPopulationJob( EntityType.RELATIONSHIP ) : relationshipPopulationJob;
relationshipPopulationJob = relationshipPopulationJob == null ? newIndexPopulationJob( EntityType.RELATIONSHIP, verifyBeforeFlipping )
: relationshipPopulationJob;
index = indexProxyCreator.createPopulatingIndexProxy( descriptor, flipToTentative, monitor,
relationshipPopulationJob );
index.start();
@@ -82,7 +82,7 @@
* <li>Call to {@link #create()} to create data structures and files to start accepting updates.</li>
* <li>Call to {@link #indexAllEntities()} (blocking call).</li>
* <li>While all nodes are being indexed, calls to {@link #queueUpdate(IndexEntryUpdate)} are accepted.</li>
* <li>Call to {@link #flipAfterPopulation()} after successful population, or {@link #fail(Throwable)} if not</li>
* <li>Call to {@link #flipAfterPopulation(boolean)} after successful population, or {@link #fail(Throwable)} if not</li>
* </ol>
*/
public class MultipleIndexPopulator implements IndexPopulator
@@ -304,13 +304,13 @@ private void resetIndexCountsForPopulation( IndexPopulation indexPopulation )
storeView.replaceIndexCounts( indexPopulation.indexId, 0, 0, 0 );
}

void flipAfterPopulation()
void flipAfterPopulation( boolean verifyBeforeFlipping )
{
for ( IndexPopulation population : populations )
{
try
{
population.flip();
population.flip( verifyBeforeFlipping );
}
catch ( Throwable t )
{
@@ -564,7 +564,7 @@ private void onUpdate( IndexEntryUpdate<?> update )
}
}

void flip() throws FlipFailedKernelException
void flip( boolean verifyBeforeFlipping ) throws FlipFailedKernelException
{
flipper.flip( () ->
{
@@ -577,6 +577,10 @@ void flip() throws FlipFailedKernelException
populateFromUpdateQueueIfAvailable( Long.MAX_VALUE );
if ( populations.contains( IndexPopulation.this ) )
{
if ( verifyBeforeFlipping )
{
populator.verifyDeferredConstraints( storeView );
}
IndexSample sample = populator.sampleResult();
storeView.replaceIndexCounts( indexId, sample.uniqueValues(), sample.sampleSize(), sample.indexSize() );
populator.close( true );
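
For context, a sketch of how a verification failure surfaces. The catch body of flipAfterPopulation is collapsed in this view, so its exact contents are assumed from the commit message:

void flipAfterPopulation( boolean verifyBeforeFlipping )
{
    for ( IndexPopulation population : populations )
    {
        try
        {
            population.flip( verifyBeforeFlipping );
        }
        catch ( Throwable t )
        {
            // Assumed behaviour: the failure (e.g. an IndexEntryConflictException
            // from verifyDeferredConstraints) is recorded against this population
            // only, leaving its index in the FAILED state, and is not rethrown,
            // so other indexes and shutdown() itself are unaffected.
            fail( population, t );
        }
    }
}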
