Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/3.5' into 3.5-more-longarrayset
Browse files Browse the repository at this point in the history
  • Loading branch information
systay committed May 22, 2018
2 parents f4f9624 + ba565f0 commit 4c8d46d
Show file tree
Hide file tree
Showing 409 changed files with 4,653 additions and 4,540 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,20 @@ private void after()
{
try
{
if ( hasPendingError() )
if ( ctx.hasFailedOrIgnored() )
{
Neo4jError pendingError = ctx.pendingError;
ctx.pendingError = null;
ctx.markFailed( pendingError );

if ( pendingError != null )
{
ctx.markFailed( pendingError );
}
else
{
ctx.markIgnored();
}

ctx.resetPendingFailedAndIgnored();
}

ctx.responseHandler.onFinish();
Expand All @@ -135,7 +144,7 @@ public void init( String userAgent, Map<String,Object> authToken,
before( handler );
try
{
if ( !hasPendingError() )
if ( !ctx.hasFailedOrIgnored() )
{
state = state.init( this, userAgent, authToken );
}
Expand All @@ -162,10 +171,7 @@ public void ackFailure( BoltResponseHandler handler ) throws BoltConnectionFatal
before( handler );
try
{
if ( !hasPendingError() )
{
state = state.ackFailure( this );
}
state = state.ackFailure( this );
}
finally
{
Expand All @@ -192,7 +198,7 @@ public void reset( BoltResponseHandler handler ) throws BoltConnectionFatality
before( handler );
try
{
if ( !hasPendingError() )
if ( !ctx.hasFailedOrIgnored() )
{
state = state.reset( this );
}
Expand All @@ -217,7 +223,7 @@ public void run( String statement, MapValue params, BoltResponseHandler handler
before( handler );
try
{
if ( !hasPendingError() )
if ( !ctx.hasFailedOrIgnored() )
{
state = state.run( this, statement, params );
handler.onMetadata( "result_available_after", Values.longValue( clock.millis() - start ) );
Expand All @@ -239,7 +245,7 @@ public void discardAll( BoltResponseHandler handler ) throws BoltConnectionFatal
before( handler );
try
{
if ( !hasPendingError() )
if ( !ctx.hasFailedOrIgnored() )
{
state = state.discardAll( this );
}
Expand All @@ -259,7 +265,7 @@ public void pullAll( BoltResponseHandler handler ) throws BoltConnectionFatality
before( handler );
try
{
if ( !hasPendingError() )
if ( !ctx.hasFailedOrIgnored() )
{
state = state.pullAll( this );
}
Expand All @@ -276,11 +282,6 @@ public void markFailed( Neo4jError error )
state = State.FAILED;
}

private boolean hasPendingError()
{
return ctx.pendingError != null;
}

/** A session id that is unique for this database instance */
public String key()
{
Expand Down Expand Up @@ -321,8 +322,11 @@ public void externalError( Neo4jError error, BoltResponseHandler handler ) throw
before( handler );
try
{
fail( this, error );
this.state = State.FAILED;
if ( !ctx.hasFailedOrIgnored() )
{
fail( this, error );
this.state = State.FAILED;
}
}
finally
{
Expand Down Expand Up @@ -564,6 +568,7 @@ public State reset( BoltStateMachine machine ) throws BoltConnectionFatality
@Override
public State ackFailure( BoltStateMachine machine )
{
machine.ctx.resetPendingFailedAndIgnored();
return READY;
}

Expand Down Expand Up @@ -746,7 +751,14 @@ private static State handleFailure( BoltStateMachine machine, Throwable t, Neo4j
private static void fail( BoltStateMachine machine, Neo4jError neo4jError )
{
machine.spi.reportError( neo4jError );
machine.ctx.markFailed( neo4jError );
if ( machine.state == State.FAILED )
{
machine.ctx.markIgnored();
}
else
{
machine.ctx.markFailed( neo4jError );
}
}

private void reset()
Expand Down Expand Up @@ -774,6 +786,8 @@ static class MutableConnectionState implements BoltResponseHandler

Neo4jError pendingError;

boolean pendingIgnore;

/**
* This is incremented each time {@link #interrupt()} is called,
* and decremented each time a {@link BoltStateMachine#reset(BoltResponseHandler)} message
Expand Down Expand Up @@ -843,6 +857,10 @@ public void markIgnored()
{
responseHandler.markIgnored();
}
else
{
pendingIgnore = true;
}
}

@Override
Expand All @@ -867,6 +885,17 @@ public void onFinish()
}
}

private boolean hasFailedOrIgnored()
{
return pendingError != null || pendingIgnore;
}

private void resetPendingFailedAndIgnored()
{
pendingError = null;
pendingIgnore = false;
}

}

public interface SPI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand All @@ -47,6 +48,7 @@
import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.bolt.testing.BoltMatchers.canReset;
Expand Down Expand Up @@ -567,9 +569,24 @@ public void shouldInvokeResponseHandlerOnNextResetMessageOnMarkFailedIfNoHandler
}

@Test
public void shouldInvokeResponseHandlerOnNextAckFailureMessageOnMarkFailedIfNoHandler() throws Exception
public void shouldGotoReadyStateOnNextAckFailureMessageOnMarkFailedIfNoHandler() throws Exception
{
testMarkFailedOnNextMessage( ( machine, handler ) -> machine.ackFailure( handler ) );
// Given
BoltStateMachine machine = newMachine( BoltStateMachine.State.READY );
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

// When
machine.ackFailure( responseHandler );

// Expect
assertNull( machine.ctx.pendingError );
assertFalse( machine.ctx.pendingIgnore );
assertEquals( BoltStateMachine.State.READY, machine.state );
verify( responseHandler, never() ).markFailed( any() );
verify( responseHandler, never() ).markIgnored();
}

@Test
Expand All @@ -579,37 +596,128 @@ public void shouldInvokeResponseHandlerOnNextExternalErrorMessageOnMarkFailedIfN
}

@Test
public void shouldInvokeResponseHandlerOnMarkFailedIfThereIsHandler()
public void shouldSetPendingIgnoreOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
BoltStateMachineSPI spi = mock( BoltStateMachineSPI.class );
BoltChannel boltChannel = mock( BoltChannel.class );
BoltStateMachine machine = new BoltStateMachine( spi, boltChannel, Clock.systemUTC(), NullLogService.getInstance() );
BoltStateMachine machine = newMachine( BoltStateMachine.State.FAILED );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

assertTrue( machine.ctx.pendingIgnore );
assertEquals( null, machine.ctx.pendingError );
assertEquals( BoltStateMachine.State.FAILED, machine.state );
}

@Test
public void shouldInvokeResponseHandlerOnNextInitMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.init( "Test/1.0", Collections.emptyMap(), handler ) );
}

@Test
public void shouldInvokeResponseHandlerOnNextRunMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
testMarkFailedShouldYieldIgnoredIfAlreadyFailed(
( machine, handler ) -> machine.run( "RETURN 1", ValueUtils.asMapValue( Collections.emptyMap() ), handler ) );
}

@Test
public void shouldInvokeResponseHandlerOnNextPullAllMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.pullAll( handler ) );
}

@Test
public void shouldInvokeResponseHandlerOnNextDiscardAllMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.discardAll( handler ) );
}

@Test
public void shouldInvokeResponseHandlerOnNextResetMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
testMarkFailedShouldYieldIgnoredIfAlreadyFailed( ( machine, handler ) -> machine.reset( handler ) );
}

@Test
public void shouldInvokeResponseHandlerOnNextAckFailureMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
// Given
BoltStateMachine machine = newMachine( BoltStateMachine.State.FAILED );
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

// When
machine.ackFailure( responseHandler );

// Expect
assertNull( machine.ctx.pendingError );
assertFalse( machine.ctx.pendingIgnore );
assertEquals( BoltStateMachine.State.READY, machine.state );
verify( responseHandler, never() ).markIgnored();
verify( responseHandler, never() ).markFailed( any() );
}

@Test
public void shouldInvokeResponseHandlerOnNextExternalErrorMessageOnMarkFailedIfAlreadyFailedAndNoHandler() throws Exception
{
testMarkFailedShouldYieldIgnoredIfAlreadyFailed(
( machine, handler ) -> machine.externalError( Neo4jError.from( Status.Request.Invalid, "invalid" ), handler ) );
}

@Test
public void shouldInvokeResponseHandlerOnMarkFailedIfThereIsHandler() throws Exception
{
BoltStateMachine machine = newMachine( BoltStateMachine.State.READY );
Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );

machine.ctx.responseHandler = mock( BoltResponseHandler.class );
machine.markFailed( error );

assertNull( machine.ctx.pendingError );
assertFalse( machine.ctx.pendingIgnore );
assertEquals( BoltStateMachine.State.FAILED, machine.state );
verify( machine.ctx.responseHandler ).markFailed( error );
}

private static void testMarkFailedOnNextMessage( ThrowingBiConsumer<BoltStateMachine,BoltResponseHandler,BoltConnectionFatality> action ) throws Exception
{
// Given
BoltStateMachineSPI spi = mock( BoltStateMachineSPI.class );
BoltChannel boltChannel = mock( BoltChannel.class );
BoltStateMachine machine = new BoltStateMachine( spi, boltChannel, Clock.systemUTC(), NullLogService.getInstance() );
BoltStateMachine machine = newMachine( BoltStateMachine.State.READY );
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

// When
action.accept( machine, responseHandler );

// Expect
assertNull( machine.ctx.pendingError );
assertFalse( machine.ctx.pendingIgnore );
assertEquals( BoltStateMachine.State.FAILED, machine.state );
verify( responseHandler ).markFailed( error );
}

private static void testMarkFailedShouldYieldIgnoredIfAlreadyFailed(
ThrowingBiConsumer<BoltStateMachine,BoltResponseHandler,BoltConnectionFatality> action ) throws Exception
{
// Given
BoltStateMachine machine = newMachine( BoltStateMachine.State.FAILED );
BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

Neo4jError error = Neo4jError.from( Status.Request.NoThreadsAvailable, "no threads" );
machine.markFailed( error );

// When
action.accept( machine, responseHandler );

// Expect
assertNull( machine.ctx.pendingError );
assertFalse( machine.ctx.pendingIgnore );
assertEquals( BoltStateMachine.State.FAILED, machine.state );
verify( responseHandler ).markIgnored();
}
}

0 comments on commit 4c8d46d

Please sign in to comment.