Skip to content

Commit

Permalink
Make run methods wait for successful request acknowledgement
Browse files Browse the repository at this point in the history
This update makes `run` methods (session, tx) wait for successful request acknowledgement by the server. Specifically, the `Result` object will only be returned when there is a successful response from the server to the `RUN` message, meaning that the request was at least accepted by the server for processing. Otherwise, an appropriate error will be thrown.

Additionally, this update makes the `Result.keys` method non-blocking and free from potential communication errors.
  • Loading branch information
injectives committed Jun 1, 2021
1 parent 635e404 commit dfc7a49
Show file tree
Hide file tree
Showing 51 changed files with 634 additions and 900 deletions.
Expand Up @@ -39,7 +39,6 @@ public class InternalResult implements Result
{
private final Connection connection;
private final ResultCursor cursor;
private List<String> keys;

public InternalResult(Connection connection, ResultCursor cursor )
{
Expand All @@ -50,12 +49,7 @@ public InternalResult(Connection connection, ResultCursor cursor )
@Override
public List<String> keys()
{
if ( keys == null )
{
blockingGet( cursor.peekAsync() );
keys = cursor.keys();
}
return keys;
return cursor.keys();
}

@Override
Expand Down
Expand Up @@ -23,14 +23,14 @@
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
import org.neo4j.driver.Session;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.TransactionWork;
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Futures;

Expand Down Expand Up @@ -66,8 +66,8 @@ public Result run(String query, Map<String,Object> parameters, TransactionConfig
@Override
public Result run(Query query, TransactionConfig config )
{
ResultCursor cursor = Futures.blockingGet( session.runAsync(query, config, false ),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );
ResultCursor cursor = Futures.blockingGet( session.runAsync( query, config ),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );

// query executed, it is safe to obtain a connection in a blocking way
Connection connection = Futures.getNow( session.connectionAsync() );
Expand Down
Expand Up @@ -57,8 +57,8 @@ public void close()
@Override
public Result run(Query query)
{
ResultCursor cursor = Futures.blockingGet( tx.runAsync(query, false ),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in transaction" ) );
ResultCursor cursor = Futures.blockingGet( tx.runAsync( query ),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in transaction" ) );
return new InternalResult( tx.connection(), cursor );
}

Expand Down
Expand Up @@ -23,13 +23,13 @@
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.async.AsyncTransaction;
import org.neo4j.driver.async.AsyncTransactionWork;
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.internal.util.Futures;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -66,7 +66,7 @@ public CompletionStage<ResultCursor> runAsync(String query, Map<String,Object> p
@Override
public CompletionStage<ResultCursor> runAsync(Query query, TransactionConfig config )
{
return session.runAsync(query, config, true );
return session.runAsync( query, config );
}

@Override
Expand Down
Expand Up @@ -47,7 +47,7 @@ public CompletionStage<Void> rollbackAsync()
@Override
public CompletionStage<ResultCursor> runAsync(Query query)
{
return tx.runAsync(query, true );
return tx.runAsync( query );
}

public boolean isOpen()
Expand Down
Expand Up @@ -35,8 +35,8 @@
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.FailableCursor;
import org.neo4j.driver.internal.cursor.AsyncResultCursor;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.cursor.ResultCursorFactory;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.logging.PrefixedLogger;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.Connection;
Expand Down Expand Up @@ -76,10 +76,10 @@ public NetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLo
this.fetchSize = fetchSize;
}

public CompletionStage<ResultCursor> runAsync(Query query, TransactionConfig config, boolean waitForRunResponse )
public CompletionStage<ResultCursor> runAsync( Query query, TransactionConfig config )
{
CompletionStage<AsyncResultCursor> newResultCursorStage =
buildResultCursorFactory(query, config, waitForRunResponse ).thenCompose( ResultCursorFactory::asyncResult );
buildResultCursorFactory( query, config ).thenCompose( ResultCursorFactory::asyncResult );

resultCursorStage = newResultCursorStage.exceptionally( error -> null );
return newResultCursorStage.thenApply( cursor -> cursor ); // convert the return type
Expand All @@ -88,7 +88,7 @@ public CompletionStage<ResultCursor> runAsync(Query query, TransactionConfig con
public CompletionStage<RxResultCursor> runRx(Query query, TransactionConfig config )
{
CompletionStage<RxResultCursor> newResultCursorStage =
buildResultCursorFactory(query, config, true ).thenCompose( ResultCursorFactory::rxResult );
buildResultCursorFactory( query, config ).thenCompose( ResultCursorFactory::rxResult );

resultCursorStage = newResultCursorStage.exceptionally( error -> null );
return newResultCursorStage;
Expand Down Expand Up @@ -223,24 +223,27 @@ protected CompletionStage<Boolean> currentConnectionIsOpen()
connection.isOpen() ); // and it's still open
}

private CompletionStage<ResultCursorFactory> buildResultCursorFactory(Query query, TransactionConfig config, boolean waitForRunResponse )
private CompletionStage<ResultCursorFactory> buildResultCursorFactory( Query query, TransactionConfig config )
{
ensureSessionIsOpen();

return ensureNoOpenTxBeforeRunningQuery()
.thenCompose( ignore -> acquireConnection( mode ) )
.thenCompose( connection -> {
try
{
ResultCursorFactory factory = connection.protocol()
.runInAutoCommitTransaction( connection, query, bookmarkHolder, config, waitForRunResponse, fetchSize );
return completedFuture( factory );
}
catch ( Throwable e )
{
return Futures.failedFuture( e );
}
} );
.thenCompose(
connection ->
{
try
{
ResultCursorFactory factory = connection
.protocol()
.runInAutoCommitTransaction( connection, query, bookmarkHolder, config, fetchSize );
return completedFuture( factory );
}
catch ( Throwable e )
{
return Futures.failedFuture( e );
}
} );
}

private CompletionStage<Connection> acquireConnection( AccessMode mode )
Expand Down
Expand Up @@ -204,11 +204,11 @@ else if ( state.value == State.ROLLED_BACK )
}
}

public CompletionStage<ResultCursor> runAsync(Query query, boolean waitForRunResponse )
public CompletionStage<ResultCursor> runAsync( Query query )
{
ensureCanRunQueries();
CompletionStage<AsyncResultCursor> cursorStage =
protocol.runInUnmanagedTransaction( connection, query, this, waitForRunResponse, fetchSize ).asyncResult();
protocol.runInUnmanagedTransaction( connection, query, this, fetchSize ).asyncResult();
resultCursors.add( cursorStage );
return cursorStage.thenApply( cursor -> cursor );
}
Expand All @@ -217,7 +217,7 @@ public CompletionStage<RxResultCursor> runRx(Query query)
{
ensureCanRunQueries();
CompletionStage<RxResultCursor> cursorStage =
protocol.runInUnmanagedTransaction( connection, query, this, false, fetchSize ).rxResult();
protocol.runInUnmanagedTransaction( connection, query, this, fetchSize ).rxResult();
resultCursors.add( cursorStage );
return cursorStage;
}
Expand Down
Expand Up @@ -92,8 +92,8 @@ BookmarkHolder bookmarkHolder( Bookmark ignored )
CompletionStage<List<Record>> runProcedure(Connection connection, Query procedure, BookmarkHolder bookmarkHolder )
{
return connection.protocol()
.runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), true, UNLIMITED_FETCH_SIZE )
.asyncResult().thenCompose( ResultCursor::listAsync );
.runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE )
.asyncResult().thenCompose( ResultCursor::listAsync );
}

private CompletionStage<List<Record>> releaseConnection( Connection connection, List<Record> records )
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal.cursor;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.exceptions.ClientException;
Expand All @@ -28,7 +29,6 @@
import org.neo4j.driver.internal.util.Futures;

import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;

/**
* Used by Bolt V1, V2, V3
Expand All @@ -38,23 +38,24 @@ public class AsyncResultCursorOnlyFactory implements ResultCursorFactory
protected final Connection connection;
protected final Message runMessage;
protected final RunResponseHandler runHandler;
private final CompletableFuture<Void> runFuture;
protected final PullAllResponseHandler pullAllHandler;
private final boolean waitForRunResponse;

public AsyncResultCursorOnlyFactory(Connection connection, Message runMessage, RunResponseHandler runHandler,
PullAllResponseHandler pullHandler, boolean waitForRunResponse )
public AsyncResultCursorOnlyFactory( Connection connection, Message runMessage, RunResponseHandler runHandler, CompletableFuture<Void> runFuture,
PullAllResponseHandler pullHandler )
{
requireNonNull( connection );
requireNonNull( runMessage );
requireNonNull( runHandler );
requireNonNull( runFuture );
requireNonNull( pullHandler );

this.connection = connection;
this.runMessage = runMessage;
this.runHandler = runHandler;
this.runFuture = runFuture;

this.pullAllHandler = pullHandler;
this.waitForRunResponse = waitForRunResponse;
}

public CompletionStage<AsyncResultCursor> asyncResult()
Expand All @@ -63,16 +64,7 @@ public CompletionStage<AsyncResultCursor> asyncResult()
connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
pullAllHandler.prePopulateRecords();

if ( waitForRunResponse )
{
// wait for response of RUN before proceeding
return runHandler.runFuture().thenApply( ignore ->
new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
}
else
{
return completedFuture( new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
}
return runFuture.thenApply( ignore -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
}

public CompletionStage<RxResultCursor> rxResult()
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal.cursor;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
Expand All @@ -27,7 +28,6 @@
import org.neo4j.driver.internal.spi.Connection;

import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;

/**
* Bolt V4
Expand All @@ -39,24 +39,25 @@ public class ResultCursorFactoryImpl implements ResultCursorFactory

private final PullResponseHandler pullHandler;
private final PullAllResponseHandler pullAllHandler;
private final boolean waitForRunResponse;
private final Message runMessage;
private final CompletableFuture<Void> runFuture;

public ResultCursorFactoryImpl(Connection connection, Message runMessage, RunResponseHandler runHandler, PullResponseHandler pullHandler,
PullAllResponseHandler pullAllHandler, boolean waitForRunResponse )
public ResultCursorFactoryImpl( Connection connection, Message runMessage, RunResponseHandler runHandler, CompletableFuture<Void> runFuture,
PullResponseHandler pullHandler, PullAllResponseHandler pullAllHandler )
{
requireNonNull( connection );
requireNonNull( runMessage );
requireNonNull( runHandler );
requireNonNull( runFuture );
requireNonNull( pullHandler );
requireNonNull( pullAllHandler );

this.connection = connection;
this.runMessage = runMessage;
this.runHandler = runHandler;
this.runFuture = runFuture;
this.pullHandler = pullHandler;
this.pullAllHandler = pullAllHandler;
this.waitForRunResponse = waitForRunResponse;
}

@Override
Expand All @@ -65,29 +66,13 @@ public CompletionStage<AsyncResultCursor> asyncResult()
// only write and flush messages when async result is wanted.
connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
pullAllHandler.prePopulateRecords();

if ( waitForRunResponse )
{
// wait for response of RUN before proceeding
return runHandler.runFuture().thenApply(
ignore -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
}
else
{
return completedFuture( new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
}
return runFuture.thenApply( ignore -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
}

@Override
public CompletionStage<RxResultCursor> rxResult()
{
connection.writeAndFlush( runMessage, runHandler );
// we always wait for run reply
return runHandler.runFuture().thenApply( this::composeRxCursor );
}

private RxResultCursor composeRxCursor(Throwable runError )
{
return new RxResultCursorImpl( runError, runHandler, pullHandler );
return runFuture.handle( ( ignored, error ) -> new RxResultCursorImpl( error, runHandler, pullHandler ) );
}
}
Expand Up @@ -54,7 +54,6 @@ public RxResultCursorImpl(Throwable runError, RunResponseHandler runHandler, Pul
{
Objects.requireNonNull( runHandler );
Objects.requireNonNull( pullHandler );
assertRunResponseArrived( runHandler );

this.runResponseError = runError;
this.runHandler = runHandler;
Expand Down Expand Up @@ -160,14 +159,6 @@ else if ( summary != null )
} );
}

private void assertRunResponseArrived( RunResponseHandler runHandler )
{
if ( !runHandler.runFuture().isDone() )
{
throw new IllegalStateException( "Should wait for response of RUN before allowing PULL." );
}
}

enum RecordConsumerStatus
{
NOT_INSTALLED( false, false ),
Expand Down

0 comments on commit dfc7a49

Please sign in to comment.