Skip to content

Commit

Permalink
Fix periodic commit in server
Browse files Browse the repository at this point in the history
Periodic commit wasn't working in server due to the fact of nesting a
placebo transaction for executing the Cypher statement. This placebo
transaction was preventing periodic commit to actually commit in the
middle of the query.  This commit fixes that behaviour by executing
periodic commit in a toplevel transaction.
  • Loading branch information
davidegrohmann committed Apr 20, 2016
1 parent c410cfd commit 363daf4
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 79 deletions.
Expand Up @@ -77,7 +77,7 @@ public class TransactionHandle implements TransactionTerminationHandle
private final long id;
private TransitionalTxManagementKernelTransaction context;

public TransactionHandle( TransitionalPeriodTransactionMessContainer txManagerFacade, QueryExecutionEngine engine,
TransactionHandle( TransitionalPeriodTransactionMessContainer txManagerFacade, QueryExecutionEngine engine,
TransactionRegistry registry, TransactionUriScheme uriScheme, boolean implicitTransaction, AccessMode mode,
LogProvider logProvider )
{
Expand All @@ -86,7 +86,6 @@ public TransactionHandle( TransitionalPeriodTransactionMessContainer txManagerFa
this.registry = registry;
this.uriScheme = uriScheme;
this.type = implicitTransaction ? Type.implicit : Type.explicit;
;
this.mode = mode;
this.log = logProvider.getLog( getClass() );
this.id = registry.begin( this );
Expand Down Expand Up @@ -192,7 +191,7 @@ public void rollback( ExecutionResultSerializer output )
}
}

public void forceRollback() throws TransactionFailureException
void forceRollback() throws TransactionFailureException
{
context.resumeSinceTransactionsAreStillThreadBound();
context.rollback();
Expand Down Expand Up @@ -298,16 +297,22 @@ private void executeStatements( StatementDeserializer statements, ExecutionResul
Statement statement = statements.next();
try
{
if ( (statements.hasNext() || hasPrevious) && engine.isPeriodicCommit( statement.statement() ) )
boolean hasPeriodicCommit = engine.isPeriodicCommit( statement.statement() );
if ( (statements.hasNext() || hasPrevious) && hasPeriodicCommit )
{
throw new QueryExecutionKernelException(
new InvalidSemanticsException( "Cannot execute another statement after executing " +
"PERIODIC COMMIT statement in the same transaction" ) );
}

if ( !hasPrevious && hasPeriodicCommit )
{
context.closeTransactionForPeriodicCommit();
}

hasPrevious = true;
QuerySession querySession = txManagerFacade.create( engine.queryService(), type, mode, request );
Result result = engine.executeQuery( statement.statement(), statement.parameters(), querySession );
Result result = safelyExecute( statement, hasPeriodicCommit, querySession );
output.statementResult( result, statement.includeStats(), statement.resultDataContents() );
output.notifications( result.getNotifications() );
}
Expand All @@ -327,9 +332,10 @@ private void executeStatements( StatementDeserializer statements, ExecutionResul
}
catch ( Exception e )
{
if ( e.getCause() instanceof Status.HasStatus )
Throwable cause = e.getCause();
if ( cause instanceof Status.HasStatus )
{
errors.add( new Neo4jError( ((Status.HasStatus) e.getCause()).status(), e ) );
errors.add( new Neo4jError( ((Status.HasStatus) cause).status(), cause ) );
}
else
{
Expand All @@ -347,4 +353,20 @@ private void executeStatements( StatementDeserializer statements, ExecutionResul
errors.add( new Neo4jError( Status.General.UnknownError, e ) );
}
}

private Result safelyExecute( Statement statement, boolean hasPeriodicCommit, QuerySession querySession )
throws QueryExecutionKernelException
{
try
{
return engine.executeQuery( statement.statement(), statement.parameters(), querySession );
}
finally
{
if ( hasPeriodicCommit )
{
context.reopenAfterPeriodicCommit();
}
}
}
}

This file was deleted.

Expand Up @@ -75,7 +75,7 @@ else if ( o instanceof CypherService )
{
CypherService cypherService = (CypherService) o;

final Transaction transaction = database.getGraph().beginTransaction( KernelTransaction.Type.implicit, mode );
final Transaction transaction = database.getGraph().beginTransaction( KernelTransaction.Type.explicit, mode );

cypherService.getOutputFormat().setRepresentationWriteHandler( representationWriteHandler = new
CommitOnSuccessfulStatusCodeRepresentationWriteHandler( httpContext, transaction ) );
Expand Down
Expand Up @@ -21,7 +21,6 @@

import javax.servlet.http.HttpServletRequest;

import org.neo4j.graphdb.Transaction;
import org.neo4j.kernel.GraphDatabaseQueryService;
import org.neo4j.kernel.api.KernelTransaction.Type;
import org.neo4j.kernel.api.security.AccessMode;
Expand Down Expand Up @@ -49,8 +48,7 @@ public TransitionalPeriodTransactionMessContainer( GraphDatabaseFacade db )

public TransitionalTxManagementKernelTransaction newTransaction( Type type, AccessMode mode )
{
Transaction tx = db.beginTransaction( type, mode );
return new TransitionalTxManagementKernelTransaction( new TransactionTerminator( tx ), txBridge );
return new TransitionalTxManagementKernelTransaction( db, type, mode, txBridge );
}

public ThreadToStatementContextBridge getBridge()
Expand Down
Expand Up @@ -21,30 +21,39 @@

import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.security.AccessMode;
import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;

class TransitionalTxManagementKernelTransaction
{
private final TransactionTerminator txTerminator;
private final GraphDatabaseFacade db;
private final KernelTransaction.Type type;
private final AccessMode mode;
private final ThreadToStatementContextBridge bridge;

private InternalTransaction tx;
private KernelTransaction suspendedTransaction;

TransitionalTxManagementKernelTransaction( TransactionTerminator txTerminator,
ThreadToStatementContextBridge bridge )
TransitionalTxManagementKernelTransaction( GraphDatabaseFacade db, KernelTransaction.Type type,
AccessMode mode, ThreadToStatementContextBridge bridge )
{
this.txTerminator = txTerminator;
this.db = db;
this.type = type;
this.mode = mode;
this.bridge = bridge;
this.tx = db.beginTransaction( type, mode );
}

public void suspendSinceTransactionsAreStillThreadBound()
void suspendSinceTransactionsAreStillThreadBound()
{
assert suspendedTransaction == null : "Can't suspend the transaction if it already is suspended.";
suspendedTransaction = bridge.getTopLevelTransactionBoundToThisThread( true );
bridge.unbindTransactionFromCurrentThread();
}

public void resumeSinceTransactionsAreStillThreadBound()
void resumeSinceTransactionsAreStillThreadBound()
{
assert suspendedTransaction != null : "Can't resume the transaction if it has not first been suspended.";
bridge.bindTransactionToCurrentThread( suspendedTransaction );
Expand All @@ -53,7 +62,7 @@ public void resumeSinceTransactionsAreStillThreadBound()

public void terminate()
{
txTerminator.terminate();
tx.terminate();
}

public void rollback()
Expand Down Expand Up @@ -91,4 +100,14 @@ public void commit()
bridge.unbindTransactionFromCurrentThread();
}
}

void closeTransactionForPeriodicCommit()
{
tx.close();
}

void reopenAfterPeriodicCommit()
{
tx = db.beginTransaction( type, mode );
}
}
Expand Up @@ -29,6 +29,7 @@
import javax.ws.rs.core.Response;

import org.neo4j.cypher.CypherException;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.kernel.impl.query.QueryExecutionEngine;
import org.neo4j.kernel.impl.query.QuerySession;
Expand All @@ -38,6 +39,7 @@
import org.neo4j.server.rest.repr.InputFormat;
import org.neo4j.server.rest.repr.InvalidArgumentsException;
import org.neo4j.server.rest.repr.OutputFormat;
import org.neo4j.server.rest.transactional.CommitOnSuccessfulStatusCodeRepresentationWriteHandler;
import org.neo4j.udc.UsageData;

import static org.neo4j.udc.UsageDataKeys.Features.http_cypher_endpoint;
Expand All @@ -54,14 +56,16 @@ public class CypherService
private static final String INCLUDE_PLAN_PARAM = "includePlan";
private static final String PROFILE_PARAM = "profile";

private final GraphDatabaseService database;
private final CypherExecutor cypherExecutor;
private final UsageData usage;
private final OutputFormat output;
private final InputFormat input;

public CypherService( @Context CypherExecutor cypherExecutor, @Context InputFormat input,
@Context OutputFormat output, @Context UsageData usage )
public CypherService( @Context GraphDatabaseService database, @Context CypherExecutor cypherExecutor,
@Context InputFormat input, @Context OutputFormat output, @Context UsageData usage )
{
this.database = database;
this.cypherExecutor = cypherExecutor;
this.input = input;
this.output = output;
Expand Down Expand Up @@ -104,6 +108,14 @@ public Response cypher(String body,
try
{
QueryExecutionEngine executionEngine = cypherExecutor.getExecutionEngine();
boolean periodicCommitQuery = executionEngine.isPeriodicCommit( query );
CommitOnSuccessfulStatusCodeRepresentationWriteHandler handler =
(CommitOnSuccessfulStatusCodeRepresentationWriteHandler) output.getRepresentationWriteHandler();
if ( periodicCommitQuery )
{
handler.closeTransaction();
}

QuerySession querySession = cypherExecutor.createSession( request );

Result result;
Expand All @@ -118,6 +130,11 @@ public Response cypher(String body,
includePlan = result.getQueryExecutionType().requestedExecutionPlanDescription();
}

if ( periodicCommitQuery )
{
handler.setTransaction( database.beginTx() );
}

return output.ok( new CypherResultRepresentation( result, includeStats, includePlan ) );
}
catch ( Throwable e )
Expand Down
Expand Up @@ -748,9 +748,9 @@ public void execute( String url ) throws Exception
JsonNode errors = result.get(0).get("body").get("errors");

assertTrue( "Results not an array", results.isArray() );
assertTrue( "Results not empty", 0 == results.size() );
assertEquals( 0, results.size() );
assertTrue( "Errors not an array", errors.isArray() );
assertTrue("Didn't find exactly one error", 1 == errors.size());
assertEquals( 1, errors.size() );

String errorCode = errors.get(0).get("code").getTextValue();
assertEquals( "Neo.ClientError.Statement.SemanticError", errorCode );
Expand Down
Expand Up @@ -479,7 +479,8 @@ public void shouldInterruptTransaction() throws Exception
NullLogProvider.getInstance() );

ExecutionResultSerializer output = mock( ExecutionResultSerializer.class );
handle.execute( statements(), output, mock( HttpServletRequest.class ) );
Statement statement = new Statement( "MATCH (n) RETURN n", map(), false, (ResultDataContent[]) null );
handle.execute( statements( statement ), output, mock( HttpServletRequest.class ) );

// when
handle.terminate();
Expand Down
Expand Up @@ -19,22 +19,21 @@
*/
package org.neo4j.server.rest.transactional.integration;

import org.codehaus.jackson.JsonNode;
import org.junit.Test;

import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;

import org.codehaus.jackson.JsonNode;
import org.junit.Test;

import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.server.rest.AbstractRestFunctionalTestBase;
import org.neo4j.test.server.HTTP;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.*;

import static org.junit.Assert.assertTrue;
import static org.neo4j.kernel.api.exceptions.Status.Request.InvalidFormat;
import static org.neo4j.kernel.api.exceptions.Status.Statement.SyntaxError;
import static org.neo4j.server.rest.transactional.integration.TransactionMatchers.containsNoStackTraces;
Expand Down Expand Up @@ -90,7 +89,8 @@ public void begin__commit_with_malformed_json() throws Exception
public void begin_and_execute_periodic_commit_that_fails() throws Exception
{
File file = File.createTempFile("begin_and_execute_periodic_commit_that_fails", ".csv").getAbsoluteFile();
try {
try
{
PrintStream out = new PrintStream( new FileOutputStream( file ) );
out.println("1");
out.println("2");
Expand Down

0 comments on commit 363daf4

Please sign in to comment.