Skip to content

Commit

Permalink
Major refactoring to make certain Cypher is more lazy
Browse files Browse the repository at this point in the history
  • Loading branch information
systay committed Nov 3, 2012
1 parent b23f934 commit 5c592e9
Show file tree
Hide file tree
Showing 36 changed files with 524 additions and 161 deletions.
1 change: 1 addition & 0 deletions cypher/CHANGES.txt
Expand Up @@ -2,6 +2,7 @@
------------------- -------------------
o The traversal pattern matcher now supports variable length relationship patterns o The traversal pattern matcher now supports variable length relationship patterns
o Fixes #946 - HAS(...) fails with ThisShouldNotHappenException for some patterns o Fixes #946 - HAS(...) fails with ThisShouldNotHappenException for some patterns
o Major refactoring to make certain Cypher is more lazy


1.9.M01 (2012-10-23) 1.9.M01 (2012-10-23)
-------------------- --------------------
Expand Down
Expand Up @@ -23,25 +23,26 @@ import internal.pipes.QueryState
import org.neo4j.graphdb.GraphDatabaseService import org.neo4j.graphdb.GraphDatabaseService
import collection.Map import collection.Map


class EagerPipeExecutionResult(r: => Traversable[Map[String, Any]], class EagerPipeExecutionResult(result: Iterator[Map[String, Any]],
columns: List[String], columns: List[String],
state: QueryState, state: QueryState,
db: GraphDatabaseService) db: GraphDatabaseService)
extends PipeExecutionResult(r, columns) { extends PipeExecutionResult(result, columns) {


override lazy val queryStatistics = QueryStatistics( val (eagerResult,timeTaken) = super.createTimedResults
nodesCreated = state.createdNodes.count, lazy val inner = eagerResult.iterator
relationshipsCreated = state.createdRelationships.count,
propertiesSet = state.propertySet.count,
deletedNodes = state.deletedNodes.count,
deletedRelationships = state.deletedRelationships.count)


override val createTimedResults = { override def next() = inner.next().toMap
val start = System.currentTimeMillis() override def hasNext = inner.hasNext
val eagerResult = immutableResult.toList


val ms = System.currentTimeMillis() - start override def queryStatistics = {

QueryStatistics(
(eagerResult, ms.toString) nodesCreated = state.createdNodes.count,
relationshipsCreated = state.createdRelationships.count,
propertiesSet = state.propertySet.count,
deletedNodes = state.deletedNodes.count,
deletedRelationships = state.deletedRelationships.count)
} }

override def createTimedResults = (eagerResult,timeTaken)
} }
22 changes: 9 additions & 13 deletions cypher/src/main/scala/org/neo4j/cypher/PipeExecutionResult.scala
Expand Up @@ -27,23 +27,21 @@ import java.io.{StringWriter, PrintWriter}
import collection.Map import collection.Map
import collection.immutable.{Map => ImmutableMap} import collection.immutable.{Map => ImmutableMap}


class PipeExecutionResult(r: => Traversable[Map[String, Any]], val columns: List[String]) class PipeExecutionResult(result: Iterator[Map[String, Any]], val columns: List[String])
extends ExecutionResult extends ExecutionResult
with StringExtras with StringExtras
with CollectionSupport with CollectionSupport
with StringHelper { with StringHelper {


lazy val immutableResult = r.map(m => m.toMap)

def javaColumns: java.util.List[String] = columns.asJava def javaColumns: java.util.List[String] = columns.asJava


def javaColumnAs[T](column: String): java.util.Iterator[T] = columnAs[T](column).map(x => makeValueJavaCompatible(x).asInstanceOf[T]).asJava def javaColumnAs[T](column: String): java.util.Iterator[T] = columnAs[T](column).map(x => makeValueJavaCompatible(x).asInstanceOf[T]).asJava


def columnAs[T](column: String): Iterator[T] = { def columnAs[T](column: String): Iterator[T] = map {
this.map(m => { case m => {
val item: Any = m.getOrElse(column, throw new EntityNotFoundException("No column named '" + column + "' was found. Found: " + m.keys.mkString("(\"", "\", \"", "\")"))) val item: Any = m.getOrElse(column, throw new EntityNotFoundException("No column named '" + column + "' was found. Found: " + m.keys.mkString("(\"", "\", \"", "\")")))
item.asInstanceOf[T] item.asInstanceOf[T]
}).toIterator }
} }


private def makeValueJavaCompatible(value: Any): Any = value match { private def makeValueJavaCompatible(value: Any): Any = value match {
Expand All @@ -69,9 +67,9 @@ class PipeExecutionResult(r: => Traversable[Map[String, Any]], val columns: List
columnSizes.toMap columnSizes.toMap
} }


protected def createTimedResults = { protected def createTimedResults: (List[Map[String, Any]], String) = {
val start = System.currentTimeMillis() val start = System.currentTimeMillis()
val eagerResult = immutableResult.toList val eagerResult = result.toList
val ms = System.currentTimeMillis() - start val ms = System.currentTimeMillis() - start


(eagerResult, ms.toString) (eagerResult, ms.toString)
Expand Down Expand Up @@ -137,12 +135,10 @@ class PipeExecutionResult(r: => Traversable[Map[String, Any]], val columns: List
}).mkString("| ", " | ", " |") }).mkString("| ", " | ", " |")
} }


lazy val iterator = immutableResult.toIterator def hasNext: Boolean = result.hasNext

def hasNext: Boolean = iterator.hasNext


def next(): ImmutableMap[String, Any] = iterator.next() def next(): ImmutableMap[String, Any] = result.next().toMap


lazy val queryStatistics = QueryStatistics.empty def queryStatistics = QueryStatistics.empty
} }


Expand Up @@ -124,7 +124,8 @@ class ExecutionPlanImpl(inputQuery: Query, graph: GraphDatabaseService) extends
private def getLazyReadonlyQuery(pipe: Pipe, columns: List[String]): Map[String, Any] => ExecutionResult = { private def getLazyReadonlyQuery(pipe: Pipe, columns: List[String]): Map[String, Any] => ExecutionResult = {
val func = (params: Map[String, Any]) => { val func = (params: Map[String, Any]) => {
val state = new QueryState(graph, params) val state = new QueryState(graph, params)
new PipeExecutionResult(pipe.createResults(state), columns) val results = pipe.createResults(state)
new PipeExecutionResult(results, columns)
} }


func func
Expand All @@ -133,7 +134,8 @@ class ExecutionPlanImpl(inputQuery: Query, graph: GraphDatabaseService) extends
private def getEagerReadWriteQuery(pipe: Pipe, columns: List[String]): Map[String, Any] => ExecutionResult = { private def getEagerReadWriteQuery(pipe: Pipe, columns: List[String]): Map[String, Any] => ExecutionResult = {
val func = (params: Map[String, Any]) => { val func = (params: Map[String, Any]) => {
val state = new QueryState(graph, params) val state = new QueryState(graph, params)
new EagerPipeExecutionResult(pipe.createResults(state), columns, state, graph) val results = pipe.createResults(state)
new EagerPipeExecutionResult(results, columns, state, graph)
} }


func func
Expand Down
Expand Up @@ -34,11 +34,11 @@ class CommitPipe(source: Pipe, graph: GraphDatabaseService) extends PipeWithSour
} }
try { try {
try { try {
val result = source.createResults(state).toList val result = source.createResults(state).toList.iterator
tx.success() tx.success()
result result
} catch { } catch {
case e => { case e: Throwable => {
tx.failure() tx.failure()
throw e throw e
} }
Expand Down
Expand Up @@ -44,7 +44,7 @@ class EagerAggregationPipe(source: Pipe, val keyExpressions: Map[String, Express
new SymbolTable(keyIdentifiers ++ aggrIdentifiers) new SymbolTable(keyIdentifiers ++ aggrIdentifiers)
} }


def createResults(state: QueryState): Traversable[ExecutionContext] = { def createResults(state: QueryState) = {
// This is the temporary storage used while the aggregation is going on // This is the temporary storage used while the aggregation is going on
val result = MutableMap[NiceHasher, (ExecutionContext, Seq[AggregationFunction])]() val result = MutableMap[NiceHasher, (ExecutionContext, Seq[AggregationFunction])]()
val keyNames: Seq[String] = keyExpressions.map(_._1).toSeq val keyNames: Seq[String] = keyExpressions.map(_._1).toSeq
Expand All @@ -62,13 +62,13 @@ class EagerAggregationPipe(source: Pipe, val keyExpressions: Map[String, Express
ctx.newFrom(newMap) ctx.newFrom(newMap)
} }


def createEmptyResult(params:Map[String,Any]): Traversable[ExecutionContext] = { def createEmptyResult(params:Map[String,Any]): Iterator[ExecutionContext] = {
val newMap = MutableMaps.empty val newMap = MutableMaps.empty
val aggregationNamesAndFunctions = aggregationNames zip aggregations.map(_._2.createAggregationFunction.result) val aggregationNamesAndFunctions = aggregationNames zip aggregations.map(_._2.createAggregationFunction.result)


aggregationNamesAndFunctions.toMap aggregationNamesAndFunctions.toMap
.foreach { case (name, zeroValue) => newMap += name -> zeroValue } .foreach { case (name, zeroValue) => newMap += name -> zeroValue }
Traversable(ExecutionContext(newMap, params = params)) Iterator(ExecutionContext(newMap, params = params))
} }




Expand All @@ -79,15 +79,13 @@ class EagerAggregationPipe(source: Pipe, val keyExpressions: Map[String, Express
functions.foreach(func => func(ctx)) functions.foreach(func => func(ctx))
}) })


val a = if (result.isEmpty && keyNames.isEmpty) { if (result.isEmpty && keyNames.isEmpty) {
createEmptyResult(state.params) createEmptyResult(state.params)
} else { } else {
result.map { result.map {
case (key, (ctx, aggregator)) => createResults(key, aggregator, ctx) case (key, (ctx, aggregator)) => createResults(key, aggregator, ctx)
} }.toIterator
} }

a
} }


override def executionPlan(): String = source.executionPlan() + "\r\n" + "EagerAggregation( keys: [" + oldKeyExpressions.mkString(", ") + "], aggregates: [" + aggregations.mkString(", ") + "])" override def executionPlan(): String = source.executionPlan() + "\r\n" + "EagerAggregation( keys: [" + oldKeyExpressions.mkString(", ") + "], aggregates: [" + aggregations.mkString(", ") + "])"
Expand Down
Expand Up @@ -25,13 +25,14 @@ class EmptyResultPipe(source: Pipe)
extends PipeWithSource(source) { extends PipeWithSource(source) {


def createResults(state: QueryState) = { def createResults(state: QueryState) = {
source.createResults(state) val iter = source.createResults(state)
while(iter.hasNext) {
iter.next()
}


Seq() Iterator()
} }


// def symbols = new SymbolTable()

override def executionPlan(): String = source.executionPlan() + "\nEmptyResult()" override def executionPlan(): String = source.executionPlan() + "\nEmptyResult()"


def dependencies = Seq() def dependencies = Seq()
Expand Down
Expand Up @@ -23,7 +23,7 @@ import java.lang.String
import org.neo4j.cypher.internal.symbols.SymbolTable import org.neo4j.cypher.internal.symbols.SymbolTable


class ParameterPipe() extends Pipe { class ParameterPipe() extends Pipe {
def createResults(state: QueryState) = Seq(ExecutionContext(params = state.params)) def createResults(state: QueryState) = Iterator(ExecutionContext(params = state.params))


val symbols = new SymbolTable() val symbols = new SymbolTable()


Expand Down
Expand Up @@ -38,15 +38,15 @@ import java.util.concurrent.atomic.AtomicInteger
* the execute the query. * the execute the query.
*/ */
trait Pipe { trait Pipe {
def createResults(state: QueryState): Traversable[ExecutionContext] def createResults(state: QueryState): Iterator[ExecutionContext]


def symbols: SymbolTable def symbols: SymbolTable


def executionPlan(): String def executionPlan(): String
} }


class NullPipe extends Pipe { class NullPipe extends Pipe {
def createResults(state: QueryState) = Seq(ExecutionContext.empty) def createResults(state: QueryState) = Seq(ExecutionContext.empty).toIterator


def symbols: SymbolTable = new SymbolTable() def symbols: SymbolTable = new SymbolTable()


Expand Down
Expand Up @@ -20,31 +20,30 @@
package org.neo4j.cypher.internal.pipes package org.neo4j.cypher.internal.pipes


import org.neo4j.cypher.internal.commands.expressions.Expression import org.neo4j.cypher.internal.commands.expressions.Expression
import java.lang.String
import org.neo4j.helpers.ThisShouldNotHappenError import org.neo4j.helpers.ThisShouldNotHappenError
import collection.mutable.Map


class SlicePipe(source:Pipe, skip:Option[Expression], limit:Option[Expression]) extends Pipe { class SlicePipe(source:Pipe, skip:Option[Expression], limit:Option[Expression]) extends Pipe {
// val symbols = source.symbols
val symbols = source.symbols val symbols = source.symbols


//TODO: Make this nicer. I'm sure it's expensive and silly. def createResults(state: QueryState) : Iterator[ExecutionContext] = {
def createResults(state: QueryState): Traversable[ExecutionContext] = {
val sourceTraversable = source.createResults(state) val sourceTraversable = source.createResults(state)


if(sourceTraversable.isEmpty) if(sourceTraversable.isEmpty)
return Seq() return Iterator()

val first: ExecutionContext = sourceTraversable.next()


val first: ExecutionContext = sourceTraversable.head val sourceIter = new HeadAndTail[ExecutionContext](first, sourceTraversable)


def asInt(v:Expression)=v(first).asInstanceOf[Int] def asInt(v:Expression)=v(first).asInstanceOf[Int]


(skip, limit) match { (skip, limit) match {
case (Some(x), None) => sourceTraversable.drop(asInt(x)) case (Some(x), None) => sourceIter.drop(asInt(x))
case (None, Some(x)) => sourceTraversable.take(asInt(x)) case (None, Some(x)) => sourceIter.take(asInt(x))
case (Some(startAt), Some(count)) => { case (Some(startAt), Some(count)) => {
val start = asInt(startAt) val start = asInt(startAt)
sourceTraversable.slice(start, start + asInt(count)) sourceIter.slice(start, start + asInt(count))
} }
case (None, None)=>throw new ThisShouldNotHappenError("Andres Taylor", "A slice pipe that doesn't slice should never exist.") case (None, None)=>throw new ThisShouldNotHappenError("Andres Taylor", "A slice pipe that doesn't slice should never exist.")
} }
Expand All @@ -60,4 +59,18 @@ class SlicePipe(source:Pipe, skip:Option[Expression], limit:Option[Expression])
} }
source.executionPlan() + "\r\n" + "Slice(" + info + ")" source.executionPlan() + "\r\n" + "Slice(" + info + ")"
} }
}

class HeadAndTail[T](head:T, tail:Iterator[T]) extends Iterator[T] {
var usedHead = false
def headUnused = !usedHead

def hasNext = headUnused || tail.hasNext

def next() = if (headUnused) {
usedHead = true
head
} else {
tail.next()
}
} }
Expand Up @@ -35,7 +35,9 @@ class SortPipe(source: Pipe, sortDescription: List[SortItem]) extends PipeWithSo
} }
} }


def createResults(state:QueryState) = source.createResults(state).toList.sortWith((a, b) => compareBy(a, b, sortDescription)) def createResults(state:QueryState) =
source.createResults(state).toList.
sortWith((a, b) => compareBy(a, b, sortDescription)).iterator


def compareBy(a: Map[String, Any], b: Map[String, Any], order: Seq[SortItem]): Boolean = order match { def compareBy(a: Map[String, Any], b: Map[String, Any], order: Seq[SortItem]): Boolean = order match {
case Nil => false case Nil => false
Expand Down
Expand Up @@ -30,14 +30,13 @@ abstract class StartPipe[T <: PropertyContainer](inner: Pipe, name: String, crea


val symbols = inner.symbols.add(name, identifierType) val symbols = inner.symbols.add(name, identifierType)


def createResults(state: QueryState): Traversable[ExecutionContext] = { def createResults(state: QueryState) = {
val map = inner.createResults(state).flatMap(ctx => { inner.createResults(state).flatMap(ctx => {
val source: Iterable[T] = createSource(ctx) val source: Iterable[T] = createSource(ctx)
source.map(x => { source.map(x => {
ctx.newWith(name -> x) ctx.newWith(name -> x)
}) })
}) })
map
} }


def visibleName: String def visibleName: String
Expand Down

0 comments on commit 5c592e9

Please sign in to comment.