Skip to content

Commit

Permalink
Moved functionality from QuerySession to TransactionContext
Browse files Browse the repository at this point in the history
When adding connection details to ExecutingQuery, there was no longer
any need to keep QuerySession around. Instead, query logging (the main
user of QuerySession) can now get it's data through the TransactionContext
which includes an ExecutingQuery.
  • Loading branch information
boggle authored and fickludd committed Sep 16, 2016
1 parent cca93de commit 687c6dd
Show file tree
Hide file tree
Showing 96 changed files with 1,405 additions and 1,394 deletions.
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.bolt.v1.runtime;

import java.util.function.Supplier;
import java.time.Clock;

import org.neo4j.bolt.security.auth.Authentication;
Expand Down Expand Up @@ -93,8 +92,7 @@ public void shutdown() throws Throwable
@Override
public BoltStateMachine newMachine( String connectionDescriptor, Runnable onClose, Clock clock )
{
final CypherStatementRunner statementRunner = new CypherStatementRunner( queryExecutionEngine, txBridge,
queryService );
final CypherStatementRunner statementRunner = new CypherStatementRunner( queryExecutionEngine, queryService );
TransactionStateMachine.SPI transactionSPI = new TransactionStateMachineSPI( gds, txBridge,
queryExecutionEngine, statementRunner, transactionIdStore );
BoltStateMachine.SPI boltSPI = new BoltStateMachineSPI( connectionDescriptor, usageData,
Expand Down
Expand Up @@ -27,60 +27,51 @@
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.api.security.AuthSubject;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker;
import org.neo4j.kernel.impl.query.Neo4jTransactionalContext;
import org.neo4j.kernel.impl.query.Neo4jTransactionalContextFactory;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.query.QuerySession;
import org.neo4j.kernel.impl.query.QuerySource;
import org.neo4j.kernel.impl.query.TransactionalContext;
import org.neo4j.kernel.impl.query.TransactionalContextFactory;

import static java.lang.String.format;
import static org.neo4j.kernel.api.KernelTransaction.Type.implicit;

public class CypherStatementRunner implements StatementRunner
{
private static final PropertyContainerLocker locker = new PropertyContainerLocker();

private final QueryExecutionEngine queryExecutionEngine;
private final ThreadToStatementContextBridge txBridge;
private GraphDatabaseQueryService queryService;
private final TransactionalContextFactory contextFactory;
private final GraphDatabaseQueryService queryService;

public CypherStatementRunner( QueryExecutionEngine queryExecutionEngine, ThreadToStatementContextBridge txBridge,
GraphDatabaseQueryService queryService )
public CypherStatementRunner( QueryExecutionEngine queryExecutionEngine, GraphDatabaseQueryService queryService )
{
this.queryExecutionEngine = queryExecutionEngine;
this.txBridge = txBridge;
this.contextFactory = new Neo4jTransactionalContextFactory( queryService, locker );
this.queryService = queryService;
}

@Override
public Result run( final String querySource, final AuthSubject authSubject, final String statement, final Map<String, Object> params )
throws KernelException
public Result run(
final String querySource,
final AuthSubject authSubject,
final String queryText,
final Map<String, Object> queryParameters
) throws KernelException
{
InternalTransaction transaction = queryService.beginTransaction( implicit, authSubject );
TransactionalContext transactionalContext =
new Neo4jTransactionalContext( queryService, transaction, txBridge.get(), statement, params, locker );
QuerySession session = new BoltQuerySession( transactionalContext, querySource );
return queryExecutionEngine.executeQuery( statement, params, session );
contextFactory.newContext(BoltQuerySession.descriptor( querySource ), transaction, queryText,
queryParameters);
return queryExecutionEngine.executeQuery( queryText, queryParameters, transactionalContext );
}

static class BoltQuerySession extends QuerySession
static class BoltQuerySession
{
private final String querySource;
private final String username;

BoltQuerySession( TransactionalContext transactionalContext, String querySource )
{
super( transactionalContext );
this.username = transactionalContext.accessMode().name();
this.querySource = querySource;
}

@Override
public String toString()
public static QuerySource descriptor( String querySource )
{
return format( "bolt-session\t%s\t%s", querySource, username );
return new QuerySource( "bolt-session", querySource );
}
}
}
Expand Up @@ -361,7 +361,7 @@ class LoadCsvAcceptanceTest
.newGraphDatabase())

intercept[LoadExternalResourceException] {
new ExecutionEngine(db).execute(s"LOAD CSV FROM 'file:///tmp/blah.csv' AS line CREATE (a {name:line[0]})", Map.empty[String, Any], db.session())
new ExecutionEngine(db).execute(s"LOAD CSV FROM 'file:///tmp/blah.csv' AS line CREATE (a {name:line[0]})", Map.empty[String, Any])
}.getMessage should endWith(": configuration property 'dbms.security.allow_csv_import_from_file_urls' is false")
}

Expand All @@ -377,7 +377,7 @@ class LoadCsvAcceptanceTest
.setConfig(GraphDatabaseSettings.load_csv_file_url_root, dir.toString)
.newGraphDatabase())

val result = new ExecutionEngine(db).execute(s"LOAD CSV FROM 'file:///tmp/blah.csv' AS line RETURN line[0] AS field", Map.empty[String, Any], db.session())
val result = new ExecutionEngine(db).execute(s"LOAD CSV FROM 'file:///tmp/blah.csv' AS line RETURN line[0] AS field", Map.empty[String, Any])
result.toList should equal(List(Map("field" -> "something")))
}

Expand All @@ -391,7 +391,7 @@ class LoadCsvAcceptanceTest

intercept[LoadExternalResourceException] {
new ExecutionEngine(db)
.execute(s"LOAD CSV FROM 'file:///../foo.csv' AS line RETURN line[0] AS field", Map.empty[String, Any], db.session()).size
.execute(s"LOAD CSV FROM 'file:///../foo.csv' AS line RETURN line[0] AS field", Map.empty[String, Any]).size
}.getMessage should endWith(" file URL points outside configured import directory")
}

Expand All @@ -418,7 +418,7 @@ class LoadCsvAcceptanceTest
.newImpermanentDatabaseBuilder()
.newGraphDatabase())

val result = new ExecutionEngine(db).execute(s"LOAD CSV FROM 'testproto://foo.bar' AS line RETURN line[0] AS field", Map.empty[String, Any], db.session())
val result = new ExecutionEngine(db).execute(s"LOAD CSV FROM 'testproto://foo.bar' AS line RETURN line[0] AS field", Map.empty[String, Any])
result.toList should equal(List(Map("field" -> "something")))
}

Expand All @@ -431,18 +431,20 @@ class LoadCsvAcceptanceTest
writer.println("3,The Shawshank Redemption,USA,1994")
})
for (url <- urls) {
eengine.execute(s"LOAD CSV WITH HEADERS FROM '$url' AS csvLine " +
"MERGE (country:Country {name: csvLine.country}) " +
"CREATE (movie:Movie {id: toInt(csvLine.id), title: csvLine.title, year:toInt(csvLine.year)})" +
"CREATE (movie)-[:MADE_IN]->(country)", Map.empty[String, Any], graph.session())
val query =
s"""LOAD CSV WITH HEADERS FROM '$url' AS csvLine
|MERGE (country:Country {name: csvLine.country})
|CREATE (movie:Movie {id: toInt(csvLine.id), title: csvLine.title, year:toInt(csvLine.year)})
|CREATE (movie)-[:MADE_IN]->(country)""".stripMargin
innerExecute(query)


//make sure three unique movies are created
val result = executeWithAllPlannersAndCompatibilityMode("match (m:Movie) return m.id AS id ORDER BY m.id").toList

result should equal(List(Map("id" -> 1), Map("id" -> 2), Map("id" -> 3)))
//empty database
eengine.execute("MATCH (n) DETACH DELETE n", Map.empty[String, Any], graph.session())
innerExecute("MATCH (n) DETACH DELETE n")
}
}

Expand Down Expand Up @@ -517,6 +519,5 @@ class LoadCsvAcceptanceTest
httpServer.stop()
}

private def createFile(f: PrintWriter => Unit): String = createFile()(f)
private def createFile(filename: String = "cypher", dir: String = null)(f: PrintWriter => Unit): String = createTempFileURL(filename, ".csv")(f)
}
Expand Up @@ -23,6 +23,7 @@ import org.neo4j.cypher._
import org.neo4j.cypher.internal.compiler.v3_1.commands.expressions.PathImpl
import org.neo4j.graphdb._
import org.neo4j.helpers.collection.Iterators.single
import org.neo4j.kernel.impl.query.TransactionalContext

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -454,7 +455,7 @@ return p""")
graph.createIndex("User", "email")

// when
val result = eengine.execute("CYPHER planner=rule MATCH (n:User) USING INDEX n:User(email) WHERE exists(n.email) RETURN n", Map.empty[String, Any], graph.session())
val result = innerExecute("CYPHER planner=rule MATCH (n:User) USING INDEX n:User(email) WHERE exists(n.email) RETURN n")

// then
result.toList should equal(List(Map("n" -> n), Map("n" -> m)))
Expand All @@ -469,7 +470,7 @@ return p""")
graph.createIndex("User", "email")

// when
val result = eengine.execute("CYPHER planner=rule MATCH (n:User) WHERE exists(n.email) RETURN n", Map.empty[String, Any], graph.session())
val result = innerExecute("CYPHER planner=rule MATCH (n:User) WHERE exists(n.email) RETURN n")

// then
result.toList should equal(List(Map("n" -> n), Map("n" -> m)))
Expand Down
Expand Up @@ -19,12 +19,13 @@
*/
package org.neo4j.internal.cypher.acceptance

import java.nio.file.Files
import java.util

import org.neo4j.cypher._
import org.neo4j.cypher.internal.compatibility.CompatibilityPlanDescriptionFor3_1
import org.neo4j.cypher.internal.compiler.v3_1.planDescription.InternalPlanDescription
import org.neo4j.cypher.internal.compiler.v3_1.planner.logical.idp.IDPSolverMonitor
import org.neo4j.cypher.internal.compiler.v3_1.{IDPPlannerName, InterpretedRuntimeName}
import org.neo4j.cypher.internal.{ExecutionEngine, PlanDescription}
import org.neo4j.cypher.javacompat.internal.GraphDatabaseCypherService
import org.neo4j.graphdb.config.Setting
Expand Down Expand Up @@ -90,7 +91,7 @@ class MatchLongPatternAcceptanceTest extends ExecutionEngineFunSuite with QueryS
println(s"\t$query")
}
val start = System.currentTimeMillis()
val result = eengine.execute(s"EXPLAIN CYPHER planner=IDP $query", Map.empty[String, Any], graph.session())
val result = innerExecute(s"EXPLAIN CYPHER planner=IDP $query")
val duration = System.currentTimeMillis() - start
if (VERBOSE) {
println(result.executionPlanDescription())
Expand Down Expand Up @@ -122,14 +123,14 @@ class MatchLongPatternAcceptanceTest extends ExecutionEngineFunSuite with QueryS
if(VERBOSE) println("QUERY: " + query)

// measure planning time
var startPlaning = System.currentTimeMillis()
val resultPlanning = eengine.execute(s"EXPLAIN CYPHER planner=$planner $query", Map.empty[String, Any], graph.session())
val startPlaning = System.currentTimeMillis()
val resultPlanning = innerExecute(s"EXPLAIN CYPHER planner=$planner $query")
val durationPlanning = System.currentTimeMillis()-startPlaning
val plan = resultPlanning.executionPlanDescription()

// measure query time
var start = System.currentTimeMillis()
val result = eengine.execute(s"CYPHER planner=$planner $query", Map.empty[String, Any], graph.session())
val start = System.currentTimeMillis()
val result = innerExecute(s"CYPHER planner=$planner $query")
val resultCount = result.toList.length
val duration = System.currentTimeMillis()-start
val expectedResultCount = Math.pow(2, pathlen % indexStep).toInt
Expand All @@ -141,7 +142,7 @@ class MatchLongPatternAcceptanceTest extends ExecutionEngineFunSuite with QueryS
"joins" -> (if(planner == "IDP") pathlen / 15 else 0)
)
val counts = assertMinExpandsAndJoins(plan, minCounts)
acc :+ Seq(durationPlanning,duration,counts("joins").toLong,resultCount.toLong)
acc :+ Seq(durationPlanning, duration, counts("joins").toLong, resultCount.toLong)
}
data + (planner -> times)
}
Expand All @@ -160,7 +161,12 @@ class MatchLongPatternAcceptanceTest extends ExecutionEngineFunSuite with QueryS
}
}

private def assertMinExpandsAndJoins(plan: PlanDescription, minCounts: Map[String, Int]) = {
private def assertMinExpandsAndJoins(plan: InternalPlanDescription, minCounts: Map[String, Int]): Map[String, Int] = {
val externalPlanDescription = CompatibilityPlanDescriptionFor3_1(plan, CypherVersion.v3_1, IDPPlannerName, InterpretedRuntimeName)
assertMinExpandsAndJoins(externalPlanDescription, minCounts)
}

private def assertMinExpandsAndJoins(plan: PlanDescription, minCounts: Map[String, Int]): Map[String, Int] = {
val counts = countExpandsAndJoins(plan)
Seq("expands", "joins").foreach { op =>
if(VERBOSE) println(s"\t$op\t${counts(op)}")
Expand All @@ -169,7 +175,12 @@ class MatchLongPatternAcceptanceTest extends ExecutionEngineFunSuite with QueryS
counts
}

private def countExpandsAndJoins(plan: PlanDescription) = {
private def countExpandsAndJoins(plan: InternalPlanDescription): Map[String, Int] = {
val externalPlanDescription = CompatibilityPlanDescriptionFor3_1(plan, CypherVersion.v3_1, IDPPlannerName, InterpretedRuntimeName)
countExpandsAndJoins(externalPlanDescription)
}

private def countExpandsAndJoins(plan: PlanDescription): Map[String, Int] = {
def addCounts(map1: Map[String, Int], map2: Map[String, Int]) = map1 ++ map2.map { case (k, v) => k -> (v + map1.getOrElse(k, 0)) }
def incrCount(map: Map[String, Int], key: String) = addCounts(map, Map(key -> 1))
def expandsAndJoinsCount(plan: PlanDescription, counts: Map[String, Int]): Map[String, Int] = {
Expand All @@ -194,11 +205,12 @@ class MatchLongPatternAcceptanceTest extends ExecutionEngineFunSuite with QueryS
runWithConfig(config.toSeq: _*) {
(engine, db) =>
graph = db
eengine = new ExecutionEngine(graph)
makeLargeMatrixDataset(100)
val monitor = TestIDPSolverMonitor()
val monitors: monitoring.Monitors = graph.getDependencyResolver.resolveDependency(classOf[org.neo4j.kernel.monitoring.Monitors])
monitors.addMonitorListener(monitor)
val result = engine.execute(s"EXPLAIN CYPHER planner=IDP $query", Map.empty[String, Any], graph.session())
val result = innerExecute(s"EXPLAIN CYPHER planner=IDP $query")
val counts = countExpandsAndJoins(result.executionPlanDescription())
counts("joins") should be > 1
counts("joins") should be < numberOfPatternRelationships / 2
Expand Down
Expand Up @@ -23,62 +23,63 @@ import org.neo4j.cypher.ExecutionEngineFunSuite
import org.neo4j.cypher.internal.ExecutionResult
import org.neo4j.cypher.internal.compatibility.CompatibilityPlanDescriptionFor3_1
import org.neo4j.cypher.internal.compiler.v3_1._
import org.neo4j.kernel.impl.query.TransactionalContext
import org.scalatest.matchers.{MatchResult, Matcher}

class PreParsingAcceptanceTest extends ExecutionEngineFunSuite {

test("specifying no planner should provide IDP") {
val query = "PROFILE RETURN 1"

eengine.execute(query, Map.empty[String,Any], graph.session()) should havePlanner(IDPPlannerName)
eengine.execute(query, Map.empty[String,Any]) should havePlanner(IDPPlannerName)
}

test("specifying cost planner should provide IDP") {
val query = "PROFILE CYPHER planner=cost RETURN 1"

eengine.execute(query, Map.empty[String,Any], graph.session()) should havePlanner(IDPPlannerName)
eengine.execute(query, Map.empty[String,Any]) should havePlanner(IDPPlannerName)
}

test("specifying idp planner should provide IDP") {
val query = "PROFILE CYPHER planner=idp RETURN 1"

eengine.execute(query, Map.empty[String,Any], graph.session()) should havePlanner(IDPPlannerName)
eengine.execute(query, Map.empty[String,Any]) should havePlanner(IDPPlannerName)
}

test("specifying dp planner should provide DP") {
val query = "PROFILE CYPHER planner=dp RETURN 1"

eengine.execute(query, Map.empty[String,Any], graph.session()) should havePlanner(DPPlannerName)
eengine.execute(query, Map.empty[String,Any]) should havePlanner(DPPlannerName)
}

test("specifying rule planner should provide RULE") {
val query = "PROFILE CYPHER planner=rule RETURN 1"

eengine.execute(query, Map.empty[String,Any], graph.session()) should havePlanner(RulePlannerName)
eengine.execute(query, Map.empty[String,Any]) should havePlanner(RulePlannerName)
}

test("specifying cost planner should provide IDP using old syntax") {
val query = "PROFILE CYPHER planner=cost RETURN 1"

eengine.execute(query, Map.empty[String,Any], graph.session()) should havePlanner(IDPPlannerName)
eengine.execute(query, Map.empty[String,Any]) should havePlanner(IDPPlannerName)
}

test("specifying idp planner should provide IDP using old syntax") {
val query = "PROFILE CYPHER planner=idp RETURN 1"

eengine.execute(query, Map.empty[String,Any], graph.session()) should havePlanner(IDPPlannerName)
eengine.execute(query, Map.empty[String,Any]) should havePlanner(IDPPlannerName)
}

test("specifying dp planner should provide DP using old syntax") {
val query = "PROFILE CYPHER planner=dp RETURN 1"

eengine.execute(query, Map.empty[String,Any], graph.session()) should havePlanner(DPPlannerName)
eengine.execute(query, Map.empty[String,Any]) should havePlanner(DPPlannerName)
}

test("specifying rule planner should provide RULE using old syntax") {
val query = "PROFILE CYPHER planner=rule RETURN 1"

eengine.execute(query, Map.empty[String,Any], graph.session()) should havePlanner(RulePlannerName)
eengine.execute(query, Map.empty[String,Any]) should havePlanner(RulePlannerName)
}

private def havePlanner(expected: PlannerName): Matcher[ExecutionResult] = new Matcher[ExecutionResult] {
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.neo4j.internal.cypher.acceptance

import org.neo4j.cypher.{ExecutionEngineFunSuite, InvalidArgumentException, NewPlannerTestSupport, QueryStatisticsTestSupport}
import org.neo4j.kernel.impl.query.TransactionalContext

class SetAcceptanceTest extends ExecutionEngineFunSuite with QueryStatisticsTestSupport with NewPlannerTestSupport {

Expand Down Expand Up @@ -225,7 +226,7 @@ class SetAcceptanceTest extends ExecutionEngineFunSuite with QueryStatisticsTest
val threads = (0 until updates).map { i =>
new Thread(new Runnable {
override def run(): Unit = {
eengine.execute(queryWithPlanner, Map.empty[String, Any], graph.session())
eengine.execute(queryWithPlanner, Map.empty[String, Any])
}
})
}
Expand All @@ -236,7 +237,7 @@ class SetAcceptanceTest extends ExecutionEngineFunSuite with QueryStatisticsTest
assert(result == resultValue, s": we lost updates with $planner planner!")

// Reset for run on next planner
eengine.execute("MATCH (n) DETACH DELETE n", Map.empty[String, Any], graph.session())
eengine.execute("MATCH (n) DETACH DELETE n", Map.empty[String, Any])
}
}
}

0 comments on commit 687c6dd

Please sign in to comment.