Skip to content

Commit

Permalink
Revert "Removed KTI pooling"
Browse files Browse the repository at this point in the history
This reverts commit 7735373.

At least parts of it. KTI objects are rather heavy weight and there's a cost
to registering each transaction instance in the shared "all transactions" collection
  • Loading branch information
tinwelint committed Feb 17, 2016
1 parent b8e9f87 commit 3321f97
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 109 deletions.
Expand Up @@ -33,6 +33,7 @@
import org.neo4j.register.Registers; import org.neo4j.register.Registers;


import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;

import static org.neo4j.kernel.api.ReadOperations.ANY_LABEL; import static org.neo4j.kernel.api.ReadOperations.ANY_LABEL;
import static org.neo4j.kernel.api.ReadOperations.ANY_RELATIONSHIP_TYPE; import static org.neo4j.kernel.api.ReadOperations.ANY_RELATIONSHIP_TYPE;
import static org.neo4j.kernel.impl.store.counts.keys.CountsKeyFactory.indexSampleKey; import static org.neo4j.kernel.impl.store.counts.keys.CountsKeyFactory.indexSampleKey;
Expand Down
Expand Up @@ -21,7 +21,9 @@


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.function.Supplier;


import org.neo4j.collection.pool.Pool;
import org.neo4j.helpers.Clock; import org.neo4j.helpers.Clock;
import org.neo4j.kernel.api.AccessMode; import org.neo4j.kernel.api.AccessMode;
import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.KernelTransaction;
Expand All @@ -36,13 +38,13 @@
import org.neo4j.kernel.api.exceptions.schema.CreateConstraintFailureException; import org.neo4j.kernel.api.exceptions.schema.CreateConstraintFailureException;
import org.neo4j.kernel.api.exceptions.schema.DropIndexFailureException; import org.neo4j.kernel.api.exceptions.schema.DropIndexFailureException;
import org.neo4j.kernel.api.index.IndexDescriptor; import org.neo4j.kernel.api.index.IndexDescriptor;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState; import org.neo4j.kernel.api.txstate.LegacyIndexTransactionState;
import org.neo4j.kernel.api.txstate.TransactionState; import org.neo4j.kernel.api.txstate.TransactionState;
import org.neo4j.kernel.api.txstate.TxStateHolder; import org.neo4j.kernel.api.txstate.TxStateHolder;
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator; import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
import org.neo4j.kernel.impl.api.state.TxState; import org.neo4j.kernel.impl.api.state.TxState;
import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory;
import org.neo4j.kernel.impl.transaction.TransactionMonitor; import org.neo4j.kernel.impl.transaction.TransactionMonitor;
import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.PhysicalTransactionRepresentation;
Expand All @@ -51,6 +53,7 @@
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer; import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import org.neo4j.storageengine.api.StorageCommand; import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.storageengine.api.StorageEngine; import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.StoreReadLayer;
import org.neo4j.storageengine.api.txstate.TxStateVisitor; import org.neo4j.storageengine.api.txstate.TxStateVisitor;


import static org.neo4j.storageengine.api.TransactionApplicationMode.INTERNAL; import static org.neo4j.storageengine.api.TransactionApplicationMode.INTERNAL;
Expand All @@ -62,6 +65,13 @@
*/ */
public class KernelTransactionImplementation implements KernelTransaction, TxStateHolder public class KernelTransactionImplementation implements KernelTransaction, TxStateHolder
{ {
/*
* IMPORTANT:
* This class is pooled and re-used. If you add *any* state to it, you *must* make sure that:
* - the #initialize() method resets that state for re-use
* - the #release() method releases resources acquired in #initialize() or during the transaction's life time
*/

private enum TransactionType private enum TransactionType
{ {
READ, READ,
Expand Down Expand Up @@ -100,69 +110,80 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel
private final TransactionHooks hooks; private final TransactionHooks hooks;
private final ConstraintIndexCreator constraintIndexCreator; private final ConstraintIndexCreator constraintIndexCreator;
private final StatementOperationParts operations; private final StatementOperationParts operations;
private final KernelTransactions kernelTransactions;
private final StorageEngine storageEngine; private final StorageEngine storageEngine;
private final Locks.Client locks;
private final Procedures procedures; private final Procedures procedures;
private final TransactionTracer tracer;
private final Pool<KernelTransactionImplementation> pool;
private final Supplier<LegacyIndexTransactionState> legacyIndexTxStateSupplier;


// For committing // For committing
private final TransactionHeaderInformationFactory headerInformationFactory; private final TransactionHeaderInformationFactory headerInformationFactory;
private final TransactionCommitProcess commitProcess; private final TransactionCommitProcess commitProcess;
private final TransactionMonitor transactionMonitor; private final TransactionMonitor transactionMonitor;
private final StoreReadLayer storeLayer;
private final Clock clock; private final Clock clock;


// State // State that needs to be reset between uses. Most of these should be cleared or released in #release(),
// whereas others, such as timestamp or txId when transaction starts, even locks, needs to be set in #initialize().
private TransactionState txState; private TransactionState txState;
private final LegacyIndexTransactionState legacyIndexTransactionState; private LegacyIndexTransactionState legacyIndexTransactionState;
private TransactionType transactionType = TransactionType.READ; // Tracks current state of transaction, which will upgrade to WRITE or SCHEMA mode when necessary private TransactionType transactionType; // Tracks current state of transaction, which will upgrade to WRITE or SCHEMA mode when necessary
private TransactionHooks.TransactionHooksState hooksState; private TransactionHooks.TransactionHooksState hooksState;
private KernelStatement currentStatement; private KernelStatement currentStatement;
private CloseListener closeListener; private CloseListener closeListener;
private AccessMode accessMode = AccessMode.FULL; // Defines whether a transaction is allowed to upgrade to WRITE or SCHEMA mode private AccessMode accessMode; // Defines whether a transaction is allowed to upgrade to WRITE or SCHEMA mode

private Locks.Client locks;
private boolean beforeHookInvoked; private boolean beforeHookInvoked;
private boolean closing, closed; private boolean closing, closed;
private boolean failure, success; private boolean failure, success;
private volatile boolean terminated; private volatile boolean terminated;

private long startTimeMillis;
// Header information private long lastTransactionIdWhenStarted;
private final long startTimeMillis; private TransactionEvent transactionEvent;
private final long lastTransactionIdWhenStarted;

// Event tracing
private final TransactionEvent transactionEvent;


public KernelTransactionImplementation( StatementOperationParts operations, public KernelTransactionImplementation( StatementOperationParts operations,
SchemaWriteGuard schemaWriteGuard, SchemaWriteGuard schemaWriteGuard,
Locks.Client locks,
TransactionHooks hooks, TransactionHooks hooks,
ConstraintIndexCreator constraintIndexCreator, ConstraintIndexCreator constraintIndexCreator,
Procedures procedures, TransactionHeaderInformationFactory headerInformationFactory, Procedures procedures, TransactionHeaderInformationFactory headerInformationFactory,
TransactionCommitProcess commitProcess, TransactionCommitProcess commitProcess,
TransactionMonitor transactionMonitor, TransactionMonitor transactionMonitor,
LegacyIndexTransactionState legacyIndexTransactionState, Supplier<LegacyIndexTransactionState> legacyIndexTxStateSupplier,
KernelTransactions kernelTransactions, Pool<KernelTransactionImplementation> pool,
Clock clock, Clock clock,
TransactionTracer tracer, TransactionTracer tracer,
StorageEngine storageEngine, StorageEngine storageEngine )
long lastTransactionIdWhenStarted )
{ {
this.operations = operations; this.operations = operations;
this.schemaWriteGuard = schemaWriteGuard; this.schemaWriteGuard = schemaWriteGuard;
this.hooks = hooks; this.hooks = hooks;
this.locks = locks;
this.constraintIndexCreator = constraintIndexCreator; this.constraintIndexCreator = constraintIndexCreator;
this.procedures = procedures; this.procedures = procedures;
this.headerInformationFactory = headerInformationFactory; this.headerInformationFactory = headerInformationFactory;
this.commitProcess = commitProcess; this.commitProcess = commitProcess;
this.transactionMonitor = transactionMonitor; this.transactionMonitor = transactionMonitor;
this.storeLayer = storageEngine.storeReadLayer();
this.storageEngine = storageEngine; this.storageEngine = storageEngine;
this.legacyIndexTransactionState = new CachingLegacyIndexTransactionState( legacyIndexTransactionState ); this.legacyIndexTxStateSupplier = legacyIndexTxStateSupplier;
this.kernelTransactions = kernelTransactions; this.pool = pool;
this.clock = clock; this.clock = clock;
this.tracer = tracer;
}

/**
* Reset this transaction to a vanilla state, turning it into a logically new transaction.
*/
public KernelTransactionImplementation initialize( long lastCommittedTx, Locks.Client locks )
{
this.locks = locks;
this.closing = closed = failure = success = terminated = beforeHookInvoked = false;
this.transactionType = TransactionType.READ;
this.startTimeMillis = clock.currentTimeMillis(); this.startTimeMillis = clock.currentTimeMillis();
this.lastTransactionIdWhenStarted = lastTransactionIdWhenStarted; this.lastTransactionIdWhenStarted = lastCommittedTx;
this.transactionEvent = tracer.beginTransaction(); this.transactionEvent = tracer.beginTransaction();
assert transactionEvent != null : "transactionEvent was null!";
this.accessMode = AccessMode.FULL;
return this;
} }


@Override @Override
Expand Down Expand Up @@ -294,7 +315,8 @@ public TransactionState txState()
@Override @Override
public LegacyIndexTransactionState legacyIndexTxState() public LegacyIndexTransactionState legacyIndexTxState()
{ {
return legacyIndexTransactionState; return legacyIndexTransactionState != null ? legacyIndexTransactionState :
(legacyIndexTransactionState = legacyIndexTxStateSupplier.get());
} }


@Override @Override
Expand Down Expand Up @@ -341,7 +363,12 @@ private void assertTransactionOpen()


private boolean hasChanges() private boolean hasChanges()
{ {
return hasTxStateWithChanges() || legacyIndexTransactionState.hasChanges(); return hasTxStateWithChanges() || hasLegacyIndexChanges();
}

private boolean hasLegacyIndexChanges()
{
return legacyIndexTransactionState != null && legacyIndexTransactionState.hasChanges();
} }


private boolean hasDataChanges() private boolean hasDataChanges()
Expand Down Expand Up @@ -390,8 +417,7 @@ public void close() throws TransactionFailureException
} }
finally finally
{ {
locks.close(); release();
kernelTransactions.transactionClosed( this );
} }
} }
} }
Expand Down Expand Up @@ -430,7 +456,7 @@ private void commit() throws TransactionFailureException
txState, txState,
locks, locks,
lastTransactionIdWhenStarted ); lastTransactionIdWhenStarted );
if ( legacyIndexTransactionState.hasChanges() ) if ( hasLegacyIndexChanges() )
{ {
legacyIndexTransactionState.extractCommands( extractedCommands ); legacyIndexTransactionState.extractCommands( extractedCommands );
} }
Expand Down Expand Up @@ -504,13 +530,13 @@ private void rollback() throws TransactionFailureException
@Override @Override
public void visitCreatedNode( long id ) public void visitCreatedNode( long id )
{ {
storageEngine.storeReadLayer().releaseNode( id ); storeLayer.releaseNode( id );
} }


@Override @Override
public void visitCreatedRelationship( long id, int type, long startNode, long endNode ) public void visitCreatedRelationship( long id, int type, long startNode, long endNode )
{ {
storageEngine.storeReadLayer().releaseRelationship( id ); storeLayer.releaseRelationship( id );
} }
} ); } );
} }
Expand Down Expand Up @@ -559,6 +585,20 @@ private void afterRollback()
} }
} }


/**
* Release resources held up by this transaction & return it to the transaction pool.
*/
private void release()
{
locks.close();
transactionEvent = null;
legacyIndexTransactionState = null;
txState = null;
hooksState = null;
closeListener = null;
pool.release( this );
}

@Override @Override
public void registerCloseListener( CloseListener listener ) public void registerCloseListener( CloseListener listener )
{ {
Expand Down

0 comments on commit 3321f97

Please sign in to comment.