Skip to content

Commit

Permalink
Avoid generating consecutive failure messages
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed May 18, 2018
1 parent 33c337f commit c33133c
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 20 deletions.
Expand Up @@ -110,11 +110,21 @@ private void after()
{ {
try try
{ {
if ( hasPendingError() ) if ( hasFailedOrIgnored() )
{ {
Neo4jError pendingError = ctx.pendingError; Neo4jError pendingError = ctx.pendingError;

if ( pendingError != null )
{
ctx.markFailed( pendingError );
}
else
{
ctx.markIgnored();
}

ctx.pendingError = null; ctx.pendingError = null;
ctx.markFailed( pendingError ); ctx.pendingIgnore = false;
} }


ctx.responseHandler.onFinish(); ctx.responseHandler.onFinish();
Expand All @@ -135,7 +145,7 @@ public void init( String userAgent, Map<String,Object> authToken,
before( handler ); before( handler );
try try
{ {
if ( !hasPendingError() ) if ( !hasFailedOrIgnored() )
{ {
state = state.init( this, userAgent, authToken ); state = state.init( this, userAgent, authToken );
} }
Expand All @@ -162,7 +172,7 @@ public void ackFailure( BoltResponseHandler handler ) throws BoltConnectionFatal
before( handler ); before( handler );
try try
{ {
if ( !hasPendingError() ) if ( !hasFailedOrIgnored() )
{ {
state = state.ackFailure( this ); state = state.ackFailure( this );
} }
Expand Down Expand Up @@ -192,7 +202,7 @@ public void reset( BoltResponseHandler handler ) throws BoltConnectionFatality
before( handler ); before( handler );
try try
{ {
if ( !hasPendingError() ) if ( !hasFailedOrIgnored() )
{ {
state = state.reset( this ); state = state.reset( this );
} }
Expand All @@ -217,7 +227,7 @@ public void run( String statement, MapValue params, BoltResponseHandler handler
before( handler ); before( handler );
try try
{ {
if ( !hasPendingError() ) if ( !hasFailedOrIgnored() )
{ {
state = state.run( this, statement, params ); state = state.run( this, statement, params );
handler.onMetadata( "result_available_after", Values.longValue( clock.millis() - start ) ); handler.onMetadata( "result_available_after", Values.longValue( clock.millis() - start ) );
Expand All @@ -239,7 +249,7 @@ public void discardAll( BoltResponseHandler handler ) throws BoltConnectionFatal
before( handler ); before( handler );
try try
{ {
if ( !hasPendingError() ) if ( !hasFailedOrIgnored() )
{ {
state = state.discardAll( this ); state = state.discardAll( this );
} }
Expand All @@ -259,7 +269,7 @@ public void pullAll( BoltResponseHandler handler ) throws BoltConnectionFatality
before( handler ); before( handler );
try try
{ {
if ( !hasPendingError() ) if ( !hasFailedOrIgnored() )
{ {
state = state.pullAll( this ); state = state.pullAll( this );
} }
Expand All @@ -276,9 +286,9 @@ public void markFailed( Neo4jError error )
state = State.FAILED; state = State.FAILED;
} }


private boolean hasPendingError() private boolean hasFailedOrIgnored()
{ {
return ctx.pendingError != null; return ctx.pendingError != null || ctx.pendingIgnore;
} }


/** A session id that is unique for this database instance */ /** A session id that is unique for this database instance */
Expand Down Expand Up @@ -321,8 +331,11 @@ public void externalError( Neo4jError error, BoltResponseHandler handler ) throw
before( handler ); before( handler );
try try
{ {
fail( this, error ); if ( !hasFailedOrIgnored() )
this.state = State.FAILED; {
fail( this, error );
this.state = State.FAILED;
}
} }
finally finally
{ {
Expand Down Expand Up @@ -746,7 +759,14 @@ private static State handleFailure( BoltStateMachine machine, Throwable t, Neo4j
private static void fail( BoltStateMachine machine, Neo4jError neo4jError ) private static void fail( BoltStateMachine machine, Neo4jError neo4jError )
{ {
machine.spi.reportError( neo4jError ); machine.spi.reportError( neo4jError );
machine.ctx.markFailed( neo4jError ); if ( machine.state == State.FAILED )
{
machine.ctx.markIgnored();
}
else
{
machine.ctx.markFailed( neo4jError );
}
} }


private void reset() private void reset()
Expand Down Expand Up @@ -774,6 +794,8 @@ static class MutableConnectionState implements BoltResponseHandler


Neo4jError pendingError; Neo4jError pendingError;


boolean pendingIgnore;

/** /**
* This is incremented each time {@link #interrupt()} is called, * This is incremented each time {@link #interrupt()} is called,
* and decremented each time a {@link BoltStateMachine#reset(BoltResponseHandler)} message * and decremented each time a {@link BoltStateMachine#reset(BoltResponseHandler)} message
Expand Down Expand Up @@ -843,6 +865,10 @@ public void markIgnored()
{ {
responseHandler.markIgnored(); responseHandler.markIgnored();
} }
else
{
pendingIgnore = true;
}
} }


@Override @Override
Expand Down
Expand Up @@ -39,6 +39,7 @@
import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
Expand Down Expand Up @@ -579,37 +580,113 @@ public void shouldInvokeResponseHandlerOnNextExternalErrorMessageOnMarkFailedIfN
} }


@Test @Test
public void shouldInvokeResponseHandlerOnMarkFailedIfThereIsHandler() public void shouldSetPendingIgnoreOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{ {
BoltStateMachineSPI spi = mock( BoltStateMachineSPI.class ); BoltStateMachine machine = newMachine( BoltStateMachine.State.FAILED );
BoltChannel boltChannel = mock( BoltChannel.class );
BoltStateMachine machine = new BoltStateMachine( spi, boltChannel, Clock.systemUTC(), NullLogService.getInstance() ); Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

assertTrue( machine.ctx.pendingIgnore );
assertEquals( null, machine.ctx.pendingError );
assertEquals( BoltStateMachine.State.FAILED, machine.state );
}

@Test
public void shouldInvokeResponseHandlerOnNextInitMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.init( "Test/1.0", Collections.emptyMap(), handler ) );
}

@Test
public void shouldInvokeResponseHandlerOnNextRunMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
testMarkFailedShouldYieldIgnoredIfAlreadyFailed(
( machine, handler ) -> machine.run( "RETURN 1", ValueUtils.asMapValue( Collections.emptyMap() ), handler ) );
}

@Test
public void shouldInvokeResponseHandlerOnNextPullAllMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.pullAll( handler ) );
}

@Test
public void shouldInvokeResponseHandlerOnNextDiscardAllMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.discardAll( handler ) );
}

@Test
public void shouldInvokeResponseHandlerOnNextResetMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.reset( handler ) );
}

@Test
public void shouldInvokeResponseHandlerOnNextAckFailureMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.ackFailure( handler ) );
}

@Test
public void shouldInvokeResponseHandlerOnNextExternalErrorMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
testMarkFailedShouldYieldIgnoredIfAlreadyFailed(
( machine, handler ) -> machine.externalError( Neo4jError.from( Status.Request.Invalid, "invalid" ), handler ) );
}

@Test
public void shouldInvokeResponseHandlerOnMarkFailedIfThereIsHandler() throws Exception
{
BoltStateMachine machine = newMachine( BoltStateMachine.State.READY );
Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" ); Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );


machine.ctx.responseHandler = mock( BoltResponseHandler.class ); machine.ctx.responseHandler = mock( BoltResponseHandler.class );
machine.markFailed( error ); machine.markFailed( error );


assertNull( machine.ctx.pendingError ); assertNull( machine.ctx.pendingError );
assertFalse( machine.ctx.pendingIgnore );
assertEquals( BoltStateMachine.State.FAILED, machine.state ); assertEquals( BoltStateMachine.State.FAILED, machine.state );
verify( machine.ctx.responseHandler ).markFailed( error ); verify( machine.ctx.responseHandler ).markFailed( error );
} }


private static void testMarkFailedOnNextMessage( ThrowingBiConsumer<BoltStateMachine,BoltResponseHandler,BoltConnectionFatality> action ) throws Exception private static void testMarkFailedOnNextMessage( ThrowingBiConsumer<BoltStateMachine,BoltResponseHandler,BoltConnectionFatality> action ) throws Exception
{ {
// Given // Given
BoltStateMachineSPI spi = mock( BoltStateMachineSPI.class ); BoltStateMachine machine = newMachine( BoltStateMachine.State.READY );
BoltChannel boltChannel = mock( BoltChannel.class );
BoltStateMachine machine = new BoltStateMachine( spi, boltChannel, Clock.systemUTC(), NullLogService.getInstance() );
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class ); BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" ); Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error ); machine.markFailed( error );


// When // When
action.accept( machine, responseHandler ); action.accept( machine, responseHandler );


// Expect // Expect
assertNull( machine.ctx.pendingError );
assertFalse( machine.ctx.pendingIgnore );
assertEquals( BoltStateMachine.State.FAILED, machine.state ); assertEquals( BoltStateMachine.State.FAILED, machine.state );
verify( responseHandler ).markFailed( error ); verify( responseHandler ).markFailed( error );
} }


private static void testMarkFailedShouldYieldIgnoredIfAlreadyFailed(
ThrowingBiConsumer<BoltStateMachine,BoltResponseHandler,BoltConnectionFatality> action ) throws Exception
{
// Given
BoltStateMachine machine = newMachine( BoltStateMachine.State.FAILED );
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

// When
action.accept( machine, responseHandler );

// Expect
assertNull( machine.ctx.pendingError );
assertFalse( machine.ctx.pendingIgnore );
assertEquals( BoltStateMachine.State.FAILED, machine.state );
verify( responseHandler ).markIgnored();
}
} }

0 comments on commit c33133c

Please sign in to comment.