diff --git a/community/kernel/src/main/java/org/neo4j/graphdb/TransactionTerminatedException.java b/community/kernel/src/main/java/org/neo4j/graphdb/TransactionTerminatedException.java index fd86f2adbd712..c42a2f339e9d6 100644 --- a/community/kernel/src/main/java/org/neo4j/graphdb/TransactionTerminatedException.java +++ b/community/kernel/src/main/java/org/neo4j/graphdb/TransactionTerminatedException.java @@ -19,6 +19,8 @@ */ package org.neo4j.graphdb; +import static java.util.Objects.requireNonNull; + /** * Signals that the transaction within which the failed operations ran * has been terminated with {@link Transaction#terminate()}. @@ -27,6 +29,11 @@ public class TransactionTerminatedException extends TransactionFailureException { public TransactionTerminatedException() { - super( "The transaction has been terminated." ); + this( "" ); + } + + protected TransactionTerminatedException( String info ) + { + super( "The transaction has been terminated. " + requireNonNull( info ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java index 8af421b938313..b2810e1361e26 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java @@ -1112,7 +1112,7 @@ public KernelAPI get() statementOperations, updateableSchemaState, schemaWriteGuard, schemaIndexProviderMap, transactionHeaderInformationFactory, storeLayer, transactionCommitProcess, indexConfigStore, legacyIndexProviderLookup, hooks, constraintSemantics, - transactionMonitor, life, procedureCache, tracers ) ); + transactionMonitor, life, procedureCache, config, tracers ) ); final Kernel kernel = new Kernel( kernelTransactions, hooks, kernelHealth, transactionMonitor ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java index e241a3e34b77d..257fc0c91b028 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java @@ -159,6 +159,7 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel private final TransactionToRecordStateVisitor txStateToRecordStateVisitor = new TransactionToRecordStateVisitor(); private final Collection extractedCommands = new ArrayCollection<>( 32 ); private final Locks locksManager; + private final boolean txTerminationAwareLocks; private TransactionState txState; private LegacyIndexTransactionState legacyIndexTransactionState; private TransactionType transactionType = TransactionType.ANY; @@ -204,7 +205,8 @@ public KernelTransactionImplementation( StatementOperationParts operations, Clock clock, TransactionTracer tracer, ProcedureCache procedureCache, - NeoStoreTransactionContext context ) + NeoStoreTransactionContext context, + boolean txTerminationAwareLocks ) { this.operations = operations; this.schemaWriteGuard = schemaWriteGuard; @@ -214,6 +216,7 @@ public KernelTransactionImplementation( StatementOperationParts operations, this.providerMap = providerMap; this.schemaState = schemaState; this.locksManager = locks; + this.txTerminationAwareLocks = txTerminationAwareLocks; this.hooks = hooks; this.constraintIndexCreator = constraintIndexCreator; this.headerInformationFactory = headerInformationFactory; @@ -279,6 +282,10 @@ public void markForTermination() { failure = true; terminated = true; + if ( txTerminationAwareLocks && locks != null ) + { + locks.stop(); + } transactionMonitor.transactionTerminated(); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java index 8328b8f587999..789217171f6b2 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java @@ -29,10 +29,13 @@ import org.neo4j.function.Factory; import org.neo4j.function.Supplier; import org.neo4j.graphdb.DatabaseShutdownException; +import org.neo4j.graphdb.config.Setting; import org.neo4j.helpers.Clock; import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.labelscan.LabelScanStore; import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState; +import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.impl.api.index.IndexingService; import org.neo4j.kernel.impl.api.index.SchemaIndexProviderMap; import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator; @@ -54,6 +57,7 @@ import org.neo4j.kernel.monitoring.tracing.Tracers; import static java.util.Collections.newSetFromMap; +import static org.neo4j.kernel.configuration.Settings.setting; /** * Central source of transactions in the database. @@ -67,11 +71,15 @@ public class KernelTransactions extends LifecycleAdapter implements Factory, // For providing KernelTransaction instances Supplier // For providing KernelTransactionSnapshots { + public static final Setting tx_termination_aware_locks = setting( + "experimental.tx_termination_aware_locks", Settings.BOOLEAN, Settings.FALSE ); + // Transaction dependencies private final NeoStoreTransactionContextFactory neoStoreTransactionContextFactory; private final NeoStores neoStores; private final Locks locks; + private final boolean txTerminationAwareLocks; private final IntegrityValidator integrityValidator; private final ConstraintIndexCreator constraintIndexCreator; private final IndexingService indexingService; @@ -123,11 +131,13 @@ public KernelTransactions( NeoStoreTransactionContextFactory neoStoreTransaction ConstraintSemantics constraintSemantics, TransactionMonitor transactionMonitor, LifeSupport dataSourceLife, ProcedureCache procedureCache, + Config config, Tracers tracers ) { this.neoStoreTransactionContextFactory = neoStoreTransactionContextFactory; this.neoStores = neoStores; this.locks = locks; + this.txTerminationAwareLocks = config.get( tx_termination_aware_locks ); this.integrityValidator = integrityValidator; this.constraintIndexCreator = constraintIndexCreator; this.indexingService = indexingService; @@ -168,7 +178,7 @@ public KernelTransactionImplementation newInstance() neoStores, locks, hooks, constraintIndexCreator, transactionHeaderInformationFactory, transactionCommitProcess, transactionMonitor, storeLayer, legacyIndexTransactionState, localTxPool, constraintSemantics, Clock.SYSTEM_CLOCK, tracers.transactionTracer, procedureCache, - context ); + context, txTerminationAwareLocks ); allTransactions.add( tx ); return tx; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/LockClientStateHolder.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/LockClientStateHolder.java index 07828bbbee66e..aafd68b4fc4b6 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/LockClientStateHolder.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/LockClientStateHolder.java @@ -66,10 +66,12 @@ public void stopClient() /** * Increment active number of clients that use current state instance. - * @return false if already stopped and not possible to increment active clients counter, true in case if counter - * was successfully incremented. + * + * @param client the locks client associated with this state; used only to create pretty exception + * with {@link LockClientStoppedException#LockClientStoppedException(Locks.Client)}. + * @throws LockClientStoppedException when stopped. */ - public boolean incrementActiveClients() + public void incrementActiveClients( Locks.Client client ) { int currentState; do @@ -77,11 +79,10 @@ public boolean incrementActiveClients() currentState = clientState.get(); if ( isStopped( currentState ) ) { - return false; + throw new LockClientStoppedException( client ); } } while ( !clientState.compareAndSet( currentState, statusWithUpdatedClients( currentState, 1 ) ) ); - return true; } /** @@ -138,4 +139,4 @@ private int statusWithUpdatedClients( int clientState, int delta ) { return getStatus( clientState ) | (getActiveClients( clientState ) + delta); } -} \ No newline at end of file +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/LockClientAlreadyClosedException.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/LockClientStoppedException.java similarity index 69% rename from community/kernel/src/main/java/org/neo4j/kernel/impl/locking/LockClientAlreadyClosedException.java rename to community/kernel/src/main/java/org/neo4j/kernel/impl/locking/LockClientStoppedException.java index 5df0ff0aeff58..505074f7c5558 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/LockClientAlreadyClosedException.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/LockClientStoppedException.java @@ -19,14 +19,17 @@ */ package org.neo4j.kernel.impl.locking; +import org.neo4j.graphdb.TransactionTerminatedException; + +import static java.util.Objects.requireNonNull; + /** - * Exception that will be thrown in case when closed {@link org.neo4j.kernel.impl.locking.Locks.Client} - * will be used to acquire shared/exclusive lock + * Exception thrown when stopped {@link Locks.Client} used to acquire locks. */ -public class LockClientAlreadyClosedException extends RuntimeException +public class LockClientStoppedException extends TransactionTerminatedException { - public LockClientAlreadyClosedException( String message ) + public LockClientStoppedException( Locks.Client client ) { - super( message ); + super( requireNonNull( client ) + " is stopped" ); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java index adff70c7c0c57..d40a2b9d8e556 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java @@ -28,8 +28,8 @@ import org.neo4j.collection.primitive.PrimitiveLongObjectMap; import org.neo4j.collection.primitive.PrimitiveLongObjectVisitor; import org.neo4j.helpers.collection.Visitor; -import org.neo4j.kernel.impl.locking.LockClientAlreadyClosedException; import org.neo4j.kernel.impl.locking.LockClientStateHolder; +import org.neo4j.kernel.impl.locking.LockClientStoppedException; import org.neo4j.kernel.impl.locking.Locks; import static java.lang.String.format; @@ -62,10 +62,7 @@ public CommunityLockClient( LockManagerImpl manager ) @Override public void acquireShared( Locks.ResourceType resourceType, long resourceId ) { - if ( !stateHolder.incrementActiveClients() ) - { - throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); - } + stateHolder.incrementActiveClients( this ); try { PrimitiveLongObjectMap localLocks = localShared( resourceType ); @@ -83,7 +80,7 @@ public void acquireShared( Locks.ResourceType resourceType, long resourceId ) } else { - throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); + throw new LockClientStoppedException( this ); } } } @@ -98,10 +95,7 @@ public void acquireShared( Locks.ResourceType resourceType, long resourceId ) @Override public void acquireExclusive( Locks.ResourceType resourceType, long resourceId ) { - if ( !stateHolder.incrementActiveClients() ) - { - throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); - } + stateHolder.incrementActiveClients( this ); try { PrimitiveLongObjectMap localLocks = localExclusive( resourceType ); @@ -119,7 +113,7 @@ public void acquireExclusive( Locks.ResourceType resourceType, long resourceId ) } else { - throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); + throw new LockClientStoppedException( this ); } } } @@ -132,10 +126,7 @@ public void acquireExclusive( Locks.ResourceType resourceType, long resourceId ) @Override public boolean tryExclusiveLock( Locks.ResourceType resourceType, long resourceId ) { - if ( !stateHolder.incrementActiveClients() ) - { - return false; - } + stateHolder.incrementActiveClients( this ); try { PrimitiveLongObjectMap localLocks = localExclusive( resourceType ); @@ -168,10 +159,7 @@ public boolean tryExclusiveLock( Locks.ResourceType resourceType, long resourceI @Override public boolean trySharedLock( Locks.ResourceType resourceType, long resourceId ) { - if ( !stateHolder.incrementActiveClients() ) - { - return false; - } + stateHolder.incrementActiveClients( this ); try { PrimitiveLongObjectMap localLocks = localShared( resourceType ); @@ -204,10 +192,7 @@ public boolean trySharedLock( Locks.ResourceType resourceType, long resourceId ) @Override public void releaseShared( Locks.ResourceType resourceType, long resourceId ) { - if ( !stateHolder.incrementActiveClients() ) - { - throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); - } + stateHolder.incrementActiveClients( this ); try { PrimitiveLongObjectMap localLocks = localShared( resourceType ); @@ -230,10 +215,7 @@ public void releaseShared( Locks.ResourceType resourceType, long resourceId ) @Override public void releaseExclusive( Locks.ResourceType resourceType, long resourceId ) { - if ( !stateHolder.incrementActiveClients() ) - { - throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); - } + stateHolder.incrementActiveClients( this ); try { PrimitiveLongObjectMap localLocks = localExclusive( resourceType ); @@ -255,10 +237,7 @@ public void releaseExclusive( Locks.ResourceType resourceType, long resourceId ) @Override public void releaseAll() { - if ( !stateHolder.incrementActiveClients() ) - { - throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); - } + stateHolder.incrementActiveClients( this ); try { releaseLocks(); diff --git a/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseShutdownTest.java b/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseShutdownTest.java index 309c78dfe81af..2bb965144e667 100644 --- a/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseShutdownTest.java +++ b/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseShutdownTest.java @@ -19,12 +19,12 @@ */ package org.neo4j.graphdb; +import org.junit.Test; + import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; -import org.junit.Test; - import org.neo4j.kernel.GraphDatabaseAPI; import org.neo4j.kernel.impl.locking.LockCountVisitor; import org.neo4j.kernel.impl.locking.Locks; @@ -32,12 +32,12 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.TimeUnit.SECONDS; - +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; - +import static org.junit.Assert.fail; import static org.neo4j.graphdb.DynamicLabel.label; import static org.neo4j.helpers.Exceptions.rootCause; @@ -130,10 +130,12 @@ public Void call() throws Exception try { secondTxResult.get( 60, SECONDS ); + fail( "Exception expected" ); } catch ( Exception e ) { - assertThat( rootCause( e ), instanceOf( TransactionTerminatedException.class ) ); + assertThat( rootCause( e ), anyOf( instanceOf( TransactionFailureException.class ), + instanceOf( TransactionTerminatedException.class ) ) ); } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java index b4bcbbb84b82c..fb8c2c42d1bce 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java @@ -64,6 +64,7 @@ null, mock( NeoStores.class ), new NoOpLocks(), new TransactionHooks(), Clock.SYSTEM_CLOCK, TransactionTracer.NULL, new ProcedureCache(), - mock( NeoStoreTransactionContext.class )); + mock( NeoStoreTransactionContext.class ), + false ); } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java index 7337cf7861777..c0b3d8a25f9c8 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java @@ -31,11 +31,6 @@ import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState; -import org.neo4j.kernel.impl.api.KernelTransactionImplementation; -import org.neo4j.kernel.impl.api.TransactionApplicationMode; -import org.neo4j.kernel.impl.api.TransactionCommitProcess; -import org.neo4j.kernel.impl.api.TransactionHeaderInformation; -import org.neo4j.kernel.impl.api.TransactionHooks; import org.neo4j.kernel.impl.api.store.ProcedureCache; import org.neo4j.kernel.impl.api.store.StoreReadLayer; import org.neo4j.kernel.impl.api.store.StoreStatement; @@ -459,7 +454,7 @@ private KernelTransactionImplementation newTransaction() null, null, null, null, null, recordState, null, neoStores, locks, hooks, null, headerInformationFactory, commitProcess, transactionMonitor, storeReadLayer, legacyIndexState, pool, new StandardConstraintSemantics(), clock, TransactionTracer.NULL, new ProcedureCache(), mock( NeoStoreTransactionContext - .class ) ); + .class ), false ); transaction.initialize( 0 ); return transaction; } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java index af0fd63890332..b5f0a73f528b2 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java @@ -28,12 +28,13 @@ import java.util.concurrent.atomic.AtomicReferenceArray; import org.neo4j.helpers.collection.Iterables; -import org.neo4j.kernel.impl.api.store.ProcedureCache; -import org.neo4j.kernel.impl.constraints.ConstraintSemantics; import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.exceptions.TransactionFailureException; +import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.impl.api.store.ProcedureCache; import org.neo4j.kernel.impl.api.store.StoreReadLayer; import org.neo4j.kernel.impl.api.store.StoreStatement; +import org.neo4j.kernel.impl.constraints.ConstraintSemantics; import org.neo4j.kernel.impl.locking.LockGroup; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.store.MetaDataStore; @@ -53,6 +54,8 @@ import org.neo4j.logging.NullLog; import org.neo4j.test.Race; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.locks.LockSupport.parkNanos; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.junit.Assert.assertFalse; @@ -64,10 +67,6 @@ import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.locks.LockSupport.parkNanos; - import static org.neo4j.helpers.collection.IteratorUtil.asSet; import static org.neo4j.helpers.collection.IteratorUtil.asUniqueSet; @@ -267,8 +266,8 @@ private static KernelTransactions newKernelTransactions( TransactionCommitProces return new KernelTransactions( contextSupplier, neoStores, locks, mock( IntegrityValidator.class ), null, null, null, null, null, null, null, TransactionHeaderInformationFactory.DEFAULT, readLayer, commitProcess, null, - null, new TransactionHooks(), mock( ConstraintSemantics.class ), mock( TransactionMonitor.class ), life, new ProcedureCache(), - new Tracers( "null", NullLog.getInstance() )); + null, new TransactionHooks(), mock( ConstraintSemantics.class ), mock( TransactionMonitor.class ), + life, new ProcedureCache(), new Config(), new Tracers( "null", NullLog.getInstance() ) ); } private static TransactionCommitProcess newRememberingCommitProcess( final TransactionRepresentation[] slot ) diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/CloseCompatibility.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/CloseCompatibility.java index c08bb169b2ae7..aa114948c475f 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/CloseCompatibility.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/CloseCompatibility.java @@ -87,14 +87,14 @@ public void closeShouldWaitAllOperationToFinish() } - @Test( expected = LockClientAlreadyClosedException.class ) + @Test( expected = LockClientStoppedException.class ) public void shouldNotBeAbleToAcquireSharedLockFromClosedClient() { clientA.close(); clientA.acquireShared( NODE, 1l ); } - @Test( expected = LockClientAlreadyClosedException.class ) + @Test( expected = LockClientStoppedException.class ) public void shouldNotBeAbleToAcquireExclusiveLockFromClosedClient() { clientA.close(); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockClientStateHolderTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockClientStateHolderTest.java index 536d73e938bef..a62c6ce3abe28 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockClientStateHolderTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockClientStateHolderTest.java @@ -21,8 +21,11 @@ import org.junit.Test; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class LockClientStateHolderTest { @@ -35,10 +38,10 @@ public void shouldAllowIncrementDecrementClientsWhileNotClosed() // expect assertFalse( lockClientStateHolder.hasActiveClients() ); - assertTrue( lockClientStateHolder.incrementActiveClients() ); + lockClientStateHolder.incrementActiveClients( new NoOpClient() ); assertTrue( lockClientStateHolder.hasActiveClients() ); - assertTrue( lockClientStateHolder.incrementActiveClients() ); - assertTrue( lockClientStateHolder.incrementActiveClients() ); + lockClientStateHolder.incrementActiveClients( new NoOpClient() ); + lockClientStateHolder.incrementActiveClients( new NoOpClient() ); lockClientStateHolder.decrementActiveClients(); lockClientStateHolder.decrementActiveClients(); lockClientStateHolder.decrementActiveClients(); @@ -56,7 +59,15 @@ public void shouldNotAllowNewClientsWhenClosed() // then assertFalse( lockClientStateHolder.hasActiveClients() ); - assertFalse( lockClientStateHolder.incrementActiveClients() ); + try + { + lockClientStateHolder.incrementActiveClients( new NoOpClient() ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( LockClientStoppedException.class ) ); + } } @Test @@ -66,10 +77,10 @@ public void shouldBeAbleToDecrementActiveItemAndDetectWhenFree() LockClientStateHolder lockClientStateHolder = new LockClientStateHolder(); // when - lockClientStateHolder.incrementActiveClients(); - lockClientStateHolder.incrementActiveClients(); + lockClientStateHolder.incrementActiveClients(new NoOpClient()); + lockClientStateHolder.incrementActiveClients(new NoOpClient()); lockClientStateHolder.decrementActiveClients(); - lockClientStateHolder.incrementActiveClients(); + lockClientStateHolder.incrementActiveClients(new NoOpClient()); // expect assertTrue( lockClientStateHolder.hasActiveClients() ); @@ -92,8 +103,8 @@ public void shouldBeAbleToResetAndReuseClientState() LockClientStateHolder lockClientStateHolder = new LockClientStateHolder(); // when - assertTrue( lockClientStateHolder.incrementActiveClients() ); - assertTrue( lockClientStateHolder.incrementActiveClients() ); + lockClientStateHolder.incrementActiveClients( new NoOpClient() ); + lockClientStateHolder.incrementActiveClients( new NoOpClient() ); lockClientStateHolder.decrementActiveClients(); // expect @@ -114,9 +125,9 @@ public void shouldBeAbleToResetAndReuseClientState() assertFalse( lockClientStateHolder.isStopped() ); // when - assertTrue( lockClientStateHolder.incrementActiveClients() ); + lockClientStateHolder.incrementActiveClients( new NoOpClient() ); assertTrue( lockClientStateHolder.hasActiveClients() ); assertFalse( lockClientStateHolder.isStopped() ); } -} \ No newline at end of file +} diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/StopCompatibility.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/StopCompatibility.java index 4c07debc1fd5c..4b54b2c7b2197 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/StopCompatibility.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/StopCompatibility.java @@ -19,20 +19,66 @@ */ package org.neo4j.kernel.impl.locking; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.neo4j.kernel.impl.locking.ResourceTypes.NODE; @Ignore( "Not a test. This is a compatibility suite, run from LockingCompatibilityTestSuite." ) public class StopCompatibility extends LockingCompatibilityTestSuite.Compatibility { + private static final Locks.ResourceType RESOURCE_TYPE = ResourceTypes.NODE; + private static final long RESOURCE_ID = 42; + private static final long OTHER_RESOURCE_ID = 4242; + + private ExecutorService executor; + private Locks.Client client; + public StopCompatibility( LockingCompatibilityTestSuite suite ) { super( suite ); } + @Before + public void setUp() throws Exception + { + executor = Executors.newSingleThreadExecutor(); + client = locks.newClient(); + } + + @After + public void tearDown() throws Exception + { + client.close(); + executor.shutdownNow(); + executor.awaitTermination( 1, TimeUnit.MINUTES ); + } + @Test public void releaseWriteLockWaitersOnStop() { @@ -70,4 +116,572 @@ public void releaseReadLockWaitersOnStop() locks.accept( lockCountVisitor ); Assert.assertEquals( 2, lockCountVisitor.getLockCount() ); } + + @Test( expected = LockClientStoppedException.class ) + public void acquireSharedThrowsWhenClientStopped() + { + stoppedClient().acquireShared( ResourceTypes.NODE, 1 ); + } + + @Test( expected = LockClientStoppedException.class ) + public void acquireExclusiveThrowsWhenClientStopped() + { + stoppedClient().acquireExclusive( ResourceTypes.NODE, 1 ); + } + + @Test( expected = LockClientStoppedException.class ) + public void trySharedLockThrowsWhenClientStopped() + { + stoppedClient().trySharedLock( ResourceTypes.NODE, 1 ); + } + + @Test( expected = LockClientStoppedException.class ) + public void tryExclusiveLockThrowsWhenClientStopped() + { + stoppedClient().tryExclusiveLock( ResourceTypes.NODE, 1 ); + } + + @Test( expected = LockClientStoppedException.class ) + public void releaseSharedThrowsWhenClientStopped() + { + stoppedClient().releaseShared( ResourceTypes.NODE, 1 ); + } + + @Test( expected = LockClientStoppedException.class ) + public void releaseExclusiveThrowsWhenClientStopped() + { + stoppedClient().releaseExclusive( ResourceTypes.NODE, 1 ); + } + + @Test( expected = LockClientStoppedException.class ) + public void releaseAllThrowsWhenClientStopped() + { + stoppedClient().releaseAll(); + } + + @Test + public void sharedLockCanBeStopped() throws Exception + { + acquireExclusiveLockInThisThread(); + + LockAcquisition sharedLockAcquisition = acquireSharedLockInAnotherThread(); + assertThreadIsWaitingForLock( sharedLockAcquisition ); + + sharedLockAcquisition.stop(); + assertLockAcquisitionFailed( sharedLockAcquisition ); + } + + @Test + public void exclusiveLockCanBeStopped() throws Exception + { + acquireExclusiveLockInThisThread(); + + LockAcquisition exclusiveLockAcquisition = acquireExclusiveLockInAnotherThread(); + assertThreadIsWaitingForLock( exclusiveLockAcquisition ); + + exclusiveLockAcquisition.stop(); + assertLockAcquisitionFailed( exclusiveLockAcquisition ); + } + + @Test + public void acquireSharedLockAfterSharedLockStoppedOtherThread() throws Exception + { + acquireExclusiveLockInThisThread(); + + LockAcquisition sharedLockAcquisition1 = acquireSharedLockInAnotherThread(); + assertThreadIsWaitingForLock( sharedLockAcquisition1 ); + + sharedLockAcquisition1.stop(); + assertLockAcquisitionFailed( sharedLockAcquisition1 ); + + releaseAllLocksInThisThread(); + + LockAcquisition sharedLockAcquisition2 = acquireSharedLockInAnotherThread(); + assertLockAcquisitionSucceeded( sharedLockAcquisition2 ); + } + + @Test + public void acquireExclusiveLockAfterExclusiveLockStoppedOtherThread() throws Exception + { + acquireExclusiveLockInThisThread(); + + LockAcquisition exclusiveLockAcquisition1 = acquireExclusiveLockInAnotherThread(); + assertThreadIsWaitingForLock( exclusiveLockAcquisition1 ); + + exclusiveLockAcquisition1.stop(); + assertLockAcquisitionFailed( exclusiveLockAcquisition1 ); + + releaseAllLocksInThisThread(); + + LockAcquisition exclusiveLockAcquisition2 = acquireExclusiveLockInAnotherThread(); + assertLockAcquisitionSucceeded( exclusiveLockAcquisition2 ); + } + + @Test + public void acquireSharedLockAfterExclusiveLockStoppedOtherThread() throws Exception + { + acquireExclusiveLockInThisThread(); + + LockAcquisition exclusiveLockAcquisition = acquireExclusiveLockInAnotherThread(); + assertThreadIsWaitingForLock( exclusiveLockAcquisition ); + + exclusiveLockAcquisition.stop(); + assertLockAcquisitionFailed( exclusiveLockAcquisition ); + + releaseAllLocksInThisThread(); + + LockAcquisition sharedLockAcquisition = acquireSharedLockInAnotherThread(); + assertLockAcquisitionSucceeded( sharedLockAcquisition ); + } + + @Test + public void acquireExclusiveLockAfterSharedLockStoppedOtherThread() throws Exception + { + acquireExclusiveLockInThisThread(); + + LockAcquisition sharedLockAcquisition = acquireSharedLockInAnotherThread(); + assertThreadIsWaitingForLock( sharedLockAcquisition ); + + sharedLockAcquisition.stop(); + assertLockAcquisitionFailed( sharedLockAcquisition ); + + releaseAllLocksInThisThread(); + + LockAcquisition exclusiveLockAcquisition = acquireExclusiveLockInAnotherThread(); + assertLockAcquisitionSucceeded( exclusiveLockAcquisition ); + } + + @Test + public void acquireSharedLockAfterSharedLockStoppedSameThread() throws Exception + { + acquireLockAfterOtherLockStoppedSameThread( true, true ); + } + + @Test + public void acquireExclusiveLockAfterExclusiveLockStoppedSameThread() throws Exception + { + acquireLockAfterOtherLockStoppedSameThread( false, false ); + } + + @Test + public void acquireSharedLockAfterExclusiveLockStoppedSameThread() throws Exception + { + acquireLockAfterOtherLockStoppedSameThread( true, false ); + } + + @Test + public void acquireExclusiveLockAfterSharedLockStoppedSameThread() throws Exception + { + acquireLockAfterOtherLockStoppedSameThread( false, true ); + } + + @Test + public void closeClientAfterSharedLockStopped() throws Exception + { + closeClientAfterLockStopped( true ); + } + + @Test + public void closeClientAfterExclusiveLockStopped() throws Exception + { + closeClientAfterLockStopped( false ); + } + + @Test + public void acquireExclusiveLockWhileHoldingSharedLockCanBeStopped() throws Exception + { + acquireSharedLockInThisThread(); + + CountDownLatch sharedLockAcquired = new CountDownLatch( 1 ); + CountDownLatch startExclusiveLock = new CountDownLatch( 1 ); + LockAcquisition acquisition = acquireSharedAndExclusiveLocksInAnotherThread( sharedLockAcquired, + startExclusiveLock ); + + await( sharedLockAcquired ); + startExclusiveLock.countDown(); + assertThreadIsWaitingForLock( acquisition ); + + acquisition.stop(); + assertLockAcquisitionFailed( acquisition ); + + releaseAllLocksInThisThread(); + assertNoLocksHeld(); + } + + private Locks.Client stoppedClient() + { + try + { + client.stop(); + return client; + } + catch ( Throwable t ) + { + throw new AssertionError( "Unable to stop client", t ); + } + } + + private void closeClientAfterLockStopped( boolean shared ) throws Exception + { + acquireExclusiveLockInThisThread(); + + CountDownLatch firstLockAcquired = new CountDownLatch( 1 ); + LockAcquisition + acquisition = tryAcquireTwoLocksLockInAnotherThread( shared, firstLockAcquired ); + + await( firstLockAcquired ); + assertThreadIsWaitingForLock( acquisition ); + assertLocksHeld( RESOURCE_ID, OTHER_RESOURCE_ID ); + + acquisition.stop(); + assertLockAcquisitionFailed( acquisition ); + assertLocksHeld( RESOURCE_ID ); + + releaseAllLocksInThisThread(); + assertNoLocksHeld(); + } + + private void acquireLockAfterOtherLockStoppedSameThread( boolean firstLockShared, boolean secondLockShared ) + throws Exception + { + acquireExclusiveLockInThisThread(); + + CountDownLatch firstLockFailed = new CountDownLatch( 1 ); + CountDownLatch startSecondLock = new CountDownLatch( 1 ); + + LockAcquisition + lockAcquisition = acquireTwoLocksInAnotherThread( firstLockShared, secondLockShared, + firstLockFailed, startSecondLock ); + assertThreadIsWaitingForLock( lockAcquisition ); + + lockAcquisition.stop(); + await( firstLockFailed ); + releaseAllLocksInThisThread(); + startSecondLock.countDown(); + + assertLockAcquisitionSucceeded( lockAcquisition ); + } + + private void acquireSharedLockInThisThread() + { + client.acquireShared( RESOURCE_TYPE, RESOURCE_ID ); + assertLocksHeld( RESOURCE_ID ); + } + + private void acquireExclusiveLockInThisThread() + { + client.acquireExclusive( RESOURCE_TYPE, RESOURCE_ID ); + assertLocksHeld( RESOURCE_ID ); + } + + private void releaseAllLocksInThisThread() + { + client.releaseAll(); + } + + private LockAcquisition acquireSharedLockInAnotherThread() + { + return acquireLockInAnotherThread( true ); + } + + private LockAcquisition acquireExclusiveLockInAnotherThread() + { + return acquireLockInAnotherThread( false ); + } + + private LockAcquisition acquireLockInAnotherThread( final boolean shared ) + { + final LockAcquisition lockAcquisition = new LockAcquisition(); + + Future future = executor.submit( new Callable() + { + @Override + public Void call() throws Exception + { + Locks.Client client = newLockClient( lockAcquisition ); + if ( shared ) + { + client.acquireShared( RESOURCE_TYPE, RESOURCE_ID ); + } + else + { + client.acquireExclusive( RESOURCE_TYPE, RESOURCE_ID ); + } + return null; + } + } ); + lockAcquisition.setFuture( future ); + + return lockAcquisition; + } + + private LockAcquisition acquireTwoLocksInAnotherThread( final boolean firstShared, final boolean secondShared, + final CountDownLatch firstLockFailed, final CountDownLatch startSecondLock ) + { + final LockAcquisition lockAcquisition = new LockAcquisition(); + + Future future = executor.submit( new Callable() + { + @Override + public Void call() throws Exception + { + try ( Locks.Client client = newLockClient( lockAcquisition ) ) + { + try + { + if ( firstShared ) + { + client.acquireShared( RESOURCE_TYPE, RESOURCE_ID ); + } + else + { + client.acquireExclusive( RESOURCE_TYPE, RESOURCE_ID ); + } + fail( "Transaction termination expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( LockClientStoppedException.class ) ); + } + } + + lockAcquisition.setClient( null ); + firstLockFailed.countDown(); + await( startSecondLock ); + + try ( Locks.Client client = newLockClient( lockAcquisition ) ) + { + if ( secondShared ) + { + client.acquireShared( RESOURCE_TYPE, RESOURCE_ID ); + } + else + { + client.acquireExclusive( RESOURCE_TYPE, RESOURCE_ID ); + } + } + return null; + } + } ); + lockAcquisition.setFuture( future ); + + return lockAcquisition; + } + + private LockAcquisition acquireSharedAndExclusiveLocksInAnotherThread( final CountDownLatch sharedLockAcquired, + final CountDownLatch startExclusiveLock ) + { + final LockAcquisition lockAcquisition = new LockAcquisition(); + + Future future = executor.submit( new Callable() + { + @Override + public Void call() throws Exception + { + try ( Locks.Client client = newLockClient( lockAcquisition ) ) + { + client.acquireShared( RESOURCE_TYPE, RESOURCE_ID ); + + sharedLockAcquired.countDown(); + await( startExclusiveLock ); + + client.acquireExclusive( RESOURCE_TYPE, RESOURCE_ID ); + } + return null; + } + } ); + lockAcquisition.setFuture( future ); + + return lockAcquisition; + } + + private LockAcquisition tryAcquireTwoLocksLockInAnotherThread( final boolean shared, + final CountDownLatch firstLockAcquired ) + { + final LockAcquisition lockAcquisition = new LockAcquisition(); + + Future future = executor.submit( new Callable() + { + @Override + public Void call() throws Exception + { + try ( Locks.Client client = newLockClient( lockAcquisition ) ) + { + if ( shared ) + { + client.acquireShared( RESOURCE_TYPE, OTHER_RESOURCE_ID ); + } + else + { + client.acquireExclusive( RESOURCE_TYPE, OTHER_RESOURCE_ID ); + } + + firstLockAcquired.countDown(); + + if ( shared ) + { + client.acquireShared( RESOURCE_TYPE, RESOURCE_ID ); + } + else + { + client.acquireExclusive( RESOURCE_TYPE, RESOURCE_ID ); + } + } + return null; + } + } ); + lockAcquisition.setFuture( future ); + + return lockAcquisition; + } + + private Locks.Client newLockClient( LockAcquisition lockAcquisition ) + { + Locks.Client client = locks.newClient(); + lockAcquisition.setClient( client ); + return client; + } + + private void assertLocksHeld( final Long... expectedResourceIds ) + { + final List expectedLockedIds = Arrays.asList( expectedResourceIds ); + final List seenLockedIds = new ArrayList<>(); + + locks.accept( new Locks.Visitor() + { + @Override + public void visit( Locks.ResourceType resourceType, long resourceId, String description, + long estimatedWaitTime, + long lockIdentityHashCode ) + { + seenLockedIds.add( resourceId ); + } + } ); + + Collections.sort( expectedLockedIds ); + Collections.sort( seenLockedIds ); + assertEquals( "unexpected locked resource ids", expectedLockedIds, seenLockedIds ); + } + + private void assertNoLocksHeld() + { + locks.accept( new Locks.Visitor() + { + @Override + public void visit( Locks.ResourceType resourceType, long resourceId, String description, + long estimatedWaitTime, + long lockIdentityHashCode ) + { + fail( "Unexpected lock on " + resourceType + " " + resourceId ); + } + } ); + } + + private void assertThreadIsWaitingForLock( LockAcquisition lockAcquisition ) throws Exception + { + for ( int i = 0; i < 30; i++ ) + { + try + { + lockAcquisition.result(); + fail( "Timeout expected" ); + } + catch ( TimeoutException ignore ) + { + } + } + assertFalse( "locking thread completed", lockAcquisition.completed() ); + } + + private void assertLockAcquisitionSucceeded( LockAcquisition lockAcquisition ) throws Exception + { + boolean completed = false; + for ( int i = 0; i < 30; i++ ) + { + try + { + assertNull( lockAcquisition.result() ); + completed = true; + } + catch ( TimeoutException ignore ) + { + } + } + assertTrue( "lock was not acquired in time", completed ); + assertTrue( "locking thread seem to be still in progress", lockAcquisition.completed() ); + } + + private void assertLockAcquisitionFailed( LockAcquisition lockAcquisition ) throws Exception + { + ExecutionException executionException = null; + for ( int i = 0; i < 30; i++ ) + { + try + { + lockAcquisition.result(); + fail( "Transaction termination expected" ); + } + catch ( ExecutionException e ) + { + executionException = e; + } + catch ( TimeoutException ignore ) + { + } + } + assertNotNull( "execution should fail", executionException ); + assertThat( executionException.getCause(), instanceOf( LockClientStoppedException.class ) ); + assertTrue( "locking thread seem to be still in progress", lockAcquisition.completed() ); + } + + private static void await( CountDownLatch latch ) throws InterruptedException + { + if ( !latch.await( 1, TimeUnit.MINUTES ) ) + { + fail( "Count down did not happen" ); + } + } + + private static class LockAcquisition + { + volatile Future future; + volatile Locks.Client client; + + Future getFuture() + { + Objects.requireNonNull( future, "lock acquisition was not initialized with future" ); + return future; + } + + void setFuture( Future future ) + { + this.future = future; + } + + Locks.Client getClient() + { + Objects.requireNonNull( client, "lock acquisition was not initialized with client" ); + return client; + } + + void setClient( Locks.Client client ) + { + this.client = client; + } + + Object result() throws InterruptedException, ExecutionException, TimeoutException + { + return getFuture().get( 100, TimeUnit.MILLISECONDS ); + } + + boolean completed() + { + return getFuture().isDone(); + } + + void stop() + { + getClient().stop(); + } + } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/CommunityLocksCompatibility.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/CommunityLocksCompatibilityIT.java similarity index 92% rename from community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/CommunityLocksCompatibility.java rename to community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/CommunityLocksCompatibilityIT.java index cf95350101fb3..59892658d3ad7 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/CommunityLocksCompatibility.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/CommunityLocksCompatibilityIT.java @@ -22,7 +22,7 @@ import org.neo4j.kernel.impl.locking.LockingCompatibilityTestSuite; import org.neo4j.kernel.impl.locking.Locks; -public class CommunityLocksCompatibility extends LockingCompatibilityTestSuite +public class CommunityLocksCompatibilityIT extends LockingCompatibilityTestSuite { @Override protected Locks createLockManager() diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiClient.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiClient.java index 1040d07b71f11..dedb6ad8aa228 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiClient.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiClient.java @@ -21,15 +21,15 @@ import java.util.concurrent.ConcurrentMap; -import org.neo4j.collection.pool.LinkedQueuePool; +import org.neo4j.collection.pool.Pool; import org.neo4j.collection.primitive.Primitive; import org.neo4j.collection.primitive.PrimitiveIntIterator; import org.neo4j.collection.primitive.PrimitiveLongIntMap; import org.neo4j.collection.primitive.PrimitiveLongVisitor; import org.neo4j.kernel.DeadlockDetectedException; import org.neo4j.kernel.impl.locking.AcquireLockTimeoutException; -import org.neo4j.kernel.impl.locking.LockClientAlreadyClosedException; import org.neo4j.kernel.impl.locking.LockClientStateHolder; +import org.neo4j.kernel.impl.locking.LockClientStoppedException; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.util.collection.SimpleBitSet; import org.neo4j.kernel.impl.util.concurrent.WaitStrategy; @@ -59,7 +59,7 @@ public class ForsetiClient implements Locks.Client private final WaitStrategy[] waitStrategies; /** Handle to return client to pool when closed. */ - private final LinkedQueuePool clientPool; + private final Pool clientPool; /** * The client uses this to track which locks it holds. It is solely an optimization to ensure we don't need to @@ -92,7 +92,7 @@ public class ForsetiClient implements Locks.Client public ForsetiClient( int id, ConcurrentMap[] lockMaps, WaitStrategy[] waitStrategies, - LinkedQueuePool clientPool ) + Pool clientPool ) { this.clientId = id; this.lockMaps = lockMaps; @@ -120,11 +120,8 @@ public void reset() @Override public void acquireShared( Locks.ResourceType resourceType, long resourceId ) throws AcquireLockTimeoutException { - // increment number of active clients if we can't do so we are closed so exiting - if ( !stateHolder.incrementActiveClients() ) - { - throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); - } + stateHolder.incrementActiveClients( this ); + try { // Grab the global lock map we will be using @@ -159,11 +156,8 @@ public void acquireShared( Locks.ResourceType resourceType, long resourceId ) th // Retry loop while(true) { - // client closed exiting - if ( stateHolder.isStopped() ) - { - throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); - } + assertNotStopped(); + // Check if there is a lock for this entity in the map ForsetiLockManager.Lock existingLock = lockMap.get( resourceId ); @@ -207,8 +201,7 @@ else if(existingLock instanceof ExclusiveLock) throw new UnsupportedOperationException( "Unknown lock type: " + existingLock ); } - // Apply the designated wait strategy - waitStrategies[resourceType.typeId()].apply( tries++ ); + applyWaitStrategy( resourceType, tries++ ); // And take note of who we are waiting for. This is used for deadlock detection. markAsWaitingFor( existingLock, resourceType, resourceId ); @@ -231,11 +224,8 @@ public void acquireExclusive( Locks.ResourceType resourceType, long resourceId ) { // For details on how this works, refer to the acquireShared method call, as the two are very similar - // increment number of active clients if we can't do so we are closed so exiting - if ( !stateHolder.incrementActiveClients() ) - { - throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); - } + stateHolder.incrementActiveClients( this ); + try { ConcurrentMap lockMap = lockMaps[resourceType.typeId()]; @@ -254,11 +244,8 @@ public void acquireExclusive( Locks.ResourceType resourceType, long resourceId ) int tries = 0; while( (existingLock = lockMap.putIfAbsent( resourceId, myExclusiveLock )) != null) { - // client closed exiting - if ( stateHolder.isStopped() ) - { - throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); - } + assertNotStopped(); + // If this is a shared lock: // Given a grace period of tries (to try and not starve readers), grab an update lock and wait for it // to convert to an exclusive lock. @@ -272,7 +259,7 @@ public void acquireExclusive( Locks.ResourceType resourceType, long resourceId ) } } - waitStrategies[resourceType.typeId()].apply( tries++ ); + applyWaitStrategy( resourceType, tries++ ); markAsWaitingFor( existingLock, resourceType, resourceId ); } @@ -288,11 +275,7 @@ public void acquireExclusive( Locks.ResourceType resourceType, long resourceId ) @Override public boolean tryExclusiveLock( Locks.ResourceType resourceType, long resourceId ) { - // increment number of active clients if we can't do so we are closed so exiting - if ( !stateHolder.incrementActiveClients() ) - { - return false; - } + stateHolder.incrementActiveClients( this ); try { @@ -343,11 +326,8 @@ public boolean tryExclusiveLock( Locks.ResourceType resourceType, long resourceI @Override public boolean trySharedLock( Locks.ResourceType resourceType, long resourceId ) { - // increment number of active clients if we can't do so we are closed so exiting - if ( !stateHolder.incrementActiveClients() ) - { - return false; - } + stateHolder.incrementActiveClients( this ); + try { ConcurrentMap lockMap = lockMaps[resourceType.typeId()]; @@ -372,11 +352,8 @@ public boolean trySharedLock( Locks.ResourceType resourceType, long resourceId ) while ( true ) { - // client closed exiting - if ( stateHolder.isStopped() ) - { - return false; - } + assertNotStopped(); + ForsetiLockManager.Lock existingLock = lockMap.get( resourceId ); if ( existingLock == null ) { @@ -423,11 +400,8 @@ else if ( existingLock instanceof ExclusiveLock ) @Override public void releaseShared( Locks.ResourceType resourceType, long resourceId ) { - // increment number of active clients if we can't do so we are closed so exiting - if ( !stateHolder.incrementActiveClients() ) - { - throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); - } + stateHolder.incrementActiveClients( this ); + try { if ( releaseLocalLock( resourceType, resourceId, sharedLockCounts[resourceType.typeId()] ) ) @@ -450,11 +424,8 @@ public void releaseShared( Locks.ResourceType resourceType, long resourceId ) @Override public void releaseExclusive( Locks.ResourceType resourceType, long resourceId ) { - // increment number of active clients if we can't do so we are closed so exiting - if ( !stateHolder.incrementActiveClients() ) - { - throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); - } + stateHolder.incrementActiveClients( this ); + try { if ( releaseLocalLock( resourceType, resourceId, exclusiveLockCounts[resourceType.typeId()] ) ) @@ -502,11 +473,7 @@ public void releaseExclusive( Locks.ResourceType resourceType, long resourceId ) @Override public void releaseAll() { - // increment number of active clients if we can't do so we are closed so exiting - if ( !stateHolder.incrementActiveClients() ) - { - throw new LockClientAlreadyClosedException( String.format( "%s is already closed", this ) ); - } + stateHolder.incrementActiveClients( this ); try { @@ -735,34 +702,40 @@ private boolean tryUpgradeToExclusiveWithShareLockHeld( // Now we just wait for all clients to release the the share lock while(sharedLock.numberOfHolders() > 1) { - // client closed exiting - if ( stateHolder.isStopped() ) - { - sharedLock.releaseUpdateLock( this ); - return false; - } - waitStrategies[resourceType.typeId()].apply( tries++ ); + applyWaitStrategy( resourceType, tries++ ); markAsWaitingFor( sharedLock, resourceType, resourceId ); } return true; } - catch(DeadlockDetectedException e) + catch ( DeadlockDetectedException e ) { - sharedLock.releaseUpdateLock(this); + sharedLock.releaseUpdateLock( this ); + // wait list is not cleared here as in other catch blocks because it is cleared in + // markAsWaitingFor() before throwing DeadlockDetectedException throw e; } - catch(Throwable e) + catch ( LockClientStoppedException e ) + { + handleUpgradeToExclusiveFailure( sharedLock ); + throw e; + } + catch ( Throwable e ) { - sharedLock.releaseUpdateLock(this); - clearWaitList(); + handleUpgradeToExclusiveFailure( sharedLock ); throw new RuntimeException( e ); } } return false; } + private void handleUpgradeToExclusiveFailure( SharedLock sharedLock ) + { + sharedLock.releaseUpdateLock( this ); + clearWaitList(); + } + private void clearWaitList() { waitList.clear(); @@ -807,6 +780,22 @@ public int id() return clientId; } + private void applyWaitStrategy( Locks.ResourceType resourceType, int tries ) + { + WaitStrategy waitStrategy = waitStrategies[resourceType.typeId()]; + waitStrategy.apply( tries ); + + assertNotStopped(); + } + + private void assertNotStopped() + { + if ( stateHolder.isStopped() ) + { + throw new LockClientStoppedException( this ); + } + } + // Visitors used for bulk ops on the lock maps (such as releasing all locks) /** diff --git a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiLocksCompatibility.java b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiLocksCompatibilityIT.java similarity index 93% rename from enterprise/kernel/src/test/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiLocksCompatibility.java rename to enterprise/kernel/src/test/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiLocksCompatibilityIT.java index d2399ac923bc2..84650a7548299 100644 --- a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiLocksCompatibility.java +++ b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiLocksCompatibilityIT.java @@ -23,7 +23,7 @@ import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.locking.ResourceTypes; -public class ForsetiLocksCompatibility extends LockingCompatibilityTestSuite +public class ForsetiLocksCompatibilityIT extends LockingCompatibilityTestSuite { @Override protected Locks createLockManager() diff --git a/integrationtests/src/test/java/org/neo4j/server/TransactionTerminationIT.java b/integrationtests/src/test/java/org/neo4j/server/TransactionTerminationIT.java new file mode 100644 index 0000000000000..97a47099110c4 --- /dev/null +++ b/integrationtests/src/test/java/org/neo4j/server/TransactionTerminationIT.java @@ -0,0 +1,193 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.server; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.neo4j.function.ThrowingSupplier; +import org.neo4j.graphdb.DependencyResolver; +import org.neo4j.graphdb.DynamicLabel; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Transaction; +import org.neo4j.harness.junit.Neo4jRule; +import org.neo4j.kernel.GraphDatabaseAPI; +import org.neo4j.kernel.api.exceptions.Status; +import org.neo4j.kernel.configuration.Settings; +import org.neo4j.kernel.impl.api.KernelTransactions; +import org.neo4j.kernel.impl.locking.LockClientStoppedException; +import org.neo4j.server.configuration.ServerSettings; +import org.neo4j.server.rest.domain.JsonParseException; +import org.neo4j.test.SuppressOutput; +import org.neo4j.test.server.HTTP.RawPayload; +import org.neo4j.test.server.HTTP.Response; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.neo4j.helpers.collection.IteratorUtil.single; +import static org.neo4j.server.rest.transactional.integration.TransactionMatchers.containsNoErrors; +import static org.neo4j.server.rest.transactional.integration.TransactionMatchers.hasErrors; +import static org.neo4j.test.Assert.assertEventually; +import static org.neo4j.test.server.HTTP.Builder; +import static org.neo4j.test.server.HTTP.RawPayload.quotedJson; +import static org.neo4j.test.server.HTTP.withBaseUri; + +public class TransactionTerminationIT +{ + private final Neo4jRule neo4j = new Neo4jRule() + .withConfig( ServerSettings.auth_enabled, Settings.FALSE ) + .withConfig( KernelTransactions.tx_termination_aware_locks, Settings.TRUE ); + + @Rule + public final RuleChain ruleChain = RuleChain.outerRule( SuppressOutput.suppressAll() ).around( neo4j ); + + private GraphDatabaseService db; + private Builder http; + + @Before + public void setUp() throws Exception + { + db = neo4j.getGraphDatabaseService(); + http = withBaseUri( neo4j.httpURI().toString() ); + } + + @Test + public void terminateTransactionThatWaitsForLock() throws Exception + { + final String labelName = "foo"; + final String property = "bar"; + final long value1 = 1L; + final long value2 = 2L; + + createNode( labelName ); + + final Response tx1 = startTx(); + final Response tx2 = startTx(); + + assertNumberOfActiveTransactions( 2 ); + + Response update1 = executeUpdateStatement( tx1, labelName, property, value1 ); + assertThat( update1.status(), equalTo( 200 ) ); + assertThat( update1, containsNoErrors() ); + + final CountDownLatch latch = new CountDownLatch( 1 ); + Future tx2Result = Executors.newSingleThreadExecutor().submit( new Runnable() + { + @Override + public void run() + { + latch.countDown(); + Response update2 = executeUpdateStatement( tx2, labelName, property, value2 ); + assertEquals( 200, update2.status() ); + assertThat( update2, hasErrors( Status.Statement.ExecutionFailure ) ); + assertThat( update2.rawContent(), containsString( LockClientStoppedException.class.getSimpleName() ) ); + } + } ); + + assertTrue( latch.await( 1, TimeUnit.MINUTES ) ); + Thread.sleep( 2000 ); + + terminate( tx2 ); + commit( tx1 ); + + Response update3 = executeUpdateStatement( tx2, labelName, property, value2 ); + assertThat( update3.status(), equalTo( 404 ) ); + + tx2Result.get( 1, TimeUnit.MINUTES ); + + assertSingleNodeExists( labelName, property, value1 ); + } + + private void createNode( String labelName ) + { + try ( Transaction tx = db.beginTx() ) + { + db.createNode( DynamicLabel.label( labelName ) ); + tx.success(); + } + } + + private Response startTx() + { + Response tx = http.POST( "db/data/transaction" ); + assertThat( tx.status(), equalTo( 201 ) ); + assertThat( tx, containsNoErrors() ); + return tx; + } + + private void commit( Response tx ) throws JsonParseException + { + http.POST( tx.stringFromContent( "commit" ) ); + } + + private void terminate( Response tx ) + { + http.DELETE( tx.location() ); + } + + private Response executeUpdateStatement( Response tx, String labelName, String property, long value ) + { + String updateQuery = "MATCH (n:" + labelName + ") SET n." + property + "=" + value; + RawPayload json = quotedJson( "{'statements': [{'statement':'" + updateQuery + "'}]}" ); + return http.POST( tx.location(), json ); + } + + private void assertNumberOfActiveTransactions( int expectedCount ) + { + ThrowingSupplier txCountSupplier = new ThrowingSupplier() + { + @Override + public Integer get() throws RuntimeException + { + return activeTxCount(); + } + }; + + assertEventually( "Wrong active tx count", txCountSupplier, equalTo( expectedCount ), 1, TimeUnit.MINUTES ); + } + + private int activeTxCount() + { + DependencyResolver resolver = ((GraphDatabaseAPI) db).getDependencyResolver(); + KernelTransactions kernelTransactions = resolver.resolveDependency( KernelTransactions.class ); + return kernelTransactions.activeTransactions().size(); + } + + private void assertSingleNodeExists( String labelName, String property, long value ) + { + try ( Transaction tx = db.beginTx() ) + { + Node node = single( db.findNodes( DynamicLabel.label( labelName ) ) ); + assertEquals( value, node.getProperty( property ) ); + tx.success(); + } + } +}