From 3a91140c7e53a5da13d66e6f4b264fb618c0973a Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 24 Jan 2017 17:02:06 +0100 Subject: [PATCH] Avoid closing bolt state machine multiple times Each bolt worker owns a state machine. This state machine was closed when worker exited (both cleanly and by throwing an exception) and when worker was halted. This was problematic because workers can be halted from different threads that react to Netty callbacks (`channelInactive`, `handlerRemoved` and `exceptionCaught`). In such cases state machine could be closed multiple times. This commit makes it safe to call `RunnableBoltWorker#halt()` multiple times. This method will only signal to the worker that it needs to stop and close it's state machine. --- .../concurrent/RunnableBoltWorker.java | 30 +++----- .../concurrent/RunnableBoltWorkerTest.java | 73 ++++++++++++++++++- 2 files changed, 83 insertions(+), 20 deletions(-) diff --git a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/concurrent/RunnableBoltWorker.java b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/concurrent/RunnableBoltWorker.java index afb28faf76ac0..bca0ea3cf7449 100644 --- a/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/concurrent/RunnableBoltWorker.java +++ b/community/bolt/src/main/java/org/neo4j/bolt/v1/runtime/concurrent/RunnableBoltWorker.java @@ -37,16 +37,14 @@ */ class RunnableBoltWorker implements Runnable, BoltWorker { - /** Poison pill for closing the session and shutting down the worker */ - static final Job SHUTDOWN = session1 -> {}; - private static final int workQueueSize = Integer.getInteger( "org.neo4j.bolt.workQueueSize", 100 ); private final ArrayBlockingQueue jobQueue = new ArrayBlockingQueue<>( workQueueSize ); private final BoltStateMachine machine; private final Log log; private final Log userLog; - private boolean keepRunning; + + private volatile boolean keepRunning = true; RunnableBoltWorker( BoltStateMachine machine, LogService logging ) { @@ -76,7 +74,6 @@ public void enqueue( Job job ) @Override public void run() { - keepRunning = true; ArrayList batch = new ArrayList<>( workQueueSize ); try @@ -110,8 +107,7 @@ public void run() } finally { - // Attempt to close the session, as an effort to release locks and other resources held by the session - machine.close(); + closeStateMachine(); } } @@ -126,14 +122,7 @@ private void executeBatch( ArrayList batch ) throws BoltConnectionFatality private void execute( Job job ) throws BoltConnectionFatality { - if ( job == SHUTDOWN ) - { - keepRunning = false; - } - else - { - job.perform( machine ); - } + job.perform( machine ); } @Override @@ -144,15 +133,20 @@ public void interrupt() @Override public void halt() + { + keepRunning = false; + } + + private void closeStateMachine() { try { + // Attempt to close the state machine, as an effort to release locks and other resources machine.close(); } - finally + catch ( Throwable t ) { - keepRunning = false; + log.error( "Unable to close Bolt session '" + machine.key() + "'", t ); } } - } diff --git a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/concurrent/RunnableBoltWorkerTest.java b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/concurrent/RunnableBoltWorkerTest.java index d5f61f4dbff2c..c1907e6bf4acd 100644 --- a/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/concurrent/RunnableBoltWorkerTest.java +++ b/community/bolt/src/test/java/org/neo4j/bolt/v1/runtime/concurrent/RunnableBoltWorkerTest.java @@ -29,7 +29,13 @@ import org.neo4j.kernel.impl.logging.NullLogService; import org.neo4j.logging.AssertableLogProvider; -import static org.mockito.Mockito.*; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import static org.neo4j.logging.AssertableLogProvider.inLog; public class RunnableBoltWorkerTest @@ -62,7 +68,7 @@ public void shouldExecuteWorkWhenRun() throws Throwable // Given RunnableBoltWorker worker = new RunnableBoltWorker( machine, NullLogService.getInstance() ); worker.enqueue( s -> s.run( "Hello, world!", null, null ) ); - worker.enqueue( RunnableBoltWorker.SHUTDOWN ); + worker.enqueue( s -> worker.halt() ); // When worker.run(); @@ -125,4 +131,67 @@ public void protocolBreachesShouldBeLoggedWithoutStackTraces() throws Throwable "'test-session'" ) ); userLog.assertNone( inLog( RunnableBoltWorker.class ).any() ); } + + @Test + public void workerCanBeHaltedMultipleTimes() + { + RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); + + worker.halt(); + worker.halt(); + worker.halt(); + + worker.run(); + + verify( machine ).close(); + } + + @Test + public void stateMachineIsClosedOnExit() + { + RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); + worker.enqueue( s -> s.run( "RETURN 1", null, null ) ); + worker.enqueue( s -> s.run( "RETURN 2", null, null ) ); + worker.enqueue( s -> worker.halt() ); + + worker.run(); + + verify( machine ).close(); + } + + @Test + public void stateMachineNotClosedOnHalt() + { + RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); + + worker.halt(); + + verify( machine, never() ).close(); + } + + @Test + public void stateMachineInterrupted() + { + RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); + + worker.interrupt(); + + verify( machine ).interrupt(); + } + + @Test + public void stateMachineCloseFailureIsLogged() + { + RunnableBoltWorker worker = new RunnableBoltWorker( machine, logService ); + + RuntimeException closeError = new RuntimeException( "Oh!" ); + doThrow( closeError ).when( machine ).close(); + + worker.enqueue( s -> worker.halt() ); + worker.run(); + + internalLog.assertExactly( inLog( RunnableBoltWorker.class ).error( + equalTo( "Unable to close Bolt session 'test-session'" ), + equalTo( closeError ) ) ); + } }