Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,12 @@ <H3>QuickFIX Settings</H3>
</TD>
<TD> empty, ie all remote addresses are allowed </TD>
</TR>
<TR ALIGN="left" VALIGN="middle">
<TD> <I>AcceptorTemplate</I> </TD>
<TD> Designates a template Acceptor session. See <a href="acceptor_dynamic.html">Dynamic Acceptor Sessions</a></TD>
<TD> Y<BR>N</TD>
<TD>N</TD>
</TR>

<TR ALIGN="center" VALIGN="middle">
<TD COLSPAN="4" class="subsection"><A NAME="Security">Secure Communication Options</A></TD>
Expand Down
9 changes: 5 additions & 4 deletions quickfixj-core/src/main/java/quickfix/Acceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* Contact ask@quickfixengine.org if any conditions of this licensing
* are not clear to you.
******************************************************************************/

package quickfix;

/**
Expand All @@ -25,7 +24,8 @@
public interface Acceptor extends Connector {

/**
* Acceptor setting specifying the socket protocol used to accept connections.
* Acceptor setting specifying the socket protocol used to accept
* connections.
*/
String SETTING_SOCKET_ACCEPT_PROTOCOL = "SocketAcceptProtocol";

Expand All @@ -35,12 +35,13 @@ public interface Acceptor extends Connector {
String SETTING_SOCKET_ACCEPT_PORT = "SocketAcceptPort";

/**
* Acceptor setting specifying local IP interface address for accepting connections.
* Acceptor setting specifying local IP interface address for accepting
* connections.
*/
String SETTING_SOCKET_ACCEPT_ADDRESS = "SocketAcceptAddress";

/**
* Acceptor setting specifying local IP interface address for accepting connections.
* Acceptor setting specifying a template acceptor session.
*/
String SETTING_ACCEPTOR_TEMPLATE = "AcceptorTemplate";
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,10 +460,14 @@ public static void closeManagedSessionsAndDispose(IoService ioService, boolean a
}
}

protected boolean isContinueInitOnError() throws ConfigError, FieldConvertError {
protected boolean isContinueInitOnError() {
boolean continueInitOnError = false;
if (settings.isSetting(SessionFactory.SETTING_CONTINUE_INIT_ON_ERROR)) {
continueInitOnError = settings.getBool(SessionFactory.SETTING_CONTINUE_INIT_ON_ERROR);
try {
continueInitOnError = settings.getBool(SessionFactory.SETTING_CONTINUE_INIT_ON_ERROR);
} catch (ConfigError | FieldConvertError ex) {
// ignore and return default
}
}
return continueInitOnError;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import quickfix.mina.ssl.SSLSupport;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.SocketAddress;
import java.security.GeneralSecurityException;
import java.util.Collection;
Expand Down Expand Up @@ -93,12 +94,13 @@ protected AbstractSocketAcceptor(Application application,
// TODO SYNC Does this method really need synchronization?
protected synchronized void startAcceptingConnections() throws ConfigError {

SocketAddress address = null;
try {
createSessions(getSettings());
startSessionTimer();
boolean continueInitOnError = isContinueInitOnError();
createSessions(getSettings(), continueInitOnError);
startSessionTimer();

for (AcceptorSocketDescriptor socketDescriptor : socketDescriptorForAddress.values()) {
SocketAddress address = null;
for (AcceptorSocketDescriptor socketDescriptor : socketDescriptorForAddress.values()) {
try {
address = socketDescriptor.getAddress();
IoAcceptor ioAcceptor = getIoAcceptor(socketDescriptor);
CompositeIoFilterChainBuilder ioFilterChainBuilder = new CompositeIoFilterChainBuilder(getIoFilterChainBuilder());
Expand All @@ -114,12 +116,14 @@ protected synchronized void startAcceptingConnections() throws ConfigError {
ioAcceptor.setCloseOnDeactivation(false);
ioAcceptor.bind(socketDescriptor.getAddress());
log.info("Listening for connections at {} for session(s) {}", address, socketDescriptor.getAcceptedSessions().keySet());
} catch (IOException | GeneralSecurityException | ConfigError e) {
if (continueInitOnError) {
log.warn("error during session initialization for session(s) {}, continuing...", socketDescriptor.getAcceptedSessions().keySet(), e);
} else {
log.error("Cannot start acceptor session for {}, error: {}", address, e);
throw new RuntimeError(e);
}
}
} catch (FieldConvertError e) {
throw new ConfigError(e);
} catch (Exception e) {
log.error("Cannot start acceptor session for {}, error: {}", address, e);
throw new RuntimeError(e);
}
}

Expand Down Expand Up @@ -213,35 +217,40 @@ private boolean equals(Object object1, Object object2) {
return object1 == null ? object2 == null : object1.equals(object2);
}

private void createSessions(SessionSettings settings) throws ConfigError, FieldConvertError {
private void createSessions(SessionSettings settings, boolean continueInitOnError) throws ConfigError {
Map<SessionID, Session> allSessions = new HashMap<>();
boolean continueInitOnError = isContinueInitOnError();

for (Iterator<SessionID> i = settings.sectionIterator(); i.hasNext();) {
SessionID sessionID = i.next();
String connectionType = settings.getString(sessionID,
SessionFactory.SETTING_CONNECTION_TYPE);
try {
String connectionType = null;
if (settings.isSetting(sessionID, SessionFactory.SETTING_CONNECTION_TYPE)) {
connectionType = settings.getString(sessionID,
SessionFactory.SETTING_CONNECTION_TYPE);
}

boolean isTemplate = false;
if (settings.isSetting(sessionID, Acceptor.SETTING_ACCEPTOR_TEMPLATE)) {
isTemplate = settings.getBool(sessionID, Acceptor.SETTING_ACCEPTOR_TEMPLATE);
}
if (SessionFactory.ACCEPTOR_CONNECTION_TYPE.equals(connectionType)) {
boolean isTemplate = false;
if (settings.isSetting(sessionID, Acceptor.SETTING_ACCEPTOR_TEMPLATE)) {
try {
isTemplate = settings.getBool(sessionID, Acceptor.SETTING_ACCEPTOR_TEMPLATE);
} catch (FieldConvertError | ConfigError ex) {
// ignore and use default
}
}

if (connectionType.equals(SessionFactory.ACCEPTOR_CONNECTION_TYPE)) {
try {
AcceptorSocketDescriptor descriptor = getAcceptorSocketDescriptor(settings, sessionID);
if (!isTemplate) {
AcceptorSocketDescriptor descriptor = getAcceptorSocketDescriptor(settings, sessionID);
Session session = sessionFactory.create(sessionID, settings);
descriptor.acceptSession(session);
allSessions.put(sessionID, session);
}
} catch (Throwable t) {
if (continueInitOnError) {
log.error("error during session initialization for {}, continuing...", sessionID, t);
} else {
throw t instanceof ConfigError ? (ConfigError) t : new ConfigError(
"error during session initialization", t);
}
}
} catch (Throwable t) {
if (continueInitOnError) {
log.warn("error during session initialization for {}, continuing...", sessionID, t);
} else {
throw t instanceof ConfigError ? (ConfigError) t : new ConfigError(
"error during session initialization", t);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,17 @@ protected AbstractSocketInitiator(SessionSettings settings, SessionFactory sessi
protected void createSessionInitiators()
throws ConfigError {
try {
createSessions();
boolean continueInitOnError = isContinueInitOnError();
createSessions(continueInitOnError);
for (final Session session : getSessionMap().values()) {
createInitiator(session);
createInitiator(session, continueInitOnError);
}
} catch (final FieldConvertError e) {
throw new ConfigError(e);
}
}

private void createInitiator(final Session session) throws ConfigError, FieldConvertError {
private void createInitiator(final Session session, final boolean continueInitOnError) throws ConfigError, FieldConvertError {

SessionSettings settings = getSettings();
final SessionID sessionID = session.getSessionID();
Expand Down Expand Up @@ -171,14 +172,21 @@ && getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) {
}

ScheduledExecutorService scheduledExecutorService = (scheduledReconnectExecutor != null ? scheduledReconnectExecutor : getScheduledExecutorService());
final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session,
socketAddresses, localAddress, reconnectingIntervals,
scheduledExecutorService, networkingOptions,
getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig,
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation);

initiators.add(ioSessionInitiator);

try {
final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session,
socketAddresses, localAddress, reconnectingIntervals,
scheduledExecutorService, networkingOptions,
getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig,
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation);

initiators.add(ioSessionInitiator);
} catch (ConfigError e) {
if (continueInitOnError) {
log.warn("error during session initialization for {}, continuing...", sessionID, e);
} else {
throw e;
}
}
}

// QFJ-482
Expand All @@ -201,10 +209,8 @@ private SocketAddress getLocalAddress(SessionSettings settings, final SessionID
return localAddress;
}

private void createSessions() throws ConfigError, FieldConvertError {
private void createSessions(boolean continueInitOnError) throws ConfigError, FieldConvertError {
final SessionSettings settings = getSettings();
boolean continueInitOnError = isContinueInitOnError();

final Map<SessionID, Session> initiatorSessions = new HashMap<>();
for (final Iterator<SessionID> i = settings.sectionIterator(); i.hasNext();) {
final SessionID sessionID = i.next();
Expand All @@ -216,7 +222,7 @@ private void createSessions() throws ConfigError, FieldConvertError {
}
} catch (final Throwable e) {
if (continueInitOnError) {
log.error("error during session initialization for {}, continuing...", sessionID, e);
log.warn("error during session initialization for {}, continuing...", sessionID, e);
} else {
throw e instanceof ConfigError ? (ConfigError) e : new ConfigError(
"error during session initialization", e);
Expand All @@ -232,7 +238,7 @@ public void createDynamicSession(SessionID sessionID) throws ConfigError {
try {
Session session = createSession(sessionID);
super.addDynamicSession(session);
createInitiator(session);
createInitiator(session, isContinueInitOnError());
startInitiators();
} catch (final FieldConvertError e) {
throw new ConfigError(e);
Expand Down
59 changes: 58 additions & 1 deletion quickfixj-core/src/test/java/quickfix/SocketAcceptorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@

package quickfix;

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.util.AvailablePortFinder;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.field.MsgType;
import quickfix.mina.ProtocolFactory;
import quickfix.mina.SingleThreadedEventHandlingStrategy;
import quickfix.mina.message.FIXProtocolCodecFactory;
import quickfix.mina.ssl.SSLSupport;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
Expand All @@ -35,7 +40,12 @@
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* QFJ-643: Unable to restart a stopped acceptor (SocketAcceptor)
Expand Down Expand Up @@ -190,6 +200,53 @@ public void testSessionsAreCleanedUpOnThreadedSocketAcceptor() throws Exception
}
}

@Test
public void testAcceptorContinueInitializationOnError() throws ConfigError, InterruptedException, IOException {
final int port = AvailablePortFinder.getNextAvailable();
final int port2 = AvailablePortFinder.getNextAvailable();
final SessionSettings settings = new SessionSettings();
final SessionID sessionId = new SessionID("FIX.4.4", "SENDER", "TARGET");
final SessionID sessionId2 = new SessionID("FIX.4.4", "FOO", "BAR");
final SessionID sessionId3 = new SessionID("FIX.4.4", "BAR", "BAZ");
settings.setString(SessionFactory.SETTING_CONTINUE_INIT_ON_ERROR, "Y");
settings.setString("ConnectionType", "acceptor");
settings.setString("StartTime", "00:00:00");
settings.setString("EndTime", "00:00:00");
settings.setString("HeartBtInt", "30");
settings.setString("BeginString", "FIX.4.4");
settings.setLong(sessionId, "SocketAcceptPort", port);
settings.setLong(sessionId2, "SocketAcceptPort", port2);
settings.setLong(sessionId3, "SocketAcceptPort", port2);
settings.setString(sessionId, SSLSupport.SETTING_USE_SSL, "Y");
settings.setString(sessionId, SSLSupport.SETTING_KEY_STORE_NAME, "test.keystore");
// supply a wrong password to make initialization fail
settings.setString(sessionId, SSLSupport.SETTING_KEY_STORE_PWD, "wrong-password");
// supply a wrong protocol to make initialization fail
settings.setString(sessionId3, "SocketAcceptProtocol", "foobar");

final SocketAcceptor acceptor = new SocketAcceptor(new ApplicationAdapter(), new MemoryStoreFactory(), settings,
new ScreenLogFactory(settings), new DefaultMessageFactory());
acceptor.start();

for (IoAcceptor endpoint : acceptor.getEndpoints()) {
boolean containsFIXCodec = endpoint.getFilterChain().contains(FIXProtocolCodecFactory.FILTER_NAME);
if (endpoint.getLocalAddress() == null) { // failing session is not bound!
assertFalse(containsFIXCodec);
} else {
assertTrue(containsFIXCodec);
}
}

// sessionid1 is present since it fails after the setup phase
assertTrue(acceptor.getSessions().contains(sessionId));
// sessionid2 is set up normally
assertTrue(acceptor.getSessions().contains(sessionId2));
// sessionid3 could not be set up due to problems in the config itself
assertFalse(acceptor.getSessions().contains(sessionId3));

acceptor.stop();
}

private void checkThreads(ThreadMXBean bean, int expectedNum) {
ThreadInfo[] dumpAllThreads = bean.dumpAllThreads(false, false);
int qfjMPThreads = 0;
Expand Down
31 changes: 31 additions & 0 deletions quickfixj-core/src/test/java/quickfix/SocketInitiatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import quickfix.field.MsgType;
import quickfix.mina.ssl.SSLSupport;
import quickfix.test.util.ReflectionUtil;

public class SocketInitiatorTest {
Expand Down Expand Up @@ -405,6 +406,36 @@ public void onResendRequestSatisfied(int beginSeqNo, int endSeqNo) {
assertEquals(1, onDisconnectCallCount.intValue());
}


@Test
public void testInitiatorContinueInitializationOnError() throws ConfigError, InterruptedException, IOException {
final ServerSocket serverSocket = new ServerSocket(0);
final int port = serverSocket.getLocalPort();
final SessionSettings settings = new SessionSettings();
final SessionID sessionId = new SessionID("FIX.4.4", "SENDER", "TARGET");
settings.setString(SessionFactory.SETTING_CONTINUE_INIT_ON_ERROR, "Y");
settings.setString(sessionId, "BeginString", "FIX.4.4");
settings.setString("ConnectionType", "initiator");
settings.setLong(sessionId, "SocketConnectPort", port);
settings.setString(sessionId, "SocketConnectHost", "localhost");
settings.setString("StartTime", "00:00:00");
settings.setString("EndTime", "00:00:00");
settings.setString("HeartBtInt", "30");
settings.setString("SocketConnectProtocol", ProtocolFactory.getTypeString(ProtocolFactory.SOCKET));
settings.setString(sessionId, SSLSupport.SETTING_USE_SSL, "Y");
settings.setString(sessionId, SSLSupport.SETTING_KEY_STORE_NAME, "test.keystore");
// supply a wrong password to make initialization fail
settings.setString(sessionId, SSLSupport.SETTING_KEY_STORE_PWD, "wrong-password");

final SocketInitiator initiator = new SocketInitiator(new ApplicationAdapter(), new MemoryStoreFactory(), settings,
new ScreenLogFactory(settings), new DefaultMessageFactory());
initiator.start();

assertTrue(initiator.getInitiators().isEmpty());
initiator.stop();
}


private void doTestOfRestart(SessionID clientSessionID, ClientApplication clientApplication,
final Initiator initiator, File messageLog, int port) throws InterruptedException, ConfigError {
ServerThread serverThread = new ServerThread(port);
Expand Down