Skip to content

Commit

Permalink
Exit streaming state instead of resetting bolt state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed Mar 26, 2018
1 parent f88d6e8 commit 969d808
Showing 1 changed file with 24 additions and 24 deletions.
Expand Up @@ -33,22 +33,21 @@


import org.neo4j.bolt.AbstractBoltTransportsTest; import org.neo4j.bolt.AbstractBoltTransportsTest;
import org.neo4j.bolt.runtime.BoltConnection; import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.v1.messaging.message.ResetMessage;
import org.neo4j.bolt.v1.transport.integration.Neo4jWithSocket; import org.neo4j.bolt.v1.transport.integration.Neo4jWithSocket;
import org.neo4j.bolt.v1.transport.socket.client.TransportConnection; import org.neo4j.bolt.v1.transport.socket.client.TransportConnection;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.configuration.BoltConnector; import org.neo4j.kernel.configuration.BoltConnector;
import org.neo4j.logging.AssertableLogProvider; import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.test.TestGraphDatabaseFactory; import org.neo4j.test.TestGraphDatabaseFactory;
import org.neo4j.test.rule.concurrent.OtherThreadRule;
import org.neo4j.test.rule.fs.EphemeralFileSystemRule; import org.neo4j.test.rule.fs.EphemeralFileSystemRule;


import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.neo4j.bolt.v1.messaging.message.DiscardAllMessage.discardAll;
import static org.neo4j.bolt.v1.messaging.message.InitMessage.init; import static org.neo4j.bolt.v1.messaging.message.InitMessage.init;
import static org.neo4j.bolt.v1.messaging.message.PullAllMessage.pullAll; import static org.neo4j.bolt.v1.messaging.message.PullAllMessage.pullAll;
import static org.neo4j.bolt.v1.messaging.message.RunMessage.run; import static org.neo4j.bolt.v1.messaging.message.RunMessage.run;
Expand All @@ -70,10 +69,6 @@ public class BoltSchedulerShouldReportFailureWhenBusyIT extends AbstractBoltTran


@Rule @Rule
public RuleChain ruleChain = RuleChain.outerRule( fsRule ).around( server ); public RuleChain ruleChain = RuleChain.outerRule( fsRule ).around( server );
@Rule
public OtherThreadRule<Integer> spawnedUpdate1 = new OtherThreadRule<>();
@Rule
public OtherThreadRule<Integer> spawnedUpdate2 = new OtherThreadRule<>();


protected TestGraphDatabaseFactory getTestGraphDatabaseFactory() protected TestGraphDatabaseFactory getTestGraphDatabaseFactory()
{ {
Expand Down Expand Up @@ -109,9 +104,9 @@ public void setup() throws Exception
@After @After
public void cleanup() throws Exception public void cleanup() throws Exception
{ {
reset( connection1 ); connection1.disconnect();
reset( connection2 ); connection2.disconnect();
reset( connection3 ); connection3.disconnect();
} }


@Test @Test
Expand All @@ -123,15 +118,23 @@ public void shouldReportFailureWhenAllThreadsInThreadPoolAreBusy() throws Except
enterStreaming( connection1 ); enterStreaming( connection1 );
enterStreaming( connection2 ); enterStreaming( connection2 );


connection3.send( util.chunk( run( "RETURN 1" ), pullAll() ) ); try
assertThat( connection3, {
util.eventuallyReceives( msgFailure( Status.Request.NoThreadsAvailable, "There are no available threads to serve this request at the moment" ), connection3.send( util.chunk( run( "RETURN 1" ), pullAll() ) );
msgFailure( Status.Request.NoThreadsAvailable, "There are no available threads to serve this request at the moment" ) ) ); assertThat( connection3, util.eventuallyReceives(

msgFailure( Status.Request.NoThreadsAvailable, "There are no available threads to serve this request at the moment" ),
userLogProvider.assertContainsMessageContaining( "since there are no available threads to serve it at the moment. You can retry at a later time" ); msgFailure( Status.Request.NoThreadsAvailable, "There are no available threads to serve this request at the moment" ) ) );
internalLogProvider.assertAtLeastOnce( AssertableLogProvider.inLog( startsWith( BoltConnection.class.getPackage().getName() ) ).error(
containsString( "since there are no available threads to serve it at the moment. You can retry at a later time" ), userLogProvider.assertContainsMessageContaining( "since there are no available threads to serve it at the moment. You can retry at a later time" );
isA( RejectedExecutionException.class ) ) ); internalLogProvider.assertAtLeastOnce( AssertableLogProvider.inLog( startsWith( BoltConnection.class.getPackage().getName() ) ).error(
containsString( "since there are no available threads to serve it at the moment. You can retry at a later time" ),
isA( RejectedExecutionException.class ) ) );
}
finally
{
exitStreaming( connection1 );
exitStreaming( connection2 );
}
} }


private TransportConnection performHandshake( TransportConnection connection ) throws Exception private TransportConnection performHandshake( TransportConnection connection ) throws Exception
Expand All @@ -151,13 +154,10 @@ private void enterStreaming( TransportConnection connection ) throws Exception
assertThat( connection, util.eventuallyReceives( msgSuccess() ) ); assertThat( connection, util.eventuallyReceives( msgSuccess() ) );
} }


private void reset( TransportConnection connection ) throws Exception private void exitStreaming( TransportConnection connection ) throws Exception
{ {
if ( connection != null ) connection.send( util.chunk( discardAll() ) );
{
connection.send( util.chunk( ResetMessage.reset() ) );


assertThat( connection, util.eventuallyReceives( msgSuccess() ) ); assertThat( connection, util.eventuallyReceives( msgSuccess() ) );
}
} }
} }

0 comments on commit 969d808

Please sign in to comment.