diff --git a/community/bolt/src/test/java/org/neo4j/bolt/runtime/integration/BoltSchedulerShouldReportFailureWhenBusyIT.java b/community/bolt/src/test/java/org/neo4j/bolt/runtime/integration/BoltSchedulerShouldReportFailureWhenBusyIT.java index fef0d5573a2eb..0ed10225ead7b 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/runtime/integration/BoltSchedulerShouldReportFailureWhenBusyIT.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/runtime/integration/BoltSchedulerShouldReportFailureWhenBusyIT.java @@ -49,7 +49,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.neo4j.bolt.v1.messaging.message.DiscardAllMessage.discardAll; import static org.neo4j.bolt.v1.messaging.message.InitMessage.init; -import static org.neo4j.bolt.v1.messaging.message.PullAllMessage.pullAll; import static org.neo4j.bolt.v1.messaging.message.RunMessage.run; import static org.neo4j.bolt.v1.messaging.util.MessageMatchers.msgFailure; import static org.neo4j.bolt.v1.messaging.util.MessageMatchers.msgSuccess; @@ -96,17 +95,17 @@ public void setup() throws Exception { address = server.lookupDefaultConnector(); - connection1 = performHandshake( newConnection() ); - connection2 = performHandshake( newConnection() ); - connection3 = performHandshake( newConnection() ); + connection1 = newConnection(); + connection2 = newConnection(); + connection3 = newConnection(); } @After public void cleanup() throws Exception { - connection1.disconnect(); - connection2.disconnect(); - connection3.disconnect(); + close( connection1 ); + close( connection2 ); + close( connection3 ); } @Test @@ -120,15 +119,23 @@ public void shouldReportFailureWhenAllThreadsInThreadPoolAreBusy() throws Except try { - connection3.send( util.chunk( run( "RETURN 1" ), pullAll() ) ); + connection3 = newConnection(); + + connection3.connect( address ) + .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.chunk( init( "TestClient/1.1", emptyMap() ) ) ); + + assertThat( connection3, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); assertThat( connection3, util.eventuallyReceives( - msgFailure( Status.Request.NoThreadsAvailable, "There are no available threads to serve this request at the moment" ), msgFailure( Status.Request.NoThreadsAvailable, "There are no available threads to serve this request at the moment" ) ) ); - userLogProvider.assertContainsMessageContaining( "since there are no available threads to serve it at the moment. You can retry at a later time" ); - internalLogProvider.assertAtLeastOnce( AssertableLogProvider.inLog( startsWith( BoltConnection.class.getPackage().getName() ) ).error( - containsString( "since there are no available threads to serve it at the moment. You can retry at a later time" ), - isA( RejectedExecutionException.class ) ) ); + userLogProvider.assertContainsMessageContaining( + "since there are no available threads to serve it at the moment. You can retry at a later time" ); + internalLogProvider.assertAtLeastOnce( AssertableLogProvider + .inLog( startsWith( BoltConnection.class.getPackage().getName() ) ) + .error( + containsString( "since there are no available threads to serve it at the moment. You can retry at a later time" ), + isA( RejectedExecutionException.class ) ) ); } finally { @@ -137,20 +144,15 @@ public void shouldReportFailureWhenAllThreadsInThreadPoolAreBusy() throws Except } } - private TransportConnection performHandshake( TransportConnection connection ) throws Exception + private void enterStreaming( TransportConnection connection ) throws Exception { - connection.connect( address ).send( util.acceptedVersions( 1, 0, 0, 0 ) ).send( util.chunk( init( "TestClient/1.1", emptyMap() ) ) ); + connection.connect( address ) + .send( util.acceptedVersions( 1, 0, 0, 0 ) ) + .send( util.chunk( init( "TestClient/1.1", emptyMap() ) ) ) + .send( util.chunk( run( "UNWIND RANGE (1, 100) AS x RETURN x" ) ) ); assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) ); assertThat( connection, util.eventuallyReceives( msgSuccess() ) ); - - return connection; - } - - private void enterStreaming( TransportConnection connection ) throws Exception - { - connection.send( util.chunk( run( "UNWIND RANGE (1, 100) AS x RETURN x" ) ) ); - assertThat( connection, util.eventuallyReceives( msgSuccess() ) ); } @@ -160,4 +162,12 @@ private void exitStreaming( TransportConnection connection ) throws Exception assertThat( connection, util.eventuallyReceives( msgSuccess() ) ); } + + private void close( TransportConnection connection ) throws Exception + { + if ( connection != null ) + { + connection.disconnect(); + } + } }