From c33133cd5d266a78c57aa96302988cc46b28abe9 Mon Sep 17 00:00:00 2001 From: Ali Ince Date: Tue, 8 May 2018 22:59:12 +0100 Subject: [PATCH] Avoid generating consecutive failure messages --- .../bolt/v1/runtime/BoltStateMachine.java | 52 ++++++++--- .../bolt/v1/runtime/BoltStateMachineTest.java | 91 +++++++++++++++++-- 2 files changed, 123 insertions(+), 20 deletions(-) diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachine.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachine.java index 755e7f3eaa1f3..ae3ea6bfc2f47 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachine.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/BoltStateMachine.java @@ -110,11 +110,21 @@ private void after() { try { - if ( hasPendingError() ) + if ( hasFailedOrIgnored() ) { Neo4jError pendingError = ctx.pendingError; + + if ( pendingError != null ) + { + ctx.markFailed( pendingError ); + } + else + { + ctx.markIgnored(); + } + ctx.pendingError = null; - ctx.markFailed( pendingError ); + ctx.pendingIgnore = false; } ctx.responseHandler.onFinish(); @@ -135,7 +145,7 @@ public void init( String userAgent, Map authToken, before( handler ); try { - if ( !hasPendingError() ) + if ( !hasFailedOrIgnored() ) { state = state.init( this, userAgent, authToken ); } @@ -162,7 +172,7 @@ public void ackFailure( BoltResponseHandler handler ) throws BoltConnectionFatal before( handler ); try { - if ( !hasPendingError() ) + if ( !hasFailedOrIgnored() ) { state = state.ackFailure( this ); } @@ -192,7 +202,7 @@ public void reset( BoltResponseHandler handler ) throws BoltConnectionFatality before( handler ); try { - if ( !hasPendingError() ) + if ( !hasFailedOrIgnored() ) { state = state.reset( this ); } @@ -217,7 +227,7 @@ public void run( String statement, MapValue params, BoltResponseHandler handler before( handler ); try { - if ( !hasPendingError() ) + if ( !hasFailedOrIgnored() ) { state = state.run( this, statement, params ); handler.onMetadata( "result_available_after", Values.longValue( clock.millis() - start ) ); @@ -239,7 +249,7 @@ public void discardAll( BoltResponseHandler handler ) throws BoltConnectionFatal before( handler ); try { - if ( !hasPendingError() ) + if ( !hasFailedOrIgnored() ) { state = state.discardAll( this ); } @@ -259,7 +269,7 @@ public void pullAll( BoltResponseHandler handler ) throws BoltConnectionFatality before( handler ); try { - if ( !hasPendingError() ) + if ( !hasFailedOrIgnored() ) { state = state.pullAll( this ); } @@ -276,9 +286,9 @@ public void markFailed( Neo4jError error ) state = State.FAILED; } - private boolean hasPendingError() + private boolean hasFailedOrIgnored() { - return ctx.pendingError != null; + return ctx.pendingError != null || ctx.pendingIgnore; } /** A session id that is unique for this database instance */ @@ -321,8 +331,11 @@ public void externalError( Neo4jError error, BoltResponseHandler handler ) throw before( handler ); try { - fail( this, error ); - this.state = State.FAILED; + if ( !hasFailedOrIgnored() ) + { + fail( this, error ); + this.state = State.FAILED; + } } finally { @@ -746,7 +759,14 @@ private static State handleFailure( BoltStateMachine machine, Throwable t, Neo4j private static void fail( BoltStateMachine machine, Neo4jError neo4jError ) { machine.spi.reportError( neo4jError ); - machine.ctx.markFailed( neo4jError ); + if ( machine.state == State.FAILED ) + { + machine.ctx.markIgnored(); + } + else + { + machine.ctx.markFailed( neo4jError ); + } } private void reset() @@ -774,6 +794,8 @@ static class MutableConnectionState implements BoltResponseHandler Neo4jError pendingError; + boolean pendingIgnore; + /** * This is incremented each time {@link #interrupt()} is called, * and decremented each time a {@link BoltStateMachine#reset(BoltResponseHandler)} message @@ -843,6 +865,10 @@ public void markIgnored() { responseHandler.markIgnored(); } + else + { + pendingIgnore = true; + } } @Override diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltStateMachineTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltStateMachineTest.java index cc54e10da4134..dc2f62a987b03 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltStateMachineTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/BoltStateMachineTest.java @@ -39,6 +39,7 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -579,17 +580,73 @@ public void shouldInvokeResponseHandlerOnNextExternalErrorMessageOnMarkFailedIfN } @Test - public void shouldInvokeResponseHandlerOnMarkFailedIfThereIsHandler() + public void shouldSetPendingIgnoreOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception { - BoltStateMachineSPI spi = mock( BoltStateMachineSPI.class ); - BoltChannel boltChannel = mock( BoltChannel.class ); - BoltStateMachine machine = new BoltStateMachine( spi, boltChannel, Clock.systemUTC(), NullLogService.getInstance() ); + BoltStateMachine machine = newMachine( BoltStateMachine.State.FAILED ); + + Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" ); + machine.markFailed( error ); + + assertTrue( machine.ctx.pendingIgnore ); + assertEquals( null, machine.ctx.pendingError ); + assertEquals( BoltStateMachine.State.FAILED, machine.state ); + } + + @Test + public void shouldInvokeResponseHandlerOnNextInitMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception + { + testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.init( "Test/1.0", Collections.emptyMap(), handler ) ); + } + + @Test + public void shouldInvokeResponseHandlerOnNextRunMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception + { + testMarkFailedShouldYieldIgnoredIfAlreadyFailed( + ( machine, handler ) -> machine.run( "RETURN 1", ValueUtils.asMapValue( Collections.emptyMap() ), handler ) ); + } + + @Test + public void shouldInvokeResponseHandlerOnNextPullAllMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception + { + testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.pullAll( handler ) ); + } + + @Test + public void shouldInvokeResponseHandlerOnNextDiscardAllMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception + { + testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.discardAll( handler ) ); + } + + @Test + public void shouldInvokeResponseHandlerOnNextResetMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception + { + testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.reset( handler ) ); + } + + @Test + public void shouldInvokeResponseHandlerOnNextAckFailureMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception + { + testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.ackFailure( handler ) ); + } + + @Test + public void shouldInvokeResponseHandlerOnNextExternalErrorMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception + { + testMarkFailedShouldYieldIgnoredIfAlreadyFailed( + ( machine, handler ) -> machine.externalError( Neo4jError.from( Status.Request.Invalid, "invalid" ), handler ) ); + } + + @Test + public void shouldInvokeResponseHandlerOnMarkFailedIfThereIsHandler() throws Exception + { + BoltStateMachine machine = newMachine( BoltStateMachine.State.READY ); Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" ); machine.ctx.responseHandler = mock( BoltResponseHandler.class ); machine.markFailed( error ); assertNull( machine.ctx.pendingError ); + assertFalse( machine.ctx.pendingIgnore ); assertEquals( BoltStateMachine.State.FAILED, machine.state ); verify( machine.ctx.responseHandler ).markFailed( error ); } @@ -597,10 +654,9 @@ public void shouldInvokeResponseHandlerOnMarkFailedIfThereIsHandler() private static void testMarkFailedOnNextMessage( ThrowingBiConsumer action ) throws Exception { // Given - BoltStateMachineSPI spi = mock( BoltStateMachineSPI.class ); - BoltChannel boltChannel = mock( BoltChannel.class ); - BoltStateMachine machine = new BoltStateMachine( spi, boltChannel, Clock.systemUTC(), NullLogService.getInstance() ); + BoltStateMachine machine = newMachine( BoltStateMachine.State.READY ); BoltResponseHandler responseHandler = mock( BoltResponseHandler.class ); + Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" ); machine.markFailed( error ); @@ -608,8 +664,29 @@ private static void testMarkFailedOnNextMessage( ThrowingBiConsumer action ) throws Exception + { + // Given + BoltStateMachine machine = newMachine( BoltStateMachine.State.FAILED ); + BoltResponseHandler responseHandler = mock( BoltResponseHandler.class ); + + Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" ); + machine.markFailed( error ); + + // When + action.accept( machine, responseHandler ); + + // Expect + assertNull( machine.ctx.pendingError ); + assertFalse( machine.ctx.pendingIgnore ); + assertEquals( BoltStateMachine.State.FAILED, machine.state ); + verify( responseHandler ).markIgnored(); + } }