Skip to content

Commit

Permalink
Ensure ackFailure always gets processed
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed May 18, 2018
1 parent c33133c commit 2eacd17
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 21 deletions.
Expand Up @@ -110,7 +110,7 @@ private void after()
{
try
{
if ( hasFailedOrIgnored() )
if ( ctx.hasFailedOrIgnored() )
{
Neo4jError pendingError = ctx.pendingError;

Expand All @@ -123,8 +123,7 @@ private void after()
ctx.markIgnored();
}

ctx.pendingError = null;
ctx.pendingIgnore = false;
ctx.resetPendingFailedAndIgnored();
}

ctx.responseHandler.onFinish();
Expand All @@ -145,7 +144,7 @@ public void init( String userAgent, Map<String,Object> authToken,
before( handler );
try
{
if ( !hasFailedOrIgnored() )
if ( !ctx.hasFailedOrIgnored() )
{
state = state.init( this, userAgent, authToken );
}
Expand All @@ -172,10 +171,7 @@ public void ackFailure( BoltResponseHandler handler ) throws BoltConnectionFatal
before( handler );
try
{
if ( !hasFailedOrIgnored() )
{
state = state.ackFailure( this );
}
state = state.ackFailure( this );
}
finally
{
Expand All @@ -202,7 +198,7 @@ public void reset( BoltResponseHandler handler ) throws BoltConnectionFatality
before( handler );
try
{
if ( !hasFailedOrIgnored() )
if ( !ctx.hasFailedOrIgnored() )
{
state = state.reset( this );
}
Expand All @@ -227,7 +223,7 @@ public void run( String statement, MapValue params, BoltResponseHandler handler
before( handler );
try
{
if ( !hasFailedOrIgnored() )
if ( !ctx.hasFailedOrIgnored() )
{
state = state.run( this, statement, params );
handler.onMetadata( "result_available_after", Values.longValue( clock.millis() - start ) );
Expand All @@ -249,7 +245,7 @@ public void discardAll( BoltResponseHandler handler ) throws BoltConnectionFatal
before( handler );
try
{
if ( !hasFailedOrIgnored() )
if ( !ctx.hasFailedOrIgnored() )
{
state = state.discardAll( this );
}
Expand All @@ -269,7 +265,7 @@ public void pullAll( BoltResponseHandler handler ) throws BoltConnectionFatality
before( handler );
try
{
if ( !hasFailedOrIgnored() )
if ( !ctx.hasFailedOrIgnored() )
{
state = state.pullAll( this );
}
Expand All @@ -286,11 +282,6 @@ public void markFailed( Neo4jError error )
state = State.FAILED;
}

private boolean hasFailedOrIgnored()
{
return ctx.pendingError != null || ctx.pendingIgnore;
}

/** A session id that is unique for this database instance */
public String key()
{
Expand Down Expand Up @@ -331,7 +322,7 @@ public void externalError( Neo4jError error, BoltResponseHandler handler ) throw
before( handler );
try
{
if ( !hasFailedOrIgnored() )
if ( !ctx.hasFailedOrIgnored() )
{
fail( this, error );
this.state = State.FAILED;
Expand Down Expand Up @@ -577,6 +568,7 @@ public State reset( BoltStateMachine machine ) throws BoltConnectionFatality
@Override
public State ackFailure( BoltStateMachine machine )
{
machine.ctx.resetPendingFailedAndIgnored();
return READY;
}

Expand Down Expand Up @@ -893,6 +885,17 @@ public void onFinish()
}
}

private boolean hasFailedOrIgnored()
{
return pendingError != null || pendingIgnore;
}

private void resetPendingFailedAndIgnored()
{
pendingError = null;
pendingIgnore = false;
}

}

public interface SPI
Expand Down
Expand Up @@ -48,6 +48,7 @@
import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.bolt.testing.BoltMatchers.canReset;
Expand Down Expand Up @@ -568,9 +569,24 @@ public void shouldInvokeResponseHandlerOnNextResetMessageOnMarkFailedIfNoHandler
}

@Test
public void shouldInvokeResponseHandlerOnNextAckFailureMessageOnMarkFailedIfNoHandler() throws Exception
public void shouldGotoReadyStateOnNextAckFailureMessageOnMarkFailedIfNoHandler() throws Exception
{
testMarkFailedOnNextMessage( ( machine, handler ) -> machine.ackFailure( handler ) );
// Given
BoltStateMachine machine = newMachine( BoltStateMachine.State.READY );
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

// When
machine.ackFailure( responseHandler );

// Expect
assertNull( machine.ctx.pendingError );
assertFalse( machine.ctx.pendingIgnore );
assertEquals( BoltStateMachine.State.READY, machine.state );
verify( responseHandler, never() ).markFailed( any() );
verify( responseHandler, never() ).markIgnored();
}

@Test
Expand Down Expand Up @@ -626,7 +642,22 @@ public void shouldInvokeResponseHandlerOnNextResetMessageOnMarkFailedIfAlreadyFa
@Test
public void shouldInvokeResponseHandlerOnNextAckFailureMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.ackFailure( handler ) );
// Given
BoltStateMachine machine = newMachine( BoltStateMachine.State.FAILED );
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

// When
machine.ackFailure( responseHandler );

// Expect
assertNull( machine.ctx.pendingError );
assertFalse( machine.ctx.pendingIgnore );
assertEquals( BoltStateMachine.State.READY, machine.state );
verify( responseHandler, never() ).markIgnored();
verify( responseHandler, never() ).markFailed( any() );
}

@Test
Expand Down

0 comments on commit 2eacd17

Please sign in to comment.