/
ExecutionPlanBuilder.scala
125 lines (104 loc) · 5.19 KB
/
ExecutionPlanBuilder.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/**
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal.compiler.v2_3.executionplan
import org.neo4j.cypher.internal.{ProfileMode, ExecutionMode}
import org.neo4j.cypher.internal.compiler.v2_3._
import org.neo4j.cypher.internal.compiler.v2_3.ast.Statement
import org.neo4j.cypher.internal.compiler.v2_3.commands._
import org.neo4j.cypher.internal.compiler.v2_3.executionplan.builders._
import org.neo4j.cypher.internal.compiler.v2_3.pipes._
import org.neo4j.cypher.internal.compiler.v2_3.planner.CantHandleQueryException
import org.neo4j.cypher.internal.compiler.v2_3.profiler.Profiler
import org.neo4j.cypher.internal.compiler.v2_3.spi._
import org.neo4j.cypher.internal.compiler.v2_3.symbols.SymbolTable
import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.helpers.Clock
case class PipeInfo(pipe: Pipe,
updating: Boolean,
periodicCommit: Option[PeriodicCommitInfo] = None,
fingerprint: Option[PlanFingerprint] = None,
plannerUsed: PlannerName)
case class PeriodicCommitInfo(size: Option[Long]) {
def batchRowCount = size.getOrElse(/* defaultSize */ 1000L)
}
trait NewLogicalPlanSuccessRateMonitor {
def newQuerySeen(queryText: String, ast:Statement)
def unableToHandleQuery(queryText: String, ast:Statement, origin: CantHandleQueryException)
}
trait PipeBuilder {
def producePlan(inputQuery: PreparedQuery, planContext: PlanContext): PipeInfo
}
class ExecutionPlanBuilder(graph: GraphDatabaseService, statsDivergenceThreshold: Double, queryPlanTTL: Long,
clock: Clock, pipeBuilder: PipeBuilder) extends PatternGraphBuilder {
def build(planContext: PlanContext, inputQuery: PreparedQuery): ExecutionPlan = {
val abstractQuery = inputQuery.abstractQuery
val pipeInfo = pipeBuilder.producePlan(inputQuery, planContext)
val PipeInfo(pipe, updating, periodicCommitInfo, fp, planner) = pipeInfo
val columns = getQueryResultColumns(abstractQuery, pipe.symbols)
val resultBuilderFactory = new DefaultExecutionResultBuilderFactory(pipeInfo, columns)
val func = getExecutionPlanFunction(periodicCommitInfo, abstractQuery.getQueryText, updating, resultBuilderFactory, inputQuery.notificationLogger)
new ExecutionPlan {
private val fingerprint = PlanFingerprintReference(clock, queryPlanTTL, statsDivergenceThreshold, fp)
def run(queryContext: QueryContext, planType: ExecutionMode, params: Map[String, Any]) =
func(queryContext, planType, params)
def isPeriodicCommit = periodicCommitInfo.isDefined
def plannerUsed = planner
def isStale(lastTxId: () => Long, statistics: GraphStatistics) = fingerprint.isStale(lastTxId, statistics)
}
}
private def getQueryResultColumns(q: AbstractQuery, currentSymbols: SymbolTable): List[String] = q match {
case in: PeriodicCommitQuery =>
getQueryResultColumns(in.query, currentSymbols)
case in: Query =>
// Find the last query part
var query = in
while (query.tail.isDefined) {
query = query.tail.get
}
query.returns.columns.flatMap {
case "*" => currentSymbols.identifiers.keys
case x => Seq(x)
}
case union: Union =>
getQueryResultColumns(union.queries.head, currentSymbols)
case _ =>
List.empty
}
private def getExecutionPlanFunction(periodicCommit: Option[PeriodicCommitInfo],
queryId: AnyRef,
updating: Boolean,
resultBuilderFactory: ExecutionResultBuilderFactory,
notificationLogger: InternalNotificationLogger):
(QueryContext, ExecutionMode, Map[String, Any]) => InternalExecutionResult =
(queryContext: QueryContext, planType: ExecutionMode, params: Map[String, Any]) => {
val builder = resultBuilderFactory.create()
val profiling = planType == ProfileMode
val builderContext = if (updating || profiling) new UpdateCountingQueryContext(queryContext) else queryContext
builder.setQueryContext(builderContext)
if (periodicCommit.isDefined) {
if (!builderContext.isTopLevelTx)
throw new PeriodicCommitInOpenTransactionException()
builder.setLoadCsvPeriodicCommitObserver(periodicCommit.get.batchRowCount)
}
if (profiling)
builder.setPipeDecorator(new Profiler())
builder.build(graph, queryId, planType, params, notificationLogger)
}
}