Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf
final SessionSchedule sessionSchedule = sessionScheduleFactory.create(sessionID, settings);

final List<StringField> logonTags = getLogonTags(settings, sessionID);
final int maxMessagesQueuedWhilePendingResend = getSetting(settings, sessionID, Session.SETTING_MAX_MESSAGES_QUEUED_WHILE_PENDING_RESEND, 2000);

final Session session = new Session(application, messageStoreFactory, sessionID,
dataDictionaryProvider, sessionSchedule, logFactory,
Expand All @@ -232,7 +233,7 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf
rejectInvalidMessage, rejectMessageOnUnhandledException, requiresOrigSendingTime,
forceResendWhenCorruptedStore, allowedRemoteAddresses, validateIncomingMessage,
resendRequestChunkSize, enableNextExpectedMsgSeqNum, enableLastMsgSeqNumProcessed,
validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPossDup);
validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPossDup, maxMessagesQueuedWhilePendingResend);

session.setLogonTimeout(logonTimeout);
session.setLogoutTimeout(logoutTimeout);
Expand Down
10 changes: 7 additions & 3 deletions quickfixj-core/src/main/java/quickfix/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,8 @@ public class Session implements Closeable {
*/
public static final String SETTING_ALLOW_POS_DUP_MESSAGES = "AllowPosDup";

public static final String SETTING_MAX_MESSAGES_QUEUED_WHILE_PENDING_RESEND = "MaxMessagesQueuedWhilePendingResend";

private static final ConcurrentMap<SessionID, Session> sessions = new ConcurrentHashMap<>();

private final Application application;
Expand Down Expand Up @@ -458,6 +460,7 @@ public class Session implements Closeable {
public static final double DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER = 0.5;
public static final double DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER = 1.4;
private static final String ENCOUNTERED_END_OF_STREAM = "Encountered END_OF_STREAM";
public static final int DEFAULT_MAX_MESSAGES_PENDING_RESEND = 2000;


private static final int BAD_COMPID_REJ_REASON = SessionRejectReason.COMPID_PROBLEM;
Expand All @@ -477,7 +480,7 @@ public class Session implements Closeable {
messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false,
false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5},
false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false,
false, false, new ArrayList<StringField>(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);
false, false, new ArrayList<StringField>(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, DEFAULT_MAX_MESSAGES_PENDING_RESEND);
}

Session(Application application, MessageStoreFactory messageStoreFactory, SessionID sessionID,
Expand All @@ -496,7 +499,8 @@ public class Session implements Closeable {
boolean validateIncomingMessage, int resendRequestChunkSize,
boolean enableNextExpectedMsgSeqNum, boolean enableLastMsgSeqNumProcessed,
boolean validateChecksum, List<StringField> logonTags, double heartBeatTimeoutMultiplier,
boolean allowPossDup) {
boolean allowPossDup,
int maxMessagesQueuedWhilePendingResend) {
this.application = application;
this.sessionID = sessionID;
this.sessionSchedule = sessionSchedule;
Expand Down Expand Up @@ -544,7 +548,7 @@ public class Session implements Closeable {
}

state = new SessionState(this, engineLog, heartbeatInterval, heartbeatInterval != 0,
messageStore, testRequestDelayMultiplier, heartBeatTimeoutMultiplier);
messageStore, testRequestDelayMultiplier, heartBeatTimeoutMultiplier, maxMessagesQueuedWhilePendingResend);

registerSession(this);

Expand Down
8 changes: 6 additions & 2 deletions quickfixj-core/src/main/java/quickfix/SessionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public final class SessionState {
private final double heartBeatTimeoutMultiplier;
private long heartBeatMillis = Long.MAX_VALUE;
private int heartBeatInterval;
private final int maxMessagesQueuedWhilePendingResend;

private final ResendRange resendRange = new ResendRange();
private boolean resetSent;
Expand All @@ -83,14 +84,15 @@ public final class SessionState {
private final Map<Integer, Message> messageQueue = new LinkedHashMap<>();

public SessionState(Object lock, Log log, int heartBeatInterval, boolean initiator, MessageStore messageStore,
double testRequestDelayMultiplier, double heartBeatTimeoutMultiplier) {
double testRequestDelayMultiplier, double heartBeatTimeoutMultiplier, int maxMessagesQueuedWhilePendingResend) {
this.lock = lock;
this.initiator = initiator;
this.messageStore = messageStore;
setHeartBeatInterval(heartBeatInterval);
this.log = log == null ? new NullLog() : log;
this.testRequestDelayMultiplier = testRequestDelayMultiplier;
this.heartBeatTimeoutMultiplier = heartBeatTimeoutMultiplier;
this.maxMessagesQueuedWhilePendingResend = maxMessagesQueuedWhilePendingResend;
}

public int getHeartBeatInterval() {
Expand Down Expand Up @@ -306,7 +308,9 @@ public void get(int first, int last, Collection<String> messages) throws IOExcep
}

public void enqueue(int sequence, Message message) {
messageQueue.put(sequence, message);
if (messageQueue.size() < maxMessagesQueuedWhilePendingResend || maxMessagesQueuedWhilePendingResend == -1) {
messageQueue.put(sequence, message);
}
}

public Message dequeue(int sequence) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public static final class Builder {
private final boolean validateChecksum = true;
private final boolean allowPosDup = false;
private List<StringField> logonTags = new ArrayList<>();
private final int maxMessagesQueuedWhilePendingResend = 2000;

public Session build() {
return new Session(applicationSupplier.get(), messageStoreFactorySupplier.get(), sessionIDSupplier.get(),
Expand All @@ -124,7 +125,8 @@ public Session build() {
resetOnError, disconnectOnError, disableHeartBeatCheck, false, rejectInvalidMessage,
rejectMessageOnUnhandledException, requiresOrigSendingTime, forceResendWhenCorruptedStore,
allowedRemoteAddresses, validateIncomingMessage, resendRequestChunkSize, enableNextExpectedMsgSeqNum,
enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPosDup);
enableLastMsgSeqNumProcessed, validateChecksum, logonTags, heartBeatTimeoutMultiplier, allowPosDup,
maxMessagesQueuedWhilePendingResend);
}

public Builder setBeginString(final String beginString) {
Expand Down
27 changes: 21 additions & 6 deletions quickfixj-core/src/test/java/quickfix/SessionStateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

public class SessionStateTest {

Expand All @@ -41,10 +40,26 @@ public void tearDown() {
SystemTime.setTimeSource(null);
}

@Test
public void testMaxMessagesPendingResend() {
SessionState state = new SessionState(new Object(), null, 0, false, null,
Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, 1);
Message message1 = new Message();
message1.getHeader().setString(35, "D");
Message message2 = new Message();
message2.getHeader().setString(35, "D");
state.enqueue(1, message1);
state.enqueue(2, message2);
assertNull(state.dequeue(2));
state.dequeue(1);
state.enqueue(2, message2);
assertNotNull(state.dequeue(2));
}

@Test
public void testTimeoutDefaultsAreNonzero() {
SessionState state = new SessionState(new Object(), null, 0, false, null,
Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, Session.DEFAULT_MAX_MESSAGES_PENDING_RESEND);
state.setLastReceivedTime(900);
assertFalse("logon timeout not init'ed", state.isLogonTimedOut());

Expand All @@ -56,7 +71,7 @@ public void testTimeoutDefaultsAreNonzero() {
@Test
public void testTestRequestTiming() {
SessionState state = new SessionState(new Object(), null, 0, false, null,
Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, Session.DEFAULT_MAX_MESSAGES_PENDING_RESEND);
state.setLastReceivedTime(950);
state.setHeartBeatInterval(50);
assertFalse("testRequest shouldn't be needed yet", state.isTestRequestNeeded());
Expand All @@ -74,7 +89,7 @@ public void testTestRequestTiming() {
public void testHeartbeatTiming() {
// we set a HB interval of 2 seconds = 2000ms
SessionState state = new SessionState(new Object(), null, 2 /* HB interval */, false, null,
Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, Session.DEFAULT_MAX_MESSAGES_PENDING_RESEND);

long now = System.currentTimeMillis();
timeSource.setSystemTimes(now);
Expand All @@ -90,7 +105,7 @@ public void testHeartbeatTiming() {
@Test
public void testSessionTimeout() {
SessionState state = new SessionState(new Object(), null, 30, false, null,
Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER);
Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, Session.DEFAULT_MAX_MESSAGES_PENDING_RESEND);

// session should timeout after 2.4 * 30 = 72 seconds
state.setLastReceivedTime(950_000);
Expand Down
20 changes: 10 additions & 10 deletions quickfixj-core/src/test/java/quickfix/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testDisposalOfFileResources() throws Exception {
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false,
false, false, false, false, true, false, 1.5, null, true,
new int[] { 5 }, false, false, false, false, true, false, true, false,
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, 2000)) {
// Simulate socket disconnect
session.setResponder(null);
}
Expand Down Expand Up @@ -141,7 +141,7 @@ public void testNondisposableFileResources() throws Exception {
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, true, false,
false, false, false, false, true, false, 1.5, null, true,
new int[] { 5 }, false, false, false, false, true, false, true, false,
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, 2000)) {
// Simulate socket disconnect
session.setResponder(null);

Expand Down Expand Up @@ -2102,8 +2102,7 @@ private void testSequenceResetGapFillWithChunkSize(int chunkSize)
UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true,
false, 1.5, null, validateSequenceNumbers, new int[] { 5 },
false, false, false, false, true, false, true, false, null, true,
chunkSize, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {

chunkSize, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, 2000)) {
UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);
final SessionState state = getSessionState(session);
Expand Down Expand Up @@ -2164,7 +2163,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);
false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, 2000);

Responder mockResponder = mock(Responder.class);
when(mockResponder.send(anyString())).thenReturn(true);
Expand Down Expand Up @@ -2212,7 +2211,7 @@ public void correct_sequence_number_for_last_gap_fill_if_next_sender_sequence_nu
new DefaultMessageFactory(), 30, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, 2000);

Responder mockResponder = mock(Responder.class);
when(mockResponder.send(anyString())).thenReturn(true);
Expand Down Expand Up @@ -2261,7 +2260,7 @@ public void testMsgSeqNumTooHighWithDisconnectOnError() throws Exception {
UtcTimestampPrecision.MILLIS, resetOnLogon, false, false, false, false, false, true,
false, 1.5, null, validateSequenceNumbers, new int[] { 5 },
false, disconnectOnError, false, false, true, false, true, false,
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, 2000)) {

UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);
Expand Down Expand Up @@ -2297,7 +2296,8 @@ public void testTimestampPrecision() throws Exception {
UtcTimestampPrecision.NANOS, resetOnLogon, false, false, false, false, false, true,
false, 1.5, null, validateSequenceNumbers, new int[] { 5 },
false, disconnectOnError, false, false, true, false, true, false,
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false)) {
null, true, 0, false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false,
2000)) {

UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);
Expand Down Expand Up @@ -2349,7 +2349,7 @@ private void testLargeQueue(int N) throws Exception {
new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);
false, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, -1);

UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);
Expand Down Expand Up @@ -2465,7 +2465,7 @@ public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound
new DefaultMessageFactory(), isInitiator ? 30 : 0, false, 30, UtcTimestampPrecision.MILLIS, resetOnLogon,
false, false, false, false, false, true, false, 1.5, null, validateSequenceNumbers,
new int[]{5}, false, false, false, false, true, false, true, false, null, true, 0,
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false);
enableNextExpectedMsgSeqNum, false, true, new ArrayList<>(), Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false, 2000);
UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);

Expand Down