Skip to content

Commit

Permalink
Made PULL_ALL and DISCARD_ALL response return bookmark for auto t…
Browse files Browse the repository at this point in the history
…x only

On the other hand, bookmark will only return in response of `COMMIT` for explicit tx
  • Loading branch information
Zhen Li authored and lutovich committed Jul 26, 2018
1 parent c86b7cb commit e9791e0
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 198 deletions.
Expand Up @@ -38,7 +38,7 @@ public interface StatementProcessor


StatementMetadata run( String statement, MapValue params, Bookmark bookmark, Duration txTimeout, Map<String,Object> txMetaData ) throws KernelException; StatementMetadata run( String statement, MapValue params, Bookmark bookmark, Duration txTimeout, Map<String,Object> txMetaData ) throws KernelException;


void streamResult( ThrowingConsumer<BoltResult,Exception> resultConsumer ) throws Exception; Bookmark streamResult( ThrowingConsumer<BoltResult,Exception> resultConsumer ) throws Exception;


Bookmark commitTransaction() throws KernelException; Bookmark commitTransaction() throws KernelException;


Expand Down Expand Up @@ -84,7 +84,7 @@ public StatementMetadata run( String statement, MapValue params, Bookmark bookma
} }


@Override @Override
public void streamResult( ThrowingConsumer<BoltResult,Exception> resultConsumer ) throws Exception public Bookmark streamResult( ThrowingConsumer<BoltResult,Exception> resultConsumer ) throws Exception
{ {
throw new UnsupportedOperationException( "Unable to stream results" ); throw new UnsupportedOperationException( "Unable to stream results" );
} }
Expand Down
Expand Up @@ -117,14 +117,14 @@ public StatementMetadata run( String statement, MapValue params, Bookmark bookma
} }


@Override @Override
public void streamResult( ThrowingConsumer<BoltResult, Exception> resultConsumer ) throws Exception public Bookmark streamResult( ThrowingConsumer<BoltResult, Exception> resultConsumer ) throws Exception
{ {
before(); before();
try try
{ {
ensureNoPendingTerminationNotice(); ensureNoPendingTerminationNotice();


state.streamResult( ctx, resultConsumer ); return state.streamResult( ctx, spi, resultConsumer );
} }
finally finally
{ {
Expand Down Expand Up @@ -323,14 +323,16 @@ void execute( MutableTransactionState ctx, TransactionStateMachineSPI spi, Strin
} }


@Override @Override
void streamResult( MutableTransactionState ctx, ThrowingConsumer<BoltResult,Exception> resultConsumer ) throws Exception Bookmark streamResult( MutableTransactionState ctx, TransactionStateMachineSPI spi, ThrowingConsumer<BoltResult,Exception> resultConsumer )
throws Exception
{ {
assert ctx.currentResult != null; assert ctx.currentResult != null;


boolean success = false; boolean success = false;
try try
{ {
success = consumeResult( ctx, resultConsumer ); success = consumeResult( ctx, resultConsumer );
return newestBookmark( spi );
} }
finally finally
{ {
Expand Down Expand Up @@ -362,7 +364,7 @@ State beginTransaction( MutableTransactionState ctx, TransactionStateMachineSPI


@Override @Override
State run( MutableTransactionState ctx, TransactionStateMachineSPI spi, String statement, MapValue params, Bookmark bookmark, State run( MutableTransactionState ctx, TransactionStateMachineSPI spi, String statement, MapValue params, Bookmark bookmark,
Duration txTimeout, Map<String,Object> txMetadata ) Duration ignored1, Map<String,Object> ignored2 )
throws KernelException throws KernelException
{ {
if ( statement.isEmpty() ) if ( statement.isEmpty() )
Expand All @@ -389,11 +391,12 @@ State run( MutableTransactionState ctx, TransactionStateMachineSPI spi, String s
} }


@Override @Override
void streamResult( MutableTransactionState ctx, Bookmark streamResult( MutableTransactionState ctx, TransactionStateMachineSPI spi, ThrowingConsumer<BoltResult,Exception> resultConsumer )
ThrowingConsumer<BoltResult,Exception> resultConsumer ) throws Exception throws Exception
{ {
assert ctx.currentResult != null; assert ctx.currentResult != null;
consumeResult( ctx, resultConsumer ); consumeResult( ctx, resultConsumer );
return null; // Explict tx shall not get a bookmark in PULL_ALL or DISCARD_ALL
} }


@Override @Override
Expand Down Expand Up @@ -421,7 +424,8 @@ abstract State run( MutableTransactionState ctx, TransactionStateMachineSPI spi,
Duration txTimeout, Map<String,Object> txMetadata ) Duration txTimeout, Map<String,Object> txMetadata )
throws KernelException; throws KernelException;


abstract void streamResult( MutableTransactionState ctx, ThrowingConsumer<BoltResult,Exception> resultConsumer ) throws Exception; abstract Bookmark streamResult( MutableTransactionState ctx, TransactionStateMachineSPI spi, ThrowingConsumer<BoltResult,Exception> resultConsumer )
throws Exception;


abstract State commitTransaction( MutableTransactionState ctx, TransactionStateMachineSPI spi ) throws KernelException; abstract State commitTransaction( MutableTransactionState ctx, TransactionStateMachineSPI spi ) throws KernelException;


Expand Down
Expand Up @@ -25,12 +25,13 @@
import org.neo4j.bolt.runtime.BoltStateMachineSPI; import org.neo4j.bolt.runtime.BoltStateMachineSPI;
import org.neo4j.bolt.v1.runtime.BoltStateMachineV1; import org.neo4j.bolt.v1.runtime.BoltStateMachineV1;
import org.neo4j.bolt.v1.runtime.InterruptedState; import org.neo4j.bolt.v1.runtime.InterruptedState;
import org.neo4j.bolt.v3.runtime.FailedState;
import org.neo4j.bolt.v3.runtime.DefunctState; import org.neo4j.bolt.v3.runtime.DefunctState;
import org.neo4j.bolt.v3.runtime.ExtraMetaDataConnectedState; import org.neo4j.bolt.v3.runtime.ExtraMetaDataConnectedState;
import org.neo4j.bolt.v3.runtime.FailedState;
import org.neo4j.bolt.v3.runtime.ReadyState; import org.neo4j.bolt.v3.runtime.ReadyState;
import org.neo4j.bolt.v3.runtime.StreamingState; import org.neo4j.bolt.v3.runtime.StreamingState;
import org.neo4j.bolt.v3.runtime.TransactionReadyState; import org.neo4j.bolt.v3.runtime.TransactionReadyState;
import org.neo4j.bolt.v3.runtime.TransactionStreamingState;


public class BoltStateMachineV3 extends BoltStateMachineV1 public class BoltStateMachineV3 extends BoltStateMachineV1
{ {
Expand All @@ -49,7 +50,7 @@ protected States buildStates()
InterruptedState interrupted = new InterruptedState(); InterruptedState interrupted = new InterruptedState();
DefunctState defunct = new DefunctState(); DefunctState defunct = new DefunctState();
TransactionReadyState txReady = new TransactionReadyState(); TransactionReadyState txReady = new TransactionReadyState();
StreamingState txStreaming = new StreamingState(); TransactionStreamingState txStreaming = new TransactionStreamingState();


connected.setReadyState( ready ); connected.setReadyState( ready );
connected.setFailedState( defunct ); connected.setFailedState( defunct );
Expand Down
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v3.runtime;

import org.neo4j.bolt.messaging.RequestMessage;
import org.neo4j.bolt.runtime.BoltConnectionFatality;
import org.neo4j.bolt.runtime.BoltStateMachineState;
import org.neo4j.bolt.runtime.StateMachineContext;
import org.neo4j.bolt.v1.messaging.request.DiscardAllMessage;
import org.neo4j.bolt.v1.messaging.request.InterruptSignal;
import org.neo4j.bolt.v1.messaging.request.PullAllMessage;

import static org.neo4j.util.Preconditions.checkState;

/**
* When STREAMING, a result is available as a stream of records.
* These must be PULLed or DISCARDed before any further statements
* can be executed.
*/
public abstract class AbstractStreamingState extends FailSafeBoltStateMachineState
{
protected BoltStateMachineState readyState;
private BoltStateMachineState interruptedState;

@Override
public BoltStateMachineState process( RequestMessage message, StateMachineContext context ) throws BoltConnectionFatality
{
assertInitialized();
if ( message instanceof PullAllMessage )
{
return processPullAllMessage( context );
}
if ( message instanceof DiscardAllMessage )
{
return processDiscardAllMessage( context );
}
if ( message instanceof InterruptSignal )
{
return interruptedState;
}
return null;
}

public void setReadyState( BoltStateMachineState readyState )
{
this.readyState = readyState;
}

public void setInterruptedState( BoltStateMachineState interruptedState )
{
this.interruptedState = interruptedState;
}

private BoltStateMachineState processPullAllMessage( StateMachineContext context ) throws BoltConnectionFatality
{
return processMessage( context, () -> processStreamResultMessage( true, context ) );
}

private BoltStateMachineState processDiscardAllMessage( StateMachineContext context ) throws BoltConnectionFatality
{
return processMessage( context, () -> processStreamResultMessage( false, context ) );
}

abstract BoltStateMachineState processStreamResultMessage( boolean pull, StateMachineContext context ) throws Throwable;

private void assertInitialized()
{
checkState( readyState != null, "Ready state not set" );
checkState( interruptedState != null, "Interrupted state not set" );
checkState( failedState != null, "Failed state not set" );
}
}
Expand Up @@ -19,82 +19,27 @@
*/ */
package org.neo4j.bolt.v3.runtime; package org.neo4j.bolt.v3.runtime;


import org.neo4j.bolt.messaging.RequestMessage;
import org.neo4j.bolt.runtime.BoltConnectionFatality;
import org.neo4j.bolt.runtime.BoltStateMachineState; import org.neo4j.bolt.runtime.BoltStateMachineState;
import org.neo4j.bolt.runtime.StateMachineContext; import org.neo4j.bolt.runtime.StateMachineContext;
import org.neo4j.bolt.v1.messaging.request.DiscardAllMessage; import org.neo4j.bolt.v1.runtime.bookmarking.Bookmark;
import org.neo4j.bolt.v1.messaging.request.InterruptSignal;
import org.neo4j.bolt.v1.messaging.request.PullAllMessage;

import static org.neo4j.util.Preconditions.checkState;


/** /**
* When STREAMING, a result is available as a stream of records. * When STREAMING, additionally attach bookmark to PULL_ALL, DISCARD_ALL result
* These must be PULLed or DISCARDed before any further statements
* can be executed.
*/ */
public class StreamingState extends FailSafeBoltStateMachineState public class StreamingState extends AbstractStreamingState
{ {
private BoltStateMachineState readyState;
private BoltStateMachineState interruptedState;

@Override
public BoltStateMachineState process( RequestMessage message, StateMachineContext context ) throws BoltConnectionFatality
{
assertInitialized();
if ( message instanceof PullAllMessage )
{
return processPullAllMessage( context );
}
if ( message instanceof DiscardAllMessage )
{
return processDiscardAllMessage( context );
}
if ( message instanceof InterruptSignal )
{
return interruptedState;
}
return null;
}

@Override @Override
public String name() public String name()
{ {
return "STREAMING"; return "STREAMING";
} }


public void setReadyState( BoltStateMachineState readyState ) @Override
{ protected BoltStateMachineState processStreamResultMessage( boolean pull, StateMachineContext context ) throws Throwable
this.readyState = readyState;
}

public void setInterruptedState( BoltStateMachineState interruptedState )
{
this.interruptedState = interruptedState;
}

private BoltStateMachineState processPullAllMessage( StateMachineContext context ) throws BoltConnectionFatality
{
return processMessage( context, () -> processStreamResultMessage( true, context ) );
}

private BoltStateMachineState processDiscardAllMessage( StateMachineContext context ) throws BoltConnectionFatality
{
return processMessage( context, () -> processStreamResultMessage( false, context ) );
}

private BoltStateMachineState processStreamResultMessage( boolean pull, StateMachineContext context ) throws Throwable
{ {
context.connectionState().getStatementProcessor().streamResult( Bookmark bookmark = context.connectionState().getStatementProcessor().streamResult(
recordStream -> context.connectionState().getResponseHandler().onRecords( recordStream, pull ) ); recordStream -> context.connectionState().getResponseHandler().onRecords( recordStream, pull ) );
bookmark.attachTo( context.connectionState() );
return readyState; return readyState;
} }

private void assertInitialized()
{
checkState( readyState != null, "Ready state not set" );
checkState( interruptedState != null, "Interrupted state not set" );
checkState( failedState != null, "Failed state not set" );
}
} }
Expand Up @@ -101,11 +101,16 @@ private BoltStateMachineState processRunMessage( RunMessage message, StateMachin
} }


private BoltStateMachineState processCommitMessage( StateMachineContext context ) throws Exception private BoltStateMachineState processCommitMessage( StateMachineContext context ) throws Exception
{
appendBookmarkInResponse( context );
return readyState;
}

static void appendBookmarkInResponse( StateMachineContext context ) throws KernelException
{ {
StatementProcessor statementProcessor = context.connectionState().getStatementProcessor(); StatementProcessor statementProcessor = context.connectionState().getStatementProcessor();
Bookmark bookmark = statementProcessor.commitTransaction(); Bookmark bookmark = statementProcessor.commitTransaction();
bookmark.attachTo( context.connectionState() ); bookmark.attachTo( context.connectionState() );
return readyState;
} }


private BoltStateMachineState processRollbackMessage( StateMachineContext context ) throws Exception private BoltStateMachineState processRollbackMessage( StateMachineContext context ) throws Exception
Expand Down
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v3.runtime;

import org.neo4j.bolt.runtime.BoltStateMachineState;
import org.neo4j.bolt.runtime.StateMachineContext;

public class TransactionStreamingState extends AbstractStreamingState
{
@Override
public String name()
{
return "TX_STREAMING";
}

protected BoltStateMachineState processStreamResultMessage( boolean pull, StateMachineContext context ) throws Throwable
{
context.connectionState().getStatementProcessor().streamResult(
recordStream -> context.connectionState().getResponseHandler().onRecords( recordStream, pull ) );
return readyState;
}
}
Expand Up @@ -42,7 +42,7 @@ class RunMessageTest
void shouldParseEmptyTransactionMetadataCorrectly() throws Throwable void shouldParseEmptyTransactionMetadataCorrectly() throws Throwable
{ {
// When // When
RunMessage message = new RunMessage("RETURN 1" ); RunMessage message = new RunMessage( "RETURN 1" );


// Then // Then
assertNull( message.transactionMetadata() ); assertNull( message.transactionMetadata() );
Expand Down

0 comments on commit e9791e0

Please sign in to comment.