From fe7398cc8cfd3e4e9cf6aebf9ba4859e363987aa Mon Sep 17 00:00:00 2001 From: Christoph John Date: Wed, 21 Oct 2020 13:29:29 +0200 Subject: [PATCH 1/3] Made start/stop of SocketInitiator/SocketAcceptor thread-safe. --- .../src/main/java/quickfix/SocketAcceptor.java | 10 +++++----- .../src/main/java/quickfix/SocketInitiator.java | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java b/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java index 49f27d7f15..64a394d40d 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java +++ b/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java @@ -19,6 +19,7 @@ package quickfix; +import java.util.concurrent.atomic.AtomicBoolean; import quickfix.mina.EventHandlingStrategy; import quickfix.mina.SingleThreadedEventHandlingStrategy; import quickfix.mina.acceptor.AbstractSocketAcceptor; @@ -28,7 +29,7 @@ * sessions. */ public class SocketAcceptor extends AbstractSocketAcceptor { - private volatile Boolean isStarted = Boolean.FALSE; + private final AtomicBoolean isStarted = new AtomicBoolean(false); private final SingleThreadedEventHandlingStrategy eventHandlingStrategy; private SocketAcceptor(Builder builder) throws ConfigError { @@ -103,11 +104,10 @@ public void start() throws ConfigError, RuntimeError { } private void initialize() throws ConfigError { - if (isStarted.equals(Boolean.FALSE)) { + if (isStarted.compareAndSet(false, true)) { eventHandlingStrategy.setExecutor(longLivedExecutor); startAcceptingConnections(); eventHandlingStrategy.blockInThread(); - isStarted = Boolean.TRUE; } else { log.warn("Ignored attempt to start already running SocketAcceptor."); } @@ -115,7 +115,7 @@ private void initialize() throws ConfigError { @Override public void stop(boolean forceDisconnect) { - if (isStarted.equals(Boolean.TRUE)) { + if (isStarted.get() == true) { try { logoutAllSessions(forceDisconnect); stopAcceptingConnections(); @@ -126,7 +126,7 @@ public void stop(boolean forceDisconnect) { } finally { Session.unregisterSessions(getSessions(), true); clearConnectorSessions(); - isStarted = Boolean.FALSE; + isStarted.set(false); } } } diff --git a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java index f7c7389cbd..60b038f95d 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java @@ -19,6 +19,7 @@ package quickfix; +import java.util.concurrent.atomic.AtomicBoolean; import quickfix.mina.EventHandlingStrategy; import quickfix.mina.SingleThreadedEventHandlingStrategy; import quickfix.mina.initiator.AbstractSocketInitiator; @@ -28,7 +29,7 @@ * sessions. */ public class SocketInitiator extends AbstractSocketInitiator { - private volatile Boolean isStarted = Boolean.FALSE; + private final AtomicBoolean isStarted = new AtomicBoolean(false); private final SingleThreadedEventHandlingStrategy eventHandlingStrategy; private SocketInitiator(Builder builder) throws ConfigError { @@ -120,7 +121,7 @@ public void start() throws ConfigError, RuntimeError { } private void initialize() throws ConfigError { - if (isStarted.equals(Boolean.FALSE)) { + if (isStarted.compareAndSet(false, true)) { eventHandlingStrategy.setExecutor(longLivedExecutor); createSessionInitiators(); for (Session session : getSessionMap().values()) { @@ -128,7 +129,6 @@ private void initialize() throws ConfigError { } startInitiators(); eventHandlingStrategy.blockInThread(); - isStarted = Boolean.TRUE; } else { log.warn("Ignored attempt to start already running SocketInitiator."); } @@ -136,7 +136,7 @@ private void initialize() throws ConfigError { @Override public void stop(boolean forceDisconnect) { - if (isStarted.equals(Boolean.TRUE)) { + if (isStarted.get() == true) { try { logoutAllSessions(forceDisconnect); stopInitiators(); @@ -146,7 +146,7 @@ public void stop(boolean forceDisconnect) { } finally { Session.unregisterSessions(getSessions(), true); clearConnectorSessions(); - isStarted = Boolean.FALSE; + isStarted.set(false); } } } From af8b2dd05e82b0df84cd2adaddef1cd606d20c6f Mon Sep 17 00:00:00 2001 From: Christoph John Date: Thu, 22 Oct 2020 00:32:00 +0200 Subject: [PATCH 2/3] Made iterator removal thread-safe. - This could throw ConcurrentModificationExceptions when initiator/acceptor was started/stopped from different threads. --- .../java/quickfix/mina/acceptor/AbstractSocketAcceptor.java | 4 +++- .../java/quickfix/mina/initiator/AbstractSocketInitiator.java | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java b/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java index 6b1e5bea22..858363a207 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java +++ b/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java @@ -56,6 +56,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Abstract base class for socket acceptors. @@ -64,7 +66,7 @@ public abstract class AbstractSocketAcceptor extends SessionConnector implements private final Map sessionProviders = new HashMap<>(); private final SessionFactory sessionFactory; private final Map socketDescriptorForAddress = new HashMap<>(); - private final Map ioAcceptors = new HashMap<>(); + private final ConcurrentMap ioAcceptors = new ConcurrentHashMap<>(); protected AbstractSocketAcceptor(SessionSettings settings, SessionFactory sessionFactory) throws ConfigError { diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java index 11f67e1afd..3df749212c 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java @@ -47,10 +47,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; @@ -63,7 +63,7 @@ public abstract class AbstractSocketInitiator extends SessionConnector implements Initiator { protected final Logger log = LoggerFactory.getLogger(getClass()); - private final Set initiators = new HashSet<>(); + private final Set initiators = ConcurrentHashMap.newKeySet(); private final ScheduledExecutorService scheduledReconnectExecutor; public static final String QFJ_RECONNECT_THREAD_PREFIX = "QFJ Reconnect Thread-"; From 72ce82ab10ae1b0a7989e98b9983a1ae93767b9d Mon Sep 17 00:00:00 2001 From: Christoph John Date: Wed, 9 Dec 2020 01:43:29 +0100 Subject: [PATCH 3/3] - prevent concurrent access to start()/stop() method - allow access to either stop() or start() at the same time --- .../main/java/quickfix/SocketAcceptor.java | 33 +++++++++-------- .../main/java/quickfix/SocketInitiator.java | 37 ++++++++++--------- 2 files changed, 36 insertions(+), 34 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java b/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java index 64a394d40d..23a2769448 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java +++ b/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java @@ -104,29 +104,30 @@ public void start() throws ConfigError, RuntimeError { } private void initialize() throws ConfigError { - if (isStarted.compareAndSet(false, true)) { - eventHandlingStrategy.setExecutor(longLivedExecutor); - startAcceptingConnections(); - eventHandlingStrategy.blockInThread(); - } else { - log.warn("Ignored attempt to start already running SocketAcceptor."); + synchronized (isStarted) { + if (isStarted.compareAndSet(false, true)) { + eventHandlingStrategy.setExecutor(longLivedExecutor); + startAcceptingConnections(); + eventHandlingStrategy.blockInThread(); + } } } @Override public void stop(boolean forceDisconnect) { - if (isStarted.get() == true) { - try { - logoutAllSessions(forceDisconnect); - stopAcceptingConnections(); - stopSessionTimer(); - } finally { + synchronized (isStarted) { + if (isStarted.compareAndSet(true, false)) { try { - eventHandlingStrategy.stopHandlingMessages(true); + logoutAllSessions(forceDisconnect); + stopAcceptingConnections(); + stopSessionTimer(); } finally { - Session.unregisterSessions(getSessions(), true); - clearConnectorSessions(); - isStarted.set(false); + try { + eventHandlingStrategy.stopHandlingMessages(true); + } finally { + Session.unregisterSessions(getSessions(), true); + clearConnectorSessions(); + } } } } diff --git a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java index 60b038f95d..7da2f1b3f7 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java @@ -121,32 +121,33 @@ public void start() throws ConfigError, RuntimeError { } private void initialize() throws ConfigError { - if (isStarted.compareAndSet(false, true)) { - eventHandlingStrategy.setExecutor(longLivedExecutor); - createSessionInitiators(); - for (Session session : getSessionMap().values()) { - Session.registerSession(session); + synchronized (isStarted) { + if (isStarted.compareAndSet(false, true)) { + eventHandlingStrategy.setExecutor(longLivedExecutor); + createSessionInitiators(); + for (Session session : getSessionMap().values()) { + Session.registerSession(session); + } + startInitiators(); + eventHandlingStrategy.blockInThread(); } - startInitiators(); - eventHandlingStrategy.blockInThread(); - } else { - log.warn("Ignored attempt to start already running SocketInitiator."); } } @Override public void stop(boolean forceDisconnect) { - if (isStarted.get() == true) { - try { - logoutAllSessions(forceDisconnect); - stopInitiators(); - } finally { + synchronized (isStarted) { + if (isStarted.compareAndSet(true, false)) { try { - eventHandlingStrategy.stopHandlingMessages(true); + logoutAllSessions(forceDisconnect); + stopInitiators(); } finally { - Session.unregisterSessions(getSessions(), true); - clearConnectorSessions(); - isStarted.set(false); + try { + eventHandlingStrategy.stopHandlingMessages(true); + } finally { + Session.unregisterSessions(getSessions(), true); + clearConnectorSessions(); + } } } }