Skip to content

Commit

Permalink
Group cursor to handle new types added in transaction
Browse files Browse the repository at this point in the history
When there are new relationship types added in the transaction the
group cursor must go over these as well.
  • Loading branch information
pontusmelke committed Feb 25, 2018
1 parent 3ec7c9f commit 773d7eb
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 27 deletions.
Expand Up @@ -682,6 +682,114 @@ public void shouldCountFromTxState() throws Exception
} );
}

@Test
public void groupCursorShouldSeeNewTypes() throws Exception
{
try (Transaction tx = session.beginTransaction())
{
Write write = tx.dataWrite();
long start = write.nodeCreate();
long out = write.relationshipCreate( start,
tx.tokenWrite().relationshipTypeGetOrCreateForName( "OUT" ),
write.nodeCreate() );
long in1 = write.relationshipCreate( write.nodeCreate(),
tx.tokenWrite().relationshipTypeGetOrCreateForName( "IN" ),
start );
long in2 = write.relationshipCreate( write.nodeCreate(),
tx.tokenWrite().relationshipTypeGetOrCreateForName( "IN" ),
start );
long loop = write.relationshipCreate( start,
tx.tokenWrite().relationshipTypeGetOrCreateForName( "LOOP" ),
start );
try ( NodeCursor node = cursors.allocateNodeCursor();
RelationshipTraversalCursor traversal = cursors.allocateRelationshipTraversalCursor();
RelationshipGroupCursor group = cursors.allocateRelationshipGroupCursor() )
{
Read read = tx.dataRead();
read.singleNode( start, node );
assertTrue( node.next() );
node.relationships( group );

//Outgoing
assertTrue( group.next() );
assertEquals( 1, group.outgoingCount() );
assertEquals( 0, group.incomingCount() );
assertEquals( 0, group.loopCount() );
assertRelationships(OUT, group, traversal, out);
assertNoRelationships(IN, group, traversal);
assertNoRelationships(LOOP, group, traversal);

//Incoming
assertTrue( group.next() );
assertEquals( 0, group.outgoingCount() );
assertEquals( 2, group.incomingCount() );
assertEquals( 0, group.loopCount() );
assertRelationships(IN, group, traversal, in1, in2);
assertNoRelationships(OUT, group, traversal);
assertNoRelationships(LOOP, group, traversal);

//Loop
assertTrue( group.next() );
assertEquals( 0, group.outgoingCount() );
assertEquals( 0, group.incomingCount() );
assertEquals( 1, group.loopCount() );
assertRelationships(LOOP, group, traversal, loop);
assertNoRelationships(OUT, group, traversal);
assertNoRelationships(IN, group, traversal);

assertFalse( group.next() );
}
}
}

void assertRelationships( RelationshipDirection direction, RelationshipGroupCursor group,
RelationshipTraversalCursor traversal, long...relationships )
{
switch ( direction )
{
case OUT:
group.outgoing( traversal );
break;
case IN:
group.incoming( traversal );
break;
case LOOP:
group.loops( traversal );
break;
default:
throw new AssertionError( "Where is your god now!" );
}

for ( long relationship : relationships )
{
assertTrue( traversal.next() );
assertEquals( relationship, traversal.relationshipReference() );
}
assertFalse( traversal.next() );
}

void assertNoRelationships( RelationshipDirection direction, RelationshipGroupCursor group,
RelationshipTraversalCursor traversal )
{
switch ( direction )
{
case OUT:
group.outgoing( traversal );
assertFalse( traversal.next() );
break;
case IN:
group.incoming( traversal );
assertFalse( traversal.next() );
break;
case LOOP:
group.loops( traversal );
assertFalse( traversal.next() );
break;
default:
throw new AssertionError( "Where is your god now!" );
}
}

private void traverseWithoutGroups( RelationshipTestSupport.StartNode start, boolean detached ) throws Exception
{
try ( Transaction tx = session.beginTransaction() )
Expand Down
Expand Up @@ -20,13 +20,19 @@
package org.neo4j.kernel.impl.newapi;

import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveIntIterator;
import org.neo4j.collection.primitive.PrimitiveIntObjectMap;
import org.neo4j.collection.primitive.PrimitiveIntSet;
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.internal.kernel.api.RelationshipGroupCursor;
import org.neo4j.internal.kernel.api.RelationshipTraversalCursor;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.api.RelationshipVisitor;
import org.neo4j.kernel.impl.newapi.DefaultRelationshipTraversalCursor.Record;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.storageengine.api.txstate.NodeState;
import org.neo4j.storageengine.api.txstate.RelationshipState;

import static org.neo4j.kernel.impl.newapi.RelationshipReferenceEncoding.encodeForFiltering;
import static org.neo4j.kernel.impl.newapi.RelationshipReferenceEncoding.encodeForTxStateFiltering;
Expand All @@ -41,6 +47,9 @@ class DefaultRelationshipGroupCursor extends RelationshipGroupRecord implements
private BufferedGroup bufferedGroup;
private PageCursor page;
private PageCursor edgePage;
private boolean hasCheckedTxState;
private PrimitiveIntSet txTypes = Primitive.intSet();
private PrimitiveIntIterator txTypeIterator = null;

DefaultRelationshipGroupCursor()
{
Expand Down Expand Up @@ -89,16 +98,21 @@ else if ( edge.getSecondNode() == nodeReference ) // incoming
throw new IllegalStateException( "not a part of the chain! TODO: better exception" );
}
}
//we never need to check tx state, handled by traversal
this.hasCheckedTxState = true;
this.bufferedGroup = new BufferedGroup( edge, current ); // we need a dummy before the first to denote the initial pos
this.read = read;
}
}

void direct( long nodeReference, long reference, Read read )
{
setOwningNode( nodeReference );
bufferedGroup = null;
clear();
txTypes.clear();
txTypeIterator = null;
hasCheckedTxState = false;
setOwningNode( nodeReference );
setNext( reference );
if ( page == null )
{
Expand All @@ -124,30 +138,93 @@ public boolean next()
{
if ( isBuffered() )
{
bufferedGroup = bufferedGroup.next;
if ( bufferedGroup == null )
{
return false; // we never have both types of traversal, so terminate early
}
setType( bufferedGroup.label );
setFirstOut( bufferedGroup.outgoing() );
setFirstIn( bufferedGroup.incoming() );
setFirstLoop( bufferedGroup.loops() );
return true;
return nextFromBuffer();
}

//We need to check tx state if there are new types added
//on the first call of next
if ( !hasCheckedTxState )
{
checkTxStateForUpdates();
hasCheckedTxState = true;
}

do
{
if ( getNext() == NO_ID )
{
return false;
//We have now run out of groups from the store, however there may still
//be new types that was added in the transaction that we haven't visited yet.
return nextFromTxState();
}
read.group( this, getNext(), page );
} while ( !inUse() );

markTypeAsSeen( type() );
return true;
}

private boolean nextFromBuffer()
{
bufferedGroup = bufferedGroup.next;
if ( bufferedGroup == null )
{
return false; // we never have both types of traversal, so terminate early
}
setType( bufferedGroup.label );
setFirstOut( bufferedGroup.outgoing() );
setFirstIn( bufferedGroup.incoming() );
setFirstLoop( bufferedGroup.loops() );
return true;
}

private boolean nextFromTxState()
{
if ( txTypeIterator == null && !txTypes.isEmpty() )
{
txTypeIterator = txTypes.iterator();
//here it may be tempting to do txTypes.clear()
//however that will also clear the iterator
}
if ( txTypeIterator != null && txTypeIterator.hasNext() )
{
setType( txTypeIterator.next() );
return true;
}
return false;
}

/**
* Marks the given type as already seen
* @param type the type we have seen
*/
private void markTypeAsSeen( int type )
{
if ( txTypes != null )
{
txTypes.remove( type );
}
}

/**
* Store all types that was added in the transaction for the current node
*/
private void checkTxStateForUpdates()
{
if (read.hasTxStateWithChanges())
{
NodeState nodeState = read.txState().getNodeState( getOwningNode() );
PrimitiveLongIterator addedRelationships = nodeState.getAddedRelationships();
while (addedRelationships.hasNext())
{
RelationshipState relationshipState = read.txState().getRelationshipState( addedRelationships.next() );
relationshipState.accept(
(RelationshipVisitor<RuntimeException>) ( relationshipId, typeId, startNodeId, endNodeId ) ->
txTypes.add( typeId ) );
}
}
}

@Override
public boolean shouldRetry()
{
Expand Down Expand Up @@ -184,11 +261,7 @@ public int relationshipLabel()
public int outgoingCount()
{
int count;
if ( read.hasTxStateWithChanges() && read.txState().nodeIsAddedInThisTx( getOwningNode() ) )
{
count = 0;
}
else if ( isBuffered() )
if ( isBuffered() )
{
count = bufferedGroup.outgoingCount;
}
Expand Down Expand Up @@ -223,11 +296,7 @@ public int incomingCount()
public int loopCount()
{
int count;
if ( read.hasTxStateWithChanges() && read.txState().nodeIsAddedInThisTx( getOwningNode() ) )
{
count = 0;
}
else if ( isBuffered() )
if ( isBuffered() )
{
count = bufferedGroup.loopsCount;
}
Expand Down
Expand Up @@ -272,11 +272,8 @@ public final void relationshipGroups(
long nodeReference, long reference, RelationshipGroupCursor cursor )
{
ktx.assertOpen();
if ( reference == NO_ID ) // there are no relationships for this node
{
cursor.close();
}
else if ( isRelationship( reference ) ) // the relationships for this node are not grouped in the store
// the relationships for this node are not grouped in the store
if ( reference != NO_ID && isRelationship( reference ) )
{
((DefaultRelationshipGroupCursor) cursor).buffer( nodeReference, clearEncoding( reference ), this );
}
Expand Down

0 comments on commit 773d7eb

Please sign in to comment.