Skip to content

Commit

Permalink
More ITs for Bolt failures
Browse files Browse the repository at this point in the history
Added tests to check that driver does not hang when message processing fails
with exception. They are mostly to make sure that network resources
(like Netty channels) are properly closed in case of failure.
  • Loading branch information
lutovich committed Jan 3, 2017
1 parent 9b0c8d9 commit 14dab3b
Showing 1 changed file with 208 additions and 7 deletions.
215 changes: 208 additions & 7 deletions integrationtests/src/test/java/org/neo4j/bolt/BoltFailuresIT.java
Expand Up @@ -23,15 +23,22 @@
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;


import java.util.function.Consumer;

import org.neo4j.bolt.v1.runtime.BoltFactory; import org.neo4j.bolt.v1.runtime.BoltFactory;
import org.neo4j.bolt.v1.runtime.MonitoredWorkerFactory.SessionMonitor;
import org.neo4j.bolt.v1.runtime.WorkerFactory; import org.neo4j.bolt.v1.runtime.WorkerFactory;
import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase; import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.exceptions.ConnectionFailureException; import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.io.IOUtils;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.test.rule.TestDirectory; import org.neo4j.test.rule.TestDirectory;


import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -40,6 +47,7 @@
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.Connector.ConnectorType.BOLT; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.Connector.ConnectorType.BOLT;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.boltConnector; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.boltConnector;
Expand All @@ -48,10 +56,13 @@


public class BoltFailuresIT public class BoltFailuresIT
{ {
private static final int TEST_TIMEOUT = 20_000;

@Rule @Rule
public final TestDirectory dir = TestDirectory.testDirectory(); public final TestDirectory dir = TestDirectory.testDirectory();


private GraphDatabaseService db; private GraphDatabaseService db;
private Driver driver;


@After @After
public void shutdownDb() public void shutdownDb()
Expand All @@ -60,17 +71,21 @@ public void shutdownDb()
{ {
db.shutdown(); db.shutdown();
} }
IOUtils.closeAllSilently( driver );
} }


@Test( timeout = 20_000 ) @Test( timeout = TEST_TIMEOUT )
public void throwsWhenSessionCreationFails() public void throwsWhenWorkerCreationFails()
{ {
WorkerFactory workerFactory = mock( WorkerFactory.class ); WorkerFactory workerFactory = mock( WorkerFactory.class );
when( workerFactory.newWorker( anyString(), any() ) ).thenThrow( new IllegalStateException( "Oh!" ) ); when( workerFactory.newWorker( anyString(), any() ) ).thenThrow( new IllegalStateException( "Oh!" ) );


db = newDbFactory( new BoltKernelExtensionWithWorkerFactory( workerFactory ) ); BoltKernelExtension extension = new BoltKernelExtensionWithWorkerFactory( workerFactory );


try ( Driver driver = GraphDatabase.driver( "bolt://localhost" ) ) db = startDbWithBolt( new GraphDatabaseFactoryWithCustomBoltKernelExtension( extension ) );
driver = createDriver();

try
{ {
driver.session(); driver.session();
fail( "Exception expected" ); fail( "Exception expected" );
Expand All @@ -81,16 +96,142 @@ public void throwsWhenSessionCreationFails()
} }
} }


private GraphDatabaseService newDbFactory( BoltKernelExtension boltKernelExtension ) @Test( timeout = TEST_TIMEOUT )
public void throwsWhenMonitoredWorkerCreationFails()
{
ThrowingSessionMonitor sessionMonitor = new ThrowingSessionMonitor();
sessionMonitor.throwInSessionStarted();
Monitors monitors = newMonitorsSpy( sessionMonitor );

db = startDbWithBolt( new GraphDatabaseFactory().setMonitors( monitors ) );
driver = createDriver();

try
{
driver.session();
fail( "Exception expected" );
}
catch ( Exception e )
{
assertThat( e, instanceOf( ConnectionFailureException.class ) );
}
}

@Test( timeout = TEST_TIMEOUT )
public void throwsWhenInitMessageReceiveFails()
{
throwsWhenInitMessageFails( ThrowingSessionMonitor::throwInMessageReceived, false );
}

@Test( timeout = TEST_TIMEOUT )
public void throwsWhenInitMessageProcessingFailsToStart()
{
throwsWhenInitMessageFails( ThrowingSessionMonitor::throwInProcessingStarted, false );
}

@Test( timeout = TEST_TIMEOUT )
public void throwsWhenInitMessageProcessingFailsToComplete()
{
throwsWhenInitMessageFails( ThrowingSessionMonitor::throwInProcessingDone, true );
}

@Test( timeout = TEST_TIMEOUT )
public void throwsWhenRunMessageReceiveFails()
{
throwsWhenRunMessageFails( ThrowingSessionMonitor::throwInMessageReceived );
}

@Test( timeout = TEST_TIMEOUT )
public void throwsWhenRunMessageProcessingFailsToStart()
{
throwsWhenRunMessageFails( ThrowingSessionMonitor::throwInProcessingStarted );
}

@Test( timeout = TEST_TIMEOUT )
public void throwsWhenRunMessageProcessingFailsToComplete()
{
throwsWhenRunMessageFails( ThrowingSessionMonitor::throwInProcessingDone );
}

private void throwsWhenInitMessageFails( Consumer<ThrowingSessionMonitor> monitorSetup,
boolean shouldBeAbleToGetSession )
{
ThrowingSessionMonitor sessionMonitor = new ThrowingSessionMonitor();
monitorSetup.accept( sessionMonitor );
Monitors monitors = newMonitorsSpy( sessionMonitor );

db = startTestDb( monitors );
driver = createDriver();

try
{
Session session = driver.session();
if ( shouldBeAbleToGetSession )
{
session.run( "CREATE ()" ).consume();
}
else
{
fail( "Exception expected" );
}
}
catch ( Exception e )
{
assertThat( e, instanceOf( ConnectionFailureException.class ) );
}
}

private void throwsWhenRunMessageFails( Consumer<ThrowingSessionMonitor> monitorSetup )
{
ThrowingSessionMonitor sessionMonitor = new ThrowingSessionMonitor();
Monitors monitors = newMonitorsSpy( sessionMonitor );

db = startTestDb( monitors );
driver = createDriver();

Session session = driver.session();
session.run( "CREATE ()" );
try
{
monitorSetup.accept( sessionMonitor );
session.close();
fail( "Exception expected" );
}
catch ( Exception e )
{
assertThat( e, instanceOf( ConnectionFailureException.class ) );
}
}

private GraphDatabaseService startTestDb( Monitors monitors )
{ {
return new GraphDatabaseFactoryWithCustomBoltKernelExtension( boltKernelExtension ) return startDbWithBolt( new GraphDatabaseFactory().setMonitors( monitors ) );
.newEmbeddedDatabaseBuilder( dir.graphDbDir() ) }

private GraphDatabaseService startDbWithBolt( GraphDatabaseFactory dbFactory )
{
return dbFactory.newEmbeddedDatabaseBuilder( dir.graphDbDir() )
.setConfig( boltConnector( "0" ).type, BOLT.name() ) .setConfig( boltConnector( "0" ).type, BOLT.name() )
.setConfig( boltConnector( "0" ).enabled, TRUE ) .setConfig( boltConnector( "0" ).enabled, TRUE )
.setConfig( GraphDatabaseSettings.auth_enabled, FALSE ) .setConfig( GraphDatabaseSettings.auth_enabled, FALSE )
.newGraphDatabase(); .newGraphDatabase();
} }


private static Driver createDriver()
{
return GraphDatabase.driver( "bolt://localhost" );
}

private static Monitors newMonitorsSpy( ThrowingSessionMonitor sessionMonitor )
{
Monitors monitors = spy( new Monitors() );
// it is not allowed to throw exceptions from monitors
// make the given sessionMonitor be returned as is, without any proxying
when( monitors.newMonitor( SessionMonitor.class ) ).thenReturn( sessionMonitor );
when( monitors.hasListeners( SessionMonitor.class ) ).thenReturn( true );
return monitors;
}

private static class BoltKernelExtensionWithWorkerFactory extends BoltKernelExtension private static class BoltKernelExtensionWithWorkerFactory extends BoltKernelExtension
{ {
final WorkerFactory workerFactory; final WorkerFactory workerFactory;
Expand All @@ -107,4 +248,64 @@ protected WorkerFactory createWorkerFactory( BoltFactory boltFactory, JobSchedul
return workerFactory; return workerFactory;
} }
} }

private static class ThrowingSessionMonitor implements SessionMonitor
{
volatile boolean throwInSessionStarted;
volatile boolean throwInMessageReceived;
volatile boolean throwInProcessingStarted;
volatile boolean throwInProcessingDone;

@Override
public void sessionStarted()
{
throwIfNeeded( throwInSessionStarted );
}

@Override
public void messageReceived()
{
throwIfNeeded( throwInMessageReceived );
}

@Override
public void processingStarted( long queueTime )
{
throwIfNeeded( throwInProcessingStarted );
}

@Override
public void processingDone( long processingTime )
{
throwIfNeeded( throwInProcessingDone );
}

void throwInSessionStarted()
{
throwInSessionStarted = true;
}

void throwInMessageReceived()
{
throwInMessageReceived = true;
}

void throwInProcessingStarted()
{
throwInProcessingStarted = true;
}

void throwInProcessingDone()
{
throwInProcessingDone = true;
}

void throwIfNeeded( boolean shouldThrow )
{
if ( shouldThrow )
{
throw new RuntimeException();
}
}
}
} }

0 comments on commit 14dab3b

Please sign in to comment.