From a209fdb1f5d1b26855722ff843583678a58f3d6f Mon Sep 17 00:00:00 2001 From: Chris Busbey Date: Tue, 23 Aug 2016 11:16:45 -0500 Subject: [PATCH 1/2] calls toApp toAdmin on resend and seq reset --- in_session.go | 32 +++++++++---- in_session_test.go | 109 +++++++++++++++++++++++++++++++++++++++++++++ quickfix_test.go | 24 +++++++--- session.go | 9 +--- 4 files changed, 154 insertions(+), 20 deletions(-) diff --git a/in_session.go b/in_session.go index 68d572108..a9ac443f2 100644 --- a/in_session.go +++ b/in_session.go @@ -220,21 +220,31 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int continue } + if !session.resend(msg) { + nextSeqNum = sentMessageSeqNum + 1 + continue + } + if seqNum != sentMessageSeqNum { - state.generateSequenceReset(session, seqNum, sentMessageSeqNum) + if err = state.generateSequenceReset(session, seqNum, sentMessageSeqNum); err != nil { + return err + } } session.log.OnEventf("Resending Message: %v", sentMessageSeqNum) - if err = session.resend(msg); err != nil { - return + if _, err := msg.Build(); err != nil { + return err } + session.sendBytes(msg.rawMessage) seqNum = sentMessageSeqNum + 1 nextSeqNum = seqNum } if seqNum != nextSeqNum { // gapfill for catch-up - state.generateSequenceReset(session, seqNum, nextSeqNum) + if err = state.generateSequenceReset(session, seqNum, nextSeqNum); err != nil { + return err + } } return @@ -335,7 +345,7 @@ func (state inSession) doTargetTooLow(session *session, msg Message, rej targetT return state } -func (state *inSession) generateSequenceReset(session *session, beginSeqNo int, endSeqNo int) { +func (state *inSession) generateSequenceReset(session *session, beginSeqNo int, endSeqNo int) (err error) { sequenceReset := NewMessage() session.fillDefaultHeader(sequenceReset) @@ -350,9 +360,15 @@ func (state *inSession) generateSequenceReset(session *session, beginSeqNo int, sequenceReset.Header.SetField(tagOrigSendingTime, origSendingTime) } - //FIXME error check? - msgBytes, _ := sequenceReset.Build() - session.sendBytes(msgBytes) + session.application.ToAdmin(sequenceReset, session.sessionID) + msgBytes, err := sequenceReset.Build() + if err != nil { + return + } + + session.sendBytes(msgBytes) session.log.OnEventf("Sent SequenceReset TO: %v", endSeqNo) + + return } diff --git a/in_session_test.go b/in_session_test.go index dde812380..8479431a3 100644 --- a/in_session_test.go +++ b/in_session_test.go @@ -133,3 +133,112 @@ func (s *InSessionTestSuite) TestFIXMsgInTargetTooHigh() { stashedRawMsg, _ := stashedMsg.Build() s.Equal(string(rawMsg), string(stashedRawMsg)) } + +func (s *InSessionTestSuite) TestFIXMsgInResendRequestAllAdminExpectGapFill() { + s.MockApp.On("ToAdmin") + s.session.Timeout(s.session, internal.NeedHeartbeat) + s.LastToAdminMessageSent() + + s.session.Timeout(s.session, internal.NeedHeartbeat) + s.LastToAdminMessageSent() + + s.session.Timeout(s.session, internal.NeedHeartbeat) + s.LastToAdminMessageSent() + + s.MockApp.AssertNumberOfCalls(s.T(), "ToAdmin", 3) + s.NextSenderMsgSeqNum(4) + + s.MockApp.On("FromAdmin").Return(nil) + s.MockApp.On("ToAdmin") + s.fixMsgIn(s.session, s.ResendRequest(1)) + + s.MockApp.AssertExpectations(s.T()) + s.LastToAdminMessageSent() + s.MessageType(enum.MsgType_SEQUENCE_RESET, s.MockApp.lastToAdmin) + s.FieldEquals(tagMsgSeqNum, 1, s.MockApp.lastToAdmin.Header) + s.FieldEquals(tagPossDupFlag, true, s.MockApp.lastToAdmin.Header) + s.FieldEquals(tagNewSeqNo, 4, s.MockApp.lastToAdmin.Body) + s.FieldEquals(tagGapFillFlag, true, s.MockApp.lastToAdmin.Body) + + s.NextSenderMsgSeqNum(4) + s.State(inSession{}) +} + +func (s *InSessionTestSuite) TestFIXMsgInResendRequestAllAdminThenApp() { + s.MockApp.On("ToAdmin") + s.session.Timeout(s.session, internal.NeedHeartbeat) + s.LastToAdminMessageSent() + + s.session.Timeout(s.session, internal.NeedHeartbeat) + s.LastToAdminMessageSent() + + s.MockApp.On("ToApp").Return(nil) + s.session.send(s.NewOrderSingle()) + s.LastToAppMessageSent() + + s.MockApp.AssertNumberOfCalls(s.T(), "ToAdmin", 2) + s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1) + s.NextSenderMsgSeqNum(4) + + s.MockApp.On("FromAdmin").Return(nil) + s.MockApp.On("ToAdmin") + s.MockApp.On("ToApp").Return(nil) + s.fixMsgIn(s.session, s.ResendRequest(1)) + + s.MockApp.AssertNumberOfCalls(s.T(), "ToAdmin", 3) + s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 2) + + s.LastToAdminMessageSent() + s.MessageType(enum.MsgType_SEQUENCE_RESET, s.MockApp.lastToAdmin) + s.FieldEquals(tagMsgSeqNum, 1, s.MockApp.lastToAdmin.Header) + s.FieldEquals(tagPossDupFlag, true, s.MockApp.lastToAdmin.Header) + s.FieldEquals(tagNewSeqNo, 3, s.MockApp.lastToAdmin.Body) + s.FieldEquals(tagGapFillFlag, true, s.MockApp.lastToAdmin.Body) + + s.LastToAppMessageSent() + s.MessageType(enum.MsgType_ORDER_SINGLE, s.MockApp.lastToApp) + s.FieldEquals(tagMsgSeqNum, 3, s.MockApp.lastToApp.Header) + s.FieldEquals(tagPossDupFlag, true, s.MockApp.lastToApp.Header) + + s.NextSenderMsgSeqNum(4) + s.State(inSession{}) +} + +func (s *InSessionTestSuite) TestFIXMsgInResendRequestDoNotSendApp() { + s.MockApp.On("ToAdmin") + s.session.Timeout(s.session, internal.NeedHeartbeat) + s.LastToAdminMessageSent() + + s.MockApp.On("ToApp").Return(nil) + s.session.send(s.NewOrderSingle()) + s.LastToAppMessageSent() + + s.session.Timeout(s.session, internal.NeedHeartbeat) + s.LastToAdminMessageSent() + + s.MockApp.AssertNumberOfCalls(s.T(), "ToAdmin", 2) + s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1) + s.NextSenderMsgSeqNum(4) + + //NOTE: a cheat here, need to reset mock + s.MockApp = MockApp{} + s.MockApp.On("FromAdmin").Return(nil) + s.MockApp.On("ToApp").Return(ErrDoNotSend) + s.MockApp.On("ToAdmin") + s.fixMsgIn(s.session, s.ResendRequest(1)) + + s.MockApp.AssertNumberOfCalls(s.T(), "ToAdmin", 1) + s.MockApp.AssertNumberOfCalls(s.T(), "ToApp", 1) + + s.LastToAdminMessageSent() + s.MessageType(enum.MsgType_SEQUENCE_RESET, s.MockApp.lastToAdmin) + s.FieldEquals(tagMsgSeqNum, 1, s.MockApp.lastToAdmin.Header) + s.FieldEquals(tagPossDupFlag, true, s.MockApp.lastToAdmin.Header) + s.FieldEquals(tagNewSeqNo, 4, s.MockApp.lastToAdmin.Body) + s.FieldEquals(tagGapFillFlag, true, s.MockApp.lastToAdmin.Body) + + s.NoMessageSent() + + s.NextSenderMsgSeqNum(4) + s.State(inSession{}) +} diff --git a/quickfix_test.go b/quickfix_test.go index c2cee6b8e..597afb512 100644 --- a/quickfix_test.go +++ b/quickfix_test.go @@ -17,6 +17,7 @@ type KnowsFieldMap interface { Has(Tag) bool GetString(Tag) (string, MessageRejectError) GetInt(Tag) (int, MessageRejectError) + GetField(Tag, FieldValueReader) MessageRejectError } func (s *QuickFIXSuite) MessageType(msgType string, msg Message) { @@ -35,6 +36,11 @@ func (s *QuickFIXSuite) FieldEquals(tag Tag, expectedValue interface{}, fieldMap val, err := fieldMap.GetInt(tag) s.Nil(err) s.Equal(expected, val) + case bool: + var val FIXBoolean + err := fieldMap.GetField(tag, &val) + s.Nil(err) + s.Equal(expected, val.Bool()) default: s.FailNow("Field type not handled") } @@ -122,19 +128,27 @@ func (m *MessageFactory) buildMessage(msgType string) Message { } func (m *MessageFactory) Logout() Message { - return m.buildMessage("5") + return m.buildMessage(enum.MsgType_LOGOUT) } func (m *MessageFactory) NewOrderSingle() Message { - return m.buildMessage("D") + return m.buildMessage(enum.MsgType_ORDER_SINGLE) } func (m *MessageFactory) Heartbeat() Message { - return m.buildMessage("0") + return m.buildMessage(enum.MsgType_HEARTBEAT) } func (m *MessageFactory) Logon() Message { - return m.buildMessage("A") + return m.buildMessage(enum.MsgType_LOGON) +} + +func (m *MessageFactory) ResendRequest(beginSeqNo int) Message { + msg := m.buildMessage(enum.MsgType_RESEND_REQUEST) + msg.Body.SetField(tagBeginSeqNo, FIXInt(beginSeqNo)) + msg.Body.SetField(tagEndSeqNo, FIXInt(0)) + + return msg } type MockSessionReceiver struct { @@ -216,7 +230,7 @@ func (s *SessionSuiteRig) Disconnected() { func (s *SessionSuiteRig) NoMessageSent() { msg, _ := s.Receiver.LastMessage() - s.Nil(msg, "no message should be sent") + s.Nil(msg, "no message should be sent but got %s", msg) } func (s *SessionSuiteRig) NoMessageQueued() { diff --git a/session.go b/session.go index e3efd6bdb..aba7efa5d 100644 --- a/session.go +++ b/session.go @@ -146,7 +146,7 @@ func (s *session) sendLogout(reason string) error { return s.send(logout) } -func (s *session) resend(msg Message) error { +func (s *session) resend(msg Message) bool { msg.Header.SetField(tagPossDupFlag, FIXBoolean(true)) var origSendingTime FIXString @@ -156,12 +156,7 @@ func (s *session) resend(msg Message) error { s.insertSendingTime(msg.Header) - if _, err := msg.Build(); err != nil { - return err - } - s.sendBytes(msg.rawMessage) - - return nil + return s.application.ToApp(msg, s.sessionID) == nil } //queueForSend will validate, persist, and queue the message for send From 7f2fb2d609829267dd04a705de0f8247fb242b6c Mon Sep 17 00:00:00 2001 From: Chris Busbey Date: Tue, 23 Aug 2016 11:51:48 -0500 Subject: [PATCH 2/2] test coverage around sequence resets in resend state --- quickfix_test.go | 7 +++++++ resend_state_test.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/quickfix_test.go b/quickfix_test.go index 597afb512..7d799a63c 100644 --- a/quickfix_test.go +++ b/quickfix_test.go @@ -151,6 +151,13 @@ func (m *MessageFactory) ResendRequest(beginSeqNo int) Message { return msg } +func (m *MessageFactory) SequenceReset(seqNo int) Message { + msg := m.buildMessage(enum.MsgType_SEQUENCE_RESET) + msg.Body.SetField(tagNewSeqNo, FIXInt(seqNo)) + + return msg +} + type MockSessionReceiver struct { sendChannel chan []byte } diff --git a/resend_state_test.go b/resend_state_test.go index 401d2b274..123130193 100644 --- a/resend_state_test.go +++ b/resend_state_test.go @@ -95,3 +95,34 @@ func (s *resendStateTestSuite) TestFixMsgIn() { s.State(inSession{}) s.NextTargetMsgSeqNum(5) } + +func (s *resendStateTestSuite) TestFixMsgInSequenceReset() { + s.session.State = inSession{} + + //in session expects seq number 1, send too high + s.MessageFactory.SetNextSeqNum(3) + s.MockApp.On("ToAdmin") + + msgSeqNum2 := s.NewOrderSingle() + s.fixMsgIn(s.session, msgSeqNum2) + + s.MockApp.AssertExpectations(s.T()) + s.State(resendState{}) + s.LastToAdminMessageSent() + s.MessageType(enum.MsgType_RESEND_REQUEST, s.MockApp.lastToAdmin) + s.FieldEquals(tagBeginSeqNo, 1, s.MockApp.lastToAdmin.Body) + s.NextTargetMsgSeqNum(1) + + s.MessageFactory.SetNextSeqNum(1) + s.MockApp.On("FromAdmin").Return(nil) + s.fixMsgIn(s.session, s.SequenceReset(2)) + s.NextTargetMsgSeqNum(2) + s.State(resendState{}) + + s.MockApp.On("FromApp").Return(nil) + s.fixMsgIn(s.session, s.NewOrderSingle()) + + s.MockApp.AssertNumberOfCalls(s.T(), "FromApp", 2) + s.NextTargetMsgSeqNum(4) + s.State(inSession{}) +}