Skip to content

Commit

Permalink
Fixed an error in bolt where failure message is not flushed when no f…
Browse files Browse the repository at this point in the history
…ree work thread is available.

When the worker thread pool is full, bolt server will fail new requests from clients. However this error is not flushed correctly to the client.
This PR make bolt server to first flush the failure message to client and then close the connection to enforce the driver failing fast without waiting for more responses.
  • Loading branch information
Zhen Li authored and zhenlineo committed May 8, 2019
1 parent d598c28 commit c7cf018
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 19 deletions.
Expand Up @@ -199,7 +199,7 @@ protected boolean processNextBatch( int batchCount, boolean exitIfNoJobsAvailabl
}

// we processed all pending messages, let's flush underlying channel
if ( queue.size() == 0 || maxBatchSize == 1 )
if ( queue.size() == 0 || batchCount == 1 )
{
output.flush();
}
Expand Down Expand Up @@ -275,6 +275,9 @@ public void handleSchedulingError( Throwable t )
// and it will either send a failure response to the client or close the connection and its
// related resources (if closing)
processNextBatch( 1, true );
// we close the connection directly to enforce the client to stop waiting for
// any more messages responses besides the failure message.
close();
}

@Override
Expand Down
Expand Up @@ -76,17 +76,4 @@ protected States buildStates()

return new States( connected, failed );
}

@Override
protected void after()
{
if ( connectionState.isTerminated() )
{
close();
}
else
{
super.after();
}
}
}
Expand Up @@ -43,15 +43,16 @@
import org.neo4j.logging.internal.SimpleLogService;
import org.neo4j.test.rule.concurrent.OtherThreadRule;

import static org.hamcrest.CoreMatchers.any;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand All @@ -67,6 +68,7 @@ public class DefaultBoltConnectionTest
private final BoltConnectionLifetimeListener connectionListener = mock( BoltConnectionLifetimeListener.class );
private final BoltConnectionQueueMonitor queueMonitor = mock( BoltConnectionQueueMonitor.class );
private final EmbeddedChannel channel = new EmbeddedChannel();
private final PackOutput output = mock( PackOutput.class );

private BoltChannel boltChannel;
private BoltStateMachine stateMachine;
Expand Down Expand Up @@ -245,7 +247,7 @@ public void stopShouldFirstMarkStateMachineForTermination()
connection.stop();

verify( stateMachine ).markForTermination();
verify( queueMonitor ).enqueued( ArgumentMatchers.eq( connection ), ArgumentMatchers.any( Job.class ) );
verify( queueMonitor ).enqueued( ArgumentMatchers.eq( connection ), any( Job.class ) );
}

@Test
Expand All @@ -257,7 +259,7 @@ public void stopShouldCloseStateMachineOnProcessNextBatch()

connection.processNextBatch();

verify( queueMonitor ).enqueued( ArgumentMatchers.eq( connection ), ArgumentMatchers.any( Job.class ) );
verify( queueMonitor ).enqueued( ArgumentMatchers.eq( connection ), any( Job.class ) );
verify( stateMachine ).markForTermination();
verify( stateMachine ).close();
}
Expand All @@ -271,7 +273,7 @@ public void stopShouldCloseStateMachineIfEnqueueEndsWithRejectedExecutionExcepti
{
connection.handleSchedulingError( new RejectedExecutionException() );
return null;
} ).when( queueMonitor ).enqueued( ArgumentMatchers.eq( connection ), ArgumentMatchers.any( Job.class ) );
} ).when( queueMonitor ).enqueued( ArgumentMatchers.eq( connection ), any( Job.class ) );

connection.stop();

Expand Down Expand Up @@ -397,14 +399,30 @@ public void processNextBatchShouldReturnWhenConnectionIsStopped() throws Excepti
verify( stateMachine ).close();
}

@Test
public void shouldFlushErrorAndCloseConnectionIfFailedToSchedule() throws Throwable
{
// Given
BoltConnection connection = newConnection();

// When
RejectedExecutionException error = new RejectedExecutionException( "Failed to schedule" );
connection.handleSchedulingError( error );

// Then
verify( stateMachine ).markFailed( argThat( e -> e.status().equals( Status.Request.NoThreadsAvailable ) ) );
verify( stateMachine ).close();
verify( output ).flush();
}

private DefaultBoltConnection newConnection()
{
return newConnection( 10 );
}

private DefaultBoltConnection newConnection( int maxBatchSize )
{
return new DefaultBoltConnection( boltChannel, mock( PackOutput.class ), stateMachine, logService, connectionListener, queueMonitor, maxBatchSize );
return new DefaultBoltConnection( boltChannel, output, stateMachine, logService, connectionListener, queueMonitor, maxBatchSize );
}

}

0 comments on commit c7cf018

Please sign in to comment.