Skip to content

Commit

Permalink
Avoid closing bolt state machine multiple times
Browse files Browse the repository at this point in the history
Each bolt worker owns a state machine. This state machine was closed
when worker exited (both cleanly and by throwing an exception) and when
worker was halted. This was problematic because workers can be halted
from different threads that react to Netty callbacks (`channelInactive`,
`handlerRemoved` and `exceptionCaught`). In such cases state machine
could be closed multiple times.

This commit makes it safe to call `RunnableBoltWorker#halt()` multiple
times. This method will only signal to the worker that it needs to stop
and close it's state machine.
  • Loading branch information
lutovich committed Jan 24, 2017
1 parent 4fca288 commit 3a91140
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 20 deletions.
Expand Up @@ -37,16 +37,14 @@
*/
class RunnableBoltWorker implements Runnable, BoltWorker
{
/** Poison pill for closing the session and shutting down the worker */
static final Job SHUTDOWN = session1 -> {};

private static final int workQueueSize = Integer.getInteger( "org.neo4j.bolt.workQueueSize", 100 );

private final ArrayBlockingQueue<Job> jobQueue = new ArrayBlockingQueue<>( workQueueSize );
private final BoltStateMachine machine;
private final Log log;
private final Log userLog;
private boolean keepRunning;

private volatile boolean keepRunning = true;

RunnableBoltWorker( BoltStateMachine machine, LogService logging )
{
Expand Down Expand Up @@ -76,7 +74,6 @@ public void enqueue( Job job )
@Override
public void run()
{
keepRunning = true;
ArrayList<Job> batch = new ArrayList<>( workQueueSize );

try
Expand Down Expand Up @@ -110,8 +107,7 @@ public void run()
}
finally
{
// Attempt to close the session, as an effort to release locks and other resources held by the session
machine.close();
closeStateMachine();
}
}

Expand All @@ -126,14 +122,7 @@ private void executeBatch( ArrayList<Job> batch ) throws BoltConnectionFatality

private void execute( Job job ) throws BoltConnectionFatality
{
if ( job == SHUTDOWN )
{
keepRunning = false;
}
else
{
job.perform( machine );
}
job.perform( machine );
}

@Override
Expand All @@ -144,15 +133,20 @@ public void interrupt()

@Override
public void halt()
{
keepRunning = false;
}

private void closeStateMachine()
{
try
{
// Attempt to close the state machine, as an effort to release locks and other resources
machine.close();
}
finally
catch ( Throwable t )
{
keepRunning = false;
log.error( "Unable to close Bolt session '" + machine.key() + "'", t );
}
}

}
Expand Up @@ -29,7 +29,13 @@
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.logging.AssertableLogProvider;

import static org.mockito.Mockito.*;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.neo4j.logging.AssertableLogProvider.inLog;

public class RunnableBoltWorkerTest
Expand Down Expand Up @@ -62,7 +68,7 @@ public void shouldExecuteWorkWhenRun() throws Throwable
// Given
RunnableBoltWorker worker = new RunnableBoltWorker( machine, NullLogService.getInstance() );
worker.enqueue( s -> s.run( "Hello, world!", null, null ) );
worker.enqueue( RunnableBoltWorker.SHUTDOWN );
worker.enqueue( s -> worker.halt() );

// When
worker.run();
Expand Down Expand Up @@ -125,4 +131,67 @@ public void protocolBreachesShouldBeLoggedWithoutStackTraces() throws Throwable
"'test-session'" ) );
userLog.assertNone( inLog( RunnableBoltWorker.class ).any() );
}

@Test
public void workerCanBeHaltedMultipleTimes()
{
RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService );

worker.halt();
worker.halt();
worker.halt();

worker.run();

verify( machine ).close();
}

@Test
public void stateMachineIsClosedOnExit()
{
RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService );
worker.enqueue( s -> s.run( "RETURN 1", null, null ) );
worker.enqueue( s -> s.run( "RETURN 2", null, null ) );
worker.enqueue( s -> worker.halt() );

worker.run();

verify( machine ).close();
}

@Test
public void stateMachineNotClosedOnHalt()
{
RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService );

worker.halt();

verify( machine, never() ).close();
}

@Test
public void stateMachineInterrupted()
{
RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService );

worker.interrupt();

verify( machine ).interrupt();
}

@Test
public void stateMachineCloseFailureIsLogged()
{
RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService );

RuntimeException closeError = new RuntimeException( "Oh!" );
doThrow( closeError ).when( machine ).close();

worker.enqueue( s -> worker.halt() );
worker.run();

internalLog.assertExactly( inLog( RunnableBoltWorker.class ).error(
equalTo( "Unable to close Bolt session 'test-session'" ),
equalTo( closeError ) ) );
}
}

0 comments on commit 3a91140

Please sign in to comment.