diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java index 7cd0ee1c6815b..0eaf0456a0bb8 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java @@ -26,8 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Supplier; import java.util.stream.Stream; @@ -70,6 +69,7 @@ import static org.neo4j.storageengine.api.TransactionApplicationMode.INTERNAL; + /** * This class should replace the {@link org.neo4j.kernel.api.KernelTransaction} interface, and take its name, as soon * as @@ -118,11 +118,9 @@ public class KernelTransactionImplementation implements KernelTransaction, TxSta private SecurityContext securityContext; private volatile StatementLocks statementLocks; private boolean beforeHookInvoked; - private volatile boolean closing; - private volatile boolean closed; + private final TransactionStatus transactionStatus = new TransactionStatus(); private boolean failure; private boolean success; - private volatile Status terminationReason; private long startTimeMillis; private long timeoutMillis; private long lastTransactionIdWhenStarted; @@ -134,15 +132,6 @@ public class KernelTransactionImplementation implements KernelTransaction, TxSta private volatile int reuseCount; private volatile Map userMetaData; - /** - * Lock prevents transaction {@link #markForTermination(Status)} transaction termination} from interfering with - * {@link #close() transaction commit} and specifically with {@link #release()}. - * Termination can run concurrently with commit and we need to make sure that it terminates the right lock client - * and the right transaction (with the right {@link #reuseCount}) because {@link KernelTransactionImplementation} - * instances are pooled. - */ - private final Lock terminationReleaseLock = new ReentrantLock(); - public KernelTransactionImplementation( StatementOperationContainer operationContainer, SchemaWriteGuard schemaWriteGuard, TransactionHooks hooks, @@ -187,13 +176,10 @@ public KernelTransactionImplementation initialize( { this.type = type; this.statementLocks = statementLocks; - this.terminationReason = null; - this.closing = false; - this. closed = false; - this.beforeHookInvoked = false; this.failure = false; this.success = false; this.beforeHookInvoked = false; + this.transactionStatus.init(); this.writeState = TransactionWriteState.NONE; this.startTimeMillis = clock.millis(); this.timeoutMillis = transactionTimeout; @@ -247,48 +233,24 @@ public void failure() @Override public Optional getReasonIfTerminated() { - return Optional.ofNullable( terminationReason ); + return transactionStatus.getTerminationReason(); } boolean markForTermination( long expectedReuseCount, Status reason ) { - terminationReleaseLock.lock(); - try - { - return expectedReuseCount == reuseCount && markForTerminationIfPossible( reason ); - } - finally - { - terminationReleaseLock.unlock(); - } + return expectedReuseCount == reuseCount && markForTerminationIfPossible( reason ); } - /** - * {@inheritDoc} - *

- * This method is guarded by {@link #terminationReleaseLock} to coordinate concurrent - * {@link #close()} and {@link #release()} calls. - */ @Override public void markForTermination( Status reason ) { - terminationReleaseLock.lock(); - try - { - markForTerminationIfPossible( reason ); - } - finally - { - terminationReleaseLock.unlock(); - } + markForTerminationIfPossible( reason ); } private boolean markForTerminationIfPossible( Status reason ) { - if ( canBeTerminated() ) + if ( transactionStatus.terminate( reason ) ) { - failure = true; - terminationReason = reason; if ( statementLocks != null ) { statementLocks.stop(); @@ -302,7 +264,7 @@ private boolean markForTerminationIfPossible( Status reason ) @Override public boolean isOpen() { - return !closed && !closing; + return transactionStatus.isOpen(); } @Override @@ -391,8 +353,6 @@ public boolean hasTxStateWithChanges() private void markAsClosed( long txId ) { - assertTransactionOpen(); - closed = true; closeCurrentStatementIfAny(); for ( CloseListener closeListener : closeListeners ) { @@ -405,17 +365,9 @@ private void closeCurrentStatementIfAny() currentStatement.forceClose(); } - private void assertTransactionNotClosing() - { - if ( closing ) - { - throw new IllegalStateException( "This transaction is already being closed." ); - } - } - private void assertTransactionOpen() { - if ( closed ) + if ( isClosed() ) { throw new IllegalStateException( "This transaction has already been completed." ); } @@ -436,15 +388,28 @@ private boolean hasDataChanges() return hasTxStateWithChanges() && txState.hasDataChanges(); } + public boolean isClosed() + { + return transactionStatus.isClosed(); + } + + public boolean isShutdown() + { + return transactionStatus.isShutdown(); + } + + public boolean isClosing() + { + return transactionStatus.isClosing(); + } + @Override public long closeTransaction() throws TransactionFailureException { - assertTransactionOpen(); - assertTransactionNotClosing(); - closeCurrentStatementIfAny(); - closing = true; + markTransactionAsClosing(); try { + closeCurrentStatementIfAny(); if ( failure || !success || isTerminated() ) { rollback(); @@ -460,8 +425,6 @@ public long closeTransaction() throws TransactionFailureException { try { - closed = true; - closing = false; transactionEvent.setSuccess( success ); transactionEvent.setFailure( failure ); transactionEvent.setTransactionType( writeState.name() ); @@ -475,9 +438,22 @@ public long closeTransaction() throws TransactionFailureException } } - public boolean isClosing() + private void markTransactionAsClosing() throws TransactionFailureException { - return closing; + if ( !transactionStatus.closing() ) + { + assertTransactionOpen(); + if ( transactionStatus.isShutdown() ) + { + throw new TransactionFailureException( Status.Transaction.TransactionTerminated, + "Transaction terminated since marked as shut down." ); + } + else + { + throw new IllegalStateException( + "Transaction is already closing. Repeated execution of transactions are not allowed." ); + } + } } /** @@ -496,18 +472,15 @@ public boolean isClosing() */ private void failOnNonExplicitRollbackIfNeeded() throws TransactionFailureException { - if ( success && isTerminated() ) - { - throw new TransactionTerminatedException( terminationReason ); - } if ( success ) { - // Success was called, but also failure which means that the client code using this - // transaction passed through a happy path, but the transaction was still marked as - // failed for one or more reasons. Tell the user that although it looked happy it - // wasn't committed, but was instead rolled back. - throw new TransactionFailureException( Status.Transaction.TransactionMarkedAsFailed, - "Transaction rolled back even if marked as successful" ); + throw getReasonIfTerminated().map( TransactionTerminatedException::new ) + // Success was called, but also failure which means that the client code using this + // transaction passed through a happy path, but the transaction was still marked as + // failed for one or more reasons. Tell the user that although it looked happy it + // wasn't committed, but was instead rolled back. + .orElseThrow( () -> new TransactionFailureException( Status.Transaction.TransactionMarkedAsFailed, + "Transaction rolled back even if marked as successful" ) ); } } @@ -579,7 +552,6 @@ private long commit() throws TransactionFailureException startTimeMillis, lastTransactionIdWhenStarted, timeCommitted, commitLocks.getLockSessionId() ); - // Commit the transaction success = true; TransactionToApply batch = new TransactionToApply( transactionRepresentation ); txId = transactionId = commitProcess.commit( batch, commitEvent, INTERNAL ); @@ -688,47 +660,29 @@ private void afterRollback() /** * Release resources held up by this transaction & return it to the transaction pool. - * This method is guarded by {@link #terminationReleaseLock} to coordinate concurrent * {@link #markForTermination(Status)} calls. */ private void release() { - terminationReleaseLock.lock(); - try - { - statementLocks.close(); - statementLocks = null; - terminationReason = null; - type = null; - securityContext = null; - transactionEvent = null; - legacyIndexTransactionState = null; - txState = null; - hooksState = null; - currentTransactionOperations = null; - closeListeners.clear(); - reuseCount++; - userMetaData = Collections.emptyMap(); - pool.release( this ); - } - finally - { - terminationReleaseLock.unlock(); - } - } - - /** - * Transaction can be terminated only when it is not closed and not already terminated. - * Otherwise termination does not make sense. - */ - private boolean canBeTerminated() - { - return !closed && !isTerminated(); + statementLocks.close(); + statementLocks = null; + transactionStatus.close(); + type = null; + securityContext = null; + transactionEvent = null; + legacyIndexTransactionState = null; + txState = null; + hooksState = null; + currentTransactionOperations = null; + closeListeners.clear(); + reuseCount++; + userMetaData = Collections.emptyMap(); + pool.release( this ); } private boolean isTerminated() { - return terminationReason != null; + return transactionStatus.isTerminated(); } @Override @@ -792,9 +746,24 @@ public String toString() public void dispose() { + markAsShutdown(); storageStatement.close(); } + void markAsShutdown() + { + if ( transactionStatus.shutdown() ) + { + // since transaction is marked as closed now and any new calls to close transaction + // are no longer possible we can release the locks now immediately + StatementLocks localLocks = this.statementLocks; + if ( localLocks != null ) + { + localLocks.close(); + } + } + } + /** * This method will be invoked by concurrent threads for inspecting the locks held by this transaction. *

@@ -849,4 +818,119 @@ TransactionWriteState upgradeToSchemaWrites() throws InvalidTransactionTypeKerne return SCHEMA; } } + + static class TransactionStatus + { + private static final int OPEN = 0; + private static final int CLOSING = 1; + private static final int CLOSED = 2; + private static final int SHUTDOWN = 3; + private static final int STATE_BITS_MASK = 0x3; + private static final int NON_STATE_BITS_MASK = 0xFFFF_FFFC; + private static final int TERMINATED = 1 << 3; + + private static final AtomicIntegerFieldUpdater statusUpdater = + AtomicIntegerFieldUpdater.newUpdater( TransactionStatus.class, "status" ); + // updated by statusUpdater + private volatile int status = CLOSED; + private Status terminationReason; + + public void init() + { + statusUpdater.set( this, OPEN ); + reset(); + } + + public void reset() + { + terminationReason = null; + } + + public boolean isOpen() + { + return !isClosed(); + } + + public boolean isClosed() + { + return is( CLOSED ); + } + + public boolean isClosing() + { + return is( CLOSING ); + } + + public boolean isTerminated() + { + return (statusUpdater.get( this ) & TERMINATED) != 0; + } + + public boolean terminate( Status reason ) + { + int currentStatus; + do + { + currentStatus = statusUpdater.get( this ); + if ( (currentStatus != OPEN) && (currentStatus != CLOSING) ) + { + return false; + } + terminationReason = reason; + } + while ( !statusUpdater.compareAndSet( this, currentStatus, currentStatus | TERMINATED ) ); + return true; + } + + public boolean closing() + { + return setOpenTransactionStatus( CLOSING ); + } + + private boolean setOpenTransactionStatus( int newStatus ) + { + int currentStatus; + do + { + currentStatus = statusUpdater.get( this ); + if ( (currentStatus & STATE_BITS_MASK) != OPEN ) + { + return false; + } + } + while ( !statusUpdater.compareAndSet( this, currentStatus, (currentStatus & NON_STATE_BITS_MASK) | newStatus ) ); + return true; + } + + boolean shutdown() + { + return setOpenTransactionStatus( SHUTDOWN ); + } + + public void close() + { + reset(); + statusUpdater.set( this, CLOSED ); + } + + Optional getTerminationReason() + { + return Optional.ofNullable( terminationReason ); + } + + private boolean is( int statusCode ) + { + return is( statusUpdater.get( this ), statusCode ); + } + + private boolean is( int currentStatus, int statusCode ) + { + return (currentStatus & STATE_BITS_MASK) == statusCode; + } + + public boolean isShutdown() + { + return is( SHUTDOWN ); + } + } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java index 899715592208f..d33d5d8497f16 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java @@ -39,7 +39,6 @@ import org.neo4j.kernel.impl.api.state.LegacyIndexTransactionStateImpl; import org.neo4j.kernel.impl.factory.AccessCapability; import org.neo4j.kernel.impl.index.IndexConfigStore; -import org.neo4j.kernel.impl.locking.LockTracer; import org.neo4j.kernel.impl.locking.StatementLocks; import org.neo4j.kernel.impl.locking.StatementLocksFactory; import org.neo4j.kernel.impl.proc.Procedures; @@ -182,6 +181,7 @@ public Set activeTransactions() public void disposeAll() { terminateTransactions(); + markAllTransactionsAsShutdown(); localTxPool.disposeAll(); globalTxPool.disposeAll(); } @@ -191,6 +191,11 @@ public void terminateTransactions() markAllTransactionsAsTerminated(); } + private void markAllTransactionsAsShutdown() + { + allTransactions.forEach( KernelTransactionImplementation::markAsShutdown ); + } + private void markAllTransactionsAsTerminated() { // we mark all transactions for termination since we want to make sure these transactions diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/core/ThreadToStatementContextBridge.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/core/ThreadToStatementContextBridge.java index 46a411c0317c6..5ccbe8e0bae85 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/core/ThreadToStatementContextBridge.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/core/ThreadToStatementContextBridge.java @@ -115,7 +115,7 @@ public KernelTransaction getKernelTransactionBoundToThisThread( boolean strict ) return getTopLevelTransactionBoundToThisThread( strict ); } - // Exeptions below extend the public API exceptions with versions that have status codes. + // Exceptions below extend the public API exceptions with versions that have status codes. private static class BridgeNotInTransactionException extends NotInTransactionException implements Status.HasStatus { @Override diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java index 94e986e7ab202..fdacc57b644a4 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionImplementationTest.java @@ -35,6 +35,7 @@ import org.neo4j.graphdb.TransactionTerminatedException; import org.neo4j.kernel.api.KernelTransaction; +import org.neo4j.kernel.api.TransactionHook; import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.security.AnonymousContext; @@ -47,7 +48,9 @@ import org.neo4j.kernel.impl.transaction.command.Command; import org.neo4j.storageengine.api.StorageCommand; import org.neo4j.storageengine.api.StorageStatement; +import org.neo4j.storageengine.api.StoreReadLayer; import org.neo4j.storageengine.api.lock.ResourceLocker; +import org.neo4j.storageengine.api.txstate.ReadableTransactionState; import org.neo4j.test.DoubleLatch; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -469,15 +472,6 @@ public void shouldIncrementReuseCounterOnReuse() throws Exception assertEquals( reuseCount + 1, transaction.getReuseCount() ); } - @Test - public void markForTerminationNotInitializedTransaction() - { - KernelTransactionImplementation tx = newNotInitializedTransaction(); - tx.markForTermination( Status.General.UnknownError ); - - assertEquals( Status.General.UnknownError, tx.getReasonIfTerminated().get() ); - } - @Test public void markForTerminationInitializedTransaction() { @@ -710,6 +704,76 @@ public void markForTerminationWithIncorrectReuseCount() throws Exception verify( locksClient, never() ).stop(); } + @Test + public void transactionIsOpenAfterInitialization() + { + KernelTransactionImplementation transaction = newNotInitializedTransaction(); + assertFalse( transaction.isOpen() ); + + transaction.initialize( 100, 100, new SimpleStatementLocks( new NoOpClient() ), + KernelTransaction.Type.explicit, securityContext(), 100 ); + assertTrue( transaction.isOpen() ); + } + + @Test + public void transactionIsClosedAfterCommit() throws TransactionFailureException + { + KernelTransactionImplementation tx = newTransaction( 1000 ); + assertTrue( tx.isOpen() ); + + tx.success(); + tx.close(); + + assertTrue( tx.isClosed() ); + } + + @Test + public void transactionIsClosedAfterRollback() throws TransactionFailureException + { + KernelTransactionImplementation tx = newTransaction( 1000 ); + assertTrue( tx.isOpen() ); + + tx.failure(); + tx.close(); + + assertTrue( tx.isClosed() ); + } + + @Test + public void closeShutdownTransactionIsNotAllowed() throws TransactionFailureException + { + KernelTransactionImplementation transaction = newTransaction( 1000 ); + transaction.markAsShutdown(); + + expectedException.expect( TransactionFailureException.class ); + expectedException.expectMessage( "Transaction terminated since marked as shut down." ); + transaction.close(); + } + + @Test + public void closeLocksOnTransactionShutdown() + { + Locks.Client locksClient = mock( Locks.Client.class ); + KernelTransactionImplementation transaction = newTransaction( securityContext(), locksClient ); + transaction.markAsShutdown(); + + verify( locksClient ).close(); + } + + @Test + public void closeClosingTransactionNotAllowed() throws TransactionFailureException + { + Locks.Client locksClient = mock( Locks.Client.class ); + KernelTransactionImplementation transaction = newTransaction( securityContext(), locksClient ); + transaction.txState().nodeDoCreate( 42L ); + hooks.register( new ClosingTransactionHook() ); + transaction.success(); + + expectedException.expect( IllegalStateException.class ); + expectedException.expectMessage("Transaction is already closing. Repeated execution of transactions are not allowed." ); + transaction.closeTransaction(); + } + @Test public void closeClosedTransactionIsNotAllowed() throws TransactionFailureException { @@ -735,4 +799,34 @@ private void initializeAndClose( KernelTransactionImplementation tx, int times ) tx.close(); } } + + private static class ClosingTransactionHook implements TransactionHook + { + @Override + public Outcome beforeCommit( ReadableTransactionState state, KernelTransaction transaction, + StoreReadLayer storeReadLayer, StorageStatement statement ) + { + try + { + transaction.closeTransaction(); + } + catch ( TransactionFailureException e ) + { + throw new RuntimeException( e ); + } + return null; + } + + @Override + public void afterCommit( ReadableTransactionState state, KernelTransaction transaction, Outcome outcome ) + { + + } + + @Override + public void afterRollback( ReadableTransactionState state, KernelTransaction transaction, Outcome outcome ) + { + + } + } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionStatusTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionStatusTest.java new file mode 100644 index 0000000000000..4eabbcddf8ba1 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionStatusTest.java @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.api; + +import org.junit.Test; + +import org.neo4j.kernel.api.exceptions.Status; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.neo4j.kernel.impl.api.KernelTransactionImplementation.TransactionStatus; + +public class KernelTransactionStatusTest +{ + + @Test + public void openStatusAfterInitialization() throws Exception + { + TransactionStatus status = getOpenStatus(); + assertTrue( "Status should be open after initialization.", status.isOpen() ); + } + + @Test + public void closedStatusAfterCreation() + { + TransactionStatus status = createStatus(); + assertTrue( "Status should be closed just after creation", status.isClosed() ); + } + + @Test + public void terminateOpenTransaction() + { + TransactionStatus status = getOpenStatus(); + + assertTrue( status.terminate( Status.Transaction.Terminated ) ); + assertTrue( status.isTerminated() ); + assertEquals( Status.Transaction.Terminated, status.getTerminationReason().get() ); + } + + @Test + public void terminateClosingTransaction() + { + TransactionStatus status = getOpenStatus(); + + assertTrue( status.closing() ); + assertTrue( status.terminate( Status.Transaction.Terminated ) ); + assertTrue( status.isTerminated() ); + assertEquals( Status.Transaction.Terminated, status.getTerminationReason().get() ); + } + + @Test + public void closedTransactionIsNotTerminatable() + { + TransactionStatus status = getOpenStatus(); + + status.close(); + assertFalse( status.terminate( Status.Transaction.Terminated ) ); + assertFalse( status.getTerminationReason().isPresent() ); + } + + @Test + public void shutdownTransactionIsNotTerminatable() + { + TransactionStatus status = getOpenStatus(); + + assertTrue( status.shutdown() ); + assertFalse( status.terminate( Status.Transaction.Terminated ) ); + } + + @Test + public void closingTransactionNotPossibleWhenTransactionIsClosedAlready() + { + TransactionStatus status = getOpenStatus(); + + status.close(); + assertFalse( status.closing() ); + assertTrue( status.isClosed() ); + } + + @Test + public void closingTransactionInClosingState() + { + TransactionStatus closingStatus = getOpenStatus(); + closingStatus.closing(); + closingStatus.close(); + assertTrue( closingStatus.isClosed() ); + } + + @Test + public void closingTransactionCanBeTerminated() + { + TransactionStatus openStatus = getOpenStatus(); + assertTrue( openStatus.closing() ); + assertTrue( openStatus.terminate( Status.Transaction.Terminated ) ); + + assertTrue( openStatus.isTerminated() ); + assertTrue( openStatus.isClosing() ); + } + + @Test + public void transactionStatusLifeCycle() + { + TransactionStatus status = getOpenStatus(); + + assertTrue( status.closing() ); + assertTrue( status.isClosing() ); + + status.close(); + assertTrue( status.isClosed() ); + } + + @Test + public void closeResetTerminationReason() + { + TransactionStatus openStatus = getOpenStatus(); + + assertTrue( openStatus.closing() ); + assertTrue( openStatus.terminate( Status.Transaction.Terminated ) ); + assertTrue( openStatus.getTerminationReason().isPresent() ); + openStatus.close(); + + assertFalse( openStatus.getTerminationReason().isPresent() ); + } + + private TransactionStatus getOpenStatus() + { + TransactionStatus status = createStatus(); + status.init(); + return status; + } + + private TransactionStatus createStatus() + { + return new TransactionStatus(); + } +} diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java index 715710f5cdae5..169cc7fd0d74a 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java @@ -62,20 +62,6 @@ public class KernelTransactionTerminationTest { private static final int TEST_RUN_TIME_MS = 5_000; - @Test( timeout = TEST_RUN_TIME_MS * 2 ) - public void transactionCantBeTerminatedAfterItIsClosed() throws Throwable - { - runTwoThreads( - tx -> tx.markForTermination( Status.Transaction.TransactionMarkedAsFailed ), - tx -> - { - close( tx ); - assertFalse( tx.getReasonIfTerminated().isPresent() ); - tx.initialize(); - } - ); - } - @Test( timeout = TEST_RUN_TIME_MS * 2 ) public void closeTransaction() throws Throwable { diff --git a/community/neo4j/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsIT.java b/community/neo4j/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsIT.java new file mode 100644 index 0000000000000..722050e30d8ab --- /dev/null +++ b/community/neo4j/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsIT.java @@ -0,0 +1,308 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.api; + +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.RuleChain; + +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.neo4j.graphdb.DependencyResolver; +import org.neo4j.graphdb.Transaction; +import org.neo4j.graphdb.TransactionTerminatedException; +import org.neo4j.graphdb.event.ErrorState; +import org.neo4j.graphdb.event.KernelEventHandler; +import org.neo4j.graphdb.event.TransactionData; +import org.neo4j.graphdb.event.TransactionEventHandler; +import org.neo4j.kernel.api.KernelTransaction; +import org.neo4j.kernel.api.KernelTransactionHandle; +import org.neo4j.kernel.api.exceptions.Status; +import org.neo4j.kernel.api.exceptions.TransactionFailureException; +import org.neo4j.kernel.api.security.SecurityContext; +import org.neo4j.test.mockito.matcher.RootCauseMatcher; +import org.neo4j.test.rule.EmbeddedDatabaseRule; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public class KernelTransactionsIT +{ + + public ExpectedException exception = ExpectedException.none(); + public EmbeddedDatabaseRule database = new EmbeddedDatabaseRule(); + + @Rule + public RuleChain ruleChain = RuleChain.outerRule( database ).around( exception ); + + private ExecutorService executorService; + + @Before + public void setUp() + { + executorService = Executors.newCachedThreadPool(); + } + + @After + public void tearDown() + { + executorService.shutdown(); + } + + @Test + public void terminationOfRunningTransaction() throws Exception + { + KernelTransactions kernelTransactions = database.getDependencyResolver() + .resolveDependency( KernelTransactions.class ); + + CountDownLatch latch = new CountDownLatch( 1 ); + exception.expect( new RootCauseMatcher<>( TransactionTerminatedException.class, + "The transaction has been terminated." ) ); + try ( Transaction transaction = database.beginTx() ) + { + database.createNode(); + executorService.submit( () -> + { + Set transactions = kernelTransactions.activeTransactions(); + assertThat( "Contain one single active transaction", transactions, hasSize( 1 ) ); + transactions.forEach( tx -> tx.markForTermination( Status.Transaction.TransactionTerminated ) ); + latch.countDown(); + } ); + latch.await(); + transaction.success(); + } + + assertThat( "Transactions have been terminated", kernelTransactions.activeTransactions(), empty() ); + database.shutdown(); + } + + @Test + public void shutdownWhileRunningTransaction() + { + exception.expect( new RootCauseMatcher<>( TransactionFailureException.class, + "Transaction terminated since marked as shut down." ) ); + + try ( Transaction ignored = database.beginTx() ) + { + database.createNode(); + database.shutdown(); + } + } + + @Test + public void shutdownDatabaseWhileHaveActiveTransactionRunning() throws InterruptedException + { + exception.expect( new RootCauseMatcher<>( TransactionFailureException.class, + "Transaction terminated since marked as shut down." ) ); + + CountDownLatch latch = new CountDownLatch( 1 ); + try ( Transaction transaction = database.beginTx() ) + { + database.createNode(); + transaction.success(); + executorService.submit( () -> + { + database.shutdown(); + latch.countDown(); + } ); + latch.await(); + } + } + + @Test + public void shutdownWithHaveActiveTerminatedTransactionRunning() throws InterruptedException + { + exception.expect( new RootCauseMatcher<>( TransactionFailureException.class, + "Transaction terminated since marked as shut down." ) ); + + CountDownLatch latch = new CountDownLatch( 1 ); + try ( Transaction transaction = database.beginTx() ) + { + database.createNode(); + transaction.terminate(); + executorService.submit( () -> + { + database.shutdown(); + latch.countDown(); + } ); + latch.await(); + } + } + + @Test + public void terminateTransactionFromAnotherThread() throws InterruptedException + { + exception.expect( TransactionTerminatedException.class ); + exception.expectMessage( "The transaction has been terminated. Retry your operation in a new transaction, " + + "and you should see a successful result. Explicitly terminated by the user." ); + + CountDownLatch latch = new CountDownLatch( 1 ); + try ( Transaction transaction = database.beginTx() ) + { + database.createNode(); + executorService.submit( () -> + { + transaction.terminate(); + latch.countDown(); + } ); + latch.await(); + database.createNode(); + } + + database.shutdown(); + } + + @Test + public void terminateTransactionFromAnotherThreadByHandle() throws InterruptedException + { + exception.expect( TransactionTerminatedException.class ); + exception.expectMessage( "The transaction has been terminated. Retry your operation in a new transaction, " + + "and you should see a successful result. The request referred to a transaction that does not exist." ); + + CountDownLatch latch = new CountDownLatch( 1 ); + try ( Transaction ignored = database.beginTx() ) + { + database.createNode(); + executorService.submit( () -> + { + DependencyResolver dependencyResolver = database.getDependencyResolver(); + KernelTransactions transactions = dependencyResolver.resolveDependency( KernelTransactions.class ); + Set kernelTransactionHandles = transactions.activeTransactions(); + kernelTransactionHandles.forEach( tx -> tx.markForTermination( Status.Transaction.TransactionNotFound ) ); + latch.countDown(); + } ); + latch.await(); + database.createNode(); + } + + database.shutdown(); + } + + @Test + public void shutdownRunningTransactionsOnDispose() + { + KernelTransactions transactions = + database.getDependencyResolver().resolveDependency( KernelTransactions.class ); + try ( Transaction ignored = database.beginTx() ) + { + database.createNode(); + + database.shutdown(); + } + catch ( Exception ignored ) + { + // nothing + } + Set allTransactions = transactions.getAllTransactions(); + assertThat( "We should have one transaction that was open and how is shutdown.", + allTransactions, Matchers.hasSize( 1 ) ); + KernelTransactionImplementation shutdownTransaction = allTransactions.iterator().next(); + assertEquals( Status.General.DatabaseUnavailable, shutdownTransaction.getReasonIfTerminated().get() ); + assertTrue( "Transaction state should be shutdown.", shutdownTransaction.isShutdown() ); + } + + @Test + public void waitClosingTransactionOnShutdown() throws TransactionFailureException, InterruptedException + { + Kernel kernel = database.getDependencyResolver().resolveDependency( Kernel.class ); + KernelTransactions transactions = + database.getDependencyResolver().resolveDependency( KernelTransactions.class ); + + KernelTransactionImplementation kernelTransaction = (KernelTransactionImplementation) kernel + .newTransaction(KernelTransaction.Type.implicit, SecurityContext.AUTH_DISABLED, 10000L ); + + CountDownLatch shutdownLatch = new CountDownLatch( 1 ); + database.registerKernelEventHandler( new ShutdownEventHandler( kernelTransaction ) ); + database.registerTransactionEventHandler( new ShutdownTransactionEventHandler( shutdownLatch ) ); + + kernelTransaction.txState().nodeDoCreate( 2 ); + kernelTransaction.success(); + kernelTransaction.close(); + + shutdownLatch.await(); + + Set allTransactions = transactions.getAllTransactions(); + assertThat( "No transactions, everything should be closed during shutdown.", allTransactions, empty() ); + } + + private static class ShutdownEventHandler implements KernelEventHandler + { + private final KernelTransactionImplementation kernelTransaction; + + ShutdownEventHandler( KernelTransactionImplementation kernelTransaction ) + { + this.kernelTransaction = kernelTransaction; + } + + @Override + public void beforeShutdown() + { + assertTrue( "Transaction should be already closed.", kernelTransaction.isClosed() ); + } + + @Override + public void kernelPanic( ErrorState error ) + { + + } + + @Override + public Object getResource() + { + return null; + } + + @Override + public ExecutionOrder orderComparedTo( KernelEventHandler other ) + { + return null; + } + } + + private class ShutdownTransactionEventHandler extends TransactionEventHandler.Adapter + { + private final CountDownLatch shutdownLatch; + + ShutdownTransactionEventHandler( CountDownLatch shutdownLatch ) + { + this.shutdownLatch = shutdownLatch; + } + + @Override + public Object beforeCommit( TransactionData data ) throws Exception + { + executorService.submit(() -> { + database.shutdown(); + shutdownLatch.countDown(); + } ); + return null; + } + } +}