From e76901e933153cf88e3c37f5d91489c6702b6fbf Mon Sep 17 00:00:00 2001 From: Ali Ince Date: Fri, 23 Mar 2018 13:28:25 +0000 Subject: [PATCH] Unflake BoltSchedulerShouldReportFailureWhenBusyIT --- ...chedulerShouldReportFailureWhenBusyIT.java | 88 ++++++++----------- 1 file changed, 37 insertions(+), 51 deletions(-) diff --git a/community/bolt/src/test/java/org/neo4j/bolt/runtime/integration/BoltSchedulerShouldReportFailureWhenBusyIT.java b/community/bolt/src/test/java/org/neo4j/bolt/runtime/integration/BoltSchedulerShouldReportFailureWhenBusyIT.java index ea14b1684982f..0edb66bc6ce6c 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/runtime/integration/BoltSchedulerShouldReportFailureWhenBusyIT.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/runtime/integration/BoltSchedulerShouldReportFailureWhenBusyIT.java @@ -19,7 +19,7 @@ */ package org.neo4j.bolt.runtime.integration; -import org.hamcrest.CoreMatchers; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -28,30 +28,23 @@ import org.junit.runners.Parameterized; import java.util.Map; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.neo4j.bolt.AbstractBoltTransportsTest; import org.neo4j.bolt.runtime.BoltConnection; +import org.neo4j.bolt.v1.messaging.message.ResetMessage; import org.neo4j.bolt.v1.transport.integration.Neo4jWithSocket; import org.neo4j.bolt.v1.transport.socket.client.TransportConnection; -import org.neo4j.function.Predicates; import org.neo4j.graphdb.factory.GraphDatabaseSettings; -import org.neo4j.helpers.HostnamePort; -import org.neo4j.helpers.collection.MapUtil; import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.configuration.BoltConnector; -import org.neo4j.kernel.impl.util.ValueUtils; import org.neo4j.logging.AssertableLogProvider; import org.neo4j.test.TestGraphDatabaseFactory; import org.neo4j.test.rule.concurrent.OtherThreadRule; import org.neo4j.test.rule.fs.EphemeralFileSystemRule; import static java.util.Collections.emptyMap; -import static java.util.concurrent.TimeUnit.MINUTES; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.CoreMatchers.startsWith; @@ -71,6 +64,9 @@ public class BoltSchedulerShouldReportFailureWhenBusyIT extends AbstractBoltTran private AssertableLogProvider userLogProvider = new AssertableLogProvider(); private EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule(); private Neo4jWithSocket server = new Neo4jWithSocket( getClass(), getTestGraphDatabaseFactory(), fsRule::get, getSettingsFunction() ); + private TransportConnection connection1; + private TransportConnection connection2; + private TransportConnection connection3; @Rule public RuleChain ruleChain = RuleChain.outerRule( fsRule ).around( server ); @@ -100,30 +96,35 @@ protected Consumer> getSettingsFunction() }; } - @Test - public void shouldReportFailureWhenAllThreadsInThreadPoolAreBusy() throws Exception + @Before + public void setup() throws Exception { - AtomicInteger updateCounter = new AtomicInteger(); - address = server.lookupDefaultConnector(); - TransportConnection connection1 = performHandshake( newConnection() ); - TransportConnection connection2 = performHandshake( newConnection() ); - TransportConnection connection3 = performHandshake( newConnection() ); - TransportConnection connection4 = performHandshake( newConnection() ); - // Generate a Lock - createNode( connection1, 100 ); - // Start update request - updateNode( connection1, 100, 101, updateCounter ); - - // Try to update the same node, these two lines will block all available threads - Future result1 = spawnedUpdate1.execute( state -> updateNodeNoThrow( connection2, 100, 101, updateCounter ) ); - Future result2 = spawnedUpdate2.execute( state -> updateNodeNoThrow( connection3, 100, 101, updateCounter ) ); + connection1 = performHandshake( newConnection() ); + connection2 = performHandshake( newConnection() ); + connection3 = performHandshake( newConnection() ); + } - Predicates.await( () -> updateCounter.get() > 2, 1, MINUTES ); + @After + public void cleanup() throws Exception + { + reset( connection1 ); + reset( connection2 ); + reset( connection3 ); + } - connection4.send( util.chunk( run( "RETURN 1" ), pullAll() ) ); - assertThat( connection4, + @Test + public void shouldReportFailureWhenAllThreadsInThreadPoolAreBusy() throws Exception + { + // it's enough to get the bolt state machine into streaming mode to have + // the thread sticked to the connection, causing all the available threads + // to be busy (logically) + enterStreaming( connection1 ); + enterStreaming( connection2 ); + + connection3.send( util.chunk( run( "RETURN 1" ), pullAll() ) ); + assertThat( connection3, util.eventuallyReceives( msgFailure( Status.Request.NoThreadsAvailable, "There are no available threads to serve this request at the moment" ), msgFailure( Status.Request.NoThreadsAvailable, "There are no available threads to serve this request at the moment" ) ) ); @@ -143,35 +144,20 @@ private TransportConnection performHandshake( TransportConnection connection ) t return connection; } - private void createNode( TransportConnection connection, int id ) throws Exception + private void enterStreaming( TransportConnection connection ) throws Exception { - connection.send( util.chunk( run( "BEGIN" ), pullAll(), run( "CREATE (n { id: {id} })", ValueUtils.asMapValue( MapUtil.map( "id", id ) ) ), pullAll(), - run( "COMMIT" ), pullAll() ) ); + connection.send( util.chunk( run( "UNWIND RANGE (1, 100) AS x RETURN x" ) ) ); - assertThat( connection, util.eventuallyReceives( msgSuccess(), msgSuccess(), msgSuccess(), msgSuccess(), msgSuccess(), msgSuccess() ) ); - } - - private void updateNode( TransportConnection connection, int oldId, int newId, AtomicInteger updateCounter ) throws Exception - { - connection.send( util.chunk( run( "BEGIN" ), pullAll(), - run( "MATCH (n { id: {oldId} }) SET n.id = {newId}", ValueUtils.asMapValue( MapUtil.map( "oldId", oldId, "newId", newId ) ) ), pullAll() ) ); - - updateCounter.incrementAndGet(); - - assertThat( connection, util.eventuallyReceives( msgSuccess(), msgSuccess(), msgSuccess(), msgSuccess() ) ); + assertThat( connection, util.eventuallyReceives( msgSuccess() ) ); } - private int updateNodeNoThrow( TransportConnection connection, int oldId, int newId, AtomicInteger updateCounter ) + private void reset( TransportConnection connection ) throws Exception { - try - { - updateNode( connection, oldId, newId, updateCounter ); - } - catch ( Throwable t ) + if ( connection != null ) { - return -1; - } + connection.send( util.chunk( ResetMessage.reset() ) ); - return 0; + assertThat( connection, util.eventuallyReceives( msgSuccess() ) ); + } } }