Skip to content

Commit

Permalink
Make Transaction handling in rest api to mark txs implicit/explicit
Browse files Browse the repository at this point in the history
  • Loading branch information
davidegrohmann committed Feb 26, 2016
1 parent de525d8 commit 661ed73
Show file tree
Hide file tree
Showing 19 changed files with 178 additions and 272 deletions.
Expand Up @@ -19,9 +19,9 @@
*/ */
package org.neo4j.server.database; package org.neo4j.server.database;


import org.neo4j.kernel.impl.factory.CommunityFacadeFactory;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.factory.CommunityFacadeFactory;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.lifecycle.Lifecycle; import org.neo4j.kernel.lifecycle.Lifecycle;


public interface Database extends Lifecycle public interface Database extends Lifecycle
Expand All @@ -33,7 +33,7 @@ interface Factory


String getLocation(); String getLocation();


GraphDatabaseAPI getGraph(); GraphDatabaseFacade getGraph();


boolean isRunning(); boolean isRunning();
} }
Expand Up @@ -22,8 +22,8 @@
import java.io.File; import java.io.File;


import org.neo4j.graphdb.Result; import org.neo4j.graphdb.Result;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory; import org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.server.configuration.ServerSettings; import org.neo4j.server.configuration.ServerSettings;
Expand All @@ -39,19 +39,12 @@ public class LifecycleManagingDatabase implements Database


public interface GraphFactory public interface GraphFactory
{ {
GraphDatabaseAPI newGraphDatabase( Config config, GraphDatabaseFacadeFactory.Dependencies dependencies ); GraphDatabaseFacade newGraphDatabase( Config config, GraphDatabaseFacadeFactory.Dependencies dependencies );
} }


public static Database.Factory lifecycleManagingDatabase( final GraphFactory graphDbFactory ) public static Database.Factory lifecycleManagingDatabase( final GraphFactory graphDbFactory )
{ {
return new Factory() return ( config, dependencies ) -> new LifecycleManagingDatabase( config, graphDbFactory, dependencies );
{
@Override
public Database newDatabase( Config config, GraphDatabaseFacadeFactory.Dependencies dependencies )
{
return new LifecycleManagingDatabase( config, graphDbFactory, dependencies );
}
};
} }


private final Config config; private final Config config;
Expand All @@ -60,7 +53,7 @@ public Database newDatabase( Config config, GraphDatabaseFacadeFactory.Dependenc
private final Log log; private final Log log;


private boolean isRunning = false; private boolean isRunning = false;
private GraphDatabaseAPI graph; private GraphDatabaseFacade graph;


public LifecycleManagingDatabase( Config config, GraphFactory dbFactory, public LifecycleManagingDatabase( Config config, GraphFactory dbFactory,
GraphDatabaseFacadeFactory.Dependencies dependencies ) GraphDatabaseFacadeFactory.Dependencies dependencies )
Expand All @@ -79,7 +72,7 @@ public String getLocation()
} }


@Override @Override
public GraphDatabaseAPI getGraph() public GraphDatabaseFacade getGraph()
{ {
return graph; return graph;
} }
Expand Down
Expand Up @@ -19,28 +19,14 @@
*/ */
package org.neo4j.server.database; package org.neo4j.server.database;


import org.neo4j.kernel.impl.factory.CommunityFacadeFactory; import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.lifecycle.LifecycleAdapter; import org.neo4j.kernel.lifecycle.LifecycleAdapter;


public class WrappedDatabase extends LifecycleAdapter implements Database public class WrappedDatabase extends LifecycleAdapter implements Database
{ {
private final GraphDatabaseAPI graph; private final GraphDatabaseFacade graph;


public static Database.Factory wrappedDatabase( final GraphDatabaseAPI db ) public WrappedDatabase( GraphDatabaseFacade graph )
{
return new Factory()
{
@Override
public Database newDatabase( Config config, CommunityFacadeFactory.Dependencies dependencies)
{
return new WrappedDatabase( db );
}
};
}

public WrappedDatabase( GraphDatabaseAPI graph )
{ {
this.graph = graph; this.graph = graph;
try try
Expand All @@ -60,7 +46,7 @@ public String getLocation()
} }


@Override @Override
public GraphDatabaseAPI getGraph() public GraphDatabaseFacade getGraph()
{ {
return graph; return graph;
} }
Expand Down
Expand Up @@ -26,9 +26,10 @@
import org.neo4j.kernel.impl.query.QueryExecutionEngine; import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.logging.LogProvider; import org.neo4j.logging.LogProvider;
import org.neo4j.server.rest.transactional.error.TransactionLifecycleException; import org.neo4j.server.rest.transactional.error.TransactionLifecycleException;
import org.neo4j.server.rest.web.QuerySessionProvider;
import org.neo4j.server.rest.web.TransactionUriScheme; import org.neo4j.server.rest.web.TransactionUriScheme;


import static org.neo4j.server.rest.web.QuerySessionProvider.provider;

/** /**
* Transactional actions contains the business logic for executing statements against Neo4j across long-running * Transactional actions contains the business logic for executing statements against Neo4j across long-running
* transactions. * transactions.
Expand Down Expand Up @@ -65,9 +66,10 @@ public TransactionFacade( TransitionalPeriodTransactionMessContainer kernel, Que
this.logProvider = logProvider; this.logProvider = logProvider;
} }


public TransactionHandle newTransactionHandle( TransactionUriScheme uriScheme ) throws TransactionLifecycleException public TransactionHandle newTransactionHandle( TransactionUriScheme uriScheme, boolean implicitTransaction )
throws TransactionLifecycleException
{ {
return new TransactionHandle( kernel, engine, registry, uriScheme, logProvider, QuerySessionProvider.provider ); return new TransactionHandle( kernel, engine, registry, uriScheme, implicitTransaction, logProvider, provider );
} }


public TransactionHandle findTransactionHandle( long txId ) throws TransactionLifecycleException public TransactionHandle findTransactionHandle( long txId ) throws TransactionLifecycleException
Expand Down
Expand Up @@ -69,19 +69,21 @@ public class TransactionHandle implements TransactionTerminationHandle
private final QueryExecutionEngine engine; private final QueryExecutionEngine engine;
private final TransactionRegistry registry; private final TransactionRegistry registry;
private final TransactionUriScheme uriScheme; private final TransactionUriScheme uriScheme;
private final boolean implicitTransaction;
private final Log log; private final Log log;
private final long id; private final long id;
private final QuerySessionProvider sessionFactory; private final QuerySessionProvider sessionFactory;
private TransitionalTxManagementKernelTransaction context; private TransitionalTxManagementKernelTransaction context;


public TransactionHandle( TransitionalPeriodTransactionMessContainer txManagerFacade, QueryExecutionEngine engine, public TransactionHandle( TransitionalPeriodTransactionMessContainer txManagerFacade, QueryExecutionEngine engine,
TransactionRegistry registry, TransactionUriScheme uriScheme, LogProvider logProvider, TransactionRegistry registry, TransactionUriScheme uriScheme, boolean implicitTransaction,
QuerySessionProvider sessionFactory ) LogProvider logProvider, QuerySessionProvider sessionFactory )
{ {
this.txManagerFacade = txManagerFacade; this.txManagerFacade = txManagerFacade;
this.engine = engine; this.engine = engine;
this.registry = registry; this.registry = registry;
this.uriScheme = uriScheme; this.uriScheme = uriScheme;
this.implicitTransaction = implicitTransaction;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
this.id = registry.begin( this ); this.id = registry.begin( this );
this.sessionFactory = sessionFactory; this.sessionFactory = sessionFactory;
Expand All @@ -92,6 +94,11 @@ public URI uri()
return uriScheme.txUri( id ); return uriScheme.txUri( id );
} }


public boolean isPristine()
{
return implicitTransaction;
}

public void execute( StatementDeserializer statements, ExecutionResultSerializer output, HttpServletRequest request ) public void execute( StatementDeserializer statements, ExecutionResultSerializer output, HttpServletRequest request )
{ {
List<Neo4jError> errors = new LinkedList<>(); List<Neo4jError> errors = new LinkedList<>();
Expand Down Expand Up @@ -122,14 +129,14 @@ public boolean terminate()
return true; return true;
} }


public void commit( StatementDeserializer statements, ExecutionResultSerializer output, boolean pristine, HttpServletRequest request ) public void commit( StatementDeserializer statements, ExecutionResultSerializer output, HttpServletRequest request )
{ {
List<Neo4jError> errors = new LinkedList<>(); List<Neo4jError> errors = new LinkedList<>();
try try
{ {
try try
{ {
StatementExecutionStrategy executionStrategy = selectExecutionStrategy( statements, pristine, errors ); StatementExecutionStrategy executionStrategy = selectExecutionStrategy( statements, errors );


switch ( executionStrategy ) { switch ( executionStrategy ) {
case EXECUTE_STATEMENT_USING_PERIODIC_COMMIT: case EXECUTE_STATEMENT_USING_PERIODIC_COMMIT:
Expand Down Expand Up @@ -167,43 +174,21 @@ public void commit( StatementDeserializer statements, ExecutionResultSerializer
} }
} }


private StatementExecutionStrategy selectExecutionStrategy( StatementDeserializer statements, boolean pristine, List<Neo4jError> errors ) private StatementExecutionStrategy selectExecutionStrategy( StatementDeserializer statements, List<Neo4jError> errors )
{ {
// PERIODIC COMMIT queries may only be used when directly committing a pristine (newly created) if ( implicitTransaction )
// transaction and when the first statement is an PERIODIC COMMIT statement.
//
// In that case we refrain from opening a transaction and leave management of
// transactions to Cypher. If there are any further statements they will all be
// executed in a separate transaction (Once you PERIODIC COMMIT all bets are off).
//
try
{ {
if ( pristine ) Statement peek = statements.peek();
if ( peek == null ) /* JSON parse error */
{ {
Statement peek = statements.peek(); return SKIP_EXECUTE_STATEMENT;
if ( peek == null ) /* JSON parse error */
{
return SKIP_EXECUTE_STATEMENT;
}
else if ( engine.isPeriodicCommit( peek.statement() ) )
{
return EXECUTE_STATEMENT_USING_PERIODIC_COMMIT;
}
else
{
return EXECUTE_STATEMENT;
}
} }
else else if ( engine.isPeriodicCommit( peek.statement() ) )
{ {
return EXECUTE_STATEMENT; return EXECUTE_STATEMENT_USING_PERIODIC_COMMIT;
} }
} }
catch ( CypherException e ) return EXECUTE_STATEMENT;
{
errors.add( new Neo4jError( e.status(), e ) );
throw e;
}
} }


public void rollback( ExecutionResultSerializer output ) public void rollback( ExecutionResultSerializer output )
Expand Down Expand Up @@ -237,7 +222,7 @@ private void ensureActiveTransaction() throws InternalBeginTransactionError
{ {
try try
{ {
context = txManagerFacade.newTransaction(); context = txManagerFacade.newTransaction( implicitTransaction );
} }
catch ( RuntimeException e ) catch ( RuntimeException e )
{ {
Expand Down
Expand Up @@ -20,28 +20,27 @@
package org.neo4j.server.rest.transactional; package org.neo4j.server.rest.transactional;


import org.neo4j.graphdb.Transaction; import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge; import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;


public class TransitionalPeriodTransactionMessContainer public class TransitionalPeriodTransactionMessContainer
{ {
private final GraphDatabaseAPI db; private final GraphDatabaseFacade db;
private final ThreadToStatementContextBridge txBridge; private final ThreadToStatementContextBridge txBridge;


public TransitionalPeriodTransactionMessContainer( GraphDatabaseAPI db ) public TransitionalPeriodTransactionMessContainer( GraphDatabaseFacade db )
{ {
this.db = db; this.db = db;
this.txBridge = db.getDependencyResolver().resolveDependency( ThreadToStatementContextBridge.class ); this.txBridge = db.getDependencyResolver().resolveDependency( ThreadToStatementContextBridge.class );
} }


public TransitionalTxManagementKernelTransaction newTransaction() public TransitionalTxManagementKernelTransaction newTransaction( boolean implicitTransaction )
{ {
Transaction tx = db.beginTx(); Transaction tx = db.beginTransaction( implicitTransaction
TransactionTerminator txInterruptor = new TransactionTerminator( tx ); ? KernelTransaction.Type.implicit

: KernelTransaction.Type.explicit );
// Get and use the TransactionContext created in db.beginTx(). The role of creating return new TransitionalTxManagementKernelTransaction( new TransactionTerminator( tx ), txBridge );
// TransactionContexts will be reversed soonish.
return new TransitionalTxManagementKernelTransaction( txInterruptor, txBridge );
} }


public ThreadToStatementContextBridge getBridge() public ThreadToStatementContextBridge getBridge()
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/ */
package org.neo4j.server.rest.transactional; package org.neo4j.server.rest.transactional;


import org.neo4j.kernel.impl.coreapi.TopLevelTransaction;
import org.neo4j.kernel.api.KernelTransaction; import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.TransactionFailureException; import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge; import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
Expand Down

0 comments on commit 661ed73

Please sign in to comment.