Skip to content

Commit

Permalink
Removed KTI pooling
Browse files Browse the repository at this point in the history
Currently KernelTransactionImplementations are pooled and reused. This is done
with MarshlandPool that has ThreadLocals backed by some other pool. So each
thread that executed transactions has a KTI in it's thread local storage and
reuses it.

Such reuse of KTI instance with their internal state has caused quite some
issues because restoring KTIs to the pristine state before putting them back
to the pool is tricky.

This commit removed pooling of KTIs. So whenever new transaction is started
new KTI object is created.

Reasons:
 - biggest reason to pool KTIs is transaction state but it is completely
   reinitialized when KTI is taken from the pool
 - recent storage engine API refactoring made KTI state even smaller making
   even less things added to the pool
 - code simplification
 - removed need to reinitialize KTI state

Tested:
 - microbenchmarks via Core API show 30% degradation for tiny read
   transactions (read couple nodes by id without labels and properties, etc.)
   and no degradation for read/write transactions of reasonable size
 - microbenchmarks via Core API show same heap usage and GC
 - benchmarking read/write Cypher queries show no performance degradation
 - small LDBC read/write queries shows no performance degradation on a
   machine with 32 hardware threads
 - long running soak test shows no throughput degradation, no excessive heap
   usage and GC activity
  • Loading branch information
lutovich committed Dec 10, 2015
1 parent efe7c31 commit 7735373
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 284 deletions.
Expand Up @@ -34,8 +34,6 @@
*/ */
public interface LegacyIndexTransactionState extends RecordState public interface LegacyIndexTransactionState extends RecordState
{ {
void clear();

LegacyIndex nodeChanges( String indexName ) throws LegacyIndexNotFoundKernelException; LegacyIndex nodeChanges( String indexName ) throws LegacyIndexNotFoundKernelException;


LegacyIndex relationshipChanges( String indexName ) throws LegacyIndexNotFoundKernelException; LegacyIndex relationshipChanges( String indexName ) throws LegacyIndexNotFoundKernelException;
Expand Down
Expand Up @@ -41,20 +41,6 @@ public CachingLegacyIndexTransactionState( LegacyIndexTransactionState txState )
this.txState = txState; this.txState = txState;
} }


@Override
public void clear()
{
txState.clear();
if ( nodeLegacyIndexChanges != null && !nodeLegacyIndexChanges.isEmpty() )
{
nodeLegacyIndexChanges.clear();
}
if ( relationshipLegacyIndexChanges != null && !relationshipLegacyIndexChanges.isEmpty() )
{
relationshipLegacyIndexChanges.clear();
}
}

@Override @Override
public LegacyIndex nodeChanges( String indexName ) throws LegacyIndexNotFoundKernelException public LegacyIndex nodeChanges( String indexName ) throws LegacyIndexNotFoundKernelException
{ {
Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.neo4j.register.Registers; import org.neo4j.register.Registers;


import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;

import static org.neo4j.kernel.api.ReadOperations.ANY_LABEL; import static org.neo4j.kernel.api.ReadOperations.ANY_LABEL;
import static org.neo4j.kernel.api.ReadOperations.ANY_RELATIONSHIP_TYPE; import static org.neo4j.kernel.api.ReadOperations.ANY_RELATIONSHIP_TYPE;
import static org.neo4j.kernel.impl.store.counts.keys.CountsKeyFactory.indexSampleKey; import static org.neo4j.kernel.impl.store.counts.keys.CountsKeyFactory.indexSampleKey;
Expand Down Expand Up @@ -142,17 +141,6 @@ public boolean hasChanges()
return !counts.isEmpty(); return !counts.isEmpty();
} }


/**
* Set this counter up to a pristine state, as if it had just been initialized.
*/
public void clear()
{
if ( !counts.isEmpty() )
{
counts.clear();
}
}

public static final class Difference public static final class Difference
{ {
private final CountsKey key; private final CountsKey key;
Expand Down
Expand Up @@ -21,7 +21,6 @@


import java.util.Collection; import java.util.Collection;


import org.neo4j.collection.pool.Pool;
import org.neo4j.helpers.Clock; import org.neo4j.helpers.Clock;
import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.KeyReadTokenNameLookup; import org.neo4j.kernel.api.KeyReadTokenNameLookup;
Expand All @@ -30,6 +29,7 @@
import org.neo4j.kernel.api.exceptions.InvalidTransactionTypeKernelException; import org.neo4j.kernel.api.exceptions.InvalidTransactionTypeKernelException;
import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.exceptions.TransactionHookException;
import org.neo4j.kernel.api.exceptions.schema.ConstraintValidationKernelException; import org.neo4j.kernel.api.exceptions.schema.ConstraintValidationKernelException;
import org.neo4j.kernel.api.exceptions.schema.CreateConstraintFailureException; import org.neo4j.kernel.api.exceptions.schema.CreateConstraintFailureException;
import org.neo4j.kernel.api.exceptions.schema.DropIndexFailureException; import org.neo4j.kernel.api.exceptions.schema.DropIndexFailureException;
Expand All @@ -40,7 +40,6 @@
import org.neo4j.kernel.api.txstate.TxStateVisitor; import org.neo4j.kernel.api.txstate.TxStateVisitor;
import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator; import org.neo4j.kernel.impl.api.state.ConstraintIndexCreator;
import org.neo4j.kernel.impl.api.state.TxState; import org.neo4j.kernel.impl.api.state.TxState;
import org.neo4j.kernel.impl.api.store.StoreReadLayer;
import org.neo4j.kernel.impl.api.store.StoreStatement; import org.neo4j.kernel.impl.api.store.StoreStatement;
import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.storageengine.StorageEngine; import org.neo4j.kernel.impl.storageengine.StorageEngine;
Expand All @@ -61,12 +60,6 @@
*/ */
public class KernelTransactionImplementation implements KernelTransaction, TxStateHolder public class KernelTransactionImplementation implements KernelTransaction, TxStateHolder
{ {
/*
* IMPORTANT:
* This class is pooled and re-used. If you add *any* state to it, you *must* make sure that the #initialize()
* method resets that state for re-use.
*/

private enum TransactionType private enum TransactionType
{ {
ANY, ANY,
Expand Down Expand Up @@ -105,36 +98,36 @@ TransactionType upgradeToSchemaTransaction() throws InvalidTransactionTypeKernel
private final TransactionHooks hooks; private final TransactionHooks hooks;
private final ConstraintIndexCreator constraintIndexCreator; private final ConstraintIndexCreator constraintIndexCreator;
private final StatementOperationParts operations; private final StatementOperationParts operations;
private final Pool<KernelTransactionImplementation> pool; private final KernelTransactions kernelTransactions;
// State private final StorageEngine storageEngine;
private final StoreStatement storeStatement;
private final Locks.Client locks;

// For committing // For committing
private final TransactionHeaderInformationFactory headerInformationFactory; private final TransactionHeaderInformationFactory headerInformationFactory;
private final TransactionCommitProcess commitProcess; private final TransactionCommitProcess commitProcess;
private final TransactionMonitor transactionMonitor; private final TransactionMonitor transactionMonitor;
private final StoreReadLayer storeLayer;
private final StorageEngine storageEngine;
private final Clock clock; private final Clock clock;

// State
private TransactionState txState; private TransactionState txState;
private LegacyIndexTransactionState legacyIndexTransactionState; private LegacyIndexTransactionState legacyIndexTransactionState;
private TransactionType transactionType = TransactionType.ANY; private TransactionType transactionType = TransactionType.ANY;
private TransactionHooks.TransactionHooksState hooksState; private TransactionHooks.TransactionHooksState hooksState;
private KernelStatement currentStatement;
private CloseListener closeListener;

private boolean beforeHookInvoked; private boolean beforeHookInvoked;
private Locks.Client locks;
private StoreStatement storeStatement;
private boolean closing, closed; private boolean closing, closed;
private boolean failure, success; private boolean failure, success;
private volatile boolean terminated; private volatile boolean terminated;
// Some header information
private long startTimeMillis; // Header information
private long lastTransactionIdWhenStarted; private final long startTimeMillis;
/** private final long lastTransactionIdWhenStarted;
* Implements reusing the same underlying {@link KernelStatement} for overlapping statements.
*/
private KernelStatement currentStatement;
// Event tracing // Event tracing
private final TransactionTracer tracer; private final TransactionEvent transactionEvent;
private TransactionEvent transactionEvent;
private CloseListener closeListener;


public KernelTransactionImplementation( StatementOperationParts operations, public KernelTransactionImplementation( StatementOperationParts operations,
SchemaWriteGuard schemaWriteGuard, SchemaWriteGuard schemaWriteGuard,
Expand All @@ -145,10 +138,11 @@ public KernelTransactionImplementation( StatementOperationParts operations,
TransactionCommitProcess commitProcess, TransactionCommitProcess commitProcess,
TransactionMonitor transactionMonitor, TransactionMonitor transactionMonitor,
LegacyIndexTransactionState legacyIndexTransactionState, LegacyIndexTransactionState legacyIndexTransactionState,
Pool<KernelTransactionImplementation> pool, KernelTransactions kernelTransactions,
Clock clock, Clock clock,
TransactionTracer tracer, TransactionTracer tracer,
StorageEngine storageEngine ) StorageEngine storageEngine,
long lastTransactionIdWhenStarted )
{ {
this.operations = operations; this.operations = operations;
this.schemaWriteGuard = schemaWriteGuard; this.schemaWriteGuard = schemaWriteGuard;
Expand All @@ -158,30 +152,14 @@ public KernelTransactionImplementation( StatementOperationParts operations,
this.headerInformationFactory = headerInformationFactory; this.headerInformationFactory = headerInformationFactory;
this.commitProcess = commitProcess; this.commitProcess = commitProcess;
this.transactionMonitor = transactionMonitor; this.transactionMonitor = transactionMonitor;
this.storeLayer = storageEngine.storeReadLayer(); this.storeStatement = storageEngine.storeReadLayer().acquireStatement();
this.storageEngine = storageEngine; this.storageEngine = storageEngine;
this.legacyIndexTransactionState = new CachingLegacyIndexTransactionState( legacyIndexTransactionState ); this.legacyIndexTransactionState = new CachingLegacyIndexTransactionState( legacyIndexTransactionState );
this.pool = pool; this.kernelTransactions = kernelTransactions;
this.clock = clock; this.clock = clock;
this.tracer = tracer;
}

/**
* Reset this transaction to a vanilla state, turning it into a logically new transaction.
*/
public KernelTransactionImplementation initialize( long lastCommittedTx )
{
assert locks != null : "This transaction has been disposed off, it should not be used.";
this.closing = closed = failure = success = false;
this.transactionType = TransactionType.ANY;
this.beforeHookInvoked = false;
this.startTimeMillis = clock.currentTimeMillis(); this.startTimeMillis = clock.currentTimeMillis();
this.lastTransactionIdWhenStarted = lastCommittedTx; this.lastTransactionIdWhenStarted = lastTransactionIdWhenStarted;
this.transactionEvent = tracer.beginTransaction(); this.transactionEvent = tracer.beginTransaction();
assert transactionEvent != null : "transactionEvent was null!";
this.storeStatement = storeLayer.acquireStatement();
this.closeListener = null;
return this;
} }


@Override @Override
Expand Down Expand Up @@ -385,39 +363,16 @@ public void close() throws TransactionFailureException
transactionEvent.setTransactionType( transactionType.name() ); transactionEvent.setTransactionType( transactionType.name() );
transactionEvent.setReadOnly( txState == null || !txState.hasChanges() ); transactionEvent.setReadOnly( txState == null || !txState.hasChanges() );
transactionEvent.close(); transactionEvent.close();
transactionEvent = null;
legacyIndexTransactionState.clear();
txState = null;
hooksState = null;
closeListener = null;
} }
finally finally
{ {
release(); locks.close();
storeStatement.close();
kernelTransactions.transactionClosed( this );
} }
} }
} }


protected void dispose()
{
if ( locks != null )
{
locks.close();
}

this.locks = null;
this.transactionType = null;
this.hooksState = null;
this.txState = null;
this.legacyIndexTransactionState = null;

if ( storeStatement != null )
{
this.storeStatement.close();
this.storeStatement = null;
}
}

private void commit() throws TransactionFailureException private void commit() throws TransactionFailureException
{ {
boolean success = false; boolean success = false;
Expand All @@ -429,10 +384,11 @@ private void commit() throws TransactionFailureException
{ {
try try
{ {
if ( (hooksState = hooks.beforeCommit( txState, this, storeLayer )) != null && hooksState.failed() ) hooksState = hooks.beforeCommit( txState, this, storageEngine.storeReadLayer() );
if ( hooksState != null && hooksState.failed() )
{ {
throw new TransactionFailureException( Status.Transaction.HookFailed, hooksState.failure(), TransactionHookException cause = hooksState.failure();
"" ); throw new TransactionFailureException( Status.Transaction.HookFailed, cause, "" );
} }
} }
finally finally
Expand Down Expand Up @@ -522,13 +478,13 @@ private void rollback() throws TransactionFailureException
@Override @Override
public void visitCreatedNode( long id ) public void visitCreatedNode( long id )
{ {
storeLayer.releaseNode( id ); storageEngine.storeReadLayer().releaseNode( id );
} }


@Override @Override
public void visitCreatedRelationship( long id, int type, long startNode, long endNode ) public void visitCreatedRelationship( long id, int type, long startNode, long endNode )
{ {
storeLayer.releaseRelationship( id ); storageEngine.storeReadLayer().releaseRelationship( id );
} }
} ); } );
} }
Expand Down Expand Up @@ -577,30 +533,6 @@ private void afterRollback()
} }
} }


/**
* Release resources held up by this transaction & return it to the transaction pool.
*/
private void release()
{
locks.releaseAll();
if ( terminated )
{
// This transaction has been externally marked for termination.
// Just dispose of this transaction and don't return it to the pool.
dispose();
}
else
{
// Return this instance to the pool so that another transaction may use it.
pool.release( this );
if ( storeStatement != null )
{
storeStatement.close();
storeStatement = null;
}
}
}

@Override @Override
public void registerCloseListener( CloseListener listener ) public void registerCloseListener( CloseListener listener )
{ {
Expand Down

0 comments on commit 7735373

Please sign in to comment.