Skip to content

Commit

Permalink
Move transaction creation outside Cypher ExecutionEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Mar 8, 2016
1 parent 86899fa commit 4ca8d36
Show file tree
Hide file tree
Showing 122 changed files with 1,484 additions and 1,189 deletions.
Expand Up @@ -24,11 +24,13 @@
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.bolt.v1.runtime.spi.StatementRunner;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.query.QuerySession;

public class CypherStatementRunner implements StatementRunner
{
private static final PropertyContainerLocker locker = new PropertyContainerLocker();
private final QueryExecutionEngine queryExecutionEngine;

public CypherStatementRunner( QueryExecutionEngine queryExecutionEngine )
Expand Down Expand Up @@ -63,17 +65,8 @@ else if ( statement.equalsIgnoreCase( "rollback" ) )
ctx.beginImplicitTransaction();
}

return new CypherAdapterStream(
queryExecutionEngine.executeQuery( statement, params, new BoltQuerySession() ) );
}
}

static class BoltQuerySession extends QuerySession
{
@Override
public String toString()
{
return "bolt";
QuerySession session = ctx.createSession( queryExecutionEngine.queryService(), locker );
return new CypherAdapterStream( queryExecutionEngine.executeQuery( statement, params, session ) );
}
}
}
Expand Up @@ -19,6 +19,10 @@
*/
package org.neo4j.bolt.v1.runtime.internal;

import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker;
import org.neo4j.kernel.impl.query.QuerySession;

/**
* Exposes the ability to manipulate the state of a running session in various ways. This is the interface Tank
* uses to manipulate the session.
Expand Down Expand Up @@ -46,4 +50,5 @@ public interface SessionState
/** Rollback the current explicit transaction associated with this session. */
void rollbackTransaction();

QuerySession createSession( GraphDatabaseQueryService service, PropertyContainerLocker locker );
}
Expand Up @@ -29,12 +29,17 @@
import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.bolt.v1.runtime.spi.StatementRunner;
import org.neo4j.concurrent.DecayingFlags;
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.AccessMode;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.query.Neo4jTransactionalContext;
import org.neo4j.kernel.impl.query.QuerySession;
import org.neo4j.logging.Log;
import org.neo4j.udc.UsageData;
import org.neo4j.udc.UsageDataKeys;
Expand Down Expand Up @@ -66,17 +71,17 @@ public State init( SessionStateMachine ctx, String clientName, Map<String,Object
ctx.usageData.get( UsageDataKeys.clientNames ).add( clientName );
return IDLE;
}
catch ( AuthenticationException e)
catch ( AuthenticationException e )
{
return error( ctx, new Neo4jError( e.status(), e.getMessage(), e) );
return error( ctx, new Neo4jError( e.status(), e.getMessage(), e ) );
}
}

@Override
protected State onNoImplementation( SessionStateMachine ctx, String command )
{
ctx.error( new Neo4jError( Status.Request.Invalid, "No operations allowed until you send an " +
"INIT message." ));
"INIT message." ) );
return halt( ctx );
}
},
Expand All @@ -90,7 +95,7 @@ protected State onNoImplementation( SessionStateMachine ctx, String command )
public State beginTransaction( SessionStateMachine ctx )
{
assert ctx.currentTransaction == null;
ctx.db.beginTransaction( KernelTransaction.Type.explicit, AccessMode.FULL);
ctx.db.beginTransaction( KernelTransaction.Type.explicit, AccessMode.FULL );
ctx.currentTransaction = ctx.txBridge.getKernelTransactionBoundToThisThread( false );
return IN_TRANSACTION;
}
Expand All @@ -104,7 +109,7 @@ public State runStatement( SessionStateMachine ctx, String statement, Map<String
ctx.currentResult = ctx.statementRunner.run( ctx, statement, params );
ctx.result( ctx.currentStatementMetadata );
//if the call to run failed we must remain in state ERROR
if (ctx.state == ERROR)
if ( ctx.state == ERROR )
{
return ERROR;
}
Expand All @@ -123,7 +128,7 @@ public State runStatement( SessionStateMachine ctx, String statement, Map<String
public State beginImplicitTransaction( SessionStateMachine ctx )
{
assert ctx.currentTransaction == null;
ctx.db.beginTransaction( KernelTransaction.Type.implicit, AccessMode.FULL);
ctx.db.beginTransaction( KernelTransaction.Type.implicit, AccessMode.FULL );
ctx.currentTransaction = ctx.txBridge.getKernelTransactionBoundToThisThread( false );
return IN_TRANSACTION;
}
Expand All @@ -144,7 +149,7 @@ public State rollbackTransaction( SessionStateMachine ctx )

/**
* Open transaction, no open stream
* <p/>
* <p>
* This is when the client has explicitly requested a transaction to be opened.
*/
IN_TRANSACTION
Expand Down Expand Up @@ -283,13 +288,13 @@ protected State onNoImplementation( SessionStateMachine ctx, String command )
RECOVERABLE_ERROR
{
@Override
public State reset (SessionStateMachine ctx)
public State reset( SessionStateMachine ctx )
{
return IN_TRANSACTION;
}

@Override
protected State onNoImplementation (SessionStateMachine ctx, String command)
protected State onNoImplementation( SessionStateMachine ctx, String command )
{
ctx.ignored();
return RECOVERABLE_ERROR;
Expand Down Expand Up @@ -396,7 +401,7 @@ State error( SessionStateMachine ctx, Neo4jError err )
{
// Is this error bad enough that we should roll back, or did the failure occur in an implicit
// transaction?
if( err.status().code().classification().rollbackTransaction() ||
if ( err.status().code().classification().rollbackTransaction() ||
ctx.currentTransaction.transactionType() == KernelTransaction.Type.implicit )
{
try
Expand All @@ -407,7 +412,7 @@ State error( SessionStateMachine ctx, Neo4jError err )
catch ( Throwable t )
{
ctx.log.error( "While handling '" + err.status() + "', a second failure occurred when " +
"rolling back transaction: " + t.getMessage(), t );
"rolling back transaction: " + t.getMessage(), t );
}
finally
{
Expand All @@ -427,7 +432,6 @@ State error( SessionStateMachine ctx, Neo4jError err )
ctx.error( err );
return outcome;
}

}

private final UsageData usageData;
Expand Down Expand Up @@ -494,14 +498,14 @@ public <A> void init( String clientName, Map<String,Object> authToken, A attachm
before( attachment, callback );
try
{
state = state.init( this, clientName, authToken);
state = state.init( this, clientName, authToken );
}
finally { after(); }
}

@Override
public <A> void run( String statement, Map<String,Object> params, A attachment,
Callback<StatementMetadata,A> callback )
Callback<StatementMetadata,A> callback )
{
before( attachment, callback );
try
Expand Down Expand Up @@ -534,7 +538,7 @@ public <A> void discardAll( A attachment, Callback<Void,A> callback )
}

@Override
public <A> void reset( A attachment, Callback<Void, A> callback )
public <A> void reset( A attachment, Callback<Void,A> callback )
{
before( attachment, callback );
try
Expand Down Expand Up @@ -587,6 +591,25 @@ public boolean hasTransaction()
return currentTransaction != null;
}

@Override
public QuerySession createSession( GraphDatabaseQueryService service, PropertyContainerLocker locker )
{
InternalTransaction transaction =
service.beginTransaction( currentTransaction.transactionType(), currentTransaction.mode() );
Neo4jTransactionalContext transactionalContext =
new Neo4jTransactionalContext( service, transaction, txBridge.get(), locker );

return new QuerySession( transactionalContext )
{

@Override
public String toString()
{
return "bolt";
}
};
}

public State state()
{
return state;
Expand All @@ -605,7 +628,7 @@ public String toString()
*/
private void before( Object attachment, Callback cb )
{
if( cb != null )
if ( cb != null )
{
cb.started( attachment );
}
Expand Down Expand Up @@ -650,7 +673,7 @@ private void after()
/** Forward an error to the currently attached callback */
private void error( Neo4jError err )
{
if( err.status().code().classification() == Status.Classification.DatabaseError )
if ( err.status().code().classification() == Status.Classification.DatabaseError )
{
log.error( "A database error occurred while servicing a user request: " + err );
}
Expand Down
Expand Up @@ -21,9 +21,11 @@

import org.junit.Test;

import org.neo4j.bolt.v1.runtime.internal.CypherStatementRunner.BoltQuerySession;
import org.neo4j.graphdb.Result;
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.impl.coreapi.PropertyContainerLocker;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.query.QuerySession;

import static java.util.Collections.EMPTY_MAP;
import static org.mockito.Matchers.any;
Expand All @@ -45,7 +47,7 @@ public class CypherStatementRunnerTest
public void shouldCreateImplicitTxIfNoneExists() throws Exception
{
// Given
when( engine.executeQuery( anyString(), anyMap(), any( BoltQuerySession.class ) ) ).thenReturn( mock( Result.class ) );
when( engine.executeQuery( anyString(), anyMap(), any( QuerySession.class ) ) ).thenReturn( mock( Result.class ) );
when( ctx.hasTransaction() ).thenReturn( false );

CypherStatementRunner cypherRunner = new CypherStatementRunner( engine );
Expand All @@ -54,9 +56,11 @@ public void shouldCreateImplicitTxIfNoneExists() throws Exception
cypherRunner.run( ctx, "<query>", EMPTY_MAP );

// Then
verify( ctx ).createSession( any( GraphDatabaseQueryService.class ), any( PropertyContainerLocker.class ));
verify( ctx ).hasTransaction();
verify( ctx ).beginImplicitTransaction();
verify( engine ).executeQuery( eq( "<query>" ), eq( EMPTY_MAP ), any( BoltQuerySession.class ) );
verify( engine ).queryService();
verify( engine ).executeQuery( eq( "<query>" ), eq( EMPTY_MAP ), any( QuerySession.class ) );
verifyNoMoreInteractions( engine, ctx );
}
}
Expand Up @@ -39,7 +39,7 @@
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.coreapi.TopLevelTransaction;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.util.JobScheduler;
Expand All @@ -61,7 +61,7 @@ public class StateMachineErrorTest
private GraphDatabaseFacade db = mock( GraphDatabaseFacade.class );
private ThreadToStatementContextBridge txBridge = mock( ThreadToStatementContextBridge.class );
private StatementRunner runner = mock( StatementRunner.class );
private TopLevelTransaction tx = mock( TopLevelTransaction.class );
private InternalTransaction tx = mock( InternalTransaction.class );
private JobScheduler scheduler = mock(JobScheduler.class );

@Before
Expand Down
Expand Up @@ -619,9 +619,9 @@ class FunctionsAcceptanceTest extends ExecutionEngineFunSuite with NewPlannerTes
val query: String = "MATCH (a)-[r]->() WITH [r, 1] as coll RETURN [x in coll | type(x) ]"

//Expect
a [SyntaxException] shouldBe thrownBy(eengine.execute(s"CYPHER runtime=compiled $query"))
a [SyntaxException] shouldBe thrownBy(eengine.execute(s"CYPHER runtime=interpreted $query"))
a [SyntaxException] shouldBe thrownBy(eengine.execute(s"CYPHER planner=cost $query"))
a [SyntaxException] shouldBe thrownBy(eengine.execute(s"CYPHER planner=rule $query"))
a [SyntaxException] shouldBe thrownBy(eengine.execute(s"CYPHER runtime=compiled $query", Map.empty[String,Any], graph.session()))
a [SyntaxException] shouldBe thrownBy(eengine.execute(s"CYPHER runtime=interpreted $query", Map.empty[String,Any], graph.session()))
a [SyntaxException] shouldBe thrownBy(eengine.execute(s"CYPHER planner=cost $query", Map.empty[String,Any], graph.session()))
a [SyntaxException] shouldBe thrownBy(eengine.execute(s"CYPHER planner=rule $query", Map.empty[String,Any], graph.session()))
}
}
Expand Up @@ -360,7 +360,7 @@ class LoadCsvAcceptanceTest
.newGraphDatabase())

intercept[LoadExternalResourceException] {
new ExecutionEngine(db).execute(s"LOAD CSV FROM 'file:///tmp/blah.csv' AS line CREATE (a {name:line[0]})")
new ExecutionEngine(db).execute(s"LOAD CSV FROM 'file:///tmp/blah.csv' AS line CREATE (a {name:line[0]})", Map.empty[String, Any], db.session())
}.getMessage should endWith(": configuration property 'dbms.security.allow_csv_import_from_file_urls' is false")
}

Expand All @@ -376,7 +376,7 @@ class LoadCsvAcceptanceTest
.setConfig(GraphDatabaseSettings.load_csv_file_url_root, dir.toString)
.newGraphDatabase())

val result = new ExecutionEngine(db).execute(s"LOAD CSV FROM 'file:///tmp/blah.csv' AS line RETURN line[0] AS field")
val result = new ExecutionEngine(db).execute(s"LOAD CSV FROM 'file:///tmp/blah.csv' AS line RETURN line[0] AS field", Map.empty[String, Any], db.session())
result.toList should equal(List(Map("field" -> "something")))
}

Expand All @@ -403,7 +403,7 @@ class LoadCsvAcceptanceTest
.newImpermanentDatabaseBuilder()
.newGraphDatabase())

val result = new ExecutionEngine(db).execute(s"LOAD CSV FROM 'testproto://foo.bar' AS line RETURN line[0] AS field")
val result = new ExecutionEngine(db).execute(s"LOAD CSV FROM 'testproto://foo.bar' AS line RETURN line[0] AS field", Map.empty[String, Any], db.session())
result.toList should equal(List(Map("field" -> "something")))
}

Expand All @@ -419,15 +419,15 @@ class LoadCsvAcceptanceTest
eengine.execute(s"LOAD CSV WITH HEADERS FROM '$url' AS csvLine " +
"MERGE (country:Country {name: csvLine.country}) " +
"CREATE (movie:Movie {id: toInt(csvLine.id), title: csvLine.title, year:toInt(csvLine.year)})" +
"CREATE (movie)-[:MADE_IN]->(country)")
"CREATE (movie)-[:MADE_IN]->(country)", Map.empty[String, Any], graph.session())


//make sure three unique movies are created
val result = executeWithAllPlannersAndCompatibilityMode("match (m:Movie) return m.id AS id ORDER BY m.id").toList

result should equal(List(Map("id" -> 1), Map("id" -> 2), Map("id" -> 3)))
//empty database
eengine.execute("MATCH (n) DETACH DELETE n")
eengine.execute("MATCH (n) DETACH DELETE n", Map.empty[String, Any], graph.session())
}
}

Expand Down
Expand Up @@ -1143,7 +1143,7 @@ return b
graph.createIndex("User", "email")

// when
val result = eengine.execute("CYPHER planner=rule MATCH (n:User) USING INDEX n:User(email) WHERE exists(n.email) RETURN n")
val result = eengine.execute("CYPHER planner=rule MATCH (n:User) USING INDEX n:User(email) WHERE exists(n.email) RETURN n", Map.empty[String, Any], graph.session())

// then
result.toList should equal(List(Map("n" -> n), Map("n" -> m)))
Expand All @@ -1158,7 +1158,7 @@ return b
graph.createIndex("User", "email")

// when
val result = eengine.execute("CYPHER planner=rule MATCH (n:User) WHERE exists(n.email) RETURN n")
val result = eengine.execute("CYPHER planner=rule MATCH (n:User) WHERE exists(n.email) RETURN n", Map.empty[String, Any], graph.session())

// then
result.toList should equal(List(Map("n" -> n), Map("n" -> m)))
Expand Down

0 comments on commit 4ca8d36

Please sign in to comment.