Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 17 additions & 53 deletions quickfixj-core/src/test/java/quickfix/SocketAcceptorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public void cleanup() {

@Test
public void testRestartOfAcceptor() throws Exception {
TestAcceptorApplication testAcceptorApplication = new TestAcceptorApplication();
TestInitiatorApplication testInitiatorApplication = new TestInitiatorApplication();
TestConnectorApplication testAcceptorApplication = new TestConnectorApplication();
TestConnectorApplication testInitiatorApplication = new TestConnectorApplication();
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
Acceptor acceptor = null;
Initiator initiator = null;
Expand Down Expand Up @@ -109,14 +109,14 @@ public void testRestartOfAcceptor() throws Exception {
log.error(e.getMessage(), e);
}
}
testAcceptorApplication.waitForLogout();
if (acceptor != null) {
try {
acceptor.stop();
} catch (RuntimeException e) {
log.error(e.getMessage(), e);
}
}
testAcceptorApplication.waitForLogout();
testInitiatorApplication.waitForLogout();
}
}
Expand All @@ -127,7 +127,7 @@ public void testQuickRestartOfAcceptor() throws Exception {
Acceptor acceptor = null;
try {
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
TestAcceptorApplication testAcceptorApplication = new TestAcceptorApplication();
TestConnectorApplication testAcceptorApplication = new TestConnectorApplication();
acceptor = createAcceptor(testAcceptorApplication);
acceptor.start();
Thread.sleep(2500L);
Expand All @@ -148,7 +148,7 @@ public void testDoubleStartOfAcceptor() throws Exception {
Acceptor acceptor = null;
try {
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
TestAcceptorApplication testAcceptorApplication = new TestAcceptorApplication();
TestConnectorApplication testAcceptorApplication = new TestConnectorApplication();
acceptor = createAcceptor(testAcceptorApplication);
acceptor.start();
// second start should be ignored
Expand All @@ -166,7 +166,7 @@ public void testDoubleStartOfAcceptor() throws Exception {
public void testSessionsAreCleanedUp() throws Exception {
Acceptor acceptor = null;
try {
TestAcceptorApplication testAcceptorApplication = new TestAcceptorApplication();
TestConnectorApplication testAcceptorApplication = new TestConnectorApplication();
acceptor = createAcceptor(testAcceptorApplication);
acceptor.start();
assertEquals(1, acceptor.getSessions().size() );
Expand All @@ -185,7 +185,7 @@ public void testSessionsAreCleanedUp() throws Exception {
public void testSessionsAreCleanedUpOnThreadedSocketAcceptor() throws Exception {
Acceptor acceptor = null;
try {
TestAcceptorApplication testAcceptorApplication = new TestAcceptorApplication();
TestConnectorApplication testAcceptorApplication = new TestConnectorApplication();
acceptor = createAcceptorThreaded(testAcceptorApplication);
acceptor.start();
assertEquals(1, acceptor.getSessions().size() );
Expand Down Expand Up @@ -292,12 +292,12 @@ private Session lookupSession(SessionID sessionID) {
return Session.lookupSession(sessionID);
}

private static class TestAcceptorApplication extends ApplicationAdapter {
private class TestConnectorApplication extends ApplicationAdapter {

private final CountDownLatch logonLatch;
private final CountDownLatch logoutLatch;

public TestAcceptorApplication() {
public TestConnectorApplication() {
logonLatch = new CountDownLatch(1);
logoutLatch = new CountDownLatch(1);
}
Expand Down Expand Up @@ -334,53 +334,15 @@ public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound
// ignore
}
}
}

private static class TestInitiatorApplication extends ApplicationAdapter {

private final CountDownLatch logonLatch;
private final CountDownLatch logoutLatch;

public TestInitiatorApplication() {
logonLatch = new CountDownLatch(1);
logoutLatch = new CountDownLatch(1);
}

@Override
public void onLogon(SessionID sessionId) {
super.onLogon(sessionId);
logonLatch.countDown();
}

public void waitForLogon() {
try {
assertTrue("Logon timed out", logonLatch.await(10, TimeUnit.SECONDS));
} catch (InterruptedException e) {
fail(e.getMessage());
}
}

public void waitForLogout() {
try {
assertTrue("Logout timed out", logoutLatch.await(10, TimeUnit.SECONDS));
} catch (InterruptedException e) {
fail(e.getMessage());
}
}

@Override
public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {
try {
if (MsgType.LOGOUT.equals(MessageUtils.getMessageType(message.toString()))) {
logoutLatch.countDown();
}
} catch (InvalidMessage ex) {
// ignore
}
public void toAdmin(Message message, SessionID sessionId) {
log.info("toAdmin: [{}] {}", sessionId, message);
}
}

private Acceptor createAcceptor(TestAcceptorApplication testAcceptorApplication)

private Acceptor createAcceptor(TestConnectorApplication testAcceptorApplication)
throws ConfigError {

SessionSettings settings = createAcceptorSettings();
Expand All @@ -391,7 +353,7 @@ private Acceptor createAcceptor(TestAcceptorApplication testAcceptorApplication)
new DefaultMessageFactory());
}

private Acceptor createAcceptorThreaded(TestAcceptorApplication testAcceptorApplication)
private Acceptor createAcceptorThreaded(TestConnectorApplication testAcceptorApplication)
throws ConfigError {

SessionSettings settings = createAcceptorSettings();
Expand All @@ -409,13 +371,14 @@ private SessionSettings createAcceptorSettings() {
defaults.put("StartTime", "00:00:00");
defaults.put("EndTime", "00:00:00");
defaults.put("BeginString", "FIX.4.2");
defaults.put("NonStopSession", "Y");
settings.setString(acceptorSessionID, "SocketAcceptProtocol", ProtocolFactory.getTypeString(ProtocolFactory.VM_PIPE));
settings.setString(acceptorSessionID, "SocketAcceptPort", "10000");
settings.set(defaults);
return settings;
}

private Initiator createInitiator(TestInitiatorApplication testInitiatorApplication) throws ConfigError {
private Initiator createInitiator(TestConnectorApplication testInitiatorApplication) throws ConfigError {
SessionSettings settings = new SessionSettings();
HashMap<Object, Object> defaults = new HashMap<>();
defaults.put("ConnectionType", "initiator");
Expand All @@ -425,6 +388,7 @@ private Initiator createInitiator(TestInitiatorApplication testInitiatorApplicat
defaults.put("ReconnectInterval", "2");
defaults.put("FileStorePath", "target/data/client");
defaults.put("ValidateUserDefinedFields", "Y");
defaults.put("NonStopSession", "Y");
settings.setString("BeginString", FixVersions.BEGINSTRING_FIX42);
settings.setString(initiatorSessionID, "SocketConnectProtocol", ProtocolFactory.getTypeString(ProtocolFactory.VM_PIPE));
settings.setString(initiatorSessionID, "SocketConnectHost", "127.0.0.1");
Expand Down
51 changes: 12 additions & 39 deletions quickfixj-core/src/test/java/quickfix/SocketInitiatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ public void testConnectedSocketsAreClosedAfterInitiatorClosed() throws Exception
socketThread.start();

final SessionSettings settings = new SessionSettings();
settings.setString("NonStopSession", "Y");
settings.setString("StartTime", "00:00:00");
settings.setString("EndTime", "00:00:00");
settings.setString("ReconnectInterval", "30");
Expand Down Expand Up @@ -351,42 +352,6 @@ public void onConnect(SessionID sessionID) {
public void onDisconnect(SessionID sessionID) {
onDisconnectCallCount.incrementAndGet();
}

@Override
public void onLogon(SessionID sessionID) {
}

@Override
public void onLogout(SessionID sessionID) {
}

@Override
public void onReset(SessionID sessionID) {
}

@Override
public void onRefresh(SessionID sessionID) {
}

@Override
public void onMissedHeartBeat(SessionID sessionID) {
}

@Override
public void onHeartBeatTimeout(SessionID sessionID) {
}

@Override
public void onResendRequestSent(SessionID sessionID, int beginSeqNo, int endSeqNo, int currentEndSeqNo) {
}

@Override
public void onSequenceResetReceived(SessionID sessionID, int newSeqNo, boolean gapFillFlag) {
}

@Override
public void onResendRequestSatisfied(SessionID sessionID, int beginSeqNo, int endSeqNo) {
}
};

LogFactory logFactory = sessionID -> logSessionStateListener;
Expand Down Expand Up @@ -420,6 +385,7 @@ public void testInitiatorContinueInitializationOnError() throws ConfigError, Int
settings.setString("ConnectionType", "initiator");
settings.setLong(sessionId, "SocketConnectPort", port);
settings.setString(sessionId, "SocketConnectHost", "localhost");
settings.setString("NonStopSession", "Y");
settings.setString("StartTime", "00:00:00");
settings.setString("EndTime", "00:00:00");
settings.setString("HeartBtInt", "30");
Expand All @@ -446,13 +412,14 @@ private void doTestOfRestart(SessionID clientSessionID, ClientApplication client
serverThread.start();
serverThread.waitForInitialization();
long messageLogLength = 0;
Session clientSession = null;
try {
clientApplication.setUpLogonExpectation();
initiator.start();
assertTrue(initiator.getSessions().contains(clientSessionID));
assertEquals(1, initiator.getSessions().size());

Session clientSession = Session.lookupSession(clientSessionID);
clientSession = Session.lookupSession(clientSessionID);
assertLoggedOn(clientApplication, clientSession);

clientApplication.setUpLogoutExpectation();
Expand All @@ -478,8 +445,13 @@ private void doTestOfRestart(SessionID clientSessionID, ClientApplication client
// QFJ-698: check that we were still able to write to the messageLog after the restart
assertTrue(messageLog.length() > messageLogLength);
}

clientApplication.setUpLogoutExpectation();
} finally {
initiator.stop();
if (clientSession != null) {
assertLoggedOut(clientApplication, clientSession);
}
}
} finally {
serverThread.interrupt();
Expand Down Expand Up @@ -527,6 +499,7 @@ private SessionSettings getClientSessionSettings(SessionID clientSessionID, int
defaults.put("SocketConnectProtocol", ProtocolFactory.getTypeString(ProtocolFactory.VM_PIPE));
defaults.put("SocketConnectHost", "localhost");
defaults.put("SocketConnectPort", Integer.toString(port));
defaults.put("NonStopSession", "Y");
defaults.put("StartTime", "00:00:00");
defaults.put("EndTime", "00:00:00");
defaults.put("HeartBtInt", "30");
Expand Down Expand Up @@ -611,11 +584,11 @@ public void onLogon(SessionID sessionId) {

@Override
public void toAdmin(Message message, SessionID sessionId) {
log.info("[{}] {}", sessionId, message);
log.info("toAdmin: [{}] {}", sessionId, message);

// Only countdown the latch if a logout message is actually sent
try {
if (logoutLatch != null && message.getHeader().isSetField(MsgType.FIELD)
if (logoutLatch != null && logoutLatch.getCount() > 0 && message.getHeader().isSetField(MsgType.FIELD)
&& MsgType.LOGOUT.equals(message.getHeader().getString(MsgType.FIELD))) {
log.info("Releasing logout latch for session [{}] with message {}", sessionId, message);
logoutLatch.countDown();
Expand Down