Skip to content

Commit

Permalink
Morsel: Get index provided property values
Browse files Browse the repository at this point in the history
This enables index related Operators to get the index provided property
values and write them into rows.
  • Loading branch information
sherfert committed Aug 14, 2018
1 parent 1c3b023 commit 27d6d8c
Show file tree
Hide file tree
Showing 10 changed files with 594 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.internal.kernel.api.helpers;

import java.util.Iterator;
import java.util.List;

import org.neo4j.helpers.collection.Pair;
import org.neo4j.internal.kernel.api.NodeCursor;
import org.neo4j.internal.kernel.api.NodeValueIndexCursor;
import org.neo4j.values.storable.Value;

public class StubNodeValueIndexCursor implements NodeValueIndexCursor
{
Iterator<Pair<Long,List<Value>>> things;
Pair<Long,List<Value>> current = null;

public StubNodeValueIndexCursor( Iterator<Pair<Long,List<Value>>> things )
{
this.things = things;
}

@Override
public void node( NodeCursor cursor )
{
throw new UnsupportedOperationException();
}

@Override
public long nodeReference()
{
return current.first();
}

@Override
public boolean next()
{
if ( things.hasNext() )
{
current = things.next();
return true;
}
return false;
}

@Override
public int numberOfProperties()
{
return current.other().size();
}

@Override
public int propertyKey( int offset )
{
return 0;
}

@Override
public boolean hasValue()
{
return current.other() != null;
}

@Override
public Value propertyValue( int offset )
{
return current.other().get( offset );
}

@Override
public void close()
{

}

@Override
public boolean isClosed()
{
return false;
}
}
16 changes: 16 additions & 0 deletions enterprise/cypher/morsel-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,22 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-cypher-interpreted-runtime</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-kernel-api</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@
*/
package org.neo4j.cypher.internal.runtime.vectorized

import org.neo4j.cypher.internal.compatibility.v3_5.runtime.RefSlot
import org.neo4j.cypher.internal.compatibility.v3_5.runtime.SlotAllocation.PhysicalPlan
import org.neo4j.cypher.internal.compatibility.v3_5.runtime.{RefSlot, SlotConfiguration}
import org.neo4j.cypher.internal.compiler.v3_5.planner.CantCompileQueryException
import org.neo4j.cypher.internal.runtime.interpreted.commands.convert.ExpressionConverters
import org.neo4j.cypher.internal.runtime.interpreted.pipes.{IndexSeekModeFactory, LazyLabel, LazyTypes}
import org.neo4j.cypher.internal.runtime.slotted.SlottedPipeBuilder.translateColumnOrder
import org.neo4j.cypher.internal.runtime.slotted.pipes.SlottedIndexedProperty
import org.neo4j.cypher.internal.runtime.vectorized.expressions.AggregationExpressionOperator
import org.neo4j.cypher.internal.runtime.vectorized.operators._
import org.neo4j.cypher.internal.v3_5.logical.plans
import org.neo4j.cypher.internal.v3_5.logical.plans._
import org.opencypher.v9_0.ast.semantics.SemanticTable
import org.opencypher.v9_0.expressions.PropertyKeyToken
import org.opencypher.v9_0.util.InternalException

class PipelineBuilder(physicalPlan: PhysicalPlan, converters: ExpressionConverters, readOnly: Boolean)
Expand Down Expand Up @@ -64,13 +66,15 @@ class PipelineBuilder(physicalPlan: PhysicalPlan, converters: ExpressionConverte
slots.getLongOffsetFor(column),
labelToken.nameId.id,
propertyKey.nameId.id,
getMaybeIndexedValueOffset(column, slots, propertyKey),
argumentSize)

case NodeIndexContainsScan(column, labelToken, propertyKey, valueExpr, _) =>
new NodeIndexContainsScanOperator(
slots.getLongOffsetFor(column),
labelToken.nameId.id,
propertyKey.nameId.id,
getMaybeIndexedValueOffset(column, slots, propertyKey),
converters.toCommandExpression(valueExpr),
argumentSize)

Expand All @@ -79,7 +83,7 @@ class PipelineBuilder(physicalPlan: PhysicalPlan, converters: ExpressionConverte
new NodeIndexSeekOperator(
slots.getLongOffsetFor(column),
label,
propertyKeys,
getIndexedProperties(column, propertyKeys, slots),
argumentSize,
valueExpr.map(converters.toCommandExpression),
indexSeekMode)
Expand All @@ -89,7 +93,7 @@ class PipelineBuilder(physicalPlan: PhysicalPlan, converters: ExpressionConverte
new NodeIndexSeekOperator(
slots.getLongOffsetFor(column),
label,
propertyKeys,
getIndexedProperties(column, propertyKeys, slots),
argumentSize,
valueExpr.map(converters.toCommandExpression),
indexSeekMode)
Expand All @@ -103,6 +107,27 @@ class PipelineBuilder(physicalPlan: PhysicalPlan, converters: ExpressionConverte
new StreamingPipeline(thisOp, slots, None)
}

private def getIndexedProperties(column: String, propertyKeys: Seq[PropertyKeyToken], slots: SlotConfiguration): Seq[SlottedIndexedProperty] = {
propertyKeys.map { pk =>
val maybeOffset =
getMaybeIndexedValueOffset(column, slots, pk)
SlottedIndexedProperty(pk.nameId.id, maybeOffset)
}
}

/**
* If the value of a property should be fetched from the index, this returns the slot offset to store the value in
*/
private def getMaybeIndexedValueOffset(column: String, slots: SlotConfiguration, pk: PropertyKeyToken): Option[Int] = {
// TODO getValueFromIndex
if (false) {
val name = column + "." + pk.name
Some(slots.getReferenceOffsetFor(name))
} else {
None
}
}

override protected def build(plan: LogicalPlan, from: Pipeline): Pipeline = {
var source = from
val slots = physicalPlan.slotConfigurations(plan.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@ import org.neo4j.internal.kernel.api._
import org.neo4j.values.storable.{TextValue, Values}
import org.opencypher.v9_0.util.CypherTypeException

class NodeIndexContainsScanOperator(offset: Int, label: Int, propertyKey: Int, valueExpr: Expression,
class NodeIndexContainsScanOperator(offset: Int,
label: Int,
propertyKey: Int,
maybeValueFromIndexOffset: Option[Int],
valueExpr: Expression,
argumentSize: SlotConfiguration.Size)
extends NodeIndexOperator[NodeValueIndexCursor](offset) {
extends NodeIndexOperatorWithValues[NodeValueIndexCursor](offset, maybeValueFromIndexOffset) {

override def init(context: QueryContext,
state: QueryState,
inputMorsel: MorselExecutionContext): ContinuableOperatorTask = {
val valueIndexCursor: NodeValueIndexCursor = context.transactionalContext.cursors.allocateNodeValueIndexCursor()
val read = context.transactionalContext.dataRead
val index = context.transactionalContext.schemaRead.index(label, propertyKey)
new OTask(valueIndexCursor, index)
}
Expand All @@ -61,8 +64,7 @@ class NodeIndexContainsScanOperator(offset: Int, label: Int, propertyKey: Int, v

value match {
case value: TextValue =>
//TODO we need to figure out how to deal with values and indexes here
read.nodeIndexSeek(index, valueIndexCursor, IndexOrder.NONE, true, IndexQuery.stringContains(index.properties()(0), value.stringValue()))
read.nodeIndexSeek(index, valueIndexCursor, IndexOrder.NONE, maybeValueFromIndexOffset.isDefined, IndexQuery.stringContains(index.properties()(0), value.stringValue()))
case Values.NO_VALUE =>
// CONTAINS null does not produce any rows
nullExpression = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package org.neo4j.cypher.internal.runtime.vectorized.operators

import org.neo4j.cypher.internal.compatibility.v3_5.runtime.SlotConfiguration
import org.neo4j.cypher.internal.runtime.vectorized._
import org.neo4j.internal.kernel.api.NodeIndexCursor
import org.neo4j.internal.kernel.api.{NodeIndexCursor, NodeValueIndexCursor}

abstract class NodeIndexOperator[CURSOR <: NodeIndexCursor](offset: Int) extends StreamingOperator {

Expand All @@ -33,9 +33,8 @@ abstract class NodeIndexOperator[CURSOR <: NodeIndexCursor](offset: Int) extends
while (currentRow.hasMoreRows && cursorHasMore) {
cursorHasMore = cursor.next()
if (cursorHasMore) {
// iterationState.copyArgumentStateTo(currentRow, argumentSize.nLongs, argumentSize.nReferences)

currentRow.setLongAt(offset, cursor.nodeReference())
extensionForEachRow(cursor, currentRow)
currentRow.moveToNextRow()
}
}
Expand All @@ -49,4 +48,28 @@ abstract class NodeIndexOperator[CURSOR <: NodeIndexCursor](offset: Int) extends
}
cursorHasMore
}

/**
* An extension point for subclasses to do more with each row.
* This function is called in between `cursor.next()` and `currentRow.moveToNextRow()`
*/
protected def extensionForEachRow(cursor: CURSOR, currentRow: MorselExecutionContext): Unit = {}
}

/**
* Provides helper methods for index operators that get nodes together with actual property values.
*/
abstract class NodeIndexOperatorWithValues[CURSOR <: NodeValueIndexCursor](offset: Int, maybeValueFromIndexOffset: Option[Int])
extends NodeIndexOperator[CURSOR](offset) {

override protected def extensionForEachRow(cursor: CURSOR, currentRow: MorselExecutionContext): Unit = {
maybeValueFromIndexOffset.foreach { offset =>
if (!cursor.hasValue) {
// We were promised at plan time that we can get values everywhere, so this should never happen
throw new IllegalStateException("NodeCursor did unexpectedly not have values during index scan.")
}
val value = cursor.propertyValue(offset)
currentRow.setRefAt(offset, value)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ import org.neo4j.cypher.internal.runtime.vectorized._
import org.neo4j.internal.kernel.api.{IndexOrder, IndexReference, NodeValueIndexCursor}


class NodeIndexScanOperator(offset: Int, label: Int, propertyKey: Int, argumentSize: SlotConfiguration.Size)
extends NodeIndexOperator[NodeValueIndexCursor](offset) {
class NodeIndexScanOperator(offset: Int,
label: Int,
propertyKey: Int,
maybeValueFromIndexOffset: Option[Int],
argumentSize: SlotConfiguration.Size)
extends NodeIndexOperatorWithValues[NodeValueIndexCursor](offset, maybeValueFromIndexOffset) {

override def init(context: QueryContext, state: QueryState, inputMorsel: MorselExecutionContext): ContinuableOperatorTask = {
val valueIndexCursor = context.transactionalContext.cursors.allocateNodeValueIndexCursor()
val read = context.transactionalContext.dataRead
val index = context.transactionalContext.schemaRead.index(label, propertyKey)
new OTask(valueIndexCursor, index)
}
Expand All @@ -48,8 +51,7 @@ class NodeIndexScanOperator(offset: Int, label: Int, propertyKey: Int, argumentS
val read = context.transactionalContext.dataRead

if (!hasMore) {
//TODO we need to figure out how to deal with values and indexes here
read.nodeIndexScan(index, valueIndexCursor, IndexOrder.NONE, true)
read.nodeIndexScan(index, valueIndexCursor, IndexOrder.NONE, maybeValueFromIndexOffset.isDefined)
}

hasMore = iterate(currentRow, valueIndexCursor, argumentSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,33 @@ import org.neo4j.cypher.internal.compatibility.v3_5.runtime.SlotConfiguration
import org.neo4j.cypher.internal.runtime.QueryContext
import org.neo4j.cypher.internal.runtime.interpreted.commands.expressions.Expression
import org.neo4j.cypher.internal.runtime.interpreted.pipes.{IndexSeek, IndexSeekMode, NodeIndexSeeker, QueryState => OldQueryState}
import org.neo4j.cypher.internal.runtime.slotted.pipes.SlottedIndexedProperty
import org.neo4j.cypher.internal.runtime.vectorized._
import org.neo4j.cypher.internal.v3_5.logical.plans.QueryExpression
import org.neo4j.internal.kernel.api._
import org.neo4j.values.storable.Value
import org.neo4j.values.virtual.NodeValue
import org.opencypher.v9_0.expressions.{LabelToken, PropertyKeyToken}
import org.opencypher.v9_0.expressions.LabelToken

class NodeIndexSeekOperator(offset: Int,
label: LabelToken,
propertyKeys: Seq[PropertyKeyToken],
properties: Seq[SlottedIndexedProperty],
argumentSize: SlotConfiguration.Size,
override val valueExpr: QueryExpression[Expression],
override val indexMode: IndexSeekMode = IndexSeek)
extends StreamingOperator with NodeIndexSeeker {

// TODO only the propertyKeys that we can get and conditionally other method just for nodes
val propertyIndicesWithValues = propertyKeys.indices
private val propertyIndicesWithValues: Seq[Int] = properties.zipWithIndex.filter(_._1.getValueFromIndex).map(_._2)
val propertyOffsets: Seq[Int] = properties.map(_.slotOffset).collect{ case Some(o) => o }

override def init(context: QueryContext, state: QueryState, currentRow: MorselExecutionContext): ContinuableOperatorTask = {
val valueIndexCursor = context.transactionalContext.cursors.allocateNodeValueIndexCursor()
val read = context.transactionalContext.dataRead
val queryState = new OldQueryState(context, resources = null, params = state.params)
val indexReference = reference(context)
val tupleIterator = indexSeek(queryState, indexReference, propertyIndicesWithValues, currentRow)
new OTask(tupleIterator)
}

override val propertyIds: Array[Int] = propertyKeys.map(_.nameId.id).toArray
override val propertyIds: Array[Int] = properties.map(_.propertyKeyId).toArray

private var reference: IndexReference = IndexReference.NO_INDEX

Expand All @@ -71,10 +70,11 @@ class NodeIndexSeekOperator(offset: Int,

var processedRows = 0
while (currentRow.hasMoreRows && tupleIterator.hasNext) {
// iterationState.copyArgumentStateTo(currentRow, argumentSize.nLongs, argumentSize.nReferences)
val (node, values) = tupleIterator.next()
currentRow.setLongAt(offset, node.id())
// TODO set values
propertyOffsets.foreach {
offset => currentRow.setRefAt(offset, values(offset))
}
currentRow.moveToNextRow()
}

Expand Down

0 comments on commit 27d6d8c

Please sign in to comment.