From 395526cb3954dc465df1953a28dff28d344c9e5e Mon Sep 17 00:00:00 2001 From: Christoph John Date: Mon, 11 Oct 2021 13:33:51 +0200 Subject: [PATCH 1/3] Interrupt current thread where applicable. --- .../quickfix/mina/SingleThreadedEventHandlingStrategy.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java index 8d2e88d3ea..58b7e1cf41 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java @@ -76,7 +76,7 @@ public void onMessage(Session quickfixSession, Message message) { queueTracker.put(new SessionMessageEvent(quickfixSession, message)); } catch (InterruptedException e) { isStopped = true; - throw new RuntimeException(e); + Thread.currentThread().interrupt(); } } @@ -196,7 +196,8 @@ public void stopHandlingMessages(boolean join) { try { messageProcessingThread.join(); } catch (InterruptedException e) { - sessionConnector.log.error("{} interrupted.", MESSAGE_PROCESSOR_THREAD_NAME); + sessionConnector.log.warn("{} interrupted.", MESSAGE_PROCESSOR_THREAD_NAME); + Thread.currentThread().interrupt(); } } } From b0f1adbc36606d609be76181e205677518ddd04e Mon Sep 17 00:00:00 2001 From: Christoph John Date: Tue, 12 Oct 2021 00:03:24 +0200 Subject: [PATCH 2/3] Changed SessionConnector and ThreadPerSessionEventHandlingStrategy --- .../src/main/java/quickfix/mina/SessionConnector.java | 9 ++++++--- .../mina/ThreadPerSessionEventHandlingStrategy.java | 2 ++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java b/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java index 29c09d7cba..f58455343a 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java @@ -272,6 +272,7 @@ protected void waitForLogout() { Thread.sleep(100L); } catch (InterruptedException e) { log.error(e.getMessage(), e); + Thread.currentThread().interrupt(); } final long elapsed = System.currentTimeMillis() - start; Iterator sessionItr = loggedOnSessions.iterator(); @@ -375,6 +376,7 @@ public void run() { try { delegate.await(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } @@ -449,9 +451,10 @@ public static void closeManagedSessionsAndDispose(IoService ioService, boolean a completed = closeFuture.await(1000, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); - } - if (!completed) { - logger.warn("Could not close IoSession {}", ioSession); + } finally { + if (!completed) { + logger.warn("Could not close IoSession {}", ioSession); + } } } } diff --git a/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java index dce95ec787..90d977a9ce 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java @@ -203,6 +203,7 @@ public void enqueue(Message message) { queueTracker.put(message); } catch (final InterruptedException e) { quickfixSession.getLog().onErrorEvent(e.toString()); + Thread.currentThread().interrupt(); } } @@ -227,6 +228,7 @@ void doRun() { LogUtil.logThrowable(quickfixSession.getSessionID(), "Message dispatcher interrupted", e); stopping = true; + Thread.currentThread().interrupt(); } catch (final Throwable e) { LogUtil.logThrowable(quickfixSession.getSessionID(), "Error during message processing", e); From f8a446f78b2bd50e633108fb6401c5795b42e70b Mon Sep 17 00:00:00 2001 From: Christoph John Date: Tue, 12 Oct 2021 00:27:49 +0200 Subject: [PATCH 3/3] Added interruption of current thread on InterruptedException. --- .../src/test/java/quickfix/MultiAcceptorTest.java | 2 ++ .../java/quickfix/SessionDisconnectConcurrentlyTest.java | 4 +++- .../src/test/java/quickfix/mina/LostLogoutTest.java | 2 +- .../test/java/quickfix/mina/LostLogoutThreadedTest.java | 2 +- .../mina/SingleThreadedEventHandlingStrategyTest.java | 4 ++-- .../src/test/java/quickfix/mina/ssl/SSLAndNonSSLTest.java | 1 + .../test/java/quickfix/mina/ssl/SSLCertificateTest.java | 1 + .../quickfix/test/acceptance/ConnectToServerStep.java | 2 +- .../java/quickfix/test/acceptance/TestConnection.java | 8 ++++++-- 9 files changed, 18 insertions(+), 8 deletions(-) diff --git a/quickfixj-core/src/test/java/quickfix/MultiAcceptorTest.java b/quickfixj-core/src/test/java/quickfix/MultiAcceptorTest.java index d0161450f5..b4c06797ae 100644 --- a/quickfixj-core/src/test/java/quickfix/MultiAcceptorTest.java +++ b/quickfixj-core/src/test/java/quickfix/MultiAcceptorTest.java @@ -206,6 +206,7 @@ public void waitForLogon() { try { logonLatch.await(20, TimeUnit.SECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); fail(e.getMessage()); } } @@ -220,6 +221,7 @@ public synchronized void waitForMessages() { fail("Timed out waiting for message"); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); fail(e.getMessage()); } } diff --git a/quickfixj-core/src/test/java/quickfix/SessionDisconnectConcurrentlyTest.java b/quickfixj-core/src/test/java/quickfix/SessionDisconnectConcurrentlyTest.java index 7cfeed11b9..66d8edd9ba 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionDisconnectConcurrentlyTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionDisconnectConcurrentlyTest.java @@ -134,6 +134,7 @@ public void waitForLogon() { try { logonLatch.await(10, TimeUnit.SECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); fail(e.getMessage()); } } @@ -148,6 +149,7 @@ public synchronized void waitForMessages() { fail("Timed out waiting for message"); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); fail(e.getMessage()); } } @@ -225,7 +227,7 @@ public void run() { try { Thread.sleep(12000); } catch (InterruptedException e) { - e.printStackTrace(); + Thread.currentThread().interrupt(); } threadIds = bean.findDeadlockedThreads(); } diff --git a/quickfixj-core/src/test/java/quickfix/mina/LostLogoutTest.java b/quickfixj-core/src/test/java/quickfix/mina/LostLogoutTest.java index 63215c1981..0278b65f0e 100644 --- a/quickfixj-core/src/test/java/quickfix/mina/LostLogoutTest.java +++ b/quickfixj-core/src/test/java/quickfix/mina/LostLogoutTest.java @@ -144,7 +144,7 @@ public void fromApp(Message message, SessionID sessionId) throws FieldNotFound, try { Thread.sleep(1000); } catch (InterruptedException e) { - e.printStackTrace(); + Thread.currentThread().interrupt(); } System.out.println("Server: message processing end"); } diff --git a/quickfixj-core/src/test/java/quickfix/mina/LostLogoutThreadedTest.java b/quickfixj-core/src/test/java/quickfix/mina/LostLogoutThreadedTest.java index 1f13240842..67d7af559b 100644 --- a/quickfixj-core/src/test/java/quickfix/mina/LostLogoutThreadedTest.java +++ b/quickfixj-core/src/test/java/quickfix/mina/LostLogoutThreadedTest.java @@ -144,7 +144,7 @@ public void fromApp(Message message, SessionID sessionId) throws FieldNotFound, try { Thread.sleep(1000); } catch (InterruptedException e) { - e.printStackTrace(); + Thread.currentThread().interrupt(); } System.out.println("Server: message processing end"); } diff --git a/quickfixj-core/src/test/java/quickfix/mina/SingleThreadedEventHandlingStrategyTest.java b/quickfixj-core/src/test/java/quickfix/mina/SingleThreadedEventHandlingStrategyTest.java index 8802783564..7ca548f20c 100644 --- a/quickfixj-core/src/test/java/quickfix/mina/SingleThreadedEventHandlingStrategyTest.java +++ b/quickfixj-core/src/test/java/quickfix/mina/SingleThreadedEventHandlingStrategyTest.java @@ -68,7 +68,7 @@ public void cleanup() { try { Thread.sleep(500); } catch (InterruptedException ex) { - // ignored + Thread.currentThread().interrupt(); } } } @@ -373,7 +373,7 @@ private static void assertQFJMessageProcessorThreads(int expected) { try { Thread.sleep(100); } catch (InterruptedException ex) { - // ignored + Thread.currentThread().interrupt(); } dumpAllThreads = bean.dumpAllThreads(false, false); qfjMPThreads = getMessageProcessorThreads(dumpAllThreads); diff --git a/quickfixj-core/src/test/java/quickfix/mina/ssl/SSLAndNonSSLTest.java b/quickfixj-core/src/test/java/quickfix/mina/ssl/SSLAndNonSSLTest.java index d000fd6568..bc46036b6d 100644 --- a/quickfixj-core/src/test/java/quickfix/mina/ssl/SSLAndNonSSLTest.java +++ b/quickfixj-core/src/test/java/quickfix/mina/ssl/SSLAndNonSSLTest.java @@ -214,6 +214,7 @@ public void run() { shutdownLatch.await(); } catch (InterruptedException e1) { try { + Thread.currentThread().interrupt(); acceptor.stop(true); } catch (RuntimeException e) { e.printStackTrace(); diff --git a/quickfixj-core/src/test/java/quickfix/mina/ssl/SSLCertificateTest.java b/quickfixj-core/src/test/java/quickfix/mina/ssl/SSLCertificateTest.java index 008b1c6940..053dbb2018 100644 --- a/quickfixj-core/src/test/java/quickfix/mina/ssl/SSLCertificateTest.java +++ b/quickfixj-core/src/test/java/quickfix/mina/ssl/SSLCertificateTest.java @@ -65,6 +65,7 @@ public void cleanup() { try { Thread.sleep(500); } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); java.util.logging.Logger.getLogger(SSLCertificateTest.class.getName()).log(Level.SEVERE, null, ex); } } diff --git a/quickfixj-core/src/test/java/quickfix/test/acceptance/ConnectToServerStep.java b/quickfixj-core/src/test/java/quickfix/test/acceptance/ConnectToServerStep.java index f64e7a084d..e92888f2bc 100644 --- a/quickfixj-core/src/test/java/quickfix/test/acceptance/ConnectToServerStep.java +++ b/quickfixj-core/src/test/java/quickfix/test/acceptance/ConnectToServerStep.java @@ -61,7 +61,7 @@ public void run(TestResult result, TestConnection connection) { try { Thread.sleep(reconnectDelay); } catch (InterruptedException e1) { - e1.printStackTrace(); + Thread.currentThread().interrupt(); } } try { diff --git a/quickfixj-core/src/test/java/quickfix/test/acceptance/TestConnection.java b/quickfixj-core/src/test/java/quickfix/test/acceptance/TestConnection.java index 36fc4f0403..a754d5ad4c 100644 --- a/quickfixj-core/src/test/java/quickfix/test/acceptance/TestConnection.java +++ b/quickfixj-core/src/test/java/quickfix/test/acceptance/TestConnection.java @@ -68,8 +68,11 @@ private TestIoHandler getIoHandler(int clientId) { public void tearDown() { for (TestIoHandler testIoHandler : ioHandlers.values()) { - CloseFuture closeFuture = testIoHandler.getSession().closeNow(); - closeFuture.awaitUninterruptibly(); + IoSession session = testIoHandler.getSession(); + if (session != null) { + CloseFuture closeFuture = session.closeNow(); + closeFuture.awaitUninterruptibly(); + } } ioHandlers.clear(); } @@ -165,6 +168,7 @@ public IoSession getSession() { } } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } return session;