diff --git a/quickfixj-core/src/test/java/quickfix/SocketAcceptorTest.java b/quickfixj-core/src/test/java/quickfix/SocketAcceptorTest.java index 0637d3c774..933b1d8103 100644 --- a/quickfixj-core/src/test/java/quickfix/SocketAcceptorTest.java +++ b/quickfixj-core/src/test/java/quickfix/SocketAcceptorTest.java @@ -75,8 +75,8 @@ public void cleanup() { @Test public void testRestartOfAcceptor() throws Exception { - TestAcceptorApplication testAcceptorApplication = new TestAcceptorApplication(); - TestInitiatorApplication testInitiatorApplication = new TestInitiatorApplication(); + TestConnectorApplication testAcceptorApplication = new TestConnectorApplication(); + TestConnectorApplication testInitiatorApplication = new TestConnectorApplication(); ThreadMXBean bean = ManagementFactory.getThreadMXBean(); Acceptor acceptor = null; Initiator initiator = null; @@ -109,6 +109,7 @@ public void testRestartOfAcceptor() throws Exception { log.error(e.getMessage(), e); } } + testAcceptorApplication.waitForLogout(); if (acceptor != null) { try { acceptor.stop(); @@ -116,7 +117,6 @@ public void testRestartOfAcceptor() throws Exception { log.error(e.getMessage(), e); } } - testAcceptorApplication.waitForLogout(); testInitiatorApplication.waitForLogout(); } } @@ -127,7 +127,7 @@ public void testQuickRestartOfAcceptor() throws Exception { Acceptor acceptor = null; try { ThreadMXBean bean = ManagementFactory.getThreadMXBean(); - TestAcceptorApplication testAcceptorApplication = new TestAcceptorApplication(); + TestConnectorApplication testAcceptorApplication = new TestConnectorApplication(); acceptor = createAcceptor(testAcceptorApplication); acceptor.start(); Thread.sleep(2500L); @@ -148,7 +148,7 @@ public void testDoubleStartOfAcceptor() throws Exception { Acceptor acceptor = null; try { ThreadMXBean bean = ManagementFactory.getThreadMXBean(); - TestAcceptorApplication testAcceptorApplication = new TestAcceptorApplication(); + TestConnectorApplication testAcceptorApplication = new TestConnectorApplication(); acceptor = createAcceptor(testAcceptorApplication); acceptor.start(); // second start should be ignored @@ -166,7 +166,7 @@ public void testDoubleStartOfAcceptor() throws Exception { public void testSessionsAreCleanedUp() throws Exception { Acceptor acceptor = null; try { - TestAcceptorApplication testAcceptorApplication = new TestAcceptorApplication(); + TestConnectorApplication testAcceptorApplication = new TestConnectorApplication(); acceptor = createAcceptor(testAcceptorApplication); acceptor.start(); assertEquals(1, acceptor.getSessions().size() ); @@ -185,7 +185,7 @@ public void testSessionsAreCleanedUp() throws Exception { public void testSessionsAreCleanedUpOnThreadedSocketAcceptor() throws Exception { Acceptor acceptor = null; try { - TestAcceptorApplication testAcceptorApplication = new TestAcceptorApplication(); + TestConnectorApplication testAcceptorApplication = new TestConnectorApplication(); acceptor = createAcceptorThreaded(testAcceptorApplication); acceptor.start(); assertEquals(1, acceptor.getSessions().size() ); @@ -292,12 +292,12 @@ private Session lookupSession(SessionID sessionID) { return Session.lookupSession(sessionID); } - private static class TestAcceptorApplication extends ApplicationAdapter { + private class TestConnectorApplication extends ApplicationAdapter { private final CountDownLatch logonLatch; private final CountDownLatch logoutLatch; - public TestAcceptorApplication() { + public TestConnectorApplication() { logonLatch = new CountDownLatch(1); logoutLatch = new CountDownLatch(1); } @@ -334,53 +334,15 @@ public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound // ignore } } - } - - private static class TestInitiatorApplication extends ApplicationAdapter { - - private final CountDownLatch logonLatch; - private final CountDownLatch logoutLatch; - - public TestInitiatorApplication() { - logonLatch = new CountDownLatch(1); - logoutLatch = new CountDownLatch(1); - } - - @Override - public void onLogon(SessionID sessionId) { - super.onLogon(sessionId); - logonLatch.countDown(); - } - - public void waitForLogon() { - try { - assertTrue("Logon timed out", logonLatch.await(10, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - fail(e.getMessage()); - } - } - - public void waitForLogout() { - try { - assertTrue("Logout timed out", logoutLatch.await(10, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - fail(e.getMessage()); - } - } @Override - public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon { - try { - if (MsgType.LOGOUT.equals(MessageUtils.getMessageType(message.toString()))) { - logoutLatch.countDown(); - } - } catch (InvalidMessage ex) { - // ignore - } + public void toAdmin(Message message, SessionID sessionId) { + log.info("toAdmin: [{}] {}", sessionId, message); } } - private Acceptor createAcceptor(TestAcceptorApplication testAcceptorApplication) + + private Acceptor createAcceptor(TestConnectorApplication testAcceptorApplication) throws ConfigError { SessionSettings settings = createAcceptorSettings(); @@ -391,7 +353,7 @@ private Acceptor createAcceptor(TestAcceptorApplication testAcceptorApplication) new DefaultMessageFactory()); } - private Acceptor createAcceptorThreaded(TestAcceptorApplication testAcceptorApplication) + private Acceptor createAcceptorThreaded(TestConnectorApplication testAcceptorApplication) throws ConfigError { SessionSettings settings = createAcceptorSettings(); @@ -409,13 +371,14 @@ private SessionSettings createAcceptorSettings() { defaults.put("StartTime", "00:00:00"); defaults.put("EndTime", "00:00:00"); defaults.put("BeginString", "FIX.4.2"); + defaults.put("NonStopSession", "Y"); settings.setString(acceptorSessionID, "SocketAcceptProtocol", ProtocolFactory.getTypeString(ProtocolFactory.VM_PIPE)); settings.setString(acceptorSessionID, "SocketAcceptPort", "10000"); settings.set(defaults); return settings; } - private Initiator createInitiator(TestInitiatorApplication testInitiatorApplication) throws ConfigError { + private Initiator createInitiator(TestConnectorApplication testInitiatorApplication) throws ConfigError { SessionSettings settings = new SessionSettings(); HashMap defaults = new HashMap<>(); defaults.put("ConnectionType", "initiator"); @@ -425,6 +388,7 @@ private Initiator createInitiator(TestInitiatorApplication testInitiatorApplicat defaults.put("ReconnectInterval", "2"); defaults.put("FileStorePath", "target/data/client"); defaults.put("ValidateUserDefinedFields", "Y"); + defaults.put("NonStopSession", "Y"); settings.setString("BeginString", FixVersions.BEGINSTRING_FIX42); settings.setString(initiatorSessionID, "SocketConnectProtocol", ProtocolFactory.getTypeString(ProtocolFactory.VM_PIPE)); settings.setString(initiatorSessionID, "SocketConnectHost", "127.0.0.1"); diff --git a/quickfixj-core/src/test/java/quickfix/SocketInitiatorTest.java b/quickfixj-core/src/test/java/quickfix/SocketInitiatorTest.java index 6dd8033f58..3590333291 100644 --- a/quickfixj-core/src/test/java/quickfix/SocketInitiatorTest.java +++ b/quickfixj-core/src/test/java/quickfix/SocketInitiatorTest.java @@ -306,6 +306,7 @@ public void testConnectedSocketsAreClosedAfterInitiatorClosed() throws Exception socketThread.start(); final SessionSettings settings = new SessionSettings(); + settings.setString("NonStopSession", "Y"); settings.setString("StartTime", "00:00:00"); settings.setString("EndTime", "00:00:00"); settings.setString("ReconnectInterval", "30"); @@ -351,42 +352,6 @@ public void onConnect(SessionID sessionID) { public void onDisconnect(SessionID sessionID) { onDisconnectCallCount.incrementAndGet(); } - - @Override - public void onLogon(SessionID sessionID) { - } - - @Override - public void onLogout(SessionID sessionID) { - } - - @Override - public void onReset(SessionID sessionID) { - } - - @Override - public void onRefresh(SessionID sessionID) { - } - - @Override - public void onMissedHeartBeat(SessionID sessionID) { - } - - @Override - public void onHeartBeatTimeout(SessionID sessionID) { - } - - @Override - public void onResendRequestSent(SessionID sessionID, int beginSeqNo, int endSeqNo, int currentEndSeqNo) { - } - - @Override - public void onSequenceResetReceived(SessionID sessionID, int newSeqNo, boolean gapFillFlag) { - } - - @Override - public void onResendRequestSatisfied(SessionID sessionID, int beginSeqNo, int endSeqNo) { - } }; LogFactory logFactory = sessionID -> logSessionStateListener; @@ -420,6 +385,7 @@ public void testInitiatorContinueInitializationOnError() throws ConfigError, Int settings.setString("ConnectionType", "initiator"); settings.setLong(sessionId, "SocketConnectPort", port); settings.setString(sessionId, "SocketConnectHost", "localhost"); + settings.setString("NonStopSession", "Y"); settings.setString("StartTime", "00:00:00"); settings.setString("EndTime", "00:00:00"); settings.setString("HeartBtInt", "30"); @@ -446,13 +412,14 @@ private void doTestOfRestart(SessionID clientSessionID, ClientApplication client serverThread.start(); serverThread.waitForInitialization(); long messageLogLength = 0; + Session clientSession = null; try { clientApplication.setUpLogonExpectation(); initiator.start(); assertTrue(initiator.getSessions().contains(clientSessionID)); assertEquals(1, initiator.getSessions().size()); - Session clientSession = Session.lookupSession(clientSessionID); + clientSession = Session.lookupSession(clientSessionID); assertLoggedOn(clientApplication, clientSession); clientApplication.setUpLogoutExpectation(); @@ -478,8 +445,13 @@ private void doTestOfRestart(SessionID clientSessionID, ClientApplication client // QFJ-698: check that we were still able to write to the messageLog after the restart assertTrue(messageLog.length() > messageLogLength); } + + clientApplication.setUpLogoutExpectation(); } finally { initiator.stop(); + if (clientSession != null) { + assertLoggedOut(clientApplication, clientSession); + } } } finally { serverThread.interrupt(); @@ -527,6 +499,7 @@ private SessionSettings getClientSessionSettings(SessionID clientSessionID, int defaults.put("SocketConnectProtocol", ProtocolFactory.getTypeString(ProtocolFactory.VM_PIPE)); defaults.put("SocketConnectHost", "localhost"); defaults.put("SocketConnectPort", Integer.toString(port)); + defaults.put("NonStopSession", "Y"); defaults.put("StartTime", "00:00:00"); defaults.put("EndTime", "00:00:00"); defaults.put("HeartBtInt", "30"); @@ -611,11 +584,11 @@ public void onLogon(SessionID sessionId) { @Override public void toAdmin(Message message, SessionID sessionId) { - log.info("[{}] {}", sessionId, message); + log.info("toAdmin: [{}] {}", sessionId, message); // Only countdown the latch if a logout message is actually sent try { - if (logoutLatch != null && message.getHeader().isSetField(MsgType.FIELD) + if (logoutLatch != null && logoutLatch.getCount() > 0 && message.getHeader().isSetField(MsgType.FIELD) && MsgType.LOGOUT.equals(message.getHeader().getString(MsgType.FIELD))) { log.info("Releasing logout latch for session [{}] with message {}", sessionId, message); logoutLatch.countDown();