Skip to content

Commit

Permalink
Add global cursor cache
Browse files Browse the repository at this point in the history
Previously only one cursor per type was cached in the transaction,
meaning that often new cursors were created and discarded.  This
change will make sure we cache more than one cursor per kind in a
local and global pool (similar to KernelTransactions) to minimize
creation and discard of cursors.
  • Loading branch information
davidegrohmann committed May 8, 2017
1 parent 530cd9d commit 2586429
Show file tree
Hide file tree
Showing 65 changed files with 848 additions and 1,199 deletions.
Expand Up @@ -42,7 +42,7 @@ import org.neo4j.kernel.impl.locking.LockTracer
import org.neo4j.kernel.impl.proc.Procedures
import org.neo4j.kernel.impl.query.clientconnection.ClientConnectionInfo
import org.neo4j.kernel.impl.query.{Neo4jTransactionalContext, Neo4jTransactionalContextFactory}
import org.neo4j.storageengine.api.StorageStatement
import org.neo4j.storageengine.api.SchemaResources
import org.neo4j.test.TestGraphDatabaseFactory

import scala.collection.JavaConverters._
Expand All @@ -61,7 +61,7 @@ class TransactionBoundQueryContextTest extends CypherFunSuite {
outerTx = mock[InternalTransaction]
val kernelTransaction = mock[KernelTransactionImplementation]
when(kernelTransaction.securityContext()).thenReturn(AUTH_DISABLED)
val storeStatement = mock[StorageStatement]
val storeStatement = mock[SchemaResources]
val operations = mock[StatementOperationParts](RETURNS_DEEP_STUBS)
statement = new KernelStatementImplementation(kernelTransaction, null, storeStatement, new Procedures(), new CanWrite(), LockTracer.NONE)
statement.initialize(null, operations, PageCursorTracerSupplier.NULL.get())
Expand Down
Expand Up @@ -86,7 +86,6 @@
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine;
import org.neo4j.kernel.impl.storageengine.impl.recordstorage.StorageStatementFactory;
import org.neo4j.kernel.impl.store.MetaDataStore;
import org.neo4j.kernel.impl.store.StoreId;
import org.neo4j.kernel.impl.store.format.RecordFormatPropertyConfigurator;
Expand Down Expand Up @@ -264,7 +263,6 @@ boolean applicable( DiagnosticsPhase phase )
private final Map<String,IndexImplementation> indexProviders = new HashMap<>();
private final LegacyIndexProviderLookup legacyIndexProviderLookup;
private final ConstraintSemantics constraintSemantics;
private final StorageStatementFactory storageStatementFactory;
private final Procedures procedures;
private final IOLimiter ioLimiter;
private final AvailabilityGuard availabilityGuard;
Expand Down Expand Up @@ -318,7 +316,6 @@ public NeoStoreDataSource(
AutoIndexing autoIndexing,
PageCache pageCache,
ConstraintSemantics constraintSemantics,
StorageStatementFactory storageStatementFactory,
Monitors monitors,
Tracers tracers,
Procedures procedures,
Expand Down Expand Up @@ -356,7 +353,6 @@ public NeoStoreDataSource(
this.startupStatistics = startupStatistics;
this.guard = guard;
this.constraintSemantics = constraintSemantics;
this.storageStatementFactory = storageStatementFactory;
this.monitors = monitors;
this.tracers = tracers;
this.procedures = procedures;
Expand Down Expand Up @@ -583,7 +579,7 @@ private StorageEngine buildStorageEngine( PropertyKeyTokenHolder propertyKeyToke
RecordStorageEngine storageEngine =
new RecordStorageEngine( storeDir, config, idGeneratorFactory, eligibleForReuse,
idTypeConfigurationProvider, pageCache, fs, logProvider, propertyKeyTokenHolder, labelTokens,
relationshipTypeTokens, schemaStateChangeCallback, constraintSemantics, storageStatementFactory,
relationshipTypeTokens, schemaStateChangeCallback, constraintSemantics,
scheduler, tokenNameLookup, lockService, schemaIndexProvider, indexingServiceMonitor,
databaseHealth, labelScanStoreProvider, legacyIndexProviderLookup, indexConfigStore,
legacyIndexTransactionOrdering, transactionSnapshotSupplier, progressionFactory );
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.kernel.api;

import org.neo4j.storageengine.api.StorageStatement;
import org.neo4j.storageengine.api.StoreReadLayer;
import org.neo4j.storageengine.api.txstate.ReadableTransactionState;

Expand All @@ -35,7 +34,7 @@ interface Outcome
}

OUTCOME beforeCommit( ReadableTransactionState state, KernelTransaction transaction,
StoreReadLayer storeReadLayer, StorageStatement statement );
StoreReadLayer storeReadLayer );
void afterCommit( ReadableTransactionState state, KernelTransaction transaction, OUTCOME outcome );
void afterRollback( ReadableTransactionState state, KernelTransaction transaction, OUTCOME outcome );
}
Expand Up @@ -29,7 +29,6 @@
import org.neo4j.kernel.impl.api.CountsRecordState;
import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.RelationshipItem;
import org.neo4j.storageengine.api.StorageStatement;
import org.neo4j.storageengine.api.StoreReadLayer;
import org.neo4j.storageengine.api.txstate.ReadableTransactionState;
import org.neo4j.storageengine.api.txstate.TxStateVisitor;
Expand All @@ -40,16 +39,14 @@
public class TransactionCountingStateVisitor extends TxStateVisitor.Delegator
{
private final StoreReadLayer storeLayer;
private final StorageStatement statement;
private final CountsRecordState counts;
private final ReadableTransactionState txState;

public TransactionCountingStateVisitor( TxStateVisitor next, StoreReadLayer storeLayer, StorageStatement statement,
public TransactionCountingStateVisitor( TxStateVisitor next, StoreReadLayer storeLayer,
ReadableTransactionState txState, CountsRecordState counts )
{
super( next );
this.storeLayer = storeLayer;
this.statement = statement;
this.txState = txState;
this.counts = counts;
}
Expand All @@ -65,7 +62,7 @@ public void visitCreatedNode( long id )
public void visitDeletedNode( long id )
{
counts.incrementNodeCount( ANY_LABEL, -1 );
storeLayer.nodeGetSingleCursor( statement, id, ReadableTransactionState.EMPTY )
storeLayer.nodeGetSingleCursor( id, ReadableTransactionState.EMPTY )
.forAll( this::decrementCountForLabelsAndRelationships );
super.visitDeletedNode( id );
}
Expand All @@ -79,7 +76,7 @@ private void decrementCountForLabelsAndRelationships( NodeItem node )
return false;
} );

storeLayer.degrees( statement, node,
storeLayer.degrees( node,
( type, out, in ) -> updateRelationshipsCountsFromDegrees( labelIds, type, -out, -in ) );
}

Expand All @@ -94,8 +91,7 @@ public void visitCreatedRelationship( long id, int type, long startNode, long en
@Override
public void visitDeletedRelationship( long id )
{
try ( Cursor<RelationshipItem> cursor = storeLayer
.relationshipCursor( statement, id, ReadableTransactionState.EMPTY ) )
try ( Cursor<RelationshipItem> cursor = storeLayer.relationshipGetSingleCursor( id, ReadableTransactionState.EMPTY ) )
{
if ( !cursor.next() )
{
Expand Down Expand Up @@ -125,12 +121,11 @@ public void visitNodeLabelChanges( long id, final Set<Integer> added, final Set<
}
// get the relationship counts from *before* this transaction,
// the relationship changes will compensate for what happens during the transaction
storeLayer.nodeGetSingleCursor( statement, id, ReadableTransactionState.EMPTY )
.forAll( node -> storeLayer.degrees( statement, node, ( type, out, in ) ->
storeLayer.nodeGetSingleCursor( id, ReadableTransactionState.EMPTY )
.forAll( node -> storeLayer.degrees( node, ( type, out, in ) ->
{
added.forEach( label -> updateRelationshipsCountsFromDegrees( type, label, out, in ) );
removed.forEach(
label -> updateRelationshipsCountsFromDegrees( type, label, -out, -in ) );
removed.forEach( label -> updateRelationshipsCountsFromDegrees( type, label, -out, -in ) );
} ) );
}
super.visitNodeLabelChanges( id, added, removed );
Expand Down Expand Up @@ -161,6 +156,6 @@ private void updateRelationshipCount( long startNode, int type, long endNode, in

private void visitLabels( long nodeId, PrimitiveIntVisitor<RuntimeException> visitor )
{
storeLayer.nodeGetSingleCursor( statement, nodeId, txState ).forAll( node -> node.labels().visitKeys( visitor ) );
storeLayer.nodeGetSingleCursor( nodeId, txState ).forAll( node -> node.labels().visitKeys( visitor ) );
}
}
Expand Up @@ -26,7 +26,7 @@
import org.neo4j.kernel.api.txstate.TxStateHolder;
import org.neo4j.kernel.impl.locking.LockTracer;
import org.neo4j.kernel.impl.locking.StatementLocks;
import org.neo4j.storageengine.api.StorageStatement;
import org.neo4j.storageengine.api.SchemaResources;

public interface KernelStatement extends TxStateHolder, Statement
{
Expand All @@ -38,7 +38,7 @@ public interface KernelStatement extends TxStateHolder, Statement

PageCursorTracer pageCursorTracer();

StorageStatement storageStatement();
SchemaResources schemaResources();

KernelTransaction transaction();

Expand Down
Expand Up @@ -44,7 +44,7 @@
import org.neo4j.kernel.impl.locking.LockTracer;
import org.neo4j.kernel.impl.locking.StatementLocks;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.storageengine.api.StorageStatement;
import org.neo4j.storageengine.api.SchemaResources;
import org.neo4j.storageengine.api.txstate.ReadableTransactionState;
import org.neo4j.storageengine.api.txstate.WritableTransactionState;

Expand All @@ -70,7 +70,7 @@
public class KernelStatementImplementation implements KernelStatement
{
private final TxStateHolder txStateHolder;
private final StorageStatement storeStatement;
private final SchemaResources schemaResources;
private final AccessCapability accessCapability;
private final KernelTransactionImplementation transaction;
private final OperationsFacade facade;
Expand All @@ -82,14 +82,14 @@ public class KernelStatementImplementation implements KernelStatement

public KernelStatementImplementation( KernelTransactionImplementation transaction,
TxStateHolder txStateHolder,
StorageStatement storeStatement,
SchemaResources schemaResources,
Procedures procedures,
AccessCapability accessCapability,
LockTracer systemLockTracer )
{
this.transaction = transaction;
this.txStateHolder = txStateHolder;
this.storeStatement = storeStatement;
this.schemaResources = schemaResources;
this.accessCapability = accessCapability;
this.facade = new OperationsFacade( transaction, this, procedures );
this.executingQueryList = ExecutingQueryList.EMPTY;
Expand Down Expand Up @@ -224,10 +224,7 @@ public PageCursorTracer pageCursorTracer()

public final void acquire()
{
if ( referenceCount++ == 0 )
{
storeStatement.acquire();
}
referenceCount++;
}

final boolean isAcquired()
Expand Down Expand Up @@ -270,15 +267,15 @@ public final void stopQueryExecution( ExecutingQuery executingQuery )
}

@Override
public StorageStatement storageStatement()
public SchemaResources schemaResources()
{
return storeStatement;
return schemaResources;
}

private void cleanupResources()
{
// closing is done by KTI
storeStatement.release();
schemaResources.close();
executingQueryList = ExecutingQueryList.EMPTY;
}

Expand Down
Expand Up @@ -65,7 +65,7 @@
import org.neo4j.kernel.impl.transaction.tracing.TransactionTracer;
import org.neo4j.storageengine.api.StorageCommand;
import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.StorageStatement;
import org.neo4j.storageengine.api.SchemaResources;
import org.neo4j.storageengine.api.StoreReadLayer;
import org.neo4j.storageengine.api.txstate.ReadableTransactionState;
import org.neo4j.storageengine.api.txstate.TxStateVisitor;
Expand Down Expand Up @@ -117,7 +117,7 @@ public class KernelTransactionImplementation implements KernelTransaction, TxSta
private TransactionHooks.TransactionHooksState hooksState;
private StatementOperationParts currentTransactionOperations;
private final KernelStatementImplementation currentStatement;
private final StorageStatement storageStatement;
private final SchemaResources schemaResources;
private final List<CloseListener> closeListeners = new ArrayList<>( 2 );
private SecurityContext securityContext;
private volatile StatementLocks statementLocks;
Expand Down Expand Up @@ -178,9 +178,9 @@ public KernelTransactionImplementation( StatementOperationContainer operationCon
this.clock = clock;
this.transactionTracer = transactionTracer;
this.cursorTracerSupplier = cursorTracerSupplier;
this.storageStatement = storeLayer.newStatement();
this.schemaResources = storeLayer.schemaResources();
this.currentStatement =
new KernelStatementImplementation( this, this, storageStatement, procedures, accessCapability, lockTracer );
new KernelStatementImplementation( this, this, schemaResources, procedures, accessCapability, lockTracer );
this.userMetaData = Collections.emptyMap();
}

Expand Down Expand Up @@ -535,7 +535,7 @@ private long commit() throws TransactionFailureException
{
try
{
hooksState = hooks.beforeCommit( txState, this, storageEngine.storeReadLayer(), storageStatement );
hooksState = hooks.beforeCommit( txState, this, storageEngine.storeReadLayer() );
if ( hooksState != null && hooksState.failed() )
{
TransactionHookException cause = hooksState.failure();
Expand All @@ -560,8 +560,7 @@ private long commit() throws TransactionFailureException
Collection<StorageCommand> extractedCommands = new ArrayList<>();
storageEngine.createCommands(
extractedCommands,
txState,
storageStatement,
txState, schemaResources,
commitLocks,
lastTransactionIdWhenStarted );
if ( hasLegacyIndexChanges() )
Expand Down Expand Up @@ -806,7 +805,7 @@ public String toString()
@Override
public void dispose()
{
storageStatement.close();
schemaResources.close();
}

/**
Expand Down
Expand Up @@ -28,7 +28,6 @@

import org.neo4j.collection.pool.LinkedQueuePool;
import org.neo4j.collection.pool.MarshlandPool;
import org.neo4j.function.Factory;
import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.kernel.AvailabilityGuard;
Expand Down

0 comments on commit 2586429

Please sign in to comment.