Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion logon_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (s logonState) FixMsgIn(session *session, msg *Message) (nextState sessionS
session.log.OnEvent(err.Text)
logout := session.buildLogout(err.Text)

if err := session.dropAndSendInReplyTo(logout, false, msg); err != nil {
if err := session.dropAndSendInReplyTo(logout, msg); err != nil {
session.logError(err)
}

Expand Down
39 changes: 20 additions & 19 deletions mongostore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package quickfix

import (
"fmt"
"time"

"github.com/globalsign/mgo"
"github.com/globalsign/mgo/bson"
"time"

"github.com/quickfixgo/quickfix/config"
)
Expand All @@ -18,7 +19,7 @@ type mongoStoreFactory struct {
type mongoStore struct {
sessionID SessionID
cache *memoryStore
mongoUrl string
mongoURL string
mongoDatabase string
db *mgo.Session
messagesCollection string
Expand All @@ -45,29 +46,29 @@ func (f mongoStoreFactory) Create(sessionID SessionID) (msgStore MessageStore, e
if !ok {
return nil, fmt.Errorf("unknown session: %v", sessionID)
}
mongoConnectionUrl, err := sessionSettings.Setting(config.MongoStoreConnection)
mongoConnectionURL, err := sessionSettings.Setting(config.MongoStoreConnection)
if err != nil {
return nil, err
}
mongoDatabase, err := sessionSettings.Setting(config.MongoStoreDatabase)
if err != nil {
return nil, err
}
return newMongoStore(sessionID, mongoConnectionUrl, mongoDatabase, f.messagesCollection, f.sessionsCollection)
return newMongoStore(sessionID, mongoConnectionURL, mongoDatabase, f.messagesCollection, f.sessionsCollection)
}

func newMongoStore(sessionID SessionID, mongoUrl string, mongoDatabase string, messagesCollection string, sessionsCollection string) (store *mongoStore, err error) {
func newMongoStore(sessionID SessionID, mongoURL string, mongoDatabase string, messagesCollection string, sessionsCollection string) (store *mongoStore, err error) {
store = &mongoStore{
sessionID: sessionID,
cache: &memoryStore{},
mongoUrl: mongoUrl,
mongoURL: mongoURL,
mongoDatabase: mongoDatabase,
messagesCollection: messagesCollection,
sessionsCollection: sessionsCollection,
}
store.cache.Reset()

if store.db, err = mgo.Dial(mongoUrl); err != nil {
if store.db, err = mgo.Dial(mongoURL); err != nil {
return
}
err = store.populateCache()
Expand All @@ -79,12 +80,12 @@ func generateMessageFilter(s *SessionID) (messageFilter *mongoQuickFixEntryData)
messageFilter = &mongoQuickFixEntryData{
BeginString: s.BeginString,
SessionQualifier: s.Qualifier,
SenderCompId: s.SenderCompID,
SenderSubId: s.SenderSubID,
SenderLocId: s.SenderLocationID,
TargetCompId: s.TargetCompID,
TargetSubId: s.TargetSubID,
TargetLocId: s.TargetLocationID,
SenderCompID: s.SenderCompID,
SenderSubID: s.SenderSubID,
SenderLocID: s.SenderLocationID,
TargetCompID: s.TargetCompID,
TargetSubID: s.TargetSubID,
TargetLocID: s.TargetLocationID,
}
return
}
Expand All @@ -100,12 +101,12 @@ type mongoQuickFixEntryData struct {
//Indexed data
BeginString string `bson:"begin_string"`
SessionQualifier string `bson:"session_qualifier"`
SenderCompId string `bson:"sender_comp_id"`
SenderSubId string `bson:"sender_sub_id"`
SenderLocId string `bson:"sender_loc_id"`
TargetCompId string `bson:"target_comp_id"`
TargetSubId string `bson:"target_sub_id"`
TargetLocId string `bson:"target_loc_id"`
SenderCompID string `bson:"sender_comp_id"`
SenderSubID string `bson:"sender_sub_id"`
SenderLocID string `bson:"sender_loc_id"`
TargetCompID string `bson:"target_comp_id"`
TargetSubID string `bson:"target_sub_id"`
TargetLocID string `bson:"target_loc_id"`
}

// Reset deletes the store records and sets the seqnums back to 1
Expand Down
34 changes: 18 additions & 16 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,20 @@ func (s *session) fillDefaultHeader(msg *Message, inReplyTo *Message) {
}
}

func (s *session) sendLogon(resetStore, setResetSeqNum bool) error {
return s.sendLogonInReplyTo(resetStore, setResetSeqNum, nil)
func (s *session) shouldSendReset() bool {
if s.sessionID.BeginString < BeginStringFIX41 {
return false
}

return (s.ResetOnLogon || s.ResetOnDisconnect || s.ResetOnLogout) &&
s.store.NextTargetMsgSeqNum() == 1 && s.store.NextSenderMsgSeqNum() == 1
}

func (s *session) sendLogon() error {
return s.sendLogonInReplyTo(s.shouldSendReset(), nil)
}

func (s *session) sendLogonInReplyTo(resetStore, setResetSeqNum bool, inReplyTo *Message) error {
func (s *session) sendLogonInReplyTo(setResetSeqNum bool, inReplyTo *Message) error {
logon := NewMessage()
logon.Header.SetField(tagMsgType, FIXString("A"))
logon.Header.SetField(tagBeginString, FIXString(s.sessionID.BeginString))
Expand All @@ -154,7 +163,7 @@ func (s *session) sendLogonInReplyTo(resetStore, setResetSeqNum bool, inReplyTo
logon.Body.SetField(tagDefaultApplVerID, FIXString(s.DefaultApplVerID))
}

if err := s.dropAndSendInReplyTo(logon, resetStore, inReplyTo); err != nil {
if err := s.dropAndSendInReplyTo(logon, inReplyTo); err != nil {
return err
}

Expand Down Expand Up @@ -248,20 +257,14 @@ func (s *session) dropAndReset() error {
return s.store.Reset()
}

//dropAndSend will optionally reset the store, validate and persist the message, then drops the send queue and sends the message.
func (s *session) dropAndSend(msg *Message, resetStore bool) error {
return s.dropAndSendInReplyTo(msg, resetStore, nil)
//dropAndSend will validate and persist the message, then drops the send queue and sends the message.
func (s *session) dropAndSend(msg *Message) error {
return s.dropAndSendInReplyTo(msg, nil)
}
func (s *session) dropAndSendInReplyTo(msg *Message, resetStore bool, inReplyTo *Message) error {
func (s *session) dropAndSendInReplyTo(msg *Message, inReplyTo *Message) error {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()

if resetStore {
if err := s.store.Reset(); err != nil {
return err
}
}

msgBytes, err := s.prepMessageForSend(msg, inReplyTo)
if err != nil {
return err
Expand Down Expand Up @@ -304,7 +307,6 @@ func (s *session) prepMessageForSend(msg *Message, inReplyTo *Message) (msgBytes
seqNum = s.store.NextSenderMsgSeqNum()
msg.Header.SetField(tagMsgSeqNum, FIXInt(seqNum))
}

}
} else {
if err = s.application.ToApp(msg, s.sessionID); err != nil {
Expand Down Expand Up @@ -437,7 +439,7 @@ func (s *session) handleLogon(msg *Message) error {
}

s.log.OnEvent("Responding to logon request")
if err := s.sendLogonInReplyTo(resetStore, resetSeqNumFlag.Bool(), msg); err != nil {
if err := s.sendLogonInReplyTo(resetSeqNumFlag.Bool(), msg); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion session_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (sm *stateMachine) Connect(session *session) {
}

session.log.OnEvent("Sending logon request")
if err := session.sendLogon(false, false); err != nil {
if err := session.sendLogon(); err != nil {
session.logError(err)
return
}
Expand Down
62 changes: 59 additions & 3 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,61 @@ func (s *SessionSuite) TestCheckTargetTooLow() {
s.Nil(s.session.checkTargetTooLow(msg))
}

func (s *SessionSuite) TestShouldSendReset() {
var tests = []struct {
BeginString string
ResetOnLogon bool
ResetOnDisconnect bool
ResetOnLogout bool
NextSenderMsgSeqNum int
NextTargetMsgSeqNum int
Expected bool
}{
{BeginStringFIX40, true, false, false, 1, 1, false}, //ResetSeqNumFlag not available < fix41

{BeginStringFIX41, true, false, false, 1, 1, true}, //session must be configured to reset on logon
{BeginStringFIX42, true, false, false, 1, 1, true},
{BeginStringFIX43, true, false, false, 1, 1, true},
{BeginStringFIX44, true, false, false, 1, 1, true},
{BeginStringFIXT11, true, false, false, 1, 1, true},

{BeginStringFIX41, false, true, false, 1, 1, true}, //or disconnect
{BeginStringFIX42, false, true, false, 1, 1, true},
{BeginStringFIX43, false, true, false, 1, 1, true},
{BeginStringFIX44, false, true, false, 1, 1, true},
{BeginStringFIXT11, false, true, false, 1, 1, true},

{BeginStringFIX41, false, false, true, 1, 1, true}, //or logout
{BeginStringFIX42, false, false, true, 1, 1, true},
{BeginStringFIX43, false, false, true, 1, 1, true},
{BeginStringFIX44, false, false, true, 1, 1, true},
{BeginStringFIXT11, false, false, true, 1, 1, true},

{BeginStringFIX41, true, true, false, 1, 1, true}, //or combo
{BeginStringFIX42, false, true, true, 1, 1, true},
{BeginStringFIX43, true, false, true, 1, 1, true},
{BeginStringFIX44, true, true, true, 1, 1, true},

{BeginStringFIX41, false, false, false, 1, 1, false}, //or will not be set

{BeginStringFIX41, true, false, false, 1, 10, false}, //session seq numbers should be reset at the time of check
{BeginStringFIX42, true, false, false, 2, 1, false},
{BeginStringFIX43, true, false, false, 14, 100, false},
}

for _, test := range tests {
s.session.sessionID.BeginString = test.BeginString
s.session.ResetOnLogon = test.ResetOnLogon
s.session.ResetOnDisconnect = test.ResetOnDisconnect
s.session.ResetOnLogout = test.ResetOnLogout

s.MockStore.SetNextSenderMsgSeqNum(test.NextSenderMsgSeqNum)
s.MockStore.SetNextTargetMsgSeqNum(test.NextTargetMsgSeqNum)

s.Equal(s.shouldSendReset(), test.Expected)
}
}

func (s *SessionSuite) TestCheckSessionTimeNoStartTimeEndTime() {
var tests = []struct {
before, after sessionState
Expand Down Expand Up @@ -851,7 +906,7 @@ func (suite *SessionSendTestSuite) TestSendDisableMessagePersist() {

func (suite *SessionSendTestSuite) TestDropAndSendAdminMessage() {
suite.MockApp.On("ToAdmin")
suite.Require().Nil(suite.dropAndSend(suite.Heartbeat(), false))
suite.Require().Nil(suite.dropAndSend(suite.Heartbeat()))
suite.MockApp.AssertExpectations(suite.T())

suite.MessagePersisted(suite.MockApp.lastToAdmin)
Expand All @@ -868,7 +923,7 @@ func (suite *SessionSendTestSuite) TestDropAndSendDropsQueue() {
suite.NoMessageSent()

suite.MockApp.On("ToAdmin")
require.Nil(suite.T(), suite.dropAndSend(suite.Logon(), false))
require.Nil(suite.T(), suite.dropAndSend(suite.Logon()))
suite.MockApp.AssertExpectations(suite.T())

msg := suite.MockApp.lastToAdmin
Expand All @@ -889,7 +944,8 @@ func (suite *SessionSendTestSuite) TestDropAndSendDropsQueueWithReset() {
suite.NoMessageSent()

suite.MockApp.On("ToAdmin")
require.Nil(suite.T(), suite.dropAndSend(suite.Logon(), true))
suite.MockStore.Reset()
require.Nil(suite.T(), suite.dropAndSend(suite.Logon()))
suite.MockApp.AssertExpectations(suite.T())
msg := suite.MockApp.lastToAdmin

Expand Down