From 6bf9d1cbcb89f6d81431310f93e2862a18ff43a2 Mon Sep 17 00:00:00 2001 From: Andrei Korzhevskii Date: Thu, 15 Apr 2021 18:55:24 +0100 Subject: [PATCH 1/3] [QFJ-968] Allow provision of custom Message Queue implementation --- .../java/quickfix/DefaultSessionFactory.java | 19 +++++- .../java/quickfix/InMemoryMessageQueue.java | 55 +++++++++++++++++ .../quickfix/InMemoryMessageQueueFactory.java | 33 ++++++++++ .../src/main/java/quickfix/MessageQueue.java | 60 +++++++++++++++++++ .../java/quickfix/MessageQueueFactory.java | 36 +++++++++++ .../src/main/java/quickfix/Session.java | 38 ++++++++++-- .../src/main/java/quickfix/SessionState.java | 46 +++----------- .../quickfix/SessionFactoryTestSupport.java | 10 +++- .../test/java/quickfix/SessionStateTest.java | 6 +- .../src/test/java/quickfix/SessionTest.java | 38 +++++++----- 10 files changed, 278 insertions(+), 63 deletions(-) create mode 100644 quickfixj-core/src/main/java/quickfix/InMemoryMessageQueue.java create mode 100644 quickfixj-core/src/main/java/quickfix/InMemoryMessageQueueFactory.java create mode 100644 quickfixj-core/src/main/java/quickfix/MessageQueue.java create mode 100644 quickfixj-core/src/main/java/quickfix/MessageQueueFactory.java diff --git a/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java b/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java index e16a377004..b022a36ade 100644 --- a/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java +++ b/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java @@ -47,6 +47,7 @@ public class DefaultSessionFactory implements SessionFactory { private final Application application; private final MessageStoreFactory messageStoreFactory; + private final MessageQueueFactory messageQueueFactory; private final LogFactory logFactory; private final MessageFactory messageFactory; private final SessionScheduleFactory sessionScheduleFactory; @@ -55,6 +56,7 @@ public DefaultSessionFactory(Application application, MessageStoreFactory messag LogFactory logFactory) { this.application = application; this.messageStoreFactory = messageStoreFactory; + this.messageQueueFactory = new InMemoryMessageQueueFactory(); this.logFactory = logFactory; this.messageFactory = new DefaultMessageFactory(); this.sessionScheduleFactory = new DefaultSessionScheduleFactory(); @@ -64,6 +66,7 @@ public DefaultSessionFactory(Application application, MessageStoreFactory messag LogFactory logFactory, MessageFactory messageFactory) { this.application = application; this.messageStoreFactory = messageStoreFactory; + this.messageQueueFactory = new InMemoryMessageQueueFactory(); this.logFactory = logFactory; this.messageFactory = messageFactory; this.sessionScheduleFactory = new DefaultSessionScheduleFactory(); @@ -74,6 +77,18 @@ public DefaultSessionFactory(Application application, MessageStoreFactory messag SessionScheduleFactory sessionScheduleFactory) { this.application = application; this.messageStoreFactory = messageStoreFactory; + this.messageQueueFactory = new InMemoryMessageQueueFactory(); + this.logFactory = logFactory; + this.messageFactory = messageFactory; + this.sessionScheduleFactory = sessionScheduleFactory; + } + + public DefaultSessionFactory(Application application, MessageStoreFactory messageStoreFactory, + MessageQueueFactory messageQueueFactory, LogFactory logFactory, + MessageFactory messageFactory, SessionScheduleFactory sessionScheduleFactory) { + this.application = application; + this.messageStoreFactory = messageStoreFactory; + this.messageQueueFactory = messageQueueFactory; this.logFactory = logFactory; this.messageFactory = messageFactory; this.sessionScheduleFactory = sessionScheduleFactory; @@ -221,8 +236,8 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf final List logonTags = getLogonTags(settings, sessionID); - final Session session = new Session(application, messageStoreFactory, sessionID, - dataDictionaryProvider, sessionSchedule, logFactory, + final Session session = new Session(application, messageStoreFactory, messageQueueFactory, + sessionID, dataDictionaryProvider, sessionSchedule, logFactory, messageFactory, heartbeatInterval, checkLatency, maxLatency, timestampPrecision, resetOnLogon, resetOnLogout, resetOnDisconnect, refreshOnLogon, checkCompID, redundantResentRequestAllowed, persistMessages, useClosedIntervalForResend, diff --git a/quickfixj-core/src/main/java/quickfix/InMemoryMessageQueue.java b/quickfixj-core/src/main/java/quickfix/InMemoryMessageQueue.java new file mode 100644 index 0000000000..9c16dd619b --- /dev/null +++ b/quickfixj-core/src/main/java/quickfix/InMemoryMessageQueue.java @@ -0,0 +1,55 @@ +/******************************************************************************* + * Copyright (c) quickfixengine.org All rights reserved. + * + * This file is part of the QuickFIX FIX Engine + * + * This file may be distributed under the terms of the quickfixengine.org + * license as defined by quickfixengine.org and appearing in the file + * LICENSE included in the packaging of this file. + * + * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING + * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE. + * + * See http://www.quickfixengine.org/LICENSE for licensing information. + * + * Contact ask@quickfixengine.org if any conditions of this licensing + * are not clear to you. + ******************************************************************************/ + +package quickfix; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * An in-memory implementation of MessageQueue. + * It uses a linked hash map as a backing map. + * + * @see MessageQueue + */ +public class InMemoryMessageQueue implements MessageQueue { + + // The map should be accessed from a single thread + private final Map backingMap = new LinkedHashMap<>(); + + @Override + public void enqueue(int sequence, Message message) { + backingMap.put(sequence, message); + } + + @Override + public Message dequeue(int sequence) { + return backingMap.remove(sequence); + } + + @Override + public void clear() { + backingMap.clear(); + } + + // used in tests + Map getBackingMap() { + return backingMap; + } +} diff --git a/quickfixj-core/src/main/java/quickfix/InMemoryMessageQueueFactory.java b/quickfixj-core/src/main/java/quickfix/InMemoryMessageQueueFactory.java new file mode 100644 index 0000000000..a6b8b4502e --- /dev/null +++ b/quickfixj-core/src/main/java/quickfix/InMemoryMessageQueueFactory.java @@ -0,0 +1,33 @@ +/******************************************************************************* + * Copyright (c) quickfixengine.org All rights reserved. + * + * This file is part of the QuickFIX FIX Engine + * + * This file may be distributed under the terms of the quickfixengine.org + * license as defined by quickfixengine.org and appearing in the file + * LICENSE included in the packaging of this file. + * + * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING + * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE. + * + * See http://www.quickfixengine.org/LICENSE for licensing information. + * + * Contact ask@quickfixengine.org if any conditions of this licensing + * are not clear to you. + ******************************************************************************/ + +package quickfix; + +/** + * Creates a message queue that stores all messages in memory. + * + * @see MessageQueue + */ +public class InMemoryMessageQueueFactory implements MessageQueueFactory { + + @Override + public MessageQueue create(SessionID sessionID) { + return new InMemoryMessageQueue(); + } +} diff --git a/quickfixj-core/src/main/java/quickfix/MessageQueue.java b/quickfixj-core/src/main/java/quickfix/MessageQueue.java new file mode 100644 index 0000000000..79e4e16ee8 --- /dev/null +++ b/quickfixj-core/src/main/java/quickfix/MessageQueue.java @@ -0,0 +1,60 @@ +/******************************************************************************* + * Copyright (c) quickfixengine.org All rights reserved. + * + * This file is part of the QuickFIX FIX Engine + * + * This file may be distributed under the terms of the quickfixengine.org + * license as defined by quickfixengine.org and appearing in the file + * LICENSE included in the packaging of this file. + * + * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING + * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE. + * + * See http://www.quickfixengine.org/LICENSE for licensing information. + * + * Contact ask@quickfixengine.org if any conditions of this licensing + * are not clear to you. + ******************************************************************************/ + +package quickfix; + +/** + * Used by a Session to store and retrieve messages with a sequence number higher than expected. + * + * @see quickfix.Session + */ +interface MessageQueue { + + /** + * Enqueue a message. + * + * @param sequence the sequence number + * @param message the FIX message + */ + void enqueue(int sequence, Message message); + + /** + * Dequeue a message with given sequence number. + * + * @param sequence the sequence number + * @return message the FIX message + */ + Message dequeue(int sequence); + + /** + * Remove messages from queue up to a given sequence number. + * + * @param seqnum up to which sequence number messages should be deleted + */ + default void dequeueMessagesUpTo(int seqnum) { + for (int i = 1; i < seqnum; i++) { + dequeue(i); + } + } + + /** + * Clear the queue. + */ + void clear(); +} diff --git a/quickfixj-core/src/main/java/quickfix/MessageQueueFactory.java b/quickfixj-core/src/main/java/quickfix/MessageQueueFactory.java new file mode 100644 index 0000000000..0660844024 --- /dev/null +++ b/quickfixj-core/src/main/java/quickfix/MessageQueueFactory.java @@ -0,0 +1,36 @@ +/******************************************************************************* + * Copyright (c) quickfixengine.org All rights reserved. + * + * This file is part of the QuickFIX FIX Engine + * + * This file may be distributed under the terms of the quickfixengine.org + * license as defined by quickfixengine.org and appearing in the file + * LICENSE included in the packaging of this file. + * + * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING + * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE. + * + * See http://www.quickfixengine.org/LICENSE for licensing information. + * + * Contact ask@quickfixengine.org if any conditions of this licensing + * are not clear to you. + ******************************************************************************/ + +package quickfix; + +/** + * Used by a Session to create a message queue implementation. + * + * @see Session + */ +public interface MessageQueueFactory { + + /** + * Creates a message queue implementation. + * + * @param sessionID the session ID, often used to access session configurations + * @return the message queue implementation + */ + MessageQueue create(SessionID sessionID); +} diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 41600a1dc5..e23ef60297 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -483,6 +483,29 @@ public class Session implements Closeable { boolean validateIncomingMessage, int resendRequestChunkSize, boolean enableNextExpectedMsgSeqNum, boolean enableLastMsgSeqNumProcessed, boolean validateChecksum, List logonTags, double heartBeatTimeoutMultiplier) { + this(application, messageStoreFactory, new InMemoryMessageQueueFactory(), sessionID, dataDictionaryProvider, sessionSchedule, logFactory, + messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, + false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5}, + false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, + false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); + } + + Session(Application application, MessageStoreFactory messageStoreFactory, MessageQueueFactory messageQueueFactory, + SessionID sessionID, DataDictionaryProvider dataDictionaryProvider, SessionSchedule sessionSchedule, + LogFactory logFactory, MessageFactory messageFactory, int heartbeatInterval, + boolean checkLatency, int maxLatency, UtcTimestampPrecision timestampPrecision, + boolean resetOnLogon, boolean resetOnLogout, boolean resetOnDisconnect, + boolean refreshOnLogon, boolean checkCompID, + boolean redundantResentRequestsAllowed, boolean persistMessages, + boolean useClosedRangeForResend, double testRequestDelayMultiplier, + DefaultApplVerID senderDefaultApplVerID, boolean validateSequenceNumbers, + int[] logonIntervals, boolean resetOnError, boolean disconnectOnError, + boolean disableHeartBeatCheck, boolean rejectGarbledMessage, boolean rejectInvalidMessage, + boolean rejectMessageOnUnhandledException, boolean requiresOrigSendingTime, + boolean forceResendWhenCorruptedStore, Set allowedRemoteAddresses, + boolean validateIncomingMessage, int resendRequestChunkSize, + boolean enableNextExpectedMsgSeqNum, boolean enableLastMsgSeqNumProcessed, + boolean validateChecksum, List logonTags, double heartBeatTimeoutMultiplier) { this.application = application; this.sessionID = sessionID; this.sessionSchedule = sessionSchedule; @@ -528,8 +551,13 @@ public class Session implements Closeable { addStateListener((SessionStateListener) messageStore); } + final MessageQueue messageQueue = messageQueueFactory.create(sessionID); + if (messageQueue instanceof SessionStateListener) { + addStateListener((SessionStateListener) messageQueue); + } + state = new SessionState(this, engineLog, heartbeatInterval, heartbeatInterval != 0, - messageStore, testRequestDelayMultiplier, heartBeatTimeoutMultiplier); + messageStore, messageQueue, testRequestDelayMultiplier, heartBeatTimeoutMultiplier); registerSession(this); @@ -1518,7 +1546,7 @@ private void nextSequenceReset(Message sequenceReset) throws IOException, Reject } // QFJ-728: newSequence will be the seqnum of the next message so we // delete all older messages from the queue since they are effectively skipped. - state.dequeueMessagesUpTo(newSequence); + state.getMessageQueue().dequeueMessagesUpTo(newSequence); } else if (newSequence < getExpectedTargetNum()) { getLog().onErrorEvent( @@ -2092,7 +2120,7 @@ public void disconnect(String reason, boolean logError) throws IOException { state.setLogoutReceived(false); state.setResetReceived(false); state.setResetSent(false); - state.clearQueue(); + state.getMessageQueue().clear(); state.clearLogoutReason(); state.setResendRange(0, 0); @@ -2384,7 +2412,7 @@ private void nextQueued() throws FieldNotFound, RejectLogon, IncorrectDataFormat private boolean nextQueued(int num) throws FieldNotFound, RejectLogon, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType, IOException, InvalidMessage { - final Message msg = state.dequeue(num); + final Message msg = state.getMessageQueue().dequeue(num); if (msg != null) { getLog().onEvent("Processing queued message: " + num); @@ -2653,7 +2681,7 @@ private boolean sendRaw(Message message, int num) { } private void enqueueMessage(final Message msg, final int msgSeqNum) { - state.enqueue(msgSeqNum, msg); + state.getMessageQueue().enqueue(msgSeqNum, msg); getLog().onEvent("Enqueued at pos " + msgSeqNum + ": " + msg); } diff --git a/quickfixj-core/src/main/java/quickfix/SessionState.java b/quickfixj-core/src/main/java/quickfix/SessionState.java index 39f71ee1e6..ed1e995daf 100644 --- a/quickfixj-core/src/main/java/quickfix/SessionState.java +++ b/quickfixj-core/src/main/java/quickfix/SessionState.java @@ -22,8 +22,6 @@ import java.io.IOException; import java.util.Collection; import java.util.Date; -import java.util.LinkedHashMap; -import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -40,6 +38,9 @@ public final class SessionState { // MessageStore implementation must be thread safe private final MessageStore messageStore; + // MessageQueue implementation must be thread safe + private final MessageQueue messageQueue; + private final Lock senderMsgSeqNumLock = new ReentrantLock(); private final Lock targetMsgSeqNumLock = new ReentrantLock(); @@ -79,14 +80,12 @@ public final class SessionState { */ private final AtomicInteger nextExpectedMsgSeqNum = new AtomicInteger(0); - // The messageQueue should be accessed from a single thread - private final Map messageQueue = new LinkedHashMap<>(); - public SessionState(Object lock, Log log, int heartBeatInterval, boolean initiator, MessageStore messageStore, - double testRequestDelayMultiplier, double heartBeatTimeoutMultiplier) { + MessageQueue messageQueue, double testRequestDelayMultiplier, double heartBeatTimeoutMultiplier) { this.lock = lock; this.initiator = initiator; this.messageStore = messageStore; + this.messageQueue = messageQueue; setHeartBeatInterval(heartBeatInterval); this.log = log == null ? new NullLog() : log; this.testRequestDelayMultiplier = testRequestDelayMultiplier; @@ -260,6 +259,10 @@ public MessageStore getMessageStore() { return messageStore; } + public MessageQueue getMessageQueue() { + return messageQueue; + } + private int getTestRequestCounter() { synchronized (lock) { return testRequestCounter; @@ -305,37 +308,6 @@ public void get(int first, int last, Collection messages) throws IOExcep messageStore.get(first, last, messages); } - public void enqueue(int sequence, Message message) { - messageQueue.put(sequence, message); - } - - public Message dequeue(int sequence) { - return messageQueue.remove(sequence); - } - - /** - * Remove messages from messageQueue up to a given sequence number. - * - * @param seqnum up to which sequence number messages should be deleted - */ - public void dequeueMessagesUpTo(int seqnum) { - for (int i = 1; i < seqnum; i++) { - dequeue(i); - } - } - - public Message getNextQueuedMessage() { - return !messageQueue.isEmpty() ? messageQueue.values().iterator().next() : null; - } - - public Collection getQueuedSeqNums() { - return messageQueue.keySet(); - } - - public void clearQueue() { - messageQueue.clear(); - } - public void lockSenderMsgSeqNum() { senderMsgSeqNumLock.lock(); } diff --git a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java index 8be5bbad1f..5dfe57f06f 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java +++ b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java @@ -77,6 +77,7 @@ public static final class Builder { private Supplier sessionIDSupplier = () -> new SessionID(beginString, senderCompID, targetCompID); private Supplier applicationSupplier = UnitTestApplication::new; private Supplier messageStoreFactorySupplier = MemoryStoreFactory::new; + private Supplier messageQueueFactorySupplier = InMemoryMessageQueueFactory::new; private Supplier dataDictionaryProviderSupplier = () -> null; private Supplier sessionScheduleSupplier = () -> null; private Supplier logFactorySupplier = () -> new ScreenLogFactory(true, true, true); @@ -114,8 +115,8 @@ public static final class Builder { private List logonTags = new ArrayList<>(); public Session build() { - return new Session(applicationSupplier.get(), messageStoreFactorySupplier.get(), sessionIDSupplier.get(), - dataDictionaryProviderSupplier.get(), sessionScheduleSupplier.get(), logFactorySupplier.get(), + return new Session(applicationSupplier.get(), messageStoreFactorySupplier.get(), messageQueueFactorySupplier.get(), + sessionIDSupplier.get(), dataDictionaryProviderSupplier.get(), sessionScheduleSupplier.get(), logFactorySupplier.get(), messageFactorySupplier.get(), sessionHeartbeatIntervalSupplier.get(), checkLatency, maxLatency, timestampPrecision, resetOnLogon, resetOnLogout, resetOnDisconnect, refreshMessageStoreAtLogon, checkCompID, redundantResentRequestsAllowed, persistMessages, useClosedRangeForResend, @@ -158,6 +159,11 @@ public Builder setMessageStoreFactory(final MessageStoreFactory messageStoreFact return this; } + public Builder setMessageQueueFactory(final MessageQueueFactory messageQueueFactory) { + this.messageQueueFactorySupplier = () -> messageQueueFactory; + return this; + } + public Builder setDataDictionaryProvider(final DataDictionaryProvider dataDictionaryProvider) { this.dataDictionaryProviderSupplier = () -> dataDictionaryProvider; return this; diff --git a/quickfixj-core/src/test/java/quickfix/SessionStateTest.java b/quickfixj-core/src/test/java/quickfix/SessionStateTest.java index dc0a5e8440..bcc43d21ef 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionStateTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionStateTest.java @@ -44,7 +44,7 @@ public void tearDown() { @Test public void testTimeoutDefaultsAreNonzero() { SessionState state = new SessionState(new Object(), null, 0, false, null, - Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); + null, Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); state.setLastReceivedTime(900); assertFalse("logon timeout not init'ed", state.isLogonTimedOut()); @@ -56,7 +56,7 @@ public void testTimeoutDefaultsAreNonzero() { @Test public void testTestRequestTiming() { SessionState state = new SessionState(new Object(), null, 0, false, null, - Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); + null, Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); state.setLastReceivedTime(950); state.setHeartBeatInterval(50); assertFalse("testRequest shouldn't be needed yet", state.isTestRequestNeeded()); @@ -73,7 +73,7 @@ public void testTestRequestTiming() { @Test public void testSessionTimeout() { SessionState state = new SessionState(new Object(), null, 30, false, null, - Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); + null, Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); // session should timeout after 2.4 * 30 = 72 seconds state.setLastReceivedTime(950_000); diff --git a/quickfixj-core/src/test/java/quickfix/SessionTest.java b/quickfixj-core/src/test/java/quickfix/SessionTest.java index d7e3741177..b67b212941 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionTest.java @@ -94,12 +94,16 @@ public void testDisposalOfFileResources() throws Exception { stub(mockMessageStoreFactory.create(sessionID)).toReturn( mockMessageStore); + final MessageQueueFactory mockMessageQueueFactory = mock(MessageQueueFactory.class); + final MessageQueue mockMessageQueue = mock(MessageQueue.class); + stub(mockMessageQueueFactory.create(sessionID)).toReturn(mockMessageQueue); + final LogFactory mockLogFactory = mock(LogFactory.class); final CloseableLog mockLog = mock(CloseableLog.class); stub(mockLogFactory.create(sessionID)).toReturn(mockLog); try (Session session = new Session(application, - mockMessageStoreFactory, sessionID, null, null, mockLogFactory, + mockMessageStoreFactory, mockMessageQueueFactory, sessionID, null, null, mockLogFactory, new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false, false, false, false, false, true, false, 1.5, null, true, new int[] { 5 }, false, false, false, false, true, false, true, false, @@ -135,12 +139,16 @@ public void testNondisposableFileResources() throws Exception { stub(mockMessageStoreFactory.create(sessionID)).toReturn( mockMessageStore); + final MessageQueueFactory mockMessageQueueFactory = mock(MessageQueueFactory.class); + final MessageQueue mockMessageQueue = mock(MessageQueue.class); + stub(mockMessageQueueFactory.create(sessionID)).toReturn(mockMessageQueue); + final LogFactory mockLogFactory = mock(LogFactory.class); final Log mockLog = mock(Log.class); stub(mockLogFactory.create(sessionID)).toReturn(mockLog); try (Session session = new Session(application, - mockMessageStoreFactory, sessionID, null, null, mockLogFactory, + mockMessageStoreFactory, mockMessageQueueFactory, sessionID, null, null, mockLogFactory, new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false, false, false, false, false, true, false, 1.5, null, true, new int[] { 5 }, false, false, false, false, true, false, true, false, @@ -1301,7 +1309,8 @@ public void testSequenceResetStackOverflow() throws Exception { try (Session session = setUpSession(application, false, new UnitTestResponder())) { final SessionState state = getSessionState(session); - + final InMemoryMessageQueue queue = (InMemoryMessageQueue) state.getMessageQueue(); + logonTo(session, 1); assertTrue(session.isLoggedOn()); @@ -1324,7 +1333,7 @@ public void testSequenceResetStackOverflow() throws Exception { assertEquals(52, state.getNextTargetMsgSeqNum()); assertTrue(session.isLoggedOn()); assertFalse(state.isResendRequested()); - assertTrue(state.getQueuedSeqNums().isEmpty()); + assertTrue(queue.getBackingMap().isEmpty()); } } @@ -1528,6 +1537,7 @@ public void testRemoveQueuedMessagesOnSequenceReset() throws Exception { try (Session session = setUpSession(application, false, new UnitTestResponder())) { final SessionState state = getSessionState(session); + final InMemoryMessageQueue queue = (InMemoryMessageQueue) state.getMessageQueue(); final int from = 10; int numberOfMsgs = 200; @@ -1539,10 +1549,10 @@ public void testRemoveQueuedMessagesOnSequenceReset() throws Exception { processMessage(session, createAppMessage(i)); } for (int i = from; i < to; i++) { - assertTrue(state.getQueuedSeqNums().contains(i)); + assertTrue(queue.getBackingMap().containsKey(i)); } - assertEquals(state.getQueuedSeqNums().size(), numberOfMsgs); + assertEquals(queue.getBackingMap().size(), numberOfMsgs); assertTrue(application.fromAppMessages.isEmpty()); // Create a sequence reset which will cause deletion of almost all // messages @@ -1554,7 +1564,7 @@ public void testRemoveQueuedMessagesOnSequenceReset() throws Exception { assertEquals(application.fromAppMessages.size(), two); assertFalse(state.isResendRequested()); assertTrue(session.isLoggedOn()); - assertTrue(state.getQueuedSeqNums().isEmpty()); + assertTrue(queue.getBackingMap().isEmpty()); } } @@ -2099,7 +2109,7 @@ private void testSequenceResetGapFillWithChunkSize(int chunkSize) boolean isInitiator = true, resetOnLogon = false, validateSequenceNumbers = true; try (Session session = new Session(new UnitTestApplication(), - new MemoryStoreFactory(), sessionID, null, null, + new MemoryStoreFactory(), new InMemoryMessageQueueFactory(), sessionID, null, null, new SLF4JLogFactory(new SessionSettings()), new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, @@ -2162,7 +2172,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu final boolean resetOnLogon = false; final boolean validateSequenceNumbers = true; - Session session = new Session(new UnitTestApplication(), new MemoryStoreFactory(), + Session session = new Session(new UnitTestApplication(), new MemoryStoreFactory(), new InMemoryMessageQueueFactory(), sessionID, null, null, null, new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, @@ -2210,7 +2220,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu final boolean resetOnLogon = false; final boolean validateSequenceNumbers = true; - Session session = new Session(new UnitTestApplication(), new MemoryStoreFactory(), + Session session = new Session(new UnitTestApplication(), new MemoryStoreFactory(), new InMemoryMessageQueueFactory(), sessionID, null, null, null, new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, @@ -2258,7 +2268,7 @@ public void testMsgSeqNumTooHighWithDisconnectOnError() throws Exception { final boolean disconnectOnError = true; try (Session session = new Session(new UnitTestApplication(), - new MemoryStoreFactory(), sessionID, null, null, + new MemoryStoreFactory(), new InMemoryMessageQueueFactory(), sessionID, null, null, new SLF4JLogFactory(new SessionSettings()), new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, @@ -2294,7 +2304,7 @@ public void testTimestampPrecision() throws Exception { UnitTestApplication unitTestApplication = new UnitTestApplication(); try (Session session = new Session(unitTestApplication, - new MemoryStoreFactory(), sessionID, null, null, + new MemoryStoreFactory(), new InMemoryMessageQueueFactory(), sessionID, null, null, new SLF4JLogFactory(new SessionSettings()), new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.NANOS, resetOnLogon, false, false, false, false, false, true, @@ -2347,7 +2357,7 @@ private void testLargeQueue(int N) throws Exception { boolean isInitiator = true, resetOnLogon = false, validateSequenceNumbers = true; final UnitTestApplication unitTestApplication = new UnitTestApplication(); - Session session = new Session(unitTestApplication, new MemoryStoreFactory(), + Session session = new Session(unitTestApplication, new MemoryStoreFactory(), new InMemoryMessageQueueFactory(), sessionID, null, null, null, new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, @@ -2463,7 +2473,7 @@ public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound } }; - Session session = new Session(app, new MemoryStoreFactory(), + Session session = new Session(app, new MemoryStoreFactory(), new InMemoryMessageQueueFactory(), sessionID, null, null, null, new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, From 01a8c66cc0588903962d70990ef1d61e26e9c416 Mon Sep 17 00:00:00 2001 From: Christoph John Date: Mon, 12 Sep 2022 11:48:53 +0200 Subject: [PATCH 2/3] Replaced mockito `stub` call --- quickfixj-core/src/test/java/quickfix/SessionTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/quickfixj-core/src/test/java/quickfix/SessionTest.java b/quickfixj-core/src/test/java/quickfix/SessionTest.java index c910d677f5..98d2f7ff37 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionTest.java @@ -94,7 +94,7 @@ public void testDisposalOfFileResources() throws Exception { final MessageQueueFactory mockMessageQueueFactory = mock(MessageQueueFactory.class); final MessageQueue mockMessageQueue = mock(MessageQueue.class); - stub(mockMessageQueueFactory.create(sessionID)).toReturn(mockMessageQueue); + when(mockMessageQueueFactory.create(sessionID)).thenReturn(mockMessageQueue); final LogFactory mockLogFactory = mock(LogFactory.class); final CloseableLog mockLog = mock(CloseableLog.class); @@ -138,7 +138,7 @@ public void testNondisposableFileResources() throws Exception { final MessageQueueFactory mockMessageQueueFactory = mock(MessageQueueFactory.class); final MessageQueue mockMessageQueue = mock(MessageQueue.class); - stub(mockMessageQueueFactory.create(sessionID)).toReturn(mockMessageQueue); + when(mockMessageQueueFactory.create(sessionID)).thenReturn(mockMessageQueue); final LogFactory mockLogFactory = mock(LogFactory.class); final Log mockLog = mock(Log.class); From cc4058b5397bdc9475548996e3fd7e49943fb43e Mon Sep 17 00:00:00 2001 From: Christoph John Date: Mon, 12 Sep 2022 11:51:11 +0200 Subject: [PATCH 3/3] Added missing constructor parameter --- quickfixj-core/src/test/java/quickfix/SessionStateTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quickfixj-core/src/test/java/quickfix/SessionStateTest.java b/quickfixj-core/src/test/java/quickfix/SessionStateTest.java index 507f0f3c58..c058189b2a 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionStateTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionStateTest.java @@ -74,7 +74,7 @@ public void testTestRequestTiming() { public void testHeartbeatTiming() { // we set a HB interval of 2 seconds = 2000ms SessionState state = new SessionState(new Object(), null, 2 /* HB interval */, false, null, - Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); + null, Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); long now = System.currentTimeMillis(); timeSource.setSystemTimes(now);