Skip to content

Commit

Permalink
Bolt should not use the GraphDatabaseService for executing Cypher que…
Browse files Browse the repository at this point in the history
…ries
  • Loading branch information
davidegrohmann committed Feb 26, 2016
1 parent dbb962f commit b3a8b8f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 25 deletions.
Expand Up @@ -23,24 +23,22 @@

import org.neo4j.bolt.v1.runtime.spi.RecordStream;
import org.neo4j.bolt.v1.runtime.spi.StatementRunner;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.kernel.api.exceptions.KernelException;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.query.QuerySession;

public class CypherStatementRunner implements StatementRunner
{
private final GraphDatabaseService db;
private final QueryExecutionEngine queryExecutionEngine;

public CypherStatementRunner( GraphDatabaseService db, QueryExecutionEngine queryExecutionEngine )
public CypherStatementRunner( QueryExecutionEngine queryExecutionEngine )
{
this.db = db;
this.queryExecutionEngine = queryExecutionEngine;
}

@Override
public RecordStream run( final SessionState ctx, final String statement,
final Map<String,Object> params ) throws KernelException
public RecordStream run( final SessionState ctx, final String statement, final Map<String,Object> params )
throws KernelException
{
// Temporary until we move parsing to cypher, or run a parser up here
if ( statement.equalsIgnoreCase( "begin" ) )
Expand All @@ -60,15 +58,24 @@ else if ( statement.equalsIgnoreCase( "rollback" ) )
}
else
{
if ( ctx.hasTransaction() || queryExecutionEngine.isPeriodicCommit( statement ) )
{
return new CypherAdapterStream( db.execute( statement, params ) );
}
else
// begin transaction if there is no open transaction,
// but avoid opening it for the special case of periodic commit
if ( !ctx.hasTransaction() && !queryExecutionEngine.isPeriodicCommit( statement ) )
{
ctx.beginImplicitTransaction();
return new CypherAdapterStream( db.execute( statement, params ) );
}

return new CypherAdapterStream(
queryExecutionEngine.executeQuery( statement, params, new BoltQuerySession() ) );
}
}

static class BoltQuerySession extends QuerySession
{
@Override
public String toString()
{
return "bolt";
}
}
}
Expand Up @@ -72,9 +72,9 @@ public void init() throws Throwable
@Override
public void start() throws Throwable
{
QueryExecutionEngine engine =
QueryExecutionEngine queryExecutionEngine =
gds.getDependencyResolver().resolveDependency( QueryExecutionEngine.class );
statementRunner = new CypherStatementRunner( gds, engine );
statementRunner = new CypherStatementRunner( queryExecutionEngine );
life.start();
}

Expand Down
Expand Up @@ -20,22 +20,23 @@
package org.neo4j.bolt.v1.runtime.internal;

import org.junit.Test;
import org.neo4j.graphdb.GraphDatabaseService;

import org.neo4j.bolt.v1.runtime.internal.CypherStatementRunner.BoltQuerySession;
import org.neo4j.graphdb.Result;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;

import static java.util.Collections.EMPTY_MAP;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class CypherStatementRunnerTest
{
private final GraphDatabaseService db = mock( GraphDatabaseService.class );
private final QueryExecutionEngine engine = mock( QueryExecutionEngine.class );
private final SessionStateMachine ctx = mock( SessionStateMachine.class );

Expand All @@ -44,11 +45,11 @@ public class CypherStatementRunnerTest
public void shouldCreateImplicitTxIfNoneExists() throws Exception
{
// Given
when( db.execute( anyString(), anyMap() ) ).thenReturn( mock( Result.class ) );
when( engine.executeQuery( anyString(), anyMap(), any( BoltQuerySession.class ) ) ).thenReturn( mock( Result.class ) );
when( engine.isPeriodicCommit( anyString() ) ).thenReturn( false );
when( ctx.hasTransaction() ).thenReturn( false );

CypherStatementRunner cypherRunner = new CypherStatementRunner( db, engine );
CypherStatementRunner cypherRunner = new CypherStatementRunner( engine );

// When
cypherRunner.run( ctx, "<query>", EMPTY_MAP );
Expand All @@ -57,28 +58,28 @@ public void shouldCreateImplicitTxIfNoneExists() throws Exception
verify( ctx ).hasTransaction();
verify( engine ).isPeriodicCommit( "<query>" );
verify( ctx ).beginImplicitTransaction();
verify( db ).execute( "<query>", EMPTY_MAP );
verifyNoMoreInteractions( db, engine, ctx );
verify( engine ).executeQuery( eq( "<query>" ), eq( EMPTY_MAP ), any( BoltQuerySession.class ) );
verifyNoMoreInteractions( engine, ctx );
}

@Test
@SuppressWarnings("unchecked")
public void shouldNotCreateImplicitTxIfUsingPeriodicCommit() throws Exception
{
// Given
when( db.execute( anyString(), anyMap() ) ).thenReturn( mock( Result.class ) );
when( engine.executeQuery( anyString(), anyMap(), any( BoltQuerySession.class ) ) ).thenReturn( mock( Result.class ) );
when( engine.isPeriodicCommit( anyString() ) ).thenReturn( true );
when( ctx.hasTransaction() ).thenReturn( false );

CypherStatementRunner cypherRunner = new CypherStatementRunner( db, engine );
CypherStatementRunner cypherRunner = new CypherStatementRunner( engine );

// When
cypherRunner.run( ctx, "<query>", EMPTY_MAP );

// Then
verify( ctx ).hasTransaction();
verify( engine ).isPeriodicCommit( "<query>" );
verify( db ).execute( "<query>", EMPTY_MAP );
verifyNoMoreInteractions( db, engine, ctx );
verify( engine ).executeQuery( eq( "<query>" ), eq( EMPTY_MAP ), any( BoltQuerySession.class ) );
verifyNoMoreInteractions( engine, ctx );
}
}

0 comments on commit b3a8b8f

Please sign in to comment.