Skip to content

Commit

Permalink
Remove done() from Node*Client
Browse files Browse the repository at this point in the history
Instead, progressor.next() will be used directly as
an indicator if client is done or not.

This means NodeValueClientFilter now sits in the
middle between NodeValueClient and IndexProgressor
and passes calls between them.
  • Loading branch information
burqen committed Nov 16, 2017
1 parent b6df60c commit 52e08c5
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 175 deletions.
Expand Up @@ -210,19 +210,19 @@ public boolean hasFullNumberPrecision( IndexQuery... predicates )
return true; return true;
} }


private void startSeekForInitializedRange( IndexProgressor.NodeValueClient cursor, KEY treeKeyFrom, KEY treeKeyTo ) private void startSeekForInitializedRange( IndexProgressor.NodeValueClient client, KEY treeKeyFrom, KEY treeKeyTo )
{ {
if ( layout.compare( treeKeyFrom, treeKeyTo ) > 0 ) if ( layout.compare( treeKeyFrom, treeKeyTo ) > 0 )
{ {
cursor.initialize( IndexProgressor.EMPTY, propertyKeys ); client.initialize( IndexProgressor.EMPTY, propertyKeys );
return; return;
} }
try try
{ {
RawCursor<Hit<KEY,VALUE>,IOException> seeker = tree.seek( treeKeyFrom, treeKeyTo ); RawCursor<Hit<KEY,VALUE>,IOException> seeker = tree.seek( treeKeyFrom, treeKeyTo );
openSeekers.add( seeker ); openSeekers.add( seeker );
IndexProgressor hitProgressor = new NumberHitIndexCursorProgressor<>( seeker, cursor, openSeekers ); IndexProgressor hitProgressor = new NumberHitIndexCursorProgressor<>( seeker, client, openSeekers );
cursor.initialize( hitProgressor, propertyKeys ); client.initialize( hitProgressor, propertyKeys );
} }
catch ( IOException e ) catch ( IOException e )
{ {
Expand Down
Expand Up @@ -21,15 +21,17 @@


import org.neo4j.collection.primitive.PrimitiveLongCollections; import org.neo4j.collection.primitive.PrimitiveLongCollections;
import org.neo4j.collection.primitive.PrimitiveLongIterator; import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.graphdb.Resource;
import org.neo4j.storageengine.api.schema.IndexProgressor; import org.neo4j.storageengine.api.schema.IndexProgressor;
import org.neo4j.values.storable.Value; import org.neo4j.values.storable.Value;


/** /**
* A {@link IndexProgressor} + {@link IndexProgressor.NodeValueClient} combo presented as a {@link PrimitiveLongIterator}. * A {@link IndexProgressor} + {@link IndexProgressor.NodeValueClient} combo presented as a {@link PrimitiveLongIterator}.
*/ */
public class NodeValueIterator extends PrimitiveLongCollections.PrimitiveLongBaseIterator implements IndexProgressor.NodeValueClient public class NodeValueIterator extends PrimitiveLongCollections.PrimitiveLongBaseIterator
implements IndexProgressor.NodeValueClient, Resource
{ {
private boolean closed; private volatile boolean closed;
private IndexProgressor progressor; private IndexProgressor progressor;


@Override @Override
Expand All @@ -39,7 +41,7 @@ protected boolean fetchNext()
// and feed result into this with node( long reference, Value... values ) // and feed result into this with node( long reference, Value... values )
if ( closed || !progressor.next() ) if ( closed || !progressor.next() )
{ {
done(); close();
return false; return false;
} }
return true; return true;
Expand All @@ -58,12 +60,12 @@ public boolean acceptNode( long reference, Value... values )
} }


@Override @Override
public void done() public void close()
{ {
if ( !closed ) if ( !closed )
{ {
progressor.close();
closed = true; closed = true;
progressor.close();
} }
} }
} }
Expand Up @@ -30,15 +30,15 @@
public class NumberHitIndexCursorProgressor<KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue> implements IndexProgressor public class NumberHitIndexCursorProgressor<KEY extends SchemaNumberKey, VALUE extends SchemaNumberValue> implements IndexProgressor
{ {
private final RawCursor<Hit<KEY,VALUE>,IOException> seeker; private final RawCursor<Hit<KEY,VALUE>,IOException> seeker;
private final NodeValueClient cursor; private final NodeValueClient client;
private final Collection<RawCursor<Hit<KEY,VALUE>,IOException>> toRemoveFromOnClose; private final Collection<RawCursor<Hit<KEY,VALUE>,IOException>> toRemoveFromOnClose;
private boolean closed; private boolean closed;


NumberHitIndexCursorProgressor( RawCursor<Hit<KEY,VALUE>,IOException> seeker, NodeValueClient cursor, NumberHitIndexCursorProgressor( RawCursor<Hit<KEY,VALUE>,IOException> seeker, NodeValueClient client,
Collection<RawCursor<Hit<KEY,VALUE>,IOException>> toRemoveFromOnClose ) Collection<RawCursor<Hit<KEY,VALUE>,IOException>> toRemoveFromOnClose )
{ {
this.seeker = seeker; this.seeker = seeker;
this.cursor = cursor; this.client = client;
this.toRemoveFromOnClose = toRemoveFromOnClose; this.toRemoveFromOnClose = toRemoveFromOnClose;
} }


Expand All @@ -50,7 +50,7 @@ public boolean next()
if ( seeker.next() ) if ( seeker.next() )
{ {
KEY key = seeker.get().key(); KEY key = seeker.get().key();
cursor.acceptNode( key.entityId, key.asValue() ); client.acceptNode( key.entityId, key.asValue() );
return true; return true;
} }
return false; return false;
Expand Down
Expand Up @@ -132,7 +132,7 @@ public void query( IndexProgressor.NodeValueClient cursor, IndexOrder indexOrder
// todo: There will be no ordering of the node ids here. Is this a problem? // todo: There will be no ordering of the node ids here. Is this a problem?
if ( predicates[0] instanceof ExistsPredicate ) if ( predicates[0] instanceof ExistsPredicate )
{ {
MultiProgressorNodeValueCursor multiProgressor = new MultiProgressorNodeValueCursor( cursor, propertyKeys ); BridgingIndexProgressor multiProgressor = new BridgingIndexProgressor( cursor, propertyKeys );
cursor.initialize( multiProgressor, propertyKeys ); cursor.initialize( multiProgressor, propertyKeys );
nativeReader.query( multiProgressor, indexOrder, predicates[0] ); nativeReader.query( multiProgressor, indexOrder, predicates[0] );
luceneReader.query( multiProgressor, indexOrder, predicates[0] ); luceneReader.query( multiProgressor, indexOrder, predicates[0] );
Expand Down Expand Up @@ -161,16 +161,19 @@ public boolean hasFullNumberPrecision( IndexQuery... predicates )
return predicates[0] instanceof NumberRangePredicate && nativeReader.hasFullNumberPrecision( predicates ); return predicates[0] instanceof NumberRangePredicate && nativeReader.hasFullNumberPrecision( predicates );
} }


private class MultiProgressorNodeValueCursor implements IndexProgressor.NodeValueClient, IndexProgressor /**
* Combine multiple progressor to act like one single logical progressor seen from clients perspective.
*/
private class BridgingIndexProgressor implements IndexProgressor.NodeValueClient, IndexProgressor
{ {
private final NodeValueClient cursor; private final NodeValueClient client;
private final int[] keys; private final int[] keys;
private final Queue<IndexProgressor> progressors; private final Queue<IndexProgressor> progressors;
private IndexProgressor current; private IndexProgressor current;


MultiProgressorNodeValueCursor( NodeValueClient cursor, int[] keys ) BridgingIndexProgressor( NodeValueClient client, int[] keys )
{ {
this.cursor = cursor; this.client = client;
this.keys = keys; this.keys = keys;
progressors = new ArrayDeque<>(); progressors = new ArrayDeque<>();
} }
Expand All @@ -190,6 +193,7 @@ public boolean next()
} }
else else
{ {
current.close();
current = progressors.poll(); current = progressors.poll();
} }
} }
Expand Down Expand Up @@ -223,13 +227,7 @@ private void assertKeysAlign( int[] keys )
@Override @Override
public boolean acceptNode( long reference, Value[] values ) public boolean acceptNode( long reference, Value[] values )
{ {
return cursor.acceptNode( reference, values ); return client.acceptNode( reference, values );
}

@Override
public void done()
{
cursor.done();
} }
} }
} }
Expand Up @@ -30,11 +30,6 @@ final void initialize( IndexProgressor progressor )
this.progressor = progressor; this.progressor = progressor;
} }


public final void done()
{
close();
}

public final boolean next() public final boolean next()
{ {
return progressor != null && progressor.next(); return progressor != null && progressor.next();
Expand Down
Expand Up @@ -24,23 +24,58 @@


import org.neo4j.internal.kernel.api.IndexQuery; import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.storageengine.api.schema.IndexProgressor; import org.neo4j.storageengine.api.schema.IndexProgressor;
import org.neo4j.storageengine.api.schema.IndexProgressor.NodeValueClient;
import org.neo4j.values.storable.Value; import org.neo4j.values.storable.Value;


/** /**
* This class filters acceptNode() calls from an index progressor, to assert that exact entries returned from the * This class filters acceptNode() calls from an index progressor, to assert that exact entries returned from the
* progressor really match the exact property values. See also org.neo4j.kernel.impl.api.LookupFilter. * progressor really match the exact property values. See also org.neo4j.kernel.impl.api.LookupFilter.
*
* It works by acting as a man-in-the-middle between outer {@link NodeValueClient client} and inner {@link IndexProgressor}.
* Interaction goes like:
*
* Initialize:
* <pre><code>
* client
* -- query( client ) -> filter = new filter(client)
* filter -- query( filter ) -> progressor
* filter <- initialize(progressor) -- progressor
* client <- initialize(filter) -- filter
* </code></pre>
*
* Progress:
* <pre><code>
* client -- next() -> filter
* filter -- next() -> progressor
* <- acceptNode() --
* -- :false ->
* <- acceptNode() --
* -- :false ->
* filter <- acceptNode() --
* client <- acceptNode() -- filter
* -- :true -> filter -- :true -> progressor
* client <----------------------------------------------
* </code></pre>
*
* Close:
* <pre><code>
* client -- close() -> filter
* filter -- close() -> progressor
* client <---------------------------------
* </code></pre>
*/ */
class NodeValueClientFilter implements IndexProgressor.NodeValueClient class NodeValueClientFilter implements NodeValueClient, IndexProgressor
{ {
private static final Comparator<IndexQuery> ASCENDING_BY_KEY = Comparator.comparingInt( IndexQuery::propertyKeyId ); private static final Comparator<IndexQuery> ASCENDING_BY_KEY = Comparator.comparingInt( IndexQuery::propertyKeyId );
private final IndexProgressor.NodeValueClient target; private final NodeValueClient target;
private final NodeCursor node; private final NodeCursor node;
private final PropertyCursor property; private final PropertyCursor property;
private final IndexQuery[] filters; private final IndexQuery[] filters;
private int[] keys; private int[] keys;
private IndexProgressor progressor;


NodeValueClientFilter( NodeValueClientFilter(
IndexProgressor.NodeValueClient target, NodeValueClient target,
NodeCursor node, PropertyCursor property, IndexQuery... filters ) NodeCursor node, PropertyCursor property, IndexQuery... filters )
{ {
this.target = target; this.target = target;
Expand All @@ -53,16 +88,9 @@ class NodeValueClientFilter implements IndexProgressor.NodeValueClient
@Override @Override
public void initialize( IndexProgressor progressor, int[] propertyIds ) public void initialize( IndexProgressor progressor, int[] propertyIds )
{ {
this.progressor = progressor;
this.keys = propertyIds; this.keys = propertyIds;
target.initialize( progressor, propertyIds ); target.initialize( this, propertyIds );
}

@Override
public void done()
{
node.close();
property.close();
target.done();
} }


@Override @Override
Expand All @@ -88,6 +116,20 @@ public boolean acceptNode( long reference, Value[] values )
} }
} }


@Override
public boolean next()
{
return progressor.next();
}

@Override
public void close()
{
node.close();
property.close();
progressor.close();
}

private boolean filterByIndexValues( long reference, Value[] values ) private boolean filterByIndexValues( long reference, Value[] values )
{ {
FILTERS: FILTERS:
Expand Down
Expand Up @@ -82,11 +82,6 @@ interface NodeValueClient
* @return true if the entry is accepted, false otherwise * @return true if the entry is accepted, false otherwise
*/ */
boolean acceptNode( long reference, Value... values ); boolean acceptNode( long reference, Value... values );

/**
* Called by progressor so signal that there are no more entries.
*/
void done();
} }


/** /**
Expand All @@ -109,11 +104,6 @@ interface NodeLabelClient
* @return true if the entry is accepted, false otherwise * @return true if the entry is accepted, false otherwise
*/ */
boolean acceptNode( long reference, LabelSet labels ); boolean acceptNode( long reference, LabelSet labels );

/**
* Called by progressor so signal that there are no more entries.
*/
void done();
} }


/** /**
Expand All @@ -135,11 +125,6 @@ interface ExplicitClient
* @return true if the entry is accepted, false otherwise * @return true if the entry is accepted, false otherwise
*/ */
boolean acceptEntity( long reference, float score ); boolean acceptEntity( long reference, float score );

/**
* Called by progressor so signal that there are no more entries.
*/
void done();
} }


IndexProgressor EMPTY = new IndexProgressor() IndexProgressor EMPTY = new IndexProgressor()
Expand Down

0 comments on commit 52e08c5

Please sign in to comment.