From 13a2c93aa65e3e6811318097c850a8421db696dd Mon Sep 17 00:00:00 2001 From: chris busbey Date: Tue, 15 Jan 2019 14:06:23 -0600 Subject: [PATCH 1/3] refactoring around session reset, do not send reset for < fix41 --- logon_state.go | 2 +- session.go | 35 ++++++++++++++++++----------------- session_state.go | 2 +- session_test.go | 26 +++++++++++++++++++++++--- 4 files changed, 43 insertions(+), 22 deletions(-) diff --git a/logon_state.go b/logon_state.go index fc7fffd9e..351fa629c 100644 --- a/logon_state.go +++ b/logon_state.go @@ -27,7 +27,7 @@ func (s logonState) FixMsgIn(session *session, msg *Message) (nextState sessionS session.log.OnEvent(err.Text) logout := session.buildLogout(err.Text) - if err := session.dropAndSendInReplyTo(logout, false, msg); err != nil { + if err := session.dropAndSendInReplyTo(logout, msg); err != nil { session.logError(err) } diff --git a/session.go b/session.go index e6dde986b..010760d75 100644 --- a/session.go +++ b/session.go @@ -133,11 +133,19 @@ func (s *session) fillDefaultHeader(msg *Message, inReplyTo *Message) { } } -func (s *session) sendLogon(resetStore, setResetSeqNum bool) error { - return s.sendLogonInReplyTo(resetStore, setResetSeqNum, nil) +func (s *session) shouldSendReset() bool { + if s.sessionID.BeginString < BeginStringFIX41 { + return false + } + + return true +} + +func (s *session) sendLogon(setResetSeqNum bool) error { + return s.sendLogonInReplyTo(setResetSeqNum, nil) } -func (s *session) sendLogonInReplyTo(resetStore, setResetSeqNum bool, inReplyTo *Message) error { +func (s *session) sendLogonInReplyTo(setResetSeqNum bool, inReplyTo *Message) error { logon := NewMessage() logon.Header.SetField(tagMsgType, FIXString("A")) logon.Header.SetField(tagBeginString, FIXString(s.sessionID.BeginString)) @@ -146,7 +154,7 @@ func (s *session) sendLogonInReplyTo(resetStore, setResetSeqNum bool, inReplyTo logon.Body.SetField(tagEncryptMethod, FIXString("0")) logon.Body.SetField(tagHeartBtInt, FIXInt(s.HeartBtInt.Seconds())) - if setResetSeqNum { + if s.shouldSendReset() && setResetSeqNum { logon.Body.SetField(tagResetSeqNumFlag, FIXBoolean(true)) } @@ -154,7 +162,7 @@ func (s *session) sendLogonInReplyTo(resetStore, setResetSeqNum bool, inReplyTo logon.Body.SetField(tagDefaultApplVerID, FIXString(s.DefaultApplVerID)) } - if err := s.dropAndSendInReplyTo(logon, resetStore, inReplyTo); err != nil { + if err := s.dropAndSendInReplyTo(logon, inReplyTo); err != nil { return err } @@ -248,20 +256,14 @@ func (s *session) dropAndReset() error { return s.store.Reset() } -//dropAndSend will optionally reset the store, validate and persist the message, then drops the send queue and sends the message. -func (s *session) dropAndSend(msg *Message, resetStore bool) error { - return s.dropAndSendInReplyTo(msg, resetStore, nil) +//dropAndSend will validate and persist the message, then drops the send queue and sends the message. +func (s *session) dropAndSend(msg *Message) error { + return s.dropAndSendInReplyTo(msg, nil) } -func (s *session) dropAndSendInReplyTo(msg *Message, resetStore bool, inReplyTo *Message) error { +func (s *session) dropAndSendInReplyTo(msg *Message, inReplyTo *Message) error { s.sendMutex.Lock() defer s.sendMutex.Unlock() - if resetStore { - if err := s.store.Reset(); err != nil { - return err - } - } - msgBytes, err := s.prepMessageForSend(msg, inReplyTo) if err != nil { return err @@ -304,7 +306,6 @@ func (s *session) prepMessageForSend(msg *Message, inReplyTo *Message) (msgBytes seqNum = s.store.NextSenderMsgSeqNum() msg.Header.SetField(tagMsgSeqNum, FIXInt(seqNum)) } - } } else { if err = s.application.ToApp(msg, s.sessionID); err != nil { @@ -437,7 +438,7 @@ func (s *session) handleLogon(msg *Message) error { } s.log.OnEvent("Responding to logon request") - if err := s.sendLogonInReplyTo(resetStore, resetSeqNumFlag.Bool(), msg); err != nil { + if err := s.sendLogonInReplyTo(resetSeqNumFlag.Bool(), msg); err != nil { return err } } diff --git a/session_state.go b/session_state.go index 0d8263fe3..59b5f9e72 100644 --- a/session_state.go +++ b/session_state.go @@ -37,7 +37,7 @@ func (sm *stateMachine) Connect(session *session) { } session.log.OnEvent("Sending logon request") - if err := session.sendLogon(false, false); err != nil { + if err := session.sendLogon(false); err != nil { session.logError(err) return } diff --git a/session_test.go b/session_test.go index bd2bd18fa..1f80f9ec3 100644 --- a/session_test.go +++ b/session_test.go @@ -228,6 +228,25 @@ func (s *SessionSuite) TestCheckTargetTooLow() { s.Nil(s.session.checkTargetTooLow(msg)) } +func (s *SessionSuite) TestShouldSendReset() { + var tests = []struct { + BeginString string + Expected bool + }{ + {BeginStringFIX40, false}, //ResetSeqNumFlag not available < fix41 + {BeginStringFIX41, true}, + {BeginStringFIX42, true}, + {BeginStringFIX43, true}, + {BeginStringFIX44, true}, + {BeginStringFIXT11, true}, + } + + for _, test := range tests { + s.session.sessionID.BeginString = test.BeginString + s.Equal(s.shouldSendReset(), test.Expected) + } +} + func (s *SessionSuite) TestCheckSessionTimeNoStartTimeEndTime() { var tests = []struct { before, after sessionState @@ -851,7 +870,7 @@ func (suite *SessionSendTestSuite) TestSendDisableMessagePersist() { func (suite *SessionSendTestSuite) TestDropAndSendAdminMessage() { suite.MockApp.On("ToAdmin") - suite.Require().Nil(suite.dropAndSend(suite.Heartbeat(), false)) + suite.Require().Nil(suite.dropAndSend(suite.Heartbeat())) suite.MockApp.AssertExpectations(suite.T()) suite.MessagePersisted(suite.MockApp.lastToAdmin) @@ -868,7 +887,7 @@ func (suite *SessionSendTestSuite) TestDropAndSendDropsQueue() { suite.NoMessageSent() suite.MockApp.On("ToAdmin") - require.Nil(suite.T(), suite.dropAndSend(suite.Logon(), false)) + require.Nil(suite.T(), suite.dropAndSend(suite.Logon())) suite.MockApp.AssertExpectations(suite.T()) msg := suite.MockApp.lastToAdmin @@ -889,7 +908,8 @@ func (suite *SessionSendTestSuite) TestDropAndSendDropsQueueWithReset() { suite.NoMessageSent() suite.MockApp.On("ToAdmin") - require.Nil(suite.T(), suite.dropAndSend(suite.Logon(), true)) + suite.MockStore.Reset() + require.Nil(suite.T(), suite.dropAndSend(suite.Logon())) suite.MockApp.AssertExpectations(suite.T()) msg := suite.MockApp.lastToAdmin From 27b642ee8d3aea0e211d40aa72a59bde7e365c0e Mon Sep 17 00:00:00 2001 From: chris busbey Date: Tue, 15 Jan 2019 15:10:20 -0600 Subject: [PATCH 2/3] initiator sets seq reset flag when configured --- session.go | 9 +++++---- session_state.go | 2 +- session_test.go | 52 ++++++++++++++++++++++++++++++++++++++++-------- 3 files changed, 50 insertions(+), 13 deletions(-) diff --git a/session.go b/session.go index 010760d75..556a06b9c 100644 --- a/session.go +++ b/session.go @@ -138,11 +138,12 @@ func (s *session) shouldSendReset() bool { return false } - return true + return (s.ResetOnLogon || s.ResetOnDisconnect || s.ResetOnLogout) && + s.store.NextTargetMsgSeqNum() == 1 && s.store.NextSenderMsgSeqNum() == 1 } -func (s *session) sendLogon(setResetSeqNum bool) error { - return s.sendLogonInReplyTo(setResetSeqNum, nil) +func (s *session) sendLogon() error { + return s.sendLogonInReplyTo(s.shouldSendReset(), nil) } func (s *session) sendLogonInReplyTo(setResetSeqNum bool, inReplyTo *Message) error { @@ -154,7 +155,7 @@ func (s *session) sendLogonInReplyTo(setResetSeqNum bool, inReplyTo *Message) er logon.Body.SetField(tagEncryptMethod, FIXString("0")) logon.Body.SetField(tagHeartBtInt, FIXInt(s.HeartBtInt.Seconds())) - if s.shouldSendReset() && setResetSeqNum { + if setResetSeqNum { logon.Body.SetField(tagResetSeqNumFlag, FIXBoolean(true)) } diff --git a/session_state.go b/session_state.go index 59b5f9e72..1f076f221 100644 --- a/session_state.go +++ b/session_state.go @@ -37,7 +37,7 @@ func (sm *stateMachine) Connect(session *session) { } session.log.OnEvent("Sending logon request") - if err := session.sendLogon(false); err != nil { + if err := session.sendLogon(); err != nil { session.logError(err) return } diff --git a/session_test.go b/session_test.go index 1f80f9ec3..33c7cf417 100644 --- a/session_test.go +++ b/session_test.go @@ -230,19 +230,55 @@ func (s *SessionSuite) TestCheckTargetTooLow() { func (s *SessionSuite) TestShouldSendReset() { var tests = []struct { - BeginString string - Expected bool + BeginString string + ResetOnLogon bool + ResetOnDisconnect bool + ResetOnLogout bool + NextSenderMsgSeqNum int + NextTargetMsgSeqNum int + Expected bool }{ - {BeginStringFIX40, false}, //ResetSeqNumFlag not available < fix41 - {BeginStringFIX41, true}, - {BeginStringFIX42, true}, - {BeginStringFIX43, true}, - {BeginStringFIX44, true}, - {BeginStringFIXT11, true}, + {BeginStringFIX40, true, false, false, 1, 1, false}, //ResetSeqNumFlag not available < fix41 + + {BeginStringFIX41, true, false, false, 1, 1, true}, //session must be configured to reset on logon + {BeginStringFIX42, true, false, false, 1, 1, true}, + {BeginStringFIX43, true, false, false, 1, 1, true}, + {BeginStringFIX44, true, false, false, 1, 1, true}, + {BeginStringFIXT11, true, false, false, 1, 1, true}, + + {BeginStringFIX41, false, true, false, 1, 1, true}, //or disconnect + {BeginStringFIX42, false, true, false, 1, 1, true}, + {BeginStringFIX43, false, true, false, 1, 1, true}, + {BeginStringFIX44, false, true, false, 1, 1, true}, + {BeginStringFIXT11, false, true, false, 1, 1, true}, + + {BeginStringFIX41, false, false, true, 1, 1, true}, //or logout + {BeginStringFIX42, false, false, true, 1, 1, true}, + {BeginStringFIX43, false, false, true, 1, 1, true}, + {BeginStringFIX44, false, false, true, 1, 1, true}, + {BeginStringFIXT11, false, false, true, 1, 1, true}, + + {BeginStringFIX41, true, true, false, 1, 1, true}, //or combo + {BeginStringFIX42, false, true, true, 1, 1, true}, + {BeginStringFIX43, true, false, true, 1, 1, true}, + {BeginStringFIX44, true, true, true, 1, 1, true}, + + {BeginStringFIX41, false, false, false, 1, 1, false}, //or will not be set + + {BeginStringFIX41, true, false, false, 1, 10, false}, //session seq numbers should be reset at the time of check + {BeginStringFIX42, true, false, false, 2, 1, false}, + {BeginStringFIX43, true, false, false, 14, 100, false}, } for _, test := range tests { s.session.sessionID.BeginString = test.BeginString + s.session.ResetOnLogon = test.ResetOnLogon + s.session.ResetOnDisconnect = test.ResetOnDisconnect + s.session.ResetOnLogout = test.ResetOnLogout + + s.MockStore.SetNextSenderMsgSeqNum(test.NextSenderMsgSeqNum) + s.MockStore.SetNextTargetMsgSeqNum(test.NextTargetMsgSeqNum) + s.Equal(s.shouldSendReset(), test.Expected) } } From c24ccbb11d6f90ff12988977c2361002f0252ea3 Mon Sep 17 00:00:00 2001 From: chris busbey Date: Tue, 15 Jan 2019 15:10:37 -0600 Subject: [PATCH 3/3] linting mongostore --- mongostore.go | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/mongostore.go b/mongostore.go index bac1cd50a..76789dbe9 100644 --- a/mongostore.go +++ b/mongostore.go @@ -2,9 +2,10 @@ package quickfix import ( "fmt" + "time" + "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" - "time" "github.com/quickfixgo/quickfix/config" ) @@ -18,7 +19,7 @@ type mongoStoreFactory struct { type mongoStore struct { sessionID SessionID cache *memoryStore - mongoUrl string + mongoURL string mongoDatabase string db *mgo.Session messagesCollection string @@ -45,7 +46,7 @@ func (f mongoStoreFactory) Create(sessionID SessionID) (msgStore MessageStore, e if !ok { return nil, fmt.Errorf("unknown session: %v", sessionID) } - mongoConnectionUrl, err := sessionSettings.Setting(config.MongoStoreConnection) + mongoConnectionURL, err := sessionSettings.Setting(config.MongoStoreConnection) if err != nil { return nil, err } @@ -53,21 +54,21 @@ func (f mongoStoreFactory) Create(sessionID SessionID) (msgStore MessageStore, e if err != nil { return nil, err } - return newMongoStore(sessionID, mongoConnectionUrl, mongoDatabase, f.messagesCollection, f.sessionsCollection) + return newMongoStore(sessionID, mongoConnectionURL, mongoDatabase, f.messagesCollection, f.sessionsCollection) } -func newMongoStore(sessionID SessionID, mongoUrl string, mongoDatabase string, messagesCollection string, sessionsCollection string) (store *mongoStore, err error) { +func newMongoStore(sessionID SessionID, mongoURL string, mongoDatabase string, messagesCollection string, sessionsCollection string) (store *mongoStore, err error) { store = &mongoStore{ sessionID: sessionID, cache: &memoryStore{}, - mongoUrl: mongoUrl, + mongoURL: mongoURL, mongoDatabase: mongoDatabase, messagesCollection: messagesCollection, sessionsCollection: sessionsCollection, } store.cache.Reset() - if store.db, err = mgo.Dial(mongoUrl); err != nil { + if store.db, err = mgo.Dial(mongoURL); err != nil { return } err = store.populateCache() @@ -79,12 +80,12 @@ func generateMessageFilter(s *SessionID) (messageFilter *mongoQuickFixEntryData) messageFilter = &mongoQuickFixEntryData{ BeginString: s.BeginString, SessionQualifier: s.Qualifier, - SenderCompId: s.SenderCompID, - SenderSubId: s.SenderSubID, - SenderLocId: s.SenderLocationID, - TargetCompId: s.TargetCompID, - TargetSubId: s.TargetSubID, - TargetLocId: s.TargetLocationID, + SenderCompID: s.SenderCompID, + SenderSubID: s.SenderSubID, + SenderLocID: s.SenderLocationID, + TargetCompID: s.TargetCompID, + TargetSubID: s.TargetSubID, + TargetLocID: s.TargetLocationID, } return } @@ -100,12 +101,12 @@ type mongoQuickFixEntryData struct { //Indexed data BeginString string `bson:"begin_string"` SessionQualifier string `bson:"session_qualifier"` - SenderCompId string `bson:"sender_comp_id"` - SenderSubId string `bson:"sender_sub_id"` - SenderLocId string `bson:"sender_loc_id"` - TargetCompId string `bson:"target_comp_id"` - TargetSubId string `bson:"target_sub_id"` - TargetLocId string `bson:"target_loc_id"` + SenderCompID string `bson:"sender_comp_id"` + SenderSubID string `bson:"sender_sub_id"` + SenderLocID string `bson:"sender_loc_id"` + TargetCompID string `bson:"target_comp_id"` + TargetSubID string `bson:"target_sub_id"` + TargetLocID string `bson:"target_loc_id"` } // Reset deletes the store records and sets the seqnums back to 1