diff --git a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java index d2e7b79630d73..0762a35364744 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java @@ -740,9 +740,9 @@ public synchronized void stop() private void awaitAllClosingTransactions() { KernelTransactions kernelTransactions = kernelModule.kernelTransactions(); - kernelTransactions.terminateAllTransactions(); + kernelTransactions.terminateTransactions(); - while ( kernelTransactions.haveCommittingTransaction() ) + while ( kernelTransactions.haveClosingTransaction() ) { LockSupport.parkNanos( TimeUnit.MILLISECONDS.toNanos( 10 ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/KernelTransaction.java b/community/kernel/src/main/java/org/neo4j/kernel/api/KernelTransaction.java index e29541ffc2e1e..4008e92c03aea 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/KernelTransaction.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/KernelTransaction.java @@ -148,7 +148,7 @@ default void close() throws TransactionFailureException SecurityContext securityContext(); /** - * @return {@code true} if {@link #markForTermination(Status)} has been invoked, otherwise {@code false}. + * @return {@link Status} if {@link #markForTermination(Status)} has been invoked, otherwise empty optional. */ Optional getReasonIfTerminated(); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/Kernel.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/Kernel.java index d311a99981275..d79c42430878c 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/Kernel.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/Kernel.java @@ -55,7 +55,6 @@ * {@link StateHandlingStatementOperations}, which includes any changes that exist in the current transaction, and then * finally {@link org.neo4j.storageengine.api.StoreReadLayer} will read the current committed state from * the stores or caches. - * */ public class Kernel extends LifecycleAdapter implements KernelAPI { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java index 51facbf130341..7ba0fd36d2e86 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java @@ -75,8 +75,8 @@ public class KernelTransactionImplementation implements KernelTransaction, TxSta { /* * IMPORTANT: - * This class is pooled and re-used. If you add *any* transactionStatus to it, you *must* make sure that: - * - the #initialize() method resets that transactionStatus for re-use + * This class is pooled and re-used. If you add *any* state to it, you *must* make sure that: + * - the #initialize() method resets that state for re-use * - the #release() method releases resources acquired in #initialize() or during the transaction's life time */ @@ -101,7 +101,7 @@ public class KernelTransactionImplementation implements KernelTransaction, TxSta private final StoreReadLayer storeLayer; private final Clock clock; - // TransactionStatus that needs to be reset between uses. Most of these should be cleared or released in #release(), + // State that needs to be reset between uses. Most of these should be cleared or released in #release(), // whereas others, such as timestamp or txId when transaction starts, even locks, needs to be set in #initialize(). private TransactionState txState; private LegacyIndexTransactionState legacyIndexTransactionState; @@ -116,7 +116,6 @@ public class KernelTransactionImplementation implements KernelTransaction, TxSta private boolean beforeHookInvoked; private TransactionStatus transactionStatus = new TransactionStatus(); - //:TODO do we need it to be volatile? private volatile boolean failure; private boolean success; private long startTimeMillis; @@ -232,18 +231,18 @@ public Optional getReasonIfTerminated() return transactionStatus.getTerminationReason(); } - boolean markForTermination( long expectedReuseCount, org.neo4j.kernel.api.exceptions.Status reason ) + boolean markForTermination( long expectedReuseCount, Status reason ) { return expectedReuseCount == reuseCount && markForTerminationIfPossible( reason ); } @Override - public void markForTermination( org.neo4j.kernel.api.exceptions.Status reason ) + public void markForTermination( Status reason ) { markForTerminationIfPossible( reason ); } - private boolean markForTerminationIfPossible( org.neo4j.kernel.api.exceptions.Status reason ) + private boolean markForTerminationIfPossible( Status reason ) { if ( transactionStatus.terminate( reason ) ) { @@ -391,16 +390,20 @@ public boolean isClosed() return transactionStatus.isClosed(); } - public boolean isCommitting() + public boolean isShutdown() { - return transactionStatus.isCommitting(); + return transactionStatus.isShutdown(); + } + + public boolean isClosing() + { + return transactionStatus.isClosing(); } @Override public long closeTransaction() throws TransactionFailureException { - assertTransactionOpen(); - markTransactionAsPreparingCommit(); + markTransactionAsClosing(); try { closeCurrentStatementIfAny(); @@ -419,7 +422,6 @@ public long closeTransaction() throws TransactionFailureException { try { - transactionStatus.close(); transactionEvent.setSuccess( success ); transactionEvent.setFailure( failure ); transactionEvent.setTransactionType( writeState.name() ); @@ -433,11 +435,21 @@ public long closeTransaction() throws TransactionFailureException } } - private void markTransactionAsPreparingCommit() + private void markTransactionAsClosing() throws TransactionFailureException { - if ( !transactionStatus.prepare() ) + if ( !transactionStatus.closing() ) { - throw new IllegalStateException( "This transaction is already closing." ); + assertTransactionOpen(); + if ( transactionStatus.isShutdown() ) + { + throw new TransactionFailureException( Status.Transaction.TransactionTerminated, + "Transaction terminated since database is shutting down." ); + } + else + { + throw new TransactionFailureException( Status.Transaction.TransactionTerminated, + "Transaction is already closing. Repeated execution of transactions are not allowed." ); + } } } @@ -460,7 +472,6 @@ private void failOnNonExplicitRollbackIfNeeded() throws TransactionFailureExcept if ( success ) { throw getReasonIfTerminated().map( TransactionTerminatedException::new ) - //TODO: provide test that proove that we do not create them each time // Success was called, but also failure which means that the client code using this // transaction passed through a happy path, but the transaction was still marked as // failed for one or more reasons. Tell the user that although it looked happy it @@ -516,7 +527,7 @@ private long commit() throws TransactionFailureException legacyIndexTransactionState.extractCommands( extractedCommands ); } - /* Here's the deal: we track a quick-to-access hasChanges in transaction transactionStatus which is true + /* Here's the deal: we track a quick-to-access hasChanges in transaction state which is true * if there are any changes imposed by this transaction. Some changes made inside a transaction undo * previously made changes in that same transaction, and so at some point a transaction may have * changes and at another point, after more changes seemingly, @@ -538,18 +549,10 @@ private long commit() throws TransactionFailureException startTimeMillis, lastTransactionIdWhenStarted, timeCommitted, commitLocks.getLockSessionId() ); - // Commit the transaction if not terminated - if ( transactionStatus.commit() ) - { - success = true; - TransactionToApply batch = new TransactionToApply( transactionRepresentation ); - txId = transactionId = commitProcess.commit( batch, commitEvent, INTERNAL ); - commitTime = timeCommitted; - } - else - { - throw new TransactionTerminatedException( Status.Transaction.TransactionTerminated ); - } + success = true; + TransactionToApply batch = new TransactionToApply( transactionRepresentation ); + txId = transactionId = commitProcess.commit( batch, commitEvent, INTERNAL ); + commitTime = timeCommitted; } } success = true; @@ -659,7 +662,7 @@ private void release() { statementLocks.close(); statementLocks = null; - transactionStatus.reset(); + transactionStatus.close(); type = null; securityContext = null; transactionEvent = null; @@ -739,9 +742,24 @@ public String toString() public void dispose() { + markAsShutdown(); storageStatement.close(); } + void markAsShutdown() + { + if ( transactionStatus.shutdown() ) + { + // since transaction is marked as closed now and any new calls to close transaction + // are no longer possible we can release the locks now immediately + StatementLocks localLocks = this.statementLocks; + if ( localLocks != null ) + { + localLocks.close(); + } + } + } + /** * It is not allowed for the same transaction to perform database writes as well as schema writes. * This enum tracks the current write transactionStatus of the transaction, allowing it to transition from @@ -783,24 +801,23 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne } } - private static class TransactionStatus + static class TransactionStatus { private static final int OPEN = 0; - private static final int PREPARE = 1; - private static final int COMMIT = 2; - private static final int CLOSED = 4; - - private static final int STATE_BITS_MASK = 0xF; - private static final int NON_STATE_BITS_MASK = 0xFFFF_FFF0; - private static final int TERMINATED = 1 << 5; - - private AtomicInteger status = new AtomicInteger( OPEN ); + private static final int CLOSING = 1; + private static final int CLOSED = 2; + private static final int SHUTDOWN = 3; + private static final int STATE_BITS_MASK = 0x3; + private static final int NON_STATE_BITS_MASK = 0xFFFF_FFFC; + private static final int TERMINATED = 1 << 3; + + private AtomicInteger status = new AtomicInteger( CLOSED ); private volatile Status terminationReason; public void init() { - reset(); status.set( OPEN ); + reset(); } public void reset() @@ -818,9 +835,9 @@ public boolean isClosed() return is( CLOSED ); } - public boolean isCommitting() + public boolean isClosing() { - return is( COMMIT ); + return is( CLOSING ); } public boolean isTerminated() @@ -830,15 +847,11 @@ public boolean isTerminated() public boolean terminate( Status reason ) { - if ( isTerminated() ) - { - return false; - } int currentStatus; do { currentStatus = status.get(); - if ( (currentStatus != OPEN) && (currentStatus != PREPARE) ) + if ( (currentStatus != OPEN) && (currentStatus != CLOSING) ) { return false; } @@ -848,7 +861,7 @@ public boolean terminate( Status reason ) return true; } - public boolean prepare() + public boolean closing() { int currentStatus; do @@ -859,18 +872,29 @@ public boolean prepare() return false; } } - while ( !status.compareAndSet( currentStatus, (currentStatus & NON_STATE_BITS_MASK) | PREPARE ) ); + while ( !status.compareAndSet( currentStatus, (currentStatus & NON_STATE_BITS_MASK) | CLOSING ) ); return true; } - public void close() + boolean shutdown() { - status.set( CLOSED ); + int currentStatus; + do + { + currentStatus = status.get(); + if ( (currentStatus & STATE_BITS_MASK) != OPEN ) + { + return false; + } + } + while ( !status.compareAndSet( currentStatus, (currentStatus & NON_STATE_BITS_MASK) | SHUTDOWN ) ); + return true; } - public boolean commit() + public void close() { - return status.compareAndSet( PREPARE, COMMIT ); + status.set( CLOSED ); + reset(); } Optional getTerminationReason() @@ -880,7 +904,17 @@ Optional getTerminationReason() private boolean is( int statusCode ) { - return (status.get() & STATE_BITS_MASK) == statusCode; + return is( status.get(), statusCode ); + } + + private boolean is( int currentStatus, int statusCode ) + { + return (currentStatus & STATE_BITS_MASK) == statusCode; + } + + public boolean isShutdown() + { + return is( SHUTDOWN ); } } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java index b50e7303dcd0b..41a7e39ede8b6 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java @@ -180,25 +180,33 @@ public Set activeTransactions() */ public void disposeAll() { - terminateAllTransactions(); + terminateTransactions(); localTxPool.disposeAll(); globalTxPool.disposeAll(); } - public void terminateAllTransactions() + public void terminateTransactions() { - for ( KernelTransactionImplementation tx : allTransactions ) - { - // we mark all transactions for termination since we want to make sure these transactions - // won't be reused, ever. Each transaction has, among other things, a Locks.Client and we - // certainly want to keep that from being reused from this point. - tx.markForTermination( Status.General.DatabaseUnavailable ); - } + markAllTransactionsAsTerminated(); + markAllTransactionsAsClosed(); + } + + private void markAllTransactionsAsTerminated() + { + // we mark all transactions for termination since we want to make sure these transactions + // won't be reused, ever. Each transaction has, among other things, a Locks.Client and we + // certainly want to keep that from being reused from this point. + allTransactions.forEach( tx -> tx.markForTermination( Status.General.DatabaseUnavailable ) ); + } + + private void markAllTransactionsAsClosed() + { + allTransactions.forEach( KernelTransactionImplementation::markAsShutdown ); } - public boolean haveCommittingTransaction() + public boolean haveClosingTransaction() { - return allTransactions.stream().anyMatch( KernelTransactionImplementation::isCommitting ); + return allTransactions.stream().anyMatch( KernelTransactionImplementation::isClosing ); } @Override @@ -264,6 +272,17 @@ KernelTransactionHandle createHandle( KernelTransactionImplementation tx ) return new KernelTransactionImplementationHandle( tx ); } + /** + * Get all transactions + * *

+ * Note: this method is package-private for testing only. + * @return set of all kernel transaction + */ + Set getAllTransactions() + { + return allTransactions; + } + private void assertDatabaseIsRunning() { if ( availabilityGuard.isShutdown() ) diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelStatementTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelStatementTest.java index e1b5f8f7c32c3..46a77d40a2681 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelStatementTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelStatementTest.java @@ -23,8 +23,11 @@ import java.util.Optional; +import org.neo4j.graphdb.NotInTransactionException; import org.neo4j.graphdb.TransactionTerminatedException; import org.neo4j.kernel.api.exceptions.Status; +import org.neo4j.kernel.api.txstate.TxStateHolder; +import org.neo4j.kernel.impl.factory.AccessCapability; import org.neo4j.kernel.impl.factory.CanWrite; import org.neo4j.kernel.impl.proc.Procedures; import org.neo4j.storageengine.api.StorageStatement; @@ -64,4 +67,18 @@ public void shouldReleaseStorageStatementWhenForceClosed() throws Exception // then verify( storeStatement ).release(); } + + @Test(expected = NotInTransactionException.class) + public void assertStatementIsNotOpenWhileAcquireIsNotInvoked() + { + KernelTransactionImplementation transaction = mock( KernelTransactionImplementation.class ); + TxStateHolder txStateHolder = mock( TxStateHolder.class ); + StorageStatement storeStatement = mock( StorageStatement.class ); + AccessCapability accessCapability = mock( AccessCapability.class ); + Procedures procedures = mock( Procedures.class ); + KernelStatement statement = new KernelStatement( transaction, txStateHolder, + storeStatement, procedures, accessCapability ); + + statement.assertOpen(); + } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java index 6c9c90df669d3..752681c5180fe 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java @@ -19,7 +19,9 @@ */ package org.neo4j.kernel.impl.api; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -33,6 +35,7 @@ import org.neo4j.graphdb.TransactionTerminatedException; import org.neo4j.kernel.api.KernelTransaction; +import org.neo4j.kernel.api.TransactionHook; import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.security.AnonymousContext; @@ -45,8 +48,11 @@ import org.neo4j.kernel.impl.transaction.command.Command; import org.neo4j.storageengine.api.StorageCommand; import org.neo4j.storageengine.api.StorageStatement; +import org.neo4j.storageengine.api.StoreReadLayer; import org.neo4j.storageengine.api.lock.ResourceLocker; +import org.neo4j.storageengine.api.txstate.ReadableTransactionState; import org.neo4j.test.DoubleLatch; +import org.neo4j.test.mockito.matcher.RootCauseMatcher; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.Matchers.equalTo; @@ -75,6 +81,9 @@ @RunWith( Parameterized.class ) public class KernelTransactionImplementationTest extends KernelTransactionTestBase { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Parameterized.Parameter() public Consumer transactionInitializer; @@ -139,11 +148,6 @@ public void shouldCommitSuccessfulTransaction() throws Exception verifyExtraInteractionWithTheMonitor( transactionMonitor, isWriteTx ); } - private SecurityContext securityContext() - { - return isWriteTx ? AnonymousContext.write() : AnonymousContext.read(); - } - @Test public void shouldRollbackUnsuccessfulTransaction() throws Exception { @@ -470,13 +474,12 @@ public void shouldIncrementReuseCounterOnReuse() throws Exception } @Test - public void markForTerminationNotInitializedTransaction() + public void markForTerminationNotInitializedTransactionDoNotHaveAnyEffect() { KernelTransactionImplementation tx = newNotInitializedTransaction(); - tx.markForTermination( Status.General.UnknownError ); - assertEquals( Status.General.UnknownError, tx.getReasonIfTerminated().get() ); + assertFalse( tx.getReasonIfTerminated().isPresent() ); } @Test @@ -711,6 +714,91 @@ public void markForTerminationWithIncorrectReuseCount() throws Exception verify( locksClient, never() ).stop(); } + @Test + public void transactionIsOpenAfterInitialization() + { + KernelTransactionImplementation transaction = newNotInitializedTransaction(); + assertFalse( transaction.isOpen() ); + + transaction.initialize( 100, 100, new SimpleStatementLocks( new NoOpClient() ), + KernelTransaction.Type.explicit, securityContext(), 100 ); + assertTrue( transaction.isOpen() ); + } + + @Test + public void transactionIsClosedAfterCommit() throws TransactionFailureException + { + KernelTransactionImplementation tx = newTransaction( 1000 ); + assertTrue( tx.isOpen() ); + + tx.success(); + tx.close(); + + assertTrue( tx.isClosed() ); + } + + @Test + public void transactionIsClosedAfterRollback() throws TransactionFailureException + { + KernelTransactionImplementation tx = newTransaction( 1000 ); + assertTrue( tx.isOpen() ); + + tx.failure(); + tx.close(); + + assertTrue( tx.isClosed() ); + } + + @Test + public void closeClosedTransactionIsNotAllowed() throws TransactionFailureException + { + KernelTransactionImplementation transaction = newTransaction( 1000 ); + transaction.close(); + + expectedException.expect( IllegalStateException.class ); + expectedException.expectMessage( "This transaction has already been completed." ); + transaction.close(); + } + + @Test + public void closeShutdownTransactionIsNotAllowed() throws TransactionFailureException + { + KernelTransactionImplementation transaction = newTransaction( 1000 ); + transaction.markAsShutdown(); + + expectedException.expect( TransactionFailureException.class ); + expectedException.expectMessage( "Transaction terminated since database is shutting down." ); + transaction.close(); + } + + @Test + public void closeLocksOnTransactionShutdown() + { + Locks.Client locksClient = mock( Locks.Client.class ); + KernelTransactionImplementation transaction = newTransaction( securityContext(), locksClient ); + transaction.markAsShutdown(); + + verify( locksClient ).close(); + } + + @Test + public void closeClosingTransactionNotAllowed() throws TransactionFailureException + { + Locks.Client locksClient = mock( Locks.Client.class ); + KernelTransactionImplementation transaction = newTransaction( securityContext(), locksClient ); + transaction.txState().nodeDoCreate( 42L ); + hooks.register( new ClosingTransactionHook() ); + transaction.success(); + + expectedException.expect( new RootCauseMatcher<>( TransactionFailureException.class, "Transaction is already closing. Repeated execution of transactions are not allowed." ) ); + transaction.closeTransaction(); + } + + private SecurityContext securityContext() + { + return isWriteTx ? AnonymousContext.write() : AnonymousContext.read(); + } + private void initializeAndClose( KernelTransactionImplementation tx, int times ) throws Exception { for ( int i = 0; i < times; i++ ) @@ -720,4 +808,34 @@ private void initializeAndClose( KernelTransactionImplementation tx, int times ) tx.close(); } } + + private static class ClosingTransactionHook implements TransactionHook + { + @Override + public Outcome beforeCommit( ReadableTransactionState state, KernelTransaction transaction, + StoreReadLayer storeReadLayer, StorageStatement statement ) + { + try + { + transaction.closeTransaction(); + } + catch ( TransactionFailureException e ) + { + throw new RuntimeException( e ); + } + return null; + } + + @Override + public void afterCommit( ReadableTransactionState state, KernelTransaction transaction, Outcome outcome ) + { + + } + + @Override + public void afterRollback( ReadableTransactionState state, KernelTransaction transaction, Outcome outcome ) + { + + } + } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionStatusTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionStatusTest.java new file mode 100644 index 0000000000000..58d91e1df5274 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionStatusTest.java @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.api; + +import org.junit.Test; + +import org.neo4j.kernel.api.exceptions.Status; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.neo4j.kernel.impl.api.KernelTransactionImplementation.TransactionStatus; + +public class KernelTransactionStatusTest +{ + + @Test + public void openStatusAfterInitialization() throws Exception + { + TransactionStatus status = getOpenStatus(); + assertTrue( "Status should be open after initialization.", status.isOpen() ); + } + + @Test + public void closedStatusAfterCreation() + { + TransactionStatus status = createStatus(); + assertTrue( "Status should be closed just after creation", status.isClosed() ); + } + + @Test + public void terminateOpenTransaction() + { + TransactionStatus status = getOpenStatus(); + + assertTrue( status.terminate( Status.Transaction.Terminated ) ); + assertTrue( status.isTerminated() ); + assertEquals( Status.Transaction.Terminated, status.getTerminationReason().get() ); + } + + @Test + public void terminateClosingTransaction() + { + TransactionStatus status = getOpenStatus(); + + assertTrue( status.closing() ); + assertTrue( status.terminate( Status.Transaction.Terminated ) ); + assertTrue( status.isTerminated() ); + assertEquals( Status.Transaction.Terminated, status.getTerminationReason().get() ); + } + + @Test + public void closedTransactionIsNotTerminatable() + { + TransactionStatus status = getOpenStatus(); + + status.close(); + assertFalse( status.terminate( Status.Transaction.Terminated ) ); + assertFalse( status.getTerminationReason().isPresent() ); + } + + @Test + public void shutdownTransactionIsNotTerminatable() + { + TransactionStatus status = getOpenStatus(); + + assertTrue( status.shutdown() ); + assertFalse( status.terminate( Status.Transaction.Terminated ) ); + } + + @Test + public void closingTransactionNotPossibleWhenTransactionIsClosedAlready() + { + TransactionStatus status = getOpenStatus(); + + status.close(); + assertFalse( status.closing() ); + assertTrue( status.isClosed() ); + } + + @Test + public void closingTransactionInClosingState() + { + TransactionStatus closingStatus = getOpenStatus(); + closingStatus.closing(); + closingStatus.close(); + assertTrue( closingStatus.isClosed() ); + } + + @Test + public void closingTransactionCanBeTerminated() + { + TransactionStatus openStatus = getOpenStatus(); + assertTrue( openStatus.closing() ); + assertTrue( openStatus.terminate( Status.Transaction.Terminated ) ); + + assertTrue( openStatus.isTerminated() ); + assertTrue( openStatus.isClosing() ); + } + + @Test + public void transactionStatusLifeCycle() + { + TransactionStatus status = getOpenStatus(); + + assertTrue( status.closing() ); + assertTrue( status.isClosing() ); + + status.close(); + assertTrue( status.isClosed() ); + } + + @Test + public void closeResetTerminationReason() + { + TransactionStatus openStatus = getOpenStatus(); + + assertTrue( openStatus.closing() ); + assertTrue( openStatus.terminate( Status.Transaction.Terminated ) ); + assertTrue( openStatus.getTerminationReason().isPresent() ); + openStatus.close(); + + assertFalse( openStatus.getTerminationReason().isPresent() ); + } + + private TransactionStatus getOpenStatus() + { + TransactionStatus status = createStatus(); + status.init(); + return status; + } + + private TransactionStatus createStatus() + { + return new TransactionStatus(); + } +} diff --git a/community/kernel/src/test/java/org/neo4j/test/TestGraphDatabaseFactory.java b/community/kernel/src/test/java/org/neo4j/test/TestGraphDatabaseFactory.java index b61f42d6de720..b8bd55e01bd4d 100644 --- a/community/kernel/src/test/java/org/neo4j/test/TestGraphDatabaseFactory.java +++ b/community/kernel/src/test/java/org/neo4j/test/TestGraphDatabaseFactory.java @@ -93,6 +93,7 @@ protected void configure( GraphDatabaseBuilder builder ) { // Reduce the default page cache memory size to 8 mega-bytes for test databases. builder.setConfig( GraphDatabaseSettings.pagecache_memory, "8m" ); + builder.setConfig( GraphDatabaseSettings.shutdown_transaction_end_timeout, "1s" ); builder.setConfig( boltConnector("bolt").type, BOLT.name() ); builder.setConfig( boltConnector("bolt").enabled, "false" ); } diff --git a/community/kernel/src/test/java/org/neo4j/test/mockito/matcher/RootCauseMatcher.java b/community/kernel/src/test/java/org/neo4j/test/mockito/matcher/RootCauseMatcher.java index 9a8680e003e95..f4e26620f1c0e 100644 --- a/community/kernel/src/test/java/org/neo4j/test/mockito/matcher/RootCauseMatcher.java +++ b/community/kernel/src/test/java/org/neo4j/test/mockito/matcher/RootCauseMatcher.java @@ -39,7 +39,7 @@ public RootCauseMatcher( Class rootCause, String message ) protected boolean matchesSafely( T item ) { cause = ExceptionUtils.getRootCause( item ); - return rootCause.isInstance( cause ) && cause.getMessage().equals( message ); + return rootCause.isInstance( cause ) && cause.getMessage().startsWith( message ); } @Override @@ -56,6 +56,5 @@ public void describeTo( Description description ) { description.appendText( "actual exception was never thrown." ); } - } } diff --git a/community/neo4j/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsIT.java b/community/neo4j/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsIT.java new file mode 100644 index 0000000000000..afd6627546ffc --- /dev/null +++ b/community/neo4j/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsIT.java @@ -0,0 +1,310 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.api; + +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.RuleChain; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.neo4j.graphdb.DependencyResolver; +import org.neo4j.graphdb.Transaction; +import org.neo4j.graphdb.TransactionTerminatedException; +import org.neo4j.graphdb.event.ErrorState; +import org.neo4j.graphdb.event.KernelEventHandler; +import org.neo4j.graphdb.event.TransactionData; +import org.neo4j.graphdb.event.TransactionEventHandler; +import org.neo4j.kernel.api.KernelTransaction; +import org.neo4j.kernel.api.KernelTransactionHandle; +import org.neo4j.kernel.api.exceptions.Status; +import org.neo4j.kernel.api.exceptions.TransactionFailureException; +import org.neo4j.kernel.api.security.AnonymousContext; +import org.neo4j.kernel.api.security.SecurityContext; +import org.neo4j.test.mockito.matcher.RootCauseMatcher; +import org.neo4j.test.rule.EmbeddedDatabaseRule; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public class KernelTransactionsIT +{ + + public ExpectedException exception = ExpectedException.none(); + public EmbeddedDatabaseRule database = new EmbeddedDatabaseRule(); + + @Rule + public RuleChain ruleChain = RuleChain.outerRule( database ).around( exception ); + + private ExecutorService executorService; + + @Before + public void setUp() + { + executorService = Executors.newCachedThreadPool(); + } + + @After + public void tearDown() + { + executorService.shutdown(); + } + + @Test + public void terminationOfRunningTransaction() throws Exception + { + KernelTransactions kernelTransactions = database.getDependencyResolver() + .resolveDependency( KernelTransactions.class ); + + CountDownLatch latch = new CountDownLatch( 1 ); + exception.expect( new RootCauseMatcher<>( TransactionTerminatedException.class, + "The transaction has been terminated." ) ); + try ( Transaction transaction = database.beginTx() ) + { + database.createNode(); + executorService.submit( () -> + { + Set transactions = kernelTransactions.activeTransactions(); + assertThat( "Contain one single active transaction", transactions, hasSize( 1 ) ); + transactions.forEach( tx -> tx.markForTermination( Status.Transaction.TransactionTerminated ) ); + latch.countDown(); + } ); + latch.await(); + transaction.success(); + } + + assertThat( "Transactions have been terminated", kernelTransactions.activeTransactions(), empty() ); + database.shutdown(); + } + + @Test + public void shutdownWhileRunningTransaction() + { + exception.expect( new RootCauseMatcher<>( TransactionFailureException.class, + "Transaction terminated since database is shutting down." ) ); + + try ( Transaction ignored = database.beginTx() ) + { + database.createNode(); + database.shutdown(); + } + } + + @Test + public void shutdownDatabaseWhileHaveActiveTransactionRunning() throws InterruptedException + { + exception.expect( new RootCauseMatcher<>( TransactionFailureException.class, + "Transaction terminated since database is shutting down." ) ); + + CountDownLatch latch = new CountDownLatch( 1 ); + try ( Transaction transaction = database.beginTx() ) + { + database.createNode(); + transaction.success(); + executorService.submit( () -> + { + database.shutdown(); + latch.countDown(); + } ); + latch.await(); + } + } + + @Test + public void shutdownWithHaveActiveTerminatedTransactionRunning() throws InterruptedException + { + exception.expect( new RootCauseMatcher<>( TransactionFailureException.class, + "Transaction terminated since database is shutting down." ) ); + + CountDownLatch latch = new CountDownLatch( 1 ); + try ( Transaction transaction = database.beginTx() ) + { + database.createNode(); + transaction.terminate(); + executorService.submit( () -> + { + database.shutdown(); + latch.countDown(); + } ); + latch.await(); + } + } + + @Test + public void terminateTransactionFromAnotherThread() throws InterruptedException + { + exception.expect( TransactionTerminatedException.class ); + exception.expectMessage( "The transaction has been terminated. Retry your operation in a new transaction, " + + "and you should see a successful result. Explicitly terminated by the user." ); + + CountDownLatch latch = new CountDownLatch( 1 ); + try ( Transaction transaction = database.beginTx() ) + { + database.createNode(); + executorService.submit( () -> + { + transaction.terminate(); + latch.countDown(); + } ); + latch.await(); + database.createNode(); + } + + database.shutdown(); + } + + @Test + public void terminateTransactionFromAnotherThreadByHandle() throws InterruptedException + { + exception.expect( TransactionTerminatedException.class ); + exception.expectMessage( "The transaction has been terminated. Retry your operation in a new transaction, " + + "and you should see a successful result. The request referred to a transaction that does not exist." ); + + CountDownLatch latch = new CountDownLatch( 1 ); + try ( Transaction ignored = database.beginTx() ) + { + database.createNode(); + executorService.submit( () -> + { + DependencyResolver dependencyResolver = database.getDependencyResolver(); + KernelTransactions transactions = dependencyResolver.resolveDependency( KernelTransactions.class ); + Set kernelTransactionHandles = transactions.activeTransactions(); + kernelTransactionHandles.forEach( tx -> tx.markForTermination( Status.Transaction.TransactionNotFound ) ); + latch.countDown(); + } ); + latch.await(); + database.createNode(); + } + + database.shutdown(); + } + + @Test + public void shutdownRunningTransactionsOnDispose() + { + KernelTransactions transactions = + database.getDependencyResolver().resolveDependency( KernelTransactions.class ); + try ( Transaction ignored = database.beginTx() ) + { + database.createNode(); + + database.shutdown(); + } + catch ( Exception ignored ) + { + // nothing + } + Set allTransactions = transactions.getAllTransactions(); + assertThat( "We should have one transaction that was open and how is shutdown.", + allTransactions, Matchers.hasSize( 1 ) ); + KernelTransactionImplementation shutdownTransaction = allTransactions.iterator().next(); + assertEquals( Status.General.DatabaseUnavailable, shutdownTransaction.getReasonIfTerminated().get() ); + assertTrue( "Transaction state should be shutdown.", shutdownTransaction.isShutdown() ); + } + + @Test + public void waitClosingTransactionOnShutdown() throws TransactionFailureException, InterruptedException + { + Kernel kernel = database.getDependencyResolver().resolveDependency( Kernel.class ); + KernelTransactions transactions = + database.getDependencyResolver().resolveDependency( KernelTransactions.class ); + + KernelTransactionImplementation kernelTransaction = (KernelTransactionImplementation) kernel + .newTransaction(KernelTransaction.Type.implicit, SecurityContext.AUTH_DISABLED, 10000L ); + + CountDownLatch shutdownLatch = new CountDownLatch( 1 ); + database.registerKernelEventHandler( new ShutdownEventHandler( kernelTransaction ) ); + database.registerTransactionEventHandler( new ShutdownTransactionEventHandler( shutdownLatch ) ); + + kernelTransaction.txState().nodeDoCreate( 2 ); + kernelTransaction.success(); + kernelTransaction.close(); + + shutdownLatch.await(); + + Set allTransactions = transactions.getAllTransactions(); + assertThat( "No transactions, everything should be closed during shutdown.", allTransactions, empty() ); + } + + private static class ShutdownEventHandler implements KernelEventHandler + { + private final KernelTransactionImplementation kernelTransaction; + + ShutdownEventHandler( KernelTransactionImplementation kernelTransaction ) + { + this.kernelTransaction = kernelTransaction; + } + + @Override + public void beforeShutdown() + { + assertTrue( "Transaction should be already closed.", kernelTransaction.isClosed() ); + } + + @Override + public void kernelPanic( ErrorState error ) + { + + } + + @Override + public Object getResource() + { + return null; + } + + @Override + public ExecutionOrder orderComparedTo( KernelEventHandler other ) + { + return null; + } + } + + private class ShutdownTransactionEventHandler extends TransactionEventHandler.Adapter + { + private final CountDownLatch shutdownLatch; + + ShutdownTransactionEventHandler( CountDownLatch shutdownLatch ) + { + this.shutdownLatch = shutdownLatch; + } + + @Override + public Object beforeCommit( TransactionData data ) throws Exception + { + executorService.submit(() -> { + database.shutdown(); + shutdownLatch.countDown(); + } ); + return null; + } + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreReplicationIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreReplicationIT.java index 0865a7774cb9e..b49b3651df168 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreReplicationIT.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/CoreReplicationIT.java @@ -23,23 +23,18 @@ import org.junit.Rule; import org.junit.Test; -import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.stream.Stream; -import org.neo4j.causalclustering.core.CausalClusteringSettings; import org.neo4j.causalclustering.core.CoreGraphDatabase; import org.neo4j.causalclustering.core.consensus.roles.Role; import org.neo4j.causalclustering.discovery.Cluster; import org.neo4j.causalclustering.discovery.CoreClusterMember; import org.neo4j.graphdb.Label; import org.neo4j.graphdb.Node; -import org.neo4j.graphdb.ResourceIterable; import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.security.WriteOperationsNotAllowedException; -import org.neo4j.helpers.collection.Iterables; import org.neo4j.test.causalclustering.ClusterRule; import static org.hamcrest.CoreMatchers.containsString; @@ -49,7 +44,6 @@ import static org.neo4j.causalclustering.discovery.Cluster.dataMatchesEventually; import static org.neo4j.function.Predicates.await; import static org.neo4j.graphdb.Label.label; -import static org.neo4j.helpers.collection.Iterables.asList; import static org.neo4j.helpers.collection.Iterables.count; public class CoreReplicationIT diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/impl/ha/ClusterManager.java b/enterprise/ha/src/test/java/org/neo4j/kernel/impl/ha/ClusterManager.java index 5be423373bc31..5b359bd0a899f 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/impl/ha/ClusterManager.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/impl/ha/ClusterManager.java @@ -141,6 +141,7 @@ public enum NetworkFlag public static final long DEFAULT_TIMEOUT_SECONDS = 60L; public static final Map CONFIG_FOR_SINGLE_JVM_CLUSTER = unmodifiableMap( stringMap( GraphDatabaseSettings.pagecache_memory.name(), "8m", + GraphDatabaseSettings.shutdown_transaction_end_timeout.name(), "1s", boltConnector( "0" ).type.name(), "BOLT", boltConnector( "0" ).enabled.name(), "false" ) );