Permalink
Browse files

Made update statistics go through a monitor

  • Loading branch information...
1 parent f825088 commit fea22806e0ffb983dbe0e3bd03dd7e7f4adb31e0 @systay committed Oct 27, 2012
Showing with 172 additions and 77 deletions.
  1. +1 −6 cypher/src/main/scala/org/neo4j/cypher/EagerPipeExecutionResult.scala
  2. +2 −2 cypher/src/main/scala/org/neo4j/cypher/ExecutionEngine.scala
  3. +4 −3 cypher/src/main/scala/org/neo4j/cypher/internal/commands/StartItem.scala
  4. +6 −5 cypher/src/main/scala/org/neo4j/cypher/internal/executionplan/ExecutionPlanImpl.scala
  5. +2 −2 cypher/src/main/scala/org/neo4j/cypher/internal/mutation/DeleteEntityAction.scala
  6. +1 −1 cypher/src/main/scala/org/neo4j/cypher/internal/mutation/DeletePropertyAction.scala
  7. +2 −2 cypher/src/main/scala/org/neo4j/cypher/internal/mutation/MapPropertySetAction.scala
  8. +1 −1 cypher/src/main/scala/org/neo4j/cypher/internal/mutation/PropertySetAction.scala
  9. +2 −2 cypher/src/main/scala/org/neo4j/cypher/internal/mutation/UpdateAction.scala
  10. +84 −6 cypher/src/main/scala/org/neo4j/cypher/internal/pipes/Pipe.scala
  11. +20 −0 cypher/src/test/scala/org/neo4j/cypher/ExecutionEngineHelper.scala
  12. +2 −1 cypher/src/test/scala/org/neo4j/cypher/internal/executionplan/ExecutionPlanImplTest.scala
  13. +1 −1 cypher/src/test/scala/org/neo4j/cypher/internal/mutation/CreateNodeActionTest.scala
  14. +2 −2 cypher/src/test/scala/org/neo4j/cypher/internal/mutation/DoubleCheckCreateUniqueTest.scala
  15. +9 −9 cypher/src/test/scala/org/neo4j/cypher/internal/mutation/MapPropertySetActionTest.scala
  16. +1 −1 cypher/src/test/scala/org/neo4j/cypher/internal/pipes/AllShortestPathsPipeTest.scala
  17. +1 −1 cypher/src/test/scala/org/neo4j/cypher/internal/pipes/ColumnFilterPipeTest.scala
  18. +2 −2 cypher/src/test/scala/org/neo4j/cypher/internal/pipes/EagerAggregationPipeTest.scala
  19. +19 −20 cypher/src/test/scala/org/neo4j/cypher/internal/pipes/MutationTest.scala
  20. +2 −2 cypher/src/test/scala/org/neo4j/cypher/internal/pipes/NamedPathPipeTest.scala
  21. +1 −1 cypher/src/test/scala/org/neo4j/cypher/internal/pipes/SingleShortestPathPipeTest.scala
  22. +5 −5 cypher/src/test/scala/org/neo4j/cypher/internal/pipes/SortPipeTest.scala
  23. +2 −2 cypher/src/test/scala/org/neo4j/cypher/internal/pipes/matching/TraversalMatcherTest.scala
@@ -29,12 +29,7 @@ class EagerPipeExecutionResult(r: => Traversable[Map[String, Any]],
db: GraphDatabaseService)
extends PipeExecutionResult(r, columns) {
- override lazy val queryStatistics = QueryStatistics(
- nodesCreated = state.createdNodes.count,
- relationshipsCreated = state.createdRelationships.count,
- propertiesSet = state.propertySet.count,
- deletedNodes = state.deletedNodes.count,
- deletedRelationships = state.deletedRelationships.count)
+ override lazy val queryStatistics = state.updateCounter.toStats
override val createTimedResults = {
val start = System.currentTimeMillis()
@@ -62,7 +62,7 @@ class ExecutionEngine(graph: GraphDatabaseService) {
@throws(classOf[SyntaxException])
def prepare(query: String): ExecutionPlan =
- executionPlanCache.getOrElseUpdate(query, new ExecutionPlanImpl(parser.parse(query), graph))
+ executionPlanCache.getOrElseUpdate(query, new ExecutionPlanImpl(parser.parse(query), graph, monitors))
@throws(classOf[SyntaxException])
@deprecated(message = "You should not parse queries manually any more. Use the execute(String) instead")
@@ -74,7 +74,7 @@ class ExecutionEngine(graph: GraphDatabaseService) {
@throws(classOf[SyntaxException])
@deprecated(message = "You should not parse queries manually any more. Use the execute(String) instead")
- def execute(query: Query, params: Map[String, Any]): ExecutionResult = new ExecutionPlanImpl(query, graph).execute(params)
+ def execute(query: Query, params: Map[String, Any]): ExecutionResult = new ExecutionPlanImpl(query, graph, monitors).execute(params)
private def checkScalaVersion() {
if (util.Properties.versionString.matches("^version 2.9.0")) {
@@ -77,13 +77,13 @@ case class CreateNodeStartItem(key: String, props: Map[String, Expression])
case (k, v) => (k -> Literal(v))
}
val node = db.createNode()
- state.createdNodes.increase()
+ state.updateCounter.createdNode()
setProperties(node, m, context, state)
context.newWith(key -> node)
})
} else {
val node = db.createNode()
- state.createdNodes.increase()
+ state.updateCounter.createdNode()
setProperties(node, props, context, state)
Stream(context.newWith(key -> node))
@@ -121,7 +121,8 @@ case class CreateRelationshipStartItem(key: String,
val f = from._1(context).asInstanceOf[Node]
val t = to._1(context).asInstanceOf[Node]
val relationship = f.createRelationshipTo(t, relationshipType)
- state.createdRelationships.increase()
+ state.updateCounter.createdRelationship()
+
setProperties(relationship, props, context, state)
context.put(key, relationship)
Stream(context)
@@ -25,9 +25,12 @@ import collection.Seq
import org.neo4j.cypher.internal.pipes._
import org.neo4j.cypher._
import internal.commands._
+import internal.statistics.Monitors
import internal.symbols.{NodeType, RelationshipType, SymbolTable}
-class ExecutionPlanImpl(inputQuery: Query, graph: GraphDatabaseService) extends ExecutionPlan with PatternGraphBuilder {
+class ExecutionPlanImpl(inputQuery: Query, graph: GraphDatabaseService, monitors: Monitors)
+ extends ExecutionPlan with PatternGraphBuilder {
+
val (executionPlan, executionPlanText) = prepareExecutionPlan()
def execute(params: Map[String, Any]): ExecutionResult = executionPlan(params)
@@ -123,7 +126,7 @@ class ExecutionPlanImpl(inputQuery: Query, graph: GraphDatabaseService) extends
private def getLazyReadonlyQuery(pipe: Pipe, columns: List[String]): Map[String, Any] => ExecutionResult = {
val func = (params: Map[String, Any]) => {
- val state = new QueryState(graph, params)
+ val state = new QueryState(graph, params, None, monitors.newMonitor(classOf[UpdatingMonitor]))
new PipeExecutionResult(pipe.createResults(state), columns)
}
@@ -132,16 +135,14 @@ class ExecutionPlanImpl(inputQuery: Query, graph: GraphDatabaseService) extends
private def getEagerReadWriteQuery(pipe: Pipe, columns: List[String]): Map[String, Any] => ExecutionResult = {
val func = (params: Map[String, Any]) => {
- val state = new QueryState(graph, params)
+ val state = new QueryState(graph, params, None, monitors.newMonitor(classOf[UpdatingMonitor]))
new EagerPipeExecutionResult(pipe.createResults(state), columns, state, graph)
}
func
}
private def produceAndThrowException(plan: ExecutionPlanInProgress) {
- val s = plan.pipe.symbols
-
val errors = builders.flatMap(builder => builder.missingDependencies(plan).map(builder -> _)).toList.
sortBy {
case (builder, _) => builder.priority
@@ -47,11 +47,11 @@ case class DeleteEntityAction(elementToDelete: Expression)
x match {
case n: Node if (!nodeManager.isDeleted(n)) =>
- state.deletedNodes.increase()
+ state.updateCounter.deletedNode()
n.delete()
case r: Relationship if (!nodeManager.isDeleted(r))=>
- state.deletedRelationships.increase()
+ state.updateCounter.deletedRelationship()
r.delete()
case _ => // Entity is already deleted. No need to do anything
@@ -31,7 +31,7 @@ case class DeletePropertyAction(element: Expression, property: String)
val entity = element(context).asInstanceOf[PropertyContainer]
if (entity.hasProperty(property)) {
entity.removeProperty(property)
- state.propertySet.increase()
+ state.updateCounter.setProperty()
}
Stream(context)
@@ -49,7 +49,7 @@ case class MapPropertySetAction(element: Expression, mapExpression: Expression)
/*Set all map values on the property container*/
map.foreach(kv => {
- state.propertySet.increase()
+ state.updateCounter.setProperty()
kv match {
case (k, v) =>
@@ -65,7 +65,7 @@ case class MapPropertySetAction(element: Expression, mapExpression: Expression)
case k if map.contains(k) => //Do nothing
case k =>
pc.removeProperty(k)
- state.propertySet.increase()
+ state.updateCounter.setProperty()
}
Stream(context)
@@ -37,7 +37,7 @@ case class PropertySetAction(prop: Property, e: Expression)
case _ => entity.setProperty(propertyKey, value)
}
- state.propertySet.increase()
+ state.updateCounter.setProperty()
Stream(context)
}
@@ -66,15 +66,15 @@ trait GraphElementPropertyFunctions extends CollectionSupport {
map.foreach {
case (key, value) => {
pc.setProperty(key, value)
- state.propertySet.increase()
+ state.updateCounter.setProperty()
}
}
}
private def setSingleValue(expression: Expression, context: ExecutionContext, pc: PropertyContainer, key: String, state: QueryState) {
val value = makeValueNeoSafe(expression(context))
pc.setProperty(key, value)
- state.propertySet.increase()
+ state.updateCounter.setProperty()
}
def makeValueNeoSafe(a: Any): Any = if (isCollection(a)) {
@@ -28,8 +28,9 @@ import collection.mutable.{Queue, Map => MutableMap}
import scala.collection.JavaConverters._
import java.util.HashMap
import org.neo4j.kernel.GraphDatabaseAPI
-import org.neo4j.cypher.ParameterNotFoundException
+import org.neo4j.cypher.{QueryStatistics, ParameterNotFoundException}
import java.util.concurrent.atomic.AtomicInteger
+import org.neo4j.cypher.internal.commands.Query
/**
* Pipe is a central part of Cypher. Most pipes are decorators - they
@@ -69,25 +70,102 @@ object MutableMaps {
}
}
-object QueryState {
- def apply() = new QueryState(null, Map.empty)
+
+trait UpdatingMonitor {
+ def createdNode(q: Query)
+
+ def deletedNode(q: Query)
+
+ def createdRelationship(q: Query)
+
+ def deletedRelationship(q: Query)
+
+ def setProperty(q: Query)
}
-class QueryState(val db: GraphDatabaseService,
- val params: Map[String, Any],
- var transaction: Option[Transaction] = None) {
+class UpdatingCounter(inner: UpdatingMonitor, q: Query) {
val createdNodes = new Counter
val createdRelationships = new Counter
val propertySet = new Counter
val deletedNodes = new Counter
val deletedRelationships = new Counter
+ def createdNode() {
+ createdNodes.increase()
+ inner.createdNode(q)
+ }
+
+ def createdRelationship() {
+ createdRelationships.increase()
+ inner.createdRelationship(q)
+ }
+
+ def deletedNode() {
+ deletedNodes.increase()
+ inner.deletedNode(q)
+ }
+
+ def deletedRelationship() {
+ deletedRelationships.increase()
+ inner.deletedRelationship(q)
+ }
+
+ def setProperty() {
+ propertySet.increase()
+ inner.setProperty(q)
+ }
+
+ def toStats = QueryStatistics(
+ nodesCreated = createdNodes.count,
+ relationshipsCreated = createdRelationships.count,
+ propertiesSet = propertySet.count,
+ deletedNodes = deletedNodes.count,
+ deletedRelationships = deletedRelationships.count)
+
+}
+
+class NullMonitor extends UpdatingMonitor {
+ def createdNode(q: Query) {}
+
+ def createdRelationship(q: Query) {}
+
+ def deletedNode(q: Query) {}
+
+ def deletedRelationship(q: Query) {}
+
+ def setProperty(q: Query) {}
+}
+
+object QueryState {
+ def forTest(db: GraphDatabaseService = null) = new QueryState(db, Map.empty, None, new NullMonitor)
+}
+
+class QueryState(val db: GraphDatabaseService,
+ val params: Map[String, Any],
+ var transaction: Option[Transaction] = None,
+ val monitor: UpdatingMonitor) {
+
+ val updateCounter = new UpdatingCounter(monitor, null)
+
def graphDatabaseAPI: GraphDatabaseAPI = if (db.isInstanceOf[GraphDatabaseAPI])
db.asInstanceOf[GraphDatabaseAPI]
else
throw new IllegalStateException("Graph database does not implement GraphDatabaseAPI")
}
+trait QueryUpdatesMonitor {
+ def createdNode(queryId: Int)
+
+ def createdRelationship(queryId: Int)
+
+ def setProperty(queryId: Int)
+
+ def deletedNode(queryId: Int)
+
+ def deletedRelationship(queryId: Int)
+}
+
+
class Counter {
private val counter: AtomicInteger = new AtomicInteger()
@@ -20,7 +20,9 @@
package org.neo4j.cypher
import internal.commands.Query
+import internal.pipes.QueryState
import org.junit.Before
+import org.scalatest.Assertions
trait ExecutionEngineHelper extends GraphDatabaseTestBase {
@@ -51,4 +53,22 @@ trait ExecutionEngineHelper extends GraphDatabaseTestBase {
case x => fail(x.toString())
}
+
+
+}
+
+trait StatsAssertions extends Assertions {
+ protected def assertStats(state:QueryState,
+ createdNode: Int = 0,
+ deletedNode: Int = 0,
+ createdRelationship: Int = 0,
+ deletedRelationship: Int = 0,
+ setProperty: Int = 0) {
+ val stats = state.updateCounter.toStats
+ assert(stats.nodesCreated === createdNode, "Nodes created didn't match")
+ assert(stats.deletedNodes === deletedNode, "Nodes deleted didn't match")
+ assert(stats.relationshipsCreated === createdRelationship, "Relationships created didn't match")
+ assert(stats.deletedRelationships === deletedRelationship, "Relationship deleted didn't match")
+ assert(stats.propertiesSet === setProperty, "Properties set didn't match")
+ }
}
@@ -28,6 +28,7 @@ import org.scalatest.Assertions
import org.neo4j.cypher.InternalException
import actors.threadpool.{ExecutionException, TimeUnit, Executors}
import org.neo4j.cypher.internal.commands.expressions.Identifier
+import org.neo4j.cypher.internal.statistics.Monitors
class ExecutionPlanImplTest extends Assertions with Timed {
@Test def should_not_go_into_never_ending_loop() {
@@ -42,7 +43,7 @@ class ExecutionPlanImplTest extends Assertions with Timed {
}
}
-class FakeEPI(q: Query, gds: GraphDatabaseService) extends ExecutionPlanImpl(q, gds) {
+class FakeEPI(q: Query, gds: GraphDatabaseService) extends ExecutionPlanImpl(q, gds, new Monitors()) {
override lazy val builders = Seq(new BadBuilder)
}
@@ -32,7 +32,7 @@ class CreateNodeActionTest extends ExecutionEngineHelper with Assertions {
val action = CreateNodeStartItem("id", Map("*" -> Literal(Map("name" -> "Andres", "age" -> 37))))
val tx = graph.beginTx()
- action.exec(ExecutionContext.empty, new QueryState(graph, Map.empty))
+ action.exec(ExecutionContext.empty, QueryState.forTest(graph))
tx.success()
tx.finish()
@@ -25,7 +25,7 @@ import org.neo4j.test.ImpermanentGraphDatabase
import java.lang.Iterable
import org.neo4j.graphdb.Traverser.Order
import org.neo4j.graphdb._
-import org.neo4j.cypher.internal.pipes.{ExecutionContext, QueryState}
+import org.neo4j.cypher.internal.pipes.{NullMonitor, ExecutionContext, QueryState}
import collection.JavaConverters._
/*
@@ -68,7 +68,7 @@ class DoubleCheckCreateUniqueTest extends Assertions {
}
private def createQueryState(tx: Transaction): QueryState = {
- new QueryState(db, Map.empty, Some(tx))
+ new QueryState(db, Map.empty, Some(tx), new NullMonitor)
}
private def createRel(node:Node) {
Oops, something went wrong.

0 comments on commit fea2280

Please sign in to comment.