Skip to content

Commit

Permalink
Add tests for transaction states transitions.
Browse files Browse the repository at this point in the history
Integration tests for transactions termination on shutdown.
Introduce shutdown state to be able to track transactions that are interrupted by db shutdown.
  • Loading branch information
MishaDemianenko committed Dec 13, 2016
1 parent 7496fb5 commit b6728f7
Show file tree
Hide file tree
Showing 13 changed files with 732 additions and 86 deletions.
Expand Up @@ -740,9 +740,9 @@ public synchronized void stop()
private void awaitAllClosingTransactions() private void awaitAllClosingTransactions()
{ {
KernelTransactions kernelTransactions = kernelModule.kernelTransactions(); KernelTransactions kernelTransactions = kernelModule.kernelTransactions();
kernelTransactions.terminateAllTransactions(); kernelTransactions.terminateTransactions();


while ( kernelTransactions.haveCommittingTransaction() ) while ( kernelTransactions.haveClosingTransaction() )
{ {
LockSupport.parkNanos( TimeUnit.MILLISECONDS.toNanos( 10 ) ); LockSupport.parkNanos( TimeUnit.MILLISECONDS.toNanos( 10 ) );
} }
Expand Down
Expand Up @@ -148,7 +148,7 @@ default void close() throws TransactionFailureException
SecurityContext securityContext(); SecurityContext securityContext();


/** /**
* @return {@code true} if {@link #markForTermination(Status)} has been invoked, otherwise {@code false}. * @return {@link Status} if {@link #markForTermination(Status)} has been invoked, otherwise empty optional.
*/ */
Optional<Status> getReasonIfTerminated(); Optional<Status> getReasonIfTerminated();


Expand Down
Expand Up @@ -55,7 +55,6 @@
* {@link StateHandlingStatementOperations}, which includes any changes that exist in the current transaction, and then * {@link StateHandlingStatementOperations}, which includes any changes that exist in the current transaction, and then
* finally {@link org.neo4j.storageengine.api.StoreReadLayer} will read the current committed state from * finally {@link org.neo4j.storageengine.api.StoreReadLayer} will read the current committed state from
* the stores or caches. * the stores or caches.
*
*/ */
public class Kernel extends LifecycleAdapter implements KernelAPI public class Kernel extends LifecycleAdapter implements KernelAPI
{ {
Expand Down
Expand Up @@ -75,8 +75,8 @@ public class KernelTransactionImplementation implements KernelTransaction, TxSta
{ {
/* /*
* IMPORTANT: * IMPORTANT:
* This class is pooled and re-used. If you add *any* transactionStatus to it, you *must* make sure that: * This class is pooled and re-used. If you add *any* state to it, you *must* make sure that:
* - the #initialize() method resets that transactionStatus for re-use * - the #initialize() method resets that state for re-use
* - the #release() method releases resources acquired in #initialize() or during the transaction's life time * - the #release() method releases resources acquired in #initialize() or during the transaction's life time
*/ */


Expand All @@ -101,7 +101,7 @@ public class KernelTransactionImplementation implements KernelTransaction, TxSta
private final StoreReadLayer storeLayer; private final StoreReadLayer storeLayer;
private final Clock clock; private final Clock clock;


// TransactionStatus that needs to be reset between uses. Most of these should be cleared or released in #release(), // State that needs to be reset between uses. Most of these should be cleared or released in #release(),
// whereas others, such as timestamp or txId when transaction starts, even locks, needs to be set in #initialize(). // whereas others, such as timestamp or txId when transaction starts, even locks, needs to be set in #initialize().
private TransactionState txState; private TransactionState txState;
private LegacyIndexTransactionState legacyIndexTransactionState; private LegacyIndexTransactionState legacyIndexTransactionState;
Expand All @@ -116,7 +116,6 @@ public class KernelTransactionImplementation implements KernelTransaction, TxSta
private boolean beforeHookInvoked; private boolean beforeHookInvoked;


private TransactionStatus transactionStatus = new TransactionStatus(); private TransactionStatus transactionStatus = new TransactionStatus();
//:TODO do we need it to be volatile?
private volatile boolean failure; private volatile boolean failure;
private boolean success; private boolean success;
private long startTimeMillis; private long startTimeMillis;
Expand Down Expand Up @@ -232,18 +231,18 @@ public Optional<Status> getReasonIfTerminated()
return transactionStatus.getTerminationReason(); return transactionStatus.getTerminationReason();
} }


boolean markForTermination( long expectedReuseCount, org.neo4j.kernel.api.exceptions.Status reason ) boolean markForTermination( long expectedReuseCount, Status reason )
{ {
return expectedReuseCount == reuseCount && markForTerminationIfPossible( reason ); return expectedReuseCount == reuseCount && markForTerminationIfPossible( reason );
} }


@Override @Override
public void markForTermination( org.neo4j.kernel.api.exceptions.Status reason ) public void markForTermination( Status reason )
{ {
markForTerminationIfPossible( reason ); markForTerminationIfPossible( reason );
} }


private boolean markForTerminationIfPossible( org.neo4j.kernel.api.exceptions.Status reason ) private boolean markForTerminationIfPossible( Status reason )
{ {
if ( transactionStatus.terminate( reason ) ) if ( transactionStatus.terminate( reason ) )
{ {
Expand Down Expand Up @@ -391,16 +390,20 @@ public boolean isClosed()
return transactionStatus.isClosed(); return transactionStatus.isClosed();
} }


public boolean isCommitting() public boolean isShutdown()
{ {
return transactionStatus.isCommitting(); return transactionStatus.isShutdown();
}

public boolean isClosing()
{
return transactionStatus.isClosing();
} }


@Override @Override
public long closeTransaction() throws TransactionFailureException public long closeTransaction() throws TransactionFailureException
{ {
assertTransactionOpen(); markTransactionAsClosing();
markTransactionAsPreparingCommit();
try try
{ {
closeCurrentStatementIfAny(); closeCurrentStatementIfAny();
Expand All @@ -419,7 +422,6 @@ public long closeTransaction() throws TransactionFailureException
{ {
try try
{ {
transactionStatus.close();
transactionEvent.setSuccess( success ); transactionEvent.setSuccess( success );
transactionEvent.setFailure( failure ); transactionEvent.setFailure( failure );
transactionEvent.setTransactionType( writeState.name() ); transactionEvent.setTransactionType( writeState.name() );
Expand All @@ -433,11 +435,21 @@ public long closeTransaction() throws TransactionFailureException
} }
} }


private void markTransactionAsPreparingCommit() private void markTransactionAsClosing() throws TransactionFailureException
{ {
if ( !transactionStatus.prepare() ) if ( !transactionStatus.closing() )
{ {
throw new IllegalStateException( "This transaction is already closing." ); assertTransactionOpen();
if ( transactionStatus.isShutdown() )
{
throw new TransactionFailureException( Status.Transaction.TransactionTerminated,
"Transaction terminated since database is shutting down." );
}
else
{
throw new TransactionFailureException( Status.Transaction.TransactionTerminated,
"Transaction is already closing. Repeated execution of transactions are not allowed." );
}
} }
} }


Expand All @@ -460,7 +472,6 @@ private void failOnNonExplicitRollbackIfNeeded() throws TransactionFailureExcept
if ( success ) if ( success )
{ {
throw getReasonIfTerminated().map( TransactionTerminatedException::new ) throw getReasonIfTerminated().map( TransactionTerminatedException::new )
//TODO: provide test that proove that we do not create them each time
// Success was called, but also failure which means that the client code using this // Success was called, but also failure which means that the client code using this
// transaction passed through a happy path, but the transaction was still marked as // transaction passed through a happy path, but the transaction was still marked as
// failed for one or more reasons. Tell the user that although it looked happy it // failed for one or more reasons. Tell the user that although it looked happy it
Expand Down Expand Up @@ -516,7 +527,7 @@ private long commit() throws TransactionFailureException
legacyIndexTransactionState.extractCommands( extractedCommands ); legacyIndexTransactionState.extractCommands( extractedCommands );
} }


/* Here's the deal: we track a quick-to-access hasChanges in transaction transactionStatus which is true /* Here's the deal: we track a quick-to-access hasChanges in transaction state which is true
* if there are any changes imposed by this transaction. Some changes made inside a transaction undo * if there are any changes imposed by this transaction. Some changes made inside a transaction undo
* previously made changes in that same transaction, and so at some point a transaction may have * previously made changes in that same transaction, and so at some point a transaction may have
* changes and at another point, after more changes seemingly, * changes and at another point, after more changes seemingly,
Expand All @@ -538,18 +549,10 @@ private long commit() throws TransactionFailureException
startTimeMillis, lastTransactionIdWhenStarted, timeCommitted, startTimeMillis, lastTransactionIdWhenStarted, timeCommitted,
commitLocks.getLockSessionId() ); commitLocks.getLockSessionId() );


// Commit the transaction if not terminated success = true;
if ( transactionStatus.commit() ) TransactionToApply batch = new TransactionToApply( transactionRepresentation );
{ txId = transactionId = commitProcess.commit( batch, commitEvent, INTERNAL );
success = true; commitTime = timeCommitted;
TransactionToApply batch = new TransactionToApply( transactionRepresentation );
txId = transactionId = commitProcess.commit( batch, commitEvent, INTERNAL );
commitTime = timeCommitted;
}
else
{
throw new TransactionTerminatedException( Status.Transaction.TransactionTerminated );
}
} }
} }
success = true; success = true;
Expand Down Expand Up @@ -659,7 +662,7 @@ private void release()
{ {
statementLocks.close(); statementLocks.close();
statementLocks = null; statementLocks = null;
transactionStatus.reset(); transactionStatus.close();
type = null; type = null;
securityContext = null; securityContext = null;
transactionEvent = null; transactionEvent = null;
Expand Down Expand Up @@ -739,9 +742,24 @@ public String toString()


public void dispose() public void dispose()
{ {
markAsShutdown();
storageStatement.close(); storageStatement.close();
} }


void markAsShutdown()
{
if ( transactionStatus.shutdown() )
{
// since transaction is marked as closed now and any new calls to close transaction
// are no longer possible we can release the locks now immediately
StatementLocks localLocks = this.statementLocks;
if ( localLocks != null )
{
localLocks.close();
}
}
}

/** /**
* It is not allowed for the same transaction to perform database writes as well as schema writes. * It is not allowed for the same transaction to perform database writes as well as schema writes.
* This enum tracks the current write transactionStatus of the transaction, allowing it to transition from * This enum tracks the current write transactionStatus of the transaction, allowing it to transition from
Expand Down Expand Up @@ -783,24 +801,23 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne
} }
} }


private static class TransactionStatus static class TransactionStatus
{ {
private static final int OPEN = 0; private static final int OPEN = 0;
private static final int PREPARE = 1; private static final int CLOSING = 1;
private static final int COMMIT = 2; private static final int CLOSED = 2;
private static final int CLOSED = 4; private static final int SHUTDOWN = 3;

private static final int STATE_BITS_MASK = 0x3;
private static final int STATE_BITS_MASK = 0xF; private static final int NON_STATE_BITS_MASK = 0xFFFF_FFFC;
private static final int NON_STATE_BITS_MASK = 0xFFFF_FFF0; private static final int TERMINATED = 1 << 3;
private static final int TERMINATED = 1 << 5;

private AtomicInteger status = new AtomicInteger( CLOSED );
private AtomicInteger status = new AtomicInteger( OPEN );
private volatile Status terminationReason; private volatile Status terminationReason;


public void init() public void init()
{ {
reset();
status.set( OPEN ); status.set( OPEN );
reset();
} }


public void reset() public void reset()
Expand All @@ -818,9 +835,9 @@ public boolean isClosed()
return is( CLOSED ); return is( CLOSED );
} }


public boolean isCommitting() public boolean isClosing()
{ {
return is( COMMIT ); return is( CLOSING );
} }


public boolean isTerminated() public boolean isTerminated()
Expand All @@ -830,15 +847,11 @@ public boolean isTerminated()


public boolean terminate( Status reason ) public boolean terminate( Status reason )
{ {
if ( isTerminated() )
{
return false;
}
int currentStatus; int currentStatus;
do do
{ {
currentStatus = status.get(); currentStatus = status.get();
if ( (currentStatus != OPEN) && (currentStatus != PREPARE) ) if ( (currentStatus != OPEN) && (currentStatus != CLOSING) )
{ {
return false; return false;
} }
Expand All @@ -848,7 +861,7 @@ public boolean terminate( Status reason )
return true; return true;
} }


public boolean prepare() public boolean closing()
{ {
int currentStatus; int currentStatus;
do do
Expand All @@ -859,18 +872,29 @@ public boolean prepare()
return false; return false;
} }
} }
while ( !status.compareAndSet( currentStatus, (currentStatus & NON_STATE_BITS_MASK) | PREPARE ) ); while ( !status.compareAndSet( currentStatus, (currentStatus & NON_STATE_BITS_MASK) | CLOSING ) );
return true; return true;
} }


public void close() boolean shutdown()
{ {
status.set( CLOSED ); int currentStatus;
do
{
currentStatus = status.get();
if ( (currentStatus & STATE_BITS_MASK) != OPEN )
{
return false;
}
}
while ( !status.compareAndSet( currentStatus, (currentStatus & NON_STATE_BITS_MASK) | SHUTDOWN ) );
return true;
} }


public boolean commit() public void close()
{ {
return status.compareAndSet( PREPARE, COMMIT ); status.set( CLOSED );
reset();
} }


Optional<Status> getTerminationReason() Optional<Status> getTerminationReason()
Expand All @@ -880,7 +904,17 @@ Optional<Status> getTerminationReason()


private boolean is( int statusCode ) private boolean is( int statusCode )
{ {
return (status.get() & STATE_BITS_MASK) == statusCode; return is( status.get(), statusCode );
}

private boolean is( int currentStatus, int statusCode )
{
return (currentStatus & STATE_BITS_MASK) == statusCode;
}

public boolean isShutdown()
{
return is( SHUTDOWN );
} }
} }
} }

0 comments on commit b6728f7

Please sign in to comment.