diff --git a/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java b/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java index 49f27d7f15..23a2769448 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java +++ b/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java @@ -19,6 +19,7 @@ package quickfix; +import java.util.concurrent.atomic.AtomicBoolean; import quickfix.mina.EventHandlingStrategy; import quickfix.mina.SingleThreadedEventHandlingStrategy; import quickfix.mina.acceptor.AbstractSocketAcceptor; @@ -28,7 +29,7 @@ * sessions. */ public class SocketAcceptor extends AbstractSocketAcceptor { - private volatile Boolean isStarted = Boolean.FALSE; + private final AtomicBoolean isStarted = new AtomicBoolean(false); private final SingleThreadedEventHandlingStrategy eventHandlingStrategy; private SocketAcceptor(Builder builder) throws ConfigError { @@ -103,30 +104,30 @@ public void start() throws ConfigError, RuntimeError { } private void initialize() throws ConfigError { - if (isStarted.equals(Boolean.FALSE)) { - eventHandlingStrategy.setExecutor(longLivedExecutor); - startAcceptingConnections(); - eventHandlingStrategy.blockInThread(); - isStarted = Boolean.TRUE; - } else { - log.warn("Ignored attempt to start already running SocketAcceptor."); + synchronized (isStarted) { + if (isStarted.compareAndSet(false, true)) { + eventHandlingStrategy.setExecutor(longLivedExecutor); + startAcceptingConnections(); + eventHandlingStrategy.blockInThread(); + } } } @Override public void stop(boolean forceDisconnect) { - if (isStarted.equals(Boolean.TRUE)) { - try { - logoutAllSessions(forceDisconnect); - stopAcceptingConnections(); - stopSessionTimer(); - } finally { + synchronized (isStarted) { + if (isStarted.compareAndSet(true, false)) { try { - eventHandlingStrategy.stopHandlingMessages(true); + logoutAllSessions(forceDisconnect); + stopAcceptingConnections(); + stopSessionTimer(); } finally { - Session.unregisterSessions(getSessions(), true); - clearConnectorSessions(); - isStarted = Boolean.FALSE; + try { + eventHandlingStrategy.stopHandlingMessages(true); + } finally { + Session.unregisterSessions(getSessions(), true); + clearConnectorSessions(); + } } } } diff --git a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java index f7c7389cbd..7da2f1b3f7 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java @@ -19,6 +19,7 @@ package quickfix; +import java.util.concurrent.atomic.AtomicBoolean; import quickfix.mina.EventHandlingStrategy; import quickfix.mina.SingleThreadedEventHandlingStrategy; import quickfix.mina.initiator.AbstractSocketInitiator; @@ -28,7 +29,7 @@ * sessions. */ public class SocketInitiator extends AbstractSocketInitiator { - private volatile Boolean isStarted = Boolean.FALSE; + private final AtomicBoolean isStarted = new AtomicBoolean(false); private final SingleThreadedEventHandlingStrategy eventHandlingStrategy; private SocketInitiator(Builder builder) throws ConfigError { @@ -120,33 +121,33 @@ public void start() throws ConfigError, RuntimeError { } private void initialize() throws ConfigError { - if (isStarted.equals(Boolean.FALSE)) { - eventHandlingStrategy.setExecutor(longLivedExecutor); - createSessionInitiators(); - for (Session session : getSessionMap().values()) { - Session.registerSession(session); + synchronized (isStarted) { + if (isStarted.compareAndSet(false, true)) { + eventHandlingStrategy.setExecutor(longLivedExecutor); + createSessionInitiators(); + for (Session session : getSessionMap().values()) { + Session.registerSession(session); + } + startInitiators(); + eventHandlingStrategy.blockInThread(); } - startInitiators(); - eventHandlingStrategy.blockInThread(); - isStarted = Boolean.TRUE; - } else { - log.warn("Ignored attempt to start already running SocketInitiator."); } } @Override public void stop(boolean forceDisconnect) { - if (isStarted.equals(Boolean.TRUE)) { - try { - logoutAllSessions(forceDisconnect); - stopInitiators(); - } finally { + synchronized (isStarted) { + if (isStarted.compareAndSet(true, false)) { try { - eventHandlingStrategy.stopHandlingMessages(true); + logoutAllSessions(forceDisconnect); + stopInitiators(); } finally { - Session.unregisterSessions(getSessions(), true); - clearConnectorSessions(); - isStarted = Boolean.FALSE; + try { + eventHandlingStrategy.stopHandlingMessages(true); + } finally { + Session.unregisterSessions(getSessions(), true); + clearConnectorSessions(); + } } } } diff --git a/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java b/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java index e20b76d84b..556a24c047 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java +++ b/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java @@ -56,6 +56,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Abstract base class for socket acceptors. @@ -64,7 +66,7 @@ public abstract class AbstractSocketAcceptor extends SessionConnector implements private final Map sessionProviders = new HashMap<>(); private final SessionFactory sessionFactory; private final Map socketDescriptorForAddress = new HashMap<>(); - private final Map ioAcceptors = new HashMap<>(); + private final ConcurrentMap ioAcceptors = new ConcurrentHashMap<>(); protected AbstractSocketAcceptor(SessionSettings settings, SessionFactory sessionFactory) throws ConfigError { diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java index 09c07cbc6b..893c3cac1e 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java @@ -47,10 +47,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; @@ -63,7 +63,7 @@ public abstract class AbstractSocketInitiator extends SessionConnector implements Initiator { protected final Logger log = LoggerFactory.getLogger(getClass()); - private final Set initiators = new HashSet<>(); + private final Set initiators = ConcurrentHashMap.newKeySet(); private final ScheduledExecutorService scheduledReconnectExecutor; public static final String QFJ_RECONNECT_THREAD_PREFIX = "QFJ Reconnect Thread-";