Skip to content

Commit

Permalink
Unflake BoltSchedulerShouldReportFailureWhenBusyIT
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-ince committed Mar 23, 2018
1 parent c04ab49 commit e76901e
Showing 1 changed file with 37 additions and 51 deletions.
Expand Up @@ -19,7 +19,7 @@
*/
package org.neo4j.bolt.runtime.integration;

import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -28,30 +28,23 @@
import org.junit.runners.Parameterized;

import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.neo4j.bolt.AbstractBoltTransportsTest;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.v1.messaging.message.ResetMessage;
import org.neo4j.bolt.v1.transport.integration.Neo4jWithSocket;
import org.neo4j.bolt.v1.transport.socket.client.TransportConnection;
import org.neo4j.function.Predicates;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.HostnamePort;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.configuration.BoltConnector;
import org.neo4j.kernel.impl.util.ValueUtils;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.test.TestGraphDatabaseFactory;
import org.neo4j.test.rule.concurrent.OtherThreadRule;
import org.neo4j.test.rule.fs.EphemeralFileSystemRule;

import static java.util.Collections.emptyMap;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.CoreMatchers.startsWith;
Expand All @@ -71,6 +64,9 @@ public class BoltSchedulerShouldReportFailureWhenBusyIT extends AbstractBoltTran
private AssertableLogProvider userLogProvider = new AssertableLogProvider();
private EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule();
private Neo4jWithSocket server = new Neo4jWithSocket( getClass(), getTestGraphDatabaseFactory(), fsRule::get, getSettingsFunction() );
private TransportConnection connection1;
private TransportConnection connection2;
private TransportConnection connection3;

@Rule
public RuleChain ruleChain = RuleChain.outerRule( fsRule ).around( server );
Expand Down Expand Up @@ -100,30 +96,35 @@ protected Consumer<Map<String,String>> getSettingsFunction()
};
}

@Test
public void shouldReportFailureWhenAllThreadsInThreadPoolAreBusy() throws Exception
@Before
public void setup() throws Exception
{
AtomicInteger updateCounter = new AtomicInteger();

address = server.lookupDefaultConnector();
TransportConnection connection1 = performHandshake( newConnection() );
TransportConnection connection2 = performHandshake( newConnection() );
TransportConnection connection3 = performHandshake( newConnection() );
TransportConnection connection4 = performHandshake( newConnection() );

// Generate a Lock
createNode( connection1, 100 );
// Start update request
updateNode( connection1, 100, 101, updateCounter );

// Try to update the same node, these two lines will block all available threads
Future<Integer> result1 = spawnedUpdate1.execute( state -> updateNodeNoThrow( connection2, 100, 101, updateCounter ) );
Future<Integer> result2 = spawnedUpdate2.execute( state -> updateNodeNoThrow( connection3, 100, 101, updateCounter ) );
connection1 = performHandshake( newConnection() );
connection2 = performHandshake( newConnection() );
connection3 = performHandshake( newConnection() );
}

Predicates.await( () -> updateCounter.get() > 2, 1, MINUTES );
@After
public void cleanup() throws Exception
{
reset( connection1 );
reset( connection2 );
reset( connection3 );
}

connection4.send( util.chunk( run( "RETURN 1" ), pullAll() ) );
assertThat( connection4,
@Test
public void shouldReportFailureWhenAllThreadsInThreadPoolAreBusy() throws Exception
{
// it's enough to get the bolt state machine into streaming mode to have
// the thread sticked to the connection, causing all the available threads
// to be busy (logically)
enterStreaming( connection1 );
enterStreaming( connection2 );

connection3.send( util.chunk( run( "RETURN 1" ), pullAll() ) );
assertThat( connection3,
util.eventuallyReceives( msgFailure( Status.Request.NoThreadsAvailable, "There are no available threads to serve this request at the moment" ),
msgFailure( Status.Request.NoThreadsAvailable, "There are no available threads to serve this request at the moment" ) ) );

Expand All @@ -143,35 +144,20 @@ private TransportConnection performHandshake( TransportConnection connection ) t
return connection;
}

private void createNode( TransportConnection connection, int id ) throws Exception
private void enterStreaming( TransportConnection connection ) throws Exception
{
connection.send( util.chunk( run( "BEGIN" ), pullAll(), run( "CREATE (n { id: {id} })", ValueUtils.asMapValue( MapUtil.map( "id", id ) ) ), pullAll(),
run( "COMMIT" ), pullAll() ) );
connection.send( util.chunk( run( "UNWIND RANGE (1, 100) AS x RETURN x" ) ) );

assertThat( connection, util.eventuallyReceives( msgSuccess(), msgSuccess(), msgSuccess(), msgSuccess(), msgSuccess(), msgSuccess() ) );
}

private void updateNode( TransportConnection connection, int oldId, int newId, AtomicInteger updateCounter ) throws Exception
{
connection.send( util.chunk( run( "BEGIN" ), pullAll(),
run( "MATCH (n { id: {oldId} }) SET n.id = {newId}", ValueUtils.asMapValue( MapUtil.map( "oldId", oldId, "newId", newId ) ) ), pullAll() ) );

updateCounter.incrementAndGet();

assertThat( connection, util.eventuallyReceives( msgSuccess(), msgSuccess(), msgSuccess(), msgSuccess() ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );
}

private int updateNodeNoThrow( TransportConnection connection, int oldId, int newId, AtomicInteger updateCounter )
private void reset( TransportConnection connection ) throws Exception
{
try
{
updateNode( connection, oldId, newId, updateCounter );
}
catch ( Throwable t )
if ( connection != null )
{
return -1;
}
connection.send( util.chunk( ResetMessage.reset() ) );

return 0;
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );
}
}
}

0 comments on commit e76901e

Please sign in to comment.