Skip to content

Commit

Permalink
Skip message processing when Bolt connection terminated
Browse files Browse the repository at this point in the history
Bolt connection can be terminated asynchronously by the Netty's
`#channelInactive()` callback. This will mark the Bolt state machine
for termination. When worker thread detects termination it rolls back
the open transaction and clears the existing result handle.

Previously, it also tried to continue streaming the result after
processing the termination signal. This caused an attempt to stream
a `null` result and an NPE. This NPE only appeared in the database
logs. It wasn't possible to stream it back in a FAILURE message
because the connection has been terminated.

This commit fixes the problem by making Bolt state machine skip message
processing when it is terminated. It also makes `ErrorReporter` log
full status code with classification.
  • Loading branch information
lutovich committed Jun 1, 2018
1 parent 77162c7 commit ef0ccfe
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 193 deletions.
Expand Up @@ -34,9 +34,9 @@
import org.neo4j.function.ThrowingConsumer; import org.neo4j.function.ThrowingConsumer;
import org.neo4j.graphdb.security.AuthorizationExpiredException; import org.neo4j.graphdb.security.AuthorizationExpiredException;
import org.neo4j.internal.kernel.api.exceptions.KernelException; import org.neo4j.internal.kernel.api.exceptions.KernelException;
import org.neo4j.internal.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.bolt.ManagedBoltStateMachine; import org.neo4j.kernel.api.bolt.ManagedBoltStateMachine;
import org.neo4j.kernel.api.exceptions.Status; import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.internal.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;
import org.neo4j.values.AnyValue; import org.neo4j.values.AnyValue;
Expand Down Expand Up @@ -110,22 +110,18 @@ private void after()
{ {
try try
{ {
if ( ctx.hasFailedOrIgnored() ) Neo4jError pendingError = ctx.pendingError;
if ( pendingError != null )
{ {
Neo4jError pendingError = ctx.pendingError; ctx.markFailed( pendingError );

}
if ( pendingError != null )
{
ctx.markFailed( pendingError );
}
else
{
ctx.markIgnored();
}


ctx.resetPendingFailedAndIgnored(); if ( ctx.pendingIgnore )
{
ctx.markIgnored();
} }


ctx.resetPendingFailedAndIgnored();
ctx.responseHandler.onFinish(); ctx.responseHandler.onFinish();
} }
finally finally
Expand All @@ -144,7 +140,7 @@ public void init( String userAgent, Map<String,Object> authToken,
before( handler ); before( handler );
try try
{ {
if ( !ctx.hasFailedOrIgnored() ) if ( ctx.canProcessMessage() )
{ {
state = state.init( this, userAgent, authToken ); state = state.init( this, userAgent, authToken );
} }
Expand All @@ -171,6 +167,7 @@ public void ackFailure( BoltResponseHandler handler ) throws BoltConnectionFatal
before( handler ); before( handler );
try try
{ {
// it should always be fine to ACK_FAILURE thus no canProcessMessage check
state = state.ackFailure( this ); state = state.ackFailure( this );
} }
finally finally
Expand Down Expand Up @@ -198,7 +195,7 @@ public void reset( BoltResponseHandler handler ) throws BoltConnectionFatality
before( handler ); before( handler );
try try
{ {
if ( !ctx.hasFailedOrIgnored() ) if ( ctx.canProcessMessage() )
{ {
state = state.reset( this ); state = state.reset( this );
} }
Expand All @@ -223,7 +220,7 @@ public void run( String statement, MapValue params, BoltResponseHandler handler
before( handler ); before( handler );
try try
{ {
if ( !ctx.hasFailedOrIgnored() ) if ( ctx.canProcessMessage() )
{ {
state = state.run( this, statement, params ); state = state.run( this, statement, params );
handler.onMetadata( "result_available_after", Values.longValue( clock.millis() - start ) ); handler.onMetadata( "result_available_after", Values.longValue( clock.millis() - start ) );
Expand All @@ -245,7 +242,7 @@ public void discardAll( BoltResponseHandler handler ) throws BoltConnectionFatal
before( handler ); before( handler );
try try
{ {
if ( !ctx.hasFailedOrIgnored() ) if ( ctx.canProcessMessage() )
{ {
state = state.discardAll( this ); state = state.discardAll( this );
} }
Expand All @@ -265,7 +262,7 @@ public void pullAll( BoltResponseHandler handler ) throws BoltConnectionFatality
before( handler ); before( handler );
try try
{ {
if ( !ctx.hasFailedOrIgnored() ) if ( ctx.canProcessMessage() )
{ {
state = state.pullAll( this ); state = state.pullAll( this );
} }
Expand Down Expand Up @@ -322,7 +319,7 @@ public void externalError( Neo4jError error, BoltResponseHandler handler ) throw
before( handler ); before( handler );
try try
{ {
if ( !ctx.hasFailedOrIgnored() ) if ( ctx.canProcessMessage() )
{ {
fail( this, error ); fail( this, error );
this.state = State.FAILED; this.state = State.FAILED;
Expand Down Expand Up @@ -885,9 +882,9 @@ public void onFinish()
} }
} }


private boolean hasFailedOrIgnored() private boolean canProcessMessage()
{ {
return pendingError != null || pendingIgnore; return !closed && pendingError == null && !pendingIgnore;
} }


private void resetPendingFailedAndIgnored() private void resetPendingFailedAndIgnored()
Expand Down
Expand Up @@ -61,7 +61,7 @@ public void report( Neo4jError error )
if ( error.status().code().classification() == DatabaseError ) if ( error.status().code().classification() == DatabaseError )
{ {
String message = format( "Client triggered an unexpected error [%s]: %s, reference %s.", String message = format( "Client triggered an unexpected error [%s]: %s, reference %s.",
error.status(), error.message(), error.reference() ); error.status().code().serialize(), error.message(), error.reference() );


// Writing to user log gets duplicated to the internal log // Writing to user log gets duplicated to the internal log
userLog.error( message ); userLog.error( message );
Expand Down
Expand Up @@ -40,6 +40,7 @@
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
Expand Down Expand Up @@ -67,6 +68,7 @@
import static org.neo4j.bolt.v1.runtime.BoltStateMachine.State.STREAMING; import static org.neo4j.bolt.v1.runtime.BoltStateMachine.State.STREAMING;
import static org.neo4j.bolt.v1.runtime.MachineRoom.EMPTY_PARAMS; import static org.neo4j.bolt.v1.runtime.MachineRoom.EMPTY_PARAMS;
import static org.neo4j.bolt.v1.runtime.MachineRoom.USER_AGENT; import static org.neo4j.bolt.v1.runtime.MachineRoom.USER_AGENT;
import static org.neo4j.bolt.v1.runtime.MachineRoom.init;
import static org.neo4j.bolt.v1.runtime.MachineRoom.newMachine; import static org.neo4j.bolt.v1.runtime.MachineRoom.newMachine;
import static org.neo4j.bolt.v1.runtime.MachineRoom.newMachineWithTransaction; import static org.neo4j.bolt.v1.runtime.MachineRoom.newMachineWithTransaction;
import static org.neo4j.bolt.v1.runtime.MachineRoom.newMachineWithTransactionSPI; import static org.neo4j.bolt.v1.runtime.MachineRoom.newMachineWithTransactionSPI;
Expand Down Expand Up @@ -682,6 +684,23 @@ public void shouldInvokeResponseHandlerOnMarkFailedIfThereIsHandler() throws Exc
verify( machine.ctx.responseHandler ).markFailed( error ); verify( machine.ctx.responseHandler ).markFailed( error );
} }


@Test
public void shouldNotFailWhenTerminatedAndPullAll() throws Exception
{
BoltStateMachineSPI spi = mock( BoltStateMachineSPI.class, RETURNS_MOCKS );
BoltStateMachine machine = init( newMachine( spi ) );
machine.state = STREAMING;
((TransactionStateMachine) machine.statementProcessor()).ctx.currentResult = BoltResult.EMPTY;

BoltResponseHandler responseHandler = mock( BoltResponseHandler.class );

machine.terminate();
machine.pullAll( responseHandler );

verify( spi, never() ).reportError( any() );
assertNotEquals( FAILED, machine.state );
}

private static void testMarkFailedOnNextMessage( ThrowingBiConsumer<BoltStateMachine,BoltResponseHandler,BoltConnectionFatality> action ) throws Exception private static void testMarkFailedOnNextMessage( ThrowingBiConsumer<BoltStateMachine,BoltResponseHandler,BoltConnectionFatality> action ) throws Exception
{ {
// Given // Given
Expand Down
Expand Up @@ -52,9 +52,14 @@ private MachineRoom()
} }


public static BoltStateMachine newMachine() public static BoltStateMachine newMachine()
{
return newMachine( mock( BoltStateMachineSPI.class, RETURNS_MOCKS ) );
}

public static BoltStateMachine newMachine( BoltStateMachineSPI spi )
{ {
BoltChannel boltChannel = mock( BoltChannel.class ); BoltChannel boltChannel = mock( BoltChannel.class );
return new BoltStateMachine( mock( BoltStateMachineSPI.class, RETURNS_MOCKS ), boltChannel, Clock.systemUTC(), NullLogService.getInstance() ); return new BoltStateMachine( spi, boltChannel, Clock.systemUTC(), NullLogService.getInstance() );
} }


public static BoltStateMachine newMachine( BoltStateMachine.State state ) throws AuthenticationException, BoltConnectionFatality public static BoltStateMachine newMachine( BoltStateMachine.State state ) throws AuthenticationException, BoltConnectionFatality
Expand Down Expand Up @@ -95,16 +100,17 @@ public static BoltStateMachine newMachineWithTransactionSPI( TransactionStateMac
return machine; return machine;
} }


private static void init( BoltStateMachine machine ) throws AuthenticationException, BoltConnectionFatality public static BoltStateMachine init( BoltStateMachine machine ) throws AuthenticationException, BoltConnectionFatality
{ {
init( machine, null ); return init( machine, null );
} }


private static void init( BoltStateMachine machine, String owner ) throws AuthenticationException, BoltConnectionFatality private static BoltStateMachine init( BoltStateMachine machine, String owner ) throws AuthenticationException, BoltConnectionFatality
{ {
AuthenticationResult authenticationResult = mock( AuthenticationResult.class ); AuthenticationResult authenticationResult = mock( AuthenticationResult.class );
when( machine.spi.authenticate( any() ) ).thenReturn( authenticationResult ); when( machine.spi.authenticate( any() ) ).thenReturn( authenticationResult );
machine.init( USER_AGENT, owner == null ? emptyMap() : Collections.singletonMap( AuthToken.PRINCIPAL, owner ), nullResponseHandler() ); machine.init( USER_AGENT, owner == null ? emptyMap() : Collections.singletonMap( AuthToken.PRINCIPAL, owner ), nullResponseHandler() );
return machine;
} }


private static void runBegin( BoltStateMachine machine ) throws BoltConnectionFatality private static void runBegin( BoltStateMachine machine ) throws BoltConnectionFatality
Expand Down
@@ -0,0 +1,156 @@
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.transport.integration;

import org.junit.After;
import org.junit.Rule;
import org.junit.Test;

import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.bolt.v1.messaging.Neo4jPackV1;
import org.neo4j.bolt.v1.transport.socket.client.SocketConnection;
import org.neo4j.bolt.v1.transport.socket.client.TransportConnection;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.test.TestGraphDatabaseFactory;

import static java.util.Collections.emptyMap;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertThat;
import static org.neo4j.bolt.v1.messaging.message.InitMessage.init;
import static org.neo4j.bolt.v1.messaging.message.PullAllMessage.pullAll;
import static org.neo4j.bolt.v1.messaging.message.RunMessage.run;
import static org.neo4j.bolt.v1.messaging.util.MessageMatchers.msgSuccess;

public class ConnectionTerminationIT
{
private static final int EXECUTION_TIME_SECONDS = 20;

private final AssertableLogProvider logProvider = new AssertableLogProvider();
private final TransportTestUtil util = new TransportTestUtil( new Neo4jPackV1() );
private final ExecutorService executor = Executors.newCachedThreadPool();

@Rule
public final Neo4jWithSocket server = new Neo4jWithSocket( getClass(),
new TestGraphDatabaseFactory().setInternalLogProvider( logProvider ),
settings -> settings.put( GraphDatabaseSettings.auth_enabled.name(), "false" ) );

@After
public void tearDown() throws Exception
{
executor.shutdownNow();
executor.awaitTermination( 1, TimeUnit.MINUTES );
}

@Test
public void shouldNotFailWhenConnectionsAreKilled() throws Exception
{
Set<TransportConnection> connections = ConcurrentHashMap.newKeySet();
AtomicBoolean stop = new AtomicBoolean();

for ( int i = 0; i < 10; i++ )
{
executor.submit( () -> runQueriesUntilStopped( connections, stop ) );
}

executor.submit( () -> killConnectionsUntilStopped( connections, stop ) );

SECONDS.sleep( EXECUTION_TIME_SECONDS );
stop.set( true );

logProvider.assertNoLogCallContaining( Status.Classification.DatabaseError.toString() );
logProvider.assertNoLogCallContaining( Status.General.UnknownError.code().serialize() );
}

private void runQueriesUntilStopped( Set<TransportConnection> connections, AtomicBoolean stop )
{
while ( !stop.get() )
{
TransportConnection connection = null;
try
{
connection = new SocketConnection().connect( server.lookupDefaultConnector() );
connections.add( connection );

connection.send( util.defaultAcceptedVersions() );
assertThat( connection, util.eventuallyReceivesSelectedProtocolVersion() );

connection.send( util.chunk( init( "TestClient/1.1", emptyMap() ) ) );
assertThat( connection, util.eventuallyReceives( msgSuccess() ) );

connection.send( util.chunk(
run( "UNWIND range(1, 10000) AS x RETURN x, x * x, 'Hello-' + x" ),
pullAll() ) );

// sleep a bit before disconnecting to allow db start streaming the result
Thread.sleep( 500 );
}
catch ( Throwable ignore )
{
}
finally
{
if ( connection != null && connections.remove( connection ) )
{
disconnect( connection );
}
}
}
}

private static void killConnectionsUntilStopped( Set<TransportConnection> connections, AtomicBoolean stop )
{
while ( !stop.get() )
{
Iterator<TransportConnection> iterator = connections.iterator();
if ( iterator.hasNext() )
{
TransportConnection connection = iterator.next();
if ( connections.remove( connection ) )
{
disconnect( connection );
}
}
}
}

private static void disconnect( TransportConnection connection )
{
try
{
if ( connection != null )
{
connection.disconnect();
}
}
catch ( IOException ignore )
{
}
}
}

0 comments on commit ef0ccfe

Please sign in to comment.