Skip to content

Commit

Permalink
Ensure RESET messages clear pending ignore and error markers
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed Jan 17, 2019
1 parent 56cec59 commit bc50848
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 71 deletions.
Expand Up @@ -54,6 +54,7 @@ else if ( message instanceof ResetMessage )
context.connectionState().markIgnored();
return this;
}
context.connectionState().resetPendingFailedAndIgnored();
boolean success = context.resetMachine();
return success ? readyState : failedState;
}
Expand Down
Expand Up @@ -53,6 +53,7 @@ public BoltStateMachineState process( RequestMessage message, StateMachineContex
context.connectionState().markIgnored();
return this;
}
context.connectionState().resetPendingFailedAndIgnored();
boolean success = context.resetMachine();
return success ? readyState : null;
}
Expand Down
Expand Up @@ -17,7 +17,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.runtime;
package org.neo4j.bolt.runtime;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -27,11 +27,10 @@
import java.util.Optional;

import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.runtime.BoltStateMachine;
import org.neo4j.bolt.runtime.BoltStateMachineFactoryImpl;
import org.neo4j.bolt.security.auth.Authentication;
import org.neo4j.bolt.testing.BoltTestUtil;
import org.neo4j.bolt.v1.BoltProtocolV1;
import org.neo4j.bolt.v1.runtime.BoltStateMachineV1;
import org.neo4j.bolt.v2.BoltProtocolV2;
import org.neo4j.bolt.v3.BoltStateMachineV3;
import org.neo4j.dbms.database.DatabaseManager;
Expand Down
Expand Up @@ -82,7 +82,7 @@
import static org.neo4j.bolt.v1.runtime.MachineRoom.newMachineWithTransaction;
import static org.neo4j.bolt.v1.runtime.MachineRoom.newMachineWithTransactionSPI;

public class BoltStateMachineTest
public class BoltStateMachineV1Test
{
@Test
public void allStateTransitionsShouldSendExactlyOneResponseToTheClient() throws Exception
Expand Down Expand Up @@ -567,43 +567,13 @@ public void shouldInvokeResponseHandlerOnNextDiscardAllMessageOnMarkFailedIfNoHa
@Test
public void shouldInvokeResponseHandlerOnNextResetMessageOnMarkFailedIfNoHandler() throws Exception
{
// Given
BoltStateMachine machine = newMachine();
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

// When
machine.process( ResetMessage.INSTANCE, responseHandler );

// Expect
assertNull( pendingError( machine ) );
assertFalse( pendingIgnore( machine ) );
assertThat( machine, inState( ReadyState.class ) );
verify( responseHandler, never() ).markFailed( any() );
verify( responseHandler, never() ).markIgnored();
testReadyStateAfterMarkFailedOnNextMessage( ( machine, handler ) -> machine.process( ResetMessage.INSTANCE, handler ) );
}

@Test
public void shouldGotoReadyStateOnNextAckFailureMessageOnMarkFailedIfNoHandler() throws Exception
{
// Given
BoltStateMachine machine = newMachine();
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

// When
machine.process( AckFailureMessage.INSTANCE, responseHandler );

// Expect
assertNull( pendingError( machine ) );
assertFalse( pendingIgnore( machine ) );
assertThat( machine, inState( ReadyState.class ) );
verify( responseHandler, never() ).markFailed( any() );
verify( responseHandler, never() ).markIgnored();
testReadyStateAfterMarkFailedOnNextMessage( ( machine, handler ) -> machine.process( AckFailureMessage.INSTANCE, handler ) );
}

@Test
Expand Down Expand Up @@ -655,45 +625,13 @@ public void shouldInvokeResponseHandlerOnNextDiscardAllMessageOnMarkFailedIfAlre
@Test
public void shouldInvokeResponseHandlerOnNextResetMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
// Given
BoltStateMachine machine = newMachine();
machine.markFailed( Neo4jError.from( new RuntimeException() ) );
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

// When
machine.process( ResetMessage.INSTANCE, responseHandler );

// Expect
assertNull( pendingError( machine ) );
assertFalse( pendingIgnore( machine ) );
assertThat( machine, inState( ReadyState.class ) );
verify( responseHandler, never() ).markIgnored();
verify( responseHandler, never() ).markFailed( any() );
testMarkFailedShouldYieldSuccessIfAlreadyFailed( ( machine, handler ) -> machine.process( ResetMessage.INSTANCE, handler ) );
}

@Test
public void shouldInvokeResponseHandlerOnNextAckFailureMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
// Given
BoltStateMachine machine = newMachine();
machine.markFailed( Neo4jError.from( new RuntimeException() ) );
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

// When
machine.process( AckFailureMessage.INSTANCE, responseHandler );

// Expect
assertNull( pendingError( machine ) );
assertFalse( pendingIgnore( machine ) );
assertThat( machine, inState( ReadyState.class ) );
verify( responseHandler, never() ).markIgnored();
verify( responseHandler, never() ).markFailed( any() );
testMarkFailedShouldYieldSuccessIfAlreadyFailed( ( machine, handler ) -> machine.process( AckFailureMessage.INSTANCE, handler ) );
}

@Test
Expand Down Expand Up @@ -736,6 +674,54 @@ public void shouldNotFailWhenMarkedForTerminationAndPullAll() throws Exception
assertThat( machine, not( inState( FailedState.class ) ) );
}

@Test
public void shouldSucceedOnResetOnFailedState() throws Exception
{
// Given
BoltResponseRecorder recorder = new BoltResponseRecorder();

// Given a FAILED machine
BoltStateMachine machine = init( newMachine() );

machine.markFailed( Neo4jError.from( Status.Request.NoThreadsAvailable, "No Threads Available" ) );
machine.process( PullAllMessage.INSTANCE, recorder );

// When I RESET...
machine.interrupt();
machine.markFailed( Neo4jError.from( Status.Request.NoThreadsAvailable, "No Threads Available" ) );
machine.process( ResetMessage.INSTANCE, recorder );

assertThat( recorder.nextResponse(), failedWithStatus( Status.Request.NoThreadsAvailable ) );
// ...successfully
assertThat( recorder.nextResponse(), succeeded() );
}

@Test
public void shouldSucceedOnConsecutiveResetsOnFailedState() throws Exception
{
// Given
BoltResponseRecorder recorder = new BoltResponseRecorder();

// Given a FAILED machine
BoltStateMachine machine = init( newMachine() );

machine.markFailed( Neo4jError.from( Status.Request.NoThreadsAvailable, "No Threads Available" ) );
machine.process( PullAllMessage.INSTANCE, recorder );

// When I RESET...
machine.interrupt();
machine.interrupt();
machine.markFailed( Neo4jError.from( Status.Request.NoThreadsAvailable, "No Threads Available" ) );
machine.process( ResetMessage.INSTANCE, recorder );
machine.markFailed( Neo4jError.from( Status.Request.NoThreadsAvailable, "No Threads Available" ) );
machine.process( ResetMessage.INSTANCE, recorder );

assertThat( recorder.nextResponse(), failedWithStatus( Status.Request.NoThreadsAvailable ) );
// ...successfully
assertThat( recorder.nextResponse(), wasIgnored() );
assertThat( recorder.nextResponse(), succeeded() );
}

private static void testMarkFailedOnNextMessage( ThrowingBiConsumer<BoltStateMachine,BoltResponseHandler,BoltConnectionFatality> action ) throws Exception
{
// Given
Expand All @@ -755,6 +741,27 @@ private static void testMarkFailedOnNextMessage( ThrowingBiConsumer<BoltStateMac
verify( responseHandler ).markFailed( error );
}

private static void testReadyStateAfterMarkFailedOnNextMessage( ThrowingBiConsumer<BoltStateMachine,BoltResponseHandler,BoltConnectionFatality> action )
throws Exception
{
// Given
BoltStateMachine machine = init( newMachine() );
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

// When
action.accept( machine, responseHandler );

// Expect
assertNull( pendingError( machine ) );
assertFalse( pendingIgnore( machine ) );
assertThat( machine, inState( ReadyState.class ) );
verify( responseHandler, never() ).markFailed( any() );
verify( responseHandler, never() ).markIgnored();
}

private static void testMarkFailedShouldYieldIgnoredIfAlreadyFailed(
ThrowingBiConsumer<BoltStateMachine,BoltResponseHandler,BoltConnectionFatality> action ) throws Exception
{
Expand All @@ -776,6 +783,28 @@ private static void testMarkFailedShouldYieldIgnoredIfAlreadyFailed(
verify( responseHandler ).markIgnored();
}

private static void testMarkFailedShouldYieldSuccessIfAlreadyFailed(
ThrowingBiConsumer<BoltStateMachine,BoltResponseHandler,BoltConnectionFatality> action ) throws Exception
{
// Given
BoltStateMachine machine = init( newMachine() );
machine.markFailed( Neo4jError.from( new RuntimeException() ) );
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

// When
action.accept( machine, responseHandler );

// Expect
assertNull( pendingError( machine ) );
assertFalse( pendingIgnore( machine ) );
assertThat( machine, inState( ReadyState.class ) );
verify( responseHandler, never() ).markIgnored();
verify( responseHandler, never() ).markFailed( any() );
}

private static TransactionStateMachine txStateMachine( BoltStateMachine machine )
{
return (TransactionStateMachine) ((BoltStateMachineV1) machine).statementProcessor();
Expand Down
Expand Up @@ -50,7 +50,7 @@
public class MachineRoom
{
static final MapValue EMPTY_PARAMS = VirtualValues.EMPTY_MAP;
static final String USER_AGENT = "BoltStateMachineTest/0.0";
static final String USER_AGENT = "BoltStateMachineV1Test/0.0";

private MachineRoom()
{
Expand Down

0 comments on commit bc50848

Please sign in to comment.