From 77353739bf544313b354be18473f0ebf5448b11d Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 9 Dec 2015 12:22:56 +0100 Subject: [PATCH] Removed KTI pooling Currently KernelTransactionImplementations are pooled and reused. This is done with MarshlandPool that has ThreadLocals backed by some other pool. So each thread that executed transactions has a KTI in it's thread local storage and reuses it. Such reuse of KTI instance with their internal state has caused quite some issues because restoring KTIs to the pristine state before putting them back to the pool is tricky. This commit removed pooling of KTIs. So whenever new transaction is started new KTI object is created. Reasons: - biggest reason to pool KTIs is transaction state but it is completely reinitialized when KTI is taken from the pool - recent storage engine API refactoring made KTI state even smaller making even less things added to the pool - code simplification - removed need to reinitialize KTI state Tested: - microbenchmarks via Core API show 30% degradation for tiny read transactions (read couple nodes by id without labels and properties, etc.) and no degradation for read/write transactions of reasonable size - microbenchmarks via Core API show same heap usage and GC - benchmarking read/write Cypher queries show no performance degradation - small LDBC read/write queries shows no performance degradation on a machine with 32 hardware threads - long running soak test shows no throughput degradation, no excessive heap usage and GC activity --- .../txstate/LegacyIndexTransactionState.java | 2 - .../CachingLegacyIndexTransactionState.java | 14 -- .../kernel/impl/api/CountsRecordState.java | 12 -- .../api/KernelTransactionImplementation.java | 132 +++++------------- .../kernel/impl/api/KernelTransactions.java | 127 ++++++----------- .../LegacyIndexTransactionStateImpl.java | 19 --- .../api/state/RelationshipChangesForNode.java | 14 -- .../graphdb/GraphDatabaseServiceTest.java | 38 ++++- .../kernel/api/KernelTransactionFactory.java | 12 +- .../KernelTransactionImplementationTest.java | 60 ++++---- .../impl/api/KernelTransactionsTest.java | 73 +++++++++- 11 files changed, 219 insertions(+), 284 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/LegacyIndexTransactionState.java b/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/LegacyIndexTransactionState.java index e2a666306a8fc..9666a2cac5eba 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/LegacyIndexTransactionState.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/LegacyIndexTransactionState.java @@ -34,8 +34,6 @@ */ public interface LegacyIndexTransactionState extends RecordState { - void clear(); - LegacyIndex nodeChanges( String indexName ) throws LegacyIndexNotFoundKernelException; LegacyIndex relationshipChanges( String indexName ) throws LegacyIndexNotFoundKernelException; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CachingLegacyIndexTransactionState.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CachingLegacyIndexTransactionState.java index ece5ffba39b03..d3b4f493e50d1 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CachingLegacyIndexTransactionState.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CachingLegacyIndexTransactionState.java @@ -41,20 +41,6 @@ public CachingLegacyIndexTransactionState( LegacyIndexTransactionState txState ) this.txState = txState; } - @Override - public void clear() - { - txState.clear(); - if ( nodeLegacyIndexChanges != null && !nodeLegacyIndexChanges.isEmpty() ) - { - nodeLegacyIndexChanges.clear(); - } - if ( relationshipLegacyIndexChanges != null && !relationshipLegacyIndexChanges.isEmpty() ) - { - relationshipLegacyIndexChanges.clear(); - } - } - @Override public LegacyIndex nodeChanges( String indexName ) throws LegacyIndexNotFoundKernelException { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CountsRecordState.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CountsRecordState.java index 1f66a6940ae13..cb2b541e776c8 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CountsRecordState.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CountsRecordState.java @@ -32,7 +32,6 @@ import org.neo4j.register.Registers; import static java.util.Objects.requireNonNull; - import static org.neo4j.kernel.api.ReadOperations.ANY_LABEL; import static org.neo4j.kernel.api.ReadOperations.ANY_RELATIONSHIP_TYPE; import static org.neo4j.kernel.impl.store.counts.keys.CountsKeyFactory.indexSampleKey; @@ -142,17 +141,6 @@ public boolean hasChanges() return !counts.isEmpty(); } - /** - * Set this counter up to a pristine state, as if it had just been initialized. - */ - public void clear() - { - if ( !counts.isEmpty() ) - { - counts.clear(); - } - } - public static final class Difference { private final CountsKey key; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java index bd05859bdfa20..a830548c692d4 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java @@ -21,7 +21,6 @@ import java.util.Collection; -import org.neo4j.collection.pool.Pool; import org.neo4j.helpers.Clock; import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.KeyReadTokenNameLookup; @@ -30,6 +29,7 @@ import org.neo4j.kernel.api.exceptions.InvalidTransactionTypeKernelException; import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.exceptions.TransactionFailureException; +import org.neo4j.kernel.api.exceptions.TransactionHookException; import org.neo4j.kernel.api.exceptions.schema.ConstraintValidationKernelException; import org.neo4j.kernel.api.exceptions.schema.CreateConstraintFailureException; import org.neo4j.kernel.api.exceptions.schema.DropIndexFailureException; @@ -40,7 +40,6 @@ import org.neo4j.kernel.api.txstate.TxStateVisitor; import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator; import org.neo4j.kernel.impl.api.state.TxState; -import org.neo4j.kernel.impl.api.store.StoreReadLayer; import org.neo4j.kernel.impl.api.store.StoreStatement; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.storageengine.StorageEngine; @@ -61,12 +60,6 @@ */ public class KernelTransactionImplementation implements KernelTransaction, TxStateHolder { - /* - * IMPORTANT: - * This class is pooled and re-used. If you add *any* state to it, you *must* make sure that the #initialize() - * method resets that state for re-use. - */ - private enum TransactionType { ANY, @@ -105,36 +98,36 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel private final TransactionHooks hooks; private final ConstraintIndexCreator constraintIndexCreator; private final StatementOperationParts operations; - private final Pool pool; - // State + private final KernelTransactions kernelTransactions; + private final StorageEngine storageEngine; + private final StoreStatement storeStatement; + private final Locks.Client locks; + // For committing private final TransactionHeaderInformationFactory headerInformationFactory; private final TransactionCommitProcess commitProcess; private final TransactionMonitor transactionMonitor; - private final StoreReadLayer storeLayer; - private final StorageEngine storageEngine; private final Clock clock; + + // State private TransactionState txState; private LegacyIndexTransactionState legacyIndexTransactionState; private TransactionType transactionType = TransactionType.ANY; private TransactionHooks.TransactionHooksState hooksState; + private KernelStatement currentStatement; + private CloseListener closeListener; + private boolean beforeHookInvoked; - private Locks.Client locks; - private StoreStatement storeStatement; private boolean closing, closed; private boolean failure, success; private volatile boolean terminated; - // Some header information - private long startTimeMillis; - private long lastTransactionIdWhenStarted; - /** - * Implements reusing the same underlying {@link KernelStatement} for overlapping statements. - */ - private KernelStatement currentStatement; + + // Header information + private final long startTimeMillis; + private final long lastTransactionIdWhenStarted; + // Event tracing - private final TransactionTracer tracer; - private TransactionEvent transactionEvent; - private CloseListener closeListener; + private final TransactionEvent transactionEvent; public KernelTransactionImplementation( StatementOperationParts operations, SchemaWriteGuard schemaWriteGuard, @@ -145,10 +138,11 @@ public KernelTransactionImplementation( StatementOperationParts operations, TransactionCommitProcess commitProcess, TransactionMonitor transactionMonitor, LegacyIndexTransactionState legacyIndexTransactionState, - Pool pool, + KernelTransactions kernelTransactions, Clock clock, TransactionTracer tracer, - StorageEngine storageEngine ) + StorageEngine storageEngine, + long lastTransactionIdWhenStarted ) { this.operations = operations; this.schemaWriteGuard = schemaWriteGuard; @@ -158,30 +152,14 @@ public KernelTransactionImplementation( StatementOperationParts operations, this.headerInformationFactory = headerInformationFactory; this.commitProcess = commitProcess; this.transactionMonitor = transactionMonitor; - this.storeLayer = storageEngine.storeReadLayer(); + this.storeStatement = storageEngine.storeReadLayer().acquireStatement(); this.storageEngine = storageEngine; this.legacyIndexTransactionState = new CachingLegacyIndexTransactionState( legacyIndexTransactionState ); - this.pool = pool; + this.kernelTransactions = kernelTransactions; this.clock = clock; - this.tracer = tracer; - } - - /** - * Reset this transaction to a vanilla state, turning it into a logically new transaction. - */ - public KernelTransactionImplementation initialize( long lastCommittedTx ) - { - assert locks != null : "This transaction has been disposed off, it should not be used."; - this.closing = closed = failure = success = false; - this.transactionType = TransactionType.ANY; - this.beforeHookInvoked = false; this.startTimeMillis = clock.currentTimeMillis(); - this.lastTransactionIdWhenStarted = lastCommittedTx; + this.lastTransactionIdWhenStarted = lastTransactionIdWhenStarted; this.transactionEvent = tracer.beginTransaction(); - assert transactionEvent != null : "transactionEvent was null!"; - this.storeStatement = storeLayer.acquireStatement(); - this.closeListener = null; - return this; } @Override @@ -385,39 +363,16 @@ public void close() throws TransactionFailureException transactionEvent.setTransactionType( transactionType.name() ); transactionEvent.setReadOnly( txState == null || !txState.hasChanges() ); transactionEvent.close(); - transactionEvent = null; - legacyIndexTransactionState.clear(); - txState = null; - hooksState = null; - closeListener = null; } finally { - release(); + locks.close(); + storeStatement.close(); + kernelTransactions.transactionClosed( this ); } } } - protected void dispose() - { - if ( locks != null ) - { - locks.close(); - } - - this.locks = null; - this.transactionType = null; - this.hooksState = null; - this.txState = null; - this.legacyIndexTransactionState = null; - - if ( storeStatement != null ) - { - this.storeStatement.close(); - this.storeStatement = null; - } - } - private void commit() throws TransactionFailureException { boolean success = false; @@ -429,10 +384,11 @@ private void commit() throws TransactionFailureException { try { - if ( (hooksState = hooks.beforeCommit( txState, this, storeLayer )) != null && hooksState.failed() ) + hooksState = hooks.beforeCommit( txState, this, storageEngine.storeReadLayer() ); + if ( hooksState != null && hooksState.failed() ) { - throw new TransactionFailureException( Status.Transaction.HookFailed, hooksState.failure(), - "" ); + TransactionHookException cause = hooksState.failure(); + throw new TransactionFailureException( Status.Transaction.HookFailed, cause, "" ); } } finally @@ -522,13 +478,13 @@ private void rollback() throws TransactionFailureException @Override public void visitCreatedNode( long id ) { - storeLayer.releaseNode( id ); + storageEngine.storeReadLayer().releaseNode( id ); } @Override public void visitCreatedRelationship( long id, int type, long startNode, long endNode ) { - storeLayer.releaseRelationship( id ); + storageEngine.storeReadLayer().releaseRelationship( id ); } } ); } @@ -577,30 +533,6 @@ private void afterRollback() } } - /** - * Release resources held up by this transaction & return it to the transaction pool. - */ - private void release() - { - locks.releaseAll(); - if ( terminated ) - { - // This transaction has been externally marked for termination. - // Just dispose of this transaction and don't return it to the pool. - dispose(); - } - else - { - // Return this instance to the pool so that another transaction may use it. - pool.release( this ); - if ( storeStatement != null ) - { - storeStatement.close(); - storeStatement = null; - } - } - } - @Override public void registerCloseListener( CloseListener listener ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java index fa524093bae73..02bb3304b1067 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java @@ -19,13 +19,11 @@ */ package org.neo4j.kernel.impl.api; -import java.util.ArrayList; -import java.util.List; +import java.util.Collections; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.neo4j.collection.pool.LinkedQueuePool; -import org.neo4j.collection.pool.MarshlandPool; import org.neo4j.function.Factory; import org.neo4j.graphdb.DatabaseShutdownException; import org.neo4j.helpers.Clock; @@ -47,15 +45,14 @@ /** * Central source of transactions in the database. *

- * This class maintains references to all transactions, a pool of passive kernel transactions, and provides - * capabilities - * for enumerating all running transactions. During normal operation, acquiring new transactions and enumerating live - * ones requires no synchronization (although the live list is not guaranteed to be exact). + * This class maintains references to all running transactions and provides capabilities for enumerating them. + * During normal operation, acquiring new transactions and enumerating live ones requires same amount of + * synchronization as {@link ConcurrentHashMap} provides for insertions and iterations. + *

+ * Live list is not guaranteed to be exact. */ public class KernelTransactions extends LifecycleAdapter implements Factory { - // Transaction dependencies - private final Locks locks; private final ConstraintIndexCreator constraintIndexCreator; private final StatementOperationParts statementOperations; @@ -70,21 +67,7 @@ public class KernelTransactions extends LifecycleAdapter implements Factory - * This data structure is *only* updated when brand-new transactions are created, or when transactions are disposed - * of. During normal operation (where all transactions come from and are returned to the pool), this will be left - * in peace, working solely as a collection of references to all transaction objects (idle and active) in the - * database. - *

- * As such, it provides a good mechanism for listing all transactions without requiring synchronization when - * starting and committing transactions. - */ - private final Set allTransactions = newSetFromMap( - new ConcurrentHashMap<>() ); + private final Set allTransactions = newSetFromMap( new ConcurrentHashMap<>() ); public KernelTransactions( Locks locks, ConstraintIndexCreator constraintIndexCreator, @@ -115,89 +98,61 @@ public KernelTransactions( Locks locks, this.storageEngine = storageEngine; } - /** - * This is the factory that actually builds brand-new instances. - */ - private final Factory factory = new Factory() - { - @Override - public KernelTransactionImplementation newInstance() - { - Locks.Client locksClient = locks.newClient(); - LegacyIndexTransactionState legacyIndexTransactionState = - new LegacyIndexTransactionStateImpl( indexConfigStore, legacyIndexProviderLookup ); - KernelTransactionImplementation tx = new KernelTransactionImplementation( - statementOperations, schemaWriteGuard, - locksClient, hooks, constraintIndexCreator, transactionHeaderInformationFactory, - transactionCommitProcess, transactionMonitor, legacyIndexTransactionState, - localTxPool, Clock.SYSTEM_CLOCK, tracers.transactionTracer, storageEngine ); - - allTransactions.add( tx ); - - return tx; - } - }; - @Override public KernelTransaction newInstance() { assertDatabaseIsRunning(); - return localTxPool.acquire().initialize( storageEngine.metaDataStore().getLastCommittedTransactionId() ); + + Locks.Client locksClient = locks.newClient(); + LegacyIndexTransactionState legacyIndexTransactionState = + new LegacyIndexTransactionStateImpl( indexConfigStore, legacyIndexProviderLookup ); + + long lastTransactionIdWhenStarted = storageEngine.metaDataStore().getLastCommittedTransactionId(); + + KernelTransactionImplementation tx = new KernelTransactionImplementation( statementOperations, schemaWriteGuard, + locksClient, hooks, constraintIndexCreator, transactionHeaderInformationFactory, + transactionCommitProcess, transactionMonitor, legacyIndexTransactionState, + this, Clock.SYSTEM_CLOCK, tracers.transactionTracer, storageEngine, lastTransactionIdWhenStarted ); + + allTransactions.add( tx ); + + return tx; } /** - * Global pool of transactions, wrapped by the thread-local marshland pool and so is not used directly. + * Signals that given transaction is closed and should be removed from the set of running transactions. + * + * @param tx the closed transaction. + * @throws IllegalStateException if given transaction is not in the set of active transactions. */ - private final LinkedQueuePool globalTxPool - = new LinkedQueuePool( 8, factory ) + public void transactionClosed( KernelTransaction tx ) { - @Override - protected void dispose( KernelTransactionImplementation tx ) + boolean removed = allTransactions.remove( tx ); + if ( !removed ) { - allTransactions.remove( tx ); - tx.dispose(); - super.dispose( tx ); + throw new IllegalStateException( "Transaction: " + tx + " is not present in the " + + "set of known active transactions: " + allTransactions ); } - }; + } /** - * Give an approximate list of all transactions currently running. This is not guaranteed to be exact, as - * transactions may stop and start while this list is gathered. + * Give an approximate set of all transactions currently running. + * This is not guaranteed to be exact, as transactions may stop and start while this set is gathered. + * + * @return the set of open transactions. */ - public List activeTransactions() + public Set activeTransactions() { - List output = new ArrayList<>(); - for ( KernelTransactionImplementation tx : allTransactions ) - { - if ( tx.isOpen() ) - { - output.add( tx ); - } - } - - return output; + return Collections.unmodifiableSet( new HashSet<>( allTransactions ) ); } /** - * Pool of unused transactions. - */ - private final MarshlandPool localTxPool = new MarshlandPool<>( globalTxPool ); - - /** - * Dispose of all pooled transactions. This is done on shutdown or on internal events (like an HA mode switch) that + * Dispose of all active transactions. This is done on shutdown or on internal events (like an HA mode switch) that * require transactions to be re-created. */ public void disposeAll() { - for ( KernelTransactionImplementation tx : allTransactions ) - { - // we mark all transactions for termination since we want to make sure these transactions - // won't be reused, ever. Each transaction has, among other things, a Locks.Client and we - // certainly want to keep that from being reused from this point. - tx.markForTermination(); - } - localTxPool.disposeAll(); - globalTxPool.disposeAll(); + allTransactions.forEach( KernelTransaction::markForTermination ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/LegacyIndexTransactionStateImpl.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/LegacyIndexTransactionStateImpl.java index 40fb398c50c4e..ccb4cdcee1a05 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/LegacyIndexTransactionStateImpl.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/LegacyIndexTransactionStateImpl.java @@ -241,23 +241,4 @@ public boolean hasChanges() { return defineCommand != null; } - - /** Set this data structure to it's initial state, allowing it to be re-used as if it had just been new'ed up. */ - @Override - public void clear() - { - if ( !transactions.isEmpty() ) - { - transactions.clear(); - } - defineCommand = null; - if ( !nodeCommands.isEmpty() ) - { - nodeCommands.clear(); - } - if ( !relationshipCommands.isEmpty() ) - { - relationshipCommands.clear(); - } - } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/RelationshipChangesForNode.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/RelationshipChangesForNode.java index de0965150e963..993627e2c7656 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/RelationshipChangesForNode.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/RelationshipChangesForNode.java @@ -25,10 +25,8 @@ import java.util.Map; import java.util.Set; -import org.neo4j.collection.primitive.Primitive; import org.neo4j.collection.primitive.PrimitiveIntCollections; import org.neo4j.collection.primitive.PrimitiveIntIterator; -import org.neo4j.collection.primitive.PrimitiveIntSet; import org.neo4j.function.Function; import org.neo4j.graphdb.Direction; import org.neo4j.helpers.collection.PrefetchingIterator; @@ -135,7 +133,6 @@ abstract RelationshipIterator augmentPrimitiveIterator( RelationshipIterator ori private Map> outgoing; private Map> incoming; private Map> loops; - private PrimitiveIntSet typesChanged; private int totalOutgoing = 0; private int totalIncoming = 0; @@ -150,7 +147,6 @@ public RelationshipChangesForNode( DiffStrategy diffStrategy, RelationshipVisito public void addRelationship( long relId, int typeId, Direction direction ) { Map> relTypeToRelsMap = getTypeToRelMapForDirection( direction ); - typeChanged( typeId ); Set rels = relTypeToRelsMap.get( typeId ); if ( rels == null ) { @@ -174,19 +170,9 @@ public void addRelationship( long relId, int typeId, Direction direction ) } } - private void typeChanged( int type ) - { - if ( typesChanged == null ) - { - typesChanged = Primitive.intSet(); - } - typesChanged.add( type ); - } - public boolean removeRelationship( long relId, int typeId, Direction direction ) { Map> relTypeToRelsMap = getTypeToRelMapForDirection( direction ); - typeChanged( typeId ); Set rels = relTypeToRelsMap.get( typeId ); if ( rels != null ) { diff --git a/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseServiceTest.java b/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseServiceTest.java index 5a4d5ed1e60aa..b3de822b9b081 100644 --- a/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseServiceTest.java +++ b/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseServiceTest.java @@ -20,7 +20,6 @@ package org.neo4j.graphdb; import org.hamcrest.CoreMatchers; -import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -36,7 +35,10 @@ import org.neo4j.test.OtherThreadExecutor.WorkerCommand; import org.neo4j.test.TestGraphDatabaseFactory; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.iterableWithSize; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; public class GraphDatabaseServiceTest @@ -58,8 +60,7 @@ public void givenShutdownDatabaseWhenBeginTxThenExceptionIsThrown() throws Excep catch ( Exception e ) { // Then - Assert.assertThat( e.getClass().getName(), CoreMatchers.equalTo( TransactionFailureException.class - .getName() ) ); + assertThat( e.getClass().getName(), CoreMatchers.equalTo( TransactionFailureException.class.getName() ) ); } } @@ -245,7 +246,7 @@ public void run() } } - Assert.assertThat( result.get().getClass(), CoreMatchers.equalTo( TransactionFailureException.class ) ); + assertThat( result.get().getClass(), CoreMatchers.equalTo( TransactionFailureException.class ) ); } @Test @@ -299,6 +300,35 @@ public void shouldLetDetectedDeadlocksDuringCommitBeThrownInTheirOriginalForm() } } + /** + * GitHub issue #5996 + */ + @Test + public void terminationOfClosedTransactionDoesNotInfluenceNextTransaction() + { + GraphDatabaseService db = cleanup.add( new TestGraphDatabaseFactory().newImpermanentDatabase() ); + + try ( Transaction tx = db.beginTx() ) + { + db.createNode(); + tx.success(); + } + + Transaction transaction = db.beginTx(); + try ( Transaction tx = transaction ) + { + db.createNode(); + tx.success(); + } + transaction.terminate(); + + try ( Transaction tx = db.beginTx() ) + { + assertThat( db.getAllNodes(), is( iterableWithSize( 2 ) ) ); + tx.success(); + } + } + private WorkerCommand beginTx( final GraphDatabaseService db ) { return new WorkerCommand() diff --git a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java index 6e940a5ce95dc..eaa23b233f7cf 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java @@ -19,16 +19,17 @@ */ package org.neo4j.kernel.api; -import org.neo4j.collection.pool.Pool; import org.neo4j.helpers.Clock; import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState; import org.neo4j.kernel.impl.api.KernelTransactionImplementation; +import org.neo4j.kernel.impl.api.KernelTransactions; import org.neo4j.kernel.impl.api.SchemaWriteGuard; import org.neo4j.kernel.impl.api.StatementOperationParts; import org.neo4j.kernel.impl.api.TransactionHeaderInformation; import org.neo4j.kernel.impl.api.TransactionHooks; import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess; import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator; +import org.neo4j.kernel.impl.api.store.StoreReadLayer; import org.neo4j.kernel.impl.storageengine.StorageEngine; import org.neo4j.kernel.impl.store.NeoStores; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; @@ -46,17 +47,22 @@ static KernelTransaction kernelTransaction() TransactionHeaderInformationFactory headerInformationFactory = mock( TransactionHeaderInformationFactory.class ); when( headerInformationFactory.create() ).thenReturn( headerInformation ); + long lastTransactionIdWhenStarted = 0; + StorageEngine storageEngine = mock( StorageEngine.class ); when( storageEngine.neoStores() ).thenReturn( mock( NeoStores.class ) ); + when( storageEngine.storeReadLayer() ).thenReturn( mock( StoreReadLayer.class ) ); + return new KernelTransactionImplementation( mock( StatementOperationParts.class ), mock( SchemaWriteGuard.class ), null, new TransactionHooks(), mock( ConstraintIndexCreator.class ), headerInformationFactory, mock( TransactionRepresentationCommitProcess.class ), mock( TransactionMonitor.class ), mock( LegacyIndexTransactionState.class ), - mock(Pool.class), + mock( KernelTransactions.class ), Clock.SYSTEM_CLOCK, TransactionTracer.NULL, - storageEngine ); + storageEngine, + lastTransactionIdWhenStarted ); } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java index a3743d944032f..0f2cc6aecc7c0 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java @@ -28,7 +28,6 @@ import java.util.Arrays; import java.util.Collection; -import org.neo4j.collection.pool.Pool; import org.neo4j.function.Consumers; import org.neo4j.function.ThrowingConsumer; import org.neo4j.helpers.FakeClock; @@ -37,6 +36,7 @@ import org.neo4j.kernel.api.txstate.TransactionState; import org.neo4j.kernel.impl.api.KernelStatement; import org.neo4j.kernel.impl.api.KernelTransactionImplementation; +import org.neo4j.kernel.impl.api.KernelTransactions; import org.neo4j.kernel.impl.api.StatementOperationParts; import org.neo4j.kernel.impl.api.TransactionApplicationMode; import org.neo4j.kernel.impl.api.TransactionCommitProcess; @@ -59,6 +59,7 @@ import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer; import org.neo4j.test.DoubleLatch; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -67,11 +68,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - @RunWith( Parameterized.class ) public class KernelTransactionImplementationTest { @@ -385,10 +383,8 @@ public void shouldUseStartTimeAndTxIdFromWhenStartingTxAsHeader() throws Excepti any( StoreStatement.class ), anyLong() ) ).thenReturn( sillyCommandList() ); - try ( KernelTransactionImplementation transaction = newTransaction() ) + try ( KernelTransactionImplementation transaction = newTransaction( 5 ) ) { - transaction.initialize( 5L ); - // WHEN committing it at a later point clock.forward( 5, MILLISECONDS ); // ...and simulating some other transaction being committed @@ -402,27 +398,35 @@ public void shouldUseStartTimeAndTxIdFromWhenStartingTxAsHeader() throws Excepti assertEquals( startingTime + 5, commitProcess.transaction.getTimeCommitted() ); } - private void verifyExtraInteractionWithTheMonitor( TransactionMonitor transactionMonitor, boolean isWriteTx ) + @Test + public void successfulTxShouldNotifyKernelTransactionsThatItIsClosed() throws TransactionFailureException { - if ( isWriteTx ) - { - verify( this.transactionMonitor, times( 1 ) ).upgradeToWriteTransaction(); - } - verifyNoMoreInteractions( transactionMonitor ); + KernelTransactionImplementation tx = newTransaction(); + + tx.success(); + tx.close(); + + verify( kernelTransactions ).transactionClosed( tx ); } @Test - public void shouldNotReturnTransactionInstanceWithTerminationMarkToPool() throws Exception + public void failedTxShouldNotifyKernelTransactionsThatItIsClosed() throws TransactionFailureException { - // GIVEN - KernelTransactionImplementation transaction = newTransaction(); + KernelTransactionImplementation tx = newTransaction(); - // WHEN - transaction.markForTermination(); - transaction.close(); + tx.failure(); + tx.close(); - // THEN - verifyZeroInteractions( pool ); + verify( kernelTransactions ).transactionClosed( tx ); + } + + private void verifyExtraInteractionWithTheMonitor( TransactionMonitor transactionMonitor, boolean isWriteTx ) + { + if ( isWriteTx ) + { + verify( this.transactionMonitor, times( 1 ) ).upgradeToWriteTransaction(); + } + verifyNoMoreInteractions( transactionMonitor ); } private final StorageEngine storageEngine = mock( StorageEngine.class ); @@ -437,7 +441,7 @@ public void shouldNotReturnTransactionInstanceWithTerminationMarkToPool() throws private final TransactionHeaderInformationFactory headerInformationFactory = mock( TransactionHeaderInformationFactory.class ); private final FakeClock clock = new FakeClock(); - private final Pool pool = mock( Pool.class ); + private final KernelTransactions kernelTransactions = mock( KernelTransactions.class ); @Before public void before() @@ -454,12 +458,14 @@ public void before() private KernelTransactionImplementation newTransaction() { + return newTransaction( 0 ); + } - KernelTransactionImplementation transaction = new KernelTransactionImplementation( - null, null, new NoOpClient(), hooks, null, headerInformationFactory, commitProcess, transactionMonitor, - legacyIndexState, pool, clock, TransactionTracer.NULL, storageEngine ); - transaction.initialize( 0 ); - return transaction; + private KernelTransactionImplementation newTransaction( long lastTransactionIdWhenStarted ) + { + return new KernelTransactionImplementation( null, null, new NoOpClient(), hooks, null, headerInformationFactory, + commitProcess, transactionMonitor, legacyIndexState, kernelTransactions, clock, TransactionTracer.NULL, + storageEngine, lastTransactionIdWhenStarted ); } public class CapturingCommitProcess implements TransactionCommitProcess diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java index 33e21ea0afa66..a5786668ce68d 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java @@ -31,6 +31,7 @@ import org.neo4j.kernel.impl.api.store.StoreReadLayer; import org.neo4j.kernel.impl.api.store.StoreStatement; import org.neo4j.kernel.impl.locking.Locks; +import org.neo4j.kernel.impl.locking.ReentrantLockService; import org.neo4j.kernel.impl.storageengine.StorageEngine; import org.neo4j.kernel.impl.store.MetaDataStore; import org.neo4j.kernel.impl.store.NeoStores; @@ -45,16 +46,18 @@ import org.neo4j.logging.NullLog; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.neo4j.helpers.collection.IteratorUtil.asSet; -import static org.neo4j.helpers.collection.IteratorUtil.asUniqueSet; public class KernelTransactionsTest { @@ -72,7 +75,7 @@ public void shouldListActiveTransactions() throws Exception first.close(); // Then - assertThat( asUniqueSet( registry.activeTransactions() ), equalTo( asSet( second, third ) ) ); + assertThat( registry.activeTransactions(), equalTo( asSet( second, third ) ) ); } @Test @@ -122,6 +125,67 @@ public void shouldIncludeRandomBytesInAdditionalHeader() throws Exception assertTrue( additionalHeader.length > 0 ); } + @Test + public void transactionCloseRemovesTxFromActiveTransactions() throws Exception + { + KernelTransactions kernelTransactions = newKernelTransactions(); + + KernelTransaction tx1 = kernelTransactions.newInstance(); + KernelTransaction tx2 = kernelTransactions.newInstance(); + KernelTransaction tx3 = kernelTransactions.newInstance(); + + kernelTransactions.transactionClosed( tx1 ); + kernelTransactions.transactionClosed( tx3 ); + + assertEquals( asSet( tx2 ), kernelTransactions.activeTransactions() ); + } + + @Test + public void transactionRemovesItselfFromActiveTransactions() throws Exception + { + KernelTransactions kernelTransactions = newKernelTransactions(); + + KernelTransaction tx1 = kernelTransactions.newInstance(); + KernelTransaction tx2 = kernelTransactions.newInstance(); + KernelTransaction tx3 = kernelTransactions.newInstance(); + + tx2.close(); + + assertEquals( asSet( tx1, tx3 ), kernelTransactions.activeTransactions() ); + } + + @Test + public void exceptionIsThrownWhenUnknownTxIsClosed() throws Exception + { + KernelTransactions kernelTransactions = newKernelTransactions(); + + try + { + kernelTransactions.transactionClosed( mock( KernelTransactionImplementation.class ) ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( IllegalStateException.class ) ); + } + } + + @Test + public void disposeAllMarksAllTransactionsForTermination() throws Exception + { + KernelTransactions kernelTransactions = newKernelTransactions(); + + KernelTransaction tx1 = kernelTransactions.newInstance(); + KernelTransaction tx2 = kernelTransactions.newInstance(); + KernelTransaction tx3 = kernelTransactions.newInstance(); + + kernelTransactions.disposeAll(); + + assertTrue( tx1.shouldBeTerminated() ); + assertTrue( tx2.shouldBeTerminated() ); + assertTrue( tx3.shouldBeTerminated() ); + } + private static KernelTransactions newKernelTransactions() throws Exception { return newKernelTransactions( mock( TransactionCommitProcess.class ) ); @@ -135,11 +199,14 @@ private static KernelTransactions newKernelTransactions( TransactionCommitProces Locks locks = mock( Locks.class ); when( locks.newClient() ).thenReturn( mock( Locks.Client.class ) ); - StoreReadLayer readLayer = mock( StoreReadLayer.class ); MetaDataStore metaDataStore = mock( MetaDataStore.class ); IntegrityValidator integrityValidator = mock( IntegrityValidator.class ); NeoStores neoStores = mock( NeoStores.class ); + StoreStatement storeStatement = new StoreStatement( neoStores, new ReentrantLockService() ); + StoreReadLayer readLayer = mock( StoreReadLayer.class ); + when( readLayer.acquireStatement() ).thenReturn( storeStatement ); + StorageEngine storageEngine = mock( StorageEngine.class ); when( storageEngine.storeReadLayer() ).thenReturn( readLayer ); when( storageEngine.neoStores() ).thenReturn( neoStores );