diff --git a/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationJobTest.java b/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationJobTest.java index 40c3a823235d9..7e5fab82e5d44 100644 --- a/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationJobTest.java +++ b/community/community-it/kernel-it/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationJobTest.java @@ -712,7 +712,7 @@ private IndexPopulationJob newIndexPopulationJob( FailedIndexProxyFactory failur flipper.setFlipTarget( mock( IndexProxyFactory.class ) ); MultipleIndexPopulator multiPopulator = new MultipleIndexPopulator( storeView, logProvider, type, stateHolder ); - IndexPopulationJob job = new IndexPopulationJob( multiPopulator, NO_MONITOR ); + IndexPopulationJob job = new IndexPopulationJob( multiPopulator, NO_MONITOR, false ); job.addPopulator( populator, descriptor.withId( indexId ).withoutCapabilities(), format( ":%s(%s)", FIRST.name(), name ), flipper, failureDelegateFactory ); return job; diff --git a/community/community-it/kernel-it/src/test/java/org/neo4j/unsafe/batchinsert/internal/BatchInsertTest.java b/community/community-it/kernel-it/src/test/java/org/neo4j/unsafe/batchinsert/internal/BatchInsertTest.java index e12296556293e..b92f6ec5f2e0b 100644 --- a/community/community-it/kernel-it/src/test/java/org/neo4j/unsafe/batchinsert/internal/BatchInsertTest.java +++ b/community/community-it/kernel-it/src/test/java/org/neo4j/unsafe/batchinsert/internal/BatchInsertTest.java @@ -41,7 +41,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.neo4j.graphdb.ConstraintViolationException; @@ -61,7 +60,7 @@ import org.neo4j.helpers.collection.Pair; import org.neo4j.index.internal.gbptree.RecoveryCleanupWorkCollector; import org.neo4j.io.fs.DefaultFileSystemAbstraction; -import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; +import org.neo4j.kernel.api.index.IndexAccessor; import org.neo4j.kernel.api.index.IndexPopulator; import org.neo4j.kernel.api.index.IndexProvider; import org.neo4j.kernel.api.index.NodePropertyAccessor; @@ -87,6 +86,7 @@ import org.neo4j.kernel.impl.store.record.RecordLoad; import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.monitoring.Monitors; +import org.neo4j.storageengine.api.schema.IndexSample; import org.neo4j.storageengine.api.schema.LabelScanReader; import org.neo4j.storageengine.api.schema.SchemaRule; import org.neo4j.test.TestGraphDatabaseFactory; @@ -101,6 +101,7 @@ import static java.lang.Integer.parseInt; import static java.lang.String.format; import static java.util.Collections.singletonList; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.emptyArray; @@ -114,7 +115,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.mockito.hamcrest.MockitoHamcrest.argThat; import static org.neo4j.graphdb.Label.label; @@ -876,10 +876,12 @@ public void shouldRunIndexPopulationJobAtShutdown() throws Throwable // GIVEN IndexPopulator populator = mock( IndexPopulator.class ); IndexProvider provider = mock( IndexProvider.class ); + IndexAccessor accessor = mock( IndexAccessor.class ); when( provider.getProviderDescriptor() ).thenReturn( DESCRIPTOR ); - when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ) - .thenReturn( populator ); + when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( populator ); + when( populator.sampleResult() ).thenReturn( new IndexSample() ); + when( provider.getOnlineAccessor( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( accessor ); BatchInserter inserter = newBatchInserterWithIndexProvider( singleInstanceIndexProviderFactory( KEY, provider ) ); @@ -896,13 +898,11 @@ public void shouldRunIndexPopulationJobAtShutdown() throws Throwable verify( provider ).start(); verify( provider ).getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ); verify( populator ).create(); - verify( populator ).add( argThat( matchesCollection( add( nodeId, internalIndex.schema(), - Values.of( "Jakewins" ) ) ) ) ); + verify( populator ).add( argThat( matchesCollection( add( nodeId, internalIndex.schema(), Values.of( "Jakewins" ) ) ) ) ); verify( populator ).verifyDeferredConstraints( any( NodePropertyAccessor.class ) ); verify( populator ).close( true ); verify( provider ).stop(); verify( provider ).shutdown(); - verifyNoMoreInteractions( populator ); } @Test @@ -911,10 +911,12 @@ public void shouldRunConstraintPopulationJobAtShutdown() throws Throwable // GIVEN IndexPopulator populator = mock( IndexPopulator.class ); IndexProvider provider = mock( IndexProvider.class ); + IndexAccessor accessor = mock( IndexAccessor.class ); when( provider.getProviderDescriptor() ).thenReturn( DESCRIPTOR ); - when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ) - .thenReturn( populator ); + when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( populator ); + when( populator.sampleResult() ).thenReturn( new IndexSample() ); + when( provider.getOnlineAccessor( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( accessor ); BatchInserter inserter = newBatchInserterWithIndexProvider( singleInstanceIndexProviderFactory( KEY, provider ) ); @@ -936,7 +938,6 @@ public void shouldRunConstraintPopulationJobAtShutdown() throws Throwable verify( populator ).close( true ); verify( provider ).stop(); verify( provider ).shutdown(); - verifyNoMoreInteractions( populator ); } @Test @@ -947,10 +948,12 @@ public void shouldRepopulatePreexistingIndexed() throws Throwable IndexPopulator populator = mock( IndexPopulator.class ); IndexProvider provider = mock( IndexProvider.class ); + IndexAccessor accessor = mock( IndexAccessor.class ); when( provider.getProviderDescriptor() ).thenReturn( DESCRIPTOR ); - when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ) - .thenReturn( populator ); + when( provider.getPopulator( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( populator ); + when( populator.sampleResult() ).thenReturn( new IndexSample() ); + when( provider.getOnlineAccessor( any( StoreIndexDescriptor.class ), any( IndexSamplingConfig.class ) ) ).thenReturn( accessor ); BatchInserter inserter = newBatchInserterWithIndexProvider( singleInstanceIndexProviderFactory( KEY, provider ) ); @@ -972,7 +975,6 @@ public void shouldRepopulatePreexistingIndexed() throws Throwable verify( populator ).close( true ); verify( provider ).stop(); verify( provider ).shutdown(); - verifyNoMoreInteractions( populator ); } @Test @@ -1322,15 +1324,18 @@ public void uniquenessConstraintShouldBeCheckedOnBatchInserterShutdownAndFailIfV inserter.createNode( Collections.singletonMap( property, value ), label ); // Then - try + GraphDatabaseService db = switchToEmbeddedGraphDatabaseService( inserter ); + try ( Transaction tx = db.beginTx() ) { - inserter.shutdown(); - fail( "Node that violates uniqueness constraint was created by batch inserter" ); + IndexDefinition index = db.schema().getIndexes( label ).iterator().next(); + String indexFailure = db.schema().getIndexFailure( index ); + assertThat( indexFailure, containsString( "IndexEntryConflictException" ) ); + assertThat( indexFailure, containsString( value ) ); + tx.success(); } - catch ( RuntimeException ex ) + finally { - // good - assertEquals( new IndexEntryConflictException( 0, 1, Values.of( value ) ), ex.getCause() ); + db.shutdown(); } } @@ -1399,18 +1404,10 @@ private GraphDatabaseService switchToEmbeddedGraphDatabaseService( BatchInserter inserter.shutdown(); TestGraphDatabaseFactory factory = new TestGraphDatabaseFactory(); factory.setFileSystem( fileSystemRule.get() ); - GraphDatabaseService db = factory.newImpermanentDatabaseBuilder( localTestDirectory.storeDir() ) + return factory.newImpermanentDatabaseBuilder( localTestDirectory.storeDir() ) // Shouldn't be necessary to set dense node threshold since it's a stick config .setConfig( configuration() ) .newGraphDatabase(); - - try ( Transaction tx = db.beginTx() ) - { - db.schema().awaitIndexesOnline( 10, TimeUnit.SECONDS ); - tx.success(); - } - - return db; } private LabelScanStore getLabelScanStore() diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java index d50483bf08060..2570a9979d6f0 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/BatchingMultipleIndexPopulator.java @@ -51,7 +51,7 @@ * updates are inserted in the queue. When store scan notices that queue size has reached {@link #QUEUE_THRESHOLD} than * it drains all batched updates and waits for all submitted to the executor tasks to complete and flushes updates from * the queue using {@link MultipleIndexUpdater}. If queue size never reaches {@link #QUEUE_THRESHOLD} than all queued - * concurrent updates are flushed after the store scan in {@link #flipAfterPopulation()}. + * concurrent updates are flushed after the store scan in {@link MultipleIndexPopulator#flipAfterPopulation(boolean)}. *

* Inner {@link ExecutorService executor} is shut down after the store scan completes. */ diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexPopulationJob.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexPopulationJob.java index 3fc7a55fc2b8a..dc01e7850a58e 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexPopulationJob.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexPopulationJob.java @@ -27,7 +27,6 @@ import org.neo4j.kernel.api.index.IndexEntryUpdate; import org.neo4j.kernel.api.index.IndexPopulator; import org.neo4j.kernel.api.schema.index.CapableIndexDescriptor; -import org.neo4j.kernel.impl.api.SchemaState; import org.neo4j.storageengine.api.schema.PopulationProgress; import static java.lang.Thread.currentThread; @@ -42,16 +41,18 @@ public class IndexPopulationJob implements Runnable { private final IndexingService.Monitor monitor; + private final boolean verifyBeforeFlipping; private final MultipleIndexPopulator multiPopulator; private final CountDownLatch doneSignal = new CountDownLatch( 1 ); private volatile StoreScan storeScan; private volatile boolean cancelled; - public IndexPopulationJob( MultipleIndexPopulator multiPopulator, IndexingService.Monitor monitor ) + public IndexPopulationJob( MultipleIndexPopulator multiPopulator, IndexingService.Monitor monitor, boolean verifyBeforeFlipping ) { this.multiPopulator = multiPopulator; this.monitor = monitor; + this.verifyBeforeFlipping = verifyBeforeFlipping; } /** @@ -106,7 +107,7 @@ public void run() // We remain in POPULATING state return; } - multiPopulator.flipAfterPopulation(); + multiPopulator.flipAfterPopulation( verifyBeforeFlipping ); } catch ( Throwable t ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexingService.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexingService.java index 08e220803839f..6619e2a70eccb 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexingService.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/IndexingService.java @@ -52,8 +52,10 @@ import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException; import org.neo4j.kernel.api.exceptions.schema.UniquePropertyValueValidationException; import org.neo4j.kernel.api.index.IndexEntryUpdate; +import org.neo4j.kernel.api.index.IndexPopulator; import org.neo4j.kernel.api.index.IndexProvider; import org.neo4j.kernel.api.index.IndexUpdater; +import org.neo4j.kernel.api.index.NodePropertyAccessor; import org.neo4j.kernel.api.schema.index.IndexDescriptor; import org.neo4j.kernel.api.schema.index.StoreIndexDescriptor; import org.neo4j.kernel.impl.api.SchemaState; @@ -345,7 +347,7 @@ public void populateIndexesOfAllTypes( MutableLongObjectMap> descriptorToPopulate : rebuildingDescriptorsByType.entrySet() ) { - IndexPopulationJob populationJob = newIndexPopulationJob( descriptorToPopulate.getKey() ); + IndexPopulationJob populationJob = newIndexPopulationJob( descriptorToPopulate.getKey(), false ); populate( descriptorToPopulate.getValue(), indexMap, populationJob ); } } @@ -537,10 +539,28 @@ public Iterable> convertToIndexUpdates( Entit * This code is called from the transaction infrastructure during transaction commits, which means that * it is *vital* that it is stable, and handles errors very well. Failing here means that the entire db * will shut down. + * + * {@link IndexPopulator#verifyDeferredConstraints(NodePropertyAccessor)} will not be called as part of populating these indexes, + * instead that will be done by code that activates the indexes later. + */ + public void createIndexes( StoreIndexDescriptor... rules ) + { + createIndexes( false, rules ); + } + + /** + * Creates one or more indexes. They will all be populated by one and the same store scan. + * + * This code is called from the transaction infrastructure during transaction commits, which means that + * it is *vital* that it is stable, and handles errors very well. Failing here means that the entire db + * will shut down. + * + * @param verifyBeforeFlipping whether or not to call {@link IndexPopulator#verifyDeferredConstraints(NodePropertyAccessor)} + * as part of population, before flipping to a successful state. */ - public void createIndexes( StoreIndexDescriptor... rules ) throws IOException + public void createIndexes( boolean verifyBeforeFlipping, StoreIndexDescriptor... rules ) { - IndexPopulationStarter populationStarter = new IndexPopulationStarter( rules ); + IndexPopulationStarter populationStarter = new IndexPopulationStarter( verifyBeforeFlipping, rules ); indexMapRef.modify( populationStarter ); populationStarter.startPopulation(); } @@ -713,10 +733,10 @@ public ResourceIterator snapshotIndexFiles() throws IOException return Iterators.concatResourceIterators( snapshots.iterator() ); } - private IndexPopulationJob newIndexPopulationJob( EntityType type ) + private IndexPopulationJob newIndexPopulationJob( EntityType type, boolean verifyBeforeFlipping ) { MultipleIndexPopulator multiPopulator = multiPopulatorFactory.create( storeView, logProvider, type, schemaState ); - return new IndexPopulationJob( multiPopulator, monitor ); + return new IndexPopulationJob( multiPopulator, monitor, verifyBeforeFlipping ); } private void startIndexPopulation( IndexPopulationJob job ) @@ -761,12 +781,14 @@ private void logIndexStateSummary( String method, Map { + private final boolean verifyBeforeFlipping; private final StoreIndexDescriptor[] descriptors; private IndexPopulationJob nodePopulationJob; private IndexPopulationJob relationshipPopulationJob; - IndexPopulationStarter( StoreIndexDescriptor[] descriptors ) + IndexPopulationStarter( boolean verifyBeforeFlipping, StoreIndexDescriptor[] descriptors ) { + this.verifyBeforeFlipping = verifyBeforeFlipping; this.descriptors = descriptors; } @@ -805,15 +827,15 @@ public IndexMap apply( IndexMap indexMap ) { if ( descriptor.schema().entityType() == EntityType.NODE ) { - nodePopulationJob = nodePopulationJob == null ? newIndexPopulationJob( EntityType.NODE ) : nodePopulationJob; + nodePopulationJob = nodePopulationJob == null ? newIndexPopulationJob( EntityType.NODE, verifyBeforeFlipping ) : nodePopulationJob; index = indexProxyCreator.createPopulatingIndexProxy( descriptor, flipToTentative, monitor, nodePopulationJob ); index.start(); } else { - relationshipPopulationJob = - relationshipPopulationJob == null ? newIndexPopulationJob( EntityType.RELATIONSHIP ) : relationshipPopulationJob; + relationshipPopulationJob = relationshipPopulationJob == null ? newIndexPopulationJob( EntityType.RELATIONSHIP, verifyBeforeFlipping ) + : relationshipPopulationJob; index = indexProxyCreator.createPopulatingIndexProxy( descriptor, flipToTentative, monitor, relationshipPopulationJob ); index.start(); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulator.java index 62899195d3c56..62db749de57d1 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulator.java @@ -82,7 +82,7 @@ *

  • Call to {@link #create()} to create data structures and files to start accepting updates.
  • *
  • Call to {@link #indexAllEntities()} (blocking call).
  • *
  • While all nodes are being indexed, calls to {@link #queueUpdate(IndexEntryUpdate)} are accepted.
  • - *
  • Call to {@link #flipAfterPopulation()} after successful population, or {@link #fail(Throwable)} if not
  • + *
  • Call to {@link #flipAfterPopulation(boolean)} after successful population, or {@link #fail(Throwable)} if not
  • * */ public class MultipleIndexPopulator implements IndexPopulator @@ -304,13 +304,13 @@ private void resetIndexCountsForPopulation( IndexPopulation indexPopulation ) storeView.replaceIndexCounts( indexPopulation.indexId, 0, 0, 0 ); } - void flipAfterPopulation() + void flipAfterPopulation( boolean verifyBeforeFlipping ) { for ( IndexPopulation population : populations ) { try { - population.flip(); + population.flip( verifyBeforeFlipping ); } catch ( Throwable t ) { @@ -564,7 +564,7 @@ private void onUpdate( IndexEntryUpdate update ) } } - void flip() throws FlipFailedKernelException + void flip( boolean verifyBeforeFlipping ) throws FlipFailedKernelException { flipper.flip( () -> { @@ -577,6 +577,10 @@ void flip() throws FlipFailedKernelException populateFromUpdateQueueIfAvailable( Long.MAX_VALUE ); if ( populations.contains( IndexPopulation.this ) ) { + if ( verifyBeforeFlipping ) + { + populator.verifyDeferredConstraints( storeView ); + } IndexSample sample = populator.sampleResult(); storeView.replaceIndexCounts( indexId, sample.uniqueValues(), sample.sampleSize(), sample.indexSize() ); populator.close( true ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java b/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java index cc10a23402aac..3fb254c217528 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/batchinsert/internal/BatchInserterImpl.java @@ -19,7 +19,6 @@ */ package org.neo4j.unsafe.batchinsert.internal; -import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.PrintStream; @@ -45,48 +44,40 @@ import org.neo4j.graphdb.schema.ConstraintDefinition; import org.neo4j.graphdb.schema.IndexCreator; import org.neo4j.graphdb.schema.IndexDefinition; -import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.collection.Iterables; import org.neo4j.helpers.collection.IteratorWrapper; -import org.neo4j.helpers.collection.Visitor; import org.neo4j.index.internal.gbptree.RecoveryCleanupWorkCollector; import org.neo4j.internal.kernel.api.InternalIndexState; import org.neo4j.internal.kernel.api.NamedToken; import org.neo4j.internal.kernel.api.exceptions.KernelException; import org.neo4j.internal.kernel.api.schema.LabelSchemaDescriptor; -import org.neo4j.internal.kernel.api.schema.SchemaDescriptor; -import org.neo4j.internal.kernel.api.schema.SchemaDescriptorSupplier; -import org.neo4j.io.IOUtils; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.pagecache.IOLimiter; import org.neo4j.io.pagecache.PageCache; import org.neo4j.io.pagecache.tracing.PageCacheTracer; import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracerSupplier; import org.neo4j.io.pagecache.tracing.cursor.context.EmptyVersionContextSupplier; -import org.neo4j.kernel.api.exceptions.index.IndexEntryConflictException; -import org.neo4j.kernel.api.index.IndexEntryUpdate; -import org.neo4j.kernel.api.index.IndexPopulator; +import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException; +import org.neo4j.kernel.api.exceptions.index.IndexPopulationFailedKernelException; import org.neo4j.kernel.api.index.IndexProvider; -import org.neo4j.kernel.api.index.NodePropertyAccessor; -import org.neo4j.kernel.api.labelscan.LabelScanStore; -import org.neo4j.kernel.api.labelscan.LabelScanWriter; -import org.neo4j.kernel.api.labelscan.NodeLabelUpdate; import org.neo4j.kernel.api.schema.SchemaDescriptorFactory; import org.neo4j.kernel.api.schema.constraints.ConstraintDescriptor; import org.neo4j.kernel.api.schema.constraints.ConstraintDescriptorFactory; import org.neo4j.kernel.api.schema.constraints.IndexBackedConstraintDescriptor; -import org.neo4j.kernel.api.schema.index.IndexDescriptor; import org.neo4j.kernel.api.schema.index.IndexDescriptorFactory; import org.neo4j.kernel.api.schema.index.StoreIndexDescriptor; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.extension.DatabaseKernelExtensions; import org.neo4j.kernel.extension.KernelExtensionFactory; import org.neo4j.kernel.extension.UnsatisfiedDependencyStrategies; -import org.neo4j.kernel.impl.api.index.EntityUpdates; +import org.neo4j.kernel.impl.api.DatabaseSchemaState; +import org.neo4j.kernel.impl.api.NonTransactionalTokenNameLookup; import org.neo4j.kernel.impl.api.index.IndexProviderMap; -import org.neo4j.kernel.impl.api.index.StoreScan; -import org.neo4j.kernel.impl.api.index.sampling.IndexSamplingConfig; -import org.neo4j.kernel.impl.api.scan.FullStoreChangeStream; +import org.neo4j.kernel.impl.api.index.IndexProxy; +import org.neo4j.kernel.impl.api.index.IndexStoreView; +import org.neo4j.kernel.impl.api.index.IndexingService; +import org.neo4j.kernel.impl.api.index.IndexingServiceFactory; +import org.neo4j.kernel.impl.api.scan.FullLabelStream; import org.neo4j.kernel.impl.api.store.SchemaCache; import org.neo4j.kernel.impl.constraints.StandardConstraintSemantics; import org.neo4j.kernel.impl.core.DelegatingTokenHolder; @@ -104,12 +95,12 @@ import org.neo4j.kernel.impl.factory.DatabaseInfo; import org.neo4j.kernel.impl.index.IndexConfigStore; import org.neo4j.kernel.impl.index.labelscan.NativeLabelScanStore; -import org.neo4j.kernel.impl.locking.LockService; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.locking.NoOpClient; import org.neo4j.kernel.impl.logging.StoreLogService; import org.neo4j.kernel.impl.pagecache.ConfiguringPageCacheFactory; import org.neo4j.kernel.impl.pagecache.PageCacheLifecycle; +import org.neo4j.kernel.impl.scheduler.CentralJobScheduler; import org.neo4j.kernel.impl.spi.SimpleKernelContext; import org.neo4j.kernel.impl.storageengine.impl.recordstorage.PropertyCreator; import org.neo4j.kernel.impl.storageengine.impl.recordstorage.PropertyDeleter; @@ -152,6 +143,7 @@ import org.neo4j.kernel.impl.transaction.state.DefaultIndexProviderMap; import org.neo4j.kernel.impl.transaction.state.RecordAccess; import org.neo4j.kernel.impl.transaction.state.RecordAccess.RecordProxy; +import org.neo4j.kernel.impl.transaction.state.storeview.DynamicIndexStoreView; import org.neo4j.kernel.impl.transaction.state.storeview.NeoStoreIndexStoreView; import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.impl.util.ValueUtils; @@ -162,6 +154,7 @@ import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import org.neo4j.logging.NullLog; +import org.neo4j.scheduler.JobScheduler; import org.neo4j.storageengine.api.schema.SchemaRule; import org.neo4j.unsafe.batchinsert.BatchInserter; import org.neo4j.unsafe.batchinsert.BatchRelationship; @@ -170,13 +163,15 @@ import static java.lang.Boolean.parseBoolean; import static java.util.Collections.emptyIterator; -import static org.eclipse.collections.impl.utility.ArrayIterate.contains; +import static java.util.Collections.emptyList; import static org.neo4j.collection.PrimitiveLongCollections.map; import static org.neo4j.graphdb.Label.label; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.logs_directory; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.store_internal_log_path; import static org.neo4j.helpers.Numbers.safeCastLongToInt; import static org.neo4j.internal.kernel.api.TokenRead.NO_TOKEN; +import static org.neo4j.kernel.impl.api.index.IndexingService.NO_MONITOR; +import static org.neo4j.kernel.impl.locking.LockService.NO_LOCK_SERVICE; import static org.neo4j.kernel.impl.store.NodeLabelsField.parseLabelsField; import static org.neo4j.kernel.impl.store.PropertyStore.encodeString; @@ -189,7 +184,6 @@ public class BatchInserterImpl implements BatchInserter, IndexConfigStoreProvide private final TokenHolders tokenHolders; private final IdGeneratorFactory idGeneratorFactory; private final IndexProviderMap indexProviderMap; - private final LabelScanStore labelScanStore; private final Log msgLog; private final SchemaCache schemaCache; private final Config config; @@ -197,6 +191,9 @@ public class BatchInserterImpl implements BatchInserter, IndexConfigStoreProvide private final StoreLocker storeLocker; private final PageCache pageCache; private final RecordStorageReader storageReader; + private final StoreLogService logService; + private final FileSystemAbstraction fileSystem; + private final Monitors monitors; private boolean labelsTouched; private boolean isShutdown; @@ -230,7 +227,7 @@ public Label apply( long from ) private final PropertyKeyTokenStore propertyKeyTokenStore; private final PropertyStore propertyStore; private final SchemaStore schemaStore; - private final NeoStoreIndexStoreView indexStoreView; + private final NeoStoreIndexStoreView storeIndexStoreView; private final LabelTokenStore labelTokenStore; private final Locks.Client noopLockClient = new NoOpClient(); @@ -243,6 +240,7 @@ public BatchInserterImpl( final File databaseDirectory, final FileSystemAbstract Map params = getDefaultParams(); params.putAll( stringParams ); this.config = Config.defaults( params ); + this.fileSystem = fileSystem; life = new LifeSupport(); this.databaseDirectory = databaseDirectory; @@ -256,7 +254,7 @@ public BatchInserterImpl( final File databaseDirectory, final FileSystemAbstract config.augment( logs_directory, databaseDirectory.getCanonicalPath() ); File internalLog = config.get( store_internal_log_path ); - StoreLogService logService = life.add( StoreLogService.withInternalLog( internalLog).build( fileSystem ) ); + logService = life.add( StoreLogService.withInternalLog( internalLog).build( fileSystem ) ); msgLog = logService.getInternalLog( getClass() ); boolean dump = config.get( GraphDatabaseSettings.dump_configuration ); @@ -289,11 +287,12 @@ public BatchInserterImpl( final File databaseDirectory, final FileSystemAbstract schemaStore = neoStores.getSchemaStore(); labelTokenStore = neoStores.getLabelTokenStore(); - indexStoreView = new NeoStoreIndexStoreView( LockService.NO_LOCK_SERVICE, neoStores ); + monitors = new Monitors(); + + storeIndexStoreView = new NeoStoreIndexStoreView( NO_LOCK_SERVICE, neoStores ); Dependencies deps = new Dependencies(); Monitors monitors = new Monitors(); - deps.satisfyDependencies( fileSystem, config, logService, indexStoreView, pageCache, monitors, - RecoveryCleanupWorkCollector.IMMEDIATE ); + deps.satisfyDependencies( fileSystem, config, logService, storeIndexStoreView, pageCache, monitors, RecoveryCleanupWorkCollector.IMMEDIATE ); DatabaseKernelExtensions extensions = life.add( new DatabaseKernelExtensions( new SimpleKernelContext( databaseDirectory, DatabaseInfo.UNKNOWN, deps ), @@ -307,14 +306,11 @@ public BatchInserterImpl( final File databaseDirectory, final FileSystemAbstract relationshipTypeTokenHolder.setInitialTokens( relationshipTypeTokenStore.getTokens() ); TokenHolder labelTokenHolder = new DelegatingTokenHolder( this::createNewLabelId, TokenHolder.TYPE_LABEL ); labelTokenHolder.setInitialTokens( labelTokenStore.getTokens() ); - this.tokenHolders = new TokenHolders( propertyKeyTokenHolder, labelTokenHolder, relationshipTypeTokenHolder ); + tokenHolders = new TokenHolders( propertyKeyTokenHolder, labelTokenHolder, relationshipTypeTokenHolder ); indexStore = life.add( new IndexConfigStore( this.databaseDirectory, fileSystem ) ); schemaCache = new SchemaCache( new StandardConstraintSemantics(), schemaStore, indexProviderMap ); - labelScanStore = new NativeLabelScanStore( pageCache, databaseDirectory, fileSystem, FullStoreChangeStream.EMPTY, false, monitors, - RecoveryCleanupWorkCollector.IMMEDIATE ); - life.add( labelScanStore ); actions = new BatchSchemaActions(); // Record access @@ -491,122 +487,63 @@ private void createIndex( int labelId, int[] propertyKeyIds ) flushStrategy.forceFlush(); } - private void repopulateAllIndexes() throws IOException, IndexEntryConflictException + private void repopulateAllIndexes( NativeLabelScanStore labelIndex ) { - if ( !labelsTouched ) - { - return; - } - - final StoreIndexDescriptor[] indexDescriptors = getIndexesNeedingPopulation(); - final List populators = new ArrayList<>( indexDescriptors.length ); - + LogProvider logProvider = logService.getInternalLogProvider(); + IndexStoreView indexStoreView = new DynamicIndexStoreView( storeIndexStoreView, labelIndex, NO_LOCK_SERVICE, neoStores, logProvider ); + JobScheduler jobScheduler = life.add( new CentralJobScheduler() ); + IndexingService indexingService = life.add( IndexingServiceFactory.createIndexingService( config, jobScheduler, indexProviderMap, indexStoreView, + new NonTransactionalTokenNameLookup( tokenHolders ), emptyList(), logProvider, NO_MONITOR, new DatabaseSchemaState( logProvider ) ) ); try { - final SchemaDescriptor[] descriptors = new LabelSchemaDescriptor[indexDescriptors.length]; - - for ( int i = 0; i < indexDescriptors.length; i++ ) - { - StoreIndexDescriptor index = indexDescriptors[i]; - descriptors[i] = index.schema(); - IndexPopulator populator = indexProviderMap.lookup( index.providerDescriptor() ) - .getPopulator( index, new IndexSamplingConfig( config ) ); - populator.create(); - populators.add( new IndexPopulatorWithSchema( populator, index ) ); - } - - Visitor propertyUpdateVisitor = updates -> - { - // Do a lookup from which property has changed to a list of indexes worried about that property. - // We do not need to load additional properties as the EntityUpdates for a full node store scan already - // include all properties for the node. - for ( IndexEntryUpdate indexUpdate : updates.forIndexKeys( populators ) ) - { - try - { - indexUpdate.indexKey().add( indexUpdate ); - } - catch ( IndexEntryConflictException conflict ) - { - throw conflict.notAllowed( indexUpdate.indexKey().index() ); - } - } - return true; - }; - - List descriptorList = Arrays.asList( descriptors ); - int[] labelIds = descriptorList.stream() - .flatMapToInt( d -> Arrays.stream( d.getEntityTokenIds() ) ) - .toArray(); - - int[] propertyKeyIds = descriptorList.stream() - .flatMapToInt( d -> Arrays.stream( d.getPropertyIds() ) ) - .toArray(); - - try ( InitialNodeLabelCreationVisitor labelUpdateVisitor = new InitialNodeLabelCreationVisitor() ) + StoreIndexDescriptor[] descriptors = getIndexesNeedingPopulation(); + indexingService.createIndexes( true /*verify constraints before flipping over*/, descriptors ); + for ( StoreIndexDescriptor descriptor : descriptors ) { - StoreScan storeScan = indexStoreView.visitNodes( labelIds, - propertyKeyId -> contains( propertyKeyIds, propertyKeyId ), - propertyUpdateVisitor, labelUpdateVisitor, true ); - storeScan.run(); - - IndexEntryConflictException conflictException = null; - for ( IndexPopulatorWithSchema populator : populators ) + IndexProxy indexProxy = getIndexProxy( indexingService, descriptor ); + try { - try - { - populator.verifyDeferredConstraints( indexStoreView ); - populator.close( true ); - } - catch ( IndexEntryConflictException e ) - { - populator.close( false ); - conflictException = Exceptions.chain( conflictException, e ); - } + indexProxy.awaitStoreScanCompleted(); } - if ( conflictException != null ) + catch ( IndexPopulationFailedKernelException e ) { - throw conflictException; + // In this scenario this is OK } } + indexingService.forceAll( IOLimiter.UNLIMITED ); } - finally + catch ( InterruptedException e ) { - IOUtils.closeAll( populators ); + // Someone wanted us to abort this. The indexes may not have been fully populated. This just means that they will be populated on next startup. + Thread.currentThread().interrupt(); } } - private void rebuildCounts() + private IndexProxy getIndexProxy( IndexingService indexingService, StoreIndexDescriptor descriptpr ) { - CountsTracker counts = neoStores.getCounts(); try { - counts.start(); + return indexingService.getIndexProxy( descriptpr.schema() ); } - catch ( IOException e ) + catch ( IndexNotFoundKernelException e ) { - throw new UnderlyingStorageException( e ); + throw new IllegalStateException( "Expected index by descriptor " + descriptpr + " to exist, but didn't", e ); } - - CountsComputer.recomputeCounts( neoStores, pageCache ); } - private class InitialNodeLabelCreationVisitor implements Visitor, Closeable + private void rebuildCounts() { - LabelScanWriter writer = labelScanStore.newWriter(); - - @Override - public boolean visit( NodeLabelUpdate update ) throws IOException + CountsTracker counts = neoStores.getCounts(); + try { - writer.write( update ); - return true; + counts.start(); } - - @Override - public void close() throws IOException + catch ( IOException e ) { - writer.close(); + throw new UnderlyingStorageException( e ); } + + CountsComputer.recomputeCounts( neoStores, pageCache ); } private StoreIndexDescriptor[] getIndexesNeedingPopulation() @@ -991,10 +928,10 @@ public void shutdown() try { - repopulateAllIndexes(); - labelScanStore.force( IOLimiter.UNLIMITED ); + NativeLabelScanStore labelIndex = buildLabelIndex(); + repopulateAllIndexes( labelIndex ); } - catch ( IOException | IndexEntryConflictException e ) + catch ( IOException e ) { throw new RuntimeException( e ); } @@ -1016,6 +953,20 @@ public void shutdown() } } + private NativeLabelScanStore buildLabelIndex() throws IOException + { + NativeLabelScanStore labelIndex = + new NativeLabelScanStore( pageCache, databaseDirectory, fileSystem, new FullLabelStream( storeIndexStoreView ), false, monitors, + RecoveryCleanupWorkCollector.IMMEDIATE ); + if ( labelsTouched ) + { + labelIndex.drop(); + } + // Rebuild will happen as part of this call if it was dropped + life.add( labelIndex ); + return labelIndex; + } + @Override public String toString() { @@ -1302,63 +1253,4 @@ public void forceFlush() attempts = 0; } } - - private static class IndexPopulatorWithSchema extends IndexPopulator.Adapter implements SchemaDescriptorSupplier, AutoCloseable - { - private static final int batchSize = 1_000; - private final IndexPopulator populator; - private final IndexDescriptor index; - private Collection> batchedUpdates = new ArrayList<>( batchSize ); - private boolean closed; - - IndexPopulatorWithSchema( IndexPopulator populator, IndexDescriptor index ) - { - this.populator = populator; - this.index = index; - } - - @Override - public SchemaDescriptor schema() - { - return index.schema(); - } - - public IndexDescriptor index() - { - return index; - } - - public void add( IndexEntryUpdate update ) throws IndexEntryConflictException - { - batchedUpdates.add( update ); - if ( batchedUpdates.size() > batchSize ) - { - populator.add( batchedUpdates ); - batchedUpdates = new ArrayList<>( batchSize ); - } - } - - @Override - public void verifyDeferredConstraints( NodePropertyAccessor nodePropertyAccessor ) throws IndexEntryConflictException - { - populator.add( batchedUpdates ); - populator.verifyDeferredConstraints( nodePropertyAccessor ); - } - - @Override - public void close() throws IOException - { - close( false ); - } - - @Override - public void close( boolean populationCompletedSuccessfully ) - { - if ( !closed ) - { - closed = true; - populator.close( populationCompletedSuccessfully ); - } - } - } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationTest.java index 8ad0172479776..7c3b2512751ca 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/IndexPopulationTest.java @@ -67,7 +67,7 @@ public void mustFlipToFailedIfFailureToApplyLastBatchWhileFlipping() throws Exce multipleIndexPopulator.indexAllEntities().run(); // when - indexPopulation.flip(); + indexPopulation.flip( false ); // then assertTrue( "flipper should have flipped to failing proxy", flipper.getState() == InternalIndexState.FAILED ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulatorTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulatorTest.java index ed5ebb1e2c9fe..ea0db70ea78fe 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulatorTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/MultipleIndexPopulatorTest.java @@ -103,7 +103,7 @@ public void canceledPopulationNotAbleToFlip() throws IOException, FlipFailedKern indexPopulation.cancel(); - indexPopulation.flip(); + indexPopulation.flip( false ); verify( indexPopulation.populator, never() ).sampleResult(); } @@ -114,7 +114,7 @@ public void flippedPopulationAreNotCanceable() throws IOException, FlipFailedKer IndexPopulator populator = createIndexPopulator(); IndexPopulation indexPopulation = addPopulator( populator, 1 ); - indexPopulation.flip(); + indexPopulation.flip( false ); indexPopulation.cancel(); @@ -184,7 +184,7 @@ public void cancelingSinglePopulatorDoNotCancelAnyOther() throws FlipFailedKerne assertTrue( multipleIndexPopulator.hasPopulators() ); - multipleIndexPopulator.flipAfterPopulation(); + multipleIndexPopulator.flipAfterPopulation( false ); verify( populationToKeepActive.flipper ).flip( any( Callable.class ), any( FailedIndexProxyFactory.class ) ); } @@ -206,7 +206,7 @@ public void canceledPopulatorDoNotFlipWhenPopulationCompleted() throws FlipFaile assertTrue( multipleIndexPopulator.hasPopulators() ); - multipleIndexPopulator.flipAfterPopulation(); + multipleIndexPopulator.flipAfterPopulation( false ); verify( populationToCancel.flipper, never() ).flip( any( Callable.class ), any( FailedIndexProxyFactory.class ) ); } @@ -314,7 +314,7 @@ public void testFlipAfterPopulation() throws Exception FlippableIndexProxy flipper1 = addPopulator( indexPopulator1, 1 ).flipper; FlippableIndexProxy flipper2 = addPopulator( indexPopulator2, 2 ).flipper; - multipleIndexPopulator.flipAfterPopulation(); + multipleIndexPopulator.flipAfterPopulation( false ); verify( flipper1 ).flip( any( Callable.class ), any( FailedIndexProxyFactory.class ) ); verify( flipper2 ).flip( any( Callable.class ), any( FailedIndexProxyFactory.class ) ); @@ -331,7 +331,7 @@ public void populationsRemovedDuringFlip() throws FlipFailedKernelException assertTrue( multipleIndexPopulator.hasPopulators() ); - multipleIndexPopulator.flipAfterPopulation(); + multipleIndexPopulator.flipAfterPopulation( false ); assertFalse( multipleIndexPopulator.hasPopulators() ); } @@ -368,7 +368,7 @@ public void testIndexFlip() when( indexPopulator1.sampleResult() ).thenThrow( getSampleError() ); multipleIndexPopulator.indexAllEntities(); - multipleIndexPopulator.flipAfterPopulation(); + multipleIndexPopulator.flipAfterPopulation( false ); verify( indexPopulator1 ).close( false ); verify( failedIndexProxyFactory, times( 1 ) ).create( any( RuntimeException.class ) ); @@ -462,7 +462,28 @@ public void testMultiplePropertyUpdateFailures() throws IndexEntryConflictExcept checkPopulatorFailure( populator ); } - private static IndexEntryUpdate createIndexEntryUpdate( LabelSchemaDescriptor schemaDescriptor ) + @Test + public void shouldVerifyConstraintsBeforeFlippingIfToldTo() throws IOException, IndexEntryConflictException + { + // given + IndexProxyFactory indexProxyFactory = mock( IndexProxyFactory.class ); + FailedIndexProxyFactory failedIndexProxyFactory = mock( FailedIndexProxyFactory.class ); + FlippableIndexProxy flipper = new FlippableIndexProxy(); + flipper.setFlipTarget( indexProxyFactory ); + IndexPopulator indexPopulator = createIndexPopulator(); + addPopulator( indexPopulator, 1, flipper, failedIndexProxyFactory ); + when( indexPopulator.sampleResult() ).thenReturn( new IndexSample() ); + + // when + multipleIndexPopulator.indexAllEntities(); + multipleIndexPopulator.flipAfterPopulation( true ); + + // then + verify( indexPopulator ).verifyDeferredConstraints( any( NodePropertyAccessor.class ) ); + verify( indexPopulator ).close( true ); + } + + private IndexEntryUpdate createIndexEntryUpdate( LabelSchemaDescriptor schemaDescriptor ) { return add( 1, schemaDescriptor, "theValue" ); }