From d82b86663713b4b273506667ceb2ea9cc753dea9 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 26 Oct 2017 19:04:26 +0200 Subject: [PATCH] Make asynchronously terminated transactions release their locks Previously, asynchronously terminated transactions would only release their locks after checking the termination flag, realising that the transaction had been asynchronously terminated, and then closing their transaction. This process required that the terminated threads would cooperate and clean up the resources they are holding. One such resource, a particularly precious one, is the locks acquired by the given transaction. If a transaction never went into the kernel, it would never discover that it had been terminated, and thus could hold on to their locks practically indefinitely, in turn blocking other transactions stuck on those locks from making process. The reason we did it this way, is because transactions might need to also grab locks during commit, and we cannot allow asynchronous termination of transactions that have already started their commit process. Transactions that have started committing will presumably relinquish their resource pretty soon anyway. So to do this differently, to solve both the problem of letting killed transactions relinquish their locks sooner, and to avoid killing transactions during commit, a new lock client state called "PREPARE" is introduced. The lock client is moved to the PREPARE state when the transaction enters the PREPARE phase. Once the lock client is in PREPARE, it can no longer be moved to the STOPPED state via asynchronous termination. In the PREPARE state, the lock client will still allow new locks to be taken and released, and the client will move to the STOPPED state when it is closed. This mechanism protects the lock client and the commit process, and it allows us to release all locks held by a lock client, when we asynchronously terminate with the stop() method it before the prepare phase. This means that transaction timeout is now also effective on transactions that are stuck waiting for network traffic, or other resources external to the database itself, for instance. And it means that terminating a transaction will now immediately allow any other transactions, that are otherwise stuck waiting for locks held by the terminated transaction, to make progress. In this process, a bug has also been fixed in the listQueries procedure, where the lock count would come from whichever lock client was used to *plan* the query, and not the lock client associated with the running transaction. The reason this bug happened, is that the `StackingQueryRegistrationOperations` eagerly grabbed the lock client from the current statement locks, when creating a lambda for getting the current lock count. The fix for that is to make the lambda capture the `statement` reference, instead of the reference coming out of the `statement.locks()` call. --- .../StackingQueryRegistrationOperations.java | 2 +- .../impl/locking/LockClientStateHolder.java | 67 +++++++- .../org/neo4j/kernel/impl/locking/Locks.java | 14 +- .../neo4j/kernel/impl/locking/NoOpClient.java | 5 + .../impl/locking/SimpleStatementLocks.java | 1 + .../kernel/impl/locking/StatementLocks.java | 3 +- .../community/CommunityLockClient.java | 23 ++- .../KernelTransactionTimeoutMonitorIT.java | 62 ++++++- .../impl/locking/StopCompatibility.java | 154 ++++++++++++------ .../machines/locks/LeaderOnlyLockManager.java | 6 + .../readreplica/ReadReplicaLockManager.java | 5 + .../impl/locking/DeferringLockClient.java | 6 + .../impl/locking/DeferringStatementLocks.java | 1 + .../impl/locking/DeferringLockClientTest.java | 19 +++ .../locking/DeferringStatementLocksTest.java | 2 + .../kernel/ha/lock/SlaveLocksClient.java | 6 + .../lock/forseti/ForsetiClient.java | 27 ++- .../lock/forseti/ForsetiLocksFactory.java | 4 +- ...ransactionTimeoutMonitorWithForsetiIT.java | 34 ++++ 19 files changed, 369 insertions(+), 72 deletions(-) create mode 100644 enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTimeoutMonitorWithForsetiIT.java diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StackingQueryRegistrationOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StackingQueryRegistrationOperations.java index e694840213f31..5e4c6c99642b8 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StackingQueryRegistrationOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StackingQueryRegistrationOperations.java @@ -73,7 +73,7 @@ public ExecutingQuery startQueryExecution( String threadName = thread.getName(); ExecutingQuery executingQuery = new ExecutingQuery( queryId, clientConnection, statement.username(), queryText, queryParameters, - statement.getTransaction().getMetaData(), statement.locks()::activeLockCount, + statement.getTransaction().getMetaData(), () -> statement.locks().activeLockCount(), statement.getPageCursorTracer(), threadId, threadName, clock, cpuClock, heapAllocation ); registerExecutingQuery( statement, executingQuery ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/LockClientStateHolder.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/LockClientStateHolder.java index de8e99086c296..e71285556bee8 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/LockClientStateHolder.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/LockClientStateHolder.java @@ -23,21 +23,24 @@ /** * State control class for Locks.Clients. - * Client state represent current Locks.Client state: ACTIVE/STOPPED and number of active clients. + * Client state represent current Locks.Client state: ACTIVE/PREPARE/STOPPED and number of active clients. *

* Client states are: *

*/ public final class LockClientStateHolder { - private static final int FLAG_BITS = 1; + private static final int FLAG_BITS = 2; private static final int CLIENT_BITS = Integer.SIZE - FLAG_BITS; - private static final int STATE_BIT_MASK = 1 << CLIENT_BITS; private static final int STOPPED = 1 << CLIENT_BITS; + private static final int PREPARE = 1 << CLIENT_BITS - 1; + private static final int STATE_BIT_MASK = STOPPED | PREPARE; private static final int INITIAL_STATE = 0; private AtomicInteger clientState = new AtomicInteger( INITIAL_STATE ); @@ -52,16 +55,57 @@ public boolean hasActiveClients() } /** - * Closing current client + * Move the client to the PREPARE state, unless it is already STOPPED. */ - public void stopClient() + public void prepare( Locks.Client client ) { int currentValue; + int newValue; do { currentValue = clientState.get(); + if ( isStopped( currentValue ) ) + { + throw new LockClientStoppedException( client ); + } + newValue = stateWithNewStatus( currentValue, PREPARE ); + } + while ( !clientState.compareAndSet( currentValue, newValue ) ); + } + + /** + * Move the client to STOPPED, unless it is already in PREPARE. + */ + public boolean stopClient() + { + int currentValue; + int newValue; + do + { + currentValue = clientState.get(); + if ( isPrepare( currentValue ) ) + { + return false; // Can't stop clients that are in PREPARE + } + newValue = stateWithNewStatus( currentValue, STOPPED ); + } + while ( !clientState.compareAndSet( currentValue, newValue ) ); + return true; + } + + /** + * Move the client to STOPPED as part of closing the current client, regardless of what state it is currently in. + */ + public void closeClient() + { + int currentValue; + int newValue; + do + { + currentValue = clientState.get(); + newValue = stateWithNewStatus( currentValue, STOPPED ); } - while ( !clientState.compareAndSet( currentValue, stateWithNewStatus( currentValue, STOPPED ) ) ); + while ( !clientState.compareAndSet( currentValue, newValue ) ); } /** @@ -116,6 +160,11 @@ public void reset() clientState.set( INITIAL_STATE ); } + private boolean isPrepare( int clientState ) + { + return getStatus( clientState ) == PREPARE; + } + private boolean isStopped( int clientState ) { return getStatus( clientState ) == STOPPED; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/Locks.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/Locks.java index 922fbef32cd83..62bcaf4e41055 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/Locks.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/Locks.java @@ -105,9 +105,21 @@ interface Client extends ResourceLocker, AutoCloseable void releaseExclusive( ResourceType resourceType, long... resourceIds ); /** - * Stop all active lock waiters and release them. All already held locks remains. + * Start preparing this transaction for committing. In two-phase locking palace, we will in principle no longer + * be acquiring any new locks - though we still allow it because it is useful in certain technical situations - + * but when we are ready, we will start releasing them. This also means that we will no longer accept being + * {@link #stop() asynchronously stopped}. From this point on, only the commit process can decide if the + * transaction lives or dies, and in either case, the lock client will end up releasing all locks via the + * {@link #close()} method. + */ + void prepare(); + + /** + * Stop all active lock waiters and release them. * All new attempts to acquire any locks will cause exceptions. * This client can and should only be {@link #close() closed} afterwards. + * If this client has been {@link #prepare() prepared}, then all currently acquired locks will remain held, + * otherwise they will be released immediately. */ void stop(); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/NoOpClient.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/NoOpClient.java index b999ed24e92ee..f5b20d46506d0 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/NoOpClient.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/NoOpClient.java @@ -70,6 +70,11 @@ public void releaseExclusive( ResourceType resourceType, long... resourceIds ) { } + @Override + public void prepare() + { + } + @Override public void stop() { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/SimpleStatementLocks.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/SimpleStatementLocks.java index 00d7bb4f3944c..00e698090db74 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/SimpleStatementLocks.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/SimpleStatementLocks.java @@ -50,6 +50,7 @@ public Locks.Client optimistic() public void prepareForCommit( LockTracer lockTracer ) { // Locks where grabbed eagerly by client so no need to prepare + client.prepare(); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/StatementLocks.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/StatementLocks.java index 5f6bb3d4d9d3d..4d109346d1a84 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/StatementLocks.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/StatementLocks.java @@ -46,7 +46,8 @@ public interface StatementLocks extends AutoCloseable /** * Prepare the underlying {@link Locks.Client client}(s) for commit. This will grab all locks that have - * previously been taken {@link #optimistic() optimistically}. + * previously been taken {@link #optimistic() optimistically}, and tell the underlying lock client to enter the + * prepare state. * @param lockTracer lock tracer */ void prepareForCommit( LockTracer lockTracer ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java index ecaff2cdd97c9..60901a219789a 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java @@ -315,12 +315,26 @@ public void releaseExclusive( ResourceType resourceType, long... resourceIds ) } } + @Override + public void prepare() + { + stateHolder.prepare( this ); + } + @Override public void stop() { // closing client to prevent any new client to come - stateHolder.stopClient(); - // wake up and terminate waiters + if ( stateHolder.stopClient() ) + { + // wake up and terminate waiters + terminateAllWaitersAndWaitForClientsToLeave(); + releaseLocks(); + } + } + + private void terminateAllWaitersAndWaitForClientsToLeave() + { terminateAllWaiters(); // wait for all active clients to go and terminate latecomers while ( stateHolder.hasActiveClients() ) @@ -333,9 +347,8 @@ public void stop() @Override public void close() { - stop(); - // now we are only one who operate on this client - // safe to release all the locks + stateHolder.closeClient(); + terminateAllWaitersAndWaitForClientsToLeave(); releaseLocks(); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTimeoutMonitorIT.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTimeoutMonitorIT.java index fe3bee9969add..4e22da7f1a8f9 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTimeoutMonitorIT.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTimeoutMonitorIT.java @@ -27,8 +27,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.neo4j.concurrent.BinaryLatch; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.factory.GraphDatabaseSettings; @@ -38,17 +41,26 @@ import org.neo4j.test.rule.DatabaseRule; import org.neo4j.test.rule.EmbeddedDatabaseRule; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class KernelTransactionTimeoutMonitorIT { @Rule - public DatabaseRule database = new EmbeddedDatabaseRule() - .withSetting( GraphDatabaseSettings.transaction_monitor_check_interval, "100ms" ); + public DatabaseRule database = createDatabaseRule(); + @Rule public ExpectedException expectedException = ExpectedException.none(); private static final int NODE_ID = 0; private ExecutorService executor; + protected DatabaseRule createDatabaseRule() + { + return new EmbeddedDatabaseRule() + .withSetting( GraphDatabaseSettings.transaction_monitor_check_interval, "100ms" ); + } + @Before public void setUp() throws Exception { @@ -80,6 +92,52 @@ public void terminateExpiredTransaction() throws Exception } } + @Test( timeout = 30_000 ) + public void terminatingTransactionMustEagerlyReleaseTheirLocks() throws Exception + { + AtomicBoolean nodeLockAcquired = new AtomicBoolean(); + AtomicBoolean lockerDone = new AtomicBoolean(); + BinaryLatch lockerPause = new BinaryLatch(); + long nodeId; + try ( Transaction tx = database.beginTx() ) + { + nodeId = database.createNode().getId(); + tx.success(); + } + Future locker = executor.submit( () -> + { + try ( Transaction tx = database.beginTx( 100, TimeUnit.MILLISECONDS ) ) + { + Node node = database.getNodeById( nodeId ); + tx.acquireReadLock( node ); + nodeLockAcquired.set( true ); + lockerPause.await(); + } + lockerDone.set( true ); + } ); + + boolean proceed; + do + { + proceed = nodeLockAcquired.get(); + } + while ( !proceed ); + + Thread.sleep( 150 ); // locker should be stopped by now + assertFalse( lockerDone.get() ); // but still blocked on the latch + // Yet we should be able to proceed and grab the locks they once held + try ( Transaction tx = database.beginTx() ) + { + // Write-locking is only possible if their shared lock was released + tx.acquireWriteLock( database.getNodeById( nodeId ) ); + tx.success(); + } + // No exception from our lock client being stopped (e.g. we ended up blocked for too long) or from timeout + lockerPause.release(); + locker.get(); + assertTrue( lockerDone.get() ); + } + private Runnable startAnotherTransaction() { return () -> diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/StopCompatibility.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/StopCompatibility.java index 03177131924d9..ef98f4a94bf03 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/StopCompatibility.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/StopCompatibility.java @@ -20,7 +20,6 @@ package org.neo4j.kernel.impl.locking; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -54,9 +53,9 @@ @Ignore( "Not a test. This is a compatibility suite, run from LockingCompatibilityTestSuite." ) public class StopCompatibility extends LockingCompatibilityTestSuite.Compatibility { - private static final ResourceType RESOURCE_TYPE = ResourceTypes.NODE; - private static final long RESOURCE_ID = 42; - private static final long OTHER_RESOURCE_ID = 4242; + private static final long FIRST_NODE_ID = 42; + private static final long SECOND_NODE_ID = 4242; + private static final LockTracer TRACER = LockTracer.NONE; private Locks.Client client; @@ -78,77 +77,136 @@ public void tearDown() throws Exception } @Test - public void releaseWriteLockWaitersOnStop() + public void mustReleaseWriteLockWaitersOnStop() { - // given - clientA.acquireShared( LockTracer.NONE, NODE, 1L ); - clientB.acquireShared( LockTracer.NONE, NODE, 2L ); - clientC.acquireShared( LockTracer.NONE, NODE, 3L ); - acquireExclusive( clientB, LockTracer.NONE, NODE, 1L ).callAndAssertWaiting(); - acquireExclusive( clientC, LockTracer.NONE, NODE, 1L ).callAndAssertWaiting(); + // Given + clientA.acquireShared( TRACER, NODE, 1L ); + clientB.acquireShared( TRACER, NODE, 2L ); + clientC.acquireShared( TRACER, NODE, 3L ); + acquireExclusive( clientB, TRACER, NODE, 1L ).callAndAssertWaiting(); + acquireExclusive( clientC, TRACER, NODE, 1L ).callAndAssertWaiting(); - // when + // When clientC.stop(); clientB.stop(); clientA.stop(); - // all locks clients should be stopped at this point and all clients should still hold their shared locks + // All locks clients should be stopped at this point, and all all locks should be released because none of the + // clients entered the prepare phase LockCountVisitor lockCountVisitor = new LockCountVisitor(); locks.accept( lockCountVisitor ); - Assert.assertEquals( 3, lockCountVisitor.getLockCount() ); + assertEquals( 0, lockCountVisitor.getLockCount() ); } @Test - public void releaseReadLockWaitersOnStop() - { // given - clientA.acquireExclusive( LockTracer.NONE, NODE, 1L ); - clientB.acquireExclusive( LockTracer.NONE, NODE, 2L ); - acquireShared( clientB, LockTracer.NONE, NODE, 1L ).callAndAssertWaiting(); + public void mustNotReleaseLocksAfterPrepareOnStop() throws Exception + { + // Given + clientA.acquireShared( TRACER, NODE, 1L ); + clientA.acquireExclusive( TRACER, NODE, 2L ); + clientA.prepare(); + + // When + clientA.stop(); + + // The client entered the prepare phase, so it gets to keep its locks + LockCountVisitor lockCountVisitor = new LockCountVisitor(); + locks.accept( lockCountVisitor ); + assertEquals( 2, lockCountVisitor.getLockCount() ); + } + + @Test + public void mustReleaseUnpreparedLocksOnStop() throws Exception + { + // Given + clientA.acquireShared( TRACER, NODE, 1L ); + clientA.acquireExclusive( TRACER, NODE, 2L ); + + // When + clientA.stop(); + + // The client was stopped before it could enter the prepare phase, so all of its locks are released + LockCountVisitor lockCountVisitor = new LockCountVisitor(); + locks.accept( lockCountVisitor ); + assertEquals( 0, lockCountVisitor.getLockCount() ); + } - // when + @Test + public void mustReleaseReadLockWaitersOnStop() + { + // Given + clientA.acquireExclusive( TRACER, NODE, 1L ); + clientB.acquireExclusive( TRACER, NODE, 2L ); + acquireShared( clientB, TRACER, NODE, 1L ).callAndAssertWaiting(); + + // When clientB.stop(); clientA.stop(); - // all locks clients should be stopped at this point and all clients should still hold their exclusive locks + // All locks clients should be stopped at this point, and all all locks should be released because none of the + // clients entered the prepare phase + LockCountVisitor lockCountVisitor = new LockCountVisitor(); + locks.accept( lockCountVisitor ); + assertEquals( 0, lockCountVisitor.getLockCount() ); + } + + @Test + public void prepareMustAllowAcquiringNewLocksAfterStop() throws Exception + { + // Given + clientA.prepare(); + clientA.stop(); + + // When + clientA.acquireShared( TRACER, NODE, 1 ); + clientA.acquireExclusive( TRACER, NODE, 2 ); + + // Stopped essentially has no effect when it comes after the client has entered the prepare phase LockCountVisitor lockCountVisitor = new LockCountVisitor(); locks.accept( lockCountVisitor ); - Assert.assertEquals( 2, lockCountVisitor.getLockCount() ); + assertEquals( 2, lockCountVisitor.getLockCount() ); + } + + @Test( expected = LockClientStoppedException.class ) + public void prepareMustThrowWhenClientStopped() throws Exception + { + stoppedClient().prepare(); } @Test( expected = LockClientStoppedException.class ) public void acquireSharedThrowsWhenClientStopped() { - stoppedClient().acquireShared( LockTracer.NONE, ResourceTypes.NODE, 1 ); + stoppedClient().acquireShared( TRACER, NODE, 1 ); } @Test( expected = LockClientStoppedException.class ) public void acquireExclusiveThrowsWhenClientStopped() { - stoppedClient().acquireExclusive( LockTracer.NONE, ResourceTypes.NODE, 1 ); + stoppedClient().acquireExclusive( TRACER, NODE, 1 ); } @Test( expected = LockClientStoppedException.class ) public void trySharedLockThrowsWhenClientStopped() { - stoppedClient().trySharedLock( ResourceTypes.NODE, 1 ); + stoppedClient().trySharedLock( NODE, 1 ); } @Test( expected = LockClientStoppedException.class ) public void tryExclusiveLockThrowsWhenClientStopped() { - stoppedClient().tryExclusiveLock( ResourceTypes.NODE, 1 ); + stoppedClient().tryExclusiveLock( NODE, 1 ); } @Test( expected = LockClientStoppedException.class ) public void releaseSharedThrowsWhenClientStopped() { - stoppedClient().releaseShared( ResourceTypes.NODE, 1 ); + stoppedClient().releaseShared( NODE, 1 ); } @Test( expected = LockClientStoppedException.class ) public void releaseExclusiveThrowsWhenClientStopped() { - stoppedClient().releaseExclusive( ResourceTypes.NODE, 1 ); + stoppedClient().releaseExclusive( NODE, 1 ); } @Test @@ -323,11 +381,11 @@ private void closeClientAfterLockStopped( boolean shared ) throws Exception await( firstLockAcquired ); assertThreadIsWaitingForLock( acquisition ); - assertLocksHeld( RESOURCE_ID, OTHER_RESOURCE_ID ); + assertLocksHeld( FIRST_NODE_ID, SECOND_NODE_ID ); acquisition.stop(); assertLockAcquisitionFailed( acquisition ); - assertLocksHeld( RESOURCE_ID ); + assertLocksHeld( FIRST_NODE_ID ); thisThreadsExclusiveLock.release(); assertNoLocksHeld(); @@ -356,16 +414,16 @@ private void acquireLockAfterOtherLockStoppedSameThread( boolean firstLockShared private AcquiredLock acquireSharedLockInThisThread() { - client.acquireShared( LockTracer.NONE, RESOURCE_TYPE, RESOURCE_ID ); - assertLocksHeld( RESOURCE_ID ); - return AcquiredLock.shared( client, RESOURCE_TYPE, RESOURCE_ID ); + client.acquireShared( TRACER, NODE, FIRST_NODE_ID ); + assertLocksHeld( FIRST_NODE_ID ); + return AcquiredLock.shared( client, NODE, FIRST_NODE_ID ); } private AcquiredLock acquireExclusiveLockInThisThread() { - client.acquireExclusive( LockTracer.NONE, RESOURCE_TYPE, RESOURCE_ID ); - assertLocksHeld( RESOURCE_ID ); - return AcquiredLock.exclusive( client, RESOURCE_TYPE, RESOURCE_ID ); + client.acquireExclusive( TRACER, NODE, FIRST_NODE_ID ); + assertLocksHeld( FIRST_NODE_ID ); + return AcquiredLock.exclusive( client, NODE, FIRST_NODE_ID ); } private LockAcquisition acquireSharedLockInAnotherThread() @@ -387,11 +445,11 @@ private LockAcquisition acquireLockInAnotherThread( final boolean shared ) Locks.Client client = newLockClient( lockAcquisition ); if ( shared ) { - client.acquireShared( LockTracer.NONE, RESOURCE_TYPE, RESOURCE_ID ); + client.acquireShared( TRACER, NODE, FIRST_NODE_ID ); } else { - client.acquireExclusive( LockTracer.NONE, RESOURCE_TYPE, RESOURCE_ID ); + client.acquireExclusive( TRACER, NODE, FIRST_NODE_ID ); } return null; } ); @@ -413,11 +471,11 @@ private LockAcquisition acquireTwoLocksInAnotherThread( final boolean firstShare { if ( firstShared ) { - client.acquireShared( LockTracer.NONE, RESOURCE_TYPE, RESOURCE_ID ); + client.acquireShared( TRACER, NODE, FIRST_NODE_ID ); } else { - client.acquireExclusive( LockTracer.NONE, RESOURCE_TYPE, RESOURCE_ID ); + client.acquireExclusive( TRACER, NODE, FIRST_NODE_ID ); } fail( "Transaction termination expected" ); } @@ -435,11 +493,11 @@ private LockAcquisition acquireTwoLocksInAnotherThread( final boolean firstShare { if ( secondShared ) { - client.acquireShared( LockTracer.NONE, RESOURCE_TYPE, RESOURCE_ID ); + client.acquireShared( TRACER, NODE, FIRST_NODE_ID ); } else { - client.acquireExclusive( LockTracer.NONE, RESOURCE_TYPE, RESOURCE_ID ); + client.acquireExclusive( TRACER, NODE, FIRST_NODE_ID ); } } return null; @@ -458,12 +516,12 @@ private LockAcquisition acquireSharedAndExclusiveLocksInAnotherThread( final Cou { try ( Locks.Client client = newLockClient( lockAcquisition ) ) { - client.acquireShared( LockTracer.NONE, RESOURCE_TYPE, RESOURCE_ID ); + client.acquireShared( TRACER, NODE, FIRST_NODE_ID ); sharedLockAcquired.countDown(); await( startExclusiveLock ); - client.acquireExclusive( LockTracer.NONE, RESOURCE_TYPE, RESOURCE_ID ); + client.acquireExclusive( TRACER, NODE, FIRST_NODE_ID ); } return null; } ); @@ -483,22 +541,22 @@ private LockAcquisition tryAcquireTwoLocksLockInAnotherThread( final boolean sha { if ( shared ) { - client.acquireShared( LockTracer.NONE, RESOURCE_TYPE, OTHER_RESOURCE_ID ); + client.acquireShared( TRACER, NODE, SECOND_NODE_ID ); } else { - client.acquireExclusive( LockTracer.NONE, RESOURCE_TYPE, OTHER_RESOURCE_ID ); + client.acquireExclusive( TRACER, NODE, SECOND_NODE_ID ); } firstLockAcquired.countDown(); if ( shared ) { - client.acquireShared( LockTracer.NONE, RESOURCE_TYPE, RESOURCE_ID ); + client.acquireShared( TRACER, NODE, FIRST_NODE_ID ); } else { - client.acquireExclusive( LockTracer.NONE, RESOURCE_TYPE, RESOURCE_ID ); + client.acquireExclusive( TRACER, NODE, FIRST_NODE_ID ); } } return null; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/locks/LeaderOnlyLockManager.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/locks/LeaderOnlyLockManager.java index 6eb17931b76f4..04def63456a8e 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/locks/LeaderOnlyLockManager.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/locks/LeaderOnlyLockManager.java @@ -259,6 +259,12 @@ public void releaseExclusive( ResourceType resourceType, long... resourceIds ) localClient.releaseExclusive( resourceType, resourceIds ); } + @Override + public void prepare() + { + localClient.prepare(); + } + @Override public void stop() { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaLockManager.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaLockManager.java index 98625f6dedf63..786876a3be03f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaLockManager.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/ReadReplicaLockManager.java @@ -94,6 +94,11 @@ public void releaseExclusive( ResourceType resourceType, long... resourceIds ) throw new IllegalStateException( "Should never happen" ); } + @Override + public void prepare() + { + } + @Override public void stop() { diff --git a/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringLockClient.java b/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringLockClient.java index a689b9f8713ca..c8e06992dbf0c 100644 --- a/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringLockClient.java +++ b/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringLockClient.java @@ -157,6 +157,12 @@ private void flushLocks( LockTracer lockTracer, long[] current, int cursor, Reso } } + @Override + public void prepare() + { + clientDelegate.prepare(); + } + @Override public void stop() { diff --git a/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringStatementLocks.java b/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringStatementLocks.java index 9e115997b8b0c..66e6549209014 100644 --- a/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringStatementLocks.java +++ b/enterprise/deferred-locks/src/main/java/org/neo4j/kernel/impl/locking/DeferringStatementLocks.java @@ -52,6 +52,7 @@ public Locks.Client optimistic() public void prepareForCommit( LockTracer lockTracer ) { implicit.acquireDeferredLocks( lockTracer ); + explicit.prepare(); } @Override diff --git a/enterprise/deferred-locks/src/test/java/org/neo4j/kernel/impl/locking/DeferringLockClientTest.java b/enterprise/deferred-locks/src/test/java/org/neo4j/kernel/impl/locking/DeferringLockClientTest.java index 72e11447472d1..55a3c913654ca 100644 --- a/enterprise/deferred-locks/src/test/java/org/neo4j/kernel/impl/locking/DeferringLockClientTest.java +++ b/enterprise/deferred-locks/src/test/java/org/neo4j/kernel/impl/locking/DeferringLockClientTest.java @@ -136,6 +136,20 @@ public void shouldStopUnderlyingClient() throws Exception verify( actualClient ).stop(); } + @Test + public void shouldPrepareUnderlyingClient() throws Exception + { + // GIVEN + Locks.Client actualClient = mock( Locks.Client.class ); + DeferringLockClient client = new DeferringLockClient( actualClient ); + + // WHEN + client.prepare(); + + // THEN + verify( actualClient ).prepare(); + } + @Test public void shouldCloseUnderlyingClient() throws Exception { @@ -466,6 +480,11 @@ public void releaseExclusive( ResourceType resourceType, long... resourceIds ) { } + @Override + public void prepare() + { + } + @Override public void stop() { diff --git a/enterprise/deferred-locks/src/test/java/org/neo4j/kernel/impl/locking/DeferringStatementLocksTest.java b/enterprise/deferred-locks/src/test/java/org/neo4j/kernel/impl/locking/DeferringStatementLocksTest.java index 92fb68f0b93dc..562ce84d78551 100644 --- a/enterprise/deferred-locks/src/test/java/org/neo4j/kernel/impl/locking/DeferringStatementLocksTest.java +++ b/enterprise/deferred-locks/src/test/java/org/neo4j/kernel/impl/locking/DeferringStatementLocksTest.java @@ -59,6 +59,7 @@ public void shouldDoNothingWithClientWhenPreparingForCommitWithNoLocksAcquired() statementLocks.prepareForCommit( LockTracer.NONE ); // THEN + verify( client ).prepare(); verifyNoMoreInteractions( client ); } @@ -76,6 +77,7 @@ public void shouldPrepareExplicitForCommitWhenLocksAcquire() throws Exception statementLocks.prepareForCommit( LockTracer.NONE ); // THEN + verify( client ).prepare(); verify( client ).acquireExclusive( LockTracer.NONE, ResourceTypes.NODE, 1 ); verify( client ).acquireExclusive( LockTracer.NONE, ResourceTypes.RELATIONSHIP, 42 ); verifyNoMoreInteractions( client ); diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLocksClient.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLocksClient.java index 5d02b720ff97a..0183890661ecd 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLocksClient.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/SlaveLocksClient.java @@ -186,6 +186,12 @@ public void releaseExclusive( ResourceType resourceType, long... resourceIds ) client.releaseExclusive( resourceType, resourceIds ); } + @Override + public void prepare() + { + client.prepare(); + } + @Override public void stop() { diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiClient.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiClient.java index c5c45227fc178..932235699cbd3 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiClient.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiClient.java @@ -678,12 +678,26 @@ private void releaseAllClientLocks() } } + @Override + public void prepare() + { + stateHolder.prepare( this ); + } + @Override public void stop() { // marking client as closed - stateHolder.stopClient(); - // waiting for all operations to be completed + if ( stateHolder.stopClient() ) + { + // waiting for all operations to be completed + waitForAllClientsToLeave(); + releaseAllLocks(); + } + } + + private void waitForAllClientsToLeave() + { while ( stateHolder.hasActiveClients() ) { try @@ -700,14 +714,19 @@ public void stop() @Override public void close() { - stop(); + stateHolder.closeClient(); + waitForAllClientsToLeave(); + releaseAllLocks(); + } + + private void releaseAllLocks() + { if ( hasLocks ) { releaseAllClientLocks(); clearWaitList(); hasLocks = false; } - clientPool.release( this ); } @Override diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiLocksFactory.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiLocksFactory.java index 7c570dc47e0f4..608e84f737218 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiLocksFactory.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiLocksFactory.java @@ -30,9 +30,11 @@ @Service.Implementation( Locks.Factory.class ) public class ForsetiLocksFactory extends Locks.Factory { + public static final String KEY = "forseti"; + public ForsetiLocksFactory() { - super( "forseti" ); + super( KEY ); } @Override diff --git a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTimeoutMonitorWithForsetiIT.java b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTimeoutMonitorWithForsetiIT.java new file mode 100644 index 0000000000000..5f39111a030b7 --- /dev/null +++ b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTimeoutMonitorWithForsetiIT.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.api; + +import org.neo4j.kernel.impl.enterprise.lock.forseti.ForsetiLocksFactory; +import org.neo4j.test.rule.DatabaseRule; + +import static org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory.Configuration.lock_manager; + +public class KernelTransactionTimeoutMonitorWithForsetiIT extends KernelTransactionTimeoutMonitorIT +{ + @Override + protected DatabaseRule createDatabaseRule() + { + return super.createDatabaseRule().withSetting( lock_manager, ForsetiLocksFactory.KEY ); + } +}