Skip to content

Commit

Permalink
Merge pull request #14 from davidegrohmann/3.3-parallel-scan-read-ope…
Browse files Browse the repository at this point in the history
…rations

Expose Parallel Node Scan in ReadOperations
  • Loading branch information
ragadeeshu authored and davidegrohmann committed May 8, 2017
2 parents 3f4bbe0 + d84082a commit 530cd9d
Show file tree
Hide file tree
Showing 56 changed files with 657 additions and 243 deletions.
Expand Up @@ -158,6 +158,7 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.Logger;
import org.neo4j.storageengine.api.BatchingProgressionFactory;
import org.neo4j.storageengine.api.StorageEngine;
import org.neo4j.storageengine.api.StoreFileMetadata;
import org.neo4j.storageengine.api.StoreReadLayer;
Expand Down Expand Up @@ -269,6 +270,7 @@ boolean applicable( DiagnosticsPhase phase )
private final AvailabilityGuard availabilityGuard;
private final SystemNanoClock clock;
private final StoreCopyCheckPointMutex storeCopyCheckPointMutex;
private final BatchingProgressionFactory progressionFactory;

private Dependencies dependencies;
private LifeSupport life;
Expand Down Expand Up @@ -324,7 +326,8 @@ public NeoStoreDataSource(
AvailabilityGuard availabilityGuard,
SystemNanoClock clock,
AccessCapability accessCapability,
StoreCopyCheckPointMutex storeCopyCheckPointMutex )
StoreCopyCheckPointMutex storeCopyCheckPointMutex,
BatchingProgressionFactory progressionFactory )
{
this.storeDir = storeDir;
this.config = config;
Expand Down Expand Up @@ -361,6 +364,7 @@ public NeoStoreDataSource(
this.availabilityGuard = availabilityGuard;
this.clock = clock;
this.accessCapability = accessCapability;
this.progressionFactory = progressionFactory;

readOnly = config.get( Configuration.read_only );
msgLog = logProvider.getLog( getClass() );
Expand Down Expand Up @@ -435,9 +439,9 @@ public void start() throws IOException

SynchronizedArrayIdOrderingQueue legacyIndexTransactionOrdering = new SynchronizedArrayIdOrderingQueue( 20 );

storageEngine = buildStorageEngine(
propertyKeyTokenHolder, labelTokens, relationshipTypeTokens, legacyIndexProviderLookup,
indexConfigStore, updateableSchemaState::clear, legacyIndexTransactionOrdering );
storageEngine = buildStorageEngine( propertyKeyTokenHolder, labelTokens, relationshipTypeTokens,
legacyIndexProviderLookup, indexConfigStore, updateableSchemaState::clear,
legacyIndexTransactionOrdering, progressionFactory );

LogEntryReader<ReadableClosablePositionAwareChannel> logEntryReader =
new VersionAwareLogEntryReader<>( storageEngine.commandReaderFactory(), STRICT );
Expand All @@ -448,7 +452,7 @@ public void start() throws IOException
buildTransactionLogs( storeDir, config, logProvider, scheduler, fs,
storageEngine, logEntryReader, legacyIndexTransactionOrdering,
transactionIdStore, logVersionRepository );
transactionLogModule.satisfyDependencies(dependencies);
transactionLogModule.satisfyDependencies( dependencies );

buildRecovery( fs,
transactionIdStore,
Expand Down Expand Up @@ -565,23 +569,24 @@ private void upgradeStore( RecordFormats format )
format ).migrate( storeDir );
}

private StorageEngine buildStorageEngine(
PropertyKeyTokenHolder propertyKeyTokenHolder, LabelTokenHolder labelTokens,
RelationshipTypeTokenHolder relationshipTypeTokens,
private StorageEngine buildStorageEngine( PropertyKeyTokenHolder propertyKeyTokenHolder,
LabelTokenHolder labelTokens, RelationshipTypeTokenHolder relationshipTypeTokens,
LegacyIndexProviderLookup legacyIndexProviderLookup, IndexConfigStore indexConfigStore,
Runnable schemaStateChangeCallback, SynchronizedArrayIdOrderingQueue legacyIndexTransactionOrdering )
Runnable schemaStateChangeCallback, SynchronizedArrayIdOrderingQueue legacyIndexTransactionOrdering,
BatchingProgressionFactory progressionFactory )
{
// TODO we should break this dependency on the kernelModule (which has not yet been created at this point in
// TODO the code) and instead let information about generations of transactions flow through the StorageEngine
// TODO API
Supplier<KernelTransactionsSnapshot> transactionSnapshotSupplier =
() -> kernelModule.kernelTransactions().get();
RecordStorageEngine storageEngine = new RecordStorageEngine( storeDir, config, idGeneratorFactory,
eligibleForReuse, idTypeConfigurationProvider, pageCache, fs, logProvider, propertyKeyTokenHolder,
labelTokens, relationshipTypeTokens, schemaStateChangeCallback, constraintSemantics,
storageStatementFactory, scheduler, tokenNameLookup, lockService, schemaIndexProvider,
indexingServiceMonitor, databaseHealth, labelScanStoreProvider, legacyIndexProviderLookup,
indexConfigStore, legacyIndexTransactionOrdering, transactionSnapshotSupplier );
RecordStorageEngine storageEngine =
new RecordStorageEngine( storeDir, config, idGeneratorFactory, eligibleForReuse,
idTypeConfigurationProvider, pageCache, fs, logProvider, propertyKeyTokenHolder, labelTokens,
relationshipTypeTokens, schemaStateChangeCallback, constraintSemantics, storageStatementFactory,
scheduler, tokenNameLookup, lockService, schemaIndexProvider, indexingServiceMonitor,
databaseHealth, labelScanStoreProvider, legacyIndexProviderLookup, indexConfigStore,
legacyIndexTransactionOrdering, transactionSnapshotSupplier, progressionFactory );

// We pretend that the storage engine abstract hides all details within it. Whereas that's mostly
// true it's not entirely true for the time being. As long as we need this call below, which
Expand Down
Expand Up @@ -48,9 +48,9 @@
import org.neo4j.kernel.api.schema.SchemaDescriptor;
import org.neo4j.kernel.api.schema.constaints.ConstraintDescriptor;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.RelationshipVisitor;
import org.neo4j.kernel.impl.api.store.RelationshipIterator;
import org.neo4j.register.Register.DoubleLongRegister;
import org.neo4j.storageengine.api.BatchingLongProgression;
import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.PropertyItem;
import org.neo4j.storageengine.api.RelationshipItem;
Expand Down Expand Up @@ -212,6 +212,10 @@ long nodesCountIndexed( IndexDescriptor index, long nodeId, Object value )

Cursor<NodeItem> nodeCursorById( long nodeId ) throws EntityNotFoundException;

Cursor<NodeItem> nodeGeCursor( BatchingLongProgression progression );

BatchingLongProgression parallelNodeScan();

Cursor<RelationshipItem> relationshipCursorById( long relId ) throws EntityNotFoundException;

Cursor<PropertyItem> nodeGetProperties( NodeItem node );
Expand Down
Expand Up @@ -65,7 +65,7 @@ public void visitCreatedNode( long id )
public void visitDeletedNode( long id )
{
counts.incrementNodeCount( ANY_LABEL, -1 );
storeLayer.nodeCursor( statement, id, ReadableTransactionState.EMPTY )
storeLayer.nodeGetSingleCursor( statement, id, ReadableTransactionState.EMPTY )
.forAll( this::decrementCountForLabelsAndRelationships );
super.visitDeletedNode( id );
}
Expand Down Expand Up @@ -125,7 +125,7 @@ public void visitNodeLabelChanges( long id, final Set<Integer> added, final Set<
}
// get the relationship counts from *before* this transaction,
// the relationship changes will compensate for what happens during the transaction
storeLayer.nodeCursor( statement, id, ReadableTransactionState.EMPTY )
storeLayer.nodeGetSingleCursor( statement, id, ReadableTransactionState.EMPTY )
.forAll( node -> storeLayer.degrees( statement, node, ( type, out, in ) ->
{
added.forEach( label -> updateRelationshipsCountsFromDegrees( type, label, out, in ) );
Expand Down Expand Up @@ -161,6 +161,6 @@ private void updateRelationshipCount( long startNode, int type, long endNode, in

private void visitLabels( long nodeId, PrimitiveIntVisitor<RuntimeException> visitor )
{
storeLayer.nodeCursor( statement, nodeId, txState ).forAll( node -> node.labels().visitKeys( visitor ) );
storeLayer.nodeGetSingleCursor( statement, nodeId, txState ).forAll( node -> node.labels().visitKeys( visitor ) );
}
}
Expand Up @@ -72,6 +72,7 @@
import org.neo4j.kernel.impl.constraints.ConstraintSemantics;
import org.neo4j.kernel.impl.locking.LockTracer;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.storageengine.api.BatchingLongProgression;
import org.neo4j.storageengine.api.Direction;
import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.PropertyItem;
Expand Down Expand Up @@ -465,6 +466,18 @@ public PrimitiveLongIterator relationshipsGetAll( KernelStatement state )
return entityReadOperations.relationshipsGetAll( state );
}

@Override
public BatchingLongProgression parallelNodeScanProgression( KernelStatement statement )
{
return entityReadOperations.parallelNodeScanProgression( statement );
}

@Override
public Cursor<NodeItem> nodeGetCursor( KernelStatement statement, BatchingLongProgression progression )
{
return entityReadOperations.nodeGetCursor( statement, progression );
}

@Override
public Cursor<NodeItem> nodeGetAllCursor( KernelStatement statement )
{
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.neo4j.kernel.guard.Guard;
import org.neo4j.kernel.impl.api.operations.EntityReadOperations;
import org.neo4j.kernel.impl.api.operations.EntityWriteOperations;
import org.neo4j.storageengine.api.BatchingLongProgression;
import org.neo4j.storageengine.api.Direction;
import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.PropertyItem;
Expand Down Expand Up @@ -234,6 +235,20 @@ public PrimitiveLongIterator relationshipsGetAll( KernelStatement statement )
return entityReadDelegate.relationshipsGetAll( statement );
}

@Override
public BatchingLongProgression parallelNodeScanProgression( KernelStatement statement )
{
guard.check( statement );
return entityReadDelegate.parallelNodeScanProgression( statement );
}

@Override
public Cursor<NodeItem> nodeGetCursor( KernelStatement statement, BatchingLongProgression progression )
{
guard.check( statement );
return entityReadDelegate.nodeGetCursor( statement, progression );
}

@Override
public Cursor<NodeItem> nodeGetAllCursor( KernelStatement statement )
{
Expand Down
Expand Up @@ -96,10 +96,9 @@ public class KernelTransactions extends LifecycleAdapter implements Supplier<Ker
*/
private final Set<KernelTransactionImplementation> allTransactions = newSetFromMap( new ConcurrentHashMap<>() );

// This is the factory that actually builds brand-new instances.
private final Factory<KernelTransactionImplementation> factory = new KernelTransactionImplementationFactory( allTransactions );
// Global pool of transactions, wrapped by the thread-local marshland pool and so is not used directly.
private final LinkedQueuePool<KernelTransactionImplementation> globalTxPool = new GlobalKernelTransactionPool( allTransactions, factory );
private final LinkedQueuePool<KernelTransactionImplementation> globalTxPool =
new GlobalKernelTransactionPool( allTransactions );
// Pool of unused transactions.
private final MarshlandPool<KernelTransactionImplementation> localTxPool = new MarshlandPool<>( globalTxPool );

Expand Down Expand Up @@ -288,17 +287,6 @@ KernelTransactionHandle createHandle( KernelTransactionImplementation tx )
return new KernelTransactionImplementationHandle( tx );
}

/**
* Get all transactions
* * <p>
* <b>Note:</b> this method is package-private for testing <b>only</b>.
* @return set of all kernel transaction
*/
Set<KernelTransactionImplementation> getAllTransactions()
{
return allTransactions;
}

private void assertRunning()
{
if ( availabilityGuard.isShutdown() )
Expand All @@ -320,17 +308,18 @@ private void assertCurrentThreadIsNotBlockingNewTransactions()
}
}

private class KernelTransactionImplementationFactory implements Factory<KernelTransactionImplementation>
private class GlobalKernelTransactionPool extends LinkedQueuePool<KernelTransactionImplementation>
{
private Set<KernelTransactionImplementation> transactions;

KernelTransactionImplementationFactory( Set<KernelTransactionImplementation> transactions )
GlobalKernelTransactionPool( Set<KernelTransactionImplementation> transactions )
{
super( 8 );
this.transactions = transactions;
}

@Override
public KernelTransactionImplementation newInstance()
protected KernelTransactionImplementation create()
{
KernelTransactionImplementation tx =
new KernelTransactionImplementation( statementOperations, schemaWriteGuard, hooks,
Expand All @@ -341,18 +330,6 @@ public KernelTransactionImplementation newInstance()
this.transactions.add( tx );
return tx;
}
}

private class GlobalKernelTransactionPool extends LinkedQueuePool<KernelTransactionImplementation>
{
private Set<KernelTransactionImplementation> transactions;

GlobalKernelTransactionPool( Set<KernelTransactionImplementation> transactions,
Factory<KernelTransactionImplementation> factory )
{
super( 8, factory );
this.transactions = transactions;
}

@Override
protected void dispose( KernelTransactionImplementation tx )
Expand Down
Expand Up @@ -105,6 +105,7 @@
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.query.clientconnection.ClientConnectionInfo;
import org.neo4j.register.Register.DoubleLongRegister;
import org.neo4j.storageengine.api.BatchingLongProgression;
import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.PropertyItem;
import org.neo4j.storageengine.api.RelationshipItem;
Expand Down Expand Up @@ -554,6 +555,20 @@ public Cursor<NodeItem> nodeCursorById( long nodeId ) throws EntityNotFoundExcep
return dataRead().nodeCursorById( statement, nodeId );
}

@Override
public Cursor<NodeItem> nodeGeCursor( BatchingLongProgression progression )
{
statement.assertOpen();
return dataRead().nodeGetCursor( statement, progression );
}

@Override
public BatchingLongProgression parallelNodeScan()
{
statement.assertOpen();
return dataRead().parallelNodeScanProgression( statement );
}

@Override
public Cursor<RelationshipItem> relationshipCursorById( long relId ) throws EntityNotFoundException
{
Expand Down
Expand Up @@ -89,6 +89,7 @@
import org.neo4j.kernel.impl.index.IndexEntityType;
import org.neo4j.kernel.impl.index.LegacyIndexStore;
import org.neo4j.register.Register.DoubleLongRegister;
import org.neo4j.storageengine.api.BatchingLongProgression;
import org.neo4j.storageengine.api.Direction;
import org.neo4j.storageengine.api.EntityType;
import org.neo4j.storageengine.api.NodeItem;
Expand Down Expand Up @@ -140,6 +141,18 @@ public StateHandlingStatementOperations( StoreReadLayer storeLayer, AutoIndexing

// <Cursors>

@Override
public BatchingLongProgression parallelNodeScanProgression( KernelStatement statement )
{
return storeLayer.parallelNodeScanProgression( statement.storageStatement() );
}

@Override
public Cursor<NodeItem> nodeGetCursor( KernelStatement statement, BatchingLongProgression progression )
{
return storeLayer.nodeGetCursor( statement.storageStatement(), progression, statement.readableTxState() );
}

@Override
public Cursor<NodeItem> nodeGetAllCursor( KernelStatement statement )
{
Expand All @@ -160,7 +173,7 @@ public Cursor<NodeItem> nodeCursorById( KernelStatement statement, long nodeId )

private Cursor<NodeItem> nodeCursor( KernelStatement statement, long nodeId )
{
return storeLayer.nodeCursor( statement.storageStatement(), nodeId, statement.readableTxState() );
return storeLayer.nodeGetSingleCursor( statement.storageStatement(), nodeId, statement.readableTxState() );
}

@Override
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.neo4j.kernel.api.schema.IndexQuery;
import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.KernelStatement;
import org.neo4j.kernel.impl.api.RelationshipVisitor;
import org.neo4j.storageengine.api.BatchingLongProgression;
import org.neo4j.storageengine.api.Direction;
import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.PropertyItem;
Expand Down Expand Up @@ -83,6 +83,10 @@ long nodesCountIndexed( KernelStatement statement, IndexDescriptor index, long n

PrimitiveLongIterator relationshipsGetAll( KernelStatement state );

BatchingLongProgression parallelNodeScanProgression( KernelStatement statement );

Cursor<NodeItem> nodeGetCursor( KernelStatement statement, BatchingLongProgression progression );

Cursor<NodeItem> nodeGetAllCursor( KernelStatement statement );

Cursor<NodeItem> nodeCursorById( KernelStatement statement, long nodeId ) throws EntityNotFoundException;
Expand Down
Expand Up @@ -20,15 +20,16 @@
package org.neo4j.kernel.impl.api.store;

import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.storageengine.api.BatchingLongProgression;

public class AllNodeProgression implements BatchingLongProgression
public class AllNodeScan implements BatchingLongProgression
{
private final NodeStore nodeStore;

private long start;
private boolean done;

AllNodeProgression( NodeStore nodeStore )
AllNodeScan( NodeStore nodeStore )
{
this.nodeStore = nodeStore;
this.start = nodeStore.getNumberOfReservedLowIds();
Expand Down

0 comments on commit 530cd9d

Please sign in to comment.