Skip to content

Commit

Permalink
Added warning for unwanted cartesian products.
Browse files Browse the repository at this point in the history
This commit also adds the minimal infrastructural changes needed to propagate notifications.
- Added `Iterable<Notification> getNotifications`
- Created `NotificationLogger` that is sent around and records notifications
- Added `notifications: [...]` to json-response
  • Loading branch information
pontusmelke committed Mar 13, 2015
1 parent be16ffa commit 8d43f8f
Show file tree
Hide file tree
Showing 41 changed files with 868 additions and 101 deletions.
Expand Up @@ -19,7 +19,7 @@
*/
package org.neo4j.cypher.internal.compiler.v2_3

import org.neo4j.cypher.internal.LRUCache
import org.neo4j.cypher.internal.{ExecutionMode, LRUCache}
import org.neo4j.cypher.internal.compiler.v2_3.ast.Statement
import org.neo4j.cypher.internal.compiler.v2_3.ast.rewriters.{normalizeReturnClauses, normalizeWithClauses}
import org.neo4j.cypher.internal.compiler.v2_3.executionplan._
Expand Down Expand Up @@ -61,11 +61,13 @@ trait CypherCacheMonitor[T, E] extends CypherCacheHitMonitor[T] with CypherCache
trait AstCacheMonitor extends CypherCacheMonitor[Statement, CacheAccessor[Statement, ExecutionPlan]]

object CypherCompilerFactory {
val monitorTag = "cypher2.2"
val monitorTag = "cypher2.3"

def costBasedCompiler(graph: GraphDatabaseService, queryCacheSize: Int, statsDivergenceThreshold: Double,
queryPlanTTL: Long, clock: Clock, monitors: Monitors,
logger: InfoLogger, plannerName: CostBasedPlannerName): CypherCompiler = {
logger: InfoLogger,
notificationLoggerBuilder: (ExecutionMode => InternalNotificationLogger),
plannerName: CostBasedPlannerName): CypherCompiler = {
val parser = new CypherParser(monitors.newMonitor[ParserMonitor[Statement]](monitorTag))
val checker = new SemanticChecker(monitors.newMonitor[SemanticCheckMonitor](monitorTag))
val rewriter = new ASTRewriter(monitors.newMonitor[AstRewritingMonitor](monitorTag))
Expand All @@ -80,11 +82,11 @@ object CypherCompilerFactory {
val cacheMonitor = monitors.newMonitor[AstCacheMonitor](monitorTag)
val cache = new MonitoringCacheAccessor[Statement, ExecutionPlan](cacheMonitor)

new CypherCompiler(parser, checker, execPlanBuilder, rewriter, cache, planCacheFactory, cacheMonitor, monitors)
new CypherCompiler(parser, checker, execPlanBuilder, rewriter, cache, planCacheFactory, cacheMonitor, monitors, notificationLoggerBuilder)
}

def ruleBasedCompiler(graph: GraphDatabaseService, queryCacheSize: Int, statsDivergenceThreshold: Double,
queryPlanTTL: Long, clock: Clock, monitors: Monitors): CypherCompiler = {
queryPlanTTL: Long, clock: Clock, monitors: Monitors, notificationLoggerBuilder: (ExecutionMode => InternalNotificationLogger)): CypherCompiler = {
val parser = new CypherParser(monitors.newMonitor[ParserMonitor[ast.Statement]](monitorTag))
val checker = new SemanticChecker(monitors.newMonitor[SemanticCheckMonitor](monitorTag))
val rewriter = new ASTRewriter(monitors.newMonitor[AstRewritingMonitor](monitorTag))
Expand All @@ -95,7 +97,7 @@ object CypherCompilerFactory {
val cacheMonitor = monitors.newMonitor[AstCacheMonitor](monitorTag)
val cache = new MonitoringCacheAccessor[Statement, ExecutionPlan](cacheMonitor)

new CypherCompiler(parser, checker, execPlanBuilder, rewriter, cache, planCacheFactory, cacheMonitor, monitors)
new CypherCompiler(parser, checker, execPlanBuilder, rewriter, cache, planCacheFactory, cacheMonitor, monitors, notificationLoggerBuilder)
}

private def logStalePlanRemovalMonitor(log: InfoLogger) = new AstCacheMonitor {
Expand All @@ -112,22 +114,25 @@ case class CypherCompiler(parser: CypherParser,
cacheAccessor: CacheAccessor[Statement, ExecutionPlan],
planCacheFactory: () => LRUCache[Statement, ExecutionPlan],
cacheMonitor: CypherCacheFlushingMonitor[CacheAccessor[Statement, ExecutionPlan]],
monitors: Monitors) {
monitors: Monitors,
notificationLoggerBuilder: (ExecutionMode => InternalNotificationLogger)) {

def planQuery(queryText: String, context: PlanContext): (ExecutionPlan, Map[String, Any]) =
planPreparedQuery(prepareQuery(queryText), context)
def planQuery(queryText: String, context: PlanContext, executionMode: ExecutionMode): (ExecutionPlan, Map[String, Any]) =
planPreparedQuery(prepareQuery(queryText, executionMode), context)

def prepareQuery(queryText: String): PreparedQuery = {
def prepareQuery(queryText: String, executionMode: ExecutionMode): PreparedQuery = {

val notificationLogger = notificationLoggerBuilder(executionMode)
val parsedStatement = parser.parse(queryText)

val cleanedStatement: Statement = parsedStatement.endoRewrite(inSequence(normalizeReturnClauses, normalizeWithClauses))
val originalSemanticState = semanticChecker.check(queryText, cleanedStatement)
val originalSemanticState = semanticChecker.check(queryText, cleanedStatement, notificationLogger)

val (rewrittenStatement, extractedParams, postConditions) = astRewriter.rewrite(queryText, cleanedStatement, originalSemanticState)
val postRewriteSemanticState = semanticChecker.check(queryText, rewrittenStatement)

val table = SemanticTable(types = postRewriteSemanticState.typeTable, recordedScopes = postRewriteSemanticState.recordedScopes)
PreparedQuery(rewrittenStatement, queryText, extractedParams)(table, postConditions, postRewriteSemanticState.scopeTree)
PreparedQuery(rewrittenStatement, queryText, extractedParams)(table, postConditions, postRewriteSemanticState.scopeTree, notificationLogger)
}

def planPreparedQuery(parsedQuery: PreparedQuery, context: PlanContext): (ExecutionPlan, Map[String, Any]) = {
Expand Down
Expand Up @@ -25,11 +25,13 @@ import java.util.Collections

import org.neo4j.cypher.internal.compiler.v2_3.executionplan.InternalExecutionResult
import org.neo4j.cypher.internal.compiler.v2_3.planDescription.InternalPlanDescription
import org.neo4j.cypher.internal.compiler.v2_3.notification.InternalNotification
import org.neo4j.graphdb.QueryExecutionType.{QueryType, explained}
import org.neo4j.graphdb.ResourceIterator
import org.neo4j.graphdb.{Notification, ResourceIterator}

case class ExplainExecutionResult(closer: TaskCloser, columns: List[String],
executionPlanDescription: InternalPlanDescription, queryType: QueryType)
executionPlanDescription: InternalPlanDescription, queryType: QueryType,
notifications: Seq[InternalNotification])
extends InternalExecutionResult {

def javaIterator: ResourceIterator[util.Map[String, Any]] = new EmptyResourceIterator(close)
Expand Down
@@ -0,0 +1,53 @@
/**
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal.compiler.v2_3

import org.neo4j.cypher.internal.compiler.v2_3.notification.InternalNotification

/**
* A NotificationLogger records notifications.
*/
sealed trait InternalNotificationLogger {
def log(notification: InternalNotification )

def notifications: Seq[InternalNotification ]

def += (notification: InternalNotification) = log(notification)
}

/**
* A null implementation that discards all notifications.
*/
case object devNullLogger extends InternalNotificationLogger {
override def log(notification: InternalNotification ) {}

override def notifications: Seq[InternalNotification ] = Seq.empty
}

/**
* NotificationLogger that records all notifications for later retrieval.
*/
class RecordingNotificationLogger extends InternalNotificationLogger {
private val builder = Seq.newBuilder[InternalNotification]

def log(notification: InternalNotification ) = builder += notification
def notifications = builder.result()
}

Expand Up @@ -29,6 +29,7 @@ import org.neo4j.cypher.internal.compiler.v2_3.pipes.QueryState
import org.neo4j.cypher.internal.compiler.v2_3.planDescription.InternalPlanDescription
import org.neo4j.cypher.internal.compiler.v2_3.spi.QueryContext
import org.neo4j.cypher.internal.{ExplainMode, ExecutionMode, ProfileMode}
import org.neo4j.cypher.internal.compiler.v2_3.notification.InternalNotification
import org.neo4j.graphdb.QueryExecutionType.{QueryType, profiled, query}
import org.neo4j.graphdb.ResourceIterator

Expand Down Expand Up @@ -101,5 +102,8 @@ class PipeExecutionResult(val result: ResultIterator,
}

def executionType = if (executionMode == ProfileMode) profiled(queryType) else query(queryType)

//notifications only present for EXPLAIN
override val notifications = Iterable.empty[InternalNotification]
}

Expand Up @@ -27,7 +27,10 @@ import org.neo4j.cypher.internal.compiler.v2_3.tracing.rewriters.RewriterConditi

case class PreparedQuery(statement: Statement,
queryText: String,
extractedParams: Map[String, Any])(val semanticTable: SemanticTable, val conditions: Set[RewriterCondition], val scopeTree: Scope) {
extractedParams: Map[String, Any])(val semanticTable: SemanticTable,
val conditions: Set[RewriterCondition],
val scopeTree: Scope,
val notificationLogger: InternalNotificationLogger) {

def abstractQuery: AbstractQuery = statement.asQuery.setQueryText(queryText)

Expand All @@ -37,5 +40,5 @@ case class PreparedQuery(statement: Statement,
}

def rewrite(rewriter: Rewriter): PreparedQuery =
copy(statement = statement.endoRewrite(rewriter))(semanticTable, conditions, scopeTree)
copy(statement = statement.endoRewrite(rewriter))(semanticTable, conditions, scopeTree, notificationLogger)
}
Expand Up @@ -22,9 +22,9 @@ package org.neo4j.cypher.internal.compiler.v2_3
import org.neo4j.cypher.internal.compiler.v2_3.ast.Statement

class SemanticChecker(semanticCheckMonitor: SemanticCheckMonitor) {
def check(queryText: String, statement: Statement): SemanticState = {
def check(queryText: String, statement: Statement, notificationLogger: InternalNotificationLogger = devNullLogger): SemanticState = {
semanticCheckMonitor.startSemanticCheck(queryText)
val SemanticCheckResult(semanticState, semanticErrors) = statement.semanticCheck(SemanticState.clean)
val SemanticCheckResult(semanticState, semanticErrors) = statement.semanticCheck(SemanticState.clean.withNotificationLogger(notificationLogger))

val scopeTreeIssues = ScopeTreeVerifier.verify(semanticState.scopeTree)
if (scopeTreeIssues.nonEmpty)
Expand Down
Expand Up @@ -170,7 +170,8 @@ import org.neo4j.cypher.internal.compiler.v2_3.SemanticState.ScopeLocation

case class SemanticState(currentScope: ScopeLocation,
typeTable: ASTAnnotationMap[ast.Expression, ExpressionTypeInfo],
recordedScopes: ASTAnnotationMap[ast.ASTNode, Scope]) {
recordedScopes: ASTAnnotationMap[ast.ASTNode, Scope],
notificationLogger: InternalNotificationLogger = devNullLogger) {
def scopeTree = currentScope.rootScope

def newChildScope = copy(currentScope = currentScope.newChildScope)
Expand All @@ -190,6 +191,8 @@ case class SemanticState(currentScope: ScopeLocation,
Left(SemanticError(s"${identifier.name} already declared", identifier.position, symbol.positions.toSeq: _*))
}

def withNotificationLogger(notificationLogger: InternalNotificationLogger) = copy(notificationLogger = notificationLogger)

def implicitIdentifier(identifier: ast.Identifier, possibleTypes: TypeSpec): Either[SemanticError, SemanticState] =
this.symbol(identifier.name) match {
case None =>
Expand Down
Expand Up @@ -22,6 +22,9 @@ package org.neo4j.cypher.internal.compiler.v2_3.ast
import org.neo4j.cypher.internal.compiler.v2_3._
import org.neo4j.cypher.internal.compiler.v2_3.commands.expressions.StringHelper.RichString
import org.neo4j.cypher.internal.compiler.v2_3.symbols._
import org.neo4j.cypher.internal.compiler.v2_3.notification.CartesianProductNotification

import scala.annotation.tailrec

sealed trait Clause extends ASTNode with ASTPhrase with SemanticCheckable {
def name: String
Expand Down Expand Up @@ -70,7 +73,8 @@ case class Match(optional: Boolean, pattern: Pattern, hints: Seq[UsingHint], whe
hints.semanticCheck chain
uniqueHints chain
where.semanticCheck chain
checkHints
checkHints chain
checkForCartesianProducts

private def uniqueHints: SemanticCheck = {
val errors = hints.groupBy(_.identifier).collect {
Expand All @@ -81,6 +85,30 @@ case class Match(optional: Boolean, pattern: Pattern, hints: Seq[UsingHint], whe
(state: SemanticState) => SemanticCheckResult(state, errors)
}

private def checkForCartesianProducts: SemanticCheck = (state: SemanticState) => {
val nodes = pattern.patternParts.map(_.fold(Set.empty[Identifier]) {
case NodePattern(Some(id), _, _, _) => list => list + id
})

def pickFirst(a: Identifier, b: Identifier) = if (a.position.offset < b.position.offset) a else b

@tailrec
def loop(compare: Set[Identifier], rest: Seq[Set[Identifier]]) {
val shouldWarn = rest.exists( o => (compare intersect o).isEmpty)

if (shouldWarn) {
val notification = CartesianProductNotification(compare.reduce(pickFirst).position)
state.notificationLogger += notification
}

if (rest.nonEmpty) loop(rest.head, rest.tail)
}

if (nodes.nonEmpty) loop(nodes.head, nodes.tail)

SemanticCheckResult(state, Seq.empty)
}

private def checkHints: SemanticCheck = {
val error: Option[SemanticCheck] = hints.collectFirst {
case hint@UsingIndexHint(Identifier(identifier), LabelName(labelName), Identifier(property))
Expand Down
Expand Up @@ -56,12 +56,12 @@ case class DefaultExecutionResultBuilderFactory(pipeInfo: PipeInfo, columns: Lis
exceptionDecorator = newDecorator
}

def build(graph: GraphDatabaseService, queryId: AnyRef, planType: ExecutionMode, params: Map[String, Any]): InternalExecutionResult = {
def build(graph: GraphDatabaseService, queryId: AnyRef, planType: ExecutionMode, params: Map[String, Any], notificationLogger: InternalNotificationLogger): InternalExecutionResult = {
taskCloser.addTask(queryContext.close)
val state = new QueryState(graph, queryContext, externalResource, params, pipeDecorator, queryId = queryId)
try {
try {
createResults(state, planType)
createResults(state, planType, notificationLogger)
}
catch {
case e: CypherException =>
Expand All @@ -75,7 +75,7 @@ case class DefaultExecutionResultBuilderFactory(pipeInfo: PipeInfo, columns: Lis
}
}

private def createResults(state: QueryState, planType: ExecutionMode): InternalExecutionResult = {
private def createResults(state: QueryState, planType: ExecutionMode, notificationLogger: InternalNotificationLogger): InternalExecutionResult = {
val queryType =
if (pipeInfo.pipe.isInstanceOf[IndexOperationPipe] || pipeInfo.pipe.isInstanceOf[ConstraintOperationPipe])
QueryType.SCHEMA_WRITE
Expand All @@ -87,7 +87,7 @@ case class DefaultExecutionResultBuilderFactory(pipeInfo: PipeInfo, columns: Lis
} else
QueryType.READ_ONLY
if (planType == ExplainMode) {
new ExplainExecutionResult(taskCloser, columns, pipeInfo.pipe.planDescription, queryType)
new ExplainExecutionResult(taskCloser, columns, pipeInfo.pipe.planDescription, queryType, notificationLogger.notifications)
} else {
val results = pipeInfo.pipe.createResults(state)
val resultIterator = buildResultIterator(results, pipeInfo.updating)
Expand Down
Expand Up @@ -61,7 +61,7 @@ class ExecutionPlanBuilder(graph: GraphDatabaseService, statsDivergenceThreshold

val columns = getQueryResultColumns(abstractQuery, pipe.symbols)
val resultBuilderFactory = new DefaultExecutionResultBuilderFactory(pipeInfo, columns)
val func = getExecutionPlanFunction(periodicCommitInfo, abstractQuery.getQueryText, updating, resultBuilderFactory)
val func = getExecutionPlanFunction(periodicCommitInfo, abstractQuery.getQueryText, updating, resultBuilderFactory, inputQuery.notificationLogger)

new ExecutionPlan {
private val fingerprint = PlanFingerprintReference(clock, queryPlanTTL, statsDivergenceThreshold, fp)
Expand Down Expand Up @@ -101,7 +101,8 @@ class ExecutionPlanBuilder(graph: GraphDatabaseService, statsDivergenceThreshold
private def getExecutionPlanFunction(periodicCommit: Option[PeriodicCommitInfo],
queryId: AnyRef,
updating: Boolean,
resultBuilderFactory: ExecutionResultBuilderFactory):
resultBuilderFactory: ExecutionResultBuilderFactory,
notificationLogger: InternalNotificationLogger):
(QueryContext, ExecutionMode, Map[String, Any]) => InternalExecutionResult =
(queryContext: QueryContext, planType: ExecutionMode, params: Map[String, Any]) => {
val builder = resultBuilderFactory.create()
Expand All @@ -119,6 +120,6 @@ class ExecutionPlanBuilder(graph: GraphDatabaseService, statsDivergenceThreshold
if (profiling)
builder.setPipeDecorator(new Profiler())

builder.build(graph, queryId, planType, params)
builder.build(graph, queryId, planType, params, notificationLogger)
}
}
Expand Up @@ -20,7 +20,7 @@
package org.neo4j.cypher.internal.compiler.v2_3.executionplan

import org.neo4j.cypher.internal.ExecutionMode
import org.neo4j.cypher.internal.compiler.v2_3.CypherException
import org.neo4j.cypher.internal.compiler.v2_3.{InternalNotificationLogger, CypherException}
import org.neo4j.cypher.internal.compiler.v2_3.pipes._
import org.neo4j.cypher.internal.compiler.v2_3.spi.QueryContext
import org.neo4j.graphdb.GraphDatabaseService
Expand All @@ -30,7 +30,7 @@ trait ExecutionResultBuilder {
def setLoadCsvPeriodicCommitObserver(batchRowCount: Long)
def setPipeDecorator(newDecorator: PipeDecorator)
def setExceptionDecorator(newDecorator: CypherException => CypherException)
def build(graph: GraphDatabaseService, queryId: AnyRef, planType: ExecutionMode, params: Map[String, Any]): InternalExecutionResult
def build(graph: GraphDatabaseService, queryId: AnyRef, planType: ExecutionMode, params: Map[String, Any], notificationLogger: InternalNotificationLogger): InternalExecutionResult
}

trait ExecutionResultBuilderFactory {
Expand Down
Expand Up @@ -23,6 +23,7 @@ import java.io.PrintWriter

import org.neo4j.cypher.internal.compiler.v2_3.InternalQueryStatistics
import org.neo4j.cypher.internal.compiler.v2_3.planDescription.InternalPlanDescription
import org.neo4j.cypher.internal.compiler.v2_3.notification.InternalNotification
import org.neo4j.graphdb.{QueryExecutionType, ResourceIterator}

trait InternalExecutionResult extends Iterator[Map[String, Any]] {
Expand All @@ -38,4 +39,5 @@ trait InternalExecutionResult extends Iterator[Map[String, Any]] {
def close()
def planDescriptionRequested: Boolean
def executionType: QueryExecutionType
def notifications: Iterable[InternalNotification]
}

0 comments on commit 8d43f8f

Please sign in to comment.