From 5712b8c10d8364f3bb69f22e765a4cc024dc66c5 Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 18 May 2016 00:50:34 +0300 Subject: [PATCH] Locks made transaction termination aware This commit makes community and forseti locks react on transaction termination by throwing TransactionTerminatedException. Previously when a terminated transaction was waiting to grab the lock this waiting did not end. Feature will allow us to develop kill-transaction functionality for transactions that are basically starved on locks. Feature is currently guarded by a boolean system property 'tx_termination_aware_locks' and off by default. --- .../api/KernelTransactionImplementation.java | 26 +- .../kernel/impl/api/tx/TxTermination.java | 34 ++ .../kernel/impl/api/tx/TxTerminationImpl.java | 40 ++ .../org/neo4j/kernel/impl/locking/Locks.java | 4 +- .../community/CommunityLockClient.java | 9 +- .../community/CommunityLockManger.java | 5 +- .../locking/community/LockManagerImpl.java | 9 +- .../kernel/impl/locking/community/RWLock.java | 105 ++-- .../graphdb/GraphDatabaseShutdownTest.java | 12 +- .../KernelTransactionImplementationTest.java | 12 +- .../impl/api/KernelTransactionsTest.java | 5 +- .../impl/locking/CloseCompatibility.java | 5 +- .../locking/LegacyDeadlockCompatibility.java | 11 +- .../locking/LockServiceMicroBenchmark.java | 3 +- .../kernel/impl/locking/LockWorkerState.java | 4 +- .../LockingCompatibilityTestSuite.java | 11 +- .../neo4j/kernel/impl/locking/NoOpLocks.java | 3 +- .../impl/locking/RWLockCompatibility.java | 11 +- .../locking/TxTerminationCompatibility.java | 479 ++++++++++++++++++ ...ava => CommunityLocksCompatibilityIT.java} | 2 +- .../locking/community/RWLockLeakTest.java | 11 +- .../PerformanceTestLegacyLocks.java | 4 +- .../neo4j/test/NeoStoreDataSourceRule.java | 14 +- .../ha/cluster/DefaultMasterImplSPI.java | 4 +- .../kernel/ha/lock/SlaveLockManager.java | 5 +- .../kernel/ha/lock/forseti/ForsetiClient.java | 30 +- .../ha/lock/forseti/ForsetiLockManager.java | 8 +- .../kernel/ha/lock/SlaveLockManagerTest.java | 5 +- .../lock/SlaveLocksClientConcurrentTest.java | 3 +- .../kernel/ha/lock/SlaveLocksClientTest.java | 3 +- ....java => ForsetiLocksCompatibilityIT.java} | 2 +- 31 files changed, 741 insertions(+), 138 deletions(-) create mode 100644 community/kernel/src/main/java/org/neo4j/kernel/impl/api/tx/TxTermination.java create mode 100644 community/kernel/src/main/java/org/neo4j/kernel/impl/api/tx/TxTerminationImpl.java create mode 100644 community/kernel/src/test/java/org/neo4j/kernel/impl/locking/TxTerminationCompatibility.java rename community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/{CommunityLocksCompatibility.java => CommunityLocksCompatibilityIT.java} (92%) rename enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/forseti/{ForsetiLocksCompatibility.java => ForsetiLocksCompatibilityIT.java} (93%) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java index 8f5ce8f8b96be..21317e6f42c4d 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java @@ -52,6 +52,8 @@ import org.neo4j.kernel.impl.api.state.TxState; import org.neo4j.kernel.impl.api.store.PersistenceCache; import org.neo4j.kernel.impl.api.store.StoreReadLayer; +import org.neo4j.kernel.impl.api.tx.TxTermination; +import org.neo4j.kernel.impl.api.tx.TxTerminationImpl; import org.neo4j.kernel.impl.index.IndexEntityType; import org.neo4j.kernel.impl.locking.LockGroup; import org.neo4j.kernel.impl.locking.Locks; @@ -120,6 +122,8 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel } } + private static final boolean TX_TERMINATION_AWARE_LOCKS = Boolean.getBoolean( "tx_termination_aware_locks" ); + // Logic private final SchemaWriteGuard schemaWriteGuard; private final IndexingService indexService; @@ -153,7 +157,7 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel private Locks.Client locks; private boolean closing, closed; private boolean failure, success; - private volatile boolean terminated; + private final TxTerminationImpl termination = new TxTerminationImpl(); // Some header information private long startTimeMillis; private long lastTransactionIdWhenStarted; @@ -212,9 +216,10 @@ public KernelTransactionImplementation( StatementOperationParts operations, /** Reset this transaction to a vanilla state, turning it into a logically new transaction. */ public KernelTransactionImplementation initialize( long lastCommittedTx ) { - this.locks = locksManager.newClient(); + this.locks = newLocksClient(); this.context.bind( locks ); - this.closing = closed = failure = success = terminated = false; + this.closing = closed = failure = success = false; + this.termination.reset(); this.transactionType = TransactionType.ANY; this.beforeHookInvoked = false; this.recordState.initialize( lastCommittedTx ); @@ -240,16 +245,16 @@ public void failure() @Override public boolean shouldBeTerminated() { - return terminated; + return termination.shouldBeTerminated(); } @Override public void markForTermination() { - if ( !terminated ) + if ( !termination.shouldBeTerminated() ) { failure = true; - terminated = true; + termination.markForTermination(); transactionMonitor.transactionTerminated(); } } @@ -968,4 +973,13 @@ public String toString() { return "KernelTransaction[" + this.locks.getLockSessionId() + "]"; } + + private Locks.Client newLocksClient() + { + if ( TX_TERMINATION_AWARE_LOCKS ) + { + return locksManager.newClient( termination ); + } + return locksManager.newClient( TxTermination.NONE ); + } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/tx/TxTermination.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/tx/TxTermination.java new file mode 100644 index 0000000000000..2f655ddcb5664 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/tx/TxTermination.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.api.tx; + +public interface TxTermination +{ + TxTermination NONE = new TxTermination() + { + @Override + public boolean shouldBeTerminated() + { + return false; + } + }; + + boolean shouldBeTerminated(); +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/tx/TxTerminationImpl.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/tx/TxTerminationImpl.java new file mode 100644 index 0000000000000..a5fe16b7d7812 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/tx/TxTerminationImpl.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.api.tx; + +public class TxTerminationImpl implements TxTermination +{ + private volatile boolean terminated; + + public void markForTermination() + { + terminated = true; + } + + public boolean shouldBeTerminated() + { + return terminated; + } + + public void reset() + { + terminated = false; + } +} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/Locks.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/Locks.java index e80485661134a..bf3dc56eb6f1c 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/Locks.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/Locks.java @@ -20,6 +20,7 @@ package org.neo4j.kernel.impl.locking; import org.neo4j.helpers.Service; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.impl.util.concurrent.WaitStrategy; import org.neo4j.kernel.lifecycle.Lifecycle; @@ -114,8 +115,9 @@ interface Client extends AutoCloseable * you call {@link Locks.Client#close()}. * * @throws IllegalStateException if this instance has been closed, i.e has had {@link #shutdown()} called. + * @param txTermination shows if transaction owning the client should be terminated */ - Client newClient(); + Client newClient( TxTermination txTermination ); /** Visit all held locks. */ void accept(Visitor visitor); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java index c0337fa03df5e..9960ecbdba2ca 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockClient.java @@ -24,6 +24,7 @@ import org.neo4j.collection.primitive.PrimitiveIntObjectVisitor; import org.neo4j.collection.primitive.PrimitiveLongObjectMap; import org.neo4j.collection.primitive.PrimitiveLongObjectVisitor; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.impl.locking.Locks; import static java.lang.String.format; @@ -31,14 +32,16 @@ public class CommunityLockClient implements Locks.Client { private final LockManagerImpl manager; + private final TxTermination txTermination; private final LockTransaction lockTransaction = new LockTransaction(); private final PrimitiveIntObjectMap> sharedLocks = Primitive.intObjectMap(); private final PrimitiveIntObjectMap> exclusiveLocks = Primitive.intObjectMap(); - public CommunityLockClient( LockManagerImpl manager ) + public CommunityLockClient( LockManagerImpl manager, TxTermination txTermination ) { this.manager = manager; + this.txTermination = txTermination; } @Override @@ -55,7 +58,7 @@ public void acquireShared( Locks.ResourceType resourceType, long... resourceIds } resource = new LockResource( resourceType, resourceId ); - manager.getReadLock( resource, lockTransaction ); + manager.getReadLock( resource, lockTransaction, txTermination ); localLocks.put(resourceId, resource); } } @@ -74,7 +77,7 @@ public void acquireExclusive( Locks.ResourceType resourceType, long... resourceI } resource = new LockResource( resourceType, resourceId ); - manager.getWriteLock( resource, lockTransaction ); + manager.getWriteLock( resource, lockTransaction, txTermination ); localLocks.put(resourceId, resource); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockManger.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockManger.java index bf2574b3e60e2..fecb2469a5c82 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockManger.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/CommunityLockManger.java @@ -19,6 +19,7 @@ */ package org.neo4j.kernel.impl.locking.community; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.lifecycle.LifecycleAdapter; @@ -28,7 +29,7 @@ public class CommunityLockManger extends LifecycleAdapter implements Locks private volatile boolean closed; @Override - public Client newClient() + public Client newClient( TxTermination txTermination ) { // We check this volatile closed flag here, which may seem like a contention overhead, but as the time // of writing we apply pooling of transactions and in extension pooling of lock clients, @@ -37,7 +38,7 @@ public Client newClient() { throw new IllegalStateException( this + " already closed" ); } - return new CommunityLockClient( manager ); + return new CommunityLockClient( manager, txTermination ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/LockManagerImpl.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/LockManagerImpl.java index d5394e32fd1d7..b163788203e47 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/LockManagerImpl.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/LockManagerImpl.java @@ -24,6 +24,7 @@ import org.neo4j.helpers.collection.Visitor; import org.neo4j.kernel.DeadlockDetectedException; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.impl.transaction.IllegalResourceException; import org.neo4j.kernel.impl.util.StringLogger; import org.neo4j.kernel.logging.Logging; @@ -43,10 +44,10 @@ public long getDetectedDeadlockCount() return ragManager.getDeadlockCount(); } - public void getReadLock( Object resource, Object tx ) + public void getReadLock( Object resource, Object tx, TxTermination txTermination ) throws DeadlockDetectedException, IllegalResourceException { - getRWLockForAcquiring( resource, tx ).acquireReadLock( tx ); + getRWLockForAcquiring( resource, tx ).acquireReadLock( tx, txTermination ); } public boolean tryReadLock( Object resource, Object tx ) @@ -55,10 +56,10 @@ public boolean tryReadLock( Object resource, Object tx ) return getRWLockForAcquiring( resource, tx ).tryAcquireReadLock( tx ); } - public void getWriteLock( Object resource, Object tx ) + public void getWriteLock( Object resource, Object tx, TxTermination txTermination ) throws DeadlockDetectedException, IllegalResourceException { - getRWLockForAcquiring( resource, tx ).acquireWriteLock( tx ); + getRWLockForAcquiring( resource, tx ).acquireWriteLock( tx, txTermination ); } public boolean tryWriteLock( Object resource, Object tx ) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/RWLock.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/RWLock.java index 7d32b990c533d..d74ca969ac89e 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/RWLock.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/locking/community/RWLock.java @@ -23,15 +23,16 @@ import java.util.LinkedList; import java.util.ListIterator; +import org.neo4j.graphdb.TransactionTerminatedException; import org.neo4j.helpers.collection.Visitor; import org.neo4j.kernel.DeadlockDetectedException; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.impl.locking.LockType; import org.neo4j.kernel.impl.util.ArrayMap; import org.neo4j.kernel.impl.util.StringLogger.LineLogger; import static java.lang.Thread.currentThread; import static java.lang.Thread.interrupted; - import static org.neo4j.kernel.impl.locking.LockType.READ; import static org.neo4j.kernel.impl.locking.LockType.WRITE; @@ -143,7 +144,8 @@ synchronized boolean isMarked() * @throws DeadlockDetectedException * if a deadlock is detected */ - synchronized void acquireReadLock( Object tx ) throws DeadlockDetectedException + synchronized void acquireReadLock( Object tx, TxTermination txTermination ) + throws DeadlockDetectedException { TxLockElement tle = getOrCreateLockElement( tx ); @@ -151,30 +153,9 @@ synchronized void acquireReadLock( Object tx ) throws DeadlockDetectedException { tle.movedOn = false; - boolean shouldAddWait = true; - Thread currentThread = currentThread(); - while ( totalWriteCount > tle.writeCount ) { - ragManager.checkWaitOn( this, tx ); - - if (shouldAddWait) - { - waitingThreadList.addFirst( new WaitElement( tle, READ, currentThread) ); - } - - try - { - wait(); - shouldAddWait = false; - } - catch ( InterruptedException e ) - { - interrupted(); - - shouldAddWait = true; - } - ragManager.stopWaitOn( this, tx ); + acquireLock( READ, tx, txTermination, tle ); } registerReadLockAcquired( tx, tle ); @@ -305,16 +286,6 @@ else if ( we.lockType == LockType.READ ) } } - /** - * Calls {@link #acquireWriteLock(Object)} with the - * transaction associated with the current thread. - * @throws DeadlockDetectedException - */ - void acquireWriteLock() throws DeadlockDetectedException - { - acquireWriteLock( null ); - } - /** * Tries to acquire write lock for a given transaction. If * this.writeCount is greater than the currents tx's write @@ -328,7 +299,8 @@ void acquireWriteLock() throws DeadlockDetectedException * @throws DeadlockDetectedException * if a deadlock is detected */ - synchronized void acquireWriteLock( Object tx ) throws DeadlockDetectedException + synchronized void acquireWriteLock( Object tx, TxTermination txTermination ) + throws DeadlockDetectedException { TxLockElement tle = getOrCreateLockElement( tx ); @@ -336,30 +308,9 @@ synchronized void acquireWriteLock( Object tx ) throws DeadlockDetectedException { tle.movedOn = false; - boolean shouldAddWait = true; - Thread currentThread = currentThread(); - while ( totalWriteCount > tle.writeCount || totalReadCount > tle.readCount ) { - ragManager.checkWaitOn( this, tx ); - - if (shouldAddWait) - { - waitingThreadList.addFirst( new WaitElement( tle, WRITE, currentThread) ); - } - - try - { - wait(); - shouldAddWait = false; - } - catch ( InterruptedException e ) - { - interrupted(); - - shouldAddWait = true; - } - ragManager.stopWaitOn( this, tx ); + acquireLock( WRITE, tx, txTermination, tle ); } registerWriteLockAcquired( tx, tle ); @@ -545,6 +496,46 @@ public String toString() return "RWLock[" + resource + ", hash="+hashCode()+"]"; } + private void acquireLock( LockType type, Object tx, TxTermination termination, TxLockElement tle ) + { + ragManager.checkWaitOn( this, tx ); + WaitElement waitElement = new WaitElement( tle, type, currentThread() ); + waitingThreadList.addFirst( waitElement ); + + try + { + waitForLock( termination, waitElement ); + } + finally + { + ragManager.stopWaitOn( this, tx ); + } + } + + private void waitForLock( TxTermination txTermination, WaitElement waitElement ) + { + try + { + while ( true ) + { + // we can exit from this loop only by throwing InterruptedException + // this is fine because current thread will be interrupted when lock in question is released + wait( 10 ); + + if ( txTermination.shouldBeTerminated() ) + { + // remove all signs of our waiting + waitingThreadList.remove( waitElement ); + throw new TransactionTerminatedException(); + } + } + } + catch ( InterruptedException e ) + { + interrupted(); + } + } + private void registerReadLockAcquired( Object tx, TxLockElement tle ) { registerLockAcquired( tx, tle ); diff --git a/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseShutdownTest.java b/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseShutdownTest.java index 9f903bf9bf459..1e72c221647cc 100644 --- a/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseShutdownTest.java +++ b/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseShutdownTest.java @@ -19,12 +19,12 @@ */ package org.neo4j.graphdb; +import org.junit.Test; + import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; -import org.junit.Test; - import org.neo4j.kernel.GraphDatabaseAPI; import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.impl.locking.Locks; @@ -32,12 +32,12 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.TimeUnit.SECONDS; - +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; - +import static org.junit.Assert.fail; import static org.neo4j.graphdb.DynamicLabel.label; import static org.neo4j.helpers.Exceptions.rootCause; @@ -130,10 +130,12 @@ public Void call() throws Exception try { secondTxResult.get( 60, SECONDS ); + fail("exception expected"); } catch ( Exception e ) { - assertThat( rootCause( e ), instanceOf( TransactionFailureException.class ) ); + assertThat( rootCause( e ), anyOf( instanceOf( TransactionFailureException.class ), + instanceOf( TransactionTerminatedException.class ) ) ); } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java index 7343a3f3bc8bc..42a918c36d5a9 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java @@ -38,11 +38,10 @@ import org.neo4j.kernel.impl.api.TransactionHooks; import org.neo4j.kernel.impl.api.store.PersistenceCache; import org.neo4j.kernel.impl.api.store.StoreReadLayer; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.impl.locking.LockGroup; import org.neo4j.kernel.impl.locking.Locks; -import org.neo4j.kernel.impl.locking.NoOpClient; import org.neo4j.kernel.impl.locking.NoOpLocks; -import org.neo4j.kernel.impl.locking.community.CommunityLockManger; import org.neo4j.kernel.impl.store.NeoStore; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; import org.neo4j.kernel.impl.transaction.TransactionMonitor; @@ -54,9 +53,9 @@ import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer; import org.neo4j.test.DoubleLatch; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyListOf; import static org.mockito.Mockito.doAnswer; @@ -66,11 +65,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - public class KernelTransactionImplementationTest { @Test @@ -404,7 +400,7 @@ public void shouldAcquireNewLocksClientEveryTimeTransactionIsReused() throws Exc // GIVEN KernelTransactionImplementation transaction = newTransaction(); transaction.close(); - verify( locks ).newClient(); + verify( locks ).newClient( any( TxTermination.class ) ); reset( locks ); // WHEN @@ -412,7 +408,7 @@ public void shouldAcquireNewLocksClientEveryTimeTransactionIsReused() throws Exc transaction.close(); // THEN - verify( locks ).newClient(); + verify( locks ).newClient( any( TxTermination.class ) ); } private final NeoStore neoStore = mock( NeoStore.class ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java index 30ad75cefb47d..6c480dc5c570b 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java @@ -26,11 +26,11 @@ import org.neo4j.helpers.collection.Iterables; import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.exceptions.TransactionFailureException; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.impl.locking.LockGroup; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.store.NeoStore; import org.neo4j.kernel.impl.store.record.NodeRecord; -import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; import org.neo4j.kernel.impl.transaction.TransactionMonitor; import org.neo4j.kernel.impl.transaction.TransactionRepresentation; @@ -39,6 +39,7 @@ import org.neo4j.kernel.impl.transaction.state.NeoStoreTransactionContextSupplier; import org.neo4j.kernel.impl.transaction.state.RecordAccess; import org.neo4j.kernel.impl.transaction.state.RecordAccess.RecordProxy; +import org.neo4j.kernel.impl.transaction.tracing.CommitEvent; import org.neo4j.kernel.impl.util.StringLogger; import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.kernel.monitoring.tracing.Tracers; @@ -131,7 +132,7 @@ private static KernelTransactions newKernelTransactions( TransactionCommitProces life.start(); Locks locks = mock( Locks.class ); - when( locks.newClient() ).thenReturn( mock( Locks.Client.class ) ); + when( locks.newClient( any( TxTermination.class ) ) ).thenReturn( mock( Locks.Client.class ) ); return new KernelTransactions( contextSupplier, mock( NeoStore.class ), locks, mock( IntegrityValidator.class ), null, null, null, null, null, null, null, diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/CloseCompatibility.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/CloseCompatibility.java index 7af7ed4b41c20..425bbe8d07cab 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/CloseCompatibility.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/CloseCompatibility.java @@ -22,6 +22,7 @@ import org.junit.Ignore; import org.junit.Test; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.impl.locking.Locks.Client; import static org.junit.Assert.fail; @@ -38,7 +39,7 @@ public CloseCompatibility( LockingCompatibilityTestSuite suite ) public void shouldNotBeAbleToHandOutClientsIfShutDown() throws Throwable { // GIVEN a lock manager and working clients - try ( Client client = locks.newClient() ) + try ( Client client = locks.newClient( TxTermination.NONE ) ) { client.acquireExclusive( ResourceTypes.NODE, 0 ); } @@ -50,7 +51,7 @@ public void shouldNotBeAbleToHandOutClientsIfShutDown() throws Throwable // THEN try { - locks.newClient(); + locks.newClient( TxTermination.NONE ); fail( "Should fail" ); } catch ( IllegalStateException e ) diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LegacyDeadlockCompatibility.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LegacyDeadlockCompatibility.java index f891fcfbd48c1..37d8293ec6b09 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LegacyDeadlockCompatibility.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LegacyDeadlockCompatibility.java @@ -19,20 +19,19 @@ */ package org.neo4j.kernel.impl.locking; +import org.junit.Ignore; +import org.junit.Test; + import java.io.File; import java.util.Random; import java.util.Stack; import java.util.concurrent.CountDownLatch; - import javax.transaction.Transaction; -import org.junit.Ignore; -import org.junit.Test; - import org.neo4j.kernel.DeadlockDetectedException; +import org.neo4j.kernel.impl.api.tx.TxTermination; import static java.lang.System.currentTimeMillis; - import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -274,7 +273,7 @@ public void testStressMultipleThreads() throws Exception int depthCount = 10; float readWriteRatio = 0.80f; stressThreads[i] = new StressThread( "T" + i, numberOfIterations, depthCount, readWriteRatio, - locks.newClient(), + locks.newClient( TxTermination.NONE ), startSignal ); } for ( Thread thread : stressThreads ) diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockServiceMicroBenchmark.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockServiceMicroBenchmark.java index 15339c0a1b15c..9e49322d420e2 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockServiceMicroBenchmark.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockServiceMicroBenchmark.java @@ -27,6 +27,7 @@ import javax.transaction.Transaction; import javax.transaction.xa.XAResource; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.impl.locking.community.LockManagerImpl; import org.neo4j.kernel.impl.locking.community.RagManager; @@ -275,7 +276,7 @@ protected Transaction initialValue() public Lock acquireNodeLock( long nodeId, LockType type ) { AbstractLockService.LockedNode resource = new AbstractLockService.LockedNode( nodeId ); - getWriteLock( resource, threadMark.get() ); + getWriteLock( resource, threadMark.get(), TxTermination.NONE ); return new WriteRelease( resource ); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockWorkerState.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockWorkerState.java index b3c94bb0e4afe..3c4f9b6634d5b 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockWorkerState.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockWorkerState.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.List; -import org.neo4j.kernel.impl.locking.Locks; +import org.neo4j.kernel.impl.api.tx.TxTermination; class LockWorkerState { @@ -35,7 +35,7 @@ class LockWorkerState public LockWorkerState( Locks locks ) { this.grabber = locks; - this.client = locks.newClient(); + this.client = locks.newClient( TxTermination.NONE ); } public void doing( String doing ) diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockingCompatibilityTestSuite.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockingCompatibilityTestSuite.java index ada1690f62ca6..30ecdff8a3cb4 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockingCompatibilityTestSuite.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/LockingCompatibilityTestSuite.java @@ -31,13 +31,13 @@ import java.util.concurrent.TimeoutException; import org.neo4j.kernel.api.index.ParameterizedSuiteRunner; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.test.OtherThreadExecutor.WorkerCommand; import org.neo4j.test.OtherThreadRule; import static junit.framework.TestCase.assertFalse; import static junit.framework.TestCase.fail; import static org.hamcrest.MatcherAssert.assertThat; - import static org.neo4j.test.OtherThreadRule.isWaiting; /** Base for locking tests. */ @@ -47,7 +47,8 @@ DeadlockCompatibility.class, LockReentrancyCompatibility.class, RWLockCompatibility.class, - CloseCompatibility.class + CloseCompatibility.class, + TxTerminationCompatibility.class }) public abstract class LockingCompatibilityTestSuite { @@ -74,9 +75,9 @@ public static abstract class Compatibility public Compatibility( LockingCompatibilityTestSuite suite ) { this.locks = suite.createLockManager(); - clientA = this.locks.newClient(); - clientB = this.locks.newClient(); - clientC = this.locks.newClient(); + clientA = this.locks.newClient( TxTermination.NONE ); + clientB = this.locks.newClient( TxTermination.NONE ); + clientC = this.locks.newClient( TxTermination.NONE ); clientToThreadMap.put( clientA, threadA ); clientToThreadMap.put( clientB, threadB ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/NoOpLocks.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/NoOpLocks.java index 07547400d6aba..d40e71dc5f203 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/NoOpLocks.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/NoOpLocks.java @@ -19,6 +19,7 @@ */ package org.neo4j.kernel.impl.locking; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.lifecycle.LifecycleAdapter; public class NoOpLocks extends LifecycleAdapter implements Locks @@ -32,7 +33,7 @@ public void shutdown() throws Throwable } @Override - public Client newClient() + public Client newClient( TxTermination txTermination ) { if ( closed ) { diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/RWLockCompatibility.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/RWLockCompatibility.java index be73dc0bb74b2..8d5a0c3b28640 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/RWLockCompatibility.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/RWLockCompatibility.java @@ -19,18 +19,21 @@ */ package org.neo4j.kernel.impl.locking; +import org.junit.Ignore; +import org.junit.Test; + import java.io.File; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; -import org.junit.Ignore; -import org.junit.Test; import org.neo4j.kernel.DeadlockDetectedException; +import org.neo4j.kernel.impl.api.tx.TxTermination; import static java.lang.System.currentTimeMillis; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.neo4j.kernel.impl.locking.ResourceTypes.NODE; /** @@ -218,7 +221,7 @@ public class StressThread extends Thread { super(); this.nodeId = nodeId; - this.client = locks.newClient(); + this.client = locks.newClient( TxTermination.NONE ); this.name = name; this.numberOfIterations = numberOfIterations; this.depthCount = depthCount; diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/TxTerminationCompatibility.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/TxTerminationCompatibility.java new file mode 100644 index 0000000000000..2e53d13558eed --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/TxTerminationCompatibility.java @@ -0,0 +1,479 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.locking; + +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.neo4j.graphdb.TransactionTerminatedException; +import org.neo4j.kernel.impl.api.tx.TxTermination; +import org.neo4j.kernel.impl.api.tx.TxTerminationImpl; + +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.neo4j.kernel.impl.locking.Locks.Client; +import static org.neo4j.kernel.impl.locking.Locks.ResourceType; +import static org.neo4j.kernel.impl.locking.Locks.Visitor; + +@Ignore( "Not a test. This is a compatibility suite, run from LockingCompatibilityTestSuite." ) +public class TxTerminationCompatibility extends LockingCompatibilityTestSuite.Compatibility +{ + private static final ResourceType RESOURCE_TYPE = ResourceTypes.NODE; + private static final long RESOURCE_ID = 42; + private static final long OTHER_RESOURCE_ID = 4242; + + private ExecutorService executor; + private Client client; + + public TxTerminationCompatibility( LockingCompatibilityTestSuite suite ) + { + super( suite ); + } + + @Before + public void setUp() throws Exception + { + executor = Executors.newSingleThreadExecutor(); + client = locks.newClient( TxTermination.NONE ); + } + + @After + public void tearDown() throws Exception + { + client.close(); + executor.shutdownNow(); + executor.awaitTermination( 1, TimeUnit.MINUTES ); + } + + @Test + public void sharedLockIsTransactionTerminationAware() throws Exception + { + acquireExclusiveLockInThisThread(); + + TxTerminationImpl txTermination = new TxTerminationImpl(); + Future sharedLockAcquisition = acquireSharedLockInAnotherThread( txTermination ); + assertThreadIsWaitingForLock( sharedLockAcquisition ); + + txTermination.markForTermination(); + assertLockAcquisitionFailed( sharedLockAcquisition ); + } + + @Test + public void exclusiveLockIsTransactionTerminationAware() throws Exception + { + acquireExclusiveLockInThisThread(); + + TxTerminationImpl txTermination = new TxTerminationImpl(); + Future exclusiveLockAcquisition = acquireExclusiveLockInAnotherThread( txTermination ); + assertThreadIsWaitingForLock( exclusiveLockAcquisition ); + + txTermination.markForTermination(); + assertLockAcquisitionFailed( exclusiveLockAcquisition ); + } + + @Test + public void acquireSharedLockAfterSharedLockFailureOnTransactionTerminationOtherThread() throws Exception + { + acquireExclusiveLockInThisThread(); + + TxTerminationImpl txTermination = new TxTerminationImpl(); + Future sharedLockAcquisition1 = acquireSharedLockInAnotherThread( txTermination ); + assertThreadIsWaitingForLock( sharedLockAcquisition1 ); + + txTermination.markForTermination(); + assertLockAcquisitionFailed( sharedLockAcquisition1 ); + + releaseAllLocksInThisThread(); + + Future sharedLockAcquisition2 = acquireSharedLockInAnotherThread( txTermination ); + assertLockAcquisitionSucceeded( sharedLockAcquisition2 ); + } + + @Test + public void acquireExclusiveLockAfterExclusiveLockFailureOnTransactionTerminationOtherThread() throws Exception + { + acquireExclusiveLockInThisThread(); + + TxTerminationImpl txTermination = new TxTerminationImpl(); + Future exclusiveLockAcquisition1 = acquireExclusiveLockInAnotherThread( txTermination ); + assertThreadIsWaitingForLock( exclusiveLockAcquisition1 ); + + txTermination.markForTermination(); + assertLockAcquisitionFailed( exclusiveLockAcquisition1 ); + + releaseAllLocksInThisThread(); + + Future exclusiveLockAcquisition2 = acquireExclusiveLockInAnotherThread( txTermination ); + assertLockAcquisitionSucceeded( exclusiveLockAcquisition2 ); + } + + @Test + public void acquireSharedLockAfterExclusiveLockFailureOnTransactionTerminationOtherThread() throws Exception + { + acquireExclusiveLockInThisThread(); + + TxTerminationImpl txTermination = new TxTerminationImpl(); + Future exclusiveLockAcquisition = acquireExclusiveLockInAnotherThread( txTermination ); + assertThreadIsWaitingForLock( exclusiveLockAcquisition ); + + txTermination.markForTermination(); + assertLockAcquisitionFailed( exclusiveLockAcquisition ); + + releaseAllLocksInThisThread(); + + Future sharedLockAcquisition = acquireSharedLockInAnotherThread( txTermination ); + assertLockAcquisitionSucceeded( sharedLockAcquisition ); + } + + @Test + public void acquireExclusiveLockAfterSharedLockFailureOnTransactionTerminationOtherThread() throws Exception + { + acquireExclusiveLockInThisThread(); + + TxTerminationImpl txTermination = new TxTerminationImpl(); + Future sharedLockAcquisition = acquireSharedLockInAnotherThread( txTermination ); + assertThreadIsWaitingForLock( sharedLockAcquisition ); + + txTermination.markForTermination(); + assertLockAcquisitionFailed( sharedLockAcquisition ); + + releaseAllLocksInThisThread(); + + Future exclusiveLockAcquisition = acquireExclusiveLockInAnotherThread( txTermination ); + assertLockAcquisitionSucceeded( exclusiveLockAcquisition ); + } + + @Test + public void acquireSharedLockAfterSharedLockFailureOnTransactionTerminationSameThread() throws Exception + { + acquireLockAfterOtherLockFailureOnTransactionTerminationSameThread( true, true ); + } + + @Test + public void acquireExclusiveLockAfterExclusiveLockFailureOnTransactionTerminationSameThread() throws Exception + { + acquireLockAfterOtherLockFailureOnTransactionTerminationSameThread( false, false ); + } + + @Test + public void acquireSharedLockAfterExclusiveLockFailureOnTransactionTerminationSameThread() throws Exception + { + acquireLockAfterOtherLockFailureOnTransactionTerminationSameThread( true, false ); + } + + @Test + public void acquireExclusiveLockAfterSharedLockFailureOnTransactionTerminationSameThread() throws Exception + { + acquireLockAfterOtherLockFailureOnTransactionTerminationSameThread( false, true ); + } + + @Test + public void closeClientAfterSharedLockFailureOnTransactionTermination() throws Exception + { + closeClientAfterLockFailureOnTransactionTermination( true ); + } + + @Test + public void closeClientAfterExclusiveLockFailureOnTransactionTermination() throws Exception + { + closeClientAfterLockFailureOnTransactionTermination( false ); + } + + private void closeClientAfterLockFailureOnTransactionTermination( boolean shared ) throws Exception + { + acquireExclusiveLockInThisThread(); + + TxTerminationImpl txTermination = new TxTerminationImpl(); + CountDownLatch firstLockAcquired = new CountDownLatch( 1 ); + Future acquisition = tryAcquireTwoLocksLockInAnotherThread( shared, txTermination, firstLockAcquired ); + + await( firstLockAcquired ); + assertThreadIsWaitingForLock( acquisition ); + assertLocksHeld( RESOURCE_ID, OTHER_RESOURCE_ID ); + + txTermination.markForTermination(); + assertLockAcquisitionFailed( acquisition ); + assertLocksHeld( RESOURCE_ID ); + + releaseAllLocksInThisThread(); + assertNoLocksHeld(); + } + + private void acquireLockAfterOtherLockFailureOnTransactionTerminationSameThread( boolean firstLockShared, + boolean secondLockShared ) throws Exception + { + acquireExclusiveLockInThisThread(); + + TxTerminationImpl txTermination = new TxTerminationImpl(); + CountDownLatch firstLockFailed = new CountDownLatch( 1 ); + CountDownLatch startSecondLock = new CountDownLatch( 1 ); + + Future locking = acquireTwoLocksInAnotherThread( firstLockShared, secondLockShared, txTermination, + firstLockFailed, startSecondLock ); + assertThreadIsWaitingForLock( locking ); + + txTermination.markForTermination(); + await( firstLockFailed ); + txTermination.reset(); + releaseAllLocksInThisThread(); + startSecondLock.countDown(); + + assertLockAcquisitionSucceeded( locking ); + } + + private void acquireExclusiveLockInThisThread() + { + client.acquireExclusive( RESOURCE_TYPE, RESOURCE_ID ); + assertLocksHeld( RESOURCE_ID ); + } + + private void releaseAllLocksInThisThread() + { + client.releaseAll(); + } + + private Future acquireSharedLockInAnotherThread( TxTermination txTermination ) + { + return acquireLockInAnotherThread( true, txTermination ); + } + + private Future acquireExclusiveLockInAnotherThread( TxTermination txTermination ) + { + return acquireLockInAnotherThread( false, txTermination ); + } + + private Future acquireLockInAnotherThread( final boolean shared, final TxTermination txTermination ) + { + return executor.submit( new Callable() + { + @Override + public Void call() throws Exception + { + Client client = locks.newClient( txTermination ); + if ( shared ) + { + client.acquireShared( RESOURCE_TYPE, RESOURCE_ID ); + } + else + { + client.acquireExclusive( RESOURCE_TYPE, RESOURCE_ID ); + } + return null; + } + } ); + } + + private Future acquireTwoLocksInAnotherThread( final boolean firstShared, final boolean secondShared, + final TxTermination txTermination, final CountDownLatch firstLockFailed, + final CountDownLatch startSecondLock ) + { + return executor.submit( new Callable() + { + @Override + public Void call() throws Exception + { + try ( Client client = locks.newClient( txTermination ) ) + { + try + { + if ( firstShared ) + { + client.acquireShared( RESOURCE_TYPE, RESOURCE_ID ); + } + else + { + client.acquireExclusive( RESOURCE_TYPE, RESOURCE_ID ); + } + fail( "Transaction termination expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( TransactionTerminatedException.class ) ); + } + + firstLockFailed.countDown(); + await( startSecondLock ); + + if ( secondShared ) + { + client.acquireShared( RESOURCE_TYPE, RESOURCE_ID ); + } + else + { + client.acquireExclusive( RESOURCE_TYPE, RESOURCE_ID ); + } + } + return null; + } + } ); + } + + private Future tryAcquireTwoLocksLockInAnotherThread( final boolean shared, final TxTermination txTermination, + final CountDownLatch firstLockAcquired ) + { + return executor.submit( new Callable() + { + @Override + public Void call() throws Exception + { + try ( Client client = locks.newClient( txTermination ) ) + { + if ( shared ) + { + client.acquireShared( RESOURCE_TYPE, OTHER_RESOURCE_ID ); + } + else + { + client.acquireExclusive( RESOURCE_TYPE, OTHER_RESOURCE_ID ); + } + + firstLockAcquired.countDown(); + + if ( shared ) + { + client.acquireShared( RESOURCE_TYPE, RESOURCE_ID ); + } + else + { + client.acquireExclusive( RESOURCE_TYPE, RESOURCE_ID ); + } + } + return null; + } + } ); + } + + private void assertLocksHeld( final Long... expectedResourceIds ) + { + final List expectedLockedIds = Arrays.asList( expectedResourceIds ); + final List seenLockedIds = new ArrayList<>(); + + locks.accept( new Visitor() + { + @Override + public void visit( ResourceType resourceType, long resourceId, String description, long estimatedWaitTime ) + { + seenLockedIds.add( resourceId ); + } + } ); + + Collections.sort( expectedLockedIds ); + Collections.sort( seenLockedIds ); + assertEquals( "unexpected locked resource ids", expectedLockedIds, seenLockedIds ); + } + + private void assertNoLocksHeld() + { + locks.accept( new Visitor() + { + @Override + public void visit( ResourceType resourceType, long resourceId, String description, long estimatedWaitTime ) + { + fail( "Unexpected lock on " + resourceType + " " + resourceId ); + } + } ); + } + + private void assertThreadIsWaitingForLock( Future lockAcquisition ) throws Exception + { + for ( int i = 0; i < 20; i++ ) + { + try + { + lockAcquisition.get( 50, TimeUnit.MILLISECONDS ); + fail( "Timeout expected" ); + } + catch ( TimeoutException ignore ) + { + } + } + assertFalse( "locking thread completed", lockAcquisition.isDone() ); + } + + private void assertLockAcquisitionSucceeded( Future lockAcquisition ) throws Exception + { + boolean completed = false; + for ( int i = 0; i < 20; i++ ) + { + try + { + assertNull( lockAcquisition.get( 50, TimeUnit.MILLISECONDS ) ); + completed = true; + } + catch ( TimeoutException ignore ) + { + } + } + assertTrue( "lock was not acquired in time", completed ); + assertTrue( "locking thread seem to be still in progress", lockAcquisition.isDone() ); + } + + private void assertLockAcquisitionFailed( Future lockAcquisition ) throws Exception + { + ExecutionException executionException = null; + for ( int i = 0; i < 20; i++ ) + { + try + { + lockAcquisition.get( 50, TimeUnit.MILLISECONDS ); + fail( "Transaction termination expected" ); + } + catch ( ExecutionException e ) + { + executionException = e; + } + catch ( TimeoutException ignore ) + { + } + } + assertNotNull( "execution should fail", executionException ); + assertThat( executionException.getCause(), instanceOf( TransactionTerminatedException.class ) ); + assertTrue( "locking thread seem to be still in progress", lockAcquisition.isDone() ); + } + + private static void await( CountDownLatch latch ) throws InterruptedException + { + if ( !latch.await( 1, TimeUnit.MINUTES ) ) + { + fail( "Count down did not happen" ); + } + } +} diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/CommunityLocksCompatibility.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/CommunityLocksCompatibilityIT.java similarity index 92% rename from community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/CommunityLocksCompatibility.java rename to community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/CommunityLocksCompatibilityIT.java index cf95350101fb3..59892658d3ad7 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/CommunityLocksCompatibility.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/CommunityLocksCompatibilityIT.java @@ -22,7 +22,7 @@ import org.neo4j.kernel.impl.locking.LockingCompatibilityTestSuite; import org.neo4j.kernel.impl.locking.Locks; -public class CommunityLocksCompatibility extends LockingCompatibilityTestSuite +public class CommunityLocksCompatibilityIT extends LockingCompatibilityTestSuite { @Override protected Locks createLockManager() diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/RWLockLeakTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/RWLockLeakTest.java index 7434e5a9ac210..1722394953560 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/RWLockLeakTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/community/RWLockLeakTest.java @@ -19,13 +19,14 @@ */ package org.neo4j.kernel.impl.locking.community; -import static org.mockito.Mockito.mock; +import org.junit.Test; import javax.transaction.Transaction; -import static org.junit.Assert.assertEquals; +import org.neo4j.kernel.impl.api.tx.TxTermination; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; public class RWLockLeakTest { @@ -38,7 +39,7 @@ public void assertWriteLockDoesNotLeakMemory() throws InterruptedException final Transaction tx1 = mock( Transaction.class ); lock.mark(); - lock.acquireWriteLock( tx1 ); + lock.acquireWriteLock( tx1, TxTermination.NONE ); lock.mark(); assertEquals( 1, lock.getTxLockElementCount() ); @@ -55,7 +56,7 @@ public void assertReadLockDoesNotLeakMemory() throws InterruptedException final Transaction tx1 = mock( Transaction.class ); lock.mark(); - lock.acquireReadLock( tx1 ); + lock.acquireReadLock( tx1, TxTermination.NONE ); lock.mark(); assertEquals( 1, lock.getTxLockElementCount() ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/performance/PerformanceTestLegacyLocks.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/performance/PerformanceTestLegacyLocks.java index 638def4ec6870..1ee3f5cbd1015 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/performance/PerformanceTestLegacyLocks.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/locking/performance/PerformanceTestLegacyLocks.java @@ -23,7 +23,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; - import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; import javax.transaction.RollbackException; @@ -33,6 +32,7 @@ import javax.transaction.xa.XAResource; import org.neo4j.kernel.DeadlockDetectedException; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.impl.locking.community.LockManagerImpl; import org.neo4j.kernel.impl.locking.community.RagManager; @@ -92,7 +92,7 @@ public void run() for(; currentLock[] lockMaps, WaitStrategy[] waitStrategies, @@ -95,6 +99,11 @@ public ForsetiClient( int id, } } + public void initialize( TxTermination txTermination ) + { + this.txTermination = txTermination; + } + @Override public void acquireShared( Locks.ResourceType resourceType, long... resourceIds ) throws AcquireLockTimeoutException { @@ -175,8 +184,7 @@ else if(existingLock instanceof ExclusiveLock) throw new UnsupportedOperationException( "Unknown lock type: " + existingLock ); } - // Apply the designated wait strategy - waitStrategies[resourceType.typeId()].apply( tries++ ); + applyWaitStrategy( resourceType, tries++ ); // And take note of who we are waiting for. This is used for deadlock detection. markAsWaitingFor( existingLock, resourceType, resourceId ); @@ -226,7 +234,7 @@ public void acquireExclusive( Locks.ResourceType resourceType, long... resourceI } } - waitStrategies[resourceType.typeId()].apply( tries++ ); + applyWaitStrategy( resourceType, tries++ ); markAsWaitingFor( existingLock, resourceType, resourceId ); } @@ -450,6 +458,7 @@ public void releaseAll() @Override public void close() { + txTermination = null; releaseAll(); clientPool.release( this ); } @@ -594,7 +603,7 @@ private boolean tryUpgradeToExclusiveWithShareLockHeld( // Now we just wait for all clients to release the the share lock while(sharedLock.numberOfHolders() > 1) { - waitStrategies[resourceType.typeId()].apply( tries++ ); + applyWaitStrategy( resourceType, tries++ ); markAsWaitingFor( sharedLock, resourceType, resourceId ); } @@ -656,6 +665,17 @@ public int id() return myId; } + private void applyWaitStrategy( Locks.ResourceType resourceType, int tries ) + { + WaitStrategy waitStrategy = waitStrategies[resourceType.typeId()]; + waitStrategy.apply( tries ); + + if ( txTermination.shouldBeTerminated() ) + { + throw new TransactionTerminatedException(); + } + } + // Visitors used for bulk ops on the lock maps (such as releasing all locks) /** diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/forseti/ForsetiLockManager.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/forseti/ForsetiLockManager.java index 83abddcb95d81..916e604310f8c 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/forseti/ForsetiLockManager.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/lock/forseti/ForsetiLockManager.java @@ -28,6 +28,7 @@ import org.neo4j.collection.pool.LinkedQueuePool; import org.neo4j.collection.pool.Pool; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.impl.locking.AcquireLockTimeoutException; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.util.collection.SimpleBitSet; @@ -142,9 +143,10 @@ public ForsetiLockManager( ResourceType... resourceTypes ) /** * Create a new client to use to grab and release locks. + * @param txTermination shows if transaction owning the client should be terminated */ @Override - public Client newClient() + public Client newClient( TxTermination txTermination ) { // We check this volatile closed flag here, which may seem like a contention overhead, but as the time // of writing we apply pooling of transactions and in extension pooling of lock clients, @@ -154,7 +156,9 @@ public Client newClient() throw new IllegalStateException( this + " already closed" ); } - return clientPool.acquire(); + ForsetiClient client = clientPool.acquire(); + client.initialize( txTermination ); + return client; } @Override diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLockManagerTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLockManagerTest.java index bd2b34a5de4e6..b4d5543cdeabd 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLockManagerTest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLockManagerTest.java @@ -27,6 +27,7 @@ import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.master.Master; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.locking.community.CommunityLockManger; @@ -69,13 +70,13 @@ public void doesNotCreateClientsAfterShutdown() throws Throwable { SlaveLockManager slaveLockManager = newSlaveLockManager( new CommunityLockManger() ); - assertNotNull( slaveLockManager.newClient() ); + assertNotNull( slaveLockManager.newClient( TxTermination.NONE ) ); slaveLockManager.shutdown(); try { - slaveLockManager.newClient(); + slaveLockManager.newClient( TxTermination.NONE ); fail( "Exception expected" ); } catch ( Exception e ) diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientConcurrentTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientConcurrentTest.java index 9fbd7bf0b719f..58e0234b0ac7d 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientConcurrentTest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientConcurrentTest.java @@ -40,6 +40,7 @@ import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.master.Master; import org.neo4j.kernel.ha.lock.forseti.ForsetiLockManager; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.locking.ResourceTypes; @@ -111,7 +112,7 @@ public void readersCanAcquireLockAsSoonAsItReleasedOnMaster() throws Interrupted private SlaveLocksClient createClient() { - return new SlaveLocksClient( master, lockManager.newClient(), lockManager, + return new SlaveLocksClient( master, lockManager.newClient( TxTermination.NONE ), lockManager, requestContextFactory, availabilityGuard, availabilityTimeoutMillis ); } diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientTest.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientTest.java index 6e89dca2ad952..508ab984dc88c 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientTest.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/SlaveLocksClientTest.java @@ -37,6 +37,7 @@ import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.ha.com.RequestContextFactory; import org.neo4j.kernel.ha.com.master.Master; +import org.neo4j.kernel.impl.api.tx.TxTermination; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.locking.ResourceTypes; import org.neo4j.kernel.impl.locking.community.CommunityLockManger; @@ -68,7 +69,7 @@ public void setUp() throws Exception availabilityGuard = new AvailabilityGuard( new FakeClock() ); Locks lockManager = new CommunityLockManger(); - local = spy( lockManager.newClient() ); + local = spy( lockManager.newClient( TxTermination.NONE ) ); LockResult lockResultOk = new LockResult( LockStatus.OK_LOCKED ); TransactionStreamResponse responseOk = diff --git a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/forseti/ForsetiLocksCompatibility.java b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/forseti/ForsetiLocksCompatibilityIT.java similarity index 93% rename from enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/forseti/ForsetiLocksCompatibility.java rename to enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/forseti/ForsetiLocksCompatibilityIT.java index c7e570d0905f3..1ff8655ad9791 100644 --- a/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/forseti/ForsetiLocksCompatibility.java +++ b/enterprise/ha/src/test/java/org/neo4j/kernel/ha/lock/forseti/ForsetiLocksCompatibilityIT.java @@ -23,7 +23,7 @@ import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.locking.ResourceTypes; -public class ForsetiLocksCompatibility extends LockingCompatibilityTestSuite +public class ForsetiLocksCompatibilityIT extends LockingCompatibilityTestSuite { @Override protected Locks createLockManager()