diff --git a/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java b/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java index 355d7612eb..dc90935d4a 100644 --- a/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java +++ b/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java @@ -221,6 +221,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf final SessionSchedule sessionSchedule = sessionScheduleFactory.create(sessionID, settings); final List logonTags = getLogonTags(settings, sessionID); + final int maxMessagesQueuedWhilePendingResend = getSetting(settings, sessionID, Session.SETTING_MAX_MESSAGES_QUEUED_WHILE_PENDING_RESEND, 2000); final Session session = new Session(application, messageStoreFactory, sessionID, dataDictionaryProvider, sessionSchedule, logFactory, @@ -232,7 +233,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf rejectInvalidMessage, rejectMessageOnUnhandledException, requiresOrigSendingTime, forceResendWhenCorruptedStore, allowedRemoteAddresses, validateIncomingMessage, resendRequestChunkSize, enableNextExpectedMsgSeqNum, enableLastMsgSeqNumProcessed, - validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPossDup); + validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPossDup, maxMessagesQueuedWhilePendingResend); session.setLogonTimeout(logonTimeout); session.setLogoutTimeout(logoutTimeout); diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 54bf389b5a..2766d8e1a1 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -385,6 +385,8 @@ public class Session implements Closeable { */ public static final String SETTING_ALLOW_POS_DUP_MESSAGES = "AllowPosDup"; + public static final String SETTING_MAX_MESSAGES_QUEUED_WHILE_PENDING_RESEND = "MaxMessagesQueuedWhilePendingResend"; + private static final ConcurrentMap sessions = new ConcurrentHashMap<>(); private final Application application; @@ -458,6 +460,7 @@ public class Session implements Closeable { public static final double DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER = 0.5; public static final double DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER = 1.4; private static final String ENCOUNTERED_END_OF_STREAM = "Encountered END_OF_STREAM"; + public static final int DEFAULT_MAX_MESSAGES_PENDING_RESEND = 2000; private static final int BAD_COMPID_REJ_REASON = SessionRejectReason.COMPID_PROBLEM; @@ -477,7 +480,7 @@ public class Session implements Closeable { messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5}, false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, - false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); + false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, DEFAULT_MAX_MESSAGES_PENDING_RESEND); } Session(Application application, MessageStoreFactory messageStoreFactory, SessionID sessionID, @@ -496,7 +499,8 @@ public class Session implements Closeable { boolean validateIncomingMessage, int resendRequestChunkSize, boolean enableNextExpectedMsgSeqNum, boolean enableLastMsgSeqNumProcessed, boolean validateChecksum, List logonTags, double heartBeatTimeoutMultiplier, - boolean allowPossDup) { + boolean allowPossDup, + int maxMessagesQueuedWhilePendingResend) { this.application = application; this.sessionID = sessionID; this.sessionSchedule = sessionSchedule; @@ -544,7 +548,7 @@ public class Session implements Closeable { } state = new SessionState(this, engineLog, heartbeatInterval, heartbeatInterval != 0, - messageStore, testRequestDelayMultiplier, heartBeatTimeoutMultiplier); + messageStore, testRequestDelayMultiplier, heartBeatTimeoutMultiplier, maxMessagesQueuedWhilePendingResend); registerSession(this); diff --git a/quickfixj-core/src/main/java/quickfix/SessionState.java b/quickfixj-core/src/main/java/quickfix/SessionState.java index 39f71ee1e6..c1d6a5ea3b 100644 --- a/quickfixj-core/src/main/java/quickfix/SessionState.java +++ b/quickfixj-core/src/main/java/quickfix/SessionState.java @@ -59,6 +59,7 @@ public final class SessionState { private final double heartBeatTimeoutMultiplier; private long heartBeatMillis = Long.MAX_VALUE; private int heartBeatInterval; + private final int maxMessagesQueuedWhilePendingResend; private final ResendRange resendRange = new ResendRange(); private boolean resetSent; @@ -83,7 +84,7 @@ public final class SessionState { private final Map messageQueue = new LinkedHashMap<>(); public SessionState(Object lock, Log log, int heartBeatInterval, boolean initiator, MessageStore messageStore, - double testRequestDelayMultiplier, double heartBeatTimeoutMultiplier) { + double testRequestDelayMultiplier, double heartBeatTimeoutMultiplier, int maxMessagesQueuedWhilePendingResend) { this.lock = lock; this.initiator = initiator; this.messageStore = messageStore; @@ -91,6 +92,7 @@ public SessionState(Object lock, Log log, int heartBeatInterval, boolean initiat this.log = log == null ? new NullLog() : log; this.testRequestDelayMultiplier = testRequestDelayMultiplier; this.heartBeatTimeoutMultiplier = heartBeatTimeoutMultiplier; + this.maxMessagesQueuedWhilePendingResend = maxMessagesQueuedWhilePendingResend; } public int getHeartBeatInterval() { @@ -306,7 +308,9 @@ public void get(int first, int last, Collection messages) throws IOExcep } public void enqueue(int sequence, Message message) { - messageQueue.put(sequence, message); + if (messageQueue.size() < maxMessagesQueuedWhilePendingResend || maxMessagesQueuedWhilePendingResend == -1) { + messageQueue.put(sequence, message); + } } public Message dequeue(int sequence) { diff --git a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java index af6ad7a851..120f42f60c 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java +++ b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java @@ -113,6 +113,7 @@ public static final class Builder { private final boolean validateChecksum = true; private final boolean allowPosDup = false; private List logonTags = new ArrayList<>(); + private final int maxMessagesQueuedWhilePendingResend = 2000; public Session build() { return new Session(applicationSupplier.get(), messageStoreFactorySupplier.get(), sessionIDSupplier.get(), @@ -124,7 +125,8 @@ public Session build() { resetOnError, disconnectOnError, disableHeartBeatCheck, false, rejectInvalidMessage, rejectMessageOnUnhandledException, requiresOrigSendingTime, forceResendWhenCorruptedStore, allowedRemoteAddresses, validateIncomingMessage, resendRequestChunkSize, enableNextExpectedMsgSeqNum, - enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPosDup); + enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPosDup, + maxMessagesQueuedWhilePendingResend); } public Builder setBeginString(final String beginString) { diff --git a/quickfixj-core/src/test/java/quickfix/SessionStateTest.java b/quickfixj-core/src/test/java/quickfix/SessionStateTest.java index a19e48f949..6eebd40f92 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionStateTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionStateTest.java @@ -23,8 +23,7 @@ import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class SessionStateTest { @@ -41,10 +40,26 @@ public void tearDown() { SystemTime.setTimeSource(null); } + @Test + public void testMaxMessagesPendingResend() { + SessionState state = new SessionState(new Object(), null, 0, false, null, + Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, 1); + Message message1 = new Message(); + message1.getHeader().setString(35, "D"); + Message message2 = new Message(); + message2.getHeader().setString(35, "D"); + state.enqueue(1, message1); + state.enqueue(2, message2); + assertNull(state.dequeue(2)); + state.dequeue(1); + state.enqueue(2, message2); + assertNotNull(state.dequeue(2)); + } + @Test public void testTimeoutDefaultsAreNonzero() { SessionState state = new SessionState(new Object(), null, 0, false, null, - Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); + Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, Session.DEFAULT_MAX_MESSAGES_PENDING_RESEND); state.setLastReceivedTime(900); assertFalse("logon timeout not init'ed", state.isLogonTimedOut()); @@ -56,7 +71,7 @@ public void testTimeoutDefaultsAreNonzero() { @Test public void testTestRequestTiming() { SessionState state = new SessionState(new Object(), null, 0, false, null, - Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); + Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, Session.DEFAULT_MAX_MESSAGES_PENDING_RESEND); state.setLastReceivedTime(950); state.setHeartBeatInterval(50); assertFalse("testRequest shouldn't be needed yet", state.isTestRequestNeeded()); @@ -74,7 +89,7 @@ public void testTestRequestTiming() { public void testHeartbeatTiming() { // we set a HB interval of 2 seconds = 2000ms SessionState state = new SessionState(new Object(), null, 2 /* HB interval */, false, null, - Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); + Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, Session.DEFAULT_MAX_MESSAGES_PENDING_RESEND); long now = System.currentTimeMillis(); timeSource.setSystemTimes(now); @@ -90,7 +105,7 @@ public void testHeartbeatTiming() { @Test public void testSessionTimeout() { SessionState state = new SessionState(new Object(), null, 30, false, null, - Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); + Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, Session.DEFAULT_MAX_MESSAGES_PENDING_RESEND); // session should timeout after 2.4 * 30 = 72 seconds state.setLastReceivedTime(950_000); diff --git a/quickfixj-core/src/test/java/quickfix/SessionTest.java b/quickfixj-core/src/test/java/quickfix/SessionTest.java index 3293c932d7..9509cee094 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionTest.java @@ -101,7 +101,7 @@ public void testDisposalOfFileResources() throws Exception { new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false, false, false, false, false, true, false, 1.5, null, true, new int[] { 5 }, false, false, false, false, true, false, true, false, - null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { + null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, 2000)) { // Simulate socket disconnect session.setResponder(null); } @@ -141,7 +141,7 @@ public void testNondisposableFileResources() throws Exception { new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false, false, false, false, false, true, false, 1.5, null, true, new int[] { 5 }, false, false, false, false, true, false, true, false, - null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { + null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, 2000)) { // Simulate socket disconnect session.setResponder(null); @@ -2102,8 +2102,7 @@ private void testSequenceResetGapFillWithChunkSize(int chunkSize) UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[] { 5 }, false, false, false, false, true, false, true, false, null, true, - chunkSize, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { - + chunkSize, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, 2000)) { UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); final SessionState state = getSessionState(session); @@ -2164,7 +2163,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, - false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); + false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, 2000); Responder mockResponder = mock(Responder.class); when(mockResponder.send(anyString())).thenReturn(true); @@ -2212,7 +2211,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, - enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); + enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, 2000); Responder mockResponder = mock(Responder.class); when(mockResponder.send(anyString())).thenReturn(true); @@ -2261,7 +2260,7 @@ public void testMsgSeqNumTooHighWithDisconnectOnError() throws Exception { UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[] { 5 }, false, disconnectOnError, false, false, true, false, true, false, - null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { + null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, 2000)) { UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); @@ -2297,7 +2296,8 @@ public void testTimestampPrecision() throws Exception { UtcTimestampPrecision.NANOS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[] { 5 }, false, disconnectOnError, false, false, true, false, true, false, - null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) { + null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, + 2000)) { UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); @@ -2349,7 +2349,7 @@ private void testLargeQueue(int N) throws Exception { new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, - false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); + false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, -1); UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder); @@ -2465,7 +2465,7 @@ public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers, new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0, - enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); + enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, 2000); UnitTestResponder responder = new UnitTestResponder(); session.setResponder(responder);