diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/LegacyIndexTransactionState.java b/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/LegacyIndexTransactionState.java index e2a666306a8fc..9666a2cac5eba 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/LegacyIndexTransactionState.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/LegacyIndexTransactionState.java @@ -34,8 +34,6 @@ */ public interface LegacyIndexTransactionState extends RecordState { - void clear(); - LegacyIndex nodeChanges( String indexName ) throws LegacyIndexNotFoundKernelException; LegacyIndex relationshipChanges( String indexName ) throws LegacyIndexNotFoundKernelException; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CachingLegacyIndexTransactionState.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CachingLegacyIndexTransactionState.java index ece5ffba39b03..d3b4f493e50d1 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CachingLegacyIndexTransactionState.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CachingLegacyIndexTransactionState.java @@ -41,20 +41,6 @@ public CachingLegacyIndexTransactionState( LegacyIndexTransactionState txState ) this.txState = txState; } - @Override - public void clear() - { - txState.clear(); - if ( nodeLegacyIndexChanges != null && !nodeLegacyIndexChanges.isEmpty() ) - { - nodeLegacyIndexChanges.clear(); - } - if ( relationshipLegacyIndexChanges != null && !relationshipLegacyIndexChanges.isEmpty() ) - { - relationshipLegacyIndexChanges.clear(); - } - } - @Override public LegacyIndex nodeChanges( String indexName ) throws LegacyIndexNotFoundKernelException { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CountsRecordState.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CountsRecordState.java index 1f66a6940ae13..cb2b541e776c8 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CountsRecordState.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CountsRecordState.java @@ -32,7 +32,6 @@ import org.neo4j.register.Registers; import static java.util.Objects.requireNonNull; - import static org.neo4j.kernel.api.ReadOperations.ANY_LABEL; import static org.neo4j.kernel.api.ReadOperations.ANY_RELATIONSHIP_TYPE; import static org.neo4j.kernel.impl.store.counts.keys.CountsKeyFactory.indexSampleKey; @@ -142,17 +141,6 @@ public boolean hasChanges() return !counts.isEmpty(); } - /** - * Set this counter up to a pristine state, as if it had just been initialized. - */ - public void clear() - { - if ( !counts.isEmpty() ) - { - counts.clear(); - } - } - public static final class Difference { private final CountsKey key; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java index bd05859bdfa20..a830548c692d4 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java @@ -21,7 +21,6 @@ import java.util.Collection; -import org.neo4j.collection.pool.Pool; import org.neo4j.helpers.Clock; import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.KeyReadTokenNameLookup; @@ -30,6 +29,7 @@ import org.neo4j.kernel.api.exceptions.InvalidTransactionTypeKernelException; import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.exceptions.TransactionFailureException; +import org.neo4j.kernel.api.exceptions.TransactionHookException; import org.neo4j.kernel.api.exceptions.schema.ConstraintValidationKernelException; import org.neo4j.kernel.api.exceptions.schema.CreateConstraintFailureException; import org.neo4j.kernel.api.exceptions.schema.DropIndexFailureException; @@ -40,7 +40,6 @@ import org.neo4j.kernel.api.txstate.TxStateVisitor; import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator; import org.neo4j.kernel.impl.api.state.TxState; -import org.neo4j.kernel.impl.api.store.StoreReadLayer; import org.neo4j.kernel.impl.api.store.StoreStatement; import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.storageengine.StorageEngine; @@ -61,12 +60,6 @@ */ public class KernelTransactionImplementation implements KernelTransaction, TxStateHolder { - /* - * IMPORTANT: - * This class is pooled and re-used. If you add *any* state to it, you *must* make sure that the #initialize() - * method resets that state for re-use. - */ - private enum TransactionType { ANY, @@ -105,36 +98,36 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel private final TransactionHooks hooks; private final ConstraintIndexCreator constraintIndexCreator; private final StatementOperationParts operations; - private final Pool pool; - // State + private final KernelTransactions kernelTransactions; + private final StorageEngine storageEngine; + private final StoreStatement storeStatement; + private final Locks.Client locks; + // For committing private final TransactionHeaderInformationFactory headerInformationFactory; private final TransactionCommitProcess commitProcess; private final TransactionMonitor transactionMonitor; - private final StoreReadLayer storeLayer; - private final StorageEngine storageEngine; private final Clock clock; + + // State private TransactionState txState; private LegacyIndexTransactionState legacyIndexTransactionState; private TransactionType transactionType = TransactionType.ANY; private TransactionHooks.TransactionHooksState hooksState; + private KernelStatement currentStatement; + private CloseListener closeListener; + private boolean beforeHookInvoked; - private Locks.Client locks; - private StoreStatement storeStatement; private boolean closing, closed; private boolean failure, success; private volatile boolean terminated; - // Some header information - private long startTimeMillis; - private long lastTransactionIdWhenStarted; - /** - * Implements reusing the same underlying {@link KernelStatement} for overlapping statements. - */ - private KernelStatement currentStatement; + + // Header information + private final long startTimeMillis; + private final long lastTransactionIdWhenStarted; + // Event tracing - private final TransactionTracer tracer; - private TransactionEvent transactionEvent; - private CloseListener closeListener; + private final TransactionEvent transactionEvent; public KernelTransactionImplementation( StatementOperationParts operations, SchemaWriteGuard schemaWriteGuard, @@ -145,10 +138,11 @@ public KernelTransactionImplementation( StatementOperationParts operations, TransactionCommitProcess commitProcess, TransactionMonitor transactionMonitor, LegacyIndexTransactionState legacyIndexTransactionState, - Pool pool, + KernelTransactions kernelTransactions, Clock clock, TransactionTracer tracer, - StorageEngine storageEngine ) + StorageEngine storageEngine, + long lastTransactionIdWhenStarted ) { this.operations = operations; this.schemaWriteGuard = schemaWriteGuard; @@ -158,30 +152,14 @@ public KernelTransactionImplementation( StatementOperationParts operations, this.headerInformationFactory = headerInformationFactory; this.commitProcess = commitProcess; this.transactionMonitor = transactionMonitor; - this.storeLayer = storageEngine.storeReadLayer(); + this.storeStatement = storageEngine.storeReadLayer().acquireStatement(); this.storageEngine = storageEngine; this.legacyIndexTransactionState = new CachingLegacyIndexTransactionState( legacyIndexTransactionState ); - this.pool = pool; + this.kernelTransactions = kernelTransactions; this.clock = clock; - this.tracer = tracer; - } - - /** - * Reset this transaction to a vanilla state, turning it into a logically new transaction. - */ - public KernelTransactionImplementation initialize( long lastCommittedTx ) - { - assert locks != null : "This transaction has been disposed off, it should not be used."; - this.closing = closed = failure = success = false; - this.transactionType = TransactionType.ANY; - this.beforeHookInvoked = false; this.startTimeMillis = clock.currentTimeMillis(); - this.lastTransactionIdWhenStarted = lastCommittedTx; + this.lastTransactionIdWhenStarted = lastTransactionIdWhenStarted; this.transactionEvent = tracer.beginTransaction(); - assert transactionEvent != null : "transactionEvent was null!"; - this.storeStatement = storeLayer.acquireStatement(); - this.closeListener = null; - return this; } @Override @@ -385,39 +363,16 @@ public void close() throws TransactionFailureException transactionEvent.setTransactionType( transactionType.name() ); transactionEvent.setReadOnly( txState == null || !txState.hasChanges() ); transactionEvent.close(); - transactionEvent = null; - legacyIndexTransactionState.clear(); - txState = null; - hooksState = null; - closeListener = null; } finally { - release(); + locks.close(); + storeStatement.close(); + kernelTransactions.transactionClosed( this ); } } } - protected void dispose() - { - if ( locks != null ) - { - locks.close(); - } - - this.locks = null; - this.transactionType = null; - this.hooksState = null; - this.txState = null; - this.legacyIndexTransactionState = null; - - if ( storeStatement != null ) - { - this.storeStatement.close(); - this.storeStatement = null; - } - } - private void commit() throws TransactionFailureException { boolean success = false; @@ -429,10 +384,11 @@ private void commit() throws TransactionFailureException { try { - if ( (hooksState = hooks.beforeCommit( txState, this, storeLayer )) != null && hooksState.failed() ) + hooksState = hooks.beforeCommit( txState, this, storageEngine.storeReadLayer() ); + if ( hooksState != null && hooksState.failed() ) { - throw new TransactionFailureException( Status.Transaction.HookFailed, hooksState.failure(), - "" ); + TransactionHookException cause = hooksState.failure(); + throw new TransactionFailureException( Status.Transaction.HookFailed, cause, "" ); } } finally @@ -522,13 +478,13 @@ private void rollback() throws TransactionFailureException @Override public void visitCreatedNode( long id ) { - storeLayer.releaseNode( id ); + storageEngine.storeReadLayer().releaseNode( id ); } @Override public void visitCreatedRelationship( long id, int type, long startNode, long endNode ) { - storeLayer.releaseRelationship( id ); + storageEngine.storeReadLayer().releaseRelationship( id ); } } ); } @@ -577,30 +533,6 @@ private void afterRollback() } } - /** - * Release resources held up by this transaction & return it to the transaction pool. - */ - private void release() - { - locks.releaseAll(); - if ( terminated ) - { - // This transaction has been externally marked for termination. - // Just dispose of this transaction and don't return it to the pool. - dispose(); - } - else - { - // Return this instance to the pool so that another transaction may use it. - pool.release( this ); - if ( storeStatement != null ) - { - storeStatement.close(); - storeStatement = null; - } - } - } - @Override public void registerCloseListener( CloseListener listener ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java index fa524093bae73..02bb3304b1067 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java @@ -19,13 +19,11 @@ */ package org.neo4j.kernel.impl.api; -import java.util.ArrayList; -import java.util.List; +import java.util.Collections; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.neo4j.collection.pool.LinkedQueuePool; -import org.neo4j.collection.pool.MarshlandPool; import org.neo4j.function.Factory; import org.neo4j.graphdb.DatabaseShutdownException; import org.neo4j.helpers.Clock; @@ -47,15 +45,14 @@ /** * Central source of transactions in the database. *

- * This class maintains references to all transactions, a pool of passive kernel transactions, and provides - * capabilities - * for enumerating all running transactions. During normal operation, acquiring new transactions and enumerating live - * ones requires no synchronization (although the live list is not guaranteed to be exact). + * This class maintains references to all running transactions and provides capabilities for enumerating them. + * During normal operation, acquiring new transactions and enumerating live ones requires same amount of + * synchronization as {@link ConcurrentHashMap} provides for insertions and iterations. + *

+ * Live list is not guaranteed to be exact. */ public class KernelTransactions extends LifecycleAdapter implements Factory { - // Transaction dependencies - private final Locks locks; private final ConstraintIndexCreator constraintIndexCreator; private final StatementOperationParts statementOperations; @@ -70,21 +67,7 @@ public class KernelTransactions extends LifecycleAdapter implements Factory - * This data structure is *only* updated when brand-new transactions are created, or when transactions are disposed - * of. During normal operation (where all transactions come from and are returned to the pool), this will be left - * in peace, working solely as a collection of references to all transaction objects (idle and active) in the - * database. - *

- * As such, it provides a good mechanism for listing all transactions without requiring synchronization when - * starting and committing transactions. - */ - private final Set allTransactions = newSetFromMap( - new ConcurrentHashMap<>() ); + private final Set allTransactions = newSetFromMap( new ConcurrentHashMap<>() ); public KernelTransactions( Locks locks, ConstraintIndexCreator constraintIndexCreator, @@ -115,89 +98,61 @@ public KernelTransactions( Locks locks, this.storageEngine = storageEngine; } - /** - * This is the factory that actually builds brand-new instances. - */ - private final Factory factory = new Factory() - { - @Override - public KernelTransactionImplementation newInstance() - { - Locks.Client locksClient = locks.newClient(); - LegacyIndexTransactionState legacyIndexTransactionState = - new LegacyIndexTransactionStateImpl( indexConfigStore, legacyIndexProviderLookup ); - KernelTransactionImplementation tx = new KernelTransactionImplementation( - statementOperations, schemaWriteGuard, - locksClient, hooks, constraintIndexCreator, transactionHeaderInformationFactory, - transactionCommitProcess, transactionMonitor, legacyIndexTransactionState, - localTxPool, Clock.SYSTEM_CLOCK, tracers.transactionTracer, storageEngine ); - - allTransactions.add( tx ); - - return tx; - } - }; - @Override public KernelTransaction newInstance() { assertDatabaseIsRunning(); - return localTxPool.acquire().initialize( storageEngine.metaDataStore().getLastCommittedTransactionId() ); + + Locks.Client locksClient = locks.newClient(); + LegacyIndexTransactionState legacyIndexTransactionState = + new LegacyIndexTransactionStateImpl( indexConfigStore, legacyIndexProviderLookup ); + + long lastTransactionIdWhenStarted = storageEngine.metaDataStore().getLastCommittedTransactionId(); + + KernelTransactionImplementation tx = new KernelTransactionImplementation( statementOperations, schemaWriteGuard, + locksClient, hooks, constraintIndexCreator, transactionHeaderInformationFactory, + transactionCommitProcess, transactionMonitor, legacyIndexTransactionState, + this, Clock.SYSTEM_CLOCK, tracers.transactionTracer, storageEngine, lastTransactionIdWhenStarted ); + + allTransactions.add( tx ); + + return tx; } /** - * Global pool of transactions, wrapped by the thread-local marshland pool and so is not used directly. + * Signals that given transaction is closed and should be removed from the set of running transactions. + * + * @param tx the closed transaction. + * @throws IllegalStateException if given transaction is not in the set of active transactions. */ - private final LinkedQueuePool globalTxPool - = new LinkedQueuePool( 8, factory ) + public void transactionClosed( KernelTransaction tx ) { - @Override - protected void dispose( KernelTransactionImplementation tx ) + boolean removed = allTransactions.remove( tx ); + if ( !removed ) { - allTransactions.remove( tx ); - tx.dispose(); - super.dispose( tx ); + throw new IllegalStateException( "Transaction: " + tx + " is not present in the " + + "set of known active transactions: " + allTransactions ); } - }; + } /** - * Give an approximate list of all transactions currently running. This is not guaranteed to be exact, as - * transactions may stop and start while this list is gathered. + * Give an approximate set of all transactions currently running. + * This is not guaranteed to be exact, as transactions may stop and start while this set is gathered. + * + * @return the set of open transactions. */ - public List activeTransactions() + public Set activeTransactions() { - List output = new ArrayList<>(); - for ( KernelTransactionImplementation tx : allTransactions ) - { - if ( tx.isOpen() ) - { - output.add( tx ); - } - } - - return output; + return Collections.unmodifiableSet( new HashSet<>( allTransactions ) ); } /** - * Pool of unused transactions. - */ - private final MarshlandPool localTxPool = new MarshlandPool<>( globalTxPool ); - - /** - * Dispose of all pooled transactions. This is done on shutdown or on internal events (like an HA mode switch) that + * Dispose of all active transactions. This is done on shutdown or on internal events (like an HA mode switch) that * require transactions to be re-created. */ public void disposeAll() { - for ( KernelTransactionImplementation tx : allTransactions ) - { - // we mark all transactions for termination since we want to make sure these transactions - // won't be reused, ever. Each transaction has, among other things, a Locks.Client and we - // certainly want to keep that from being reused from this point. - tx.markForTermination(); - } - localTxPool.disposeAll(); - globalTxPool.disposeAll(); + allTransactions.forEach( KernelTransaction::markForTermination ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/LegacyIndexTransactionStateImpl.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/LegacyIndexTransactionStateImpl.java index 40fb398c50c4e..ccb4cdcee1a05 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/LegacyIndexTransactionStateImpl.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/LegacyIndexTransactionStateImpl.java @@ -241,23 +241,4 @@ public boolean hasChanges() { return defineCommand != null; } - - /** Set this data structure to it's initial state, allowing it to be re-used as if it had just been new'ed up. */ - @Override - public void clear() - { - if ( !transactions.isEmpty() ) - { - transactions.clear(); - } - defineCommand = null; - if ( !nodeCommands.isEmpty() ) - { - nodeCommands.clear(); - } - if ( !relationshipCommands.isEmpty() ) - { - relationshipCommands.clear(); - } - } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/RelationshipChangesForNode.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/RelationshipChangesForNode.java index de0965150e963..993627e2c7656 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/RelationshipChangesForNode.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/state/RelationshipChangesForNode.java @@ -25,10 +25,8 @@ import java.util.Map; import java.util.Set; -import org.neo4j.collection.primitive.Primitive; import org.neo4j.collection.primitive.PrimitiveIntCollections; import org.neo4j.collection.primitive.PrimitiveIntIterator; -import org.neo4j.collection.primitive.PrimitiveIntSet; import org.neo4j.function.Function; import org.neo4j.graphdb.Direction; import org.neo4j.helpers.collection.PrefetchingIterator; @@ -135,7 +133,6 @@ abstract RelationshipIterator augmentPrimitiveIterator( RelationshipIterator ori private Map> outgoing; private Map> incoming; private Map> loops; - private PrimitiveIntSet typesChanged; private int totalOutgoing = 0; private int totalIncoming = 0; @@ -150,7 +147,6 @@ public RelationshipChangesForNode( DiffStrategy diffStrategy, RelationshipVisito public void addRelationship( long relId, int typeId, Direction direction ) { Map> relTypeToRelsMap = getTypeToRelMapForDirection( direction ); - typeChanged( typeId ); Set rels = relTypeToRelsMap.get( typeId ); if ( rels == null ) { @@ -174,19 +170,9 @@ public void addRelationship( long relId, int typeId, Direction direction ) } } - private void typeChanged( int type ) - { - if ( typesChanged == null ) - { - typesChanged = Primitive.intSet(); - } - typesChanged.add( type ); - } - public boolean removeRelationship( long relId, int typeId, Direction direction ) { Map> relTypeToRelsMap = getTypeToRelMapForDirection( direction ); - typeChanged( typeId ); Set rels = relTypeToRelsMap.get( typeId ); if ( rels != null ) { diff --git a/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseServiceTest.java b/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseServiceTest.java index 5a4d5ed1e60aa..b3de822b9b081 100644 --- a/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseServiceTest.java +++ b/community/kernel/src/test/java/org/neo4j/graphdb/GraphDatabaseServiceTest.java @@ -20,7 +20,6 @@ package org.neo4j.graphdb; import org.hamcrest.CoreMatchers; -import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -36,7 +35,10 @@ import org.neo4j.test.OtherThreadExecutor.WorkerCommand; import org.neo4j.test.TestGraphDatabaseFactory; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.iterableWithSize; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; public class GraphDatabaseServiceTest @@ -58,8 +60,7 @@ public void givenShutdownDatabaseWhenBeginTxThenExceptionIsThrown() throws Excep catch ( Exception e ) { // Then - Assert.assertThat( e.getClass().getName(), CoreMatchers.equalTo( TransactionFailureException.class - .getName() ) ); + assertThat( e.getClass().getName(), CoreMatchers.equalTo( TransactionFailureException.class.getName() ) ); } } @@ -245,7 +246,7 @@ public void run() } } - Assert.assertThat( result.get().getClass(), CoreMatchers.equalTo( TransactionFailureException.class ) ); + assertThat( result.get().getClass(), CoreMatchers.equalTo( TransactionFailureException.class ) ); } @Test @@ -299,6 +300,35 @@ public void shouldLetDetectedDeadlocksDuringCommitBeThrownInTheirOriginalForm() } } + /** + * GitHub issue #5996 + */ + @Test + public void terminationOfClosedTransactionDoesNotInfluenceNextTransaction() + { + GraphDatabaseService db = cleanup.add( new TestGraphDatabaseFactory().newImpermanentDatabase() ); + + try ( Transaction tx = db.beginTx() ) + { + db.createNode(); + tx.success(); + } + + Transaction transaction = db.beginTx(); + try ( Transaction tx = transaction ) + { + db.createNode(); + tx.success(); + } + transaction.terminate(); + + try ( Transaction tx = db.beginTx() ) + { + assertThat( db.getAllNodes(), is( iterableWithSize( 2 ) ) ); + tx.success(); + } + } + private WorkerCommand beginTx( final GraphDatabaseService db ) { return new WorkerCommand() diff --git a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java index 6e940a5ce95dc..eaa23b233f7cf 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java @@ -19,16 +19,17 @@ */ package org.neo4j.kernel.api; -import org.neo4j.collection.pool.Pool; import org.neo4j.helpers.Clock; import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState; import org.neo4j.kernel.impl.api.KernelTransactionImplementation; +import org.neo4j.kernel.impl.api.KernelTransactions; import org.neo4j.kernel.impl.api.SchemaWriteGuard; import org.neo4j.kernel.impl.api.StatementOperationParts; import org.neo4j.kernel.impl.api.TransactionHeaderInformation; import org.neo4j.kernel.impl.api.TransactionHooks; import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess; import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator; +import org.neo4j.kernel.impl.api.store.StoreReadLayer; import org.neo4j.kernel.impl.storageengine.StorageEngine; import org.neo4j.kernel.impl.store.NeoStores; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; @@ -46,17 +47,22 @@ static KernelTransaction kernelTransaction() TransactionHeaderInformationFactory headerInformationFactory = mock( TransactionHeaderInformationFactory.class ); when( headerInformationFactory.create() ).thenReturn( headerInformation ); + long lastTransactionIdWhenStarted = 0; + StorageEngine storageEngine = mock( StorageEngine.class ); when( storageEngine.neoStores() ).thenReturn( mock( NeoStores.class ) ); + when( storageEngine.storeReadLayer() ).thenReturn( mock( StoreReadLayer.class ) ); + return new KernelTransactionImplementation( mock( StatementOperationParts.class ), mock( SchemaWriteGuard.class ), null, new TransactionHooks(), mock( ConstraintIndexCreator.class ), headerInformationFactory, mock( TransactionRepresentationCommitProcess.class ), mock( TransactionMonitor.class ), mock( LegacyIndexTransactionState.class ), - mock(Pool.class), + mock( KernelTransactions.class ), Clock.SYSTEM_CLOCK, TransactionTracer.NULL, - storageEngine ); + storageEngine, + lastTransactionIdWhenStarted ); } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java index a3743d944032f..0f2cc6aecc7c0 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java @@ -28,7 +28,6 @@ import java.util.Arrays; import java.util.Collection; -import org.neo4j.collection.pool.Pool; import org.neo4j.function.Consumers; import org.neo4j.function.ThrowingConsumer; import org.neo4j.helpers.FakeClock; @@ -37,6 +36,7 @@ import org.neo4j.kernel.api.txstate.TransactionState; import org.neo4j.kernel.impl.api.KernelStatement; import org.neo4j.kernel.impl.api.KernelTransactionImplementation; +import org.neo4j.kernel.impl.api.KernelTransactions; import org.neo4j.kernel.impl.api.StatementOperationParts; import org.neo4j.kernel.impl.api.TransactionApplicationMode; import org.neo4j.kernel.impl.api.TransactionCommitProcess; @@ -59,6 +59,7 @@ import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer; import org.neo4j.test.DoubleLatch; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -67,11 +68,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - @RunWith( Parameterized.class ) public class KernelTransactionImplementationTest { @@ -385,10 +383,8 @@ public void shouldUseStartTimeAndTxIdFromWhenStartingTxAsHeader() throws Excepti any( StoreStatement.class ), anyLong() ) ).thenReturn( sillyCommandList() ); - try ( KernelTransactionImplementation transaction = newTransaction() ) + try ( KernelTransactionImplementation transaction = newTransaction( 5 ) ) { - transaction.initialize( 5L ); - // WHEN committing it at a later point clock.forward( 5, MILLISECONDS ); // ...and simulating some other transaction being committed @@ -402,27 +398,35 @@ public void shouldUseStartTimeAndTxIdFromWhenStartingTxAsHeader() throws Excepti assertEquals( startingTime + 5, commitProcess.transaction.getTimeCommitted() ); } - private void verifyExtraInteractionWithTheMonitor( TransactionMonitor transactionMonitor, boolean isWriteTx ) + @Test + public void successfulTxShouldNotifyKernelTransactionsThatItIsClosed() throws TransactionFailureException { - if ( isWriteTx ) - { - verify( this.transactionMonitor, times( 1 ) ).upgradeToWriteTransaction(); - } - verifyNoMoreInteractions( transactionMonitor ); + KernelTransactionImplementation tx = newTransaction(); + + tx.success(); + tx.close(); + + verify( kernelTransactions ).transactionClosed( tx ); } @Test - public void shouldNotReturnTransactionInstanceWithTerminationMarkToPool() throws Exception + public void failedTxShouldNotifyKernelTransactionsThatItIsClosed() throws TransactionFailureException { - // GIVEN - KernelTransactionImplementation transaction = newTransaction(); + KernelTransactionImplementation tx = newTransaction(); - // WHEN - transaction.markForTermination(); - transaction.close(); + tx.failure(); + tx.close(); - // THEN - verifyZeroInteractions( pool ); + verify( kernelTransactions ).transactionClosed( tx ); + } + + private void verifyExtraInteractionWithTheMonitor( TransactionMonitor transactionMonitor, boolean isWriteTx ) + { + if ( isWriteTx ) + { + verify( this.transactionMonitor, times( 1 ) ).upgradeToWriteTransaction(); + } + verifyNoMoreInteractions( transactionMonitor ); } private final StorageEngine storageEngine = mock( StorageEngine.class ); @@ -437,7 +441,7 @@ public void shouldNotReturnTransactionInstanceWithTerminationMarkToPool() throws private final TransactionHeaderInformationFactory headerInformationFactory = mock( TransactionHeaderInformationFactory.class ); private final FakeClock clock = new FakeClock(); - private final Pool pool = mock( Pool.class ); + private final KernelTransactions kernelTransactions = mock( KernelTransactions.class ); @Before public void before() @@ -454,12 +458,14 @@ public void before() private KernelTransactionImplementation newTransaction() { + return newTransaction( 0 ); + } - KernelTransactionImplementation transaction = new KernelTransactionImplementation( - null, null, new NoOpClient(), hooks, null, headerInformationFactory, commitProcess, transactionMonitor, - legacyIndexState, pool, clock, TransactionTracer.NULL, storageEngine ); - transaction.initialize( 0 ); - return transaction; + private KernelTransactionImplementation newTransaction( long lastTransactionIdWhenStarted ) + { + return new KernelTransactionImplementation( null, null, new NoOpClient(), hooks, null, headerInformationFactory, + commitProcess, transactionMonitor, legacyIndexState, kernelTransactions, clock, TransactionTracer.NULL, + storageEngine, lastTransactionIdWhenStarted ); } public class CapturingCommitProcess implements TransactionCommitProcess diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java index 33e21ea0afa66..a5786668ce68d 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java @@ -31,6 +31,7 @@ import org.neo4j.kernel.impl.api.store.StoreReadLayer; import org.neo4j.kernel.impl.api.store.StoreStatement; import org.neo4j.kernel.impl.locking.Locks; +import org.neo4j.kernel.impl.locking.ReentrantLockService; import org.neo4j.kernel.impl.storageengine.StorageEngine; import org.neo4j.kernel.impl.store.MetaDataStore; import org.neo4j.kernel.impl.store.NeoStores; @@ -45,16 +46,18 @@ import org.neo4j.logging.NullLog; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.neo4j.helpers.collection.IteratorUtil.asSet; -import static org.neo4j.helpers.collection.IteratorUtil.asUniqueSet; public class KernelTransactionsTest { @@ -72,7 +75,7 @@ public void shouldListActiveTransactions() throws Exception first.close(); // Then - assertThat( asUniqueSet( registry.activeTransactions() ), equalTo( asSet( second, third ) ) ); + assertThat( registry.activeTransactions(), equalTo( asSet( second, third ) ) ); } @Test @@ -122,6 +125,67 @@ public void shouldIncludeRandomBytesInAdditionalHeader() throws Exception assertTrue( additionalHeader.length > 0 ); } + @Test + public void transactionCloseRemovesTxFromActiveTransactions() throws Exception + { + KernelTransactions kernelTransactions = newKernelTransactions(); + + KernelTransaction tx1 = kernelTransactions.newInstance(); + KernelTransaction tx2 = kernelTransactions.newInstance(); + KernelTransaction tx3 = kernelTransactions.newInstance(); + + kernelTransactions.transactionClosed( tx1 ); + kernelTransactions.transactionClosed( tx3 ); + + assertEquals( asSet( tx2 ), kernelTransactions.activeTransactions() ); + } + + @Test + public void transactionRemovesItselfFromActiveTransactions() throws Exception + { + KernelTransactions kernelTransactions = newKernelTransactions(); + + KernelTransaction tx1 = kernelTransactions.newInstance(); + KernelTransaction tx2 = kernelTransactions.newInstance(); + KernelTransaction tx3 = kernelTransactions.newInstance(); + + tx2.close(); + + assertEquals( asSet( tx1, tx3 ), kernelTransactions.activeTransactions() ); + } + + @Test + public void exceptionIsThrownWhenUnknownTxIsClosed() throws Exception + { + KernelTransactions kernelTransactions = newKernelTransactions(); + + try + { + kernelTransactions.transactionClosed( mock( KernelTransactionImplementation.class ) ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( IllegalStateException.class ) ); + } + } + + @Test + public void disposeAllMarksAllTransactionsForTermination() throws Exception + { + KernelTransactions kernelTransactions = newKernelTransactions(); + + KernelTransaction tx1 = kernelTransactions.newInstance(); + KernelTransaction tx2 = kernelTransactions.newInstance(); + KernelTransaction tx3 = kernelTransactions.newInstance(); + + kernelTransactions.disposeAll(); + + assertTrue( tx1.shouldBeTerminated() ); + assertTrue( tx2.shouldBeTerminated() ); + assertTrue( tx3.shouldBeTerminated() ); + } + private static KernelTransactions newKernelTransactions() throws Exception { return newKernelTransactions( mock( TransactionCommitProcess.class ) ); @@ -135,11 +199,14 @@ private static KernelTransactions newKernelTransactions( TransactionCommitProces Locks locks = mock( Locks.class ); when( locks.newClient() ).thenReturn( mock( Locks.Client.class ) ); - StoreReadLayer readLayer = mock( StoreReadLayer.class ); MetaDataStore metaDataStore = mock( MetaDataStore.class ); IntegrityValidator integrityValidator = mock( IntegrityValidator.class ); NeoStores neoStores = mock( NeoStores.class ); + StoreStatement storeStatement = new StoreStatement( neoStores, new ReentrantLockService() ); + StoreReadLayer readLayer = mock( StoreReadLayer.class ); + when( readLayer.acquireStatement() ).thenReturn( storeStatement ); + StorageEngine storageEngine = mock( StorageEngine.class ); when( storageEngine.storeReadLayer() ).thenReturn( readLayer ); when( storageEngine.neoStores() ).thenReturn( neoStores );