diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java index 400264c4da693..866dbb9a7cc45 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java @@ -607,7 +607,15 @@ private IndexBackedConstraintDescriptor indexBackedConstraintCreate( KernelState } } long indexId = constraintIndexCreator.createUniquenessConstraintIndex( state, this, descriptor ); - state.txState().constraintDoAdd( constraint, indexId ); + if ( !constraintExists( state, constraint ) ) + { + // This looks weird, but since we release the label lock while awaiting population of the index + // backing this constraint there can be someone else getting ahead of us, creating this exact constraint + // before we do, so now getting out here under the lock we must check again and if it exists + // we must at this point consider this an idempotent operation because we verified earlier + // that it didn't exist and went on to create it. + state.txState().constraintDoAdd( constraint, indexId ); + } } return constraint; } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/FlippableIndexProxy.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/FlippableIndexProxy.java index 9aa6f26707099..7a181da4ec0d6 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/FlippableIndexProxy.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/index/FlippableIndexProxy.java @@ -64,7 +64,7 @@ public FlippableIndexProxy() this( null ); } - public FlippableIndexProxy( IndexProxy originalDelegate ) + FlippableIndexProxy( IndexProxy originalDelegate ) { this.delegate = originalDelegate; } @@ -292,7 +292,7 @@ public boolean awaitStoreScanCompleted() throws IndexPopulationFailedKernelExcep lock.readLock().lock(); proxy = delegate; lock.readLock().unlock(); - } while ( proxy.awaitStoreScanCompleted() ); + } while ( !closed && proxy.awaitStoreScanCompleted() ); return true; } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/ConstraintIndexCreator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/ConstraintIndexCreator.java index 53edc40a01a38..46c83fb192ca8 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/ConstraintIndexCreator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/ConstraintIndexCreator.java @@ -129,14 +129,19 @@ public long createUniquenessConstraintIndex( // created and activated. acquireLabelLock( state, locks, descriptor.getLabelId() ); reacquiredLabelLock = true; + indexingService.getIndexProxy( indexId ).verifyDeferredConstraints( propertyAccessor ); success = true; return indexId; } - catch ( SchemaRuleNotFoundException | IndexNotFoundKernelException e ) + catch ( SchemaRuleNotFoundException e ) { throw new IllegalStateException( - String.format( "Index (%s) that we just created does not exist.", descriptor ) ); + String.format( "Index (%s) that we just created does not exist.", descriptor ), e ); + } + catch ( IndexNotFoundKernelException e ) + { + throw new TransactionFailureException( String.format( "Index (%s) that we just created does not exist.", descriptor ), e ); } catch ( IndexEntryConflictException e ) { @@ -154,11 +159,22 @@ public long createUniquenessConstraintIndex( { acquireLabelLock( state, locks, descriptor.getLabelId() ); } - dropUniquenessConstraintIndex( index ); + + if ( indexStillExists( schemaOps, state, descriptor, index ) ) + { + dropUniquenessConstraintIndex( index ); + } } } } + private boolean indexStillExists( SchemaReadOperations schemaOps, KernelStatement state, LabelSchemaDescriptor descriptor, + IndexDescriptor index ) + { + IndexDescriptor existingIndex = schemaOps.indexGetForSchema( state, descriptor ); + return existingIndex != null && existingIndex.equals( index ); + } + private void acquireLabelLock( KernelStatement state, Client locks, int labelId ) { locks.acquireExclusive( state.lockTracer(), LABEL, labelId ); diff --git a/community/kernel/src/test/java/org/neo4j/graphdb/schema/ConcurrentCreateDropIndexIT.java b/community/kernel/src/test/java/org/neo4j/graphdb/schema/ConcurrentCreateDropIndexIT.java index b4e0b0ae4f4e3..f697b7f19cec4 100644 --- a/community/kernel/src/test/java/org/neo4j/graphdb/schema/ConcurrentCreateDropIndexIT.java +++ b/community/kernel/src/test/java/org/neo4j/graphdb/schema/ConcurrentCreateDropIndexIT.java @@ -28,15 +28,23 @@ import java.util.List; import java.util.Set; +import org.neo4j.graphdb.ConstraintViolationException; import org.neo4j.graphdb.Label; import org.neo4j.graphdb.Transaction; +import org.neo4j.graphdb.TransientFailureException; import org.neo4j.test.Race; import org.neo4j.test.rule.DatabaseRule; import org.neo4j.test.rule.ImpermanentDatabaseRule; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static java.util.concurrent.TimeUnit.SECONDS; + import static org.neo4j.helpers.collection.Iterables.asList; +import static org.neo4j.helpers.collection.Iterables.single; +import static org.neo4j.helpers.collection.Iterables.singleOrNull; public class ConcurrentCreateDropIndexIT { @@ -159,6 +167,78 @@ public void concurrentMixedCreatingAndDroppingOfIndexesShouldNotInterfere() thro } } + @Test + public void concurrentCreatingUniquenessConstraint() throws Throwable + { + // given + Race race = new Race().withMaxDuration( 10, SECONDS ); + Label label = label( 0 ); + race.addContestants( 10, () -> + { + try ( Transaction tx = db.beginTx() ) + { + db.schema().constraintFor( label ).assertPropertyIsUnique( KEY ).create(); + tx.success(); + } + catch ( TransientFailureException | ConstraintViolationException e ) + { // It's OK + } + }, 300 ); + + // when + race.go(); + + try ( Transaction tx = db.beginTx() ) + { + // then + ConstraintDefinition constraint = single( db.schema().getConstraints( label ) ); + assertNotNull( constraint ); + IndexDefinition index = single( db.schema().getIndexes( label ) ); + assertNotNull( index ); + tx.success(); + } + } + + @Test + public void concurrentCreatingUniquenessConstraintOnNonUniqueData() throws Throwable + { + // given + Label label = label( 0 ); + try ( Transaction tx = db.beginTx() ) + { + for ( int i = 0; i < 2; i++ ) + { + db.createNode( label ).setProperty( KEY, "A" ); + } + tx.success(); + } + Race race = new Race().withMaxDuration( 10, SECONDS ); + race.addContestants( 3, () -> + { + try ( Transaction tx = db.beginTx() ) + { + db.schema().constraintFor( label ).assertPropertyIsUnique( KEY ).create(); + tx.success(); + } + catch ( TransientFailureException | ConstraintViolationException e ) + { // It's OK + } + }, 100 ); + + // when + race.go(); + + try ( Transaction tx = db.beginTx() ) + { + // then + ConstraintDefinition constraint = singleOrNull( db.schema().getConstraints( label ) ); + assertNull( constraint ); + IndexDefinition index = singleOrNull( db.schema().getIndexes( label ) ); + assertNull( index ); + tx.success(); + } + } + private Runnable indexCreate( int labelIndex ) { return () -> diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java index 192c199ac1383..7ecdc82917d99 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java @@ -329,7 +329,6 @@ private static class TestKernelTransaction extends KernelTransactionImplementati { final CommitTrackingMonitor monitor; - @SuppressWarnings( "unchecked" ) TestKernelTransaction( CommitTrackingMonitor monitor ) { super( mock( StatementOperationParts.class ), mock( SchemaWriteGuard.class ), new TransactionHooks(), diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/constraints/ConstraintIndexCreatorTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/constraints/ConstraintIndexCreatorTest.java index 9c34eb9fc49a1..a18a340dc718a 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/constraints/ConstraintIndexCreatorTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/constraints/ConstraintIndexCreatorTest.java @@ -50,16 +50,19 @@ import org.neo4j.kernel.impl.api.StatementOperationParts; import org.neo4j.kernel.impl.api.index.IndexProxy; import org.neo4j.kernel.impl.api.index.IndexingService; +import org.neo4j.kernel.impl.api.operations.SchemaReadOperations; import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator; import org.neo4j.kernel.impl.locking.ResourceTypes; import org.neo4j.values.storable.Values; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; @@ -120,22 +123,23 @@ public void shouldDropIndexIfPopulationFails() throws Exception IndexingService indexingService = mock( IndexingService.class ); StubKernel kernel = new StubKernel(); - when( constraintCreationContext.schemaReadOperations().indexGetCommittedId( state, index ) ) - .thenReturn( 2468L ); + SchemaReadOperations schemaOps = constraintCreationContext.schemaReadOperations(); + when( schemaOps.indexGetCommittedId( state, index ) ).thenReturn( 2468L ); IndexProxy indexProxy = mock( IndexProxy.class ); when( indexingService.getIndexProxy( 2468L ) ).thenReturn( indexProxy ); IndexEntryConflictException cause = new IndexEntryConflictException( 2, 1, Values.of( "a" ) ); doThrow( new IndexPopulationFailedKernelException( descriptor, "some index", cause ) ) .when( indexProxy ).awaitStoreScanCompleted(); PropertyAccessor propertyAccessor = mock( PropertyAccessor.class ); - when( constraintCreationContext.schemaReadOperations().indexGetForSchema( state, descriptor ) ) - .thenReturn( null ); + when( schemaOps.indexGetForSchema( any( KernelStatement.class ), any( LabelSchemaDescriptor.class ) ) ) + .thenReturn( null ) // first claim it doesn't exist, because it doesn't... so that it gets created + .thenReturn( index ); // then after it failed claim it does exist ConstraintIndexCreator creator = new ConstraintIndexCreator( () -> kernel, indexingService, propertyAccessor ); // when try { - creator.createUniquenessConstraintIndex( state, constraintCreationContext.schemaReadOperations(), descriptor ); + creator.createUniquenessConstraintIndex( state, schemaOps, descriptor ); fail( "expected exception" ); } @@ -150,9 +154,9 @@ public void shouldDropIndexIfPopulationFails() throws Exception IndexDescriptor newIndex = IndexDescriptorFactory.uniqueForLabel( 123, 456 ); verify( tx1 ).indexRuleDoAdd( newIndex ); verifyNoMoreInteractions( tx1 ); - verify( constraintCreationContext.schemaReadOperations() ).indexGetCommittedId( state, index ); - verify( constraintCreationContext.schemaReadOperations() ).indexGetForSchema( state, descriptor ); - verifyNoMoreInteractions( constraintCreationContext.schemaReadOperations() ); + verify( schemaOps ).indexGetCommittedId( state, index ); + verify( schemaOps, times( 2 ) ).indexGetForSchema( state, descriptor ); + verifyNoMoreInteractions( schemaOps ); TransactionState tx2 = kernel.statements.get( 1 ).txState(); verify( tx2 ).indexDoDrop( newIndex ); verifyNoMoreInteractions( tx2 ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/FlippableIndexProxyTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/FlippableIndexProxyTest.java index 5f5ae1b8b311f..a4427f596b5ee 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/FlippableIndexProxyTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/index/FlippableIndexProxyTest.java @@ -23,28 +23,27 @@ import org.junit.Test; import org.junit.rules.ExpectedException; -import java.io.IOException; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; -import org.neo4j.kernel.api.exceptions.index.FlipFailedKernelException; import org.neo4j.kernel.api.exceptions.index.IndexProxyAlreadyClosedKernelException; import org.neo4j.test.OtherThreadExecutor; import org.neo4j.test.rule.CleanupRule; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; + +import static java.util.concurrent.TimeUnit.SECONDS; + import static org.neo4j.kernel.impl.api.index.SchemaIndexTestHelper.awaitFuture; import static org.neo4j.kernel.impl.api.index.SchemaIndexTestHelper.awaitLatch; import static org.neo4j.kernel.impl.api.index.SchemaIndexTestHelper.mockIndexProxy; public class FlippableIndexProxyTest { - @Rule public final CleanupRule cleanup = new CleanupRule(); @Rule @@ -149,6 +148,28 @@ public void shouldBlockAccessDuringFlipAndThenDelegateToCorrectContext() throws verify( contextAfterFlip ).drop(); } + @Test + public void shouldAbortStoreScanWaitOnDrop() throws Exception + { + // given the proxy structure + FakePopulatingIndexProxy delegate = new FakePopulatingIndexProxy(); + FlippableIndexProxy flipper = new FlippableIndexProxy( delegate ); + OtherThreadExecutor waiter = cleanup.add( new OtherThreadExecutor( "Waiter", null ) ); + + // and a thread stuck in the awaitStoreScanCompletion loop + Future waiting = waiter.executeDontWait( state -> flipper.awaitStoreScanCompleted() ); + while ( !delegate.awaitCalled ) + { + Thread.sleep( 10 ); + } + + // when + flipper.drop().get(); + + // then the waiting should quickly be over + waiting.get( 10, SECONDS ); + } + private OtherThreadExecutor.WorkerCommand dropTheIndex( final FlippableIndexProxy flippable ) { return state -> @@ -188,4 +209,16 @@ private FailedIndexProxyFactory singleFailedDelegate( final IndexProxy failed ) { return failure -> failed; } + + private static class FakePopulatingIndexProxy extends IndexProxyAdapter + { + private volatile boolean awaitCalled; + + @Override + public boolean awaitStoreScanCompleted() + { + awaitCalled = true; + return true; + } + } }