Skip to content

Commit

Permalink
Merge pull request #6453 from davidegrohmann/3.0-abstract-tx-manageme…
Browse files Browse the repository at this point in the history
…nt-in-cypher

Abstract tx management in ExecutionEngine
  • Loading branch information
pontusmelke committed Feb 22, 2016
2 parents e475de7 + f518c93 commit 6e28dc9
Show file tree
Hide file tree
Showing 36 changed files with 369 additions and 392 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.neo4j.cypher.internal.compiler.v3_0.commands.expressions.{Expander, K
import org.neo4j.cypher.internal.compiler.v3_0.pipes.matching.PatternNode
import org.neo4j.cypher.internal.frontend.v3_0.SemanticDirection
import org.neo4j.graphdb.{Node, Path, PropertyContainer, Relationship}
import org.neo4j.kernel.api.ReadOperations
import org.neo4j.kernel.api.index.IndexDescriptor

import scala.collection.Iterator
Expand All @@ -34,15 +35,9 @@ class DelegatingQueryContext(val inner: QueryContext) extends QueryContext {
protected def singleDbHit[A](value: A): A = value
protected def manyDbHits[A](value: Iterator[A]): Iterator[A] = value

type Graph = inner.Graph

type KernelStatement = inner.KernelStatement

type EntityAccessor = inner.EntityAccessor

type StateView = inner.StateView

override def transactionalContext: TransactionalContext[Graph,KernelStatement,StateView] = inner.transactionalContext
override def transactionalContext: TransactionalContext = inner.transactionalContext

override def entityAccessor: EntityAccessor = inner.entityAccessor

Expand Down Expand Up @@ -226,21 +221,15 @@ class DelegatingOperations[T <: PropertyContainer](protected val inner: Operatio
override def releaseExclusiveLock(obj: Long): Unit = inner.releaseExclusiveLock(obj)
}

class DelegatingTransactionalContext[Graph,KernelStatement,StateView](val inner: TransactionalContext[Graph,KernelStatement,StateView]) extends TransactionalContext[Graph,KernelStatement,StateView] {
class DelegatingTransactionalContext(val inner: TransactionalContext) extends TransactionalContext {

override def commitAndRestartTx() { inner.commitAndRestartTx() }
override type ReadOps = inner.ReadOps

override def isTopLevelTx: Boolean = inner.isTopLevelTx
override def readOperations: ReadOps = inner.readOperations

override def statement: KernelStatement = inner.statement
override def commitAndRestartTx() { inner.commitAndRestartTx() }

override def isOpen: Boolean = inner.isOpen
override def isTopLevelTx: Boolean = inner.isTopLevelTx

override def close(success: Boolean) { inner.close(success) }

override def graph: Graph = inner.graph

override def newContext(): TransactionalContext[Graph,KernelStatement,StateView] = inner.newContext()

override def stateView: StateView = inner.stateView
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,12 @@ import scala.collection.Iterator
* the core layer, we can move that responsibility outside of the scope of cypher.
*/
trait QueryContext extends TokenContext {
type Graph

type KernelStatement

type EntityAccessor

type StateView

def entityAccessor: EntityAccessor

def transactionalContext: TransactionalContext[Graph,KernelStatement,StateView]
def transactionalContext: TransactionalContext

def nodeOps: Operations[Node]

Expand Down Expand Up @@ -194,21 +189,15 @@ trait Operations[T <: PropertyContainer] {
def releaseExclusiveLock(obj: Long): Unit
}

trait TransactionalContext[Graph,KernelStatement,StateView] {

def newContext(): TransactionalContext[Graph,KernelStatement,StateView]
trait TransactionalContext {

def statement: KernelStatement
type ReadOps

def isOpen: Boolean
def readOperations: ReadOps

def isTopLevelTx: Boolean

def close(success: Boolean)

def commitAndRestartTx()

def graph: Graph

def stateView: StateView
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ import org.neo4j.cypher.internal.compiler.v3_0._
import org.neo4j.cypher.internal.compiler.v3_0.pipes.Pipe
import org.neo4j.cypher.internal.compiler.v3_0.spi.{TransactionalContext, QueryContext}
import org.neo4j.cypher.internal.frontend.v3_0.test_helpers.CypherFunSuite
import org.neo4j.kernel.GraphDatabaseAPI
import org.neo4j.kernel.api.Statement
import org.neo4j.kernel.api.txstate.TxStateHolder

class ExecutionWorkflowBuilderTest extends CypherFunSuite {
val PlannerName = IDPPlannerName
Expand All @@ -37,7 +34,7 @@ class ExecutionWorkflowBuilderTest extends CypherFunSuite {
val pipe = mock[Pipe]
when(pipe.createResults(any())).thenReturn(Iterator.empty)
val context = mock[QueryContext]
when(context.transactionalContext).thenReturn(mock[TransactionalContext[GraphDatabaseAPI, Statement, TxStateHolder]].asInstanceOf[TransactionalContext[context.Graph,context.KernelStatement, context.StateView]])
when(context.transactionalContext).thenReturn(mock[TransactionalContext])
val builderFactory = DefaultExecutionResultBuilderFactory(PipeInfo(pipe, updating = true, None, None, PlannerName), List.empty)

// WHEN
Expand Down Expand Up @@ -72,7 +69,7 @@ class ExecutionWorkflowBuilderTest extends CypherFunSuite {
val pipe = mock[Pipe]
when(pipe.createResults(any())).thenReturn(Iterator.empty)
val context = mock[QueryContext]
when(context.transactionalContext).thenReturn(mock[TransactionalContext[GraphDatabaseAPI, Statement, TxStateHolder]].asInstanceOf[TransactionalContext[context.Graph,context.KernelStatement, context.StateView]])
when(context.transactionalContext).thenReturn(mock[TransactionalContext])
val builderFactory = DefaultExecutionResultBuilderFactory(PipeInfo(pipe, updating = false, None, None, PlannerName), List.empty)

// WHEN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,10 @@ import org.mockito.Mockito._
import org.neo4j.cypher.internal.compiler.v3_0.pipes.ExternalCSVResource
import org.neo4j.cypher.internal.compiler.v3_0.spi.{TransactionalContext, QueryContext}
import org.neo4j.cypher.internal.frontend.v3_0.test_helpers.CypherFunSuite
import org.neo4j.kernel.GraphDatabaseAPI
import org.neo4j.kernel.api.Statement
import org.neo4j.kernel.api.txstate.TxStateHolder

class LoadCsvPeriodicCommitObserverTest extends CypherFunSuite {

var resourceUnderTest: LoadCsvPeriodicCommitObserver = _
var transactionalContext: TransactionalContext[_,_,_] = _
var transactionalContext: TransactionalContext = _
var resource: ExternalCSVResource = _
val url: URL = new URL("file:///tmp/something.csv")

Expand Down Expand Up @@ -77,8 +73,8 @@ class LoadCsvPeriodicCommitObserverTest extends CypherFunSuite {

override protected def beforeEach() {
val queryContext = mock[QueryContext]
transactionalContext = mock[TransactionalContext[GraphDatabaseAPI, Statement, TxStateHolder]]
when(queryContext.transactionalContext).thenReturn(transactionalContext.asInstanceOf[TransactionalContext[queryContext.Graph,queryContext.KernelStatement, queryContext.StateView]])
transactionalContext = mock[TransactionalContext]
when(queryContext.transactionalContext).thenReturn(transactionalContext)
resource = mock[ExternalCSVResource]
resourceUnderTest = new LoadCsvPeriodicCommitObserver(1, resource, queryContext)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class CallProcedureExecutionPlanTest extends CypherFunSuite {
}
}

when(ctx.transactionalContext).thenReturn(mock[TransactionalContext[GraphDatabaseAPI,Statement,TxStateHolder]].asInstanceOf[TransactionalContext[ctx.Graph,ctx.KernelStatement,ctx.StateView]])
when(ctx.transactionalContext).thenReturn(mock[TransactionalContext])
when(ctx.callReadOnlyProcedure(any[ProcedureName], any[Seq[AnyRef]])).thenAnswer(procedureResult)
when(ctx.callReadWriteProcedure(any[ProcedureName], any[Seq[AnyRef]])).thenAnswer(procedureResult)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import org.neo4j.cypher._
import org.neo4j.cypher.internal.compiler.v3_0.helpers.JavaResultValueConverter
import org.neo4j.cypher.internal.compiler.v3_0.prettifier.Prettifier
import org.neo4j.cypher.internal.compiler.v3_0.{LRUCache => LRUCachev3_0, _}
import org.neo4j.cypher.internal.spi.ExtendedTransactionalContext
import org.neo4j.cypher.internal.tracing.{CompilationTracer, TimingCompilationTracer}
import org.neo4j.graphdb.config.Setting
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.kernel.api.ReadOperations
import org.neo4j.kernel.configuration.Config
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge
import org.neo4j.kernel.impl.query.{QueryEngineProvider, QueryExecutionMonitor, QuerySession}
Expand Down Expand Up @@ -90,9 +92,8 @@ class ExecutionEngine(graph: GraphDatabaseQueryService, logProvider: LogProvider
def profile(query: String, params: Map[String, Any],session: QuerySession): ExtendedExecutionResult = {
val javaParams = javaValues.asDeepJavaResultMap(params).asInstanceOf[JavaMap[String, AnyRef]]
executionMonitor.startQueryExecution(session, query, javaParams)

val (preparedPlanExecution, txInfo) = planQuery(query)
preparedPlanExecution.profile(graph, txInfo, params, session)
val (preparedPlanExecution, transactionalContext) = planQuery(query)
preparedPlanExecution.profile(transactionalContext, params, session)
}

@throws(classOf[SyntaxException])
Expand All @@ -117,8 +118,8 @@ class ExecutionEngine(graph: GraphDatabaseQueryService, logProvider: LogProvider
def execute(query: String, params: Map[String, Any], session: QuerySession): ExtendedExecutionResult = {
val javaParams = javaValues.asDeepJavaResultMap(params).asInstanceOf[JavaMap[String, AnyRef]]
executionMonitor.startQueryExecution(session, query, javaParams)
val (preparedPlanExecution, txInfo) = planQuery(query)
preparedPlanExecution.execute(graph, txInfo, params, session)
val (preparedPlanExecution, transactionalContext) = planQuery(query)
preparedPlanExecution.execute(transactionalContext, params, session)
}

@throws(classOf[SyntaxException])
Expand All @@ -140,7 +141,7 @@ class ExecutionEngine(graph: GraphDatabaseQueryService, logProvider: LogProvider
preParsedQueries.getOrElseUpdate(queryText, compiler.preParseQuery(queryText))

@throws(classOf[SyntaxException])
protected def planQuery(queryText: String): (PreparedPlanExecution, TransactionInfo) = {
protected def planQuery(queryText: String): (PreparedPlanExecution, ExtendedTransactionalContext) = {
val phaseTracer = compilationTracer.compileQuery(queryText)
try {

Expand All @@ -152,25 +153,23 @@ class ExecutionEngine(graph: GraphDatabaseQueryService, logProvider: LogProvider
while (n < ExecutionEngine.PLAN_BUILDING_TRIES) {
// create transaction and query context
var touched = false
val isTopLevelTx = !txBridge.hasTransaction
val tx = graph.beginTx()
val kernelStatement = txBridge.get()
val tc = TransactionContextFactory.open(graph,txBridge)

val (plan: ExecutionPlan, extractedParameters) = try {
// fetch plan cache
val cache: LRUCachev3_0[String, (ExecutionPlan, Map[String, Any])] = getOrCreateFromSchemaState(kernelStatement, {
cacheMonitor.cacheFlushDetected(kernelStatement)
val cache: LRUCachev3_0[String, (ExecutionPlan, Map[String, Any])] = getOrCreateFromSchemaState(tc.readOperations, {
cacheMonitor.cacheFlushDetected(tc.statement)
new LRUCachev3_0[String, (ExecutionPlan, Map[String, Any])](getPlanCacheSize)
})

Iterator.continually {
cacheAccessor.getOrElseUpdate(cache)(cacheKey, {
touched = true
val parsedQuery = parsePreParsedQuery(preParsedQuery, phaseTracer)
parsedQuery.plan(kernelStatement, phaseTracer)
parsedQuery.plan(tc, phaseTracer)
})
}.flatMap { case (candidatePlan, params) =>
if (!touched && candidatePlan.isStale(lastCommittedTxId, kernelStatement)) {
if (!touched && candidatePlan.isStale(lastCommittedTxId, tc)) {
cacheAccessor.remove(cache)(cacheKey, queryText)
None
} else {
Expand All @@ -179,23 +178,15 @@ class ExecutionEngine(graph: GraphDatabaseQueryService, logProvider: LogProvider
}.next()
} catch {
case (t: Throwable) =>
kernelStatement.close()
tx.failure()
tx.close()
tc.close(success = false)
throw t
}

if (touched) {
kernelStatement.close()
tx.success()
tx.close()
tc.close(success = true)
} else {
// close the old statement reference after the statement has been "upgraded"
// to either a schema data or a schema statement, so that the locks are "handed over".
kernelStatement.close()
val preparedPlanExecution = PreparedPlanExecution(plan, executionMode, extractedParameters)
val txInfo = TransactionInfo(tx, isTopLevelTx, txBridge.get())
return (preparedPlanExecution, txInfo)
tc.cleanForReuse()
return (PreparedPlanExecution(plan, executionMode, extractedParameters), tc)
}

n += 1
Expand All @@ -207,11 +198,11 @@ class ExecutionEngine(graph: GraphDatabaseQueryService, logProvider: LogProvider

private val txBridge = graph.getDependencyResolver.resolveDependency(classOf[ThreadToStatementContextBridge])

private def getOrCreateFromSchemaState[V](statement: api.Statement, creator: => V) = {
private def getOrCreateFromSchemaState[V](operations: ReadOperations, creator: => V) = {
val javaCreator = new java.util.function.Function[ExecutionEngine, V]() {
def apply(key: ExecutionEngine) = creator
}
statement.readOperations().schemaStateGetOrCreate(this, javaCreator)
operations.schemaStateGetOrCreate(this, javaCreator)
}

def prettify(query: String): String = Prettifier(query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
*/
package org.neo4j.cypher.internal

import org.neo4j.cypher.internal.spi.ExtendedTransactionalContext
import org.neo4j.graphdb.Transaction
import org.neo4j.kernel.GraphDatabaseQueryService
import org.neo4j.kernel.api.Statement
import org.neo4j.kernel.impl.query.QuerySession

final case class TransactionInfo(tx: Transaction, isTopLevelTx: Boolean, statement: Statement)

trait ExecutionPlan {
def run(graph: GraphDatabaseQueryService, txInfo: TransactionInfo, executionMode: CypherExecutionMode, params: Map[String, Any], session: QuerySession): ExtendedExecutionResult

def run(transactionalContext: ExtendedTransactionalContext, executionMode: CypherExecutionMode, params: Map[String, Any], session: QuerySession): ExtendedExecutionResult

def isPeriodicCommit: Boolean

def isStale(lastCommittedTxId: LastCommittedTxIdProvider, statement: Statement): Boolean
def isStale(lastCommittedTxId: LastCommittedTxIdProvider, ctx: ExtendedTransactionalContext): Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,35 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher.internal.spi.v3_0
package org.neo4j.cypher.internal

import org.neo4j.cypher.internal.compiler.v3_0.spi.TransactionalContext
import org.neo4j.graphdb.Transaction
import org.neo4j.cypher.internal.spi.ExtendedTransactionalContext
import org.neo4j.graphdb.{Lock, PropertyContainer, Transaction}
import org.neo4j.kernel.GraphDatabaseQueryService
import org.neo4j.kernel.api.Statement
import org.neo4j.kernel.api.{ReadOperations, Statement}
import org.neo4j.kernel.api.txstate.TxStateHolder
import org.neo4j.kernel.impl.api.KernelStatement
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge

case class TransactionBoundTransactionalContext(graph: GraphDatabaseQueryService, initialTx: Transaction,
val isTopLevelTx: Boolean, initialStatement: Statement)
extends TransactionalContext[GraphDatabaseQueryService, Statement, TxStateHolder] {
object TransactionContextFactory {
def open(graphDatabaseAPI: GraphDatabaseQueryService, txBridge: ThreadToStatementContextBridge): Neo4jTransactionContext = {
val isTopLevelTx = !txBridge.hasTransaction
val tx = graphDatabaseAPI.beginTx()
val statement = txBridge.get()
Neo4jTransactionContext(graphDatabaseAPI, tx, isTopLevelTx, statement)
}
}

// please construct this class through TransactionContextFactory, this is public only for tests
case class Neo4jTransactionContext(val graph: GraphDatabaseQueryService, initialTx: Transaction, val isTopLevelTx: Boolean,
initialStatement: Statement) extends ExtendedTransactionalContext {
private var tx = initialTx
private var open = true
private var _statement = initialStatement
private val txBridge = graph.getDependencyResolver.resolveDependency(classOf[ThreadToStatementContextBridge])

override def readOperations: ReadOps = statement.readOperations()

override def statement = _statement

override def isOpen = open
Expand Down Expand Up @@ -62,12 +73,16 @@ case class TransactionBoundTransactionalContext(graph: GraphDatabaseQueryService
_statement = txBridge.get()
}

override def newContext() = {
val isTopLevelTx = !txBridge.hasTransaction
val tx = graph.beginTx()
val statement = txBridge.get()
new TransactionBoundTransactionalContext(graph, tx, isTopLevelTx, statement)
def cleanForReuse() {
// close the old statement reference after the statement has been "upgraded"
// to either a schema data or a schema statement, so that the locks are "handed over".
statement.close()
_statement = txBridge.get()
}

override def newContext() = TransactionContextFactory.open(graph, txBridge)

override def stateView: TxStateHolder = statement.asInstanceOf[KernelStatement]

override def acquireWriteLock(p: PropertyContainer): Lock = tx.acquireWriteLock(p)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
package org.neo4j.cypher.internal

import org.neo4j.cypher.internal.compiler.v3_0.CompilationPhaseTracer
import org.neo4j.cypher.internal.spi.ExtendedTransactionalContext
import org.neo4j.kernel.api.Statement

trait ParsedQuery {
def isPeriodicCommit: Boolean
def plan(statement: Statement, tracer: CompilationPhaseTracer): (ExecutionPlan, Map[String, Any])
def plan(transactionContext: ExtendedTransactionalContext, tracer: CompilationPhaseTracer): (ExecutionPlan, Map[String, Any])
def hasErrors: Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
*/
package org.neo4j.cypher.internal

import org.neo4j.kernel.GraphDatabaseQueryService
import org.neo4j.cypher.internal.spi.ExtendedTransactionalContext
import org.neo4j.kernel.impl.query.QuerySession

case class PreparedPlanExecution(plan: ExecutionPlan, executionMode: CypherExecutionMode, extractedParams: Map[String, Any]) {
def execute(graph: GraphDatabaseQueryService, txInfo: TransactionInfo, params: Map[String, Any], session: QuerySession) =
plan.run(graph, txInfo, executionMode, params ++ extractedParams, session)
def execute(transactionalContext: ExtendedTransactionalContext, params: Map[String, Any], session: QuerySession) =
plan.run(transactionalContext, executionMode, params ++ extractedParams, session)

def profile(graph: GraphDatabaseQueryService, txInfo: TransactionInfo, params: Map[String, Any], session: QuerySession) =
plan.run(graph, txInfo, CypherExecutionMode.profile, params ++ extractedParams, session)
def profile(transactionalContext: ExtendedTransactionalContext, params: Map[String, Any], session: QuerySession) =
plan.run(transactionalContext, CypherExecutionMode.profile, params ++ extractedParams, session)
}

0 comments on commit 6e28dc9

Please sign in to comment.