From 4f3742c8bfb32c2f01c031f89bdf22cea180f18f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Finn=C3=A9?= Date: Mon, 18 Jun 2018 19:26:45 +0200 Subject: [PATCH] Constraint index violation won't fail BatchInserter#shutdown() In BatchInserter the label index and created indexes are populated when calling shutdown(). Previously this was done single-threaded and with some home-grown code that was similar to what IndexingService would do and would also fail shutdown(), i.e. throw exception from it if there was an index entry conflict in a uniqueness index. This would leave the database, and indexes especially in an arbitrarily undefined state and would be confusing to user making the insert. Was the insert successful or not, should it be performed from scratch again? This has been changed to instead instantiate an IndexingService and let it populate the indexes in a multi-threaded fashion. Any failed indexes and would neither affect other indexes nor shutdown() itself. Such failed indexes will instead simply be marked as FAILED and user can decide what to do about it. Fixes #10738 --- .../api/index/IndexPopulationJobTest.java | 2 +- .../batchinsert/internal/BatchInsertTest.java | 55 ++-- .../index/BatchingMultipleIndexPopulator.java | 2 +- .../impl/api/index/IndexPopulationJob.java | 7 +- .../impl/api/index/IndexingService.java | 40 ++- .../api/index/MultipleIndexPopulator.java | 12 +- .../internal/BatchInserterImpl.java | 254 +++++------------- .../impl/api/index/IndexPopulationTest.java | 2 +- .../api/index/MultipleIndexPopulatorTest.java | 37 ++- 9 files changed, 174 insertions(+), 237 deletions(-) diff --git a/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationJobTest.java b/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationJobTest.java index 40c3a823235d9..7e5fab82e5d44 100644 --- a/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationJobTest.java +++ b/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationJobTest.java @@ -712,7 +712,7 @@ private IndexPopulationJob newIndexPopulationJob( FailedIndexProxyFactory failur flipper.setFlipTarget( mock( IndexProxyFactory.class ) ); MultipleIndexPopulator multiPopulator = new MultipleIndexPopulator( storeView, logProvider, type, stateHolder ); - IndexPopulationJob job = new IndexPopulationJob( multiPopulator, NO_MONITOR ); + IndexPopulationJob job = new IndexPopulationJob( multiPopulator, NO_MONITOR, false ); job.addPopulator( populator, descriptor.withId( indexId ).withoutCapabilities(), format( ":%s(%s)", FIRST.name(), name ), flipper, failureDelegateFactory ); return job; diff --git a/community/community-it/kernel-it/src/test/java/org/neo4j/unsafe/batchinsert/internal/BatchInsertTest.java b/community/community-it/kernel-it/src/test/java/org/neo4j/unsafe/batchinsert/internal/BatchInsertTest.java index e12296556293e..b92f6ec5f2e0b 100644 --- a/community/community-it/kernel-it/src/test/java/org/neo4j/unsafe/batchinsert/internal/BatchInsertTest.java +++ b/community/community-it/kernel-it/src/test/java/org/neo4j/unsafe/batchinsert/internal/BatchInsertTest.java @@ -41,7 +41,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.neo4j.graphdb.ConstraintViolationException; @@ -61,7 +60,7 @@ import org.neo4j.helpers.collection.Pair; import org.neo4j.index.internal.gbptree.RecoveryCleanupWorkCollector; import org.neo4j.io.fs.DefaultFileSystemAbstraction; -import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; +import org.neo4j.kernel.api.index.IndexAccessor; import org.neo4j.kernel.api.index.IndexPopulator; import org.neo4j.kernel.api.index.IndexProvider; import org.neo4j.kernel.api.index.NodePropertyAccessor; @@ -87,6 +86,7 @@ import org.neo4j.kernel.impl.store.record.RecordLoad; import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.monitoring.Monitors; +import org.neo4j.storageengine.api.schema.IndexSample; import org.neo4j.storageengine.api.schema.LabelScanReader; import org.neo4j.storageengine.api.schema.SchemaRule; import org.neo4j.test.TestGraphDatabaseFactory; @@ -101,6 +101,7 @@ import static java.lang.Integer.parseInt; import static java.lang.String.format; import static java.util.Collections.singletonList; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.emptyArray; @@ -114,7 +115,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.mockito.hamcrest.MockitoHamcrest.argThat; import static org.neo4j.graphdb.Label.label; @@ -876,10 +876,12 @@ public void shouldRunIndexPopulationJobAtShutdown() throws Throwable // GIVEN IndexPopulator populator = mock( IndexPopulator.class ); IndexProvider provider = mock( IndexProvider.class ); + IndexAccessor accessor = mock( IndexAccessor.class ); when( provider.getProviderDescriptor() ).thenReturn( DESCRIPTOR ); - when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ) - .thenReturn( populator ); + when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( populator ); + when( populator.sampleResult() ).thenReturn( new IndexSample() ); + when( provider.getOnlineAccessor( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( accessor ); BatchInserter inserter = newBatchInserterWithIndexProvider( singleInstanceIndexProviderFactory( KEY, provider ) ); @@ -896,13 +898,11 @@ public void shouldRunIndexPopulationJobAtShutdown() throws Throwable verify( provider ).start(); verify( provider ).getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ); verify( populator ).create(); - verify( populator ).add( argThat( matchesCollection( add( nodeId, internalIndex.schema(), - Values.of( "Jakewins" ) ) ) ) ); + verify( populator ).add( argThat( matchesCollection( add( nodeId, internalIndex.schema(), Values.of( "Jakewins" ) ) ) ) ); verify( populator ).verifyDeferredConstraints( any( NodePropertyAccessor.class ) ); verify( populator ).close( true ); verify( provider ).stop(); verify( provider ).shutdown(); - verifyNoMoreInteractions( populator ); } @Test @@ -911,10 +911,12 @@ public void shouldRunConstraintPopulationJobAtShutdown() throws Throwable // GIVEN IndexPopulator populator = mock( IndexPopulator.class ); IndexProvider provider = mock( IndexProvider.class ); + IndexAccessor accessor = mock( IndexAccessor.class ); when( provider.getProviderDescriptor() ).thenReturn( DESCRIPTOR ); - when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ) - .thenReturn( populator ); + when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( populator ); + when( populator.sampleResult() ).thenReturn( new IndexSample() ); + when( provider.getOnlineAccessor( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( accessor ); BatchInserter inserter = newBatchInserterWithIndexProvider( singleInstanceIndexProviderFactory( KEY, provider ) ); @@ -936,7 +938,6 @@ public void shouldRunConstraintPopulationJobAtShutdown() throws Throwable verify( populator ).close( true ); verify( provider ).stop(); verify( provider ).shutdown(); - verifyNoMoreInteractions( populator ); } @Test @@ -947,10 +948,12 @@ public void shouldRepopulatePreexistingIndexed() throws Throwable IndexPopulator populator = mock( IndexPopulator.class ); IndexProvider provider = mock( IndexProvider.class ); + IndexAccessor accessor = mock( IndexAccessor.class ); when( provider.getProviderDescriptor() ).thenReturn( DESCRIPTOR ); - when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ) - .thenReturn( populator ); + when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( populator ); + when( populator.sampleResult() ).thenReturn( new IndexSample() ); + when( provider.getOnlineAccessor( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( accessor ); BatchInserter inserter = newBatchInserterWithIndexProvider( singleInstanceIndexProviderFactory( KEY, provider ) ); @@ -972,7 +975,6 @@ public void shouldRepopulatePreexistingIndexed() throws Throwable verify( populator ).close( true ); verify( provider ).stop(); verify( provider ).shutdown(); - verifyNoMoreInteractions( populator ); } @Test @@ -1322,15 +1324,18 @@ public void uniquenessConstraintShouldBeCheckedOnBatchInserterShutdownAndFailIfV inserter.createNode( Collections.singletonMap( property, value ), label ); // Then - try + GraphDatabaseService db = switchToEmbeddedGraphDatabaseService( inserter ); + try ( Transaction tx = db.beginTx() ) { - inserter.shutdown(); - fail( "Node that violates uniqueness constraint was created by batch inserter" ); + IndexDefinition index = db.schema().getIndexes( label ).iterator().next(); + String indexFailure = db.schema().getIndexFailure( index ); + assertThat( indexFailure, containsString( "IndexEntryConflictException" ) ); + assertThat( indexFailure, containsString( value ) ); + tx.success(); } - catch ( RuntimeException ex ) + finally { - // good - assertEquals( new IndexEntryConflictException( 0, 1, Values.of( value ) ), ex.getCause() ); + db.shutdown(); } } @@ -1399,18 +1404,10 @@ private GraphDatabaseService switchToEmbeddedGraphDatabaseService( BatchInserter inserter.shutdown(); TestGraphDatabaseFactory factory = new TestGraphDatabaseFactory(); factory.setFileSystem( fileSystemRule.get() ); - GraphDatabaseService db = factory.newImpermanentDatabaseBuilder( localTestDirectory.storeDir() ) + return factory.newImpermanentDatabaseBuilder( localTestDirectory.storeDir() ) // Shouldn't be necessary to set dense node threshold since it's a stick config .setConfig( configuration() ) .newGraphDatabase(); - - try ( Transaction tx = db.beginTx() ) - { - db.schema().awaitIndexesOnline( 10, TimeUnit.SECONDS ); - tx.success(); - } - - return db; } private LabelScanStore getLabelScanStore() diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java index d50483bf08060..2570a9979d6f0 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java @@ -51,7 +51,7 @@ * updates are inserted in the queue. When store scan notices that queue size has reached {@link #QUEUE_THRESHOLD} than * it drains all batched updates and waits for all submitted to the executor tasks to complete and flushes updates from * the queue using {@link MultipleIndexUpdater}. If queue size never reaches {@link #QUEUE_THRESHOLD} than all queued - * concurrent updates are flushed after the store scan in {@link #flipAfterPopulation()}. + * concurrent updates are flushed after the store scan in {@link MultipleIndexPopulator#flipAfterPopulation(boolean)}. *

* Inner {@link ExecutorService executor} is shut down after the store scan completes. */ diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexPopulationJob.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexPopulationJob.java index 3fc7a55fc2b8a..dc01e7850a58e 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexPopulationJob.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexPopulationJob.java @@ -27,7 +27,6 @@ import org.neo4j.kernel.api.index.IndexEntryUpdate; import org.neo4j.kernel.api.index.IndexPopulator; import org.neo4j.kernel.api.schema.index.CapableIndexDescriptor; -import org.neo4j.kernel.impl.api.SchemaState; import org.neo4j.storageengine.api.schema.PopulationProgress; import static java.lang.Thread.currentThread; @@ -42,16 +41,18 @@ public class IndexPopulationJob implements Runnable { private final IndexingService.Monitor monitor; + private final boolean verifyBeforeFlipping; private final MultipleIndexPopulator multiPopulator; private final CountDownLatch doneSignal = new CountDownLatch( 1 ); private volatile StoreScan storeScan; private volatile boolean cancelled; - public IndexPopulationJob( MultipleIndexPopulator multiPopulator, IndexingService.Monitor monitor ) + public IndexPopulationJob( MultipleIndexPopulator multiPopulator, IndexingService.Monitor monitor, boolean verifyBeforeFlipping ) { this.multiPopulator = multiPopulator; this.monitor = monitor; + this.verifyBeforeFlipping = verifyBeforeFlipping; } /** @@ -106,7 +107,7 @@ public void run() // We remain in POPULATING state return; } - multiPopulator.flipAfterPopulation(); + multiPopulator.flipAfterPopulation( verifyBeforeFlipping ); } catch ( Throwable t ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexingService.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexingService.java index 08e220803839f..6619e2a70eccb 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexingService.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexingService.java @@ -52,8 +52,10 @@ import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException; import org.neo4j.kernel.api.exceptions.schema.UniquePropertyValueValidationException; import org.neo4j.kernel.api.index.IndexEntryUpdate; +import org.neo4j.kernel.api.index.IndexPopulator; import org.neo4j.kernel.api.index.IndexProvider; import org.neo4j.kernel.api.index.IndexUpdater; +import org.neo4j.kernel.api.index.NodePropertyAccessor; import org.neo4j.kernel.api.schema.index.IndexDescriptor; import org.neo4j.kernel.api.schema.index.StoreIndexDescriptor; import org.neo4j.kernel.impl.api.SchemaState; @@ -345,7 +347,7 @@ public void populateIndexesOfAllTypes( MutableLongObjectMap> descriptorToPopulate : rebuildingDescriptorsByType.entrySet() ) { - IndexPopulationJob populationJob = newIndexPopulationJob( descriptorToPopulate.getKey() ); + IndexPopulationJob populationJob = newIndexPopulationJob( descriptorToPopulate.getKey(), false ); populate( descriptorToPopulate.getValue(), indexMap, populationJob ); } } @@ -537,10 +539,28 @@ public Iterable> convertToIndexUpdates( Entit * This code is called from the transaction infrastructure during transaction commits, which means that * it is *vital* that it is stable, and handles errors very well. Failing here means that the entire db * will shut down. + * + * {@link IndexPopulator#verifyDeferredConstraints(NodePropertyAccessor)} will not be called as part of populating these indexes, + * instead that will be done by code that activates the indexes later. + */ + public void createIndexes( StoreIndexDescriptor... rules ) + { + createIndexes( false, rules ); + } + + /** + * Creates one or more indexes. They will all be populated by one and the same store scan. + * + * This code is called from the transaction infrastructure during transaction commits, which means that + * it is *vital* that it is stable, and handles errors very well. Failing here means that the entire db + * will shut down. + * + * @param verifyBeforeFlipping whether or not to call {@link IndexPopulator#verifyDeferredConstraints(NodePropertyAccessor)} + * as part of population, before flipping to a successful state. */ - public void createIndexes( StoreIndexDescriptor... rules ) throws IOException + public void createIndexes( boolean verifyBeforeFlipping, StoreIndexDescriptor... rules ) { - IndexPopulationStarter populationStarter = new IndexPopulationStarter( rules ); + IndexPopulationStarter populationStarter = new IndexPopulationStarter( verifyBeforeFlipping, rules ); indexMapRef.modify( populationStarter ); populationStarter.startPopulation(); } @@ -713,10 +733,10 @@ public ResourceIterator snapshotIndexFiles() throws IOException return Iterators.concatResourceIterators( snapshots.iterator() ); } - private IndexPopulationJob newIndexPopulationJob( EntityType type ) + private IndexPopulationJob newIndexPopulationJob( EntityType type, boolean verifyBeforeFlipping ) { MultipleIndexPopulator multiPopulator = multiPopulatorFactory.create( storeView, logProvider, type, schemaState ); - return new IndexPopulationJob( multiPopulator, monitor ); + return new IndexPopulationJob( multiPopulator, monitor, verifyBeforeFlipping ); } private void startIndexPopulation( IndexPopulationJob job ) @@ -761,12 +781,14 @@ private void logIndexStateSummary( String method, Map { + private final boolean verifyBeforeFlipping; private final StoreIndexDescriptor[] descriptors; private IndexPopulationJob nodePopulationJob; private IndexPopulationJob relationshipPopulationJob; - IndexPopulationStarter( StoreIndexDescriptor[] descriptors ) + IndexPopulationStarter( boolean verifyBeforeFlipping, StoreIndexDescriptor[] descriptors ) { + this.verifyBeforeFlipping = verifyBeforeFlipping; this.descriptors = descriptors; } @@ -805,15 +827,15 @@ public IndexMap apply( IndexMap indexMap ) { if ( descriptor.schema().entityType() == EntityType.NODE ) { - nodePopulationJob = nodePopulationJob == null ? newIndexPopulationJob( EntityType.NODE ) : nodePopulationJob; + nodePopulationJob = nodePopulationJob == null ? newIndexPopulationJob( EntityType.NODE, verifyBeforeFlipping ) : nodePopulationJob; index = indexProxyCreator.createPopulatingIndexProxy( descriptor, flipToTentative, monitor, nodePopulationJob ); index.start(); } else { - relationshipPopulationJob = - relationshipPopulationJob == null ? newIndexPopulationJob( EntityType.RELATIONSHIP ) : relationshipPopulationJob; + relationshipPopulationJob = relationshipPopulationJob == null ? newIndexPopulationJob( EntityType.RELATIONSHIP, verifyBeforeFlipping ) + : relationshipPopulationJob; index = indexProxyCreator.createPopulatingIndexProxy( descriptor, flipToTentative, monitor, relationshipPopulationJob ); index.start(); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulator.java index 62899195d3c56..62db749de57d1 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulator.java @@ -82,7 +82,7 @@ *

  • Call to {@link #create()} to create data structures and files to start accepting updates.
  • *
  • Call to {@link #indexAllEntities()} (blocking call).
  • *
  • While all nodes are being indexed, calls to {@link #queueUpdate(IndexEntryUpdate)} are accepted.
  • - *
  • Call to {@link #flipAfterPopulation()} after successful population, or {@link #fail(Throwable)} if not
  • + *
  • Call to {@link #flipAfterPopulation(boolean)} after successful population, or {@link #fail(Throwable)} if not
  • * */ public class MultipleIndexPopulator implements IndexPopulator @@ -304,13 +304,13 @@ private void resetIndexCountsForPopulation( IndexPopulation indexPopulation ) storeView.replaceIndexCounts( indexPopulation.indexId, 0, 0, 0 ); } - void flipAfterPopulation() + void flipAfterPopulation( boolean verifyBeforeFlipping ) { for ( IndexPopulation population : populations ) { try { - population.flip(); + population.flip( verifyBeforeFlipping ); } catch ( Throwable t ) { @@ -564,7 +564,7 @@ private void onUpdate( IndexEntryUpdate update ) } } - void flip() throws FlipFailedKernelException + void flip( boolean verifyBeforeFlipping ) throws FlipFailedKernelException { flipper.flip( () -> { @@ -577,6 +577,10 @@ void flip() throws FlipFailedKernelException populateFromUpdateQueueIfAvailable( Long.MAX_VALUE ); if ( populations.contains( IndexPopulation.this ) ) { + if ( verifyBeforeFlipping ) + { + populator.verifyDeferredConstraints( storeView ); + } IndexSample sample = populator.sampleResult(); storeView.replaceIndexCounts( indexId, sample.uniqueValues(), sample.sampleSize(), sample.indexSize() ); populator.close( true ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java b/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java index cc10a23402aac..3fb254c217528 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java @@ -19,7 +19,6 @@ */ package org.neo4j.unsafe.batchinsert.internal; -import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.PrintStream; @@ -45,48 +44,40 @@ import org.neo4j.graphdb.schema.ConstraintDefinition; import org.neo4j.graphdb.schema.IndexCreator; import org.neo4j.graphdb.schema.IndexDefinition; -import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.collection.Iterables; import org.neo4j.helpers.collection.IteratorWrapper; -import org.neo4j.helpers.collection.Visitor; import org.neo4j.index.internal.gbptree.RecoveryCleanupWorkCollector; import org.neo4j.internal.kernel.api.InternalIndexState; import org.neo4j.internal.kernel.api.NamedToken; import org.neo4j.internal.kernel.api.exceptions.KernelException; import org.neo4j.internal.kernel.api.schema.LabelSchemaDescriptor; -import org.neo4j.internal.kernel.api.schema.SchemaDescriptor; -import org.neo4j.internal.kernel.api.schema.SchemaDescriptorSupplier; -import org.neo4j.io.IOUtils; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.IOLimiter; import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.tracing.PageCacheTracer; import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracerSupplier; import org.neo4j.io.pagecache.tracing.cursor.context.EmptyVersionContextSupplier; -import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; -import org.neo4j.kernel.api.index.IndexEntryUpdate; -import org.neo4j.kernel.api.index.IndexPopulator; +import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException; +import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException; import org.neo4j.kernel.api.index.IndexProvider; -import org.neo4j.kernel.api.index.NodePropertyAccessor; -import org.neo4j.kernel.api.labelscan.LabelScanStore; -import org.neo4j.kernel.api.labelscan.LabelScanWriter; -import org.neo4j.kernel.api.labelscan.NodeLabelUpdate; import org.neo4j.kernel.api.schema.SchemaDescriptorFactory; import org.neo4j.kernel.api.schema.constraints.ConstraintDescriptor; import org.neo4j.kernel.api.schema.constraints.ConstraintDescriptorFactory; import org.neo4j.kernel.api.schema.constraints.IndexBackedConstraintDescriptor; -import org.neo4j.kernel.api.schema.index.IndexDescriptor; import org.neo4j.kernel.api.schema.index.IndexDescriptorFactory; import org.neo4j.kernel.api.schema.index.StoreIndexDescriptor; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.extension.DatabaseKernelExtensions; import org.neo4j.kernel.extension.KernelExtensionFactory; import org.neo4j.kernel.extension.UnsatisfiedDependencyStrategies; -import org.neo4j.kernel.impl.api.index.EntityUpdates; +import org.neo4j.kernel.impl.api.DatabaseSchemaState; +import org.neo4j.kernel.impl.api.NonTransactionalTokenNameLookup; import org.neo4j.kernel.impl.api.index.IndexProviderMap; -import org.neo4j.kernel.impl.api.index.StoreScan; -import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig; -import org.neo4j.kernel.impl.api.scan.FullStoreChangeStream; +import org.neo4j.kernel.impl.api.index.IndexProxy; +import org.neo4j.kernel.impl.api.index.IndexStoreView; +import org.neo4j.kernel.impl.api.index.IndexingService; +import org.neo4j.kernel.impl.api.index.IndexingServiceFactory; +import org.neo4j.kernel.impl.api.scan.FullLabelStream; import org.neo4j.kernel.impl.api.store.SchemaCache; import org.neo4j.kernel.impl.constraints.StandardConstraintSemantics; import org.neo4j.kernel.impl.core.DelegatingTokenHolder; @@ -104,12 +95,12 @@ import org.neo4j.kernel.impl.factory.DatabaseInfo; import org.neo4j.kernel.impl.index.IndexConfigStore; import org.neo4j.kernel.impl.index.labelscan.NativeLabelScanStore; -import org.neo4j.kernel.impl.locking.LockService; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.locking.NoOpClient; import org.neo4j.kernel.impl.logging.StoreLogService; import org.neo4j.kernel.impl.pagecache.ConfiguringPageCacheFactory; import org.neo4j.kernel.impl.pagecache.PageCacheLifecycle; +import org.neo4j.kernel.impl.scheduler.CentralJobScheduler; import org.neo4j.kernel.impl.spi.SimpleKernelContext; import org.neo4j.kernel.impl.storageengine.impl.recordstorage.PropertyCreator; import org.neo4j.kernel.impl.storageengine.impl.recordstorage.PropertyDeleter; @@ -152,6 +143,7 @@ import org.neo4j.kernel.impl.transaction.state.DefaultIndexProviderMap; import org.neo4j.kernel.impl.transaction.state.RecordAccess; import org.neo4j.kernel.impl.transaction.state.RecordAccess.RecordProxy; +import org.neo4j.kernel.impl.transaction.state.storeview.DynamicIndexStoreView; import org.neo4j.kernel.impl.transaction.state.storeview.NeoStoreIndexStoreView; import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.impl.util.ValueUtils; @@ -162,6 +154,7 @@ import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import org.neo4j.logging.NullLog; +import org.neo4j.scheduler.JobScheduler; import org.neo4j.storageengine.api.schema.SchemaRule; import org.neo4j.unsafe.batchinsert.BatchInserter; import org.neo4j.unsafe.batchinsert.BatchRelationship; @@ -170,13 +163,15 @@ import static java.lang.Boolean.parseBoolean; import static java.util.Collections.emptyIterator; -import static org.eclipse.collections.impl.utility.ArrayIterate.contains; +import static java.util.Collections.emptyList; import static org.neo4j.collection.PrimitiveLongCollections.map; import static org.neo4j.graphdb.Label.label; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.logs_directory; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.store_internal_log_path; import static org.neo4j.helpers.Numbers.safeCastLongToInt; import static org.neo4j.internal.kernel.api.TokenRead.NO_TOKEN; +import static org.neo4j.kernel.impl.api.index.IndexingService.NO_MONITOR; +import static org.neo4j.kernel.impl.locking.LockService.NO_LOCK_SERVICE; import static org.neo4j.kernel.impl.store.NodeLabelsField.parseLabelsField; import static org.neo4j.kernel.impl.store.PropertyStore.encodeString; @@ -189,7 +184,6 @@ public class BatchInserterImpl implements BatchInserter, IndexConfigStoreProvide private final TokenHolders tokenHolders; private final IdGeneratorFactory idGeneratorFactory; private final IndexProviderMap indexProviderMap; - private final LabelScanStore labelScanStore; private final Log msgLog; private final SchemaCache schemaCache; private final Config config; @@ -197,6 +191,9 @@ public class BatchInserterImpl implements BatchInserter, IndexConfigStoreProvide private final StoreLocker storeLocker; private final PageCache pageCache; private final RecordStorageReader storageReader; + private final StoreLogService logService; + private final FileSystemAbstraction fileSystem; + private final Monitors monitors; private boolean labelsTouched; private boolean isShutdown; @@ -230,7 +227,7 @@ public Label apply( long from ) private final PropertyKeyTokenStore propertyKeyTokenStore; private final PropertyStore propertyStore; private final SchemaStore schemaStore; - private final NeoStoreIndexStoreView indexStoreView; + private final NeoStoreIndexStoreView storeIndexStoreView; private final LabelTokenStore labelTokenStore; private final Locks.Client noopLockClient = new NoOpClient(); @@ -243,6 +240,7 @@ public BatchInserterImpl( final File databaseDirectory, final FileSystemAbstract Map params = getDefaultParams(); params.putAll( stringParams ); this.config = Config.defaults( params ); + this.fileSystem = fileSystem; life = new LifeSupport(); this.databaseDirectory = databaseDirectory; @@ -256,7 +254,7 @@ public BatchInserterImpl( final File databaseDirectory, final FileSystemAbstract config.augment( logs_directory, databaseDirectory.getCanonicalPath() ); File internalLog = config.get( store_internal_log_path ); - StoreLogService logService = life.add( StoreLogService.withInternalLog( internalLog).build( fileSystem ) ); + logService = life.add( StoreLogService.withInternalLog( internalLog).build( fileSystem ) ); msgLog = logService.getInternalLog( getClass() ); boolean dump = config.get( GraphDatabaseSettings.dump_configuration ); @@ -289,11 +287,12 @@ public BatchInserterImpl( final File databaseDirectory, final FileSystemAbstract schemaStore = neoStores.getSchemaStore(); labelTokenStore = neoStores.getLabelTokenStore(); - indexStoreView = new NeoStoreIndexStoreView( LockService.NO_LOCK_SERVICE, neoStores ); + monitors = new Monitors(); + + storeIndexStoreView = new NeoStoreIndexStoreView( NO_LOCK_SERVICE, neoStores ); Dependencies deps = new Dependencies(); Monitors monitors = new Monitors(); - deps.satisfyDependencies( fileSystem, config, logService, indexStoreView, pageCache, monitors, - RecoveryCleanupWorkCollector.IMMEDIATE ); + deps.satisfyDependencies( fileSystem, config, logService, storeIndexStoreView, pageCache, monitors, RecoveryCleanupWorkCollector.IMMEDIATE ); DatabaseKernelExtensions extensions = life.add( new DatabaseKernelExtensions( new SimpleKernelContext( databaseDirectory, DatabaseInfo.UNKNOWN, deps ), @@ -307,14 +306,11 @@ public BatchInserterImpl( final File databaseDirectory, final FileSystemAbstract relationshipTypeTokenHolder.setInitialTokens( relationshipTypeTokenStore.getTokens() ); TokenHolder labelTokenHolder = new DelegatingTokenHolder( this::createNewLabelId, TokenHolder.TYPE_LABEL ); labelTokenHolder.setInitialTokens( labelTokenStore.getTokens() ); - this.tokenHolders = new TokenHolders( propertyKeyTokenHolder, labelTokenHolder, relationshipTypeTokenHolder ); + tokenHolders = new TokenHolders( propertyKeyTokenHolder, labelTokenHolder, relationshipTypeTokenHolder ); indexStore = life.add( new IndexConfigStore( this.databaseDirectory, fileSystem ) ); schemaCache = new SchemaCache( new StandardConstraintSemantics(), schemaStore, indexProviderMap ); - labelScanStore = new NativeLabelScanStore( pageCache, databaseDirectory, fileSystem, FullStoreChangeStream.EMPTY, false, monitors, - RecoveryCleanupWorkCollector.IMMEDIATE ); - life.add( labelScanStore ); actions = new BatchSchemaActions(); // Record access @@ -491,122 +487,63 @@ private void createIndex( int labelId, int[] propertyKeyIds ) flushStrategy.forceFlush(); } - private void repopulateAllIndexes() throws IOException, IndexEntryConflictException + private void repopulateAllIndexes( NativeLabelScanStore labelIndex ) { - if ( !labelsTouched ) - { - return; - } - - final StoreIndexDescriptor[] indexDescriptors = getIndexesNeedingPopulation(); - final List populators = new ArrayList<>( indexDescriptors.length ); - + LogProvider logProvider = logService.getInternalLogProvider(); + IndexStoreView indexStoreView = new DynamicIndexStoreView( storeIndexStoreView, labelIndex, NO_LOCK_SERVICE, neoStores, logProvider ); + JobScheduler jobScheduler = life.add( new CentralJobScheduler() ); + IndexingService indexingService = life.add( IndexingServiceFactory.createIndexingService( config, jobScheduler, indexProviderMap, indexStoreView, + new NonTransactionalTokenNameLookup( tokenHolders ), emptyList(), logProvider, NO_MONITOR, new DatabaseSchemaState( logProvider ) ) ); try { - final SchemaDescriptor[] descriptors = new LabelSchemaDescriptor[indexDescriptors.length]; - - for ( int i = 0; i < indexDescriptors.length; i++ ) - { - StoreIndexDescriptor index = indexDescriptors[i]; - descriptors[i] = index.schema(); - IndexPopulator populator = indexProviderMap.lookup( index.providerDescriptor() ) - .getPopulator( index, new IndexSamplingConfig( config ) ); - populator.create(); - populators.add( new IndexPopulatorWithSchema( populator, index ) ); - } - - Visitor propertyUpdateVisitor = updates -> - { - // Do a lookup from which property has changed to a list of indexes worried about that property. - // We do not need to load additional properties as the EntityUpdates for a full node store scan already - // include all properties for the node. - for ( IndexEntryUpdate indexUpdate : updates.forIndexKeys( populators ) ) - { - try - { - indexUpdate.indexKey().add( indexUpdate ); - } - catch ( IndexEntryConflictException conflict ) - { - throw conflict.notAllowed( indexUpdate.indexKey().index() ); - } - } - return true; - }; - - List descriptorList = Arrays.asList( descriptors ); - int[] labelIds = descriptorList.stream() - .flatMapToInt( d -> Arrays.stream( d.getEntityTokenIds() ) ) - .toArray(); - - int[] propertyKeyIds = descriptorList.stream() - .flatMapToInt( d -> Arrays.stream( d.getPropertyIds() ) ) - .toArray(); - - try ( InitialNodeLabelCreationVisitor labelUpdateVisitor = new InitialNodeLabelCreationVisitor() ) + StoreIndexDescriptor[] descriptors = getIndexesNeedingPopulation(); + indexingService.createIndexes( true /*verify constraints before flipping over*/, descriptors ); + for ( StoreIndexDescriptor descriptor : descriptors ) { - StoreScan storeScan = indexStoreView.visitNodes( labelIds, - propertyKeyId -> contains( propertyKeyIds, propertyKeyId ), - propertyUpdateVisitor, labelUpdateVisitor, true ); - storeScan.run(); - - IndexEntryConflictException conflictException = null; - for ( IndexPopulatorWithSchema populator : populators ) + IndexProxy indexProxy = getIndexProxy( indexingService, descriptor ); + try { - try - { - populator.verifyDeferredConstraints( indexStoreView ); - populator.close( true ); - } - catch ( IndexEntryConflictException e ) - { - populator.close( false ); - conflictException = Exceptions.chain( conflictException, e ); - } + indexProxy.awaitStoreScanCompleted(); } - if ( conflictException != null ) + catch ( IndexPopulationFailedKernelException e ) { - throw conflictException; + // In this scenario this is OK } } + indexingService.forceAll( IOLimiter.UNLIMITED ); } - finally + catch ( InterruptedException e ) { - IOUtils.closeAll( populators ); + // Someone wanted us to abort this. The indexes may not have been fully populated. This just means that they will be populated on next startup. + Thread.currentThread().interrupt(); } } - private void rebuildCounts() + private IndexProxy getIndexProxy( IndexingService indexingService, StoreIndexDescriptor descriptpr ) { - CountsTracker counts = neoStores.getCounts(); try { - counts.start(); + return indexingService.getIndexProxy( descriptpr.schema() ); } - catch ( IOException e ) + catch ( IndexNotFoundKernelException e ) { - throw new UnderlyingStorageException( e ); + throw new IllegalStateException( "Expected index by descriptor " + descriptpr + " to exist, but didn't", e ); } - - CountsComputer.recomputeCounts( neoStores, pageCache ); } - private class InitialNodeLabelCreationVisitor implements Visitor, Closeable + private void rebuildCounts() { - LabelScanWriter writer = labelScanStore.newWriter(); - - @Override - public boolean visit( NodeLabelUpdate update ) throws IOException + CountsTracker counts = neoStores.getCounts(); + try { - writer.write( update ); - return true; + counts.start(); } - - @Override - public void close() throws IOException + catch ( IOException e ) { - writer.close(); + throw new UnderlyingStorageException( e ); } + + CountsComputer.recomputeCounts( neoStores, pageCache ); } private StoreIndexDescriptor[] getIndexesNeedingPopulation() @@ -991,10 +928,10 @@ public void shutdown() try { - repopulateAllIndexes(); - labelScanStore.force( IOLimiter.UNLIMITED ); + NativeLabelScanStore labelIndex = buildLabelIndex(); + repopulateAllIndexes( labelIndex ); } - catch ( IOException | IndexEntryConflictException e ) + catch ( IOException e ) { throw new RuntimeException( e ); } @@ -1016,6 +953,20 @@ public void shutdown() } } + private NativeLabelScanStore buildLabelIndex() throws IOException + { + NativeLabelScanStore labelIndex = + new NativeLabelScanStore( pageCache, databaseDirectory, fileSystem, new FullLabelStream( storeIndexStoreView ), false, monitors, + RecoveryCleanupWorkCollector.IMMEDIATE ); + if ( labelsTouched ) + { + labelIndex.drop(); + } + // Rebuild will happen as part of this call if it was dropped + life.add( labelIndex ); + return labelIndex; + } + @Override public String toString() { @@ -1302,63 +1253,4 @@ public void forceFlush() attempts = 0; } } - - private static class IndexPopulatorWithSchema extends IndexPopulator.Adapter implements SchemaDescriptorSupplier, AutoCloseable - { - private static final int batchSize = 1_000; - private final IndexPopulator populator; - private final IndexDescriptor index; - private Collection> batchedUpdates = new ArrayList<>( batchSize ); - private boolean closed; - - IndexPopulatorWithSchema( IndexPopulator populator, IndexDescriptor index ) - { - this.populator = populator; - this.index = index; - } - - @Override - public SchemaDescriptor schema() - { - return index.schema(); - } - - public IndexDescriptor index() - { - return index; - } - - public void add( IndexEntryUpdate update ) throws IndexEntryConflictException - { - batchedUpdates.add( update ); - if ( batchedUpdates.size() > batchSize ) - { - populator.add( batchedUpdates ); - batchedUpdates = new ArrayList<>( batchSize ); - } - } - - @Override - public void verifyDeferredConstraints( NodePropertyAccessor nodePropertyAccessor ) throws IndexEntryConflictException - { - populator.add( batchedUpdates ); - populator.verifyDeferredConstraints( nodePropertyAccessor ); - } - - @Override - public void close() throws IOException - { - close( false ); - } - - @Override - public void close( boolean populationCompletedSuccessfully ) - { - if ( !closed ) - { - closed = true; - populator.close( populationCompletedSuccessfully ); - } - } - } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationTest.java index 8ad0172479776..7c3b2512751ca 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationTest.java @@ -67,7 +67,7 @@ public void mustFlipToFailedIfFailureToApplyLastBatchWhileFlipping() throws Exce multipleIndexPopulator.indexAllEntities().run(); // when - indexPopulation.flip(); + indexPopulation.flip( false ); // then assertTrue( "flipper should have flipped to failing proxy", flipper.getState() == InternalIndexState.FAILED ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulatorTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulatorTest.java index ed5ebb1e2c9fe..ea0db70ea78fe 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulatorTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulatorTest.java @@ -103,7 +103,7 @@ public void canceledPopulationNotAbleToFlip() throws IOException, FlipFailedKern indexPopulation.cancel(); - indexPopulation.flip(); + indexPopulation.flip( false ); verify( indexPopulation.populator, never() ).sampleResult(); } @@ -114,7 +114,7 @@ public void flippedPopulationAreNotCanceable() throws IOException, FlipFailedKer IndexPopulator populator = createIndexPopulator(); IndexPopulation indexPopulation = addPopulator( populator, 1 ); - indexPopulation.flip(); + indexPopulation.flip( false ); indexPopulation.cancel(); @@ -184,7 +184,7 @@ public void cancelingSinglePopulatorDoNotCancelAnyOther() throws FlipFailedKerne assertTrue( multipleIndexPopulator.hasPopulators() ); - multipleIndexPopulator.flipAfterPopulation(); + multipleIndexPopulator.flipAfterPopulation( false ); verify( populationToKeepActive.flipper ).flip( any( Callable.class ), any( FailedIndexProxyFactory.class ) ); } @@ -206,7 +206,7 @@ public void canceledPopulatorDoNotFlipWhenPopulationCompleted() throws FlipFaile assertTrue( multipleIndexPopulator.hasPopulators() ); - multipleIndexPopulator.flipAfterPopulation(); + multipleIndexPopulator.flipAfterPopulation( false ); verify( populationToCancel.flipper, never() ).flip( any( Callable.class ), any( FailedIndexProxyFactory.class ) ); } @@ -314,7 +314,7 @@ public void testFlipAfterPopulation() throws Exception FlippableIndexProxy flipper1 = addPopulator( indexPopulator1, 1 ).flipper; FlippableIndexProxy flipper2 = addPopulator( indexPopulator2, 2 ).flipper; - multipleIndexPopulator.flipAfterPopulation(); + multipleIndexPopulator.flipAfterPopulation( false ); verify( flipper1 ).flip( any( Callable.class ), any( FailedIndexProxyFactory.class ) ); verify( flipper2 ).flip( any( Callable.class ), any( FailedIndexProxyFactory.class ) ); @@ -331,7 +331,7 @@ public void populationsRemovedDuringFlip() throws FlipFailedKernelException assertTrue( multipleIndexPopulator.hasPopulators() ); - multipleIndexPopulator.flipAfterPopulation(); + multipleIndexPopulator.flipAfterPopulation( false ); assertFalse( multipleIndexPopulator.hasPopulators() ); } @@ -368,7 +368,7 @@ public void testIndexFlip() when( indexPopulator1.sampleResult() ).thenThrow( getSampleError() ); multipleIndexPopulator.indexAllEntities(); - multipleIndexPopulator.flipAfterPopulation(); + multipleIndexPopulator.flipAfterPopulation( false ); verify( indexPopulator1 ).close( false ); verify( failedIndexProxyFactory, times( 1 ) ).create( any( RuntimeException.class ) ); @@ -462,7 +462,28 @@ public void testMultiplePropertyUpdateFailures() throws IndexEntryConflictExcept checkPopulatorFailure( populator ); } - private static IndexEntryUpdate createIndexEntryUpdate( LabelSchemaDescriptor schemaDescriptor ) + @Test + public void shouldVerifyConstraintsBeforeFlippingIfToldTo() throws IOException, IndexEntryConflictException + { + // given + IndexProxyFactory indexProxyFactory = mock( IndexProxyFactory.class ); + FailedIndexProxyFactory failedIndexProxyFactory = mock( FailedIndexProxyFactory.class ); + FlippableIndexProxy flipper = new FlippableIndexProxy(); + flipper.setFlipTarget( indexProxyFactory ); + IndexPopulator indexPopulator = createIndexPopulator(); + addPopulator( indexPopulator, 1, flipper, failedIndexProxyFactory ); + when( indexPopulator.sampleResult() ).thenReturn( new IndexSample() ); + + // when + multipleIndexPopulator.indexAllEntities(); + multipleIndexPopulator.flipAfterPopulation( true ); + + // then + verify( indexPopulator ).verifyDeferredConstraints( any( NodePropertyAccessor.class ) ); + verify( indexPopulator ).close( true ); + } + + private IndexEntryUpdate createIndexEntryUpdate( LabelSchemaDescriptor schemaDescriptor ) { return add( 1, schemaDescriptor, "theValue" ); }