Skip to content

Commit

Permalink
Change nodeUniqueIndexSeek to use fresh index reader
Browse files Browse the repository at this point in the history
  • Loading branch information
fickludd committed Apr 10, 2018
1 parent 746ce26 commit ba4b828
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 33 deletions.
Expand Up @@ -353,7 +353,7 @@ final class TransactionBoundQueryContext(tc: TransactionalContextWrapper, val re
JavaConversionSupport.mapToScalaENFXSafe(scan(DefaultIndexReference.general(index.labelId, index.propertyId)))(nodeOps.getByIdIfExists) JavaConversionSupport.mapToScalaENFXSafe(scan(DefaultIndexReference.general(index.labelId, index.propertyId)))(nodeOps.getByIdIfExists)


override def lockingExactUniqueIndexSearch(index: SchemaTypes.IndexDescriptor, value: Any): Option[Node] = { override def lockingExactUniqueIndexSearch(index: SchemaTypes.IndexDescriptor, value: Any): Option[Node] = {
val nodeId: Long = tc.dataRead.nodeUniqueIndexSeek(DefaultIndexReference.general(index.labelId, index.propertyId), val nodeId: Long = tc.dataRead.lockingNodeUniqueIndexSeek(DefaultIndexReference.general(index.labelId, index.propertyId),
IndexQuery.exact(index.propertyId, Values.of(value))) IndexQuery.exact(index.propertyId, Values.of(value)))
if (StatementConstants.NO_SUCH_NODE == nodeId) None else Some(nodeOps.getById(nodeId)) if (StatementConstants.NO_SUCH_NODE == nodeId) None else Some(nodeOps.getById(nodeId))
} }
Expand Down
Expand Up @@ -360,7 +360,7 @@ final class TransactionBoundQueryContext(txContext: TransactionalContextWrapper,


override def lockingUniqueIndexSeek(index: IndexDescriptor, value: Any): Option[Node] = { override def lockingUniqueIndexSeek(index: IndexDescriptor, value: Any): Option[Node] = {
indexSearchMonitor.lockingUniqueIndexSeek(index, value) indexSearchMonitor.lockingUniqueIndexSeek(index, value)
val nodeId = reads().nodeUniqueIndexSeek(DefaultIndexReference.general(index.labelId, index.propertyId), IndexQuery.exact(index.propertyId, value)) val nodeId = reads().lockingNodeUniqueIndexSeek(DefaultIndexReference.general(index.labelId, index.propertyId), IndexQuery.exact(index.propertyId, value))
if (StatementConstants.NO_SUCH_NODE == nodeId) None else Some(nodeOps.getById(nodeId)) if (StatementConstants.NO_SUCH_NODE == nodeId) None else Some(nodeOps.getById(nodeId))
} }


Expand Down
Expand Up @@ -301,7 +301,7 @@ sealed class TransactionBoundQueryContext(val transactionalContext: Transactiona
override def lockingUniqueIndexSeek(indexReference: IndexReference, queries: Seq[IndexQuery.ExactPredicate]): Option[NodeValue] = { override def lockingUniqueIndexSeek(indexReference: IndexReference, queries: Seq[IndexQuery.ExactPredicate]): Option[NodeValue] = {
indexSearchMonitor.lockingUniqueIndexSeek(indexReference, queries) indexSearchMonitor.lockingUniqueIndexSeek(indexReference, queries)
val index = DefaultIndexReference.general(indexReference.label(), indexReference.properties():_*) val index = DefaultIndexReference.general(indexReference.label(), indexReference.properties():_*)
val nodeId = reads().nodeUniqueIndexSeek(index, queries:_*) val nodeId = reads().lockingNodeUniqueIndexSeek(index, queries:_*)
if (StatementConstants.NO_SUCH_NODE == nodeId) None else Some(nodeOps.getById(nodeId)) if (StatementConstants.NO_SUCH_NODE == nodeId) None else Some(nodeOps.getById(nodeId))
} }


Expand Down
Expand Up @@ -30,6 +30,8 @@ public interface Read
int ANY_RELATIONSHIP_TYPE = -1; int ANY_RELATIONSHIP_TYPE = -1;


/** /**
* Seek all nodes matching the provided index query in an index.
*
* @param index {@link IndexReference} referencing index to query. * @param index {@link IndexReference} referencing index to query.
* @param cursor the cursor to use for consuming the results. * @param cursor the cursor to use for consuming the results.
* @param indexOrder requested {@link IndexOrder} of result. Must be among the capabilities of * @param indexOrder requested {@link IndexOrder} of result. Must be among the capabilities of
Expand All @@ -45,13 +47,17 @@ void nodeIndexSeek( IndexReference index, NodeValueIndexCursor cursor, IndexOrde
* Note that this is a very special method and should be use with caution. It has special locking semantics in * Note that this is a very special method and should be use with caution. It has special locking semantics in
* order to facilitate unique creation of nodes. If a node is found; a shared lock for the index entry will be * order to facilitate unique creation of nodes. If a node is found; a shared lock for the index entry will be
* held whereas if no node is found we will hold onto an exclusive lock until the close of the transaction. * held whereas if no node is found we will hold onto an exclusive lock until the close of the transaction.
* @param index {@link IndexReference} referencing index to query. *
* {@link IndexReference referenced index}, or {@link IndexOrder#NONE}. * @param index {@link IndexReference} referencing index to query.
* {@link IndexReference referenced index}, or {@link IndexOrder#NONE}.
* @param predicates Combination of {@link IndexQuery.ExactPredicate index queries} to run against referenced index. * @param predicates Combination of {@link IndexQuery.ExactPredicate index queries} to run against referenced index.
*/ */
long nodeUniqueIndexSeek( IndexReference index, IndexQuery.ExactPredicate... predicates ) long lockingNodeUniqueIndexSeek( IndexReference index, IndexQuery.ExactPredicate... predicates )
throws KernelException; throws KernelException;

/** /**
* Scan all values in an index.
*
* @param index {@link IndexReference} referencing index to query. * @param index {@link IndexReference} referencing index to query.
* @param cursor the cursor to use for consuming the results. * @param cursor the cursor to use for consuming the results.
* @param indexOrder requested {@link IndexOrder} of result. Must be among the capabilities of * @param indexOrder requested {@link IndexOrder} of result. Must be among the capabilities of
Expand All @@ -75,6 +81,11 @@ long nodeUniqueIndexSeek( IndexReference index, IndexQuery.ExactPredicate... pre


Scan<NodeLabelIndexCursor> nodeLabelScan( int label ); Scan<NodeLabelIndexCursor> nodeLabelScan( int label );


/**
* Return all nodes in the graph.
*
* @param cursor Cursor to initialize for scanning.
*/
void allNodesScan( NodeCursor cursor ); void allNodesScan( NodeCursor cursor );


Scan<NodeCursor> allNodesScan(); Scan<NodeCursor> allNodesScan();
Expand Down
Expand Up @@ -43,7 +43,7 @@ public void nodeIndexSeek( IndexReference index, NodeValueIndexCursor cursor, In
} }


@Override @Override
public long nodeUniqueIndexSeek( IndexReference index, public long lockingNodeUniqueIndexSeek( IndexReference index,
IndexQuery.ExactPredicate... predicates ) throws KernelException IndexQuery.ExactPredicate... predicates ) throws KernelException
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
Expand Down
Expand Up @@ -246,12 +246,13 @@ long graphPropertiesReference()
} }


@Override @Override
IndexReader indexReader( IndexReference index ) throws IndexNotFoundKernelException IndexReader indexReader( IndexReference index, boolean fresh ) throws IndexNotFoundKernelException
{ {
SchemaIndexDescriptor schemaIndexDescriptor = index.isUnique() ? SchemaIndexDescriptor schemaIndexDescriptor = index.isUnique() ?
SchemaIndexDescriptorFactory.uniqueForLabel( index.label(), index.properties() ) : SchemaIndexDescriptorFactory.uniqueForLabel( index.label(), index.properties() ) :
SchemaIndexDescriptorFactory.forLabel( index.label(), index.properties() ); SchemaIndexDescriptorFactory.forLabel( index.label(), index.properties() );
return statement.getIndexReader( schemaIndexDescriptor ); return fresh ? statement.getFreshIndexReader( schemaIndexDescriptor ) :
statement.getIndexReader( schemaIndexDescriptor );
} }


@Override @Override
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.neo4j.collection.primitive.PrimitiveLongCollections; import org.neo4j.collection.primitive.PrimitiveLongCollections;
import org.neo4j.collection.primitive.PrimitiveLongIterator; import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.collection.primitive.PrimitiveLongSet; import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.graphdb.Resource;
import org.neo4j.internal.kernel.api.IndexQuery; import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.internal.kernel.api.NodeCursor; import org.neo4j.internal.kernel.api.NodeCursor;
import org.neo4j.internal.kernel.api.NodeValueIndexCursor; import org.neo4j.internal.kernel.api.NodeValueIndexCursor;
Expand All @@ -47,6 +48,7 @@ final class DefaultNodeValueIndexCursor extends IndexCursor<IndexProgressor>
implements NodeValueIndexCursor, NodeValueClient implements NodeValueIndexCursor, NodeValueClient
{ {
private Read read; private Read read;
private Resource resource;
private long node; private long node;
private IndexQuery[] query; private IndexQuery[] query;
private Value[] values; private Value[] values;
Expand Down Expand Up @@ -142,9 +144,10 @@ public boolean next()
} }
} }


public void setRead( Read read ) public void setRead( Read read, Resource resource )
{ {
this.read = read; this.read = read;
this.resource = resource;
} }


@Override @Override
Expand Down Expand Up @@ -196,7 +199,18 @@ public void close()
this.added = emptyIterator(); this.added = emptyIterator();
this.removed = PrimitiveLongCollections.emptySet(); this.removed = PrimitiveLongCollections.emptySet();


pool.accept( this ); try
{
if ( resource != null )
{
resource.close();
resource = null;
}
}
finally
{
pool.accept( this );
}
} }
} }


Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.neo4j.internal.kernel.api.CursorFactory; import org.neo4j.internal.kernel.api.CursorFactory;
import org.neo4j.internal.kernel.api.ExplicitIndexRead; import org.neo4j.internal.kernel.api.ExplicitIndexRead;
import org.neo4j.internal.kernel.api.ExplicitIndexWrite; import org.neo4j.internal.kernel.api.ExplicitIndexWrite;
import org.neo4j.internal.kernel.api.IndexOrder;
import org.neo4j.internal.kernel.api.IndexQuery; import org.neo4j.internal.kernel.api.IndexQuery;
import org.neo4j.internal.kernel.api.IndexReference; import org.neo4j.internal.kernel.api.IndexReference;
import org.neo4j.internal.kernel.api.Locks; import org.neo4j.internal.kernel.api.Locks;
Expand Down Expand Up @@ -394,8 +393,8 @@ private void validateNoExistingNodeWithExactValues( IndexBackedConstraintDescrip
indexEntryResourceId( labelId, propertyValues ) indexEntryResourceId( labelId, propertyValues )
); );


allStoreHolder.nodeIndexSeek( allStoreHolder.indexGetCapability( schemaIndexDescriptor ), valueCursor, allStoreHolder.nodeIndexSeekWithFreshIndexReader(
IndexOrder.NONE, propertyValues ); allStoreHolder.indexGetCapability( schemaIndexDescriptor ), valueCursor, propertyValues );
if ( valueCursor.next() && valueCursor.nodeReference() != modifiedNode ) if ( valueCursor.next() && valueCursor.nodeReference() != modifiedNode )
{ {
throw new UniquePropertyValueValidationException( constraint, VALIDATION, throw new UniquePropertyValueValidationException( constraint, VALIDATION,
Expand Down
Expand Up @@ -109,9 +109,17 @@ public final void nodeIndexSeek(
return; return;
} }


((DefaultNodeValueIndexCursor) cursor).setRead( this ); DefaultNodeValueIndexCursor cursorImpl = (DefaultNodeValueIndexCursor) cursor;
IndexProgressor.NodeValueClient target = (DefaultNodeValueIndexCursor) cursor; IndexReader reader = indexReader( index, false );
IndexReader reader = indexReader( index ); cursorImpl.setRead( this, null );
IndexProgressor.NodeValueClient target = withFullValuePrecision( cursorImpl, query, reader );
reader.query( target, indexOrder, query );
}

private IndexProgressor.NodeValueClient withFullValuePrecision( DefaultNodeValueIndexCursor cursor,
IndexQuery[] query, IndexReader reader )
{
IndexProgressor.NodeValueClient target = cursor;
if ( !reader.hasFullValuePrecision( query ) ) if ( !reader.hasFullValuePrecision( query ) )
{ {
IndexQuery[] filters = new IndexQuery[query.length]; IndexQuery[] filters = new IndexQuery[query.length];
Expand Down Expand Up @@ -148,10 +156,11 @@ public final void nodeIndexSeek(
cursors.allocatePropertyCursor(), this, filters ); cursors.allocatePropertyCursor(), this, filters );
} }
} }
reader.query( target, indexOrder, query ); return target;
} }


public final long nodeUniqueIndexSeek( @Override
public final long lockingNodeUniqueIndexSeek(
IndexReference index, IndexReference index,
IndexQuery.ExactPredicate... predicates ) IndexQuery.ExactPredicate... predicates )
throws IndexNotApplicableKernelException, IndexNotFoundKernelException, IndexBrokenKernelException throws IndexNotApplicableKernelException, IndexNotFoundKernelException, IndexBrokenKernelException
Expand All @@ -167,14 +176,14 @@ public final long nodeUniqueIndexSeek(
//First try to find node under a shared lock //First try to find node under a shared lock
//if not found upgrade to exclusive and try again //if not found upgrade to exclusive and try again
locks.acquireShared( lockTracer, INDEX_ENTRY, indexEntryId ); locks.acquireShared( lockTracer, INDEX_ENTRY, indexEntryId );
try ( NodeValueIndexCursor cursor = cursors.allocateNodeValueIndexCursor() ) try ( DefaultNodeValueIndexCursor cursor = cursors.allocateNodeValueIndexCursor() )
{ {
nodeIndexSeek( index, cursor, IndexOrder.NONE, predicates ); nodeIndexSeekWithFreshIndexReader( index, cursor, predicates );
if ( !cursor.next() ) if ( !cursor.next() )
{ {
locks.releaseShared( INDEX_ENTRY, indexEntryId ); locks.releaseShared( INDEX_ENTRY, indexEntryId );
locks.acquireExclusive( lockTracer, INDEX_ENTRY, indexEntryId ); locks.acquireExclusive( lockTracer, INDEX_ENTRY, indexEntryId );
nodeIndexSeek( index, cursor, IndexOrder.NONE, predicates ); nodeIndexSeekWithFreshIndexReader( index, cursor, predicates );
if ( cursor.next() ) // we found it under the exclusive lock if ( cursor.next() ) // we found it under the exclusive lock
{ {
// downgrade to a shared lock // downgrade to a shared lock
Expand All @@ -187,6 +196,17 @@ public final long nodeUniqueIndexSeek(
} }
} }


void nodeIndexSeekWithFreshIndexReader(
IndexReference index,
DefaultNodeValueIndexCursor cursor,
IndexQuery.ExactPredicate... query ) throws IndexNotFoundKernelException, IndexNotApplicableKernelException
{
IndexReader reader = indexReader( index, true );
cursor.setRead( this, reader );
IndexProgressor.NodeValueClient target = withFullValuePrecision( cursor, query, reader );
reader.query( target, IndexOrder.NONE, query );
}

@Override @Override
public final void nodeIndexScan( public final void nodeIndexScan(
IndexReference index, IndexReference index,
Expand All @@ -202,8 +222,8 @@ public final void nodeIndexScan(


// for a scan, we simply query for existence of the first property, which covers all entries in an index // for a scan, we simply query for existence of the first property, which covers all entries in an index
int firstProperty = index.properties()[0]; int firstProperty = index.properties()[0];
((DefaultNodeValueIndexCursor) cursor).setRead( this ); ((DefaultNodeValueIndexCursor) cursor).setRead( this, null );
indexReader( index ).query( (DefaultNodeValueIndexCursor) cursor, indexOrder, IndexQuery.exists( firstProperty ) ); indexReader( index, false ).query( (DefaultNodeValueIndexCursor) cursor, indexOrder, IndexQuery.exists( firstProperty ) );
} }


private boolean hasForbiddenProperties( IndexReference index ) private boolean hasForbiddenProperties( IndexReference index )
Expand Down Expand Up @@ -541,7 +561,7 @@ public final void futureRelationshipPropertyReferenceRead( long reference )
ktx.assertOpen(); ktx.assertOpen();
} }


abstract IndexReader indexReader( IndexReference index ) throws IndexNotFoundKernelException; abstract IndexReader indexReader( IndexReference index, boolean fresh ) throws IndexNotFoundKernelException;


abstract LabelScanReader labelScanReader(); abstract LabelScanReader labelScanReader();


Expand Down
Expand Up @@ -83,7 +83,7 @@ public void shouldFindMatchingNode() throws Exception
// when looking for it // when looking for it
Read read = newTransaction().dataRead(); Read read = newTransaction().dataRead();
int propertyId = index.properties()[0]; int propertyId = index.properties()[0];
long foundId = read.nodeUniqueIndexSeek( index, exact( propertyId, value ) ); long foundId = read.lockingNodeUniqueIndexSeek( index, exact( propertyId, value ) );
commit(); commit();


// then // then
Expand All @@ -100,7 +100,7 @@ public void shouldNotFindNonMatchingNode() throws Exception


// when looking for it // when looking for it
Transaction transaction = newTransaction(); Transaction transaction = newTransaction();
long foundId = transaction.dataRead().nodeUniqueIndexSeek( index, exact( propertyId1, value ) ); long foundId = transaction.dataRead().lockingNodeUniqueIndexSeek( index, exact( propertyId1, value ) );
commit(); commit();


// then // then
Expand All @@ -118,7 +118,7 @@ public void shouldCompositeFindMatchingNode() throws Exception


// when looking for it // when looking for it
Transaction transaction = newTransaction(); Transaction transaction = newTransaction();
long foundId = transaction.dataRead().nodeUniqueIndexSeek( index, long foundId = transaction.dataRead().lockingNodeUniqueIndexSeek( index,
exact( propertyId1, value1 ), exact( propertyId1, value1 ),
exact( propertyId2, value2 ) ); exact( propertyId2, value2 ) );
commit(); commit();
Expand All @@ -138,7 +138,7 @@ public void shouldNotCompositeFindNonMatchingNode() throws Exception


// when looking for it // when looking for it
Transaction transaction = newTransaction(); Transaction transaction = newTransaction();
long foundId = transaction.dataRead().nodeUniqueIndexSeek( index, long foundId = transaction.dataRead().lockingNodeUniqueIndexSeek( index,
exact( propertyId1, value1 ), exact( propertyId1, value1 ),
exact( propertyId2, value2 ) ); exact( propertyId2, value2 ) );
commit(); commit();
Expand Down Expand Up @@ -182,7 +182,7 @@ public void shouldBlockUniqueIndexSeekFromCompetingTransaction() throws Exceptio
latch.waitForAllToStart(); latch.waitForAllToStart();
try ( Transaction tx = session.beginTransaction() ) try ( Transaction tx = session.beginTransaction() )
{ {
tx.dataRead().nodeUniqueIndexSeek( index, exact( propertyId1, value ) ); tx.dataRead().lockingNodeUniqueIndexSeek( index, exact( propertyId1, value ) );
tx.success(); tx.success();
} }
catch ( KernelException e ) catch ( KernelException e )
Expand Down
Expand Up @@ -308,7 +308,7 @@ public void unrelatedNodesWithSamePropertyShouldNotInterfereWithUniquenessCheck(
createLabeledNode( transaction, "Item", "id", 2 ); createLabeledNode( transaction, "Item", "id", 2 );


// then I should find the original node // then I should find the original node
assertThat( transaction.dataRead().nodeUniqueIndexSeek( idx, exact( propId, Values.of( 1 ) ) ), assertThat( transaction.dataRead().lockingNodeUniqueIndexSeek( idx, exact( propId, Values.of( 1 ) ) ),
equalTo( ourNode ) ); equalTo( ourNode ) );
commit(); commit();
} }
Expand Down Expand Up @@ -336,7 +336,7 @@ public void addingUniqueNodeWithUnrelatedValueShouldNotAffectLookup() throws Exc
createLabeledNode( transaction, "Person", "id", 2 ); createLabeledNode( transaction, "Person", "id", 2 );


// then I should find the original node // then I should find the original node
assertThat( transaction.dataRead().nodeUniqueIndexSeek( idx, exact( propId, Values.of( 1 ) ) ), assertThat( transaction.dataRead().lockingNodeUniqueIndexSeek( idx, exact( propId, Values.of( 1 ) ) ),
equalTo( ourNode ) ); equalTo( ourNode ) );
commit(); commit();
} }
Expand Down
Expand Up @@ -101,7 +101,7 @@ long graphPropertiesReference()
} }


@Override @Override
IndexReader indexReader( org.neo4j.internal.kernel.api.IndexReference index ) IndexReader indexReader( IndexReference index, boolean fresh )
{ {
throw new UnsupportedOperationException( "not implemented" ); throw new UnsupportedOperationException( "not implemented" );
} }
Expand Down

0 comments on commit ba4b828

Please sign in to comment.