Skip to content

Commit

Permalink
Unflake BoltSchedulerShouldReportFailureWhenBusyIT
Browse files Browse the repository at this point in the history
It used to fail sometimes because bolt server rejected incoming
messages. They were rejected because server did not have free
threads to process them. It might take some time for a thread to get
returned to the thread pool after the previous task is executed.

This commit makes the test retry finite amount of time on message
rejection. It also adds increasing sleeps between messages with every
retry.
  • Loading branch information
lutovich committed Apr 30, 2018
1 parent 2d8a5f3 commit 2640a4b
Showing 1 changed file with 69 additions and 21 deletions.
Expand Up @@ -27,6 +27,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;
Expand All @@ -43,6 +44,7 @@
import org.neo4j.test.rule.fs.EphemeralFileSystemRule;

import static java.util.Collections.emptyMap;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.CoreMatchers.startsWith;
Expand Down Expand Up @@ -94,10 +96,6 @@ protected Consumer<Map<String,String>> getSettingsFunction()
public void setup() throws Exception
{
address = server.lookupDefaultConnector();

connection1 = newConnection();
connection2 = newConnection();
connection3 = newConnection();
}

@After
Expand All @@ -109,23 +107,19 @@ public void cleanup() throws Exception
}

@Test
public void shouldReportFailureWhenAllThreadsInThreadPoolAreBusy() throws Exception
public void shouldReportFailureWhenAllThreadsInThreadPoolAreBusy() throws Throwable
{
// it's enough to get the bolt state machine into streaming mode to have
// the thread sticked to the connection, causing all the available threads
// to be busy (logically)
enterStreaming( connection1 );
enterStreaming( connection2 );
connection1 = enterStreaming();
connection2 = enterStreaming();

try
{
connection3 = newConnection();

connection3.connect( address )
.send( util.acceptedVersions( 1, 0, 0, 0 ) )
.send( util.chunk( init( "TestClient/1.1", emptyMap() ) ) );
connection3 = connectAndPerformBoltHandshake( newConnection() );

assertThat( connection3, eventuallyReceives( new byte[]{0, 0, 0, 1} ) );
connection3.send( util.chunk( init( "TestClient/1.1", emptyMap() ) ) );
assertThat( connection3, util.eventuallyReceives(
msgFailure( Status.Request.NoThreadsAvailable, "There are no available threads to serve this request at the moment" ) ) );

Expand All @@ -144,30 +138,84 @@ public void shouldReportFailureWhenAllThreadsInThreadPoolAreBusy() throws Except
}
}

private void enterStreaming( TransportConnection connection ) throws Exception
private TransportConnection enterStreaming() throws Throwable
{
connection.connect( address )
.send( util.acceptedVersions( 1, 0, 0, 0 ) )
.send( util.chunk( init( "TestClient/1.1", emptyMap() ) ) )
.send( util.chunk( run( "UNWIND RANGE (1, 100) AS x RETURN x" ) ) );
TransportConnection connection = null;
Throwable error = null;

assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) );
// retry couple times because worker threads might seem busy
for ( int i = 1; i <= 7; i++ )
{
try
{
connection = newConnection();
enterStreaming( connection, i );
error = null;
return connection;
}
catch ( Throwable t )
{
// failed to enter the streaming state, record the error and retry
if ( error == null )
{
error = t;
}
else
{
error.addSuppressed( t );
}

close( connection );
SECONDS.sleep( i );
}
}

if ( error != null )
{
throw error;
}

throw new IllegalStateException( "Unable to enter the streaming state" );
}

private void enterStreaming( TransportConnection connection, int sleepSeconds ) throws Exception
{
connectAndPerformBoltHandshake( connection );

connection.send( util.chunk( init( "TestClient/1.1", emptyMap() ) ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );

SECONDS.sleep( sleepSeconds ); // sleep a bit to allow worker thread return back to the pool

connection.send( util.chunk( run( "UNWIND RANGE (1, 100) AS x RETURN x" ) ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );
}

private TransportConnection connectAndPerformBoltHandshake( TransportConnection connection ) throws Exception
{
connection.connect( address ).send( util.acceptedVersions( 1, 0, 0, 0 ) );
assertThat( connection, eventuallyReceives( new byte[]{0, 0, 0, 1} ) );
return connection;
}

private void exitStreaming( TransportConnection connection ) throws Exception
{
connection.send( util.chunk( discardAll() ) );

assertThat( connection, util.eventuallyReceives( msgSuccess() ) );
}

private void close( TransportConnection connection ) throws Exception
private void close( TransportConnection connection )
{
if ( connection != null )
{
connection.disconnect();
try
{
connection.disconnect();
}
catch ( IOException ignore )
{
}
}
}
}

0 comments on commit 2640a4b

Please sign in to comment.