Skip to content

Commit

Permalink
Adds configuration for releasing schema lock in constraint index buil…
Browse files Browse the repository at this point in the history
…ding
  • Loading branch information
tinwelint committed Feb 20, 2017
1 parent 18ad2f1 commit c4b7a4d
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 15 deletions.
Expand Up @@ -511,6 +511,11 @@ private static String defaultPageCacheMemory()
public static final Setting<File> auth_store =
pathSetting( "unsupported.dbms.security.auth_store.location", NO_DEFAULT );

@Description( "Whether or not to release the exclusive schema lock is while building uniqueness constraints index" )
@Internal
public static final Setting<Boolean> release_schema_lock_while_building_constraint = setting(
"unsupported.dbms.schema.release_lock_while_building_constraint", BOOLEAN, FALSE );

// Bolt Settings

@Group("dbms.connector")
Expand Down
Expand Up @@ -779,8 +779,11 @@ private KernelModule buildKernel( TransactionAppender appender,
*/
Supplier<KernelAPI> kernelProvider = () -> kernelModule.kernelAPI();

boolean releaseSchemaLockWhenBuildingConstratinIndexes =
config.get( GraphDatabaseSettings.release_schema_lock_while_building_constraint );
ConstraintIndexCreator constraintIndexCreator =
new ConstraintIndexCreator( kernelProvider, indexingService, propertyAccessor );
new ConstraintIndexCreator( kernelProvider, indexingService, propertyAccessor,
releaseSchemaLockWhenBuildingConstratinIndexes );

LegacyIndexStore legacyIndexStore = new LegacyIndexStore( config,
indexConfigStore, kernelProvider, legacyIndexProviderLookup );
Expand Down
Expand Up @@ -42,7 +42,6 @@
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.operations.SchemaReadOperations;
import org.neo4j.kernel.impl.locking.Locks.Client;

import static java.util.Collections.singleton;

import static org.neo4j.kernel.impl.locking.ResourceTypes.SCHEMA;
Expand All @@ -54,13 +53,15 @@ public class ConstraintIndexCreator
private final IndexingService indexingService;
private final Supplier<KernelAPI> kernelSupplier;
private final PropertyAccessor propertyAccessor;
private final boolean releaseSchemaLockWhenCreatingConstraint;

public ConstraintIndexCreator( Supplier<KernelAPI> kernelSupplier, IndexingService indexingService,
PropertyAccessor propertyAccessor )
PropertyAccessor propertyAccessor, boolean releaseSchemaLockWhenCreatingConstraint )
{
this.kernelSupplier = kernelSupplier;
this.indexingService = indexingService;
this.propertyAccessor = propertyAccessor;
this.releaseSchemaLockWhenCreatingConstraint = releaseSchemaLockWhenCreatingConstraint;
}

/**
Expand Down Expand Up @@ -101,15 +102,15 @@ public long createUniquenessConstraintIndex( KernelStatement state, SchemaReadOp
// At this point the integrity of the constraint to be created was checked
// while holding the lock and the index rule backing the soon-to-be-created constraint
// has been created. Now it's just the population left, which can take a long time
locks.releaseExclusive( SCHEMA, schemaResource() );
releaseSchemaLock( locks );

awaitIndexPopulation( constraint, indexId );

// Index population was successful, but at this point we don't know if the uniqueness constraint holds.
// Acquire SCHEMA WRITE lock and verify the constraints here in this user transaction
// and if everything checks out then it will be held until after the constraint has been
// created and activated.
locks.acquireExclusive( SCHEMA, schemaResource() );
acquireSchemaLock( locks );
reacquiredSchemaLock = true;
indexingService.getIndexProxy( indexId ).verifyDeferredConstraints( propertyAccessor );

Expand All @@ -135,13 +136,29 @@ public long createUniquenessConstraintIndex( KernelStatement state, SchemaReadOp
{
if ( !reacquiredSchemaLock )
{
locks.acquireExclusive( SCHEMA, schemaResource() );
acquireSchemaLock( locks );
}
dropUniquenessConstraintIndex( descriptor );
}
}
}

private void acquireSchemaLock( Client locks )
{
if ( releaseSchemaLockWhenCreatingConstraint )
{
locks.acquireExclusive( SCHEMA, schemaResource() );
}
}

private void releaseSchemaLock( Client locks )
{
if ( releaseSchemaLockWhenCreatingConstraint )
{
locks.releaseExclusive( SCHEMA, schemaResource() );
}
}

/**
* You MUST hold a schema write lock before you call this method.
*/
Expand Down
Expand Up @@ -43,9 +43,11 @@
import org.neo4j.kernel.impl.api.index.IndexProxy;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
import org.neo4j.kernel.impl.locking.ResourceTypes;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -77,7 +79,7 @@ public void shouldCreateIndexInAnotherTransaction() throws Exception
when( indexingService.getIndexProxy( 2468L ) ).thenReturn( indexProxy );
PropertyAccessor propertyAccessor = mock( PropertyAccessor.class );

ConstraintIndexCreator creator = new ConstraintIndexCreator( () -> kernel, indexingService, propertyAccessor );
ConstraintIndexCreator creator = new ConstraintIndexCreator( () -> kernel, indexingService, propertyAccessor, false );

// when
long indexId = creator.createUniquenessConstraintIndex( state, constraintCreationContext.schemaReadOperations(), 123, 456 );
Expand Down Expand Up @@ -113,7 +115,7 @@ public void shouldDropIndexIfPopulationFails() throws Exception
.when(indexProxy).awaitStoreScanCompleted();
PropertyAccessor propertyAccessor = mock( PropertyAccessor.class );

ConstraintIndexCreator creator = new ConstraintIndexCreator( () -> kernel, indexingService, propertyAccessor );
ConstraintIndexCreator creator = new ConstraintIndexCreator( () -> kernel, indexingService, propertyAccessor, false );

// when
try
Expand Down Expand Up @@ -149,7 +151,7 @@ public void shouldDropIndexInAnotherTransaction() throws Exception
IndexDescriptor descriptor = new IndexDescriptor( 123, 456 );
PropertyAccessor propertyAccessor = mock( PropertyAccessor.class );

ConstraintIndexCreator creator = new ConstraintIndexCreator( () -> kernel, indexingService, propertyAccessor );
ConstraintIndexCreator creator = new ConstraintIndexCreator( () -> kernel, indexingService, propertyAccessor, false );

// when
creator.dropUniquenessConstraintIndex( descriptor );
Expand All @@ -160,6 +162,33 @@ public void shouldDropIndexInAnotherTransaction() throws Exception
verifyZeroInteractions( indexingService );
}

@Test
public void shouldReleaseSchemaLockWhileAwaitingIndexPopulation() throws Exception
{
// given
StubKernel kernel = new StubKernel();
IndexingService indexingService = mock( IndexingService.class );
StatementOperationParts constraintCreationContext = mockedParts();
IndexDescriptor descriptor = new IndexDescriptor( 123, 456 );
PropertyAccessor propertyAccessor = mock( PropertyAccessor.class );

KernelStatement state = mockedState();

when( constraintCreationContext.schemaReadOperations().indexGetCommittedId( state, descriptor, CONSTRAINT ) )
.thenReturn( 2468L );
IndexProxy indexProxy = mock( IndexProxy.class );
when( indexingService.getIndexProxy( anyLong() ) ).thenReturn( indexProxy );

ConstraintIndexCreator creator = new ConstraintIndexCreator( () -> kernel, indexingService, propertyAccessor, true );

// when
creator.createUniquenessConstraintIndex( state, constraintCreationContext.schemaReadOperations(), 123, 456 );

// then
verify( state.locks().pessimistic() ).releaseExclusive( ResourceTypes.SCHEMA, ResourceTypes.schemaResource() );
verify( state.locks().pessimistic() ).acquireExclusive( ResourceTypes.SCHEMA, ResourceTypes.schemaResource() );
}

public class StubKernel implements KernelAPI
{
private final List<KernelStatement> statements = new ArrayList<>();
Expand Down
Expand Up @@ -136,7 +136,7 @@ public void shouldRemoveAConstraintIndexWithoutOwnerInRecovery() throws Exceptio
{
// given
PropertyAccessor propertyAccessor = mock( PropertyAccessor.class );
ConstraintIndexCreator creator = new ConstraintIndexCreator( () -> kernel, indexingService, propertyAccessor );
ConstraintIndexCreator creator = new ConstraintIndexCreator( () -> kernel, indexingService, propertyAccessor, false );
creator.createConstraintIndex( labelId, propertyKeyId );

// when
Expand Down
Expand Up @@ -57,20 +57,23 @@ command instanceof NodeCommand && mayResultInIndexUpdates( (NodeCommand) command
private final IntegrityValidator validator;
private final Monitor monitor;
private final Locks locks;
private final boolean reacquireSharedSchemaLockOnIncomingTransactions;

public interface Monitor
{
void missedReplicas( int number );
}

public MasterTransactionCommitProcess( TransactionCommitProcess commitProcess, TransactionPropagator txPropagator,
IntegrityValidator validator, Monitor monitor, Locks locks )
IntegrityValidator validator, Monitor monitor, Locks locks,
boolean reacquireSharedSchemaLockOnIncomingTransactions )
{
this.inner = commitProcess;
this.txPropagator = txPropagator;
this.validator = validator;
this.monitor = monitor;
this.locks = locks;
this.reacquireSharedSchemaLockOnIncomingTransactions = reacquireSharedSchemaLockOnIncomingTransactions;
}

@Override
Expand Down Expand Up @@ -102,7 +105,10 @@ private Locks.Client validate( TransactionToApply batch ) throws TransactionFail
{
while ( batch != null )
{
locks = acquireSharedSchemaLockIfTransactionResultsInIndexUpdates( batch, locks );
if ( reacquireSharedSchemaLockOnIncomingTransactions )
{
locks = acquireSharedSchemaLockIfTransactionResultsInIndexUpdates( batch, locks );
}
validator.validateTransactionStartKnowledge(
batch.transactionRepresentation().getLatestCommittedTxWhenStarted() );
batch = batch.next();
Expand Down
Expand Up @@ -42,18 +42,21 @@ public class CommitProcessSwitcher extends AbstractComponentSwitcher<Transaction
private final DependencyResolver dependencyResolver;
private final MasterTransactionCommitProcess.Monitor monitor;
private final Locks locks;
private final boolean reacquireSharedSchemaLockOnIncomingTransactions;

public CommitProcessSwitcher( TransactionPropagator txPropagator, Master master,
DelegateInvocationHandler<TransactionCommitProcess> delegate, RequestContextFactory requestContextFactory,
Locks locks,
Monitors monitors, DependencyResolver dependencyResolver )
Monitors monitors, DependencyResolver dependencyResolver,
boolean reacquireSharedSchemaLockOnIncomingTransactions )
{
super( delegate );
this.txPropagator = txPropagator;
this.master = master;
this.requestContextFactory = requestContextFactory;
this.locks = locks;
this.dependencyResolver = dependencyResolver;
this.reacquireSharedSchemaLockOnIncomingTransactions = reacquireSharedSchemaLockOnIncomingTransactions;
this.monitor = monitors.newMonitor( MasterTransactionCommitProcess.Monitor.class );
}

Expand All @@ -71,6 +74,7 @@ protected TransactionCommitProcess getMasterImpl()
dependencyResolver.resolveDependency( StorageEngine.class ) );

IntegrityValidator validator = dependencyResolver.resolveDependency( IntegrityValidator.class );
return new MasterTransactionCommitProcess( commitProcess, txPropagator, validator, monitor, locks );
return new MasterTransactionCommitProcess( commitProcess, txPropagator, validator, monitor, locks,
reacquireSharedSchemaLockOnIncomingTransactions );
}
}
Expand Up @@ -606,7 +606,8 @@ private CommitProcessFactory createCommitProcessFactory( Dependencies dependenci
TransactionCommitProcess.class );

CommitProcessSwitcher commitProcessSwitcher = new CommitProcessSwitcher( transactionPropagator,
master, commitProcessDelegate, requestContextFactory, lockManager, monitors, dependencies );
master, commitProcessDelegate, requestContextFactory, lockManager, monitors, dependencies,
config.get( GraphDatabaseSettings.release_schema_lock_while_building_constraint ) );
componentSwitcherContainer.add( commitProcessSwitcher );

return new HighlyAvailableCommitProcessFactory( commitProcessDelegate );
Expand Down

0 comments on commit c4b7a4d

Please sign in to comment.