Skip to content

Commit

Permalink
Simplify NodeItem interface
Browse files Browse the repository at this point in the history
Add new operations to RawCursor

Simlify a lot of other code as well in the process
  • Loading branch information
davidegrohmann committed Jan 9, 2017
1 parent 74f2e12 commit 43a48c1
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 281 deletions.
Expand Up @@ -43,12 +43,6 @@ public boolean hasLabel( int labelId )
}
}

@Override
public PrimitiveIntIterator getLabels()
{
return Cursors.intIterator( labels(), GET_LABEL );
}

@Override
public RelationshipIterator getRelationships( Direction direction, int[] relTypes )
{
Expand All @@ -63,12 +57,6 @@ public RelationshipIterator getRelationships( Direction direction )
return new CursorRelationshipIterator( relationships( direction ) );
}

@Override
public PrimitiveIntIterator getRelationshipTypes()
{
return Cursors.intIterator( relationshipTypes(), GET_RELATIONSHIP_TYPE );
}

private static int[] deduplicate( int[] types )
{
int unique = 0;
Expand Down
Expand Up @@ -20,15 +20,17 @@
package org.neo4j.kernel.api.txstate;

import java.util.Set;
import java.util.function.Consumer;

import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveIntCollections;
import org.neo4j.collection.primitive.PrimitiveIntIterator;
import org.neo4j.collection.primitive.PrimitiveIntSet;
import org.neo4j.cursor.Cursor;
import org.neo4j.kernel.api.exceptions.EntityNotFoundException;
import org.neo4j.kernel.api.exceptions.schema.ConstraintValidationKernelException;
import org.neo4j.kernel.impl.api.CountsRecordState;
import org.neo4j.kernel.impl.api.RelationshipDataExtractor;
import org.neo4j.storageengine.api.DegreeItem;
import org.neo4j.storageengine.api.LabelItem;
import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.StorageStatement;
import org.neo4j.storageengine.api.StoreReadLayer;
Expand All @@ -46,8 +48,7 @@ public class TransactionCountingStateVisitor extends TxStateVisitor.Delegator
private final CountsRecordState counts;
private final ReadableTransactionState txState;

public TransactionCountingStateVisitor( TxStateVisitor next,
StoreReadLayer storeLayer, StorageStatement statement,
public TransactionCountingStateVisitor( TxStateVisitor next, StoreReadLayer storeLayer, StorageStatement statement,
ReadableTransactionState txState, CountsRecordState counts )
{
super( next );
Expand All @@ -68,35 +69,27 @@ public void visitCreatedNode( long id )
public void visitDeletedNode( long id )
{
counts.incrementNodeCount( ANY_LABEL, -1 );
try ( Cursor<NodeItem> node = statement.acquireSingleNodeCursor( id ) )
statement.acquireSingleNodeCursor( id ).forAll( node ->
{
if ( node.next() )
{
// TODO Rewrite this to use cursors directly instead of iterator
PrimitiveIntIterator labels = node.get().getLabels();
if ( labels.hasNext() )
{
final int[] removed = PrimitiveIntCollections.asArray( labels );
for ( int label : removed )
PrimitiveIntSet set =
node.labels().mapReduce( Primitive.intSet(), LabelItem::getAsInt, ( labelId, current ) ->
{
counts.incrementNodeCount( label, -1 );
}
current.add( labelId );
counts.incrementNodeCount( labelId, -1 );
return current;
} );

try ( Cursor<DegreeItem> degrees = node.get().degrees() )
{
while ( degrees.next() )
{
DegreeItem degree = degrees.get();
for ( int label : removed )
{
updateRelationshipsCountsFromDegrees( degree.type(), label, -degree.outgoing(),
-degree.incoming() );
}
}
}
int[] labelIds = PrimitiveIntCollections.asArray( set.iterator() );
node.degrees().forAll( degree ->
{
for ( int labelId : labelIds )
{
updateRelationshipsCountsFromDegrees( degree.type(), labelId, -degree.outgoing(),
-degree.incoming() );
}
}
}
} );

} );
super.visitDeletedNode( id );
}

Expand All @@ -118,8 +111,7 @@ public void visitDeletedRelationship( long id )
}
catch ( EntityNotFoundException e )
{
throw new IllegalStateException(
"Relationship being deleted should exist along with its nodes.", e );
throw new IllegalStateException( "Relationship being deleted should exist along with its nodes.", e );
}
super.visitDeletedRelationship( id );
}
Expand All @@ -141,30 +133,18 @@ public void visitNodeLabelChanges( long id, final Set<Integer> added, final Set<
}
// get the relationship counts from *before* this transaction,
// the relationship changes will compensate for what happens during the transaction
try ( Cursor<NodeItem> node = statement.acquireSingleNodeCursor( id ) )
statement.acquireSingleNodeCursor( id ).forAll( node -> node.degrees().forAll( degree ->
{
if ( node.next() )
for ( Integer label : added )
{
try ( Cursor<DegreeItem> degrees = node.get().degrees() )
{
while ( degrees.next() )
{
DegreeItem degree = degrees.get();

for ( Integer label : added )
{
updateRelationshipsCountsFromDegrees( degree.type(), label, degree.outgoing(),
degree.incoming() );
}
for ( Integer label : removed )
{
updateRelationshipsCountsFromDegrees( degree.type(), label, -degree.outgoing(),
-degree.incoming() );
}
}
}
updateRelationshipsCountsFromDegrees( degree.type(), label, degree.outgoing(), degree.incoming() );
}
}
for ( Integer label : removed )
{
updateRelationshipsCountsFromDegrees( degree.type(), label, -degree.outgoing(),
-degree.incoming() );
}
} ) );
}
super.visitNodeLabelChanges( id, added, removed );
}
Expand All @@ -182,31 +162,17 @@ private void updateRelationshipsCountsFromDegrees( int type, int label, long out
private void updateRelationshipCount( long startNode, int type, long endNode, int delta )
{
updateRelationshipsCountsFromDegrees( type, ANY_LABEL, delta, 0 );
for ( PrimitiveIntIterator startLabels = labelsOf( startNode ); startLabels.hasNext(); )
{
updateRelationshipsCountsFromDegrees( type, startLabels.next(), delta, 0 );
}
for ( PrimitiveIntIterator endLabels = labelsOf( endNode ); endLabels.hasNext(); )
{
updateRelationshipsCountsFromDegrees( type, endLabels.next(), 0, delta );
}
visitLabels( startNode, ( label ) -> updateRelationshipsCountsFromDegrees( type, label.getAsInt(), delta, 0 ) );
visitLabels( endNode, ( label ) -> updateRelationshipsCountsFromDegrees( type, label.getAsInt(), 0, delta ) );
}

private PrimitiveIntIterator labelsOf( long nodeId )
private void visitLabels( long nodeId, Consumer<LabelItem> consumer )
{
try ( Cursor<NodeItem> node = nodeCursor( statement, nodeId ) )
{
if ( node.next() )
{
return node.get().getLabels();
}
return PrimitiveIntCollections.emptyIterator();
}
nodeCursor( statement, nodeId ).forAll( node -> node.labels().forAll( consumer ) );
}

private Cursor<NodeItem> nodeCursor( StorageStatement statement, long nodeId )
{
Cursor<NodeItem> cursor = statement.acquireSingleNodeCursor( nodeId );
return txState.augmentSingleNodeCursor( cursor, nodeId );
return txState.augmentSingleNodeCursor( statement.acquireSingleNodeCursor( nodeId ), nodeId );
}
}
Expand Up @@ -24,6 +24,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.stream.Stream;

import org.neo4j.collection.RawIterator;
Expand Down Expand Up @@ -98,7 +99,9 @@
import org.neo4j.kernel.impl.api.store.RelationshipIterator;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.query.QuerySource;
import org.neo4j.kernel.impl.util.Cursors;
import org.neo4j.register.Register.DoubleLongRegister;
import org.neo4j.storageengine.api.LabelItem;
import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.RelationshipItem;
import org.neo4j.storageengine.api.Token;
Expand Down Expand Up @@ -319,7 +322,7 @@ public PrimitiveIntIterator nodeGetLabels( long nodeId ) throws EntityNotFoundEx
statement.assertOpen();
try ( Cursor<NodeItem> node = dataRead().nodeCursorById( statement, nodeId ) )
{
return node.get().getLabels();
return Cursors.intIterator( node.get().labels(), LabelItem::getAsInt );
}
}

Expand Down Expand Up @@ -425,7 +428,7 @@ public PrimitiveIntIterator nodeGetRelationshipTypes( long nodeId ) throws Entit
statement.assertOpen();
try ( Cursor<NodeItem> node = dataRead().nodeCursorById( statement, nodeId ) )
{
return node.get().getRelationshipTypes();
return Cursors.intIterator( node.get().relationshipTypes(), IntSupplier::getAsInt );
}
}

Expand Down
Expand Up @@ -26,104 +26,104 @@
import org.neo4j.function.ThrowingConsumer;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.impl.api.operations.EntityReadOperations;
import org.neo4j.kernel.impl.api.store.RelationshipIterator;
import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.kernel.impl.locking.ResourceTypes;
import org.neo4j.storageengine.api.Direction;
import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.RelationshipItem;

class TwoPhaseNodeForRelationshipLocking
{
private final PrimitiveLongSet nodeIds = Primitive.longSet();
private final EntityReadOperations entityReadOperations;
private final ThrowingConsumer<Long,KernelException> relIdAction;

private boolean retry = true;
private long firstRelId;
private boolean first;

private final RelationshipVisitor<RuntimeException> collectNodeIdVisitor =
(relId, type, startNode, endNode) -> {
if ( firstRelId == -1 )
{
firstRelId = relId;
}
nodeIds.add( startNode );
nodeIds.add( endNode );
};

private final RelationshipVisitor<KernelException> relationshipConsumingVisitor =
new RelationshipVisitor<KernelException>()
{
@Override
public void visit( long relId, int type, long startNode, long endNode ) throws KernelException
{
if ( first )
{
first = false;
if ( relId != firstRelId )
{
// if the first relationship is not the same someone added some new rels, so we need to
// lock them all again
retry = true;
return;
}
}

relIdAction.accept( relId );
}
};

TwoPhaseNodeForRelationshipLocking( EntityReadOperations entityReadOperations, ThrowingConsumer<Long,KernelException> relIdAction )
TwoPhaseNodeForRelationshipLocking( EntityReadOperations entityReadOperations,
ThrowingConsumer<Long,KernelException> relIdAction )
{
this.entityReadOperations = entityReadOperations;
this.relIdAction = relIdAction;
}

void lockAllNodesAndConsumeRelationships( long nodeId, final KernelStatement state ) throws KernelException
{
boolean retry = true;
nodeIds.add( nodeId );
while ( retry )
{
retry = false;
first = true;
firstRelId = -1;

// lock all the nodes involved by following the node id ordering
try ( Cursor<NodeItem> cursor = entityReadOperations.nodeCursorById( state, nodeId ) )
try( Cursor<NodeItem> node = entityReadOperations.nodeCursorById( state, nodeId ) )
{
RelationshipIterator relationships = cursor.get().getRelationships( Direction.BOTH );
while ( relationships.hasNext() )
{
entityReadOperations.relationshipVisit( state, relationships.next(), collectNodeIdVisitor );
}
node.get().relationships( Direction.BOTH ).forAll( this::collectNodeId );
}

PrimitiveLongIterator nodeIdIterator = nodeIds.iterator();
while ( nodeIdIterator.hasNext() )
{
state.locks().optimistic().acquireExclusive( state.lockTracer(), ResourceTypes.NODE, nodeIdIterator.next() );
}
lockAllNodes( state );

// perform the action on each relationship, we will retry if the the relationship iterator contains new relationships
try ( Cursor<NodeItem> cursor = entityReadOperations.nodeCursorById( state, nodeId ) )
try ( Cursor<NodeItem> node = entityReadOperations.nodeCursorById( state, nodeId ) )
{
RelationshipIterator relationships = cursor.get().getRelationships( Direction.BOTH );
while ( relationships.hasNext() )
try ( Cursor<RelationshipItem> relationships = node.get().relationships( Direction.BOTH ) )
{
entityReadOperations.relationshipVisit( state, relationships.next(), relationshipConsumingVisitor );
if ( retry )
boolean first = true;
while ( relationships.next() && !retry )
{
PrimitiveLongIterator iterator = nodeIds.iterator();
while ( iterator.hasNext() )
{
state.locks().optimistic().releaseExclusive( ResourceTypes.NODE, iterator.next() );
}
nodeIds.clear();
break;
retry = performAction( state, relationships.get(), first );
first = false;
}
}
}
}
}

private void lockAllNodes( KernelStatement state )
{
PrimitiveLongIterator nodeIdIterator = nodeIds.iterator();
while ( nodeIdIterator.hasNext() )
{
state.locks().optimistic()
.acquireExclusive( state.lockTracer(), ResourceTypes.NODE, nodeIdIterator.next() );
}
}

private void unlockAllNodes( KernelStatement state )
{
PrimitiveLongIterator iterator = nodeIds.iterator();
while ( iterator.hasNext() )
{
state.locks().optimistic().releaseExclusive( ResourceTypes.NODE, iterator.next() );
}
nodeIds.clear();
}

private boolean performAction( KernelStatement state, RelationshipItem rel, boolean first ) throws KernelException
{
if ( first )
{
if ( rel.id() != firstRelId )
{
// if the first relationship is not the same someone added some new rels, so we need to
// lock them all again
unlockAllNodes( state );
return true;
}
}

relIdAction.accept( rel.id() );
return false;
}

private void collectNodeId( RelationshipItem rel )
{
if ( firstRelId == -1 )
{
firstRelId = rel.id();
}

nodeIds.add( rel.startNode() );
nodeIds.add( rel.endNode() );
}
}

0 comments on commit 43a48c1

Please sign in to comment.