Skip to content

Commit

Permalink
Only keep the top x matching rows
Browse files Browse the repository at this point in the history
When a query with both ORDER BY and LIMIT is encountered, Cypher now only keeps a limited set
of the input in heap. Before this commit, the whole input would be ordered, and then the top x
rows kept. Now, Cypher builds a buffer of the top x hits, that it keeps ordered. Really large
queries can now still be executed without leading to a OutOfMemoryError.
  • Loading branch information
systay committed Nov 3, 2012
1 parent 5c592e9 commit ab728fe
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 22 deletions.
1 change: 1 addition & 0 deletions cypher/CHANGES.txt
Expand Up @@ -3,6 +3,7 @@
o The traversal pattern matcher now supports variable length relationship patterns o The traversal pattern matcher now supports variable length relationship patterns
o Fixes #946 - HAS(...) fails with ThisShouldNotHappenException for some patterns o Fixes #946 - HAS(...) fails with ThisShouldNotHappenException for some patterns
o Major refactoring to make certain Cypher is more lazy o Major refactoring to make certain Cypher is more lazy
o When asking for the top x rows by some value, Cypher will now only keep a list the size of x


1.9.M01 (2012-10-23) 1.9.M01 (2012-10-23)
-------------------- --------------------
Expand Down
Expand Up @@ -181,7 +181,8 @@ The Neo4j Team""")
new CreateNodesAndRelationshipsBuilder(graph), new CreateNodesAndRelationshipsBuilder(graph),
new UpdateActionBuilder(graph), new UpdateActionBuilder(graph),
new EmptyResultBuilder, new EmptyResultBuilder,
new TraversalMatcherBuilder(graph) new TraversalMatcherBuilder(graph),
new TopPipeBuilder
) )


override def toString = executionPlanText override def toString = executionPlanText
Expand Down
Expand Up @@ -42,7 +42,7 @@ object PartiallySolvedQuery {
where = q.where.toSeq.flatMap(_.atoms.map(Unsolved(_))), where = q.where.toSeq.flatMap(_.atoms.map(Unsolved(_))),
aggregation = q.aggregation.toSeq.flatten.map(Unsolved(_)), aggregation = q.aggregation.toSeq.flatten.map(Unsolved(_)),
sort = q.sort.map(Unsolved(_)), sort = q.sort.map(Unsolved(_)),
slice = q.slice.toSeq.map(Unsolved(_)), slice = q.slice.map(Unsolved(_)),
namedPaths = q.namedPaths.map(Unsolved(_)), namedPaths = q.namedPaths.map(Unsolved(_)),
aggregateQuery = if (q.aggregation.isDefined) aggregateQuery = if (q.aggregation.isDefined)
Unsolved(true) Unsolved(true)
Expand All @@ -62,7 +62,7 @@ object PartiallySolvedQuery {
where = Seq(), where = Seq(),
aggregation = Seq(), aggregation = Seq(),
sort = Seq(), sort = Seq(),
slice = Seq(), slice = None,
namedPaths = Seq(), namedPaths = Seq(),
aggregateQuery = Solved(false), aggregateQuery = Solved(false),
extracted = false, extracted = false,
Expand All @@ -81,7 +81,7 @@ case class PartiallySolvedQuery(returns: Seq[QueryToken[ReturnColumn]],
where: Seq[QueryToken[Predicate]], where: Seq[QueryToken[Predicate]],
aggregation: Seq[QueryToken[AggregationExpression]], aggregation: Seq[QueryToken[AggregationExpression]],
sort: Seq[QueryToken[SortItem]], sort: Seq[QueryToken[SortItem]],
slice: Seq[QueryToken[Slice]], slice: Option[QueryToken[Slice]],
namedPaths: Seq[QueryToken[NamedPath]], namedPaths: Seq[QueryToken[NamedPath]],
aggregateQuery: QueryToken[Boolean], aggregateQuery: QueryToken[Boolean],
extracted: Boolean, extracted: Boolean,
Expand Down
Expand Up @@ -45,6 +45,7 @@ object PlanBuilder extends Enumeration {
val RelationshipById = -1 val RelationshipById = -1
val IndexQuery = 0 val IndexQuery = 0
val Extraction = 0 val Extraction = 0
val TopX = -1
val Slice = 0 val Slice = 0
val ColumnFilter = 0 val ColumnFilter = 0
val GlobalStart = 1 val GlobalStart = 1
Expand Down
Expand Up @@ -24,19 +24,9 @@ import org.neo4j.cypher.internal.executionplan.{ExecutionPlanInProgress, PlanBui
import org.neo4j.cypher.internal.commands.expressions.{Identifier, CachedExpression, Expression} import org.neo4j.cypher.internal.commands.expressions.{Identifier, CachedExpression, Expression}
import org.neo4j.cypher.CypherTypeException import org.neo4j.cypher.CypherTypeException


class SortBuilder extends PlanBuilder { class SortBuilder extends PlanBuilder with SortingPreparations {
def apply(plan: ExecutionPlanInProgress) = { def apply(plan: ExecutionPlanInProgress) = {
val sortExpressionsToExtract: Seq[(String, Expression)] = plan.query.sort.flatMap(x => x.token.expression match { val newPlan = extractBeforeSort(plan)
case _: CachedExpression => None
case _: Identifier => None
case e => Some(" INTERNAL_SORT" + e.## -> e)
})

val newPlan = try {
ExtractBuilder.extractIfNecessary(plan, sortExpressionsToExtract.toMap)
} catch {
case e: CypherTypeException => throw new CypherTypeException(e.getMessage + " - maybe aggregation removed it?")
}


val q = newPlan.query val q = newPlan.query
val sortItems = q.sort.map(_.token) val sortItems = q.sort.map(_.token)
Expand All @@ -50,4 +40,20 @@ class SortBuilder extends PlanBuilder {
def canWorkWith(plan: ExecutionPlanInProgress) = plan.query.extracted && plan.query.sort.filter(_.unsolved).nonEmpty def canWorkWith(plan: ExecutionPlanInProgress) = plan.query.extracted && plan.query.sort.filter(_.unsolved).nonEmpty


def priority: Int = PlanBuilder.Sort def priority: Int = PlanBuilder.Sort
}

trait SortingPreparations {
def extractBeforeSort(plan: ExecutionPlanInProgress): ExecutionPlanInProgress = {
val sortExpressionsToExtract: Seq[(String, Expression)] = plan.query.sort.flatMap(x => x.token.expression match {
case _: CachedExpression => None
case _: Identifier => None
case e => Some(" INTERNAL_SORT" + e.## -> e)
})

try {
ExtractBuilder.extractIfNecessary(plan, sortExpressionsToExtract.toMap)
} catch {
case e: CypherTypeException => throw new CypherTypeException(e.getMessage + " - maybe aggregation removed it?")
}
}
} }
@@ -0,0 +1,65 @@
/**
* Copyright (c) 2002-2012 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal.executionplan.builders

import org.neo4j.cypher.internal.pipes.TopPipe
import org.neo4j.cypher.internal.executionplan.{ExecutionPlanInProgress, PlanBuilder}
import org.neo4j.cypher.internal.commands.expressions.Add
import org.neo4j.cypher.internal.commands.Slice
import org.neo4j.helpers.ThisShouldNotHappenError

class TopPipeBuilder extends PlanBuilder with SortingPreparations {
def apply(plan: ExecutionPlanInProgress) = {
val newPlan = extractBeforeSort(plan)

val q = newPlan.query
val sortItems = q.sort.map(_.token)
val slice = q.slice.get.token
val limit = slice match {
case Slice(Some(skip), Some(l)) => Add(skip, l)
case Slice(None, Some(l)) => l
}

val resultPipe = new TopPipe(newPlan.pipe, sortItems.toList, limit)

val solvedSort = q.sort.map(_.solve)
val solvedSlice = slice match {
case Slice(Some(x), _) => Some(Unsolved(Slice(Some(x), None)))
case Slice(None, _) => None
case _ => throw new ThisShouldNotHappenError("Andres", "This builder should not be called for this query")
}

val resultQ = q.copy(sort = solvedSort, slice = solvedSlice)

plan.copy(pipe = resultPipe, query = resultQ)
}

def canWorkWith(plan: ExecutionPlanInProgress) = {
val q = plan.query
val extracted = q.extracted
val unsolvedOrdering = q.sort.filter(_.unsolved).nonEmpty
val limited = q.slice.exists(_.token.limit.nonEmpty)

extracted && unsolvedOrdering && limited
}

def priority: Int = PlanBuilder.TopX
}

Expand Up @@ -26,7 +26,7 @@ import org.neo4j.cypher.internal.Comparer
import collection.mutable.Map import collection.mutable.Map
import org.neo4j.cypher.internal.symbols.SymbolTable import org.neo4j.cypher.internal.symbols.SymbolTable


class SortPipe(source: Pipe, sortDescription: List[SortItem]) extends PipeWithSource(source) with Comparer { class SortPipe(source: Pipe, sortDescription: List[SortItem]) extends PipeWithSource(source) with ExecutionContextComparer {
def symbols = source.symbols def symbols = source.symbols


def assertTypes(symbols: SymbolTable) { def assertTypes(symbols: SymbolTable) {
Expand All @@ -39,6 +39,11 @@ class SortPipe(source: Pipe, sortDescription: List[SortItem]) extends PipeWithSo
source.createResults(state).toList. source.createResults(state).toList.
sortWith((a, b) => compareBy(a, b, sortDescription)).iterator sortWith((a, b) => compareBy(a, b, sortDescription)).iterator



override def executionPlan(): String = source.executionPlan() + "\r\nSort(" + sortDescription.mkString(",") + ")"
}

trait ExecutionContextComparer extends Comparer {
def compareBy(a: Map[String, Any], b: Map[String, Any], order: Seq[SortItem]): Boolean = order match { def compareBy(a: Map[String, Any], b: Map[String, Any], order: Seq[SortItem]): Boolean = order match {
case Nil => false case Nil => false
case head :: tail => { case head :: tail => {
Expand All @@ -53,5 +58,4 @@ class SortPipe(source: Pipe, sortDescription: List[SortItem]) extends PipeWithSo
} }
} }


override def executionPlan(): String = source.executionPlan() + "\r\nSort(" + sortDescription.mkString(",") + ")"
} }
@@ -0,0 +1,82 @@
/**
* Copyright (c) 2002-2012 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal.pipes

import org.neo4j.cypher.internal.commands.SortItem
import org.neo4j.cypher.internal.symbols.{NumberType, SymbolTable}
import collection.mutable.ListBuffer
import org.neo4j.cypher.internal.commands.expressions.Expression

/*
* TopPipe is used when a query does a ORDER BY ... LIMIT query. Instead of ordering the whole result set and then
* returning the matching top results, we only keep the top results in heap, which allows us to release memory earlier
*/
class TopPipe(source: Pipe, sortDescription: List[SortItem], countExpression: Expression) extends PipeWithSource(source) with ExecutionContextComparer {
def createResults(state: QueryState): Iterator[ExecutionContext] = {

var result = new ListBuffer[ExecutionContext]()
var last: Option[ExecutionContext] = None
val largerThanLast = (ctx: ExecutionContext) => last.forall(s => compareBy(s, ctx, sortDescription))
var size = 0

val input = source.createResults(state)

if (input.isEmpty)
Iterator()
else {
val first = input.next()
val count = countExpression(first).asInstanceOf[Number].intValue()

val iter = new HeadAndTail(first, input)
iter.foreach {
case ctx =>

if (size < count) {
result += ctx
size += 1

if (largerThanLast(ctx)) {
last = Some(ctx)
}
} else
if (!largerThanLast(ctx)) {
result -= last.get
result += ctx
result = result.sortWith((a, b) => compareBy(a, b, sortDescription))

last = Some(result.last)
}
}
}



result.toIterator
}

def executionPlan() = "%s\rTopPipe(ORDER BY %s LIMIT %s)".format(source.executionPlan(), sortDescription.mkString(","), countExpression)

def symbols = source.symbols

def assertTypes(symbols: SymbolTable) {
sortDescription.foreach(_.expression.assertTypes(symbols))
countExpression.evaluateType(NumberType(), symbols)
}
}
Expand Up @@ -71,7 +71,7 @@ class ColumnFilterBuilderTest extends BuilderTest {
@Test def should_not_accept_if_not_sliced() { @Test def should_not_accept_if_not_sliced() {
val q = PartiallySolvedQuery().copy( val q = PartiallySolvedQuery().copy(
extracted = true, extracted = true,
slice = Seq(Unsolved(Slice(Some(Literal(19)), None))), slice = Some(Unsolved(Slice(Some(Literal(19)), None))),
returns = Seq(Unsolved(ReturnItem(Literal("foo"), "foo"))) returns = Seq(Unsolved(ReturnItem(Literal("foo"), "foo")))
) )


Expand Down
Expand Up @@ -32,7 +32,7 @@ class SliceBuilderTest extends BuilderTest {


@Test def should_accept_if_all_work_is_done_and_sorting_not_yet() { @Test def should_accept_if_all_work_is_done_and_sorting_not_yet() {
val q = PartiallySolvedQuery().copy( val q = PartiallySolvedQuery().copy(
slice = Seq(Unsolved(Slice(Some(Literal(10)), Some(Literal(10))))), slice = Some(Unsolved(Slice(Some(Literal(10)), Some(Literal(10))))),
extracted = true extracted = true
) )


Expand All @@ -47,7 +47,7 @@ class SliceBuilderTest extends BuilderTest {


@Test def should_not_accept_if_not_yet_sorted() { @Test def should_not_accept_if_not_yet_sorted() {
val q = PartiallySolvedQuery().copy( val q = PartiallySolvedQuery().copy(
slice = Seq(Unsolved(Slice(Some(Literal(10)), Some(Literal(10))))), slice = Some(Unsolved(Slice(Some(Literal(10)), Some(Literal(10))))),
extracted = true, extracted = true,
sort = Seq(Unsolved(SortItem(Literal(1), true))) sort = Seq(Unsolved(SortItem(Literal(1), true)))
) )
Expand Down
Expand Up @@ -22,7 +22,7 @@ package org.neo4j.cypher.internal.pipes
import org.neo4j.cypher.internal.symbols.{SymbolTable, CypherType} import org.neo4j.cypher.internal.symbols.{SymbolTable, CypherType}
import collection.Map import collection.Map


class FakePipe(data: Iterator[Map[String, Any]], identifiers: (String, CypherType)*) extends Pipe { class FakePipe(val data: Iterator[Map[String, Any]], identifiers: (String, CypherType)*) extends Pipe {


def this(data: Traversable[Map[String, Any]], identifiers: (String, CypherType)*) = this(data.toIterator, identifiers:_*) def this(data: Traversable[Map[String, Any]], identifiers: (String, CypherType)*) = this(data.toIterator, identifiers:_*)


Expand Down
@@ -0,0 +1,69 @@
/**
* Copyright (c) 2002-2012 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal.pipes

import org.scalatest.Assertions
import org.junit.Test
import org.neo4j.cypher.internal.commands.SortItem
import org.neo4j.cypher.internal.commands.expressions.{Literal, Identifier}
import org.neo4j.cypher.internal.symbols.IntegerType


class TopPipeTest extends Assertions {
@Test def top10From5ReturnsAll() {
val input = createFakePipeWith(5)
val pipe = new TopPipe(input, List(SortItem(Identifier("a"), ascending = true)), Literal(10))
val result = pipe.createResults(QueryState()).map(ctx => ctx("a")).toList

assert(result === List(0, 1, 2, 3, 4))
}

@Test def top5From10ReturnsAll() {
val input = createFakePipeWith(10)
val pipe = new TopPipe(input, List(SortItem(Identifier("a"), ascending = true)), Literal(5))
val result = pipe.createResults(QueryState()).map(ctx => ctx("a")).toList

assert(result === List(0, 1, 2, 3, 4))
}

@Test def reversedTop5From10ReturnsAll() {
val in = (0 until 100).toSeq.map(i => Map("a" -> i)).reverse
val input = new FakePipe(in, "a" -> IntegerType())

val pipe = new TopPipe(input, List(SortItem(Identifier("a"), ascending = true)), Literal(5))
val result = pipe.createResults(QueryState()).map(ctx => ctx("a")).toList

assert(result === List(0, 1, 2, 3, 4))
}

@Test def emptyInputIsNotAProblem() {
val input = new FakePipe(Iterator(), "a" -> IntegerType())

val pipe = new TopPipe(input, List(SortItem(Identifier("a"), ascending = true)), Literal(5))
val result = pipe.createResults(QueryState()).map(ctx => ctx("a")).toList

assert(result === List())
}

private def createFakePipeWith(count: Int): FakePipe = {
val in = (0 until count).toSeq.map(i => Map("a" -> i))
new FakePipe(in, "a" -> IntegerType())
}
}

0 comments on commit ab728fe

Please sign in to comment.