Skip to content

Commit

Permalink
Fixes a concurrency issue creating constraints
Browse files Browse the repository at this point in the history
When multiple concurrent transactions wants to create uniqueness constraint
for a given label and property they would first race towards creating the backing index.
One would win and get the exclusive lock on the label first and the others would
await that lock. This lock would would be released during building of the index,
which would then set the stage for the next race, which would be creation of the
constraint for that index rule. One would win and set the last constraint tx field
in the meta data store, whereas the others would notice that change and rollback.

So to the problem: The transactions doing the rollback would think that it should
delete the index that it registered as being the index backing the constraint that
it wanted to create. This perhaps was correct previously when constraint creation
was monolithic both for the whole schema store as well as keeping that lock
during building of the backing index. With that gone this is no longer correct.

The solution is to simply not delete the backing index when encountering this
type of rollback. Trying to create a uniqueness constraint on non-unique data will
still delete the backing index in addition to not creating the constraint, so this
change just fix a concurrency issue with creating successful uniqueness constraint.
  • Loading branch information
tinwelint committed Dec 12, 2017
1 parent 2bd1aef commit a205ecf
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 19 deletions.
Expand Up @@ -607,7 +607,15 @@ private IndexBackedConstraintDescriptor indexBackedConstraintCreate( KernelState
}
}
long indexId = constraintIndexCreator.createUniquenessConstraintIndex( state, this, descriptor );
state.txState().constraintDoAdd( constraint, indexId );
if ( !constraintExists( state, constraint ) )
{
// This looks weird, but since we release the label lock while awaiting population of the index
// backing this constraint there can be someone else getting ahead of us, creating this exact constraint
// before we do, so now getting out here under the lock we must check again and if it exists
// we must at this point consider this an idempotent operation because we verified earlier
// that it didn't exist and went on to create it.
state.txState().constraintDoAdd( constraint, indexId );
}
}
return constraint;
}
Expand Down
Expand Up @@ -64,7 +64,7 @@ public FlippableIndexProxy()
this( null );
}

public FlippableIndexProxy( IndexProxy originalDelegate )
FlippableIndexProxy( IndexProxy originalDelegate )
{
this.delegate = originalDelegate;
}
Expand Down Expand Up @@ -292,7 +292,7 @@ public boolean awaitStoreScanCompleted() throws IndexPopulationFailedKernelExcep
lock.readLock().lock();
proxy = delegate;
lock.readLock().unlock();
} while ( proxy.awaitStoreScanCompleted() );
} while ( !closed && proxy.awaitStoreScanCompleted() );
return true;
}

Expand Down
Expand Up @@ -129,14 +129,19 @@ public long createUniquenessConstraintIndex(
// created and activated.
acquireLabelLock( state, locks, descriptor.getLabelId() );
reacquiredLabelLock = true;

indexingService.getIndexProxy( indexId ).verifyDeferredConstraints( propertyAccessor );
success = true;
return indexId;
}
catch ( SchemaRuleNotFoundException | IndexNotFoundKernelException e )
catch ( SchemaRuleNotFoundException e )
{
throw new IllegalStateException(
String.format( "Index (%s) that we just created does not exist.", descriptor ) );
String.format( "Index (%s) that we just created does not exist.", descriptor ), e );
}
catch ( IndexNotFoundKernelException e )
{
throw new TransactionFailureException( String.format( "Index (%s) that we just created does not exist.", descriptor ), e );
}
catch ( IndexEntryConflictException e )
{
Expand All @@ -154,11 +159,22 @@ public long createUniquenessConstraintIndex(
{
acquireLabelLock( state, locks, descriptor.getLabelId() );
}
dropUniquenessConstraintIndex( index );

if ( indexStillExists( schemaOps, state, descriptor, index ) )
{
dropUniquenessConstraintIndex( index );
}
}
}
}

private boolean indexStillExists( SchemaReadOperations schemaOps, KernelStatement state, LabelSchemaDescriptor descriptor,
IndexDescriptor index )
{
IndexDescriptor existingIndex = schemaOps.indexGetForSchema( state, descriptor );
return existingIndex != null && existingIndex.equals( index );
}

private void acquireLabelLock( KernelStatement state, Client locks, int labelId )
{
locks.acquireExclusive( state.lockTracer(), LABEL, labelId );
Expand Down
Expand Up @@ -28,15 +28,23 @@
import java.util.List;
import java.util.Set;

import org.neo4j.graphdb.ConstraintViolationException;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.TransientFailureException;
import org.neo4j.test.Race;
import org.neo4j.test.rule.DatabaseRule;
import org.neo4j.test.rule.ImpermanentDatabaseRule;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static java.util.concurrent.TimeUnit.SECONDS;

import static org.neo4j.helpers.collection.Iterables.asList;
import static org.neo4j.helpers.collection.Iterables.single;
import static org.neo4j.helpers.collection.Iterables.singleOrNull;

public class ConcurrentCreateDropIndexIT
{
Expand Down Expand Up @@ -159,6 +167,78 @@ public void concurrentMixedCreatingAndDroppingOfIndexesShouldNotInterfere() thro
}
}

@Test
public void concurrentCreatingUniquenessConstraint() throws Throwable
{
// given
Race race = new Race().withMaxDuration( 10, SECONDS );
Label label = label( 0 );
race.addContestants( 10, () ->
{
try ( Transaction tx = db.beginTx() )
{
db.schema().constraintFor( label ).assertPropertyIsUnique( KEY ).create();
tx.success();
}
catch ( TransientFailureException | ConstraintViolationException e )
{ // It's OK
}
}, 300 );

// when
race.go();

try ( Transaction tx = db.beginTx() )
{
// then
ConstraintDefinition constraint = single( db.schema().getConstraints( label ) );
assertNotNull( constraint );
IndexDefinition index = single( db.schema().getIndexes( label ) );
assertNotNull( index );
tx.success();
}
}

@Test
public void concurrentCreatingUniquenessConstraintOnNonUniqueData() throws Throwable
{
// given
Label label = label( 0 );
try ( Transaction tx = db.beginTx() )
{
for ( int i = 0; i < 2; i++ )
{
db.createNode( label ).setProperty( KEY, "A" );
}
tx.success();
}
Race race = new Race().withMaxDuration( 10, SECONDS );
race.addContestants( 3, () ->
{
try ( Transaction tx = db.beginTx() )
{
db.schema().constraintFor( label ).assertPropertyIsUnique( KEY ).create();
tx.success();
}
catch ( TransientFailureException | ConstraintViolationException e )
{ // It's OK
}
}, 100 );

// when
race.go();

try ( Transaction tx = db.beginTx() )
{
// then
ConstraintDefinition constraint = singleOrNull( db.schema().getConstraints( label ) );
assertNull( constraint );
IndexDefinition index = singleOrNull( db.schema().getIndexes( label ) );
assertNull( index );
tx.success();
}
}

private Runnable indexCreate( int labelIndex )
{
return () ->
Expand Down
Expand Up @@ -329,7 +329,6 @@ private static class TestKernelTransaction extends KernelTransactionImplementati
{
final CommitTrackingMonitor monitor;

@SuppressWarnings( "unchecked" )
TestKernelTransaction( CommitTrackingMonitor monitor )
{
super( mock( StatementOperationParts.class ), mock( SchemaWriteGuard.class ), new TransactionHooks(),
Expand Down
Expand Up @@ -50,16 +50,19 @@
import org.neo4j.kernel.impl.api.StatementOperationParts;
import org.neo4j.kernel.impl.api.index.IndexProxy;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.operations.SchemaReadOperations;
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
import org.neo4j.kernel.impl.locking.ResourceTypes;
import org.neo4j.values.storable.Values;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
Expand Down Expand Up @@ -120,22 +123,23 @@ public void shouldDropIndexIfPopulationFails() throws Exception
IndexingService indexingService = mock( IndexingService.class );
StubKernel kernel = new StubKernel();

when( constraintCreationContext.schemaReadOperations().indexGetCommittedId( state, index ) )
.thenReturn( 2468L );
SchemaReadOperations schemaOps = constraintCreationContext.schemaReadOperations();
when( schemaOps.indexGetCommittedId( state, index ) ).thenReturn( 2468L );
IndexProxy indexProxy = mock( IndexProxy.class );
when( indexingService.getIndexProxy( 2468L ) ).thenReturn( indexProxy );
IndexEntryConflictException cause = new IndexEntryConflictException( 2, 1, Values.of( "a" ) );
doThrow( new IndexPopulationFailedKernelException( descriptor, "some index", cause ) )
.when( indexProxy ).awaitStoreScanCompleted();
PropertyAccessor propertyAccessor = mock( PropertyAccessor.class );
when( constraintCreationContext.schemaReadOperations().indexGetForSchema( state, descriptor ) )
.thenReturn( null );
when( schemaOps.indexGetForSchema( any( KernelStatement.class ), any( LabelSchemaDescriptor.class ) ) )
.thenReturn( null ) // first claim it doesn't exist, because it doesn't... so that it gets created
.thenReturn( index ); // then after it failed claim it does exist
ConstraintIndexCreator creator = new ConstraintIndexCreator( () -> kernel, indexingService, propertyAccessor );

// when
try
{
creator.createUniquenessConstraintIndex( state, constraintCreationContext.schemaReadOperations(), descriptor );
creator.createUniquenessConstraintIndex( state, schemaOps, descriptor );

fail( "expected exception" );
}
Expand All @@ -150,9 +154,9 @@ public void shouldDropIndexIfPopulationFails() throws Exception
IndexDescriptor newIndex = IndexDescriptorFactory.uniqueForLabel( 123, 456 );
verify( tx1 ).indexRuleDoAdd( newIndex );
verifyNoMoreInteractions( tx1 );
verify( constraintCreationContext.schemaReadOperations() ).indexGetCommittedId( state, index );
verify( constraintCreationContext.schemaReadOperations() ).indexGetForSchema( state, descriptor );
verifyNoMoreInteractions( constraintCreationContext.schemaReadOperations() );
verify( schemaOps ).indexGetCommittedId( state, index );
verify( schemaOps, times( 2 ) ).indexGetForSchema( state, descriptor );
verifyNoMoreInteractions( schemaOps );
TransactionState tx2 = kernel.statements.get( 1 ).txState();
verify( tx2 ).indexDoDrop( newIndex );
verifyNoMoreInteractions( tx2 );
Expand Down
Expand Up @@ -23,28 +23,27 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

import org.neo4j.kernel.api.exceptions.index.FlipFailedKernelException;
import org.neo4j.kernel.api.exceptions.index.IndexProxyAlreadyClosedKernelException;
import org.neo4j.test.OtherThreadExecutor;
import org.neo4j.test.rule.CleanupRule;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import static java.util.concurrent.TimeUnit.SECONDS;

import static org.neo4j.kernel.impl.api.index.SchemaIndexTestHelper.awaitFuture;
import static org.neo4j.kernel.impl.api.index.SchemaIndexTestHelper.awaitLatch;
import static org.neo4j.kernel.impl.api.index.SchemaIndexTestHelper.mockIndexProxy;

public class FlippableIndexProxyTest
{

@Rule
public final CleanupRule cleanup = new CleanupRule();
@Rule
Expand Down Expand Up @@ -149,6 +148,28 @@ public void shouldBlockAccessDuringFlipAndThenDelegateToCorrectContext() throws
verify( contextAfterFlip ).drop();
}

@Test
public void shouldAbortStoreScanWaitOnDrop() throws Exception
{
// given the proxy structure
FakePopulatingIndexProxy delegate = new FakePopulatingIndexProxy();
FlippableIndexProxy flipper = new FlippableIndexProxy( delegate );
OtherThreadExecutor<Void> waiter = cleanup.add( new OtherThreadExecutor<Void>( "Waiter", null ) );

// and a thread stuck in the awaitStoreScanCompletion loop
Future<Object> waiting = waiter.executeDontWait( state -> flipper.awaitStoreScanCompleted() );
while ( !delegate.awaitCalled )
{
Thread.sleep( 10 );
}

// when
flipper.drop().get();

// then the waiting should quickly be over
waiting.get( 10, SECONDS );
}

private OtherThreadExecutor.WorkerCommand<Void, Void> dropTheIndex( final FlippableIndexProxy flippable )
{
return state ->
Expand Down Expand Up @@ -188,4 +209,16 @@ private FailedIndexProxyFactory singleFailedDelegate( final IndexProxy failed )
{
return failure -> failed;
}

private static class FakePopulatingIndexProxy extends IndexProxyAdapter
{
private volatile boolean awaitCalled;

@Override
public boolean awaitStoreScanCompleted()
{
awaitCalled = true;
return true;
}
}
}

0 comments on commit a205ecf

Please sign in to comment.