Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions quickfixj-core/src/main/java/quickfix/SocketAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package quickfix;

import java.util.concurrent.atomic.AtomicBoolean;
import quickfix.mina.EventHandlingStrategy;
import quickfix.mina.SingleThreadedEventHandlingStrategy;
import quickfix.mina.acceptor.AbstractSocketAcceptor;
Expand All @@ -28,7 +29,7 @@
* sessions.
*/
public class SocketAcceptor extends AbstractSocketAcceptor {
private volatile Boolean isStarted = Boolean.FALSE;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final SingleThreadedEventHandlingStrategy eventHandlingStrategy;

private SocketAcceptor(Builder builder) throws ConfigError {
Expand Down Expand Up @@ -103,30 +104,30 @@ public void start() throws ConfigError, RuntimeError {
}

private void initialize() throws ConfigError {
if (isStarted.equals(Boolean.FALSE)) {
eventHandlingStrategy.setExecutor(longLivedExecutor);
startAcceptingConnections();
eventHandlingStrategy.blockInThread();
isStarted = Boolean.TRUE;
} else {
log.warn("Ignored attempt to start already running SocketAcceptor.");
synchronized (isStarted) {
if (isStarted.compareAndSet(false, true)) {
eventHandlingStrategy.setExecutor(longLivedExecutor);
startAcceptingConnections();
eventHandlingStrategy.blockInThread();
}
}
}

@Override
public void stop(boolean forceDisconnect) {
if (isStarted.equals(Boolean.TRUE)) {
try {
logoutAllSessions(forceDisconnect);
stopAcceptingConnections();
stopSessionTimer();
} finally {
synchronized (isStarted) {
if (isStarted.compareAndSet(true, false)) {
try {
eventHandlingStrategy.stopHandlingMessages(true);
logoutAllSessions(forceDisconnect);
stopAcceptingConnections();
stopSessionTimer();
} finally {
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
isStarted = Boolean.FALSE;
try {
eventHandlingStrategy.stopHandlingMessages(true);
} finally {
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
}
}
}
}
Expand Down
41 changes: 21 additions & 20 deletions quickfixj-core/src/main/java/quickfix/SocketInitiator.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package quickfix;

import java.util.concurrent.atomic.AtomicBoolean;
import quickfix.mina.EventHandlingStrategy;
import quickfix.mina.SingleThreadedEventHandlingStrategy;
import quickfix.mina.initiator.AbstractSocketInitiator;
Expand All @@ -28,7 +29,7 @@
* sessions.
*/
public class SocketInitiator extends AbstractSocketInitiator {
private volatile Boolean isStarted = Boolean.FALSE;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final SingleThreadedEventHandlingStrategy eventHandlingStrategy;

private SocketInitiator(Builder builder) throws ConfigError {
Expand Down Expand Up @@ -120,33 +121,33 @@ public void start() throws ConfigError, RuntimeError {
}

private void initialize() throws ConfigError {
if (isStarted.equals(Boolean.FALSE)) {
eventHandlingStrategy.setExecutor(longLivedExecutor);
createSessionInitiators();
for (Session session : getSessionMap().values()) {
Session.registerSession(session);
synchronized (isStarted) {
if (isStarted.compareAndSet(false, true)) {
eventHandlingStrategy.setExecutor(longLivedExecutor);
createSessionInitiators();
for (Session session : getSessionMap().values()) {
Session.registerSession(session);
}
startInitiators();
eventHandlingStrategy.blockInThread();
}
startInitiators();
eventHandlingStrategy.blockInThread();
isStarted = Boolean.TRUE;
} else {
log.warn("Ignored attempt to start already running SocketInitiator.");
}
}

@Override
public void stop(boolean forceDisconnect) {
if (isStarted.equals(Boolean.TRUE)) {
try {
logoutAllSessions(forceDisconnect);
stopInitiators();
} finally {
synchronized (isStarted) {
if (isStarted.compareAndSet(true, false)) {
try {
eventHandlingStrategy.stopHandlingMessages(true);
logoutAllSessions(forceDisconnect);
stopInitiators();
} finally {
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
isStarted = Boolean.FALSE;
try {
eventHandlingStrategy.stopHandlingMessages(true);
} finally {
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Abstract base class for socket acceptors.
Expand All @@ -64,7 +66,7 @@ public abstract class AbstractSocketAcceptor extends SessionConnector implements
private final Map<SocketAddress, AcceptorSessionProvider> sessionProviders = new HashMap<>();
private final SessionFactory sessionFactory;
private final Map<SocketAddress, AcceptorSocketDescriptor> socketDescriptorForAddress = new HashMap<>();
private final Map<AcceptorSocketDescriptor, IoAcceptor> ioAcceptors = new HashMap<>();
private final ConcurrentMap<AcceptorSocketDescriptor, IoAcceptor> ioAcceptors = new ConcurrentHashMap<>();

protected AbstractSocketAcceptor(SessionSettings settings, SessionFactory sessionFactory)
throws ConfigError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
Expand All @@ -63,7 +63,7 @@
public abstract class AbstractSocketInitiator extends SessionConnector implements Initiator {

protected final Logger log = LoggerFactory.getLogger(getClass());
private final Set<IoSessionInitiator> initiators = new HashSet<>();
private final Set<IoSessionInitiator> initiators = ConcurrentHashMap.newKeySet();
private final ScheduledExecutorService scheduledReconnectExecutor;
public static final String QFJ_RECONNECT_THREAD_PREFIX = "QFJ Reconnect Thread-";

Expand Down