From 3321f9729f5047c654da5b49810043efa4088aea Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Thu, 11 Feb 2016 11:53:42 +0100 Subject: [PATCH] Revert "Removed KTI pooling" This reverts commit 77353739bf544313b354be18473f0ebf5448b11d. At least parts of it. KTI objects are rather heavy weight and there's a cost to registering each transaction instance in the shared "all transactions" collection --- .../kernel/impl/api/CountsRecordState.java | 1 + .../api/KernelTransactionImplementation.java | 102 ++++++++++----- .../kernel/impl/api/KernelTransactions.java | 119 ++++++++++++------ .../kernel/api/KernelTransactionFactory.java | 17 ++- .../KernelTransactionImplementationTest.java | 13 +- .../kernel/api/KernelTransactionTestBase.java | 11 +- .../impl/api/KernelTransactionsTest.java | 38 +++--- 7 files changed, 192 insertions(+), 109 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CountsRecordState.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CountsRecordState.java index a86bc13e2e5c7..d118a13ca5c80 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CountsRecordState.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/CountsRecordState.java @@ -33,6 +33,7 @@ import org.neo4j.register.Registers; import static java.util.Objects.requireNonNull; + import static org.neo4j.kernel.api.ReadOperations.ANY_LABEL; import static org.neo4j.kernel.api.ReadOperations.ANY_RELATIONSHIP_TYPE; import static org.neo4j.kernel.impl.store.counts.keys.CountsKeyFactory.indexSampleKey; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java index 4a5cca83b6dd2..6ef9621a9fff9 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactionImplementation.java @@ -21,7 +21,9 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.function.Supplier; +import org.neo4j.collection.pool.Pool; import org.neo4j.helpers.Clock; import org.neo4j.kernel.api.AccessMode; import org.neo4j.kernel.api.KernelTransaction; @@ -36,13 +38,13 @@ import org.neo4j.kernel.api.exceptions.schema.CreateConstraintFailureException; import org.neo4j.kernel.api.exceptions.schema.DropIndexFailureException; import org.neo4j.kernel.api.index.IndexDescriptor; -import org.neo4j.kernel.impl.proc.Procedures; import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState; import org.neo4j.kernel.api.txstate.TransactionState; import org.neo4j.kernel.api.txstate.TxStateHolder; import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator; import org.neo4j.kernel.impl.api.state.TxState; import org.neo4j.kernel.impl.locking.Locks; +import org.neo4j.kernel.impl.proc.Procedures; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; import org.neo4j.kernel.impl.transaction.TransactionMonitor; import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation; @@ -51,6 +53,7 @@ import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer; import org.neo4j.storageengine.api.StorageCommand; import org.neo4j.storageengine.api.StorageEngine; +import org.neo4j.storageengine.api.StoreReadLayer; import org.neo4j.storageengine.api.txstate.TxStateVisitor; import static org.neo4j.storageengine.api.TransactionApplicationMode.INTERNAL; @@ -62,6 +65,13 @@ */ public class KernelTransactionImplementation implements KernelTransaction, TxStateHolder { + /* + * IMPORTANT: + * This class is pooled and re-used. If you add *any* state to it, you *must* make sure that: + * - the #initialize() method resets that state for re-use + * - the #release() method releases resources acquired in #initialize() or during the transaction's life time + */ + private enum TransactionType { READ, @@ -100,69 +110,80 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel private final TransactionHooks hooks; private final ConstraintIndexCreator constraintIndexCreator; private final StatementOperationParts operations; - private final KernelTransactions kernelTransactions; private final StorageEngine storageEngine; - private final Locks.Client locks; private final Procedures procedures; + private final TransactionTracer tracer; + private final Pool pool; + private final Supplier legacyIndexTxStateSupplier; // For committing private final TransactionHeaderInformationFactory headerInformationFactory; private final TransactionCommitProcess commitProcess; private final TransactionMonitor transactionMonitor; + private final StoreReadLayer storeLayer; private final Clock clock; - // State + // State that needs to be reset between uses. Most of these should be cleared or released in #release(), + // whereas others, such as timestamp or txId when transaction starts, even locks, needs to be set in #initialize(). private TransactionState txState; - private final LegacyIndexTransactionState legacyIndexTransactionState; - private TransactionType transactionType = TransactionType.READ; // Tracks current state of transaction, which will upgrade to WRITE or SCHEMA mode when necessary + private LegacyIndexTransactionState legacyIndexTransactionState; + private TransactionType transactionType; // Tracks current state of transaction, which will upgrade to WRITE or SCHEMA mode when necessary private TransactionHooks.TransactionHooksState hooksState; private KernelStatement currentStatement; private CloseListener closeListener; - private AccessMode accessMode = AccessMode.FULL; // Defines whether a transaction is allowed to upgrade to WRITE or SCHEMA mode - + private AccessMode accessMode; // Defines whether a transaction is allowed to upgrade to WRITE or SCHEMA mode + private Locks.Client locks; private boolean beforeHookInvoked; private boolean closing, closed; private boolean failure, success; private volatile boolean terminated; - - // Header information - private final long startTimeMillis; - private final long lastTransactionIdWhenStarted; - - // Event tracing - private final TransactionEvent transactionEvent; + private long startTimeMillis; + private long lastTransactionIdWhenStarted; + private TransactionEvent transactionEvent; public KernelTransactionImplementation( StatementOperationParts operations, SchemaWriteGuard schemaWriteGuard, - Locks.Client locks, TransactionHooks hooks, ConstraintIndexCreator constraintIndexCreator, Procedures procedures, TransactionHeaderInformationFactory headerInformationFactory, TransactionCommitProcess commitProcess, TransactionMonitor transactionMonitor, - LegacyIndexTransactionState legacyIndexTransactionState, - KernelTransactions kernelTransactions, + Supplier legacyIndexTxStateSupplier, + Pool pool, Clock clock, TransactionTracer tracer, - StorageEngine storageEngine, - long lastTransactionIdWhenStarted ) + StorageEngine storageEngine ) { this.operations = operations; this.schemaWriteGuard = schemaWriteGuard; this.hooks = hooks; - this.locks = locks; this.constraintIndexCreator = constraintIndexCreator; this.procedures = procedures; this.headerInformationFactory = headerInformationFactory; this.commitProcess = commitProcess; this.transactionMonitor = transactionMonitor; + this.storeLayer = storageEngine.storeReadLayer(); this.storageEngine = storageEngine; - this.legacyIndexTransactionState = new CachingLegacyIndexTransactionState( legacyIndexTransactionState ); - this.kernelTransactions = kernelTransactions; + this.legacyIndexTxStateSupplier = legacyIndexTxStateSupplier; + this.pool = pool; this.clock = clock; + this.tracer = tracer; + } + + /** + * Reset this transaction to a vanilla state, turning it into a logically new transaction. + */ + public KernelTransactionImplementation initialize( long lastCommittedTx, Locks.Client locks ) + { + this.locks = locks; + this.closing = closed = failure = success = terminated = beforeHookInvoked = false; + this.transactionType = TransactionType.READ; this.startTimeMillis = clock.currentTimeMillis(); - this.lastTransactionIdWhenStarted = lastTransactionIdWhenStarted; + this.lastTransactionIdWhenStarted = lastCommittedTx; this.transactionEvent = tracer.beginTransaction(); + assert transactionEvent != null : "transactionEvent was null!"; + this.accessMode = AccessMode.FULL; + return this; } @Override @@ -294,7 +315,8 @@ public TransactionState txState() @Override public LegacyIndexTransactionState legacyIndexTxState() { - return legacyIndexTransactionState; + return legacyIndexTransactionState != null ? legacyIndexTransactionState : + (legacyIndexTransactionState = legacyIndexTxStateSupplier.get()); } @Override @@ -341,7 +363,12 @@ private void assertTransactionOpen() private boolean hasChanges() { - return hasTxStateWithChanges() || legacyIndexTransactionState.hasChanges(); + return hasTxStateWithChanges() || hasLegacyIndexChanges(); + } + + private boolean hasLegacyIndexChanges() + { + return legacyIndexTransactionState != null && legacyIndexTransactionState.hasChanges(); } private boolean hasDataChanges() @@ -390,8 +417,7 @@ public void close() throws TransactionFailureException } finally { - locks.close(); - kernelTransactions.transactionClosed( this ); + release(); } } } @@ -430,7 +456,7 @@ private void commit() throws TransactionFailureException txState, locks, lastTransactionIdWhenStarted ); - if ( legacyIndexTransactionState.hasChanges() ) + if ( hasLegacyIndexChanges() ) { legacyIndexTransactionState.extractCommands( extractedCommands ); } @@ -504,13 +530,13 @@ private void rollback() throws TransactionFailureException @Override public void visitCreatedNode( long id ) { - storageEngine.storeReadLayer().releaseNode( id ); + storeLayer.releaseNode( id ); } @Override public void visitCreatedRelationship( long id, int type, long startNode, long endNode ) { - storageEngine.storeReadLayer().releaseRelationship( id ); + storeLayer.releaseRelationship( id ); } } ); } @@ -559,6 +585,20 @@ private void afterRollback() } } + /** + * Release resources held up by this transaction & return it to the transaction pool. + */ + private void release() + { + locks.close(); + transactionEvent = null; + legacyIndexTransactionState = null; + txState = null; + hooksState = null; + closeListener = null; + pool.release( this ); + } + @Override public void registerCloseListener( CloseListener listener ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java index 3950b59ab00ee..28fcf1bfad3ee 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java @@ -19,11 +19,13 @@ */ package org.neo4j.kernel.impl.api; -import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; +import org.neo4j.collection.pool.LinkedQueuePool; +import org.neo4j.collection.pool.MarshlandPool; import org.neo4j.function.Factory; import org.neo4j.graphdb.DatabaseShutdownException; import org.neo4j.helpers.Clock; @@ -47,22 +49,21 @@ /** * Central source of transactions in the database. *

- * This class maintains references to all running transactions and provides capabilities for enumerating them. - * During normal operation, acquiring new transactions and enumerating live ones requires same amount of - * synchronization as {@link ConcurrentHashMap} provides for insertions and iterations. - *

- * Live list is not guaranteed to be exact. + * This class maintains references to all transactions, a pool of passive kernel transactions, and provides + * capabilities + * for enumerating all running transactions. During normal operation, acquiring new transactions and enumerating live + * ones requires no synchronization (although the live list is not guaranteed to be exact). */ public class KernelTransactions extends LifecycleAdapter implements Factory { + // Transaction dependencies + private final Locks locks; private final ConstraintIndexCreator constraintIndexCreator; private final StatementOperationParts statementOperations; private final SchemaWriteGuard schemaWriteGuard; private final TransactionHeaderInformationFactory transactionHeaderInformationFactory; private final TransactionCommitProcess transactionCommitProcess; - private final IndexConfigStore indexConfigStore; - private final LegacyIndexProviderLookup legacyIndexProviderLookup; private final TransactionHooks hooks; private final TransactionMonitor transactionMonitor; private final LifeSupport dataSourceLife; @@ -70,8 +71,23 @@ public class KernelTransactions extends LifecycleAdapter implements Factory legacyIndexTxStateSupplier; - private final Set allTransactions = newSetFromMap( new ConcurrentHashMap<>() ); + // End Tx Dependencies + + /** + * Used to enumerate all transactions in the system, active and idle ones. + *

+ * This data structure is *only* updated when brand-new transactions are created, or when transactions are disposed + * of. During normal operation (where all transactions come from and are returned to the pool), this will be left + * in peace, working solely as a collection of references to all transaction objects (idle and active) in the + * database. + *

+ * As such, it provides a good mechanism for listing all transactions without requiring synchronization when + * starting and committing transactions. + */ + private final Set allTransactions = newSetFromMap( + new ConcurrentHashMap<>() ); public KernelTransactions( Locks locks, ConstraintIndexCreator constraintIndexCreator, @@ -95,8 +111,6 @@ public KernelTransactions( Locks locks, this.schemaWriteGuard = schemaWriteGuard; this.transactionHeaderInformationFactory = txHeaderFactory; this.transactionCommitProcess = transactionCommitProcess; - this.indexConfigStore = indexConfigStore; - this.legacyIndexProviderLookup = legacyIndexProviderLookup; this.hooks = hooks; this.transactionMonitor = transactionMonitor; this.dataSourceLife = dataSourceLife; @@ -104,44 +118,49 @@ public KernelTransactions( Locks locks, this.storageEngine = storageEngine; this.procedures = procedures; this.transactionIdStore = transactionIdStore; + this.legacyIndexTxStateSupplier = () -> new CachingLegacyIndexTransactionState( + new LegacyIndexTransactionStateImpl( indexConfigStore, legacyIndexProviderLookup ) ); } + /** + * This is the factory that actually builds brand-new instances. + */ + private final Factory factory = new Factory() + { + @Override + public KernelTransactionImplementation newInstance() + { + KernelTransactionImplementation tx = new KernelTransactionImplementation( + statementOperations, schemaWriteGuard, + hooks, constraintIndexCreator, procedures, transactionHeaderInformationFactory, + transactionCommitProcess, transactionMonitor, legacyIndexTxStateSupplier, + localTxPool, Clock.SYSTEM_CLOCK, tracers.transactionTracer, storageEngine ); + + allTransactions.add( tx ); + return tx; + } + }; + @Override public KernelTransaction newInstance() { assertDatabaseIsRunning(); - - Locks.Client locksClient = locks.newClient(); - LegacyIndexTransactionState legacyIndexTransactionState = - new LegacyIndexTransactionStateImpl( indexConfigStore, legacyIndexProviderLookup ); - - long lastTransactionIdWhenStarted = transactionIdStore.getLastCommittedTransactionId(); - - KernelTransactionImplementation tx = new KernelTransactionImplementation( statementOperations, schemaWriteGuard, - locksClient, hooks, constraintIndexCreator, procedures, transactionHeaderInformationFactory, - transactionCommitProcess, transactionMonitor, legacyIndexTransactionState, - this, Clock.SYSTEM_CLOCK, tracers.transactionTracer, storageEngine, lastTransactionIdWhenStarted ); - - allTransactions.add( tx ); - - return tx; + return localTxPool.acquire().initialize( transactionIdStore.getLastCommittedTransactionId(), locks.newClient() ); } /** - * Signals that given transaction is closed and should be removed from the set of running transactions. - * - * @param tx the closed transaction. - * @throws IllegalStateException if given transaction is not in the set of active transactions. + * Global pool of transactions, wrapped by the thread-local marshland pool and so is not used directly. */ - public void transactionClosed( KernelTransaction tx ) + private final LinkedQueuePool globalTxPool + = new LinkedQueuePool( 8, factory ) { - boolean removed = allTransactions.remove( tx ); - if ( !removed ) + @Override + protected void dispose( KernelTransactionImplementation tx ) { - throw new IllegalStateException( "Transaction: " + tx + " is not present in the " + - "set of known active transactions: " + allTransactions ); + allTransactions.remove( tx ); + super.dispose( tx ); } - } + }; /** * Give an approximate set of all transactions currently running. @@ -151,16 +170,38 @@ public void transactionClosed( KernelTransaction tx ) */ public Set activeTransactions() { - return Collections.unmodifiableSet( new HashSet<>( allTransactions ) ); + Set output = new HashSet<>(); + for ( KernelTransactionImplementation tx : allTransactions ) + { + if ( tx.isOpen() ) + { + output.add( tx ); + } + } + + return output; } /** - * Dispose of all active transactions. This is done on shutdown or on internal events (like an HA mode switch) that + * Pool of unused transactions. + */ + private final MarshlandPool localTxPool = new MarshlandPool<>( globalTxPool ); + + /** + * Dispose of all pooled transactions. This is done on shutdown or on internal events (like an HA mode switch) that * require transactions to be re-created. */ public void disposeAll() { - allTransactions.forEach( KernelTransaction::markForTermination ); + for ( KernelTransactionImplementation tx : allTransactions ) + { + // we mark all transactions for termination since we want to make sure these transactions + // won't be reused, ever. Each transaction has, among other things, a Locks.Client and we + // certainly want to keep that from being reused from this point. + tx.markForTermination(); + } + localTxPool.disposeAll(); + globalTxPool.disposeAll(); } @Override diff --git a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java index 088521cf85086..b1b0afe167116 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionFactory.java @@ -19,16 +19,18 @@ */ package org.neo4j.kernel.api; +import org.neo4j.collection.pool.Pool; +import java.util.function.Supplier; + import org.neo4j.helpers.Clock; -import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState; import org.neo4j.kernel.impl.api.KernelTransactionImplementation; -import org.neo4j.kernel.impl.api.KernelTransactions; import org.neo4j.kernel.impl.api.SchemaWriteGuard; import org.neo4j.kernel.impl.api.StatementOperationParts; import org.neo4j.kernel.impl.api.TransactionHeaderInformation; import org.neo4j.kernel.impl.api.TransactionHooks; import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess; import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator; +import org.neo4j.kernel.impl.locking.NoOpClient; import org.neo4j.kernel.impl.proc.Procedures; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; import org.neo4j.kernel.impl.transaction.TransactionMonitor; @@ -48,8 +50,6 @@ static KernelTransaction kernelTransaction() TransactionHeaderInformationFactory headerInformationFactory = mock( TransactionHeaderInformationFactory.class ); when( headerInformationFactory.create() ).thenReturn( headerInformation ); - long lastTransactionIdWhenStarted = 0; - StorageEngine storageEngine = mock( StorageEngine.class ); StoreReadLayer storeReadLayer = mock( StoreReadLayer.class ); when( storeReadLayer.acquireStatement() ).thenReturn( mock( StorageStatement.class ) ); @@ -57,14 +57,13 @@ static KernelTransaction kernelTransaction() return new KernelTransactionImplementation( mock( StatementOperationParts.class ), mock( SchemaWriteGuard.class ), - null, new TransactionHooks(), + new TransactionHooks(), mock( ConstraintIndexCreator.class ), new Procedures(), headerInformationFactory, mock( TransactionRepresentationCommitProcess.class ), mock( TransactionMonitor.class ), - mock( LegacyIndexTransactionState.class ), - mock( KernelTransactions.class ), + mock( Supplier.class ), + mock( Pool.class ), Clock.SYSTEM_CLOCK, TransactionTracer.NULL, - storageEngine, - lastTransactionIdWhenStarted ); + storageEngine ).initialize( 0, new NoOpClient() ); } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java index 0d7ecface958b..6d74e27cd656e 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionImplementationTest.java @@ -33,6 +33,7 @@ import org.neo4j.kernel.api.txstate.TransactionState; import org.neo4j.kernel.impl.api.KernelStatement; import org.neo4j.kernel.impl.api.KernelTransactionImplementation; +import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.transaction.TransactionMonitor; import org.neo4j.kernel.impl.transaction.command.Command; import org.neo4j.storageengine.api.StorageCommand; @@ -367,8 +368,14 @@ public Void answer( InvocationOnMock invocation ) throws Throwable any( ResourceLocker.class ), anyLong() ); - try ( KernelTransactionImplementation transaction = newTransaction( 5 ) ) + try ( KernelTransactionImplementation transaction = newTransaction() ) { + transaction.initialize( 5L, mock( Locks.Client.class ) ); + try ( KernelStatement statement = transaction.acquireStatement() ) + { + statement.legacyIndexTxState(); // which will pull it from the supplier and the mocking above + // will have it say that it has changes. + } // WHEN committing it at a later point clock.forward( 5, MILLISECONDS ); // ...and simulating some other transaction being committed @@ -390,7 +397,7 @@ public void successfulTxShouldNotifyKernelTransactionsThatItIsClosed() throws Tr tx.success(); tx.close(); - verify( kernelTransactions ).transactionClosed( tx ); + verify( txPool ).release( tx ); } @Test @@ -401,7 +408,7 @@ public void failedTxShouldNotifyKernelTransactionsThatItIsClosed() throws Transa tx.failure(); tx.close(); - verify( kernelTransactions ).transactionClosed( tx ); + verify( txPool ).release( tx ); } private void verifyExtraInteractionWithTheMonitor( TransactionMonitor transactionMonitor, boolean isWriteTx ) diff --git a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionTestBase.java b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionTestBase.java index 0740d8c5bf30f..db504c4ea9342 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionTestBase.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/api/KernelTransactionTestBase.java @@ -21,6 +21,9 @@ import org.junit.Before; +import java.util.function.Supplier; + +import org.neo4j.collection.pool.Pool; import org.neo4j.helpers.FakeClock; import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState; @@ -55,6 +58,7 @@ public class KernelTransactionTestBase protected final StoreReadLayer readLayer = mock( StoreReadLayer.class ); protected final TransactionHooks hooks = new TransactionHooks(); protected final LegacyIndexTransactionState legacyIndexState = mock( LegacyIndexTransactionState.class ); + protected final Supplier legacyIndexStateSupplier = () -> legacyIndexState; protected final TransactionMonitor transactionMonitor = mock( TransactionMonitor.class ); protected final CapturingCommitProcess commitProcess = new CapturingCommitProcess(); protected final TransactionHeaderInformation headerInformation = mock( TransactionHeaderInformation.class ); @@ -62,6 +66,7 @@ public class KernelTransactionTestBase protected final SchemaWriteGuard schemaWriteGuard = mock( SchemaWriteGuard.class ); protected final FakeClock clock = new FakeClock(); protected final KernelTransactions kernelTransactions = mock( KernelTransactions.class ); + protected final Pool txPool = mock( Pool.class ); @Before public void before() @@ -80,9 +85,9 @@ public KernelTransactionImplementation newTransaction() public KernelTransactionImplementation newTransaction( long lastTransactionIdWhenStarted ) { - return new KernelTransactionImplementation( null, schemaWriteGuard, new NoOpClient(), hooks, null, null, - headerInformationFactory, commitProcess, transactionMonitor, legacyIndexState, kernelTransactions, - clock, TransactionTracer.NULL, storageEngine, lastTransactionIdWhenStarted ); + return new KernelTransactionImplementation( null, schemaWriteGuard, hooks, null, null, headerInformationFactory, + commitProcess, transactionMonitor, legacyIndexStateSupplier, txPool, clock, TransactionTracer.NULL, + storageEngine ).initialize( lastTransactionIdWhenStarted, new NoOpClient() ); } public class CapturingCommitProcess implements TransactionCommitProcess diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java index 98742b89c5fc2..7d6f485d0db38 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionsTest.java @@ -45,19 +45,18 @@ import org.neo4j.logging.NullLog; import org.neo4j.storageengine.api.StorageCommand; import org.neo4j.storageengine.api.StorageEngine; +import org.neo4j.storageengine.api.StorageStatement; import org.neo4j.storageengine.api.StoreReadLayer; import org.neo4j.storageengine.api.TransactionApplicationMode; import org.neo4j.storageengine.api.lock.ResourceLocker; import org.neo4j.storageengine.api.txstate.ReadableTransactionState; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyCollection; import static org.mockito.Matchers.anyLong; @@ -66,6 +65,7 @@ import static org.mockito.Mockito.when; import static org.neo4j.helpers.collection.IteratorUtil.asSet; +import static org.neo4j.helpers.collection.IteratorUtil.asUniqueSet; public class KernelTransactionsTest { @@ -83,7 +83,7 @@ public void shouldListActiveTransactions() throws Exception first.close(); // Then - assertThat( registry.activeTransactions(), equalTo( asSet( second, third ) ) ); + assertThat( asUniqueSet( registry.activeTransactions() ), equalTo( asSet( second, third ) ) ); } @Test @@ -142,8 +142,8 @@ public void transactionCloseRemovesTxFromActiveTransactions() throws Exception KernelTransaction tx2 = kernelTransactions.newInstance(); KernelTransaction tx3 = kernelTransactions.newInstance(); - kernelTransactions.transactionClosed( tx1 ); - kernelTransactions.transactionClosed( tx3 ); + tx1.close(); + tx3.close(); assertEquals( asSet( tx2 ), kernelTransactions.activeTransactions() ); } @@ -162,22 +162,6 @@ public void transactionRemovesItselfFromActiveTransactions() throws Exception assertEquals( asSet( tx1, tx3 ), kernelTransactions.activeTransactions() ); } - @Test - public void exceptionIsThrownWhenUnknownTxIsClosed() throws Exception - { - KernelTransactions kernelTransactions = newKernelTransactions(); - - try - { - kernelTransactions.transactionClosed( mock( KernelTransactionImplementation.class ) ); - fail( "Exception expected" ); - } - catch ( Exception e ) - { - assertThat( e, instanceOf( IllegalStateException.class ) ); - } - } - @Test public void disposeAllMarksAllTransactionsForTermination() throws Exception { @@ -210,10 +194,16 @@ private static KernelTransactions newKernelTransactions( TransactionCommitProces MetaDataStore metaDataStore = mock( MetaDataStore.class ); NeoStores neoStores = mock( NeoStores.class ); - StoreStatement storeStatement = new StoreStatement( neoStores, new ReentrantLockService(), - mock( Supplier.class ), null ); StoreReadLayer readLayer = mock( StoreReadLayer.class ); - when( readLayer.acquireStatement() ).thenReturn( storeStatement ); + when( readLayer.acquireStatement() ).thenAnswer( new Answer() + { + @Override + public StorageStatement answer( InvocationOnMock invocation ) throws Throwable + { + return new StoreStatement( neoStores, new ReentrantLockService(), + mock( Supplier.class ), null ); + } + } ); StorageEngine storageEngine = mock( StorageEngine.class ); when( storageEngine.storeReadLayer() ).thenReturn( readLayer );