Skip to content

Commit

Permalink
Join Top implementation
Browse files Browse the repository at this point in the history
Instead of three disparate implementations of `Top`, use one implementation
in all three runtimes.
  • Loading branch information
pontusmelke committed Aug 23, 2018
1 parent 5af6d0d commit f04d799
Show file tree
Hide file tree
Showing 13 changed files with 211 additions and 474 deletions.
Expand Up @@ -19,6 +19,8 @@
*/ */
package org.neo4j.cypher.internal.compatibility.v3_4.runtime package org.neo4j.cypher.internal.compatibility.v3_4.runtime


import java.util.Comparator

import org.neo4j.cypher.internal.compatibility.v3_4.runtime.executionplan.builders.prepare.KeyTokenResolver import org.neo4j.cypher.internal.compatibility.v3_4.runtime.executionplan.builders.prepare.KeyTokenResolver
import org.neo4j.cypher.internal.compatibility.v3_4.runtime.pipes.DropResultPipe import org.neo4j.cypher.internal.compatibility.v3_4.runtime.pipes.DropResultPipe
import org.neo4j.cypher.internal.frontend.v3_4.phases.Monitors import org.neo4j.cypher.internal.frontend.v3_4.phases.Monitors
Expand Down Expand Up @@ -164,7 +166,7 @@ case class CommunityPipeBuilder(monitors: Monitors, recurse: LogicalPlan => Pipe
ActiveReadPipe(source)(id = id) ActiveReadPipe(source)(id = id)


case Optional(inner, protectedSymbols) => case Optional(inner, protectedSymbols) =>
OptionalPipe((inner.availableSymbols -- protectedSymbols), source)(id = id) OptionalPipe(inner.availableSymbols -- protectedSymbols, source)(id = id)


case PruningVarExpand(_, from, dir, types, toName, minLength, maxLength, predicates) => case PruningVarExpand(_, from, dir, types, toName, minLength, maxLength, predicates) =>
val predicate = varLengthPredicate(predicates) val predicate = varLengthPredicate(predicates)
Expand All @@ -176,19 +178,22 @@ case class CommunityPipeBuilder(monitors: Monitors, recurse: LogicalPlan => Pipe
case SkipPlan(_, count) => case SkipPlan(_, count) =>
SkipPipe(source, buildExpression(count))(id = id) SkipPipe(source, buildExpression(count))(id = id)


case Top(_, sortItems, _) if sortItems.isEmpty => source

case Top(_, sortItems, SignedDecimalIntegerLiteral("1")) => case Top(_, sortItems, SignedDecimalIntegerLiteral("1")) =>
Top1Pipe(source, sortItems.map(translateColumnOrder).toList)(id = id) Top1Pipe(source, ExecutionContextOrdering.asComparator(sortItems.map(translateColumnOrder).toList))(id = id)


case Top(_, sortItems, limit) => case Top(_, sortItems, limit) =>
TopNPipe(source, sortItems.map(translateColumnOrder).toList, buildExpression(limit))(id = id) TopNPipe(source, buildExpression(limit),
ExecutionContextOrdering.asComparator(sortItems.map(translateColumnOrder).toList))(id = id)


case LimitPlan(_, count, DoNotIncludeTies) => case LimitPlan(_, count, DoNotIncludeTies) =>
LimitPipe(source, buildExpression(count))(id = id) LimitPipe(source, buildExpression(count))(id = id)


case LimitPlan(_, count, IncludeTies) => case LimitPlan(_, count, IncludeTies) =>
(source, count) match { (source, count) match {
case (SortPipe(inner, sortDescription), SignedDecimalIntegerLiteral("1")) => case (SortPipe(inner, sortDescription), SignedDecimalIntegerLiteral("1")) =>
Top1WithTiesPipe(inner, sortDescription.toList)(id = id) Top1WithTiesPipe(inner, ExecutionContextOrdering.asComparator(sortDescription))(id = id)


case _ => throw new InternalException("Including ties is only supported for very specific plans") case _ => throw new InternalException("Including ties is only supported for very specific plans")
} }
Expand Down
Expand Up @@ -30,9 +30,7 @@ case class SortPipe(source: Pipe, orderBy: Seq[ColumnOrder])
extends PipeWithSource(source) { extends PipeWithSource(source) {
assert(orderBy.nonEmpty) assert(orderBy.nonEmpty)


private val comparator = orderBy private val comparator = ExecutionContextOrdering.asComparator(orderBy)
.map(new ExecutionContextOrdering(_))
.reduceLeft[Comparator[ExecutionContext]]((a, b) => a.thenComparing(b))


protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = { protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
val array = input.toArray val array = input.toArray
Expand All @@ -41,7 +39,7 @@ case class SortPipe(source: Pipe, orderBy: Seq[ColumnOrder])
} }
} }


private class ExecutionContextOrdering(order: ColumnOrder) extends scala.Ordering[ExecutionContext] { case class ExecutionContextOrdering(order: ColumnOrder) extends scala.Ordering[ExecutionContext] {
override def compare(a: ExecutionContext, b: ExecutionContext): Int = { override def compare(a: ExecutionContext, b: ExecutionContext): Int = {
val column = order.id val column = order.id
val aVal = a(column) val aVal = a(column)
Expand All @@ -50,6 +48,11 @@ private class ExecutionContextOrdering(order: ColumnOrder) extends scala.Orderin
} }
} }


object ExecutionContextOrdering {
def asComparator(orderBy: Seq[ColumnOrder]): Comparator[ExecutionContext] = orderBy.map(ExecutionContextOrdering.apply)
.reduceLeft[Comparator[ExecutionContext]]((a, b) => a.thenComparing(b))
}

sealed trait ColumnOrder { sealed trait ColumnOrder {
def id: String def id: String


Expand Down
Expand Up @@ -19,104 +19,43 @@
*/ */
package org.neo4j.cypher.internal.runtime.interpreted.pipes package org.neo4j.cypher.internal.runtime.interpreted.pipes


import java.util.{Collections, Comparator} import java.util.Comparator


import org.neo4j.cypher.internal.DefaultComparatorTopTable
import org.neo4j.cypher.internal.runtime.interpreted.ExecutionContext import org.neo4j.cypher.internal.runtime.interpreted.ExecutionContext
import org.neo4j.cypher.internal.runtime.interpreted.commands.expressions.Expression import org.neo4j.cypher.internal.runtime.interpreted.commands.expressions.Expression
import org.neo4j.cypher.internal.util.v3_4.attribution.Id import org.neo4j.cypher.internal.util.v3_4.attribution.Id
import org.neo4j.values.AnyValue
import org.neo4j.values.storable.NumberValue import org.neo4j.values.storable.NumberValue


import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConverters._
import scala.math._


/* /*
* TopPipe is used when a query does a ORDER BY ... LIMIT query. Instead of ordering the whole result set and then * TopPipe is used when a query does a ORDER BY ... LIMIT query. Instead of ordering the whole result set and then
* returning the matching top results, we only keep the top results in heap, which allows us to release memory earlier * returning the matching top results, we only keep the top results in heap, which allows us to release memory earlier
*/ */
abstract class TopPipe(source: Pipe, sortDescription: List[ColumnOrder]) abstract class TopPipe(source: Pipe, comparator: Comparator[ExecutionContext]) extends PipeWithSource(source)
extends PipeWithSource(source) {


val sortItems: IndexedSeq[ColumnOrder] = sortDescription.toIndexedSeq case class TopNPipe(source: Pipe, countExpression: Expression, comparator: Comparator[ExecutionContext])
private val sortItemsCount: Int = sortItems.length (val id: Id = Id.INVALID_ID) extends TopPipe(source, comparator: Comparator[ExecutionContext]) {

type SortDataWithContext = (IndexedSeq[AnyValue], ExecutionContext)

class LessThanComparator() extends Ordering[SortDataWithContext] {
override def compare(a: SortDataWithContext, b: SortDataWithContext): Int = {
val v1 = a._1
val v2 = b._1
var i = 0
while (i < sortItemsCount) {
val res = sortItems(i).compareValues(v1(i), v2(i))

if (res != 0)
return res
i += 1
}
0
}
}

def binarySearch(array: ArrayBuffer[SortDataWithContext], comparator: Ordering[SortDataWithContext])(key: SortDataWithContext) = {
import scala.collection.Searching._
array.search(key)(comparator).insertionPoint
}

def arrayEntry(ctx: ExecutionContext, state: QueryState): SortDataWithContext =
(sortItems.map(column => ctx(column.id)), ctx)
}

case class TopNPipe(source: Pipe, sortDescription: List[ColumnOrder], countExpression: Expression)
(val id: Id = Id.INVALID_ID) extends TopPipe(source, sortDescription) {


countExpression.registerOwningPipe(this) countExpression.registerOwningPipe(this)


protected override def internalCreateResults(input:Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = { protected override def internalCreateResults(input:Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
if (input.isEmpty) if (input.isEmpty) Iterator.empty
Iterator.empty
else if (sortDescription.isEmpty)
input
else { else {

val first = input.next() val first = input.next()
val count = countExpression(first, state).asInstanceOf[NumberValue].longValue().toInt val count = countExpression(first, state).asInstanceOf[NumberValue].longValue().toInt
val topTable = new DefaultComparatorTopTable(comparator, count)
topTable.add(first)


if (count <= 0) { input.foreach {
Iterator.empty ctx =>
} else { topTable.add(ctx)

var result = new ArrayBuffer[SortDataWithContext](Math.min(count, 1024))
result.append(arrayEntry(first, state))
var last : Int = 0

while ( last < count - 1 && input.hasNext ) {
last += 1
result.append(arrayEntry(input.next(), state))
}

val lessThan = new LessThanComparator()
if (input.isEmpty) {
result.slice(0,last + 1).sorted(lessThan).iterator.map(_._2)
} else {
result = result.sorted(lessThan)

val search = binarySearch(result, lessThan) _
input.foreach {
ctx =>
val next = arrayEntry(ctx, state)
if (lessThan.compare(next, result(last)) < 0) {
val idx = search(next)
val insertPosition = if (idx < 0 ) - idx - 1 else idx + 1
if (insertPosition >= 0 && insertPosition < count) {
Array.copy(result, insertPosition, result, insertPosition + 1, count - insertPosition - 1)
result(insertPosition) = next
}
}
}
result.toIterator.map(_._2)
}
} }

topTable.sort()

topTable.iterator.asScala
} }
} }
} }
Expand All @@ -125,75 +64,66 @@ case class TopNPipe(source: Pipe, sortDescription: List[ColumnOrder], countExpre
* Special case for when we only have one element, in this case it is no idea to store * Special case for when we only have one element, in this case it is no idea to store
* an array, instead just store a single value. * an array, instead just store a single value.
*/ */
case class Top1Pipe(source: Pipe, sortDescription: List[ColumnOrder]) case class Top1Pipe(source: Pipe, comparator: Comparator[ExecutionContext])
(val id: Id = Id.INVALID_ID) (val id: Id = Id.INVALID_ID)
extends TopPipe(source, sortDescription) { extends TopPipe(source, comparator: Comparator[ExecutionContext]) {


protected override def internalCreateResults(input: Iterator[ExecutionContext], protected override def internalCreateResults(input: Iterator[ExecutionContext],
state: QueryState): Iterator[ExecutionContext] = { state: QueryState): Iterator[ExecutionContext] = {
if (input.isEmpty) if (input.isEmpty) Iterator.empty
Iterator.empty
else if (sortDescription.isEmpty)
input
else { else {


val lessThan = new LessThanComparator()

val first = input.next() val first = input.next()
var result = arrayEntry(first, state) var result = first


input.foreach { input.foreach {
ctx => ctx =>
val next = arrayEntry(ctx, state) if (comparator.compare(ctx, result) < 0) {
if (lessThan.compare(next, result) < 0) { result = ctx
result = next
} }
} }
Iterator.single(result._2) Iterator.single(result)
} }
} }
} }


/* /*
* Special case for when we only want one element, and all others that have the same value (tied for first place) * Special case for when we only want one element, and all others that have the same value (tied for first place)
*/ */
case class Top1WithTiesPipe(source: Pipe, sortDescription: List[ColumnOrder]) case class Top1WithTiesPipe(source: Pipe, comparator: Comparator[ExecutionContext])
(val id: Id = Id.INVALID_ID) (val id: Id = Id.INVALID_ID)
extends TopPipe(source, sortDescription) { extends TopPipe(source, comparator: Comparator[ExecutionContext]) {


protected override def internalCreateResults(input: Iterator[ExecutionContext], protected override def internalCreateResults(input: Iterator[ExecutionContext],
state: QueryState): Iterator[ExecutionContext] = { state: QueryState): Iterator[ExecutionContext] = {
if (input.isEmpty) if (input.isEmpty)
Iterator.empty Iterator.empty
else { else {
val lessThan = new LessThanComparator()

val first = input.next() val first = input.next()
var best = arrayEntry(first, state) var best = first
var matchingRows = init(best) var matchingRows = init(best)


input.foreach { input.foreach {
ctx => ctx =>
val next = arrayEntry(ctx, state) val comparison = comparator.compare(ctx, best)
val comparison = lessThan.compare(next, best)
if (comparison < 0) { // Found a new best if (comparison < 0) { // Found a new best
best = next best = ctx
matchingRows.clear() matchingRows.clear()
matchingRows += next._2 matchingRows += ctx
} }


if (comparison == 0) { // Found a tie if (comparison == 0) { // Found a tie
matchingRows += next._2 matchingRows += ctx
} }
} }
matchingRows.result().iterator matchingRows.result().iterator
} }
} }


@inline @inline
private def init(first: SortDataWithContext) = { private def init(first: ExecutionContext) = {
val builder = Vector.newBuilder[ExecutionContext] val builder = Vector.newBuilder[ExecutionContext]
builder += first._2 builder += first
builder builder
} }
} }
Expand Up @@ -19,24 +19,26 @@
*/ */
package org.neo4j.cypher.internal.runtime.interpreted.pipes package org.neo4j.cypher.internal.runtime.interpreted.pipes


import org.neo4j.cypher.internal.runtime.interpreted.QueryStateHelper import java.util.Comparator

import org.neo4j.cypher.internal.runtime.interpreted.ValueComparisonHelper.beEquivalentTo import org.neo4j.cypher.internal.runtime.interpreted.ValueComparisonHelper.beEquivalentTo
import org.neo4j.cypher.internal.runtime.interpreted.{ExecutionContext, QueryStateHelper}
import org.neo4j.cypher.internal.util.v3_4.symbols._ import org.neo4j.cypher.internal.util.v3_4.symbols._
import org.neo4j.cypher.internal.util.v3_4.test_helpers.CypherFunSuite import org.neo4j.cypher.internal.util.v3_4.test_helpers.CypherFunSuite


class Top1WithTiesPipeTest extends CypherFunSuite { class Top1WithTiesPipeTest extends CypherFunSuite {


test("empty input gives empty output") { test("empty input gives empty output") {
val source = new FakePipe(List(), "x" -> CTAny) val source = new FakePipe(List(), "x" -> CTAny)
val sortPipe = Top1WithTiesPipe(source, List(Ascending("x")))() val sortPipe = Top1WithTiesPipe(source, ExecutionContextOrdering.asComparator(List(Ascending("x"))))()


sortPipe.createResults(QueryStateHelper.emptyWithValueSerialization) should be(empty) sortPipe.createResults(QueryStateHelper.emptyWithValueSerialization) should be(empty)
} }


test("simple sorting works as expected") { test("simple sorting works as expected") {
val list = List(Map("x" -> "B"), Map("x" -> "A")).iterator val list = List(Map("x" -> "B"), Map("x" -> "A")).iterator
val source = new FakePipe(list, "x" -> CTString) val source = new FakePipe(list, "x" -> CTString)
val sortPipe = Top1WithTiesPipe(source, List(Ascending("x")))() val sortPipe = Top1WithTiesPipe(source, ExecutionContextOrdering.asComparator(List(Ascending("x"))))()


sortPipe.createResults(QueryStateHelper.emptyWithValueSerialization).toList should beEquivalentTo(List(Map("x" -> "A"))) sortPipe.createResults(QueryStateHelper.emptyWithValueSerialization).toList should beEquivalentTo(List(Map("x" -> "A")))
} }
Expand All @@ -50,7 +52,7 @@ class Top1WithTiesPipeTest extends CypherFunSuite {
).iterator ).iterator


val source = new FakePipe(input, "x" -> CTInteger, "y" -> CTInteger) val source = new FakePipe(input, "x" -> CTInteger, "y" -> CTInteger)
val sortPipe = Top1WithTiesPipe(source, List(Ascending("x")))() val sortPipe = Top1WithTiesPipe(source, ExecutionContextOrdering.asComparator(List(Ascending("x"))))()


sortPipe.createResults(QueryStateHelper.emptyWithValueSerialization).toList should beEquivalentTo(List( sortPipe.createResults(QueryStateHelper.emptyWithValueSerialization).toList should beEquivalentTo(List(
Map("x" -> 1, "y" -> 1), Map("x" -> 1, "y" -> 1),
Expand All @@ -64,7 +66,7 @@ class Top1WithTiesPipeTest extends CypherFunSuite {
).iterator ).iterator


val source = new FakePipe(input, "x" -> CTInteger, "y" -> CTInteger) val source = new FakePipe(input, "x" -> CTInteger, "y" -> CTInteger)
val sortPipe = Top1WithTiesPipe(source, List(Ascending("x")))() val sortPipe = Top1WithTiesPipe(source, ExecutionContextOrdering.asComparator(List(Ascending("x"))))()


sortPipe.createResults(QueryStateHelper.emptyWithValueSerialization).toList should beEquivalentTo(List( sortPipe.createResults(QueryStateHelper.emptyWithValueSerialization).toList should beEquivalentTo(List(
Map("x" -> null, "y" -> 1), Map("x" -> null, "y" -> 1),
Expand All @@ -79,7 +81,7 @@ class Top1WithTiesPipeTest extends CypherFunSuite {
).iterator ).iterator


val source = new FakePipe(input, "x" -> CTInteger, "y" -> CTInteger) val source = new FakePipe(input, "x" -> CTInteger, "y" -> CTInteger)
val sortPipe = Top1WithTiesPipe(source, List(Ascending("x")))() val sortPipe = Top1WithTiesPipe(source, ExecutionContextOrdering.asComparator(List(Ascending("x"))))()


sortPipe.createResults(QueryStateHelper.emptyWithValueSerialization).toList should beEquivalentTo(List( sortPipe.createResults(QueryStateHelper.emptyWithValueSerialization).toList should beEquivalentTo(List(
Map("x" -> 1, "y" -> 1))) Map("x" -> 1, "y" -> 1)))
Expand All @@ -93,7 +95,7 @@ class Top1WithTiesPipeTest extends CypherFunSuite {
).iterator ).iterator


val source = new FakePipe(input, "x" -> CTInteger, "y" -> CTInteger) val source = new FakePipe(input, "x" -> CTInteger, "y" -> CTInteger)
val sortPipe = Top1WithTiesPipe(source, List(Ascending("x")))() val sortPipe = Top1WithTiesPipe(source, ExecutionContextOrdering.asComparator(List(Ascending("x"))))()


sortPipe.createResults(QueryStateHelper.emptyWithValueSerialization).toList should beEquivalentTo(List( sortPipe.createResults(QueryStateHelper.emptyWithValueSerialization).toList should beEquivalentTo(List(
Map("x" -> smaller, "y" -> 1) Map("x" -> smaller, "y" -> 1)
Expand All @@ -107,10 +109,11 @@ class Top1WithTiesPipeTest extends CypherFunSuite {
).iterator ).iterator


val source = new FakePipe(input, "x" -> CTInteger, "y" -> CTInteger) val source = new FakePipe(input, "x" -> CTInteger, "y" -> CTInteger)
val sortPipe = Top1WithTiesPipe(source, List(Ascending("x")))() val sortPipe = Top1WithTiesPipe(source, ExecutionContextOrdering.asComparator(List(Ascending("x"))))()


sortPipe.createResults(QueryStateHelper.emptyWithValueSerialization).toList should beEquivalentTo(List( sortPipe.createResults(QueryStateHelper.emptyWithValueSerialization).toList should beEquivalentTo(List(
Map("x" -> "A", "y" -> 2) Map("x" -> "A", "y" -> 2)
)) ))
} }

} }

0 comments on commit f04d799

Please sign in to comment.