Skip to content

Commit

Permalink
Merge pull request #12 from davidegrohmann/3.3-parallel-node-scan
Browse files Browse the repository at this point in the history
Parallel node scan
  • Loading branch information
ragadeeshu authored and davidegrohmann committed May 8, 2017
2 parents 9608c17 + dec070f commit 0157505
Show file tree
Hide file tree
Showing 26 changed files with 1,127 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ public void clear()
}
}

@Override
public PrimitiveIntSet augmentLabels( PrimitiveIntSet labels )
{
ReadableDiffSets<Integer> labelDiffSets = labelDiffSets();
if ( !labelDiffSets.isEmpty() )
{
labelDiffSets.getRemoved().forEach( labels::remove );
labelDiffSets.getAdded().forEach( labels::add );
}
return labels;
}

@Override
public int augmentDegree( Direction direction, int degree )
{
Expand Down Expand Up @@ -242,116 +254,7 @@ final NodeStateImpl createValue( Long id, TxState state )
@Override
final NodeState defaultValue()
{
return DEFAULT;
return NodeState.EMPTY;
}

private static final NodeState DEFAULT = new NodeState()
{
@Override
public Iterator<StorageProperty> addedProperties()
{
return Iterators.emptyIterator();
}

@Override
public Iterator<StorageProperty> changedProperties()
{
return Iterators.emptyIterator();
}

@Override
public Iterator<Integer> removedProperties()
{
return Iterators.emptyIterator();
}

@Override
public Iterator<StorageProperty> addedAndChangedProperties()
{
return Iterators.emptyIterator();
}

@Override
public Iterator<StorageProperty> augmentProperties( Iterator<StorageProperty> iterator )
{
return iterator;
}

@Override
public void accept( PropertyContainerState.Visitor visitor ) throws ConstraintValidationException
{
}

@Override
public ReadableDiffSets<Integer> labelDiffSets()
{
return ReadableDiffSets.Empty.instance();
}

@Override
public int augmentDegree( Direction direction, int degree )
{
return degree;
}

@Override
public int augmentDegree( Direction direction, int degree, int typeId )
{
return degree;
}

@Override
public void accept( NodeState.Visitor visitor )
{
}

@Override
public PrimitiveIntSet relationshipTypes()
{
return Primitive.intSet();
}

@Override
public long getId()
{
throw new UnsupportedOperationException( "id not defined" );
}

@Override
public boolean hasChanges()
{
return false;
}

@Override
public StorageProperty getChangedProperty( int propertyKeyId )
{
return null;
}

@Override
public StorageProperty getAddedProperty( int propertyKeyId )
{
return null;
}

@Override
public boolean isPropertyRemoved( int propertyKeyId )
{
return false;
}

@Override
public PrimitiveLongIterator getAddedRelationships( Direction direction )
{
return null;
}

@Override
public PrimitiveLongIterator getAddedRelationships( Direction direction, int[] relTypes )
{
return null;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -642,18 +642,6 @@ public RelationshipState getRelationshipState( long id )
return RELATIONSHIP_STATE.get( this, id );
}

@Override
public PrimitiveIntSet augmentLabels( PrimitiveIntSet labels, NodeState nodeState )
{
ReadableDiffSets<Integer> labelDiffSets = nodeState.labelDiffSets();
if ( !labelDiffSets.isEmpty() )
{
labelDiffSets.getRemoved().forEach( labels::remove );
labelDiffSets.getAdded().forEach( labels::add );
}
return labels;
}

@Override
public ReadableDiffSets<Long> nodesWithLabelChanged( int labelId )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,71 @@
*/
package org.neo4j.kernel.impl.api.store;

import org.neo4j.kernel.api.StatementConstants;
import java.util.Iterator;

import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.storageengine.api.txstate.NodeState;
import org.neo4j.storageengine.api.txstate.ReadableTransactionState;

public class AllNodeProgression implements NodeProgression
{
private final AllIdIterator allIdIterator;
private final NodeStore nodeStore;
private final ReadableTransactionState state;

private long start;
private boolean done;

AllNodeProgression( NodeStore nodeStore )
AllNodeProgression( NodeStore nodeStore, ReadableTransactionState state )
{
allIdIterator = new AllIdIterator( nodeStore );
this.nodeStore = nodeStore;
this.state = state;
this.start = nodeStore.getNumberOfReservedLowIds();
}

@Override
public long nextId()
public boolean nextBatch( Batch batch )
{
if ( allIdIterator.hasNext() )
while ( true )
{
return allIdIterator.next();
if ( done )
{
batch.nothing();
return false;
}

long highId = nodeStore.getHighestPossibleIdInUse();
if ( start <= highId )
{
batch.init( start, highId );
start = highId + 1;
return true;
}

done = true;
}
return StatementConstants.NO_SUCH_NODE;
}

@Override
public TransactionStateAccessMode mode()
public Iterator<Long> addedNodes()
{
return state == null ? null : state.addedAndRemovedNodes().getAdded().iterator();
}

@Override
public boolean fetchFromTxState( long id )
{
return false;
}

@Override
public boolean fetchFromDisk( long id )
{
return state == null || !state.nodeIsDeletedInThisTx( id );
}

@Override
public NodeState nodeState( long id )
{
return TransactionStateAccessMode.APPEND;
return state == null ? NodeState.EMPTY : state.getNodeState( id );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,23 @@
import org.neo4j.kernel.impl.util.IoPrimitiveUtils;
import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.txstate.NodeState;
import org.neo4j.storageengine.api.txstate.ReadableTransactionState;

import static org.neo4j.collection.primitive.PrimitiveIntCollections.asSet;
import static org.neo4j.kernel.impl.api.store.TransactionStateAccessMode.APPEND;
import static org.neo4j.kernel.impl.api.store.TransactionStateAccessMode.FETCH;
import static org.neo4j.kernel.impl.locking.LockService.NO_LOCK;
import static org.neo4j.kernel.impl.locking.LockService.NO_LOCK_SERVICE;
import static org.neo4j.kernel.impl.store.record.RecordLoad.CHECK;
import static org.neo4j.kernel.impl.util.IoPrimitiveUtils.safeCastLongToInt;

public class NodeCursor implements NodeItem, Cursor<NodeItem>, Disposable
{
private final NodeProgression.Batch batch = new NodeProgression.Batch();
private final NodeRecord nodeRecord;
private final Consumer<NodeCursor> instanceCache;
private final NodeStore nodeStore;
private final LockService lockService;
private final PageCursor pageCursor;

private NodeProgression progression;
private ReadableTransactionState state;
private boolean fetched;
private long[] labels;
private Iterator<Long> added;
Expand All @@ -68,13 +65,10 @@ public class NodeCursor implements NodeItem, Cursor<NodeItem>, Disposable
this.lockService = lockService;
}

public Cursor<NodeItem> init( NodeProgression progression, ReadableTransactionState state )
public Cursor<NodeItem> init( NodeProgression progression )
{
this.progression = progression;
this.state = state;
this.added = state != null && progression.mode() == APPEND
? state.addedAndRemovedNodes().getAdded().iterator()
: null;
this.added = progression.addedNodes();
return this;
}

Expand All @@ -87,18 +81,16 @@ public boolean next()
private boolean fetchNext()
{
labels = null;
long id;
while ( progression != null && (id = progression.nextId()) >= 0 )
while ( progression != null && (batch.hasNext() || progression.nextBatch( batch ) ) )
{

if ( state != null && progression.mode() == FETCH && state.nodeIsAddedInThisTx( id ) )
long id = batch.next();
if ( progression.fetchFromTxState( id ) )
{
recordFromTxState( id );
return true;
}

if ( (state == null || !state.nodeIsDeletedInThisTx( id )) &&
nodeStore.readRecord( id, nodeRecord, CHECK, pageCursor ).inUse() )
if ( progression.fetchFromDisk( id ) && nodeStore.readRecord( id, nodeRecord, CHECK, pageCursor ).inUse() )
{
return true;
}
Expand All @@ -125,8 +117,8 @@ public void close()
fetched = false;
labels = null;
added = null;
state = null;
progression = null;
batch.nothing();
instanceCache.accept( this );
}

Expand All @@ -151,19 +143,19 @@ public NodeItem get()
public PrimitiveIntSet labels()
{
PrimitiveIntSet labels = asSet( loadedLabels(), IoPrimitiveUtils::safeCastLongToInt );
return state != null ? state.augmentLabels( labels, state.getNodeState( id() ) ) : labels;
return progression.nodeState( id() ).augmentLabels( labels );
}

@Override
public boolean hasLabel( int labelId )
{
NodeState nodeState = state == null ? null : state.getNodeState( id() );
if ( state != null && nodeState.labelDiffSets().getRemoved().contains( labelId ) )
NodeState nodeState = progression.nodeState( id() );
if ( nodeState.labelDiffSets().getRemoved().contains( labelId ) )
{
return false;
}

if ( state != null && nodeState.labelDiffSets().getAdded().contains( labelId ) )
if ( nodeState.labelDiffSets().getAdded().contains( labelId ) )
{
return true;
}
Expand Down Expand Up @@ -221,7 +213,7 @@ public long nextPropertyId()
@Override
public Lock lock()
{
return state != null && state.nodeIsAddedInThisTx( id() ) ? NO_LOCK : acquireLock();
return progression.fetchFromTxState( id() ) ? NO_LOCK : acquireLock();
}

private Lock acquireLock()
Expand Down

0 comments on commit 0157505

Please sign in to comment.