Skip to content

Commit

Permalink
Make asynchronously terminated transactions release their locks
Browse files Browse the repository at this point in the history
Previously, asynchronously terminated transactions would only release their locks after checking the termination flag,
realising that the transaction had been asynchronously terminated, and then closing their transaction.
This process required that the terminated threads would cooperate and clean up the resources they are holding.
One such resource, a particularly precious one, is the locks acquired by the given transaction.
If a transaction never went into the kernel, it would never discover that it had been terminated, and thus could hold
on to their locks practically indefinitely, in turn blocking other transactions stuck on those locks from making
process.

The reason we did it this way, is because transactions might need to also grab locks during commit, and we cannot allow
asynchronous termination of transactions that have already started their commit process. Transactions that have started
committing will presumably relinquish their resource pretty soon anyway.

So to do this differently, to solve both the problem of letting killed transactions relinquish their locks sooner, and
to avoid killing transactions during commit, a new lock client state called "PREPARE" is introduced.
The lock client is moved to the PREPARE state when the transaction enters the PREPARE phase. Once the lock client is
in PREPARE, it can no longer be moved to the STOPPED state via asynchronous termination.
In the PREPARE state, the lock client will still allow new locks to be taken and released, and the client will move to
the STOPPED state when it is closed.

This mechanism protects the lock client and the commit process, and it allows us to release all locks held by a lock
client, when we asynchronously terminate with the stop() method it before the prepare phase.
This means that transaction timeout is now also effective on transactions that are stuck waiting for network traffic,
or other resources external to the database itself, for instance.
And it means that terminating a transaction will now immediately allow any other transactions, that are otherwise stuck
waiting for locks held by the terminated transaction, to make progress.

In this process, a bug has also been fixed in the listQueries procedure, where the lock count would come from whichever
lock client was used to *plan* the query, and not the lock client associated with the running transaction. The reason
this bug happened, is that the `StackingQueryRegistrationOperations` eagerly grabbed the lock client from the current
statement locks, when creating a lambda for getting the current lock count. The fix for that is to make the lambda
capture the `statement` reference, instead of the reference coming out of the `statement.locks()` call.
  • Loading branch information
chrisvest committed Nov 7, 2017
1 parent 9a93b55 commit d82b866
Show file tree
Hide file tree
Showing 19 changed files with 369 additions and 72 deletions.
Expand Up @@ -73,7 +73,7 @@ public ExecutingQuery startQueryExecution(
String threadName = thread.getName(); String threadName = thread.getName();
ExecutingQuery executingQuery = ExecutingQuery executingQuery =
new ExecutingQuery( queryId, clientConnection, statement.username(), queryText, queryParameters, new ExecutingQuery( queryId, clientConnection, statement.username(), queryText, queryParameters,
statement.getTransaction().getMetaData(), statement.locks()::activeLockCount, statement.getTransaction().getMetaData(), () -> statement.locks().activeLockCount(),
statement.getPageCursorTracer(), statement.getPageCursorTracer(),
threadId, threadName, clock, cpuClock, heapAllocation ); threadId, threadName, clock, cpuClock, heapAllocation );
registerExecutingQuery( statement, executingQuery ); registerExecutingQuery( statement, executingQuery );
Expand Down
Expand Up @@ -23,21 +23,24 @@


/** /**
* State control class for Locks.Clients. * State control class for Locks.Clients.
* Client state represent current Locks.Client state: <b>ACTIVE/STOPPED </b> and number of active clients. * Client state represent current Locks.Client state: <b>ACTIVE/PREPARE/STOPPED</b> and number of active clients.
* <p/> * <p/>
* Client states are: * Client states are:
* <ul> * <ul>
* <li><b>ACTIVE</b> state of fully functional locks client without any restriction or operations limitations</li> * <li><b>ACTIVE</b> state of fully functional locks client without any restriction or operations limitations.</li>
* <li><b>STOPPED</b> all current lock acquisitions will be interrupted/terminated without obtaining * <li><b>PREPARE</b> state prevents transition into STOPPED state, unless forced as part of closing the lock
* corresponding lock, new acquisitions will not be possible anymore, all locks that client holds are preserved.</li> * client.</li>
* <li><b>STOPPED</b> all current lock acquisitions will be interrupted/terminated without obtaining corresponding
* lock, new acquisitions will not be possible anymore, all locks that client holds are preserved.</li>
* </ul> * </ul>
*/ */
public final class LockClientStateHolder public final class LockClientStateHolder
{ {
private static final int FLAG_BITS = 1; private static final int FLAG_BITS = 2;
private static final int CLIENT_BITS = Integer.SIZE - FLAG_BITS; private static final int CLIENT_BITS = Integer.SIZE - FLAG_BITS;
private static final int STATE_BIT_MASK = 1 << CLIENT_BITS;
private static final int STOPPED = 1 << CLIENT_BITS; private static final int STOPPED = 1 << CLIENT_BITS;
private static final int PREPARE = 1 << CLIENT_BITS - 1;
private static final int STATE_BIT_MASK = STOPPED | PREPARE;
private static final int INITIAL_STATE = 0; private static final int INITIAL_STATE = 0;
private AtomicInteger clientState = new AtomicInteger( INITIAL_STATE ); private AtomicInteger clientState = new AtomicInteger( INITIAL_STATE );


Expand All @@ -52,16 +55,57 @@ public boolean hasActiveClients()
} }


/** /**
* Closing current client * Move the client to the PREPARE state, unless it is already STOPPED.
*/ */
public void stopClient() public void prepare( Locks.Client client )
{ {
int currentValue; int currentValue;
int newValue;
do do
{ {
currentValue = clientState.get(); currentValue = clientState.get();
if ( isStopped( currentValue ) )
{
throw new LockClientStoppedException( client );
}
newValue = stateWithNewStatus( currentValue, PREPARE );
}
while ( !clientState.compareAndSet( currentValue, newValue ) );
}

/**
* Move the client to STOPPED, unless it is already in PREPARE.
*/
public boolean stopClient()
{
int currentValue;
int newValue;
do
{
currentValue = clientState.get();
if ( isPrepare( currentValue ) )
{
return false; // Can't stop clients that are in PREPARE
}
newValue = stateWithNewStatus( currentValue, STOPPED );
}
while ( !clientState.compareAndSet( currentValue, newValue ) );
return true;
}

/**
* Move the client to STOPPED as part of closing the current client, regardless of what state it is currently in.
*/
public void closeClient()
{
int currentValue;
int newValue;
do
{
currentValue = clientState.get();
newValue = stateWithNewStatus( currentValue, STOPPED );
} }
while ( !clientState.compareAndSet( currentValue, stateWithNewStatus( currentValue, STOPPED ) ) ); while ( !clientState.compareAndSet( currentValue, newValue ) );
} }


/** /**
Expand Down Expand Up @@ -116,6 +160,11 @@ public void reset()
clientState.set( INITIAL_STATE ); clientState.set( INITIAL_STATE );
} }


private boolean isPrepare( int clientState )
{
return getStatus( clientState ) == PREPARE;
}

private boolean isStopped( int clientState ) private boolean isStopped( int clientState )
{ {
return getStatus( clientState ) == STOPPED; return getStatus( clientState ) == STOPPED;
Expand Down
Expand Up @@ -105,9 +105,21 @@ interface Client extends ResourceLocker, AutoCloseable
void releaseExclusive( ResourceType resourceType, long... resourceIds ); void releaseExclusive( ResourceType resourceType, long... resourceIds );


/** /**
* Stop all active lock waiters and release them. All already held locks remains. * Start preparing this transaction for committing. In two-phase locking palace, we will in principle no longer
* be acquiring any new locks - though we still allow it because it is useful in certain technical situations -
* but when we are ready, we will start releasing them. This also means that we will no longer accept being
* {@link #stop() asynchronously stopped}. From this point on, only the commit process can decide if the
* transaction lives or dies, and in either case, the lock client will end up releasing all locks via the
* {@link #close()} method.
*/
void prepare();

/**
* Stop all active lock waiters and release them.
* All new attempts to acquire any locks will cause exceptions. * All new attempts to acquire any locks will cause exceptions.
* This client can and should only be {@link #close() closed} afterwards. * This client can and should only be {@link #close() closed} afterwards.
* If this client has been {@link #prepare() prepared}, then all currently acquired locks will remain held,
* otherwise they will be released immediately.
*/ */
void stop(); void stop();


Expand Down
Expand Up @@ -70,6 +70,11 @@ public void releaseExclusive( ResourceType resourceType, long... resourceIds )
{ {
} }


@Override
public void prepare()
{
}

@Override @Override
public void stop() public void stop()
{ {
Expand Down
Expand Up @@ -50,6 +50,7 @@ public Locks.Client optimistic()
public void prepareForCommit( LockTracer lockTracer ) public void prepareForCommit( LockTracer lockTracer )
{ {
// Locks where grabbed eagerly by client so no need to prepare // Locks where grabbed eagerly by client so no need to prepare
client.prepare();
} }


@Override @Override
Expand Down
Expand Up @@ -46,7 +46,8 @@ public interface StatementLocks extends AutoCloseable


/** /**
* Prepare the underlying {@link Locks.Client client}(s) for commit. This will grab all locks that have * Prepare the underlying {@link Locks.Client client}(s) for commit. This will grab all locks that have
* previously been taken {@link #optimistic() optimistically}. * previously been taken {@link #optimistic() optimistically}, and tell the underlying lock client to enter the
* <em>prepare</em> state.
* @param lockTracer lock tracer * @param lockTracer lock tracer
*/ */
void prepareForCommit( LockTracer lockTracer ); void prepareForCommit( LockTracer lockTracer );
Expand Down
Expand Up @@ -315,12 +315,26 @@ public void releaseExclusive( ResourceType resourceType, long... resourceIds )
} }
} }


@Override
public void prepare()
{
stateHolder.prepare( this );
}

@Override @Override
public void stop() public void stop()
{ {
// closing client to prevent any new client to come // closing client to prevent any new client to come
stateHolder.stopClient(); if ( stateHolder.stopClient() )
// wake up and terminate waiters {
// wake up and terminate waiters
terminateAllWaitersAndWaitForClientsToLeave();
releaseLocks();
}
}

private void terminateAllWaitersAndWaitForClientsToLeave()
{
terminateAllWaiters(); terminateAllWaiters();
// wait for all active clients to go and terminate latecomers // wait for all active clients to go and terminate latecomers
while ( stateHolder.hasActiveClients() ) while ( stateHolder.hasActiveClients() )
Expand All @@ -333,9 +347,8 @@ public void stop()
@Override @Override
public void close() public void close()
{ {
stop(); stateHolder.closeClient();
// now we are only one who operate on this client terminateAllWaitersAndWaitForClientsToLeave();
// safe to release all the locks
releaseLocks(); releaseLocks();
} }


Expand Down
Expand Up @@ -27,8 +27,11 @@


import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;


import org.neo4j.concurrent.BinaryLatch;
import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
Expand All @@ -38,17 +41,26 @@
import org.neo4j.test.rule.DatabaseRule; import org.neo4j.test.rule.DatabaseRule;
import org.neo4j.test.rule.EmbeddedDatabaseRule; import org.neo4j.test.rule.EmbeddedDatabaseRule;


import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class KernelTransactionTimeoutMonitorIT public class KernelTransactionTimeoutMonitorIT
{ {
@Rule @Rule
public DatabaseRule database = new EmbeddedDatabaseRule() public DatabaseRule database = createDatabaseRule();
.withSetting( GraphDatabaseSettings.transaction_monitor_check_interval, "100ms" );
@Rule @Rule
public ExpectedException expectedException = ExpectedException.none(); public ExpectedException expectedException = ExpectedException.none();


private static final int NODE_ID = 0; private static final int NODE_ID = 0;
private ExecutorService executor; private ExecutorService executor;


protected DatabaseRule createDatabaseRule()
{
return new EmbeddedDatabaseRule()
.withSetting( GraphDatabaseSettings.transaction_monitor_check_interval, "100ms" );
}

@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
Expand Down Expand Up @@ -80,6 +92,52 @@ public void terminateExpiredTransaction() throws Exception
} }
} }


@Test( timeout = 30_000 )
public void terminatingTransactionMustEagerlyReleaseTheirLocks() throws Exception
{
AtomicBoolean nodeLockAcquired = new AtomicBoolean();
AtomicBoolean lockerDone = new AtomicBoolean();
BinaryLatch lockerPause = new BinaryLatch();
long nodeId;
try ( Transaction tx = database.beginTx() )
{
nodeId = database.createNode().getId();
tx.success();
}
Future<?> locker = executor.submit( () ->
{
try ( Transaction tx = database.beginTx( 100, TimeUnit.MILLISECONDS ) )
{
Node node = database.getNodeById( nodeId );
tx.acquireReadLock( node );
nodeLockAcquired.set( true );
lockerPause.await();
}
lockerDone.set( true );
} );

boolean proceed;
do
{
proceed = nodeLockAcquired.get();
}
while ( !proceed );

Thread.sleep( 150 ); // locker should be stopped by now
assertFalse( lockerDone.get() ); // but still blocked on the latch
// Yet we should be able to proceed and grab the locks they once held
try ( Transaction tx = database.beginTx() )
{
// Write-locking is only possible if their shared lock was released
tx.acquireWriteLock( database.getNodeById( nodeId ) );
tx.success();
}
// No exception from our lock client being stopped (e.g. we ended up blocked for too long) or from timeout
lockerPause.release();
locker.get();
assertTrue( lockerDone.get() );
}

private Runnable startAnotherTransaction() private Runnable startAnotherTransaction()
{ {
return () -> return () ->
Expand Down

0 comments on commit d82b866

Please sign in to comment.