Skip to content

Commit

Permalink
Update handling of failed queries
Browse files Browse the repository at this point in the history
  • Loading branch information
injectives committed Jun 8, 2021
1 parent ac17aa5 commit 605194f
Show file tree
Hide file tree
Showing 22 changed files with 132 additions and 75 deletions.
Expand Up @@ -23,12 +23,12 @@
public interface FailableCursor
{
/**
* Discarding all unconsumed records and returning failure if there is any to run and/or pulls.
* Discarding all unconsumed records and returning failure if there is any pull errors.
*/
CompletionStage<Throwable> discardAllFailureAsync();

/**
* Pulling all unconsumed records into memory and returning failure if there is any to run and/or pulls.
* Pulling all unconsumed records into memory and returning failure if there is any pull errors.
*/
CompletionStage<Throwable> pullAllFailureAsync();
}
Expand Up @@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal.async;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -82,7 +83,10 @@ public CompletionStage<ResultCursor> runAsync( Query query, TransactionConfig co
buildResultCursorFactory( query, config ).thenCompose( ResultCursorFactory::asyncResult );

resultCursorStage = newResultCursorStage.exceptionally( error -> null );
return newResultCursorStage.thenApply( cursor -> cursor ); // convert the return type
return newResultCursorStage.thenCompose(
cursor -> cursor.runError()
.map( Futures::<ResultCursor>failedFuture )
.orElseGet( () -> CompletableFuture.completedFuture( cursor ) ) );
}

public CompletionStage<RxResultCursor> runRx(Query query, TransactionConfig config )
Expand Down
Expand Up @@ -18,7 +18,9 @@
*/
package org.neo4j.driver.internal.async;

import java.util.Arrays;
import java.util.EnumSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -210,7 +212,10 @@ public CompletionStage<ResultCursor> runAsync( Query query )
CompletionStage<AsyncResultCursor> cursorStage =
protocol.runInUnmanagedTransaction( connection, query, this, fetchSize ).asyncResult();
resultCursors.add( cursorStage );
return cursorStage.thenApply( cursor -> cursor );
return cursorStage.thenCompose(
cursor -> cursor.runError()
.map( Futures::<ResultCursor>failedFuture )
.orElseGet( () -> CompletableFuture.completedFuture( cursor ) ) );
}

public CompletionStage<RxResultCursor> runRx(Query query)
Expand All @@ -229,7 +234,29 @@ public boolean isOpen()

public void markTerminated( Throwable cause )
{
state = StateHolder.terminatedWith( cause );
if ( state.value == State.TERMINATED )
{
if ( state.causeOfTermination != null )
{
addSuppressedWhenNotCaptured( state.causeOfTermination, cause );
}
}
else
{
state = StateHolder.terminatedWith( cause );
}
}

private void addSuppressedWhenNotCaptured( Throwable currentCause, Throwable newCause )
{
if ( currentCause != newCause )
{
boolean noneMatch = Arrays.stream( currentCause.getSuppressed() ).noneMatch( suppressed -> suppressed == newCause );
if ( noneMatch )
{
currentCause.addSuppressed( newCause );
}
}
}

public Connection connection()
Expand Down
Expand Up @@ -18,9 +18,12 @@
*/
package org.neo4j.driver.internal.cursor;

import org.neo4j.driver.internal.FailableCursor;
import java.util.Optional;

import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.internal.FailableCursor;

public interface AsyncResultCursor extends ResultCursor, FailableCursor
{
Optional<Throwable> runError();
}
Expand Up @@ -19,6 +19,7 @@
package org.neo4j.driver.internal.cursor;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
Expand All @@ -33,11 +34,13 @@

public class AsyncResultCursorImpl implements AsyncResultCursor
{
private final Throwable runError;
private final RunResponseHandler runHandler;
private final PullAllResponseHandler pullAllHandler;

public AsyncResultCursorImpl(RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
public AsyncResultCursorImpl( Throwable runError, RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
{
this.runError = runError;
this.runHandler = runHandler;
this.pullAllHandler = pullAllHandler;
}
Expand Down Expand Up @@ -113,13 +116,14 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
@Override
public CompletionStage<Throwable> discardAllFailureAsync()
{
return consumeAsync().handle( ( summary, error ) -> error );
return consumeAsync().handle( ( summary, error ) -> runError != null && runError == Futures.completionExceptionCause( error ) ? null : error );
}

@Override
public CompletionStage<Throwable> pullAllFailureAsync()
{
return pullAllHandler.pullAllFailureAsync();
return pullAllHandler.pullAllFailureAsync()
.thenApply( error -> runError != null && runError == Futures.completionExceptionCause( error ) ? null : error );
}

private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Void> resultFuture )
Expand Down Expand Up @@ -154,4 +158,10 @@ else if ( record != null )
}
} );
}

@Override
public Optional<Throwable> runError()
{
return Optional.ofNullable( runError );
}
}
Expand Up @@ -64,7 +64,7 @@ public CompletionStage<AsyncResultCursor> asyncResult()
connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
pullAllHandler.prePopulateRecords();

return runFuture.thenApply( ignore -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
return runFuture.handle( ( ignored, error ) -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( error, runHandler, pullAllHandler ) ) );
}

public CompletionStage<RxResultCursor> rxResult()
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.neo4j.driver.internal.cursor;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
Expand Down Expand Up @@ -118,4 +119,10 @@ boolean isDisposed()
{
return this.isDisposed;
}

@Override
public Optional<Throwable> runError()
{
return this.delegate.runError();
}
}
Expand Up @@ -66,7 +66,7 @@ public CompletionStage<AsyncResultCursor> asyncResult()
// only write and flush messages when async result is wanted.
connection.write( runMessage, runHandler ); // queues the run message, will be flushed with pull message together
pullAllHandler.prePopulateRecords();
return runFuture.thenApply( ignore -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( runHandler, pullAllHandler ) ) );
return runFuture.handle( ( ignored, error ) -> new DisposableAsyncResultCursor( new AsyncResultCursorImpl( error, runHandler, pullAllHandler ) ) );
}

@Override
Expand Down
Expand Up @@ -66,16 +66,9 @@ public void onFailure( Throwable error )
{
tx.markTerminated( error );
}
else
else if ( error instanceof AuthorizationExpiredException )
{
if ( error instanceof AuthorizationExpiredException )
{
connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
}
else
{
connection.release();
}
connection.terminateAndRelease( AuthorizationExpiredException.DESCRIPTION );
}
runFuture.completeExceptionally( error );
}
Expand Down
Expand Up @@ -45,9 +45,6 @@ public void afterFailure( Throwable error )
// always mark transaction as terminated because every error is "acknowledged" with a RESET message
// so database forgets about the transaction after the first error
// such transaction should not attempt to commit and can be considered as rolled back
if ( tx.isOpen() )
{
tx.markTerminated( error );
}
tx.markTerminated( error );
}
}
Expand Up @@ -278,7 +278,7 @@ void connectionUsedForTransactionReturnedToThePoolWhenTransactionFailsToCommitte
void connectionUsedForSessionRunReturnedToThePoolWhenSessionClose()
{
Session session = driver.session();
Result result = createNodes( 12, session );
createNodes( 12, session );

Connection connection1 = connectionPool.lastAcquiredConnectionSpy;
verify( connection1, never() ).release();
Expand Down
Expand Up @@ -31,13 +31,15 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

import org.neo4j.driver.AccessMode;
import org.neo4j.driver.AuthToken;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Config;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Record;
import org.neo4j.driver.Session;
import org.neo4j.driver.TransactionWork;
Expand Down Expand Up @@ -104,7 +106,8 @@ void shouldHandleLeaderSwitchAndRetryWhenWritingInTxFunctionAsync() throws IOExc
StubServer writeServer = stubController.startStub( "not_able_to_write_server_tx_func_retries.script", 9007 );
URI uri = URI.create( "neo4j://127.0.0.1:9001" );

Driver driver = GraphDatabase.driver( uri, Config.builder().withMaxTransactionRetryTime( 1, TimeUnit.MILLISECONDS ).build() );
Driver driver = GraphDatabase
.driver( uri, Config.builder().withLogging( Logging.console( Level.FINE ) ).withMaxTransactionRetryTime( 1, TimeUnit.MILLISECONDS ).build() );
AsyncSession session = driver.asyncSession( builder().withDatabase( "mydatabase" ).build() );
List<String> names = Futures.blockingGet( session.writeTransactionAsync(
tx -> tx.runAsync( "RETURN 1" )
Expand Down
Expand Up @@ -373,7 +373,7 @@ private Result createResult(int numberOfRecords )
}
pullAllHandler.onSuccess( emptyMap() );

AsyncResultCursor cursor = new AsyncResultCursorImpl( runHandler, pullAllHandler );
AsyncResultCursor cursor = new AsyncResultCursorImpl( null, runHandler, pullAllHandler );
return new InternalResult( connection, new DisposableAsyncResultCursor( cursor ) );
}

Expand Down
Expand Up @@ -404,12 +404,12 @@ void shouldPropagateFailureInConsumeAsync()

private static AsyncResultCursorImpl newCursor(PullAllResponseHandler pullAllHandler )
{
return new AsyncResultCursorImpl( newRunResponseHandler(), pullAllHandler );
return new AsyncResultCursorImpl( null, newRunResponseHandler(), pullAllHandler );
}

private static AsyncResultCursorImpl newCursor(RunResponseHandler runHandler, PullAllResponseHandler pullAllHandler )
{
return new AsyncResultCursorImpl( runHandler, pullAllHandler );
return new AsyncResultCursorImpl( null, runHandler, pullAllHandler );
}

private static RunResponseHandler newRunResponseHandler()
Expand Down
Expand Up @@ -31,10 +31,11 @@
import org.neo4j.driver.internal.spi.Connection;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand All @@ -58,7 +59,7 @@ void shouldReturnAsyncResultWhenRunSucceeded()
}

@Test
void shouldFailAsyncResultWhenRunFailed()
void shouldReturnAsyncResultWithRunErrorWhenRunFailed()
{
// Given
Throwable error = new RuntimeException( "Hi there" );
Expand All @@ -68,8 +69,9 @@ void shouldFailAsyncResultWhenRunFailed()
CompletionStage<AsyncResultCursor> cursorFuture = cursorFactory.asyncResult();

// Then
CompletionException actual = assertThrows( CompletionException.class, () -> getNow( cursorFuture ) );
assertThat( actual.getCause(), equalTo( error ) );
AsyncResultCursor cursor = getNow( cursorFuture );
assertTrue( cursor.runError().isPresent() );
assertSame( error, cursor.runError().get() );
}

@Test
Expand Down
Expand Up @@ -21,7 +21,6 @@
import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
Expand All @@ -30,10 +29,10 @@
import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.spi.Connection;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand All @@ -58,7 +57,7 @@ void shouldReturnAsyncResultWhenRunSucceeded()
}

@Test
void shouldFailAsyncResultWhenRunFailed()
void shouldReturnAsyncResultWithRunErrorWhenRunFailed()
{
// Given
Throwable error = new RuntimeException( "Hi there" );
Expand All @@ -68,8 +67,9 @@ void shouldFailAsyncResultWhenRunFailed()
CompletionStage<AsyncResultCursor> cursorFuture = cursorFactory.asyncResult();

// Then
CompletionException actual = assertThrows( CompletionException.class, () -> getNow( cursorFuture ) );
assertThat( actual.getCause(), equalTo( error ) );
AsyncResultCursor cursor = getNow( cursorFuture );
assertTrue( cursor.runError().isPresent() );
assertSame( error, cursor.runError().get() );
}

@Test
Expand Down
Expand Up @@ -154,7 +154,7 @@ void shouldMarkTxAndKeepConnectionAndFailOnFailure()
}

@Test
void shouldReleaseConnectionAndFailOnFailure()
void shouldNotReleaseConnectionAndFailOnFailure()
{
CompletableFuture<Void> runFuture = new CompletableFuture<>();
Connection connection = mock( Connection.class );
Expand All @@ -167,7 +167,7 @@ void shouldReleaseConnectionAndFailOnFailure()
assertTrue( runFuture.isCompletedExceptionally() );
Throwable actualException = assertThrows( Throwable.class, () -> await( runFuture ) );
assertSame( throwable, actualException );
verify( connection ).release();
verify( connection, never() ).release();
verify( connection, never() ).terminateAndRelease( any( String.class ) );
}

Expand Down

0 comments on commit 605194f

Please sign in to comment.