Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class DefaultSessionFactory implements SessionFactory {

private final Application application;
private final MessageStoreFactory messageStoreFactory;
private final MessageQueueFactory messageQueueFactory;
private final LogFactory logFactory;
private final MessageFactory messageFactory;
private final SessionScheduleFactory sessionScheduleFactory;
Expand All @@ -55,6 +56,7 @@ public DefaultSessionFactory(Application application, MessageStoreFactory messag
LogFactory logFactory) {
this.application = application;
this.messageStoreFactory = messageStoreFactory;
this.messageQueueFactory = new InMemoryMessageQueueFactory();
this.logFactory = logFactory;
this.messageFactory = new DefaultMessageFactory();
this.sessionScheduleFactory = new DefaultSessionScheduleFactory();
Expand All @@ -64,6 +66,7 @@ public DefaultSessionFactory(Application application, MessageStoreFactory messag
LogFactory logFactory, MessageFactory messageFactory) {
this.application = application;
this.messageStoreFactory = messageStoreFactory;
this.messageQueueFactory = new InMemoryMessageQueueFactory();
this.logFactory = logFactory;
this.messageFactory = messageFactory;
this.sessionScheduleFactory = new DefaultSessionScheduleFactory();
Expand All @@ -74,6 +77,18 @@ public DefaultSessionFactory(Application application, MessageStoreFactory messag
SessionScheduleFactory sessionScheduleFactory) {
this.application = application;
this.messageStoreFactory = messageStoreFactory;
this.messageQueueFactory = new InMemoryMessageQueueFactory();
this.logFactory = logFactory;
this.messageFactory = messageFactory;
this.sessionScheduleFactory = sessionScheduleFactory;
}

public DefaultSessionFactory(Application application, MessageStoreFactory messageStoreFactory,
MessageQueueFactory messageQueueFactory, LogFactory logFactory,
MessageFactory messageFactory, SessionScheduleFactory sessionScheduleFactory) {
this.application = application;
this.messageStoreFactory = messageStoreFactory;
this.messageQueueFactory = messageQueueFactory;
this.logFactory = logFactory;
this.messageFactory = messageFactory;
this.sessionScheduleFactory = sessionScheduleFactory;
Expand Down Expand Up @@ -222,8 +237,8 @@ public Session create(SessionID sessionID, SessionSettings settings) throws Conf

final List<StringField> logonTags = getLogonTags(settings, sessionID);

final Session session = new Session(application, messageStoreFactory, sessionID,
dataDictionaryProvider, sessionSchedule, logFactory,
final Session session = new Session(application, messageStoreFactory, messageQueueFactory,
sessionID, dataDictionaryProvider, sessionSchedule, logFactory,
messageFactory, heartbeatInterval, checkLatency, maxLatency, timestampPrecision,
resetOnLogon, resetOnLogout, resetOnDisconnect, refreshOnLogon, checkCompID,
redundantResentRequestAllowed, persistMessages, useClosedIntervalForResend,
Expand Down
55 changes: 55 additions & 0 deletions quickfixj-core/src/main/java/quickfix/InMemoryMessageQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*******************************************************************************
* Copyright (c) quickfixengine.org All rights reserved.
*
* This file is part of the QuickFIX FIX Engine
*
* This file may be distributed under the terms of the quickfixengine.org
* license as defined by quickfixengine.org and appearing in the file
* LICENSE included in the packaging of this file.
*
* This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
* THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE.
*
* See http://www.quickfixengine.org/LICENSE for licensing information.
*
* Contact ask@quickfixengine.org if any conditions of this licensing
* are not clear to you.
******************************************************************************/

package quickfix;

import java.util.LinkedHashMap;
import java.util.Map;

/**
* An in-memory implementation of MessageQueue.
* It uses a linked hash map as a backing map.
*
* @see MessageQueue
*/
public class InMemoryMessageQueue implements MessageQueue {

// The map should be accessed from a single thread
private final Map<Integer, Message> backingMap = new LinkedHashMap<>();

@Override
public void enqueue(int sequence, Message message) {
backingMap.put(sequence, message);
}

@Override
public Message dequeue(int sequence) {
return backingMap.remove(sequence);
}

@Override
public void clear() {
backingMap.clear();
}

// used in tests
Map<Integer, Message> getBackingMap() {
return backingMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*******************************************************************************
* Copyright (c) quickfixengine.org All rights reserved.
*
* This file is part of the QuickFIX FIX Engine
*
* This file may be distributed under the terms of the quickfixengine.org
* license as defined by quickfixengine.org and appearing in the file
* LICENSE included in the packaging of this file.
*
* This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
* THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE.
*
* See http://www.quickfixengine.org/LICENSE for licensing information.
*
* Contact ask@quickfixengine.org if any conditions of this licensing
* are not clear to you.
******************************************************************************/

package quickfix;

/**
* Creates a message queue that stores all messages in memory.
*
* @see MessageQueue
*/
public class InMemoryMessageQueueFactory implements MessageQueueFactory {

@Override
public MessageQueue create(SessionID sessionID) {
return new InMemoryMessageQueue();
}
}
60 changes: 60 additions & 0 deletions quickfixj-core/src/main/java/quickfix/MessageQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*******************************************************************************
* Copyright (c) quickfixengine.org All rights reserved.
*
* This file is part of the QuickFIX FIX Engine
*
* This file may be distributed under the terms of the quickfixengine.org
* license as defined by quickfixengine.org and appearing in the file
* LICENSE included in the packaging of this file.
*
* This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
* THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE.
*
* See http://www.quickfixengine.org/LICENSE for licensing information.
*
* Contact ask@quickfixengine.org if any conditions of this licensing
* are not clear to you.
******************************************************************************/

package quickfix;

/**
* Used by a Session to store and retrieve messages with a sequence number higher than expected.
*
* @see quickfix.Session
*/
interface MessageQueue {

/**
* Enqueue a message.
*
* @param sequence the sequence number
* @param message the FIX message
*/
void enqueue(int sequence, Message message);

/**
* Dequeue a message with given sequence number.
*
* @param sequence the sequence number
* @return message the FIX message
*/
Message dequeue(int sequence);

/**
* Remove messages from queue up to a given sequence number.
*
* @param seqnum up to which sequence number messages should be deleted
*/
default void dequeueMessagesUpTo(int seqnum) {
for (int i = 1; i < seqnum; i++) {
dequeue(i);
}
}

/**
* Clear the queue.
*/
void clear();
}
36 changes: 36 additions & 0 deletions quickfixj-core/src/main/java/quickfix/MessageQueueFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*******************************************************************************
* Copyright (c) quickfixengine.org All rights reserved.
*
* This file is part of the QuickFIX FIX Engine
*
* This file may be distributed under the terms of the quickfixengine.org
* license as defined by quickfixengine.org and appearing in the file
* LICENSE included in the packaging of this file.
*
* This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
* THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE.
*
* See http://www.quickfixengine.org/LICENSE for licensing information.
*
* Contact ask@quickfixengine.org if any conditions of this licensing
* are not clear to you.
******************************************************************************/

package quickfix;

/**
* Used by a Session to create a message queue implementation.
*
* @see Session
*/
public interface MessageQueueFactory {

/**
* Creates a message queue implementation.
*
* @param sessionID the session ID, often used to access session configurations
* @return the message queue implementation
*/
MessageQueue create(SessionID sessionID);
}
39 changes: 34 additions & 5 deletions quickfixj-core/src/main/java/quickfix/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,30 @@ public class Session implements Closeable {
boolean enableNextExpectedMsgSeqNum, boolean enableLastMsgSeqNumProcessed,
boolean validateChecksum, List<StringField> logonTags, double heartBeatTimeoutMultiplier,
boolean allowPossDup) {
this(application, messageStoreFactory, new InMemoryMessageQueueFactory(), sessionID, dataDictionaryProvider, sessionSchedule, logFactory,
messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false,
false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5},
false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false,
false, false, new ArrayList<StringField>(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, allowPossDup);
}

Session(Application application, MessageStoreFactory messageStoreFactory, MessageQueueFactory messageQueueFactory,
SessionID sessionID, DataDictionaryProvider dataDictionaryProvider, SessionSchedule sessionSchedule,
LogFactory logFactory, MessageFactory messageFactory, int heartbeatInterval,
boolean checkLatency, int maxLatency, UtcTimestampPrecision timestampPrecision,
boolean resetOnLogon, boolean resetOnLogout, boolean resetOnDisconnect,
boolean refreshOnLogon, boolean checkCompID,
boolean redundantResentRequestsAllowed, boolean persistMessages,
boolean useClosedRangeForResend, double testRequestDelayMultiplier,
DefaultApplVerID senderDefaultApplVerID, boolean validateSequenceNumbers,
int[] logonIntervals, boolean resetOnError, boolean disconnectOnError,
boolean disableHeartBeatCheck, boolean rejectGarbledMessage, boolean rejectInvalidMessage,
boolean rejectMessageOnUnhandledException, boolean requiresOrigSendingTime,
boolean forceResendWhenCorruptedStore, Set<InetAddress> allowedRemoteAddresses,
boolean validateIncomingMessage, int resendRequestChunkSize,
boolean enableNextExpectedMsgSeqNum, boolean enableLastMsgSeqNumProcessed,
boolean validateChecksum, List<StringField> logonTags, double heartBeatTimeoutMultiplier,
boolean allowPossDup) {
this.application = application;
this.sessionID = sessionID;
this.sessionSchedule = sessionSchedule;
Expand Down Expand Up @@ -543,8 +567,13 @@ public class Session implements Closeable {
addStateListener((SessionStateListener) messageStore);
}

final MessageQueue messageQueue = messageQueueFactory.create(sessionID);
if (messageQueue instanceof SessionStateListener) {
addStateListener((SessionStateListener) messageQueue);
}

state = new SessionState(this, engineLog, heartbeatInterval, heartbeatInterval != 0,
messageStore, testRequestDelayMultiplier, heartBeatTimeoutMultiplier);
messageStore, messageQueue, testRequestDelayMultiplier, heartBeatTimeoutMultiplier);

registerSession(this);

Expand Down Expand Up @@ -1533,7 +1562,7 @@ private void nextSequenceReset(Message sequenceReset) throws IOException, Reject
}
// QFJ-728: newSequence will be the seqnum of the next message so we
// delete all older messages from the queue since they are effectively skipped.
state.dequeueMessagesUpTo(newSequence);
state.getMessageQueue().dequeueMessagesUpTo(newSequence);
} else if (newSequence < getExpectedTargetNum()) {

getLog().onErrorEvent(
Expand Down Expand Up @@ -2107,7 +2136,7 @@ public void disconnect(String reason, boolean logError) throws IOException {
state.setLogoutReceived(false);
state.setResetReceived(false);
state.setResetSent(false);
state.clearQueue();
state.getMessageQueue().clear();
state.clearLogoutReason();
state.setResendRange(0, 0);

Expand Down Expand Up @@ -2399,7 +2428,7 @@ private void nextQueued() throws FieldNotFound, RejectLogon, IncorrectDataFormat

private boolean nextQueued(int num) throws FieldNotFound, RejectLogon, IncorrectDataFormat,
IncorrectTagValue, UnsupportedMessageType, IOException, InvalidMessage {
final Message msg = state.dequeue(num);
final Message msg = state.getMessageQueue().dequeue(num);
if (msg != null) {
getLog().onEvent("Processing queued message: " + num);

Expand Down Expand Up @@ -2668,7 +2697,7 @@ private boolean sendRaw(Message message, int num) {
}

private void enqueueMessage(final Message msg, final int msgSeqNum) {
state.enqueue(msgSeqNum, msg);
state.getMessageQueue().enqueue(msgSeqNum, msg);
getLog().onEvent("Enqueued at pos " + msgSeqNum + ": " + msg);
}

Expand Down
Loading