Skip to content

Commit

Permalink
Use new compatibility compilation code path and remove the old one
Browse files Browse the repository at this point in the history
  • Loading branch information
fickludd authored and alexaverbuch committed May 25, 2018
1 parent 0590184 commit 4e0eddb
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 371 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ package org.neo4j.cypher.internal
import java.time.Clock

import org.neo4j.cypher.internal.compiler.v3_5.{CypherPlannerConfiguration, StatsDivergenceCalculator}
import org.opencypher.v9_0.util.helpers.fixedPoint
import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer
import org.opencypher.v9_0.util.InputPosition
import org.neo4j.cypher.{InvalidArgumentException, SyntaxException, exceptionHandler, _}
import org.neo4j.cypher.{InvalidArgumentException, _}
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.graphdb.impl.notification.NotificationCode._
import org.neo4j.graphdb.impl.notification.NotificationDetail.Factory.message
import org.neo4j.kernel.GraphDatabaseQueryService
import org.neo4j.kernel.configuration.Config
import org.neo4j.kernel.impl.query.TransactionalContext
import org.neo4j.kernel.monitoring.{Monitors => KernelMonitors}
import org.neo4j.logging.{Log, LogProvider}
import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer
import org.opencypher.v9_0.util
import org.opencypher.v9_0.util.InputPosition

object CompilerEngineDelegator {
val DEFAULT_QUERY_CACHE_SIZE: Int = 128
Expand All @@ -53,14 +53,14 @@ class CompilerEngineDelegator(graph: GraphDatabaseQueryService,
kernelMonitors: KernelMonitors,
config: CypherConfiguration,
logProvider: LogProvider,
compatibilityFactory: CompatibilityFactory) {
compatibilityCache: CompatibilityCache) {

import org.neo4j.cypher.internal.CompilerEngineDelegator._

private val log: Log = logProvider.getLog(getClass)

private val compilerConfig = CypherPlannerConfiguration(
queryCacheSize = getQueryCacheSize,
queryCacheSize = config.queryCacheSize,
statsDivergenceCalculator = getStatisticsDivergenceCalculator,
useErrorsOverWarnings = config.useErrorsOverWarnings,
idpMaxTableSize = config.idpMaxTableSize,
Expand All @@ -73,96 +73,93 @@ class CompilerEngineDelegator(graph: GraphDatabaseQueryService,
planWithMinimumCardinalityEstimates = config.planWithMinimumCardinalityEstimates
)

@throws(classOf[SyntaxException])
def parseQuery(preParsedQueryArg: PreParsedQuery, tracer: CompilationPhaseTracer): ParsedQuery = {
import org.neo4j.cypher.internal.compatibility.v2_3.helpers._
import org.neo4j.cypher.internal.compatibility.v3_1.helpers._

var preParsedQuery = preParsedQueryArg
/**
* Compile pre-parsed query into executable query.
*
* @param preParsedQuery pre-parsed query to convert
* @param tracer compilation tracer to which events of the compilation process are reported
* @param transactionalContext transactional context to use during compilation (in logical and physical planning)
* @return a compiled and executable query
*/
def compile(preParsedQuery: PreParsedQuery,
tracer: CompilationPhaseTracer,
transactionalContext: TransactionalContext
): CachedExecutableQuery = {

var notifications = Set.newBuilder[org.neo4j.graphdb.Notification]
val supportedRuntimes3_1 = Seq(CypherRuntimeOption.interpreted, CypherRuntimeOption.default)
val inputPosition = preParsedQuery.offset

var preParsingNotifications: Set[org.neo4j.graphdb.Notification] = Set.empty
if ((preParsedQuery.version == CypherVersion.v3_3 || preParsedQuery.version == CypherVersion.v3_5) && preParsedQuery.planner == CypherPlannerOption.rule) {
preParsingNotifications = preParsingNotifications + rulePlannerUnavailableFallbackNotification(preParsedQuery.offset)
preParsedQuery = preParsedQuery.copy(version = CypherVersion.v3_1)
}

def checkSupportedRuntime(ex: util.SyntaxException): Unit = {
if (!supportedRuntimes3_1.contains(preParsedQuery.runtime)) {
def assertSupportedRuntime(ex: util.SyntaxException, runtime: CypherRuntimeOption): Unit = {
if (!supportedRuntimes3_1.contains(runtime)) {
if (config.useErrorsOverWarnings) {
throw new InvalidArgumentException("The given query is not currently supported in the selected runtime")
} else {
preParsingNotifications += runtimeUnsupportedNotification(ex, preParsedQuery)
preParsedQuery = preParsedQuery.copy(runtime = CypherRuntimeOption.interpreted)
notifications += runtimeUnsupportedNotification(ex, inputPosition)
}
}
}

def planForVersion(input: Either[CypherVersion, ParsedQuery]): Either[CypherVersion, ParsedQuery] = input match {
case r@Right(_) => r
def innerCompile(preParsedQuery: PreParsedQuery): CachedExecutableQuery = {

case Left(CypherVersion.v3_5) =>
val parserQuery = compatibilityFactory.
create(PlannerSpec_v3_5(preParsedQuery.planner, preParsedQuery.runtime, preParsedQuery.updateStrategy), compilerConfig).
produceParsedQuery(preParsedQuery, tracer, preParsingNotifications)
if ((preParsedQuery.version == CypherVersion.v3_3 || preParsedQuery.version == CypherVersion.v3_5) && preParsedQuery.planner == CypherPlannerOption.rule) {
notifications += rulePlannerUnavailableFallbackNotification(preParsedQuery.offset)
innerCompile(preParsedQuery.copy(version = CypherVersion.v3_1))

parserQuery.onError {
// if there is a create unique in the cypher 3.5 query try to fallback to 3.1
} else if (preParsedQuery.version == CypherVersion.v3_5) {
val compiler3_5 = compatibilityCache.selectCompiler(preParsedQuery.version,
preParsedQuery.planner,
preParsedQuery.runtime,
preParsedQuery.updateStrategy,
compilerConfig)

try {
compiler3_5.compile(preParsedQuery, tracer, notifications.result(), transactionalContext)
} catch {
case ex: util.SyntaxException if ex.getMessage.startsWith("CREATE UNIQUE") =>
preParsingNotifications = preParsingNotifications +
createUniqueNotification(ex, preParsedQuery)
checkSupportedRuntime(ex)
Left(CypherVersion.v3_1)
notifications += createUniqueNotification(ex, inputPosition)
assertSupportedRuntime(ex, preParsedQuery.runtime)
innerCompile(preParsedQuery.copy(version = CypherVersion.v3_1, runtime = CypherRuntimeOption.interpreted))

case ex: util.SyntaxException if ex.getMessage.startsWith("START is deprecated") =>
preParsingNotifications = preParsingNotifications +
createStartUnavailableNotification(ex, preParsedQuery) +
createStartDeprecatedNotification(ex, preParsedQuery)
checkSupportedRuntime(ex)
Left(CypherVersion.v3_1)
case _ => Right(parserQuery)
}.getOrElse(Right(parserQuery))

case Left(CypherVersion.v3_3) =>
val parsedQuery = compatibilityFactory.
create(PlannerSpec_v3_3(preParsedQueryArg.planner, preParsedQueryArg.runtime, preParsedQueryArg.updateStrategy), compilerConfig).
produceParsedQuery(preParsedQuery, tracer, preParsingNotifications)
Right(parsedQuery)

case Left(CypherVersion.v3_1) =>
val parsedQuery = compatibilityFactory.
create(PlannerSpec_v3_1(preParsedQuery.planner, preParsedQuery.runtime, preParsedQuery.updateStrategy), compilerConfig).
produceParsedQuery(preParsedQuery, as3_1(tracer), preParsingNotifications)
Right(parsedQuery)

case Left(CypherVersion.v2_3) =>
val parsedQuery = compatibilityFactory.
create(PlannerSpec_v2_3(preParsedQuery.planner, preParsedQuery.runtime), compilerConfig).
produceParsedQuery(preParsedQuery, tracer, preParsingNotifications)
Right(parsedQuery)
notifications += createStartUnavailableNotification(ex, inputPosition)
notifications += createStartDeprecatedNotification(ex, inputPosition)
assertSupportedRuntime(ex, preParsedQuery.runtime)
innerCompile(preParsedQuery.copy(version = CypherVersion.v3_1, runtime = CypherRuntimeOption.interpreted))
}

} else {

val compiler = compatibilityCache.selectCompiler(preParsedQuery.version,
preParsedQuery.planner,
preParsedQuery.runtime,
preParsedQuery.updateStrategy,
compilerConfig)

compiler.compile(preParsedQuery, tracer, notifications.result(), transactionalContext)
}
}

val result: Either[CypherVersion, ParsedQuery] = fixedPoint(planForVersion).apply(Left(preParsedQuery.version))
result.right.get
innerCompile(preParsedQuery)
}

private def createStartUnavailableNotification(ex: util.SyntaxException, preParsedQuery: PreParsedQuery) = {
val pos = convertInputPosition(ex.pos.getOrElse(preParsedQuery.offset))

private def createStartUnavailableNotification(ex: util.SyntaxException, inputPosition: InputPosition) = {
val pos = convertInputPosition(ex.pos.getOrElse(inputPosition))
START_UNAVAILABLE_FALLBACK.notification(pos)
}

private def createStartDeprecatedNotification(ex: util.SyntaxException, preParsedQuery: PreParsedQuery) = {
val pos = convertInputPosition(ex.pos.getOrElse(preParsedQuery.offset))
private def createStartDeprecatedNotification(ex: util.SyntaxException, inputPosition: InputPosition) = {
val pos = convertInputPosition(ex.pos.getOrElse(inputPosition))
START_DEPRECATED.notification(pos, message("START", ex.getMessage))
}

private def runtimeUnsupportedNotification(ex: util.SyntaxException, preParsedQuery: PreParsedQuery) = {
val pos = convertInputPosition(ex.pos.getOrElse(preParsedQuery.offset))
private def runtimeUnsupportedNotification(ex: util.SyntaxException, inputPosition: InputPosition) = {
val pos = convertInputPosition(ex.pos.getOrElse(inputPosition))
RUNTIME_UNSUPPORTED.notification(pos)
}

private def createUniqueNotification(ex: util.SyntaxException, preParsedQuery: PreParsedQuery) = {
val pos = convertInputPosition(ex.pos.getOrElse(preParsedQuery.offset))
private def createUniqueNotification(ex: util.SyntaxException, inputPosition: InputPosition) = {
val pos = convertInputPosition(ex.pos.getOrElse(inputPosition))
CREATE_UNIQUE_UNAVAILABLE_FALLBACK.notification(pos)
}

Expand All @@ -172,11 +169,6 @@ class CompilerEngineDelegator(graph: GraphDatabaseQueryService,
private def convertInputPosition(offset: InputPosition) =
new org.neo4j.graphdb.InputPosition(offset.offset, offset.line, offset.column)

private def getQueryCacheSize : Int = {
val setting: (Config) => Int = config => config.get(GraphDatabaseSettings.query_cache_size).intValue()
getSetting(graph, setting, DEFAULT_QUERY_CACHE_SIZE)
}

private def getStatisticsDivergenceCalculator: StatsDivergenceCalculator = {
val divergenceThreshold = getSetting(graph,
config => config.get(GraphDatabaseSettings.query_statistics_divergence_threshold).doubleValue(),
Expand All @@ -185,10 +177,10 @@ class CompilerEngineDelegator(graph: GraphDatabaseQueryService,
config => config.get(GraphDatabaseSettings.query_statistics_divergence_target).doubleValue(),
DEFAULT_STATISTICS_DIVERGENCE_TARGET)
val minReplanTime = getSetting(graph,
config => config.get(GraphDatabaseSettings.cypher_min_replan_interval).toMillis().longValue(),
config => config.get(GraphDatabaseSettings.cypher_min_replan_interval).toMillis.longValue(),
DEFAULT_QUERY_PLAN_TTL)
val targetReplanTime = getSetting(graph,
config => config.get(GraphDatabaseSettings.cypher_replan_interval_target).toMillis().longValue(),
config => config.get(GraphDatabaseSettings.cypher_replan_interval_target).toMillis.longValue(),
DEFAULT_QUERY_PLAN_TARGET)
val divergenceAlgorithm = getSetting(graph,
config => config.get(GraphDatabaseSettings.cypher_replan_algorithm),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ class ExecutionEngine(val queryService: GraphDatabaseQueryService,
private val queryCache: QueryCache[String, CachedExecutableQuery] =
new QueryCache(config.queryCacheSize, planStalenessCaller, cacheTracer)

private val compatibilityCache: CompatibilityCache = new CompatibilityCache(compatibilityFactory)
private val compilerEngineDelegator: CompilerEngineDelegator =
new CompilerEngineDelegator(queryService, kernelMonitors, config, logProvider, compatibilityCache)
new CompilerEngineDelegator(queryService, kernelMonitors, config, logProvider, new CompatibilityCache(compatibilityFactory))

private val parsedQueries = new LFUCache[String, ParsedQuery](config.queryCacheSize)

Expand Down Expand Up @@ -134,7 +133,7 @@ class ExecutionEngine(val queryService: GraphDatabaseQueryService,
val schemaToken = schemaHelper.readSchemaToken(tc)
val cacheLookup = queryCache.computeIfAbsentOrStale(cacheKey,
tc,
() => compileQuery(preParsedQuery, tracer, tc),
() => compilerEngineDelegator.compile(preParsedQuery, tracer, tc),
preParsedQuery.rawStatement)
cacheLookup match {
case _: CacheHit[_] |
Expand All @@ -158,27 +157,6 @@ class ExecutionEngine(val queryService: GraphDatabaseQueryService,
throw new IllegalStateException("Could not compile query due to insanely frequent schema changes")
}

private def compileQuery(preParsedQuery: PreParsedQuery,
tracer: CompilationPhaseTracer,
tc: TransactionalContext): CachedExecutableQuery = {



val parsedQuery = parsePreParsedQuery(preParsedQuery, tracer)
val (executionPlan, extractedParams, paramNames) = parsedQuery.plan(tc, tracer)
CachedExecutableQuery(executionPlan, paramNames, ValueConversion.asValues(extractedParams))
}

@throws(classOf[SyntaxException])
private def parsePreParsedQuery(preParsedQuery: PreParsedQuery, tracer: CompilationPhaseTracer): ParsedQuery = {
parsedQueries.get(preParsedQuery.statementWithVersionAndPlanner).getOrElse {
val parsedQuery = compilerEngineDelegator.parseQuery(preParsedQuery, tracer)
//don't cache failed queries
if (!parsedQuery.hasErrors) parsedQueries.put(preParsedQuery.statementWithVersionAndPlanner, parsedQuery)
parsedQuery
}
}

def clearQueryCaches(): Long =
Math.max(parsedQueries.clear(), queryCache.clear())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ STATEMENT <: AnyRef](configv3_5: CypherPlannerConfiguration,
protected val runSafelyDuringPlanning: RunSafely
protected val runSafelyDuringRuntime: RunSafely

def produceParsedQuery(preParsedQuery: PreParsedQuery, tracer: CompilationPhaseTracer,
preParsingNotifications: Set[org.neo4j.graphdb.Notification]): ParsedQuery

// concrete stuff
protected val logger: InfoLogger = new StringInfoLogger(log)
protected val monitorsv3_5: Monitors = WrappedMonitorsv3_5(kernelMonitors)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import java.util.Collections.emptyList
import org.neo4j.cypher.CypherExecutionMode
import org.neo4j.cypher.internal._
import org.neo4j.cypher.internal.compatibility._
import org.neo4j.cypher.internal.compatibility.v2_3.helpers.as2_3
import org.neo4j.cypher.internal.compiler.v2_3
import org.neo4j.cypher.internal.compiler.v2_3.executionplan.{EntityAccessor, ExecutionPlan => ExecutionPlan_v2_3}
import org.neo4j.cypher.internal.compiler.v2_3.spi.{PlanContext, QueryContext}
import org.neo4j.cypher.internal.compiler.v2_3.tracing.rewriters.RewriterStepSequencer
import org.neo4j.cypher.internal.compiler.v2_3.{InfoLogger, ExplainMode => ExplainModev2_3, NormalMode => NormalModev2_3, ProfileMode => ProfileModev2_3, _}
import org.opencypher.v9_0.frontend.phases
import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer
import org.neo4j.cypher.internal.frontend.v2_3.InputPosition
import org.neo4j.cypher.internal.javacompat.ExecutionResult
import org.neo4j.cypher.internal.runtime.interpreted.{TransactionalContextWrapper, ValueConversion}
import org.neo4j.cypher.internal.spi.v2_3.{TransactionBoundGraphStatistics, TransactionBoundPlanContext, TransactionBoundQueryContext}
Expand All @@ -44,11 +44,9 @@ import org.neo4j.kernel.monitoring.{Monitors => KernelMonitors}
import org.neo4j.logging.Log
import org.neo4j.values.AnyValue
import org.neo4j.values.virtual.MapValue
import org.neo4j.cypher.internal.compatibility.v2_3.helpers.as2_3
import org.neo4j.cypher.internal.frontend.v2_3.InputPosition
import org.opencypher.v9_0.frontend.phases.CompilationPhaseTracer

import scala.collection.mutable
import scala.util.Try

trait Compatibility extends CachingCompiler[PreparedQuery] {

Expand All @@ -68,34 +66,7 @@ trait Compatibility extends CachingCompiler[PreparedQuery] {

protected val compiler: v2_3.CypherCompiler

implicit val executionMonitor = kernelMonitors.newMonitor(classOf[QueryExecutionMonitor])

def produceParsedQuery(preParsedQuery: PreParsedQuery, tracer: CompilationPhaseTracer,
preParsingNotifications: Set[org.neo4j.graphdb.Notification]): ParsedQuery = {
val notificationLogger = new RecordingNotificationLogger
val preparedQueryForV_2_3: Try[PreparedQuery] =
Try(compiler.prepareQuery(preParsedQuery.statement,
preParsedQuery.rawStatement,
notificationLogger,
preParsedQuery.planner.name,
Some(as2_3(preParsedQuery.offset)), as2_3(tracer)))
new ParsedQuery {
def plan(transactionalContext: TransactionalContext,
tracer: phases.CompilationPhaseTracer): (ExecutionPlan, Map[String, Any], Seq[String]) = exceptionHandler
.runSafely {
val planContext: PlanContext = new TransactionBoundPlanContext(TransactionalContextWrapper(transactionalContext))
val (planImpl, extractedParameters) = compiler
.planPreparedQuery(preparedQueryForV_2_3.get, planContext, as2_3(tracer))

// Log notifications/warnings from planning
planImpl.notifications(planContext).foreach(notificationLogger += _)

(new ExecutionPlanWrapper(planImpl, preParsingNotifications, as2_3(preParsedQuery.offset)), extractedParameters, Seq.empty[String])
}

override protected val trier = preparedQueryForV_2_3
}
}
implicit val executionMonitor: QueryExecutionMonitor = kernelMonitors.newMonitor(classOf[QueryExecutionMonitor])

class ExecutionPlanWrapper(inner: ExecutionPlan_v2_3, preParsingNotifications: Set[org.neo4j.graphdb.Notification], offSet: frontend.v2_3.InputPosition)
extends ExecutionPlan {
Expand Down

0 comments on commit 4e0eddb

Please sign in to comment.