Skip to content

Commit

Permalink
Log remote address for broken network connection
Browse files Browse the repository at this point in the history
Commit makes bolt server log client address when it's unable to write
error back. Such error most probably means that client driver
has been closed.

Also removed a flaky log message IT. All cases seem to already be
covered by smaller unit tests.
  • Loading branch information
lutovich committed Nov 7, 2017
1 parent d07198d commit 675ca92
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 137 deletions.
Expand Up @@ -141,8 +141,7 @@ private void publishError( BoltResponseMessageHandler<IOException> out, Neo4jErr
{ {
// we tried to write error back to the client and realized that the underlying channel is closed // we tried to write error back to the client and realized that the underlying channel is closed
// log a warning, client driver might have just been stopped and closed all socket connections // log a warning, client driver might have just been stopped and closed all socket connections
log.warn( "Unable to send error back to the client. " + log.warn( "Unable to send error back to the client. " + e.getMessage(), error.cause() );
"Communication channel is closed. Client has probably been stopped.", error.cause() );
} }
catch ( Throwable t ) catch ( Throwable t )
{ {
Expand Down
Expand Up @@ -163,7 +163,8 @@ private void ensure( int size ) throws IOException
assert size <= maxChunkSize : size + " > " + maxChunkSize; assert size <= maxChunkSize : size + " > " + maxChunkSize;
if ( closed.get() ) if ( closed.get() )
{ {
throw new PackOutputClosedException( "Unable to write to the closed output channel" ); throw new PackOutputClosedException( "Network channel towards " + channel.remoteAddress() + " is closed. " +
"Client has probably been stopped." );
} }
int toWriteSize = chunkOpen ? size : size + CHUNK_HEADER_SIZE; int toWriteSize = chunkOpen ? size : size + CHUNK_HEADER_SIZE;
synchronized ( this ) synchronized ( this )
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.junit.Test; import org.junit.Test;


import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
Expand All @@ -36,12 +37,14 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;


import org.neo4j.bolt.v1.packstream.PackOutputClosedException;
import org.neo4j.bolt.v1.transport.ChunkedOutput; import org.neo4j.bolt.v1.transport.ChunkedOutput;
import org.neo4j.kernel.impl.util.HexPrinter; import org.neo4j.kernel.impl.util.HexPrinter;


import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
Expand All @@ -53,6 +56,19 @@ public class ChunkedOutputTest
private final ByteBuffer writtenData = ByteBuffer.allocate( 1024 ); private final ByteBuffer writtenData = ByteBuffer.allocate( 1024 );
private ChunkedOutput out; private ChunkedOutput out;


@Before
public void setUp()
{
when( ch.alloc() ).thenReturn( UnpooledByteBufAllocator.DEFAULT );
this.out = new ChunkedOutput( ch, 16 );
}

@After
public void tearDown()
{
out.close();
}

@Test @Test
public void shouldNotNPE() throws Throwable public void shouldNotNPE() throws Throwable
{ {
Expand Down Expand Up @@ -273,6 +289,26 @@ public void shouldFlushOnClose() throws Throwable
"00 00 00 02 00 08 00 00 00 00 00 00 00 03 00 00" ) ); "00 00 00 02 00 08 00 00 00 00 00 00 00 03 00 00" ) );
} }


@Test
public void shouldThrowErrorWithRemoteAddressWhenClosed() throws Exception
{
SocketAddress remoteAddress = mock( SocketAddress.class );
String remoteAddressString = "client.server.com:7687";
when( remoteAddress.toString() ).thenReturn( remoteAddressString );
when( ch.remoteAddress() ).thenReturn( remoteAddress );

out.close();

try
{
out.writeInt( 42 );
}
catch ( PackOutputClosedException e )
{
assertThat( e.getMessage(), containsString( remoteAddressString ) );
}
}

private void setupWriteAndFlush() private void setupWriteAndFlush()
{ {
when( ch.writeAndFlush( any(), any( ChannelPromise.class ) ) ).thenAnswer( invocation -> when( ch.writeAndFlush( any(), any( ChannelPromise.class ) ) ).thenAnswer( invocation ->
Expand All @@ -283,18 +319,4 @@ private void setupWriteAndFlush()
return null; return null;
} ); } );
} }

@Before
public void setup()
{
when( ch.alloc() ).thenReturn( UnpooledByteBufAllocator.DEFAULT );
this.out = new ChunkedOutput( ch, 16 );
}

@After
public void teardown()
{
out.close();
}

} }
122 changes: 3 additions & 119 deletions integrationtests/src/test/java/org/neo4j/bolt/BoltFailuresIT.java
Expand Up @@ -26,63 +26,42 @@
import org.junit.rules.Timeout; import org.junit.rules.Timeout;


import java.time.Clock; import java.time.Clock;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier;


import org.neo4j.bolt.v1.runtime.BoltFactory; import org.neo4j.bolt.v1.runtime.BoltFactory;
import org.neo4j.bolt.v1.runtime.MonitoredWorkerFactory.SessionMonitor; import org.neo4j.bolt.v1.runtime.MonitoredWorkerFactory.SessionMonitor;
import org.neo4j.bolt.v1.runtime.WorkerFactory; import org.neo4j.bolt.v1.runtime.WorkerFactory;
import org.neo4j.bolt.v1.transport.BoltMessagingProtocolV1Handler;
import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase; import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.function.Predicates;
import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.TransactionTerminatedException;
import org.neo4j.graphdb.factory.GraphDatabaseFactory; import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.factory.GraphDatabaseSettings; import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.io.IOUtils; import org.neo4j.io.IOUtils;
import org.neo4j.kernel.configuration.BoltConnector;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.AssertableLogProvider.LogMatcher;
import org.neo4j.logging.LogProvider;
import org.neo4j.scheduler.JobScheduler; import org.neo4j.scheduler.JobScheduler;
import org.neo4j.test.TestEnterpriseGraphDatabaseFactory; import org.neo4j.test.TestEnterpriseGraphDatabaseFactory;
import org.neo4j.test.rule.TestDirectory; import org.neo4j.test.rule.TestDirectory;


import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.neo4j.graphdb.Label.label;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.Connector.ConnectorType.BOLT; import static org.neo4j.graphdb.factory.GraphDatabaseSettings.Connector.ConnectorType.BOLT;
import static org.neo4j.graphdb.factory.GraphDatabaseSettings.boltConnector;
import static org.neo4j.helpers.collection.Iterators.count;
import static org.neo4j.helpers.collection.Iterators.single;
import static org.neo4j.kernel.configuration.Settings.FALSE; import static org.neo4j.kernel.configuration.Settings.FALSE;
import static org.neo4j.kernel.configuration.Settings.TRUE; import static org.neo4j.kernel.configuration.Settings.TRUE;
import static org.neo4j.logging.AssertableLogProvider.inLog;


public class BoltFailuresIT public class BoltFailuresIT
{ {
private static final int TEST_TIMEOUT_SECONDS = 120; private static final int TEST_TIMEOUT_SECONDS = 120;
private static final int PREDICATE_AWAIT_TIMEOUT_SECONDS = TEST_TIMEOUT_SECONDS / 2;


private final TestDirectory dir = TestDirectory.testDirectory(); private final TestDirectory dir = TestDirectory.testDirectory();


Expand Down Expand Up @@ -180,42 +159,6 @@ public void throwsWhenRunMessageProcessingFailsToComplete()
throwsWhenRunMessageFails( ThrowingSessionMonitor::throwInProcessingDone ); throwsWhenRunMessageFails( ThrowingSessionMonitor::throwInProcessingDone );
} }


@Test
public void boltServerLogsRealErrorWhenDriverIsClosedWithRunningTransactions() throws Exception
{
AssertableLogProvider internalLogProvider = new AssertableLogProvider();
db = startTestDb( internalLogProvider );

// create a dummy node
db.execute( "CREATE (:Node)" ).close();

// lock that dummy node to make all subsequent writes wait on an exclusive lock
org.neo4j.graphdb.Transaction tx = db.beginTx();
Node node = single( db.findNodes( label( "Node" ) ) );
tx.acquireWriteLock( node );

Driver driver = createDriver();

// try to execute write query for the same node through the driver
Future<?> writeThroughDriverFuture = updateAllNodesAsync( driver );
// make sure this query is executing and visible in query listing
awaitNumberOfActiveQueriesToBe( 1 );

// close driver while it has ongoing transaction, it should get terminated
driver.close();

// driver transaction should fail
expectFailure( writeThroughDriverFuture );

// make sure there are no active queries
awaitNumberOfActiveQueriesToBe( 0 );

// verify that closing of the driver resulted in transaction termination on the server and correct log message
awaitLogToContainMessage( internalLogProvider, inLog( BoltMessagingProtocolV1Handler.class ).warn(
startsWith( "Unable to send error back to the client" ),
instanceOf( TransactionTerminatedException.class ) ) );
}

private void throwsWhenInitMessageFails( Consumer<ThrowingSessionMonitor> monitorSetup, private void throwsWhenInitMessageFails( Consumer<ThrowingSessionMonitor> monitorSetup,
boolean shouldBeAbleToBeginTransaction ) boolean shouldBeAbleToBeginTransaction )
{ {
Expand Down Expand Up @@ -281,69 +224,15 @@ private GraphDatabaseService startTestDb( Monitors monitors )
return startDbWithBolt( newDbFactory().setMonitors( monitors ) ); return startDbWithBolt( newDbFactory().setMonitors( monitors ) );
} }


private GraphDatabaseService startTestDb( LogProvider internalLogProvider )
{
return startDbWithBolt( newDbFactory().setInternalLogProvider( internalLogProvider ) );
}

private GraphDatabaseService startDbWithBolt( GraphDatabaseFactory dbFactory ) private GraphDatabaseService startDbWithBolt( GraphDatabaseFactory dbFactory )
{ {
return dbFactory.newEmbeddedDatabaseBuilder( dir.graphDbDir() ) return dbFactory.newEmbeddedDatabaseBuilder( dir.graphDbDir() )
.setConfig( boltConnector( "0" ).type, BOLT.name() ) .setConfig( new BoltConnector( "0" ).type, BOLT.name() )
.setConfig( boltConnector( "0" ).enabled, TRUE ) .setConfig( new BoltConnector( "0" ).enabled, TRUE )
.setConfig( GraphDatabaseSettings.auth_enabled, FALSE ) .setConfig( GraphDatabaseSettings.auth_enabled, FALSE )
.newGraphDatabase(); .newGraphDatabase();
} }


private void awaitNumberOfActiveQueriesToBe( int value ) throws TimeoutException
{
await( () ->
{
Result listQueriesResult = db.execute( "CALL dbms.listQueries()" );
return count( listQueriesResult ) == value + 1; // procedure call itself is also listed
} );
}

private void awaitLogToContainMessage( AssertableLogProvider logProvider, LogMatcher matcher )
throws TimeoutException
{
try
{
await( () -> logProvider.containsMatchingLogCall( matcher ) );
}
catch ( TimeoutException e )
{
System.err.println( "Expected log call did not happen. Full log:" );
System.err.println( logProvider.serialize() );
throw e;
}
}

private Future<?> updateAllNodesAsync( Driver driver )
{
return runAsync( () ->
{
try ( Session session = driver.session() )
{
session.run( "MATCH (n) SET n.prop = 42" ).consume();
}
} );
}

private static void expectFailure( Future<?> future ) throws TimeoutException, InterruptedException
{
try
{
future.get( 1, MINUTES );
fail( "Exception expected" );
}
catch ( ExecutionException e )
{
// expected
e.printStackTrace();
}
}

private static TestEnterpriseGraphDatabaseFactory newDbFactory() private static TestEnterpriseGraphDatabaseFactory newDbFactory()
{ {
return new TestEnterpriseGraphDatabaseFactory(); return new TestEnterpriseGraphDatabaseFactory();
Expand All @@ -364,11 +253,6 @@ private static Monitors newMonitorsSpy( ThrowingSessionMonitor sessionMonitor )
return monitors; return monitors;
} }


private static void await( Supplier<Boolean> condition ) throws TimeoutException
{
Predicates.await( condition, PREDICATE_AWAIT_TIMEOUT_SECONDS, SECONDS );
}

private static class BoltKernelExtensionWithWorkerFactory extends BoltKernelExtension private static class BoltKernelExtensionWithWorkerFactory extends BoltKernelExtension
{ {
final WorkerFactory workerFactory; final WorkerFactory workerFactory;
Expand Down

0 comments on commit 675ca92

Please sign in to comment.