From 14c8ddaa346fa83884033b552b0b15c8fc94cc3e Mon Sep 17 00:00:00 2001 From: chrjohn Date: Tue, 19 Jun 2018 17:14:03 +0200 Subject: [PATCH 1/5] Corrections to SingleThreadedEventHandlingStrategy and SocketAcceptor/Initiator. - Reverted changes from #192 since we actually want to process all messages on stop(). --- .../mina/SingleThreadedEventHandlingStrategy.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java index c74d7a02ee..c5f686cbc2 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java @@ -176,7 +176,6 @@ public synchronized void stopHandlingMessages() { public void stopHandlingMessages(boolean join) { stopHandlingMessages(); - messageProcessingThread.interrupt(); if (join) { try { messageProcessingThread.join(); @@ -227,12 +226,6 @@ public void start() { executor.execute(wrapper); } - public void interrupt() { - if (executor instanceof DedicatedThreadExecutor) { - ((DedicatedThreadExecutor)executor).interrupt(); - } - } - /** * Provides the Thread::join and Thread::isAlive semantics on the nested Runnable. */ @@ -290,12 +283,6 @@ public void execute(Runnable command) { thread.setDaemon(true); thread.start(); } - - public void interrupt() { - if (thread != null) { - thread.interrupt(); - } - } } } From d185b8499bff77922369d15fe2cc98c374bcb47a Mon Sep 17 00:00:00 2001 From: chrjohn Date: Tue, 19 Jun 2018 17:18:39 +0200 Subject: [PATCH 2/5] Return from block() when stopped. It does not make sense to call getMessage() in that case. --- .../java/quickfix/mina/SingleThreadedEventHandlingStrategy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java index c5f686cbc2..c283fe984c 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java @@ -99,8 +99,8 @@ private void block() { sessionConnector.stopSessionTimer(); // reset the stoptime stopTime = 0; - return; } + return; } } try { From d4711d4a4bf7a8cd6516385c487f03a3aba0d289 Mon Sep 17 00:00:00 2001 From: chrjohn Date: Tue, 19 Jun 2018 17:24:37 +0200 Subject: [PATCH 3/5] minor cleanup in ThreadPerSessionEventHandlingStrategy --- ...ThreadPerSessionEventHandlingStrategy.java | 105 +++++++++--------- 1 file changed, 52 insertions(+), 53 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java index 5cdd3bbe03..2c4b4ead5d 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java @@ -113,59 +113,58 @@ public void stopDispatcherThreads() { } } - /** - * A stand-in for the Thread class that delegates to an Executor. - * Implements all the API required by pre-existing QFJ code. - */ - protected static abstract class ThreadAdapter implements Runnable { - - private final Executor executor; - private final String name; - - public ThreadAdapter(String name, Executor executor) { - this.name = name; - this.executor = executor != null ? executor : new DedicatedThreadExecutor(name); - } - - public void start() { - executor.execute(this); - } - - @Override - public final void run() { - Thread currentThread = Thread.currentThread(); - String threadName = currentThread.getName(); - try { - if (!name.equals(threadName)) { - currentThread.setName(name + " (" + threadName + ")"); - } - doRun(); - } finally { - currentThread.setName(threadName); - } - } - - abstract void doRun(); - - /** - * An Executor that uses it's own dedicated Thread. - * Provides equivalent behavior to the prior non-Executor approach. - */ - static final class DedicatedThreadExecutor implements Executor { - - private final String name; - - DedicatedThreadExecutor(String name) { - this.name = name; - } - - @Override - public void execute(Runnable command) { - new Thread(command, name).start(); - } - - } + /** + * A stand-in for the Thread class that delegates to an Executor. + * Implements all the API required by pre-existing QFJ code. + */ + protected static abstract class ThreadAdapter implements Runnable { + + private final Executor executor; + private final String name; + public ThreadAdapter(String name, Executor executor) { + this.name = name; + this.executor = executor != null ? executor : new DedicatedThreadExecutor(name); + } + + public void start() { + executor.execute(this); + } + + @Override + public final void run() { + Thread currentThread = Thread.currentThread(); + String threadName = currentThread.getName(); + try { + if (!name.equals(threadName)) { + currentThread.setName(name + " (" + threadName + ")"); + } + doRun(); + } finally { + currentThread.setName(threadName); + } + } + + abstract void doRun(); + + /** + * An Executor that uses its own dedicated Thread. Provides equivalent + * behavior to the prior non-Executor approach. + */ + static final class DedicatedThreadExecutor implements Executor { + + private final String name; + + DedicatedThreadExecutor(String name) { + this.name = name; + } + + @Override + public void execute(Runnable command) { + new Thread(command, name).start(); + } + + } } protected class MessageDispatchingThread extends ThreadAdapter { @@ -226,7 +225,7 @@ void doRun() { } } if (!messages.isEmpty()) { - final List tempList = new ArrayList<>(); + final List tempList = new ArrayList<>(messages.size()); queueTracker.drainTo(tempList); for (Message message : tempList) { try { From c2ffb4b0c7ae730997d572b077df6b9f96dcf33f Mon Sep 17 00:00:00 2001 From: chrjohn Date: Tue, 19 Jun 2018 17:34:30 +0200 Subject: [PATCH 4/5] Added javadoc and unit test... to ensure that the session timer is not stopped by Message Processor thread when using stopHandlingMessages(true) --- .../SingleThreadedEventHandlingStrategy.java | 12 ++++++++++++ ...ingleThreadedEventHandlingStrategyTest.java | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java index c283fe984c..18dccda863 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java @@ -167,6 +167,12 @@ private synchronized void startHandlingMessages() { isStopped = false; } + /** + * Stops processing of messages without waiting for message processing + * thread to finish. + * + * It is advised to call stopHandlingMessages(true) instead of this method. + */ public synchronized void stopHandlingMessages() { for (Session session : sessionConnector.getSessionMap().values()) { onMessage(session, END_OF_STREAM); @@ -174,6 +180,12 @@ public synchronized void stopHandlingMessages() { isStopped = true; } + /** + * Stops processing of messages and optionally waits for message processing + * thread to finish. + * + * @param join true to wait for thread to finish + */ public void stopHandlingMessages(boolean join) { stopHandlingMessages(); if (join) { diff --git a/quickfixj-core/src/test/java/quickfix/mina/SingleThreadedEventHandlingStrategyTest.java b/quickfixj-core/src/test/java/quickfix/mina/SingleThreadedEventHandlingStrategyTest.java index 4669a14921..62bcf9078e 100644 --- a/quickfixj-core/src/test/java/quickfix/mina/SingleThreadedEventHandlingStrategyTest.java +++ b/quickfixj-core/src/test/java/quickfix/mina/SingleThreadedEventHandlingStrategyTest.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import org.junit.AfterClass; +import static org.junit.Assert.assertTrue; /** * @@ -149,6 +150,23 @@ public void testMultipleStartStop() throws Exception { } } + /** + * During quick restarts: make sure that session timer is started and not stopped via + * block() method called from Message Processor thread. + */ + @Test + public void testMultipleStartSessionTimer() throws Exception { + SessionSettings settings = new SessionSettings(); + SessionConnector connector = new SessionConnectorUnderTest(settings, sessionFactory); + ehs = new SingleThreadedEventHandlingStrategy(connector, 1000); + for (int i = 0; i < 1000; i++) { + connector.startSessionTimer(); + ehs.blockInThread(); + assertTrue(connector.checkSessionTimerRunning()); + ehs.stopHandlingMessages(true); + } + } + @Test public void shouldCleanUpAcceptorQFJMessageProcessorThreadAfterInterrupt() throws Exception { assertQFJMessageProcessorThreads(0); From 310d2c458c3dda3fd4074d7ea838252e8dd234fc Mon Sep 17 00:00:00 2001 From: chrjohn Date: Tue, 19 Jun 2018 17:36:46 +0200 Subject: [PATCH 5/5] Changed SocketAcceptor and SocketInitiator to use SingleThreadedEventHandlingStrategy.stopHandlingMessages(true). Otherwise it could happen on quick restarts that the Message Processor thread stopped the SessionTimer concurrently which lead to an unresponsive FIX session, i.e. no Logon sent, no Heartbeats. --- quickfixj-core/src/main/java/quickfix/SocketAcceptor.java | 2 +- .../src/main/java/quickfix/SocketInitiator.java | 2 +- .../src/main/java/quickfix/mina/SessionConnector.java | 8 ++++++++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java b/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java index 8fdc4e88ed..49f27d7f15 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java +++ b/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java @@ -122,7 +122,7 @@ public void stop(boolean forceDisconnect) { stopSessionTimer(); } finally { try { - eventHandlingStrategy.stopHandlingMessages(); + eventHandlingStrategy.stopHandlingMessages(true); } finally { Session.unregisterSessions(getSessions(), true); clearConnectorSessions(); diff --git a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java index bf24294f42..3502ae1467 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java @@ -134,7 +134,7 @@ public void stop(boolean forceDisconnect) { stopInitiators(); } finally { try { - eventHandlingStrategy.stopHandlingMessages(); + eventHandlingStrategy.stopHandlingMessages(true); } finally { Session.unregisterSessions(getSessions(), true); clearConnectorSessions(); diff --git a/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java b/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java index e53ee09f3e..ab87babd0c 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java @@ -328,6 +328,14 @@ protected void stopSessionTimer() { } } + // visible for testing + boolean checkSessionTimerRunning() { + if ( sessionTimerFuture != null ) { + return !sessionTimerFuture.isDone(); + } + return false; + } + protected ScheduledExecutorService getScheduledExecutorService() { return scheduledExecutorService; }